Import necessary libraries

In [None]:
#ALL IMPORTS
import tensorflow as tf
from tensorflow import keras
import numpy as np
import matplotlib.pyplot as plt
from statistics import mode
import csv
import os
import random
import copy
from tqdm import tqdm
from math import cos, sin
from scipy.interpolate import interp1d

!mkdir ./newData
!mkdir ./KNNtrain
!mkdir ./SVMtest

##Loading data

In [None]:
def load_data(directory =  './'):
  """Read every ``.csv`` file directly inside *directory* into one list of rows.

  Files are parsed as tab-separated text and their rows are concatenated in
  file-name order. Sub-directories are not visited. Values are returned as
  strings, exactly as read.

  Args:
    directory: folder to scan.

  Returns:
    A list of rows, each row a list of strings. Blank lines are skipped.
  """
  data = []
  # sorted(): os.listdir returns names in arbitrary order, which would make the
  # concatenation (and every window cut from it later) differ between machines.
  for filename in sorted(os.listdir(directory)):
    f = os.path.join(directory, filename)
    # only regular files with a .csv suffix are loaded
    if os.path.isfile(f) and f.endswith(".csv"):
      # newline="" is the open mode the csv module documents for its readers
      with open(f, newline="") as csvfile:
        print("reading file: ", f)
        csv_f = csv.reader(csvfile, delimiter = "\t")
        # extend in place (the old `data = data + [...]` re-copied the whole
        # list per file); `if row` drops the empty lists produced by blank lines
        data.extend(row for row in csv_f if row)

  return data

# Training rows: every .csv sitting directly in the working directory.
# load_data only accepts regular files, so ./newData is not picked up here.
print("Training data loading")
data = load_data("./")

# Rows from ./newData; used further down as validation/test data.
print("Testing data loading")
newData = load_data("./newData")

In [None]:
#custom normalization
def mean_std_norm(data):
  """Standardise the first six columns of *data* in place.

  Column means and standard deviations are computed over all rows (every
  column is converted to float, so each entry must be numeric text or a
  number), printed, and then each of the first six values of every row is
  replaced by ``(value - mean) / std``. Nothing is returned.
  """
  values = np.asarray(data).astype(float)
  media = np.mean(values, 0)
  stdev = np.std(values, 0)

  print("Media = ", media)
  print("Stdev = ", stdev)

  for row in data:
    for col in range(6):
      row[col] = (float(row[col]) - media[col]) / stdev[col]

#fixed-scale normalization (the one actually used below)
def min_max_norm(data, scale = 16384, n_features = 6):
  """Divide the first *n_features* values of every row by *scale*, in place.

  Despite the name, no minimum or maximum is computed: each value is
  converted to float and divided by a constant.

  Args:
    data: list of mutable rows; the leading values must be convertible to float.
    scale: divisor applied to every feature (default 16384, the original constant).
    n_features: number of leading columns to normalise (default 6); later
      columns are left untouched.
  """
  for row in data:
    for j in range(n_features):
      row[j] = float(row[j]) / scale

#post split normalization sample by sample
def samp_norm(X):
  """Standardise each sample of *X* on its own, in place.

  For every sample the per-column mean and standard deviation are computed
  along axis 0, and the first six columns are replaced by
  ``(value - mean) / std``.

  Args:
    X: iterable of 2-D samples (rows x columns) supporting item assignment.

  Returns:
    The same object *X*, after modification.
  """
  for x in X:
    media = np.mean(x,0)
    stdev = np.std(x,0)
    # A column that is constant inside the window has std 0; dividing by it
    # would fill the sample with NaN/inf. Use 1 so such a column becomes 0.
    stdev = np.where(stdev == 0, 1.0, stdev)

    for row in x:
      for j in range(6):
        row[j] = (float(row[j]) - media[j]) / stdev[j]
  return X


In [None]:
# Scale both datasets in place (min_max_norm returns nothing and overwrites the rows).
min_max_norm(data)
min_max_norm(newData)

