# In [None]:
import numpy as np
import pandas as pd
import torchaudio
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# Load the CSV file
df = pd.read_csv('extracted/set_a.csv')

# Clean the dataset: keep only rows that have a file name, whose file name
# neither contains 'unlabelled' nor starts with '__', and that carry a label.
# `na=False` keeps the mask strictly boolean even if some file names are
# missing (otherwise `~` fails on the resulting NaN entries).
has_fname = df['fname'].notnull()
is_excluded = df['fname'].str.contains('unlabelled|^__', na=False)
has_label = df['label'].notnull()

# Take an explicit copy so that columns added to `df_cleaned` later are written
# to an independent frame rather than to a slice of `df`
# (avoids pandas' SettingWithCopyWarning / ambiguous chained assignment).
df_cleaned = df[has_fname & ~is_excluded & has_label].copy()
df_cleaned.reset_index(drop=True, inplace=True)

# Define a function to extract mel-spectrograms using Torchaudio
def extract_mel_spectrogram(file_path, duration=5, sr=22050, n_mels=128):
    """Return a fixed-size log-mel spectrogram for one audio file.

    The audio is mixed down to mono, resampled to ``sr`` if needed, then
    trimmed or zero-padded to exactly ``duration`` seconds before the
    mel-spectrogram is computed and converted to decibels.

    Args:
        file_path: Path of the audio file to load.
        duration: Target clip length in seconds.
        sr: Target sample rate in Hz.
        n_mels: Number of mel bands.

    Returns:
        A 2-D NumPy array of shape ``(n_mels, num_frames)`` where
        ``num_frames = int(sr * duration) // 200 + 1``. If the file cannot be
        processed, the error is printed and an all-zero float32 array of that
        same shape is returned, so that results can always be stacked.
    """
    # STFT settings spelled out explicitly (they equal torchaudio's
    # MelSpectrogram defaults) so the fallback shape below stays in sync
    # with the shape of a real spectrogram.
    n_fft = 400
    hop_length = n_fft // 2
    num_samples = int(sr * duration)

    try:
        # Load the audio file
        waveform, sample_rate = torchaudio.load(file_path)

        # Mix multi-channel audio down to mono; otherwise the channel axis
        # survives and the result is 3-D instead of (n_mels, num_frames).
        if waveform.size(0) > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Resample if necessary
        if sample_rate != sr:
            resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=sr)
            waveform = resampler(waveform)

        # Trim or pad the waveform to ensure consistent duration
        if waveform.size(1) > num_samples:
            waveform = waveform[:, :num_samples]
        else:
            waveform = torch.nn.functional.pad(waveform, (0, num_samples - waveform.size(1)))

        # Compute the mel-spectrogram
        mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels
        )
        mel_spectrogram = mel_transform(waveform)

        # Convert to decibels
        mel_spectrogram_db = torchaudio.transforms.AmplitudeToDB()(mel_spectrogram)

        # Drop only the (mono) channel axis
        return mel_spectrogram_db.squeeze(0).numpy()
    except Exception as e:
        # Deliberate best-effort: report the failure and substitute a blank
        # spectrogram with the same shape and dtype as a real one.
        print(f"Error encountered while parsing file: {file_path}, {str(e)}")
        num_frames = num_samples // hop_length + 1
        return np.zeros((n_mels, num_frames), dtype=np.float32)

# Apply the extraction function to each audio file
df_cleaned['mel_spectrogram'] = df_cleaned['fname'].apply(extract_mel_spectrogram)

# Encode the labels as integers
le = LabelEncoder()
df_cleaned['label_encoded'] = le.fit_transform(df_cleaned['label'])

# Stack the per-file spectrograms into one array of shape
# (num_clips, n_mels, num_frames). np.stack raises immediately if any
# spectrogram has a different shape instead of silently building a ragged array.
X = np.stack(df_cleaned['mel_spectrogram'].tolist())

# Insert the channel axis right after the batch axis so the layout is
# (batch size, channels, height, width) as torch.nn.Conv2d expects;
# a trailing channel axis would be read as 128 input channels.
X = X[:, np.newaxis, :, :]

