In [1]:
# Mount Google Drive so the dataset archive and the split lists stored there are reachable.
from google.colab import drive
drive.mount('/content/gdrive/', force_remount=True)
# Extract the audio archive into the working directory; -n never overwrites files that already exist.
# NOTE(review): the relative 'gdrive/MyDrive' paths assume the working directory is /content - confirm.
!unzip -n gdrive/MyDrive/genres.zip
# Augmentation package that a later cell pulls in via `from torchaudio_augmentations import *`.
!pip install torchaudio-augmentations
# Copy the split lists next to the notebook; GTZANDataset opens '<split>_filtered.txt' under data_path.
!cp gdrive/MyDrive/test_filtered.txt test_filtered.txt
!cp gdrive/MyDrive/train_filtered.txt train_filtered.txt
!cp gdrive/MyDrive/valid_filtered.txt valid_filtered.txt

Mounted at /content/gdrive/
Archive:  gdrive/MyDrive/genres.zip
   creating: genres/
   creating: genres/blues/
  inflating: genres/blues/blues.00000.wav  
  inflating: genres/blues/blues.00001.wav  
  inflating: genres/blues/blues.00002.wav  
  inflating: genres/blues/blues.00003.wav  
  inflating: genres/blues/blues.00004.wav  
  inflating: genres/blues/blues.00005.wav  
  inflating: genres/blues/blues.00006.wav  
  inflating: genres/blues/blues.00007.wav  
  inflating: genres/blues/blues.00008.wav  
  inflating: genres/blues/blues.00009.wav  
  inflating: genres/blues/blues.00010.wav  
  inflating: genres/blues/blues.00011.wav  
  inflating: genres/blues/blues.00012.wav  
  inflating: genres/blues/blues.00013.wav  
  inflating: genres/blues/blues.00014.wav  
  inflating: genres/blues/blues.00015.wav  
  inflating: genres/blues/blues.00016.wav  
  inflating: genres/blues/blues.00017.wav  
  inflating: genres/blues/blues.00018.wav  
  inflating: genres/blues/blues.00019.wav  
  inflat

In [2]:
import os
import random
import torch
import numpy as np
import soundfile as sf
from torch.utils import data
from torchaudio.datasets import GTZAN
from torchaudio_augmentations import *



# Genre names in label order: a genre's position in this list is the index that
# GTZANDataset sets to 1 in the one-hot target vector.
GTZAN_GENRES = ['blues', 'classical', 'country', 'disco', 'hiphop', 'jazz', 'metal', 'pop', 'reggae', 'rock']


