In [None]:
from typing import Optional, Tuple, Dict, Iterator
from functools import partial
import pathlib
import random
import logging
logging.disable(logging.WARNING)
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

import tensorflow as tf
from keras import Model
from keras.utils import to_categorical
from keras.layers import (
    Input,
    Conv1D,
    BatchNormalization,
    Activation,
    MaxPooling1D,
    Flatten,
    Dense,
    Dropout,
    Softmax,
)
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint, History

In [None]:
# Filesystem locations: input CSV logs and the folder for generated artifacts
DATA_PATH, OUTPUT_PATH = pathlib.Path("data"), pathlib.Path("output_ipynb")

In [None]:
# Seed every random number generator in use (Python, NumPy, TensorFlow)
# with the same value so that repeated runs are reproducible
SEED = 42
for _seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_fn(SEED)

In [None]:
def load_dataset(
    data_path: str,
    win_len: int,
    win_stride: int = 1,
) -> Tuple[np.ndarray, np.ndarray, Dict[int, str]]:
    """Read CSV logs and segment every log into fixed-length windows.

    Every ``*.csv`` file found recursively under ``data_path`` is read with
    the column names taken from its third line (the two lines above it are
    skipped). The class of a log is encoded in the name of its parent
    directory, which must look like ``<int label>_<activity name>``.

    Files are visited in sorted path order so that the order of the returned
    samples does not depend on the filesystem.

    Args:
        data_path (str): Dataset path.
        win_len (int): Window length (#samples).
        win_stride (int): Window stride (#samples). Defaults to 1.

    Returns:
        Tuple[np.ndarray, np.ndarray, Dict[int, str]]:
            Features (one window per entry), one-hot encoded labels, and
            labels_dict mapping each integer label to its activity name
            (ordered by label).

    Raises:
        ValueError: If ``win_len`` or ``win_stride`` is smaller than 1, or
            if a parent directory name does not follow the
            ``<int label>_<activity name>`` pattern.
    """
    if win_len < 1 or win_stride < 1:
        # A non-positive stride would never advance through the log
        raise ValueError(
            f"win_len and win_stride must be >= 1 "
            f"(got win_len={win_len}, win_stride={win_stride})"
        )
    X = []
    y = []
    labels_dict: Dict[int, str] = {}
    for log_path in sorted(pathlib.Path(data_path).rglob("*.csv")):
        data_frame = pd.read_csv(log_path, sep=",", header=2, dtype=np.float32)
        label, activity = log_path.parent.name.split("_", maxsplit=1)
        label = int(label)
        labels_dict[label] = activity
        # Convert to an array once instead of on every window
        samples = data_frame.to_numpy()
        # The "+ 1" keeps the last window that ends exactly on the final row;
        # logs shorter than win_len produce an empty range (no windows)
        win_list = [
            samples[start : start + win_len]
            for start in range(0, len(samples) - win_len + 1, win_stride)
        ]
        X += win_list
        y += [label] * len(win_list)
    return (np.array(X), to_categorical(y), dict(sorted(labels_dict.items())))


