In [1]:
import os, sys
from os.path import isfile, isdir, join
import yaml, json

import librosa 
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import cv2
import random

import torch 
import torch.nn as nn
import torch.nn.functional as F

In [2]:
from linformer import Linformer
from PIL import Image
from torch.optim.lr_scheduler import StepLR
from tqdm.notebook import tqdm
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
# from vit_pytorch.efficient import ViT
from vit_pytorch import ViT
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc, classification_report
import torch.utils.data as data
import torchvision
from torchvision.transforms import ToTensor
torch.cuda.is_available()

True

In [3]:
# Parse the MELD metadata file once and expose each split as its own mapping.
with open('./meld.yaml', 'r') as fp:
    meld_dict = yaml.safe_load(fp)

# The file holds one top-level entry per split.
train_split, test_split, dev_split = (
    meld_dict[split_key] for split_key in ('train', 'test', 'dev')
)

In [4]:
# Walk the training split and report every utterance whose audio file is missing.
for aud_key in train_split:
    aud_path = f"./dataset_extracted/output_train_extracted/{aud_key}.wav"

    if os.path.exists(aud_path):
        # Audio is present: read the transcript and label stored for this utterance.
        aud_properties = train_split[aud_key]
        text = aud_properties['Utterance']
        emotion_label = aud_properties['Emotion']
    else:
        print(f"File not found, skipping: {aud_path}")

In [6]:
# Walk the dev split and report every utterance whose audio file is missing.
for aud_key in dev_split:
    aud_path = f"./dataset_extracted/output_dev_extracted/{aud_key}.wav"

    if os.path.exists(aud_path):
        # Audio is present: read the transcript and label stored for this utterance.
        aud_properties = dev_split[aud_key]
        text = aud_properties['Utterance']
        emotion_label = aud_properties['Emotion']
    else:
        print(f"File not found, skipping: {aud_path}")

In [None]:
# Sketch (not used yet): classification head over three concatenated 128-d outputs.
# class custom_model(nn.Module):
#     def __init__(self):
#         super(custom_model, self).__init__()
#         self.activation = nn.ReLU()
#         self.classification = nn.Linear(128*3, 7)
#
#     def forward(self, x1, x2, x3):
#         y1 = model(x1)
#         # TODO: y2 and y3 (the outputs for x2 and x3) are not computed yet.
#         y = torch.cat([y1, y2, y3])
#         y = self.activation(y)
#         y = self.classification(y)
#         y = torch.softmax(y, dim=1)
#         return y

In [None]:
from torch.utils.data import Dataset, DataLoader

