In [None]:
pip install python-levenshtein



## **Importing Libraries**

In [None]:
import librosa
import math
import numpy as np
import pdb
import string
from Levenshtein import distance
import csv, os, glob
from tensorflow import keras
import pandas as pd

import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Dense,Dropout,MaxPooling2D,Conv2D,GlobalAveragePooling2D,BatchNormalization


# **Functions provided in utils.py**

In [None]:
def wav2feat(wavfile):
    """Compute the magnitude spectrogram of a single audio file.

    Args:
        wavfile: Name/path of the audio wav file to load.

    Returns:
        Absolute value of the STFT of the signal. The file is loaded as mono
        at 44100 Hz and analysed with a 1024-point FFT, a 20 ms Hann window
        and a 10 ms hop.
    """
    signal, sample_rate = librosa.load(wavfile, sr=44100, mono=True)
    stft_options = dict(
        n_fft=1024,
        hop_length=int(0.01 * sample_rate),  # 10 ms
        win_length=int(0.02 * sample_rate),  # 20 ms
        window='hann',
        center=True,
        pad_mode='reflect',
    )
    return np.abs(librosa.stft(signal, **stft_options))

def wavs2feat(wavfiles):
    '''
    Concatenate the audio files listed in wavfiles and compute one spectrogram.

    Input: non-empty list of audio wav file names
    Output: Magnitude spectrogram of the concatenated wav
    Raises: ValueError if wavfiles is empty
    '''
    # Every file is resampled to this rate on load, so the hop/window sizes
    # can be derived from it directly instead of from a loop-leaked variable.
    sample_rate = 44100
    wavfiles = list(wavfiles)
    if not wavfiles:
        raise ValueError("wavfiles must contain at least one audio file name")
    signals = [librosa.load(wf, sr=sample_rate, mono=True)[0] for wf in wavfiles]
    x = np.hstack(signals)
    hop = int(0.01 * sample_rate) # 10ms
    win = int(0.02 * sample_rate) # 20ms
    X = librosa.stft(x, n_fft=1024, hop_length=hop, win_length=win, window='hann', center=True, pad_mode='reflect')
    return np.abs(X)

def read_csv(filename):
    '''
    Read a two-column "id,label" text file into a dictionary.

    Input: name of a file whose lines look like '176787-5-0-27.wav,engine_idling'
    Output: dict mapping the first field of each line to the second field

    Blank lines are skipped and surrounding whitespace is stripped from both
    fields. Every other line is treated as data: a header row, if the file has
    one, ends up as an ordinary entry. A non-blank line without a comma raises
    IndexError.
    '''
    id_label = {}
    with open(filename,'r') as fid:
        for line in fid: # '176787-5-0-27.wav,engine_idling\n'
            line = line.strip()
            if not line:
                # e.g. an empty line at the end of the file
                continue
            tokens = [tok.strip() for tok in line.split(',')] # ['176787-5-0-27.wav', 'engine_idling']
            id_label[tokens[0]] = tokens[1]
    return id_label

def editDistance(gt, est):
    '''
    Token-level Levenshtein distance between two '-' separated label strings.

    E.g. gt is "dog_bark-street_music-engine_idling"
    E.g. est is "street_music-engine_idling"

    Returns (editdist, score) where editdist is the number of token edits and
    score = 1 - editdist / (number of tokens in gt).
    '''
    gttokens = gt.split('-')
    esttokens = est.split('-')
    # Map every distinct token to one character so the string-based
    # `distance` can be used. Code points are generated with chr(), so the
    # mapping is not limited to the 26 upper-case letters; sorting makes the
    # mapping reproducible between runs.
    tokenset = sorted(set(gttokens + esttokens)) # ['dog_bark', 'engine_idling', 'street_music']
    token_char = {token: chr(ord('A') + i) for i, token in enumerate(tokenset)}
    # convert gt and est to strings
    gtstr = ''.join(token_char[t] for t in gttokens)
    eststr = ''.join(token_char[t] for t in esttokens)
    # Compare
    editdist = distance(gtstr, eststr)
    # str.split always yields at least one token, so len(gttokens) >= 1
    score = 1 - editdist/len(gttokens)
    return editdist, score