In [None]:
#Decide windows size and overlapping
def split_XY(data, window_size = 500, overlap = 0):
  """Cut *data* into fixed-length windows and label each one.

  Args:
    data: sequence of rows; columns 0-5 are the features and column 6 the
      integer label.
    window_size: number of consecutive rows per window.
    overlap: fraction of a window shared with the next one; the step between
      window starts is ``int(window_size * (1 - overlap))``.

  Returns:
    ``(X, Y)``: ``X`` is a float array of shape (windows, window_size, 6) and
    ``Y`` an int array holding the most frequent label of each window.

  Raises:
    ValueError: if the overlap leaves a step smaller than one row.
  """
  step = int(window_size*(1-overlap))
  if step < 1:
    raise ValueError("overlap leaves no positive step between windows")

  X = []
  Y = []
  # +1 so the last complete window is kept: the old bound dropped it, e.g. an
  # input of exactly window_size rows produced no window at all.
  for start in range(0, len(data)-window_size+1, step):
    window = data[start:start+window_size]
    X.append([row[0:6] for row in window])
    Y.append(mode([int(row[6]) for row in window]))

  #conversion of values from strings to numbers
  X = np.array(X).astype(float)
  Y = np.array(Y).astype(int)

  return X,Y

# 512-row windows with 30% overlap, for both the training and the new data.
X,Y = split_XY(data, 512, 0.3)
X_new,Y_new = split_XY(newData, 512, 0.3)

# Number of distinct labels in the training windows; sizes the network's output layer.
classes = len(np.unique(Y))

# The raw row lists are no longer needed once the windows exist.
del data, newData

###Data augmentation

In [None]:
def randrot(X, ang = 0.4):
  """Apply one random 3-D rotation to the first three columns of every row.

  Three angles are drawn uniformly from [-ang, ang] and combined into a
  single rotation matrix. The matrix is applied to ``row[:3]`` of each row;
  all other columns are left as they are. *X* is modified in place and
  returned.
  """
  a = np.random.uniform(-ang,ang)
  b = np.random.uniform(-ang,ang)
  c = np.random.uniform(-ang,ang)

  ca, sa = cos(a), sin(a)
  cb, sb = cos(b), sin(b)
  cc, sc = cos(c), sin(c)

  rot = np.array([
    [ca*cb,   ca*sb*sc-sa*cc,   ca*sb*cc+sa*sc],
    [sa*cb,   sa*sb*sc+ca*cc,   sa*sb*cc-ca*sc],
    [-sb,     cb*sc,            cb*cc         ],
  ])

  for row in X:
    row[:3] = np.matmul(rot, row[:3])

  return X


def const_scaling(X, sigma=1):
    """Multiply every column of *X* by its own random constant factor.

    One factor per column is drawn from a normal distribution with mean 1.2
    and standard deviation *sigma*; the same factor is used for all rows of
    that column. Returns the scaled data as a nested list.
    """
    column_factors = np.random.normal(loc=1.2, scale=sigma, size=(1, X.shape[1]))
    # broadcasting repeats the (1, columns) factors over every row
    scaled = X * column_factors
    return scaled.tolist()

def noise_add(X, sigma=0.0008):
    """Return *X* plus zero-mean Gaussian noise (std *sigma*) as a nested list.

    An independent noise value is drawn for every element of *X*.
    """
    jitter = np.random.normal(loc=0, scale=sigma, size=X.shape)
    noisy = X + jitter
    return noisy.tolist()

def obfuscation(X):
  """Zero out one random run of 10-50 consecutive rows of *X*.

  The run length and its start are drawn with ``random.randint``; every
  column of the chosen rows is set to 0. *X* is modified in place and also
  returned as a nested list.
  """
  gap_len = random.randint(10,50)
  gap_start = random.randint(0,len(X)-gap_len)

  X[gap_start:gap_start+gap_len, :] = 0

  return X.tolist()

