# Test model performance on own data

### 1. Import libraries:

In [None]:
import pandas as pd
import tensorflow as tf
import random
import keras
import matplotlib.pyplot as plt
from dataclasses import dataclass
import os

In [None]:
import os


# Replace spaces with underscores in every directory name below `path`.
# The walk is bottom-up (topdown=False), so a directory's children are renamed
# before the directory itself is visited.
path = "./gesture_data/"

for parent, subdirs, _files in os.walk(path, topdown=False):
    for dirname in subdirs:
        sanitized = dirname.replace(" ", "_")
        if sanitized == dirname:
            # Nothing to rename: the name contains no spaces.
            continue
        os.rename(os.path.join(parent, dirname), os.path.join(parent, sanitized))

### 2. Load Dataframe:

In [None]:
# Root directory of the Jester dataset, two levels above the working directory.
JESTER_DATASET_DIR_PATH = os.path.join("..", "..", "datasets")
# The labels file is read with an explicit column name, so every line of the
# file becomes one row of the single "label" column.
columns = ["label"]
labels_info = pd.read_csv(os.path.join(JESTER_DATASET_DIR_PATH, "info", "jester-v1-labels.csv"), names=columns)

In [None]:
# Load the index of the own dataset; later cells read its "path" and
# "label_id" columns.
own_test_df = pd.read_csv(os.path.join(".", "gesture_data", "own_dataset.csv"))

In [None]:
# Preview the first 10 rows of the freshly loaded table.
own_test_df.head(10)

#### Change paths to be compatible with pipeline

In [None]:
def edit_path(path: str) -> str:
    """Return *path* with its last ``/``-separated component removed.

    Only the forward slash is treated as a separator. A path that contains no
    slash at all yields an empty string.
    """
    parent, _separator, _last_component = path.rpartition("/")
    return parent

# Drop the last path component of every entry so "path" holds the part before
# the final "/".
own_test_df["path"] = own_test_df["path"].apply(edit_path)

In [None]:
# Preview the table after the last path component was stripped.
own_test_df.head(10)

In [None]:
# Remove fully identical rows in place (the original index labels are kept).
own_test_df.drop_duplicates(inplace=True)

In [None]:
# Preview the table after duplicate rows were removed.
own_test_df.head(10)

In [None]:
# Build both lookup directions from the labels file. The 0-based line number
# is the class id and the stripped line text is the label name.
labels_csv_path = os.path.join(JESTER_DATASET_DIR_PATH, "info", 'jester-v1-labels.csv')
with open(labels_csv_path) as labels_file:
    label_names = [raw_line.strip() for raw_line in labels_file]

jester_labels = {label_name: class_id for class_id, label_name in enumerate(label_names)}
jester_labels_id = dict(enumerate(label_names))

In [None]:
# Translate numeric class ids to label names; ids missing from the lookup
# table become None.
own_test_df["label"] = own_test_df["label_id"].apply(jester_labels_id.get)

In [None]:
# Preview the table with the new "label" column.
own_test_df.head(10)

In [None]:
# Replace spaces with underscores in every stored path.
own_test_df["path"] = own_test_df["path"].map(lambda raw_path: raw_path.replace(" ", "_"))

In [None]:
# Preview the table after spaces in "path" were replaced by underscores.
own_test_df.head(10)

In [None]:
# Same preview as the previous cell (no transformation happened in between).
own_test_df.head(10)

In [None]:
def get_sequence_len(path: str) -> int:
    """Count the entries in directory *path* whose name ends with ``.jpg``.

    The suffix comparison is case-sensitive and subdirectories are not
    searched.
    """
    return sum(1 for entry in os.listdir(path) if entry.endswith(".jpg"))

In [None]:
# Store the number of .jpg frames found in each clip directory.
own_test_df["seq_len"] = own_test_df["path"].apply(get_sequence_len)

### 3. Plot distributions:

In [None]:
# Bar chart of the number of samples per label in the own dataset.
# savefig does not create missing directories, so make sure the target exists.
os.makedirs("dataset_plots", exist_ok=True)