def evals(gtcsv, estcsv, taskid):
    '''
    Average score of the estimated labels against the ground truth.

    Inputs:
        gtcsv: "id,label" file with the ground-truth labels (read with read_csv)
        estcsv: "id,label" file with the estimated labels (read with read_csv)
        taskid: 1 -> exact-match accuracy; 2 -> mean editDistance score
    Output:
        Score averaged over the entries of estcsv; 0.0 if estcsv has no entries.
    Raises:
        ValueError if taskid is neither 1 nor 2.
        KeyError if an id in estcsv is missing from gtcsv.
    '''
    # Validate up front instead of dropping into the debugger mid-loop.
    if taskid not in (1, 2):
        raise ValueError("taskid not correct; it is %r" % (taskid,))
    gt_id_label = read_csv(gtcsv)
    est_id_label = read_csv(estcsv)
    if not est_id_label:
        # Nothing was estimated: avoid dividing by zero.
        return 0.0
    score = 0
    for file_id, est_label in est_id_label.items():
        gt_label = gt_id_label[file_id]
        if taskid == 1:
            if est_label == gt_label:
                score += 1
        else:
            _, ss = editDistance(gt_label, est_label)
            score += ss
    avgScore = score/len(est_id_label)
    return avgScore

#if __name__=="__main__":
#    wavs = ['../shared_train/audio_train/180937-7-3-27.wav']
#    wavs2feat(wavs)
#     # wavfiles = ['../shared_train/audio_train/180937-7-3-27.wav','../shared_train/audio_train/180937-7-3-27.wav']
#     # X = wavs2feat(wavfiles)
#     # eval('test_task1/labels.csv', 'test_task1/est.csv', 1)
#     editDistance("dog_bark-street_music-engine_idling",
#         "siren-street_music-engine_idling")

# **Helper functions written for this notebook**

**Feature extraction of multiple audio files**

In [None]:
def read_audio_files(files):
    """Return a list with the wav2feat() result of every path in `files`, in order."""
    return list(map(wav2feat, files))

**Zero-pad every spectrogram so that all files have the same dimensions**

In [None]:
def add_padding(X, target_len=401):
    """Zero-pad 2-D feature arrays along their second axis and stack them.

    Args:
        X: sequence of 2-D arrays; the second axis may differ in length.
        target_len: length of the second axis after padding (default 401,
            the value this notebook was written for).

    Returns:
        np.array built from the padded arrays. Zeros are appended at the end
        of the second axis only; the first axis is left untouched.

    Raises:
        ValueError: if an array is already longer than target_len.

    The input sequence is not modified.
    """
    padded = []
    for idx, feat in enumerate(X):
        feat = np.asarray(feat)
        missing = target_len - feat.shape[1]
        if missing < 0:
            raise ValueError(
                "item %d has %d columns, more than target_len=%d"
                % (idx, feat.shape[1], target_len)
            )
        padded.append(np.pad(feat, ((0, 0), (0, missing))))
    return np.array(padded)

**Function for One_hot_encoding**

In [None]:
def one_hot_encoding(t_indices, N):
    '''
    Inputs:
        t_indices: [np.array] list of indices (may be empty)
        N: [int] total no. of classes
    Output:
        t_1hot: [np.array] one hot encoded vectors, shape (len(t_indices), N),
                integer dtype
    Raises:
        AssertionError if the largest index is not smaller than N
    '''
    indices = np.asarray(t_indices)
    n_samples = len(indices)
    t_1hot = np.zeros((n_samples, N), dtype = int)
    if n_samples == 0:
        # max() of an empty sequence is undefined; nothing to encode.
        return t_1hot
    assert N>indices.max(), (N, indices.max())
    # Set one entry per row in a single vectorised assignment.
    t_1hot[np.arange(n_samples), indices] = 1

    return t_1hot

**Splitting training data into training and validation data**

