In [1]:
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import seaborn as sns

import librosa
import librosa.display

import tensorflow as tf
import keras
from keras import layers
from keras import regularizers

import pickle

In [2]:
# Load the UrbanSound8K metadata CSV; later cells read its slice_file_name,
# fold and classID columns. The path is relative to the notebook's working
# directory, so the file must sit next to the notebook (or the path adjusted).
df = pd.read_csv(r'UrbanSound8K.csv')
df.head()

FileNotFoundError: [Errno 2] No such file or directory: 'UrbanSound8K.csv'

## Visualize Data

#### Check Data distribution 

In [None]:
# Count the samples per class ID and show the counts as a bar chart.
unique_labels, counts = np.unique(df['classID'], return_counts=True)

fig, ax = plt.subplots()
ax.bar(unique_labels, counts)
ax.set_xlabel('Class Label')
ax.set_ylabel('Number of Samples')
ax.set_title('Class distribution in dataset')
plt.show()

# Same numbers as the chart, in text form.
print(unique_labels, counts)

#### Pick a sample to display  

In [None]:
# Row index of the clip to inspect (candidates tried so far: 6192, 4729).
sample_num = 4729

# File name of the selected clip, taken from the metadata table.
filename = df.slice_file_name[sample_num]
print(filename)

# Clips live in one sub-folder per fold: ../UrbanSound8K/audio/fold<N>/<file>.
path = f"../UrbanSound8K/audio/fold{df.fold[sample_num]}/{filename}"
# Load at 22050 Hz, so a clip of T seconds yields sr*T samples (22050*4 for 4 s).
signal, sr = librosa.load(path, sr=22050)

#### Waveform

In [None]:
# Plot the loaded clip in the time domain, using the sample rate it was loaded with.
librosa.display.waveshow(signal, sr=sr)
plt.xlabel("Time")
plt.ylabel("Amplitude")
plt.show()

#### FFT -> Spectrum

In [None]:
# Discrete Fourier transform of the whole clip.
fft = np.fft.fft(signal)

magnitude = np.abs(fft)
# Bin k of an N-point FFT sits at k * sr / N, so the axis must stop one bin
# short of sr (endpoint=False); including the endpoint would stretch every
# frequency by a factor N / (N - 1).
frequency = np.linspace(0, sr, len(magnitude), endpoint=False)

# Only the first half of the bins (below sr / 2) is plotted.
half = len(frequency) // 2
left_frequency = frequency[:half]
left_magnitude = magnitude[:half]

plt.plot(left_frequency, left_magnitude)
plt.xlabel("Frequency")
plt.ylabel("Magnitude")
plt.show()

#### STFT -> Spectrogram

In [None]:
# STFT settings; both names are reused by the MFCC cell that follows.
# NOTE(review): hop_length (512) is larger than n_fft (256), so consecutive
# analysis windows do not overlap — confirm this is intended.
n_fft = 256
hop_length = 512

# Short-time Fourier transform -> magnitude -> decibel scale.
stft = librosa.core.stft(signal, hop_length=hop_length, n_fft=n_fft)
spectogram = np.abs(stft)
log_spectogram = librosa.amplitude_to_db(spectogram)

librosa.display.specshow(log_spectogram, sr=sr, hop_length=hop_length)
plt.xlabel("Time")
plt.ylabel("Frequency")
plt.colorbar()
plt.show()

# Number of rows of the spectrogram matrix.
print(len(log_spectogram))

#### MFCCs

In [None]:
# 13 MFCCs for the sample clip. The sample rate is passed explicitly so the
# mel filter bank always matches the rate the clip was loaded with, instead of
# silently relying on librosa's default.
MFCCs = librosa.feature.mfcc(y=signal, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mfcc=13)

librosa.display.specshow(MFCCs, sr=sr, hop_length=hop_length)
plt.xlabel("Time")
plt.ylabel("MFCC")
plt.colorbar()
plt.show()

## Data Preprocessing and Feature Extraction

#### Padding Audio Files