plt.figure(figsize=(10, 6))

own_test_df['label'].value_counts().plot(kind='bar')
plt.xlabel('Etykiety')
plt.ylabel('Ilość próbek')
_ = plt.title('Rozkład próbek po przetworzeniu we własnym zbiorze')
ax = plt.gca()
ax.set_axisbelow(True)  # draw the grid lines behind the bars
# Request the grid explicitly: a bare plt.grid() toggles the current state
# instead of switching the grid on.
plt.grid(True)
plt.savefig("dataset_plots/own_dist.jpg")
plt.show()

In [None]:
# Histogram of the number of frames per clip.
# savefig does not create missing directories, so make sure the target exists.
os.makedirs("dataset_plots", exist_ok=True)

plt.figure(figsize=(10, 6))
own_test_df['seq_len'].hist(bins=5)
plt.xlabel('Długość sekwencji')
plt.ylabel('Liczba wystąpień')
plt.title('Rozkład długości próbek w zbiorze')
ax = plt.gca()
ax.set_axisbelow(True)  # draw the grid lines behind the bars
# pandas' hist() already enables the grid, and a bare plt.grid() toggles the
# current state, which would switch it off again. Ask for it explicitly.
plt.grid(True)
plt.savefig("dataset_plots/seq_len_distribution.jpg")
plt.show()

### 4. Create Pipeline:

In [None]:
# Class ids 0..26 with a uniform share per class and an identity
# id -> model-index mapping.
CLASSES = list(range(27))
CLASSES_PERCENTAGE = [1 / len(CLASSES)] * len(CLASSES)
CLASSES_WITH_PERCENTAGE = dict(zip(CLASSES, CLASSES_PERCENTAGE))
CLASS_MAPPING = dict(zip(CLASSES, range(len(CLASSES))))

In [None]:
def create_dataset_from_dir(df: pd.DataFrame, seq_len: int, class_mapping: dict[int, int]):
    """Turn the clip table into a dataset of (frame paths, mapped label) pairs.

    Args:
        df: Table with "path" and "label_id" columns; each row is also passed
            to ``generate_image_paths``, which reads its "seq_len" column.
        seq_len: Number of frame paths to produce per clip.
        class_mapping: Lookup from ``label_id`` to the label stored in the
            dataset. A ``label_id`` missing from it raises ``KeyError``.

    Returns:
        A ``tf.data.Dataset`` sliced from the list of path sequences and the
        list of mapped labels, one element per row of ``df``.
    """
    # Build (paths, label) pairs row by row so each row is fully processed
    # before the next one is touched.
    samples = [
        (
            generate_image_paths(record['path'], seq_len, record),
            class_mapping[record['label_id']],
        )
        for _, record in df.iterrows()
    ]
    sequences = [frame_paths for frame_paths, _ in samples]
    labels = [mapped_label for _, mapped_label in samples]

    return tf.data.Dataset.from_tensor_slices((sequences, labels))


@dataclass
class InputShape:
    """Shape of a single input frame."""

    # NOTE(review): `weight` is presumably a typo for "width"; the name is part
    # of the constructor and attribute interface, so it is left unchanged.
    weight: int
    height: int
    channels: int

    def as_tuple(self) -> tuple[int, int, int]:
        """Return the fields as ``(weight, height, channels)``."""
        return (self.weight, self.height, self.channels)