def curve_scaling(X, sigma = 0.3, sz = 8):
  """Scale *X* along its first axis by a smooth random gain curve.

  *sz* gain values are drawn from a normal distribution (mean 1, std
  *sigma*), placed at evenly spaced positions over the length of *X*, and
  joined with a quadratic interpolation. Row ``i`` of every column is then
  multiplied by the curve value at ``i``.

  Args:
    X: 2-D array (rows x columns).
    sigma: standard deviation of the random gains.
    sz: number of control points of the gain curve.

  Returns:
    The scaled data as a nested list with the same shape as *X*.
  """
  knot_gains = np.random.normal(loc=1, scale=sigma, size=(sz))

  knot_pos = np.linspace(0, X.shape[0], sz)
  gain_curve = interp1d(knot_pos, knot_gains, kind = 'quadratic')
  scalingFactor = gain_curve(range(0,X.shape[0]))

  # Bug fix: iterate over the argument X. The previous body looped over the
  # global `x_val`, so the function only worked when a caller happened to have
  # stored the same array under that name.
  res = [channel*scalingFactor for channel in np.transpose(X)]

  return np.transpose(res).tolist()

def shift_values(X):
  """Circularly shift the rows of *X* by a random offset.

  The offset is drawn with ``random.randint`` between 40 and ``len(X) - 40``;
  rows pushed past the end wrap around to the start. Returns a nested list;
  *X* itself is not modified.
  """
  offset = random.randint(40,len(X)-40)
  return np.roll(X, offset, 0).tolist()


In [None]:
#augment balancing classes (using the vector rotation)
# Marker read by the next augmentation cell to know that x_aug / y_aug already exist.
balanced = True

x_aug = X.tolist()
y_aug = Y.tolist()

# labels[k] is the label whose count is classes_count[k]
labels, classes_count = np.unique(y_aug,return_counts=True)
max_class = np.max(classes_count)

# Pair each count with its real label. The previous running index 0,1,2,...
# only matched the labels when they were exactly 0..k-1; with any gap it
# selected the wrong (or an empty) set of samples.
for label, cc in zip(labels, classes_count):
  indices = np.where(Y == label)[0]

  # top the class up to the size of the largest one with rotated copies
  for _ in range(max_class - cc):
    rnd_pos = indices[random.randint(0, len(indices)-1)]

    x_val = np.array(x_aug[rnd_pos])  # copy, so randrot does not alter the source sample
    x_aug.append(randrot(x_val, 0.2))
    y_aug.append(y_aug[rnd_pos])

_, classes_count = np.unique(y_aug,return_counts=True)
print(classes_count)

In [None]:
size_multiplier = 4

# Start from the raw windows when the class-balancing cell was not run
# (it is the cell that defines `balanced`, x_aug and y_aug).
if "balanced" not in globals():
  x_aug = X.tolist()
  y_aug = Y.tolist()

start_sz = len(x_aug)
target_sz = start_sz * size_multiplier

# Applied in this order on every pass; each works on a copy of a randomly
# chosen sample, which may itself be an earlier augmentation.
augmenters = (
  lambda sample: randrot(sample, 0.3),  # random rotation
  noise_add,                            # random noise
  const_scaling,                        # random scaling
  shift_values,                         # random circular shift
)

while len(x_aug) < target_sz:
  for augment in augmenters:
    rnd_pos = random.randint(0, len(x_aug)-1)
    x_val = np.array(x_aug[rnd_pos])
    x_aug.append(augment(x_val))
    y_aug.append(y_aug[rnd_pos])

x_train = np.array(x_aug)
y_train = np.array(y_aug)

del x_aug
del y_aug

In [None]:
#testing aug: plot one training sample next to its curve-scaled version
# Clamp the index so the cell also runs when fewer than 3001 samples exist
# (the bare constant raised IndexError on smaller datasets).
rnd_pos = min(3000, len(x_train)-1)
x_val = np.array(x_train[rnd_pos])

res = curve_scaling(x_val)

