### Path

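Path to the dataset root directory, e.g. `'./EATD-Corpus'`. Note that the cells below read the sample folders from a directory named `raw-data`.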

In [26]:
import os
import pandas as pd
import re
import numpy as np
from tqdm import tqdm

In [27]:
# Helper used as a sort key: pulls the first integer out of a folder name
def extract_number(folder_name):
    """Return the first run of decimal digits in ``folder_name`` as an int.

    Names that contain no digits map to 0.
    """
    digits = re.search(r'\d+', folder_name)
    return int(digits.group()) if digits else 0

# Retrieve the list of folders and sort them by their numeric part.
# os.listdir returns entries in arbitrary order, so the full name is used as a
# secondary key: any names that share the same number then sort deterministically
# instead of keeping whatever order the filesystem happened to return.
folders = os.listdir('raw-data')
sorted_folders = sorted(folders, key=lambda name: (extract_number(name), name))

In [28]:
# Build one record (dict) per sample folder and create the DataFrame once at the end.
# Growing a DataFrame with pd.concat inside the loop copies every previous row on each
# iteration and concatenates against an initially empty frame.
text_stems = ['negative', 'neutral', 'positive', 'new_label']
wav_stems = ['negative', 'neutral', 'positive']
records = []

# Traverse through all entries of the 'raw-data' directory with a tqdm progress bar
for folder in tqdm(sorted_folders, desc="Processing folders"):
    folder_path = os.path.join('raw-data', folder)

    # Only directories are samples; skip anything else found in 'raw-data'
    if not os.path.isdir(folder_path):
        continue

    record = {'folder': folder}

    # Content of each expected text file, stripped; None when the file is absent
    for stem in text_stems:
        txt_path = os.path.join(folder_path, stem + '.txt')
        if os.path.exists(txt_path):
            with open(txt_path, 'r', encoding='utf-8') as f:
                record[stem] = f.read().strip()
        else:
            record[stem] = None

    # Absolute path of each expected wav file; None when the file is absent
    for stem in wav_stems:
        wav_path = os.path.join(folder_path, stem + '.wav')
        record[stem + '_Wav'] = os.path.abspath(wav_path) if os.path.exists(wav_path) else None

    records.append(record)

# Explicit column list keeps the column order stable and yields all eight columns
# even when no folder was found.
df = pd.DataFrame(
    records,
    columns=['folder'] + text_stems + [stem + '_Wav' for stem in wav_stems],
)

Processing folders: 100%|██████████| 125/125 [00:00<00:00, 966.04it/s]


In [29]:
# Sanity check: list the columns of the assembled table.
df.columns

Index(['folder', 'negative', 'neutral', 'positive', 'new_label',
       'negative_Wav', 'neutral_Wav', 'positive_Wav'],
      dtype='object')

In [30]:
# 'new_label' was read from a text file as a string; convert it to float so it can be thresholded.
df['new_label'] = df['new_label'].astype(float)

In [31]:
# Check dtypes and non-null counts of every column.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 124 entries, 0 to 123
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   folder        124 non-null    object 
 1   negative      124 non-null    object 
 2   neutral       124 non-null    object 
 3   positive      124 non-null    object 
 4   new_label     124 non-null    float64
 5   negative_Wav  124 non-null    object 
 6   neutral_Wav   124 non-null    object 
 7   positive_Wav  124 non-null    object 
dtypes: float64(1), object(7)
memory usage: 7.9+ KB


In [32]:
# Binary target: 1 when new_label >= 53.0, otherwise 0.
# NOTE(review): 53 is presumably the clinical cut-off of the score stored in new_label — confirm
# against the dataset documentation and consider naming the constant.
df['labels'] = np.where(df['new_label'] >= 53.0, 1, 0)

In [33]:
# Class balance of the binary target.
df['labels'].value_counts()

Unnamed: 0_level_0,count
labels,Unnamed: 1_level_1
0,98
1,26


In [34]:
# Persist the assembled table (without the row index) for the later cells.
df.to_csv('all_data.csv', index=False)

In [35]:
import os
import pandas as pd
import numpy as np
import torch

import tensorflow as tf
# Enable the memory-growth option on every GPU TensorFlow reports (no-op when none is found).
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        # Best effort: the error is printed and the notebook carries on.
        # NOTE(review): presumably raised when the GPUs were already initialised before this
        # cell ran — confirm against the TensorFlow documentation.
        print(e)

import torchaudio
from transformers import RobertaTokenizer, TFRobertaModel
from transformers import TFXLMRobertaModel, XLMRobertaTokenizer

from tqdm import tqdm

Text data

In [36]:
# XLM-RoBERTa example: load the pretrained "xlm-roberta-large" model and its tokenizer.
# NOTE: the next cell binds the same names (`roberta_model`, `tokenizer`) to "roberta-large",
# so whichever of the two cells ran last decides what the later cells use.
roberta_model = TFXLMRobertaModel.from_pretrained("xlm-roberta-large")
tokenizer = XLMRobertaTokenizer.from_pretrained("xlm-roberta-large")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/616 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/2.24G [00:00<?, ?B/s]

KeyboardInterrupt: 

In [None]:
# RoBERTa example: load the pretrained "roberta-large" model and its tokenizer.
# NOTE: this rebinds `roberta_model` and `tokenizer`, replacing the "xlm-roberta-large"
# objects assigned in the previous cell.
roberta_model = TFRobertaModel.from_pretrained("roberta-large")
tokenizer = RobertaTokenizer.from_pretrained("roberta-large")

In [None]:
# Reload the table saved to 'all_data.csv' and display it.
data = pd.read_csv('all_data.csv')
data

In [None]:
# Token-length statistics for the texts in the 'negative' column
token_counts = data['negative'].apply(lambda text: len(tokenizer.tokenize(text)))

summary = (
    ("Minimum token count:", token_counts.min()),
    ("Maximum token count:", token_counts.max()),
    ("Mean token count:", token_counts.mean()),
    ("Median token count:", token_counts.median()),
    ("Standard deviation of token count:", token_counts.std()),
)
for caption, value in summary:
    print(caption, value)

In [None]:
# Token-length statistics for the texts in the 'neutral' column
token_counts = data['neutral'].apply(lambda text: len(tokenizer.tokenize(text)))

summary = (
    ("Minimum token count:", token_counts.min()),
    ("Maximum token count:", token_counts.max()),
    ("Mean token count:", token_counts.mean()),
    ("Median token count:", token_counts.median()),
    ("Standard deviation of token count:", token_counts.std()),
)
for caption, value in summary:
    print(caption, value)

In [None]:
# Token-length statistics for the texts in the 'positive' column
token_counts = data['positive'].apply(lambda text: len(tokenizer.tokenize(text)))

summary = (
    ("Minimum token count:", token_counts.min()),
    ("Maximum token count:", token_counts.max()),
    ("Mean token count:", token_counts.mean()),
    ("Median token count:", token_counts.median()),
    ("Standard deviation of token count:", token_counts.std()),
)
for caption, value in summary:
    print(caption, value)

In [None]:
def _resolve_max_length(global_name, override=None):
    """Choose the truncation length for one prompt type.

    Order of precedence: the explicit ``override``; otherwise the notebook-level
    variable called ``global_name`` if it has been assigned; otherwise None.

    The per-prompt limits (``neg_max_length`` etc.) are notebook-level variables.
    Looking them up lazily avoids a NameError when no cell has assigned them; with
    None the tokenizer applies its own default maximum (see the Hugging Face
    tokenizer documentation for ``max_length``).
    """
    if override is not None:
        return override
    return globals().get(global_name)


def _encode_texts(texts, max_length):
    """Tokenize ``texts`` with the notebook-level ``tokenizer`` and return the ``input_ids`` TF tensor.

    Sequences are padded to the longest one in ``texts`` and truncated to ``max_length``.
    """
    encoded = tokenizer(texts, padding=True, truncation=True, max_length=max_length, return_tensors="tf")
    return encoded["input_ids"]


def neg_extract_text_features(texts, max_length=None):
    """Token ids for the 'negative' texts; ``max_length`` defaults to ``neg_max_length``."""
    return _encode_texts(texts, _resolve_max_length("neg_max_length", max_length))


# The misspelled name ("nue") is kept because load_data calls it.
def nue_extract_text_features(texts, max_length=None):
    """Token ids for the 'neutral' texts; ``max_length`` defaults to ``neu_max_length``."""
    return _encode_texts(texts, _resolve_max_length("neu_max_length", max_length))


def pos_extract_text_features(texts, max_length=None):
    """Token ids for the 'positive' texts; ``max_length`` defaults to ``pos_max_length``."""
    return _encode_texts(texts, _resolve_max_length("pos_max_length", max_length))

def load_data(csv_file):
    """Read ``csv_file`` and return ``(features, labels)`` for the text branch.

    The 'negative', 'neutral' and 'positive' columns are converted to strings,
    tokenized with their respective extractor, and the three token-id matrices are
    concatenated along axis 1. ``labels`` is the 'labels' column as an array.
    """
    frame = pd.read_csv(csv_file)

    columns = ('negative', 'neutral', 'positive')
    encoders = (neg_extract_text_features, nue_extract_text_features, pos_extract_text_features)

    # Stringify every column first, then tokenize each with its own extractor.
    texts_per_column = [list(map(str, frame[column])) for column in columns]
    encoded = [encode(texts) for encode, texts in zip(encoders, texts_per_column)]

    return np.concatenate(encoded, axis=1), frame['labels'].values

# Tokenize the train and test splits.
# NOTE(review): no cell visible in this notebook writes 'train_data.csv' or 'test_data.csv'
# (only 'all_data.csv' is saved above) — the split is presumably created elsewhere; confirm
# before relying on Restart & Run All.
train_text_features, train_labels = load_data('train_data.csv')
test_text_features, test_labels = load_data('test_data.csv')

Audio data

In [None]:
import torchaudio.transforms as T

# Mel-spectrogram transform configured for 16 kHz audio: n_fft of 400 samples (25 ms at 16 kHz),
# hop of 160 samples (10 ms at 16 kHz) and 23 mel bands.
melspectrogram = T.MelSpectrogram(sample_rate=16000, n_fft=400, hop_length=160, n_mels=23)
# Resampler configured to convert 48 kHz input to 16 kHz.
resampler = T.Resample(orig_freq=48000, new_freq=16000)

def extract_features(file_paths, max_len=None):
    """Compute a mel spectrogram for every audio file in ``file_paths``.

    Each file is loaded with ``torchaudio.load``, resampled to the rate the
    notebook-level ``resampler`` targets, mixed down to mono and passed through the
    notebook-level ``melspectrogram`` transform.

    Args:
        file_paths: iterable of audio file paths.
        max_len: optional number of frames. When given, every spectrogram is
            zero-padded or truncated along axis 1 to exactly this length; when
            None the natural length is kept.

    Returns:
        A list of 2-D numpy arrays shaped (n_mels, frames), in input order.
    """
    target_rate = resampler.new_freq
    # One resampler per source rate; the pre-built one covers its configured rate.
    resamplers = {resampler.orig_freq: resampler}
    features = []

    for file_path in tqdm(file_paths, desc="Extracting features"):
        speech, sample_rate = torchaudio.load(file_path)

        # Resample from the file's real rate rather than assuming a fixed one;
        # audio already at the target rate is left untouched.
        if sample_rate != target_rate:
            if sample_rate not in resamplers:
                resamplers[sample_rate] = T.Resample(orig_freq=sample_rate, new_freq=target_rate)
            speech = resamplers[sample_rate](speech)

        # Mix any multi-channel recording down to mono (not only stereo).
        if speech.shape[0] > 1:
            speech = speech.mean(dim=0, keepdim=True)

        # Drop only the channel axis so the spectrogram is always 2-D.
        spectrogram = melspectrogram(speech.squeeze(0)).numpy()

        if max_len is not None:
            n_frames = spectrogram.shape[1]
            if n_frames < max_len:
                # Pad in the spectrogram's own dtype so padded and unpadded items match.
                padding = np.zeros((spectrogram.shape[0], max_len - n_frames), dtype=spectrogram.dtype)
                spectrogram = np.concatenate((spectrogram, padding), axis=1)
            elif n_frames > max_len:
                spectrogram = spectrogram[:, :max_len]

        features.append(spectrogram)

    return features

In [None]:
def wav_load_data(csv_file):
    """Read the wav-path columns of ``csv_file``.

    Args:
        csv_file: CSV with 'negative_Wav', 'neutral_Wav' and 'positive_Wav' columns.

    Returns:
        Three lists ``(neg_paths, neu_paths, pos_paths)`` of path strings in row
        order, with backslashes replaced by forward slashes.

    Raises:
        ValueError: if any of the three columns contains a missing value.
    """
    df = pd.read_csv(csv_file)

    def _column_paths(column):
        # Fail early with a clear message instead of passing NaN on as a path.
        if df[column].isna().any():
            raise ValueError(f"column '{column}' in {csv_file} contains missing audio paths")
        return [str(path).replace('\\', '/') for path in df[column]]

    return (
        _column_paths('negative_Wav'),
        _column_paths('neutral_Wav'),
        _column_paths('positive_Wav'),
    )

In [None]:
# Wav paths per prompt type for the train and test splits.
neg_train_paths, neu_train_paths, pos_train_paths = wav_load_data('train_data.csv')
neg_test_paths, neu_test_paths, pos_test_paths = wav_load_data('test_data.csv')

In [None]:
def pad_sequences(sequences, max_len=None, padding_value=0.0):
    """Bring 2-D arrays to a common width along axis 1 and stack them.

    Args:
        sequences: 2-D arrays whose second dimension may differ.
        max_len: target width; defaults to the widest array in ``sequences``.
        padding_value: fill value appended to arrays that are too short.

    Returns:
        ``np.array`` of the padded (or truncated) arrays.
    """
    target = max_len if max_len is not None else max(item.shape[1] for item in sequences)

    def _fit(item):
        shortfall = target - item.shape[1]
        if shortfall <= 0:
            # Already wide enough: cut down to the target width.
            return item[:, :target]
        filler = np.full((item.shape[0], shortfall), padding_value)
        return np.concatenate((item, filler), axis=1)

    return np.array([_fit(item) for item in sequences])

In [None]:
# Extract spectrograms for every train and test file without a length limit (max_len omitted);
# the next cell uses them to measure the frame-count distribution.
all_file_paths = neg_train_paths + neu_train_paths + pos_train_paths + neg_test_paths + neu_test_paths + pos_test_paths
all_features = extract_features(all_file_paths)

In [None]:
# Median number of frames (axis 1) over all spectrograms; used below as the common length.
# NOTE(review): the median is taken over train and test files together.
median_len = int(np.median([feat.shape[1] for feat in all_features]))
median_len

In [None]:
# Re-extract the train spectrograms with max_len=median_len, then let pad_sequences
# combine each prompt type's list into one array.
train_neg_features = pad_sequences(extract_features(neg_train_paths, median_len))
train_neu_features = pad_sequences(extract_features(neu_train_paths, median_len))
train_pos_features = pad_sequences(extract_features(pos_train_paths, median_len))

In [None]:
# Join the negative, neutral and positive arrays along the last axis.
train_audio_features = np.concatenate([train_neg_features, train_neu_features, train_pos_features], axis=-1)

In [None]:
# Same as for the train split: extract the test spectrograms with max_len=median_len,
# then let pad_sequences combine each prompt type's list into one array.
test_neg_features = pad_sequences(extract_features(neg_test_paths, median_len))
test_neu_features = pad_sequences(extract_features(neu_test_paths, median_len))
test_pos_features = pad_sequences(extract_features(pos_test_paths, median_len))

In [None]:
# Join the negative, neutral and positive arrays along the last axis.
test_audio_features = np.concatenate([test_neg_features, test_neu_features, test_pos_features], axis=-1)