In [1]:
import os
# from tqdm import tqdm
import torch
import torchaudio
import torch.nn as nn
import torch.nn.functional as F  
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from sklearn.preprocessing import MultiLabelBinarizer, LabelEncoder, LabelBinarizer, StandardScaler

In [3]:
import utils

In [4]:
# define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
DATA_DIR = './fma/data/fma_small'

tracks = utils.load('fma/data/fma_metadata/tracks.csv')
features = utils.load('fma/data/fma_metadata/features.csv')
echonest = utils.load('fma/data/fma_metadata/echonest.csv')

subset = tracks.index[tracks['set', 'subset'] <= 'small']

assert subset.isin(tracks.index).all()
assert subset.isin(features.index).all()

features_all = features.join(echonest, how='inner').sort_index(axis=1)
print('Not enough Echonest features: {}'.format(features_all.shape))

tracks = tracks.loc[subset]
features_all = features.loc[subset]

tracks.shape, features_all.shape

train = tracks.index[tracks['set', 'split'] == 'training']
val = tracks.index[tracks['set', 'split'] == 'validation']
test = tracks.index[tracks['set', 'split'] == 'test']

Not enough Echonest features: (13129, 767)


In [6]:
labels_onehot = LabelBinarizer().fit_transform(tracks['track', 'genre_top'])
labels_onehot = pd.DataFrame(labels_onehot, index=tracks.index)

In [7]:
def mono_to_stereo(waveform):
    # reshape the waveform from a 1D tensor to a 2D tensor with 2 columns
    waveform = waveform.view(-1, 1)
    # repeat the waveform along the columns to create a stereo signal
    waveform = waveform.repeat(1, 2)
    return waveform.T

def stereo_to_mono(waveform):
    if waveform.dim() == 2:
        return torch.mean(waveform, dim=0)
    elif waveform.dim() == 1:
        return waveform
    else:
        raise ValueError("Input must be 1D or 2D tensor")

In [8]:

class FMADataset(Dataset):
    def __init__(self, data_dir, track_ids, subsampling=True, sampling_rate=22050):
        self.data_dir = data_dir
        self.filenames = os.listdir(data_dir)
        self.track_ids = track_ids
        self.sampling_rate = sampling_rate
        self.max_length = 750000
        self.subsampling = subsampling

        # create the Resample transform
        self.resample = torchaudio.transforms.Resample(44100, sampling_rate)
        
        
    def __getitem__(self, index):
        tid = self.track_ids[index]
        # load the MP3 file
        filepath = utils.get_audio_path(self.data_dir, tid)
        waveform, sample_rate = torchaudio.load(filepath)

        waveform = stereo_to_mono(waveform) 

        # resample the waveform to the desired sample rate using the Resample transform
        waveform = self.resample(waveform)

        
        # get label
        label = torch.from_numpy(labels_onehot.loc[tid].values).float()
        
        # subsampling
        if self.subsampling:
            # set the length of the subsamples and the overlap
            subsample_length = self.sampling_rate * 5  # 5 seconds
            overlap = int(subsample_length * 0.75)  # 75% overlap
            subsamples = []
            shift = subsample_length - overlap
            for i in range(0, waveform.size(0) - subsample_length + 1, shift):
                subsample = waveform[i:(i + subsample_length)]
                subsamples.append(subsample)
                if len(subsamples) == 20: break
            
            # Return the subsamples
            return subsamples, label
        else:
            # padding
            padding = self.max_length - waveform.shape[0]
            padding_tensor = torch.zeros((padding, waveform.shape[1]))
            waveform = torch.cat((waveform, padding_tensor), dim=0)

        
        return waveform, label

    def __len__(self):
        return len(self.filenames)

### TODO
- modify __getitem__ method of FMADataset class for 2D CNN
- implement validation

In [9]:

class Res1DLayer(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(Res1DLayer, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, stride, padding)
        self.bn2 = nn.BatchNorm1d(out_channels)

        # Projection shortcut
        self.projection = nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0, bias=False)

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = F.leaky_relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        
        # Add the projection shortcut
        identity = self.projection(identity)
        out += identity

        out = F.leaky_relu(out)

        return out
    
    
