In [None]:
import pandas as pd
import numpy as np
import os
import seaborn as sns
import matplotlib.pyplot as plt
import librosa
import librosa.display
import io
import soundfile as sf
from IPython.display import Audio

import torchaudio
import torch
import torch.nn as nn
from torch.nn import CrossEntropyLoss
from sklearn.model_selection import train_test_split, StratifiedKFold

from torch.utils.data import Dataset, DataLoader
from transformers import Trainer, TrainingArguments, Wav2Vec2ForSequenceClassification, Wav2Vec2Model, Wav2Vec2PreTrainedModel, AutoProcessor, AutoModelForCTC
from transformers.modeling_outputs import SequenceClassifierOutput
import random

import gc
import warnings
# Install a global "ignore" filter so no warnings are shown for the rest of the session.
warnings.filterwarnings("ignore")

# Number of cross-validation folds; the same value sizes the list of models,
# the StratifiedKFold split and the training loop further down.
num_of_folds = 3

# Extracting Files

## Custom Dataset

In [None]:
# Walk the custom dataset tree and record every file together with the emotion
# taken from the last "_"-separated token of its directory name (lower-cased).
label, path = [], []
for folder, _subdirs, filenames in os.walk("/kaggle/input/elderly-speech-emotion-recognition"):
    emotion = folder.split('_')[-1].lower()
    for filename in filenames:
        label.append(emotion)
        path.append(os.path.join(folder, filename))

# One row per file: where it lives and which emotion its folder names.
df = pd.DataFrame({"path": path, "label": label})

## YueMotion Dataset

In [None]:
from datasets import load_dataset

# Download YueMotion and convert every split it exposes to a pandas frame.
yue_ds = load_dataset("CAiRE/YueMotion")
yue_df_list = [yue_ds[split_name].to_pandas() for split_name in yue_ds]

# ignore_index=True gives the stacked frame a fresh 0..n-1 index. Without it
# the per-split index labels are kept, so labels repeat across splits and any
# later label-based drop/selection would touch rows from several splits at once.
yue_df = pd.concat(yue_df_list, ignore_index=True)



# Loading the Audios

In [None]:
def bytes_to_audio(bytes):
    """Decode one encoded-audio record into its sample array.

    `bytes` is a mapping whose "bytes" entry holds the encoded file content.
    That content is wrapped in an in-memory buffer and passed to `sf.read`;
    only the first element of the result (the samples) is returned, the
    second element is discarded.
    """
    buffer = io.BytesIO(bytes["bytes"])
    samples = sf.read(buffer)[0]
    return samples
    
# YueMotion stores encoded audio records; replace each with its decoded samples.
yue_df["audio"] = yue_df["audio"].map(bytes_to_audio)

# The custom dataset is stored as files: load each one at 16 kHz and keep only
# the samples (element 0 of what librosa.load returns).
df_audio = [librosa.load(file_path, sr=16000)[0] for file_path in df["path"]]

# Append the waveforms as the last column, then tag every row of the custom
# dataset with the "elderly" group.
df.insert(loc=len(df.columns), column="audio", value=df_audio)
df.insert(loc=len(df.columns), column="group", value=["elderly"] * len(df))

In [None]:
# Splitting Audios into Adult and Elderly

In [None]:
def classify(x):
    """Return the age group for age `x`: "elderly" when above 54, otherwise "adult"."""
    return "elderly" if x > 54 else "adult"
# Tag each YueMotion row with an age group derived from its "age" column.
# yue_df = yue_df.drop(yue_df[yue_df.age<54].index)
yue_df.insert(loc=len(yue_df.columns), column="group", value=[classify(age) for age in yue_df["age"]])

# Remove the listed metadata columns.
yue_df = yue_df.drop(columns=["split", "path", "age", "speaker_id", "gender", "sentence_id", "label_id"])

# Filter with a boolean mask rather than drop(<index labels>): yue_df is a
# concatenation of several frames, so index labels may repeat, and dropping by
# label would then also delete non-"disgust" rows that share a label.
yue_df = yue_df[yue_df.label != "disgust"]

# DataFrame.drop returns a new frame; without the assignment "path" was never
# actually removed from df.
df = df.drop(columns=["path"])
# df = pd.concat([yue_df, df])
# df = yue_df

