# Imports

In [None]:
# Imports

# Type annotations
from typing import Tuple

# General
import os
import zipfile
import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoProcessor, ASTModel

# Image preprocessing
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder

# Model
import torch.nn as nn
import torch.optim as optim   # Optimizer
from tqdm import tqdm         # Progress bar
import torchsummary           # Model summary

# Metrics
from sklearn.metrics import  accuracy_score
from sklearn.metrics import  precision_score
from sklearn.metrics import  recall_score
from sklearn.metrics import  f1_score
from sklearn.metrics import  classification_report
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import  roc_auc_score
from sklearn.metrics import confusion_matrix

# Setting up file structure

In [None]:
# Mount Google Drive
# Colab-only step: the dataset paths used in the following cells all live
# under '/content/drive', so this must run before any of them.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Unzip
# Archives stored on Drive; each one is extracted into the directory at the
# same index in `dataset_dirs`.
zip_file_paths = ['/content/drive/My Drive/GTZAN Genre Collection.zip',
                  '/content/drive/My Drive/GTZAN Genre Collection Spectrograms.zip']

dataset_dirs = ['/content/drive/My Drive/GTZAN Genre Collection',
                '/content/drive/My Drive/GTZAN Genre Collection Spectrograms']

# zip() has no len(), so tqdm needs an explicit total to show real progress.
for zip_file_path, dataset_dir in tqdm(zip(zip_file_paths, dataset_dirs), total=len(zip_file_paths)):
    if not os.path.exists(zip_file_path):
        print(f"Zip file {zip_file_path} does not exist.")
        continue

    print(f"Extracting {zip_file_path} to {dataset_dir}")
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(dataset_dir)


# Evaluate dataset size and experiment with data visualization

In [1]:
# Get and display sample distribution count
# Re-declared here (same values as in the unzip cell) so this cell can run
# without re-running the extraction.
# Index 0 is the raw audio dataset, index 1 the spectrogram image dataset.
dataset_dirs = ['/content/drive/My Drive/GTZAN Genre Collection',
                '/content/drive/My Drive/GTZAN Genre Collection Spectrograms']
def load_class_info(dataset_path: str) -> pd.DataFrame:
  """Index a folder-per-class dataset.

  Args:
    dataset_path: Directory whose immediate sub-directories are the classes,
      each containing that class's sample files.

  Returns:
    A DataFrame with one row per sample and two columns: 'file' (the file
    name) and 'class' (the name of the sub-directory it was found in). Rows
    are ordered by class name, then by file name.
  """
  df_contents = {'file': [], 'class': []}
  # os.listdir order is arbitrary; sorting makes the result deterministic.
  for class_name in sorted(os.listdir(dataset_path)):
    class_dir = os.path.join(dataset_path, class_name)
    # Stray files next to the class folders (e.g. '.DS_Store') are not classes
    # and would make the inner os.listdir raise.
    if not os.path.isdir(class_dir):
      continue
    for file_name in sorted(os.listdir(class_dir)):
      df_contents['file'].append(file_name)
      df_contents['class'].append(class_name)
  return pd.DataFrame(df_contents)


def display_df_info(df_name: str, df) -> None:
  """Print how many rows `df` has and how many distinct 'class' values it holds.

  Args:
    df_name: Label used for the DataFrame in the printed sentence.
    df: DataFrame with a 'class' column.
  """
  sample_count = len(df.index)
  class_count = df['class'].nunique()
  print(f"'{df_name}' has a total of {sample_count} samples distributed over {class_count} classes.")


# Index both datasets: the raw audio clips and the spectrogram images.
audio_df = load_class_info(dataset_dirs[0] + "/genres_original")
spectrogram_df = load_class_info(dataset_dirs[1] + "/images_original")

# Print the sample and class counts of each dataset.
display_df_info('Audio Dataset', audio_df)
display_df_info('Spectrogram Dataset', spectrogram_df)

print("Note that for some reason the spectrogram dataset is missing one sample (jazz00054.png).")

NameError: name 'os' is not defined

