In [58]:
import pandas as pd
import numpy as np
import librosa
import matplotlib.pyplot as plt 
import os
from tqdm import tqdm
import librosa.display

In [None]:
np.complex = complex  # 🔧 Patch for librosa compatibility
# === CONFIG ===
AUDIO_DIR = r"D:\DEAM_audio_wav"       # <-- your .wav folder with 1744 files
OUTPUT_DIR = r"C:\Users\Ashutosh Gupta\OneDrive\문서\emotion_recognition_research\data\NPY_files"     # <-- where you want to save .npy
SAMPLE_RATE = 22050
DURATION = 3  # seconds
N_MELS = 128
HOP_LENGTH = 512
FIXED_SHAPE = (128, 128)

os.makedirs(OUTPUT_DIR, exist_ok=True)

# === Function to extract fixed-size log-mel spectrogram ===
def extract_logmel(file_path, sr=SAMPLE_RATE, duration=DURATION):
    y, _ = librosa.load(file_path, sr=sr, duration=duration)

    # Pad or trim
    desired_len = sr * duration
    if len(y) < desired_len:
        y = np.pad(y, (0, desired_len - len(y)))
    else:
        y = y[:desired_len]

    mel_spec = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=N_MELS, hop_length=HOP_LENGTH)
    log_mel = librosa.power_to_db(mel_spec, ref=np.max)

    # Resize to fixed shape
    if log_mel.shape[1] < FIXED_SHAPE[1]:
        pad_width = FIXED_SHAPE[1] - log_mel.shape[1]
        log_mel = np.pad(log_mel, ((0, 0), (0, pad_width)), mode='constant')
    else:
        log_mel = log_mel[:, :FIXED_SHAPE[1]]

    return log_mel

# === Batch Process and Save ===
all_files = sorted([f for f in os.listdir(AUDIO_DIR) if f.endswith('.wav')])

for file in tqdm(all_files, desc="Extracting Spectrograms"):
    file_path = os.path.join(AUDIO_DIR, file)
    song_id = os.path.splitext(file)[0]
    try:
        mel = extract_logmel(file_path)
        np.save(os.path.join(OUTPUT_DIR, f"{song_id}.npy"), mel)
    except Exception as e:
        print(f"❌ Error processing {file}: {e}")

Extracting Spectrograms: 100%|██████████| 1744/1744 [00:40<00:00, 42.92it/s]


In [60]:
import torch
from torch.utils.data import Dataset

# === CONFIG ===
NPY_DIR = r"C:\Users\Ashutosh Gupta\OneDrive\문서\emotion_recognition_research\data\NPY_files"
LABEL_CSV =r'labels_va.csv'

# === Load labels (valence/arousal mean) ===
label_df = pd.read_csv(LABEL_CSV)
label_df = label_df[['song_id', ' valence_mean', ' arousal_mean']]
label_df['song_id'] = label_df['song_id'].astype(str)
label_map = label_df.set_index('song_id').to_dict('index')

# === Dataset Class ===
class LogMelEmotionDataset(Dataset):
    def __init__(self, npy_dir, label_map, target='valence_mean'):
        self.npy_dir = npy_dir
        self.label_map = label_map
        self.target = target
        self.file_list = [f for f in os.listdir(npy_dir) if f.endswith('.npy')]

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        file = self.file_list[idx]
        song_id = os.path.splitext(file)[0]

        # Load spectrogram
        mel = np.load(os.path.join(self.npy_dir, file))
        mel_tensor = torch.tensor(mel, dtype=torch.float32).unsqueeze(0)  # (1, 128, 128)

        # Get label
        label = self.label_map[song_id][self.target]
        label_tensor = torch.tensor(label, dtype=torch.float32)

        return mel_tensor, label_tensor


In [61]:
from torch.utils.data import DataLoader, random_split

# === Hyperparams ===
BATCH_SIZE = 32
VAL_SPLIT = 0.2
SEED = 42

# === Full Dataset for Valence or Arousal ===
dataset = LogMelEmotionDataset(NPY_DIR, label_map, target=' valence_mean')  # or 'arousal_mean'

# === Split ===
val_len = int(len(dataset) * VAL_SPLIT)
train_len = len(dataset) - val_len

train_set, val_set = random_split(dataset, [train_len, val_len], generator=torch.Generator().manual_seed(SEED))

# === Dataloaders ===
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=False)


In [62]:
# === CNN Model Definition ===
import torch.nn as nn
import torch.nn.functional as F