class ResNet1D(nn.Module):
    def __init__(self, input_size, num_classes=10):
        super(ResNet1D, self).__init__()
        
        self.conv1 = nn.Conv1d(in_channels=input_size, out_channels=128, kernel_size=3, stride=3, padding=3, bias=False)
     
        self.layer1 = nn.Sequential(
            Res1DLayer(128, 128, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1),
            Res1DLayer(128, 128, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1)
        )
        
        self.layer2 = nn.Sequential(
            Res1DLayer(128, 256, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1),
            Res1DLayer(256, 256, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1)
        )
        
        self.layer3 = nn.Sequential(
            Res1DLayer(256, 256, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1),
            Res1DLayer(256, 256, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1),
            Res1DLayer(256, 256, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1),
            Res1DLayer(256, 256, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1)
        )
        
        self.layer4 = nn.Sequential(
            Res1DLayer(256, 512, kernel_size=3, stride=1, padding=1),
            nn.MaxPool1d(kernel_size=3, stride=3, padding=1),
            nn.Conv1d(512, 512, kernel_size=1, stride=1, padding=0)
        )
        
#         self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        #x = self.bn1(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = x.view(x.size(0), -1)  # flatten the tensor
        #x = self.avgpool(x)
        x = self.fc(x)
        return x


In [15]:


# create a Mp3Dataset from a directory of MP3 files
dataset = FMADataset(DATA_DIR, train)

# create a DataLoader from the FMADataset
dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)

    
# create the CNN model
model = ResNet1D(input_size=110250, num_classes=8)
model.to(device)

# define the loss function and the optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

num_epochs = 1
i = 0
running_loss = 0.0
# train the model
for epoch in range(num_epochs):
    for subsamples, label in dataloader:
        label = label.to(device)
        for waveform in subsamples:
            # clear the gradients
            optimizer.zero_grad()

            # forward pass
            waveform = waveform.squeeze(0)  
            waveform = waveform.unsqueeze(-1)
            
            waveform = waveform.to(device)
            output = model(waveform)
            
            loss = loss_fn(output, label)

            # backward pass
            loss.backward()
            optimizer.step()
            # print statistics
            i += 1
            running_loss += loss.item()
            if i % 10 == 9:    # print every 320 samples
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / 2000))
                running_loss = 0.0


print('Finished Training')

[1,    10] loss: 0.008
[1,    20] loss: 0.008
[1,    30] loss: 0.008
[1,    40] loss: 0.007
[1,    50] loss: 0.008
[1,    60] loss: 0.007
[1,    70] loss: 0.008
[1,    80] loss: 0.007
[1,    90] loss: 0.007
[1,   100] loss: 0.007
Finished Training


In [48]:
model

