In [None]:
import os
import torch
import pandas as pd
import torchaudio
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import time
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from tqdm.notebook import tqdm
from torch.utils.data import random_split, Subset

In [None]:
# --- Audio / dataset configuration ---
DATA_DIR = 'genres'
SAMPLE_RATE = 22050      # Hz; GTZAN default sample rate
DURATION_SECONDS = 30    # every clip is treated as 30 seconds long
TARGET_SAMPLES = DURATION_SECONDS * SAMPLE_RATE  # samples in one fixed-length clip

# Use the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# --- Mel spectrogram parameters ---
N_FFT = 2048       # FFT size
HOP_LENGTH = 512   # hop length between frames
N_MELS = 128       # number of Mel bins

def log(message, file_path="log.txt"):
    """Append one line of text to a log file.

    Logging is best-effort: if the file cannot be written, the failure is
    reported on stdout and the caller carries on.

    Args:
        message: Value to record. Non-string values are formatted with ``str``.
        file_path: File to append to; it is created when it does not exist.
    """
    try:
        with open(file_path, 'a', encoding='utf-8') as fl:
            fl.write(f"{message}\n")
    except (OSError, ValueError) as e:
        # Report the actual cause (missing directory, permissions, bad path, ...)
        # instead of a fixed message that is wrong for most failures.
        print(f"Could not write to log file '{file_path}': {e}")

# --- Custom Dataset Class ---
class Data_Preprocessing(Dataset):
    """Dataset over a directory containing one sub-folder of ``.au`` files per genre.

    Sub-folder names are sorted and mapped to integer labels (``label_map``).
    Each item is loaded with torchaudio, peak-normalised, moved to the
    module-level ``device``, resampled to ``sample_rate`` when needed, and
    padded or trimmed to exactly ``sample_rate * duration_seconds`` samples.

    Args:
        data_dir: Root directory holding the genre sub-folders.
        sample_rate: Target sample rate in Hz.
        duration_seconds: Fixed clip length in seconds.
        transform: Optional callable applied to the fixed-length waveform.

    Attributes set by the constructor:
        audio_files: Paths of the discovered ``.au`` files.
        labels: Integer label for each entry of ``audio_files``.
        label_map: Mapping from genre name to integer label.
        data_info: One ``{'filepath', 'label'}`` dict per sample, in the same order.
    """

    def __init__(self, data_dir, sample_rate=22050, duration_seconds=30, transform=None):
        self.data_dir = data_dir
        self.sample_rate = sample_rate
        self.duration_samples = sample_rate * duration_seconds
        self.transform = transform
        self.audio_files = []
        self.labels = []
        self.label_map = {}
        self.data_info = []
        # Resample modules keyed by source sample rate, so each is built only once
        self._resamplers = {}
        self._load_dataset()

    def _load_dataset(self):
        """Scan ``data_dir`` and fill ``audio_files``, ``labels``, ``label_map`` and ``data_info``."""
        genres = sorted(
            d for d in os.listdir(self.data_dir)
            if os.path.isdir(os.path.join(self.data_dir, d))
        )
        for i, genre in enumerate(genres):
            self.label_map[genre] = i
            genre_path = os.path.join(self.data_dir, genre)
            # os.listdir order is arbitrary; sorting keeps sample indices (and
            # therefore any seeded split of this dataset) stable across machines.
            for audio_file in sorted(os.listdir(genre_path)):
                if audio_file.endswith('.au'):
                    filepath = os.path.join(genre_path, audio_file)
                    self.audio_files.append(filepath)
                    self.labels.append(i)
                    self.data_info.append({'filepath': filepath, 'label': i})

    def __len__(self):
        """Return the number of discovered audio files."""
        return len(self.audio_files)

    def __getitem__(self, idx):
        """Load, normalise and length-fix one clip.

        Returns:
            ``(features, label)`` where ``features`` is ``transform(waveform)``
            when a transform was given, otherwise the fixed-length waveform,
            and ``label`` is a scalar ``torch.long`` tensor.
        """
        audio_path = self.audio_files[idx]
        label = self.labels[idx]

        waveform, sr = torchaudio.load(audio_path)

        # Peak-normalise; skip empty or all-zero clips, where dividing by the
        # peak would fail or fill the waveform with NaNs.
        if waveform.numel() > 0:
            peak = waveform.abs().max()
            if peak > 0:
                waveform = waveform / peak

        waveform = waveform.to(device)

        # 1. Resample to the target rate, reusing the module for this source rate
        if sr != self.sample_rate:
            resampler = self._resamplers.get(sr)
            if resampler is None:
                resampler = torchaudio.transforms.Resample(orig_freq=sr, new_freq=self.sample_rate).to(device)
                self._resamplers[sr] = resampler
            waveform = resampler(waveform)

        # 2. Zero-pad at the end or trim so every item has the same length (needed for batching)
        if waveform.shape[1] < self.duration_samples:
            padding_needed = self.duration_samples - waveform.shape[1]
            waveform = F.pad(waveform, (0, padding_needed))
        elif waveform.shape[1] > self.duration_samples:
            waveform = waveform[:, :self.duration_samples]

        label_tensor = torch.tensor(label, dtype=torch.long)
        if self.transform:
            return self.transform(waveform), label_tensor
        return waveform, label_tensor