sos, axis = plt.subplots(2)
axis[0].plot(x_val)   # original sample
axis[1].plot(res)     # augmented sample
sos.suptitle(y_train[rnd_pos])

In [None]:
#check number of samples per label
# Count the labels actually present instead of assuming they are 0..9.
for label, count in zip(*np.unique(y_train, return_counts=True)):
  print("Count of ", label, " labels: ", count)

In [None]:
#check last eight inserted augmentations
fig = plt.figure(figsize=(15,6))
for ii in range(8):
    ax = fig.add_subplot(2,4,ii+1)
    # -(ii+1) walks back from the last sample. The old index -ii started at
    # -0 == 0, i.e. it plotted the FIRST sample and only seven of the last eight.
    ax.plot(x_train[-(ii+1)])
    ax.set_title("label = " + str(y_train[-(ii+1)]))

###FFT

In [None]:
#FFT of sample
def samp_fft(X):
  """Return the magnitude spectrum of every sample in *X*.

  Each sample is transformed with a real FFT along axis 0. The first bin
  (frequency 0) is dropped and only the absolute value is kept, so the phase
  is discarded. The result is a list with one array per sample.
  """
  return [np.absolute(np.fft.rfft(sample, axis = 0)[1:]) for sample in X]

# Replace the time-domain windows with their magnitude spectra (training and new data alike).
x_train = np.array(samp_fft(x_train))
X_new =  np.array(samp_fft(X_new))


In [None]:
#fft augmentation
# Builds start_sz*increment pairs of extra spectra and appends them to the training set.
x_aug = []
y_aug = []
start_sz = len(x_train)
increment = 0.5

for i in range(0,int(start_sz*increment)):
  #aug 1: add one random offset (a single scalar draw) to every value of a random spectrum
  rnd_pos = random.randint(0, start_sz-1)
  x_val = x_train[rnd_pos] + np.random.normal(0.08, 0.06)
  x_aug.append(x_val)
  y_aug.append(y_train[rnd_pos])

  #aug 2: smooth random gain curve along the frequency axis
  rnd_pos = random.randint(0, start_sz-1)
  # NOTE(review): keep this assignment to the global x_val right before the call;
  # check whether curve_scaling reads that global rather than its argument.
  x_val = x_train[rnd_pos]
  x_aug.append(curve_scaling(x_val))
  y_aug.append(y_train[rnd_pos])


x_aug = np.array(x_aug)
y_aug = np.array(y_aug)

# concatenate the new samples after the existing ones (axis 0)
x_train = np.append(x_train, x_aug, 0)
y_train = np.append(y_train, y_aug, 0)

In [None]:
#check last eight inserted augmentations
fig = plt.figure(figsize=(15,6))
for ii in range(8):
    ax = fig.add_subplot(2,4,ii+1)
    # -(ii+1) walks back from the last sample. The old index -ii started at
    # -0 == 0, i.e. it plotted the FIRST sample and only seven of the last eight.
    ax.plot(x_train[-(ii+1)])
    ax.set_title("label = " + str(y_train[-(ii+1)]))

In [None]:
# Samples per label after the FFT augmentation; counts the labels actually
# present instead of assuming they are 0..9.
for label, count in zip(*np.unique(y_train, return_counts=True)):
  print("Count of ", label, " labels: ", count)

##Model

In [None]:
#@title Conv2D FOR FFT
def _conv_stage(filters, kernel_size, strides):
  """One convolutional stage: Conv2D (ReLU) -> BatchNormalization -> Dropout(0.5)."""
  return [
    keras.layers.Conv2D(filters=filters, kernel_size=kernel_size, strides = strides, activation="relu"),
    keras.layers.BatchNormalization(),
    tf.keras.layers.Dropout(0.5),
  ]

