# ΕΠ08 Αναγνώριση Προτύπων – Μηχανική Μάθηση: 2η Εργασία
Όνομα: Μανίκα Θεοδώρα

ΑΜ: 1115202100267

In [None]:
from google.colab import drive
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import LabelEncoder
import torch.nn as nn
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix
import time
import torch.optim as optim
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt
import random

!pip install yt-dlp pydub

import yt_dlp
from pydub import AudioSegment
import librosa
import pandas as pd
import seaborn as sns
from scipy.stats import mode

# Ερώτημα 1: Feedforward Neural Network

## Βήμα 1: Φόρτωση δεδομένων (mfccs)

In [None]:
# Mount Google Drive
drive.mount('/content/drive')

data_path = "/content/drive/MyDrive/music_genre_data_di"
os.chdir(data_path)

print("Current working directory:", os.getcwd())
print("\nContents of the directory:")
!ls

In [None]:
# Folder (relative to the working directory) of each dataset split
data_paths = {
    'train': 'train/pyaudioanalysis/',
    'val': 'val/pyaudioanalysis/',
    'test': 'test/pyaudioanalysis/'
}

# Feature matrices and label arrays, keyed by split name
X_data = {
    split_name: np.load(os.path.join(folder, 'X.npy'))
    for split_name, folder in data_paths.items()
}
y_data = {
    split_name: np.load(os.path.join(folder, 'labels.npy'))
    for split_name, folder in data_paths.items()
}

# Convenience aliases for the individual splits
X_train = X_data['train']
y_train = y_data['train']
X_val = X_data['val']
y_val = y_data['val']
X_test = X_data['test']
y_test = y_data['test']

# Map the label values to integer ids; the encoder is fitted on the training
# labels only and then reused unchanged for validation and test
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_val_encoded = label_encoder.transform(y_val)
y_test_encoded = label_encoder.transform(y_test)

# Integer id -> original label, for reading results later
class_mapping = dict(enumerate(label_encoder.classes_))
print("Class mapping:", class_mapping)



In [None]:
# Mini-batch size shared by all three loaders
batch_size = 16

# Training split: float features + integer labels -> dataset -> shuffled loader
X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.LongTensor(y_train_encoded)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Validation split (this loader is shuffled as well)
X_val_tensor = torch.FloatTensor(X_val)
y_val_tensor = torch.LongTensor(y_val_encoded)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)

# Test split: kept in its stored order
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.LongTensor(y_test_encoded)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
# Summarise how many mini-batches each loader yields
print("\nData loading complete!")
for split_label, split_loader in (
    ("training", train_loader),
    ("validation", val_loader),
    ("test", test_loader),
):
    print(f"Number of {split_label} batches: {len(split_loader)}")

# Pull a single training batch to inspect tensor shapes
sample_features, sample_labels = next(iter(train_loader))
print(f"\nSample batch features shape: {sample_features.shape}")
print(f"Sample batch labels shape: {sample_labels.shape}")

## Βήμα 2: Ορισμός Νευρωνικού Δικτύου

In [None]:
class FCNN(nn.Module):
    """Fully connected classifier made of three stacked linear layers.

    Layer sizes are ``input_dim -> 128 -> 32 -> 4``. No activation function
    is applied between the layers, so the network as a whole is an affine
    map of its input.
    """

    def __init__(self, input_dim=26):
        """Create the three linear layers.

        Args:
            input_dim: Number of features per input sample (default 26).
        """
        super().__init__()

        self.layer1 = nn.Linear(in_features=input_dim, out_features=128)
        self.layer2 = nn.Linear(in_features=128, out_features=32)
        self.layer3 = nn.Linear(in_features=32, out_features=4)

    def forward(self, x):
        """Run ``x`` through the three layers in order.

        Args:
            x: Tensor whose last dimension equals ``input_dim``.

        Returns:
            Tensor of raw class scores with last dimension 4.
        """
        for layer in (self.layer1, self.layer2, self.layer3):
            x = layer(x)
        return x

## Βήμα 3: Ορισμός διαδικασίας εκπαίδευσης

In [None]:
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """
    Train a network for a fixed number of epochs.

    A running average is printed every 100 batches, followed by the mean
    loss over the whole epoch. The epoch mean is accumulated separately from
    the 100-batch window, so resetting the window does not distort it.

    Args:
        model: The neural network to train
        train_loader: Sized iterable yielding (inputs, labels) batches
        criterion: Loss function
        optimizer: Optimization algorithm bound to ``model``'s parameters
        num_epochs: Number of training epochs

    Returns:
        The trained model (the same object that was passed in)
    """
    model.train()  # Set model to training mode

    # Batches are moved to the device of the model's parameters, mirroring
    # train_with_validation. A parameter-less model leaves batches untouched.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    num_batches = len(train_loader)

    for epoch in range(num_epochs):
        window_loss = 0.0  # loss accumulated since the last progress print
        epoch_total = 0.0  # loss accumulated over the whole epoch

        for batch_idx, (inputs, labels) in enumerate(train_loader):
            if device is not None:
                inputs, labels = inputs.to(device), labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass and loss
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            window_loss += batch_loss
            epoch_total += batch_loss

            if batch_idx % 100 == 99:  # Print every 100 batches
                print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{num_batches}], '
                      f'Loss: {window_loss/100:.4f}')
                window_loss = 0.0

        # Mean loss over every batch of the epoch (0.0 for an empty loader)
        epoch_loss = epoch_total / num_batches if num_batches else 0.0
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')

    print('Training complete!')
    return model

## Βήμα 4: Ορισμός διαδικασίας αξιολόγησης

In [None]:
def evaluate_model_device(model, dataloader, criterion, device):
    """
    Score a trained network on a dataset without updating it.

    Args:
        model: The neural network to evaluate
        dataloader: DataLoader yielding (inputs, labels) batches
        criterion: Loss function
        device: Device the batches are moved to before the forward pass
            (e.g. 'cuda' or 'cpu'); the model is expected to be there already

    Returns:
        A tuple ``(loss, f1, accuracy, cm)``:
        - loss (float): mean of the per-batch loss values
        - f1 (float): macro-averaged F1 score
        - accuracy (float): accuracy score
        - cm (numpy array): confusion matrix
    """
    model.eval()  # evaluation mode

    batch_losses = []
    predicted_classes = []
    true_classes = []

    # No gradients are needed for scoring
    with torch.no_grad():
        for features, targets in dataloader:
            features = features.to(device)
            targets = targets.to(device)

            logits = model(features)
            batch_losses.append(criterion(logits, targets).item())

            # Index of the highest score per sample is the predicted class
            top_classes = torch.max(logits, 1)[1]

            predicted_classes.extend(top_classes.cpu().numpy())
            true_classes.extend(targets.cpu().numpy())

    mean_loss = sum(batch_losses, 0.0) / len(dataloader)
    macro_f1 = f1_score(true_classes, predicted_classes, average='macro')
    acc = accuracy_score(true_classes, predicted_classes)
    conf_mat = confusion_matrix(true_classes, predicted_classes)

    return mean_loss, macro_f1, acc, conf_mat

