In [1]:
import os
import urllib.request
import zipfile

import micronas
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical

2025-12-26 03:12:41.215747: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
import warnings

warnings.filterwarnings("ignore")

# Loading AReM dataset

In [3]:
url = "https://cdn.uci-ics-mlr-prod.aws.uci.edu/366/activity%2Brecognition%2Bsystem%2Bbased%2Bon%2Bmultisensor%2Bdata%2Bfusion%2Barem.zip"
zip_path = "AReM.zip"
root_dir = "AReM"

if not os.path.exists(zip_path):
    urllib.request.urlretrieve(url, zip_path)

if not os.path.exists(root_dir):
    with zipfile.ZipFile(zip_path, "r") as z:
        z.extractall(root_dir)

data_dir = os.path.join(root_dir, "")

activities = sorted(
    d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))
)
label_map = {name: i for i, name in enumerate(activities)}

X = []
y = []

for act in activities:
    act_dir = os.path.join(data_dir, act)
    for f in sorted(os.listdir(act_dir)):
        if f.endswith(".csv"):
            path = os.path.join(act_dir, f)
            df = pd.read_csv(path, skiprows=4, on_bad_lines="skip")
            # drop time column
            signal = df.iloc[:, 1:].to_numpy(dtype=np.float32)

            X.append(signal)
            y.append(np.full((signal.shape[0], 1), label_map[act]))

X = np.vstack(X)
y = np.vstack(y)
X.shape, y.shape

((42237, 6), (42237, 1))

# Sliding Window

In [4]:
def sliding_window(X, y, window_size=50, step=25, num_classes=None):
    """
    X: (N,6)
    y: (N,)
    Returns:
        X_win: (num_windows, window_size, 6)
        y_win: (num_windows,)
    Label for window = mode of y in window
    """
    from scipy.stats import mode

    N = X.shape[0]
    X_win, y_win = [], []

    for start in range(0, N - window_size + 1, step):
        end = start + window_size
        X_win.append(X[start:end])
        y_win.append(mode(y[start:end])[0][0])

    if num_classes is None:
        num_classes = np.max(y) + 1

    y_onehot = to_categorical(y_win, num_classes=num_classes)
    return np.stack(X_win), y_onehot

In [5]:
X_windows, y_windows = sliding_window(X, y, window_size=50, step=25)

X_train, X_test, y_train, y_test = train_test_split(
    X_windows, y_windows, test_size=0.2, random_state=0, stratify=y_windows
)

X_train, X_valid, y_train, y_valid = train_test_split(
    X_train, y_train, test_size=0.1875, random_state=0, stratify=y_train
)

X_train.shape, y_train.shape, X_valid.shape, y_valid.shape, X_test.shape, y_test.shape

((1096, 50, 6), (1096, 7), (254, 50, 6), (254, 7), (338, 50, 6), (338, 7))

# Computing Class Weight

In [6]:
labels = y_train.argmax(1).flatten()
labels_unique = np.unique(labels)
class_weights = class_weight.compute_class_weight(
    "balanced", classes=labels_unique, y=labels
)
class_weights = dict(zip(labels_unique, class_weights))
class_weights

{0: 1.799671592775041,
 1: 2.115830115830116,
 2: 0.8372803666921314,
 3: 0.8372803666921314,
 4: 0.8372803666921314,
 5: 0.8372803666921314,
 6: 0.8372803666921314}

# Defining the NAS

In [7]:
@tf.keras.utils.register_keras_serializable(package="ConditionalPoolingLayer1D")
class ConditionalPoolingLayer1D(layers.Layer):
    def __init__(self, pool_size=2, pool_type="max", **kwargs):
        super().__init__(**kwargs)
        self.pool_size = pool_size
        self.pool_type = pool_type

        if pool_type == "max":
            self.pooling_layer = layers.MaxPooling1D(pool_size=pool_size)
        else:
            self.pooling_layer = layers.AveragePooling1D(pool_size=pool_size)

    def call(self, inputs):
        input_shape = tf.shape(inputs)[1]
        output_length = input_shape // self.pool_size

        return tf.cond(
            output_length > 0, lambda: self.pooling_layer(inputs), lambda: inputs
        )

    def get_config(self):
        config = super().get_config()
        config.update(
            {
                "pool_size": self.pool_size,
                "pool_type": self.pool_type,
            }
        )
        return config

