In [None]:
import os
import glob
import pandas as pd
import csv

from datasets import load_dataset, load_metric
from dataclasses import dataclass
import torch
from typing import Dict, List, Optional, Union
import torch.nn as nn
from dataclasses import dataclass
from typing import Optional, Tuple
from transformers.models.wav2vec2.modeling_wav2vec2 import (Wav2Vec2PreTrainedModel,Wav2Vec2Model)
from typing import Any, Dict, Union
from packaging import version
from torch import nn

from transformers.file_utils import ModelOutput

import transformers
from transformers import AutoConfig, Wav2Vec2Processor, Wav2Vec2Model
import soundfile as sf
import librosa
import torchaudio

### Preparing data

In [None]:
# CSV files for the three dataset splits, keyed by split name.
data_files = dict(
    train="C:/Data/Sentiment Analysis/MELD/Processed/wav2vec/wav_train.csv",
    dev="C:/Data/Sentiment Analysis/MELD/Processed/wav2vec/wav_dev.csv",
    test="C:/Data/Sentiment Analysis/MELD/Processed/wav2vec/wav_test.csv",
)

# One call loads every file; each key of `data_files` becomes a split.
dataset = load_dataset("csv", data_files=data_files)
train_dataset, eval_dataset, test_dataset = (
    dataset[split_name] for split_name in ("train", "dev", "test")
)

# Print the summary of each split in train / dev / test order.
for split_dataset in (train_dataset, eval_dataset, test_dataset):
    print(split_dataset)

In [None]:
# Names of the dataset columns consumed by preprocess_function:
# `input_column` holds the audio file path that is passed to torchaudio.load,
# `output_column` holds the class label that is mapped to an integer id.
input_column: str = "path"
output_column: str = "sentiment"

In [None]:
# Distinct labels found in the training split. Sorting keeps the
# label -> index mapping deterministic from run to run.
label_list = sorted(train_dataset.unique(output_column))
num_labels = len(label_list)
print(f"A classification problem with {num_labels} classes: {label_list}")

In [None]:
# Checkpoint identifier used for the config, the processor and the model weights.
model_name_or_path: str = "facebook/wav2vec2-large-960h"
# Reduction applied over dim=1 of the encoder output; the model accepts
# "mean", "sum" or "max" (see Wav2Vec2ForSpeechClassification.merged_strategy).
pooling_mode: str = "mean"

In [None]:
# Label <-> index mappings stored in the model configuration.
label2id = {name: position for position, name in enumerate(label_list)}
id2label = dict(enumerate(label_list))

config = AutoConfig.from_pretrained(
    model_name_or_path,
    num_labels=num_labels,
    label2id=label2id,
    id2label=id2label,
    finetuning_task="wav2vec2_clf",
)
# Extra attribute read by Wav2Vec2ForSpeechClassification.__init__.
config.pooling_mode = pooling_mode

In [None]:
config

In [None]:
# Load the processor published with the checkpoint and read the sampling rate
# its feature extractor is configured for. speech_file_to_array_fn resamples
# every audio file to this rate before it is passed to the processor.
processor = Wav2Vec2Processor.from_pretrained(model_name_or_path)
target_sampling_rate = processor.feature_extractor.sampling_rate
print(f"The target sampling rate: {target_sampling_rate}")

### Preprocess data

In [None]:
def speech_file_to_array_fn(path):
    """Load an audio file as a mono, 1-D NumPy array at ``target_sampling_rate``.

    Args:
        path: Location of an audio file readable by ``torchaudio.load``.

    Returns:
        A 1-D ``numpy.ndarray`` with the waveform resampled to the module-level
        ``target_sampling_rate``.
    """
    speech_array, sampling_rate = torchaudio.load(path)

    # torchaudio.load returns a (channels, frames) tensor by default. Averaging
    # over the channel axis down-mixes multi-channel recordings to mono and, for
    # single-channel files, simply drops the channel axis. A plain squeeze()
    # would leave stereo audio 2-D, which the rest of the pipeline cannot handle.
    if speech_array.dim() > 1:
        speech_array = speech_array.mean(dim=0)

    # Resampling is only needed when the file's rate differs from the target.
    if sampling_rate != target_sampling_rate:
        resampler = torchaudio.transforms.Resample(sampling_rate, target_sampling_rate)
        speech_array = resampler(speech_array)

    return speech_array.numpy()

def label_to_id(label, label_list):
    """Translate ``label`` into its position within ``label_list``.

    Args:
        label: The raw label value to look up.
        label_list: Ordered collection of known labels.

    Returns:
        The index of the first occurrence of ``label`` in ``label_list``;
        ``-1`` if ``label_list`` is non-empty but does not contain ``label``;
        ``label`` itself, unchanged, if ``label_list`` is empty.
    """
    # Without a vocabulary there is nothing to map against.
    if len(label_list) == 0:
        return label

    # Unknown labels are reported with the sentinel -1.
    if label not in label_list:
        return -1

    return label_list.index(label)

