In [None]:
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import datetime
import h5py
import os

In [None]:
# Report every compute device TensorFlow can see on this machine, then open a
# TF1-compatibility session configured to log the device each op is placed on.
from tensorflow.python.client import device_lib

local_devices = device_lib.list_local_devices()
print(local_devices)

placement_config = tf.compat.v1.ConfigProto(log_device_placement=True)
sess = tf.compat.v1.Session(config=placement_config)

In [None]:
def dataread(path,rotangles):
    """Load the per-angle GCC recordings into a single numpy array.

    For every angle in ``rotangles`` the file ``<path>GCC/gcc<angle>.csv`` is
    read, rows containing missing values are removed, and the index column
    that pandas writes by default (``'Unnamed: 0'``) is dropped when present.

    Parameters
    ----------
    path : str
        Prefix of the ``GCC`` folder. It is concatenated as-is, so it must be
        empty or already end with a path separator.
    rotangles : sequence of numbers
        Recorded angles (taken from the Readme metadata). Each one is
        truncated with ``int()`` to build the file name.

    Returns
    -------
    numpy.ndarray
        One 2-D block of GCC values per angle, in the order of ``rotangles``.
        Stacking into a regular 3-D array assumes every file keeps the same
        number of rows and columns after cleaning.
    """
    blocks = []

    for angle in rotangles:
        # The .csv file is read into a pandas dataframe.
        df = pd.read_csv(path + 'GCC/gcc' + str(int(angle)) + '.csv')
        # Light preprocessing: remove rows with missing values, then renumber.
        df = df.dropna().reset_index(drop=True)
        # errors='ignore' keeps files that were saved without the index column
        # from raising KeyError.
        df = df.drop(columns='Unnamed: 0', errors='ignore')
        print(f"Processing degree no {angle}.")
        blocks.append(df.to_numpy())

    return np.array(blocks)

def datseperate(Tensors,samples,ptr,pte, N =39):
    """Split the GCC data of every degree into training, test and validation parts.

    Parameters
    ----------
    Tensors : array-like
        One 2-D block of GCC rows per degree.
    samples : number
        Number of GCC frames recorded for every degree.
    ptr : float
        Fraction of the frames used for training.
    pte : float
        Fraction of the frames used for testing; whatever remains after the
        training and test parts becomes validation data.
    N : int
        Number of rows that make up one GCC frame (per the original note,
        39 corresponds to an interpolation factor of 4).

    Returns
    -------
    tuple of three lists
        ``(inputs_train, inputs_test, inputs_validate)``, each holding one
        array per degree with the frames along the first axis.
    """
    # Frame indexes at which the training part ends and the test part ends.
    train_end = int(ptr * samples)
    test_end = int(pte* samples+train_end)

    # Only the first samples*N rows of each degree are used.
    first_row = int(samples*N*(0))
    last_row = int(samples*N*(1))

    inputs_train, inputs_test, inputs_validate = [], [], []

    for degree_rows in Tensors:
        used_rows = degree_rows[first_row:last_row,:]
        # Cut the rows into `samples` equally sized frames.
        frames = np.array(np.split(used_rows,samples,axis =0))
        train_part, test_part, validate_part = np.split(frames, [train_end, test_end])
        inputs_train.append(train_part)
        inputs_test.append(test_part)
        inputs_validate.append(validate_part)

    return inputs_train,inputs_test,inputs_validate



In [None]:
#This code gets the required metadata from the Readme.txt

#path: prefix prepended as-is to 'Readme.txt' and to the 'GCC/' folder
path = ""

#Opens the Readme.txt file and reads the metadata in the form written in the Gui.py.
#Every non-blank line is expected to look like "<description> is <number>"; the
#number after 'is ' is parsed as a float. The with-block closes the file even
#if a line cannot be parsed, and blank lines are skipped instead of raising
#IndexError.
with open(path + 'Readme.txt' , 'r') as f:
    cont = [float(line.split('is ')[1]) for line in f if line.strip()]
