In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# Note: the usual Kaggle snippet that lists every file under the input directory is not included in this cell; only `os` is imported below

import os

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import torch
import random
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet18, convnext_tiny, efficientnet_v2_s,shufflenet_v2_x2_0,mobilenet_v3_small,mobilenet_v3_large


# Run on the first CUDA device when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

# Root of the Kaggle input directory and the UrbanSound8K dataset folder beneath it.
input_path = '/kaggle/input/'
urbansound8k_path = f'{input_path}urbansound8k/'

In [None]:
# Read the UrbanSound8K metadata CSV that sits at the dataset root, then preview it.
metadata_csv = urbansound8k_path + "UrbanSound8K.csv"
urban_df = pd.read_csv(metadata_csv)
urban_df.head()

In [None]:
# Build each clip's path relative to the dataset root: "/fold<fold>/<slice_file_name>".
fold_dir = '/fold' + urban_df['fold'].astype(str)
file_name = urban_df['slice_file_name'].astype(str)
urban_df['relative_path'] = fold_dir + '/' + file_name
urban_df.head()

In [None]:
import torch
import torchaudio
from torchaudio import transforms

In [None]:
class AudioUtil():
    """Stateless helpers for loading, normalising and augmenting audio clips.

    Every helper that takes ``aud`` expects a ``(signal, sample_rate)`` tuple
    as returned by :meth:`open_audio`; ``signal`` is indexed as
    ``[channel, time]`` throughout this class.

    All helpers are static methods, so they can be called either on the class
    (``AudioUtil.resample(...)``) or on an instance.
    """

    @staticmethod
    def open_audio(audio_path):
        """Load the file at ``audio_path`` and return ``(signal, sample_rate)``."""
        signal, sample_rate = torchaudio.load(audio_path)
        return (signal, sample_rate)

    @staticmethod
    def rechannel(aud, new_channel):
        """Return ``aud`` converted to ``new_channel`` channels.

        * Already the requested channel count: ``aud`` is returned unchanged.
        * ``new_channel == 1``: only the first channel is kept.
        * Otherwise the signal is concatenated with itself along the channel
          axis, which turns a mono signal into a stereo one.
        """
        sig, sr = aud

        if sig.shape[0] == new_channel:
            # Nothing to do
            return aud

        if new_channel == 1:
            # Convert from stereo to mono by selecting only the first channel
            resig = sig[:1, :]
        else:
            # Convert from mono to stereo by duplicating the first channel
            resig = torch.cat([sig, sig])

        return (resig, sr)

    @staticmethod
    def resample(aud, new_sample_rate):
        """Return ``aud`` resampled to ``new_sample_rate``.

        ``aud`` is returned unchanged when it already has the requested rate.
        """
        signal, sample_rate = aud
        if sample_rate == new_sample_rate:
            return aud

        # Resample works along the last (time) dimension, so one transform
        # instance handles every channel; there is no need to build a separate
        # resampler per channel and concatenate the results.
        resampler = torchaudio.transforms.Resample(sample_rate, new_sample_rate)
        return (resampler(signal), new_sample_rate)

    @staticmethod
    def pad_trunc(aud, max_ms):
        """Force the signal to last exactly ``max_ms`` milliseconds.

        Longer signals are truncated at the end. Shorter signals are
        zero-padded, with the padding split at a random point between the
        beginning and the end of the signal.
        """
        sig, sr = aud
        num_rows, sig_len = sig.shape
        # Multiply before dividing: `sr // 1000 * max_ms` would truncate
        # 44100 Hz to 44 samples/ms and lose 400 samples on a 4 s clip.
        max_len = int(sr * max_ms // 1000)

        if sig_len > max_len:
            # Truncate the signal to the given length
            sig = sig[:, :max_len]
        elif sig_len < max_len:
            # Calculate the length of padding to add at the beginning and end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            # Pad with 0s that share the signal's dtype and device
            pad_begin = sig.new_zeros((num_rows, pad_begin_len))
            pad_end = sig.new_zeros((num_rows, pad_end_len))

            sig = torch.cat((pad_begin, sig, pad_end), 1)

        return (sig, sr)

    @staticmethod
    def time_shift(aud, shift_limit):
        """Circularly shift the signal in time by a random amount.

        The shift is drawn uniformly from ``[0, shift_limit * signal_length)``
        samples and applied to every channel identically.
        """
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        # Roll along the time axis only. Without `dims`, roll() flattens the
        # tensor first, so samples would wrap from one channel into the other.
        return (sig.roll(shift_amt, dims=-1), sr)

    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        """Return the mel spectrogram of ``aud`` converted to decibels."""
        sig, sr = aud
        top_db = 80

        # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

        # Convert to decibels
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return spec

    @staticmethod
    def LFCC(aud, sr = 16000, n_lfcc = 64, n_fft=1024, hop_len=None):
        """Compute LFCC features of ``aud`` and pass them through ``AmplitudeToDB``.

        The ``sr`` argument is kept only for backward compatibility: the sample
        rate carried inside ``aud`` is always the one used.
        """
        sig, sample_rate = aud
        top_db = 80
        spec = transforms.LFCC(
            sample_rate,
            speckwargs={"n_fft": n_fft, "hop_length": hop_len, "center": False},
            n_lfcc=n_lfcc,
        )(sig)

        # NOTE(review): a decibel conversion on top of cepstral coefficients is
        # unusual -- confirm this step is intended before relying on LFCC().
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return spec

    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        """Apply frequency and time masking to a spectrogram.

        ``n_freq_masks`` frequency masks and ``n_time_masks`` time masks are
        applied in turn. Each mask is at most ``max_mask_pct`` of the
        corresponding axis and is filled with the mean of ``spec``.
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec


In [None]:
from torch.utils.data import DataLoader, Dataset, random_split
class SoundDS(Dataset):
    """Dataset yielding ``(augmented mel spectrogram, class id)`` pairs.

    Args:
        df: DataFrame with a ``relative_path`` and a ``classID`` column.
        data_path: Prefix prepended to every ``relative_path``.
        duration: Length, in milliseconds, every clip is padded/truncated to.
        sr: Sample rate every clip is resampled to.
        channel: Number of channels every clip is converted to.
        shift_pct: Upper bound of the random time shift, as a fraction of the
            clip length.
    """

    def __init__(self, df, data_path, duration=4000, sr=44100, channel=2, shift_pct=0.4):
        self.df = df
        self.data_path = str(data_path)
        self.duration = duration
        self.sr = sr
        self.channel = channel
        self.shift_pct = shift_pct

    # ----------------------------
    # Number of items in dataset
    # ----------------------------
    def __len__(self):
        return len(self.df)

    def _lookup(self, idx):
        """Return ``(audio_file, class_id)`` for the row at *position* ``idx``."""
        # Samplers hand out positions 0..len-1, so index positionally. A
        # label-based .loc lookup only works while the frame still carries its
        # default 0..n-1 index and fails on a filtered or re-indexed frame.
        audio_file = self.data_path + self.df['relative_path'].iloc[idx]
        class_id = self.df['classID'].iloc[idx]
        return audio_file, class_id

    # ----------------------------
    # Get i'th item in dataset
    # ----------------------------
    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(augmented spectrogram, class_id)``."""
        audio_file, class_id = self._lookup(idx)

        aud = AudioUtil.open_audio(audio_file)
        # Some sounds have a higher sample rate, or fewer channels compared to the
        # majority. So make all sounds have the same number of channels and same
        # sample rate. Unless the sample rate is the same, the pad_trunc will still
        # result in arrays of different lengths, even though the sound duration is
        # the same.
        reaud = AudioUtil.resample(aud, self.sr)
        rechan = AudioUtil.rechannel(reaud, self.channel)

        dur_aud = AudioUtil.pad_trunc(rechan, self.duration)
        # Random augmentation: time shift on the waveform, then frequency/time
        # masking on the spectrogram.
        shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
        sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)
        aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

        return aug_sgram, class_id