## Βήμα 5: Εκπαίδευση δικτύου

In [None]:
# Train on the CPU, as the baseline for the later CPU-vs-GPU timing comparison
# Pin everything in this cell to the CPU device
device_cpu = torch.device('cpu')
print(f"Using device: {device_cpu}")
model_cpu_fcnn = FCNN(input_dim=26).to(device_cpu) # fresh model, placed on the CPU
optimizer_cpu = optim.SGD(model_cpu_fcnn.parameters(), lr=0.002)
criterion_cpu = nn.CrossEntropyLoss() # loss used for both training and the test evaluation below

print("\nTraining with CPU...")
start_time_cpu = time.time()
num_epochs = 30  # also read by the GPU training cell

for epoch in range(num_epochs):
    model_cpu_fcnn.train()
    running_loss = 0.0  # loss accumulated since the last progress print

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        # Move inputs and labels to the CPU device
        inputs, labels = inputs.to(device_cpu), labels.to(device_cpu)

        # Standard step: clear gradients, forward, loss, backward, update
        optimizer_cpu.zero_grad()
        outputs = model_cpu_fcnn(inputs)
        loss = criterion_cpu(outputs, labels)
        loss.backward()
        optimizer_cpu.step()

        running_loss += loss.item()
        # Every 100th batch: print the mean loss of the last 100 batches and reset
        if batch_idx % 100 == 99:
            print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {running_loss/100:.4f}')
            running_loss = 0.0

    # Report the batches left over after the last full window of 100;
    # (batch_idx % 100 + 1) is the number of batches in that partial window
    if running_loss > 0:
         print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Remaining Loss: {running_loss/(batch_idx % 100 + 1):.4f}')


# Wall-clock time of the training loop only (evaluation is not included)
cpu_duration = time.time() - start_time_cpu
print(f'CPU Training completed in {cpu_duration:.2f} seconds')

# Evaluate the CPU-trained model on the test set, keeping the batches on the CPU
test_loss_cpu, test_f1_cpu, test_acc_cpu, test_cm_cpu = evaluate_model_device(model_cpu_fcnn, test_loader, criterion_cpu, device_cpu)
print("\nCPU Test Results:")
print(f"Loss: {test_loss_cpu:.4f}")
print(f"F1 Score (macro): {test_f1_cpu:.4f}")
print(f"Accuracy: {test_acc_cpu:.4f}")
print("\nConfusion Matrix (CPU):")
print(test_cm_cpu)

## Βήμα 6: Εκπαίδευση δικτύου με GPU

In [None]:
# Use the GPU when one is available; otherwise this cell silently runs on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Create a new model instance on the selected device so that training starts
# from fresh weights rather than continuing from the CPU-trained model
model_gpu_fcnn = FCNN(input_dim=26).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model_gpu_fcnn.parameters(), lr=0.002)

# The DataLoader keeps yielding CPU tensors; each batch is moved to the
# device inside the training loop below.

# Train on the selected device (num_epochs comes from the CPU training cell)
print("\nTraining with GPU...")
start_time_gpu = time.time()

for epoch in range(num_epochs):
    model_gpu_fcnn.train()
    running_loss = 0.0  # loss accumulated since the last progress print

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        # Move inputs and labels to the selected device (GPU or CPU)
        inputs, labels = inputs.to(device), labels.to(device)

        # Standard step: clear gradients, forward, loss, backward, update
        optimizer.zero_grad()
        outputs = model_gpu_fcnn(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if batch_idx % 100 == 99:
            # Mean loss of the last 100 batches, then reset the window
            print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Loss: {running_loss/100:.4f}')
            running_loss = 0.0

    # Report the batches left over after the last full window of 100;
    # (batch_idx % 100 + 1) is the number of batches in that partial window
    if running_loss > 0:
         print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx+1}/{len(train_loader)}], Remaining Loss: {running_loss/(batch_idx % 100 + 1):.4f}')


# Wall-clock time of the training loop only (evaluation is not included)
gpu_duration = time.time() - start_time_gpu
print(f'GPU Training completed in {gpu_duration:.2f} seconds')


# Evaluate on the test set; evaluate_model_device moves each batch to
# `device` itself
test_loss_gpu, test_f1_gpu, test_acc_gpu, test_cm_gpu = evaluate_model_device(model_gpu_fcnn, test_loader, criterion, device)
print("\nGPU Test Results:")
print(f"Loss: {test_loss_gpu:.4f}")
print(f"F1 Score (macro): {test_f1_gpu:.4f}")
print(f"Accuracy: {test_acc_gpu:.4f}")
print("\nConfusion Matrix (GPU):")
print(test_cm_gpu)


