In [None]:
import tensorflow as tf
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import Dense, LSTM, Bidirectional, Dropout, Reshape
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Input

from numpy import load

import matplotlib.pyplot as plt

In [None]:
# mount Google drive (only possible when running on Google Colab)

try:
    from google.colab import drive
except ImportError:
    # google.colab cannot be imported in this environment (e.g. a local
    # Jupyter kernel), so there is nothing to mount. `drive` is still bound
    # so that running this cell never fails with a NameError/ImportError.
    drive = None
    print("google.colab not available; skipping Google Drive mount")
else:
    drive.mount('/content/gdrive')

In [None]:
# Folder locations on the mounted Google Drive (used when running on Colab):
# CSV sequences, NPZ window files, and saved models, in that order.

CSV_SEQUENCES, NPZ_WINDOWS, MODELS = (
    "/content/gdrive/MyDrive/Colab Notebooks/iot_device_classification/csv_sequences",
    "/content/gdrive/MyDrive/Colab Notebooks/iot_device_classification/npz_windows",
    "/content/gdrive/MyDrive/Colab Notebooks/iot_device_classification/models",
)

In [None]:
# laptop file paths
#
# Applied only when google.colab cannot be imported. Without this guard,
# running all cells top to bottom on Colab would overwrite the Colab paths
# assigned in the previous cell with these local Windows paths.

try:
    import google.colab  # noqa: F401  (imported only to detect Colab)
except ImportError:
    CSV_SEQUENCES = "C:/work_c/2022-09-20_unsw_dataset_iot_2018/csv_sequences"
    NPZ_WINDOWS = "C:/work_c/2022-09-20_unsw_dataset_iot_2018/npz_windows"
    MODELS = "C:/work_c/2022-09-20_unsw_dataset_iot_2018/models"

In [None]:
def load_train_val_test_data(npz_file, train_fraction=0.6, val_fraction=0.2):
    """
    Load windows and labels from a compressed *.npz file and split them into
    three datasets: train, val and test (60% / 20% / 20% by default).

    The split is positional: the first part of the arrays becomes the training
    set, the next part the validation set and the remainder the test set.
    No shuffling is performed here.

    Arguments:
        - npz_file: The path to the *.npz file. It must contain the arrays
          'x' (windows) and 'y' (labels; one-hot encoded according to the
          original notebook).
        - train_fraction: Fraction of the samples used for training.
        - val_fraction: Fraction of the samples used for validation. Whatever
          is left after train and val becomes the test set.
    Returns:
        Tuple (x_train, y_train, x_val, y_val, x_test, y_test)
    Raises:
        ValueError: If the fractions are negative or sum to more than 1, or
        if 'x' and 'y' do not contain the same number of samples.
    """
    if train_fraction < 0 or val_fraction < 0 or train_fraction + val_fraction > 1:
        raise ValueError(
            "train_fraction and val_fraction must be non-negative and sum to at most 1, "
            "got {} and {}".format(train_fraction, val_fraction))

    # numpy's NpzFile keeps the archive open until it is closed. The arrays
    # are read into memory on item access, so they stay valid after the
    # `with` block has closed the file.
    with load(npz_file) as dict_data:
        x = dict_data['x']
        y = dict_data['y']

    total_length = len(x)
    if len(y) != total_length:
        raise ValueError(
            "'x' and 'y' must have the same number of samples, got {} and {}".format(
                total_length, len(y)))

    # Compute each split boundary once so x and y are always cut identically.
    train_end = int(total_length * train_fraction)
    val_end = int(total_length * (train_fraction + val_fraction))

    x_train, y_train = x[:train_end], y[:train_end]
    x_val, y_val = x[train_end:val_end], y[train_end:val_end]
    x_test, y_test = x[val_end:], y[val_end:]
    return (x_train, y_train, x_val, y_val, x_test, y_test)

In [None]:
# test load_train_val_test_data(): load the full dataset and inspect the splits

x_train, y_train, x_val, y_val, x_test, y_test = load_train_val_test_data("{}/all_days_all_devices.npz".format(NPZ_WINDOWS))

# Every split must pair each window with exactly one label.
assert len(x_train) == len(y_train)
assert len(x_val) == len(y_val)
assert len(x_test) == len(y_test)

