In [None]:
import os
import math
import joblib
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

from tqdm import tqdm

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"


# Configuration

In [1]:
class config(object):
    INFERENCE_TIME = 3.0  # ... Segment duration
    HOLD_TIME = 2.0  # ... Time before next prediction session
    SEGMENT_LEN = 256
    IMG_LEN = 224  # 1/9 of the 224x224 image
    N_CHANNELS = 1
    LEARNING_RATE = 0.0003
    PROJECTION_LANDMARKS = ["rp", "rf0", "rf1"]
    INFERENCE_FEATURES = [
        "rpx",
        "rpy",
        "rpz",
        "rf0x",
        "rf0y",
        "rf0z",
        "rf1x",
        "rf1y",
        "rf1z"
    ]
    MODEL_PREDICTION = "dev.atick.slr.model.prediction"
    FRAME_EVENT = "dev.atick.slr.frame"
    GESTURES = ["Good", "Bad", "Fine", "Hello", "Yes", "Deaf", "Me",
                "No", "Please", "Sorry", "Thank You", "You", "Hungry", "Goodbye"]
    FEATURE_NAMES = [
        "time",
        "rpx",
        "rpy",
        "rpz",
        "lpx",
        "lpy",
        "lpz",
        "rf0x",
        "rf0y",
        "rf0z",
        "rf1x",
        "rf1y",
        "rf1z",
        "rf2x",
        "rf2y",
        "rf2z",
        "rf3x",
        "rf3y",
        "rf3z",
        "rf4x",
        "rf4y",
        "rf4z",
        "lf0x",
        "lf0y",
        "lf0z",
        "lf1x",
        "lf1y",
        "lf1z",
        "lf2x",
        "lf2y",
        "lf2z",
        "lf3x",
        "lf3y",
        "lf3z",
        "lf4x",
        "lf4y",
        "lf4z",
    ]


# Normalization

In [None]:
import numpy as np


def normalize(x: np.ndarray) -> np.ndarray:
    try:
        return (x - np.min(x)) / (np.max(x) - np.min(x))
    except ZeroDivisionError:
        return x


# Low-Pass Filter

In [None]:
import numpy as np
from scipy.signal import butter, lfilter


class LowPassFilter(object):
    def butter_lowpass(
        cutoff: int,
        fs: int,
        order: int
    ):
        nyq = 0.5 * fs
        normal_cutoff = cutoff / nyq
        b, a = butter(order, normal_cutoff, btype="low", analog=False)
        return b, a

    def apply(
        data: np.ndarray,
        cutoff: int = 6,
        fs: int = 100,
        order: int = 2
    ):
        b, a = LowPassFilter.butter_lowpass(cutoff, fs, order)
        y = lfilter(b, a, data)
        return y


# Spatical Projection

In [2]:
import os
import cv2
import config
import numpy as np
import pandas as pd

from random import randint
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.ticker import NullLocator
from matplotlib.figure import Figure


class SpatialProjection():
    def __init__(
        self,
        img_dir: str,
        img_len: int,
        polyfit_degree: int = 0
    ):
        self.img_dir = img_dir
        self.img_len = img_len
        self.polyfit_degree = polyfit_degree

    @staticmethod
    def __write_image(
        img: np.ndarray,
        write_dir: str,
        plane: str,
        name: str
    ):
        path = os.path.join(write_dir, plane)
        if not os.path.exists(path):
            os.makedirs(path)
        cv2.imwrite(os.path.join(path, name + ".jpg"), img)

    def __get_preocessed_data(
        self,
        data: pd.Series
    ) -> np.ndarray:
        processed_data = data.to_numpy().ravel()

        if self.polyfit_degree == 0:
            # ... First few (10) datapoints contains filter artifacts
            processed_data = LowPassFilter.apply(processed_data)[10:]
        else:
            t = np.linspace(0, 1, processed_data.shape[0])
            f = np.poly1d(np.polyfit(t, processed_data, self.polyfit_degree))
            processed_data = f(t)

        return processed_data

    def __generate_projection_image(
        self,
        x: np.ndarray,
        y: np.ndarray
    ) -> np.ndarray:
        img_len_inch = self.img_len / 100.0  # 100 is the Figure DPI value
        fig = Figure(figsize=(img_len_inch, img_len_inch))
        width, height = fig.get_size_inches() * fig.get_dpi()

        canvas = FigureCanvas(fig)
        ax = fig.gca()
        ax.plot(x, y, "-k", linewidth=2)
        ax.axis("off")
        ax.xaxis.set_major_locator(NullLocator())
        ax.yaxis.set_major_locator(NullLocator())
        # fig.tight_layout()
        canvas.draw()

        image = np.frombuffer(canvas.tostring_rgb(), dtype="uint8")
        return image.reshape(int(height), int(width), 3)

    def get_projection_images(
        self,
        data: pd.DataFrame,
        subject: str,
        gesture: str,
        write_image: bool = False
    ) -> list[np.ndarray]:
        landmark = data.columns[0][:-1]
        name = str(randint(100000, 999999))
        write_dir = os.path.join(self.img_dir, subject, gesture, landmark)

        x = self.__get_preocessed_data(data.filter(regex="x"))
        y = self.__get_preocessed_data(data.filter(regex="y"))
        z = self.__get_preocessed_data(data.filter(regex="z"))

        img_xy = self.__generate_projection_image(x, y)
        img_yz = self.__generate_projection_image(y, z)
        img_zx = self.__generate_projection_image(z, x)

        if write_image == True:
            self.__write_image(img_xy, write_dir, "xy", name)
            self.__write_image(img_yz, write_dir, "yz", name)
            self.__write_image(img_zx, write_dir, "zx", name)

        return [img_xy, img_yz, img_zx]


