### Task3: Deep Learning Model

- Train a deep learning model (e.g. a CNN or an attention-based model) that takes Mel-spectrograms extracted from the audio as input

- Need to compare 2 different kinds of inputs: Mel-spectrograms with or without taking the log

- You can choose whatever FFT window size and hop length you like

- You can choose whatever deep learning model you like

- Need to clearly report how the model is implemented

- Need to report the testing result (not validation result) with confusion matrix, top1 accuracy, and top3 accuracy

- You can use any music tagging model. For a novice, the short-chunk CNN in this repo is recommended. (You need to replace the BCE loss with cross-entropy loss.)

In [12]:


# Root directory of the test split (e.g. the `nsynth-test` folder). The cells
# below read `examples.json` and `audio/<key>.wav` from inside this directory.
test_data_path = '<PUT THE PATH TO THE TEST DATA HERE>'
# test_data_path = 'nsynth-test'

In [13]:
import os
import json
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import librosa
from torch.utils.data import DataLoader
import tqdm
from sklearn.preprocessing import LabelEncoder

import torch
from torch.utils.data import DataLoader
from sklearn.metrics import confusion_matrix

import pickle


In [14]:
def load_json(json_file):
    """Parse the JSON document stored at ``json_file``.

    Args:
        json_file: Path of the JSON file to read.

    Returns:
        The decoded Python object (whatever ``json.load`` produces).
    """
    # Read explicitly as UTF-8 so the result does not depend on the
    # platform's locale-specific default encoding.
    with open(json_file, encoding="utf-8") as f:
        return json.load(f)



In [15]:
def feature_extraction(key, file_path):
    """Compute mel-spectrogram features for a single audio file.

    Args:
        key: Identifier of the example. It is not used by the computation and
            is only part of the signature for the callers' convenience.
        file_path: Path of the audio file, loaded with ``librosa.load`` using
            its default settings.

    Returns:
        A two-element list ``[mel, log_mel]``: the mel spectrogram and the
        same spectrogram after ``librosa.power_to_db``.
    """
    waveform, sample_rate = librosa.load(file_path)

    # Both the FFT window size and the hop length are 512 samples.
    mel = librosa.feature.melspectrogram(
        y=waveform,
        sr=sample_rate,
        n_fft=512,
        hop_length=512,
    )

    # Log-scaled (dB) variant of the same spectrogram.
    log_mel = librosa.power_to_db(mel)

    return [mel, log_mel]

In [16]:
# Load the pickled label encoder used to map instrument-family names to the
# integer class ids. The file is opened in a `with` block so the handle is
# closed deterministically.
# NOTE: unpickling can execute arbitrary code; only load a file you trust.
with open('label_encoder_mel_log.pkl', 'rb') as encoder_file:
    label_encoder = pickle.load(encoder_file)

In [17]:
class Conv_2d(nn.Module):
    """Convolution -> batch norm -> ReLU -> max-pool building block.

    The convolution is padded with ``kernel_size // 2`` on each side, and the
    max-pool uses ``pooling`` as its kernel size.
    """

    def __init__(self, input_channels, output_channels, kernel_size=3, stride=1, pooling=2):
        super().__init__()
        half_kernel = kernel_size // 2
        self.conv = nn.Conv2d(
            input_channels,
            output_channels,
            kernel_size,
            stride=stride,
            padding=half_kernel,
        )
        self.bn = nn.BatchNorm2d(output_channels)
        self.relu = nn.ReLU()
        self.mp = nn.MaxPool2d(pooling)

    def forward(self, x):
        """Apply conv, batch norm, ReLU and max-pooling to ``x`` in that order."""
        activated = self.relu(self.bn(self.conv(x)))
        return self.mp(activated)

class ShortChunkCNN(nn.Module):
    '''
    Short-chunk CNN classifier.

    Seven stacked Conv_2d blocks (each with 2x2 max-pooling), a global
    max-pool over whatever spatial extent remains, and a two-layer dense head
    whose output goes through a softmax over the classes.

    n_channels is the width of the first convolutional blocks (later blocks
    use 2x and 4x that width); the network input itself must have exactly one
    channel. n_class is the number of output classes.
    '''
    def __init__(self,
                 n_channels=1,
                 n_class=11):
        super().__init__()

        width_1x = n_channels
        width_2x = n_channels * 2
        width_4x = n_channels * 4

        # Convolutional stack
        self.layer1 = Conv_2d(1, width_1x, pooling=2)
        self.layer2 = Conv_2d(width_1x, width_1x, pooling=2)
        self.layer3 = Conv_2d(width_1x, width_2x, pooling=2)
        self.layer4 = Conv_2d(width_2x, width_2x, pooling=2)
        self.layer5 = Conv_2d(width_2x, width_2x, pooling=2)
        self.layer6 = Conv_2d(width_2x, width_2x, pooling=2)
        self.layer7 = Conv_2d(width_2x, width_4x, pooling=2)

        # Dense classification head
        self.dense1 = nn.Linear(width_4x, width_4x)
        self.bn1 = nn.BatchNorm1d(width_4x)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.dense2 = nn.Linear(width_4x, n_class)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        # x: (batch_size, 1, H, W); the original author notes (batch_size, 1, 128, 137).

        # Each block halves H and W (floor division) through its 2x2 pooling.
        conv_stack = (
            self.layer1,
            self.layer2,
            self.layer3,
            self.layer4,
            self.layer5,
            self.layer6,
            self.layer7,
        )
        for conv_block in conv_stack:
            x = conv_block(x)
        # x: (batch_size, n_channels*4, H', W')

        # Collapse any remaining width with a global max, then drop the axis.
        remaining_width = x.size(3)
        if remaining_width != 1:
            x = nn.functional.max_pool2d(x, kernel_size=(1, remaining_width))
        x = x.squeeze(3)  # -> (batch_size, n_channels*4, H')

        # Same for any remaining height.
        remaining_height = x.size(2)
        if remaining_height != 1:
            x = nn.functional.max_pool1d(x, remaining_height)
        x = x.squeeze(2)  # -> (batch_size, n_channels*4)

        hidden = self.dense1(x)  # -> (batch_size, n_channels*4)
        hidden = self.dropout(self.relu(self.bn1(hidden)))
        scores = self.dense2(hidden)  # -> (batch_size, n_class)

        # The model returns class probabilities, not raw logits.
        # NOTE(review): if training used nn.CrossEntropyLoss, that loss applies
        # log-softmax itself -- confirm against the training code.
        return self.softmax(scores)