# Create a custom dataset for PyTorch
class AudioDataset(Dataset):
    """In-memory dataset pairing feature arrays with integer class labels.

    Both inputs are converted to tensors once, at construction time:
    ``X`` becomes a float32 tensor and ``y`` a long (int64) tensor.
    Indexing returns the ``(features, label)`` pair at that position.
    """

    def __init__(self, X, y):
        features = torch.tensor(X, dtype=torch.float32)
        targets = torch.tensor(y, dtype=torch.long)
        self.X, self.y = features, targets

    def __len__(self):
        # Number of samples is the length of the leading (batch) axis.
        return len(self.X)

    def __getitem__(self, idx):
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

# Hold out 20% of the samples for testing (fixed seed for a reproducible split)
encoded_targets = df_cleaned['label_encoded'].values
X_train, X_test, y_train, y_test = train_test_split(
    X,
    encoded_targets,
    test_size=0.2,
    random_state=42,
)

# Wrap each split in a PyTorch dataset
train_dataset = AudioDataset(X_train, y_train)
test_dataset = AudioDataset(X_test, y_test)

# Batch the datasets; only the training data is reshuffled every epoch
loader_options = {'batch_size': 32}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

# Define a CNN model
class CNNModel(nn.Module):
    """Two-block convolutional classifier for single-channel spectrograms.

    Expects input of shape ``(batch, 1, height, width)`` and returns raw class
    scores (logits) of shape ``(batch, num_classes)``.

    Args:
        num_classes: Size of the output layer. When omitted, it falls back to
            the number of distinct values in the module-level ``y_train``,
            which is what the model used before this parameter existed.
    """

    def __init__(self, num_classes=None):
        super().__init__()
        if num_classes is None:
            # Backward-compatible default: derive the class count from the
            # training labels defined at module level.
            num_classes = len(np.unique(y_train))

        self.conv1 = nn.Conv2d(1, 32, kernel_size=(3, 3), padding='same')  # 1 input channel (grayscale)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=(3, 3), padding='same')
        self.pool = nn.MaxPool2d(kernel_size=(2, 2))
        # Resample the feature map to a fixed 32x22 grid so the dense layer
        # below fits regardless of the spectrogram's height/width. For a
        # 128x88 input (32x22 after the two max-pools) this is a no-op.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((32, 22))
        self.fc1 = nn.Linear(64 * 32 * 22, 128)
        self.fc2 = nn.Linear(128, num_classes)  # Output layer for number of classes
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = self.adaptive_pool(x)
        # Flatten everything except the batch axis; unlike view(-1, k) this
        # can never silently fold samples into a different batch size.
        x = torch.flatten(x, start_dim=1)
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.fc2(x)
        return x

# Initialize the model, loss function, and optimizer
model = CNNModel()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
num_epochs = 50
train_loss_history = []  # mean per-sample training loss, one entry per epoch
train_acc_history = []   # training accuracy in percent, one entry per epoch

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0  # sum of per-sample losses seen this epoch
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        n_batch = labels.size(0)
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # The criterion returns the mean loss over the batch; weight it by the
        # batch size so a shorter final batch does not skew the epoch average.
        running_loss += loss.item() * n_batch

        # Calculate accuracy
        predicted = outputs.argmax(dim=1)
        total += n_batch
        correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / total
    epoch_acc = 100 * correct / total
    train_loss_history.append(epoch_loss)
    train_acc_history.append(epoch_acc)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%')

# Evaluate on the held-out test set (eval mode, no gradient tracking)
model.eval()
correct, total = 0, 0
with torch.no_grad():
    for inputs, labels in test_loader:
        # Predicted class = index of the highest score in each row
        predicted = model(inputs).argmax(dim=1)
        hits = (predicted == labels).sum().item()
        correct += hits
        total += labels.size(0)

test_accuracy = 100 * correct / total
print(f'Test Accuracy: {test_accuracy:.2f}%')

# Plot the per-epoch training loss (left) and accuracy (right) side by side
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

loss_ax.plot(train_loss_history, label='Training Loss')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training Loss Over Time')
loss_ax.legend()

acc_ax.plot(train_acc_history, label='Training Accuracy')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy (%)')
acc_ax.set_title('Training Accuracy Over Time')
acc_ax.legend()

plt.show()