def generate_image_paths(video_dir_path: str, seq_len: int, row: pd.Series):
    """Build exactly ``seq_len`` frame paths for one clip directory.

    Frames are expected to be named ``frame_001.jpg``, ``frame_002.jpg``, ...
    (1-based, zero-padded to three digits).

    * If the clip has at least ``seq_len`` frames, a window of ``seq_len``
      consecutive frames centred on the middle of the clip is returned.
    * If the clip is shorter, all frames are returned and the sequence is
      padded on the left with copies of the first frame and on the right with
      copies of the last frame (the extra frame of an odd padding goes right).

    Args:
        video_dir_path: Directory that holds the clip's frames.
        seq_len: Required number of paths.
        row: Record of the clip; only ``row["seq_len"]`` (the number of frames
            available) is read.

    Returns:
        A list of ``seq_len`` paths inside ``video_dir_path``.

    Raises:
        ValueError: If the clip has no frames, or if the produced list does
            not contain exactly ``seq_len`` paths.
    """
    # Convert to a plain int so arithmetic and list repetition do not depend on
    # the scalar type stored in the table.
    current_frames = int(row["seq_len"])

    # Without this guard an empty clip would be "padded" with frame_001.jpg and
    # frame_000.jpg, neither of which exists, and fail much later when read.
    if current_frames < 1:
        raise ValueError(f"Missing images in dir: {video_dir_path}")

    def frame_path(frame_number: int) -> str:
        return os.path.join(video_dir_path, f"frame_{frame_number:03d}.jpg")

    if current_frames >= seq_len:
        # 0-based start of the centred window; frame names are 1-based.
        start_point = max(0, current_frames // 2 - seq_len // 2)
        end_point = start_point + seq_len
        image_paths = [frame_path(i) for i in range(start_point + 1, end_point + 1)]
    else:
        padding_needed = seq_len - current_frames
        left_padding = padding_needed // 2
        right_padding = padding_needed - left_padding

        image_paths = (
            [frame_path(1)] * left_padding
            + [frame_path(i) for i in range(1, current_frames + 1)]
            + [frame_path(current_frames)] * right_padding
        )

    if len(image_paths) != seq_len:
        raise ValueError(f"Missing images in dir: {video_dir_path}")

    return image_paths


@tf.function
def preprocess_frames(frames, label):
    """Apply two randomly chosen colour/noise augmentations to a frame sequence.

    The augmentation parameters and the two augmentations (distinct, applied
    in random order) are drawn once per call and shared by every frame of the
    sequence; only the Gaussian noise differs between frames.

    All randomness comes from TensorFlow ops. Python's ``random`` module must
    not be used here: inside a ``tf.function`` it runs only while the function
    is traced, so the "random" values would be frozen into the graph and every
    sequence would receive the same augmentation.

    Args:
        frames: Tensor of frames; it is iterated along its first axis and is
            expected to be ``float32`` (the mapped output is declared as such).
        label: Passed through unchanged.

    Returns:
        ``(augmented_frames, label)``.
    """
    noise_stddev = 0.05
    brightness_delta = tf.random.uniform([], -0.2, 0.2)
    contrast_factor = tf.random.uniform([], 0.8, 1.2)
    saturation_factor = tf.random.uniform([], 0.8, 1.2)
    hue_delta = tf.random.uniform([], -0.02, 0.02)

    # NOTE: a left/right flip was commented out in the original list and is
    # intentionally not reintroduced here.
    augmentations = [
        lambda img: tf.image.adjust_brightness(img, brightness_delta),
        lambda img: tf.image.adjust_contrast(img, contrast_factor),
        lambda img: tf.image.adjust_saturation(img, saturation_factor),
        lambda img: tf.image.adjust_hue(img, hue_delta),
        lambda img: img + tf.random.normal(tf.shape(img), mean=0.0, stddev=noise_stddev),
    ]

    # Two distinct augmentation indices in random order, sampled in-graph.
    chosen_augmentations = tf.random.shuffle(tf.range(len(augmentations)))[:2]

    def make_branch(aug, img):
        # Bind aug/img now; switch_case expects zero-argument callables.
        return lambda: aug(img)

    def process_image(img):
        for step in range(2):
            img = tf.switch_case(
                chosen_augmentations[step],
                branch_fns=[make_branch(aug, img) for aug in augmentations],
            )
        return img

    preprocessed_images = tf.map_fn(process_image, frames, fn_output_signature=tf.float32)

    return preprocessed_images, label


@tf.function
def load_sequence_from_dir(image_paths: tf.Tensor, label: int, inp_shape: tuple[int, int, int]):
    """Read, decode and resize every JPEG listed in ``image_paths``.

    Args:
        image_paths: Tensor of file paths; it is iterated along its first axis.
        label: Passed through unchanged.
        inp_shape: The first two entries are the resize target handed to
            ``tf.image.resize``; the third is the number of decoded channels.

    Returns:
        ``(frames, label)`` where ``frames`` stacks the resized ``float32``
        images in path order.
    """
    target_size = inp_shape[:2]
    num_channels = inp_shape[2]

    def read_frame(frame_path):
        raw_bytes = tf.io.read_file(frame_path)
        decoded = tf.image.decode_jpeg(raw_bytes, channels=num_channels)
        return tf.image.resize(decoded, target_size)

    frames = tf.map_fn(read_frame, image_paths, fn_output_signature=tf.float32)
    return frames, label


@tf.function
def one_hot_encode(path: tf.Tensor, label: tf.Tensor, classes_num: int):
    """Return ``path`` unchanged with ``label`` one-hot encoded as ``int32``.

    The encoded label has ``classes_num`` entries along its new last axis.
    """
    encoded_label = tf.one_hot(label, depth=classes_num, dtype=tf.int32)
    return path, encoded_label


@tf.function
def remove_one_dimensions(images: tf.Tensor, label: int):
    """Drop every size-1 axis from ``images``; ``label`` is passed through."""
    squeezed = tf.squeeze(images)
    return squeezed, label


@tf.function
def normalize_frames(frames, label):
    """Cast ``frames`` to ``float32`` and divide by 255; ``label`` is untouched."""
    as_float = tf.cast(frames, tf.float32)
    scaled = as_float / 255.0
    return scaled, label


@tf.function
def add_dimension(frames, label):
    """Insert a new leading axis of size 1 into ``frames``; keep ``label``."""
    return tf.expand_dims(frames, axis=0), label

def create_pipeline(df: pd.DataFrame, *, num_classes: int, image_input_shape: InputShape, seq_len: int, batch_size: int, class_mapping: dict[int, int], is_training: bool = False, cache: bool = False, normalize: bool = False) -> tf.data.Dataset:
    """Assemble the ``tf.data`` input pipeline for a clip table.

    Stages, in order: build (frame paths, label) pairs, one-hot encode the
    label, load and resize the frames, optionally augment (``is_training``),
    optionally scale to [0, 1] (``normalize``), batch with
    ``drop_remainder=True``, optionally cache, prefetch.

    Args:
        df: Clip table forwarded to ``create_dataset_from_dir``.
        num_classes: Length of the one-hot label vector.
        image_input_shape: Per-frame shape; its tuple form is forwarded to
            ``load_sequence_from_dir``.
        seq_len: Number of frames per clip.
        batch_size: Batch size; incomplete final batches are dropped.
        class_mapping: Lookup from ``label_id`` to the encoded class index.
        is_training: Apply ``preprocess_frames`` when true.
        cache: Cache the batched dataset when true. Caching happens after
            augmentation, so cached batches keep whatever augmentation they
            received on the first pass.
        normalize: Apply ``normalize_frames`` when true.

    Returns:
        The prefetched, batched dataset.
    """
    autotune = tf.data.AUTOTUNE
    frame_shape = image_input_shape.as_tuple()

    # Each element starts as (list of frame paths, integer label).
    ds = create_dataset_from_dir(df, seq_len=seq_len, class_mapping=class_mapping)
    ds = ds.map(
        lambda paths, label: one_hot_encode(paths, label, num_classes),
        num_parallel_calls=autotune,
    )
    # After loading: (seq_len, <resized frame dims>, channels).
    ds = ds.map(
        lambda paths, label: load_sequence_from_dir(paths, label, frame_shape),
        num_parallel_calls=autotune,
    )

    if is_training:
        ds = ds.map(preprocess_frames, num_parallel_calls=autotune)
    if normalize:
        ds = ds.map(normalize_frames, num_parallel_calls=autotune)

    ds = ds.batch(batch_size=batch_size, drop_remainder=True)
    if cache:
        ds = ds.cache()
    return ds.prefetch(autotune)


class PipelineConfig:
    """Constants used to build the evaluation pipeline and the model."""

    # Per-frame shape forwarded to the frame loader and to the MobileNet backbone.
    IMAGE_INPUT_SHAPE = InputShape(224, 224, 3)
    # One output per entry of CLASSES.
    NUM_CLASSES = len(CLASSES)
    BATCH_SIZE = 1
    # Number of frame paths generated for every clip.
    SEQ_LEN = 34

# Evaluation dataset over the own recordings. `normalize` keeps its default
# (False), so frames are not divided by 255 here, and augmentation is off.
own_test_ds = create_pipeline(
    own_test_df,
    num_classes=PipelineConfig.NUM_CLASSES,
    image_input_shape=PipelineConfig.IMAGE_INPUT_SHAPE,
    seq_len=PipelineConfig.SEQ_LEN,
    batch_size=PipelineConfig.BATCH_SIZE,
    class_mapping=CLASS_MAPPING,
    is_training=False
)

### 5. Build model and load weights:

In [None]:
def build_model_from_articles(input_shape: tuple[int, ...], num_classes: int):
    """Build and compile the frozen-MobileNetV3 + stacked-LSTM classifier.

    A MobileNetV3Large backbone (ImageNet weights, average pooling, built-in
    preprocessing) is wrapped in ``TimeDistributed`` and frozen. It is followed
    by three 256-unit LSTM layers and a 256-unit tanh dense layer, each with
    dropout and layer normalization, and a ``num_classes``-unit output layer
    without activation.

    Args:
        input_shape: Per-sample shape passed to ``keras.layers.Input``.
            NOTE(review): the backbone is built for
            ``PipelineConfig.IMAGE_INPUT_SHAPE`` regardless of this argument,
            so this presumably has to be ``(seq_len, *image_shape)`` — confirm
            against the caller.
        num_classes: Number of units of the output layer.

    Returns:
        The compiled ``keras.models.Sequential`` model. The loss is categorical
        cross-entropy on logits; metrics are accuracy, precision, recall and
        weighted F1.
    """
    mobilenet_model = keras.applications.MobileNetV3Large(
        include_top=False,
        weights="imagenet",
        input_shape=PipelineConfig.IMAGE_INPUT_SHAPE.as_tuple(),
        include_preprocessing=True,
        pooling="avg",
    )
    model = keras.models.Sequential()
    model.add(keras.layers.Input(input_shape))
    model.add(keras.layers.TimeDistributed(mobilenet_model))
    # Freeze everything added so far (the wrapped backbone); the layers added
    # below stay trainable.
    for layer in model.layers:
        layer.trainable = False

    # Layer names are kept stable; NOTE(review): weight loading presumably
    # relies on this exact layer order — confirm before reordering.
    model.add(keras.layers.LSTM(units=256, return_sequences=True, kernel_regularizer=keras.regularizers.l2(0.0001), name="LSTM_1"))
    model.add(keras.layers.Dropout(0.2, name="LSTM_DROPOUT_1"))
    model.add(keras.layers.LayerNormalization(name="LSTM_LNORM_1"))
    model.add(keras.layers.LSTM(units=256, return_sequences=True, kernel_regularizer=keras.regularizers.l2(0.0001), name="LSTM_2"))
    model.add(keras.layers.Dropout(0.2, name="LSTM_DROPOUT_2"))
    model.add(keras.layers.LayerNormalization(name="LSTM_LNORM_2"))
    # The last LSTM returns only its final state, collapsing the time axis.
    model.add(keras.layers.LSTM(units=256, return_sequences=False, kernel_regularizer=keras.regularizers.l2(0.0001), name="LSTM_3"))
    model.add(keras.layers.Dropout(0.2, name="LSTM_DROPOUT_3"))
    model.add(keras.layers.LayerNormalization(name="LSTM_LNORM_3"))
    model.add(keras.layers.Dense(units=256, activation="tanh", kernel_regularizer=keras.regularizers.l2(0.001), name="DENSE_1"))
    model.add(keras.layers.Dropout(0.1, name="DENSE_DROPOUT_1"))
    model.add(keras.layers.LayerNormalization(name="DENSE_LNORM_1"))
    # No activation: the output is logits, matching from_logits=True below.
    model.add(keras.layers.Dense(units=num_classes, name="DENSE_OUTPUT"))

    optimizer = keras.optimizers.Adam(0.001)

    model.compile(
    optimizer=optimizer,
    loss=keras.losses.CategoricalCrossentropy(from_logits=True),
        metrics=[
        'accuracy',
        'precision',
        'recall',
        keras.metrics.F1Score(average='weighted')
        ]
    )
    return model


def build_model_from_articles_conv3D(input_shape: tuple[int, int, int, int], num_classes: int):
    """Build and compile the Conv3D + LSTM classifier.

    Six 3x3x3 ``Conv3D`` layers (32, 64, 128, 256, 256, 256 filters), each
    followed by batch normalization, with ``MaxPooling3D`` of pool size
    (1, 2, 2) after the first three. The result is globally max-pooled,
    repeated once to form a length-1 sequence, passed through two 256-unit
    LSTM layers, and classified by a 256-unit ReLU dense layer and a
    ``num_classes``-unit output layer without activation.

    Args:
        input_shape: Per-sample shape passed to ``keras.layers.Input``.
        num_classes: Number of units of the output layer.

    Returns:
        The compiled ``keras.models.Sequential`` model. The loss is categorical
        cross-entropy on logits; metrics are accuracy, precision, recall and
        weighted F1.
    """
    model = keras.models.Sequential()
    model.add(keras.layers.Input(input_shape))

    # Pool size (1, 2, 2) leaves the first pooled axis untouched and halves the
    # other two.
    model.add(keras.layers.Conv3D(32, kernel_size=(3, 3, 3), strides=(1, 1, 1), kernel_regularizer=keras.regularizers.l2(0.0001), activation='relu', padding='same'))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.MaxPooling3D(pool_size=(1, 2, 2)))
    model.add(keras.layers.Conv3D(64, kernel_size=(3, 3, 3), strides=(1, 1, 1), kernel_regularizer=keras.regularizers.l2(0.0001), activation='relu', padding='same'))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.MaxPooling3D(pool_size=(1, 2, 2)))
    model.add(keras.layers.Conv3D(128, kernel_size=(3, 3, 3), strides=(1, 1, 1), kernel_regularizer=keras.regularizers.l2(0.0001), activation='relu', padding='same'))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.MaxPooling3D(pool_size=(1, 2, 2)))
    model.add(keras.layers.Conv3D(256, kernel_size=(3, 3, 3), strides=(1, 1, 1), kernel_regularizer=keras.regularizers.l2(0.0001), activation='relu', padding='same'))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.Conv3D(256, kernel_size=(3, 3, 3), strides=(1, 1, 1), kernel_regularizer=keras.regularizers.l2(0.0001), activation='relu', padding='same'))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.Conv3D(256, kernel_size=(3, 3, 3), strides=(1, 1, 1), kernel_regularizer=keras.regularizers.l2(0.0001), activation='relu', padding='same'))
    model.add(keras.layers.BatchNormalization())
    model.add(keras.layers.GlobalMaxPooling3D())
    # The pooled vector is repeated once so the LSTMs receive a length-1 sequence.
    model.add(keras.layers.RepeatVector(1))
    model.add(keras.layers.LSTM(256, kernel_regularizer=keras.regularizers.l2(0.0001), return_sequences=True))
    model.add(keras.layers.LSTM(256, kernel_regularizer=keras.regularizers.l2(0.0001)))
    model.add(keras.layers.LayerNormalization(name="DENSE_LNORM_1"))
    model.add(keras.layers.Dense(units=256, activation="relu", kernel_regularizer=keras.regularizers.l2(0.0001), name="DENSE_LAYER"))
    model.add(keras.layers.LayerNormalization(name="DENSE_LNORM_2"))
    # No activation: the output is logits, matching from_logits=True below.
    model.add(keras.layers.Dense(units=num_classes, name="DENSE_OUTPUT"))

    optimizer = keras.optimizers.Adam(0.001)
    model.compile(
        optimizer=optimizer,
        loss=keras.losses.CategoricalCrossentropy(from_logits=True),
        metrics=[
            'accuracy',
            'precision',
            'recall',
            keras.metrics.F1Score(average='weighted')
        ]
    )

    return model

