In [1]:
!pip install pydub

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pydub
import os 
import librosa
import soundfile as sf
import torch
import torchaudio
from torchaudio import transforms
import random
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
from torch.nn import init
import torch.nn as nn

In [3]:
def set_seed(seed = 42):
    '''Sets the seed of the entire notebook so results are the same every time we run.
    This is for REPRODUCIBILITY.'''
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # When running on the CuDNN backend, two further options must be set
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # Set a fixed value for the hash seed
    os.environ['PYTHONHASHSEED'] = str(seed)

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
sound_label = ["Dog", "Rooster", "Pig", "Cow", "Frog", "Cat", "Hen", "Insects (flying)", "Sheep", "Crow"
                ,"Rain", "Sea waves", "Crackling fire", "Crickets", "Chirping birds", "Water drops", "Wind", "Pouring water", "Toilet flush", "Thunderstorm"
                ,"Crying baby", "Sneezing", "Clapping", "Breathing", "Coughing", "Footsteps", "Laughing", "Brushing teeth", "Snoring", "Drinking, sipping"
                , "Door knock", "Mouse click", "Keyboard typing", "Door, wood creaks", "Can opening", "washing machine", "Vacuum cleaner", "Clock alarm", "Clock tick", "Glass breaking"
                , "Helicopter", "Chainsaw", "Siren", "Car horn", "Engine", "Train", "Church bells", "Airplane", "Fireworks", "Hand saw"]
sounds = dict(zip(range(50), sound_label))

In [6]:
df = pd.read_csv('/content/drive/MyDrive/ASR Project/ESC-50-master/meta/esc50.csv')
df.head(10)


Unnamed: 0,filename,fold,target,category,esc10,src_file,take
0,1-100032-A-0.wav,1,0,dog,True,100032,A
1,1-100038-A-14.wav,1,14,chirping_birds,False,100038,A
2,1-100210-A-36.wav,1,36,vacuum_cleaner,False,100210,A
3,1-100210-B-36.wav,1,36,vacuum_cleaner,False,100210,B
4,1-101296-A-19.wav,1,19,thunderstorm,False,101296,A
5,1-101296-B-19.wav,1,19,thunderstorm,False,101296,B
6,1-101336-A-30.wav,1,30,door_wood_knock,False,101336,A
7,1-101404-A-34.wav,1,34,can_opening,False,101404,A
8,1-103298-A-9.wav,1,9,crow,False,103298,A
9,1-103995-A-30.wav,1,30,door_wood_knock,False,103995,A


In [7]:
data_path = r'/content/drive/MyDrive/ASR Project/ESC-50-master/audio/'