In [None]:
# Distribution of dataset
def plot_distribution(title: str, df):
  """Show a bar chart of the number of rows per value of `df['class']`.

  Args:
    title: Title drawn above the chart.
    df: DataFrame with a 'class' column.
  """
  plt.title(title)
  class_counts = df['class'].value_counts().to_dict()
  bar_positions = range(len(class_counts))
  plt.bar(bar_positions, list(class_counts.values()))
  # Label every bar with the class it counts.
  plt.xticks(bar_positions, list(class_counts.keys()))
  plt.show()


# Class balance of the raw audio clips.
plot_distribution('Audio Dataset', audio_df)

# The provided spectrograms are not used for training; their distribution is
# plotted only for comparison.
plot_distribution('Spectrogram Dataset', spectrogram_df)

Hooray, balanced dataset (mostly)!

In [None]:
# `import librosa` alone does not load the plotting submodule on
# librosa < 0.10, so import it explicitly before using librosa.display.
import librosa.display

# Load one audio file
y, sr = librosa.load(dataset_dirs[0] + "/genres_original/rock/rock.00000.wav")

# Show the waveform
librosa.display.waveshow(y, sr=sr)
plt.show()
plt.close()

# Generate mel spectrogram on the log scale, this is the spectrogram that will
# be used instead of the given spectrogram in an attempt to improve model
# performance.
mel_spectrogram_decibels = librosa.power_to_db(
    librosa.feature.melspectrogram(y=y, sr=sr),
    ref=np.max
)
librosa.display.specshow(mel_spectrogram_decibels)
plt.show()
plt.close()

# Preprocessing

Getting the Mel Frequency Spectrogram from an audio file. It's supposed to help with timbre visualization and is used widely ([hugging face link](https://huggingface.co/learn/audio-course/en/chapter1/audio_data)).

In [None]:
# Preprocessing

# Loop through all audio files and create spectrogram pngs for each, save to drive
def preprocess_audio(y, sr: int, target_sr: int = 22050, duration_s: int = 29) -> Tuple[np.ndarray, int]:
  """Bring an audio signal to a common sample rate, channel count and length.

  Args:
    y: Audio samples as returned by `librosa.load`.
    sr: Sample rate of `y`.
    target_sr: Sample rate to resample to.
    duration_s: Length of the returned signal, in seconds at `target_sr`.

  Returns:
    `(signal, target_sr)`, where `signal` is the resampled, mono signal fixed
    to `target_sr * duration_s` samples.
  """
  y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
  y = librosa.to_mono(y)
  # Fix every clip to the same number of samples so all spectrograms share one
  # width.
  y = librosa.util.fix_length(y, size=target_sr * duration_s)
  return y, target_sr


def generate_mel_spectrogram(y, sr: int, save_path: str) -> None:
  """Render the log-scaled mel spectrogram of a signal and save it as an image.

  Args:
    y: Audio samples.
    sr: Sample rate of `y`.
    save_path: File path the figure is written to. Missing parent directories
      are created.
  """
  # Must come first: a function-level `import librosa.display` makes `librosa`
  # a local name. Needed because librosa < 0.10 does not load the plotting
  # submodule on `import librosa`.
  import librosa.display

  mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr, n_mels=128, fmax=8000)
  # Convert power to decibels relative to the loudest bin.
  decibel_mel_spectrogram = librosa.power_to_db(mel_spectrogram, ref=np.max)

  parent_dir = os.path.dirname(save_path)
  if parent_dir:
    os.makedirs(parent_dir, exist_ok=True)

  figure = plt.figure(figsize=(8, 6))
  try:
    librosa.display.specshow(decibel_mel_spectrogram, sr=sr)
    plt.savefig(save_path, bbox_inches='tight')
  finally:
    # Always release the figure, even if drawing or saving fails; this function
    # is called once per clip and leaked figures accumulate.
    plt.close(figure)

# Walk every genre folder and write one spectrogram PNG per audio clip into
# the matching genre folder of the spectrogram dataset.
for directory, _, files in os.walk(dataset_dirs[0] + "/genres_original"):
  for file in tqdm(files, desc=f"Processing all files in '{directory}'"):
    # Only audio clips can be loaded; ignore anything else in the tree.
    if not file.endswith('.wav'):
      continue
    # Deliberately skipped: the jazz 00054 clip (the provided spectrogram set
    # has no image for it either).
    if 'jazz' in file and '00054' in file:
      continue

    y, sr = librosa.load(os.path.join(directory, file))
    y, sr = preprocess_audio(y, sr)

    # 'rock.00000.wav' -> genre 'rock', image name 'rock00000'
    genre = file.split(".")[0]
    image_name = file.split(".wav")[0].replace(".", "")
    generate_mel_spectrogram(y, sr, f'{dataset_dirs[1]}/images_original/{genre}/{image_name}.png')



