In [None]:
import os
import sys
import time

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from IPython.display import Audio, display
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Data Loading

In [None]:
# Directory holding the .wav files (the data itself is not on Github).
# Set the AUDIO_WAV_DIR environment variable to point somewhere else; the
# historical default is kept when it is unset. Joining with "" guarantees a
# trailing separator, so `path + file_name` still yields a valid path.
path = os.path.join(os.environ.get("AUDIO_WAV_DIR", "/AudioWAV/"), "")

In [None]:
# Emotion code found in the third '_'-separated field of each file name,
# mapped to the label stored in the dataframe.
EMOTION_CODES = {
    'SAD': 'sad',
    'ANG': 'angry',
    'DIS': 'disgust',
    'FEA': 'fear',
    'HAP': 'happy',
    'NEU': 'neutral',
}

# Sorted so the row order (and everything derived from it) is reproducible;
# os.listdir returns entries in arbitrary order.
path_directory_list = sorted(os.listdir(path))

file_emotion = []
file_path = []

for file_name in path_directory_list:
    # storing file paths
    file_path.append(os.path.join(path, file_name))
    # storing file emotions; names with fewer than three fields or an
    # unrecognised code are labelled 'Unknown' instead of raising IndexError
    part = file_name.split('_')
    emotion_code = part[2] if len(part) > 2 else None
    file_emotion.append(EMOTION_CODES.get(emotion_code, 'Unknown'))

# dataframe for emotion of files
emotion_df = pd.DataFrame(file_emotion, columns=['Emotions'])

# dataframe for path of files.
path_df = pd.DataFrame(file_path, columns=['Path'])

# One row per file: its emotion label next to its path.
data_df = pd.concat([emotion_df, path_df], axis=1)
data_df.head()

# Data Processing

In [None]:
#Processing the data

# Minimum / maximum of the dataset most recently passed to normalize_dataset.
# The defaults give a range of 1, so denormalize_dataset is a no-op until a
# dataset has been normalized.
dataset_min = 0.0
dataset_max = 1.0


def denormalize_dataset(input_val):
    """Undo normalize_dataset by multiplying by the recorded value range.

    :param input_val: scalar or array previously scaled by normalize_dataset.
    :return: ``input_val * (dataset_max - dataset_min)``, or ``input_val``
        unchanged when the recorded range is zero (normalize_dataset leaves
        such data unscaled, so multiplying by zero would destroy it).
    """
    diff = dataset_max - dataset_min
    if diff == 0:
        return input_val
    return input_val * diff


#Function to normalize input values
def normalize_dataset(input_val):
    """Scale values by the dataset's range and remember that range.

    The values are divided by ``max - min``; the minimum is NOT subtracted,
    so the result is not shifted to start at zero. The observed minimum and
    maximum are stored in the module globals ``dataset_min`` / ``dataset_max``
    for use by denormalize_dataset.

    :param input_val: non-empty array-like of numbers. It is not modified;
        a new array is returned (integer input yields a float result).
    :return: the scaled array, or the input as an array unchanged when all
        values are equal (range of zero).
    :raises ValueError: if ``input_val`` is empty (raised by ``np.min``).
    """
    global dataset_min, dataset_max
    values = np.asarray(input_val)
    dataset_min = np.min(values)
    dataset_max = np.max(values)

    diff = dataset_max - dataset_min
    if diff == 0:
        return values
    # Out-of-place division: avoids mutating the caller's array and works for
    # integer dtypes, where an in-place `/=` raises.
    return values / diff

def interpolateAudio(audio):
    """Stretch `audio` by the ratio mic_sr / desired_sr using linear interpolation.

    Each input sample i is placed at the integer position int(ratio * i) and
    the signal is then evaluated at every integer position below
    int(len(audio) * ratio).

    NOTE(review): `mic_sr` and `desired_sr` are read from module scope and are
    not defined in the visible cells - confirm where they are set.

    :param audio: 1-D sequence of samples.
    :return: tuple ``(mic_sr, interpolated_samples)``.
    """
    ratio = float(mic_sr) / desired_sr
    n_in = len(audio)

    # Positions of the known samples on the output grid (truncated to int).
    source_positions = [int(ratio * idx) for idx in range(n_in)]
    target_positions = range(int(n_in * ratio))

    resampled = np.interp(target_positions, source_positions, audio)
    return mic_sr, resampled

