In [None]:
# cnn model with standardization
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from matplotlib import pyplot
from sklearn.preprocessing import StandardScaler, LabelEncoder, label_binarize
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Activation
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers import Conv1D, LSTM
from keras.layers import MaxPooling1D
from keras.layers import BatchNormalization
from keras.layers import RepeatVector
from keras.layers import TimeDistributed
from keras.utils import to_categorical
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [None]:
# load a single file as a numpy array
def load_file(filepath):
	dataframe = read_csv(filepath, header=None, delim_whitespace=False)
	return dataframe.values

In [None]:
# load a list of files into a 3D array of [samples, timesteps, features]
def load_group(filenames, prefix=''):
	loaded = list()
	for name in filenames:
		data = load_file(prefix + name)
		loaded.append(data)
	# stack group so that features are the 3rd dimension
	loaded = dstack(loaded)
	return loaded

In [None]:
# load a dataset group, such as train or test
def load_dataset_group(group, prefix=''):
	filepath = prefix + group + '/Inertial Signals/'
	# load all 9 files as a single array
	filenames = list()
	# total acceleration
	filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
	# body acceleration
	filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
	# body gyroscope
	filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
	# load input data
	X = load_group(filenames, filepath)
	# load class output
	y = load_file(prefix + group + '/y_'+group+'.txt')
	return X, y

In [None]:
def loadDataset():
  filename = list()
  filename += ['/content/drive/MyDrive/data_2/aX_scale.txt',
               '/content/drive/MyDrive/data_2/aY_scale.txt',
               '/content/drive/MyDrive/data_2/aZ_scale.txt',
               '/content/drive/MyDrive/data_2/gX_scale.txt',
               '/content/drive/MyDrive/data_2/gY_scale.txt',
               '/content/drive/MyDrive/data_2/gZ_scale.txt']
  # load input data
  X = load_group(filename)
  print()
  # load class output
  y = load_file('/content/drive/MyDrive/data_2/labels.txt')

  # train test split
  trainX, testX, trainy, testy = train_test_split(X, y, random_state = 30, test_size = 0.3)

  # zero-offset class values
  trainy = trainy-1
  testy = testy-1

  # one hot encode y
  trainy = to_categorical(trainy)
  testy = to_categorical(testy)

  return trainX, trainy, testX, testy #x_train, y_train, x_test, y_test, x_val, y_val

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
trainX, trainy, testX, testy = loadDataset()

trainX = np.where(np.isnan(trainX), np.nanmean(trainX, axis=0), trainX)
trainy = np.where(np.isnan(trainy), np.nanmean(trainy, axis=0), trainy)
testX = np.where(np.isnan(testX), np.nanmean(testX, axis=0), testX)
testy = np.where(np.isnan(testy), np.nanmean(testy, axis=0), testy)

print(trainX.shape, trainy.shape, testX.shape, testy.shape)
print(np.isnan(trainX).sum())
print(np.isnan(trainy).sum())
print(np.isnan(testX).sum())
print(np.isnan(testy).sum())

In [None]:
#plot
def history_plot(history):
  pyplot.xlabel('Epoch')
  pyplot.ylabel('value')
  pyplot.plot(history.history['loss'], label = 'Tranning loss')
  pyplot.plot(history.history['val_loss'], label = 'Validation loss')
  pyplot.plot(history.history['accuracy'], label ='Traning accuracy')
  pyplot.plot(history.history['val_accuracy'], label = 'validation accuracy')
  pyplot.legend()
  pyplot.show()

In [None]:
import seaborn as sns
def CM_plot(testX, testy, model):
  labels = ["Sitting", "Standing", " Jumping", " Rotate Left"]
  y_pred = model.predict(testX, verbose = 1)
  y_pred_binary = (y_pred > 0.5).astype(int)
  y_pred_binary = np.argmax(y_pred_binary, axis=1)
  y_true = np.argmax(testy, axis=1)
  cm = confusion_matrix(y_true, y_pred_binary)

  plt.figure(figsize=(5, 5))
  sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", cbar=False,
              xticklabels=labels, yticklabels= labels)
  plt.xlabel("Predicted Labels")
  plt.ylabel("True Labels")
  plt.title("Confusion Matrix")
  plt.show()
  print(classification_report(y_true,y_pred_binary))

In [None]:
def ROC_plot(model):
  labels = ["Sitting", "Standing", " Jumping", " Rotate Left"]
  y_pred = model.predict(testX, verbose = 1)
  y_true = np.argmax(testy, axis=1)
  y_true_one_hot = label_binarize(y_true, classes=np.unique(y_true))
  fpr = dict()
  tpr = dict()
  roc_auc = dict()
  for i in range(4):
      fpr[i], tpr[i], _ = roc_curve(y_true_one_hot[:, i], y_pred[:, i])
      roc_auc[i] = auc(fpr[i], tpr[i])

  # Vẽ đường ROC cho từng lớp
  plt.figure(figsize=(6, 6))
  for i in range(4):
      plt.plot(fpr[i], tpr[i], label=f'Class {labels[i]} (AUC = {roc_auc[i]:.2f})')

  plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
  plt.xlabel('False Positive Rate')
  plt.ylabel('True Positive Rate')
  plt.title('Receiver Operating Characteristic (ROC) for each class')
  plt.legend(loc='lower right')
  plt.show()