In [None]:
# Split into train and test (no validation for now)

# Spectrograms have been generated above, import with ImageFolder
# Target size every image is resized to.
# NOTE(review): torchvision's Resize reads a 2-tuple as (height, width) - confirm
# that 240 x 180 is the intended orientation.
IMAGE_SIZE = (240, 180)

# Resize, collapse to a single channel, then convert to a tensor.
spectrogram_transforms = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.Grayscale(1),
    transforms.ToTensor()
])

# One class per sub-directory of images_original.
mel_spec_dataset = ImageFolder(dataset_dirs[1] + "/images_original", spectrogram_transforms)

# Perform split
# Batch size shared by the train and test loaders (and by the evaluation cells).
BATCH_SIZE = 32

def split_dataset(ds_name: str, ds: Dataset, seed: "int | None" = None) -> Tuple[Dataset, Dataset]:
  """Randomly split a dataset into 80% train and 20% test.

  Args:
    ds_name: Label used in the printed size summary.
    ds: Dataset to split.
    seed: Optional seed for the split. With the same seed and dataset length
      the same samples land in each subset; with `None` (the default) the
      split uses PyTorch's global random state, as before.

  Returns:
    `(train_subset, test_subset)`.
  """
  if seed is None:
    ds_train, ds_test = random_split(ds, [0.8, 0.2])
  else:
    # A dedicated generator keeps the split reproducible without touching the
    # global random state used elsewhere (e.g. weight initialisation).
    generator = torch.Generator().manual_seed(seed)
    ds_train, ds_test = random_split(ds, [0.8, 0.2], generator=generator)
  print(f'{ds_name} -> train size: {len(ds_train)}, test_size: {len(ds_test)}')
  return ds_train, ds_test


def get_loaders(train: Dataset, test: Dataset, batch_size: int) -> Tuple[DataLoader, DataLoader]:
  """Wrap the train and test datasets in DataLoaders.

  Args:
    train: Training dataset; served in a new random order every epoch.
    test: Evaluation dataset; served in its stored order.
    batch_size: Number of samples per batch for both loaders.

  Returns:
    `(train_loader, test_loader)`.
  """
  train_loader = DataLoader(train, batch_size=batch_size, num_workers=1, shuffle=True)
  # Shuffling does not change evaluation metrics; a fixed order keeps
  # evaluation runs comparable and per-sample predictions traceable.
  test_loader = DataLoader(test, batch_size=batch_size, num_workers=1, shuffle=False)
  return train_loader, test_loader

# Split the grayscale spectrogram dataset and build the loaders used by the
# CNN training and evaluation cells.
spectrogram_train, spectrogram_test = split_dataset("Spectrogram Dataset", mel_spec_dataset)
spectrogram_train_loader, spectrogram_test_loader = get_loaders(spectrogram_train, spectrogram_test, BATCH_SIZE)

# Model

