# Get GTZAN and MusiCNN-MSD

In [1]:
import os
import tensorflow
import torch
import torchaudio
import numpy as np
import essentia.standard as es
from essentia import Pool
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
torch.manual_seed(42)
###############################################################################################

# If GTZAN is already downloaded on your system, set GTZAN_path to its location.
# Otherwise leave 'data' (or choose another path) and the dataset will be downloaded there.

GTZAN_path = 'data'
#GTZAN_path = <your_path>

###############################################################################################

# TensorflowPredictMusiCNN expects mono 16kHz sample rate inputs. Resample needed
resample = es.Resample(inputSampleRate=22050, outputSampleRate=16000, quality=0)

# Load the three GTZAN subsets through torchaudio, downloading the data on first use.
# makedirs (instead of mkdir) also works for nested paths and for an already existing directory.
os.makedirs(GTZAN_path, exist_ok=True)
# Download only when the root is empty: this covers a freshly created directory as well as
# one left behind by a run that stopped before anything was fetched. A populated root is
# assumed to already hold the dataset and is used as-is.
needs_download = not os.listdir(GTZAN_path)
train_dataset = torchaudio.datasets.GTZAN(root=GTZAN_path, download=needs_download, subset='training')
val_dataset = torchaudio.datasets.GTZAN(root=GTZAN_path, subset='validation')
test_dataset = torchaudio.datasets.GTZAN(root=GTZAN_path, subset='testing')

# Fetch the Essentia MSD MusiCNN graph file into the working directory unless it is already there.
# NOTE: `!curl` is an IPython shell escape, so this cell only runs inside a notebook/IPython session.
if not os.path.isfile('msd-musicnn-1.pb'):
    !curl -SLO https://essentia.upf.edu/models/autotagging/msd/msd-musicnn-1.pb

class Essentia_MusiCNNMSD_GTZAN_Dataset(Dataset):
    """Pairs precomputed MusiCNN-MSD embeddings with GTZAN genre labels.

    Args:
        GTZAN_dataset: indexable dataset whose items carry the genre name at
            position 2 (only that field and the length are used here).
        embeddings: array indexable in parallel with ``GTZAN_dataset``; entry
            ``i`` holds the embedding(s) of track ``i``.
    """

    def __init__(self, GTZAN_dataset, embeddings):
        self.GTZAN_dataset = GTZAN_dataset
        self.embeddings = embeddings
        # The position of a genre in this list is its integer class id.
        self.GTZAN_genres = [
            "blues", "classical", "country", "disco", "hiphop",
            "jazz", "metal", "pop", "reggae", "rock",
        ]

    def __len__(self):
        """Number of tracks, taken from the wrapped GTZAN dataset."""
        return len(self.GTZAN_dataset)

    def __getitem__(self, idx):
        """Return ``(embedding averaged over axis 0, genre id tensor)`` for track ``idx``."""
        index = idx.tolist() if torch.is_tensor(idx) else idx

        # Drop the .mean(0) below to keep every row of the embedding (original method).
        track_embedding = torch.from_numpy(self.embeddings[index])
        genre_name = self.GTZAN_dataset[index][2]
        genre_id = self.GTZAN_genres.index(genre_name)

        return track_embedding.mean(0), torch.tensor(genre_id)
# We define a shallow model

class shallowClassifier(nn.Module):
    """Shallow classifier: Linear -> ReLU -> Linear, returning raw logits.

    Args:
        input_dim: number of features per example. The default (200) matches
            embeddings averaged over their first axis; pass ``19 * 200`` to feed
            the un-averaged embeddings instead of editing the layer sizes by hand.
        hidden_dim: width of the hidden layer.
        n_classes: number of output logits.
    """

    def __init__(self, input_dim=200, hidden_dim=100, n_classes=10):
        super().__init__()
        self.input_dim = input_dim
        self.dense1 = nn.Linear(input_dim, hidden_dim)
        self.dense2 = nn.Linear(hidden_dim, n_classes)

    def forward(self, x):
        """Flatten ``x`` to ``(-1, input_dim)`` and return the ``n_classes`` logits per row."""
        # reshape (rather than view) also accepts non-contiguous inputs.
        x = x.reshape(-1, self.input_dim)
        x = F.relu(self.dense1(x))
        x = self.dense2(x)

        return x
    

# Build embeddings with GTZAN time