In [None]:
# Display the number of rows currently in df.
len(df)
# 176 362 895  (NOTE(review): presumably row counts observed for different dataset combinations — confirm)

# Data Processing

In [None]:
def generalize(x):
    """Normalise an emotion name: "angry" becomes "anger", anything else is returned unchanged."""
    return "anger" if x == "angry" else x

# Normalise the label names FIRST, then build the vocabulary from the
# normalised names. Building the mapping before normalising leaves "angry" as a
# key while the rows are renamed to "anger": those rows then map to NaN (or,
# if both spellings are present, one class id is never used).
df['label'] = df['label'].map(generalize)

emotions = list(df['label'].unique())
# Emotion name -> integer id, in order of first appearance, and its inverse.
EtoI = {name: ind for ind, name in enumerate(emotions)}
ItoE = {ind: name for name, ind in EtoI.items()}

# Replace the emotion names with their integer ids.
# Note: re-running this cell on already-encoded labels re-derives the mapping
# from the integer ids.
df['label'] = df['label'].map(EtoI)

In [None]:
class MixedDataset(Dataset):
    """Dataset that yields a fixed-length waveform with a text vector appended.

    Each row of `df` is read through its 'label', 'audio' and 'text_rep'
    entries. The audio is cut or zero-padded to `max_length` samples, passed
    through `processor`, and the row's text representation is concatenated
    after the processed audio values.
    """

    def __init__(self, df, processor, max_length=160000):
        self.df = df
        self.processor = processor
        self.max_length = max_length

    def __len__(self):
        return len(self.df)

    def __getitem__(self, ind):
        """Return {'input_values': audio values followed by text vector, 'labels': long tensor}."""
        row = self.df.iloc[ind]
        target = row['label']
        waveform = row['audio']
        text_vector = row['text_rep']

        # Bring every clip to exactly max_length samples: truncate long clips,
        # right-pad short (or exactly fitting) ones with zeros.
        missing = self.max_length - len(waveform)
        if missing < 0:
            waveform = waveform[:self.max_length]
        else:
            waveform = np.pad(waveform, (0, missing), 'constant')

        processed = self.processor(
            waveform,
            sampling_rate=16000,
            return_tensors='pt',
            padding=True,
            max_length=self.max_length,
        )
        audio_part = processed.input_values.squeeze()
        text_part = torch.from_numpy(text_vector)

        # The text vector is appended directly after the audio values.
        return {
            'input_values': torch.cat((audio_part, text_part)),
            'labels': torch.tensor(target, dtype=torch.long),
        }

# Importing Pre-trained Models

In [None]:
# Classifier for Wav2Vec2
from sklearn.utils.class_weight import compute_class_weight
class Wav2Vec2ClassificationHead(nn.Module):
    """Classification head: Linear(hidden, hidden) -> ReLU -> Linear(hidden, num_labels).

    `config` must expose `hidden_size` and `num_labels`.
    """

    def __init__(self, config):
        super().__init__()
        hidden = config.hidden_size
        self.dense = nn.Linear(hidden, hidden)
        self.relu = nn.ReLU()
        # Registered on the module but not applied in forward().
        self.dropout = nn.Dropout(0.2)
        self.out_proj = nn.Linear(hidden, config.num_labels)

    def forward(self, features, **kwargs):
        """Map `features` to logits; extra keyword arguments are accepted and ignored."""
        activated = self.relu(self.dense(features))
        return self.out_proj(activated)

# "Balanced" (inverse-frequency) weights for the 5 label ids; they are passed
# as `weight` to the cross-entropy loss in the model's forward().
# `classes` is given as an ndarray because that is the type scikit-learn
# documents for this argument (recent releases validate it and reject `range`).
class_weights = compute_class_weight(class_weight="balanced", classes=np.arange(5), y=df["label"])
# Use the GPU when one is available; fall back to CPU instead of raising on
# hosts without CUDA.
class_weights = torch.tensor(class_weights, dtype=torch.float).to(
    device="cuda" if torch.cuda.is_available() else "cpu"
)