class meld_dataset(Dataset):
    """MELD utterance dataset yielding a spectrogram, waveform, text and label.

    The YAML file at ``dataset_path`` must contain ``train``, ``test`` and
    ``dev`` mappings from an utterance id to a record with ``Utterance`` and
    ``Emotion`` entries. The audio for an utterance is read from
    ``./dataset_extracted/output_<split>_extracted/<id>.wav``.

    All three splits go through the same preprocessing (load, silence trim,
    mel spectrogram in dB, fixed-length padding), so evaluation data is
    prepared exactly like training data.

    Args:
        dataset_path: Path to the YAML metadata file.
        split_name: Which split to serve: ``'train'``, ``'test'`` or ``'dev'``.
        sr: Sampling rate used when loading audio.
        audio_seq_len: Length, in seconds, every waveform is cut/padded to.
        text_seq_len: Length, in characters, every utterance is cut/padded to.

    Raises:
        ValueError: If ``split_name`` is not one of the three known splits.
    """

    VALID_SPLITS = ('train', 'test', 'dev')

    # Emotion name -> class index (built once instead of on every item).
    EMOTION_TO_IDX = {
        'neutral': 0, 'surprise': 1, 'fear': 2, 'sadness': 3,
        'joy': 4, 'disgust': 5, 'anger': 6
    }

    # Spectrogram frames kept per second of audio before resizing.
    FRAMES_PER_SECOND = 32

    # (width, height) the spectrogram image is resized to.
    SPECTROGRAM_SIZE = (256, 256)

    def __init__(self, dataset_path='./meld.yaml', split_name='train', sr=16000, audio_seq_len=10, text_seq_len=512):
        super(meld_dataset, self).__init__()
        # Fail at construction time rather than on the first len()/indexing call.
        if split_name not in self.VALID_SPLITS:
            raise ValueError(
                f"Unknown split_name {split_name!r}; expected one of {self.VALID_SPLITS}"
            )

        with open(dataset_path, 'r') as fp:
            meld_dict = yaml.safe_load(fp)
        self.train_split = meld_dict['train']
        self.test_split = meld_dict['test']
        self.dev_split = meld_dict['dev']
        self.split_name = split_name

        self.train_keys = list(self.train_split.keys())
        self.test_keys = list(self.test_split.keys())
        self.dev_keys = list(self.dev_split.keys())

        self.audio_model = None  # Placeholder for audio model initialization
        self.sr = sr
        self.audio_seq_len = audio_seq_len
        self.text_seq_len = text_seq_len

    def _check_split(self):
        """Raise ``ValueError`` if ``split_name`` is not a known split."""
        if self.split_name not in self.VALID_SPLITS:
            raise ValueError(
                f"Unknown split_name {self.split_name!r}; expected one of {self.VALID_SPLITS}"
            )

    def _split_records(self):
        """Return the id -> record mapping of the active split."""
        self._check_split()
        return getattr(self, f"{self.split_name}_split")

    def _split_keys(self):
        """Return the ordered list of utterance ids of the active split."""
        self._check_split()
        return getattr(self, f"{self.split_name}_keys")

    @staticmethod
    def _min_max_scale(values):
        """Scale ``values`` to [0, 1]; a constant array maps to all zeros."""
        lowest = values.min()
        span = values.max() - lowest
        if span == 0:
            # Avoid 0/0 -> NaN when every element is identical.
            return np.zeros_like(values)
        return (values - lowest) / span

    def _encode_text(self, text):
        """Cut/pad ``text`` to ``text_seq_len`` characters and return code points as int64."""
        # YAML may deserialize bare scalars (numbers, yes/no) as non-str values.
        text = str(text)
        fixed = text[:self.text_seq_len].ljust(self.text_seq_len)
        # np.int64 replaces np.long, which no longer exists in current NumPy.
        return torch.from_numpy(np.array([ord(c) for c in fixed], dtype=np.int64))

    def __len__(self):
        """Number of utterances in the active split."""
        return len(self._split_records())

    def __getitem__(self, index):
        """Load and preprocess the utterance at ``index`` of the active split.

        Returns:
            dict with
              * ``'spectrogram'``: float32 tensor of shape (1, 256, 256), scaled to [0, 1]
              * ``'text_feature'``: int64 tensor of ``text_seq_len`` character code points
              * ``'audio_feature'``: waveform tensor of ``audio_seq_len * sr`` samples
              * ``'emotion_label'``: scalar int64 tensor with the class index
        """
        records = self._split_records()
        aud_id = self._split_keys()[index]
        aud_path = f"./dataset_extracted/output_{self.split_name}_extracted/{aud_id}.wav"
        audio_input, _ = librosa.load(aud_path, sr=self.sr)

        # Trim leading/trailing silence; keep the untrimmed signal if trimming
        # would leave nothing to analyse.
        trimmed, _ = librosa.effects.trim(audio_input, top_db=20)
        if trimmed.size > 0:
            audio_input = trimmed

        # Pass the real sampling rate (librosa otherwise assumes 22050 Hz).
        # melspectrogram returns a power spectrogram by default, hence power_to_db.
        spectrogram = librosa.feature.melspectrogram(y=audio_input, sr=self.sr)
        spectrogram_db = librosa.power_to_db(spectrogram, ref=np.max)

        record = records[aud_id]
        text_feature = record['Utterance']
        emotion_label = record['Emotion']

        # Bring waveform and spectrogram to fixed lengths.
        num_samples = int(self.audio_seq_len * self.sr)
        audio_input = audio_input[:num_samples]
        audio_input = np.pad(audio_input, (0, num_samples - len(audio_input)), 'constant')

        num_frames = int(self.audio_seq_len * self.FRAMES_PER_SECOND)
        spectrogram_db = spectrogram_db[:, :num_frames]
        # With ref=np.max the loudest bin is 0 dB, so zero-padding would mark the
        # padded tail as the loudest region; pad with the floor value instead.
        spectrogram_db = np.pad(
            spectrogram_db,
            ((0, 0), (0, num_frames - spectrogram_db.shape[1])),
            'constant',
            constant_values=spectrogram_db.min(),
        )

        # Normalize, resize to the model's input size, and add a channel axis.
        spectrogram_db = self._min_max_scale(spectrogram_db.astype(np.float32))
        spectrogram_db = cv2.resize(spectrogram_db, self.SPECTROGRAM_SIZE, interpolation=cv2.INTER_NEAREST)
        spectrogram_tensor = torch.from_numpy(spectrogram_db[np.newaxis, ...])

        audio_tensor = torch.from_numpy(audio_input)
        text_tensor = self._encode_text(text_feature)
        emotion_tensor = torch.tensor(self.EMOTION_TO_IDX[emotion_label], dtype=torch.long)

        return {
            'spectrogram': spectrogram_tensor,
            'text_feature': text_tensor,
            'audio_feature': audio_tensor,
            'emotion_label': emotion_tensor
        }

