Tested with `keras==3.1.1`, `tensorflow==2.16.1`, `numpy==1.26.4`, `scikit-learn==1.5.2`, `pandas==2.2.2`

In [None]:
# Datasets this notebook knows how to load; change the index to switch.
available_datasets = ("songs", "covertype", "nids")
dataset_name = available_datasets[0]

In [None]:
import numpy as np
import pandas as pd
import sklearn.datasets
import sklearn.preprocessing

# Load the selected dataset into `full_dataset` (with the target in a "label"
# column), and derive the feature table `X` and the integer targets `y`.
if dataset_name == "songs":
    # https://www.kaggle.com/datasets/amitanshjoshi/spotify-1million-tracks/data
    song_features = [
        "danceability",
        "energy",
        "loudness",
        "speechiness",
        "acousticness",
        "instrumentalness",
        "liveness",
        "valence",
        "tempo",
        "duration_ms",
        "time_signature",
    ]
    full_dataset = pd.read_csv("data/spotify_data.csv", index_col=0).rename(
        columns={"genre": "label"}
    )
    # Replace the genre names by integer class ids.
    genre_encoder = sklearn.preprocessing.LabelEncoder()
    full_dataset["label"] = genre_encoder.fit_transform(full_dataset["label"])
    X = full_dataset[song_features]
    y = full_dataset["label"]
elif dataset_name == "covertype":
    # https://archive.ics.uci.edu/dataset/31/covertype
    full_dataset = sklearn.datasets.fetch_covtype(as_frame=True).frame.rename(
        columns={"Cover_Type": "label"}
    )
    # Shift the labels down by one so that the smallest class id becomes 0.
    full_dataset["label"] = full_dataset["label"].sub(1)
    X = full_dataset.drop(columns=["label"])
    y = full_dataset["label"]
elif dataset_name == "nids":
    # https://rdm.uq.edu.au/files/2ad93cd0-ef9c-11ed-827d-e762de186848
    # https://staff.itee.uq.edu.au/marius/NIDS_datasets/
    ignored_columns = ["IPV4_SRC_ADDR", "IPV4_DST_ADDR", "Label"]
    full_dataset = (
        pd.read_csv("data/NF-ToN-IoT.csv")
        .drop(columns=ignored_columns)
        .rename(columns={"Attack": "label"})
    )
    X = full_dataset.drop(columns=["label"])
    # Only `y` is integer-encoded here; full_dataset["label"] keeps the
    # original "Attack" values.
    le = sklearn.preprocessing.LabelEncoder()
    y = le.fit_transform(full_dataset["label"])
else:
    raise ValueError("Invalid dataset name")

# Empirical class distribution and its Shannon entropy in bits per label.
value_counts = full_dataset["label"].value_counts()
frequencies = value_counts / value_counts.sum()
entropy = -(frequencies * np.log2(frequencies)).sum()
classes = len(value_counts)
print(f"Classes: {classes}")
print(f"Entropy: {entropy:.4f}")

# Directory (with trailing slash) where the later cells store model files.
model_path = f"models/{dataset_name}_models/"

# Last expression of the cell: the notebook displays the per-class counts.
full_dataset["label"].value_counts()

# Data preparation

In [None]:
from sklearn.model_selection import train_test_split
import struct

# Standardise the features over the whole dataset and store them as float32.
# (sklearn.preprocessing.MinMaxScaler was the alternative tried here.)
sc = sklearn.preprocessing.StandardScaler()
Xt = sc.fit_transform(X).astype("float32")

# Stratified 80/20 train/test split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    Xt,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y,
)

# Feature file layout: two little-endian uint64 values (row count, column
# count) followed by the raw float32 contents of Xt.
n_rows, n_columns = Xt.shape
with open(f"data/{dataset_name}_X.lrbin", "wb") as features_file:
    features_file.write(struct.pack("<QQ", n_rows, n_columns))
    Xt.tofile(features_file)