# Input is one spectrum with a trailing channel axis of size 1; three
# convolutional stages, global average pooling, then one softmax unit per class.
model = keras.Sequential(
  [keras.layers.Input((x_train.shape[1],x_train.shape[2],1))]
  + _conv_stage(128, (20,1), (5,1))
  + _conv_stage(16, (10,3), (2,3))
  + _conv_stage(128, (8,2), (3,1))
  + [
    keras.layers.GlobalAveragePooling2D(),
    keras.layers.Dense(classes, activation="softmax"),
  ]
)

model.summary()

##Training

In [None]:
epochs = 300
batch_size = 128

callbacks = [
    # keep on disk only the weights with the lowest validation loss so far
    keras.callbacks.ModelCheckpoint(
        "best_model.h5", save_best_only=True, monitor="val_loss"
    ),
    # halve the learning rate after 10 epochs without val_loss improvement, down to 5e-5
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss", factor=0.5, patience=10, min_lr=0.00005
    ),
    # give up after 25 epochs without val_loss improvement
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=25, verbose=1),
]

opt = keras.optimizers.Adam(learning_rate=0.05)

#ADD OR REMOVE "SPARSE" FOR ONEHOT ENCODING OR NOT
# (labels here are plain integers, hence the "sparse" loss and metric)
model.compile(
    optimizer = opt,
    loss = "sparse_categorical_crossentropy",
    metrics = ["sparse_categorical_accuracy"],
)
# NOTE(review): validation_data is (X_new, Y_new), the same arrays that are later
# passed to model.evaluate as test data, so checkpoint selection and the reported
# test score use the same samples - confirm this is intended.
history = model.fit(
    x_train,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=callbacks,
    validation_data = (X_new, Y_new),
    verbose=1,
    shuffle = True
)

##Evaluation of results

In [None]:
#Plot the model's training and validation accuracy per epoch
metric = "sparse_categorical_accuracy"
plt.figure()
plt.plot(history.history[metric])
# Keras stores the validation series under the same name with a "val_" prefix
plt.plot(history.history["val_" + metric])
plt.title("model " + metric)
plt.ylabel(metric, fontsize="large")
plt.xlabel("epoch", fontsize="large")
plt.legend(["train", "val"], loc="best")
plt.show()
plt.close()

In [None]:
# CONFUSION MATRIX OF TRAINING DATA
import seaborn as sns
import pandas as pd

#predict labels of the training dataset
predictions = model.predict(x_train)
predictions = np.argmax(predictions, axis = 1)


#draw confusion matrix (rows = true label, columns = predicted label)
con_mat = tf.math.confusion_matrix(y_train, predictions).numpy()
print(con_mat)
# normalise each row by its number of true samples
con_mat_norm = np.around(con_mat.astype('float') / con_mat.sum(axis=1)[:, np.newaxis], decimals=2)

# Size the axes from the matrix itself, as the unseen-data cell does: the
# matrix is built without a fixed class count, so it need not be
# classes x classes, and a mismatching index makes the DataFrame constructor fail.
classes_list = range(len(con_mat))
con_mat_df = pd.DataFrame(con_mat_norm, index = classes_list, columns = classes_list)

figure = plt.figure(figsize=(5, 4))

sns.heatmap(con_mat_df, annot=True,cmap=plt.cm.Blues)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

In [None]:
# Reload the checkpoint written by ModelCheckpoint (lowest val_loss) and score it on the new data.
model = keras.models.load_model("best_model.h5")

# evaluate returns the loss followed by the compiled metric (accuracy)
test_loss, test_acc = model.evaluate(X_new, Y_new)

print("Test accuracy", test_acc)
print("Test loss", test_loss)


In [None]:
# NOTE(review): numpy is already imported by the time this cell runs; a different
# installed version presumably only takes effect after a runtime restart - confirm.
!pip install numpy==1.22.4

In [None]:
#CONFUSION MATRIX WITH UNSEEN DATA
import seaborn as sns
import pandas as pd

#predict labels of validation/test dataset
predictions = model.predict(X_new);

# class with the highest softmax score for each sample
predictions = np.argmax(predictions, axis = 1)