In [None]:
# data_df carries its labels in the 'Emotions' column (there is no 'class'
# column). Sorting makes the label order - and therefore every class index
# derived from it - identical across kernel restarts.
word_dirs = sorted(set(data_df['Emotions'].to_list()))

# Classes the model should recognise: every labelled emotion. Files whose
# name carried no recognised emotion code are labelled 'Unknown' and are not
# a target class.
hotwords = [word for word in word_dirs if word != 'Unknown']

print("All words in dataset - \n", ', '.join(word_dirs))
print("\nHotwords - \n", ', '.join(hotwords))

In [None]:
add_noise = False # add different words, null samples and random noise
# One output class per hotword, plus one shared "background" class when
# add_noise is enabled.
n_classes = len(hotwords) + int(add_noise)

# Target number of samples to collect per hotword class.
class_nSamples = 1000

# Per-word sample budget for the remaining (non-hotword) labels. When there
# are no such labels the divisor is zero or negative, so fall back to 0
# instead of raising ZeroDivisionError or producing a negative budget.
n_other_words = len(word_dirs) - n_classes
other_nSamples = float(class_nSamples)/n_other_words if n_other_words > 0 else 0.0

def nLabel(word):
    """Return the integer class index for `word`.

    Hotwords map to their position in the module-level `hotwords` list; any
    other word maps to the last class index, ``n_classes - 1``.
    """
    if word in hotwords:
        return hotwords.index(word)
    return n_classes - 1

def textLabel(index):
    """Return the display name for a class index.

    Indices below ``len(hotwords)`` yield the corresponding hotword; anything
    else is reported as "background".
    """
    if index < len(hotwords):
        return hotwords[index]
    return "background"

def sampleBackGround():
    """Tell whether background samples are included: the module-level `add_noise` flag."""
    include_background = add_noise
    return include_background

In [None]:
#Dataset storing audio samples for wake word and background
#
# NOTE(review): `desired_sr` and `desired_samples` are read from module scope
# and are not defined in the visible cells - confirm where they are set.

top_dir = "audio"  # not used by this cell; kept in case other cells read it


def _split_into_clips(samples, clip_len):
    """Zero-pad a 1-D signal to a multiple of `clip_len` and cut it into rows.

    Padding is added only when the length is not already a multiple, so no
    all-silent row is appended. An empty signal yields an empty (0, clip_len)
    array.
    """
    n_samples = samples.shape[0]
    if n_samples == 0:
        return np.empty((0, clip_len), dtype=np.float32)
    remainder = n_samples % clip_len
    if remainder != 0:
        samples = np.pad(samples, (0, clip_len - remainder))
    return samples.reshape(-1, clip_len).astype(np.float32)


def _load_word_clips(word_rows, max_clips, min_samples):
    """Load the files listed in word_rows['Path'] as fixed-length clips.

    Files are read in row order until more than `max_clips` clips have been
    collected. A file whose silence-trimmed length is below `min_samples` is
    skipped.

    :return: tuple ``(clips, n_files)`` - a float32 array of shape
        (n_clips, desired_samples) and the number of files that contributed.
    """
    collected = []
    n_clips = 0
    n_files = 0

    for wav_path in word_rows['Path']:
        X, sr = librosa.core.load(wav_path, sr=desired_sr)
        X, interval = librosa.effects.trim(X)

        if X.shape[0] < min_samples:
            continue

        file_clips = _split_into_clips(X, desired_samples)
        if file_clips.shape[0] == 0:
            continue

        collected.append(file_clips)
        n_clips += file_clips.shape[0]
        n_files += 1  # counted before the limit check so the report is exact

        if n_clips > max_clips:
            break

    if not collected:
        return np.empty((0, desired_samples), dtype=np.float32), n_files
    # Single concatenation instead of growing an array inside the loop.
    return np.concatenate(collected, axis=0), n_files


audio_chunks = []
label_chunks = []