# Label file layout: the number of classes first, then every label, all
# stored as uint16.
labels_with_header = np.insert(y, 0, classes)
labels_with_header.astype(np.uint16).tofile(f"data/{dataset_name}_y.lrbin")

# Model training

In [None]:
import tensorflow as tf

# Pass an empty device list for "GPU" so that TensorFlow sees no GPU.
tf.config.set_visible_devices([], "GPU")

import keras
import time
import pickle
import os

# Fix the random seed before any layer is created.
keras.utils.set_random_seed(42)

# Architecture hyper-parameters; they are also encoded in the file names.
hidden_units = 50
num_layers = 1
model_name = f"{dataset_name}_mlp_L{num_layers}_H{hidden_units}"
filename = f"{model_name}_{time.strftime('%Y%m%d-%H%M%S')}"

# MLP: input -> `num_layers` ReLU hidden layers -> softmax over the classes.
# The layers are instantiated in exactly this order.
mlp_layers = [keras.layers.Input(shape=(X_train.shape[1],), name="input")]
mlp_layers.extend(
    keras.layers.Dense(hidden_units, activation="relu") for _ in range(num_layers)
)
mlp_layers.append(keras.layers.Dense(classes, activation="softmax"))
model = keras.models.Sequential(mlp_layers)

# Stop once the validation accuracy has not improved for 5 epochs and roll
# the weights back to the best epoch.
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_sparse_categorical_accuracy",
    patience=5,
    restore_best_weights=True,
    verbose=1,
)

optimizer = keras.optimizers.Adam(learning_rate=1e-4)
tracked_metrics = [
    keras.metrics.SparseCategoricalAccuracy(),
    keras.metrics.SparseTopKCategoricalAccuracy(3),
]
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=optimizer,
    metrics=tracked_metrics,
)

print(filename)
model.summary()

# Train with 10% of the training split held out for validation and measure
# the wall-clock time of the fit call.
train_start = time.perf_counter()
history = model.fit(
    X_train,
    y_train,
    batch_size=128,
    epochs=200,
    validation_split=0.1,
    callbacks=[early_stopping],
    verbose=2,
)
train_end = time.perf_counter()
training_seconds = train_end - train_start
print(f"Training time: {training_seconds} s")

# model_path ends with "/", so dirname() yields the directory itself.
os.makedirs(os.path.dirname(model_path), exist_ok=True)

# Persist the trained model and its per-epoch history plus the training time.
model.save(f"{model_path}/{filename}.keras")
history_record = history.history | {"training_seconds": training_seconds}
with open(f"{model_path}/{filename}_history.pkl", "wb") as history_file:
    pickle.dump(history_record, history_file)

# Model evaluation

In [None]:
# Evaluate on the held-out test split and time the whole evaluate() call.
eval_start = time.perf_counter()
benchmarks = model.evaluate(X_test, y_test, verbose=0, return_dict=True)
eval_end = time.perf_counter()

# Average wall-clock time per test sample, in microseconds.
microseconds_per_sample = 1_000_000 * (eval_end - eval_start) / len(y_test)

print(benchmarks)
print(f"{microseconds_per_sample} µs/sample")

# Store the metric dictionary next to the saved model.
with open(f"{model_path}/{filename}_eval.pkl", "wb") as eval_file:
    pickle.dump(benchmarks, eval_file)

In [None]:
# Score the model as a probability estimator over the whole dataset:
# predictions for the test split come first, then the train split, and the
# labels are concatenated in the same order.
all_probs = np.concatenate(
    [model.predict(X_test, verbose=0), model.predict(X_train, verbose=0)],
    axis=0,
)
all_labels = np.concatenate([y_test, y_train], axis=0)

# Vectorised instead of a per-sample Python loop: pick, for every row, the
# probability the model assigned to the true class.
true_class_probs = all_probs[np.arange(len(all_labels)), all_labels]

# Number of samples whose most probable class is the true class.
correct_predictions = int(np.count_nonzero(all_probs.argmax(axis=1) == all_labels))

