# Import

In [None]:
%cd /content/drive/MyDrive/Action2

In [None]:
!pip install opencv-python mediapipe 

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import matplotlib.pyplot as plt
import os
from tensorflow import keras
from sklearn.model_selection import train_test_split #use this to split the data 
from tensorflow.keras.callbacks import EarlyStopping #use this to abort the training process early
from tensorflow.keras import layers
from tensorflow.keras import backend as K
from sklearn.metrics import classification_report
import mlflow.tensorflow
mlflow.tensorflow.autolog()
#do not run the below line if you only have one GPU
#os.environ["CUDA_VISIBLE_DEVICES"]="1" # use second GPU

In [None]:
# What version of Python do you have?
import sys
import tensorflow.keras
import pandas as pd
import sklearn as sk
import tensorflow as tf
import platform

#report the runtime environment so that results can be tied to concrete library versions
print(f"Python Platform: {platform.platform()}")
print(f"Tensor Flow Version: {tf.__version__}")
print(f"Keras Version: {tensorflow.keras.__version__}")
print()
print(f"Python {sys.version}")
print(f"Pandas {pd.__version__}")
print(f"Scikit-Learn {sk.__version__}")
#gpu is True when TensorFlow lists at least one physical GPU device
gpu = len(tf.config.list_physical_devices('GPU'))>0
print("GPU is", "available" if gpu else "NOT AVAILABLE")

# Initialize mediapipe 

In [None]:
mpHands = mp.solutions.hands #shortcut to the mediapipe hands solution module

#create the hand detector/tracker that is applied to every frame
hands = mpHands.Hands(static_image_mode=False, #treat the input as a video stream, not as unrelated images
                      max_num_hands=2, #report at most two hands per frame
                      min_detection_confidence=0.5,
                      min_tracking_confidence=0.5)
mpDraw = mp.solutions.drawing_utils #drawing helpers used for visualization

#we specifically use the Hands part of the mediapipe detection pipeline, since we do not need face pose etc.
#results.multi_hand_landmarks is a list with one entry per detected hand (it is None when no hand is found).
#the entries come in detection order; this is not guaranteed to be "left hand first, right hand second".
#handedness labels are reported separately in results.multi_handedness.
#each hand consists of 21 keypoints, each carrying 3 coordinates (x,y,z).
#x and y are normalized by the image width and height by default; z is a depth value relative to the wrist.

#this function flattens the data of two hands into one array each and joins them together.
#since our model runs on two hands, we have to make sure that all of the 126 features
#(2 hands * 21 keypoints * 3 coordinates) our model requires are always filled.
#to do this, we extract the first and the last entry of the mediapipe output.
#by doing this, if only one hand is present, its data is duplicated to stand in for the missing hand.
#if no hand is present at all, all 126 features are set to zero.
#the model will throw a shape mismatch error if all of the features are not present.
def extract_keypoints(results):
    """Flatten the hand landmarks of one frame into a 126-value feature vector.

    Args:
        results: object with a ``multi_hand_landmarks`` attribute. The attribute
            is either falsy (no hand found) or a sequence of hands, each with a
            ``landmark`` sequence of points exposing ``x``, ``y`` and ``z``.

    Returns:
        1-D numpy array: the flattened (x, y, z) values of the first entry
        followed by those of the last entry. With a single detected hand both
        halves are identical; with no hand the result is 126 zeros.
    """
    detected_hands = results.multi_hand_landmarks
    if not detected_hands:
        #no hand in this frame: keep the feature length constant by returning zeros
        return np.zeros(2 * 21 * 3)

    def _flatten(hand):
        #one (x, y, z) triple per keypoint, laid out keypoint after keypoint
        return np.array([(point.x, point.y, point.z) for point in hand.landmark]).ravel()

    first_hand = _flatten(detected_hands[0])
    last_hand = _flatten(detected_hands[-1]) #same hand as above when only one was detected
    return np.concatenate([first_hand, last_hand])


#folder used when collecting data
DATA_PATH = os.path.join('HANDS_DATA')

#actions (classes) to be predicted; the position in this array is the class index
#that the argmax of a label / prediction vector is mapped back to
actions = np.array([
    'open', 'select', 'close', 'confirm', 'reject',
    'increasevol', 'decreasevol', 'next', 'previous',
])

n_classes = actions.shape[0] #how many classes there are

# Initialize precision, recall and F1 functions