In [2]:
# Compute and store the embeddings for each subset; later runs reload them from the .npy caches.
# All three cache files are checked so that a partially written cache is rebuilt instead of
# failing in np.load.
embedding_files = ('train_embeddings.npy', 'val_embeddings.npy', 'test_embeddings.npy')
if not all(os.path.isfile(path) for path in embedding_files):
    # Create the extractor once and reuse it for every track instead of rebuilding it
    # (with the same graph file and output layer) inside each loop iteration.
    musicnn_embedder = es.TensorflowPredictMusiCNN(
        graphFilename='msd-musicnn-1.pb', output='model/dense/BiasAdd')

    train_embeddings = []
    for i, track in enumerate(train_dataset):
        # Report progress every 100 tracks. The previous counter only advanced inside the
        # `i%100==0` branch, so it stuck at 1 and a line was printed for every track.
        if i % 100 == 0:
            print('Processing track '+str(i)+' of '+str(len(train_dataset)))
        # track[0] is the waveform tensor; its first channel is resampled before extraction.
        train_embeddings.append(musicnn_embedder(resample(track[0].numpy()[0])))
    train_embeddings = np.array(train_embeddings)
    np.save('train_embeddings.npy', train_embeddings)

    val_embeddings = []
    for track in val_dataset:
        val_embeddings.append(musicnn_embedder(resample(track[0].numpy()[0])))
    val_embeddings = np.array(val_embeddings)
    np.save('val_embeddings.npy', val_embeddings)

    test_embeddings = []
    for track in test_dataset:
        test_embeddings.append(musicnn_embedder(resample(track[0].numpy()[0])))
    test_embeddings = np.array(test_embeddings)
    np.save('test_embeddings.npy', test_embeddings)
else:
    train_embeddings = np.load('train_embeddings.npy')
    val_embeddings = np.load('val_embeddings.npy')
    test_embeddings = np.load('test_embeddings.npy')

In [3]:
# Shape of the first training track's embedding (sanity check of the extraction step).
train_embeddings[0].shape

(19, 200)

In [4]:
# Element type of the stored embeddings; torch.from_numpy in the dataset wrapper keeps this dtype.
train_embeddings[0].dtype

dtype('float32')

In [5]:
# Wrap the training subset and its embeddings for inspection; train_test builds its own copies.
embtrain_dataset = Essentia_MusiCNNMSD_GTZAN_Dataset(train_dataset, train_embeddings)

# Build train loop