In [None]:
def _collect_test_predictions(model, model_device):
    """Return (predictions, labels) of `model` over `test_loader` as flat lists."""
    predictions = []
    targets = []
    model.eval()
    with torch.no_grad():
        for inputs, labels in test_loader:
            # Batches must live on the same device as the model
            inputs, labels = inputs.to(model_device), labels.to(model_device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            predictions.extend(predicted.cpu().numpy())
            targets.extend(labels.cpu().numpy())
    return predictions, targets


def _print_classification_report(targets, predictions):
    """Print a per-class report, using class names when label_encoder exists."""
    try:
        print(classification_report(targets, predictions, target_names=label_encoder.classes_))
    except NameError:
        print("label_encoder not found. Cannot print classification report with class names.")
        print(classification_report(targets, predictions))


# Compare the two training times
print("\nTraining Time Comparison:")
print(f"GPU Time: {gpu_duration:.2f} seconds")
print(f"CPU Time: {cpu_duration:.2f} seconds")

# A speedup figure only makes sense when the "GPU" run really used CUDA
if device.type != 'cuda':
    print("GPU not available, cannot calculate speedup.")
else:
    print(f"Speedup: {cpu_duration/gpu_duration:.2f}x faster with GPU")

# Per-class report for the model trained on `device`
all_preds_gpu, all_labels_gpu = _collect_test_predictions(model_gpu_fcnn, device)
print("\nClassification Report (GPU):")
_print_classification_report(all_labels_gpu, all_preds_gpu)


# Per-class report for the model trained on the CPU
all_preds_cpu, all_labels_cpu = _collect_test_predictions(model_cpu_fcnn, device_cpu)
print("\nClassification Report (CPU):")
_print_classification_report(all_labels_cpu, all_preds_cpu)

## Βήμα 7: Επιλογή μοντέλου

In [None]:
class EarlyStopping:
    """Track a validation score, checkpoint the best model and flag when to stop.

    A score counts as an improvement when it is not below
    ``best_score + delta``. Each improvement saves the model's state dict to
    ``path`` and resets the patience counter. ``patience`` consecutive
    non-improvements set ``early_stop`` to True.
    """

    def __init__(self, patience=5, delta=0, path='best_model.pth'):
        self.path = path
        self.delta = delta
        self.patience = patience
        self.best_score = None   # best score seen so far (None before the first call)
        self.counter = 0         # consecutive calls without improvement
        self.early_stop = False  # becomes True once patience is exhausted

    def __call__(self, val_f1, model):
        """Register the latest validation score for ``model``."""
        # First observation: it is the best by definition
        if self.best_score is None:
            self._record_best(val_f1, model)
            return

        # No improvement: spend one unit of patience
        if val_f1 < self.best_score + self.delta:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
            return

        # Improvement: checkpoint and start counting afresh
        self._record_best(val_f1, model)
        self.counter = 0

    def _record_best(self, score, model):
        """Store ``score`` as the new best and checkpoint ``model``."""
        self.best_score = score
        self.save_checkpoint(model)

    def save_checkpoint(self, model):
        """Write the model's state dict to ``self.path``."""
        torch.save(model.state_dict(), self.path)

def train_with_validation(model, train_loader, val_loader, criterion, optimizer, num_epochs=30):
    """Train ``model`` with per-epoch validation and F1-based early stopping.

    After every epoch the macro F1 on ``val_loader`` is handed to an
    ``EarlyStopping`` tracker (patience 5), which checkpoints the best
    weights to 'best_f1_model.pth'. Training ends after ``num_epochs`` epochs
    or as soon as the tracker signals a stop.

    Args:
        model: Network to train; batches are moved to its parameters' device.
        train_loader: Loader yielding (inputs, labels) training batches.
        val_loader: Loader yielding (inputs, labels) validation batches.
        criterion: Loss function.
        optimizer: Optimizer used for the parameter updates.
        num_epochs: Maximum number of epochs.

    Returns:
        The same ``model`` object with the checkpointed best weights loaded.
    """
    checkpoint_path = 'best_f1_model.pth'
    # Train on whatever device the model already lives on
    target_device = next(model.parameters()).device
    stopper = EarlyStopping(patience=5, path=checkpoint_path)

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        train_total = 0.0
        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(target_device)
            batch_y = batch_y.to(target_device)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_x), batch_y)
            batch_loss.backward()
            optimizer.step()

            train_total += batch_loss.item()

        # ---- validation pass ----
        model.eval()
        val_total = 0.0
        predictions = []
        targets = []
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x = batch_x.to(target_device)
                batch_y = batch_y.to(target_device)
                logits = model(batch_x)
                val_total += criterion(logits, batch_y).item()

                predictions.extend(torch.max(logits, 1)[1].cpu().numpy())
                targets.extend(batch_y.cpu().numpy())

        # ---- epoch summary (means over batches) ----
        train_loss = train_total / len(train_loader)
        val_loss = val_total / len(val_loader)
        val_f1 = f1_score(targets, predictions, average='macro')

        print(f'Epoch {epoch+1}/{num_epochs}:')
        print(f'Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val F1: {val_f1:.4f}')

        # Let the tracker checkpoint / count patience, then honour its verdict
        stopper(val_f1, model)
        if stopper.early_stop:
            print("Early stopping triggered")
            break

    # Restore the best checkpoint into the model, on the model's own device
    best_state = torch.load(checkpoint_path, map_location=target_device)
    model.load_state_dict(best_state)
    return model

# Device, loss, fresh FCNN and its SGD optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.CrossEntropyLoss()
model_fcnn = FCNN(input_dim=26).to(device)
optimizer = optim.SGD(model_fcnn.parameters(), lr=0.002)

# Train with early stopping on the validation F1; the returned model carries
# the best checkpointed weights
best_model = train_with_validation(
    model_fcnn, train_loader, val_loader, criterion, optimizer, num_epochs=30
)

# Score the selected model on the held-out test set
test_loss, test_f1, test_acc, test_cm = evaluate_model_device(
    best_model, test_loader, criterion, device
)

print(
    "\nFinal Test Results with Best Model:",
    f"Test Loss: {test_loss:.4f}",
    f"Test F1 Score (macro): {test_f1:.4f}",
    f"Test Accuracy: {test_acc:.4f}",
    "\nConfusion Matrix:",
    sep="\n",
)
print(test_cm)

# Ερώτημα 2: Convolutional Neural Network

## Βήμα 1: Φόρτωση δεδομένων (spectrograms)

In [None]:
# Folder (relative to the working directory) of each mel-spectrogram split
data_config = {
    'train': 'train/melgrams/',
    'val': 'val/melgrams/',
    'test': 'test/melgrams/'
}

# Arrays are stored under keys of the form 'X_<split>_mel' / 'y_<split>_mel'
mel_data = {}
for split, path in data_config.items():
    features_file = os.path.join(path, 'X.npy')
    labels_file = os.path.join(path, 'labels.npy')
    mel_data[f'X_{split}_mel'] = np.load(features_file)
    mel_data[f'y_{split}_mel'] = np.load(labels_file)

# Convenience aliases for the individual splits
X_train_mel, y_train_mel = mel_data['X_train_mel'], mel_data['y_train_mel']
X_val_mel, y_val_mel = mel_data['X_val_mel'], mel_data['y_val_mel']
X_test_mel, y_test_mel = mel_data['X_test_mel'], mel_data['y_test_mel']

# Re-create the label encoder for this dataset: fitted on the training
# labels, then applied unchanged to validation and test
label_encoder = LabelEncoder()
y_train_mel_encoded = label_encoder.fit_transform(y_train_mel)
y_val_mel_encoded = label_encoder.transform(y_val_mel)
y_test_mel_encoded = label_encoder.transform(y_test_mel)

# Integer id -> original label, for reading results later
class_mapping = dict(enumerate(label_encoder.classes_))
print("Class mapping:", class_mapping)

In [None]:
# Report the array shape of every split
print("\nMel-spectrogram shapes:")
print(f"Train: {X_train_mel.shape}")
print(f"Validation: {X_val_mel.shape}")
print(f"Test: {X_test_mel.shape}")

# One randomly chosen training example per class, drawn on a 2x2 grid
plt.figure(figsize=(15, 10))
for i, genre in enumerate(label_encoder.classes_):
    # Positions of the training samples whose encoded label is this class
    indices = np.where(y_train_mel_encoded == i)[0]
    random_idx = random.choice(indices)
    melgram = X_train_mel[random_idx]

    plt.subplot(2, 2, i+1)
    plt.imshow(melgram, origin='lower', aspect='auto')
    plt.colorbar()
    plt.title(f"Class: {genre}")
    plt.ylabel("Mel-frequency bins")
    plt.xlabel("Time frames")

plt.tight_layout()
plt.show()

In [None]:
# Mini-batch size shared by all three loaders
batch_size = 16

# unsqueeze(1) inserts a channel axis at position 1, which the Conv2d layers
# of the CNN models need (assumes arrays are (n_samples, height, width))

