In [1]:
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, MaxPooling2D, Flatten, Dense, Dropout, GlobalAveragePooling2D, Conv1D, TimeDistributed, GlobalAveragePooling1D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import os
import numpy as np
import tensorflow as tf
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.applications import ResNet50
from tensorflow.keras import backend as K
import time
import numpy as np
import gc
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
from sklearn.preprocessing import StandardScaler
import cv2
import math
from tensorflow import keras
from tensorflow.keras import layers, models
from tensorflow.python.profiler import model_analyzer, option_builder

In [2]:
def get_flops(model):
    """Return the profiler's float-operation count for one forward pass of ``model``.

    The model is traced as a ``tf.function`` with the batch dimension of every
    input pinned to 1, and the resulting graph is profiled with the TensorFlow
    float-operation profile options.

    Args:
        model: A Keras model; its ``inputs`` provide the shape, dtype and name
            used to build the tracing signature.

    Returns:
        ``total_float_ops`` exactly as reported by the profiler.
    """
    # One TensorSpec per model input, with the (unknown) batch axis replaced by 1.
    input_signature = [
        tf.TensorSpec(
            shape=(1, *params.shape[1:]),
            dtype=params.dtype,
            name=params.name,
        )
        for params in model.inputs
    ]
    forward_graph = tf.function(model, input_signature).get_concrete_function().graph
    options = option_builder.ProfileOptionBuilder.float_operation()
    graph_info = model_analyzer.profile(forward_graph, options=options)
    # The raw profiler total is returned unchanged (it is NOT halved). An
    # earlier note in this function stated that `profile` counts a
    # multiply-accumulate as two float ops; if a MAC count is wanted, the
    # caller should divide this value by 2.
    flops = graph_info.total_float_ops
    return flops

In [2]:
# Adapted from https://github.com/pytorch/vision/blob/v0.4.0/torchvision/models/resnet.py
import math
from tensorflow import keras
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.layers import TimeDistributed, LSTM, Dense

# Shared kernel initializer for every convolution below: variance scaling with
# scale 2.0 over fan-out, sampled from an untruncated normal distribution.
kaiming_normal = keras.initializers.VarianceScaling(
    scale=2.0,
    mode='fan_out',
    distribution='untruncated_normal',
)

def conv3x3(x, out_planes, stride=1, name=None):
    """Apply a bias-free 3x3 convolution preceded by explicit 1-pixel zero padding.

    Args:
        x: Input tensor.
        out_planes: Number of convolution filters.
        stride: Convolution stride.
        name: Name of the Conv2D layer. The padding layer is named
            ``f'{name}_pad'``. When ``name`` is None both layers are left for
            Keras to auto-name.

    Returns:
        The output tensor of the convolution.
    """
    # Formatting None into the f-string would name every unnamed padding layer
    # 'None_pad', which collides as soon as two of them end up in one model.
    pad_name = None if name is None else f'{name}_pad'
    x = layers.ZeroPadding2D(padding=1, name=pad_name)(x)
    return layers.Conv2D(
        filters=out_planes,
        kernel_size=3,
        strides=stride,
        use_bias=False,
        kernel_initializer=kaiming_normal,
        name=name,
    )(x)

def basic_block(x, planes, stride=1, downsample=None, name=None):
    """Build one two-convolution residual block on top of ``x``.

    Args:
        x: Input tensor; also the start of the shortcut branch.
        planes: Number of filters for both 3x3 convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional iterable of layers applied in order to the
            shortcut branch before the addition.
        name: Prefix used for every layer name created in the block.

    Returns:
        The tensor after the residual addition and the final ReLU.
    """
    bn_options = dict(momentum=0.9, epsilon=1e-5)

    # Main branch: conv -> BN -> ReLU -> conv -> BN.
    residual = conv3x3(x, planes, stride=stride, name=f'{name}.conv1')
    residual = layers.BatchNormalization(name=f'{name}.bn1', **bn_options)(residual)
    residual = layers.ReLU(name=f'{name}.relu1')(residual)
    residual = conv3x3(residual, planes, name=f'{name}.conv2')
    residual = layers.BatchNormalization(name=f'{name}.bn2', **bn_options)(residual)

    # Shortcut branch: identity unless projection layers were supplied.
    shortcut = x
    for projection in (() if downsample is None else downsample):
        shortcut = projection(shortcut)

    merged = layers.Add(name=f'{name}.add')([shortcut, residual])
    return layers.ReLU(name=f'{name}.relu2')(merged)

