## Libraries

In [1]:
# [IMPORTS] Core utilities
import os
import glob
import random
import numpy as np
import pandas as pd
from tqdm import tqdm
import copy

# [IMPORTS] Audio & Visualization
import librosa
import librosa.display
import matplotlib.pyplot as plt
import IPython.display as ipd

# [IMPORTS] PyTorch & ML
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio.transforms as T
import torch.nn.init as init
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts
from sklearn.svm import SVR
import xgboost as xgb
from xgboost import XGBRegressor
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest, mutual_info_regression
from sklearn.pipeline import Pipeline

# [IMPORTS] Metrics
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split
from scipy.stats import pearsonr

# [IMPORTS] Other
from google.colab import drive
import warnings
warnings.filterwarnings('ignore')


## Dataset loading from Google Drive



In [2]:
# Mount Google Drive under /content/drive (force_remount=True requests a fresh mount).
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


In [3]:
import tarfile

# Source (Google Drive) and destination (local Colab disk) paths
drive_root = '/content/drive/MyDrive/mel_sequences_final'
base_output = '/content/mel_dataset'

# One output folder per split
split_dirs = {
    'train': os.path.join(base_output, 'train'),
    'valid': os.path.join(base_output, 'valid'),
    'test': os.path.join(base_output, 'test')
}

# Create the output folders. Existing contents are left in place (nothing is
# deleted), so files from an earlier extraction remain alongside new ones.
for split, out_dir in split_dirs.items():
    os.makedirs(out_dir, exist_ok=True)

# Archive file name -> folder it is extracted into
tar_map = {
    'mel_sequences_train.tar.gz': split_dirs['train'],
    'mel_sequences_valid.tar.gz': split_dirs['valid'],
    'mel_sequences_test.tar.gz':  split_dirs['test'],
}

# Extract each split's archive
for tar_name, output_dir in tar_map.items():
    tar_path = os.path.join(drive_root, tar_name)
    print(f"üì¶ Estraendo {tar_name} in {output_dir}...")
    with tarfile.open(tar_path, 'r:gz') as tar:
        # Prefer the 'data' extraction filter when this Python provides it:
        # it refuses members that would land outside output_dir and makes the
        # behaviour identical across Python versions. Older interpreters
        # without extraction filters fall back to the plain call.
        if hasattr(tarfile, 'data_filter'):
            tar.extractall(path=output_dir, filter='data')
        else:
            tar.extractall(path=output_dir)

print("Estrazione completata. Cartelle create:")
for split, path in split_dirs.items():
    print(f"  üìÅ {split}: {path}")

üì¶ Estraendo mel_sequences_train.tar.gz in /content/mel_dataset/train...
üì¶ Estraendo mel_sequences_valid.tar.gz in /content/mel_dataset/valid...
üì¶ Estraendo mel_sequences_test.tar.gz in /content/mel_dataset/test...
Estrazione completata. Cartelle create:
  üìÅ train: /content/mel_dataset/train
  üìÅ valid: /content/mel_dataset/valid
  üìÅ test: /content/mel_dataset/test


In [4]:
class MelSequenceDataset(Dataset):
    """Dataset over the ``*.npz`` files found directly inside a directory.

    Each archive must contain a ``'mel'`` array and a ``'label'`` array.
    Files are visited in sorted path order.

    Args:
        mel_dir: Directory that is globbed for ``*.npz`` files (non-recursive).
        transform: Optional callable applied to the ``'mel'`` array before it
            is converted to a tensor.
    """

    def __init__(self, mel_dir, transform=None):
        self.files = sorted(glob.glob(os.path.join(mel_dir, '*.npz')))
        self.transform = transform

    def __len__(self):
        """Number of ``.npz`` files discovered at construction time."""
        return len(self.files)

    def __getitem__(self, idx):
        """Return ``(mel, label)`` for file ``idx`` as float32 tensors."""
        # np.load on an .npz returns a lazily-read NpzFile holding an open file
        # handle; the context manager closes it once both arrays are in memory.
        with np.load(self.files[idx]) as data:
            mel = data['mel']
            label = data['label']
        if self.transform:
            mel = self.transform(mel)
        return torch.tensor(mel, dtype=torch.float32), torch.tensor(label, dtype=torch.float32)



In [5]:
# One dataset per extracted split folder (train / valid / test, in that order).
train_dataset, valid_dataset, test_dataset = map(
    MelSequenceDataset,
    (
        '/content/mel_dataset/train',
        '/content/mel_dataset/valid',
        '/content/mel_dataset/test',
    ),
)