# Training split -> dataset -> shuffled loader
X_train_mel_tensor = torch.FloatTensor(X_train_mel).unsqueeze(1)
y_train_mel_tensor = torch.LongTensor(y_train_mel_encoded)
train_mel_dataset = TensorDataset(X_train_mel_tensor, y_train_mel_tensor)
train_mel_loader = DataLoader(train_mel_dataset, batch_size=batch_size, shuffle=True)

# Validation split (this loader is shuffled as well)
X_val_mel_tensor = torch.FloatTensor(X_val_mel).unsqueeze(1)
y_val_mel_tensor = torch.LongTensor(y_val_mel_encoded)
val_mel_dataset = TensorDataset(X_val_mel_tensor, y_val_mel_tensor)
val_mel_loader = DataLoader(val_mel_dataset, batch_size=batch_size, shuffle=True)

# Test split: kept in its stored order
X_test_mel_tensor = torch.FloatTensor(X_test_mel).unsqueeze(1)
y_test_mel_tensor = torch.LongTensor(y_test_mel_encoded)
test_mel_dataset = TensorDataset(X_test_mel_tensor, y_test_mel_tensor)
test_mel_loader = DataLoader(test_mel_dataset, batch_size=batch_size, shuffle=False)

# Summarise how many mini-batches each loader yields
print("\nMel-spectrogram data loading complete!")
print(f"Number of training batches: {len(train_mel_loader)}")
print(f"Number of validation batches: {len(val_mel_loader)}")
print(f"Number of test batches: {len(test_mel_loader)}")

## Βήμα 2: Ορισμός Νευρωνικού Δικτύου

In [None]:
class CNN(nn.Module):
    """Plain convolutional classifier: four 5x5 convolutions + four linear layers.

    The convolutions use no padding, no pooling and no activation; channel
    counts grow 1 -> 16 -> 32 -> 64 -> 128. The flattened feature map then
    goes through linear layers of width 1024 -> 256 -> 32 -> ``out_dim``.
    """

    def __init__(self, input_height=21, input_width=128, out_dim=4):
        """Build the layers for inputs of size ``input_height`` x ``input_width``.

        Args:
            input_height: Height of each single-channel input image.
            input_width: Width of each single-channel input image.
            out_dim: Number of output scores per sample.
        """
        super().__init__()

        kernel = 5
        channels = (1, 16, 32, 64, 128)

        # One un-padded convolution per consecutive channel pair
        self.conv_layers = nn.Sequential(*[
            nn.Conv2d(c_in, c_out, kernel_size=kernel)
            for c_in, c_out in zip(channels, channels[1:])
        ])

        # Every un-padded stride-1 convolution trims (kernel - 1) from each
        # spatial dimension, so four of them trim 4 * (kernel - 1) in total
        shrink = len(self.conv_layers) * (kernel - 1)
        conv_height = input_height - shrink
        conv_width = input_width - shrink

        # Number of features per sample once the last feature map is flattened
        self.conv_output_size = channels[-1] * conv_height * conv_width

        self.fc_layers = nn.Sequential(
            nn.Linear(self.conv_output_size, 1024),
            nn.Linear(1024, 256),
            nn.Linear(256, 32),
            nn.Linear(32, out_dim)
        )

    def forward(self, x):
        """Return the class scores for a batch ``x`` of shape (N, 1, H, W)."""
        feature_maps = self.conv_layers(x)
        # One row of conv_output_size features per sample
        flat = feature_maps.view(-1, self.conv_output_size)
        return self.fc_layers(flat)

## Βήμα 3: Εκπαίδευση δικτύου

In [None]:
# Build the CNN for the spectrogram size found in the data and keep it on the
# CPU; X_train_mel.shape[1:] is unpacked as (height, width)
input_height, input_width = X_train_mel.shape[1:]
model_cpu_cnn = CNN(input_height=input_height, input_width=input_width).to('cpu')
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model_cpu_cnn.parameters(), lr=0.002)

# Training with validation-based early stopping, timed with wall-clock time
print("Starting training...")
start_train = time.time()

# train_with_validation returns the model it was given, with the best
# checkpointed weights loaded
best_model = train_with_validation(model_cpu_cnn, train_mel_loader, val_mel_loader,
                                 criterion, optimizer, num_epochs=30)

train_time = time.time() - start_train
print(f"\nTraining completed in {train_time:.2f} seconds ({train_time/60:.2f} minutes)")

# Evaluation on the test set, timed separately; batches stay on the CPU
print("\nStarting evaluation...")
start_eval = time.time()

test_loss, test_f1, test_acc, test_cm = evaluate_model_device(best_model,
                                                           test_mel_loader,
                                                           criterion,
                                                           'cpu')

eval_time = time.time() - start_eval

# Print results
print("\nFinal Test Results with Best Model:")
print(f"Test Loss: {test_loss:.4f}")
print(f"Test F1 Score (macro): {test_f1:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")
print(f"Evaluation completed in {eval_time:.2f} seconds")
print("\nConfusion Matrix:")
print(test_cm)

# Training + evaluation time; compared against the GPU run in a later cell
total_time_cpu = train_time + eval_time
print(f"\nTotal execution time: {total_time_cpu:.2f} seconds ({total_time_cpu/60:.2f} minutes)")

In [None]:
# Use the GPU when one is available; otherwise this cell runs on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Fresh CNN (new weights) on the selected device, with its own optimizer;
# X_train_mel.shape[1:] is unpacked as (height, width)
input_height, input_width = X_train_mel.shape[1:]
model_gpu_cnn = CNN(input_height=input_height, input_width=input_width).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model_gpu_cnn.parameters(), lr=0.002)

# Training with validation-based early stopping, timed with wall-clock time
print("\nStarting training...")
start_train = time.time()

# train_with_validation moves each batch to the model's device and returns
# the model with the best checkpointed weights loaded
best_model = train_with_validation(model_gpu_cnn, train_mel_loader, val_mel_loader,
                                 criterion, optimizer, num_epochs=30)

train_time = time.time() - start_train
print(f"\nTraining completed in {train_time:.2f} seconds ({train_time/60:.2f} minutes)")

# Evaluation on the test set, timed separately
print("\nStarting evaluation...")
start_eval = time.time()

test_loss, test_f1, test_acc, test_cm = evaluate_model_device(best_model,
                                                           test_mel_loader,
                                                           criterion,
                                                           device)

eval_time = time.time() - start_eval

# Print results
print("\nFinal Test Results with Best Model:")
print(f"Test Loss: {test_loss:.4f}")
print(f"Test F1 Score (macro): {test_f1:.4f}")
print(f"Test Accuracy: {test_acc:.4f}")
print(f"Evaluation completed in {eval_time:.2f} seconds")
print("\nConfusion Matrix:")
print(test_cm)

# Training + evaluation time; compared against the CPU run in the next cell
total_time_gpu = train_time + eval_time
print(f"\nTotal execution time: {total_time_gpu:.2f} seconds ({total_time_gpu/60:.2f} minutes)")