Convolutional Neural Network
- 3 convolutional layers with batch norm and a ReLU activation function
- 3 max pooling layers
- 1 fully connected layer with dropout to avoid overfitting
- First convolutional layer has a large kernel size in order to consider context of a group of pixels and identify patterns (Idea borrowed from [AlexNet](https://pytorch.org/vision/main/models/generated/torchvision.models.alexnet.html))

In [None]:
class CNN(nn.Module):
  """Three-stage convolutional classifier for single-channel spectrogram images.

  Each stage is Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d, followed by one
  dropout-regularised linear layer that produces the class logits.

  Args:
    image_size: `(height, width)` of the input images. Defaults to the
      notebook-level `IMAGE_SIZE`, so `CNN()` behaves as before.
    num_classes: Number of output logits.
  """

  def __init__(self, image_size=None, num_classes: int = 10):
    super().__init__()

    # Resolved at call time so the class can also be built for other sizes.
    height, width = IMAGE_SIZE if image_size is None else image_size

    # Convolutional layers
    self.conv_layers = nn.Sequential(
        # Large first kernel to capture context across a group of pixels.
        # This is the only layer that changes the spatial size:
        # (240, 180) -> (118, 88) for the default IMAGE_SIZE.
        nn.Conv2d(1, 32, kernel_size=8, stride=2, padding=1),
        nn.BatchNorm2d(32, momentum=0.4),
        nn.ReLU(inplace=True),
        # kernel 3 / stride 1 / padding 1 keeps height and width unchanged.
        nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
        nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(64, momentum=0.4),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
        nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
        nn.BatchNorm2d(128, momentum=0.4),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
    )

    # Output size of the first conv: floor((n + 2*1 - 8) / 2) + 1.
    conv_out_height = (height - 6) // 2 + 1
    conv_out_width = (width - 6) // 2 + 1
    # Number of features per sample after the conv stack; computed once so the
    # linear layer and the forward pass cannot drift apart.
    self.flat_features = 128 * conv_out_height * conv_out_width

    # Fully connected layers
    self.f_conn_layers = nn.Sequential(
        nn.Dropout(p=0.6),
        nn.Linear(in_features=self.flat_features, out_features=num_classes),
    )

  def forward(self, x):
    """Return class logits of shape (batch, num_classes) for a batch of images."""
    x = self.conv_layers(x)
    # Flatten per sample. Unlike view(-1, N), this can never silently regroup
    # values across samples when the input size is not the expected one; the
    # linear layer raises on a size mismatch instead.
    x = torch.flatten(x, start_dim=1)
    x = self.f_conn_layers(x)
    return x



In [None]:
# Hyperparameters
LEARNING_RATE = 1e-5
WEIGHT_DECAY = 0.03

# Initialize model
cnn = CNN()
print(cnn)

# Use cuda cores
# Falls back to the CPU when no GPU is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# nn.Module.to() moves the module in place and returns it, so `cnn_cuda` and
# `cnn` refer to the same model.
cnn_cuda = cnn.to(device)

# Setup optimizer and loss function
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.AdamW(cnn.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)

# Train

In [None]:
# Hyperparams
EPOCHS = 14

# Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
cnn_cuda = cnn.to(device)

# Train
for epoch in range(EPOCHS):
  batch_losses = []
  print(f'Epoch: {epoch}')
  cnn_cuda.train()
  for images, labels in tqdm(spectrogram_train_loader):
    images, labels = images.to(device), labels.to(device)

    # Forward pass
    logits = cnn_cuda(images)
    loss = loss_fn(logits, labels)
    # Store a plain float: keeping the loss tensor itself would hold on to
    # every batch's autograd graph until the end of the epoch.
    batch_losses.append(loss.item())

    # Backward pass
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  # Mean of the per-batch losses for this epoch.
  print(f'Epoch loss: {sum(batch_losses)/len(batch_losses)}')

# Metrics

In [None]:
# Test using accuracy metric
cnn_cuda.eval()
num_test_samples = len(spectrogram_test_loader.dataset)

# Collect per-batch results and concatenate at the end. This works for any
# batch size (including a short last batch) and does not require a GPU.
batch_predictions = []
batch_targets = []

with torch.no_grad():
  for images, labels in tqdm(spectrogram_test_loader):
    # Use the device selected earlier instead of assuming CUDA is present.
    logits = cnn_cuda(images.to(device))
    batch_predictions.append(torch.argmax(logits, dim=1).cpu())
    batch_targets.append(labels.cpu())

# Predicted and true class indices for the whole test set (CPU tensors).
outputs = torch.cat(batch_predictions)
targets = torch.cat(batch_targets)

# scikit-learn metrics take (y_true, y_pred).
acc = accuracy_score(targets, outputs)

print(acc)

In [None]:
# Confusion matrix
def cm():
  """Plot the confusion matrix of the most recent evaluation.

  Reads the module-level `targets` (true class indices) and `outputs`
  (predicted class indices). Rows are true classes, columns are predicted
  classes.
  """
  # Class names in class-index order, taken from the dataset that produced
  # the label indices, so every row/column gets the right name.
  classes = mel_spec_dataset.classes
  # confusion_matrix takes (y_true, y_pred). Passing `labels` pins the matrix
  # to one row/column per class even if a class is absent from the test split.
  matrix = confusion_matrix(targets.cpu(), outputs.cpu(), labels=list(range(len(classes))))
  cm_df = pd.DataFrame(matrix, index=classes, columns=classes)
  sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=False)
  plt.xlabel('Predicted class')
  plt.ylabel('True class')
  plt.show()

cm()

# Trying the same thing using resnet (transfer learning) to compare accuracy

In [None]:
from torchvision.models import resnet50

ResNet requires three input channels, so the dataset transformations must be redefined to load the images as RGB again (the grayscale ones can't be used).

In [None]:
# Re-load data as RGB

# Same transforms as before but not greyscale!
spectrogram_transforms = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor()
])