In [8]:
class IMUModel(micronas.HyperModel):
    def build(self, hp):
        time_domain = keras.Sequential()

        time_domain.add(keras.Input(shape=X_train.shape[1:], name="input"))

        time_domain.add(layers.BatchNormalization())

        for Block in range(3):
            if hp.Boolean("Block%d" % Block, True):
                time_domain.add(
                    layers.Conv1D(
                        hp.Int("conv%d" % Block, min_value=1, max_value=500, step=10),
                        kernel_size=(
                            hp.Int(
                                "kernel_size%d" % Block,
                                min_value=1,
                                max_value=8,
                                step=1,
                            ),
                        ),
                        activation=hp.Choice(
                            "activation%d" % Block, ["relu", "tanh", "sigmoid"]
                        ),
                        padding=hp.Choice("padding%d" % Block, ["same", "valid"]),
                        strides=hp.Int(
                            "stride%d" % Block, min_value=1, max_value=2, step=1
                        ),
                    )
                )
                if hp.Boolean("Block%d_BN" % Block, True):
                    time_domain.add(layers.BatchNormalization())
                if hp.Boolean("Block%d_pooling" % Block, True):
                    if hp.Boolean("Block%d_MaxPooling" % Block, True):
                        time_domain.add(
                            ConditionalPoolingLayer1D(
                                hp.Choice("pool%d" % Block, [2]), pool_type="max"
                            )
                        )
                    else:
                        time_domain.add(
                            ConditionalPoolingLayer1D(
                                hp.Choice("pool%d" % Block, [2]), pool_type="average"
                            )
                        )

        # prefinal Block
        if hp.Boolean("prefinal_Global", True):
            time_domain.add(layers.GlobalAveragePooling1D())
        if hp.Boolean("prefinal_Dropout", True):
            time_domain.add(
                layers.Dropout(hp.Float("dropout", min_value=0.4, max_value=0.5))
            )
        time_domain.add(layers.Flatten(name="flatten"))

        # Final Block
        for Block in range(3):
            if hp.Boolean("finalBlock%d" % Block, True):
                if hp.Boolean("finalBlock%d_BN" % Block, True):
                    time_domain.add(layers.BatchNormalization())
                if hp.Boolean("finalBlock%d_dense" % Block, True):
                    time_domain.add(
                        layers.Dense(
                            hp.Int(
                                "dense_size%d" % Block,
                                min_value=3,
                                max_value=1024,
                                step=20,
                            ),
                            activation=hp.Choice(
                                "finalactivation%d" % Block, ["relu", "tanh", "sigmoid"]
                            ),
                        )
                    )
                if hp.Boolean("final_Dropout%d" % Block, True):
                    time_domain.add(
                        layers.Dropout(
                            hp.Float(
                                "final_dropout%d" % Block, min_value=0.4, max_value=0.5
                            )
                        )
                    )
        time_domain.add(
            layers.Dense(
                y_train.shape[1],
                activation="softmax",
                kernel_regularizer=keras.regularizers.l1(
                    hp.Float("l1", min_value=1e-5, max_value=1e-3, step=1e-6)
                ),
            )
        )

        learning_rate = hp.Float("lr", min_value=1e-6, max_value=1e-2, sampling="log")
        time_domain.compile(
            optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
            loss="categorical_crossentropy",
            metrics=[
                "accuracy",
                tf.keras.metrics.Precision(name="precision"),
                tf.keras.metrics.Recall(name="recall"),
                tf.keras.metrics.AUC(name="auc", curve="PR"),
            ],
        )
        return time_domain

    def fit(self, hp, model, *args, **kwargs):
        hist = model.fit(*args, **kwargs)

        model_score = np.mean(hist.history["val_auc"])
        objective = model_score
        hist.history["objective"] = objective
        return hist.history