def make_layer(x, planes, blocks, stride=1, name=None):
    """Stack ``blocks`` basic blocks to form one stage.

    The first block receives ``stride`` and, when the stride is not 1 or the
    size of axis 3 of ``x`` differs from ``planes``, a 1x1 convolution plus
    batch normalisation on its shortcut. The remaining blocks use the
    defaults of ``basic_block``.

    Args:
        x: Input tensor; axis 3 is read as the incoming channel count.
        planes: Number of filters for every block in the stage.
        blocks: Total number of blocks in the stage.
        stride: Stride applied by the first block only.
        name: Stage name prefix; block ``i`` is named ``f'{name}.{i}'``.

    Returns:
        The output tensor of the last block.
    """
    in_channels = x.shape[3]
    shape_preserved = stride == 1 and in_channels == planes

    if shape_preserved:
        projection = None
    else:
        projection = [
            layers.Conv2D(
                filters=planes,
                kernel_size=1,
                strides=stride,
                use_bias=False,
                kernel_initializer=kaiming_normal,
                name=f'{name}.0.downsample.0',
            ),
            layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name=f'{name}.0.downsample.1'),
        ]

    out = basic_block(x, planes, stride, projection, name=f'{name}.0')
    for block_name in (f'{name}.{idx}' for idx in range(1, blocks)):
        out = basic_block(out, planes, name=block_name)

    return out

def resnet(x, blocks_per_layer, num_classes=1000):
    """Build a basic-block ResNet with a 7x7 stride-2 stem and a dense head.

    Args:
        x: Input tensor.
        blocks_per_layer: Indexable with four entries giving the number of
            blocks in stages ``layer1`` .. ``layer4``.
        num_classes: Number of units in the final ``fc`` layer.

    Returns:
        The output tensor of the ``fc`` Dense layer (no activation is set).
    """
    # Stem: padded 7x7/2 conv -> BN -> ReLU -> padded 3x3/2 max-pool.
    out = layers.ZeroPadding2D(padding=3, name='conv1_pad')(x)
    out = layers.Conv2D(
        filters=64,
        kernel_size=7,
        strides=2,
        use_bias=False,
        kernel_initializer=kaiming_normal,
        name='conv1',
    )(out)
    out = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='bn1')(out)
    out = layers.ReLU(name='relu1')(out)
    out = layers.ZeroPadding2D(padding=1, name='maxpool_pad')(out)
    out = layers.MaxPool2D(pool_size=3, strides=2, name='maxpool')(out)

    # Four stages; every stage after the first is built with stride 2.
    stage_plan = ((64, 1), (128, 2), (256, 2), (512, 2))
    for stage_idx, (planes, stage_stride) in enumerate(stage_plan):
        out = make_layer(
            out,
            planes,
            blocks_per_layer[stage_idx],
            stride=stage_stride,
            name=f'layer{stage_idx + 1}',
        )

    out = layers.GlobalAveragePooling2D(name='avgpool')(out)

    # Kernel and bias of the head share one uniform initializer on +/- 1/sqrt(512).
    bound = 1.0 / math.sqrt(512)
    initializer = keras.initializers.RandomUniform(-bound, bound)
    return layers.Dense(
        units=num_classes,
        kernel_initializer=initializer,
        bias_initializer=initializer,
        name='fc',
    )(out)