#draw confusion matrix (rows = true label, columns = predicted label)
con_mat = tf.math.confusion_matrix(Y_new, predictions).numpy()
print(con_mat)
# normalise each row by its number of true samples
con_mat_norm = np.around(con_mat.astype('float') / con_mat.sum(axis=1)[:, np.newaxis], decimals=2)

# axis labels follow the actual size of the matrix
classes_list = range(0,len(con_mat))
con_mat_df = pd.DataFrame(con_mat_norm, index = classes_list, columns = classes_list)

figure = plt.figure(figsize=(5, 4))

sns.heatmap(con_mat_df, annot=True,cmap=plt.cm.Blues)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

##Quantization

In [None]:
# Start quantization from the saved checkpoint rather than from whatever `model` currently holds.
model = keras.models.load_model("best_model.h5")

In [None]:
# Quantization calibrated on a representative dataset, restricted to int8 builtin ops.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

dataset_x = tf.data.Dataset.from_tensor_slices(x_train)

def representative_dataset_gen():
    """Yield the first 100 training samples, one per batch, as float32 calibration inputs."""
    for input_value in dataset_x.batch(1).take(100):
        input_value = tf.expand_dims(input_value, axis=-1)  # Conv2D network only: add the channel axis
        yield [tf.cast(input_value, tf.float32)]

converter.representative_dataset = representative_dataset_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

tflite_model = converter.convert()

del dataset_x

# `with` closes the file deterministically; the bare open(...).write(...) left
# the handle open until garbage collection.
with open('q_model_2.tflite', 'wb') as tflite_file:
    tflite_file.write(tflite_model)



In [None]:
# Default optimisation only: no representative dataset and no int8-only op restriction.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# `with` closes the file deterministically; the bare open(...).write(...) left
# the handle open until garbage collection.
with open('q_model_1.tflite', 'wb') as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
#Loading model
def predict_q_model(filename, x_test):
  """Run every sample of *x_test* through the TFLite model stored in *filename*.

  Args:
    filename: path of the ``.tflite`` file to load.
    x_test: array of samples; the first axis indexes the samples.

  Returns:
    A list with the raw output tensor of the model for each sample, in order.
  """
  # Bug fix: load the file that was passed in. The previous body ignored
  # `filename` and read the global `tflite_model_file` instead.
  interpreter = tf.lite.Interpreter(model_path=filename)
  interpreter.allocate_tensors()

  #tensor indices used to feed the input and read the output
  input_index = interpreter.get_input_details()[0]["index"]
  output_index = interpreter.get_output_details()[0]["index"]

  #samples are fed to the interpreter as float32
  test_vals = x_test.astype(np.float32)

  predictions = []

  #shape of one input: batch of 1, the sample's own shape, trailing channel axis (conv2d)
  tensorSize = np.append(list(test_vals.shape),1)
  tensorSize[0] = 1

  for i in tqdm(range(len(test_vals))):
    interpreter.set_tensor(input_index, test_vals[i].reshape(tensorSize)) #this for conv2d
    #for a conv1d model use instead: test_vals[i].reshape(1,500,6)
    interpreter.invoke()
    predictions.append(interpreter.get_tensor(output_index))

  return predictions

In [None]:
tflite_model_file = 'q_model_2.tflite' # Change the filename here for different models

predictions = predict_q_model(tflite_model_file, X_new)

# predicted class = position of the largest output value of each sample
predicted_labels = [np.argmax(output) for output in predictions]
score = sum(1 for guess, label in zip(predicted_labels, Y_new) if guess == label)

print("\nOut of", len(Y_new), "predictions, " + str(score) + " were correct")
print("Accuracy: ", score/len(Y_new))



##SVM implementation with last CNN layer


In [None]:
# Turn the trained network into a feature extractor: reload the checkpoint and drop its last layer.
model = keras.models.load_model("best_model.h5")
model.pop() # this will remove the last layer
model.summary() # check the network

In [None]:

