In [None]:
import os
import pandas as pd
import numpy as np
import torch

import tensorflow as tf
# Turn on memory growth for every GPU TensorFlow reports. An empty device list
# simply skips the loop; a RuntimeError is printed rather than re-raised.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as err:
    print(err)

import torchaudio
from transformers import RobertaTokenizer, TFRobertaModel
from transformers import TFXLMRobertaModel, XLMRobertaTokenizer

from tqdm import tqdm

### Text data

In [None]:
# XLM-RoBERTa example: load the "xlm-roberta-large" checkpoint as a TensorFlow
# model together with its matching tokenizer.
# NOTE(review): the RoBERTa cell that follows assigns the same two names, so
# when both cells are run in order these objects are replaced.
roberta_model = TFXLMRobertaModel.from_pretrained("xlm-roberta-large")
tokenizer = XLMRobertaTokenizer.from_pretrained("xlm-roberta-large")

In [None]:
# RoBERTa example: load the "roberta-large" checkpoint as a TensorFlow model
# together with its matching tokenizer.
# NOTE(review): this rebinds `roberta_model` and `tokenizer`, overwriting the
# XLM-RoBERTa objects from the previous cell; every later cell uses whichever
# tokenizer was assigned last.
roberta_model = TFRobertaModel.from_pretrained("roberta-large")
tokenizer = RobertaTokenizer.from_pretrained("roberta-large") 

In [None]:
# Read the combined dataset; the bare `data` expression on the last line makes
# the DataFrame this cell's displayed output.
data = pd.read_csv('all_data.csv')
data

In [None]:
# Token-length statistics for each sentiment text column, computed with
# whichever tokenizer is currently loaded. A single loop replaces three
# copy-pasted cells and labels every report with the column it describes.
for column in ('negative', 'neutral', 'positive'):
    # str() mirrors the coercion load_data applies before tokenizing, so a
    # non-string cell (e.g. NaN for a missing text) is counted the same way
    # it will later be encoded instead of reaching the tokenizer as a float.
    token_counts = data[column].apply(lambda text: len(tokenizer.tokenize(str(text))))

    print("---", column, "---")
    print("Minimum token count:", token_counts.min())
    print("Maximum token count:", token_counts.max())
    print("Mean token count:", token_counts.mean())
    print("Median token count:", token_counts.median())
    print("Standard deviation of token count:", token_counts.std())
    print()

# After the loop `token_counts` holds the 'positive' counts, which is the value
# it had once the three original cells had run.

In [None]:
def _encode_input_ids(texts, max_length):
    """Tokenize ``texts`` with the module-level ``tokenizer`` and keep only the ids.

    Sequences longer than ``max_length`` tokens are truncated and the batch is
    returned as TensorFlow tensors (``return_tensors="tf"``).

    NOTE(review): ``padding=True`` pads to the longest sequence of *this* call,
    so the width can differ between calls (e.g. train vs. test) -- confirm that
    downstream code does not need ``padding="max_length"``.
    """
    encoded = tokenizer(
        texts,
        padding=True,
        truncation=True,
        max_length=max_length,
        return_tensors="tf",
    )
    return encoded["input_ids"]


# The three wrappers below differ only in the global length limit they read.
# NOTE(review): neg_max_length / neu_max_length / pos_max_length are not assigned
# in any cell visible here; they must exist before these functions are called.

def neg_extract_text_features(texts):
    """Return input ids for the negative texts, truncated at ``neg_max_length``."""
    return _encode_input_ids(texts, neg_max_length)


def nue_extract_text_features(texts):
    """Return input ids for the neutral texts, truncated at ``neu_max_length``."""
    return _encode_input_ids(texts, neu_max_length)


def pos_extract_text_features(texts):
    """Return input ids for the positive texts, truncated at ``pos_max_length``."""
    return _encode_input_ids(texts, pos_max_length)

def load_data(csv_file):
    """Read a split CSV and build one token-id matrix plus its label array.

    The 'negative', 'neutral' and 'positive' columns are each converted to a
    list of ``str`` and encoded by their dedicated extractor; the three results
    are then joined side by side (axis 1) in that order.

    Args:
        csv_file: Path of a CSV with 'negative', 'neutral', 'positive' and
            'labels' columns.

    Returns:
        tuple: ``(features, labels)`` where ``features`` is the concatenated
        NumPy array and ``labels`` is ``df['labels'].values``.
    """
    frame = pd.read_csv(csv_file)

    # Each text column is paired with the extractor that applies its own limit.
    extractors = {
        'negative': neg_extract_text_features,
        'neutral': nue_extract_text_features,
        'positive': pos_extract_text_features,
    }

    # str() every cell first so the tokenizer only ever receives strings.
    texts = {column: list(map(str, frame[column])) for column in extractors}
    encoded = [extract(texts[column]) for column, extract in extractors.items()]

    return np.concatenate(encoded, axis=1), frame['labels'].values