for word in word_dirs:
    print("\n", word)

    is_hotword = word in hotwords
    if not is_hotword and not sampleBackGround(): # background, do not include
        print("-- Background/noise/other words not included")
        continue

    # data_df holds one row per file with 'Emotions' (label) and 'Path'.
    dfx = data_df[data_df['Emotions'] == word]
    start_time = time.time()

    if is_hotword:
        print("-- Category : hotword")
        # hotword files shorter than one second (desired_sr samples) are skipped
        word_samples, wav_files = _load_word_clips(dfx, class_nSamples, min_samples=desired_sr)
    else:
        print("-- Category : backgound/noise/other words")
        word_samples, wav_files = _load_word_clips(dfx, other_nSamples, min_samples=0)

    if word_samples.size > 0:
        # index of the word in hotwords list is the label.
        labels = np.full(word_samples.shape[0], nLabel(word), dtype=np.int32)
        audio_chunks.append(word_samples)
        label_chunks.append(labels)

        print("added {} audio files with {} samples for word \"{}\" with label {} in {:.1f} sec.".
              format(wav_files, labels.shape[0], word, nLabel(word), (time.time() - start_time)))

if audio_chunks:
    input_audio = np.concatenate(audio_chunks, axis=0)
    input_labels = np.concatenate(label_chunks)
else:
    input_audio = np.empty((0, desired_samples), dtype=np.float32)
    input_labels = np.empty((0,), dtype=np.int32)

In [None]:
# One-hot encode the integer labels: row k of the identity matrix is the
# one-hot vector for class k, so indexing it with the label array yields an
# (n_samples, n_classes) int32 matrix.
onehot_labels = np.eye(n_classes, dtype=np.int32)[input_labels]

input_labels = onehot_labels
print("Input dataset size:", input_audio.shape)
print("Input targets size:", input_labels.shape)

In [None]:
# When background sampling is enabled, add int(other_nSamples) rows of random
# noise and the same number of all-zero (silent) rows, all labelled with the
# last class index (the background class).
if sampleBackGround():
    n_bg_samples = int(other_nSamples)

    # np.int32 matches the one-hot label matrix; the bare `np.int` alias no
    # longer exists in current NumPy releases.
    bg_labels = np.zeros((n_bg_samples, n_classes), dtype=np.int32)
    bg_labels[:, n_classes-1] = 1

    silence = np.zeros((n_bg_samples, desired_samples), dtype=input_audio.dtype)
    # Uniform noise in [0, 1); previously this array was all zeros, which
    # duplicated the silent rows instead of adding noise.
    background = np.random.random((n_bg_samples, desired_samples)).astype(input_audio.dtype)

    input_audio = np.concatenate((input_audio, silence, background), axis=0)
    input_labels = np.concatenate((input_labels, bg_labels, bg_labels), axis=0)

#     %xdel background
#     %xdel silence
#     %xdel bg_labels

In [None]:
# STFT configuration: the analysis window spans the whole FFT and successive
# frames advance by a quarter of the window.
fft_len = 2048
win_len = fft_len
hop_len = win_len // 4

def spectrogramOp(X):
    """Return the magnitude spectrogram of signal `X`.

    Runs a centred STFT with the module-level fft_len / hop_len / win_len and
    takes the absolute value of the complex result, whose shape is
    (1 + fft_len/2, t).
    """
    stft_options = dict(n_fft=fft_len, hop_length=hop_len, win_length=win_len, center=True)
    complex_spectrogram = librosa.core.stft(X, **stft_options)
    return np.abs(complex_spectrogram)

# Pre-allocate the spectrogram tensor: one (frequency bins x frames) matrix
# per audio row. A centred STFT produces 1 + n_samples // hop_len frames.
n_freq_bins = fft_len // 2 + 1
n_frames = desired_samples // hop_len + 1
input_spectrogram = np.empty((input_audio.shape[0], n_freq_bins, n_frames), dtype=np.float32)

# `samples` rather than `input`, so the builtin input() is not shadowed for
# the rest of the session.
for row, samples in enumerate(input_audio):
    input_spectrogram[row] = spectrogramOp(samples)
print("input dataset size:", input_spectrogram.shape)