def preprocess_function(examples):
    """Convert a batch of dataset rows into processor features plus label ids.

    Args:
        examples: A batch (column name -> list of values) that contains the
            ``input_column`` and ``output_column`` columns.

    Returns:
        The object returned by ``processor`` for the loaded waveforms, with an
        additional ``"labels"`` entry holding one integer id per row.
    """
    # Load and resample every audio file referenced by the batch.
    waveforms = []
    for audio_path in examples[input_column]:
        waveforms.append(speech_file_to_array_fn(audio_path))

    # Map each raw label to its index in the global label_list.
    label_ids = []
    for raw_label in examples[output_column]:
        label_ids.append(label_to_id(raw_label, label_list))

    features = processor(waveforms, sampling_rate=target_sampling_rate)
    features["labels"] = label_ids
    return features

In [None]:
#preprocess_function(train_dataset[:2]) #debugging

In [None]:
# Every split is preprocessed the same way: batches of 100 rows, one process.
_map_options = {"batch_size": 100, "batched": True, "num_proc": 1}

train_dataset = train_dataset.map(preprocess_function, **_map_options)
eval_dataset = eval_dataset.map(preprocess_function, **_map_options)
test_dataset = test_dataset.map(preprocess_function, **_map_options)

In [None]:
# Inspect one preprocessed training row: its input values, the integer label
# and the original label string.
idx = 0
first_example = train_dataset[idx]
print(f"Training input_values: {first_example['input_values']}")
print(f"Training labels: {first_example['labels']} - {first_example['sentiment']}")

In [None]:
len(train_dataset[idx]['input_values'])

### Model 