In [None]:
def recall_m(y_true, y_pred):
    """Return the recall of ``y_pred`` against ``y_true`` as a single tensor value.

    Both the element-wise product and the labels are clipped to [0, 1] and
    rounded before counting, so every element counts as either 0 or 1. The
    counts are summed over all elements of the tensors that are passed in.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1)) #1 where label and rounded prediction agree on "positive"
    actual = K.round(K.clip(y_true, 0, 1)) #1 for every positive label
    hit_count = K.sum(hits)
    actual_count = K.sum(actual)
    #K.epsilon() keeps the division defined when there are no positive labels
    return hit_count / (actual_count + K.epsilon())

def precision_m(y_true, y_pred):
    """Return the precision of ``y_pred`` against ``y_true`` as a single tensor value.

    Both the element-wise product and the predictions are clipped to [0, 1] and
    rounded before counting, so every element counts as either 0 or 1. The
    counts are summed over all elements of the tensors that are passed in.
    """
    hits = K.round(K.clip(y_true * y_pred, 0, 1)) #1 where label and rounded prediction agree on "positive"
    predicted = K.round(K.clip(y_pred, 0, 1)) #1 for every prediction that rounds to positive
    hit_count = K.sum(hits)
    predicted_count = K.sum(predicted)
    #K.epsilon() keeps the division defined when nothing is predicted positive
    return hit_count / (predicted_count + K.epsilon())

def f1_m(y_true, y_pred):
    """Return the F1 score: the harmonic mean of precision_m and recall_m.

    K.epsilon() in the denominator avoids a division by zero when both
    precision and recall are zero.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    ratio = (p * r) / (p + r + K.epsilon())
    return 2 * ratio

# Import the dataset 

In [None]:
#use this for augmented data
#load the prepared feature and label arrays from disk and print their shapes as a sanity check.
#NOTE(review): presumably X is (samples, sequence length, features) and y holds one-hot labels — confirm
X = np.load('dataset/ege180_kubi60_ceren30_X.npy')
y = np.load('dataset/ege180_kubi60_ceren30_y.npy')
print(X.shape)
print(y.shape)

In [None]:
#split our dataset into train and test parts; test_size=0.2 holds out 20% of the samples for testing.
#no random_state is passed, so the split itself is not reproducible between runs.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
#print all shapes to verify the split
print("Shape of whole sequences as giant array: {}".format(np.array(X).shape))
print("Shape of X_train: {}".format(X_train.shape))
print("Shape of X_test: {}".format(X_test.shape))
print("Shape of y_train: {}".format(y_train.shape))
print("Shape of y_test: {}".format(y_test.shape))

In [None]:
#split our dataset into train and test parts; test_size=0.2 holds out 20% of the samples for testing.
#NOTE: this repeats the split of the previous cell. Running both cells replaces the earlier split
#with a new random one, because no random_state is passed.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
#print all shapes to verify the split
print("Shape of whole sequences as giant array: {}".format(np.array(X).shape))
print("Shape of X_train: {}".format(X_train.shape))
print("Shape of X_test: {}".format(X_test.shape))
print("Shape of y_train: {}".format(y_train.shape))
print("Shape of y_test: {}".format(y_test.shape))


def _augment_with_noise(X, y, iterations, mu, sigma):
    """Return ``iterations`` noisy copies of ``X`` with matching labels.

    Every copy is ``X + X * noise`` where the noise is drawn from a normal
    distribution N(mu, sigma) with the same shape as ``X``. The random seed is
    set to the iteration index, so the generated noise is reproducible.
    The un-noised ``X`` itself is not part of the result.

    Args:
        X: feature array; copies are stacked along axis 0.
        y: label array whose rows correspond to the rows of ``X``.
        iterations: number of noisy copies to generate (at least 1).
        mu: mean of the multiplicative noise.
        sigma: standard deviation of the multiplicative noise.

    Returns:
        (X_aug, y_aug): ``X_aug`` has ``iterations * len(X)`` rows and ``y_aug``
        repeats ``y`` once per copy (as float64), so row k of both belong together.

    Raises:
        ValueError: if ``iterations`` is smaller than 1.
    """
    if iterations < 1:
        raise ValueError("iterations must be at least 1")

    noisy_copies = []
    for i in range(iterations):
        np.random.seed(i) #one fixed seed per iteration keeps the augmentation reproducible
        noise = np.random.normal(mu, sigma, X.shape)
        noisy_copies.append(np.multiply(noise, X) + X)
        print("Done with the {}th iteration".format(i))

    #concatenate once at the end instead of re-copying a growing array in every iteration
    X_aug = np.concatenate(noisy_copies, axis=0)
    #repeat the labels once per copy. No "drop the all-zero rows" filtering is needed anymore,
    #so a genuine all-zero label row can no longer be removed and shift X and y against each other.
    y_aug = np.concatenate([y] * iterations, axis=0).astype(np.float64)
    return X_aug, y_aug