In [None]:
# Compare the total (training + evaluation) times of the two CNN runs
print("\nTraining Time Comparison:")
print(f"GPU Time: {total_time_gpu:.2f} seconds")
print(f"CPU Time: {total_time_cpu:.2f} seconds")

# A speedup figure only makes sense when the "GPU" run really used CUDA
if device.type != 'cuda':
    print("GPU not available, cannot calculate speedup.")
else:
    print(f"Speedup: {total_time_cpu/total_time_gpu:.2f}x faster with GPU")

## Βήμα 4: Pooling and padding

In [None]:
class CNNP(nn.Module):
    """CNN with 'same' padding and 2x2 max pooling after every convolution.

    Four stages of ``Conv2d(kernel_size=5, padding=2) -> MaxPool2d(2)`` take
    the channels 1 -> 16 -> 32 -> 64 -> 128. The flattened result is passed
    through linear layers of width 1024 -> 256 -> 32 -> 4. No activation
    functions are used.
    """

    def __init__(self, input_height, input_width):
        """Build the layers for inputs of size ``input_height`` x ``input_width``."""
        super().__init__()

        channel_plan = (1, 16, 32, 64, 128)

        # One padded convolution + pooling stage per consecutive channel pair
        self.conv1, self.conv2, self.conv3, self.conv4 = [
            nn.Sequential(
                nn.Conv2d(c_in, c_out, kernel_size=5, stride=1, padding=2),
                nn.MaxPool2d(kernel_size=2),
            )
            for c_in, c_out in zip(channel_plan, channel_plan[1:])
        ]

        # The padded convolutions keep the spatial size; each of the four
        # poolings halves it (rounding down), i.e. an integer division by 16
        pooled_height = input_height // 16
        pooled_width = input_width // 16
        self.flattened_size = 128 * pooled_height * pooled_width

        # Classifier head
        self.fc1 = nn.Linear(self.flattened_size, 1024)
        self.fc2 = nn.Linear(1024, 256)
        self.fc3 = nn.Linear(256, 32)
        self.fc4 = nn.Linear(32, 4)  # 4 output classes

    def forward(self, x):
        """Return the class scores for a batch ``x`` of shape (N, 1, H, W)."""
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = stage(x)

        # One row of flattened_size features per sample
        x = x.view(-1, self.flattened_size)

        for dense in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = dense(x)
        return x

In [None]:
# Device, loss, fresh padded/pooled CNN and its SGD optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.CrossEntropyLoss()
model_cnn_p = CNNP(input_height=input_height, input_width=input_width).to(device)
optimizer = optim.SGD(model_cnn_p.parameters(), lr=0.002)

# Train with early stopping on the validation F1; the returned model carries
# the best checkpointed weights
best_model_p = train_with_validation(
    model_cnn_p, train_mel_loader, val_mel_loader, criterion, optimizer, num_epochs=30
)

# Score the selected model on the held-out test set
test_loss, test_f1, test_acc, test_cm = evaluate_model_device(
    best_model_p, test_mel_loader, criterion, device
)

print(
    "\nFinal Test Results with Best Model:",
    f"Test Loss: {test_loss:.4f}",
    f"Test F1 Score (macro): {test_f1:.4f}",
    f"Test Accuracy: {test_acc:.4f}",
    "\nConfusion Matrix:",
    sep="\n",
)
print(test_cm)

## Βήμα 5: Activation functions

In [None]:
class CNNRELU(nn.Module):
    """Padded, pooled CNN with ReLU activations.

    Four stages of ``Conv2d(kernel_size=5, padding=2) -> ReLU -> MaxPool2d(2)``
    take the channels 1 -> 16 -> 32 -> 64 -> 128. The classifier head is
    1024 -> 256 -> 32 -> 4, with a ReLU after every linear layer except the
    last, which outputs the raw class scores.
    """

    def __init__(self, input_height, input_width):
        """Build the layers for inputs of size ``input_height`` x ``input_width``."""
        super().__init__()

        channel_plan = (1, 16, 32, 64, 128)

        # One convolution + ReLU + pooling stage per consecutive channel pair
        self.conv1, self.conv2, self.conv3, self.conv4 = [
            nn.Sequential(
                nn.Conv2d(c_in, c_out, kernel_size=5, stride=1, padding=2),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2),
            )
            for c_in, c_out in zip(channel_plan, channel_plan[1:])
        ]

        # Four 2x2 poolings shrink each spatial dimension by a factor of 16
        pooled_height = input_height // 16
        pooled_width = input_width // 16
        self.flattened_size = 128 * pooled_height * pooled_width

        # Hidden dense layers, each followed by a ReLU
        self.fc1 = nn.Sequential(nn.Linear(self.flattened_size, 1024), nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(1024, 256), nn.ReLU())
        self.fc3 = nn.Sequential(nn.Linear(256, 32), nn.ReLU())
        # Output layer: no activation on the final scores
        self.fc4 = nn.Linear(32, 4)

    def forward(self, x):
        """Return the class scores for a batch ``x`` of shape (N, 1, H, W)."""
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = stage(x)

        # One row of flattened_size features per sample
        x = x.view(-1, self.flattened_size)

        for dense in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = dense(x)
        return x

In [None]:
# Device, loss, fresh ReLU CNN and its SGD optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.CrossEntropyLoss()
model_cnn_relu = CNNRELU(input_height=input_height, input_width=input_width).to(device)
optimizer = optim.SGD(model_cnn_relu.parameters(), lr=0.002)

# Train with early stopping on the validation F1; the returned model carries
# the best checkpointed weights
best_model_relu = train_with_validation(
    model_cnn_relu, train_mel_loader, val_mel_loader, criterion, optimizer, num_epochs=30
)

# Score the selected model on the held-out test set
test_loss, test_f1, test_acc, test_cm = evaluate_model_device(
    best_model_relu, test_mel_loader, criterion, device
)

print(
    "\nFinal Test Results with Best Model:",
    f"Test Loss: {test_loss:.4f}",
    f"Test F1 Score (macro): {test_f1:.4f}",
    f"Test Accuracy: {test_acc:.4f}",
    "\nConfusion Matrix:",
    sep="\n",
)
print(test_cm)

# Ερώτημα 3: Improving Performance

## Βήμα 1: Reproducibility

Δεν υλοποιήθηκε

## Βήμα 2: Αλγόριθμοι βελτιστοποίησης

In [None]:
# Optimizer classes to compare, keyed by display name.
# Only the classes are stored here: an optimizer updates exactly the
# parameters it was constructed with, so each one has to be built inside the
# loop from the freshly created model. Building them up front over
# model_cnn_relu.parameters() would leave the model that is actually
# evaluated completely untrained.
optimizer_factories = {
    'SGD': optim.SGD,
    'Adam': optim.Adam,
    'RMSprop': optim.RMSprop,
    'Adagrad': optim.Adagrad,
    'Adadelta': optim.Adadelta,
    'Adamax': optim.Adamax,
    'Nadam': optim.NAdam,
    'Adafactor': optim.Adafactor,
    'Rprop': optim.Rprop,
    'ASGD': optim.ASGD
}
learning_rate = 0.002  # same learning rate for every optimizer

