In [None]:
# Mount Google Drive so the dataset and checkpoints under /content/drive are reachable.
# This import only exists inside a Google Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch

# Prefer the GPU when the runtime exposes one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")

In [None]:
# Copy the FSD50K folder from Drive onto the local Colab disk.
!cp -r /content/drive/MyDrive/Thesis/FSD50K /content/

In [None]:
# NOTE(review): this rsync uses the same source and destination as the `cp -r` cell above, so
# running both transfers the dataset twice; presumably only one of the two is needed — confirm.
!rsync -av --progress /content/drive/MyDrive/Thesis/FSD50K /content/

In [None]:
# Locations of the metadata CSVs and audio folders inside the local copy of FSD50K.
# These module-level names are read by the preprocessing and dataset cells further down.
train_csv_path = '/content/FSD50K/FSD50K.metadata/collection/modified_collection_dev.csv'
test_csv_path = '/content/FSD50K/FSD50K.metadata/collection/modified_collection_test.csv'
train_audio_dir = '/content/FSD50K/FSD50K.dev_audio/'
test_audio_dir = '/content/FSD50K/FSD50K.eval_audio/'

In [None]:
import pandas as pd
import torchaudio
from torch.utils.data import Dataset, DataLoader
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchaudio.transforms as transforms
import torch.optim as optim
from sklearn.cluster import KMeans

# Preprocessing


In [None]:
# Load the full metadata tables.
train_df = pd.read_csv(train_csv_path)
test_df = pd.read_csv(test_csv_path)

# Draw a reproducible subsample; n is capped at the table size so `sample` cannot raise
# when a CSV holds fewer rows than requested.
train_df_sampled = train_df.sample(n=min(20483, len(train_df)), random_state=42)

#Save the new CSV file with the reduced training dataset
train_csv_sampled_path = '/content/FSD50K/FSD50K.metadata/collection/modified_collection_dev_20483_new.csv'
train_df_sampled.to_csv(train_csv_sampled_path, index=False)

print(f"Reduced CSV file saved to: {train_csv_sampled_path}")

test_df_sampled = test_df.sample(n=min(5116, len(test_df)), random_state=42)  # Adjust 'n' as needed

#Save the new CSV file with the reduced test dataset
test_csv_sampled_path = '/content/FSD50K/FSD50K.metadata/collection/modified_collection_test_5116_new.csv'
test_df_sampled.to_csv(test_csv_sampled_path, index=False)

print(f"Reduced test CSV file saved to: {test_csv_sampled_path}")

# Work on independent copies so the columns added below do not leak into the sampled frames.
# Rows without a label are dropped first (FSD50KDataset does the same); otherwise `astype(str)`
# would turn missing labels into a spurious 'nan' class.
train_df = train_df_sampled.dropna(subset=['single_label']).copy()
test_df = test_df_sampled.dropna(subset=['single_label']).copy()

print(train_df['single_label'].dtype)
print(train_df['single_label'].unique())

train_df['single_label'] = train_df['single_label'].astype(str)
test_df['single_label'] = test_df['single_label'].astype(str)

# Class indices follow the alphabetical order of the training labels.
unique_labels = sorted(train_df['single_label'].unique())
label_to_index = {label: idx for idx, label in enumerate(unique_labels)}
print(f"New label mapping: {label_to_index}")

#Apply the mapping
train_df['label_idx'] = train_df['single_label'].map(label_to_index)
test_df['label_idx'] = test_df['single_label'].map(label_to_index)

# A test label that never occurs in the training sample maps to NaN, which would silently turn
# the column into floats and slip past the later range checks. Report and drop such rows.
unseen_test_rows = test_df['label_idx'].isna()
if unseen_test_rows.any():
    print(f"Dropping {int(unseen_test_rows.sum())} test rows whose label is absent from the training sample")
    test_df = test_df[~unseen_test_rows].copy()
test_df['label_idx'] = test_df['label_idx'].astype(int)

#Verify that the label indices are within the correct range
print(train_df['label_idx'].value_counts())  # Check the distribution of label indices
print(train_df['label_idx'].max())  # Ensure that the maximum index does not exceed num_classes - 1

#Convert filenames to strings if necessary
train_df['fname'] = train_df['fname'].astype(str)
test_df['fname'] = test_df['fname'].astype(str)

In [None]:
# NOTE(review): this overrides the local /content/FSD50K audio directories set earlier, so audio
# is read straight from the mounted Drive folder rather than from the local copy made above —
# confirm this is intended.
train_audio_dir = '/content/drive/MyDrive/Thesis/FSD50K/FSD50K.dev_audio/'
test_audio_dir = '/content/drive/MyDrive/Thesis/FSD50K/FSD50K.eval_audio/'

In [None]:
from imblearn.over_sampling import RandomOverSampler

class FSD50KDataset(Dataset):
    """FSD50K clips as min-max normalised, single-channel mel spectrograms with integer labels.

    Rows whose ``single_label`` is missing are dropped, labels are encoded to integers and, by
    default, classes are randomly oversampled with ``RandomOverSampler``.

    Args:
        csv_file: Path to a metadata CSV. Its first column is used as the clip file stem and it
            must contain a ``single_label`` column.
        audio_dir: Directory holding the ``<stem>.wav`` files.
        transform: Stored on the instance; this class does not apply it.
        max_len: Number of spectrogram frames every item is padded or truncated to.
        label_to_idx: Optional ``{label: index}`` mapping to reuse (for example the mapping of a
            training dataset, so that an evaluation dataset shares its class indices). Rows whose
            label is not in the mapping are dropped. When omitted, the mapping is built from the
            sorted distinct labels, so it does not depend on row order.
        oversample: When True (default) the rows are resampled with ``RandomOverSampler``.
            Pass False for evaluation data.
        random_state: Seed forwarded to ``RandomOverSampler`` for reproducible resampling.
    """

    # Sample rate every clip is converted to before the mel transform.
    TARGET_SAMPLE_RATE = 16000

    def __init__(self, csv_file, audio_dir, transform=None, max_len=1000,
                 label_to_idx=None, oversample=True, random_state=None):
        self.dataframe = pd.read_csv(csv_file)
        self.audio_dir = audio_dir
        self.transform = transform
        self.max_len = max_len  #Maximum length to pad/truncate sequences
        self.mel_spectrogram = transforms.MelSpectrogram(
            sample_rate=self.TARGET_SAMPLE_RATE,
            n_mels=64,
            n_fft=1024,
            hop_length=512
        )

        # Unlabelled rows cannot be trained or evaluated on.
        self.dataframe = self.dataframe.dropna(subset=['single_label']).copy()

        if label_to_idx is None:
            distinct_labels = sorted(self.dataframe['single_label'].unique())
            label_to_idx = {label: idx for idx, label in enumerate(distinct_labels)}
        else:
            # Keep only rows whose label the supplied mapping knows about.
            known = self.dataframe['single_label'].isin(list(label_to_idx))
            self.dataframe = self.dataframe[known].copy()
        self.label_to_idx = dict(label_to_idx)
        self.dataframe['encoded_label'] = (
            self.dataframe['single_label'].map(self.label_to_idx).astype(int)
        )

        if oversample:
            X = self.dataframe.drop('encoded_label', axis=1)
            y = self.dataframe['encoded_label']

            ros = RandomOverSampler(random_state=random_state)
            X_resampled, y_resampled = ros.fit_resample(X, y)

            #Combine the resampled features and labels back into a DataFrame
            self.dataframe = X_resampled.copy()
            self.dataframe['encoded_label'] = y_resampled
        else:
            self.dataframe = self.dataframe.reset_index(drop=True)

    def __len__(self):
        """Return the number of rows (after any oversampling)."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(mel_spec, label)``.

        ``mel_spec`` has one channel and exactly ``max_len`` frames; ``label`` is a 0-d long tensor.
        """
        audio_name = os.path.join(self.audio_dir, str(self.dataframe.iloc[idx, 0]) + '.wav')
        waveform, sample_rate = torchaudio.load(audio_name)

        #Resample if necessary
        if sample_rate != self.TARGET_SAMPLE_RATE:
            resampler = torchaudio.transforms.Resample(
                orig_freq=sample_rate, new_freq=self.TARGET_SAMPLE_RATE
            )
            waveform = resampler(waveform)

        #Convert to Mel spectrogram
        mel_spec = self.mel_spectrogram(waveform)

        #Normalize the spectrogram to the range [0, 1].
        # A constant spectrogram (e.g. digital silence) has zero range; dividing by it would fill
        # the item with NaNs, so such clips become all zeros instead.
        spec_min = mel_spec.min()
        spec_range = mel_spec.max() - spec_min
        if spec_range > 0:
            mel_spec = (mel_spec - spec_min) / spec_range
        else:
            mel_spec = torch.zeros_like(mel_spec)

        #Ensure the spectrogram has the correct shape (1 channel)
        if mel_spec.size(0) != 1:
            mel_spec = mel_spec.mean(dim=0, keepdim=True)  # Convert multi-channel to single-channel by averaging

        #Pad or truncate the spectrogram to the max_len
        if mel_spec.size(-1) < self.max_len:
            padding = self.max_len - mel_spec.size(-1)
            mel_spec = F.pad(mel_spec, (0, padding))
        else:
            mel_spec = mel_spec[:, :, :self.max_len]

        #Retrieve the encoded label
        label = int(self.dataframe['encoded_label'].iloc[idx])
        label = torch.tensor(label, dtype=torch.long)

        return mel_spec, label