def CNN_model(
    inp_shape: Tuple[int, int],
    out_shape: int,
    hidden: int,
    filters: int,
    kernel_size: int,
    pool_size: int,
    dropout: Optional[float] = None,
    learning_rate: float = 1e-3,
) -> Model:
    """Build and compile a 1D Convolutional Neural Network (CNN) classifier.

    The network stacks ``hidden`` identical stages of
    Conv1D -> BatchNormalization -> ReLU -> MaxPooling1D, flattens the
    result and maps it to ``out_shape`` class scores with a dense layer
    followed by a softmax. The model is compiled with the Adam optimizer,
    categorical cross-entropy loss and the accuracy metric.

    Args:
        inp_shape (Tuple[int, int]): Input shape (#window_length, #channels).
        out_shape (int): Output shape (#classes).
        hidden (int): Number of hidden (convolutional) stages.
        filters (int): Number of convolutional filters in each hidden stage.
        kernel_size (int): Size of kernel in each convolutional layer.
        pool_size (int): Pool size of each max pooling layer.
        dropout (float | None): Rate of the dropout layer placed on the
            output of the final dense layer. No dropout layer is added when
            None. Defaults to None.
        learning_rate (float): Adam optimizer learning rate. Defaults to 1e-3.

    Returns:
        Model: Compiled 1D convolutional neural network named
            ``cnn_<filters>x<filters>x...`` (one entry per hidden stage).
    """
    model_input = Input(shape=inp_shape, name="input")

    # Feature extractor: every stage halves (for pool_size=2) the time axis
    features = model_input
    for stage in range(hidden):
        convolution = Conv1D(
            filters=filters,
            kernel_size=kernel_size,
            padding="same",
            activation=None,  # ReLU is applied after batch normalization
            kernel_regularizer="l2",
            name=f"conv_hidden_{stage}",
        )
        features = convolution(features)
        features = BatchNormalization(scale=True)(features)
        features = Activation("relu")(features)
        features = MaxPooling1D(pool_size=pool_size)(features)

    # Classifier head: flatten, then one linear unit (logit) per class
    flat = Flatten()(features)
    logits = Dense(
        units=out_shape,
        activation=None,
        kernel_regularizer="l2",
        name="dense_out",
    )(flat)
    # NOTE(review): dropout acts on the class logits here, not on the
    # features feeding the dense layer - confirm that this is intended
    if dropout is not None:
        logits = Dropout(rate=dropout)(logits)
    model_output = Softmax(name="output")(logits)

    model_name = "cnn_" + "x".join(str(filters) for _ in range(hidden))
    model = Model(inputs=model_input, outputs=model_output, name=model_name)
    model.compile(
        optimizer=Adam(learning_rate),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model


def plot_model_history(model: Model, history: History) -> None:
    """Draw the training and validation curves recorded during fitting.

    Two side-by-side panels are created, one for accuracy and one for loss;
    each shows the training series and its ``val_`` counterpart per epoch.

    Args:
        model (Model): Keras model instance (its name is used in the titles).
        history (History): Training history instance.
    """
    curves = history.history
    _, axes = plt.subplots(nrows=1, ncols=2, figsize=(12, 4), sharex=True)
    for ax, metric in zip(axes, ("accuracy", "loss")):
        # Training curve first, validation curve second (matches the legend)
        for series in (metric, f"val_{metric}"):
            ax.plot(curves[series])
        ax.set_title(f"{model.name} {metric}")
        ax.set_xlabel("epoch")
        ax.set_ylabel(metric)
        ax.legend(["train", "validation"], loc="upper left")


def plot_confusion_matrix(
    y_test: np.ndarray,
    y_pred: np.ndarray,
    labels_dict: Dict[int, str],
    title: Optional[str] = None,
) -> None:
    """Plot a row-normalized confusion matrix with semantic class names.

    The matrix covers every class value that occurs in ``y_test`` or
    ``y_pred``, in ascending order. Each row/column is named by looking the
    class value up in ``labels_dict``, so the tick labels are correct
    regardless of the insertion order of the dictionary; a class value that
    is missing from ``labels_dict`` is shown as its number.

    Args:
        y_test (np.ndarray): True labels (class values, not one-hot).
        y_pred (np.ndarray): Predicted labels (class values, not one-hot).
        labels_dict (Dict[int, str]): Class value to semantic label mapping.
        title (str | None): Plot figure title. Defaults to None, in which
            case "Confusion Matrix" is used.
    """
    _, ax = plt.subplots(1, 1, figsize=(6, 5))
    ax.set_title("Confusion Matrix" if title is None else title)

    # Sorted class values present in the data; passed explicitly so that the
    # matrix rows/columns and the tick labels are guaranteed to line up
    class_ids = np.union1d(y_test, y_pred)
    cm = confusion_matrix(y_test, y_pred, labels=class_ids, normalize="true")
    names = [
        labels_dict.get(int(class_id), str(class_id)) for class_id in class_ids
    ]
    df_cm = pd.DataFrame(cm, index=names, columns=names)

    sns.heatmap(df_cm, ax=ax, annot=True, annot_kws={"size": 12}, fmt=".2f")
    # Rows of a confusion matrix are the true classes, columns the predictions
    ax.set_xlabel("Predicted label")
    ax.set_ylabel("True label")
    plt.show()

In [None]:
# Read all CSV logs and cut the signals into consecutive 2s windows:
# the stride equals the window length, so windows do not overlap
X, y, LABELS_DICT = load_dataset(
    data_path=DATA_PATH,
    win_len=52,
    win_stride=52,
)

In [None]:
# Hold out 30% of the windows as the testing set (stratified by class).
# The remainder gets its own names so that re-running this cell always starts
# from the full dataset instead of re-splitting an already reduced X_train.
X_trainval, X_test, y_trainval, y_test = train_test_split(
    X, y, stratify=y, test_size=0.3, random_state=SEED)

# Split the remainder into training set and validation set (10%).
# random_state makes both splits independent of how much global random state
# was consumed by previously executed cells.
X_train, X_valid, y_train, y_valid = train_test_split(
    X_trainval, y_trainval, stratify=y_trainval, test_size=0.1,
    random_state=SEED)

print(f"Training set:   {len(y_train):>4} samples")
print(f"Validation set: {len(y_valid):>4} samples")
print(f"Testing set:    {len(y_test):>4} samples")

In [None]:
# Instantiate the 1D-CNN: input/output sizes come from the dataset arrays,
# followed by the architecture (3 hidden stages) and training hyperparameters
model = CNN_model(
    inp_shape=X.shape[1:],
    out_shape=y.shape[1],
    hidden=3,
    filters=8,
    kernel_size=3,
    pool_size=2,
    learning_rate=1e-3,
    dropout=0.1,
)

# Print the layer-by-layer overview of the model
model.summary()

In [None]:
# File that receives the best model seen during training
keras_model = OUTPUT_PATH / f"{model.name}.h5"
# Make sure the target folder exists before the checkpoint callback writes
keras_model.parent.mkdir(parents=True, exist_ok=True)

# Start model training using validation set to track progress
history = model.fit(
    X_train, y_train,
    validation_data=(X_valid, y_valid),
    epochs=100,
    batch_size=32,
    callbacks=[
        # Stop when val_loss has not improved for 20 consecutive epochs
        EarlyStopping(
            monitor="val_loss",
            verbose=1,
            patience=20,
            restore_best_weights=True,
        ),
        # Keep only the best model (lowest val_loss) on disk.
        # `patience` is an EarlyStopping option, not a ModelCheckpoint one,
        # so it is not passed here.
        ModelCheckpoint(
            filepath=str(keras_model),
            monitor="val_loss",
            save_freq="epoch",
            save_best_only=True,
        ),
    ],
)

# Load best model weights (same string path that was given to the checkpoint)
model.load_weights(str(keras_model))

In [None]:
# Show the accuracy and loss curves of the training run
plot_model_history(model=model, history=history)

In [None]:
# Score the trained model on the testing set
loss, acc = model.evaluate(x=X_test, y=y_test)
print(f"\nPrediction accuracy: {acc:.02%}")

In [None]:
# Predict class probabilities for the testing set, then compare the most
# probable class with the true class (both recovered via argmax)
y_pred = model.predict(X_test)
plot_confusion_matrix(
    y_test=np.argmax(y_test, axis=1),
    y_pred=np.argmax(y_pred, axis=1),
    labels_dict=LABELS_DICT,
    title=f"Confusion matrix: {model.name}",
)

In [None]:
def get_representative_data(X: np.ndarray) -> Iterator[tf.lite.RepresentativeDataset]:
    """Generate sample inputs for the TFLite API to estimate dynamic range.

    The first 100 entries of ``X`` (fewer if ``X`` is shorter) are yielded
    one at a time, each as a single-element list holding a batch of size 1.

    Args:
        X (np.ndarray): Feature dataset.

    Yields:
        Iterator[tf.lite.RepresentativeDataset]: Dataset iterator.
    """
    calibration_set = tf.data.Dataset.from_tensor_slices(X).batch(1).take(100)
    # One list entry per model input; this model has a single input
    yield from ([sample] for sample in calibration_set)


def run_tflite_model(interpreter: tf.lite.Interpreter, X: np.ndarray) -> np.ndarray:
    """Helper function to run TFLite model inference and get predictions.

    Samples are fed to the interpreter one at a time (batch size 1). When the
    model input is an integer tensor with quantization parameters, every
    sample is first quantized (``x / scale + zero_point``, rounded and clipped
    to the range of the input dtype); float inputs are only cast.

    The predicted class is the arg-max of the raw output tensor. The output
    is not dequantized: assuming a positive output scale, dequantization
    would not change which element is the largest.

    Args:
        interpreter (tf.lite.Interpreter): TFLite interpreter.
        X (np.ndarray): Feature dataset, one sample per entry.

    Returns:
        np.ndarray: Model predictions (one class index per sample).
    """
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]

    input_dtype = np.dtype(input_details["dtype"])
    # (scale, zero_point); a scale of 0 means the tensor is not quantized
    scale, zero_point = input_details.get("quantization", (0.0, 0))
    quantize = bool(np.issubdtype(input_dtype, np.integer) and scale != 0)
    if quantize:
        limits = np.iinfo(input_dtype)

    predictions = np.zeros((len(X),), dtype=int)
    for i, x in enumerate(X):
        # Add the batch dimension expected by the model input
        x = np.expand_dims(x, axis=0)
        if quantize:
            # Map real values onto the integer grid without wrap-around
            x = np.clip(np.round(x / scale + zero_point), limits.min, limits.max)
        interpreter.set_tensor(input_details["index"], x.astype(input_dtype))
        interpreter.invoke()
        output = interpreter.get_tensor(output_details["index"])[0]
        predictions[i] = output.argmax()
    return predictions