In [None]:
# Normalize: rescale every spectrogram with normalize_dataset, which also
# records the dataset's min/max in the module globals dataset_min/dataset_max.
# The result replaces input_spectrogram, so re-running this cell rescales the
# already-scaled data and overwrites the recorded min/max.
input_spectrogram = normalize_dataset(input_spectrogram)

In [None]:
total_len = input_labels.shape[0]

# Draw one random ordering and apply it to inputs and labels alike so that
# every spectrogram stays paired with its label.
shuffle_permutation = np.arange(total_len)
np.random.shuffle(shuffle_permutation)

input_spectrogram, input_labels = (
    array[shuffle_permutation] for array in (input_spectrogram, input_labels)
)

# First 90% of the shuffled rows train the model, the remaining 10% test it.
train_split = 0.9
cutoff = int(train_split*total_len)

inputs_train, inputs_test = input_spectrogram[:cutoff], input_spectrogram[cutoff:]
labels_train, labels_test = input_labels[:cutoff], input_labels[cutoff:]

In [None]:
#Selecting random index from the training dataset
ind = int(np.random.uniform()*len(inputs_train))

#Displaying sample spectrogram and audio from the training dataset
X = inputs_train[ind]
y = labels_train[ind].argmax()
print("Label :", textLabel(y) )
plt.imshow(X, cmap='hot', interpolation='nearest', aspect='auto')
plt.show()

# Invert the spectrogram back to a waveform. Only magnitudes were kept, so
# the phase is not recovered and the audio is an approximation.
audio = librosa.core.istft(X, hop_length=hop_len, win_length=win_len)

# `Audio` is the name this notebook imports from IPython.display; there is no
# `ipd` alias in scope.
Audio(audio, rate=desired_sr, autoplay=True)

## Model Building

In [None]:
from keras.models import Model
from keras.layers import Flatten, Dense, Input, Conv2D, MaxPooling2D, GlobalAveragePooling2D, GlobalMaxPooling2D
# get_source_inputs is exposed through keras.utils; the old
# keras.engine.topology module is not present in current Keras releases.
from keras.utils import get_source_inputs
from keras import backend as K

# Project-local module supplying EMBEDDING_SIZE, NUM_FRAMES and NUM_BANDS.
import vggish_params as params

def VGGish(load_weights=True, weights='audioset',
           input_tensor=None, input_shape=None,
           out_dim=None, include_top=True, pooling='avg'):
    '''
    Build the VGGish network as a Keras model.

    Four convolutional stages (64, 128, 256 and 512 filters; the last two
    stages have two convolutions each), each followed by 2x2 max pooling, and
    then either the fully-connected head or a global pooling layer.

    :param load_weights: when true, load weights from file after building.
    :param weights: 'audioset' or None; any other value raises ValueError.
    :param input_tensor: optional tensor to use as the model input.
    :param input_shape: input shape; defaults to
        (params.NUM_FRAMES, params.NUM_BANDS, 1).
    :param out_dim: width of the final dense layer; defaults to
        params.EMBEDDING_SIZE.
    :param include_top: whether to append the three fully-connected layers.
    :param pooling: 'avg' or 'max' global pooling applied when include_top is
        false; any other value leaves the convolutional output unpooled.

    :return: A Keras model instance named 'VGGish'.

    NOTE(review): WEIGHTS_PATH_TOP and WEIGHTS_PATH are read from module scope
    and are not defined in the visible cells - confirm where they are set.
    '''
    if weights not in {'audioset', None}:
        raise ValueError('The `weights` argument should be either '
                         '`None` (random initialization) or `audioset` '
                         '(pre-training on audioset).')

    # Fill in defaults from the parameter module.
    out_dim = params.EMBEDDING_SIZE if out_dim is None else out_dim
    if input_shape is None:
        input_shape = (params.NUM_FRAMES, params.NUM_BANDS, 1)

    # Choose the input layer: fresh placeholder, the caller's Keras tensor
    # as-is, or a placeholder wrapped around a non-Keras tensor.
    if input_tensor is None:
        aud_input = Input(shape=input_shape, name='input_1')
    elif K.is_keras_tensor(input_tensor):
        aud_input = input_tensor
    else:
        aud_input = Input(tensor=input_tensor, shape=input_shape, name='input_1')

    # (filters, convolution layer names, pooling layer name) for each stage.
    conv_stages = (
        (64, ('conv1',), 'pool1'),
        (128, ('conv2',), 'pool2'),
        (256, ('conv3/conv3_1', 'conv3/conv3_2'), 'pool3'),
        (512, ('conv4/conv4_1', 'conv4/conv4_2'), 'pool4'),
    )

    x = aud_input
    for n_filters, conv_names, pool_name in conv_stages:
        for conv_name in conv_names:
            x = Conv2D(n_filters, (3, 3), strides=(1, 1), activation='relu',
                       padding='same', name=conv_name)(x)
        x = MaxPooling2D((2, 2), strides=(2, 2), padding='same', name=pool_name)(x)

    if include_top:
        # FC block
        x = Flatten(name='flatten_')(x)
        for fc_name in ('vggish_fc1/fc1_1', 'vggish_fc1/fc1_2'):
            x = Dense(4096, activation='relu', name=fc_name)(x)
        x = Dense(out_dim, activation='relu', name='vggish_fc2')(x)
    elif pooling == 'avg':
        x = GlobalAveragePooling2D()(x)
    elif pooling == 'max':
        x = GlobalMaxPooling2D()(x)

    # When a tensor was supplied, trace it back to its source inputs.
    inputs = aud_input if input_tensor is None else get_source_inputs(input_tensor)

    # Create model.
    model = Model(inputs, x, name='VGGish')

    # load weights
    if load_weights:
        if weights != 'audioset':
            print("failed to load weights")
        elif include_top:
            model.load_weights(WEIGHTS_PATH_TOP)
        else:
            model.load_weights(WEIGHTS_PATH)

    return model

