# Original

In [3]:
import os
import numpy as np
import pandas as pd
import librosa
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import TensorDataset, DataLoader
import pickle

class AudioFeatureExtractor:
    def __init__(self, dataset_path, base_dir, sample_rate=44100):
        self.base_dir = base_dir
        self.dataset = pd.read_csv(dataset_path)
        self.sample_rate = sample_rate

    def extract_features(self, audio_path):
        wave, sr = librosa.load(audio_path, sr=self.sample_rate)
        if len(wave) < sr * 45:
            wave = np.pad(wave, (0, sr * 45 - len(wave)), 'constant')
        wave = wave[:sr * 45]

        hop_length = int(sr * 0.01)
        win_length = int(sr * 0.025)

        mfcc = librosa.feature.mfcc(y=wave, sr=sr, n_mfcc=20, n_fft=2048, hop_length=hop_length, win_length=win_length)
        chroma = librosa.feature.chroma_stft(y=wave, sr=sr, n_fft=2048, hop_length=hop_length)
        contrast = librosa.feature.spectral_contrast(y=wave, sr=sr, n_fft=2048, hop_length=hop_length)

        return np.concatenate((mfcc, chroma, contrast), axis=0)

    def prepare_data_loaders(self, test_size=0.2, batch_size=32):
        feature_file = os.path.join(self.base_dir, 'features.npy')
        annotation_file = os.path.join(self.base_dir, 'annotations.npy')

        if os.path.exists(feature_file) and os.path.exists(annotation_file):
            features = np.load(feature_file)
            annotations = np.load(annotation_file)
        else:
            features = []
            annotations = []
            total_files = len(self.dataset)
            for index, row in self.dataset.iterrows():
                print(f"Processing file {index+1}/{total_files}: {row['file_path']}")
                audio_path = os.path.join(self.base_dir, row['file_path'])
                feature = self.extract_features(audio_path)
                features.append(feature)
                annotations.append([row[' valence_mean'], row[' arousal_mean']])

            features = np.array(features, dtype=np.float32)
            annotations = np.array(annotations, dtype=np.float32)
            np.save(feature_file, features)
            np.save(annotation_file, annotations)

        X_train, X_test, y_train, y_test = train_test_split(features, annotations, test_size=test_size, random_state=42)
        train_dataset = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
        test_dataset = TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        return train_loader, test_loader
    
    def prepare_data_loaders(self, val_size=0.1, test_size=0.1, batch_size=64):
        feature_file = os.path.join(self.base_dir, 'features.npy')
        annotation_file = os.path.join(self.base_dir, 'annotations.npy')

        if os.path.exists(feature_file) and os.path.exists(annotation_file):
            features = np.load(feature_file)
            annotations = np.load(annotation_file)
        else:
            features = []
            annotations = []
            total_files = len(self.dataset)
            for index, row in self.dataset.iterrows():
                print(f"Processing file {index+1}/{total_files}: {row['file_path']}")
                audio_path = os.path.join(self.base_dir, row['file_path'])
                feature = self.extract_features(audio_path)
                features.append(feature)
                annotations.append([row[' valence_mean'], row[' arousal_mean']])

            features = np.array(features, dtype=np.float32)
            annotations = np.array(annotations, dtype=np.float32)
            np.save(feature_file, features)
            np.save(annotation_file, annotations)

        X_train, X_temp, y_train, y_temp = train_test_split(features, annotations, test_size=val_size + test_size, random_state=42)
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=test_size/(val_size + test_size), random_state=42)

        train_dataset = TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
        val_dataset = TensorDataset(torch.from_numpy(X_val), torch.from_numpy(y_val))
        test_dataset = TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))

        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        return train_loader, val_loader, test_loader

base_dir = ''
dataset_path = os.path.join(base_dir, 'more_augmented_dataset.csv')
feature_extractor = AudioFeatureExtractor(dataset_path, base_dir)
# train_loader, test_loader = feature_extractor.prepare_data_loaders()

In [4]:
train_loader, val_loader, test_loader = feature_extractor.prepare_data_loaders()

In [5]:
len(feature_extractor.dataset)

8547

In [6]:
train_features, train_labels = next(iter(train_loader))
print("Training batch shape:")
print("Features shape:", train_features.shape)  # Shape of the input features
print("Labels shape:", train_labels.shape)      # Shape of the labels
print("Batch size:", train_features.size(0))    # The size of the batch

test_features, test_labels = next(iter(test_loader))
print("Test batch shape:")
print("Features shape:", test_features.shape)   # Shape of the input features
print("Labels shape:", test_labels.shape)       # Shape of the labels
print("Batch size:", test_features.size(0))     # The size of the batch

Training batch shape:
Features shape: torch.Size([64, 39, 4501])
Labels shape: torch.Size([64, 2])
Batch size: 64
Test batch shape:
Features shape: torch.Size([64, 39, 4501])
Labels shape: torch.Size([64, 2])
Batch size: 64