In [None]:
def zero_padded_data_nparray(audios_path, duration_secs = 4, sr = 22050, files_limit = -1, verbose = False):
    """Load every audio file under ``audios_path`` as a fixed-length mono signal.

    Each file is loaded with ``librosa.load`` at sample rate ``sr`` and then
    zero-padded at the end, or truncated, to ``duration_secs`` seconds.

    Args:
        audios_path: Directory searched by ``librosa.util.find_files``.
        duration_secs: Target clip length in seconds.
        sr: Sample rate passed to ``librosa.load``.
        files_limit: Maximum number of files to process; ``-1`` means no limit.
        verbose: When true, print a progress line for every file.

    Returns:
        A list of ``[file_name, signal]`` pairs, where ``file_name`` is the
        base name of the file (no directories) and ``signal`` is a 1-D array.
    """
    files = librosa.util.find_files(audios_path)
    data_array = []

    for index, path_file in enumerate(files):
        if files_limit != -1 and index >= files_limit:
            break

        if verbose:
            print(f"At audio {index+1}/{len(files)}")

        # Keep the returned rate in its own name so the `sr` argument is not
        # rebound while iterating.
        signal, loaded_sr = librosa.load(path_file, sr = sr, mono=True)
        target_len = int(round(duration_secs * loaded_sr))

        if len(signal) < target_len:
            # np.pad keeps the signal's dtype; concatenating with a float64
            # zeros array would upcast only the padded clips.
            signal = np.pad(signal, (0, target_len - len(signal)))
        elif len(signal) > target_len:
            signal = signal[:target_len]

        # os.path.basename handles the platform's path separator; splitting on
        # a backslash only extracted the file name on Windows.
        file_name = os.path.basename(path_file)
        data_array.append([file_name, signal])

    return data_array

#### Feature extraction -> MFCCs

In [None]:
def get_features(df_in, signals, hop_length = 512, n_fft = 2048, n_mfcc = 40, sr = 44100, spec_n_fft = 246):
    """Compute MFCCs, log-spectrograms and labels for a list of audio signals.

    Args:
        df_in: Metadata DataFrame with ``slice_file_name`` and ``classID`` columns.
        signals: Sequence of ``[file_name, signal]`` pairs, as returned by
            ``zero_padded_data_nparray``.
        hop_length: Hop length used for both the MFCCs and the STFT.
        n_fft: FFT window size used for the MFCCs.
        n_mfcc: Number of MFCC coefficients.
        sr: Sample rate passed to ``librosa.feature.mfcc``.
            NOTE(review): this default (44100) differs from the 22050 default
            of ``zero_padded_data_nparray``; pass the rate the signals were
            actually loaded with.
        spec_n_fft: FFT window size used for the spectrogram. The default
            keeps the previously hard-coded value of 246.

    Returns:
        ``(mfcc, spec, labels)``: ``mfcc`` stacks the transposed MFCC matrices
        along a new first axis, ``spec`` stacks the dB-scaled magnitude
        spectrograms the same way, and ``labels`` is a list of class IDs in
        the same order. Entries with an empty file name or without a matching
        metadata row are reported with ``print`` and skipped.

    Raises:
        ValueError: If no entry of ``signals`` could be matched to ``df_in``.
    """
    # Build the file-name -> class-ID lookup once instead of scanning the whole
    # DataFrame for every clip; the first row wins when a file name repeats.
    label_by_file = (
        df_in.drop_duplicates(subset="slice_file_name")
             .set_index("slice_file_name")["classID"]
             .to_dict()
    )

    MFCCS = []       # one (frames, n_mfcc) matrix per matched clip
    SPECTOGRAM = []  # one dB-scaled spectrogram per matched clip
    labels = []      # class ID per matched clip

    for entry in signals:
        filename = entry[0]
        if not filename:
            print("Empty filename")
            continue
        if filename not in label_by_file:
            print(f"No matching row for filename {filename}")
            continue

        samples = np.array(entry[1])

        # MFCCs, transposed so that frames come first.
        mfcc = librosa.feature.mfcc(y = samples,
                                    sr = sr,
                                    n_fft = n_fft,
                                    n_mfcc = n_mfcc,
                                    hop_length = hop_length)
        MFCCS.append(mfcc.T)

        # Magnitude spectrogram on a decibel scale.
        stft = librosa.stft(y = samples,
                            hop_length = hop_length,
                            n_fft = spec_n_fft)
        SPECTOGRAM.append(librosa.amplitude_to_db(np.abs(stft)))

        labels.append(label_by_file[filename])

    if not labels:
        raise ValueError("No signal could be matched to a row of df_in; nothing to extract.")

    return (np.stack(MFCCS, axis = 0), np.stack(SPECTOGRAM, axis = 0), labels)

#### Save Data to Pickle file