In [None]:
class Wav2Vec2ForSpeechClassification(Wav2Vec2PreTrainedModel):
    """Wav2Vec2 encoder that returns one pooled vector per input sequence.

    Despite its name, the class attaches no classification head: ``forward``
    returns the first output of the wrapped ``Wav2Vec2Model`` reduced over
    ``dim=1`` using the pooling mode stored in ``config.pooling_mode``.
    """

    def __init__(self, config):
        """Build the wrapped encoder from ``config`` (must define ``pooling_mode``)."""
        super().__init__(config)
        self.config = config
        self.pooling_mode = config.pooling_mode

        self.wav2vec2 = Wav2Vec2Model(config)

        self.init_weights()

    def freeze_feature_extractor(self):
        """Freeze the parameters of the encoder's feature extractor."""
        self.wav2vec2.feature_extractor._freeze_parameters()

    def merged_strategy(self, hidden_states, mode="max"):
        """Reduce ``hidden_states`` over ``dim=1``.

        Args:
            hidden_states: Tensor to pool; the reduction runs over ``dim=1``.
            mode: One of ``"mean"``, ``"sum"`` or ``"max"``.

        Returns:
            The pooled tensor (``dim=1`` removed).

        Raises:
            Exception: If ``mode`` is not one of the supported values.
        """
        if mode == "mean":
            return torch.mean(hidden_states, dim=1)
        if mode == "sum":
            return torch.sum(hidden_states, dim=1)
        if mode == "max":
            # torch.max with a dim returns (values, indices); keep the values.
            pooled, _ = torch.max(hidden_states, dim=1)
            return pooled

        raise Exception(
            "The pooling method hasn't been defined! Your pooling mode must be one of these ['mean', 'sum', 'max']")

    def forward(
            self,
            input_values,
            attention_mask=None,
            output_attentions=None,
            output_hidden_states=None,
            return_dict=None,
            labels=None,
    ):
        """Encode ``input_values`` and return the pooled first encoder output.

        ``labels`` is accepted but not used; no loss is computed.

        NOTE(review): pooling runs over every position of ``dim=1`` and does not
        consult ``attention_mask``, so padded positions would contribute to the
        pooled vector if padded batches are ever passed in.
        """
        if return_dict is None:
            return_dict = self.config.use_return_dict

        encoder_outputs = self.wav2vec2(
            input_values,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Index 0 selects the encoder's first output, which is then pooled.
        return self.merged_strategy(encoder_outputs[0], mode=self.pooling_mode)

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Device: {device}")

In [None]:
# Load the pretrained checkpoint into the pooling wrapper class, using the
# config prepared earlier (which carries `pooling_mode`), and move it to `device`.
model = Wav2Vec2ForSpeechClassification.from_pretrained(model_name_or_path,config=config).to(device)

### Extracting train embeddings

In [None]:
# Pooled embedding, label id and file name of every training row, stored as
# three index-aligned lists.
embeddings = {'embeddings' : [], 'labels' : [], 'fileID' : []}

with torch.no_grad():
    for input_values in train_dataset['input_values']:
        # Add a leading batch dimension of size 1 before calling the model.
        tensor = torch.tensor(input_values, dtype=torch.float32).unsqueeze(0).to(device)
        results = model(tensor)
        # Keep the result on the CPU: GPU memory does not grow with the number
        # of rows, and the saved file can be loaded on machines without CUDA.
        embeddings["embeddings"].append(results.cpu())

embeddings["labels"].extend(train_dataset['labels'])
embeddings["fileID"].extend(train_dataset['name'])

In [None]:
embeddings['embeddings'][0].shape

In [None]:
embeddings['embeddings'][0]

In [None]:
len(embeddings['embeddings'])

#### Saving and loading tensors 

In [None]:
PATH = "C:/Data/Sentiment Analysis/MELD/Processed/wav2vec/embeddings_v2/train_wav_large.pt"
# torch.save does not create missing parent folders, so create the output
# folder first (a no-op when it already exists).
os.makedirs(os.path.dirname(PATH), exist_ok=True)
torch.save(embeddings, PATH)

In [None]:
# Sanity check: read the file back. map_location="cpu" lets it load on
# machines without CUDA even when the stored tensors were saved from the GPU.
embgs = torch.load(PATH, map_location="cpu")

In [None]:
# Show how many entries each key holds instead of dumping every tensor.
{key: len(values) for key, values in embgs.items()}

### Extracting dev embeddings 

In [None]:
# Pooled embedding, label id and file name of every dev row, stored as three
# index-aligned lists.
embeddings = {'embeddings' : [], 'labels' : [], 'fileID' : []}

with torch.no_grad():
    for input_values in eval_dataset['input_values']:
        # Add a leading batch dimension of size 1 before calling the model.
        tensor = torch.tensor(input_values, dtype=torch.float32).unsqueeze(0).to(device)
        results = model(tensor)
        # Keep the result on the CPU: GPU memory does not grow with the number
        # of rows, and the saved file can be loaded on machines without CUDA.
        embeddings["embeddings"].append(results.cpu())

embeddings["labels"].extend(eval_dataset['labels'])
embeddings["fileID"].extend(eval_dataset['name'])

In [None]:
embeddings['embeddings'][0].shape

In [None]:
len(embeddings['embeddings'])

#### Saving and loading tensors 

In [None]:
PATH = "C:/Data/Sentiment Analysis/MELD/Processed/wav2vec/embeddings_v2/dev_wav_large.pt"
# torch.save does not create missing parent folders, so create the output
# folder first (a no-op when it already exists).
os.makedirs(os.path.dirname(PATH), exist_ok=True)
torch.save(embeddings, PATH)

In [None]:
# Sanity check: read the file back. map_location="cpu" lets it load on
# machines without CUDA even when the stored tensors were saved from the GPU.
embgs = torch.load(PATH, map_location="cpu")
# Show how many entries each key holds instead of dumping every tensor.
{key: len(values) for key, values in embgs.items()}

### Extracting test embeddings  

In [None]:
# Pooled embedding, label id and file name of every test row, stored as three
# index-aligned lists.
embeddings = {'embeddings' : [], 'labels' : [], 'fileID' : []}

with torch.no_grad():
    for input_values in test_dataset['input_values']:
        # Add a leading batch dimension of size 1 before calling the model.
        tensor = torch.tensor(input_values, dtype=torch.float32).unsqueeze(0).to(device)
        results = model(tensor)
        # Keep the result on the CPU: GPU memory does not grow with the number
        # of rows, and the saved file can be loaded on machines without CUDA.
        embeddings["embeddings"].append(results.cpu())

embeddings["labels"].extend(test_dataset['labels'])
embeddings["fileID"].extend(test_dataset['name'])

In [None]:
embeddings['embeddings'][0].shape

In [None]:
len(embeddings['embeddings'])

#### Saving and loading tensors 

In [None]:
PATH = "C:/Data/Sentiment Analysis/MELD/Processed/wav2vec/embeddings_v2/test_wav_large.pt"
# torch.save does not create missing parent folders, so create the output
# folder first (a no-op when it already exists).
os.makedirs(os.path.dirname(PATH), exist_ok=True)
torch.save(embeddings, PATH)

In [None]:
# Sanity check: read the file back. map_location="cpu" lets it load on
# machines without CUDA even when the stored tensors were saved from the GPU.
embgs = torch.load(PATH, map_location="cpu")
# Show how many entries each key holds instead of dumping every tensor.
{key: len(values) for key, values in embgs.items()}

## Example: embedding a single utterance

In [None]:
train_dataset[0]["input_values"]

In [None]:
# Wrap the first training row's input values in an outer list so the resulting
# float32 tensor has a leading batch dimension of size 1, then move it to `device`.
tensor = torch.tensor([train_dataset[0]['input_values']], dtype=torch.float32).to(device)

In [None]:
tensor

In [None]:
tensor.shape

In [None]:
# Inference only: disable autograd so no computation graph is built or kept.
# NOTE: this rebinds `embeddings`, which earlier cells use for the dict of
# per-split embeddings, to a single pooled tensor.
with torch.no_grad():
    embeddings = model(tensor)

In [None]:
embeddings