In [None]:
myds = SoundDS(urban_df, urbansound8k_path)

# Random split of 80:20 between training and validation
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
# Seed the split so the same clips land in the validation set after every
# kernel restart; otherwise a checkpoint reloaded in a later session could be
# scored on clips it was trained on.
split_generator = torch.Generator().manual_seed(42)
train_ds, val_ds = random_split(myds, [num_train, num_val], generator=split_generator)

# Create training and validation data loaders
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, shuffle=False)

In [None]:
class modModel(nn.Module):
    """MobileNetV3-Large classifier fronted by a 1x1 channel-adapting convolution.

    The 1x1 convolution maps the ``in_channels`` channels of the input to the
    3 channels that are fed to the torchvision backbone.

    Args:
        num_classes: Number of output classes of the backbone's classifier.
        in_channels: Number of channels of the input tensor (default 2).
    """

    def __init__(self, num_classes, in_channels=2):
        super().__init__()
        self.num_classes = num_classes
        self.in_channels = in_channels
        self.conv1x1 = nn.Conv2d(in_channels, 3, kernel_size=1)
        self.baseModel = mobilenet_v3_large(num_classes=self.num_classes)

    def forward(self, x):
        """Return the backbone's raw, un-normalised class scores for ``x``."""
        x = self.conv1x1(x)
        # No softmax here: the scores are returned as-is for the loss/argmax
        # applied by the caller.
        return self.baseModel(x)

