In [81]:
import math, random
import torch
import librosa
from IPython.display import Audio
import os
import re
import pandas as pd

In [82]:
class AudioUtil():
    
    @staticmethod
    def open(audio_file):
        sig, sr = librosa.load(audio_file)
        
        return (sig, sr)
    
    @staticmethod
    def rechannel(aud, new_channel):
        
        sig, sr = aud
        
        if (sig.shape[0] == new_channel):
            return aud
        
        if (new_channel == 1):
            resig = sig[:1, :]
        else:
            resig = torch.cat([sig, sig])
            
        return ((resig, sr))
    
    @staticmethod
    def resample(aud, newsr):
        sig, sr = aud
        
        if (sr == newsr):
            return aud
        
        num_channels = sig.shape[0]
        resig = librosa.resample(sr, newsr)(sig[:1, :])
        
        if (num_channels > 1):
            retwo = librosa.resample(sr, newsr)(sig[1:, :])
            resig = torch.cat([resig, retwo])
            
        return ((resig, newsr))
    
    @staticmethod
    def pad_trunc(aud, max_ms):
        sig, sr= aud
        num_rows, sig_len = sig.shape
        max_len = sr // 1000 * max_ms
        
        if (sig_len > max_len):
            sig = sig[:, :max_len]
            
        elif (sig_len < max_len):
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len
            
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))
            
            sig = torch.cat((pad_begin, sig, pad_end), 1)
            
        return (sig, sr)
    
    @staticmethod
    def time_shift(aud, shift_limit):
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        return (sig.roll(shift_amt), sr)
    
    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        sig, sr = aud
        top_db = 80
        
        spec = librosa.feature.melspectrogram(sr, n_fft=n_fft, hop_length=hop_len,
                                              n_mels=n_mels)(sig)
        
        spec = librosa.amplitude_to_db(top_db=top_db)(spec)
        
        return (spec)

In [83]:
from torch.utils.data import DataLoader, Dataset, random_split

In [84]:
download_path = './input/notes_v2/'
df = pd.read_csv('./metadata.csv')

In [88]:
class SoundDS(Dataset):
    
    
    def __init__(self, df, data_path):
        self.df = df
        self.data_path = str(data_path)
        self.duration = 4000
        self.sr = 44100
        self.channel = 2
        self.shift_pct = 0.4
        
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        audio_file = self.data_path + self.df.loc[idx, 'Sample']
        class_id = self.df.loc[idx, 'ClassID']
        
        aud = AudioUtil.open(audio_file)
        reaud = AudioUtil.resample(aud, self.sr)
        rechan = AudioUtil.rechannel(reaud, self.channel)
        
        dur_aud = AudioUtil.pad_trunc(rechan, self.duration)
        shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
        sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)
        
        return sgram, class_id

In [89]:
myds = SoundDS(df, download_path)

In [61]:
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, [num_train, num_val])

train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, shuffle=False)

In [66]:
import torch.nn.functional as F 
from torch.nn import init  
import torch.nn as nn 

In [67]:
class AudioClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        
        conv_layers = []
        
        self.conv1 = nn.Conv2d(2, 8, kernel_size=(5,5), stride=(2,2), padding=(2,2))
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]

        # Second Convolution Block
        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]

        # Second Convolution Block
        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv3.weight, a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]

        # Second Convolution Block
        self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv4.weight, a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=10)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(*conv_layers)
 
    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer
        x = self.lin(x)

        # Final output
        return x

# Create the model and put it on the GPU if available
myModel = AudioClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel = myModel.to(device)
# Check that it is on Cuda
next(myModel.parameters()).device
    

device(type='cpu')

In [68]:
train_dl

<torch.utils.data.dataloader.DataLoader at 0x1e07638b6d0>

In [69]:
def training(model, train_dl, num_epochs):
  # Loss Function, Optimizer and Scheduler
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
  scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                steps_per_epoch=int(len(train_dl)),
                                                epochs=num_epochs,
                                                anneal_strategy='linear')

  # Repeat for each epoch
  for epoch in range(num_epochs):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0

    # Repeat for each batch in the training set
    for i, data in enumerate(train_dl):
        # Get the input features and target labels, and put them on the GPU
        inputs, labels = data[0].to(device), data[1].to(device)

        # Normalize the inputs
        inputs_m, inputs_s = inputs.mean(), inputs.std()
        inputs = (inputs - inputs_m) / inputs_s

        # Zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Keep stats for Loss and Accuracy
        running_loss += loss.item()

        # Get the predicted class with the highest score
        _, prediction = torch.max(outputs,1)
        # Count of predictions that matched the target label
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]

        #if i % 10 == 0:    # print every 10 mini-batches
        #    print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 10))
    
    # Print stats at the end of the epoch
    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

  print('Finished Training')
  
num_epochs=10   
training(myModel, train_dl, num_epochs)

KeyError: 'relative_path'

In [22]:
from IPython.display import clear_output

In [23]:
data = save_mfcc(dataset_path, json_path)
clear_output()

In [24]:
def load_data(dataset: dict):
        
    inputs = np.array(dataset['mfcc'])
    targets = np.array(dataset['labels'])
    
    return inputs, targets

In [25]:
inputs, targets = load_data(data)

  inputs = np.array(dataset['mfcc'])


ValueError: could not broadcast input array from shape (10,137) into shape (10,)

In [14]:
inputs = inputs.reshape(-1)

In [17]:
inputs.shape

(360,)

In [18]:
from sklearn.model_selection import train_test_split

input_train, input_test, target_train, target_test = train_test_split(inputs, targets, test_size=0.3)
print(input_train.shape, target_train.shape)

(252,) (252,)


In [19]:
from tensorflow.keras import Sequential
from tensorflow.keras.layers import *

In [20]:
model = Sequential()

model.add(Flatten(input_shape=(inputs.shape[0], inputs.shape[1])))
model.add(Dense(512, activation='relu'))
model.add(Dense(256, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(12, activation='relu'))
model.summary()

IndexError: tuple index out of range

In [25]:
from tensorflow.keras import optimizers
adam = optimizers.Adam(learning_rate=1e-4)

In [26]:
model.compile(optimizer=adam,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

hist = model.fit(input_train, target_train,
                 validation_data=(input_test, target_test),
                 epochs=50,
                 batch_size=32)

clear_output()

ValueError: Failed to convert a NumPy array to a Tensor (Unsupported object type list).