# --- Feature extraction: Mel spectrogram converted to decibels ---
# Both stages are torchaudio modules and are moved to `device` together.
mel_spectrogram_transform = nn.Sequential(
    torchaudio.transforms.MelSpectrogram(
        sample_rate=SAMPLE_RATE,
        n_fft=N_FFT,
        hop_length=HOP_LENGTH,
        n_mels=N_MELS,
        power=2.0,
    ),
    torchaudio.transforms.AmplitudeToDB(top_db=100.0),
).to(device)

# --- Dataset and DataLoader over the full collection ---
dataset = Data_Preprocessing(
    data_dir=DATA_DIR,
    sample_rate=SAMPLE_RATE,
    duration_seconds=DURATION_SECONDS,
    transform=mel_spectrogram_transform,
)

BATCH_SIZE = 16  # Adjust based on your GPU memory
dataloader = DataLoader(dataset, shuffle=True, batch_size=BATCH_SIZE)

# Record what was found on disk
log(f"Total number of audio files: {len(dataset)}")
log(f"Number of genres: {len(dataset.label_map)}")
log(f"Genre map: {dataset.label_map}")

OUTPUT_SPECTROGRAMS_DIR = 'Spectrogram'
DPI = 100  # Dots per inch for saved image resolution
FIGURE_SIZE = (10, 4)  # Width, height in inches

# Invert genre -> index once, instead of rebuilding a key list for every sample
index_to_genre = {index: genre for genre, index in dataset.label_map.items()}

# Render every sample's Mel spectrogram to '<OUTPUT_SPECTROGRAMS_DIR>/<genre>/<clip name>.png'
for i in range(len(dataset)):
    features, label_idx = dataset[i]
    features_np = features.squeeze(0).cpu().numpy()

    # Get the genre name
    genre_name = index_to_genre[label_idx.item()]

    # Create the genre-specific subdirectory (no error if it already exists)
    genre_output_dir = os.path.join(OUTPUT_SPECTROGRAMS_DIR, genre_name)
    os.makedirs(genre_output_dir, exist_ok=True)

    # `audio_files[i]` is the source path of sample i (same index as `dataset[i]`)
    audio_filename = os.path.basename(dataset.audio_files[i])
    spectrogram_filename = f"{os.path.splitext(audio_filename)[0]}.png"

    save_path = os.path.join(genre_output_dir, spectrogram_filename)

    # Create plot and save
    plt.figure(figsize=FIGURE_SIZE, dpi=DPI)
    plt.imshow(features_np, origin='lower', aspect='auto', cmap='magma')
    plt.title(f"Genre: {genre_name}")
    plt.xlabel("Time Frames")
    plt.ylabel("Mel Bins")
    plt.colorbar(format='%+2.0f dB')
    plt.tight_layout()
    plt.savefig(save_path, bbox_inches='tight', pad_inches=0)
    plt.close()  # Close the figure so memory does not grow with the number of clips