#The six values must appear in exactly this order in the file.
fs,duration,start,stop,res,samples = cont

#Recorded angles from start to stop in steps of res.
#NOTE(review): with non-integer float steps np.arange may include or omit the
#end point due to rounding - confirm res is always integer-valued.
rotangles = np.arange(start,stop+res,res)

#Reads the GCC from the .csv files
Tensors = dataread(path,rotangles)

#Rows per GCC frame and the train/test fractions (the remainder is validation)
N = 39
percent_training = 0.6
percent_test = 0.2

training, test, validation = datseperate(
    Tensors, samples, percent_training, percent_test, N
)

#The following 3 lines can be uncommented to concatenate the training, test and validation data of multiple recordings
#training = np.concatenate((training1,training2,training3),axis = 2)
#test = np.concatenate((test1,test2,test3),axis = 2)
#validation = np.concatenate((validation1,validation2,validation3),axis = 2)


def _merge_degrees(per_degree):
    #Collapses the (degree, frame) axes into one so every frame is one sample
    #of shape (N, 15); 15 is the column count hard-coded for the GCC files.
    sample_count = len(per_degree) * len(per_degree[0])
    return np.reshape(per_degree, (sample_count, N, 15))


training = _merge_degrees(training)
test = _merge_degrees(test)
validation = _merge_degrees(validation)

#Creates one hot encoded labels: one identity row per angle, repeated once
#for every sample of that angle
ohg = np.eye(len(rotangles))


def _labels_for(dataset):
    #Samples are ordered degree by degree, so each one-hot row is repeated
    #(samples per degree) times in a row.
    per_degree = int(len(dataset) / len(rotangles))
    return np.repeat(ohg, per_degree, axis=0)


training_labels = _labels_for(training)
test_labels = _labels_for(test)
validation_labels = _labels_for(validation)

In [None]:
#This code sets the model callbacks and the paths to save the model and the tensorboard information
#NOTE(review): the fit() calls visible in this notebook do not pass these
#callbacks; add callbacks=[tb_callback, model_callback, es_callback] to a
#fit() call to activate them.

#The timestamp is taken once so the TensorBoard folder and the checkpoint files
#of one run always share the same stamp (two separate now() calls could
#straddle a second boundary).
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tbpath = "logs/fit/" + run_stamp
#{epoch:02d} is filled in by ModelCheckpoint with the epoch number
mdpath = "logs/train/" + run_stamp + 'model.{epoch:02d}.h5'

#Writes the graph and per-epoch metrics for TensorBoard; batch 2 is profiled
tb_callback = tf.keras.callbacks.TensorBoard(tbpath,
    histogram_freq=0,
    write_graph=True,
    write_images=True,
    update_freq="epoch",
    profile_batch=2
)


#Saves the full model (not only the weights) at the end of an epoch, but only
#when val_loss improved
model_callback = tf.keras.callbacks.ModelCheckpoint(
    mdpath,
    monitor="val_loss",
    verbose=1,
    save_best_only= True,
    save_weights_only=False,
    mode="auto",
    save_freq="epoch",
    options=None
)


#Stops training when the training accuracy has not improved for 20 epochs.
#NOTE(review): this monitors "accuracy" (training), not "val_accuracy", and the
#best weights are not restored - confirm this is intended.
es_callback = tf.keras.callbacks.EarlyStopping(
    monitor="accuracy",
    min_delta=0,
    patience=20,
    verbose=1,
    mode="max",
    baseline=None,
    restore_best_weights=False,
)

In [None]:
#This part preprocesses the data to be inserted in the CNN, creates the LENET5 model and trains it


def _with_channel_axis(batch):
    """Return `batch` with a trailing channel axis, added only if it is still 3-D.

    The guard makes this cell safe to re-run: without it every execution would
    append one more axis to the same arrays.
    """
    batch = np.asarray(batch)
    if batch.ndim == 3:
        return np.expand_dims(batch, 3)
    return batch