class Wav2Vec2ForClassification(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 encoder with mean pooling and a custom classification head.

    The encoder output is averaged over dim 1 and passed to
    `Wav2Vec2ClassificationHead`. When labels are supplied, a cross-entropy
    loss weighted by the module-level `class_weights` is computed.
    """

    def __init__(self, config):
        super().__init__(config)

        self.wav2vec2 = Wav2Vec2Model(config)
        num_layers = config.num_hidden_layers + 1  # transformer layers + input embeddings
        if config.use_weighted_layer_sum:
            # Created for configs that request it; forward() below does not use it.
            self.layer_weights = nn.Parameter(torch.ones(num_layers) / num_layers)

        self.classifier = Wav2Vec2ClassificationHead(config)

        # Initialize weights and apply final processing
        self.post_init()

    def freeze_feature_extractor(self):
        """Deprecated alias of `freeze_feature_encoder`; emits a FutureWarning."""
        warnings.warn(
            "The method `freeze_feature_extractor` is deprecated and will be removed in Transformers v5. "
            "Please use the equivalent `freeze_feature_encoder` method instead.",
            FutureWarning,
        )
        self.freeze_feature_encoder()

    def freeze_feature_encoder(self):
        """Freeze the parameters of the encoder's feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def freeze_base_model(self):
        """Disable gradients for every parameter of the wrapped Wav2Vec2 model."""
        for param in self.wav2vec2.parameters():
            param.requires_grad = False

    def forward(
        self,
        input_values,
        attention_mask = None,
        output_attentions = None,
        output_hidden_states = None,
        return_dict = None,
        labels = None,
    ):
        """Run the encoder, mean-pool its output and classify.

        Returns a `SequenceClassifierOutput` when `return_dict` resolves to
        True, otherwise a tuple `(loss?, logits, *extra encoder outputs)`.
        `loss` is only present when `labels` is given.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        hidden_states = outputs[0]
        # Average over dim 1 (presumably the time axis of (batch, time, hidden) — TODO confirm).
        pooled_output = hidden_states.mean(dim=1)

        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            # Keep the class weights on the same device as the logits so the
            # loss works wherever the model has been placed.
            loss_fct = CrossEntropyLoss(weight=class_weights.to(logits.device))
            loss = loss_fct(logits.view(-1, self.config.num_labels), labels.view(-1))

        if not return_dict:
            # The original referenced `_HIDDEN_STATES_START_POSITION`, which is
            # not defined in this notebook (NameError). In the transformers
            # Wav2Vec2 implementation that constant is 2: the tuple output is
            # (last_hidden_state, extract_features, hidden_states?, attentions?).
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

In [None]:
# acoustic model for speech emotion recognition
# A single processor is shared; a separate model instance is loaded for each fold.
processor = AutoProcessor.from_pretrained("ctl/wav2vec2-large-xlsr-cantonese")
models = [
    Wav2Vec2ForClassification.from_pretrained(
        "ctl/wav2vec2-large-xlsr-cantonese",
        num_labels=5,
    )
    for _ in range(num_of_folds)
]

In [None]:
# speech to text model for text features
from transformers import pipeline
# Settings for the speech-to-text pipeline.
device = "cuda"
MODEL_NAME = "alvanlii/whisper-small-cantonese" 
lang = "zh"

# Speech-recognition pipeline built from MODEL_NAME, with 30-second chunking.
pipe = pipeline(
    "automatic-speech-recognition",
    model=MODEL_NAME,
    device=device,
    chunk_length_s=30,
)

# Pin the decoder prompt to transcription in the configured language.
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(
    language=lang,
    task="transcribe",
)

In [None]:
import fasttext
# Load the fastText binary model from the Kaggle input directory; `ft` is used
# afterwards to turn each transcript into a sentence vector.
ft = fasttext.load_model("/kaggle/input/canto-fasttext/toastynews.bin")

# Acquire Text Representation of Audio

In [None]:
# Transcribe every clip, then embed each transcript as one fastText sentence vector.
text = [pipe(waveform)["text"] for waveform in df["audio"]]
text_rep = [ft.get_sentence_vector(transcript) for transcript in text]
df["text_rep"] = text_rep

# Release the references to both helper models, then reclaim host and GPU memory.
pipe = ft = None
gc.collect()
torch.cuda.empty_cache()


# Prepare Data for K-Fold Cross-Validation

In [None]:
# Only the "elderly" rows are split into folds; every "adult" row is added to
# the training part of each fold and never appears in a test part.
elderly_df = df[df["group"] == "elderly"]
adult_df = df[df["group"] == "adult"]

fold = StratifiedKFold(n_splits=num_of_folds, random_state=42, shuffle=True)
train_df_arr, test_df_arr = [], []
for train_ind, test_ind in fold.split(elderly_df, elderly_df["label"]):
    train_df_arr.append(pd.concat([elderly_df.iloc[train_ind], adult_df]))
    test_df_arr.append(elderly_df.iloc[test_ind])

# Model Training

In [None]:
# Wrap each fold's train/test frame in a MixedDataset that shares the one processor.
train_dataset_arr = [
    MixedDataset(fold_frame, processor)
    for fold_frame in train_df_arr
]
test_dataset_arr = [
    MixedDataset(fold_frame, processor)
    for fold_frame in test_df_arr
]

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix

def compute_metrics(pred):
    """Compute evaluation metrics from a prediction object.

    `pred.label_ids` holds the true class ids and `pred.predictions` the
    per-class scores; the predicted class is the argmax along axis 1.
    Returns accuracy, weighted precision/recall/F1 and the confusion matrix
    (as nested lists) whose rows and columns follow the key order of `EtoI`.
    """
    y_true = pred.label_ids
    y_pred = np.argmax(pred.predictions, axis=1)

    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='weighted')

    # The confusion matrix is built on emotion names rather than ids.
    true_names = [ItoE[class_id] for class_id in y_true]
    pred_names = [ItoE[class_id] for class_id in y_pred]
    conf_matrix = confusion_matrix(true_names, pred_names, labels=list(EtoI.keys()))

    return {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'confusion_matrix': conf_matrix.tolist(),
    }

In [None]:
# One shared configuration for every fold.
training_args = TrainingArguments(
    # where checkpoints/results go; no external reporting integrations
    output_dir='./results',
    report_to=[],
    # log and evaluate once per epoch
    logging_strategy='epoch',
    eval_strategy='epoch',
    # optimisation schedule
    num_train_epochs=10,
    learning_rate=5e-5,
    weight_decay=0.01,
    # batch sizes
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
)

In [None]:
# Train and evaluate one model per fold; `res` collects each fold's metrics dict.
res = []
for i in range(num_of_folds):
    # The datasets are handed to Trainer directly. The previous
    # `DataLoader(..., batch_size=8, shuffle=True).dataset` only returned the
    # same dataset object, so its batch size and shuffle flag never took
    # effect; Trainer builds its own loaders from `training_args`.
    trainer = Trainer(
        model=models[i],
        args=training_args,
        train_dataset=train_dataset_arr[i],
        eval_dataset=test_dataset_arr[i],
        compute_metrics=compute_metrics,
        tokenizer=processor.feature_extractor,
    )
    trainer.train()
    res.append(trainer.evaluate())

    # `models` still references the trained model, so dropping the trainer
    # alone cannot release its GPU memory. Move the weights to the CPU before
    # cleaning up so the next fold starts with a free device.
    models[i].to("cpu")
    trainer = None
    gc.collect()
    torch.cuda.empty_cache()

In [None]:
from sklearn.metrics import ConfusionMatrixDisplay
# Plot each fold's confusion matrix and save it as confusion_matrix_<fold>.png.
emotions = list(EtoI)
for ind, fold_metrics in enumerate(res):
    np_confusion_matrix = np.array(fold_metrics['eval_confusion_matrix'])
    conf_matrix_plot = ConfusionMatrixDisplay(
        confusion_matrix=np_confusion_matrix,
        display_labels=emotions,
    )
    conf_matrix_plot.plot()
    plt.savefig(f'confusion_matrix_{ind}.png')
# plt.savefig('plot.png')

In [None]:
# Average the evaluation accuracy over the folds.
mean_accuracy = sum(fold_metrics["eval_accuracy"] for fold_metrics in res) / num_of_folds
print(mean_accuracy)

In [None]:
# Print accuracy and F1 for each fold, one value per line.
for fold_no, fold_metrics in enumerate(res):
    report_lines = [
        f"Fold {fold_no}:",
        f"Eval Accuracy: {fold_metrics['eval_accuracy']}",
        f"Eval F1 Score: {fold_metrics['eval_f1']}",
    ]
    print("\n".join(report_lines))