**Modules**
os to deal with the data
visualization: seaborn, matplotlib
audio visualization:librosa
play audio: Iputhon Audio

In [1]:
import os
import torch
import torchaudio
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import math
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from transformers import Wav2Vec2Model, Wav2Vec2Config
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.cuda.amp import autocast, GradScaler
import warnings
warnings.filterwarnings("ignore")


In [2]:
torch.backends.cudnn.benchmark = True  
torch.backends.cuda.matmul.allow_tf32 = True  
torch.backends.cudnn.allow_tf32 = True

Hardware Check

In [3]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"VRAM: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")


Using device: cuda
GPU: NVIDIA GeForce RTX 3070 Ti
VRAM: 8.00 GB


In [4]:
torch.cuda.empty_cache()

In [None]:
BATCH_SIZE = 16  
MAX_LENGTH = 16000 * 4  
SAMPLE_RATE = 16000
NUM_EPOCHS = 25
LEARNING_RATE = 2e-5
GRADIENT_ACCUMULATION_STEPS = 2


train_dataset_path = 'C:/Users/NJS/Desktop/Thesis/TRAINING_DATASET'
test_dataset_path = 'C:/Users/NJS/Desktop/Thesis/TESTING_DATASET'


**Dataset Label Load**

In [None]:
def load_dataset(dataset_path):
    paths = []
    labels = []

    for dirname, _, filenames in os.walk(dataset_path):
        for filename in filenames:
            file_path = os.path.join(dirname, filename)
            paths.append(file_path)
            label = filename.split('_')[0]
            labels.append(label.lower())

    dataframe = pd.DataFrame({'audio': paths, 'emotion': labels})
    dataframe = dataframe[dataframe['emotion'] != 'calm'].reset_index(drop=True)
    return dataframe

train_df = load_dataset(train_dataset_path)
test_df = load_dataset(test_dataset_path)

label_encoder = LabelEncoder()
label_encoder.fit(train_df['emotion'])
NUM_CLASSES = len(label_encoder.classes_)
print(f"Emotion classes: {label_encoder.classes_}")

Emotion classes: ['angry' 'disgust' 'fear' 'happy' 'neutral' 'sad' 'surprise']


In [None]:
class AudioFeatureExtractor(nn.Module):
    def __init__(self, feature_type='wav2vec'):
        super().__init__()
        self.feature_type = feature_type
        
        if feature_type == 'wav2vec':
            config = Wav2Vec2Config.from_pretrained("facebook/wav2vec2-base")
            self.feature_extractor = Wav2Vec2Model(config)
            for param in self.feature_extractor.parameters():
                param.requires_grad = False
        elif feature_type == 'melspectrogram':
            self.transform = torchaudio.transforms.MelSpectrogram(
                sample_rate=SAMPLE_RATE,
                n_fft=1024,
                hop_length=256,
                n_mels=80
            )
        elif feature_type == 'mfcc':
            self.transform = torchaudio.transforms.MFCC(
                sample_rate=SAMPLE_RATE,
                n_mfcc=40,
                melkwargs={'n_fft': 1024, 'hop_length': 256, 'n_mels': 80}
            )
    
    def forward(self, x):
        x = x.squeeze(1)  
        
        if self.feature_type == 'wav2vec':
            with torch.no_grad():
                features = self.feature_extractor(x).last_hidden_state
        else:
            features = self.transform(x)
            if len(features.shape) == 3:  
                features = features.permute(0, 2, 1)  
        return features

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)  
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)  
        pe[:, 1::2] = torch.cos(position * div_term)  
        self.register_buffer('pe', pe)

    def forward(self, x):
        seq_len = x.size(1) 
        pe = self.pe[:, :seq_len]  
        pe = pe.to(x.device)  
        return x + pe

In [None]:
class AudioEmotionTransformer(nn.Module):
    def __init__(self, feature_type='wav2vec'):
        super().__init__()
        self.feature_extractor = AudioFeatureExtractor(feature_type).to(device)
        
        if feature_type == 'wav2vec':
            self.d_model = 768
        elif feature_type == 'melspectrogram':
            self.d_model = 80
        elif feature_type == 'mfcc':
            self.d_model = 40
        
        self.positional_encoding = PositionalEncoding(self.d_model)
        
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=self.d_model,
                nhead=8,
                dim_feedforward=2048,
                dropout=0.1,
                batch_first=True
            ),
            num_layers=4
        )
        
        self.classifier = nn.Sequential(
            nn.Linear(self.d_model, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, NUM_CLASSES))
    
    def forward(self, x):

        features = self.feature_extractor(x.unsqueeze(1))
        

        if len(features.shape) == 3: 
            features = self.positional_encoding(features)
        elif len(features.shape) == 4:  
            features = features.squeeze(1) 
            features = features.permute(0, 2, 1)  
            features = self.positional_encoding(features)
        
        features = self.transformer(features)
        

        pooled = features.mean(dim=1)
        return self.classifier(pooled)