In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AudioNet(nn.Module):
    def __init__(self, params_dict):
        super().__init__()

        self.in_ch = params_dict['in_ch']
        self.num_filters1 = params_dict['num_filters1']
        self.num_filters2 = params_dict['num_filters2']
        self.num_hidden = params_dict['num_hidden']
        self.out_size = params_dict['out_size']

        self.conv1 = nn.Sequential(
            nn.Conv1d(self.in_ch, self.num_filters1, 10, 1),
            nn.BatchNorm1d(self.num_filters1),
            nn.ReLU(),
            nn.AvgPool1d(2, 2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(self.num_filters1, self.num_filters2, 10, 1),
            nn.BatchNorm1d(self.num_filters2),
            nn.ReLU(),
            nn.AvgPool1d(2, 2)
        )
        self.pool = nn.AvgPool1d(10, 10)

        self._to_linear = self._get_to_linear()

        self.fc1 = nn.Linear(self._to_linear, self.num_hidden)
        self.fc2 = nn.Linear(self.num_hidden, self.out_size)
        self.drop = nn.Dropout(0.5)
        self.act = nn.ReLU()

    def _get_to_linear(self):

        x = torch.randn(64, self.in_ch, 4501)  
        with torch.no_grad():
            x = self.conv1(x)
            x = self.conv2(x)
            x = self.pool(x)
        return x.numel() // x.shape[0]

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.pool(x)
        

        x = x.view(-1, self._to_linear) 

        x = self.fc1(x)
        x = self.drop(x)
        x = self.act(x)
        x = self.fc2(x)

        return x

In [8]:
import os
import torch
import torch.nn as nn
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np

class Trainer:
    def __init__(self, args):

        self.dimension = args.dimension
        self.num_epochs = getattr(args, 'num_epochs', 50)
        self._data_dir = args.data_dir
        self._models_dir = args.models_dir
        self._plots_dir = args.plots_dir
        self._device = torch.device(args.device if torch.cuda.is_available() else 'cpu')

        self.num_epochs = args.num_epochs
        self.log_interval = args.log_interval

        self._lr = args.lr_init
        self._lr_decay = args.lr_decay
        self._weight_decay = args.weight_decay
        self.train_mse = []
        self.train_r2 = []
        self.test_mse = []
        self.test_r2 = []

        self.train_loader, self.test_loader = train_loader, test_loader
        if self.dimension == 'valence':
            self._params_dict = args.valence_params_dict
        elif self.dimension == 'arousal':
            self._params_dict = args.arousal_params_dict
        else:
            self._params_dict = args.params_dict

        self.model = AudioNet(self._params_dict).to(self._device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self._lr, weight_decay=self._weight_decay)
        self._criterion = nn.MSELoss()

        if self.dimension == 'both':
            self.train_dict = {'valence_loss': [], 'arousal_loss': []}
            self.test_dict = {'valence_loss': [], 'arousal_loss': []}
        else:
            self.train_dict = {'loss': []}
            self.test_dict = {'loss': []}

    def print_progress(self, epoch, train_loss, train_r2, test_loss, test_r2):
        print(f"Epoch: {epoch}/{self.num_epochs}, Train Loss: {train_loss:.4f}, Train R2: {train_r2:.4f}, "
              f"Train MSE: {test_loss:.4f}")

    def calculate_metrics(self, true_values, predictions):
        true_values = np.array(true_values)
        predictions = np.array(predictions)
        mse = mean_squared_error(true_values, predictions)
        r2 = r2_score(true_values, predictions)
        return mse, r2

    def save_model(self):
        os.makedirs(self._models_dir, exist_ok=True)
        model_path = os.path.join(self._models_dir, f'model_{self.dimension}.pth')
        torch.save(self.model.state_dict(), model_path)
        print(f"Model saved to {model_path}")


    def update_learning_rate(self):
        self._lr *= self._lr_decay

        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self._lr
        success_message = 'Learning rate updated to {:.1e}'.format(self._lr)
        print(success_message)


    def train_1d(self):
        print(f"Training for {self.dimension}...")
        for epoch in range(1, self.num_epochs + 1):
            self.model.train()
            train_losses, train_targets, train_outputs = [], [], []
            
            for data, target in self.train_loader:
                data, target = data.to(self._device), target.to(self._device)
                self.optimizer.zero_grad()
                if torch.isnan(data).any():
                    print('NaN values detected in data tensor.')
                    print(data)
                output = self.model(data)
                if self.dimension == 'valence':
                    target = target[:, 0]
                elif self.dimension == 'arousal':
                    target = target[:, 1]

                loss = self._criterion(output.squeeze(), target)
                loss.backward()
                self.optimizer.step()

                train_losses.append(loss.item())
                train_targets.extend(target.detach().cpu().numpy())
                train_outputs.extend(output.detach().cpu().numpy())
            
            train_loss = np.mean(train_losses)
            train_mse, train_r2 = self.calculate_metrics(train_targets, train_outputs)
            self.train_mse.append(train_mse)
            self.train_r2.append(train_r2)


    def validate_1d(self):
        self.model.eval()
        test_losses, test_targets, test_outputs = [], [], []
        
        with torch.no_grad():
            for data, target in self.test_loader:
                data, target = data.to(self._device), target.to(self._device)
                output = self.model(data)

                if self.dimension == 'valence':
                    target = target[:, 0]
                elif self.dimension == 'arousal':
                    target = target[:, 1]

                loss = self._criterion(output.squeeze(), target)
                test_losses.append(loss.item())
                test_targets.extend(target.detach().cpu().numpy())
                test_outputs.extend(output.detach().cpu().numpy())
        
        test_loss = np.mean(test_losses)
        test_mse, test_r2 = self.calculate_metrics(test_targets, test_outputs)
        self.test_mse.append(test_mse)
        self.test_r2.append(test_r2)

        print(f"Validation - Dimension: {self.dimension}, Test Loss: {test_loss:.4f}, Test R2: {test_r2:.4f}")
        return test_loss, test_mse, test_r2

In [9]:
import gc

gc.collect()

0

In [10]:
import json
import torch
import os

class Args:
    def __init__(self, dictionary):
        for key, value in dictionary.items():
            if "params_dict" in key and isinstance(value, str):
                setattr(self, key, json.loads(value))
            else:
                setattr(self, key, value)

def train_for_dimension(dimension, args_dict):
    args_dict["dimension"] = dimension
    args_dict["models_dir"] = f"conv1D_features_augmented/base/{dimension}"
    if dimension == "valence":
        args_dict["lr_init"] = 0.001
        args_dict["num_epochs"] = 100
        args_dict["weight_decay"] = 0.005
        valence_params = json.loads(args_dict["valence_params_dict"])
        valence_params["num_hidden"] = 64
        args_dict["valence_params_dict"] = json.dumps(valence_params)
    elif dimension == "arousal":
        args_dict["lr_init"] = 0.001
        args_dict["num_epochs"] = 100
        args_dict["weight_decay"] = 0.01
        # Update arousal specific parameters
        arousal_params = json.loads(args_dict["arousal_params_dict"])
        arousal_params["num_hidden"] = 128
        args_dict["arousal_params_dict"] = json.dumps(arousal_params)
    
    args = Args(args_dict)
    trainer = Trainer(args)
    trainer.train_1d()
    trainer.validate_1d()
    trainer.save_model()

args_json = """
{
    "data_dir": "",
    "deam_dir":  "waveforms/",
    "font_dir": "Font",
    "models_dir": "",
    "plots_dir": "Plots",
    "audio_extension": "mp3",
    "sample_rate": 44100,
    "device": "cuda",
    "mode": "train",
    "dimension": "valence",
    "params_dict": "{\\"in_ch\\": 39, \\"num_filters1\\": 32, \\"num_filters2\\": 64, \\"num_hidden\\": 128, \\"out_size\\": 2}",
    "valence_params_dict": "{\\"in_ch\\": 39, \\"num_filters1\\": 32, \\"num_filters2\\": 64, \\"num_hidden\\": 64, \\"out_size\\": 1}",
    "arousal_params_dict": "{\\"in_ch\\": 39, \\"num_filters1\\": 32, \\"num_filters2\\": 32, \\"num_hidden\\": 64, \\"out_size\\": 1}",
    "lr_init": 0.001,
    "lr_decay": 0.1,
    "decay_interval": 1000,
    "weight_decay": 0.01,
    "num_epochs": 50,
    "log_interval": 1
}
"""

args_dict = json.loads(args_json)


In [11]:
train_for_dimension("valence", args_dict)

Training for valence...
Validation - Dimension: valence, Test Loss: 0.4943, Test R2: 0.5242
Model saved to conv1D_features_augmented/base/valence\model_valence.pth


In [12]:
train_for_dimension("arousal", args_dict)

Training for arousal...
Validation - Dimension: arousal, Test Loss: 0.6772, Test R2: 0.6135
Model saved to conv1D_features_augmented/base/arousal\model_arousal.pth


In [13]:
import torch
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.cluster import KMeans
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
import math
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, f1_score, precision_score

kmeans = KMeans()

emotions = {
    "Sleepy": {"valence": 0.01, "arousal": -1.00},
    "Tired": {"valence": -0.01, "arousal": -1.00},
    "Afraid": {"valence": -0.12, "arousal": 0.79},  
    "Angry":{"valence": -0.40, "arousal": 0.79},
    "Calm":{"valence": 0.78, "arousal": -0.68},
    "Relaxed":{"valence": 0.71, "arousal": -0.65},
    "Content":{"valence": 0.81, "arousal": -0.55},
    "Depressed":{"valence": -0.81, "arousal": -0.48},
    "Discontent":{"valence": -0.68, "arousal": -0.32},
    "Determined":{"valence": 0.73, "arousal": 0.26},
    "Happy":{"valence": 0.89, "arousal": 0.17},
    "Anxious":{"valence": -0.72, "arousal": -0.80},
    "Good":{"valence": 0.90, "arousal": -0.08},
    "Pensive":{"valence": 0.03, "arousal": -0.60},
    "Impressed":{"valence": 0.39, "arousal": -0.06},
    "Frustrated":{"valence": -0.60, "arousal": 0.40},
    "Disappointed":{"valence": -0.80, "arousal": -0.03},
    "Bored":{"valence": -0.35, "arousal": -0.78},
    "Annoyed":{"valence": -0.44, "arousal": 0.76},
    "Enraged":{"valence": -0.18, "arousal": 0.83},
    "Excited":{"valence": 0.70, "arousal": 0.71},
    "Melancholy":{"valence": -0.05, "arousal": -0.65},
    "Satisfied":{"valence": 0.77, "arousal": -0.63},
    "Distressed":{"valence": -0.71, "arousal": 0.55},
    "Uncomfortable":{"valence": -0.68, "arousal": -0.37},
    "Worried":{"valence": -0.07, "arousal": -0.32},
    "Amused":{"valence": 0.55, "arousal": 0.19},
    "Apathetic":{"valence": -0.20, "arousal": -0.12},
    "Peaceful":{"valence": 0.55, "arousal": -0.80},
    "Contemplative":{"valence": 0.58, "arousal": -0.60},
    "Embarrassed":{"valence": -0.31, "arousal": -0.60},
    "Sad":{"valence": -0.81, "arousal": -0.40},
    "Hopeful":{"valence": 0.61, "arousal": -0.30},
    "Pleased":{"valence": 0.89, "arousal": -0.10},
}

def find_emotion(valence, arousal):
    closest_emotion = None
    min_distance = math.inf
    
    for emotion, scores in emotions.items():
        distance = math.sqrt((valence - scores["valence"])**2 + (arousal - scores["arousal"])**2)
        
        if distance < min_distance:
            min_distance = distance
            closest_emotion = emotion
            
    return closest_emotion

clustered_emotions = {'blue': ['Determined',
  'Happy',
  'Good',
  'Impressed',
  'Excited',
  'Amused',
  'Hopeful',
  'Pleased'],
 'red': ['Depressed',
  'Discontent',
  'Anxious',
  'Disappointed',
  'Bored',
  'Uncomfortable',
  'Worried',
  'Apathetic',
  'Embarrassed',
  'Sad'],
 'green': ['Afraid', 'Angry', 'Frustrated', 'Annoyed', 'Enraged', 'Distressed'],
 'purple': ['Sleepy',
  'Tired',
  'Calm',
  'Relaxed',
  'Content',
  'Pensive',
  'Melancholy',
  'Satisfied',
  'Peaceful',
  'Contemplative']}

  
def get_colormap(valence, arousal):
    emotion = find_emotion(valence, arousal)
    for color, emotion_list in clustered_emotions.items():
        if emotion in emotion_list:
            return color  
    return None

def normalize_value(value):
    return (value - 1) / 4 - 1

def evaluate_regression_metrics(valence_model, arousal_model, test_loader, device):
    valence_model.eval()
    arousal_model.eval()
    valence_predictions, valence_targets = [], []
    arousal_predictions, arousal_targets = [], []

    with torch.no_grad():
        for data, true_valence_arousal in test_loader:
            data = data.to(device)
            true_valence, true_arousal = true_valence_arousal[:, 0].cpu().numpy(), true_valence_arousal[:, 1].cpu().numpy()

            valence_outputs = valence_model(data).cpu().detach().numpy().squeeze()
            arousal_outputs = arousal_model(data).cpu().detach().numpy().squeeze()
            valence_predictions.extend(normalize_value(valence_outputs))
            arousal_predictions.extend(normalize_value(arousal_outputs))
            valence_targets.extend(normalize_value(true_valence))
            arousal_targets.extend(normalize_value(true_arousal))

    valence_mse = mean_squared_error(valence_targets, valence_predictions)
    arousal_mse = mean_squared_error(arousal_targets, arousal_predictions)
    
    valence_mae = mean_absolute_error(valence_targets, valence_predictions)
    arousal_mae = mean_absolute_error(arousal_targets, arousal_predictions)
    
    valence_r2 = r2_score(valence_targets, valence_predictions)
    arousal_r2 = r2_score(arousal_targets, arousal_predictions)
    
    print(f"Valence MSE: {valence_mse:.4f}, MAE: {valence_mae:.4f}, R²: {valence_r2:.4f}")
    print(f"Arousal MSE: {arousal_mse:.4f}, MAE: {arousal_mae:.4f}, R²: {arousal_r2:.4f}")


def evaluate_combined_models(valence_model, arousal_model, test_loader, device):
    valence_model.eval()
    arousal_model.eval()
    predictions, targets = [], []

    with torch.no_grad():
        for data, true_valence_arousal in test_loader:
            data = data.to(device)
            true_valence, true_arousal = true_valence_arousal[:, 0].cpu().numpy(), true_valence_arousal[:, 1].cpu().numpy()

            valence_outputs = valence_model(data).cpu().detach().numpy().squeeze()
            arousal_outputs = arousal_model(data).cpu().detach().numpy().squeeze()

            for i in range(len(valence_outputs)):
                valence_outputs[i] = normalize_value(valence_outputs[i])
                arousal_outputs[i] = normalize_value(arousal_outputs[i])
                true_valence[i] = normalize_value(true_valence[i])
                true_arousal[i] = normalize_value(true_arousal[i])

                predicted_color = get_colormap(valence_outputs[i], arousal_outputs[i])
                true_color = get_colormap(true_valence[i], true_arousal[i])

                predictions.append(predicted_color)
                targets.append(true_color)

    accuracy = accuracy_score(targets, predictions)
    f1 = f1_score(targets, predictions, average='macro')
    precision = precision_score(targets, predictions, average='macro')

    print(f"Accuracy of emotion color classification: {accuracy:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Precision: {precision:.4f}")

    all_colors = ['blue', 'red', 'green', 'purple']
    cm = confusion_matrix(targets, predictions, labels=all_colors)

    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=all_colors, yticklabels=all_colors)
    plt.xlabel('Predicted Emotion Colors')
    plt.ylabel('True Emotion Colors')
    plt.title('Confusion Matrix of Emotion Colors')
    plt.show()