In [8]:
class AudioUtil():
    @staticmethod
    def open(audio_file):
        sig, sr = torchaudio.load(audio_file)
        return (sig,sr)
    
    # ----------------------------
    # Standardizing sample rate to 44100Hz
    # ----------------------------
    def resample(audio, srate):
        sig, sr = audio
        if (sr == srate):
            return audio
        no_channels = sig.shape[0]

        #Resample 1st channel:
        resig = torchaudio.transforms.Resample(sr, srate)(sig[:1,:])
        if (no_channels > 1):
            #Resample 2nd channel and merge both
            retwo = torchaudio.transforms.Resample(sr, srate)(sig[1:,:])
            resig = torch.cat([resig, retwo])

        return ((resig, srate))


    # ----------------------------
    # Some audios are mono, some are stereo. We need everything to have the same dimensions.
    # Thus, we can either only select the first channel of stereo or duplicate the first channel of mono
    # ----------------------------
    @staticmethod
    def rechannel(audio, channel):
        sig, sr = audio
        if (sig.shape[0]==channel):
            return audio
        if (channel==1):
            resig = sig[:1,:]
        else:
            resig = torch.cat([sig,sig])

        return ((resig, sr))

    

    # ----------------------------
    # Standardize the length of the audio - that is, either pad or truncate the audio
    # ----------------------------
    @staticmethod
    def resize_aud(audio, ms):
        sig, sr = audio
        no_rows, sig_len = sig.shape
        max_len = sr // 1000 * ms

        #Truncate
        if (sig_len > max_len):
            sig = sig[:, :max_len]
        #Padding
        elif (sig_len < max_len):
            #Length of the paddings at the start and end of the signal
            len_start = random.randint(0, max_len-sig_len)
            len_end = max_len - len_start - sig_len

            pad_start = torch.zeros((no_rows, len_start))
            pad_end = torch.zeros((no_rows, len_end))

            sig = torch.cat((pad_start, sig, pad_end), 1)

        return (sig, sr)


    # ----------------------------
    # Refer to textbox_1 for the reasoning of this method
    # ----------------------------
    @staticmethod
    def time_shift(aud, shift_limit):
        sig,sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        return (sig.roll(shift_amt), sr)

    # ----------------------------
    # Generating Spectrogram
    # ----------------------------
    @staticmethod
    def spectro_gram(audio, n_mels=64, n_fft=1024, hop_len=None):
        sig, sr = audio
        top_db = 80 #if we have more time, we can try 80
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)
        #shape of spec is [channel (mono or stereo etc), n_mels, time]
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return (spec)


    # ----------------------------
    # Augment the Spectrogram by masking out some sections of it in both the frequency
    # dimension (ie. horizontal bars) and the time dimension (vertical bars) to prevent
    # overfitting and to help the model generalise better. The masked sections are
    # replaced with the mean value.
    # ----------------------------
    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec
    

                

In [9]:
class SoundDS(Dataset):
  def __init__(self, df, path):
    self.df = df
    self.path = str(path)
    self.duration = 5000 #our audio is 5 seconds
    self.sr = 44100
    self.channel = 2
    self.shift_pct = 0.4


  def __len__(self):
    return len(self.df)
  
  def __shape__(self):
    return self.df.shape
  
  def __getitem__(self, index):
    file = self.path + self.df.loc[index, 'filename']
    class_id = self.df.loc[index, 'target'] #the index of the label aka target
    fold = self.df.loc[index, 'fold']

    audio = AudioUtil.open(file)
    #print(f"Original shape {audio[0].shape} and sample rate of {audio[1]}")
    rechannel = AudioUtil.rechannel(audio, self.channel)
    #print(f"Rechanneling shape {rechannel[0].shape} and sample rate of {rechannel[1]}")
    resamp = AudioUtil.resample(rechannel, self.sr)
    #print(f"Resampling shape {resamp[0].shape} and sample rate of {resamp[1]}")
    padded = AudioUtil.resize_aud(resamp, self.duration)
    #print(f"Padded shape {padded[0].shape} and sample rate of {padded[1]}")
    shifted = AudioUtil.time_shift(padded, self.shift_pct)
    #print(f"Time shift shape {shifted[0].shape} and sample rate of {shifted[1]}")
    sgram = AudioUtil.spectro_gram(shifted, n_mels=64, n_fft=1024, hop_len=None)
    #print(f"Mel spectrogram shape {sgram.shape}")
    aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)
    #print(f"Augmented spectrogram shape {aug_sgram.shape} of (num_channels, Mel freq_bands, time_steps)")
    feature = aug_sgram
    return feature[0].permute(1, 0), class_id


In [10]:
# a, b = myds[0]
# print(a.shape)
# print(b.shape)
# inputs, classes = next(iter(train_dl))
# print("inputs:", inputs)
# print("classes:", classes)

In [28]:
class AudioLSTM(nn.Module):

    def __init__(self, n_feature=64, out_feature=50, n_hidden=256, n_layers=2, drop_prob=0.2):
        super().__init__()
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_feature = n_feature

        self.lstm = nn.LSTM(self.n_feature, self.n_hidden, self.n_layers, dropout=self.drop_prob, batch_first=True)

        self.dropout = nn.Dropout(drop_prob)

        self.fc = nn.Linear(n_hidden, out_feature)

    def forward(self, x, hidden):
        # x.shape (batch, seq_len, n_features)
        l_out, l_hidden = self.lstm(x, hidden)

        # out.shape (batch, seq_len, n_hidden*direction)
        out = self.dropout(l_out)

        # out.shape (batch, out_feature)
        out = self.fc(out[:, -1, :])

        # return the final output and the hidden state
        return out, l_hidden

    def init_hidden(self, batch_size):
        weight = next(self.parameters()).data

        hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
        return hidden