# Data for the classical classifier, prepared like the CNN data
# (same scaling, 512-row windows, FFT magnitude) but with non-overlapping windows.
svm_data = load_data("./KNNtrain")
min_max_norm(svm_data)

svm_x_train, svm_y_train = split_XY(svm_data, 512, 0)
svm_x_train = samp_fft(svm_x_train)
svm_x_train = np.array(svm_x_train)

# Held-out data for the classifier, same preparation.
svm_data = load_data("./SVMtest")
min_max_norm(svm_data)

svm_x_test, svm_y_test = split_XY(svm_data, 512, 0)
svm_x_test = samp_fft(svm_x_test)
svm_x_test = np.array(svm_x_test)

In [None]:
from sklearn import svm, tree, ensemble, neighbors

# Output of the truncated network for every training window; these are the classifier's features.
feature_mapping = model(svm_x_train)

# Alternative classifiers, kept for experimentation:
# clf = svm.SVC(gamma = 0.001)
#clf = ensemble.RandomForestClassifier()
clf = neighbors.KNeighborsClassifier()

clf.fit(feature_mapping, svm_y_train)

In [None]:
# Features of the held-out windows from the truncated network.
predCNN = model(svm_x_test)

# Despite the variable name, clf is whichever classifier was fitted in the previous cell.
predSVM = clf.predict(predCNN)

# number of windows whose predicted label equals the true label
res = np.count_nonzero(svm_y_test == predSVM)

print("The accuracy of the classifier is: ", res/len(predSVM)*100, "%")

In [None]:
#CONFUSION MATRIX WITH UNSEEN DATA
# Imported here as well so the cell does not depend on an earlier cell having run.
import seaborn as sns
import pandas as pd

#predict labels of validation/test dataset
predictions = clf.predict(predCNN)

#draw confusion matrix (rows = true label, columns = predicted label)
con_mat = tf.math.confusion_matrix(svm_y_test, predictions).numpy()
print(con_mat)
# normalise each row by its number of true samples
con_mat_norm = np.around(con_mat.astype('float') / con_mat.sum(axis=1)[:, np.newaxis], decimals=2)

# Size the axes from the matrix itself: the hard-coded range(0,10) made the
# DataFrame constructor fail whenever the matrix was not exactly 10 x 10.
classes_list = range(len(con_mat))
con_mat_df = pd.DataFrame(con_mat_norm, index = classes_list, columns = classes_list)

figure = plt.figure(figsize=(5, 4))

sns.heatmap(con_mat_df, annot=True,cmap=plt.cm.Blues)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

#Quantization + SVM/tree/KNN



In [None]:
# Reload the checkpoint and drop its last layer, so the network to be quantized outputs features.
model = keras.models.load_model("best_model.h5")
model.pop()
model.summary()

In [None]:
# Quantization of the truncated network, calibrated on a representative dataset
# and restricted to int8 builtin ops. Writes to the same file name as the
# full-model export, q_model_2.tflite.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

dataset_x = tf.data.Dataset.from_tensor_slices(x_train)

def representative_dataset_gen():
    """Yield the first 100 training samples, one per batch, as float32 calibration inputs."""
    for input_value in dataset_x.batch(1).take(100):
        input_value = tf.expand_dims(input_value, axis=-1)  # Conv2D network only: add the channel axis
        yield [tf.cast(input_value, tf.float32)]

converter.representative_dataset = representative_dataset_gen
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

tflite_model = converter.convert()

del dataset_x

# `with` closes the file deterministically; the bare open(...).write(...) left
# the handle open until garbage collection.
with open('q_model_2.tflite', 'wb') as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
# Default optimisation only: no representative dataset and no int8-only op restriction.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()

# `with` closes the file deterministically; the bare open(...).write(...) left
# the handle open until garbage collection.
with open('q_model_1.tflite', 'wb') as tflite_file:
    tflite_file.write(tflite_model)

