In [None]:
import os
import sys
import librosa
import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Dense, Flatten, Dropout, BatchNormalization, Conv2D, MaxPooling2D, Input

from sklearn.metrics import classification_report, confusion_matrix
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split 

from google.colab import drive
# Mount Google Drive at /content/drive; the project paths defined in this cell live under that mount point.
drive.mount('/content/drive')

# Project root on the mounted Drive; the trailing slash is part of the value
# because the sub-paths are built by plain string concatenation.
PATH = '/content/drive/My Drive/DSIM/'
data_path = f'{PATH}dati/'    # folder scanned for the recordings
dump_path = f'{PATH}dumps/'   # folder holding the pickled feature dump

In [None]:
def printProgressBar(i,max,postText):
    """Draw a one-line text progress bar on stdout.

    The line starts with a carriage return, so successive calls overwrite the
    previous bar instead of scrolling the output.

    Args:
        i: Number of items processed so far.
        max: Total number of items. (The name shadows the builtin; it is kept
            so keyword callers keep working.)
        postText: Label printed after the percentage.
    """
    n_bar = 10  # width of the bar in characters

    # An empty workload (max == 0) used to raise ZeroDivisionError, and i > max
    # pushed the bar past its brackets; keep the fraction inside [0, 1].
    if max > 0:
        fraction = i / max
        fraction = 0.0 if fraction < 0 else 1.0 if fraction > 1 else fraction
    else:
        fraction = 0.0

    filled = '=' * int(n_bar * fraction)
    sys.stdout.write(f"\r[{filled:{n_bar}s}] {int(100 * fraction)}%  {postText}")
    sys.stdout.flush()

In [None]:
def crop(input, size):
    """Force `input` to exactly `size` entries along its first axis.

    Inputs longer than `size` are cut to their first `size` entries; shorter
    ones are right-padded with zeros created by `np.zeros`.
    """
    head = input[:size]
    missing = size - head.shape[0]
    return np.concatenate((head, np.zeros(missing)))

In [None]:
def mfcc(file_path, max_len=40):
    """Compute the MFCCs of an audio file over a fixed three-second window.

    The file is loaded as mono at 44.1 kHz, cut or zero-padded to exactly
    three seconds with `crop`, and passed to `librosa.feature.mfcc`.

    Args:
        file_path: Path of the audio file to load.
        max_len: Unused; kept so existing callers that pass it keep working.

    Returns:
        The array produced by `librosa.feature.mfcc` with `n_mfcc=40`.
    """
    seconds = 3
    wave, sr = librosa.load(file_path, mono=True, sr=44100)
    wave = crop(wave, sr * seconds)
    # The signal must be passed as `y=`: librosa >= 0.10 made this argument
    # keyword-only, so the positional form raises TypeError there. The keyword
    # form also works on older releases.
    return librosa.feature.mfcc(y=wave, sr=sr, n_fft=2048, hop_length=512, n_mfcc=40)

In [None]:
def load_dump(dump_data):
    """Read back and return the object pickled in the file at `dump_data`.

    Note: unpickling executes code embedded in the file, so only point this at
    dumps you created yourself.
    """
    with open(dump_data, "rb") as dump_file:
        return pickle.load(dump_file)

In [None]:
def data_extract_mfcc():
    """Build the MFCC dataset from `data_path`, or restore it from the dump.

    If `dump_path + "dump_mfcc.txt"` exists it is unpickled and returned.
    Otherwise every sub-folder of `data_path` is scanned, each file in it is
    converted with `mfcc`, and the result is pickled to that dump file.

    Returns:
        A list of `[mfcc_array, folder_name, folder_index]` entries, one per
        successfully processed file.
    """
    dump_data = dump_path + "dump_mfcc.txt"

    if os.path.isfile(dump_data):
        print('Restoring data from dump...\n')
        data = load_dump(dump_data)
        print('Done!')
        return data

    data = []
    # Keep only directories (stray files such as .DS_Store would make the inner
    # listdir fail) and sort them so folder_index is the same on every run.
    commands = sorted(
        entry for entry in os.listdir(data_path)
        if os.path.isdir(os.path.join(data_path, entry))
    )

    for count, command in enumerate(commands):
        command_dir = os.path.join(data_path, command)
        tracks = os.listdir(command_dir)  # listed once, reused for the progress total

        for track_perc, track in enumerate(tracks, start=1):
            try:
                # load the track and compute its features
                features = mfcc(os.path.join(command_dir, track))
                data.append([features, command, count])

                # progress bar (%)
                printProgressBar(track_perc, len(tracks), str(command))
            except Exception as exc:
                # Skip unreadable files but keep going; Exception (not a bare
                # except) lets Ctrl-C / kernel interrupts through.
                print("\n An exception occurred: ", track, exc)

        print('----> done!')

    # Create the dump folder if needed so the extraction work is not lost on
    # the final write.
    os.makedirs(os.path.dirname(dump_data) or '.', exist_ok=True)
    with open(dump_data, "wb") as fp:
        pickle.dump(data, fp)
    print("-----> ", dump_data, "saved!")

    return data