class GTZANDataset(data.Dataset):
    """Genre-classification dataset driven by a '<split>_filtered.txt' list of wav paths.

    Each list entry has the form '<genre>/<file>' relative to '<data_path>/genres'.
    An item is a pair ``(wav, tags)`` where ``tags`` is a one-hot float vector over
    ``GTZAN_GENRES`` and ``wav`` is float32 audio:

    * split == 'train': one random crop of ``num_samples`` samples.
    * any other split: ``num_chunks`` evenly spaced crops stacked along a new
      leading axis, i.e. shape ``(num_chunks, num_samples)`` for mono audio.

    Args:
        data_path: Directory containing the split lists and the 'genres' folder.
            A falsy value means the current working directory.
        split: Split name; selects the list file and the cropping strategy.
        num_samples: Length of every crop, in samples.
        num_chunks: Number of crops per item for non-training splits.
        is_augmentation: When true, the augmentation chain is built and applied
            to every item.
    """

    def __init__(self, data_path, split, num_samples, num_chunks, is_augmentation):
        self.data_path = data_path if data_path else ''
        self.split = split
        self.num_samples = num_samples
        self.num_chunks = num_chunks
        self.is_augmentation = is_augmentation
        self.genres = GTZAN_GENRES
        self._get_song_list()
        if is_augmentation:
            self._get_augmentations()

    def _get_song_list(self):
        """Read the split list into ``self.song_list``, one relative path per entry."""
        list_filename = os.path.join(self.data_path, '%s_filtered.txt' % self.split)
        with open(list_filename) as f:
            stripped = (line.strip() for line in f)
            # Skip blank lines (e.g. a trailing newline): an empty entry would
            # later fail the genre lookup in __getitem__.
            self.song_list = [entry for entry in stripped if entry]

    def _get_augmentations(self):
        """Build the random augmentation chain and store it in ``self.augmentation``."""
        transforms = [
            RandomResizedCrop(n_samples=self.num_samples),
            RandomApply([PolarityInversion()], p=0.8),
            RandomApply([Noise(min_snr=0.3, max_snr=0.5)], p=0.3),
            RandomApply([Gain()], p=0.2),
            RandomApply([HighLowPass(sample_rate=22050)], p=0.8),
            RandomApply([Delay(sample_rate=22050)], p=0.5),
            RandomApply([PitchShift(n_samples=self.num_samples, sample_rate=22050)], p=0.4),
            RandomApply([Reverb(sample_rate=22050)], p=0.3),
        ]
        self.augmentation = Compose(transforms=transforms)

    def _adjust_audio_length(self, wav):
        """Crop ``wav`` to ``num_samples`` (train) or to ``num_chunks`` stacked crops.

        Audio shorter than ``num_samples`` is zero-padded at the end first, so
        every returned crop has exactly ``num_samples`` samples.
        """
        deficit = self.num_samples - len(wav)
        if deficit > 0:
            # Pad only the time axis; any trailing (channel) axes are left alone.
            pad_width = [(0, deficit)] + [(0, 0)] * (wav.ndim - 1)
            wav = np.pad(wav, pad_width, mode='constant')

        if self.split == 'train':
            # random.randint is inclusive on both ends, so the largest valid
            # start offset is exactly len(wav) - num_samples.
            max_start = len(wav) - self.num_samples
            random_index = random.randint(0, max_start)
            wav = wav[random_index : random_index + self.num_samples]
        else:
            hop = (len(wav) - self.num_samples) // self.num_chunks
            wav = np.array([wav[i * hop : i * hop + self.num_samples] for i in range(self.num_chunks)])
        return wav

    def __getitem__(self, index):
        """Return ``(wav, tags)`` for the ``index``-th entry of the split list."""
        line = self.song_list[index]

        # get genre: the first path component names the genre
        genre_name = line.split('/')[0]
        genre_index = self.genres.index(genre_name)
        tags = np.zeros(len(self.genres))
        tags[genre_index] = 1

        # get audio
        audio_filename = os.path.join(self.data_path, 'genres', line)
        wav, fs = sf.read(audio_filename)

        # adjust audio length
        wav = self._adjust_audio_length(wav).astype('float32')

        # data augmentation: the chain is given a tensor with an extra leading
        # axis, which is removed again before converting back to numpy
        if self.is_augmentation:
            wav = self.augmentation(torch.from_numpy(wav).unsqueeze(0)).squeeze(0).numpy()

        return wav, tags

    def __len__(self):
        """Number of entries in the split list."""
        return len(self.song_list)