In [6]:
def train_test(train_embeddings, val_embeddings, test_embeddings,
               num_epochs=2000, batch_size=16, lr=1e-4):
    """Train a shallowClassifier on precomputed embeddings and score it on the test split.

    Genre labels come from the module-level ``train_dataset``, ``val_dataset`` and
    ``test_dataset``; each embedding array must be index-aligned with its split.
    The weights of the epoch with the lowest validation loss are written to
    'model.pth' and reloaded before testing.

    Args:
        train_embeddings: embeddings of the training tracks.
        val_embeddings: embeddings of the validation tracks.
        test_embeddings: embeddings of the test tracks.
        num_epochs: number of passes over the training set.
        batch_size: batch size used by all three data loaders.
        lr: learning rate of the SGD optimizer.

    Returns:
        Mean per-class accuracy on the test split, in percent.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    embtrain_dataset = Essentia_MusiCNNMSD_GTZAN_Dataset(train_dataset, train_embeddings)
    train_loader = torch.utils.data.DataLoader(embtrain_dataset, batch_size=batch_size, shuffle=True, num_workers=4)

    embval_dataset = Essentia_MusiCNNMSD_GTZAN_Dataset(val_dataset, val_embeddings)
    val_loader = torch.utils.data.DataLoader(embval_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

    embtest_dataset = Essentia_MusiCNNMSD_GTZAN_Dataset(test_dataset, test_embeddings)
    test_loader = torch.utils.data.DataLoader(embtest_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

    model = shallowClassifier().to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer)

    train_losses = torch.zeros(num_epochs)
    val_losses = torch.zeros(num_epochs)

    # Kept as a plain float so it can be printed whether or not any epoch improved on it.
    bestloss = float('inf')
    for epoch in range(num_epochs):
        # The train loop
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Clear gradient and forward + loss + backward
            optimizer.zero_grad()
            loss = criterion(model(inputs), labels)
            loss.backward()
            optimizer.step()

            train_losses[epoch] += loss.item()
        train_losses[epoch] /= len(train_loader)

        # The validation loop
        model.eval()
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                val_losses[epoch] += criterion(model(inputs), labels).item()
        val_losses[epoch] /= len(val_loader)
        epoch_val_loss = val_losses[epoch].item()
        scheduler.step(epoch_val_loss)

        # If best epoch, we save parameters
        if epoch_val_loss < bestloss:
            bestloss = epoch_val_loss
            torch.save(model.state_dict(), 'model.pth')

        if epoch%100==0:
            print('Epoch '+str(epoch)+': Train Loss = '+str(train_losses[epoch].item())+'. Val Loss = '+str(val_losses[epoch].item())+'.')
    print('Best validation loss :' + str(bestloss))

    # Finally we compute accuracy with the test set, using the best checkpoint of this run.
    # If no epoch ever improved (e.g. NaN losses), no checkpoint was written here, so the
    # final weights are kept rather than loading a stale 'model.pth' from an earlier run.
    if bestloss != float('inf'):
        model.load_state_dict(torch.load('model.pth', map_location=device))
    model.eval()
    n_genres = len(embtrain_dataset.GTZAN_genres)
    # Rows are true genres, columns are predicted genres.
    confusion_matrix = torch.zeros(n_genres, n_genres)
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(device))
            _, predicted = torch.max(outputs, 1)
            # Index the CPU confusion matrix with plain Python ints.
            for t, p in zip(labels.view(-1).tolist(), predicted.view(-1).tolist()):
                confusion_matrix[t, p] += 1

    # Per-class Accuracy
    pclass_acc = confusion_matrix.diag()/confusion_matrix.sum(1)
    return torch.mean(pclass_acc).item()*100

# Train GTZAN_time

In [7]:
# Baseline run: train and evaluate the classifier on the waveform-derived embeddings loaded above.
baseline = train_test(train_embeddings, val_embeddings, test_embeddings)

Epoch 0: Train Loss = 2.396742582321167. Val Loss = 2.302741050720215.
Epoch 100: Train Loss = 1.0398520231246948. Val Loss = 1.171513557434082.
Epoch 200: Train Loss = 0.6912970542907715. Val Loss = 0.885692298412323.
Epoch 300: Train Loss = 0.5544649362564087. Val Loss = 0.7747247219085693.
Epoch 400: Train Loss = 0.47854354977607727. Val Loss = 0.7215855717658997.
Epoch 500: Train Loss = 0.4272844195365906. Val Loss = 0.6938523054122925.
Epoch 600: Train Loss = 0.3927702009677887. Val Loss = 0.6783539056777954.
Epoch 700: Train Loss = 0.3665103316307068. Val Loss = 0.6705629825592041.
Epoch 800: Train Loss = 0.34445664286613464. Val Loss = 0.667378842830658.
Epoch 900: Train Loss = 0.3370794653892517. Val Loss = 0.6668924689292908.
Epoch 1000: Train Loss = 0.334261029958725. Val Loss = 0.6668926477432251.
Epoch 1100: Train Loss = 0.33406737446784973. Val Loss = 0.6668928861618042.
Epoch 1200: Train Loss = 0.3340103328227997. Val Loss = 0.6668930053710938.
Epoch 1300: Train Loss = 0.

NameError: name 'test_loader' is not defined

In [None]:
# Value returned by train_test for the baseline run (mean per-class test accuracy, in percent).
print(baseline)

# Baseline is 80.50%

In [None]:
# Reference embedding for a single track: resample the first channel of the first training
# waveform and run it through the waveform-input MusiCNN extractor.
audio = resample(train_dataset[0][0].numpy()[0])
original = es.TensorflowPredictMusiCNN(graphFilename='msd-musicnn-1.pb', output='model/dense/BiasAdd')(audio)

In [None]:
# Shape of the reference embedding, for comparison with the mel-spectrogram-based pipeline below.
original.shape

In [None]:
def melspectrogram(audio):
    """Compute the mel spectrogram of ``audio`` as done in the MelonPlaylist dataset.

    Each 512-sample frame (hop 256) is Hann-windowed, turned into a spectrum,
    reduced to 48 mel bands between 0 and 8000 Hz (sample rate 16000) and passed
    through the ``lin2db`` operator with ``scale=2``.

    Args:
        audio: audio signal accepted by ``es.FrameGenerator``.

    Returns:
        ``np.ndarray`` built from the per-frame band vectors, one entry per frame.
    """
    frame_size = 512
    sample_rate = 16000

    window = es.Windowing(type='hann', normalized=False, zeroPadding=0)
    to_spectrum = es.Spectrum()
    to_mel = es.MelBands(
        numberBands=48,
        sampleRate=sample_rate,
        lowFrequencyBound=0,
        highFrequencyBound=sample_rate/2,
        inputSize=frame_size//2+1,
        weighting='linear',
        normalize='unit_tri',
        warpingFormula='slaneyMel',
        type='power',
    )
    to_db = es.UnaryOperator(type='lin2db', scale=2)

    frames = es.FrameGenerator(audio, frameSize=frame_size, hopSize=256,
                               startFromZero=False)
    return np.array([to_db(to_mel(to_spectrum(window(frame)))) for frame in frames])

In [None]:
def adapt_melonInput_TensorflowPredict(melon_sample):
    """Convert a Melon-style dB mel spectrogram into input patches for the MusiCNN graph.

    Every frame is mapped back from dB with the ``db2lin`` operator (``scale=2``),
    compressed as ``log10(1 + 10000 * x)`` and linearly interpolated to twice the
    number of bands (48 -> 96 for Melon spectrograms). The frames are then cut into
    patches of 187 frames with a hop of 93 frames, as in the default
    TensorflowPredictMusiCNN; a trailing incomplete patch is dropped.

    Args:
        melon_sample: 2-D array ``(frames, bands)``, e.g. the output of ``melspectrogram``.

    Returns:
        float32 array of shape ``(patches, 187, 1, 2 * bands)``; ``patches`` is 0
        when fewer than 187 frames are available.
    """
    patch_size, patch_hop = 187, 93
    db2amp = es.UnaryOperator(type='db2lin', scale=2)

    n_frames, n_bands = melon_sample.shape
    n_out_bands = 2 * n_bands
    oversampled = np.zeros((n_frames, n_out_bands), dtype=np.float32)
    # Interpolation grid derived from the actual band count, so it always matches the
    # width of `oversampled` (identical to arange(96)/2 over arange(48) for 48 bands).
    src_positions = np.arange(n_bands)
    dst_positions = np.arange(n_out_bands) / 2
    for k in range(n_frames):
        sample = np.log10(1 + (db2amp(melon_sample[k])*10000))
        oversampled[k, :] = np.interp(dst_positions, src_positions, sample)

    # Number of *complete* patches. The former `int(n_frames / 93) - 1` asked for one patch
    # too many whenever n_frames was an exact multiple of 93 (the last slice had only 186
    # rows) and became negative for inputs shorter than 93 frames.
    n_patches = max(0, (n_frames - patch_size) // patch_hop + 1)
    new = np.zeros((n_patches, patch_size, n_out_bands), dtype=np.float32)
    for k in range(n_patches):
        start = k * patch_hop
        new[k] = oversampled[start:start + patch_size]
    # Insert a singleton axis at position 2.
    return np.expand_dims(new, 2)

In [None]:
# Generic TensorFlow predictor on the same graph file: unlike the waveform-input extractor
# used earlier, it is fed through the named input layer and read from the named output layer.
modelName='msd-musicnn-1.pb'
output_layer='model/dense/BiasAdd'
input_layer='model/Placeholder'
predict = es.TensorflowPredict(graphFilename=modelName,
                               inputs=[input_layer],
                               outputs=[output_layer])


In [None]:
# Single-track check of the mel-spectrogram pipeline: put the adapted patches of `audio`
# into a Pool under the graph's input name, run the predictor and read the output layer.
in_pool = Pool()
in_pool.set('model/Placeholder', adapt_melonInput_TensorflowPredict(melspectrogram(audio)))
output = predict(in_pool)
# Keep index 0 along axes 1 and 2 — NOTE(review): assumes both are singleton axes; confirm.
prediction = output['model/dense/BiasAdd'][:,0,0,:]

In [None]:
# The MusiCNN model expects mono 16kHz inputs, so every track is resampled first.
# Compute and store the embeddings for each subset from the interpolated Melon-style mel
# spectrograms; later runs reload them from the .npy caches. All three cache files are
# checked so that a partially written cache is rebuilt instead of failing in np.load.
melon_embedding_files = ('train_embeddings_melon.npy', 'val_embeddings_melon.npy',
                         'test_embeddings_melon.npy')
if not all(os.path.isfile(path) for path in melon_embedding_files):
    train_embeddings = []
    for i, track in enumerate(train_dataset):
        # Report progress every 100 tracks. The previous counter only advanced inside the
        # `i%100==0` branch, so it stuck at 1 and a line was printed for every track.
        if i % 100 == 0:
            print('Processing track '+str(i)+' of '+str(len(train_dataset)))
        in_pool = Pool()
        in_pool.set('model/Placeholder', adapt_melonInput_TensorflowPredict(melspectrogram(resample(track[0].numpy()[0]))))
        output = predict(in_pool)
        train_embeddings.append(output['model/dense/BiasAdd'][:,0,0,:])
    train_embeddings = np.array(train_embeddings)
    np.save('train_embeddings_melon.npy', train_embeddings)

    val_embeddings = []
    for track in val_dataset:
        in_pool = Pool()
        in_pool.set('model/Placeholder', adapt_melonInput_TensorflowPredict(melspectrogram(resample(track[0].numpy()[0]))))
        output = predict(in_pool)
        val_embeddings.append(output['model/dense/BiasAdd'][:,0,0,:])
    val_embeddings = np.array(val_embeddings)
    np.save('val_embeddings_melon.npy', val_embeddings)

    test_embeddings = []
    for track in test_dataset:
        in_pool = Pool()
        in_pool.set('model/Placeholder', adapt_melonInput_TensorflowPredict(melspectrogram(resample(track[0].numpy()[0]))))
        output = predict(in_pool)
        test_embeddings.append(output['model/dense/BiasAdd'][:,0,0,:])
    test_embeddings = np.array(test_embeddings)
    np.save('test_embeddings_melon.npy', test_embeddings)

else:
    train_embeddings = np.load('train_embeddings_melon.npy')
    val_embeddings = np.load('val_embeddings_melon.npy')
    test_embeddings = np.load('test_embeddings_melon.npy')

In [None]:
# Shape of the first training embedding produced by the mel-spectrogram pipeline.
train_embeddings[0].shape

In [None]:
# Element type of the mel-spectrogram-pipeline embeddings.
train_embeddings[0].dtype

# Re-train, now with interpolated melspecs

In [None]:
# Same training/evaluation procedure, now on the embeddings currently bound to
# train/val/test_embeddings (the interpolated mel-spectrogram ones when cells run in order).
interpolated = train_test(train_embeddings, val_embeddings, test_embeddings)

In [None]:
# Value returned by train_test for the interpolated-melspec run (accuracy in percent).
print(interpolated)

# Now with random embeddings

In [None]:
# Seed NumPy's global generator so the np.random.randn embeddings drawn below are reproducible.
np.random.seed(42)

In [None]:
# Control experiment: replace each embedding array with float32 Gaussian noise (scaled by 5)
# of the same shape as the current train/val/test embeddings; labels are unchanged.
rand = train_test(np.reshape(5*np.random.randn(train_embeddings.size).astype(np.float32), train_embeddings.shape), 
                 np.reshape(5*np.random.randn(val_embeddings.size).astype(np.float32), val_embeddings.shape), 
                 np.reshape(5*np.random.randn(test_embeddings.size).astype(np.float32), test_embeddings.shape))

In [None]:
# Value returned by train_test for the random-embedding control (accuracy in percent).
print(rand)

# Now with random musiCNN

In [None]:
class Conv_V(nn.Module):
    """Vertical convolution block.

    Applies Conv2d -> BatchNorm2d -> ReLU and then max-pools over the whole of
    dimension 2, which is removed from the output. Dimension 3 is padded by
    ``filter_shape[1] // 2`` on each side; dimension 2 is not padded.
    """

    def __init__(self, input_channels, output_channels, filter_shape):
        super().__init__()
        width_padding = filter_shape[1] // 2
        self.conv = nn.Conv2d(input_channels, output_channels, filter_shape,
                              padding=(0, width_padding))
        self.bn = nn.BatchNorm2d(output_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return the block output with dimension 2 pooled away: ``(N, C_out, W_out)``."""
        activations = self.relu(self.bn(self.conv(x)))
        # One pooling window spanning all of dimension 2 collapses it to size 1.
        height = activations.size(2)
        pooled = F.max_pool2d(activations, (height, 1), stride=(height, 1))
        return pooled.squeeze(2)