In [None]:
# Extract the MFCC features, or restore them from the pickle dump if it already exists.
data = data_extract_mfcc()

In [None]:
def preprocess_data(data, random_state=None):
    """Turn the extracted records into model inputs and one-hot targets.

    Args:
        data: Iterable of `[features, subj_comm, subj_comm_int]` records, where
            `subj_comm` is a "<subject>_<command>" string.
        random_state: Optional seed forwarded to `sklearn.utils.shuffle`; the
            default `None` keeps the original unseeded behaviour.

    Returns:
        Tuple `(X, y_gabriele, y_laura, y_davide, y_az)`. Note the order:
        gabriele comes before laura. `X` has a trailing channel axis of size 1;
        each speaker target is one-hot over {0, 1}; `y_az` is one-hot over the
        commands in `dict_az`.

    Raises:
        ValueError: If a command is not in `dict_az`, or the stacked features
            are not a 3-D array.
    """
    df = pd.DataFrame(data, columns=['features', 'subj_comm', 'subj_comm_int'])
    # n=1 splits on the first underscore only, so a command containing "_"
    # cannot produce extra columns.
    df[['subject', 'command']] = df.subj_comm.str.split(pat='_', n=1, expand=True)

    def speaker_target(name):
        """One-hot 'is this speaker' target, always with two columns."""
        flags = np.where(df.subject == name, 1, 0)
        # num_classes pins the width to 2 even if the speaker is absent.
        return to_categorical(flags, num_classes=2)

    y_davide = speaker_target('davide')
    y_gabriele = speaker_target('gabriele')
    y_laura = speaker_target('laura')

    dict_az = {"data": 0, "ora": 1, "temperatura": 2}
    command_ids = df.command.map(dict_az)
    if command_ids.isna().any():
        unknown = sorted(df.command[command_ids.isna()].astype(str).unique())
        raise ValueError(f"Unknown command label(s): {unknown}")
    y_az = to_categorical(command_ids.astype(int).values, num_classes=len(dict_az))

    X = np.array(df.features.tolist())
    if X.ndim != 3:
        raise ValueError(f"Expected equally-shaped 2-D features per record, got array of shape {X.shape}")

    X, y_laura, y_gabriele, y_davide, y_az = shuffle(
        X, y_laura, y_gabriele, y_davide, y_az, random_state=random_state)
    # Add the channel axis from the actual feature shape instead of the
    # hard-coded (40, 259).
    X = X[..., np.newaxis]

    # Return order (gabriele before laura) is unchanged from the original;
    # callers must unpack in this order.
    return X, y_gabriele, y_laura, y_davide, y_az

In [None]:
# preprocess_data returns the speaker targets as (gabriele, laura, davide);
# unpack in that order so each name is bound to its own speaker's labels.
X, y_gabriele, y_laura, y_davide, y_az = preprocess_data(data)

In [None]:
# A single call splits every array with the same row selection, so
# X_train/X_test are guaranteed to line up with all four target sets and X is
# split once instead of four times. With the same random_state and sample
# count the resulting split is identical to four separate calls.
(X_train, X_test,
 y_train_lau, y_test_lau,
 y_train_g, y_test_g,
 y_train_d, y_test_d,
 y_train_az, y_test_az) = train_test_split(
    X, y_laura, y_gabriele, y_davide, y_az,
    test_size=0.2, random_state=14)