# Load dataset with new transforms
rgb_dataset = ImageFolder(dataset_dirs[1] + "/images_original", spectrogram_transforms)

# Initialize loaders (using same functions as above)
# Note: this is a fresh random split, independent of the one used for the CNN.
transfer_train, transfer_test = split_dataset("Spectrogram Dataset RGB", rgb_dataset)
transfer_train_loader, transfer_test_loader = get_loaders(transfer_train, transfer_test, BATCH_SIZE)


## Initialize model with pretrained weights

In [None]:
# Initialize ResNet50
# `pretrained=True` is deprecated in torchvision; the `weights` enum below
# selects the same ImageNet weights.
from torchvision.models import ResNet50_Weights
pretrained_model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)

# Freeze gradient on pre-trained weights
for param in pretrained_model.parameters():
  param.requires_grad = False

# Configure output layer (to train)
# The new layer is created after the freeze, so it is the only trainable part.
pretrained_model.fc = nn.Linear(pretrained_model.fc.in_features, 10)

# Loss function
# Named `loss_fn` because that is the name the training loop calls; the name
# `loss` is reused there for the per-batch loss value.
loss_fn = nn.CrossEntropyLoss()

# Optimizer
# Only the new output layer is trainable, so only its parameters are handed to
# the optimizer.
optimizer = optim.AdamW(pretrained_model.fc.parameters(), lr=0.001, weight_decay=0.9)


## Train

In [None]:
#Train

# Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
pretrained_model.to(device)

# Train
for epoch in range(10):
  batch_losses = []
  print(f'Epoch: {epoch}')
  # NOTE(review): train() also puts the frozen backbone's BatchNorm layers in
  # training mode, so their running statistics keep updating - confirm this
  # is intended.
  pretrained_model.train()
  for images, labels in tqdm(transfer_train_loader):
    images, labels = images.to(device), labels.to(device)

    # Forward pass
    logits = pretrained_model(images)
    loss = loss_fn(logits, labels)
    # Store a plain float: keeping the loss tensor itself would hold on to
    # every batch's autograd graph until the end of the epoch.
    batch_losses.append(loss.item())

    # Backward pass
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  # Mean of the per-batch losses for this epoch.
  print(f'Epoch loss: {sum(batch_losses)/len(batch_losses)}')

## Metrics

In [None]:
# Test model accuracy
pretrained_model.eval()
num_test_samples = len(transfer_test_loader.dataset)

# Collect per-batch results and concatenate at the end. This works for any
# batch size (including a short last batch) and does not require a GPU.
batch_predictions = []
batch_targets = []

with torch.no_grad():
  for images, labels in tqdm(transfer_test_loader):
    # Use the device selected earlier instead of assuming CUDA is present.
    logits = pretrained_model(images.to(device))
    batch_predictions.append(torch.argmax(logits, dim=1).cpu())
    batch_targets.append(labels.cpu())

# Predicted and true class indices for the whole test set (CPU tensors).
outputs = torch.cat(batch_predictions)
targets = torch.cat(batch_targets)

# scikit-learn metrics take (y_true, y_pred).
acc = accuracy_score(targets, outputs)

print(acc)

In [None]:
# Confusion matrix
# `cm` reads the module-level `outputs` / `targets`, which the cell above has
# just overwritten with the ResNet results, so it plots the ResNet matrix.
cm()