class Conv_H(nn.Module):
    """Horizontal convolution block.

    Average-pools the input over the whole of dimension 2 (which is then removed)
    and applies Conv1d -> BatchNorm1d -> ReLU along the remaining last dimension,
    padded by ``filter_length // 2`` on each side.
    """

    def __init__(self, input_channels, output_channels, filter_length):
        super().__init__()
        half_length = filter_length // 2
        self.conv = nn.Conv1d(input_channels, output_channels, filter_length,
                              padding=half_length)
        self.bn = nn.BatchNorm1d(output_channels)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return the block output of shape ``(N, C_out, L_out)`` for a 4-D input ``x``."""
        # One pooling window spanning all of dimension 2 collapses it to size 1.
        height = x.size(2)
        averaged = F.avg_pool2d(x, (height, 1), stride=(height, 1)).squeeze(2)
        return self.relu(self.bn(self.conv(averaged)))

class Conv_1d(nn.Module):
    """1-D convolution block: Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d.

    The convolution pads by ``shape // 2`` on each side; ``pooling`` is the
    max-pooling kernel size (``pooling=1`` leaves the length unchanged).
    """

    def __init__(self, input_channels, output_channels, shape=3, stride=1, pooling=2):
        super().__init__()
        self.conv = nn.Conv1d(input_channels, output_channels, shape,
                              stride=stride, padding=shape // 2)
        self.bn = nn.BatchNorm1d(output_channels)
        self.relu = nn.ReLU()
        self.mp = nn.MaxPool1d(pooling)

    def forward(self, x):
        """Run ``x`` through convolution, batch norm, ReLU and max pooling, in that order."""
        features = self.conv(x)
        features = self.bn(features)
        features = self.relu(features)
        return self.mp(features)
    
class Musicnn(nn.Module):
    '''
    Pons et al. 2017
    End-to-end learning for music audio tagging at scale.
    This is the updated implementation of the original paper. Referred to the Musicnn code.
    https://github.com/jordipons/musicnn

    In this notebook the network is used as an embedding extractor: ``forward``
    stops after the first dense block (dense1 -> batch norm -> ReLU -> dropout).
    ``dense2`` is still created, so the parameter set and the order in which
    parameters are initialised are unchanged, but it is not applied.

    Args:
        sample_rate, n_fft, f_min, f_max, n_mels: forwarded to
            ``torchaudio.transforms.MelSpectrogram``. ``n_mels`` also sets the
            height of the vertical front-end filters (70% and 40% of the bands).
        n_class: output size of the (unused) ``dense2`` layer.
        dataset: ``'msd'`` selects the wide configuration (512 back-end channels,
            500 dense units); any other value selects 64 channels / 200 units.
    '''
    def __init__(self,
                sample_rate=16000,
                n_fft=512,
                f_min=0.0,
                f_max=8000.0,
                n_mels=96,
                n_class=50,
                dataset='mtat'):
        super(Musicnn, self).__init__()

        # Spectrogram
        self.spec = torchaudio.transforms.MelSpectrogram(sample_rate=sample_rate,
                                                         n_fft=n_fft,
                                                         f_min=f_min,
                                                         f_max=f_max,
                                                         n_mels=n_mels)
        self.to_db = torchaudio.transforms.AmplitudeToDB()
        self.spec_bn = nn.BatchNorm2d(1)

        # Pons front-end
        # Filter heights follow n_mels instead of a hard-coded 96, so a non-default band
        # count gets proportionally sized filters (67 and 38 rows for the default 96 bands).
        m1 = Conv_V(1, 204, (int(0.7*n_mels), 7))
        m2 = Conv_V(1, 204, (int(0.4*n_mels), 7))
        m3 = Conv_H(1, 51, 129)
        m4 = Conv_H(1, 51, 65)
        m5 = Conv_H(1, 51, 33)
        self.layers = nn.ModuleList([m1, m2, m3, m4, m5])

        # Pons back-end
        # 561 = 204 + 204 + 3 * 51 channels coming out of the concatenated front-end.
        backend_channel= 512 if dataset=='msd' else 64
        self.layer1 = Conv_1d(561, backend_channel, 7, 1, 1)
        self.layer2 = Conv_1d(backend_channel, backend_channel, 7, 1, 1)
        self.layer3 = Conv_1d(backend_channel, backend_channel, 7, 1, 1)

        # Dense
        # Input size: front-end + three back-end outputs, each max- and average-pooled over time.
        dense_channel = 500 if dataset=='msd' else 200
        self.dense1 = nn.Linear((561+(backend_channel*3))*2, dense_channel)
        self.bn = nn.BatchNorm1d(dense_channel)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.dense2 = nn.Linear(dense_channel, n_class)

    def forward(self, x):
        """Return the penultimate-layer embedding of the input ``x``.

        ``x`` is whatever ``self.spec`` accepts — NOTE(review): the callers in this
        notebook pass a (1, samples) float waveform tensor; confirm other layouts.
        The output has ``dense1.out_features`` values per example.
        """
        # Spectrogram
        x = self.spec(x)
        x = self.to_db(x)
        x = x.unsqueeze(1)  # add the channel axis expected by the 2-D layers
        x = self.spec_bn(x)

        # Pons front-end
        out = []
        for layer in self.layers:
            out.append(layer(x))
        out = torch.cat(out, dim=1)

        # Pons back-end (residual connections around layer2 and layer3)
        length = out.size(2)
        res1 = self.layer1(out)
        res2 = self.layer2(res1) + res1
        res3 = self.layer3(res2) + res2
        out = torch.cat([out, res1, res2, res3], 1)

        # Global max and average pooling over the whole time axis.
        mp = nn.MaxPool1d(length)(out)
        avgp = nn.AvgPool1d(length)(out)
        out = torch.cat([mp, avgp], dim=1)
        out = out.squeeze(2)

        out = self.relu(self.bn(self.dense1(out)))
        out = self.dropout(out)
        # The tagging head (dense2 + sigmoid) is intentionally not applied here.

        return out
# Untrained MusiCNN with default arguments; no weights are loaded, so its parameters are
# whatever the layer constructors initialised them to.
rand_musiCNN = Musicnn()
# Evaluation mode; the trailing semicolon suppresses the notebook's echo of the module repr.
rand_musiCNN.eval();

In [None]:
# Compute and store the embeddings of each subset with the randomly initialised MusiCNN;
# later runs reload them from the .npy caches. All three cache files are checked so that
# a partially written cache is rebuilt instead of failing in np.load.
random_embedding_files = ('train_embeddings_random.npy', 'val_embeddings_random.npy',
                          'test_embeddings_random.npy')
if not all(os.path.isfile(path) for path in random_embedding_files):
    # Inference only: no_grad avoids building an autograd graph for every track.
    with torch.no_grad():
        train_embeddings = []
        for i, track in enumerate(train_dataset):
            # Report progress every 100 tracks. The previous counter only advanced inside
            # the `i%100==0` branch, so it stuck at 1 and a line was printed for every track.
            if i % 100 == 0:
                print('Processing track '+str(i)+' of '+str(len(train_dataset)))
            # First channel of the waveform, resampled, with a leading batch axis of size 1.
            train_embeddings.append(rand_musiCNN(torch.from_numpy(resample(track[0].numpy()[0])).unsqueeze(0)).numpy())
        train_embeddings = np.array(train_embeddings)
        np.save('train_embeddings_random.npy', train_embeddings)

        val_embeddings = []
        for track in val_dataset:
            val_embeddings.append(rand_musiCNN(torch.from_numpy(resample(track[0].numpy()[0])).unsqueeze(0)).numpy())
        val_embeddings = np.array(val_embeddings)
        np.save('val_embeddings_random.npy', val_embeddings)

        test_embeddings = []
        for track in test_dataset:
            test_embeddings.append(rand_musiCNN(torch.from_numpy(resample(track[0].numpy()[0])).unsqueeze(0)).numpy())
        test_embeddings = np.array(test_embeddings)
        np.save('test_embeddings_random.npy', test_embeddings)

else:
    train_embeddings = np.load('train_embeddings_random.npy')
    val_embeddings = np.load('val_embeddings_random.npy')
    test_embeddings = np.load('test_embeddings_random.npy')

In [None]:
# Same training/evaluation procedure on the embeddings produced by the untrained MusiCNN.
random_net = train_test(train_embeddings, val_embeddings, test_embeddings)

In [None]:
# Display the value returned by train_test for the untrained-MusiCNN run (accuracy in percent).
random_net

| Model        | Loss           | Accuracy  |
| ------------- |:-------------:| -----:|
| Random embeddings      | 2.31 | 8.07% |
| Random musiCNN      | 1.88 | 40.58% |
| musiCNN waveform      |    0.67   |   80.50% |
| musiCNN melonMEL | 0.92      |    74.57% |