print("shape of train windows: {}".format(x_train.shape))
print("shape of train labels: {}".format(y_train.shape))
print("shape of val windows: {}".format(x_val.shape))
print("shape of val labels: {}".format(y_val.shape))
print("shape of test windows: {}".format(x_test.shape))
print("shape of test labels: {}".format(y_test.shape))
# Only the first three entries are shown to keep the output short.
print("First train window, part:")
print(x_train[0][:3])
print("First train labels:")
print(y_train[:3])
print("Data type for train windows: {}".format(x_train.dtype))
print("Data type for train labels: {}".format(y_train.dtype))

In [None]:

# define model

# Hyperparameters
CLASSES = 28             # number of units in the softmax output layer
PATIENCE = 1             # epochs without val_loss improvement before stopping
EPOCHS = 1
BATCH_SIZE = 128
DROPOUT = 0.2
LEARNING_RATE = 0.0001
REGULARIZATION = 0.001   # L2 factor applied to every layer's kernel

# Stacked bidirectional LSTM classifier. The input shape is taken from a
# single training window, so this cell requires x_train to be loaded first.
model = tf.keras.models.Sequential([
    Input(shape=x_train[0].shape, dtype=tf.float32),
    # The first Bi-LSTM returns the whole sequence so the second one can consume it.
    Bidirectional(LSTM(64, return_sequences=True, kernel_regularizer=l2(REGULARIZATION))),
    # The second Bi-LSTM returns only its final output.
    Bidirectional(LSTM(64, return_sequences=False, kernel_regularizer=l2(REGULARIZATION))),
    Dense(units=128, activation='relu', kernel_regularizer=l2(REGULARIZATION)),
    Dropout(DROPOUT),
    Dense(units=CLASSES, activation='softmax', kernel_regularizer=l2(REGULARIZATION)),
    # Adds a length-1 axis in front of the class probabilities.
    # NOTE(review): presumably this matches the shape of the labels in y_train
    # (e.g. (1, CLASSES) per sample) - confirm against y_train.shape.
    Reshape([1, -1]),
])

# Stop once val_loss has not improved for PATIENCE epochs.
# restore_best_weights=True puts the best-val_loss weights back into `model`,
# so that a later evaluation of the in-memory model uses the same weights
# that the checkpoint below saves to disk, not those of the last (worse) epoch.
early_stopping = EarlyStopping(monitor='val_loss',
                               patience=PATIENCE,
                               mode='min',
                               restore_best_weights=True)

# Keep only the model with the lowest val_loss on disk.
# NOTE(review): newer Keras releases require the checkpoint path to end in
# ".keras" or ".h5"; confirm this extension-less path works with the installed
# TensorFlow version before relying on it.
checkpoint = ModelCheckpoint("{}/all_days_all_devices".format(MODELS), monitor='val_loss', verbose=0,
                             save_best_only=True, mode='min')

model.compile(loss='categorical_crossentropy',
              optimizer=Adam(learning_rate=LEARNING_RATE),
              metrics=['accuracy'])



In [None]:
# Train the model; validation metrics are computed on the val split after
# every epoch and drive both the early-stopping and the checkpoint callbacks.

history = model.fit(
    x=x_train,
    y=y_train,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    callbacks=[early_stopping, checkpoint],
    validation_data=(x_val, y_val),
)

In [None]:
# plot training progress: accuracy per epoch on the train and validation splits

fig, ax = plt.subplots()
ax.plot(history.history['accuracy'], label='train')
# 'val_accuracy' is measured on the validation split passed to model.fit(),
# not on the test split, so it is labelled accordingly.
ax.plot(history.history['val_accuracy'], label='validation')
ax.set_title('Stacked Bi-LSTM training')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epoch')
ax.legend(loc='lower right')
plt.show()

In [None]:
# Evaluate the trained model on the held-out test split.
# NOTE(review): the model is compiled with a loss plus metrics=['accuracy'],
# so evaluate() presumably returns the loss followed by the accuracy rather
# than a single accuracy number - confirm how accuracy_test is consumed.

accuracy_test = model.evaluate(x=x_test, y=y_test)