# 1. Imports and Dependencies

In [None]:
!pip install tensorflow opencv-python mediapipe scikit-learn matplotlib

In [None]:
!pip uninstall mediapipe

In [None]:
!pip install mediapipe

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

# 2. Keypoints using MP Hands

In [None]:
# Short aliases for the MediaPipe Hands solution module and its drawing helpers
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
# Gesture labels handled by this notebook
gestures = ['j', 'x', 'z']

In [None]:
def mediapipe_detection(image, model):
    """Run ``model.process`` on a frame and hand back the frame with the results.

    Args:
        image: Frame treated as BGR (it is converted BGR -> RGB before inference).
        model: Object exposing ``process(rgb_image)``.

    Returns:
        Tuple ``(image, results)``: the frame converted back to BGR, and the
        value returned by ``model.process``.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Frame is flagged read-only for the duration of the inference call only
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [None]:
def draw_hand_landmarks(image, results):
    """Draw every detected hand skeleton onto ``image`` in place.

    Args:
        image: Frame to draw on.
        results: Hands result object; ``results.multi_hand_landmarks`` may be
            ``None`` when no hand was detected, in which case nothing is drawn.
    """
    # `or ()` turns the None "no detection" value into an empty iteration
    for landmarks in results.multi_hand_landmarks or ():
        mp_drawing.draw_landmarks(image, landmarks, mp_hands.HAND_CONNECTIONS)

In [None]:
def draw_styled_landmarks(image, results):
    """Draw every detected hand onto ``image`` in place using custom colours.

    Args:
        image: Frame to draw on.
        results: Hands result object; ``results.multi_hand_landmarks`` may be
            ``None`` when no hand was detected, in which case nothing is drawn.
    """
    detected_hands = results.multi_hand_landmarks
    if not detected_hands:
        # None (or empty) means no hand in this frame
        return

    # First spec styles the landmark points, second the connections
    landmark_spec = mp_drawing.DrawingSpec(color=(121, 22, 76), thickness=2, circle_radius=4)
    connection_spec = mp_drawing.DrawingSpec(color=(121, 44, 250), thickness=2, circle_radius=2)
    for landmarks in detected_hands:
        mp_drawing.draw_landmarks(image, landmarks, mp_hands.HAND_CONNECTIONS,
                                  landmark_spec, connection_spec)

In [None]:
def extract_keypoints(results):
    """Flatten detected hand landmarks into one fixed-length feature vector.

    Handedness is read from ``results.multi_handedness`` (one entry per entry
    of ``results.multi_hand_landmarks``, labelled ``'Left'`` or ``'Right'``);
    the landmark lists themselves carry no handedness attribute.

    Args:
        results: Hands result object. ``multi_hand_landmarks`` and
            ``multi_handedness`` may be ``None`` when no hand was detected.

    Returns:
        1-D array of 126 values: 63 (21 landmarks * x, y, z) for the left hand
        followed by 63 for the right hand. A hand that was not detected is
        filled with zeros. If the same label is reported twice, the first
        detection is kept so the length stays fixed.
    """
    per_hand = {'Left': None, 'Right': None}

    hand_landmarks = results.multi_hand_landmarks or []
    handedness = getattr(results, 'multi_handedness', None) or []
    for landmarks, hand_info in zip(hand_landmarks, handedness):
        label = hand_info.classification[0].label
        if label in per_hand and per_hand[label] is None:
            per_hand[label] = np.array(
                [[lm.x, lm.y, lm.z] for lm in landmarks.landmark], dtype=float
            ).flatten()

    # Zero-fill whichever hand is missing
    lh = per_hand['Left'] if per_hand['Left'] is not None else np.zeros(21 * 3)
    rh = per_hand['Right'] if per_hand['Right'] is not None else np.zeros(21 * 3)

    return np.concatenate([lh, rh])


# 3. Data Acquisition

In [None]:
# Gesture labels; same list as assigned in section 2
gestures = ['j', 'x', 'z']

In [None]:
import os
import cv2
import csv
import mediapipe as mp
import numpy as np
import subprocess

# Set the root directory where the video folders are located
root_data_dir = 'Videos_dynamic'
# Directory that receives one sub-folder of landmark CSV files per gesture
target_dir = 'My_Data'
# Gesture labels; each is used as a video file-name prefix and as the CSV sub-folder name
gestures = ['j', 'x', 'z']
# Video numbers expected for every person/gesture pair ("<gesture> (<n>).mp4")
video_numbers = [1, 2, 3]
sequence_length = 60  # Number of frames to capture

# get the indices of the frames that should be captured from the video clip
def get_frame_indices(total_frames, desired_frames):
    """Pick ``desired_frames`` frame indices from a clip of ``total_frames`` frames.

    Args:
        total_frames: Number of frames available in the clip.
        desired_frames: Number of indices to return.

    Returns:
        Integer array of length ``desired_frames``. When the clip is long
        enough the indices are spread evenly from the first to the last frame;
        otherwise the sequence ``0..total_frames-1`` is repeated cyclically
        and truncated. An empty integer array is returned when either count
        is zero or negative (e.g. a video whose frame count is reported as 0).
    """
    if total_frames <= 0 or desired_frames <= 0:
        # Nothing to sample from (or nothing requested); avoids dividing by zero below
        return np.array([], dtype=int)

    if total_frames >= desired_frames:
        return np.round(np.linspace(0, total_frames - 1, desired_frames)).astype(int)

    # Repeat some frames if there are not enough frames in the video
    repeat_factor = -(-desired_frames // total_frames)  # integer ceil
    return np.tile(np.arange(total_frames), repeat_factor)[:desired_frames]

# Set mediapipe model to Hands
with mp.solutions.hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5) as hands:

    for person in range(1, 7):
        person_dir = os.path.join(root_data_dir, f'Person{person}')

        for action in gestures:
            # read in the video files
            for video_number in video_numbers:
                video_file = os.path.join(person_dir, f'{action} ({video_number}).mp4')

                # Open the video file
                cap = cv2.VideoCapture(video_file)
                if not cap.isOpened():
                    print(f"Error opening video file: {video_file}")
                    cap.release()
                    continue

                # try/finally so every capture is released, not only the last one
                try:
                    # The same number of frames should be captured from each video, regardless of
                    # length. Sample frames evenly if video is too long
                    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                    if total_frames <= 0:
                        print(f"No frames reported for video file: {video_file}")
                        continue
                    frame_indices = get_frame_indices(total_frames, sequence_length)

                    # Define CSV file path
                    csv_dir = os.path.join(target_dir, action)
                    os.makedirs(csv_dir, exist_ok=True)
                    csv_path = os.path.join(csv_dir, f'Person{person}_{action}_{video_number}.csv')

                    # Row used when no hand is detected: label + 21 landmarks * 3 coordinates
                    empty_row = [f'{action}/'] + [0] * 63

                    # Write the landmark data for each frame into a new row of a CSV file
                    with open(csv_path, mode='w', newline='') as file:
                        writer = csv.writer(file)
                        last_row = empty_row

                        for frame_idx in frame_indices:
                            cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
                            ret, frame = cap.read()
                            if not ret:
                                # Repeat the previous row so the file still ends up with
                                # one row per requested frame
                                print(f"Error reading frame at index {frame_idx}. Using the last successful frame.")
                                writer.writerow(last_row)
                                continue

                            # Make detections using Hands
                            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                            results = hands.process(image)

                            # Export keypoints to CSV. Only the first detected hand is written so
                            # every row has the same 1 + 63 columns. The frame is never displayed,
                            # so nothing is drawn and no key press is polled.
                            if results.multi_hand_landmarks:
                                first_hand = results.multi_hand_landmarks[0]
                                row = [f'{action}/'] + [val for lm in first_hand.landmark for val in (lm.x, lm.y, lm.z)]
                            else:
                                row = empty_row

                            writer.writerow(row)
                            last_row = row
                finally:
                    cap.release()

                # Check the length of the CSV file to verify it is correct (60)
                with open(csv_path, 'r') as check_file:
                    row_count = sum(1 for row in csv.reader(check_file))
                    print(f'CSV file {csv_path} has {row_count} rows.')

In [None]:
# Release the VideoCapture still bound to `cap` and close any OpenCV windows
cap.release()
cv2.destroyAllWindows()

# 4. Data Preprocessing

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Map each gesture label to its position in `gestures` (used as the class index)
label_map = dict(zip(gestures, range(len(gestures))))

In [None]:
## DATA AUGMENTATION

import pandas as pd
import numpy as np
import os
import math

def scale_data(data, scale_factor):
    """Return ``data`` as a float NumPy array multiplied by ``scale_factor``."""
    as_floats = np.array(data, dtype=float)
    return np.multiply(as_floats, scale_factor)

def rotate_data(data, angle, axis='z'):
    """Multiply ``data`` by the 3x3 rotation matrix for ``axis``.

    Args:
        data: Array whose last dimension holds (x, y, z).
        angle: Rotation angle in radians.
        axis: ``'x'``, ``'y'`` or ``'z'``.

    Returns:
        ``np.dot(data, R)`` where ``R`` is the rotation matrix for ``axis``.

    Raises:
        ValueError: If ``axis`` is not one of the three supported names.
    """
    if axis not in ('x', 'y', 'z'):
        raise ValueError("Invalid rotation axis")

    cos_a = np.cos(angle)
    sin_a = np.sin(angle)

    if axis == 'x':
        rows = [[1, 0, 0],
                [0, cos_a, -sin_a],
                [0, sin_a, cos_a]]
    elif axis == 'y':
        rows = [[cos_a, 0, sin_a],
                [0, 1, 0],
                [-sin_a, 0, cos_a]]
    else:
        rows = [[cos_a, -sin_a, 0],
                [sin_a, cos_a, 0],
                [0, 0, 1]]

    # Points are row vectors, so the matrix is applied on the right
    return np.dot(data, np.array(rows))

def process_file(file_path, scale_factor, rotation_angle, rotation_axis):
    """Load one landmark CSV and return a scaled-then-rotated copy of it.

    The CSV is expected to have no header row: column 0 is the label and
    columns 1..63 are 21 landmarks * (x, y, z) for one frame per row.

    Args:
        file_path: Path of the CSV to read.
        scale_factor: Multiplier applied to every coordinate.
        rotation_angle: Rotation angle in radians.
        rotation_axis: ``'x'``, ``'y'`` or ``'z'``.

    Returns:
        DataFrame with the same row count and column layout as the input:
        the label column followed by the 63 transformed coordinates.
    """
    # header=None: the first line is a data frame, not column names.
    # Only the first 64 columns (label + 63 coordinates) are loaded.
    data = pd.read_csv(file_path, header=None, usecols=range(64))

    # Separate the first column (string labels) and the rest of the data
    labels = data.iloc[:, 0]
    numeric_data = data.iloc[:, 1:]

    # View every row as 21 landmarks with x, y, z and transform all frames at once
    frames = numeric_data.to_numpy(dtype=float).reshape(-1, 21, 3)
    frames = rotate_data(scale_data(frames, scale_factor), rotation_angle, rotation_axis)

    # Convert back to a DataFrame and add the labels column
    augmented_data = pd.DataFrame(frames.reshape(-1, 63), columns=numeric_data.columns)
    augmented_data.insert(0, data.columns[0], labels)

    return augmented_data

# Define parameters for augmentation
scale_factors = [0.9, 1.1]  # Different scaling factors - for each video, make a copy a bit smaller, and a copy a bit larger
rotation_angles = [math.radians(10), math.radians(20)]  # Rotation angles in radians
rotation_axes = ['x', 'y', 'z']  # Axes of rotation

for gesture in gestures:
    # The landmark CSVs were written under target_dir/<gesture> by the acquisition cell
    directory = os.path.join(target_dir, gesture)

    # Process each file with each combination of scaling and rotation.
    # Only original files (named "Person...") are used as input, so augmented
    # files produced by an earlier run are not augmented again.
    for filename in os.listdir(directory):
        if filename.startswith('Person') and filename.endswith('.csv'):
            file_path = os.path.join(directory, filename)
            for scale_factor in scale_factors:
                for rotation_angle in rotation_angles:
                    for axis in rotation_axes:
                        augmented_data = process_file(file_path, scale_factor, rotation_angle, axis)

                        # Save the augmented data with a descriptive filename
                        new_filename = f"augmented_scale_{scale_factor}_rot_{math.degrees(rotation_angle)}_axis_{axis}_{filename}"
                        # header=False keeps the same header-less layout as the original CSVs
                        augmented_data.to_csv(os.path.join(directory, new_filename), index=False, header=False)


In [None]:
## CODE FOR READING IN LANDMARK VALUES FROM CSV FILE

import os
import csv
import numpy as np
import math

sequences, labels = [], []
num_columns = 63  # Number of landmark values (21 landmarks * 3 coordinates) read from each row
# target_dir (set in the acquisition cell) is the folder the CSVs were written to

scale_factors = [0.9, 1.1]  # Different scaling factors
rotation_angles = [math.radians(10), math.radians(20)]  # Rotation angles in radians
rotation_axes = ['x', 'y', 'z']  # Axes of rotation


def _read_landmark_window(csv_path):
    """Return one CSV as a list of rows, each a list of floats (label column dropped)."""
    with open(csv_path, 'r', newline='') as file:
        return [
            [float(coordinate) for coordinate in row[1:num_columns + 1]]
            for row in csv.reader(file)
        ]


for person in range(1, 7):
    # Original files
    for action in gestures:
        for video_number in range(1, 4):
            csv_path = os.path.join(target_dir, action, f'Person{person}_{action}_{video_number}.csv')

            if os.path.exists(csv_path):
                sequences.append(_read_landmark_window(csv_path))
                labels.append(label_map[action])

    # Augmented files
    for action in gestures:
        for video_number in range(1, 4):
            # Name of the original file these augmentations were derived from
            filename = f'Person{person}_{action}_{video_number}.csv'

            for scale_factor in scale_factors:
                for rotation_angle in rotation_angles:
                    for axis in rotation_axes:
                        new_filename = f"augmented_scale_{scale_factor}_rot_{math.degrees(rotation_angle)}_axis_{axis}_{filename}"

                        csv_path = os.path.join(target_dir, action, new_filename)

                        if os.path.exists(csv_path):
                            sequences.append(_read_landmark_window(csv_path))
                            labels.append(label_map[action])
    

In [None]:
## Some testing statements for debugging purposes, to make sure the shapes of the collected data are correct

In [None]:
# Shape of all loaded sequences; (N, 60, 63) is what the model's input_shape expects,
# assuming every CSV holds 60 rows of 63 values - TODO confirm
np.array(sequences).shape

In [None]:
# One integer class index per loaded sequence
np.array(labels).shape

In [None]:
# Feature tensor used for training and testing
X = np.array(sequences)

In [None]:
X.shape

In [None]:
# One-hot encode the integer class indices
y = to_categorical(labels).astype(int)

In [None]:
# Hold out 20% of the sequences for testing.
# NOTE(review): no random_state is passed, so the split changes on every run, and
# augmented copies of one video can end up on both sides of the split.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [None]:
y_test.shape

# 5. Build and Train LSTM Neural Network

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [None]:
# Define the model architecture: two stacked LSTM layers, one dense hidden
# layer, and a softmax output with one unit per gesture
model = Sequential([
    LSTM(64, return_sequences=True, activation='relu', input_shape=(60, 63)),
    LSTM(64, return_sequences=False, activation='relu'),
    Dense(32, activation='relu'),
    Dense(len(gestures), activation='softmax'),
])

In [None]:
# Categorical cross-entropy matches the one-hot labels and softmax output
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
# TensorBoard callback (imported above) writing training logs to ./Logs
tb_callback = TensorBoard(log_dir='Logs')
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])

In [None]:
# Print the layer-by-layer summary of the model
model.summary()

In [None]:
# If using TensorFlow with Keras
# evaluate() returns the loss followed by the compiled metric (categorical accuracy)
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Final Test Accuracy: {test_accuracy * 100:.2f}%")

In [None]:
# Saved under the name the size-comparison cells in sections 6 and 7 read back
model.save('final_lstm_model.h5')

# 6. Compression with Pruning

In [None]:
# This code has been derived from the TensorFlow Keras documentation: https://www.tensorflow.org/model_optimization/guide/pruning/pruning_with_keras

In [None]:
pip install tensorflow-model-optimization

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow_model_optimization.sparsity import keras as sparsity

# Specify the pruning parameters: sparsity ramps from 0% to 50% between
# begin_step and end_step
# NOTE(review): begin_step/end_step presumably count training steps (batches), not
# epochs - confirm that end_step=1000 is intended alongside epochs=1000 below
pruning_params = {
    'pruning_schedule': sparsity.PolynomialDecay(initial_sparsity=0.0,
                                                 final_sparsity=0.5,
                                                 begin_step=0,
                                                 end_step=1000)
}

# Wrap the model with pruning
model_for_pruning = sparsity.prune_low_magnitude(model, **pruning_params)

# Compile the pruned model
model_for_pruning.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Add the UpdatePruningStep callback
callbacks = [
    sparsity.UpdatePruningStep()
]

# Train the model
# Replace 'X_train', 'y_train' with your training data
model_for_pruning.fit(X_train, y_train, epochs=1000, callbacks=callbacks)

# Evaluate on the held-out split (rebinds test_loss / test_accuracy)
test_loss, test_accuracy = model_for_pruning.evaluate(X_test, y_test)

print(f"Accuracy of the pruned model: {test_accuracy*100:.2f}%")



In [None]:
# Keep the pruned model's accuracy under its own name; section 7 prints it again
test_loss, model_for_pruning_accuracy = model_for_pruning.evaluate(X_test, y_test)

# Report the value just computed rather than a test_accuracy left over from another cell
print(f"Accuracy of the pruned model: {model_for_pruning_accuracy*100:.2f}%")

In [None]:
# Save the pruned model
import tensorflow_model_optimization as tfmot
import tempfile

# Remove the pruning wrappers so only the plain Keras layers are exported
model_for_export = tfmot.sparsity.keras.strip_pruning(model_for_pruning)

pruned_keras_file = 'pruned_model_098.h5'
# The optimizer state is left out of the saved file
tf.keras.models.save_model(model_for_export, pruned_keras_file, include_optimizer=False)
print('Saved pruned Keras model to:', pruned_keras_file)

In [None]:
# Convert the pruned model to TFLite format and save it
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)

# It's necessary to specify the supported ops due to the nature of the LSTM network
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS,
                                       tf.lite.OpsSet.SELECT_TF_OPS]
converter._experimental_lower_tensor_list_ops = False
pruned_tflite_model = converter.convert()

# The converter output is a TFLite flatbuffer, not HDF5, so it gets the
# .tflite extension (same as the quantized model in section 7)
pruned_tflite_file = 'pruned_model_tflite_098.tflite'

with open(pruned_tflite_file, 'wb') as f:
    f.write(pruned_tflite_model)

print('Saved pruned TFLite model to:', pruned_tflite_file)

In [None]:
def get_gzipped_model_size(file):
    """Return the size in bytes of ``file`` after DEFLATE compression in a zip archive.

    Args:
        file: Path of the file to measure.

    Returns:
        Size in bytes of a temporary zip archive containing only ``file``.
        The temporary archive is deleted before returning.
    """
    import os
    import tempfile
    import zipfile

    fd, zipped_file = tempfile.mkstemp('.zip')
    # mkstemp hands back an open descriptor; only the path is needed here
    os.close(fd)
    try:
        with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
            f.write(file)
        return os.path.getsize(zipped_file)
    finally:
        os.remove(zipped_file)

In [None]:
# Compare compressed sizes of the baseline, pruned Keras and pruned TFLite model files
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size('final_lstm_model.h5')))
print("Size of gzipped pruned Keras model: %.2f bytes" % (get_gzipped_model_size(pruned_keras_file)))
print("Size of gzipped pruned TFlite model: %.2f bytes" % (get_gzipped_model_size(pruned_tflite_file)))

# 7. Compression with Post-Pruning Quantization

In [None]:
# Convert the stripped, pruned model again, this time with the converter's
# default optimizations enabled
converter = tf.lite.TFLiteConverter.from_keras_model(model_for_export)
# Same op-set configuration as the un-quantized TFLite export in section 6
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, 
                                       tf.lite.OpsSet.SELECT_TF_OPS]
converter._experimental_lower_tensor_list_ops = False
converter.optimizations = [tf.lite.Optimize.DEFAULT]
quantized_and_pruned_tflite_model = converter.convert()

quantized_and_pruned_tflite_file = 'quantized_and_pruned_tflite_model.tflite'

# Write the converted model bytes to disk
with open(quantized_and_pruned_tflite_file, 'wb') as f:
  f.write(quantized_and_pruned_tflite_model)

print('Saved quantized and pruned TFLite model to:', quantized_and_pruned_tflite_file)

# Compare compressed sizes of the baseline and the pruned + quantized model files
print("Size of gzipped baseline Keras model: %.2f bytes" % (get_gzipped_model_size('final_lstm_model.h5')))
print("Size of gzipped pruned and quantized TFlite model: %.2f bytes" % (get_gzipped_model_size(quantized_and_pruned_tflite_file)))

In [None]:
import numpy as np

def evaluate_model(interpreter):
  """Return the interpreter's accuracy on the global ``X_test`` / ``y_test``.

  Each entry of ``X_test`` is fed through the interpreter on its own; the
  index of the largest output value is taken as the predicted class and
  compared with the arg-max of the matching ``y_test`` row.

  Args:
    interpreter: Interpreter exposing ``get_input_details``,
      ``get_output_details``, ``set_tensor``, ``invoke`` and ``tensor``.

  Returns:
    Fraction of test sequences whose predicted class equals the true class.
  """
  in_idx = interpreter.get_input_details()[0]["index"]
  out_idx = interpreter.get_output_details()[0]["index"]

  # Run a prediction for every sequence in the test set.
  predicted_classes = []
  for n_done, sample in enumerate(X_test):
    if n_done % 1000 == 0:
      print('Evaluated on {n} results so far.'.format(n=n_done))

    # Add the batch dimension and cast to float32 before feeding the sample in.
    batch = np.expand_dims(sample, axis=0).astype(np.float32)
    interpreter.set_tensor(in_idx, batch)
    interpreter.invoke()

    # Drop the batch dimension and keep the index of the highest score.
    scores = interpreter.tensor(out_idx)()[0]
    predicted_classes.append(np.argmax(scores))

  print('\n')
  # Accuracy = share of predictions that match the one-hot ground truth.
  true_classes = np.argmax(y_test, axis=1)
  return (np.array(predicted_classes) == true_classes).mean()

In [None]:
# Build an interpreter from the in-memory quantized + pruned model bytes
interpreter = tf.lite.Interpreter(model_content=quantized_and_pruned_tflite_model)
interpreter.allocate_tensors()

# Accuracy on X_test / y_test (rebinds the notebook-level test_accuracy)
test_accuracy = evaluate_model(interpreter)

print('Pruned and quantized TFLite test_accuracy:', test_accuracy)
print('Pruned TF test accuracy:', model_for_pruning_accuracy)

# 9. Compression with Knowledge Distillation

In [None]:
import tensorflow as tf

# Student network: same layout as the teacher with half the units per layer
student_model = Sequential([
    LSTM(32, return_sequences=True, activation='relu', input_shape=(60, 63)),
    LSTM(32, return_sequences=False, activation='relu'),
    Dense(16, activation='relu'),
    Dense(len(gestures), activation='softmax'),
])

# Compile the student model
student_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Distillation parameters
temperature = 10.0

def distillation_loss(y_true, y_pred, teacher_logits):
    """Cross-entropy between temperature-softened teacher and student outputs.

    Both networks in this notebook end in a softmax layer, so ``y_pred`` and
    ``teacher_logits`` arrive as probabilities. Dividing probabilities by the
    temperature and applying softmax again squashes every distribution towards
    uniform, so the inputs are first mapped back to log space:
    ``softmax(log(p) / T)`` is the temperature-softened version of ``p``.

    Args:
        y_true: Ground-truth labels (unused; kept for the existing call signature).
        y_pred: Student softmax output for the batch.
        teacher_logits: Teacher softmax output for the same batch.

    Returns:
        Scalar tensor: mean categorical cross-entropy of the softened student
        distribution against the softened teacher distribution.
    """
    eps = tf.keras.backend.epsilon()
    teacher_out = tf.cast(teacher_logits, y_pred.dtype)

    # Clip before the log so exact zeros do not produce -inf
    student_log = tf.math.log(tf.clip_by_value(y_pred, eps, 1.0))
    teacher_log = tf.math.log(tf.clip_by_value(teacher_out, eps, 1.0))

    student_probs = tf.nn.softmax(student_log / temperature)
    teacher_probs = tf.nn.softmax(teacher_log / temperature)

    return tf.reduce_mean(tf.keras.losses.categorical_crossentropy(teacher_probs, student_probs))

train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
train_dataset = train_dataset.batch(32)  # Set your batch size

optimizer = tf.keras.optimizers.Adam()

for epoch in range(20):
    # Fresh metric objects every epoch so the printed numbers describe this
    # epoch only instead of a running average over all epochs so far
    train_loss_metric = tf.keras.metrics.Mean(name='train_loss')
    train_accuracy_metric = tf.keras.metrics.CategoricalAccuracy(name='train_accuracy')

    for x_batch, y_batch in train_dataset:
        # Get teacher model's outputs; a direct call avoids the per-batch
        # overhead of model.predict inside the loop
        teacher_logits = model(x_batch, training=False)

        with tf.GradientTape() as tape:
            student_logits = student_model(x_batch, training=True)

            # Compute the loss
            loss = distillation_loss(y_batch, student_logits, teacher_logits)

        gradients = tape.gradient(loss, student_model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, student_model.trainable_variables))

        train_loss_metric.update_state(loss)
        train_accuracy_metric.update_state(y_batch, student_logits)

    # Print the metrics at the end of each epoch
    print(f"Epoch {epoch+1}, Loss: {train_loss_metric.result():.4f}, Accuracy: {train_accuracy_metric.result() * 100:.2f}%")

# Evaluate with the loss/metric configured in student_model.compile
test_loss, test_accuracy = student_model.evaluate(X_test, y_test)

student_model.save('student_model.h5')

# Print the final accuracy
print(f'Final Accuracy of the Student Model: {test_accuracy * 100:.2f}%')

# 10. Test Sample Videos 

In [None]:
# Code for testing pre-recorded samples from the webcam

import cv2
import mediapipe as mp
import numpy as np
import subprocess

# Initialize MediaPipe hand pose model
# NOTE(review): this rebinds mp_hands, which section 2 assigns to the
# mp.solutions.hands module and which draw_hand_landmarks / draw_styled_landmarks
# read as mp_hands.HAND_CONNECTIONS. After this cell runs those helpers see a
# Hands instance instead - consider a distinct name here.
mp_hands = mp.solutions.hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Function to convert .mov files to .mp4 using FFmpeg
def convert_mov_to_mp4(input_file, output_file, ffmpeg_path=None):
    """Convert ``input_file`` to ``output_file`` by running FFmpeg.

    Failures are reported with ``print`` and are not raised.

    Args:
        input_file: Path of the source video.
        output_file: Path of the converted video. If it already exists the
            conversion is skipped.
        ffmpeg_path: FFmpeg executable to run. When omitted, ``ffmpeg`` is
            looked up on PATH, falling back to the Windows install location
            this notebook was originally written for.
    """
    import shutil

    # Check if the output file already exists, and if so, skip conversion
    if os.path.exists(output_file):
        print(f'{output_file} already exists. Skipping conversion.')
        return

    if ffmpeg_path is None:
        ffmpeg_path = shutil.which('ffmpeg') or r'C:\Program Files\ffmpeg-6.1.1-essentials_build\bin\ffmpeg.exe'
    command = [ffmpeg_path, '-i', input_file, '-q:v', '0', '-pix_fmt', 'yuv420p', output_file]

    try:
        subprocess.check_call(command)
    except subprocess.CalledProcessError as e:
        # FFmpeg ran but exited with a non-zero status
        print(f'Error converting: {input_file} to {output_file}')
        print(e)
    except OSError as e:
        # The executable could not be started (e.g. not found, not permitted)
        print(f'An error occurred: {e}')
    else:
        print(f'Successfully converted: {input_file} to {output_file}')

# Path to the test video file
# Convert the .mov recording to .mp4 first (skipped when the .mp4 already exists)
convert_mov_to_mp4('test_videos/j7.mov', 'test_videos/j7.mp4')
test_video_path = 'test_videos/j7.mp4'

# Function to process a video frame and return hand landmarks
def process_frame(frame, hands_model):
    """Return the 63 landmark values of the first hand detected in ``frame``.

    Args:
        frame: Frame treated as BGR (it is converted BGR -> RGB before inference).
        hands_model: Object exposing ``process(rgb_image)`` whose result has a
            ``multi_hand_landmarks`` attribute.

    Returns:
        List of 63 numbers (21 landmarks * x, y, z). All zeros when no hand is
        detected or the landmark count is not the expected 21.
    """
    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = hands_model.process(image)

    if not results.multi_hand_landmarks:
        return [0] * 63

    # Use only the first detected hand: the training CSV reader keeps just the
    # first 63 values of each row, so a second hand must not void the frame
    first_hand = results.multi_hand_landmarks[0]
    landmarks = [val for lm in first_hand.landmark for val in (lm.x, lm.y, lm.z)]

    # Ensure landmarks length is 63
    if len(landmarks) != 63:
        print("zero")
        return [0] * 63
    return landmarks


# Open the test video file
cap = cv2.VideoCapture(test_video_path)
if not cap.isOpened():
    cap.release()
    # Stop here: continuing with no frames would fail later with a division by zero
    raise RuntimeError(f"Error opening video file: {test_video_path}")

sequence_length = 60  # Same as your training data
landmarks_list = []

# Collect landmarks for at most sequence_length frames; always release the capture
try:
    while len(landmarks_list) < sequence_length:
        ret, frame = cap.read()
        if not ret:
            break

        landmarks = process_frame(frame, mp_hands)
        landmarks_list.append(landmarks)
finally:
    cap.release()

if not landmarks_list:
    raise RuntimeError(f"No frames could be read from: {test_video_path}")

# Ensure the sequence is of the correct length
if len(landmarks_list) < sequence_length:
    # Repeat some frames if there are not enough frames in the video
    repeat_factor = np.ceil(sequence_length / len(landmarks_list))
    landmarks_list = (landmarks_list * int(repeat_factor))[:sequence_length]

# Format the collected landmarks for model prediction
test_data = np.array(landmarks_list)
test_data = test_data.reshape(1, sequence_length, 63)  # Reshape as needed for your model

prediction = model.predict(test_data)

# Index of the highest class score, mapped back to its gesture label
predicted_index = np.argmax(prediction)

predicted_gesture = gestures[predicted_index]

print("Predicted Gesture:", predicted_gesture)
