### REI505M Final project: Music genre classification starter pack

The following Dataset class operates on the GTZAN dataset.

* The duration of most GTZAN files is 30 seconds (30 × 22050 = 661500 samples), but some are slightly shorter (approx. 29.9 seconds). For this reason we truncate at 660000 samples below.
* It may be beneficial to work with smaller chunks than ~30 seconds.
* You may want to perform the data augmentations in the `__getitem__` function.
* For now, `train_dataset` contains the entire dataset; you need to set aside some examples for validation and test sets.

In [1]:
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import scipy.io.wavfile as wav
import os
import numpy as np
import librosa
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

audio_dir = 'music/' # Path to folder with GTZAN files
# Expected layout: one sub-folder per genre, each holding that genre's wav files.
# music/
#  - rock/
#       rock.00099.wav
#       ...
#  - reggae/
#  ...
#  - blues/

# Mini-batch size passed to every DataLoader created below.
batch_size = 32
class AudioDataset(Dataset):
    """Dataset of fixed-length mono waveforms read from per-genre wav folders.

    Each item is a ``(waveform, label)`` pair where ``waveform`` is a float32
    tensor of shape ``(1, target_len)`` and ``label`` is ``labels[idx]``.

    Args:
        audio_files: File names such as ``'rock.00099.wav'``; the part before
            the first ``'.'`` is used as the sub-folder name.
        labels: Labels aligned index-by-index with ``audio_files``.
        audio_path: Root folder containing the per-genre sub-folders.
        maxlen: Hard upper bound on the number of samples kept from a clip.
        sampling_rate: Sampling rate in Hz used to convert ``duration`` to
            samples and passed to the pitch-shift augmentation.
        duration: Length in seconds of every returned clip.
        augment: If True, apply random augmentations and a random crop
            position; otherwise clips are center-cropped without augmentation.
    """

    def __init__(self, audio_files, labels, audio_path,
                 maxlen, sampling_rate, duration, augment=False):
        self.audio_files = audio_files
        self.audio_path = audio_path
        self.labels = labels
        self.maxlen = maxlen
        self.sampling_rate = sampling_rate
        self.duration = duration          # seconds
        self.augment = augment

        # Number of samples in every clip returned by __getitem__.
        self.target_len = int(self.duration * self.sampling_rate)

    def __len__(self):
        """Return the number of audio files in the dataset."""
        return len(self.audio_files)

    def _crop_or_pad(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` cropped or zero-padded to exactly ``target_len`` samples.

        The signal is first truncated to ``maxlen``. Longer signals are then
        cropped (random start if ``augment`` is True, centered otherwise);
        shorter signals are zero-padded at the end.
        """
        if len(x) > self.maxlen:
            x = x[:self.maxlen]

        L = len(x)
        if L > self.target_len:
            if self.augment:
                start = torch.randint(0, L - self.target_len + 1, (1,)).item()
            else:
                start = (L - self.target_len) // 2
            x = x[start:start + self.target_len]
        elif L < self.target_len:
            pad = self.target_len - L
            x = torch.cat([x, torch.zeros(pad, dtype=x.dtype)], dim=0)

        return x

    def _augment_audio(self, x: torch.Tensor) -> torch.Tensor:
        """Randomly perturb a 1-D waveform.

        Each of the following is applied independently with probability 0.7:
        pitch shift (uniform in [-2, 2] semitones), time stretch (rate uniform
        in [0.8, 1.2]), gain (0.7x-1.3x) and additive Gaussian noise whose
        standard deviation is 2% of the signal's standard deviation.
        Time stretching changes the length; the caller is expected to fix the
        length afterwards.
        """
        x_np = x.numpy().astype(np.float32)

        # Pitch shift (random, up to +/- 2 semitones)
        if np.random.rand() < 0.7:
            n_steps = np.random.uniform(-2.0, 2.0)
            x_np = librosa.effects.pitch_shift(x_np, sr=self.sampling_rate, n_steps=n_steps)

        # Time stretch (0.8x–1.2x)
        if np.random.rand() < 0.7 and len(x_np) > 2:
            rate = np.random.uniform(0.8, 1.2)
            # `rate` must be passed by keyword: it is keyword-only in
            # librosa >= 0.10 and the keyword form also works on older versions.
            x_np = librosa.effects.time_stretch(x_np, rate=rate)

        x = torch.from_numpy(x_np)

        # Loudness / gain
        if torch.rand(1).item() < 0.7:
            gain = 0.7 + 0.6 * torch.rand(1).item()  # 0.7x–1.3x
            x = x * gain

        # Additive Gaussian noise, scaled relative to the signal level
        if torch.rand(1).item() < 0.7:
            std = x.std()
            if std > 0:  # skip silent clips
                noise_level = 0.02 * std
                noise = torch.randn_like(x) * noise_level
                x = x + noise

        return x

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(waveform, label)``.

        The waveform is read from ``audio_path/<genre>/<file>``, averaged to
        mono if it has two dimensions, converted to float32, optionally
        augmented, fixed to ``target_len`` samples and given a leading channel
        dimension.
        """
        label = self.labels[idx]
        audio_file = self.audio_files[idx]

        # File names look like '<genre>.<number>.wav'; the genre is the folder.
        genre_dir = audio_file[:audio_file.index('.')]
        file_path = os.path.join(self.audio_path, genre_dir, audio_file)

        # The file's own sampling rate is not used; self.sampling_rate is
        # assumed to match it.
        _, audio_samples = wav.read(file_path)

        if audio_samples.ndim == 2:
            audio_samples = audio_samples.mean(axis=1)

        audio_samples = torch.from_numpy(audio_samples).to(torch.float32)

        # Augmentation is controlled by the dataset's `augment` flag
        if self.augment:
            audio_samples = self._augment_audio(audio_samples)

        # Fix length after augmentation (time stretching changes it)
        audio_samples = self._crop_or_pad(audio_samples)
        audio_samples = audio_samples.unsqueeze(0)  # add channel dim -> (1, T)

        return audio_samples, label

# Genre name (the file-name prefix before the first '.') -> integer class id.
label_map={'blues' : 0, 'classical' : 1, 'country' : 2,
           'disco' : 3, 'hiphop'    : 4, 'jazz'    : 5,
           'metal' : 6, 'pop'       : 7, 'reggae'  : 8, 'rock' : 9}

# Collect every wav file under audio_dir together with its label.
audio_files = []
labels = []
for root, subdirs, files in os.walk(audio_dir):
    # os.walk yields entries in arbitrary, OS-dependent order. Sorting the
    # sub-directories in place and the file names makes the file list (and
    # therefore the seeded split below) reproducible.
    subdirs.sort()
    for fname in sorted(files):
        # Skip hidden files (.DS_Store, ._* resource forks) and anything
        # that is not a wav file; they cannot be read or labelled.
        if fname.startswith('.') or not fname.lower().endswith('.wav'):
            continue
        audio_files.append(fname)
        # An unknown genre prefix raises KeyError on purpose.
        labels.append(label_map[fname[:fname.index('.')]])

torch.manual_seed(0) # Reproducible results

# Create train/validation/test splits (70% / 15% / remainder) from one
# random permutation of the file indices.
audio_length = len(audio_files)
indices = torch.randperm(audio_length)

train_size = int(0.7 * audio_length)
val_size = int(0.15 * audio_length)
test_size = audio_length - train_size - val_size

train_indices = indices[:train_size]
val_indices = indices[train_size:train_size + val_size]
test_indices = indices[train_size + val_size:]

# .tolist() yields plain Python ints for list indexing.
train_files = [audio_files[i] for i in train_indices.tolist()]
train_labels = [labels[i] for i in train_indices.tolist()]
validation_files = [audio_files[i] for i in val_indices.tolist()]
validation_labels = [labels[i] for i in val_indices.tolist()]
test_files = [audio_files[i] for i in test_indices.tolist()]
test_labels = [labels[i] for i in test_indices.tolist()]

print("Training set:", len(train_files))
print("Validation set:", len(validation_files))
print("Test set:", len(test_files))

# Leakage check: the splits must be pairwise disjoint. Intersecting all three
# sets at once would miss a file shared by only two of them.
assert set(train_files).isdisjoint(validation_files)
assert set(train_files).isdisjoint(test_files)
assert set(validation_files).isdisjoint(test_files)

# Only the training set is augmented. Validation and test sets must be
# deterministic (center crop, no random perturbation) so that the reported
# accuracies are comparable between epochs and runs.
train_dataset = AudioDataset(audio_files=train_files, labels=train_labels,
                             audio_path=audio_dir,
                             maxlen=660000, sampling_rate=22050, duration=25, augment=True)
test_dataset = AudioDataset(audio_files=test_files, labels=test_labels,
                            audio_path=audio_dir,
                            maxlen=660000, sampling_rate=22050, duration=25, augment=False)
val_dataset = AudioDataset(audio_files=validation_files, labels=validation_labels,
                           audio_path=audio_dir,
                           maxlen=660000, sampling_rate=22050, duration=25, augment=False)

# Shuffle only the training data; evaluation order does not matter.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Sanity check: fetch one batch and show its shapes.
tmp_features, tmp_labels = next(iter(train_loader))
print(f"Feature batch shape: {tmp_features.size()}")
print(f"Labels batch shape: {tmp_labels.size()}")

ModuleNotFoundError: No module named 'torch'

In [None]:
# Pick the compute backend in order of preference: CUDA, then MPS, then CPU.
if torch.cuda.is_available():
    _backend = "cuda"
elif torch.backends.mps.is_available():
    _backend = "mps"  # Mac MPS framework
else:
    _backend = "cpu"
device = torch.device(_backend)
print("Device: ", device)

### A 1D CNN

In [None]:
# Hyperparameters:
# cout: number of output channels of each conv block (try 2^4, 2^5, etc.)
# k: kernel size
# fdim: size of the output of the last max-pool layer (its value depends on the settings of the other hyperparameters)
# M: dimension of the linear layer prior to the final classification layer (try e.g. values in the range 100-400)
# r: number of repeated convBlocks (around 5-10; beware of overfitting)

def evaluate(model, data_loader, device):
    """Return the classification accuracy (in percent) of `model` on a loader.

    Args:
        model: Module mapping a batch to per-class scores of shape (N, C).
        data_loader: Loader yielding ``(data, target)`` batches; its
            ``dataset`` length is used as the denominator.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Accuracy as a float in [0, 100].

    Note:
        The model is switched to eval mode and left in that mode.
    """
    model.eval()
    correct = 0
    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Index of the highest score is the predicted class.
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    accuracy = 100. * correct / len(data_loader.dataset)
    return accuracy

def calculateFdim(L,k,r,stride):
    """Return the sequence length left after `r` conv+pool blocks.

    Every block applies two unpadded windowed operations (a convolution and
    a max-pool) with the same kernel size `k` and the same `stride`, each of
    which maps a length n to (n - k) // stride + 1.
    """
    length = L
    # Two identical length reductions per block: conv, then pool.
    for _ in range(2 * r):
        length = (length - k) // stride + 1
    return length

    

class convBlock(torch.nn.Module):
    """One feature-extraction stage: Conv1d (stride 2) -> ReLU -> MaxPool1d (stride 2).

    Both the convolution and the pooling use kernel size `k` and no padding.
    """

    def __init__(self,cin,cout,k):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels=cin, out_channels=cout, kernel_size=k, stride=2)
        # An optional second convolution (conv2) was tried here and left disabled.
        self.pool = nn.MaxPool1d(kernel_size=k, stride=2)

    def forward(self, x):
        """Apply convolution, ReLU and max-pooling to a (N, cin, L) batch."""
        activated = torch.relu(self.conv1(x))
        return self.pool(activated)
    

class oneDCNN(torch.nn.Module):
    """1-D CNN classifier: `r` stacked convBlocks followed by two linear layers.

    Args:
        cin: Number of input channels of the first block.
        cout: Number of output channels of every block.
        k: Kernel size used inside each block.
        fdim: Size of the flattened output of the last block.
        M: Width of the hidden linear layer.
        r: Number of stacked blocks.

    The final layer produces 10 class scores.
    """

    def __init__(self,cin,cout,k,fdim,M,r):
        super().__init__()
        # The first block consumes `cin` channels, all later ones `cout`.
        stages = []
        in_channels = cin
        for _ in range(r):
            stages.append(convBlock(in_channels, cout, k))
            in_channels = cout
        self.convBlock = nn.Sequential(*stages)
        self.fc1 = nn.Linear(fdim, M)
        self.fc2 = nn.Linear(M, 10)

    def forward(self, x):
        """Return class scores of shape (N, 10) for a batch `x`."""
        features = torch.flatten(self.convBlock(x), start_dim=1)
        hidden = torch.relu(self.fc1(features))
        return self.fc2(hidden)
    

# Model / optimisation hyperparameters.
cin=1            # input channels (mono audio)
cout=32          # channels produced by every conv block
kernel_size = 3
stride = 2       # must stay 2: convBlock hard-codes stride 2 for conv and pool
r = 5            # number of conv blocks
M=400            # width of the hidden linear layer
n_epochs=50
lam=0.1          # weight decay passed to the optimizer

# fdim must be derived from the length of the clips the dataset actually
# yields (train_dataset.target_len), not from a fixed 22050 samples;
# otherwise fc1's input size does not match the flattened conv output.
fdim=calculateFdim(train_dataset.target_len,kernel_size,r,stride)*cout

model=oneDCNN(cin,cout,kernel_size,fdim,M,r)
model.to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, weight_decay=lam)

val_history=[]   # validation accuracy (%) after each epoch
loss_history=[]  # loss of the last training batch of each epoch

for epoch in range(n_epochs):
    # evaluate() leaves the model in eval mode, so switch back every epoch.
    model.train()
    for batch_idx, (data,target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = nn.functional.cross_entropy(output, target)
        loss.backward()
        optimizer.step()
    val_acc = evaluate(model, val_loader, device)
    if (epoch + 1) % 5 == 0:
        print(f'Epoch: {epoch + 1:d}. Loss: {loss.item():.4f}, val_acc={val_acc}')
    val_history.append(val_acc)
    loss_history.append(loss.item())



In [None]:
# Plot the per-epoch training loss and validation accuracy, one figure each.
for _series, _ylabel in ((loss_history, 'Training Loss'),
                         (val_history, 'Validation accuracy')):
    plt.plot(_series)
    plt.xlabel('Epoch')
    plt.ylabel(_ylabel)
    plt.show()

In [None]:
# Final accuracy (in percent) of the trained model on the held-out test set.
test_acc = evaluate(model, test_loader, device)
print(f'Test set accuracy: {test_acc:.3f}%')