# Build the concatenated token-id matrix and the label array for each split.
# load_data reads the CSV itself, so these calls need the tokenizer and the
# three *_max_length globals to be defined already.
train_text_features, train_labels = load_data('train_data.csv')
test_text_features, test_labels = load_data('test_data.csv')

### Audio data

In [None]:
import torchaudio.transforms as T

# Mel-spectrogram transform for 16 kHz audio with 23 mel bands. At that rate
# n_fft=400 spans 25 ms and hop_length=160 is a 10 ms step.
melspectrogram = T.MelSpectrogram(
    sample_rate=16000,
    n_fft=400,
    hop_length=160,
    n_mels=23,
)

# Resampler configured for 48 kHz input and 16 kHz output, matching the
# sample_rate given to the transform above.
resampler = T.Resample(
    orig_freq=48000,
    new_freq=16000,
)

def extract_features(file_paths, max_len=None):
    """Compute a mel spectrogram for every audio file in ``file_paths``.

    Each file is loaded, brought to the output rate of the module-level
    ``resampler``, mixed down to mono and passed through the module-level
    ``melspectrogram`` transform.

    Args:
        file_paths: Iterable of audio paths readable by ``torchaudio.load``.
        max_len: Optional number of time frames. When given, every spectrogram
            is zero-padded or truncated along axis 1 to exactly this length;
            when ``None`` the natural length is kept.

    Returns:
        list: One 2-D ``numpy.ndarray`` (mel bins x frames) per input path, in
        input order.
    """
    target_rate = resampler.new_freq
    # Seed the cache with the shared resampler so files at its expected rate
    # are processed exactly as before; any other rate gets its own converter
    # instead of being resampled with the wrong ratio.
    resamplers = {resampler.orig_freq: resampler}
    features = []

    for file_path in tqdm(file_paths, desc="Extracting features"):
        speech, sample_rate = torchaudio.load(file_path)

        # Resample according to the rate the file actually has; audio that is
        # already at the target rate is left untouched.
        if sample_rate != target_rate:
            if sample_rate not in resamplers:
                resamplers[sample_rate] = T.Resample(orig_freq=sample_rate, new_freq=target_rate)
            speech = resamplers[sample_rate](speech)

        # Collapse (channels, samples) to a 1-D mono signal. Averaging covers
        # any channel count, not just stereo, and indexing the channel axis
        # (rather than squeeze()) cannot drop the sample axis of a tiny clip.
        speech = speech.mean(dim=0) if speech.shape[0] > 1 else speech[0]

        spectrogram = melspectrogram(speech).numpy()

        if max_len is not None:
            n_frames = spectrogram.shape[1]
            if n_frames < max_len:
                # NOTE(review): np.zeros defaults to float64, so padded
                # spectrograms are upcast on concatenation; left unchanged to
                # keep the existing output dtype.
                padding = np.zeros((spectrogram.shape[0], max_len - n_frames))
                spectrogram = np.concatenate((spectrogram, padding), axis=1)
            elif n_frames > max_len:
                spectrogram = spectrogram[:, :max_len]

        # Appended without squeeze() so a one-frame spectrogram stays 2-D and
        # callers can keep reading ``shape[1]``.
        features.append(spectrogram)

    return features

In [None]:
def wav_load_data(csv_file):
    """Collect the three audio-path columns of ``csv_file`` as forward-slash paths.

    Args:
        csv_file: Path of a CSV with 'negative_Wav', 'neutral_Wav' and
            'positive_Wav' columns.

    Returns:
        tuple: ``(neg_paths, neu_paths, pos_paths)``, three lists of strings in
        row order with every backslash replaced by '/'.
    """
    frame = pd.read_csv(csv_file)

    def _normalized(column):
        # os.path.join receives a single argument here, as in the original
        # code; the backslash replacement is what rewrites the separators.
        return [os.path.join(path).replace('\\', '/') for path in frame[column]]

    return (
        _normalized('negative_Wav'),
        _normalized('neutral_Wav'),
        _normalized('positive_Wav'),
    )