# Build the network with its defaults: AudioSet weights are loaded from
# WEIGHTS_PATH_TOP and the output is the out_dim-wide ReLU layer 'vggish_fc2'.
# NOTE(review): no model.compile(...) call is visible before model.fit, and the
# output width (params.EMBEDDING_SIZE) is not tied to n_classes, the width of
# the one-hot labels - confirm a classification head / compile step exists.
model = VGGish()

In [None]:
# Train on the training split, validating on the held-out split each epoch.
# NOTE(review): `callbacks` is not defined in the visible cells - confirm
# where it is created.
model.fit(
    inputs_train,
    labels_train,
    batch_size=64,
    epochs=1024,
    callbacks=callbacks,
    validation_data=(inputs_test, labels_test),
)

## Inference Testing

In [None]:
# Pick a random example from the held-out split.
ind = int(np.random.uniform()*len(inputs_test))
spectrogram_out = inputs_test[ind]

# Invert the magnitude spectrogram to a waveform before playing it; the
# spectrogram itself is a 2-D array, not audio. display() is needed because
# this is not the last expression of the cell.
audio = librosa.core.istft(spectrogram_out, hop_length=hop_len, win_length=win_len)
display(Audio(audio, rate=desired_sr))

y = labels_test[ind]
# Slice instead of indexing so the batch axis is kept: shape (1, freq, time),
# the same rank as the arrays passed to model.fit.
output = model.predict(inputs_test[ind:ind + 1])

print("True label:", textLabel(np.argmax(y)))
print("Prediction:", textLabel(np.argmax(output)))

plt.imshow(spectrogram_out, cmap='hot', interpolation='nearest', aspect='auto')
plt.show()

In [None]:
# Sanity check: feed the model pure random noise and pure silence.
background = spectrogramOp(np.random.random((desired_samples)))
silence = spectrogramOp(np.zeros((desired_samples)))

# The training spectrograms were divided by the dataset range recorded in
# dataset_min / dataset_max; apply the same scaling here so the model sees
# inputs on the scale it was trained on.
probe_batch = np.array([background, silence])
value_range = dataset_max - dataset_min
if value_range != 0:
    probe_batch = probe_batch / value_range

background_out, silence_out = model.predict(probe_batch)
print("Predicted ", textLabel(background_out.argmax()), "on random audio with vector", background_out)
print("Predicted ", textLabel(silence_out.argmax()), "on null audio with vector", silence_out)