args = Args(args_dict)
valence_dict = args.valence_params_dict
arousal_dict = args.arousal_params_dict
valence_path = 'conv1D_features_augmented/base/valence/model_valence.pth'
arousal_path = 'conv1D_features_augmented/attention/grid_search/model_arousal.pth'

valence_model = AudioNet(valence_dict)
arousal_model = AudioNet(arousal_dict)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
valence_model = valence_model.to(device)
arousal_model = arousal_model.to(device)
valence_model.load_state_dict(torch.load(valence_path, map_location=device))
arousal_model.load_state_dict(torch.load(arousal_path, map_location=device))

evaluate_combined_models(valence_model, arousal_model, test_loader, 'cuda')
evaluate_regression_metrics(valence_model, arousal_model, test_loader, 'cuda')

FileNotFoundError: [Errno 2] No such file or directory: 'conv1D_features_augmented/attention/grid_search/model_arousal.pth'

# Attention Mechanism

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Attention(nn.Module):
    def __init__(self, feature_dim):
        super(Attention, self).__init__()
        self.feature_dim = feature_dim
        self.attention = nn.Sequential(
            nn.Linear(feature_dim, 64),
            nn.ReLU(inplace=True),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        scores = self.attention(x)
        alpha = F.softmax(scores, dim=1)
        attended_features = x * alpha
        return attended_features.view(-1, self.feature_dim)

class AudioNet(nn.Module):
    def __init__(self, params_dict):
        super(AudioNet, self).__init__()
        self.in_ch = params_dict.get('in_ch', 1)
        self.num_filters1 = params_dict.get('num_filters1', 32)
        self.num_filters2 = params_dict.get('num_filters2', 64)
        self.num_hidden = params_dict.get('num_hidden', 128)
        self.out_size = params_dict.get('out_size', 1)

        self.conv1 = nn.Sequential(
            nn.Conv1d(self.in_ch, self.num_filters1, kernel_size=10, stride=1),
            nn.BatchNorm1d(self.num_filters1),
            nn.ReLU(inplace=True),
            nn.AvgPool1d(kernel_size=2, stride=2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv1d(self.num_filters1, self.num_filters2, kernel_size=10, stride=1),
            nn.BatchNorm1d(self.num_filters2),
            nn.ReLU(inplace=True),
            nn.AvgPool1d(kernel_size=2, stride=2)
        )
        self.pool = nn.AvgPool1d(kernel_size=10, stride=10)

        self._to_linear = None
        self.attention = Attention(self._get_to_linear())

        self.fc1 = nn.Linear(self._get_to_linear(), self.num_hidden)
        self.fc2 = nn.Linear(self.num_hidden, self.out_size)
        self.drop = nn.Dropout(p=0.5)
        self.act = nn.ReLU(inplace=True)

    def _get_to_linear(self):
        if self._to_linear is None:
            x = torch.randn(1, self.in_ch, 4501)
            with torch.no_grad():
                x = self.conv1(x)
                x = self.conv2(x)
                x = self.pool(x)
                self._to_linear = x.numel() // x.shape[0]
        return self._to_linear

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.pool(x)
        x = x.view(-1, self._get_to_linear())
        x = self.attention(x)
        x = self.fc1(x)
        x = self.drop(x)
        x = self.act(x)
        x = self.fc2(x)
        return x.to(x.device)

In [8]:
import os
import torch
import torch.nn as nn
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
import random

class Trainer:
    def __init__(self, args):

        self.dimension = args.dimension
        self.num_epochs = getattr(args, 'num_epochs', 50)
        self._data_dir = args.data_dir
        self._models_dir = args.models_dir
        self._plots_dir = args.plots_dir
        self._device = torch.device(args.device if torch.cuda.is_available() else 'cpu')

        self.num_epochs = args.num_epochs
        self.log_interval = args.log_interval

        self._lr = args.lr_init
        self._lr_decay = args.lr_decay
        self._weight_decay = args.weight_decay
        self.train_mse = []
        self.train_r2 = []
        self.test_mse = []
        self.test_r2 = []

        self.train_loader, self.test_loader = train_loader, test_loader
        if self.dimension == 'valence':
            self._params_dict = args.valence_params_dict
        elif self.dimension == 'arousal':
            self._params_dict = args.arousal_params_dict
        else:
            self._params_dict = args.params_dict

        self.model = AudioNet(self._params_dict).to(self._device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self._lr, weight_decay=self._weight_decay)
        self._criterion = nn.MSELoss()

        if self.dimension == 'both':
            self.train_dict = {'valence_loss': [], 'arousal_loss': []}
            self.test_dict = {'valence_loss': [], 'arousal_loss': []}
        else:
            self.train_dict = {'loss': []}
            self.test_dict = {'loss': []}

    def print_progress(self, epoch, train_loss, train_r2, test_loss, test_r2):
        print(f"Epoch: {epoch}/{self.num_epochs}, Train Loss: {train_loss:.4f}, Train R2: {train_r2:.4f}, "
              f"Train MSE: {test_loss:.4f}")

    def calculate_metrics(self, true_values, predictions):
        true_values = np.array(true_values)
        predictions = np.array(predictions)
        mse = mean_squared_error(true_values, predictions)
        r2 = r2_score(true_values, predictions)
        return mse, r2

    def save_model(self):
        os.makedirs(self._models_dir, exist_ok=True)
        model_path = os.path.join(self._models_dir, f'model_{self.dimension}.pth')
        torch.save(self.model.state_dict(), model_path)
        print(f"Model saved to {model_path}")


    def update_learning_rate(self):
        self._lr *= self._lr_decay

        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self._lr
        success_message = 'Learning rate updated to {:.1e}'.format(self._lr)
        print(success_message)


    def train_1d(self):
        print(f"Training for {self.dimension}...")
        for epoch in range(1, self.num_epochs + 1):
            self.model.train()
            train_losses, train_targets, train_outputs = [], [], []
            
            for data, target in self.train_loader:
                data, target = data.to(self._device), target.to(self._device)
                self.optimizer.zero_grad()
                output = self.model(data)
                if self.dimension == 'valence' or self.dimension == 'arousal':
                    target = target[:, 0 if self.dimension == 'valence' else 1]
                output = output.squeeze()
                loss = self._criterion(output, target)
                loss.backward()
                self.optimizer.step()

                train_losses.append(loss.item())
                train_targets.extend(target.detach().cpu().numpy())
                train_outputs.extend(output.detach().cpu().numpy())
            
            train_loss = np.mean(train_losses)
            train_mse, train_r2 = self.calculate_metrics(train_targets, train_outputs)
            self.train_mse.append(train_mse)
            self.train_r2.append(train_r2)
            
            print(f"Epoch {epoch} training results: Train R²: {train_r2:.4f}")
    
    def validate_1d(self):
        self.model.eval()
        test_losses, test_targets, test_outputs = [], [], []
        
        with torch.no_grad():
            for data, target in self.test_loader:
                data, target = data.to(self._device), target.to(self._device)
                output = self.model(data)
                if self.dimension == 'valence' or self.dimension == 'arousal':
                    target = target[:, 0 if self.dimension == 'valence' else 1]
                output = output.squeeze()
                loss = self._criterion(output, target)
                test_losses.append(loss.item())
                test_targets.extend(target.detach().cpu().numpy())
                test_outputs.extend(output.detach().cpu().numpy())
            
            test_loss = np.mean(test_losses)
            test_mse, test_r2 = self.calculate_metrics(test_targets, test_outputs)
            self.test_mse.append(test_mse)
            self.test_r2.append(test_r2)

            print(f"Validation - Dimension: {self.dimension}, Test Loss: {test_loss:.4f}, Test R²: {test_r2:.4f}")
            print("Sample of 10 random test results:")
            sample_indices = random.sample(range(len(test_targets)), 10)
            for idx in sample_indices:
                print(f"True: {test_targets[idx]:.4f}, Predicted: {test_outputs[idx]:.4f}")
            return test_loss, test_mse, test_r2

In [9]:
import json
import torch
import os

class Args:
    def __init__(self, dictionary):
        for key, value in dictionary.items():
            if "params_dict" in key and isinstance(value, str):
                setattr(self, key, json.loads(value))
            else:
                setattr(self, key, value)

def train_for_dimension(dimension, args_dict):
    args_dict["dimension"] = dimension
    args_dict["models_dir"] = f"New_Final/{dimension}"
    # Apply best hyperparameters based on dimension
    if dimension == "valence":
        args_dict["lr_init"] = 0.001
        args_dict["num_epochs"] = 100
        args_dict["weight_decay"] = 0.005
        valence_params = json.loads(args_dict["valence_params_dict"])
        valence_params["num_hidden"] = 64
        args_dict["valence_params_dict"] = json.dumps(valence_params)
    elif dimension == "arousal":
        args_dict["lr_init"] = 0.001
        args_dict["num_epochs"] = 100
        args_dict["weight_decay"] = 0.01
        # Update arousal specific parameters0
        arousal_params = json.loads(args_dict["arousal_params_dict"])
        arousal_params["num_hidden"] = 128
        args_dict["arousal_params_dict"] = json.dumps(arousal_params)
    
    args = Args(args_dict)
    trainer = Trainer(args)
    trainer.train_1d()
    trainer.validate_1d()
    trainer.save_model()


args_json = """
{
    "data_dir": "",
    "deam_dir":  "waveforms/",
    "font_dir": "Font",
    "models_dir": "",
    "plots_dir": "Plots",
    "audio_extension": "mp3",
    "sample_rate": 44100,
    "device": "cuda",
    "mode": "train",
    "dimension": "valence",
    "params_dict": "{\\"in_ch\\": 39, \\"num_filters1\\": 32, \\"num_filters2\\": 64, \\"num_hidden\\": 128, \\"out_size\\": 2, \\"hidden_size\\": 128}",
    "valence_params_dict": "{\\"in_ch\\": 39, \\"num_filters1\\": 32, \\"num_filters2\\": 64, \\"num_hidden\\": 64, \\"out_size\\": 1, \\"hidden_size\\": 64}",
    "arousal_params_dict": "{\\"in_ch\\": 39, \\"num_filters1\\": 32, \\"num_filters2\\": 32, \\"num_hidden\\": 64, \\"out_size\\": 1, \\"hidden_size\\": 64}",
    "lr_init": 0.001,
    "lr_decay": 0.1,
    "decay_interval": 1000,
    "weight_decay": 0.01,
    "num_epochs": 150,
    "log_interval": 1
}
"""

args_dict = json.loads(args_json)

In [10]:
train_for_dimension("valence", args_dict)

Training for valence...
Epoch 1 training results: Train R²: -1.8708
Epoch 2 training results: Train R²: -1.0567
Epoch 3 training results: Train R²: -0.9638
Epoch 4 training results: Train R²: -0.8508
Epoch 5 training results: Train R²: -0.7238
Epoch 6 training results: Train R²: -0.6208
Epoch 7 training results: Train R²: -0.5414
Epoch 8 training results: Train R²: -0.5364
Epoch 9 training results: Train R²: -0.4994
Epoch 10 training results: Train R²: -0.4321
Epoch 11 training results: Train R²: -0.3587
Epoch 12 training results: Train R²: -0.3866
Epoch 13 training results: Train R²: -0.3151
Epoch 14 training results: Train R²: -0.2904
Epoch 15 training results: Train R²: -0.2532
Epoch 16 training results: Train R²: -0.2284
Epoch 17 training results: Train R²: -0.2054
Epoch 18 training results: Train R²: -0.1694
Epoch 19 training results: Train R²: -0.0838
Epoch 20 training results: Train R²: -0.1044
Epoch 21 training results: Train R²: -0.0814
Epoch 22 training results: Train R²: -0.

In [11]:
train_for_dimension("arousal", args_dict)

Training for arousal...
Epoch 1 training results: Train R²: -0.5020
Epoch 2 training results: Train R²: -0.0376
Epoch 3 training results: Train R²: 0.0266
Epoch 4 training results: Train R²: 0.0849
Epoch 5 training results: Train R²: 0.1639
Epoch 6 training results: Train R²: 0.2029
Epoch 7 training results: Train R²: 0.2290
Epoch 8 training results: Train R²: 0.2446
Epoch 9 training results: Train R²: 0.2758
Epoch 10 training results: Train R²: 0.2605
Epoch 11 training results: Train R²: 0.3156
Epoch 12 training results: Train R²: 0.2827
Epoch 13 training results: Train R²: 0.3249
Epoch 14 training results: Train R²: 0.3550
Epoch 15 training results: Train R²: 0.3759
Epoch 16 training results: Train R²: 0.3589
Epoch 17 training results: Train R²: 0.3990
Epoch 18 training results: Train R²: 0.3604
Epoch 19 training results: Train R²: 0.3460
Epoch 20 training results: Train R²: 0.4179
Epoch 21 training results: Train R²: 0.4365
Epoch 22 training results: Train R²: 0.4301
Epoch 23 traini

# Grid Search

In [10]:
import os
import torch
import torch.nn as nn
from sklearn.metrics import mean_squared_error, r2_score

class Trainer:
    def __init__(self, args):

        self.dimension = args.dimension
        self.num_epochs = getattr(args, 'num_epochs', 50)
        self._data_dir = args.data_dir
        self._models_dir = args.models_dir
        self._plots_dir = args.plots_dir
        self._device = torch.device(args.device if torch.cuda.is_available() else 'cpu')

        self.num_epochs = args.num_epochs
        self.log_interval = args.log_interval

        self._lr = args.lr_init
        self._lr_decay = args.lr_decay
        self._weight_decay = args.weight_decay
        self.train_mse = []
        self.train_r2 = []
        self.test_mse = []
        self.test_r2 = []

        self.train_loader, self.test_loader = train_loader, test_loader
        if self.dimension == 'valence':
            self._params_dict = args.valence_params_dict
        elif self.dimension == 'arousal':
            self._params_dict = args.arousal_params_dict
        else:
            self._params_dict = args.params_dict

        self.model = AudioNet(self._params_dict).to(self._device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self._lr, weight_decay=self._weight_decay)
        self._criterion = nn.MSELoss()

        if self.dimension == 'both':
            self.train_dict = {'valence_loss': [], 'arousal_loss': []}
            self.test_dict = {'valence_loss': [], 'arousal_loss': []}
        else:
            self.train_dict = {'loss': []}
            self.test_dict = {'loss': []}

    def print_progress(self, epoch, train_loss, train_r2, test_loss, test_r2):
        print(f"Epoch: {epoch}/{self.num_epochs}, Train Loss: {train_loss:.4f}, Train R2: {train_r2:.4f}, "
              f"Train MSE: {test_loss:.4f}")

    def calculate_metrics(self, true_values, predictions):
        true_values = np.array(true_values)
        predictions = np.array(predictions)
        mse = mean_squared_error(true_values, predictions)
        r2 = r2_score(true_values, predictions)
        return mse, r2

    def save_model(self, model_path):
        torch.save(self.model.state_dict(), model_path)
        print(f"Model saved to {model_path}")


    def update_learning_rate(self):
        self._lr *= self._lr_decay

        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self._lr
        success_message = 'Learning rate updated to {:.1e}'.format(self._lr)
        print(success_message)


    def train_1d(self):
        print(f"Training for {self.dimension}...")
        for epoch in range(1, self.num_epochs + 1):
            self.model.train()
            train_losses, train_targets, train_outputs = [], [], []
            
            for data, target in self.train_loader:
                data, target = data.to(self._device), target.to(self._device)
                self.optimizer.zero_grad()
                if torch.isnan(data).any():
                    print('NaN values detected in data tensor.')
                    print(data)
                output = self.model(data)
                if self.dimension == 'valence':
                    target = target[:, 0]
                elif self.dimension == 'arousal':
                    target = target[:, 1]

                loss = self._criterion(output.squeeze(), target)
                loss.backward()
                self.optimizer.step()

                train_losses.append(loss.item())
                train_targets.extend(target.detach().cpu().numpy())
                train_outputs.extend(output.detach().cpu().numpy())
            
            train_loss = np.mean(train_losses)
            train_mse, train_r2 = self.calculate_metrics(train_targets, train_outputs)
            self.train_mse.append(train_mse)
            self.train_r2.append(train_r2)


    def validate_1d(self):
        self.model.eval()
        test_losses, test_targets, test_outputs = [], [], []
        
        with torch.no_grad():
            for data, target in self.test_loader:
                data, target = data.to(self._device), target.to(self._device)
                output = self.model(data)

                if self.dimension == 'valence':
                    target = target[:, 0]
                elif self.dimension == 'arousal':
                    target = target[:, 1]

                loss = self._criterion(output.squeeze(), target)
                test_losses.append(loss.item())
                test_targets.extend(target.detach().cpu().numpy())
                test_outputs.extend(output.detach().cpu().numpy())
        
        test_loss = np.mean(test_losses)
        test_mse, test_r2 = self.calculate_metrics(test_targets, test_outputs)
        self.test_mse.append(test_mse)
        self.test_r2.append(test_r2)

        print(f"Validation - Dimension: {self.dimension}, Test Loss: {test_loss:.4f}, Test R2: {test_r2:.4f}")
        return test_loss, test_mse, test_r2

In [11]:
import itertools
import json
import torch

hyperparams_grid = {
    'lr': [0.001, 0.005],
    'num_hidden': [64, 128],
    'weight_decay': [0.01],
    'num_epochs': [50, 100]
}

args_dict = {
    "data_dir": "",
    "deam_dir":  "waveforms/",
    "font_dir": "Font",
    "models_dir": "conv1D_features_augmented/attention/grid_search",
    "plots_dir": "Plots",
    "audio_extension": "mp3",
    "sample_rate": 44100,
    "device": "cuda",
    "mode": "train",
    "dimension": "valence",
    "params_dict": {"in_ch": 39, "num_filters1": 32, "num_filters2": 64, "num_hidden": 128, "out_size": 2},
    "valence_params_dict": {"in_ch": 39, "num_filters1": 32, "num_filters2": 64, "num_hidden": 64, "out_size": 1},
    "arousal_params_dict": {"in_ch": 39, "num_filters1": 32, "num_filters2": 32, "num_hidden": 64, "out_size": 1},
    "lr_init": 0.001,
    "lr_decay": 0.1,
    "decay_interval": 1000,
    "weight_decay": 0.01,
    "num_epochs": 50,
    "log_interval": 1
}

class Args:
    def __init__(self, dictionary):
        for key, value in dictionary.items():
            setattr(self, key, value)
        self.update_params_dict()

    def update_params_dict(self):
        if "params_dict" in self.__dict__ and isinstance(self.params_dict, str):
            self.params_dict = json.loads(self.params_dict)
        if self.dimension == 'valence':
            self.params_dict = self.valence_params_dict
        elif self.dimension == 'arousal':
            self.params_dict = self.arousal_params_dict

def grid_search_dimension(args_dict, hyperparams_grid, dimension):
    grid_combinations = list(itertools.product(*hyperparams_grid.values()))
    best_val_score = float('inf')
    best_params = None
    best_model_dir = ""

    args_dict_copy = args_dict.copy()
    args_dict_copy['dimension'] = dimension

    for combination in grid_combinations:
        hyperparams = dict(zip(hyperparams_grid.keys(), combination))
        for key, value in hyperparams.items():
            args_dict_copy[key] = value
            if key == 'num_hidden':
                args_dict_copy['params_dict']['num_hidden'] = value
            elif key == 'lr':
                args_dict_copy['lr_init'] = value

        model_dir = args_dict_copy["models_dir"]
        specific_model_dir = os.path.join(model_dir, f"best_{dimension}_model")
        print(specific_model_dir) 

        args = Args(args_dict_copy)
        trainer = Trainer(args)

        trainer.train_1d()
        val_loss, _, _ = trainer.validate_1d()
        if val_loss < best_val_score:
            best_val_score = val_loss
            best_params = hyperparams
            best_model_dir = specific_model_dir
            path = os.path.join(best_model_dir)
            trainer.save_model(path)

    print(f"Best hyperparameters for {dimension}:", best_params)  
    print(f"Model saved in {best_model_dir}")

In [None]:
grid_search_dimension(args_dict, hyperparams_grid, 'valence')

In [None]:
grid_search_dimension(args_dict, hyperparams_grid, 'arousal')