In [None]:
# Set up post-training quantization of the trained Keras model
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]

# Training samples let the converter estimate the dynamic range of the tensors
converter.representative_dataset = partial(get_representative_data, X_train)

# Restrict conversion to int8 built-in ops, so that the converter throws an
# error if any op can't be quantized
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]

# Keep the model's output and input tensors in float32
converter.inference_output_type = tf.float32
converter.inference_input_type = tf.float32

# Convert the model and load the result into a TFLite interpreter
tflite_model_quant = converter.convert()
interpreter = tf.lite.Interpreter(model_content=tflite_model_quant)

In [None]:
# Run the quantized model on the testing set; accuracy is the fraction of
# predicted classes that match the true classes
y_pred = run_tflite_model(interpreter, X_test)
acc = np.mean(y_pred == np.argmax(y_test, axis=1))
print(f"Prediction accuracy: {acc:.02%}")

In [None]:
# Confusion matrix of the quantized model (y_pred already holds class indices)
plot_confusion_matrix(
    y_test=np.argmax(y_test, axis=1),
    y_pred=y_pred,
    labels_dict=LABELS_DICT,
    title=f"Confusion matrix: q{model.name}",
)

In [None]:
# Save the quantized model. The output folder is created here explicitly so
# that this cell does not depend on another cell having created it first.
tflite_model = OUTPUT_PATH / f"q{model.name}.tflite"
tflite_model.parent.mkdir(parents=True, exist_ok=True)
tflite_model.write_bytes(tflite_model_quant)

In [None]:
# Export the testing set (inputs and one-hot labels) as a NumPy archive,
# e.g. for validating the exported model later on
OUTPUT_PATH.mkdir(parents=True, exist_ok=True)
testset = OUTPUT_PATH / "har_testset.npz"
np.savez(testset, m_inputs=X_test, m_outputs=y_test)