class EmotionCNN(nn.Module):
    def __init__(self):
        super(EmotionCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d(2, 2)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d(2, 2)

        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.pool3 = nn.MaxPool2d(2, 2)

        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(64 * 16 * 16, 128)
        self.fc2 = nn.Linear(128, 1)  # output: valence or arousal

    def forward(self, x):
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = x.view(x.size(0), -1)
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x.squeeze(1)  # (batch,) for regression


In [63]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# --- Squeeze-and-Excitation Block ---
class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(channels, channels // reduction)
        self.fc2 = nn.Linear(channels // reduction, channels)

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.global_pool(x).view(b, c)
        y = F.relu(self.fc1(y))
        y = torch.sigmoid(self.fc2(y)).view(b, c, 1, 1)
        return x * y.expand_as(x)

# --- ADFF-Inspired CNN Model ---
class EmotionCNN_SE_Temporal(nn.Module):
    def __init__(self, input_shape=(1, 128, 128)):
        super(EmotionCNN_SE_Temporal, self).__init__()

        # --- Convolutional blocks with SE Attention ---
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.se1 = SEBlock(16)
        self.pool1 = nn.MaxPool2d(2, 2)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.se2 = SEBlock(32)
        self.pool2 = nn.MaxPool2d(2, 2)

        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.se3 = SEBlock(64)
        self.pool3 = nn.MaxPool2d(2, 2)

        # --- Temporal modeling ---
        self.temporal_conv = nn.Conv1d(64, 64, kernel_size=3, padding=1)

        # --- Infer flattened size ---
        dummy = torch.zeros(1, *input_shape)
        with torch.no_grad():
            x = self.forward_features(dummy)
            self.flattened_size = x.shape[1]

        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(self.flattened_size, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward_features(self, x):
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.se1(x)

        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.se2(x)

        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = self.se3(x)

        b, c, h, w = x.shape
        x = x.view(b, c, -1)           # (B, C, Time*Freq)
        x = self.temporal_conv(x)      # Conv1D
        x = F.relu(x)
        x = x.view(b, -1)              # Flatten
        return x

    def forward(self, x):
        x = self.forward_features(x)
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x.squeeze(1)


In [64]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(channels, channels // reduction)
        self.fc2 = nn.Linear(channels // reduction, channels)

    def forward(self, x):
        b, c, _, _ = x.shape
        y = self.pool(x).view(b, c)
        y = F.relu(self.fc1(y))
        y = torch.sigmoid(self.fc2(y)).view(b, c, 1, 1)
        return x * y

class EmotionCNN_GRU(nn.Module):
    def __init__(self, input_shape=(1, 128, 128), hidden_size=64, num_layers=1):
        super(EmotionCNN_GRU, self).__init__()

        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.se1 = SEBlock(16)
        self.pool1 = nn.MaxPool2d(2, 2)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.se2 = SEBlock(32)
        self.pool2 = nn.MaxPool2d(2, 2)

        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.se3 = SEBlock(64)
        self.pool3 = nn.MaxPool2d(2, 2)

        # Determine GRU input size
        dummy = torch.zeros(1, *input_shape)
        with torch.no_grad():
            x = self.forward_conv(dummy)
            _, c, t, f = x.shape
            self.rnn_input_size = f * c
            self.seq_len = t

        self.gru = nn.GRU(
            input_size=self.rnn_input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=False
        )

        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(hidden_size, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward_conv(self, x):
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.se1(x)

        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.se2(x)

        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = self.se3(x)

        return x  # shape: (B, C, T, F)

    def forward(self, x):
        x = self.forward_conv(x)  # (B, C, T, F)
        b, c, t, f = x.shape
        x = x.permute(0, 2, 1, 3).contiguous()  # (B, T, C, F)
        x = x.view(b, t, -1)  # Flatten last two dims → (B, T, C*F)

        _, h_n = self.gru(x)  # Only take final hidden state (h_n)
        x = h_n[-1]  # shape: (B, hidden_size)

        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x.squeeze(1)


In [65]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# --- Squeeze-and-Excitation Block ---
class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        self.pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(channels, channels // reduction)
        self.fc2 = nn.Linear(channels // reduction, channels)

    def forward(self, x):
        b, c, _, _ = x.shape
        y = self.pool(x).view(b, c)
        y = F.relu(self.fc1(y))
        y = torch.sigmoid(self.fc2(y)).view(b, c, 1, 1)
        return x * y

# --- GRU + Attention Module ---
class Attention(nn.Module):
    def __init__(self, hidden_size):
        super(Attention, self).__init__()
        self.attn = nn.Linear(hidden_size, 1)

    def forward(self, gru_output):
        # gru_output: (B, T, H)
        attn_weights = F.softmax(self.attn(gru_output), dim=1)  # (B, T, 1)
        context = torch.sum(attn_weights * gru_output, dim=1)   # (B, H)
        return context, attn_weights

# --- Final Model ---
class EmotionCNN_GRU_Attention(nn.Module):
    def __init__(self, input_shape=(1, 128, 128), hidden_size=64, num_layers=1):
        super(EmotionCNN_GRU_Attention, self).__init__()

        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(16)
        self.se1 = SEBlock(16)
        self.pool1 = nn.MaxPool2d(2, 2)

        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.se2 = SEBlock(32)
        self.pool2 = nn.MaxPool2d(2, 2)

        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(64)
        self.se3 = SEBlock(64)
        self.pool3 = nn.MaxPool2d(2, 2)

        # Determine GRU input size
        dummy = torch.zeros(1, *input_shape)
        with torch.no_grad():
            x = self.forward_conv(dummy)
            _, c, t, f = x.shape
            self.rnn_input_size = f * c
            self.seq_len = t

        self.gru = nn.GRU(
            input_size=self.rnn_input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=False
        )

        self.attention = Attention(hidden_size)

        self.dropout = nn.Dropout(0.3)
        self.fc1 = nn.Linear(hidden_size, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward_conv(self, x):
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.se1(x)

        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        x = self.se2(x)

        x = self.pool3(F.relu(self.bn3(self.conv3(x))))
        x = self.se3(x)

        return x  # (B, C, T, F)

    def forward(self, x):
        x = self.forward_conv(x)  # (B, C, T, F)
        b, c, t, f = x.shape
        x = x.permute(0, 2, 1, 3).contiguous()  # (B, T, C, F)
        x = x.view(b, t, -1)  # (B, T, C*F)

        gru_out, _ = self.gru(x)  # (B, T, H)
        context, attn_weights = self.attention(gru_out)  # (B, H)

        x = self.dropout(F.relu(self.fc1(context)))
        x = self.fc2(x)
        return x.squeeze(1)


In [66]:
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
import torch
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EmotionCNN_GRU_Attention().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=5, factor=0.5, verbose=True)

EPOCHS = 50
early_stop_patience = 10
best_r2 = -np.inf
patience_counter = 0
best_model_state = None

for epoch in range(EPOCHS):
    model.train()
    running_loss = 0.0

    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()

        # 🔧 Gradient clipping (useful for RNNs)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        running_loss += loss.item() * inputs.size(0)

    avg_loss = running_loss / len(train_loader.dataset)

    # --- Evaluation ---
    model.eval()
    preds, trues = [], []

    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs = model(inputs)
            preds.extend(outputs.cpu().numpy())
            trues.extend(targets.cpu().numpy())

    r2 = r2_score(trues, preds)
    mse = mean_squared_error(trues, preds)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(trues, preds)

    print(f"Epoch {epoch+1}/{EPOCHS} - Loss: {avg_loss:.4f} | R2: {r2:.4f} | RMSE: {rmse:.4f} | MAE: {mae:.4f}")

    # 🔧 Learning rate scheduler
    scheduler.step(r2)

    # 🔒 Early Stopping
    if r2 > best_r2:
        best_r2 = r2
        best_model_state = model.state_dict()
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= early_stop_patience:
            print(f"🛑 Early stopping at epoch {epoch+1}. Best R2: {best_r2:.4f}")
            break

# ✅ Load best model state (optional)
if best_model_state:
    model.load_state_dict(best_model_state)



Epoch 1/50 - Loss: 4.7136 | R2: 0.1311 | RMSE: 1.0622 | MAE: 0.8720
Epoch 2/50 - Loss: 1.2374 | R2: -0.1100 | RMSE: 1.2005 | MAE: 0.9898
Epoch 3/50 - Loss: 1.1158 | R2: 0.0340 | RMSE: 1.1199 | MAE: 0.8997
Epoch 4/50 - Loss: 1.0556 | R2: 0.4231 | RMSE: 0.8654 | MAE: 0.6784
Epoch 5/50 - Loss: 1.0603 | R2: 0.3326 | RMSE: 0.9309 | MAE: 0.7253
Epoch 6/50 - Loss: 1.0106 | R2: 0.4155 | RMSE: 0.8711 | MAE: 0.6832
Epoch 7/50 - Loss: 0.9426 | R2: 0.3369 | RMSE: 0.9278 | MAE: 0.7337
Epoch 8/50 - Loss: 0.9363 | R2: 0.4381 | RMSE: 0.8541 | MAE: 0.6733
Epoch 9/50 - Loss: 0.8865 | R2: 0.3623 | RMSE: 0.9099 | MAE: 0.7051
Epoch 10/50 - Loss: 0.8319 | R2: 0.4189 | RMSE: 0.8686 | MAE: 0.6738
Epoch 11/50 - Loss: 0.8744 | R2: 0.2742 | RMSE: 0.9708 | MAE: 0.7652
Epoch 12/50 - Loss: 0.8331 | R2: 0.4187 | RMSE: 0.8688 | MAE: 0.6803
Epoch 13/50 - Loss: 0.7718 | R2: 0.3100 | RMSE: 0.9465 | MAE: 0.7418
Epoch 14/50 - Loss: 0.7816 | R2: 0.4650 | RMSE: 0.8334 | MAE: 0.6496
Epoch 15/50 - Loss: 0.7428 | R2: -0.0822 |