In [None]:
#epoch: 50, 100, 150
#drop out: 0.3, 0.5, 0.7
#optimizer: Adam, RMSProp, AdaGrad
#kernel_size: (3), (5)
import keras
num_classes = len(np.unique(trainy))
def make_model(input_shape,epochs = 50, kernel_size = 3, drop_out = 0.5, optimizer = "adam", batch_size = 16):
    input_layer = keras.layers.Input(input_shape)

    conv1 = keras.layers.Conv1D(filters=32, kernel_size=kernel_size)(input_layer)
    #conv1 = keras.layers.BatchNormalization()(conv1)
    conv1 = keras.layers.ReLU()(conv1)
    conv1 = keras.layers.Dropout(drop_out/2)(conv1)

    conv2 = keras.layers.Conv1D(filters=32, kernel_size=kernel_size)(conv1)
    #conv2 = keras.layers.BatchNormalization()(conv2)
    conv2 = keras.layers.ReLU()(conv2)
    conv2 = keras.layers.MaxPooling1D(pool_size=4)(conv2)


    conv3 = keras.layers.Conv1D(filters=32, kernel_size=kernel_size)(conv2)
    #conv3 = keras.layers.BatchNormalization()(conv3)
    conv3 = keras.layers.ReLU()(conv3)
    conv3 = keras.layers.Dropout(drop_out/2)(conv3)

    conv4 = keras.layers.Conv1D(filters=32, kernel_size=kernel_size)(conv3)
    #conv4 = keras.layers.BatchNormalization()(conv4)
    conv4 = keras.layers.ReLU()(conv4)
    conv4 = keras.layers.MaxPooling1D(pool_size=4)(conv4)

    flatten = keras.layers.Flatten()(conv4)
    dense = keras.layers.Dense(100, activation="relu")(flatten)
    #gap = keras.layers.GlobalAveragePooling1D()(conv3)

    output_layer = keras.layers.Dense(4, activation="softmax")(dense)

    model = keras.models.Model(inputs=input_layer, outputs=output_layer)

    keras.utils.plot_model(model, show_shapes=True)

    callbacks = [
    keras.callbacks.ModelCheckpoint( "best_model.keras", save_best_only=True, monitor="val_loss"),
    keras.callbacks.ReduceLROnPlateau( monitor="val_loss", factor=0.5, patience=20, min_lr=0.0001),
    keras.callbacks.EarlyStopping(monitor="val_loss", patience=50, verbose=1),]

    model.compile(
        optimizer = optimizer,
        loss="categorical_crossentropy",
        metrics=["accuracy"])

    history = model.fit(
        trainX,
        trainy,
        batch_size=batch_size,
        epochs=epochs,
        callbacks=callbacks,
        validation_split=0.1,
        verbose=1,)
    return model, history

In [None]:
def run_experiments(trainX, trainy, testX, testy):
  repeats = 10
  epochs = [50, 100, 150]
  drop_out = [0.3, 0.5, 0.7]
  optimizer= ["Adam", "RMSProp", "AdaGrad"]
  kernel_size = [3, 5, 7]
  scores = list()
  for r in range(repeats):
    print(f"Times {r+1}")
    print(f"Model with {epochs[2]} epochs, {kernel_size[0]} kernel size, {drop_out[1]} dropout, {optimizer[0]} optimizer.")
    model, history = make_model(input_shape=trainX.shape[1:],
                                epochs = epochs[2],
                                kernel_size = kernel_size[0],
                                drop_out = drop_out[1],
                                optimizer = optimizer[0])
    '''keras.utils.plot_model(model,
                           to_file='model.png',
                           show_shapes=True,
                           show_dtype=True,
                           show_layer_names=True,
                           rankdir='TB',
                           expand_nested=True,
                           dpi=96)'''
    loss_test , accuracy_test = model.evaluate(testX, testy, batch_size = 1, verbose=1)
    print(f"Test: Accuracy = {accuracy_test}, Lost = {loss_test}")
    score = accuracy_test * 100.0
    print('>Times %d: %.3f' % (r+1, score))
    scores.append(score)
    history_plot(history)
    CM_plot(testX, testy, model)
    ROC_plot(model)
  # summarize results
  summarize_results(scores)
  return model, history
model, history = run_experiments(trainX, trainy, testX, testy)

In [None]:
model.summary()

In [None]:
import tensorflow as tf

def representative_data_gen():
  for input_value in tf.data.Dataset.from_tensor_slices(trainX).batch(1).take(2000):
    input_value = tf.reshape(input_value, (1, 120, 6))
    input_value = tf.dtypes.cast(input_value, tf.float32)
    yield [input_value]

converter = tf.lite.TFLiteConverter.from_keras_model(model)

converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.representative_dataset = representative_data_gen
# Ensure that if any ops can't be quantized, the converter throws an error
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
# Set the input and output tensors to uint8 (APIs added in r2.3)
converter.inference_input_type = tf.int8
converter.inference_output_type = tf.int8

tflite_model_quant = converter.convert()
# Save the model to disk
open('model_quantized.tflite', "wb").write(tflite_model_quant)


import os
basic_model_size = os.path.getsize('model_quantized.tflite')
print("Model is %d bytes" % basic_model_size)

interpreter = tf.lite.Interpreter(model_content=tflite_model_quant)
input_type = interpreter.get_input_details()[0]['dtype']
print('input: ', input_type)
output_type = interpreter.get_output_details()[0]['dtype']
print('output: ', output_type)

tf.lite.experimental.Analyzer.analyze(model_path='model_quantized.tflite')

In [None]:
!echo "const unsigned char model[] = {" > /content/model.h
!cat model_quantized.tflite | xxd -i              >> /content/model.h
!echo "};"                              >> /content/model.h

In [None]:
model_h_size = os.path.getsize("model.h")
print(f"Header file, model.h, is {model_h_size:,} bytes.")
print("\nOpen the side panel (refresh if needed). Double click model.h to download the file.")