def resnet18(x, **kwargs):
    """Build a headless ResNet-18 style feature extractor with a 3x3 stem.

    The stem is a single 3x3, 64-filter, stride-1 convolution with no
    max-pooling. It is followed by four stages of two basic blocks each,
    global average pooling and dropout. No classification layer is added.

    Args:
        x: Input tensor.
        **kwargs: Accepted but not used by this function.

    Returns:
        The tensor after global average pooling and Dropout(0.5).
    """
    # Stem: 3x3 conv (64 filters, stride 1) -> BN -> ReLU.
    features = conv3x3(x, 64, stride=1, name='conv1')
    features = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='bn1')(features)
    features = layers.ReLU(name='relu1')(features)

    # Residual stages: two blocks each, stride 2 from the second stage on.
    for stage_name, planes, stage_stride in (
        ('layer1', 64, 1),
        ('layer2', 128, 2),
        ('layer3', 256, 2),
        ('layer4', 512, 2),
    ):
        features = make_layer(features, planes, 2, stride=stage_stride, name=stage_name)

    # Collapse the spatial axes to one vector per sample, then regularise.
    features = layers.GlobalAveragePooling2D(name='avgpool')(features)
    return layers.Dropout(0.5, name='dropout')(features)

def resnet34(x, **kwargs):
    """Build ``resnet`` with 3, 4, 6 and 3 blocks in its four stages.

    Args:
        x: Input tensor.
        **kwargs: Forwarded unchanged to ``resnet``.

    Returns:
        Whatever ``resnet`` returns for this configuration.
    """
    stage_depths = [3, 4, 6, 3]
    return resnet(x, stage_depths, **kwargs)

In [None]:
# Per-step feature extractor: resnet18() applied to a single (32, 32, 2) input.
input_tensor = tf.keras.Input(shape=(32, 32, 2))
output_tensor = resnet18(input_tensor)
base_model = Model(inputs=input_tensor, outputs=output_tensor)

# Sequence input: 20 steps, each shaped like the extractor's input.
sequence_input = tf.keras.Input(shape=(20, 32, 32, 2))

# Apply the same base_model instance to every step of the sequence.
x = TimeDistributed(base_model)(sequence_input)

# return_sequences=False: only the LSTM's final output is passed on.
x = LSTM(512, return_sequences=False)(x)

# 8-unit softmax classification head.
x = Dense(8, activation='softmax')(x)

# End-to-end sequence classifier used by the rest of the notebook.
model = Model(inputs=sequence_input, outputs=x)

In [None]:
# Print the layer-by-layer summary of the sequence classifier.
model.summary()

In [None]:
# Resnet 18
# `model` is the full TimeDistributed(ResNet-18) + LSTM + Dense network, so the
# figures below describe that whole model; get_flops traces it at batch size 1.
ResNet18_flops = get_flops(model)
print('FLOPs: ', ResNet18_flops)
print('GFLOPs:', ResNet18_flops / 1e9)
# Total number of parameters as reported by Keras.
total_params = model.count_params()
print(f"Parameters: {total_params:,}")

In [None]:
# NOTE(review): same call as the earlier summary cell; nothing in the visible
# cells in between modifies `model`, so this output is expected to be identical.
model.summary()

In [None]:
# Load Dataset
SEQUENCE_LENGTH = 20      # steps per sample; matches the model's (20, 32, 32, 2) input
NUM_GESTURE_CLASSES = 8   # must match the Dense(8) softmax head of the model

x_train_gesture  = np.load('traindata_bath_concate/data_bath_train.npy')
gesture_labels_train = np.load('traindata_bath_concate/label_bath_train.npy')
x_test_gesture = np.load('traindata_bath_concate/data_bath_test.npy')
gesture_labels_test = np.load('traindata_bath_concate/label_bath_test.npy')
x_val_gesture = np.load('traindata_bath_concate/data_bath_val.npy')
gesture_labels_val = np.load('traindata_bath_concate/label_bath_val.npy')