def get_dataloader(data_path=None,
                   split='train',
                   num_samples=22050 * 29,
                   num_chunks=1,
                   batch_size=16,
                   num_workers=0,
                   is_augmentation=False):
    """Build a DataLoader over a GTZANDataset for the given split.

    Args:
        data_path: Directory holding the split lists and the 'genres' folder;
            None means the current working directory.
        split: 'train' enables shuffling; any other value disables it.
        num_samples: Crop length, in samples, passed to the dataset.
        num_chunks: Crops per item for non-training splits.
        batch_size: Batch size for training. For other splits it is divided by
            ``num_chunks`` because each item already carries ``num_chunks`` crops.
        num_workers: Worker process count passed to the DataLoader.
        is_augmentation: Whether the dataset applies its augmentation chain.

    Returns:
        A ``torch.utils.data.DataLoader`` with ``drop_last=False``.
    """
    is_train = split == 'train'
    if not is_train:
        # Clamp to 1: when num_chunks exceeds batch_size the integer division
        # yields 0, which DataLoader rejects.
        batch_size = max(1, batch_size // num_chunks)

    dataset = GTZANDataset(data_path,
                           split,
                           num_samples,
                           num_chunks,
                           is_augmentation)
    return data.DataLoader(dataset=dataset,
                           batch_size=batch_size,
                           shuffle=is_train,
                           drop_last=False,
                           num_workers=num_workers)

In [3]:
# Training loader with augmentation enabled; pull a single batch to inspect it.
train_loader = get_dataloader(split='train', is_augmentation=True)
iter_train_loader = iter(train_loader)
train_wav, train_genre = next(iter_train_loader)

# Validation and test loaders use the defaults (no augmentation, num_chunks=1).
valid_loader = get_dataloader(split='valid')
test_loader = get_dataloader(split='test')
iter_test_loader = iter(test_loader)
test_wav, test_genre = next(iter_test_loader)
print('training data shape: %s' % str(train_wav.shape))
# NOTE: the shape reported as 'validation/test' is taken from the test loader only;
# valid_loader is created above but no batch is drawn from it in this cell.
print('validation/test data shape: %s' % str(test_wav.shape))
# One-hot genre targets of the sampled training batch.
print(train_genre)

training data shape: torch.Size([16, 639450])
validation/test data shape: torch.Size([16, 1, 639450])
tensor([[0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 1.],
        [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 1., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0., 0., 0.]], dtype=torch.float64)


In [4]:
from torch import nn


class Conv_2d(nn.Module):
    """Convolutional building block: Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d -> Dropout.

    Args:
        input_channels: Channel count of the incoming feature map.
        output_channels: Channel count produced by the convolution.
        shape: Convolution kernel size; the padding is ``shape // 2``.
        pooling: Kernel size handed to ``nn.MaxPool2d``.
        dropout: Drop probability handed to ``nn.Dropout``.
    """

    def __init__(self, input_channels, output_channels, shape=3, pooling=2, dropout=0.1):
        super().__init__()
        half_kernel = shape // 2
        self.conv = nn.Conv2d(input_channels, output_channels, shape, padding=half_kernel)
        self.bn = nn.BatchNorm2d(output_channels)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(pooling)
        self.dropout = nn.Dropout(dropout)

    def forward(self, wav):
        """Run the input through the five stages in order and return the result."""
        stages = (self.conv, self.bn, self.relu, self.maxpool, self.dropout)
        feature_map = wav
        for stage in stages:
            feature_map = stage(feature_map)
        return feature_map

In [5]:
import torchaudio


class CNN(nn.Module):
    """Genre classifier: mel-spectrogram front end, five Conv_2d blocks, two dense layers.

    Args:
        num_channels: Base channel width; the conv stack uses 1x, 1x, 2x, 2x and 4x of it.
        sample_rate, n_fft, f_min, f_max: Forwarded to ``MelSpectrogram``.
        num_mels: Number of mel bins (``n_mels`` of ``MelSpectrogram``).
        num_classes: Size of the output logit vector.
    """

    def __init__(self, num_channels=16,
                       sample_rate=22050,
                       n_fft=1024,
                       f_min=0.0,
                       f_max=11025.0,
                       num_mels=128,
                       num_classes=10):
        super().__init__()
        base = num_channels
        double = num_channels * 2
        quad = num_channels * 4

        # Front end: waveform -> mel spectrogram -> dB scale -> batch norm.
        self.melspec = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate,
            n_fft=n_fft,
            f_min=f_min,
            f_max=f_max,
            n_mels=num_mels,
        )
        self.amplitude_to_db = torchaudio.transforms.AmplitudeToDB()
        self.input_bn = nn.BatchNorm2d(1)

        # Convolutional stack; each block pools with its own 2-D kernel.
        self.layer1 = Conv_2d(1, base, pooling=(2, 3))
        self.layer2 = Conv_2d(base, base, pooling=(3, 4))
        self.layer3 = Conv_2d(base, double, pooling=(2, 5))
        self.layer4 = Conv_2d(double, double, pooling=(3, 3))
        self.layer5 = Conv_2d(double, quad, pooling=(3, 4))

        # Classifier head.
        self.dense1 = nn.Linear(quad, quad)
        self.dense_bn = nn.BatchNorm1d(quad)
        self.dense2 = nn.Linear(quad, num_classes)
        self.dropout = nn.Dropout(0.5)
        self.relu = nn.ReLU()

    def forward(self, wav):
        """Map a batch of waveforms to ``num_classes`` logits per example."""
        # Spectrogram front end, with a singleton channel axis added at dim 1
        # before the input batch norm.
        spec_db = self.amplitude_to_db(self.melspec(wav))
        features = self.input_bn(spec_db.unsqueeze(1))

        # Convolutional stack.
        for conv_block in (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5):
            features = conv_block(features)

        # Flatten everything after the batch axis. dense1 accepts num_channels * 4
        # features, so the pooled map is expected to be
        # (batch_size, num_channels * 4, 1, 1) at this point.
        features = features.reshape(len(features), -1)

        # Dense head: linear -> batch norm -> ReLU -> dropout -> linear.
        hidden = self.dense1(features)
        hidden = self.dense_bn(hidden)
        hidden = self.relu(hidden)
        hidden = self.dropout(hidden)
        return self.dense2(hidden)

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, roc_auc_score


# Train on the first GPU when one is available, otherwise on the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
cnn = CNN().to(device)
# The dataset yields one-hot float vectors, which are handed to CrossEntropyLoss as targets.
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(cnn.parameters(), lr=0.001)
valid_losses = []
num_epochs = 30

