In [1]:
import tensorflow as tf
from tensorflow.keras import Model
import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

tf.random.set_seed(42)
type_train_ds = tf.data.experimental.load("/kaggle/input/cmi-tf-datasets/type_train_ds")
type_valid_ds = tf.data.experimental.load("/kaggle/input/cmi-tf-datasets/type_valid_ds")
gesture_train_ds = tf.data.experimental.load("/kaggle/input/cmi-tf-datasets/gesture_train_ds")
gesture_valid_ds = tf.data.experimental.load("/kaggle/input/cmi-tf-datasets/gesture_valid_ds")
# test_ds = tf.data.experimental.load("/kaggle/input/cmi-tf-datasets/test_ds")

2025-08-16 07:49:58.880024: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1755330599.097147      36 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1755330599.167498      36 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
I0000 00:00:1755330611.984979      36 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 15513 MB memory:  -> device: 0, name: Tesla P100-PCIE-16GB, pci bus id: 0000:00:04.0, compute capability: 6.0


In [2]:
# print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

In [3]:
print(len(type_train_ds))
print(len(type_valid_ds))
print(len(gesture_train_ds))
print(len(gesture_valid_ds))

# print(len(test_ds))

505
48
505
48


In [4]:
for i, (inputs, targets) in enumerate(type_train_ds):
    if i==0:
        print(inputs[0].shape)
        print(inputs[1].shape)
        print(inputs[2].shape)
        print(targets.shape)
        break

(16, 70, 8, 8, 5)
(16, 70, 19)
(16, 17)
(16, 1)


In [5]:
from tensorflow.keras.layers import Conv3D, Conv1D, BatchNormalization, TimeDistributed
from tensorflow.keras.layers import MaxPool3D, GlobalMaxPool3D, MaxPool1D, Dropout
from tensorflow.keras.layers import GlobalMaxPool1D, Flatten, Dense
from tensorflow.keras.layers import ReLU, ELU, Masking
from tensorflow.keras.regularizers import L1, L2, L1L2
from tensorflow.keras import Sequential