In [None]:
class AudioEmotionDataset(Dataset):
    def __init__(self, dataframe):
        self.audio_paths = dataframe['audio'].values
        self.labels = label_encoder.transform(dataframe['emotion'])
        self.resampler = torchaudio.transforms.Resample(orig_freq=SAMPLE_RATE, new_freq=SAMPLE_RATE)
        
    def __len__(self):
        return len(self.audio_paths)
    
    def __getitem__(self, idx):
        waveform, sample_rate = torchaudio.load(self.audio_paths[idx])
        
        if sample_rate != SAMPLE_RATE:
            waveform = self.resampler(waveform)
        
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        
        if waveform.shape[1] < MAX_LENGTH:
            waveform = torch.nn.functional.pad(waveform, (0, MAX_LENGTH - waveform.shape[1]))
        else:
            waveform = waveform[:, :MAX_LENGTH]
        
        return waveform.squeeze(0), torch.tensor(self.labels[idx], dtype=torch.long)

In [None]:
def train_and_evaluate():

    train_dataset = AudioEmotionDataset(train_df)
    test_dataset = AudioEmotionDataset(test_df)
    
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=True)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True)
    

    model = AudioEmotionTransformer(feature_type='wav2vec').to(device)
    optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.5)
    criterion = nn.CrossEntropyLoss()
    scaler = GradScaler()
    
    best_accuracy = 0.0
    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}

    for epoch in range(NUM_EPOCHS):
        model.train()
        running_loss = 0.0
        optimizer.zero_grad()

        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS}")
        
        for i, (inputs, labels) in enumerate(progress_bar):
            inputs, labels = inputs.to(device), labels.to(device)

            with autocast():
                outputs = model(inputs)
                loss = criterion(outputs, labels)


            scaled_loss = scaler.scale(loss / GRADIENT_ACCUMULATION_STEPS)
            scaled_loss.backward()

            if (i + 1) % GRADIENT_ACCUMULATION_STEPS == 0:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()

            running_loss += loss.item() * inputs.size(0)
            progress_bar.set_postfix({'loss': loss.item()})


        model.eval()
        val_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(torch.long).to(device)
                with autocast():
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                val_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())


        train_loss = running_loss / len(train_dataset)
        val_loss = val_loss / len(test_dataset)
        val_accuracy = (np.array(all_preds) == np.array(all_labels)).mean() * 100

        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_accuracy)

        print(f"\nEpoch {epoch+1}/{NUM_EPOCHS} - Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_accuracy:.2f}%")
        
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            torch.save(model.state_dict(), "best_model.pth")
            print("Saved new best model")

        scheduler.step(val_loss)
        torch.cuda.empty_cache()


    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Val Loss')
    plt.legend()
    plt.title("Loss over epochs")
    
    plt.subplot(1, 2, 2)
    plt.plot(history['val_acc'], label='Val Accuracy')
    plt.legend()
    plt.title("Accuracy over epochs")
    
    plt.tight_layout()
    plt.savefig("training_history.png", dpi=300)
    plt.show()


    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, target_names=label_encoder.classes_))


In [None]:
if __name__ == "__main__":

    print(f"Initial GPU memory allocated: {torch.cuda.memory_allocated()/1024**2:.2f} MB")
    print(f"Initial GPU memory cached: {torch.cuda.memory_reserved()/1024**2:.2f} MB")
    

    train_and_evaluate()
    

    print(f"\nPeak GPU memory allocated: {torch.cuda.max_memory_allocated()/1024**3:.2f} GB")
    print(f"Peak GPU memory reserved: {torch.cuda.max_memory_reserved()/1024**3:.2f} GB")

Initial GPU memory allocated: 0.00 MB
Initial GPU memory cached: 0.00 MB


Epoch 1/25: 100%|██████████| 452/452 [01:32<00:00,  4.87it/s, loss=2.93]



Epoch 1/25 - Train Loss: 1.7129 | Val Loss: 1.6231 | Val Acc: 36.18%
Saved new best model


Epoch 2/25:  24%|██▍       | 108/452 [00:10<00:32, 10.54it/s, loss=1.35]