for epoch in range(num_epochs):
    losses = []

    # Train
    cnn.train()
    for (wav, tags) in train_loader:
        wav = wav.to(device)
        tags = tags.to(device)

        # Forward
        out = cnn(wav)
        loss = loss_function(out, tags)

        # Backward
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
    print('Epoch: [%d/%d], Train loss: %.4f' % (epoch+1, num_epochs, np.mean(losses)))

    # Validation
    cnn.eval()
    y_true = []
    y_pred = []
    losses = []
    # No gradients are needed for validation; disabling them avoids building
    # autograd graphs and matches the test-set evaluation cell.
    with torch.no_grad():
        for wav, tags in valid_loader:
            wav = wav.to(device)
            tags = tags.to(device)

            # reshape and aggregate chunk-level predictions: fold the chunk axis
            # into the batch axis for the forward pass, then average the logits
            # of each item's chunks
            b, c, t = wav.size()
            logits = cnn(wav.view(-1, t))
            logits = logits.view(b, c, -1).mean(dim=1)
            loss = loss_function(logits, tags)
            losses.append(loss.item())
            pred = logits.detach()

            # append labels and predictions
            y_true.extend(tags.tolist())
            y_pred.extend(pred.tolist())
    roc = roc_auc_score(y_true, y_pred)
    valid_loss = np.mean(losses)
    print('Epoch: [%d/%d], Valid loss: %.4f, Valid ROC Score: %.4f' % (epoch+1, num_epochs, valid_loss, roc))

    # Save model whenever this epoch has the lowest validation loss seen so far
    valid_losses.append(valid_loss.item())
    if np.argmin(valid_losses) == epoch:
        print(f"Saving the best model at {epoch+1} epochs!")
        torch.save(cnn.state_dict(), 'best_model.ckpt')

Epoch: [1/30], Train loss: 2.4115
Epoch: [1/30], Valid loss: 2.3144, Valid ROC Score: 0.5747
Saving the best model at 1 epochs!
Epoch: [2/30], Train loss: 2.2794
Epoch: [2/30], Valid loss: 2.2296, Valid ROC Score: 0.6960
Saving the best model at 2 epochs!
Epoch: [3/30], Train loss: 2.1743
Epoch: [3/30], Valid loss: 2.1328, Valid ROC Score: 0.7547
Saving the best model at 3 epochs!
Epoch: [4/30], Train loss: 2.1572
Epoch: [4/30], Valid loss: 2.0184, Valid ROC Score: 0.7488
Saving the best model at 4 epochs!
Epoch: [5/30], Train loss: 2.0710
Epoch: [5/30], Valid loss: 2.0863, Valid ROC Score: 0.7499
Epoch: [6/30], Train loss: 2.0545
Epoch: [6/30], Valid loss: 1.8652, Valid ROC Score: 0.8144
Saving the best model at 6 epochs!
Epoch: [7/30], Train loss: 1.9783
Epoch: [7/30], Valid loss: 1.7855, Valid ROC Score: 0.8083
Saving the best model at 7 epochs!
Epoch: [8/30], Train loss: 1.9809
Epoch: [8/30], Valid loss: 1.9269, Valid ROC Score: 0.7991
Epoch: [9/30], Train loss: 1.9003
Epoch: [9/30

In [None]:
# Load the best model. map_location remaps the stored tensors onto the current
# device, so a checkpoint written on a GPU runtime also loads on a CPU-only one.
S = torch.load('best_model.ckpt', map_location=device)
cnn.load_state_dict(S)
print('loaded!')

# Run evaluation
cnn.eval()
y_true = []
y_pred = []

with torch.no_grad():
    for wav, tags in test_loader:
        wav = wav.to(device)
        tags = tags.to(device)

        # reshape and aggregate chunk-level predictions: fold the chunk axis into
        # the batch axis for the forward pass, then average each item's chunk logits
        b, c, t = wav.size()
        logits = cnn(wav.view(-1, t))
        logits = logits.view(b, c, -1).mean(dim=1)
        pred = logits

        # append labels and predictions
        y_true.extend(tags.tolist())
        y_pred.extend(pred.tolist())

In [None]:
import seaborn as sns
from sklearn.metrics import multilabel_confusion_matrix, roc_auc_score

# ROC AUC of the collected test-set logits (y_pred) against the one-hot labels (y_true),
# using roc_auc_score's default averaging.
roc = roc_auc_score(y_true, y_pred)
print('ROC Score: %.4f' % roc)