class CNNModel(Model):
    def __init__(self, kernel_size3d=(3, 3, 5), kernel_size1d=3, filters_3d=[16, 32], dropout=0.2, 
                 filters_1d=[16, 32], hidden_size=64, regularizer="l1", l1_penalty=0.1, l2_penalty=0.1, 
                 binary=True, **kwargs):
        super().__init__(**kwargs)
        self.hidden_size = hidden_size
        
        self.conv3d_nets = []
        for i in range(len(filters_3d)):
            conv3d_net = Sequential([
                Conv3D(filters_3d[i], kernel_size3d, padding="same"),
                BatchNormalization(),
                ReLU(),
                MaxPool3D(pool_size=2, strides=(2, 1, 1), padding="same"),
                Dropout(dropout)
            ], name=f"conv3d_net_{i}")
            self.conv3d_nets.append(conv3d_net)
        self.global_maxpool3d = GlobalMaxPool3D()

        # self.masking = Masking(mask_value=0.0)
        self.conv1d_nets = []
        for i in range(len(filters_1d)):
            conv1d_net = Sequential([
                Conv1D(filters_1d[i], kernel_size1d, padding="same", kernel_initializer="he_normal"),
                BatchNormalization(),
                ELU(),
                MaxPool1D(pool_size=2, strides=2, padding="same"),
                Dropout(dropout)
            ], name=f"conv1d_net_{i}")
            self.conv1d_nets.append(conv1d_net)

        self.global_maxpool1d = GlobalMaxPool1D()


        if regularizer == "l1":
            self.dense0 = Dense(hidden_size, activation="relu", 
                                kernel_regularizer=L1(l1_penalty),
                                bias_regularizer=L1(l1_penalty))
            self.dense1_0 = Dense(1, activation="sigmoid",
                                  kernel_regularizer=L1(l1_penalty),
                                  bias_regularizer=L1(l1_penalty))
            self.dense1_1 = Dense(18, activation="softmax", kernel_initializer="glorot_normal",
                                  kernel_regularizer=L1(l1_penalty),
                                  bias_regularizer=L1(l1_penalty))
        elif regularizer == "l2":
            self.dense0 = Dense(hidden_size, activation="relu", 
                                kernel_regularizer=L2(l2_penalty),
                                bias_regularizer=L2(l2_penalty))
            self.dense1_0 = Dense(1, activation="sigmoid", 
                                  kernel_regularizer=L2(l2_penalty),
                                  bias_regularizer=L2(l2_penalty))
            self.dense1_1 = Dense(18, activation="softmax", kernel_initializer="glorot_normal",
                                  kernel_regularizer=L2(l2_penalty),
                                  bias_regularizer=L2(l2_penalty))
        elif regularizer == "l1l2":
            self.dense0 = Dense(hidden_size, activation="relu", 
                                kernel_regularizer=L1L2(l1_penalty, l2_penalty),
                                bias_regularizer=L1L2(l1_penalty, l2_penalty))
            self.dense1_0 = Dense(1, activation="sigmoid", 
                                  kernel_regularizer=L1L2(l1_penalty, l2_penalty),
                                  bias_regularizer=L1L2(l1_penalty, l2_penalty))
            self.dense1_1 = Dense(18, activation="softmax", kernel_initializer="glorot_normal",
                                  kernel_regularizer=L1L2(l1_penalty, l2_penalty),
                                  bias_regularizer=L1L2(l1_penalty, l2_penalty))
        else:
            self.dense0 = Dense(hidden_size, activation="relu")
            self.dense1_0 = Dense(1, activation="sigmoid")
            self.dense1_1 = Dense(18, activation="softmax")

        self.binary = binary

    def build(self, input_shapes, training=False):
        image_shape = input_shapes[0]
        for i in range(len(self.conv3d_nets)):
            self.conv3d_nets[i].build(image_shape)
            image_shape = self.conv3d_nets[i].compute_output_shape(image_shape)
        self.global_maxpool3d.build(image_shape)
        image_shape = self.global_maxpool3d.compute_output_shape(image_shape)

        time_series_shape = input_shapes[1]
        for i in range(len(self.conv1d_nets)):
            self.conv1d_nets[i].build(time_series_shape)
            time_series_shape = self.conv1d_nets[i].compute_output_shape(time_series_shape)
        self.global_maxpool1d.build(time_series_shape)
        time_series_shape = self.global_maxpool1d.compute_output_shape(time_series_shape)

        shape = (image_shape[0], image_shape[1] + time_series_shape[1] + input_shapes[2][1])

        self.dense0.build(shape)
        shape = self.dense0.compute_output_shape(shape)

        self.dense1_0.build(shape)
        self.dense1_1.build(shape)
        

    def call(self, inputs, training=False):
        image_out = inputs[0]
        for i in range(len(self.conv3d_nets)):
            image_out = self.conv3d_nets[i](image_out, training=training)
        image_out = self.global_maxpool3d(image_out)   # (batch, filters)

        time_series_out = inputs[1]
        for i in range(len(self.conv1d_nets)):
            time_series_out = self.conv1d_nets[i](time_series_out, training=training)
        time_series_out = self.global_maxpool1d(time_series_out)

        out = tf.concat([image_out, time_series_out, inputs[2]], axis=-1)
        out = self.dense0(out)
        if self.binary:
            out = self.dense1_0(out)
        else:
            out = self.dense1_1(out)
        return out

    def set_binary(self):
        self.binary = True
        if self.dense1_0.build == False:
            self.dense1_0.build(input_shape=(None, self.hidden_size))

    def set_multi(self):
        self.binary = False
        if self.dense1_1.build == False:
            self.dense1_1.build(input_shape=(None, self.hidden_size))

    def freeze_conv_timeseries(self):
        for i in range(len(self.conv3d_nets)):
            self.conv3d_nets[i].trainable = False
        for i in range(len(self.conv1d_nets)):
            self.conv1d_nets[i].trainable = False
        self.rnn.trainable = False

In [6]:
for i, (inputs, targets) in enumerate(type_train_ds):
    if i == 0:
        print(inputs[0].shape)
        print(inputs[1].shape)
        print(inputs[2].shape)
        break