In [None]:
def pad_collate_fn(batch):
    """Collate ``(spectrogram, label)`` pairs into one batch.

    Every spectrogram is zero-padded on the right of its last dimension up to the widest item
    in the batch, then the items are stacked.

    Returns:
        A ``(spectrograms, labels)`` tuple: the stacked padded tensors and a 1-D label tensor.
    """
    target_width = max(spec.size(-1) for spec, _ in batch)

    padded_specs = []
    batch_labels = []
    for spec, lbl in batch:
        # Amount of right-padding needed for this item (zero for the widest one).
        deficit = target_width - spec.size(-1)
        padded_specs.append(F.pad(spec, (0, deficit)))
        batch_labels.append(lbl)

    return torch.stack(padded_specs), torch.tensor(batch_labels)

In [None]:
batch_size = 32

# Datasets over the full (unsampled) metadata CSVs.
# A later cell rebuilds both datasets and loaders from the sampled CSVs.
train_dataset = FSD50KDataset(csv_file=train_csv_path, audio_dir=train_audio_dir)
test_dataset = FSD50KDataset(csv_file=test_csv_path, audio_dir=test_audio_dir)

#Create data loaders
# Only the training loader shuffles; both pad each batch to its widest spectrogram.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=pad_collate_fn, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=pad_collate_fn, num_workers=4, pin_memory=True)

# Supervised