log(f"All Mel spectrograms saved to '{OUTPUT_SPECTROGRAMS_DIR}' directory.")

In [None]:
# Define split ratios (they must add up to 1)
train_split = 0.8
validation_split = 0.1
test_split = 0.1
assert abs(train_split + validation_split + test_split - 1.0) < 1e-9, "split ratios must sum to 1"

# Calculating split sizes
total_size = len(dataset)
train_size = int(train_split * total_size)
validation_size = int(validation_split * total_size)
test_size = total_size - train_size - validation_size  # remainder, so every sample is used

# A fixed generator seed makes the partition repeatable for a given sample order
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, validation_size, test_size],
    generator=torch.Generator().manual_seed(42),
)

log(f"Dataset created with sizes: Train={len(train_dataset)}, Val={len(val_dataset)}, Test={len(test_dataset)}")

# Only the training loader shuffles; the evaluation loaders keep a fixed order
# so their per-sample outputs are repeatable between runs.
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
class AudioCNN_model(nn.Module):
    """Three-stage 2-D CNN classifier over single-channel spectrogram inputs.

    Each convolutional stage is Conv2d(3x3, padding 1) -> BatchNorm2d -> ReLU
    -> MaxPool2d(2x2), with 32, 64 and 128 output channels. The flattened
    result feeds a 512 -> 256 -> ``num_classes`` fully connected head with
    BatchNorm1d, ReLU and Dropout(0.5) after the first two linear layers.

    Args:
        num_classes: Number of output logits.
        n_mels: Height of the input (second-to-last dimension).
        time_frames: Width of the input (last dimension).
    """

    def __init__(self, num_classes, n_mels, time_frames):
        super().__init__()

        # Convolutional 1
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        )

        # Convolutional 2
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        )

        # Convolutional 3
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        )

        # Size of the flattened conv output, measured with a dummy forward pass
        self._to_linear = self._probe_flattened_size(n_mels, time_frames)

        # Fully connected layers
        self.fc = nn.Sequential(
            nn.Linear(self._to_linear, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(512, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(256, num_classes)
        )

    def _probe_flattened_size(self, n_mels, time_frames):
        """Return the per-sample element count produced by the conv stages."""
        # BatchNorm updates its running statistics on every training-mode
        # forward pass, even under no_grad. Probe in eval mode so the random
        # dummy input does not leak into those statistics.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                x = torch.randn(1, 1, n_mels, time_frames)
                x = self.conv1(x)
                x = self.conv2(x)
                x = self.conv3(x)
        finally:
            self.train(was_training)
        return x.shape[1] * x.shape[2] * x.shape[3]

    def forward(self, x):
        """Map a ``(batch, 1, n_mels, time_frames)`` input to ``(batch, num_classes)`` logits."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = x.view(x.size(0), -1)  # flatten everything except the batch dimension
        x = self.fc(x)
        return x

# Input width passed to the model: one frame per full hop across the fixed-length clip, plus one
time_frames = 1 + (DURATION_SECONDS * SAMPLE_RATE) // HOP_LENGTH

# One output logit per genre found in the dataset
num_classes = len(dataset.label_map)
model = AudioCNN_model(num_classes, N_MELS, time_frames)
model = model.to(device)

In [None]:

# Training hyper-parameters
Epochs = 20
LR = 0.001

# Cross-entropy loss, optimised with Adam plus a small weight decay
optmz = optim.Adam(model.parameters(), lr=LR, weight_decay=1e-5)
criterion = nn.CrossEntropyLoss()

def train_model(model, dataloader, criterion, optmz, device):
    """Run one training epoch over ``dataloader``.

    Args:
        model: Network to optimise; switched to training mode.
        dataloader: Iterable yielding ``(features, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optmz: Optimizer that steps the model parameters.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_accuracy)``: the sample-weighted mean loss and
        the fraction of correctly predicted samples.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    progress = tqdm(dataloader, desc="Training", leave=False)
    for features, labels in progress:
        features, labels = features.to(device), labels.to(device)

        # Standard step: clear gradients, forward, backward, update
        optmz.zero_grad()
        logits = model(features)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optmz.step()

        # Scale the batch loss by the batch size so partial batches are weighted correctly
        loss_sum += batch_loss.item() * features.size(0)
        predicted = torch.max(logits.data, 1)[1]
        n_seen += labels.size(0)
        n_correct += (predicted == labels).sum().item()

    return loss_sum / n_seen, n_correct / n_seen


# Validation Function
def evaluate_model(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating any parameters.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        dataloader: Iterable yielding ``(features, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_accuracy, all_labels, all_predictions)``: the
        sample-weighted mean loss, the fraction of correct predictions, and
        the true and predicted labels collected in iteration order.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    all_labels, all_predictions = [], []

    with torch.no_grad():
        progress = tqdm(dataloader, desc="Evaluating", leave=False)
        for features, labels in progress:
            features, labels = features.to(device), labels.to(device)

            logits = model(features)
            batch_loss = criterion(logits, labels)

            # Scale the batch loss by the batch size so partial batches are weighted correctly
            loss_sum += batch_loss.item() * features.size(0)
            predicted = torch.max(logits.data, 1)[1]

            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

            # Keep per-sample results on the CPU for later reporting
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    return loss_sum / n_seen, n_correct / n_seen, all_labels, all_predictions


# Training
log(f"Starting training on {device} for {Epochs} epochs...")
# Start below any reachable accuracy so the first epoch always writes a checkpoint.
# With a 0.0 start and a strict '>' test, a run whose validation accuracy stays at 0
# would never save, and the evaluation step would load a missing or stale 'best_model.pth'.
best_val_accuracy = float('-inf')

for epoch in range(Epochs):
    start_time = time.time()

    train_loss, train_accuracy = train_model(model, train_dataloader, criterion, optmz, device)
    val_loss, val_accuracy, _, _ = evaluate_model(model, val_dataloader, criterion, device)

    end_time = time.time()
    epoch_duration = end_time - start_time

    log(f"Epoch {epoch+1}/{Epochs} - Duration: {epoch_duration:.2f}s")
    log(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f}")
    log(f"  Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

    # Save best model based on validation accuracy
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), 'best_model.pth')
        log(f"  --> Saved best model with Val Acc: {best_val_accuracy:.4f}")

log("\nTraining complete!")

# --- Final Evaluation on Test Set ---
log("\nEvaluating on Test Set...")
# map_location lets the checkpoint load even if it was written on a different device
model.load_state_dict(torch.load('best_model.pth', map_location=device))  # Load the best model
test_loss, test_accuracy, true_labels, predictions = evaluate_model(model, test_dataloader, criterion, device)

log(f"\nTest Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

# Detailed Classification Report
log("\nClassification Report:")
# Order genre names by their label index rather than relying on dict insertion order
target_names = [genre for genre, _ in sorted(dataset.label_map.items(), key=lambda item: item[1])]
# Passing `labels` keeps the report aligned with `target_names` even when a genre
# is absent from the test split; without it the length mismatch raises ValueError.
class_indices = list(range(len(target_names)))
log(classification_report(
    true_labels,
    predictions,
    labels=class_indices,
    target_names=target_names,
    zero_division=0,
))