In [9]:
tuner_time = micronas.RandomSearch(
    IMUModel(),
    objective=micronas.Objective("objective", direction="max"),
    max_trials=1,
    overwrite=True,
    max_model_size=8000,
    max_consecutive_failed_trials=float("inf"),
)
tuner_time.search_space_summary()

2025-12-26 03:12:43.202265: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


Search space summary
Default search space size: 56
Block0 (Boolean)
{'default': True, 'conditions': []}
conv0 (Int)
{'default': None, 'conditions': [], 'min_value': 1, 'max_value': 500, 'step': 10, 'sampling': 'linear'}
kernel_size0 (Int)
{'default': None, 'conditions': [], 'min_value': 1, 'max_value': 8, 'step': 1, 'sampling': 'linear'}
activation0 (Choice)
{'default': 'relu', 'conditions': [], 'values': ['relu', 'tanh', 'sigmoid'], 'ordered': False}
padding0 (Choice)
{'default': 'same', 'conditions': [], 'values': ['same', 'valid'], 'ordered': False}
stride0 (Int)
{'default': None, 'conditions': [], 'min_value': 1, 'max_value': 2, 'step': 1, 'sampling': 'linear'}
Block0_BN (Boolean)
{'default': True, 'conditions': []}
Block0_pooling (Boolean)
{'default': True, 'conditions': []}
Block0_MaxPooling (Boolean)
{'default': True, 'conditions': []}
pool0 (Choice)
{'default': 2, 'conditions': [], 'values': [2], 'ordered': True}
Block1 (Boolean)
{'default': True, 'conditions': []}
conv1 (Int)


# Start NAS

In [10]:
callback = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=10)

tuner_time.search_atleast(
    2,
    X_train,
    y_train,
    epochs=5,
    batch_size=512,
    callbacks=[callback],
    class_weight=class_weights,
    validation_data=(X_valid, y_valid),
)

Trial 16 Complete [00h 00m 02s]
objective: 0.0

Best objective So Far: 0.0
Total elapsed time: 00h 00m 11s


# Get Best Model

In [11]:
models = tuner_time.get_best_models(num_models=1)
best_estimator = models[0]

best_estimator.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 batch_normalization (BatchN  (None, 50, 6)            24        
 ormalization)                                                   
                                                                 
 conv1d (Conv1D)             (None, 24, 11)            209       
                                                                 
 batch_normalization_1 (Batc  (None, 24, 11)           44        
 hNormalization)                                                 
                                                                 
 global_average_pooling1d (G  (None, 11)               0         
 lobalAveragePooling1D)                                          
                                                                 
 flatten (Flatten)           (None, 11)                0         
                                                        

# Classification Report

In [12]:
y_pred = best_estimator.predict(X_test).argmax(1)

target_names = [name for name, idx in sorted(label_map.items(), key=lambda x: x[1])]

print(classification_report(y_test.argmax(1), y_pred, target_names=target_names))
print(confusion_matrix(y_test.argmax(1), y_pred))

              precision    recall  f1-score   support

    bending1       0.08      1.00      0.15        27
    bending2       0.00      0.00      0.00        23
     cycling       0.00      0.00      0.00        58
       lying       0.00      0.00      0.00        57
     sitting       0.00      0.00      0.00        58
    standing       0.00      0.00      0.00        58
     walking       0.00      0.00      0.00        57

    accuracy                           0.08       338
   macro avg       0.01      0.14      0.02       338
weighted avg       0.01      0.08      0.01       338

[[27  0  0  0  0  0  0]
 [23  0  0  0  0  0  0]
 [58  0  0  0  0  0  0]
 [57  0  0  0  0  0  0]
 [58  0  0  0  0  0  0]
 [58  0  0  0  0  0  0]
 [57  0  0  0  0  0  0]]


In [13]:
best_estimator.save("AReM_Best.keras")