# Group the stored arrays into sequences of SEQUENCE_LENGTH steps.
x_train_gesture = x_train_gesture.reshape(-1, SEQUENCE_LENGTH, 32, 32, 2)
x_test_gesture = x_test_gesture.reshape(-1, SEQUENCE_LENGTH, 32, 32, 2)
x_val_gesture = x_val_gesture.reshape(-1, SEQUENCE_LENGTH, 32, 32, 2)

print("Gesture data train shape:", np.shape(x_train_gesture))
print("Gesture labels train shape:", np.shape(gesture_labels_train))

# Fit the encoder on the training labels only, then reuse it for test/val.
label_encoder = LabelEncoder()
gesture_labels_train_encoded = label_encoder.fit_transform(gesture_labels_train)
gesture_labels_test_encoded = label_encoder.transform(gesture_labels_test)
gesture_labels_val_encoded = label_encoder.transform(gesture_labels_val)

assert len(label_encoder.classes_) == NUM_GESTURE_CLASSES, (
    f"expected {NUM_GESTURE_CLASSES} gesture classes in the training labels, "
    f"found {len(label_encoder.classes_)}"
)


def _last_step_one_hot(encoded_labels):
    """One-hot encode per-step labels and keep the last step of each sequence.

    The one-hot width is pinned to NUM_GESTURE_CLASSES. Without it,
    to_categorical sizes the output from the largest id present, so a split
    that lacks the highest class would come out narrower and the reshape
    below would either fail or silently misalign labels.
    """
    # assumes one label is stored per step (SEQUENCE_LENGTH per sample) — TODO confirm
    one_hot = to_categorical(encoded_labels, num_classes=NUM_GESTURE_CLASSES)
    return one_hot.reshape(-1, SEQUENCE_LENGTH, NUM_GESTURE_CLASSES)[:, -1, :]


gesture_labels_train_one_hot = _last_step_one_hot(gesture_labels_train_encoded)
gesture_labels_test_one_hot = _last_step_one_hot(gesture_labels_test_encoded)
gesture_labels_val_one_hot = _last_step_one_hot(gesture_labels_val_encoded)

print("Reshaped Gesture data train shape:", x_train_gesture.shape)
print("Reshaped Gesture labels train shape:", gesture_labels_train_one_hot.shape)

In [6]:
# RMSprop with a learning rate of 1e-5.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.00001)

# Categorical cross-entropy on the one-hot labels; accuracy is tracked as a metric.
model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])

In [10]:
def AGIprogressBar(count, total,start):
    """Redraw a 60-character text progress bar on the current console line.

    Args:
        count: Number of steps completed so far.
        total: Total number of steps. A value of 0 draws an empty bar at 0.0%
            instead of dividing by zero.
        start: Reference timestamp (as from ``time.time()``); the elapsed
            seconds since it are printed after the bar.
    """
    bar_len = 60
    if total:
        filled_len = int(round(bar_len * count / float(total)))
        percents = round(100.0 * count / float(total), 1)
    else:
        # Nothing to do: show an empty bar rather than raising ZeroDivisionError.
        filled_len = 0
        percents = 0.0

    bar = '=' * filled_len + '-' * (bar_len - filled_len)
    duration=time.time()-start
    # '\r' returns to the start of the line so successive calls overwrite each other.
    print('\r[%s] %s%s ...%s sec' % (bar, percents, '%', duration),end=' ')

In [5]:
# Shuffle the training set once, moving samples and labels with the same permutation.
x_train_gesture = np.array(x_train_gesture)
gesture_labels_train_one_hot = np.array(gesture_labels_train_one_hot)

indices = np.random.permutation(len(x_train_gesture))
x_train_gesture, gesture_labels_train_one_hot = (
    x_train_gesture[indices],
    gesture_labels_train_one_hot[indices],
)