In [None]:
# Reproducibility: seed every random number generator the notebook relies on.
seed = 142
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    # Seed the current CUDA device and then all visible CUDA devices.
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # cuDNN determinism flags are intentionally left at their defaults:
    # torch.backends.cudnn.deterministic = True
    # torch.backends.cudnn.benchmark = False

# Training hyperparameters.
epochs, lr = 15, 0.01

# Currently unused knobs:
# gamma = 0.7
# patch_size = 16

# Number of emotion classes in the label set.
num_classes = 7

In [None]:
# Training device:
# Use the second GPU when the machine has one, otherwise the first GPU, and
# fall back to the CPU when CUDA is unavailable. A hard-coded 'cuda:1' fails
# on single-GPU machines.
if torch.cuda.is_available():
    device = torch.device('cuda:1' if torch.cuda.device_count() > 1 else 'cuda:0')
else:
    device = torch.device('cpu')

# Define spectrogram dimensions and patch size.
# The dataset resizes every spectrogram to 256x256, so both sides are 256.
spec_height = 256
spec_width = 256
patch_size = 16

# Compute number of patches and sequence length (with class token)
num_patches = (spec_height // patch_size) * (spec_width // patch_size)
seq_len = num_patches + 1  # +1 for class token

# Linear Transformer (adjust seq_len).
# Not passed to the ViT below; only needed for the vit_pytorch.efficient variant.
efficient_transformer = Linformer(dim=128, seq_len=seq_len, depth=12, heads=8, k=64)

# Vision Transformer Model for audio spectrogram (using 1 channel).
# NOTE(review): the head has 128 outputs although there are only 7 emotion
# classes. CrossEntropyLoss still accepts this because every label is < 128;
# presumably 128 is an embedding size for a later fusion head - confirm.
model = ViT(
    image_size=spec_height,  # square input: spec_height == spec_width
    patch_size=patch_size,
    num_classes=128,
    # transformer=efficient_transformer,
    dim = 1024,
    depth = 6,
    heads = 8,
    mlp_dim = 2048,
    dropout = 0.1,
    emb_dropout = 0.1,
    channels=1,
).to(device)
model.train()

# Loss function, Optimizer and Learning Rate Scheduler.
# CrossEntropyLoss expects raw logits (it applies log-softmax itself).
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# `verbose` is deprecated on PyTorch schedulers, so it is no longer passed;
# read the current rate from optimizer.param_groups[0]['lr'] when needed.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.7, patience=3)

In [None]:
from tqdm.notebook import tqdm
# Train the spectrogram model on the MELD training split.
dataset = meld_dataset(dataset_path='./meld.yaml', split_name='train')
dataloader_obj = DataLoader(dataset, batch_size=16, shuffle=True)

for epoch_num in range(epochs):
    print(f"Epoch: {epoch_num+1}/{epochs}")
    model.train()
    epoch_loss = 0.0
    num_batches = 0

    for idx, data_dict in enumerate(tqdm(dataloader_obj)):
        inputs = data_dict['spectrogram'].to(device)
        labels = data_dict['emotion_label'].to(device)

        optimizer.zero_grad()
        # Feed raw logits to the loss: CrossEntropyLoss applies log-softmax
        # internally, so an extra softmax here would flatten the gradients.
        output = model(inputs)
        loss = criterion(output, labels)
        loss.backward()
        # Apply the gradients; without this step the weights never change.
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # ReduceLROnPlateau counts `patience` in calls to step(), so step it once
    # per epoch on the mean loss rather than on every noisy batch loss.
    mean_loss = epoch_loss / max(num_batches, 1)
    scheduler.step(mean_loss)
    print(f"  mean loss: {mean_loss:.4f} | lr: {optimizer.param_groups[0]['lr']:.6f}")
    
    

In [None]:
# Load the training split and fetch its first sample
dataset = meld_dataset(dataset_path='./meld.yaml', split_name='train')
sample_0 = dataset[0]

# The spectrogram is stored under 'spectrogram' (no trailing space) as a
# (1, H, W) tensor; specshow needs a 2-D NumPy array, so drop the channel axis.
spectrogram_db_0 = sample_0['spectrogram'].squeeze(0).numpy()

# Plot the spectrogram
plt.figure(figsize=(10, 4))
# The dataset squeezes 320 frames (hop 512) into 256 columns, so one column
# covers 320 * 512 / 256 = 640 samples; pass that as the hop for the time axis.
librosa.display.specshow(spectrogram_db_0, sr=16000, hop_length=640, x_axis='time', y_axis='mel', cmap='magma')
# Values were min-max scaled to [0, 1] by the dataset, so they are not dB.
plt.colorbar(format='%.2f', label='Normalized magnitude')
plt.title('Mel Spectrogram')
plt.tight_layout()
plt.show()

In [None]:
# Load the training split and fetch its second sample
dataset = meld_dataset(dataset_path='./meld.yaml', split_name='train')
sample_1 = dataset[1]

# The spectrogram is stored under 'spectrogram' (no trailing space) as a
# (1, H, W) tensor; specshow needs a 2-D NumPy array, so drop the channel axis.
spectrogram_db_1 = sample_1['spectrogram'].squeeze(0).numpy()

# Plot the spectrogram
plt.figure(figsize=(10, 4))
# The dataset squeezes 320 frames (hop 512) into 256 columns, so one column
# covers 320 * 512 / 256 = 640 samples; pass that as the hop for the time axis.
librosa.display.specshow(spectrogram_db_1, sr=16000, hop_length=640, x_axis='time', y_axis='mel', cmap='magma')
# Values were min-max scaled to [0, 1] by the dataset, so they are not dB.
plt.colorbar(format='%.2f', label='Normalized magnitude')
plt.title('Mel Spectrogram')
plt.tight_layout()
plt.show()

In [None]:
# Waveform of the first sample, drawn as amplitude against sample index.
pd.Series(sample_0['audio_feature']).plot(
    title='Raw Audio Example',
    color='blue',
    lw=1,
    figsize=(10, 5),
)
plt.show()

In [None]:
# Waveform of the second sample, drawn as amplitude against sample index.
pd.Series(sample_1['audio_feature']).plot(
    title='Raw Audio Example',
    color='blue',
    lw=1,
    figsize=(10, 5),
)
plt.show()