#augment the training set
iterations = 10
mu, sigma = 0, 0.1
X_train, y_train = _augment_with_noise(X_train, y_train, iterations, mu, sigma)

#augment the test set (this is more aggressive)
mu, sigma = 0, 0.105
X_test, y_test = _augment_with_noise(X_test, y_test, iterations, mu, sigma)

# Training

In [None]:
#sanity check: shapes of the arrays that go into training and evaluation
print(X_train.shape)
print(X_test.shape)

In [None]:
#This is the main transformer block
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one transformer encoder block to ``inputs`` and return the result.

    The block has two parts, each preceded by layer normalization and wrapped
    in a residual connection: multi-head self-attention, then a position-wise
    feed-forward network built from two kernel-size-1 convolutions.

    Args:
        inputs: input tensor; its last dimension is the feature count.
        head_size: ``key_dim`` of the attention layer.
        num_heads: number of attention heads.
        ff_dim: number of filters of the first (hidden) convolution.
        dropout: dropout rate used inside attention and after both parts.

    Returns:
        Tensor with the same feature count as ``inputs``.
    """
    feature_count = inputs.shape[-1]

    #--- attention part ---
    normed_inputs = layers.LayerNormalization(epsilon=1e-6)(inputs) #normalize first to ease training
    self_attention = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )
    attended = self_attention(normed_inputs, normed_inputs) #query and value are the same tensor: self-attention
    attended = layers.Dropout(dropout)(attended)
    res = attended + inputs #residual connection around the attention part

    #--- feed forward part ---
    normed_res = layers.LayerNormalization(epsilon=1e-6)(res)
    #kernel size 1 applies the same weights at every time step and keeps the sequence length
    hidden = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(normed_res)
    hidden = layers.Dropout(dropout)(hidden)
    #project back to the input feature count so that the residual addition below is possible
    projected = layers.Conv1D(filters=feature_count, kernel_size=1)(hidden)
    return projected + res


def build_model( #construct the model
    input_shape,
    head_size, #head size for the attention block
    num_heads, #number of heads for the attention block
    ff_dim, #feed forward dimensions
    num_transformer_blocks, #specifies how many times the transformer block is repeated
    mlp_units, #multilayer perceptron units
    dropout=0, #dropout rate inside the transformer blocks
    mlp_dropout=0, #dropout rate after each dense layer
    num_classes=None, #size of the softmax output; None means "use the module-level n_classes"
):
    """Build the (uncompiled) transformer classifier.

    The input passes through ``num_transformer_blocks`` encoder blocks, is
    averaged over the sequence axis, runs through one Dense+Dropout pair per
    entry of ``mlp_units`` and ends in a softmax layer.

    Args:
        input_shape: shape of one sample without the batch dimension.
        head_size: ``key_dim`` of every attention layer.
        num_heads: number of attention heads.
        ff_dim: hidden size of the feed-forward part of every encoder block.
        num_transformer_blocks: how many encoder blocks are stacked.
        mlp_units: iterable with the unit count of every dense layer of the head.
        dropout: dropout rate inside the encoder blocks.
        mlp_dropout: dropout rate after every dense layer of the head.
        num_classes: number of output classes. Defaults to the module-level
            ``n_classes`` so that existing calls keep working unchanged.

    Returns:
        keras.Model mapping the input to class probabilities.
    """
    if num_classes is None:
        num_classes = n_classes #backward compatible fallback to the global class count

    inputs = keras.Input(shape=input_shape) #take the input
    x = inputs

    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout) #feed the input to the transformer block

    #apply global average pooling for classification
    #be aware of the data format argument. If channels_last is specified, the dataset should be as
    # (batch size,sequence length,number of features)
    # if specified as channels first, sequence length and number feature dimensions are swapped.
    x = layers.GlobalAveragePooling1D(data_format="channels_last")(x)

    for dim in mlp_units: #create the dense layers of the classification head
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x) #apply softmax for probability distribution
    return keras.Model(inputs, outputs)

input_shape = X_train.shape[1:] #drop the sample axis and keep the last two dimensions (the original note gives them as 30,126)

#these are all hyperparameters for the model complexity. General rule of thumb would be
#to start as low as possible for all of them except the dropouts and gradually increase them until overfitting
model = build_model(
    input_shape,
    head_size=128,
    num_heads=4,
    ff_dim=32,
    num_transformer_blocks=1,
    mlp_units=[16], #32
    mlp_dropout=0, #0.25
    dropout=0, #0.25
)

#categorical_crossentropy is used together with the softmax output of the model
model.compile( #use adam optimizer
    loss="categorical_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=5e-5), #learning rate is also a hyperparameter
    metrics=["categorical_accuracy"],
)
model.summary() #take a look at the model structure


In [None]:
#stop training when the validation loss has not improved for 10 consecutive epochs and
#roll the model back to the weights of its best epoch.
#(the previous setting, monitor='loss' with patience=5000, could never trigger within 100 epochs.)
es = EarlyStopping(monitor='val_loss', verbose=1, patience=10, restore_best_weights=True)
#start training. we assign this training to a variable so we can log the training graphs.
#if you quit training early with ctrl+c, the variable is lost and you cannot plot the graphs.
#NOTE(review): validation_split takes the last 20% of the rows of X_train/y_train before shuffling.
#if X_train consists of stacked noisy copies of the same sequences, the validation rows are noisy
#versions of sequences that are also trained on, so validation scores may be optimistic — confirm.
history = model.fit(X_train, y_train, validation_split = 0.2, epochs=100, verbose = 1, callbacks = [es])

print(history.history.keys()) #see what can we plot

# Plotting training performance

In [None]:
# summarize history for accuracy
plt.plot(history.history['categorical_accuracy'])
plt.plot(history.history['val_categorical_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
#the val_ curve comes from the validation split used during fit, not from the test set
plt.legend(['train', 'validation'], loc='upper left')
plt.show()
# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
#the val_ curve comes from the validation split used during fit, not from the test set
plt.legend(['train', 'validation'], loc='upper left')
plt.show()

# Test the model performance

In [None]:
#load previously saved weights into the current model.
#NOTE(review): if this cell runs after the training cell, the freshly trained weights are replaced
#by the ones stored in this file, and the evaluation below then scores the stored weights — confirm this is intended.
model.load_weights('models/11EylulTransformer.h5')

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
#predict class probabilities for the held-out test set.
#(validation during training used validation_split of the training data, not cross validation.)
y_pred = model.predict(X_test)

#argmax turns one-hot labels and predicted probabilities into class indices
cm = confusion_matrix(y_test.argmax(axis=1), y_pred.argmax(axis=1))
#label the matrix axes with the action names instead of the class indices
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=actions)
fig, ax = plt.subplots(figsize=(10,10))
disp.plot(ax=ax)

In [None]:
#evaluate on the test set; returns the loss followed by the metrics the model was compiled with
scores = model.evaluate(X_test, y_test, verbose=0)
print(scores)

In [None]:
#convert the one-hot test labels to class indices
rounded_labels=np.argmax(y_test, axis=1)
rounded_labels[1] #bare expression; its value is discarded because it is not the last statement of the cell

y_pred = model.predict(X_test, batch_size=64, verbose=1)
y_pred_bool = np.argmax(y_pred, axis=1) #predicted class index per sample (an index, not a boolean, despite the name)

#per-class precision, recall and f1-score; classes are listed by their index
print(classification_report(rounded_labels, y_pred_bool))

In [None]:
!mlflow ui

# Save the model weights

In [None]:
#save the model to an HDF5 file.
#NOTE(review): model.save stores the whole model, not only the weights. The other cells call
#load_weights on 'models/11EylulTransformer.h5', which is a different file name — confirm which file should be used.
model.save('models/9KasimTransformer_egekubi.h5') 

# Detect in real time

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import os
from tensorflow import keras
from tensorflow.keras import layers

mpHands = mp.solutions.hands #shortcut to the mediapipe hands solution module
#create the hand detector/tracker that is applied to every frame
hands = mpHands.Hands(static_image_mode=False, #treat the input as a video stream, not as unrelated images
                      max_num_hands=2, #report at most two hands per frame
                      min_detection_confidence=0.5,
                      min_tracking_confidence=0.5)
mpDraw = mp.solutions.drawing_utils #drawing helpers used for visualization

#we specifically use the Hands part of the mediapipe detection pipeline, since we do not need face pose etc.
#results.multi_hand_landmarks is a list with one entry per detected hand (it is None when no hand is found).
#the entries come in detection order; this is not guaranteed to be "left hand first, right hand second".
#handedness labels are reported separately in results.multi_handedness.
#each hand consists of 21 keypoints, each carrying 3 coordinates (x,y,z).
#x and y are normalized by the image width and height by default; z is a depth value relative to the wrist.

#this function flattens the data of two hands into one array each and joins them together.
#since our model runs on two hands, we have to make sure that all of the 126 features
#(2 hands * 21 keypoints * 3 coordinates) our model requires are always filled.
#to do this, we extract the first and the last entry of the mediapipe output.
#by doing this, if only one hand is present, its data is duplicated to stand in for the missing hand.
#if no hand is present at all, all 126 features are set to zero.
#the model will throw a shape mismatch error if all of the features are not present.
def extract_keypoints(results):
    """Flatten the hand landmarks of one frame into a 126-value feature vector.

    Args:
        results: object with a ``multi_hand_landmarks`` attribute. The attribute
            is either falsy (no hand found) or a sequence of hands, each with a
            ``landmark`` sequence of points exposing ``x``, ``y`` and ``z``.

    Returns:
        1-D numpy array: the flattened (x, y, z) values of the first entry
        followed by those of the last entry. With a single detected hand both
        halves are identical; with no hand the result is 126 zeros.
    """
    detected_hands = results.multi_hand_landmarks
    if not detected_hands:
        #no hand in this frame: keep the feature length constant by returning zeros
        return np.zeros(2 * 21 * 3)

    def _flatten(hand):
        #one (x, y, z) triple per keypoint, laid out keypoint after keypoint
        return np.array([(point.x, point.y, point.z) for point in hand.landmark]).ravel()

    first_hand = _flatten(detected_hands[0])
    last_hand = _flatten(detected_hands[-1]) #same hand as above when only one was detected
    return np.concatenate([first_hand, last_hand])

#actions (classes) to be predicted; the position in this array is the class index
actions = np.array([
    'open', 'select', 'close', 'confirm', 'reject',
    'increasevol', 'decreasevol', 'next', 'previous',
])
n_classes = actions.shape[0] #how many classes there are
input_shape = (30, 126)
#all-zero placeholder with two samples of shape input_shape; it contains no real data
X_train = np.zeros((2,) + input_shape)


#This is the main transformer block
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one transformer encoder block to ``inputs`` and return the result.

    The block has two parts, each preceded by layer normalization and wrapped
    in a residual connection: multi-head self-attention, then a position-wise
    feed-forward network built from two kernel-size-1 convolutions.

    Args:
        inputs: input tensor; its last dimension is the feature count.
        head_size: ``key_dim`` of the attention layer.
        num_heads: number of attention heads.
        ff_dim: number of filters of the first (hidden) convolution.
        dropout: dropout rate used inside attention and after both parts.

    Returns:
        Tensor with the same feature count as ``inputs``.
    """
    feature_count = inputs.shape[-1]

    #--- attention part ---
    normed_inputs = layers.LayerNormalization(epsilon=1e-6)(inputs) #normalize first to ease training
    self_attention = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )
    attended = self_attention(normed_inputs, normed_inputs) #query and value are the same tensor: self-attention
    attended = layers.Dropout(dropout)(attended)
    res = attended + inputs #residual connection around the attention part

    #--- feed forward part ---
    normed_res = layers.LayerNormalization(epsilon=1e-6)(res)
    #kernel size 1 applies the same weights at every time step and keeps the sequence length
    hidden = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(normed_res)
    hidden = layers.Dropout(dropout)(hidden)
    #project back to the input feature count so that the residual addition below is possible
    projected = layers.Conv1D(filters=feature_count, kernel_size=1)(hidden)
    return projected + res