# Optimizer instance used for each run, filled in as the loop progresses
optimizers = {}

# Collected test metrics, one entry per optimizer
results = {'Optimizer': [], 'Accuracy': [], 'F1 Score': []}

# Device and loss are the same for every run
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.CrossEntropyLoss()

# Train and evaluate a fresh model with each optimizer
for opt_name, make_optimizer in optimizer_factories.items():
    print(f"\nTraining with {opt_name}...")

    # New model with new weights, and an optimizer bound to THIS model
    model_opt = CNNRELU(input_height=input_height, input_width=input_width).to(device)
    optimizer = make_optimizer(model_opt.parameters(), lr=learning_rate)
    optimizers[opt_name] = optimizer

    # Train with validation (10 epochs at most per optimizer)
    best_model_opt = train_with_validation(model_opt, train_mel_loader, val_mel_loader, criterion, optimizer, num_epochs=10)

    # Evaluate on test set
    test_loss, test_f1, test_acc, test_cm = evaluate_model_device(best_model_opt, test_mel_loader, criterion, device)

    results['Optimizer'].append(opt_name)
    results['Accuracy'].append(test_acc)
    results['F1 Score'].append(test_f1)

# Show the comparison table
results_df = pd.DataFrame(results)
print("\nResults:")
print(results_df)

## Βήμα 3: Batch Normalization