In [18]:
# Rebuild the architecture and restore the trained weights.
# map_location='cpu' lets a checkpoint that was saved from a GPU load on a
# CPU-only machine; the model is moved to the evaluation device further below.
model = ShortChunkCNN(n_channels=1, n_class=11)
model.load_state_dict(torch.load('mel_model_log.pth', map_location='cpu'))

In [19]:
batch_size = 32

# Load the test-set metadata and extract the features of every example.
test_data = load_json(os.path.join(test_data_path, 'examples.json'))
test_keys = list(test_data.keys())
test_features = []
for key in test_keys:
    audio_path = os.path.join(test_data_path, 'audio', key + '.wav')
    test_features.append(feature_extraction(key, audio_path))

# feature_extraction returns [mel, log_mel]; index 1 selects the log-scaled
# spectrogram, which is what the 'mel_model_log' checkpoint is evaluated on.
test_mel_spectrogram = [f[1] for f in test_features]

test_labels = [test_data[key]["instrument_family_str"] for key in test_keys]

# Use transform(), not fit_transform(): re-fitting the encoder on the test
# labels would rebuild the class -> integer mapping from the test set alone,
# so the ids would no longer match the ones the model was trained with
# whenever the test set lacks one of the training classes.
test_integer_encoded = label_encoder.transform(test_labels)

# Keep the labels as a column vector, shape (n_samples, 1).
test_integer_encoded = test_integer_encoded.reshape(-1, 1)

# np.array stacks the per-file spectrograms; this requires every file to
# yield a spectrogram of the same shape.
test_x = np.array(test_mel_spectrogram)
test_y = test_integer_encoded

# Wrap the (feature, label) pairs in a data loader; order is preserved.
test_loader = DataLoader(dataset=list(zip(test_x, test_y)), batch_size=batch_size, shuffle=False)

In [20]:
# Sanity check: show the dimensions of the stacked features, then the labels.
print(test_x.shape, test_y.shape, sep="\n")

In [21]:
def evaluate(model, test_loader, device):
    """Evaluate a classifier on a test set.

    Args:
        model: Module mapping a ``(batch, 1, H, W)`` input to per-class scores
            of shape ``(batch, n_class)``.
        test_loader: Iterable of ``(batch_x, batch_y)`` pairs, where
            ``batch_x`` has shape ``(batch, H, W)`` and ``batch_y`` holds one
            integer class id per sample (any shape that flattens to
            ``(batch,)``).
        device: Device on which the forward pass is run.

    Returns:
        A tuple ``(top1_accuracy, top3_accuracy, conf_matrix)``. The accuracies
        are fractions in ``[0, 1]``; ``conf_matrix`` is the result of
        ``sklearn.metrics.confusion_matrix(true_labels, top1_predictions)``.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    model.eval()  # Disable dropout and use running batch-norm statistics
    top1_correct = 0
    top3_correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():  # No gradients are needed for evaluation
        for batch_x, batch_y in test_loader:
            # Add the channel axis: (batch, H, W) -> (batch, 1, H, W)
            batch_x = batch_x.unsqueeze(1).to(device)

            # Flatten with reshape(-1) rather than squeeze(): squeeze() turns a
            # final batch that holds a single sample into a 0-d tensor, which
            # breaks the unsqueeze/size/extend calls below.
            batch_y = batch_y.reshape(-1).to(device)

            outputs = model(batch_x)

            # Top-1 prediction: the highest-scoring class
            _, top1_pred = torch.max(outputs, dim=1)

            # Top-k predictions; k is capped so models with fewer than three
            # classes do not make topk fail.
            k = min(3, outputs.size(1))
            _, top3_pred = torch.topk(outputs, k=k, dim=1)

            top1_correct += (top1_pred == batch_y).sum().item()

            # topk indices are distinct, so each row matches at most once
            top3_correct += (batch_y.unsqueeze(1) == top3_pred).sum().item()

            # Collect predictions and true labels for the confusion matrix
            all_preds.extend(top1_pred.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())

            total += batch_y.size(0)

    if total == 0:
        raise ValueError("test_loader yielded no samples to evaluate")

    top1_accuracy = top1_correct / total
    top3_accuracy = top3_correct / total

    conf_matrix = confusion_matrix(all_labels, all_preds)

    return top1_accuracy, top3_accuracy, conf_matrix


# Run the evaluation end to end and report the metrics.

# Prefer the GPU when one is available, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Move the already-loaded model onto the chosen device.
model.to(device)

# Pair every feature array with its label; keep the original order.
test_loader = DataLoader(
    dataset=list(zip(test_x, test_y)),
    batch_size=32,
    shuffle=False,
)

top1_acc, top3_acc, conf_matrix = evaluate(model, test_loader, device)

# Accuracies are fractions, so scale them to percentages for display.
print(f"Top-1 Accuracy: {top1_acc * 100:.2f}%")
print(f"Top-3 Accuracy: {top3_acc * 100:.2f}%")
print("Confusion Matrix:")
print(conf_matrix)