In [29]:
def train(model, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data = data.to(device)
        target = target.to(device)

        model.zero_grad()
        output, hidden_state = model(data, model.init_hidden(hyperparameters["batch_size"]))
        
        loss = criterion(output, target)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        if batch_idx % log_interval == 0: #print training stats
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss))

In [30]:
def test(model, epoch):
    model.eval()
    correct = 0
    y_pred, y_target = [], []
    for data, target in test_loader:
        data = data.to(device)
        target = target.to(device)
        
        output, hidden_state = model(data, model.init_hidden(hyperparameters["batch_size"]))
        
        pred = torch.max(output, dim=1).indices
        correct += pred.eq(target).cpu().sum().item()
        y_pred = y_pred + pred.tolist()
        y_target = y_target + target.tolist()
    print('\nTest set: Accuracy: {}/{} ({:.0f}%)\n'.format(
        correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

In [31]:
hyperparameters = {"lr": 1e-4, "weight_decay": 0.001, "batch_size": 64, "in_feature": 64, "out_feature": 50}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

myds = SoundDS(df, data_path)

# Random split of 80:20 between training and validation
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, [num_train, num_val])

kwargs = {'num_workers': 1, 'pin_memory': True} if device == 'cuda' else {}  # needed for using datasets on gpu

# Create training and validation data loaders
train_loader = torch.utils.data.DataLoader(train_ds, batch_size=hyperparameters["batch_size"], shuffle=True, drop_last=True, **kwargs)
test_loader = torch.utils.data.DataLoader(val_ds, batch_size=hyperparameters["batch_size"], shuffle=True, drop_last=True, **kwargs)

cuda


In [32]:
model = AudioLSTM(n_feature=hyperparameters["in_feature"], out_feature=hyperparameters["out_feature"])
model.to(device)
print(model)


optimizer = torch.optim.Adam(model.parameters(), lr=hyperparameters['lr'], weight_decay=hyperparameters['weight_decay'])
# scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)
criterion = nn.CrossEntropyLoss()
clip = 5  # gradient clipping

log_interval = 10
for epoch in range(1, 30):
    # scheduler.step()
    train(model, epoch)
    test(model, epoch)

AudioLSTM(
  (lstm): LSTM(64, 256, num_layers=2, batch_first=True, dropout=0.2)
  (dropout): Dropout(p=0.2, inplace=False)
  (fc): Linear(in_features=256, out_features=50, bias=True)
)

Test set: Accuracy: 11/400 (3%)


Test set: Accuracy: 16/400 (4%)


Test set: Accuracy: 29/400 (7%)


Test set: Accuracy: 27/400 (7%)


Test set: Accuracy: 31/400 (8%)


Test set: Accuracy: 39/400 (10%)


Test set: Accuracy: 42/400 (10%)


Test set: Accuracy: 55/400 (14%)


Test set: Accuracy: 49/400 (12%)


Test set: Accuracy: 60/400 (15%)


Test set: Accuracy: 61/400 (15%)


Test set: Accuracy: 54/400 (14%)


Test set: Accuracy: 77/400 (19%)


Test set: Accuracy: 71/400 (18%)


Test set: Accuracy: 81/400 (20%)


Test set: Accuracy: 71/400 (18%)


Test set: Accuracy: 72/400 (18%)


Test set: Accuracy: 80/400 (20%)


Test set: Accuracy: 74/400 (18%)


Test set: Accuracy: 84/400 (21%)


Test set: Accuracy: 79/400 (20%)


Test set: Accuracy: 88/400 (22%)


Test set: Accuracy: 76/400 (19%)


Test set: Accu

In [34]:
torch.save(model.state_dict(), 'LSTM_2-layers_256-hidden.pt')