ResNet1D(
  (conv1): Conv1d(110250, 128, kernel_size=(3,), stride=(3,), padding=(3,), bias=False)
  (layer1): Sequential(
    (0): Res1DLayer(
      (conv1): Conv1d(128, 128, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(128, 128, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (projection): Conv1d(128, 128, kernel_size=(1,), stride=(1,), bias=False)
    )
    (1): MaxPool1d(kernel_size=3, stride=3, padding=1, dilation=1, ceil_mode=False)
    (2): Res1DLayer(
      (conv1): Conv1d(128, 128, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn1): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv1d(128, 128, kernel_size=(3,), stride=(1,), padding=(1,))
      (bn2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_ru

In [89]:

# create a Mp3Dataset from a directory of MP3 files
dataset = FMADataset(DATA_DIR, train)

# create a DataLoader from the FMADataset
dataloader = torch.utils.data.DataLoader(dataset, batch_size=8, shuffle=True)

    
# create the CNN model
model = ResNet1D(input_size=750000, num_classes=8)

# define the loss function and the optimizer
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

num_epochs = 10
i = 0
running_loss = 0.0
# train the model
for epoch in range(num_epochs):
    for waveform, label in dataloader:
        # clear the gradients
        optimizer.zero_grad()

        # forward pass
#         waveform = waveform.unsqueeze(0)  # add a batch dimension
#         print(waveform.shape)

        # extract the first channel
        first_channel = waveform[:, :, 0]

        # reshape the first channel to add an additional dimension for the channel dimension
        first_channel = first_channel.unsqueeze(-1)
        
        output = model(first_channel)

        loss = loss_fn(output, label)

        # backward pass
        loss.backward()
        optimizer.step()
        # print statistics
        i += 1
        running_loss += loss.item()
        if i % 10 == 9:    # print every 100 samples
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 2000))
            running_loss = 0.0


print('Finished Training')

[1,    10] loss: 0.011
[1,    20] loss: 0.010
[2,    30] loss: 0.010
[2,    40] loss: 0.008
[3,    50] loss: 0.008
[3,    60] loss: 0.009
[4,    70] loss: 0.008
[4,    80] loss: 0.009
[5,    90] loss: 0.008
[5,   100] loss: 0.009
[6,   110] loss: 0.008
[6,   120] loss: 0.008
[7,   130] loss: 0.007
[7,   140] loss: 0.008
[8,   150] loss: 0.008
[8,   160] loss: 0.007
[9,   170] loss: 0.007
[9,   180] loss: 0.007
[10,   190] loss: 0.007
[10,   200] loss: 0.007
Finished Training


## Testing - subsampling

In [16]:
import os
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
# create a dataset object for testing
test_dataset = FMADataset(DATA_DIR, test)
batch_size = 32
# create a data loader to load the dataset
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)



# test the model
model.eval()
model.to(device)
correct = 0
total = 0
with torch.no_grad(): # don't need to track, calculate or save the gradients in the model
    for subsamples, labels in test_loader:
        labels = labels.to(device)
        labels = torch.argmax(labels, dim=1)
        batch_size = labels.size(0) # we reupdate the batch size because the last batch can be incomplete.
        subsample_outputs = {i: [] for i in range(batch_size)}
        for waveform in subsamples:
            waveform = waveform.squeeze(0)  
            waveform = waveform.unsqueeze(-1)
            waveform = waveform.to(device)
            outputs = model(waveform)
            predicted = torch.argmax(outputs.data, dim=1).cpu()
            
            for j in range(batch_size):
                subsample_outputs[j].append(predicted[j]) 
        for j in range(batch_size):
            # count the occurrences of each class
            counts = np.bincount(subsample_outputs[j])
            # Find the class with the highest count
            aggregate_prediction = np.argmax(counts)
            correct += (aggregate_prediction == labels[j])
        total += labels.size(0)
        
    
        print(f"CORRECT #  {correct}")

print('Accuracy of the network on the test samples: %d %%' % (100 * correct / total))

CORRECT #  5
CORRECT #  8
CORRECT #  13
CORRECT #  17
CORRECT #  23
Accuracy of the network on the test samples: 14 %


## Testing - full sample

In [45]:
# create a dataset object for testing
test_dataset = FMADataset(DATA_DIR, test)

# create a data loader to load the dataset
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=True)

# test the model
model.eval()
model.to(device)
correct = 0
total = 0
with torch.no_grad(): # don't need to track, calculate or save the gradients in the model
    for data in test_loader:
        # get the inputs
        audio, labels = data
        # wrap them in a torch Variable
        audio, labels = audio.to(device), labels.to(device)

        # forward + backward + optimize
        outputs = model(audio)
        _, predicted = torch.max(outputs.data, 1)
        labels = torch.argmax(labels, dim=1)
        predicted = torch.argmax(outputs.data, dim=1)
        print(labels)
        print(predicted)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        print(f"CORRECT #  {correct}")

print('Accuracy of the network on the test samples: %d %%' % (100 * correct / total))

RuntimeError: CUDA error: unspecified launch failure
CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.

## Remarks on implementations - draft

### There are two types of samples: mono and stereo - we need to convert mono to stereo when feeding the CNN

An audio channel refers to a single track of audio. The number of channels in an audio file determines the number of separate audio tracks that are mixed together to form the final audio.

A mono audio file has a single channel, which means that all the audio is mixed together into one single track. This means that if you play a mono audio file, the same audio will come out of both the left and right speakers (or headphones) and it will sound the same regardless of the stereo or mono setup.

A stereo audio file, on the other hand, has two channels - a left channel and a right channel. These two channels carry separate audio tracks that are mixed together to create the final audio. When played back on a stereo setup, each channel will be played through its corresponding speaker or headphone and this way, the stereo audio creates a sense of space and directionality.

So, for example, a stereo audio recording of a live concert will have different audio captured by different microphone positioned in different positions in the concert hall, and when it is played back, it creates the sense of being there in the concert hall.

It is worth noting that there are also audio file format with more than 2 channels, such as 5.1 or 7.1 surround sound audio.


### Downsampling

 we downsample the audio signals to a lower sample rate to reduce the data size or to simplify the processing of the signal. Downsampling can be useful for tasks such as speech recognition or audio classification, where the lower frequencies of the signal are more important than the higher frequencies.