In [12]:
def evaluate_data(model, x_data, y_data, batch_size, num_segments=10):
    """Evaluate ``model`` on a dataset in consecutive chunks.

    The data is cut into ``num_segments`` contiguous slices, with the last one
    absorbing any remainder. Each non-empty slice is passed to
    ``model.evaluate`` for its loss and to ``model.predict`` for its
    predictions. Loss is averaged weighted by slice length, and accuracy is
    the fraction of samples whose arg-max prediction equals the arg-max of
    ``y_data``.

    Args:
        model: Object exposing Keras-style ``evaluate`` and ``predict``.
        x_data: Indexable inputs, sliced along the first axis.
        y_data: One-hot targets aligned with ``x_data``.
        batch_size: Batch size forwarded to ``evaluate`` and ``predict``.
        num_segments: Number of slices to split the data into.

    Returns:
        Tuple ``(loss, accuracy)`` over all samples.

    Raises:
        ValueError: If ``x_data`` is empty or ``num_segments`` is below 1.
    """
    n_samples = len(x_data)
    if n_samples == 0:
        raise ValueError("evaluate_data() needs at least one sample")
    if num_segments < 1:
        raise ValueError("num_segments must be >= 1")

    segment_size = n_samples // num_segments
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for i in range(num_segments):
        start_idx = i * segment_size
        end_idx = (i + 1) * segment_size if i < num_segments - 1 else n_samples

        # With fewer samples than segments the leading slices are empty;
        # skip them instead of passing empty arrays to the model.
        if end_idx <= start_idx:
            continue

        x_segment = x_data[start_idx:end_idx]
        y_segment = y_data[start_idx:end_idx]

        result = model.evaluate(x_segment, y_segment, batch_size=batch_size, verbose=0)
        # evaluate() returns [loss, *metrics] when metrics are compiled in and a
        # bare scalar otherwise; the loss is the first value in both cases.
        segment_loss = float(np.ravel(result)[0])
        predictions = model.predict(x_segment, batch_size=batch_size, verbose=0)

        total_loss += segment_loss * len(y_segment)
        correct_predictions += np.sum(np.argmax(predictions, axis=1) == np.argmax(y_segment, axis=1))
        total_samples += len(y_segment)

    mean_loss = total_loss / total_samples
    accuracy = correct_predictions / total_samples

    return mean_loss, accuracy

In [None]:
# Manual training loop: each epoch walks the training set segment by segment,
# trains with train_on_batch, evaluates on train and validation data, then
# checkpoints the model and reloads it from disk.
Batch = 16
epochs = 100
segment_count = 5
rec = []  # not used in the visible cells
st = time.time()  # taken once, so the progress bar shows time since training started
train_accuracies = []
val_accuracies = []
train_losses = []
val_losses = []