In [None]:
class AudioCNN(nn.Module):
    """Five conv + max-pool stages followed by a three-layer classifier head.

    Args:
        num_classes: Number of output logits.
        max_len: Width (time frames) of the input spectrograms.
        n_mels: Height (mel bins) of the input spectrograms.

    The input to ``forward`` is a ``(batch, 1, n_mels, max_len)`` tensor and the output is
    ``(batch, num_classes)`` raw logits.

    Raises:
        ValueError: If ``n_mels`` or ``max_len`` is smaller than 32, because five 2x2 poolings
            would then leave an empty feature map.
    """

    def __init__(self, num_classes, max_len=1000, n_mels=64):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(2, 2)

        # The 3x3 convolutions keep the spatial size (padding=1); each of the five 2x2 poolings
        # halves it with flooring, which is an integer division by 32 overall.
        # For the defaults this is 256 * 2 * 31.
        pooled_height = n_mels // 32
        pooled_width = max_len // 32
        if pooled_height < 1 or pooled_width < 1:
            raise ValueError(
                f"n_mels ({n_mels}) and max_len ({max_len}) must both be at least 32"
            )
        flattened_size = 256 * pooled_height * pooled_width

        #Define fully connected layers
        self.fc1 = nn.Linear(flattened_size, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return class logits for a batch of spectrograms."""
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4, self.conv5):
            x = self.pool(F.relu(conv(x)))

        #Flatten the tensor for the fully connected layers
        x = x.view(x.size(0), -1)

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:
# Number of training rows per class label; `value_counts()` returns the most frequent label first.
label_distribution = train_df['single_label'].value_counts()
print(label_distribution)

In [None]:
# Inverse-frequency class weights for the loss.
# `value_counts()` orders classes by frequency, so sort by label name first: position i of the
# weight tensor then corresponds to index i of `label_to_index` (built from the sorted labels).
# NOTE(review): FSD50KDataset encodes labels itself — confirm its indices match `label_to_index`.
class_weights = 1. / label_distribution.sort_index()
class_weights = class_weights / class_weights.sum()  #Normalize so that weights sum to 1
class_weights = torch.tensor(class_weights.values, dtype=torch.float32)

print("Class Weights:", class_weights)

In [None]:
# Number of distinct values in the second metadata column.
# NOTE(review): presumably column 1 holds the class label — confirm against the CSV schema.
num_classes = train_df.iloc[:, 1].nunique()
print(f"Number of classes: {num_classes}")

# Count out-of-range indices *before* filtering; after the filters below the counts are always 0.
out_of_bounds_train = train_df[train_df['label_idx'] >= num_classes]
out_of_bounds_test = test_df[test_df['label_idx'] >= num_classes]
print(f"Number of out-of-bounds labels in train set: {len(out_of_bounds_train)}")
print(f"Number of out-of-bounds labels in test set: {len(out_of_bounds_test)}")

train_df = train_df[train_df['label_idx'] < num_classes]

max_label_in_test = test_df['label_idx'].max()
min_label_in_test = test_df['label_idx'].min()
print(f"Max label in train set: {train_df['label_idx'].max()}, Min label: {train_df['label_idx'].min()}")
print(f"Max label in test set: {max_label_in_test}, Min label: {min_label_in_test}")
assert max_label_in_test < num_classes, "Test set has out-of-bounds labels."

test_df = test_df[test_df['label_idx'] < num_classes]

# Rebuild the datasets from the sampled CSVs. The datasets read the CSV files directly, so the
# DataFrame filters above only affect the diagnostics, not what the loaders serve.
train_dataset = FSD50KDataset(csv_file=train_csv_sampled_path, audio_dir=train_audio_dir)
test_dataset = FSD50KDataset(csv_file=test_csv_sampled_path, audio_dir=test_audio_dir)

# The constructor encodes labels per CSV and oversamples, so the test dataset would use class
# indices unrelated to the training ones and contain duplicated clips. Replace its table with
# the plain test rows encoded through the *training* mapping; rows with labels the training
# set never saw are dropped.
test_frame = pd.read_csv(test_csv_sampled_path).dropna(subset=['single_label'])
test_codes = test_frame['single_label'].map(train_dataset.label_to_idx)
has_train_code = test_codes.notna()
test_frame = test_frame[has_train_code].copy()
test_frame['encoded_label'] = test_codes[has_train_code].astype(int)
test_dataset.dataframe = test_frame.reset_index(drop=True)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=pad_collate_fn, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=pad_collate_fn, num_workers=4, pin_memory=True)

In [None]:
#Initialize model, loss function, and optimizer
model = AudioCNN(num_classes)
# Class-weighted cross entropy; `class_weights` comes from the class-weights cell above.
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# The training loop calls scheduler.step() once per epoch, so the rate is scaled by 0.1 every 5 epochs.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

In [None]:
# Batches per epoch for the current training loader.
num_batches = len(train_loader)
print(f'Total number of batches: {num_batches}')

In [None]:
num_epochs = 10
print("Before batching, unique label indices:", train_df['label_idx'].unique())

# Train on the device selected at the top of the notebook. `Module.to` moves the parameters in
# place, so the optimizer created earlier keeps pointing at the same parameters; the loss holds
# the class-weight tensor and therefore has to be moved as well.
model = model.to(device)
criterion = criterion.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        #Ensure labels are within the correct range; an out-of-range target would otherwise
        #surface as an opaque error inside the loss.
        invalid_labels = labels[(labels < 0) | (labels >= num_classes)]
        if invalid_labels.numel() > 0:
            raise ValueError(
                f"Batch {i}: found label {invalid_labels[0].item()} which is out of bounds "
                f"for {num_classes} classes (inputs shape: {tuple(inputs.shape)})"
            )

        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(inputs)

        #Compute loss
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    #Step the scheduler
    scheduler.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

print(f"Final learning rate: {scheduler.get_last_lr()}")

In [None]:
# Persist only the parameters (state dict) of the trained CNN to Drive.
model_save_path = '/content/drive/MyDrive/Thesis/FSD50K/trained_audio_cnn_model5.pth'
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")

In [None]:
# Sanity check: the largest test label index must stay below the number of model outputs.
print(f"Max label in test set: {test_df['label_idx'].max()}")
print(f"Number of classes the model was trained on: {num_classes}")

In [None]:
# Compare which label indices occur in each split.
print(f"Unique labels in test set: {sorted(test_df['label_idx'].unique())}")
print(f"Unique labels in training set: {sorted(train_df['label_idx'].unique())}")

In [None]:
# Fresh CPU model for evaluation. `map_location` makes the checkpoint load even when it was
# saved from a GPU session and the current runtime has no GPU.
model = AudioCNN(num_classes)  #Adjust the number of classes if necessary
model.load_state_dict(torch.load(model_save_path, map_location="cpu"))

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, classification_report, precision_recall_curve, roc_curve, auc
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import average_precision_score
import numpy as np

model.eval()
# Evaluate on whichever device currently holds the model's parameters.
eval_device = next(model.parameters()).device
criterion = nn.CrossEntropyLoss()
total_loss = 0.0
correct_predictions = 0
total_samples = 0

all_labels = []
all_predictions = []
all_probs = []  #To store the predicted probabilities for mAP calculation

with torch.no_grad():  #Disable gradient computation for evaluation
    for batch_idx, (inputs, labels) in enumerate(test_loader):

        # Skip samples whose label has no corresponding model output.
        valid_indices = labels < num_classes
        inputs = inputs[valid_indices].to(eval_device)
        labels = labels[valid_indices].to(eval_device)

        if len(labels) == 0:
            continue

        #Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # The loss is a batch mean; weight it by the batch size for a per-sample average.
        total_loss += loss.item() * inputs.size(0)
        probs = torch.softmax(outputs, dim=1)
        _, predicted = torch.max(outputs, 1)
        correct_predictions += (predicted == labels).sum().item()
        total_samples += labels.size(0)

        #Store all labels and predictions for metric calculation
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())
        all_probs.extend(probs.cpu().numpy())

if total_samples == 0:
    raise ValueError("No test samples with a valid label were evaluated.")

#Calculate average loss and accuracy
average_loss = total_loss / total_samples
accuracy = correct_predictions / total_samples

#Calculate precision, recall, and f1 score
precision = precision_score(all_labels, all_predictions, average='weighted', zero_division=0)
recall = recall_score(all_labels, all_predictions, average='weighted', zero_division=0)
f1 = f1_score(all_labels, all_predictions, average='weighted', zero_division=0)

#Calculate mAP (One-vs-Rest)
# Average precision is undefined for a class without any positive sample, so the macro mean is
# taken over the classes that actually occur in the evaluated labels.
labels_array = np.asarray(all_labels)
probs_array = np.asarray(all_probs)
present_classes = np.unique(labels_array)
all_labels_one_hot = np.eye(num_classes)[labels_array]
mAP = average_precision_score(
    all_labels_one_hot[:, present_classes], probs_array[:, present_classes], average='macro'
)

#Print metrics
print(f'Average loss: {average_loss:.4f}')
print(f'Accuracy: {accuracy:.4f}')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')
print(f'mAP (One-vs-Rest): {mAP:.4f}')

#Confusion Matrix
# Fix the label set so the matrix always has num_classes rows/columns and lines up with the
# tick labels, even when some class never appears in the labels or predictions.
class_ids = list(range(num_classes))
cm = confusion_matrix(all_labels, all_predictions, labels=class_ids)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_ids, yticklabels=class_ids)
plt.title('Confusion Matrix')
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.show()

In [None]:
from sklearn.metrics import roc_curve, auc

# A ROC curve needs a binary target and a continuous score. Evaluate class 1 one-vs-rest, using
# the predicted probability of that class as the score (hard class indices are not scores).
positive_class = 1
roc_truth = (np.asarray(all_labels) == positive_class).astype(int)
roc_scores = np.asarray(all_probs)[:, positive_class]

fpr, tpr, _ = roc_curve(roc_truth, roc_scores)
roc_auc = auc(fpr, tpr)

#Plot ROC curve
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')  # chance diagonal
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()


In [None]:
from sklearn.metrics import cohen_kappa_score

# Cohen's kappa between the true labels and the predictions collected by the evaluation cell.
kappa = cohen_kappa_score(all_labels, all_predictions)
print(f'Cohen\'s Kappa: {kappa:.2f}')

In [None]:
from collections import Counter
import matplotlib.pyplot as plt

# Tally how often each class occurs among the true labels and among the predictions.
label_distribution = Counter(all_labels)
prediction_distribution = Counter(all_predictions)

print("Label Distribution:", label_distribution)
print("Prediction Distribution:", prediction_distribution)

#Plot the class distribution
plt.figure(figsize=(12, 6))

# One bar chart per tally, side by side: (counts, title, bar colour).
panels = (
    (label_distribution, "Label Distribution", 'blue'),
    (prediction_distribution, "Prediction Distribution", 'red'),
)
for position, (counts, panel_title, bar_colour) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.bar(counts.keys(), counts.values(), color=bar_colour)
    plt.title(panel_title)
    plt.xlabel("Class")
    plt.ylabel("Frequency")

plt.tight_layout()

# VGGish

In [None]:
# NOTE(review): no visible cell imports `torchvggish`; the classifier below is built on
# torchvision's VGG16 — confirm whether this install is still needed.
!pip install torchvggish

In [None]:
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import torchaudio.transforms as transforms
from torchvision import models
import torch.nn as nn
import torch.nn.functional as F
from torchvision.transforms import Resize

In [None]:
class FSD50KDataset(Dataset):
    """FSD50K clips as standardised 3-channel mel spectrograms with integer labels.

    This definition replaces the earlier ``FSD50KDataset`` in the notebook namespace.

    Args:
        csv_file: Path to a metadata CSV. Its first column is used as the clip file stem and it
            must contain a ``single_label`` column.
        audio_dir: Directory holding the ``<stem>.wav`` files.
        transform: Optional callable applied to the loaded waveform.
        max_len: Number of spectrogram frames every item is padded or truncated to.
    """

    def __init__(self, csv_file, audio_dir, transform=None, max_len=1000):
        self.dataframe = pd.read_csv(csv_file)
        self.audio_dir = audio_dir
        self.transform = transform
        self.max_len = max_len

        #Encode string labels to integers (index = order of first appearance in the CSV)
        label_to_idx = {label: idx for idx, label in enumerate(self.dataframe['single_label'].unique())}
        self.dataframe['encoded_label'] = self.dataframe['single_label'].map(label_to_idx)

    def __len__(self):
        """Return the number of rows in the metadata table."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(mel_spec, label)``.

        ``mel_spec`` has 3 identical channels and ``max_len`` frames; ``label`` is a 0-d long tensor.
        """
        audio_name = os.path.join(self.audio_dir, str(self.dataframe.iloc[idx, 0]) + '.wav')
        waveform, sample_rate = torchaudio.load(audio_name)

        if self.transform:
            waveform = self.transform(waveform)

        # The mel transform is built per clip with the clip's own sample rate (no resampling).
        mel_spec = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate, n_mels=64, n_fft=1024, hop_length=512
        )(waveform)

        #Ensure the spectrogram has the correct shape (1 channel)
        mel_spec = mel_spec.mean(dim=0, keepdim=True)

        #Normalize the spectrogram.
        # The epsilon keeps a constant spectrogram (zero std) from producing NaNs.
        mel_spec = (mel_spec - mel_spec.mean()) / (mel_spec.std() + 1e-6)

        #Pad or truncate the spectrogram to the max_len
        if mel_spec.size(-1) < self.max_len:
            padding = self.max_len - mel_spec.size(-1)
            mel_spec = F.pad(mel_spec, (0, padding))
        else:
            mel_spec = mel_spec[:, :, :self.max_len]

        # Repeat the single channel three times to match a 3-channel image backbone.
        mel_spec = mel_spec.repeat(3, 1, 1)

        label = self.dataframe.iloc[idx]['encoded_label']
        label = torch.tensor(label).long()  # Convert label to tensor

        return mel_spec, label

In [None]:
class VGGishClassifier(nn.Module):
    """torchvision VGG16 with frozen pretrained weights and a new final linear layer.

    Only the replacement layer, which outputs ``num_classes`` logits, is trainable.
    """

    def __init__(self, num_classes):
        super().__init__()
        backbone = models.vgg16(pretrained=True)

        # Freeze every pretrained parameter before attaching the new head.
        backbone.requires_grad_(False)

        # Swap the last classifier layer for one sized to this task; a freshly created
        # nn.Linear is trainable by default.
        head_in_features = backbone.classifier[-1].in_features
        backbone.classifier[-1] = nn.Linear(head_in_features, num_classes)

        self.vggish = backbone

    def forward(self, x):
        """Return the logits produced by the wrapped VGG16."""
        return self.vggish(x)

In [None]:
# Datasets over the full metadata CSVs, using the FSD50KDataset class defined just above.
train_dataset = FSD50KDataset(csv_file=train_csv_path, audio_dir=train_audio_dir)
test_dataset = FSD50KDataset(csv_file=test_csv_path, audio_dir=test_audio_dir)

#Identifying and keeping top n classes
# value_counts() is sorted by frequency, so head(50) selects the 50 most frequent classes.
class_distribution = train_dataset.dataframe['encoded_label'].value_counts()
top_n_classes = class_distribution.head(50).index
filtered_df = train_dataset.dataframe[train_dataset.dataframe['encoded_label'].isin(top_n_classes)]
train_dataset.dataframe = filtered_df.reset_index(drop=True)

#Recalculate number of classes and remap labels
# The surviving labels are renumbered 0..num_classes-1 in order of first appearance.
# NOTE(review): only train_dataset is filtered and remapped here; test_dataset keeps all of its
# classes and its own encoding — confirm before evaluating against it.
num_classes = len(filtered_df['encoded_label'].unique())
label_to_idx = {label: idx for idx, label in enumerate(filtered_df['encoded_label'].unique())}
train_dataset.dataframe['encoded_label'] = train_dataset.dataframe['encoded_label'].map(label_to_idx)


In [None]:

#DataLoader
# No custom collate function here: every item already has the same fixed number of frames.
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)


test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)



In [None]:
# Inspect which columns the training table carries.
print(train_dataset.dataframe.columns)

In [None]:
num_classes = len(train_dataset.dataframe['encoded_label'].unique())
print(num_classes)

model = VGGishClassifier(num_classes=num_classes)

# Inverse-frequency weights computed from *this* training table and ordered by class index.
# (The `class_weights` tensor from the supervised section covers a different label set, so its
# length and order do not match this model's outputs.)
vgg_label_counts = train_dataset.dataframe['encoded_label'].value_counts().sort_index()
vgg_class_weights = torch.tensor((1.0 / vgg_label_counts).values, dtype=torch.float32)
vgg_class_weights = vgg_class_weights / vgg_class_weights.sum()

criterion = nn.CrossEntropyLoss(weight=vgg_class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)

In [None]:
# Diagnostics: the encoded training labels should be the contiguous range 0..num_classes-1.
print(f"Unique labels in training data: {train_dataset.dataframe['encoded_label'].unique()}")

In [None]:
print("Minimum label value:", train_dataset.dataframe['encoded_label'].min())
print("Maximum label value:", train_dataset.dataframe['encoded_label'].max())

In [None]:
# Note: this rebinds `unique_labels`, which the preprocessing cell used for the sorted label names.
unique_labels = train_dataset.dataframe['encoded_label'].unique()
print(f"Unique labels in the dataset: {unique_labels}")
print(f"Number of unique labels: {len(unique_labels)}")

In [None]:
# Peek at one batch. Print the input shape and dtype rather than the full tensor, which would
# flood the cell output with a whole batch of spectrogram values.
for inputs, labels in train_loader:
    print(f"Sample inputs: shape={tuple(inputs.shape)}, dtype={inputs.dtype}")
    print(f"Sample labels: {labels}")
    break

In [None]:
from torch.optim.lr_scheduler import StepLR

num_epochs = 5

from torch.nn.utils import clip_grad_norm_

optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

# Train on the device selected at the top of the notebook. `Module.to` moves parameters in
# place, so the optimizer above keeps referring to the same parameters; the loss may hold a
# class-weight tensor and is moved too.
model = model.to(device)
criterion = criterion.to(device)

#Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()

        #Clip gradients to prevent exploding gradients
        clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        running_loss += loss.item()

        if batch_idx % 10 == 0:
            print(f"Batch {batch_idx}/{len(train_loader)} - Loss: {loss.item():.4f}")

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")
    scheduler.step()

# Other cells feed CPU tensors to `model`, so hand the model and loss back on the CPU.
model = model.to("cpu")
criterion = criterion.to("cpu")

In [None]:
# NOTE(review): these installs run after torch/torchaudio/torchvision were already imported and
# used by earlier cells, and no version is pinned — presumably they belong in a setup cell at
# the top of the notebook; confirm.
!pip install torch
!pip install torchaudio
!pip install torchvision
!pip install torch-audiomentations

In [None]:
# Provides the `imblearn` package used for RandomOverSampler.
!pip install imbalanced-learn

In [None]:
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from torchvision import models
import torch.nn as nn
from imblearn.over_sampling import RandomOverSampler

In [None]:
class FSD50KDataset(Dataset):
    """FSD50K clips restricted to the most frequent classes, as 3x224x224 mel-spectrogram images.

    This definition replaces the earlier ``FSD50KDataset`` classes in the notebook namespace.

    Args:
        csv_file: Path to a metadata CSV. Its first column is used as the clip file stem and it
            must contain a ``single_label`` column.
        audio_dir: Directory holding the ``<stem>.wav`` files.
        transform: Optional callable applied to the loaded waveform.
        max_len: Number of spectrogram frames kept before resizing.
        top_n: How many of the most frequent classes to keep.
        oversample: When True (default) the rows are resampled with ``RandomOverSampler``.
            Pass False for evaluation data.
        random_state: Seed forwarded to ``RandomOverSampler`` for reproducible resampling.

    Attributes:
        label_to_idx: ``{single_label: encoded_label}`` for the classes that were kept.
    """

    def __init__(self, csv_file, audio_dir, transform=None, max_len=1000,
                 top_n=50, oversample=True, random_state=None):
        frame = pd.read_csv(csv_file)
        self.audio_dir = audio_dir
        self.transform = transform
        self.max_len = max_len

        #Encode string labels to integers (index = order of first appearance)
        initial_codes = {label: idx for idx, label in enumerate(frame['single_label'].unique())}
        frame['encoded_label'] = frame['single_label'].map(initial_codes)

        #Filter to keep only the top `top_n` classes; reset_index returns a fresh frame, so the
        #assignments below do not write into a slice of the original table.
        top_n_classes = frame['encoded_label'].value_counts().head(top_n).index
        frame = frame[frame['encoded_label'].isin(top_n_classes)].reset_index(drop=True)

        #Re-encode labels after filtering so they form the contiguous range 0..k-1
        compact_codes = {label: idx for idx, label in enumerate(frame['encoded_label'].unique())}
        frame['encoded_label'] = frame['encoded_label'].map(compact_codes)

        # Remember which label each final index stands for.
        self.label_to_idx = dict(zip(frame['single_label'], frame['encoded_label']))

        if oversample:
            #Separate features and labels for oversampling
            X = frame.drop('encoded_label', axis=1)
            y = frame['encoded_label']

            #Perform oversampling
            ros = RandomOverSampler(random_state=random_state)
            X_resampled, y_resampled = ros.fit_resample(X, y)

            #Combine the resampled features and labels back into a DataFrame
            frame = X_resampled.copy()
            frame['encoded_label'] = y_resampled

        self.dataframe = frame

    def __len__(self):
        """Return the number of rows (after filtering and any oversampling)."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(mel_spec, label)``.

        ``mel_spec`` is a ``(3, 224, 224)`` tensor; ``label`` is a 0-d long tensor.
        """
        audio_name = os.path.join(self.audio_dir, str(self.dataframe.iloc[idx, 0]) + '.wav')
        waveform, sample_rate = torchaudio.load(audio_name)

        if self.transform:
            waveform = self.transform(waveform)

        # The mel transform is built per clip with the clip's own sample rate (no resampling).
        mel_spec = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate, n_mels=64, n_fft=1024, hop_length=512)(waveform)

        # Average the audio channels, then standardise; the epsilon guards against zero std.
        mel_spec = mel_spec.mean(dim=0, keepdim=True)
        mel_spec = (mel_spec - mel_spec.mean()) / (mel_spec.std() + 1e-6)

        if mel_spec.size(-1) < self.max_len:
            padding = self.max_len - mel_spec.size(-1)
            mel_spec = nn.functional.pad(mel_spec, (0, padding))
        else:
            mel_spec = mel_spec[:, :, :self.max_len]

        # Resize to 224x224 and repeat the channel three times for the image backbone.
        mel_spec = nn.functional.interpolate(mel_spec.unsqueeze(0), size=(224, 224), mode='bilinear', align_corners=False).squeeze(0)
        mel_spec = mel_spec.repeat(3, 1, 1)

        label = torch.tensor(int(self.dataframe['encoded_label'].iloc[idx]), dtype=torch.long)
        return mel_spec, label

In [None]:
class VGGishClassifier(nn.Module):
    """torchvision VGG16 with frozen pretrained weights and a new final linear layer.

    This redefinition replaces the earlier ``VGGishClassifier`` in the notebook namespace.
    Only the replacement layer, which outputs ``num_classes`` logits, is trainable.
    """

    def __init__(self, num_classes):
        super().__init__()
        backbone = models.vgg16(pretrained=True)

        # Freeze every pretrained parameter before attaching the new head.
        backbone.requires_grad_(False)

        # A freshly created nn.Linear is trainable by default.
        head_in_features = backbone.classifier[-1].in_features
        backbone.classifier[-1] = nn.Linear(head_in_features, num_classes)

        self.vggish = backbone

    def forward(self, x):
        """Return the logits produced by the wrapped VGG16."""
        return self.vggish(x)

In [None]:
def set_bn_eval(m):
    """Put a batch-norm module in eval mode and freeze its affine parameters.

    Intended for ``model.apply(set_bn_eval)``; modules that are not batch-norm layers are left
    untouched. Batch-norm layers created with ``affine=False`` have no weight/bias
    (both are ``None``) and are only switched to eval mode.
    """
    if isinstance(m, torch.nn.modules.batchnorm._BatchNorm):
        m.eval()
        for affine_param in (m.weight, m.bias):
            if affine_param is not None:
                affine_param.requires_grad = False

In [None]:
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.nn.utils import clip_grad_norm_

# 50 outputs; the FSD50KDataset defined above keeps the 50 most frequent classes.
model = VGGishClassifier(num_classes=50)

# NOTE(review): the training loop below calls model.train() every epoch, which presumably puts
# any batch-norm modules back into training mode — confirm this call has the intended effect.
model.apply(set_bn_eval)

num_epochs = 10
learning_rate = 1e-4
batch_size = 32

# Training data comes from the sampled CSV written by the preprocessing cell.
train_dataset = FSD50KDataset(csv_file=train_csv_sampled_path, audio_dir=train_audio_dir)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

In [None]:
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# 'balanced' class weights computed from the encoded training labels.
# NOTE(review): the dataset is built with random oversampling, so the classes are presumably
# already equally frequent and these weights close to uniform — confirm.
labels = train_dataset.dataframe['encoded_label']
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(labels), y=labels)
class_weights = torch.tensor(class_weights, dtype=torch.float32)

print(class_weights)

In [None]:
# Loss, optimizer and LR schedule for the second VGG training run.
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

In [None]:
# Train on the device selected at the top of the notebook. `Module.to` moves parameters in
# place, so the optimizer created earlier keeps referring to the same parameters; the loss holds
# the class-weight tensor and is moved too.
model = model.to(device)
criterion = criterion.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()

        # Clip the global gradient norm to 1.0 before the update.
        clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        running_loss += loss.item()

        if batch_idx % 10 == 0:
            print(f"Batch {batch_idx}/{len(train_loader)} - Loss: {loss.item():.4f}")

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}")
    scheduler.step()

# The evaluation cell feeds CPU tensors to `model`, so hand the model and loss back on the CPU.
model = model.to("cpu")
criterion = criterion.to("cpu")

In [None]:
# Persist only the parameters (state dict) of the trained VGG classifier to Drive.
# Note: this rebinds `model_save_path`, previously the path of the supervised CNN checkpoint.
model_save_path = '/content/drive/MyDrive/Thesis/FSD50K/vggish_trained_audio_model.pth'
torch.save(model.state_dict(), model_save_path)
print(f"Model saved to {model_save_path}")

In [None]:
test_dataset = FSD50KDataset(csv_file=test_csv_path, audio_dir=test_audio_dir) #
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

In [None]:
unique_labels = test_dataset.dataframe['encoded_label'].unique()
print(f"Unique labels in the dataset: {unique_labels}")
print(f"Number of unique labels: {len(unique_labels)}")

In [None]:
from sklearn.metrics import confusion_matrix, classification_report, precision_recall_fscore_support
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np

In [None]:
class_names = test_dataset.dataframe['single_label'].unique().tolist()

In [None]:
model.eval()
# Evaluate on whichever device currently holds the model's parameters.
eval_device = next(model.parameters()).device
correct = 0
total = 0
all_labels = []
all_predictions = []

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(eval_device)
        labels = labels.to(eval_device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

if total == 0:
    raise ValueError("The test loader produced no samples to evaluate.")

accuracy = 100 * correct / total
print(f'Accuracy: {accuracy:.2f}%')

# Fix the label set so row/column i of the matrix and entry i of the per-class metrics always
# refer to class index i, even when a class is missing from the labels or predictions.
label_ids = np.arange(len(class_names))

cm = confusion_matrix(all_labels, all_predictions, labels=label_ids)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# Distinct result names avoid rebinding `f1_score`, which an earlier cell imports from sklearn.
class_precision, class_recall, class_f1, _ = precision_recall_fscore_support(
    all_labels, all_predictions, labels=label_ids, average=None, zero_division=0
)
for i, class_name in enumerate(class_names):
    print(f"Class: {class_name} - Precision: {class_precision[i]:.2f}, Recall: {class_recall[i]:.2f}, F1-Score: {class_f1[i]:.2f}")

precision, recall, f1, _ = precision_recall_fscore_support(
    all_labels, all_predictions, average='macro', zero_division=0
)
print(f"Overall (Macro) - Precision: {precision:.2f}, Recall: {recall:.2f}, F1-Score: {f1:.2f}")

precision, recall, f1, _ = precision_recall_fscore_support(
    all_labels, all_predictions, average='weighted', zero_division=0
)
print(f"Overall (Weighted) - Precision: {precision:.2f}, Recall: {recall:.2f}, F1-Score: {f1:.2f}")



In [None]:
from sklearn.metrics import roc_curve, auc

# NOTE(review): `all_predictions` holds predicted class indices, not scores, and the problem is
# multiclass. With pos_label=1 the class index itself is used as the score for "class 1", so
# this curve is presumably not a meaningful ROC. A one-vs-rest curve would need the model's
# per-class probabilities, which the evaluation cell of this section does not collect — confirm.
fpr, tpr, _ = roc_curve(all_labels, all_predictions, pos_label=1)
roc_auc = auc(fpr, tpr)

#Plot ROC curve
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')  # chance diagonal
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic')
plt.legend(loc="lower right")
plt.show()

In [None]:
# Row-normalise the confusion matrix so each row shows the share of a true class assigned to
# every predicted class. Rows without any true sample stay at zero instead of dividing by zero.
row_totals = cm.sum(axis=1, keepdims=True)
cm_normalized = np.divide(
    cm.astype('float'), row_totals,
    out=np.zeros(cm.shape, dtype=float), where=row_totals > 0
)
sns.heatmap(cm_normalized, annot=True, fmt='.2%', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Normalized Confusion Matrix')
plt.show()

In [None]:
from sklearn.metrics import cohen_kappa_score

# Cohen's kappa between the true labels and the predictions collected by the evaluation cell.
kappa = cohen_kappa_score(all_labels, all_predictions)
print(f'Cohen\'s Kappa: {kappa:.2f}')

# YAMNet

In [None]:
# TensorFlow and TensorFlow Hub are needed for the YAMNet section; no versions are pinned.
!pip install tensorflow tensorflow-hub

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import pandas as pd
import numpy as np
from sklearn.utils import class_weight
from imblearn.over_sampling import RandomOverSampler
from sklearn.model_selection import train_test_split

In [None]:
# Load the pretrained YAMNet model from TensorFlow Hub.
# NOTE(review): YAMNetClassifier below creates its own hub.KerasLayer from the same URL and no
# visible cell uses `yamnet_model` — confirm whether this separate load is needed.
yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
yamnet_model = hub.load(yamnet_model_handle)

In [None]:
#CSV data
# Re-read the training metadata and draw the same 20483-row sample (random_state=42).
# Note: this rebinds `train_df`, discarding the columns added to it in the preprocessing cell.
train_df = pd.read_csv(train_csv_path)
train_df = train_df.sample(n=20483, random_state=42)
train_filenames = train_df['fname'].astype(str).values

import librosa

def load_and_preprocess_audio(filename, label):
    """Load one clip as a fixed-length, peak-normalised 16 kHz waveform.

    Args:
        filename: File stem of the clip (a Python value or a ``tf.Tensor`` holding it);
            ``'.wav'`` is appended and the file is looked up in ``train_audio_dir``.
        label: Passed through unchanged.

    Returns:
        ``(waveform, label)`` where ``waveform`` is a float32 tensor of exactly 16000 samples:
        the first second of the clip, zero-padded when the clip is shorter.
    """
    if isinstance(filename, tf.Tensor):
        filename = tf.compat.as_str_any(filename.numpy())

    audio_path = os.path.join(train_audio_dir, filename + '.wav')

    target_len = 16000  # one second at the 16 kHz rate requested below
    waveform, _ = librosa.load(audio_path, sr=16000)

    if waveform.size == 0:
        # Nothing to normalise; an empty clip becomes pure silence.
        waveform = np.zeros(target_len, dtype=np.float32)
    else:
        # Remove the DC offset, then scale by the largest absolute sample so the result lies
        # in [-1, 1]. (Dividing by the signed maximum can divide by zero or flip the sign.)
        waveform = waveform - waveform.mean()
        peak = np.abs(waveform).max()
        if peak > 0:
            waveform = waveform / peak

    if len(waveform) < target_len:
        waveform = np.pad(waveform, (0, target_len - len(waveform)), mode='constant')
    elif len(waveform) > target_len:
        waveform = waveform[:target_len]

    return tf.convert_to_tensor(waveform, dtype=tf.float32), label

def load_and_preprocess_audio_wrapper(filename, label):
    """Wrap ``load_and_preprocess_audio`` for use inside ``tf.data.Dataset.map``.

    The loader is plain Python, so it runs through ``tf.py_function``. That call drops static
    shape information, which is restored here: the loader always returns 16000 samples and the
    label is a scalar.

    Returns:
        ``(waveform, label)`` as a float32 tensor of shape ``(16000,)`` and an int32 scalar.
    """
    waveform, label = tf.py_function(
        load_and_preprocess_audio,
        [filename, label],
        [tf.float32, tf.int32]
    )
    waveform.set_shape([16000])
    label.set_shape([])
    return waveform, label


In [None]:
print(train_df.columns)

In [None]:
from sklearn.preprocessing import LabelEncoder

# Encode the class labels as integers; LabelEncoder keeps its classes in le.classes_.
le = LabelEncoder()
train_df['encoded_label'] = le.fit_transform(train_df['single_label'])

In [None]:
train_filenames = train_df['fname'].values
train_labels = train_df['encoded_label'].values

# tf.data pipeline: (file stem, label) pairs -> loaded waveforms -> shuffled batches of 32.
# Note: this rebinds `train_dataset` and `train_loader`, which previously held PyTorch objects.
train_dataset = tf.data.Dataset.from_tensor_slices((train_filenames, train_labels.astype(np.int32)))
train_dataset = train_dataset.map(load_and_preprocess_audio_wrapper)
train_loader = train_dataset.shuffle(buffer_size=1024).batch(32)

In [None]:
class YAMNetClassifier(tf.keras.Model):
    """Frozen YAMNet feature extractor followed by a trainable linear head.

    Args:
        num_classes: Number of output logits produced by the dense head.

    ``call`` expects a batch of waveforms shaped ``(batch, samples)`` and returns
    logits shaped ``(batch, num_classes)``.
    """

    def __init__(self, num_classes):
        super(YAMNetClassifier, self).__init__()
        self.yamnet_model = hub.KerasLayer("https://tfhub.dev/google/yamnet/1", trainable=False)
        self.global_avg_pool = tf.keras.layers.GlobalAveragePooling1D()
        self.fc = tf.keras.layers.Dense(num_classes)

    def _embed_clip(self, waveform):
        """Return the per-frame YAMNet embeddings for one 1-D waveform."""
        # YAMNet returns (scores, embeddings, log_mel_spectrogram) in that order;
        # the embeddings are the second output, not the first.
        _, embeddings, _ = self.yamnet_model(waveform)
        return embeddings

    def call(self, inputs, training=False):
        # YAMNet accepts a single 1-D waveform, so run it clip by clip and stack the
        # results into (batch, frames, embedding_dim).
        embeddings = tf.map_fn(self._embed_clip, inputs, fn_output_signature=tf.float32)
        pooled_embeddings = self.global_avg_pool(embeddings)  # average over frames
        return self.fc(pooled_embeddings)  # class logits

In [None]:
class SimpleModel(tf.keras.Model):
    """Minimal Keras model: one dense layer mapping inputs to ``num_classes`` outputs.

    ``call`` prints the static shape of its input each time it is invoked.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.fc = tf.keras.layers.Dense(num_classes)

    def call(self, inputs):
        print("Input shape:", inputs.shape)
        dense_out = self.fc(inputs)
        return dense_out



In [None]:
from sklearn.preprocessing import LabelEncoder

# Fit the label encoder again (same encoding as the earlier cell) and record how many
# distinct classes it found; `num_classes` sizes the classifier head below.
le = LabelEncoder()
train_df['encoded_label'] = le.fit_transform(train_df['single_label'])
num_classes = len(le.classes_)

print(num_classes)

In [None]:
# Sanity check: list the distinct integer label ids present in the sample.
print(train_df['encoded_label'].unique())

In [None]:
# Instantiate the YAMNet-based classifier with one logit per encoded class.
# NOTE(review): `model` is rebound to a PyTorch autoencoder further down the notebook.
model = YAMNetClassifier(num_classes=num_classes)

In [None]:
# Configure optimizer, loss and metric for Keras-managed training.
# The dense head outputs raw logits, hence from_logits=True.
# NOTE(review): the visible cells train with a manual GradientTape loop and never call
# fit/evaluate, so this configuration may be unused - confirm.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

In [None]:
# Pull a single batch from the pipeline and show the waveform and label tensor shapes.
for data, label in train_loader.take(1):
    print(data.shape, label.shape)

In [None]:
# Smoke test: run one batch through the classifier and report the output shape.
sample_data, sample_label = next(iter(train_loader))
output = model(sample_data)

print(f"Output shape: {output.shape}")

In [None]:
# Record the library versions used by this runtime.
print("TensorFlow version:", tf.__version__)
print("TensorFlow Hub version:", hub.__version__)

In [None]:
# Take the first clip of a fresh batch so this cell does not depend on a
# `single_sample_data` variable left over from an earlier kernel session.
first_batch, _ = next(iter(train_loader))
single_sample_data = tf.reshape(first_batch[0], [1, 16000])

# Ensure data type is float32
single_sample_data = tf.cast(single_sample_data, dtype=tf.float32)

# Pass the one-clip batch through the classifier
output = model(single_sample_data)

# Print output shape
print(f"Single sample model output shape: {output.shape}")

In [None]:
# Fetch one batch and report its shape before it reaches the model.
sample_data, _ = next(iter(train_loader))
print(f"Sample data shape before model: {sample_data.shape}")

# Assert the batch is (batch, 16000) and make the dtype float32 explicitly.
sample_data = tf.ensure_shape(sample_data, (None, 16000))
sample_data = tf.cast(sample_data, tf.float32)

# Run the batch through the classifier and report the output shape.
output = model(sample_data)
print(f"Model output shape: {output.shape}")

In [None]:
epochs = 10

# Dedicated TensorFlow optimizer and loss for this loop. No TensorFlow cell in this
# section defines a variable called `optimizer`, so relying on that name would either
# fail or pick up an unrelated object from elsewhere in the notebook.
yamnet_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
yamnet_loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    for step, (data, labels) in enumerate(train_loader):
        with tf.GradientTape() as tape:
            predictions = model(data, training=True)
            # Scalar batch-mean loss. Differentiating the unreduced per-sample vector
            # sums the per-sample gradients, which scales the step with batch size.
            loss = yamnet_loss_fn(labels, predictions)

        gradients = tape.gradient(loss, model.trainable_variables)
        yamnet_optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        if step % 100 == 0:
            print(f"Step {step}, Loss: {loss.numpy()}")

## Unsupervised

In [None]:
class ConvAutoencoder(nn.Module):
    """Convolutional autoencoder for single-channel 2-D inputs.

    The encoder applies three stride-2 3x3 convolutions (1 -> 16 -> 32 -> 64 channels),
    each followed by an in-place ReLU. The decoder mirrors it with three stride-2
    transposed convolutions (64 -> 32 -> 16 -> 1) and ends in a sigmoid.
    """

    def __init__(self):
        super().__init__()
        down = dict(kernel_size=3, stride=2, padding=1)
        up = dict(down, output_padding=1)

        # Encoder: every convolution halves height and width (rounding up).
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, **down),
            nn.ReLU(True),
            nn.Conv2d(16, 32, **down),
            nn.ReLU(True),
            nn.Conv2d(32, 64, **down),
            nn.ReLU(True),
        )

        # Decoder: every transposed convolution doubles height and width.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, **up),
            nn.ReLU(True),
            nn.ConvTranspose2d(32, 16, **up),
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 1, **up),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Encode ``x`` and return its reconstruction."""
        return self.decoder(self.encoder(x))

In [None]:
class FSD50KDatasetUSP(Dataset):
    """Label-free audio dataset for unsupervised training.

    Args:
        csv_file: Path to a CSV whose first column holds the clip identifiers.
        audio_dir: Directory containing ``<identifier>.wav`` files.
        transform: Optional callable applied to the loaded waveform. When given,
            its output is min-max normalised to the range [0, 1].
    """

    def __init__(self, csv_file, audio_dir, transform=None):
        self.dataframe = pd.read_csv(csv_file)
        self.audio_dir = audio_dir
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    @staticmethod
    def _min_max_normalize(features):
        """Scale ``features`` to [0, 1]; constant or empty input yields no NaNs."""
        if features.numel() == 0:
            return features
        lowest = features.min()
        span = features.max() - lowest
        if span == 0:
            # A constant tensor (e.g. a silent clip) has zero range; dividing by it
            # would fill the sample with NaN and poison the training loss.
            return torch.zeros_like(features)
        return (features - lowest) / span

    def __getitem__(self, idx):
        """Return the (optionally transformed and normalised) audio for row ``idx``."""
        audio_name = os.path.join(self.audio_dir, str(self.dataframe.iloc[idx, 0]) + '.wav')
        # NOTE(review): the file's sample rate is read but never used; no resampling
        # happens before the transform - confirm it matches the transform's settings.
        waveform, _sample_rate = torchaudio.load(audio_name)

        if self.transform:
            waveform = self.transform(waveform)

            # Normalize the transformed features to the range [0, 1]
            waveform = self._min_max_normalize(waveform)

        return waveform

In [None]:
def pad_collate_fn(batch):
    """Collate variable-length samples into one tensor by right-padding with zeros.

    Each sample must have a leading dimension of size 1; that dimension is removed.
    The last dimension of every sample is zero-padded up to the longest one in the
    batch, and the results are stacked along a new first dimension.
    """
    longest = max(sample[0].size(-1) for sample in batch)

    def _pad_to_longest(sample):
        (features,) = sample  # unpack the single leading entry
        shortfall = longest - features.size(-1)
        if shortfall > 0:
            return F.pad(features, (0, shortfall), "constant", 0)
        return features

    return torch.stack([_pad_to_longest(sample) for sample in batch])

In [None]:
# Count how many training clips fall into each class (sorted by frequency, most common first).
label_distribution = train_df['single_label'].value_counts()
print(label_distribution)

In [None]:
# Inverse-frequency class weights, normalised to sum to 1, as a float32 tensor.
# NOTE(review): the weights follow the ordering of `label_distribution`, not any
# label-encoder id order - verify the alignment before passing them to a loss.
class_weights = 1. / label_distribution
class_weights = class_weights / class_weights.sum()
class_weights = torch.tensor(class_weights.values, dtype=torch.float32)

print("Class Weights:", class_weights)

In [None]:
# PyTorch input pipeline for the autoencoder: mel spectrograms of the sampled training
# clips, padded per batch by pad_collate_fn.
# NOTE(review): this rebinds `train_dataset` / `train_loader`, previously TensorFlow objects.
batch_size = 32

# NOTE(review): the transform assumes 16 kHz audio, but FSD50KDatasetUSP does not
# resample - confirm the sample rate of the source files.
transform = transforms.MelSpectrogram(sample_rate=16000, n_fft=1024, hop_length=512)
train_dataset = FSD50KDatasetUSP(csv_file=train_csv_sampled_path, audio_dir=train_audio_dir, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=pad_collate_fn, num_workers=4, pin_memory=True)

In [None]:
# Autoencoder, reconstruction loss (mean squared error) and Adam optimizer.
# NOTE(review): `model` previously referred to the YAMNet classifier; it is rebound here.
model = ConvAutoencoder()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
import torch.optim.lr_scheduler as lr_scheduler

# Decay the learning rate by 10x every 5 epochs.
# NOTE(review): with num_epochs = 3 the decay never triggers - confirm this is intended.
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

num_epochs = 3

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch_idx, data in enumerate(train_loader):
        # pad_collate_fn returns one stacked tensor per batch, without a channel axis.
        # Insert the channel axis to get (batch, 1, height, width); indexing with [0]
        # here would select only the first sample and break the reshape.
        if data.dim() == 3:
            data = data.unsqueeze(1)

        optimizer.zero_grad()
        outputs = model(data)

        # The decoder can produce a larger map than the input; zero-pad the target on
        # the right/bottom so both tensors have the same size for the loss.
        pad_w = max(outputs.size(-1) - data.size(-1), 0)
        pad_h = max(outputs.size(-2) - data.size(-2), 0)
        if pad_w or pad_h:
            data = F.pad(data, (0, pad_w, 0, pad_h))

        loss = criterion(outputs, data)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        print(f"Batch {batch_idx} - Loss: {loss.item():.4f}")

    scheduler.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Learning Rate: {scheduler.get_last_lr()[0]}')

In [None]:
# Persist the trained autoencoder weights (state dict only) to Google Drive.
model_save_path = '/content/drive/MyDrive/Thesis/FSD50K/autoencoder_model5.pth'

torch.save(model.state_dict(), model_save_path)

print(f"Model saved to {model_save_path}")

In [None]:
# Evaluation pipeline over the sampled test CSV; shuffle=False keeps the loader in CSV row order.
test_dataset = FSD50KDatasetUSP(csv_file=test_csv_sampled_path, audio_dir=test_audio_dir, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=pad_collate_fn, num_workers=4, pin_memory=True)

In [None]:
# Rebuild the autoencoder and restore the saved weights, then switch to eval mode.
# NOTE(review): the path is repeated as a literal instead of reusing `model_save_path`;
# keep the two in sync.
autoencoder = ConvAutoencoder()
autoencoder.load_state_dict(torch.load('/content/drive/MyDrive/Thesis/FSD50K/autoencoder_model5.pth'))
autoencoder.eval()

In [None]:
# Per-class clip counts for the test sample (overwrites the training-set counts held in this name).
label_distribution = test_df['single_label'].value_counts()

In [None]:
import numpy as np

def extract_features_from_autoencoder(autoencoder, dataloader):
    """Encode every batch of ``dataloader`` and return the flattened latent features.

    Args:
        autoencoder: Object exposing an ``encoder`` callable.
        dataloader: Iterable of batches; a batch is either a tensor or a tuple whose
            first element is the tensor. 3-D batches get a channel axis inserted at
            position 1 before encoding.

    Returns:
        2-D NumPy array with one row per sample. Because batches may flatten to
        different widths, every batch is truncated to the smallest width seen.
    """
    per_batch = []
    with torch.no_grad():
        for batch in dataloader:
            inputs = batch[0] if isinstance(batch, tuple) else batch

            if inputs.dim() == 3:
                inputs = inputs.unsqueeze(1)

            print(f"Input type: {type(inputs)}, Input shape: {inputs.shape}")

            latent = autoencoder.encoder(inputs)
            flattened = latent.view(latent.size(0), -1)
            print(f"Flat features shape: {flattened.shape}")

            per_batch.append(flattened.numpy())

    # Truncate all batches to a common width so they can be concatenated row-wise.
    common_width = min(arr.shape[1] for arr in per_batch)
    return np.concatenate([arr[:, :common_width] for arr in per_batch], axis=0)

from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
from sklearn.preprocessing import LabelEncoder

# Ground-truth class names for the test clips.
# NOTE(review): labels come from the in-memory `test_df`, features from the CSV behind
# `test_loader`; the metrics below assume both list the same clips in the same order - verify.
true_labels = test_df['single_label'].values

# Encode the string labels to integers
label_encoder = LabelEncoder()
true_labels_encoded = label_encoder.fit_transform(true_labels)

# Extract flattened encoder features for the whole test set
test_features = extract_features_from_autoencoder(autoencoder, test_loader)

# Apply K-Means clustering
# NOTE(review): 332 is hard-coded - presumably the intended number of classes; confirm.
n_clusters = 332
print("Starting K-Means clustering...")
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
kmeans.fit(test_features)

# Get the cluster labels
cluster_labels = kmeans.labels_

print(f"Cluster labels: {cluster_labels}")

# Compare the cluster assignment with the true classes.
ari = adjusted_rand_score(true_labels_encoded, cluster_labels)
nmi = normalized_mutual_info_score(true_labels_encoded, cluster_labels)
print(f"Adjusted Rand Index: {ari}")
print(f"Normalized Mutual Information: {nmi}")

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import silhouette_samples, silhouette_score
from sklearn.manifold import TSNE
import numpy as np
from sklearn.cluster import KMeans

# Silhouette analysis of the K-Means clustering: overall mean and per-sample values.
# NOTE(review): `silhouette_avg` is computed but not displayed in this cell.
silhouette_avg = silhouette_score(test_features, cluster_labels)
sample_silhouette_values = silhouette_samples(test_features, cluster_labels)

# Histogram of the per-sample silhouette values.
plt.figure(figsize=(8, 6))
plt.hist(sample_silhouette_values, bins=50)
plt.title('Silhouette Scores')
plt.xlabel('Silhouette Score')
plt.ylabel('Frequency')
plt.show()

In [None]:
# Project the encoder features to 2-D with t-SNE (fixed seed) for visual inspection.
tsne = TSNE(n_components=2, random_state=42)
test_features_2d = tsne.fit_transform(test_features)

# Scatter plot of the projection, coloured by K-Means cluster id.
plt.figure(figsize=(8, 6))
plt.scatter(test_features_2d[:, 0], test_features_2d[:, 1], c=cluster_labels, cmap='viridis', s=50, alpha=0.5)
plt.colorbar()
plt.title('t-SNE plot of Clusters')
plt.xlabel('t-SNE component 1')
plt.ylabel('t-SNE component 2')
plt.show()