In [None]:
# The first layer of the model is TimeDistributed(MobileNetV3), so it consumes
# raw frame sequences shaped (seq_len, *image_shape), which is also what
# own_test_ds yields per sample. The previous (SEQ_LEN, 1280) shape described
# pre-extracted feature vectors and does not fit either of them.
lstm_model = build_model_from_articles(
    input_shape=(PipelineConfig.SEQ_LEN, *PipelineConfig.IMAGE_INPUT_SHAPE.as_tuple()),
    num_classes=len(CLASSES),
)
# lstm_model = build_model_from_articles_conv3D(input_shape=(PipelineConfig.SEQ_LEN, *PipelineConfig.IMAGE_INPUT_SHAPE.as_tuple()), num_classes=len(CLASSES))

In [None]:
# Load the weights file stored next to the MobileNetLSTM experiment.
# NOTE(review): presumably produced by training the same architecture — the
# layer structure built above must match the one that was saved.
lstm_model.load_weights('../MobileNetLSTM/model_weights.weights.h5')

In [None]:
# Evaluate on the own dataset. The returned values are read by position: the
# loss first, then the compiled metrics in the order they were declared.
results = lstm_model.evaluate(own_test_ds)

metric_captions = (
    "Test Loss:",
    "Test Accuracy:",
    "Test Precision:",
    "Test Recall:",
    "Test F1 Score:",
)
for position, caption in enumerate(metric_captions):
    print(caption, results[position])

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# Collect predicted and true class ids batch by batch, then plot and save the
# confusion matrix over all classes.
y_pred = []
y_true = []