In [None]:
class CNNBatchNorm(nn.Module):
    """CNN classifier with batch normalization after every convolution.

    Four Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d(2) stages are followed by
    four fully connected layers that produce `out_dim` logits.

    Args:
        input_height: Height of the single-channel input images.
        input_width: Width of the single-channel input images.
        out_dim: Number of output classes (logits).
        dropout_prob: Dropout probability applied after each hidden fully
            connected layer. The default 0.0 disables dropout, which keeps
            the behaviour of the network without dropout.
    """

    def __init__(self, input_height, input_width, out_dim=4, dropout_prob=0.0):
        super(CNNBatchNorm, self).__init__()

        # Convolutional layers with Batch Normalization and ReLU
        self.conv1 = self._conv_block(1, 16)
        self.conv2 = self._conv_block(16, 32)
        self.conv3 = self._conv_block(32, 64)
        self.conv4 = self._conv_block(64, 128)

        # Each of the four 2x2 poolings halves both spatial dims (floor division)
        self.flattened_size = 128 * (input_height // 16) * (input_width // 16)

        # Fully connected layers
        self.fc1 = nn.Linear(self.flattened_size, 1024)
        self.fc2 = nn.Linear(1024, 256)
        self.fc3 = nn.Linear(256, 32)
        self.fc4 = nn.Linear(32, out_dim)

        # Dropout has no parameters, so the state_dict layout is unchanged
        self.dropout = nn.Dropout(p=dropout_prob)

    @staticmethod
    def _conv_block(in_channels, out_channels):
        """Build one Conv(5x5, same padding) -> BatchNorm -> ReLU -> MaxPool(2) stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

    def forward(self, x):
        """Return the class logits for a batch of single-channel images."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)

        # Flatten per sample; a size mismatch now fails in fc1 instead of
        # silently reshaping across the batch dimension as view(-1, ...) can.
        x = x.flatten(start_dim=1)

        # NOTE(review): there is no non-linearity between the fully connected
        # layers; kept as is so results stay comparable with earlier runs.
        x = self.dropout(self.fc1(x))
        x = self.dropout(self.fc2(x))
        x = self.dropout(self.fc3(x))
        x = self.fc4(x)
        return x

In [None]:
# Set up the batch-normalized CNN, its loss and a plain SGD optimizer
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
criterion = nn.CrossEntropyLoss()
model_cnn_batch = CNNBatchNorm(input_height=input_height, input_width=input_width).to(device)
optimizer = optim.SGD(model_cnn_batch.parameters(), lr=0.002)

# Fit for 30 epochs; the training helper hands back the model it selected
best_model_batch = train_with_validation(
    model_cnn_batch,
    train_mel_loader,
    val_mel_loader,
    criterion,
    optimizer,
    num_epochs=30,
)

# Score the returned model on the held-out test split
test_loss, test_f1, test_acc, test_cm = evaluate_model_device(
    best_model_batch, test_mel_loader, criterion, device
)

# Report everything in one call, one item per line
print(
    "\nFinal Test Results with Best Model:",
    f"Test Loss: {test_loss:.4f}",
    f"Test F1 Score (macro): {test_f1:.4f}",
    f"Test Accuracy: {test_acc:.4f}",
    "\nConfusion Matrix:",
    test_cm,
    sep="\n",
)

## Βήμα 4: Regularization

In [None]:
def _make_batchnorm_cnn(dropout_prob=0.0):
    """Build a fresh CNNBatchNorm on `device`, optionally with dropout.

    Dropout is attached here, after each hidden fully connected layer, so this
    cell does not depend on the constructor accepting a dropout argument.
    """
    model = CNNBatchNorm(input_height=input_height, input_width=input_width)
    if dropout_prob > 0:
        for fc_name in ('fc1', 'fc2', 'fc3'):
            fc_layer = getattr(model, fc_name)
            setattr(model, fc_name, nn.Sequential(fc_layer, nn.Dropout(p=dropout_prob)))
    return model.to(device)


# In every experiment the optimizer is created from the parameters of the
# model that is actually trained; an optimizer built on another network
# would leave the new model at its random initialization.

# Training with weight decay only
model_wd = _make_batchnorm_cnn()
optimizer_wd = optim.SGD(model_wd.parameters(), lr=0.002, weight_decay=1e-4)
model_wd = train_with_validation(model_wd, train_mel_loader, val_mel_loader, criterion, optimizer_wd, num_epochs=30)

# Training with dropout only
model_dropout = _make_batchnorm_cnn(dropout_prob=0.5)
optimizer_dropout = optim.SGD(model_dropout.parameters(), lr=0.002)
model_dropout = train_with_validation(model_dropout, train_mel_loader, val_mel_loader, criterion, optimizer_dropout, num_epochs=60)

# Training with both weight decay and dropout
model_both = _make_batchnorm_cnn(dropout_prob=0.5)
optimizer_both = optim.SGD(model_both.parameters(), lr=0.002, weight_decay=1e-4)
model_both = train_with_validation(model_both, train_mel_loader, val_mel_loader, criterion, optimizer_both, num_epochs=60)

# Evaluate all three models on the test set
test_loss_wd, test_f1_wd, test_acc_wd, test_cm_wd = evaluate_model_device(model_wd, test_mel_loader, criterion, device)
test_loss_dropout, test_f1_dropout, test_acc_dropout, test_cm_dropout = evaluate_model_device(model_dropout, test_mel_loader, criterion, device)
test_loss_both, test_f1_both, test_acc_both, test_cm_both = evaluate_model_device(model_both, test_mel_loader, criterion, device)

# Show the results
print("\nTest Results (Weight Decay Only):")
print(f"Loss: {test_loss_wd:.4f}")
print(f"F1 Score: {test_f1_wd:.4f}")
print(f"Accuracy: {test_acc_wd:.4f}")

print("\nTest Results (Dropout Only):")
print(f"Loss: {test_loss_dropout:.4f}")
print(f"F1 Score: {test_f1_dropout:.4f}")
print(f"Accuracy: {test_acc_dropout:.4f}")

print("\nTest Results (Both Weight Decay and Dropout):")
print(f"Loss: {test_loss_both:.4f}")
print(f"F1 Score: {test_f1_both:.4f}")
print(f"Accuracy: {test_acc_both:.4f}")


Epoch 1/30:
Train Loss: 1.3831 | Val Loss: 1.3864 | Val F1: 0.1312
Epoch 2/30:
Train Loss: 1.3830 | Val Loss: 1.3864 | Val F1: 0.1318
Epoch 3/30:
Train Loss: 1.3833 | Val Loss: 1.3862 | Val F1: 0.1344
Epoch 4/30:
Train Loss: 1.3833 | Val Loss: 1.3858 | Val F1: 0.1323
Epoch 5/30:
Train Loss: 1.3830 | Val Loss: 1.3871 | Val F1: 0.1324
Epoch 6/30:
Train Loss: 1.3833 | Val Loss: 1.3857 | Val F1: 0.1365
Epoch 7/30:
Train Loss: 1.3832 | Val Loss: 1.3852 | Val F1: 0.1365
Epoch 8/30:
Train Loss: 1.3833 | Val Loss: 1.3857 | Val F1: 0.1339
Epoch 9/30:
Train Loss: 1.3828 | Val Loss: 1.3846 | Val F1: 0.1346
Epoch 10/30:
Train Loss: 1.3830 | Val Loss: 1.3864 | Val F1: 0.1318
Epoch 11/30:
Train Loss: 1.3828 | Val Loss: 1.3848 | Val F1: 0.1366
Epoch 12/30:
Train Loss: 1.3829 | Val Loss: 1.3869 | Val F1: 0.1317
Epoch 13/30:
Train Loss: 1.3833 | Val Loss: 1.3861 | Val F1: 0.1323
Epoch 14/30:
Train Loss: 1.3831 | Val Loss: 1.3864 | Val F1: 0.1318
Epoch 15/30:
Train Loss: 1.3831 | Val Loss: 1.3862 | Val 

TypeError: CNNBatchNorm.__init__() got an unexpected keyword argument 'dropout_prob'

# Ερώτημα 4: Testing

## Βήμα 1: Inference

In [None]:
def get_predictions(model, dataloader, device=None):
    """
    Produce class predictions from a trained model for every sample of a dataloader.

    Args:
        model (torch.nn.Module): The trained model. It is switched to
            evaluation mode and is expected to already live on `device`.
        dataloader (iterable): Yields either a tensor of inputs or a
            tuple/list whose first element is the inputs; any further
            elements (e.g. labels) are ignored. Use shuffle=False to keep
            the predictions in dataset order.
        device (torch.device, optional): Device the inputs are moved to.
            If None, the device of the model's first parameter is used
            (CPU when the model has no parameters).

    Returns:
        list: Predicted class indices, in the order the samples were yielded.
    """
    # Evaluation mode: dropout is disabled, batch norm uses running statistics
    model.eval()

    # Without an explicit device, follow the model
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    predictions = []

    # No gradients are needed for inference
    with torch.no_grad():
        for batch in dataloader:
            # Accept (inputs, labels, ...) batches as well as bare input tensors
            inputs = batch[0] if isinstance(batch, (list, tuple)) else batch
            inputs = inputs.to(device)

            outputs = model(inputs)

            # Index of the highest score along the class dimension
            preds = outputs.argmax(dim=1)

            # Move to CPU and collect as plain Python ints
            predictions.extend(preds.cpu().numpy().tolist())

    return predictions

## Βήμα 2: Κατέβασμα μουσικής από το youtube

In [None]:
def download_youtube(youtube_url, output_path="output.wav"):
    """Download the audio track of a YouTube video and store it as a WAV file.

    The audio is first extracted by yt_dlp into a temporary MP3 in the current
    working directory and then converted to WAV with pydub.

    Args:
        youtube_url: URL of the video to download.
        output_path: Destination path of the WAV file.

    Raises:
        Exception: Any error raised while downloading or converting is
            printed and re-raised unchanged.
    """
    # Must match the 'outtmpl' base name plus the extension of 'preferredcodec'
    audio_file = "downloaded_audio.mp3"

    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': 'downloaded_audio.%(ext)s',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
    }

    try:
        # Download audio using yt_dlp
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([youtube_url])

        audio = AudioSegment.from_file(audio_file)
        # export() hands back the open output file; close it right away
        audio.export(output_path, format="wav").close()

    except Exception as e:
        print(f"Download failed: {e}")
        raise

    finally:
        # Remove the temporary MP3 even when the conversion failed
        if os.path.exists(audio_file):
            os.remove(audio_file)



In [None]:
# Analysis window and hop between windows, both 50 ms, expressed in seconds
window_length = 50 / 1000
hop_length = 50 / 1000
# Segment size parameter: a segment spans (mel_time_size - 1) windows of audio
mel_time_size = 21

def load_wav(filename):
    """Read an audio file and return the audio signal and its sampling frequency.

    Args:
        filename: Path of the audio file to read.

    Returns:
        tuple: (x, fs) as returned by librosa.load with sr=None, i.e. the
        signal at the file's native sampling rate.

    Raises:
        FileNotFoundError: If `filename` does not exist.
    """
    if not os.path.exists(filename):
        # Name the missing path so the failure can be diagnosed
        raise FileNotFoundError(f"Audio file not found: {filename}")
    # Load file using librosa; sr=None keeps the native sampling rate
    x, fs = librosa.load(filename, sr=None)
    return x, fs