In [None]:
!mkdir ./newData
!mkdir ./KNNtrain
!mkdir ./SVMtest

In [None]:

# Data for the classical classifier, prepared like the CNN data
# (same scaling, 512-row windows, FFT magnitude) but with non-overlapping windows.
svm_data = load_data("./KNNtrain")
min_max_norm(svm_data)

svm_x_train, svm_y_train = split_XY(svm_data, 512, 0)
svm_x_train = samp_fft(svm_x_train)
svm_x_train = np.array(svm_x_train)

# Held-out data for the classifier, same preparation.
svm_data = load_data("./SVMtest")
min_max_norm(svm_data)

svm_x_test, svm_y_test = split_XY(svm_data, 512, 0)
svm_x_test = samp_fft(svm_x_test)
svm_x_test = np.array(svm_x_test)

In [None]:
from sklearn import svm, tree, ensemble, neighbors

tflite_model_file = 'q_model_2.tflite' # Change the filename here for different models

# Pass the variable rather than a second copy of the literal, so that editing
# the line above is the only change needed to switch models.
feature_mapping = predict_q_model(tflite_model_file, svm_x_train)
# one output tensor per sample -> 2-D array (samples x features)
feature_mapping = np.array(feature_mapping).squeeze()
print(feature_mapping.shape)

# Alternative classifiers, kept for experimentation:
# clf = svm.SVC(gamma = 0.001)
# clf = ensemble.RandomForestClassifier()
clf = neighbors.KNeighborsClassifier()

clf.fit(feature_mapping, svm_y_train)

In [None]:
# Features of the held-out windows from the quantized network. Uses the same
# tflite_model_file as the fitting cell so both always read the same model.
predCNN = predict_q_model(tflite_model_file, svm_x_test)
print(len(predCNN))

# one output tensor per sample -> 2-D array (samples x features)
predCNN = np.array(predCNN).squeeze()
print(predCNN.shape)
predSVM = clf.predict(predCNN)

res = np.count_nonzero(svm_y_test == predSVM)
# The message said "SVM", but clf is whichever classifier was fitted above
# (currently KNeighborsClassifier); worded like the float-model cell.
print("\nThe accuracy of the classifier is: ", res/len(predSVM)*100, "%")

In [None]:
#check structure tflite model
# Load the TFLite model
interpreterCheck = tf.lite.Interpreter(model_path="q_model_2.tflite")
interpreterCheck.allocate_tensors()

# Get the model's input and output details
input_details = interpreterCheck.get_input_details()
output_details = interpreterCheck.get_output_details()

# Print the model details
print("Input details:")
print(input_details)
print("\nOutput details:")
print(output_details)

In [None]:
# micromlgen provides the port() call used in the next cell.
!pip install -U micromlgen

In [None]:
from micromlgen import port

# Convert the fitted classifier into C source text.
# NOTE(review): confirm that micromlgen supports the estimator type currently assigned to clf.
c_code = port(clf)

with open('classifier.h', 'w') as file:
  file.write(c_code)


# TFLite->TFMicro

In [None]:
# Install xxd and dump the quantized model as a C array source file.
!apt-get update && apt-get -qq install xxd

# NOTE(review): absolute Colab path, while the model is written with a relative
# name elsewhere in this notebook - this only matches when the working directory is /content.
MODEL_TFLITE = '/content/q_model_2.tflite'
MODEL_TFLITE_MICRO = 'q_model_2_2_new.cc'
!xxd -i {MODEL_TFLITE} > {MODEL_TFLITE_MICRO}
# NOTE(review): xxd -i presumably derives the C identifier from the input path;
# the commented line below computes that name - confirm before changing MODEL_TFLITE.
#REPLACE_TEXT = MODEL_TFLITE.replace('/', '_').replace('.', '_')

In [None]:
#saving model not quantized
# NOTE(review): no visible cell writes ./saved_model (the checkpoint is stored
# as best_model.h5) - confirm the folder exists before zipping.
!zip -r ./model.zip ./saved_model