In [None]:
# Per-sentiment audio path lists for each split, read from the same CSV files
# that load_data uses for the text features.
neg_train_paths, neu_train_paths, pos_train_paths = wav_load_data('train_data.csv')
neg_test_paths, neu_test_paths, pos_test_paths = wav_load_data('test_data.csv')

In [None]:
def pad_sequences(sequences, max_len=None, padding_value=0.0):
    """Bring every 2-D array to the same width along axis 1 and stack them.

    Args:
        sequences: Sequence of 2-D arrays; it is iterated twice when
            ``max_len`` is ``None``.
        max_len: Target width. Defaults to the widest array in ``sequences``.
        padding_value: Fill value appended to arrays that are too narrow.

    Returns:
        numpy.ndarray: ``np.array`` of the fitted arrays; narrower inputs are
        right-padded, wider ones are cut to their first ``max_len`` columns.
    """
    target = max_len if max_len is not None else max(seq.shape[1] for seq in sequences)

    def _fit(seq):
        missing = target - seq.shape[1]
        if missing <= 0:
            # Already wide enough: keep the leading ``target`` columns.
            return seq[:, :target]
        filler = np.full((seq.shape[0], missing), padding_value)
        return np.concatenate((seq, filler), axis=1)

    return np.array([_fit(seq) for seq in sequences])

In [None]:
# Every audio path in a fixed order: train neg/neu/pos, then test neg/neu/pos.
# extract_features is called without max_len, so each spectrogram keeps its
# natural number of frames.
# NOTE(review): train and test files are pooled here, so the median length
# derived from `all_features` is influenced by the test split.
all_file_paths = neg_train_paths + neu_train_paths + pos_train_paths + neg_test_paths + neu_test_paths + pos_test_paths
all_features = extract_features(all_file_paths)

In [None]:
# Median frame count (axis 1 of each spectrogram) over all extracted files,
# truncated to an int; later cells use it as the common spectrogram length.
median_len = int(np.median([feat.shape[1] for feat in all_features]))
median_len

In [None]:
# Reuse the spectrograms already held in `all_features` instead of decoding and
# transforming every training file a second time. `all_file_paths` was built as
# train neg/neu/pos followed by test neg/neu/pos, so the training features are
# the leading slices. pad_sequences(..., median_len) zero-pads or truncates each
# spectrogram to median_len frames, giving the same values as re-extracting
# with max_len=median_len.
_train_neg_end = len(neg_train_paths)
_train_neu_end = _train_neg_end + len(neu_train_paths)
_train_pos_end = _train_neu_end + len(pos_train_paths)

train_neg_features = pad_sequences(all_features[:_train_neg_end], median_len)
train_neu_features = pad_sequences(all_features[_train_neg_end:_train_neu_end], median_len)
train_pos_features = pad_sequences(all_features[_train_neu_end:_train_pos_end], median_len)

In [None]:
# Join the three per-sentiment arrays along the last axis (the frame axis that
# pad_sequences fixed), in neg, neu, pos order.
# assumes all three arrays share their leading dimensions -- TODO confirm
train_audio_features = np.concatenate([train_neg_features, train_neu_features, train_pos_features], axis=-1)

In [None]:
# Reuse the spectrograms already held in `all_features` instead of decoding and
# transforming every test file a second time. `all_file_paths` lists the three
# training path lists first, so the test features start right after them, in
# neg/neu/pos order. pad_sequences(..., median_len) zero-pads or truncates each
# spectrogram to median_len frames, giving the same values as re-extracting
# with max_len=median_len.
_test_start = len(neg_train_paths) + len(neu_train_paths) + len(pos_train_paths)
_test_neg_end = _test_start + len(neg_test_paths)
_test_neu_end = _test_neg_end + len(neu_test_paths)
_test_pos_end = _test_neu_end + len(pos_test_paths)

test_neg_features = pad_sequences(all_features[_test_start:_test_neg_end], median_len)
test_neu_features = pad_sequences(all_features[_test_neg_end:_test_neu_end], median_len)
test_pos_features = pad_sequences(all_features[_test_neu_end:_test_pos_end], median_len)

In [None]:
# Join the three per-sentiment test arrays along the last axis (the frame axis
# that pad_sequences fixed), in neg, neu, pos order.
# assumes all three arrays share their leading dimensions -- TODO confirm
test_audio_features = np.concatenate([test_neg_features, test_neu_features, test_pos_features], axis=-1)