def melspectrogram(x=None, fs=None, n_fft=None, hop_length=None,
                   fuse=False):
    """Return a mel spectrogram in dB, laid out as (time, features).

    Args:
        x: Audio signal. If None, None is returned.
        fs: Sampling frequency of `x`.
        n_fft: FFT window size in samples. Defaults to the module-level
            `window_length` (seconds) converted to samples.
        hop_length: Hop between frames in samples. Defaults to the
            module-level `hop_length` (seconds) converted to samples.
        fuse: If True, chroma features are appended to the mel bands along
            the feature axis.

    Returns:
        numpy.ndarray of shape (time, n_mel), or (time, n_mel + n_chroma)
        when `fuse` is True; None when `x` is None.
    """

    if x is None:
        return None
    # Set some values
    if n_fft is None:
        n_fft = int(window_length * fs)
    if hop_length is None:
        # The parameter shadows the module-level default of the same name,
        # so that default has to be read explicitly from the module globals.
        hop_length = int(globals()["hop_length"] * fs)
    # Get the mel-scaled power spectrogram
    spectrogram = librosa.feature.melspectrogram(y=x, sr=fs, n_fft=n_fft,
                                                 hop_length=hop_length)
    # Convert power to dB, relative to the maximum
    spectrogram_dB = librosa.power_to_db(spectrogram, ref=np.max)  # (n_mel,t)

    if fuse:
        chroma = librosa.feature.chroma_stft(y=x, sr=fs, n_fft=n_fft,
                                             hop_length=hop_length)
        chroma_dB = librosa.power_to_db(chroma)
        # Stack mel and chroma features side by side for every time frame
        out = np.concatenate((spectrogram_dB.T, chroma_dB.T), axis=1)
    else:
        # Transpose to return (time,n_mel)
        out = spectrogram_dB.T
    return out


def get_melgrams(file):
    """Cut an audio file into fixed-length segments and return their mel spectrograms.

    The signal is split into consecutive, non-overlapping chunks of
    (mel_time_size - 1) * window_length seconds. A final shorter chunk is
    zero-padded at the end to the full length.

    Args:
        file: Path of the audio file.

    Returns:
        list: One mel spectrogram per chunk, in temporal order.
    """
    samples, rate = load_wav(file)

    # Frame parameters in samples, shared by every chunk
    frame_len = int(window_length * rate)
    frame_hop = int(hop_length * rate)

    chunk_len = int((mel_time_size - 1) * window_length * rate)
    total = samples.shape[0]

    melgrams = []
    offset = 0
    while offset < total:
        # Slicing past the end simply yields a shorter chunk
        chunk = samples[offset:offset + chunk_len]
        shortfall = chunk_len - chunk.shape[0]
        if shortfall > 0:
            # Last chunk: pad with trailing zeros up to the full length
            chunk = np.pad(chunk, (0, shortfall), 'constant')
        melgrams.append(
            melspectrogram(chunk, fs=rate, n_fft=frame_len, hop_length=frame_hop)
        )
        offset += chunk_len

    return melgrams


def youtube_to_melgram(url, output_file="youtube_melgrams.npy"):
    """Download a YouTube video's audio and save its mel spectrograms to disk.

    Args:
        url: URL of the YouTube video.
        output_file: Path of the .npy file the spectrograms are written to.
            Pass a distinct path per video; calls that share a path
            overwrite each other's result.
    """
    # Intermediate WAV file handed from the downloader to the feature extractor
    wav_path = "output.wav"
    download_youtube(url, wav_path)
    melgrams = get_melgrams(wav_path)
    np.save(output_file, melgrams)

In [None]:
# Videos to process, in order. Every call writes the default output file,
# so after the loop the file holds the spectrograms of the last video.
for video_url in (
    'https://www.youtube.com/watch?v=9E6b3swbnWg',
    'https://www.youtube.com/watch?v=EDwb9jOVRtU',
    'https://www.youtube.com/watch?v=OMaycNcPsHI',
    'https://www.youtube.com/watch?v=l45f28PzfCI',
):
    youtube_to_melgram(video_url)

## Βήμα 3: Προβλέψεις

In [None]:
def plot_predictions_heatmap(predictions, class_names, title="Predictions over Time"):
    """
    Draw a class-by-timestep heatmap marking the predicted class at each step.

    Args:
        predictions (list): Predicted class indices, one per timestep.
        class_names (list): Class names used as row labels.
        title (str): Title of the figure.
    """
    n_steps = len(predictions)
    n_classes = len(class_names)

    # One-hot matrix: row = class, column = timestep
    one_hot = np.zeros((n_classes, n_steps))
    if n_steps:
        # Set one cell per column in a single indexed assignment
        one_hot[predictions, np.arange(n_steps)] = 1

    plt.figure(figsize=(15, 5))
    heatmap_options = {
        'cmap': "YlOrRd",
        'yticklabels': class_names,
        'xticklabels': 10,  # label every 10th timestep to keep the axis readable
        'cbar_kws': {'label': 'Prediction (1=yes, 0=no)'},
    }
    sns.heatmap(one_hot, **heatmap_options)
    plt.title(title)
    plt.xlabel("Time (seconds)")
    plt.ylabel("Music Genre")
    plt.show()


In [None]:
def evaluate_video_predictions(predictions, test_accuracy, class_names):
    """
    Summarize per-timestep predictions and compare them with the test accuracy.

    Prints the majority class, the fraction of timesteps that agree with it
    ("consistency") and whether that fraction reaches 90% of `test_accuracy`.

    Args:
        predictions (list or numpy.ndarray): Predicted class index for each timestep.
        test_accuracy (float): Accuracy on the test set (0.0 to 1.0).
        class_names (list): List of class names, indexed by class.
    """
    # Nothing to evaluate (works for lists and arrays alike)
    if predictions is None or len(predictions) == 0:
        print("No predictions available for evaluation.")
        return

    labels = np.asarray(predictions).ravel()

    # np.unique returns sorted values, and argmax picks the first maximum,
    # so ties are resolved in favour of the smallest class index.
    values, counts = np.unique(labels, return_counts=True)
    majority_class = int(values[np.argmax(counts)])
    consistency = float(np.mean(labels == majority_class))

    print(f"\nMajority class: {class_names[majority_class]}")
    print(f"Consistency: {consistency:.2%}")
    print(f"Test set accuracy: {test_accuracy:.2%}")

    # Compare with test accuracy
    if consistency >= test_accuracy * 0.9:  # Allow 10% tolerance
        print("Model performance is consistent with test set.")
    else:
        print("Model performance differs significantly from test set.")

In [None]:
# Inference setup: device, the trained ReLU CNN and the class names
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = best_model_relu
class_names = [*class_mapping.values()]
# NOTE(review): hard-coded reference accuracy; confirm it matches this model's test run
test_accuracy = 0.68

# Read the saved spectrograms and insert a channel axis right after the batch axis
youtube_melgrams = torch.FloatTensor(
    np.load("youtube_melgrams.npy", allow_pickle=True)
).unsqueeze(1)

# Placeholder zero labels make the data fit a TensorDataset; order is preserved
youtube_dataset = TensorDataset(
    youtube_melgrams,
    torch.zeros(youtube_melgrams.size(0)),
)
youtube_loader = DataLoader(youtube_dataset, shuffle=False, batch_size=16)

# One predicted class per segment
predictions = get_predictions(model, youtube_loader, device=device)

# Visualize the predictions over time
plot_predictions_heatmap(predictions, class_names, title="YouTube Video Predictions")

# Compare the prediction consistency with the reference accuracy
evaluate_video_predictions(predictions, test_accuracy, class_names)