'ahaha'

# Setup

In [None]:
augmentation_levels = [0, 7, 9, 11, 13]
sp_augment = [
    SpatialProjection(
    img_dir="images/",
    img_len=math.floor(config.IMG_LEN / 3),
    polyfit_degree=degree
)
    for degree in augmentation_levels ]



In [None]:
X_train = joblib.load("X_train.joblib")
y_train = joblib.load("y_train.joblib")
X_test = joblib.load("X_test.joblib")
y_test = joblib.load("y_test.joblib")

print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)

# Pretrained Nets

In [None]:
# preprocess_input = tf.keras.applications.mobilenet_v2.preprocess_input
# base_model = tf.keras.applications.MobileNetV2(
#     input_shape=(config.IMG_LEN, config.IMG_LEN, 3),
#     include_top=False,
#     weights="imagenet"
# )
# base_model.trainable = True
# global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
# prediction_layer = tf.keras.layers.Dense(len(config.GESTURES))

# inputs = tf.keras.Input(shape=(config.IMG_LEN, config.IMG_LEN, 3))
# x = preprocess_input(inputs)
# x = base_model(x, training=True)
# x = global_average_layer(x)
# x = tf.keras.layers.Dropout(0.6)(x)
# outputs = prediction_layer(x)
# model = tf.keras.Model(inputs, outputs)

# ConvNet

In [None]:
model = tf.keras.models.Sequential([
    tf.keras.layers.Rescaling(
        1/255.0,
        input_shape=(config.IMG_LEN, config.IMG_LEN, config.N_CHANNELS)
    ),
    tf.keras.layers.Conv2D(16, (3, 3), activation="relu"),
    tf.keras.layers.MaxPool2D((2, 2)),
    tf.keras.layers.Conv2D(16, (3, 3), activation="relu"),
    tf.keras.layers.MaxPool2D((2, 2)),
    tf.keras.layers.Conv2D(32, (3, 3), activation="relu"),
    tf.keras.layers.MaxPool2D((2, 2)),
    tf.keras.layers.Conv2D(32, (3, 3), activation="relu"),
    tf.keras.layers.MaxPool2D((2, 2)),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dropout(0.5),
    tf.keras.layers.Dense(len(config.GESTURES))
])


# Training

In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=config.LEARNING_RATE),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy']
)

callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=30,
        restore_best_weights=True
    )
]

history = model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_test, y_test),
    batch_size=32,
    epochs=700,
    verbose=1,
    callbacks=callbacks
)

In [None]:
import matplotlib.pyplot as plt

plt.rcParams.update({
    # "text.usetex": True,
    "font.family": "serif",
    # "font.serif": ["Computer Modern Roman"],
    "font.size": 22,
    "text.color": "#212121",
    "axes.edgecolor": "#212121",
    "xtick.color": "#212121",
    "ytick.color": "#212121",
    "axes.labelcolor": "#212121",
    'legend.frameon': False,
})

fig = plt.figure(figsize=(8, 6))
ax = fig.gca()
ax.plot(history.history["loss"], "-", color="#212121", label="Train Loss")
ax.plot(history.history["val_loss"], "--",
        color="#212121", label="Validation Loss")
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
plt.title("Learning Curves")
plt.legend()
plt.tight_layout()
plt.savefig("../assets/lc.png")
plt.show()


# Evaluation

In [None]:
model.evaluate(X_test, y_test)

In [None]:
from sklearn.metrics import classification_report

y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))

In [None]:
tf.keras.backend.clear_session()