In [None]:
def build_model():
    """Build and compile the two-branch CNN with four softmax outputs.

    Both branches read the same input. The speaker branch ends in three heads
    named 'laura', 'gabriele' and 'davide'; the command branch ends in one head
    named 'azione'. Input and output sizes are taken from the module-level
    `X`, `y_laura`, `y_gabriele`, `y_davide` and `y_az`, which must exist
    before this is called.

    Returns:
        The compiled Keras `Model` (adam optimizer, accuracy metric).
    """
    dropout_rate = 0.3
    kernel_size = (2, 2)
    pooling_dim = (2, 2)

    def conv_block(tensor, filters):
        """Two ReLU convolutions, then dropout, then max-pooling."""
        tensor = Conv2D(filters, kernel_size=kernel_size, activation='relu')(tensor)
        tensor = Conv2D(filters, kernel_size=kernel_size, activation='relu')(tensor)
        tensor = Dropout(dropout_rate)(tensor)
        return MaxPooling2D(pool_size=pooling_dim)(tensor)

    input_layer = Input(shape=(X.shape[1], X.shape[2], X.shape[3]))

    # SPEAKER NETWORK
    x = input_layer
    for filters in (64, 128, 256):
        x = conv_block(x, filters)

    x = Flatten()(x)

    x = Dense(128, activation='relu')(x)
    x = Dropout(dropout_rate)(x)
    x = Dense(64, activation='relu')(x)
    x = Dropout(dropout_rate)(x)

    output_laura = Dense(y_laura.shape[1], activation='softmax', name='laura')(x)
    output_gabriele = Dense(y_gabriele.shape[1], activation='softmax', name='gabriele')(x)
    output_davide = Dense(y_davide.shape[1], activation='softmax', name='davide')(x)

    # COMMAND NETWORK
    x = input_layer
    for filters in (64, 128):
        x = conv_block(x, filters)

    x = Flatten()(x)

    x = Dense(128, activation='relu')(x)
    x = Dropout(dropout_rate)(x)

    # The command targets are `y_az`; the original referenced an undefined
    # `y_va`, which raised NameError.
    output_va = Dense(y_az.shape[1], activation='softmax', name='azione')(x)

    model = Model(input_layer, [output_laura, output_gabriele, output_davide, output_va])

    # Loss per output, keyed by the output layer names above.
    losses = {"laura": "binary_crossentropy",
              "gabriele": "binary_crossentropy",
              "davide": "binary_crossentropy",
              "azione": "categorical_crossentropy"}

    model.compile(loss=losses, optimizer='adam', metrics=['accuracy'])

    return model

In [None]:
def plot_history(history):
    """Plot training vs. validation accuracy and loss for each model output.

    Draws a 2x4 grid: one column per output ('laura', 'gabriele', 'davide',
    'azione'), accuracy in the top row and loss in the bottom row.

    Args:
        history: Object whose `.history` dict holds '<output>_accuracy',
            '<output>_loss' and the matching 'val_'-prefixed series, as
            recorded by `model.fit` when `validation_data` is given.
    """
    outputs = ['laura', 'gabriele', 'davide', 'azione']

    fig = plt.figure(figsize=(20, 10))
    for column, name in enumerate(outputs, start=1):
        # Offset 0 is the top row (accuracy), offset 4 the bottom row (loss).
        for row_offset, metric in ((0, 'accuracy'), (4, 'loss')):
            fig.add_subplot(2, 4, column + row_offset)
            plt.plot(history.history[f'{name}_{metric}'])
            # Keras prefixes validation series with 'val_'; the original
            # 'az_' keys do not exist and raised KeyError.
            plt.plot(history.history[f'val_{name}_{metric}'])
            plt.title(f'model {metric} - {name.upper()}')
            plt.ylabel(metric)
            plt.xlabel('epoch')
            plt.legend(['train', 'test'], loc='upper left')

    plt.show()

In [None]:
# Build and compile the network; build_model reads the module-level X and target arrays for its sizes.
model=build_model()

In [None]:
# Render the model graph, with tensor shapes, to plot.png.
plot_model(model,to_file='plot.png',show_shapes=True, dpi=50)

In [None]:
# Train on the training split and validate on the held-out split each epoch.
# Both target lists are ordered laura, gabriele, davide, azione.
history = model.fit(
    X_train,
    [y_train_lau, y_train_g, y_train_d, y_train_az],
    validation_data=(
        X_test,
        [y_test_lau, y_test_g, y_test_d, y_test_az],
    ),
    batch_size=32,
    epochs=30,
    verbose=1,
)

In [None]:
# Show the accuracy/loss curves recorded during training.
plot_history(history)

In [None]:
# Per-output classification report on the held-out split.
# The command targets are `y_test_az`; the original referenced an undefined
# `y_test_va`, which raised NameError.
y_test_list = [y_test_lau, y_test_g, y_test_d, y_test_az]
names = ['Laura', 'Gabriele', 'Davide', 'Azione']

# predict returns one array per model output, in the same order as `names`.
y_pred = model.predict(X_test)
for name, y_true_onehot, y_pred_probs in zip(names, y_test_list, y_pred):
    print("\n -", name)
    # Collapse one-hot targets / softmax scores to class indices.
    y_pred_ = np.argmax(y_pred_probs, axis=1)
    y_test = np.argmax(y_true_onehot, axis=1)
    print(classification_report(y_test, y_pred_, digits=2))

In [None]:
# Persist the trained model under the project folder.
# NOTE(review): newer Keras releases appear to require a `.keras` or `.h5` suffix on the save path — confirm against the installed version.
model.save(PATH+'model')