In [None]:
def save_pkl(data, path):
    """Serialize ``data`` to ``path`` with pickle.

    Missing parent directories of ``path`` are created first, so a long
    computation does not fail at the very end because of an absent folder.

    Args:
        data: Any picklable object.
        path: Destination file path.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    # The context manager closes the file; no explicit close() is needed.
    with open(path, "wb") as saved_data:
        pickle.dump(data, saved_data)

def load_pkl(path):
    """Read and return the object pickled in the file at ``path``.

    Only use this on files you created yourself: unpickling untrusted data
    can execute arbitrary code.
    """
    with open(path, "rb") as pickle_file:
        return pickle.load(pickle_file)

def numpy_array_float_32(data):
    """Return the items of the iterable ``data`` as a new float32 NumPy array."""
    items = tuple(data)
    as_array = np.asarray(items)
    return as_array.astype(np.float32)

In [None]:
# Audio folders of the ten predefined folds, in order (fold1 ... fold10).
fold_paths = [f"../UrbanSound8K/audio/fold{n}/" for n in range(1, 11)]

NUM_CLASSES = 10     # fixed one-hot width, so every fold has the same number of columns
SAMPLE_RATE = 22050  # rate used to load the audio AND to compute the MFCCs

# One entry per fold, in the same order as fold_paths.
mfcc = []
spec = []
labels = []

for fold_path in fold_paths:
    print(fold_path)

    # Load the fold's clips, zero-padded / truncated to a common length.
    audio = zero_padded_data_nparray(fold_path, sr=SAMPLE_RATE)

    # Feature extraction; the sample rate is passed explicitly because the
    # default of get_features (44100) does not match the loading rate.
    mfccs, spectograms, y = get_features(df, audio, sr=SAMPLE_RATE)

    # Min-max scaling to [0, 1].
    # NOTE(review): the minimum and maximum are computed per fold, so each fold
    # is scaled independently of the others — confirm this is intended.
    mfccs_scaled = (mfccs - np.min(mfccs)) / (np.max(mfccs) - np.min(mfccs))
    spectograms_scaled = (spectograms - np.min(spectograms)) / (np.max(spectograms) - np.min(spectograms))

    # One-hot encode the class IDs. The width is fixed instead of max(y) + 1 so
    # that a fold missing the highest class ID still gets NUM_CLASSES columns.
    y_encoded = np.zeros((len(y), NUM_CLASSES))
    y_encoded[np.arange(len(y)), y] = 1

    mfcc.append(np.array(mfccs_scaled))
    spec.append(np.array(spectograms_scaled))
    labels.append(y_encoded)

# Persist the per-fold features and labels.
save_pkl(mfcc, "./mfcc.pkl")
save_pkl(spec, "./spec.pkl")
save_pkl(labels, "./labels.pkl")


## Building the Recurrent Neural Network


#### Build Network topology

In [None]:
def create_model(input_shape = (124, 173), num_classes = 10):
    """Build the (uncompiled) LSTM classifier.

    Architecture: two stacked LSTM layers (128 units each, returning full
    sequences), five time-distributed dense layers (128, 64, 32, 16, 8 units,
    tanh, L2-regularised) each followed by 30 % dropout, a flatten layer and a
    softmax output layer.

    Args:
        input_shape: Shape of one input sample. Keras LSTM layers read it as
            ``(timesteps, features)``.
            NOTE(review): the spectrograms built in ``get_features`` come from
            an STFT, whose rows are frequency bins and whose columns are
            frames, so with the default shape the LSTM steps over frequency
            bins rather than time frames — confirm this is intended.
        num_classes: Number of units of the softmax output layer.

    Returns:
        A ``keras.Sequential`` model; the caller is expected to compile it.
    """
    model = keras.Sequential()

    # Two stacked LSTM layers.
    model.add(layers.LSTM(128, input_shape = input_shape, return_sequences = True,
                          activation='tanh', kernel_initializer='random_normal'))
    model.add(layers.LSTM(128, return_sequences = True, activation='tanh'))

    # Funnel of time-distributed dense layers, each followed by dropout.
    for units in (128, 64, 32, 16, 8):
        model.add(layers.TimeDistributed(
            layers.Dense(units, activation = 'tanh', kernel_regularizer = regularizers.l2(0.01))
        ))
        model.add(layers.Dropout(0.3))

    # Collapse the (timesteps, 8) output into a single vector per sample.
    model.add(layers.Flatten())

    # Output layer: one probability per class.
    model.add(layers.Dense(num_classes, activation = 'softmax'))

    return model

In [None]:
# Build one instance of the network and print its layer-by-layer summary.
model = create_model()
model.summary()

We can also plot the graph of the LSTM:

In [None]:
# Write the model graph, including layer shapes, to model.png.
keras.utils.plot_model(model, "model.png", show_shapes=True)

#### Compile the Model

The **Adam** optimizer manages the learning rate for stochastic gradient descent. The loss function is **categorical_crossentropy**, which is used when the target label is One-Hot-Encoded. 

In [None]:
# Two candidate optimizers are created; only optimizer1 (Adam) is passed to
# compile() below, optimizer2 (SGD with gradient clipping) is not used here.
optimizer1 = keras.optimizers.Adam(learning_rate = 0.001)
optimizer2 = keras.optimizers.SGD(clipvalue = 0.8, learning_rate = 0.0001)

# NOTE: the cross-validation cell further down builds and compiles a fresh
# model for every fold, so this compiled instance only serves as a demonstration.
model.compile(optimizer = optimizer1, loss = 'categorical_crossentropy', metrics=['accuracy'])

# Bare expression: the notebook displays the model object.
model


## Metric Variables

To analyse the 10 models produced by the 10-fold cross-validation, we keep the useful information for each fold in a list.

In [None]:
# One dict of results per cross-validation fold, appended by the training loop
# (test loss, test accuracy, confusion matrix and training history).
fold_metrics = []

## 10-Fold Cross Validation

The model is fit with the Adam optimizer. Because the class labels are one-hot encoded, we use categorical cross-entropy as the loss function, and we use accuracy to measure how well the model performs.

We feed the data in batches of 64 and, based on previous tests, set the number of epochs to 60: training for longer takes considerably more time for little improvement in the results.

In [17]:
EPOCHS = 60
NUM_FOLDS = 10
OUTPUT_DIR = "assets/kfold_metrics"

# The spectrograms are the input features; `spec` and `labels` hold one array
# per fold.
features = spec

# Make sure the output folder exists before hours of training are spent.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Start from a clean slate so that re-running this cell does not append a
# second set of results.
fold_metrics = []

for test_fold in range(NUM_FOLDS):
    fold_number = test_fold + 1
    print(f"Fold {fold_number}:")

    # Leave-one-fold-out split: nine folds for training, the held-out fold is
    # divided in two halves for validation and testing.
    # (np.concatenate avoids the huge intermediate Python lists that
    # `.tolist()` would create.)
    train_folds = [k for k in range(NUM_FOLDS) if k != test_fold]
    X_train = np.concatenate([np.asarray(features[k]) for k in train_folds], axis=0)
    y_train = np.concatenate([np.asarray(labels[k]) for k in train_folds], axis=0)
    X_holdout = np.asarray(features[test_fold])
    y_holdout = np.asarray(labels[test_fold])

    X_test, X_val, y_test, y_val = train_test_split(
        X_holdout, y_holdout, test_size = 0.5, random_state = 123
    )

    # Print sets shapes
    print(f"X_train Shape: {X_train.shape}")
    print(f"X_test Shape: {X_test.shape}")
    print(f"X_val Shape: {X_val.shape}")

    # Create & compile a fresh model for this fold
    model = create_model()
    optimizer = keras.optimizers.Adam(learning_rate = 0.001)
    model.compile(
        optimizer = optimizer,
        loss = 'categorical_crossentropy',
        metrics=['accuracy']
    )

    # Train model
    # NOTE(review): shuffle=False keeps the samples in fold/file order for
    # every epoch — confirm this is intended.
    history = model.fit(
        X_train, y_train,
        epochs = EPOCHS,
        batch_size = 64,
        shuffle=False,
        validation_data=(X_val, y_val)
    )

    # Predict unseen data
    y_pred = model.predict(X_test)

    y_pred_reshape = np.argmax(y_pred, axis=1)
    y_test_reshape = np.argmax(y_test, axis=1)
    TestLoss, Testacc = model.evaluate(X_test, y_test)

    # Save fold results. The model itself is NOT stored in the metrics dict:
    # it is saved separately below, and keeping a reference here would keep
    # every fold's model alive in memory and inside each pickle.
    m_metrics = {
        'loss': TestLoss,
        'accuracy': Testacc,
        # Fixed label set, so the matrix stays square even if a class is
        # missing from this fold's test half.
        'confusion_matrix': confusion_matrix(
            y_test_reshape, y_pred_reshape, labels=np.arange(y_test.shape[1])
        ),
        'history_dict': history.history
    }
    fold_metrics.append(m_metrics)

    # Files are numbered with the fold being tested (1-based).
    save_pkl(m_metrics, os.path.join(OUTPUT_DIR, f"metrics_fold{fold_number}.pkl"))

    model.save(os.path.join(OUTPUT_DIR, f"model_fold{fold_number}.keras"), save_format="keras")
    save_pkl(fold_metrics, os.path.join(OUTPUT_DIR, "metrics.pkl"))

    # Release the model and Keras' global state before the next fold.
    del model
    keras.backend.clear_session()

    print("Done.")


X_train Shape: (6193, 124, 173)
X_test Shape: (990, 124, 173)
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


##  Model Analysis

#### Accuracy & Loss

In [None]:
# One row of plots per fold: loss on the left, accuracy on the right.
# squeeze=False keeps `ax` two-dimensional even when there is a single fold.
fig, ax = plt.subplots(len(fold_metrics), 2, figsize=(10, 5*len(fold_metrics)), squeeze=False)

for i, metrics in enumerate(fold_metrics):
    history_dict = metrics.get('history_dict')
    loss_values = history_dict['loss']
    acc_values = history_dict['accuracy']
    val_loss_values = history_dict['val_loss']
    val_acc_values = history_dict['val_accuracy']

    # Derive the x axis from the recorded history, so the plot still works if
    # the stored run has a different number of epochs than the current setting.
    epochs = range(1, len(loss_values) + 1)

    ax[i,0].plot(epochs, loss_values, 'co', label='Training Loss')
    ax[i,0].plot(epochs, val_loss_values, 'm', label='Validation Loss')
    ax[i,0].set_title('Training and validation loss on fold '+str(i+1)+' of 10')
    ax[i,0].set_xlabel('Epochs')
    ax[i,0].set_ylabel('Loss')
    ax[i,0].legend()

    ax[i,1].plot(epochs, acc_values, 'co', label='Training accuracy')
    ax[i,1].plot(epochs, val_acc_values, 'm', label='Validation accuracy')
    ax[i,1].set_title('Training and validation accuracy on fold '+str(i+1)+' of 10')
    ax[i,1].set_xlabel('Epochs')
    ax[i,1].set_ylabel('Accuracy')
    ax[i,1].legend()
plt.show()

#### Average Accuracy

In [None]:
# Number of folds the sums below are divided by.
n_folds = len(fold_metrics)

# Best training accuracy reached in each fold, summed over the folds.
avg_train_acc = sum(
    max(fold.get('history_dict').get('accuracy')) for fold in fold_metrics
)
print(str(avg_train_acc / n_folds) + " average train accuracy across all folds.")

# Best validation accuracy reached in each fold, summed over the folds.
avg_val_acc = sum(
    max(fold.get('history_dict').get('val_accuracy')) for fold in fold_metrics
)
print(str(avg_val_acc / n_folds) + " average validation accuracy across all folds.")

# Test accuracy measured after training, summed over the folds.
avg_test_acc = sum(fold.get('accuracy') for fold in fold_metrics)
print(str(avg_test_acc / n_folds) + " average test accuracy across all folds.")

#### Confusion Matrix

In [None]:
# Lay the per-fold confusion matrices out on a two-column grid whose number of
# rows follows the number of folds actually available.
n_folds = len(fold_metrics)
n_cols = 2
n_rows = max(1, (n_folds + n_cols - 1) // n_cols)

# squeeze=False keeps `ax` two-dimensional even for a single row.
fig, ax = plt.subplots(n_rows, n_cols, figsize=(15, 7*n_rows), squeeze=False)
axes_flat = ax.ravel()

for i, metrics in enumerate(fold_metrics):
    cm = metrics.get('confusion_matrix')
    fold_ax = axes_flat[i]

    sns.heatmap(cm, annot=True, fmt=".0f", linewidths=.5, square = True, cmap = 'RdYlGn', ax=fold_ax)

    # Titles and labels are set AFTER the heatmap is drawn: seaborn resets the
    # axis labels while drawing, so labels set beforehand are lost. Rows of the
    # matrix are the actual classes, columns the predicted ones.
    fold_ax.set_title('Confusion matrix on fold '+str(i+1)+' of 10')
    fold_ax.set_xlabel('Predicted label')
    fold_ax.set_ylabel('Actual label')

# Hide grid cells that have no fold to show.
for unused_ax in axes_flat[n_folds:]:
    unused_ax.set_visible(False)

plt.show()