for ep in range(epochs):
    print(f'EP: {ep + 1}')
    segment_size = len(x_train_gesture) // segment_count

    for seg in range(segment_count):
        start_idx = seg * segment_size
        # NOTE(review): every segment is exactly segment_size long, so samples
        # past segment_count * segment_size are never trained on.
        end_idx = start_idx + segment_size

        x_segment = x_train_gesture[start_idx:end_idx]
        y_segment = gesture_labels_train_one_hot[start_idx:end_idx]

        # Fresh shuffle inside the segment on every epoch.
        indices = np.arange(len(x_segment))
        np.random.shuffle(indices)
        
        # Apply the shuffled indices to the segments
        x_segment = x_segment[indices]
        y_segment = y_segment[indices]

        # Only full batches are used; a trailing partial batch is skipped.
        for i in range(len(x_segment) // Batch):
            AGIprogressBar(i, len(x_segment) // Batch, st)
            x_batch = x_segment[i * Batch:(i + 1) * Batch]
            y_batch = y_segment[i * Batch:(i + 1) * Batch]

            model.train_on_batch(x_batch, y_batch)

    # Loss/accuracy over the full training set, evaluated in 20 chunks.
    train_loss, train_acc = evaluate_data(
        model=model,
        x_data=x_train_gesture,
        y_data=gesture_labels_train_one_hot,
        batch_size=3200,
        num_segments=20
    )

    train_losses.append(train_loss)
    train_accuracies.append(train_acc)
    print(' ')
    print(f'Epoch {ep + 1} Training Loss = {train_loss}')
    print(f'Epoch {ep + 1} Training ACC = {train_acc}')

    # Same evaluation on the validation split.
    val_loss, val_acc = evaluate_data(
        model=model,
        x_data=np.array(x_val_gesture),
        y_data=np.array(gesture_labels_val_one_hot),
        batch_size=3200,
        num_segments=20
    )

    val_losses.append(val_loss)
    val_accuracies.append(val_acc)

    print(f'Epoch {ep + 1} Validation Loss = {val_loss}')
    print(f'Epoch {ep + 1} Validation ACC = {val_acc}')

    os.makedirs('saved_model_Resnet', exist_ok=True)
    # `% 1` is always 0, so a checkpoint is written every epoch. The reload
    # below depends on that file existing for the current epoch.
    if (ep + 1) % 1 == 0:
        model.save(f'saved_model_Resnet/epoch_{ep + 1}.h5')
        print(f'Model saved at epoch {ep + 1}')
    # Drop the Keras session state and rebuild the model from the checkpoint
    # just written (presumably to limit memory growth across epochs — confirm).
    tf.keras.backend.clear_session()
    gc.collect()
    model = load_model(f'saved_model_Resnet/epoch_{ep + 1}.h5')

# Side-by-side training curves: accuracy on the left, loss on the right.
plt.figure(figsize=(12, 5))

curve_panels = (
    (1, 'Accuracy', train_accuracies, val_accuracies),
    (2, 'Loss', train_losses, val_losses),
)
for panel_position, metric_name, train_curve, val_curve in curve_panels:
    plt.subplot(1, 2, panel_position)
    plt.plot(train_curve, label=f'Training {metric_name}')
    plt.plot(val_curve, label=f'Validation {metric_name}')
    plt.title(f'Model {metric_name}')
    plt.xlabel('Epoch')
    plt.ylabel(metric_name)
    # The explicit list overrides the per-line labels in the legend.
    plt.legend(['Train', 'Validation'])

plt.show()

In [None]:
# Highest validation accuracy and the (0-based) index of its first occurrence.
best_val_accuracy = max(val_accuracies)
best_epoch_accuracy = val_accuracies.index(best_val_accuracy)
print(f"The best epoch based on validation accuracy is: {best_epoch_accuracy + 1}, with accuracy: {best_val_accuracy:.4f}")

In [None]:
# Reload the checkpoint that the training loop wrote after epoch 19.
# NOTE(review): 19 is hard-coded rather than derived from best_epoch_accuracy + 1;
# confirm it is the intended (best) epoch for this run.
model = load_model('saved_model_Resnet/epoch_19.h5')

In [None]:
from tensorflow.keras.layers import Dense, Softmax
# Rebuild the classifier head as Dense (linear logits) -> Softmax so the
# pre-softmax activations are available as the output of their own layer.
lstm_output = model.get_layer('lstm').output

new_dense = Dense(8, name='dense', activation=None)(lstm_output)

new_output = Softmax(name='softmax')(new_dense)

new_model = Model(inputs=model.input, outputs=new_output)

# The Dense layer created above is freshly initialised. Copy the trained
# kernel and bias from the original softmax head (the last layer of `model`),
# otherwise new_model's logits and probabilities come from random weights.
trained_head = model.layers[-1]
assert isinstance(trained_head, Dense), (
    f"expected the last layer of `model` to be the Dense head, got {type(trained_head).__name__}"
)
new_model.get_layer('dense').set_weights(trained_head.get_weights())

new_model.summary()

In [6]:
# Global matplotlib style: 8 pt Times New Roman, LaTeX text rendering disabled.
plt.rcParams.update({
    'font.size': 8,
    'font.family': 'Times New Roman',
    'text.usetex': False,
})

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Gesture class labels
# presumably listed in the encoder's class order (index == encoded id) — confirm
classes = ['DoublePat', 'FallDown', 'HorizontalSwipe', 'SlowUp', 'SwipeDown', 'SwipeLeft', 'SwipeRight', 'SwipeUp']

y_test = np.array(gesture_labels_test_one_hot)

# Integer class ids for the ground truth and for the arg-max prediction.
y_true = np.argmax(y_test, axis=1)

y_pred_prob = model.predict(x_test_gesture)

y_pred = np.argmax(y_pred_prob, axis=1)

# Row-normalised confusion matrix in percent. `labels` pins the matrix to one
# row/column per entry of `classes`, even if a class is absent from the test data.
cm = (confusion_matrix(y_true, y_pred, labels=np.arange(len(classes)), normalize='true'))*100

# Overall accuracy: fraction of test samples classified correctly.
accuracy = np.mean(y_true == y_pred)
print(f"Accuracy: {accuracy:.10f}")

# trace/sum of the row-normalised matrix is the mean per-class recall. It
# equals the overall accuracy only when every class has the same number of samples.
balanced_accuracy = np.trace(cm) / np.sum(cm)
print(f"Balanced accuracy (mean per-class recall): {balanced_accuracy:.10f}")

# Print the confusion matrix
print("Confusion Matrix:\n", cm)


In [None]:
# Visualize the confusion matrix
fig, ax = plt.subplots(figsize=(5, 5))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=classes)
disp.plot(cmap=plt.cm.Blues, values_format='.2f', ax=ax, colorbar=False)
# cm was already scaled by 100, so each cell's text only needs a '%' appended.
for text in disp.text_.ravel():
    if text.get_text():
        text.set_text(f"{float(text.get_text()):.2f}%")
# The pyplot calls below act on the current axes/figure created above.
plt.xticks(rotation=90, ha="right")
plt.title(r"ResNet-18 and LSTM for Hand Gesture Classification on D$_1$ (Bathroom Environment)")
# Save before show() so the written file contains the rendered figure.
plt.savefig('ResNet-18 and LSTM for Hand Gesture Classification on Bathroom Dataset.png', dpi=300, bbox_inches='tight')
#plt.subplots_adjust(left=0.2, right=0.3, top=0.9, bottom=0.3)
plt.show()

In [None]:
# Neuron analysis
# Tap the outputs of new_model's layers 0-4 and expose them together as the
# outputs of one extraction model.
id = [0, 1, 2, 3, 4]
hook = [new_model.layers[layer_idx].output for layer_idx in id]
ModelExtract = Model(inputs=new_model.input, outputs=hook)

In [None]:
# Display the tapped output tensors (one per hooked layer).
ModelExtract.output

In [None]:
# Shuffle the test set, applying one permutation to both samples and labels.
x_test_gesture = np.array(x_test_gesture)
gesture_labels_test_one_hot = np.array(gesture_labels_test_one_hot)

indices = np.random.permutation(len(x_test_gesture))
x_test_gesture, gesture_labels_test_one_hot = (
    x_test_gesture[indices],
    gesture_labels_test_one_hot[indices],
)

In [None]:
# ModelExtract was built with a list of outputs, so this yields one array of
# activations per hooked layer; later cells index it as predictions[i].
predictions = ModelExtract.predict(x_test_gesture)
# Integer class id of each test sample, taken from the one-hot labels.
labels = np.argmax(gesture_labels_test_one_hot, axis=1)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Stand-alone legend image: one hollow marker per gesture class, no axes.
plt.rcParams.update({
    'font.size': 12,
    'font.family': 'Times New Roman',
})

classes = ['DoublePat', 'FallDown', 'HorizontalSwipe', 'SlowUp', 'SwipeDown', 'SwipeLeft', 'SwipeRight', 'SwipeUp']

# RGBA edge colour per gesture class.
label_to_color = {
    'DoublePat': [1, 0, 0, 1],           # Red
    'SwipeDown': [0, 0.5, 0, 1],         # Green
    'SwipeUp': [0, 0, 1, 1],             # Blue
    'SlowUp': [1, 0.65, 0, 1],           # Orange
    'HorizontalSwipe': [1, 1, 0, 1],     # Yellow
    'SwipeLeft': [0, 0, 0, 1],           # Black
    'FallDown': [0.93, 0.51, 0.93, 1],   # Violet
    'SwipeRight': [0.5, 0, 0.5, 1],      # Purple
}

fig, ax = plt.subplots(figsize=(8, 6))

# Empty scatter series draw nothing but register a legend entry each.
for gesture_name in label_to_color:
    ax.scatter(
        [],
        [],
        label=gesture_name,
        facecolors='none',
        edgecolors=label_to_color[gesture_name],
        marker='o',
        s=50,
    )

ax.legend(loc='center', frameon=False, ncol=4)

# Hide the axes so only the legend remains in the saved image.
ax.axis('off')

plt.savefig('legend_only_3x4.png', dpi=300, bbox_inches='tight', transparent=True)

plt.close()

In [None]:
from sklearn import decomposition
from matplotlib import pyplot as plt
# Fitted PCA object for each hooked layer, in hook order.
PCA=[]

classes = ['DoublePat', 'FallDown', 'HorizontalSwipe', 'SlowUp', 'SwipeDown', 'SwipeLeft', 'SwipeRight', 'SwipeUp']

# RGBA edge colour per gesture class.
label_to_color = {
    'DoublePat': [1, 0, 0, 1],           # Red
    'SwipeDown': [0, 0.5, 0, 1],         # Green
    'SwipeUp': [0, 0, 1, 1],             # Blue
    'SlowUp': [1, 0.65, 0, 1],           # Orange
    'HorizontalSwipe': [1, 1, 0, 1],     # Yellow
    'SwipeLeft': [0, 0, 0, 1],           # Black
    'FallDown': [0.93, 0.51, 0.93, 1],   # Violet
    'SwipeRight': [0.5, 0, 0.5, 1],      # Purple
}

# One 2-D PCA scatter per hooked layer, coloured by true gesture class.
for i in range(len(predictions)):
    # predictions[i] was produced by new_model.layers[id[i]], so all naming
    # below is taken from new_model (not the original `model`).
    layer = new_model.layers[id[i]]

    # Flatten each sample's activations to one feature vector.
    # (Named to avoid shadowing the builtin `input`.)
    layer_activations = np.reshape(predictions[i], [len(predictions[i]), -1])
    pca=decomposition.PCA(n_components=2)
    pca.fit(layer_activations)
    PCA.append(pca)
    Dim2=PCA[-1].transform(layer_activations)
    plt.figure(figsize=(8, 6))
    # Detect the input layer by type: its auto-generated name ('input_2',
    # 'input_4', ...) depends on how many inputs were created in the session.
    if isinstance(layer, tf.keras.layers.InputLayer):
        title = "Gesture Class Distribution in Our D$_1$ Dataset via PCA using ResNet-18 + LSTM (Input Layer)"
    elif layer.name == 'dense':
        title = r"ResNet-18 + LSTM: D$_1$ (Bathroom Environment), " + layer.name + ': '+f'{predictions[i].shape}'
    else:
        title = layer.name+': '+f'{predictions[i].shape}'
    plt.title(title)
    for cl in range(8):
        # Rows of the projection that belong to class `cl`.
        chos=Dim2[labels == cl]
        color = label_to_color[classes[cl]]
        plt.scatter(chos[:, 0],
                   chos[:, 1],
                   label=classes[cl],
                   facecolors='none',
                   edgecolors=color,
                   marker='o',
                   s=10)
    plt.legend(loc=1)