# Average number of bits needed per label when coding with the model's
# probabilities (cross-entropy). The sum is accumulated in float64.
model_entropy = -np.log2(true_class_probs).sum(dtype=np.float64) / len(full_dataset)

print("Overall accuracy:", correct_predictions / len(full_dataset))
print("Model entropy:", model_entropy)
print("Model params:", model.count_params())
print("Model bytes:", os.path.getsize(f"{model_path}/{filename}.keras"))

# TFLite

In [None]:
import tensorflow as tf
import keras
from tensorflow.lite.python import interpreter

def representative_dataset():
    """Yield the first 1000 training rows, one at a time, keyed by "input"."""
    calibration_rows = X_train[:1000]
    yield from ({"input": row} for row in calibration_rows)

# Converted TFLite models (serialized flatbuffers), in the same order as the
# quantization list iterated below.
quantized_modes = []

# Convert the saved Keras model three times: without quantization, with
# integer (uint8 output) quantization and with float16 quantization.
for quantization in [None, "uint8", "float16"]:
    print("#" * 35, f"{quantization} quantization", "#" * 35)
    # Pass an empty device list for "GPU" so that TensorFlow sees no GPU.
    tf.config.set_visible_devices([], "GPU")
    # Reload the model from disk for every conversion; compile=False skips
    # restoring the training configuration.
    model = keras.saving.load_model(f"{model_path}/{filename}.keras", compile=False)
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS]
    if quantization:
        converter.optimizations = [tf.lite.Optimize.DEFAULT]
        if quantization == "uint8":
            # Restrict to int8 builtin ops and calibrate with
            # representative_dataset; the input stays float32 while the
            # output is emitted as uint8.
            converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
            converter.representative_dataset = representative_dataset
            converter.inference_input_type = tf.float32
            converter.inference_output_type = tf.uint8
        elif quantization == "float16":
            converter.target_spec.supported_types = [tf.float16]
        else:
            # Guard for values added to the list above without a matching branch.
            raise ValueError("Invalid quantization type")
        
    tflite_model = converter.convert()
    # The unquantized model gets no suffix in its file name.
    suffix = f"_{quantization}" if quantization else ""
    with open(f"{model_path}/{filename}{suffix}.tflite", "wb") as f:
        f.write(tflite_model)
    quantized_modes.append(tflite_model)

# Benchmark every converted model on the training split. The labels here must
# stay in the same order as the quantization list of the loop above ("none"
# stands for None).
for quantization, tflite_model in zip(["none", "uint8", "float16"], quantized_modes):
    interp = interpreter.InterpreterWithCustomOps(model_content=tflite_model)
    input_details = interp.get_input_details()
    output_details = interp.get_output_details()
    # NOTE: input_type and output_details are not used by the active code
    # below; input_type is only referenced by the disabled snippet.
    input_type = input_details[0]['dtype']
    tflite_X = X_train
    # Disabled: manual quantization of the inputs for a model with uint8 input.
    # if input_type == np.uint8:
    #     input_scale, input_zero_point = input_details[0]['quantization']
    #     print(f"Input scale: {input_scale}, zero point: {input_zero_point}")
    #     tflite_X = np.around((X_train / input_scale) + input_zero_point).astype(np.uint8)

    interp.allocate_tensors()
    model_runner = interp.get_signature_runner("serving_default")
    # Time a single call that runs the whole training split through the model.
    t0 = time.perf_counter()
    tflite_y = model_runner(input=tflite_X)
    t1 = time.perf_counter()
    # The runner returns a dict of outputs; take the first (only) entry.
    tflite_y = list(tflite_y.values())[0]
    accuracy = keras.metrics.sparse_categorical_accuracy(y_train, tflite_y).numpy().mean()
    print(f"{quantization} quantization bytes: {len(tflite_model)}")
    print(f"{quantization} quantization µs/prediction: {1_000_000 * (t1 - t0) / X_train.shape[0]}")
    print(f"{quantization} quantization accuracy: {accuracy}")