(16, 70, 8, 8, 5)
(16, 70, 19)
(16, 17)


In [7]:
import json

with open('/kaggle/input/cmi-tf-datasets/sample_weight.json') as f:
    sample_weight_dict = json.load(f)

type_sample_weight = sample_weight_dict["type_sample_weight"]
type_class_weight = {0: type_sample_weight[0], 1: type_sample_weight[1]}

gesture_sample_weight = sample_weight_dict["gesture_sample_weight"]
gesture_class_weight = {i: gesture_sample_weight[i] for i in range(len(gesture_sample_weight))}

In [8]:
def add_type_sample_weight(x, y):
    y_int = tf.cast(y, tf.int32)
    weight = tf.gather([type_class_weight[0], type_class_weight[1]], y_int)
    return x, y, weight

def add_gesture_sample_weight(x, y):
    y_arg = tf.argmax(y)
    weight = tf.gather(gesture_sample_weight, y_arg)
    return x, y, weight

In [9]:
type_train_ds = type_train_ds.unbatch().map(add_type_sample_weight).batch(32)
type_valid_ds = type_valid_ds.unbatch().map(add_type_sample_weight).batch(32)

In [10]:
# gesture_train_ds = gesture_train_ds.unbatch().map(add_gesture_sample_weight).batch(16)
# gesture_valid_ds = gesture_valid_ds.unbatch().map(add_gesture_sample_weight).batch(16)
gesture_train_ds = gesture_train_ds.unbatch().batch(16)
gesture_valid_ds = gesture_valid_ds.unbatch().batch(16)

In [29]:
from tensorflow.keras.losses import BinaryCrossentropy, CategoricalCrossentropy, CategoricalFocalCrossentropy
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import BinaryAccuracy, F1Score, AUC

model = CNNModel(kernel_size3d=(3, 3, 3), kernel_size1d=3, filters_3d=[8, 16, 32, 64], dropout=0.3,
                 filters_1d=[32, 64, 128, 256], hidden_size=128, regularizer="l1l2", 
                 l1_penalty=1e-4, l2_penalty=1e-4, binary=False)
model.build(input_shapes=((None, 150, 8, 8, 5), (None, 150, 19), (None, 17)))
# model.set_multi()
model.compile(loss=BinaryCrossentropy(),
              optimizer=Adam(learning_rate=1e-3),
              metrics=["accuracy"])
model.summary()
# model.freeze_conv_timeseries()

In [30]:
# from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
# checkpoint_filepath = "/kaggle/working/cmi_binary_best_model.weights.h5"
# checkpoint_callback = ModelCheckpoint(
#     filepath=checkpoint_filepath,
#     monitor="val_loss",
#     save_best_only=True,
#     save_weights_only=True,
#     mode="min",
#     verbose=1
# )

# history = model.fit(type_train_ds, epochs=50, 
#                     validation_data=type_valid_ds,
#                     callbacks=[checkpoint_callback])
# model.load_weights(checkpoint_filepath)

In [31]:
model.set_multi()
model.compile(loss=CategoricalFocalCrossentropy(alpha=gesture_sample_weight, 
                                                gamma=3,
                                                label_smoothing=0.1),
              optimizer=Adam(learning_rate=1e-3),
              metrics=["accuracy"])

In [32]:
def scheduler(epoch):
    lr = 0.0005
    if epoch < 50:
        return lr
    else:
        return lr * (1/10)

In [33]:
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
checkpoint_filepath = "/kaggle/working/cmi_best_model.weights.h5"
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor="val_loss",
    save_best_only=True,
    save_weights_only=True,
    mode="min",
    verbose=1
)

lr_callback = LearningRateScheduler(scheduler, verbose=0)

history = model.fit(gesture_train_ds, epochs=200, 
                    validation_data=gesture_valid_ds,
                    callbacks=[checkpoint_callback])
model.load_weights(checkpoint_filepath)