def build_model( #construct the model
    input_shape,
    head_size, #head size for the attention block
    num_heads, #number of heads for the attention block
    ff_dim, #feed forward dimensions
    num_transformer_blocks, #specifies how many times the transformer block is repeated
    mlp_units, #multilayer perceptron units
    dropout=0, #dropout rate inside the transformer blocks
    mlp_dropout=0, #dropout rate after each dense layer
    num_classes=None, #size of the softmax output; None means "use the module-level n_classes"
):
    """Build the (uncompiled) transformer classifier.

    The input passes through ``num_transformer_blocks`` encoder blocks, is
    averaged over the sequence axis, runs through one Dense+Dropout pair per
    entry of ``mlp_units`` and ends in a softmax layer.

    Args:
        input_shape: shape of one sample without the batch dimension.
        head_size: ``key_dim`` of every attention layer.
        num_heads: number of attention heads.
        ff_dim: hidden size of the feed-forward part of every encoder block.
        num_transformer_blocks: how many encoder blocks are stacked.
        mlp_units: iterable with the unit count of every dense layer of the head.
        dropout: dropout rate inside the encoder blocks.
        mlp_dropout: dropout rate after every dense layer of the head.
        num_classes: number of output classes. Defaults to the module-level
            ``n_classes`` so that existing calls keep working unchanged.

    Returns:
        keras.Model mapping the input to class probabilities.
    """
    if num_classes is None:
        num_classes = n_classes #backward compatible fallback to the global class count

    inputs = keras.Input(shape=input_shape) #take the input
    x = inputs

    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout) #feed the input to the transformer block

    #apply global average pooling for classification
    #be aware of the data format argument. If channels_last is specified, the dataset should be as
    # (batch size,sequence length,number of features)
    # if specified as channels first, sequence length and number feature dimensions are swapped.
    x = layers.GlobalAveragePooling1D(data_format="channels_last")(x)

    for dim in mlp_units: #create the dense layers of the classification head
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x) #apply softmax for probability distribution
    return keras.Model(inputs, outputs)