#This is done because Convolutional Neural Networks in tensorflow require one extra dimension
training = _with_channel_axis(training)
test = _with_channel_axis(test)
validation = _with_channel_axis(validation)

#LeNet-5 style stack; the input shape is taken from one training sample and the
#output layer has one unit per one-hot label column
lenet_5_model = keras.models.Sequential([
    keras.layers.Conv2D(6, kernel_size=5, strides=1,  activation='tanh', input_shape=training[0].shape, padding='same'), #C1
    keras.layers.AveragePooling2D(), #S2
    keras.layers.Conv2D(16, kernel_size=5, strides=1, activation='tanh', padding='valid'), #C3
    keras.layers.AveragePooling2D(), #S4
    keras.layers.Flatten(), #Flatten
    keras.layers.Dense(120, activation='tanh'), #C5
    keras.layers.Dense(84, activation='tanh'), #F6
    keras.layers.Dense(len(training_labels[0]), activation='softmax') #Output layer
])

lenet_5_model.compile(optimizer='adam', loss= 'categorical_crossentropy', metrics=['accuracy'])
history = lenet_5_model.fit(training, training_labels, epochs=100, batch_size=32,shuffle = True, validation_data=(validation, validation_labels))

In [None]:
# Sanity check: (number of training samples, number of angle classes)
training_labels.shape

In [None]:
#This part creates and trains the Multilayer Perceptron model
model = tf.keras.models.Sequential()
model.add(tf.keras.Input(shape=(N,15)))
#Dense layers act on the last axis, so each of the N rows is transformed
#independently until the Flatten layer merges them
model.add(tf.keras.layers.Dense(100, activation='relu'))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Dense(50, activation='relu'))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Dense(50, activation='relu'))
model.add(tf.keras.layers.BatchNormalization())
model.add(tf.keras.layers.Flatten())
#One output unit per one-hot label column (same rule as the LeNet cell) instead
#of a fixed 36, so the model matches whatever angle range the Readme describes
model.add(tf.keras.layers.Dense(len(training_labels[0]), activation = 'softmax'))
#The learning rate is given to Adam directly rather than assigned afterwards
#through the deprecated optimizer.lr alias
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.00005), loss='categorical_crossentropy', metrics=['accuracy'])

#log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
#tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

#NOTE: this overwrites the `history` returned by the LeNet training cell
history = model.fit(training, training_labels, epochs=100, batch_size=32,shuffle = True, validation_data=(validation, validation_labels))

In [None]:
#This part evaluates the model on the test set and prints the predictions next
#to the labels to give a better insight into its performance.
#Swap in the commented lenet_5_model lines to inspect the CNN instead.
#lenet_5_model.evaluate(test, test_labels,verbose=1)
model.evaluate(test, test_labels, verbose=1)

# use the model to predict the test inputs
#predictions = lenet_5_model.predict(test)
predictions = model.predict(test)

# print the predictions (rounded to 3 decimals) and the expected outputs
rounded_predictions = np.round(predictions, decimals=3)
print("predictions =\n", rounded_predictions)
print("actual =\n", test_labels)

In [None]:
#This part exports the model as a TensorFlow SavedModel, converts it to .tflite and prints its size.
#NOTE: despite its name, "Model.h5" is a SavedModel directory written by
#tf.saved_model.save, not an HDF5 file; the name is kept so existing artifacts
#and paths stay the same.
saved_model_dir = "Model.h5"

#tf.saved_model.save(lenet_5_model,'.h5')
tf.saved_model.save(model, saved_model_dir)

# Convert the model
converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir) # path to the SavedModel directory
tflite_model = converter.convert()

# The with-block guarantees the file is flushed and closed before its size is read
with open("Model.tflite", "wb") as tflite_file:
    tflite_file.write(tflite_model)
basic_model_size = os.path.getsize("Model.tflite")
print("Model is %d bytes" % basic_model_size)