In [6]:
# Training batches are reshuffled on every pass over the loader.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
# Validation and test loaders keep a fixed order so that anything derived from
# them (e.g. per-batch feature dumps) is identical from run to run.
devel_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

## Model

In [7]:
import torch
import torch.nn as nn
import torch.nn.init as init

class CNNRNNAttention(nn.Module):
    """CNN -> bidirectional LSTM -> attention pooling -> linear regression head.

    Input is a 5-D tensor ``(batch, seq_len, C, H, W)``. Every frame goes
    through three conv/BN/ELU/max-pool stages (pooling by 2, 2 and 4, i.e. each
    spatial dimension is divided by 16), is flattened, and the resulting
    sequence is fed to the LSTM.

    Constraints fixed by the layer sizes below:
        * ``C`` must be 1 (first convolution has one input channel).
        * ``32 * (H // 16) * (W // 16)`` must equal 512, the LSTM input size
          (e.g. 128 x 32 frames).

    Args:
        n_mels: Accepted for interface compatibility; not used by the layers.
        hidden_size: LSTM hidden size per direction.
        n_lstm_layers: Number of stacked LSTM layers.
    """

    def __init__(self, n_mels=128, hidden_size=128, n_lstm_layers=2):
        super().__init__()

        self.conv_block = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=3, padding=1),
            nn.BatchNorm2d(8),
            nn.ELU(),
            nn.MaxPool2d(2),

            nn.Conv2d(8, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ELU(),
            nn.MaxPool2d(2),

            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ELU(),
            nn.MaxPool2d(4),
        )

        self.rnn = nn.LSTM(input_size=512, hidden_size=hidden_size,
                           num_layers=n_lstm_layers, batch_first=True, bidirectional=True)

        # One attention score per time step, computed from the 2*hidden_size
        # bidirectional LSTM output.
        self.attn = nn.Linear(2*hidden_size, 1)

        # The head consumes [attention context ; last time step], each of
        # width 2*hidden_size, hence 4*hidden_size inputs.
        self.fc_valence = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(4*hidden_size, 1),
        )

        self.initialize_weights()

    def _encode(self, x):
        """Shared trunk: map ``(batch, seq_len, C, H, W)`` to ``(batch, 4*hidden_size)``."""
        batch_size, seq_len, C, H, W = x.shape

        # Fold time into the batch axis so the 2-D convolutions see one frame
        # per sample. reshape (rather than view) also accepts non-contiguous
        # inputs such as transposed tensors.
        x = x.reshape(batch_size * seq_len, C, H, W)
        x = self.conv_block(x)

        # Unfold time again and flatten each frame's feature map.
        x = x.reshape(batch_size, seq_len, -1)
        rnn_out, _ = self.rnn(x)

        # Attention weights are normalised over the time axis (dim=1).
        scores = torch.tanh(self.attn(rnn_out))
        alpha = torch.softmax(scores, dim=1)
        context = torch.sum(rnn_out * alpha, dim=1)

        last_out = rnn_out[:, -1, :]
        return torch.cat([context, last_out], dim=1)

    def forward(self, x):
        """Return the regression output of shape ``(batch, 1)``."""
        return self.fc_valence(self._encode(x))

    def initialize_weights(self):
        """Re-initialise every Conv2d, Linear, LSTM and BatchNorm parameter."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

            elif isinstance(m, nn.Linear):
                init.kaiming_normal_(m.weight, nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

            elif isinstance(m, nn.LSTM):
                for name, param in m.named_parameters():
                    if 'weight_ih' in name:
                        init.xavier_uniform_(param.data)
                    elif 'weight_hh' in name:
                        init.orthogonal_(param.data)
                    elif 'bias' in name:
                        param.data.fill_(0)
                        # Set the forget-gate bias to 1. PyTorch packs the gate
                        # parameters as (input, forget, cell, output), so the
                        # second quarter is the forget gate. This applies to
                        # both bias_ih and bias_hh, which are summed at runtime.
                        n = param.size(0)
                        param.data[n//4:n//2].fill_(1)

            elif isinstance(m, (nn.BatchNorm2d, nn.BatchNorm1d)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def extract_features(self, x):
        """Return the pre-head representation ``(batch, 4*hidden_size)`` without gradients.

        This is exactly the tensor that ``forward`` feeds to ``fc_valence``.
        The train/eval mode is not changed here; callers that want
        deterministic BatchNorm behaviour should call ``eval()`` first.
        """
        with torch.no_grad():
            return self._encode(x)


In [8]:
import os
import numpy as np
import torch

def extract_and_save_batches(model, dataloader, save_dir, prefix='train', device='cuda'):
    """Run ``model.extract_features`` over a loader and dump each batch to disk.

    For batch ``i`` two files are written inside ``save_dir``:
    ``{prefix}_features_batch_{i}.npy`` and ``{prefix}_labels_batch_{i}.npy``.
    The saved label is ``y[:, -1, 0]``, i.e. the target tensor must have at
    least three dimensions.

    Args:
        model: Object exposing ``eval()`` and ``extract_features(x)``.
        dataloader: Iterable yielding ``(x, y)`` tensor pairs.
        save_dir: Output directory; created if missing.
        prefix: File-name prefix identifying the split.
        device: Device the batches are moved to before the forward pass.
    """
    model.eval()
    os.makedirs(save_dir, exist_ok=True)

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(dataloader):
            inputs = inputs.to(device)
            targets = targets.to(device)

            feature_array = model.extract_features(inputs).cpu().numpy()
            # Keep only the last step of axis 1 and the first entry of axis 2.
            label_array = targets[:, -1, 0].cpu().numpy()

            feature_path = os.path.join(save_dir, f'{prefix}_features_batch_{batch_idx}.npy')
            label_path = os.path.join(save_dir, f'{prefix}_labels_batch_{batch_idx}.npy')
            np.save(feature_path, feature_array)
            np.save(label_path, label_array)

            print(f"Batch {batch_idx} salvato.")


In [9]:
def merge_batches_to_npz(save_dir, prefix, out_path):
    """Concatenate per-batch ``.npy`` dumps into one compressed ``.npz`` archive.

    Looks in ``save_dir`` for files named ``{prefix}_features_batch_{i}.npy``
    and ``{prefix}_labels_batch_{i}.npy``, concatenates them along axis 0 in
    increasing numeric batch order, and writes them to ``out_path`` under the
    keys ``X`` (features) and ``y`` (labels).

    Note: every matching file in ``save_dir`` is merged, including leftovers
    from a previous run that produced more batches.

    Args:
        save_dir: Directory containing the per-batch files.
        prefix: Split prefix used when the batches were saved.
        out_path: Destination of the merged archive.

    Raises:
        ValueError: If no feature batches are found, or if the feature and
            label files do not cover the same batch indices.
    """
    def _indexed_files(kind):
        # Map batch index -> path for files named '<prefix>_<kind>_batch_<i>.npy'.
        head = f'{prefix}_{kind}_batch_'
        tail = '.npy'
        found = {}
        for name in os.listdir(save_dir):
            if name.startswith(head) and name.endswith(tail):
                index_text = name[len(head):-len(tail)]
                if index_text.isdecimal():
                    found[int(index_text)] = os.path.join(save_dir, name)
        return found

    feat_files = _indexed_files('features')
    label_files = _indexed_files('labels')

    if not feat_files:
        raise ValueError(f"No '{prefix}' feature batches found in {save_dir}")
    if feat_files.keys() != label_files.keys():
        raise ValueError(
            f"Feature and label batches for '{prefix}' in {save_dir} do not match"
        )

    # Sort by the integer batch index: a plain string sort would place
    # 'batch_10' before 'batch_2'.
    order = sorted(feat_files)
    X = np.concatenate([np.load(feat_files[i]) for i in order], axis=0)
    y = np.concatenate([np.load(label_files[i]) for i in order], axis=0)
    np.savez_compressed(out_path, X=X, y=y)
    print(f"Salvato {prefix} in {out_path}")


In [55]:
def ccc(y_true, y_pred, eps=1e-8):
    """Concordance correlation coefficient between two sets of values.

    Computed as ``2 * cov / (var_true + var_pred + (mean_true - mean_pred)**2 + eps)``
    using population (biased) variance and covariance.

    Both inputs are flattened first, so an ``(N,)`` array and an ``(N, 1)``
    array are compared element by element instead of being broadcast into an
    ``N x N`` grid.

    Args:
        y_true: Reference values (array-like).
        y_pred: Predicted values (array-like) with the same number of elements.
        eps: Small constant added to the denominator to avoid division by zero.

    Returns:
        The coefficient as a float scalar.

    Raises:
        ValueError: If the inputs do not contain the same number of elements.
    """
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same number of elements, "
            f"got {y_true.size} and {y_pred.size}"
        )

    mean_true = np.mean(y_true)
    mean_pred = np.mean(y_pred)
    var_true = np.var(y_true)
    var_pred = np.var(y_pred)
    covariance = np.mean((y_true - mean_true) * (y_pred - mean_pred))
    return (2 * covariance) / (var_true + var_pred + (mean_true - mean_pred) ** 2 + eps)

In [16]:
# Build the network on the available device and restore the saved weights.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CNNRNNAttention().to(device)

# NOTE(security): weights_only=False lets torch.load unpickle arbitrary Python
# objects, so only load checkpoint files from a trusted source.
model.load_state_dict(
    torch.load(
        '/content/best.ckpt',
        map_location='cpu',
        weights_only=False,
    )['model_state']
)

# Dump features for every split into the same folder, one file prefix per split.
for loader, prefix in (
    (train_loader, 'train'),
    (devel_loader, 'val'),
    (test_loader, 'test'),
):
    extract_and_save_batches(model, loader, save_dir='features/', prefix=prefix, device=device)


Batch 0 salvato.
Batch 1 salvato.
Batch 2 salvato.
Batch 3 salvato.
Batch 4 salvato.
Batch 5 salvato.
Batch 6 salvato.
Batch 7 salvato.
Batch 8 salvato.
Batch 9 salvato.
Batch 10 salvato.
Batch 11 salvato.
Batch 12 salvato.
Batch 13 salvato.
Batch 14 salvato.
Batch 15 salvato.
Batch 16 salvato.
Batch 17 salvato.
Batch 18 salvato.
Batch 19 salvato.
Batch 20 salvato.
Batch 21 salvato.
Batch 22 salvato.
Batch 23 salvato.
Batch 24 salvato.
Batch 25 salvato.
Batch 26 salvato.
Batch 27 salvato.
Batch 28 salvato.
Batch 29 salvato.
Batch 30 salvato.
Batch 31 salvato.
Batch 32 salvato.
Batch 33 salvato.
Batch 34 salvato.
Batch 35 salvato.
Batch 36 salvato.
Batch 37 salvato.
Batch 38 salvato.
Batch 39 salvato.
Batch 40 salvato.
Batch 41 salvato.
Batch 42 salvato.
Batch 43 salvato.
Batch 44 salvato.
Batch 45 salvato.
Batch 46 salvato.
Batch 47 salvato.
Batch 48 salvato.
Batch 49 salvato.
Batch 50 salvato.
Batch 51 salvato.
Batch 52 salvato.
Batch 53 salvato.
Batch 54 salvato.
Batch 55 salvato.
Ba

In [18]:
# Merge the per-batch dumps of each split into a single archive per split.
for split_prefix, merged_path in (
    ('train', 'train_features.npz'),
    ('val', 'val_features.npz'),
    ('test', 'test_features.npz'),
):
    merge_batches_to_npz('features/', prefix=split_prefix, out_path=merged_path)


Salvato train in train_features.npz
Salvato val in val_features.npz
Salvato test in test_features.npz


In [47]:
# Load the merged archives; 'X' holds the features and 'y' the labels.
train = np.load('train_features.npz')
X_train = train['X']
y_train = train['y']

val = np.load('val_features.npz')
X_val = val['X']
y_val = val['y']

test = np.load('test_features.npz')
X_test = test['X']
y_test = test['y']


In [20]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import VotingRegressor
from sklearn.linear_model import Ridge, Lasso, LinearRegression

class IdentityTransformer(BaseEstimator, TransformerMixin):
    """No-op pipeline step: ``fit`` learns nothing and ``transform`` returns its input."""

    def fit(self, X, y=None):
        """Return ``self`` without inspecting ``X`` or ``y``."""
        return self

    def transform(self, X):
        """Return ``X`` unchanged (the same object, not a copy)."""
        return X

In [81]:
def _seeded_mutual_info(X, y):
    """mutual_info_regression with a fixed seed.

    The estimator is randomised, so without a seed SelectKBest may keep a
    different set of features on every run.
    """
    return mutual_info_regression(X, y, random_state=42)


# Feature scaling options, keyed by the label used in the results table.
scalers = {
    'none': IdentityTransformer(),
    'standard': StandardScaler(),
    'minmax': MinMaxScaler(),
}

# Dimensionality-reduction / feature-selection options.
# PCA is seeded because its default solver choice can fall back to a
# randomised SVD depending on the data shape.
reducers = {
    'none': IdentityTransformer(),
    'pca_10': PCA(n_components=10, random_state=42),
    'pca_20': PCA(n_components=20, random_state=42),
    # Despite the 'knn' label this is SelectKBest (top 16 features by mutual
    # information); the key is kept so result tables stay comparable.
    'knn': SelectKBest(_seeded_mutual_info, k=16)
}


In [79]:
# Support-vector regressor with a degree-5 polynomial kernel
svr = SVR(kernel='poly', degree=5, C=1, epsilon=0.5)

# Gradient-boosted trees limited to depth 1
xgboost = XGBRegressor(
    n_estimators=50,
    learning_rate=0.1,
    max_depth=1,
    verbosity=0,
    random_state=42,
)

# Ensemble of LinearRegression and the SVR defined above
ensemble = VotingRegressor(
    estimators=[
        ('lr', LinearRegression()),
        ('svr', svr),
    ]
)

# Regressors to evaluate, keyed by the label used in the results table.
models = dict(
    linear=LinearRegression(),
    ridge=Ridge(),
    svr=svr,
    xgb=xgboost,
    ensemble=ensemble,
)


In [83]:
import itertools
# Cartesian product of every scaler x reducer x model choice. Each element is a
# 3-tuple of (name, object) pairs; the estimator objects are shared between
# elements, not copied.
combinations = list(itertools.product(scalers.items(), reducers.items(), models.items()))



In [50]:
def evaluate_combinations(X_train, y_train, X_test, y_test, selected_combinations, scalers, reducers, models):
    """Fit one scaler -> reducer -> model pipeline per combination and score it with CCC.

    The estimator objects of each combination are placed directly into the
    Pipeline, so the same instances are refit for every combination that
    reuses them and are left fitted on the last one.

    Args:
        X_train, y_train: Data each pipeline is fitted on.
        X_test, y_test: Data each fitted pipeline is scored on.
        selected_combinations: Iterable of
            ``((scaler_name, scaler), (reducer_name, reducer), (model_name, model))``.
        scalers, reducers, models: Unused; kept so existing calls keep working.

    Returns:
        DataFrame with columns ``scaler``, ``reducer``, ``model`` and ``ccc``,
        sorted by ``ccc`` in ascending order (best score last). An empty
        ``selected_combinations`` yields an empty frame with those columns.
    """
    columns = ['scaler', 'reducer', 'model', 'ccc']
    results = []

    for (scaler_name, scaler_obj), (reducer_name, reducer_obj), (model_name, model_obj) in selected_combinations:
        print(f"Testing: scaler={scaler_name}, reducer={reducer_name}, model={model_name}")

        pipe = Pipeline([
            ('scaler', scaler_obj),
            ('reducer', reducer_obj),
            ('model', model_obj)
        ])

        pipe.fit(X_train, y_train)
        y_pred = pipe.predict(X_test)
        ccc_score = ccc(y_test, y_pred)

        results.append({
            'scaler': scaler_name,
            'reducer': reducer_name,
            'model': model_name,
            'ccc': ccc_score
        })

    # Passing the column names explicitly keeps the 'ccc' column present even
    # when no combination was evaluated, so the sort below cannot fail.
    return pd.DataFrame(results, columns=columns).sort_values(by='ccc')


In [84]:
# Fit and score every scaler/reducer/model combination.
# NOTE(review): the scores are computed on the test split, while X_val / y_val
# loaded earlier are not used in this call — if these scores drive model
# selection, consider passing the validation split here instead.
df_results = evaluate_combinations(X_train, y_train, X_test, y_test, combinations, scalers, reducers, models)
# Lift the row cap so the printed table shows every row (this changes a global pandas option).
pd.set_option('display.max_rows', None)
print(df_results)

Testing: scaler=none, reducer=none, model=linear
Testing: scaler=none, reducer=none, model=ridge
Testing: scaler=none, reducer=none, model=svr
Testing: scaler=none, reducer=none, model=xgb
Testing: scaler=none, reducer=none, model=ensemble
Testing: scaler=none, reducer=pca_10, model=linear
Testing: scaler=none, reducer=pca_10, model=ridge
Testing: scaler=none, reducer=pca_10, model=svr
Testing: scaler=none, reducer=pca_10, model=xgb
Testing: scaler=none, reducer=pca_10, model=ensemble
Testing: scaler=none, reducer=pca_20, model=linear
Testing: scaler=none, reducer=pca_20, model=ridge
Testing: scaler=none, reducer=pca_20, model=svr
Testing: scaler=none, reducer=pca_20, model=xgb
Testing: scaler=none, reducer=pca_20, model=ensemble
Testing: scaler=none, reducer=knn, model=linear
Testing: scaler=none, reducer=knn, model=ridge
Testing: scaler=none, reducer=knn, model=svr
Testing: scaler=none, reducer=knn, model=xgb
Testing: scaler=none, reducer=knn, model=ensemble
Testing: scaler=standard,