input_shape = X_train.shape[1:] #drop the sample axis of the placeholder array, which leaves (30,126)

#rebuild the network so that the stored weights can be loaded into it.
#NOTE(review): these hyperparameters presumably have to match the ones the weight file was trained with — confirm.
#General rule of thumb when tuning would be to start as low as possible for all of them except the dropouts
#and gradually increase them until overfitting
model = build_model(
    input_shape,
    head_size=128,
    num_heads=4,
    ff_dim=32,
    num_transformer_blocks=1,
    mlp_units=[16], #32
    mlp_dropout=0, #0.4
    dropout=0,
)


#load the trained weights used for real-time prediction
model.load_weights('models/11EylulTransformer.h5') 
#model.load_weights('models/7AgustosTransformer.h5') 



sequence = [] #rolling window with the keypoint vectors of the most recent frames
sentence = []
predictions = []
threshold = 0.75 #minimum normalized gap between the two best classes before a prediction is shown

cap = cv2.VideoCapture(1) #camera index 1; the loop below is skipped silently if it cannot be opened
try:
    while cap.isOpened():
        success, img = cap.read()
        if not success:
            #no frame was delivered (camera unplugged / stream ended): img is unusable, so stop
            break

        #mediapipe expects RGB input, while OpenCV captures and displays BGR.
        #the RGB copy is only used for detection; everything is drawn on the BGR frame,
        #so that the colors in the preview window are not swapped.
        imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        results = hands.process(imgRGB)

        if results.multi_hand_landmarks:
            h, w, _ = img.shape #frame size is needed to turn normalized coordinates into pixels
            for handLms in results.multi_hand_landmarks:
                for lm in handLms.landmark:
                    cx, cy = int(lm.x * w), int(lm.y * h)
                    cv2.circle(img, (cx, cy), 3, (255, 0, 255), cv2.FILLED)

                mpDraw.draw_landmarks(img, handLms, mpHands.HAND_CONNECTIONS)

        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-30:] #keep only the last 30 frames

        if len(sequence) == 30:
            #verbose=0 suppresses the progress bar that would otherwise be printed for every frame
            res = model.predict(np.expand_dims(sequence, axis=0), verbose=0)[0]
            two_max = (-res).argsort()[:2] #find the two largest values of the softmax output
            best, runner_up = res[two_max[0]], res[two_max[1]]
            #show the prediction only if the best class clearly dominates the second best one
            if (best - runner_up) / best > threshold:
                cv2.putText(img, actions[np.argmax(res)], (3, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                cv2.putText(img, str(np.amax(res)), (3, 80),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        cv2.imshow('OpenCV Feed', img)

        if cv2.waitKey(10) & 0xFF == ord('q'): #press q to quit
            break
finally:
    #always free the camera and close the window, even if an error occurs inside the loop
    cap.release()
    cv2.destroyAllWindows()