for images, labels in own_test_ds:
    # One forward pass for an already materialised batch. Model.predict is
    # designed for whole datasets and adds heavy per-call overhead in a loop.
    predictions = lstm_model.predict_on_batch(images)
    predicted_classes = np.argmax(predictions, axis=1)

    # Labels are one-hot in this pipeline; plain integer labels are kept as-is.
    true_classes = np.argmax(labels, axis=1) if labels.shape[-1] > 1 else labels.numpy()

    y_pred.extend(predicted_classes)
    y_true.extend(true_classes)


cm = confusion_matrix(y_true, y_pred, labels=range(len(CLASSES)))

# Reuse the id -> label table built when the data was loaded rather than
# re-reading the file and rebinding `jester_labels` (label -> id) to the
# inverse mapping.
display_info = [jester_labels_id.get(cls) for cls in CLASSES]

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_info)

disp.plot(
    cmap=plt.cm.Blues,
    xticks_rotation=45,
    # values_format='.2f'
    values_format='.0f'
)

fig = disp.figure_
fig.set_figwidth(30)
fig.set_figheight(30)

plt.xlabel('Przewidziana Etykieta', fontsize=20)
plt.ylabel('Prawdziwa Etykieta', fontsize=20)

plt.xticks(fontsize=15, rotation=45, ha='right')
plt.yticks(fontsize=15, rotation=0, va="center")

for text in disp.text_.ravel():
    text.set_fontsize(14)

# Report the file that is actually written (the old message named another one).
conf_matrix_path = 'conf_mat_BEST_norm_test_mobilenet.png'
plt.savefig(conf_matrix_path)
print(f"Plots saved as '{conf_matrix_path}'")

In [None]:
# Print the layer-by-layer summary of the evaluated model.
lstm_model.summary()