In [None]:
def splitData(X,t,testFraction=0.1):
    """
    Split the data randomly into training and test sets
    Use numpy functions only
    Inputs:
        X: np array whose first axis indexes the samples (any number of
           further axes)
        t: np array of len Nsamples; can be one hot vectors or labels
        testFraction: (float) Nsamples_test = int(testFraction * Nsamples)
    Returns:
        X_train, t_train, X_test, t_test

    The shuffle uses a private generator seeded with 0, so the split is
    reproducible, the global numpy random state is left untouched and the
    input arrays are not modified.
    """
    X = np.asarray(X)
    t = np.asarray(t)
    n_samples = X.shape[0]
    # Same permutation that np.random.seed(0) + np.random.permutation gives,
    # without reseeding the global generator.
    order = np.random.RandomState(0).permutation(n_samples)
    N_test = int(n_samples*testFraction)
    test_idx, train_idx = order[:N_test], order[N_test:]
    # Indexing with the same index arrays keeps samples and targets paired.
    X_test, X_train = X[test_idx], X[train_idx]
    t_test, t_train = t[test_idx], t[train_idx]

    return X_train, t_train, X_test, t_test

**Training Function**

In [None]:
def train(X_train, t_train, X_val, t_val, Nepochs):
    '''
    Train a keras convolutional model for audio event classification.

    The network is four Conv2D/MaxPooling2D/Dropout stages followed by global
    average pooling, a 100-unit dense layer and an 11-way softmax output.

    Inputs:
        X_train: Data for training; the first axis indexes the samples and
                 the remaining axes are used as the model's input shape.
        t_train: Training targets, used with the
                 'sparse_categorical_crossentropy' loss.
        X_val, t_val: Validation data/targets evaluated after every epoch.
        Nepochs: number of epochs
    Return:
        model: keras model in the state reached after the last epoch.

    Side effect: whenever val_accuracy improves, the weights are saved to
    "/checkpoint"; load them with model.load_weights to get the best epoch.
    '''
    model = keras.Sequential()
    # Input shape is read from the data (everything after the sample axis)
    # rather than hard-coded.
    model.add(Conv2D(filters = 16, kernel_size = 3, input_shape = X_train.shape[1:], activation = 'relu'))
    model.add(MaxPooling2D(pool_size = 2))
    model.add(Dropout(0.15))
    
    model.add(Conv2D(filters = 32, kernel_size = 3, activation = 'relu'))
    model.add(MaxPooling2D(pool_size = 2))
    model.add(Dropout(0.15))
    
    model.add(Conv2D(filters = 64, kernel_size = 3, activation = 'relu'))
    model.add(MaxPooling2D(pool_size = 2))
    model.add(Dropout(0.1))
    
    model.add(Conv2D(filters = 128, kernel_size = 3, activation = 'relu'))
    model.add(MaxPooling2D(pool_size = 2))
    model.add(Dropout(0.1))
    model.add(GlobalAveragePooling2D())
    
    model.add(Dense(100,activation = 'relu'))
    model.add(Dropout(0.1))

    # One output unit per class index used in this notebook (0..10).
    model.add(Dense(11, activation = 'softmax'))
    
    opt = keras.optimizers.Adam(learning_rate = 0.01)
    checkpoint_filepath = "/checkpoint"
    # Keep only the weights of the epoch with the highest validation accuracy.
    checkpoint = keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath, verbose=1, monitor='val_accuracy',save_best_only=True,save_weights_only=True,mode='max')
    model.compile(optimizer=opt, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    model.fit(X_train, t_train, epochs = Nepochs,batch_size = 100, validation_data=(X_val,t_val),callbacks=[checkpoint])   
      

    return model

**Hard Prediction**

In [None]:
def predict(model, X_test):
    '''
    Hard class prediction for unknown inputs.

    Runs model.predict on X_test, picks the highest-scoring column of every
    row and returns that choice as one-hot vectors.

    Returns:
        y_pred: np array of one-hot predicted vectors, one row per input and
                one column per model output.
    '''
    scores = model.predict(X_test)
    n_outputs = scores.shape[1]
    winners = np.argmax(scores, axis=1)
    return one_hot_encoding(winners, n_outputs)

**Confusion Matrix (CM) Evaluation**

In [None]:
def evaluate(y_pred, t_test):
    '''
    Build the confusion matrix of the predictions (numpy only).

    Inputs:
        y_pred: np.array with one row per sample and one column per class;
                the arg-max of each row is the predicted class.
        t_test: true class index of every sample (used as the row index).
    Return CM: np.array of shape (No. of Events, No. of Events) where
        CM[true, predicted] counts the samples.
    '''
    n_classes = y_pred.shape[1]
    predicted = np.argmax(y_pred, axis=1)
    CM = np.zeros((n_classes, n_classes), dtype=int)
    for sample, predicted_class in enumerate(predicted):
        CM[t_test[sample], predicted_class] += 1
    return CM

# **Preprocessing of training data**

In [None]:
# Colab-only: mount Google Drive at /content/drive (forcing a remount) so the
# "/content/drive/My Drive/..." paths used by the following cells resolve.
from google.colab import drive
drive.mount("/content/drive",force_remount = True)

Mounted at /content/drive


In [None]:
# Paths of all training wav files and their bare file names (the names are the
# keys used to look up labels later on).
# NOTE(review): glob.glob does not guarantee any particular order, and the
# cached train_data.npy loaded below was produced from an earlier listing.
# Feature rows and labels are paired purely by position, so this relies on the
# listing order being the same as when the cache was written - confirm, or
# re-extract the features in the same session.
files = glob.glob("/content/drive/My Drive/audio_train_1ch/*.wav")
audio_files = [os.path.basename(file) for file in files]

*Extract the training features and save them to train_data.npy (run once; commented out afterwards)*

In [None]:
#X_train = read_audio_files(files)
#X_train = add_padding(X_train)
#np.save("/content/drive/My Drive/train_data.npy",X_train)

*Loading saved features for training Data*

In [None]:
# Load the cached training features when they exist; otherwise extract them
# from the wav files listed in `files`, pad them and write the cache, so the
# notebook also runs from a clean Drive.
TRAIN_FEATS_PATH = "/content/drive/My Drive/train_data.npy"
if os.path.exists(TRAIN_FEATS_PATH):
    train_data = np.load(TRAIN_FEATS_PATH)
else:
    train_data = add_padding(read_audio_files(files))
    np.save(TRAIN_FEATS_PATH, train_data)

*Creating Labels for training_data*

In [None]:
# File name -> label string, as listed in the training label file.
Id_labels = read_csv("/content/drive/My Drive/labels_train.csv")

# Label string -> integer class index.
# NOTE(review): 'class' is presumably the header value of the label file's
# second column rather than a real event - confirm.
Label_to_index = {
    'dog_bark': 0,
    'air_conditioner': 1,
    'engine_idling': 2,
    'siren': 3,
    'class': 4,
    'gun_shot': 5,
    'jackhammer': 6,
    'drilling': 7,
    'children_playing': 8,
    'car_horn': 9,
    'street_music': 10,
}
# Inverse table, derived so the two mappings cannot drift apart.
index_to_Label = {index: label for label, index in Label_to_index.items()}

# Integer target of every training file, in the order of `audio_files`.
target_data = np.array([Label_to_index[Id_labels[name]] for name in audio_files])

# **Preprocessing of test data**

In [None]:
npyfilespath ="/content/drive/My Drive/test_task1/feats"
fpath ="/content/drive/My Drive/test_data.npy"

# List the feature files once, in sorted order, and derive both the clip names
# and the loaded arrays from that single list so they are guaranteed to line
# up. The paths are absolute, so no change of working directory is needed.
test_files = sorted(glob.glob(os.path.join(npyfilespath, "*.npy")))
npfiles = test_files
# Clip name = file name without its ".npy" extension.
test_audio_files = [os.path.splitext(os.path.basename(file))[0] for file in test_files]

all_arrays = [np.load(npfile) for npfile in npfiles]
X_test = add_padding(all_arrays)
np.save(fpath,X_test)

test_data = X_test

# **Training**

In [None]:
# Give every sample a trailing axis of length 1 (the model's input shape is
# (513,401,1)), then hold out 5% of the samples for validation.
train_shape = train_data.shape
train_data = train_data.reshape((train_shape[0], train_shape[1], train_shape[2], 1))
X_train, t_train, X_val, t_val = splitData(train_data, target_data, testFraction=0.05)

In [None]:
if __name__ == "__main__":
    # Number of passes over the training set.
    N_EPOCHS = 100
    model = train(X_train, t_train, X_val, t_val, Nepochs=N_EPOCHS)

Epoch 1/100
Epoch 00001: val_accuracy improved from -inf to 0.06818, saving model to /checkpoint
Epoch 2/100
Epoch 00002: val_accuracy improved from 0.06818 to 0.20455, saving model to /checkpoint
Epoch 3/100
Epoch 00003: val_accuracy did not improve from 0.20455
Epoch 4/100
Epoch 00004: val_accuracy improved from 0.20455 to 0.40909, saving model to /checkpoint
Epoch 5/100
Epoch 00005: val_accuracy improved from 0.40909 to 0.45455, saving model to /checkpoint
Epoch 6/100
Epoch 00006: val_accuracy did not improve from 0.45455
Epoch 7/100
Epoch 00007: val_accuracy improved from 0.45455 to 0.48864, saving model to /checkpoint
Epoch 8/100
Epoch 00008: val_accuracy did not improve from 0.48864
Epoch 9/100
Epoch 00009: val_accuracy improved from 0.48864 to 0.65909, saving model to /checkpoint
Epoch 10/100
Epoch 00010: val_accuracy did not improve from 0.65909
Epoch 11/100
Epoch 00011: val_accuracy did not improve from 0.65909
Epoch 12/100
Epoch 00012: val_accuracy did not improve from 0.6590

**Test on Validation Dataset**

In [77]:
# Restore the weights saved by the ModelCheckpoint callback in train()
# (the epoch with the highest val_accuracy) and score them on the held-out data.
model.load_weights("/checkpoint")
model.evaluate(X_val, t_val)

# Confusion matrix of the hard predictions on the validation set.
y = predict(model, X_val)
validation_cm = evaluate(y, t_val)
print(validation_cm)

[[11  0  1  0  0  1  0  1  0  0  0]
 [ 0 10  1  0  0  0  0  0  0  0  0]
 [ 0  1 11  0  0  0  0  0  0  0  0]
 [ 0  0  0  4  0  0  0  0  0  0  0]
 [ 0  0  0  0  0  0  0  0  0  0  0]
 [ 0  0  0  0  0  5  0  0  0  0  0]
 [ 0  0  0  0  0  0 12  0  0  0  0]
 [ 0  0  0  0  0  0  0  6  0  0  0]
 [ 0  0  0  0  0  0  0  0  7  0  0]
 [ 0  0  0  0  0  0  0  0  0  4  0]
 [ 0  0  1  0  0  0  0  0  0  0 12]]


# **Prediction on Test Data**

In [None]:
# Add the trailing length-1 axis to the test features, then get one-hot
# predictions for them.
test_shape = test_data.shape
test_data = test_data.reshape((test_shape[0], test_shape[1], test_shape[2], 1))
y_pred = predict(model, test_data)

# **Save predictions to a CSV file**

In [None]:
def getKeysByValue(dictOfElements, valueToFind, default='happy'):
    """Reverse lookup: return the key whose value equals valueToFind.

    Args:
        dictOfElements: dictionary to search.
        valueToFind: value to look for (compared with ==).
        default: returned when no value matches. The default 'happy' is the
            historical fallback of this function; pass e.g. None to be able
            to detect a miss.

    Returns:
        The last key (in iteration order) whose value matches, or `default`.
    """
    match = default
    for key, value in dictOfElements.items():
        if value == valueToFind:
            match = key
    return match


# Predicted class index of every test clip, translated back to its label name.
prediction_key = np.argmax(y_pred, axis=1)
prediction_event = [getKeysByValue(Label_to_index, class_index) for class_index in prediction_key]
# Clip name -> predicted label, paired by position.
prediction_dict = dict(zip(test_audio_files, prediction_event))


# Write one "name,label" line per clip.
# NOTE(review): this path uses "MyDrive" while the other cells use
# "My Drive" - confirm both resolve to the same folder.
filename = '/content/drive/MyDrive/task1.csv'
with open(filename, 'w') as f:
    f.writelines("%s,%s\n" % (clip, event) for clip, event in prediction_dict.items())