In [None]:
# Smoke test: push one random 2-channel 400x400 input through a 10-class model
# and print the input and output shapes.
test = torch.rand(size=(1, 2, 400, 400))
print(test.shape)
model = modModel(num_classes=10)
x = model(test)
print(x.shape)

In [None]:

# Print every direct sub-module of the model together with its position.
model_children = list(model.children())
for position, child in enumerate(model_children):
    print(" child", position, "is:")
    print(child)
# Same final value the manual counter ended with: the number of children.
child_counter = len(model_children)

In [None]:
def training(model, train_dl, num_epochs, lr=0.001, save_path='model.pt'):
    """Train ``model`` on ``train_dl`` and save its weights to ``save_path``.

    Uses cross-entropy loss, Adam and a linearly annealed one-cycle learning
    rate schedule that is stepped once per batch. Each batch is standardised
    with its own mean and standard deviation before the forward pass. After
    every epoch the mean batch loss and the training accuracy are printed.

    Args:
        model: Module to train; it is moved to the global ``device``.
        train_dl: Sized iterable of batches where ``batch[0]`` holds the inputs
            and ``batch[1]`` the integer class labels.
        num_epochs: Number of passes over ``train_dl``.
        lr: Adam learning rate, also used as the schedule's ``max_lr``.
        save_path: Where the final ``state_dict`` is written.
    """
    # Loss Function, Optimizer and Scheduler
    model = model.to(device)
    # Make sure dropout / batch-norm layers are in training mode even if the
    # caller left the model in eval mode.
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    num_batches = len(train_dl)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=lr,
                                                    steps_per_epoch=num_batches,
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear')

    # Repeat for each epoch
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        # Repeat for each batch in the training set
        for data in train_dl:
            # Get the input features and target labels, and put them on the GPU
            inputs, labels = data[0].to(device), data[1].to(device)

            # Normalize the inputs
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s

            # Zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # The one-cycle schedule is defined per batch, not per epoch
            scheduler.step()

            # Keep stats for Loss and Accuracy
            running_loss += loss.item()

            # Get the predicted class with the highest score
            _, prediction = torch.max(outputs, 1)
            # Count of predictions that matched the target label
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

        # Print stats at the end of the epoch
        avg_loss = running_loss / num_batches
        avg_acc = correct_prediction/total_prediction
        print(f'Epoch: {epoch}, Loss: {avg_loss}, Accuracy: {avg_acc}')

    # Save model
    torch.save(model.state_dict(), save_path)

    print('Finished Training')

In [None]:
# Build a fresh 10-class model and train it for 100 epochs on the training loader.
myModel = modModel(num_classes=10)
training(model=myModel, train_dl=train_dl, num_epochs=100)

In [None]:
def inference(model, test_dl):
    """Evaluate ``model`` on ``test_dl``, print and return its accuracy.

    The model is switched to eval mode for the duration of the call and put
    back into whatever mode it was in afterwards. Each batch is standardised
    with its own mean and standard deviation, as is done during training.

    Args:
        model: Module to evaluate; it must already live on the global ``device``.
        test_dl: Iterable of batches where ``batch[0]`` holds the inputs and
            ``batch[1]`` the integer class labels.

    Returns:
        Fraction of correctly classified items, or ``nan`` when ``test_dl``
        yields no items.
    """
    correct_prediction = 0
    total_prediction = 0

    # Evaluate with dropout disabled and batch-norm using its running
    # statistics, regardless of the mode the caller left the model in.
    was_training = model.training
    model.eval()
    try:
        # Disable gradient updates
        with torch.no_grad():
            for data in test_dl:
                # Get the input features and target labels, and put them on the GPU
                inputs, labels = data[0].to(device), data[1].to(device)

                # Normalize the inputs
                inputs_m, inputs_s = inputs.mean(), inputs.std()
                inputs = (inputs - inputs_m) / inputs_s

                # Get predictions
                outputs = model(inputs)

                # Get the predicted class with the highest score
                _, prediction = torch.max(outputs, 1)
                # Count of predictions that matched the target label
                correct_prediction += (prediction == labels).sum().item()
                total_prediction += prediction.shape[0]
    finally:
        model.train(was_training)

    # An empty loader has no defined accuracy; report nan instead of crashing.
    acc = correct_prediction / total_prediction if total_prediction else float('nan')
    print(f'Accuracy: {acc}, Total items: {total_prediction}')
    return acc
    
# Rebuild the architecture and restore the weights from the 'model.pt' checkpoint.
model_inf = modModel(10)
# map_location lets a checkpoint that was saved from a GPU load on a CPU-only
# session as well (and vice versa).
model_inf.load_state_dict(torch.load('model.pt', map_location=device))
model_inf = model_inf.to(device)
model_inf.eval()

inference(model_inf, val_dl)