Epoch 1/200
    505/Unknown [1m31s[0m 26ms/step - accuracy: 0.1758 - loss: 6.8743
Epoch 1: val_loss improved from inf to 5.20591, saving model to /kaggle/working/cmi_best_model.weights.h5
[1m505/505[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 32ms/step - accuracy: 0.1758 - loss: 6.8718 - val_accuracy: 0.2697 - val_loss: 5.2059
Epoch 2/200
[1m505/505[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 9ms/step - accuracy: 0.3059 - loss: 4.5506
Epoch 2: val_loss improved from 5.20591 to 4.68174, saving model to /kaggle/working/cmi_best_model.weights.h5
[1m505/505[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 10ms/step - accuracy: 0.3059 - loss: 4.5504 - val_accuracy: 0.3211 - val_loss: 4.6817
Epoch 3/200
[1m502/505[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 9ms/step - accuracy: 0.3685 - loss: 4.0856
Epoch 3: val_loss improved from 4.68174 to 4.49060, saving model to /kaggle/working/cmi_best_model.weights.h5
[1m505/505[0m [32m━━━━━━━━━━━━━━━━━━━━[

KeyboardInterrupt: 

In [None]:
import json

with open('/kaggle/input/cmi-tf-datasets/mapping.json') as f:
    mapping_dict = json.load(f)

gesture_mapping = mapping_dict["gesture_mapping"]
inv_gesture_mapping = {value: key for key, value in gesture_mapping.items()}

num2gesture = np.vectorize(lambda x: inv_gesture_mapping[x])

In [None]:
non_target_gestures = ["Drink from bottle/cup", "Glasses on/off", "Pull air toward your face",
                       "Pinch knee/leg skin", "Scratch knee/leg skin", "Write name on leg",
                       "Text on phone", "Feel around in tray and pull out an object",
                       "Write name in air", "Wave hello"]

def map_non_target(y_ind):
    y_pred = inv_gesture_mapping[y_ind]
    if y_ind == 3:
        y_ind = 2
    elif y_ind == 4:
        y_ind = 3
    elif y_ind == 6:
        y_ind = 4
    elif y_ind == 7:
        y_ind = 5
    elif y_ind == 9:
        y_ind = 6
    elif y_ind == 10:
        y_ind = 7
    if y_pred in non_target_gestures:
        y_ind = 8
    return y_ind

vectorize_map_non_target = np.vectorize(map_non_target)

In [None]:
conf_tensor = np.zeros((9, 9), dtype=np.int32)
for i, (inputs, labels) in enumerate(gesture_valid_ds):
    labels_pred = model.predict(inputs, verbose=0)
    labels_pred = tf.argmax(labels_pred, axis=-1).numpy()
    labels_pred = vectorize_map_non_target(labels_pred)
    labels_true = tf.argmax(labels, axis=-1).numpy()
    labels_true = vectorize_map_non_target(labels_true)
    conf_tensor += tf.math.confusion_matrix(labels_true, labels_pred, num_classes=9)

In [None]:
precisions = []
recalls = []
f1s = []

for i in range(9):
    column = conf_tensor[i, :]
    row = conf_tensor[:, i]
    precision = column[i] / tf.math.reduce_sum(column)
    recall = row[i] / tf.math.reduce_sum(row)
    inv_f1 = (1/precision + 1/recall)/2
    f1 = 1/inv_f1
    precisions.append(precision)
    recalls.append(recall)
    f1s.append(f1)
print(f"F1 score mean: {np.round(np.mean(f1s), 3)}")

In [None]:
# import os
# save_path = os.path.join("/kaggle/working/", "cmi_model.weights.h5")
# model.save_weights(save_path)

In [None]:
new_model = CNNModel(kernel_size3d=(3, 3, 5), kernel_size1d=5, filters_3d=[8, 16, 32, 64], dropout=0.2,
                     filters_1d=[32, 64, 128, 256], hidden_size=128, regularizer="l1l2", 
                     l1_penalty=1e-5, l2_penalty=1e-5, binary=False)
new_model.build(input_shapes=((None, 100, 8, 8, 5), (None, 100, 19), (None, 14)))
new_model.load_weights(checkpoint_filepath)
new_model.summary()