In [None]:
# !pip install patool
!pip install mediapipe


In [None]:
import os
import numpy as np
from matplotlib import pyplot as plt
import cv2
import tensorflow as tf
import tensorflow.keras.layers as tfl
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, BatchNormalization, ReLU, Dropout, GRU, ConvLSTM2D, Conv3D, Flatten, Bidirectional
from tensorflow.keras.utils import to_categorical
import random 
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import mediapipe as mp
# import pickle as pk
from google.colab import drive
from google.colab import files

In [None]:
# Mount Google Drive (forcing a remount) and change into the notebook folder;
# every relative path used below ("./traindata", "./testdata", "*.npy") is
# resolved from there.
# NOTE(review): the %cd target is relative, so this presumably assumes the
# working directory is /content when the cell runs - confirm before re-running.
drive.mount("/content/drive", force_remount=True)
%cd drive/MyDrive/Colab Notebooks/

In [None]:
# !pip install patool
# import patoolib
# patoolib.extract_archive("./traindata/all_cut.rar", outdir="./traindata")

In [None]:
# Short aliases for the MediaPipe solution modules used by the helpers below.
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [None]:
# Single shared Holistic instance; videoProc and videoProc2 pass it to
# mediapipe_detection for every frame of every video.
# NOTE(review): static_image_mode=False presumably lets MediaPipe carry
# tracking state from frame to frame; because the same instance is reused
# across videos, confirm that state from one clip cannot leak into the next.
holistic = mp_holistic.Holistic(
    static_image_mode=False,
    model_complexity=1,
    enable_segmentation=False,
    refine_face_landmarks=False,
    min_detection_confidence=0.5)

In [None]:
def mediapipe_detection(image, model):
    """Run ``model`` on a single BGR frame and return its raw results.

    Args:
        image: Frame in BGR channel order; it is converted with
            ``cv2.COLOR_BGR2RGB`` before being handed to the model.
        model: Object exposing ``process(rgb_image)``.

    Returns:
        Whatever ``model.process`` returns for the converted frame.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # The converted frame is flagged read-only for the duration of the model
    # call and made writeable again right afterwards.
    rgb_frame.flags.writeable = False
    detection = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return detection

In [None]:
def normalize_zscore(pose, face, lh, rh):
    """Z-score each landmark group independently, column by column.

    Every argument is an array whose ``mean``/``std`` are taken along axis 0;
    each column is shifted by its mean and divided by its standard deviation
    plus a small epsilon (1e-7), so a constant column maps to zeros instead of
    dividing by zero.

    Returns:
        Tuple ``(pose, face, lh, rh)`` of the standardised arrays.
    """
    eps = 1e-7

    def _standardize(points):
        return (points - points.mean(axis=0)) / (points.std(axis=0) + eps)

    return tuple(_standardize(group) for group in (pose, face, lh, rh))

In [None]:
def extract_keypoints(results):
    """Flatten one frame's landmarks into a single feature vector.

    For each landmark group an (N, 3) array of x/y/z values is built, or an
    all-zero array when the group is missing from ``results`` (33 pose,
    468 face, 21 per hand). Only the first 8 face landmarks are kept, every
    group is passed through ``normalize_zscore``, and the flattened groups
    are concatenated in the order face, pose, left hand, right hand
    (8*3 + 33*3 + 21*3 + 21*3 = 249 values).
    """
    def _as_array(landmark_list, count):
        # Missing detections become a zero block of the expected size.
        if landmark_list:
            return np.array([[pt.x, pt.y, pt.z] for pt in landmark_list.landmark])
        return np.zeros((count, 3))

    pose = _as_array(results.pose_landmarks, 33)
    face = _as_array(results.face_landmarks, 468)
    lh = _as_array(results.left_hand_landmarks, 21)
    rh = _as_array(results.right_hand_landmarks, 21)

    face = face[:8, :]
    pose, face, lh, rh = normalize_zscore(pose, face, lh, rh)

    groups = (face, pose, lh, rh)
    return np.concatenate([group.flatten() for group in groups])
      

In [None]:
def draw_styled_landmarks(image, results):
    """Draw the face, pose and both hand landmark sets of ``results`` on ``image``.

    ``image`` is handed to ``mp_drawing.draw_landmarks`` once per landmark
    group; nothing is returned.

    Each row below is ``(landmarks, connections, first_spec, second_spec)``
    where a spec is ``(color, thickness, circle_radius)``. The two specs are
    passed to ``draw_landmarks`` as its 4th and 5th positional arguments.
    Colour components are kept within 0-255 (the face row previously used 256).
    """
    layers = (
        # Face mesh
        (results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
         ((80, 110, 10), 1, 1), ((80, 255, 121), 1, 1)),
        # Body pose
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         ((80, 22, 10), 2, 4), ((80, 44, 121), 2, 2)),
        # Left hand
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         ((121, 22, 76), 2, 4), ((121, 44, 250), 2, 2)),
        # Right hand
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         ((245, 117, 66), 2, 4), ((245, 66, 230), 2, 2)),
    )

    for landmarks, connections, first_spec, second_spec in layers:
        color_a, thickness_a, radius_a = first_spec
        color_b, thickness_b, radius_b = second_spec
        mp_drawing.draw_landmarks(
            image, landmarks, connections,
            mp_drawing.DrawingSpec(color=color_a, thickness=thickness_a, circle_radius=radius_a),
            mp_drawing.DrawingSpec(color=color_b, thickness=thickness_b, circle_radius=radius_b),
        )

In [None]:
def videoProc(path, o=False):
    """Convert a whole video into a fixed-size (25, 249) keypoint array.

    Every frame is resized to 1920x1080, run through the shared ``holistic``
    model and turned into a 249-value vector by ``extract_keypoints``. If the
    clip is long enough that ``round(n_frames / 25)`` exceeds 1, only every
    ``round(n_frames / 25)``-th frame is kept. The result is then truncated
    to 25 rows or padded with zero rows up to 25.

    Args:
        path: Video location passed to ``cv2.VideoCapture``.
        o: When truthy, draw the landmarks on each frame and show it with
            matplotlib.

    Returns:
        ``numpy.ndarray`` of shape (25, 249). A video from which no frame can
        be read yields an all-zero array and emits a warning (previously this
        case raised inside ``np.concatenate``).
    """
    import warnings

    framecount = 25
    feature_dim = 249  # length of the vector produced by extract_keypoints

    cap = cv2.VideoCapture(path)
    frames = []
    try:
        while True:
            success, frame = cap.read()
            if not success:
                break

            frame = cv2.resize(frame, (1920,1080), interpolation = cv2.INTER_AREA)
            results = mediapipe_detection(frame, holistic)
            frames.append(np.reshape(extract_keypoints(results), (1,-1)))

            if o:
                draw_styled_landmarks(frame, results)
                plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                plt.show()
    finally:
        # Always free the capture handle, even if detection raises.
        cap.release()

    # Uniform subsampling: keep one frame out of every `step`.
    step = round(len(frames) / framecount)
    if step > 1:
        frames = frames[::step]

    fnum = len(frames)
    if fnum == 0:
        warnings.warn("videoProc: no frames could be read from %r; returning zeros" % (path,))
        return np.zeros((framecount, feature_dim))

    v = np.reshape(frames, (fnum, -1))
    if fnum > framecount:
        v = v[0:framecount]
    elif fnum < framecount:
        padding = np.zeros((framecount - fnum, feature_dim))
        v = np.concatenate((v, padding), axis=0)

    return v

In [None]:
def videoProc2(path, c=1, skip=0, o=False):
    """Sample up to 30 frames from a video and return a (30, 249) keypoint array.

    Frames are read sequentially; after each processed frame the next ``c``
    frames are skipped. Each processed frame is run through the shared
    ``holistic`` model and ``extract_keypoints``. Reading stops at the end of
    the video or once 30 frames have been collected; shorter results are
    padded with zero rows.

    Args:
        path: Video location passed to ``cv2.VideoCapture``.
        c: Number of frames skipped after every processed frame.
        skip: Currently ignored; kept so existing calls keep working.
        o: When truthy, draw the landmarks on each processed frame and show
            it with matplotlib.

    Returns:
        ``numpy.ndarray`` of shape (30, 249). A video from which no frame can
        be read yields an all-zero array and emits a warning (previously a
        bare ``[]`` was returned, which broke ``.shape``/``reshape`` callers).
    """
    import warnings

    framecount = 30
    feature_dim = 249  # length of the vector produced by extract_keypoints

    cap = cv2.VideoCapture(path)
    success = True
    fnum = 0
    v = []
    fpsCounter = 0

    try:
        while success:
            success, frame = cap.read()

            # Count down the frames to skip between two processed frames.
            if fpsCounter != 0:
                fpsCounter -= 1
                continue
            fpsCounter = c

            if fnum == framecount:
                break

            if success:
                results = mediapipe_detection(frame, holistic)
                v.append(np.reshape(extract_keypoints(results), (1,-1)))
                fnum += 1

                if o:
                    draw_styled_landmarks(frame, results)
                    plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    plt.show()
    finally:
        # Always free the capture handle, even if detection raises.
        cap.release()

    if fnum == 0:
        warnings.warn("videoProc2: no frames could be read from %r; returning zeros" % (path,))
        return np.zeros((framecount, feature_dim))

    v = np.reshape(v, (fnum, -1))
    if fnum < framecount:
        padding = np.zeros((framecount - fnum, feature_dim))
        v = np.concatenate((v, padding), axis=0)

    return v

***
***
***

In [None]:
# Collect the names found in the training directory, in sorted order.
path = "./traindata/all"
file_list = sorted(os.listdir(path))
# `files` starts out as a second name for the very same list object.
files = file_list

In [None]:
# Number of entries found in the training directory.
len(files)

In [None]:
# Build the paths from the bare names in `file_list` rather than from `files`
# itself, so re-running this cell does not prefix the directory a second time.
files = [path + "/" + f for f in file_list]
files[0],files[499]

In [None]:
# Smoke test: process one clip with visualisation switched on (o=1) and
# display the shape of the returned array.
v= videoProc2(files[352],c=3,o=1)
v.shape

In [None]:
# Largest feature value in rows 24 and 20 of the clip processed above.
v[24].max(),v[20].max(),

In [None]:
# Standard deviation and mean of row 25 of the clip processed above.
v[25].std(),v[25].mean()

In [None]:
# Process every 10th of the first 1600 files (160 clips) and accumulate the
# number of rows each call returns.
# NOTE(review): `sum` shadows the built-in sum() for the rest of the session;
# consider renaming it here and in the following cell.
sum = 0
for i in range(0,1600,10):
  v= videoProc2(files[i],c=3,o=0)
  sum+=v.shape[0]
  print(v.shape,int(i/50))


In [None]:
# Accumulated row count from the scan above divided by 50.
# NOTE(review): that scan visits 160 clips, so confirm 50 is the intended divisor.
sum/(10*5)

In [None]:
# Build the train/dev sets from every file after the first 1600.
# File names are split on "_": field 0 is the 1-based class id (stored
# 0-based as the label), field 1 decides the split (< 10 -> train, else dev).
X_train, y_train = [], []
X_dev, y_dev = [], []
files_ = files[1600:]
for l, f in enumerate(files_):
    v = videoProc2(f, c=3, o=0)
    name_fields = f.split('/')[-1].split('_')
    p = int(name_fields[1])

    # Pick the destination lists once instead of duplicating the branch body.
    features, labels = (X_train, y_train) if p < 10 else (X_dev, y_dev)
    features.append(v)
    label = int(name_fields[0]) - 1
    labels.append(label)

    print(p, l)

    

In [None]:
# Turn the accumulated Python lists into NumPy arrays.
X_train, y_train = np.array(X_train), np.array(y_train)
X_dev, y_dev = np.array(X_dev), np.array(y_dev)

In [None]:
# Shapes of the four arrays built above.
X_train.shape,y_train.shape,X_dev.shape,y_dev.shape

In [None]:
# Persist the second split to disk, one .npy file per array.
for out_path, array in (
    ("./X_train_2.npy", X_train),
    ("./y_train_2.npy", y_train),
    ("./X_dev_2.npy", X_dev),
    ("./y_dev_2.npy", y_dev),
):
    np.save(out_path, array)

In [None]:
# Drop the file listings; they are rebuilt later for the test data.
del files, file_list

***
***
***

In [None]:
# vids = vids[0:1600,:,:]
# vids = np.concatenate([vids[:,:,0::3],vids[:,:,1::3],],axis=0)
# labels = labels[0:1600]
# Load the first saved train/dev split.
# NOTE(review): this notebook only writes the "*_2.npy" files; the "*_1.npy"
# files are presumably left over from an earlier run - confirm how they were made.
X_train, y_train = np.load("./X_train_1.npy"), np.load("./y_train_1.npy")
X_dev, y_dev = np.load("./X_dev_1.npy"), np.load("./y_dev_1.npy")

In [None]:
# Load the second saved train/dev split (written by the np.save cell above).
X_train2, y_train2 = np.load("./X_train_2.npy"), np.load("./y_train_2.npy")
X_dev2, y_dev2 = np.load("./X_dev_2.npy"), np.load("./y_dev_2.npy")

In [None]:
# Append the second split to the first along the sample axis (axis 0).
# Re-running this cell appends the second split again.
X_train = np.concatenate((X_train, X_train2), axis=0)
X_dev = np.concatenate((X_dev, X_dev2), axis=0)
y_train = np.concatenate((y_train, y_train2), axis=0)
y_dev = np.concatenate((y_dev, y_dev2), axis=0)


In [None]:
# The second split now lives inside the merged arrays; drop the separate copies.
del X_train2, y_train2, X_dev2, y_dev2

In [None]:
# Shuffle samples and labels together, with a fixed seed so the order is repeatable.
SHUFFLE_SEED = 13
X_train, y_train = shuffle(X_train, y_train, random_state=SHUFFLE_SEED)
X_dev, y_dev = shuffle(X_dev, y_dev, random_state=SHUFFLE_SEED)

In [None]:
# Sanity check: mean and std of one row of one training sample, plus the overall shape.
X_train[1,20].mean(),X_train[1,20].std(),X_train.shape

In [None]:
# Sanity check: value range and shape of a single training sample.
X_train[3].min(),X_train[3].max(),X_train[3].shape

In [None]:
# Sanity check: first ten labels, label range and label count.
y_train[0:10],y_train.min(),y_train.max(),y_train.shape

In [None]:

# Bidirectional(GRU(128, return_sequences=True, activation='tanh')),
# BatchNormalization(axis=-1,center=True,scale=True,),
# ReLU(), 
# LSTM
# tf.keras.layers.SimpleRNN(256, return_sequences=True, activation="relu",),


# Dense(128, activation='relu',kernel_regularizer=tf.keras.regularizers.L1L2(0.01,0.05)),  
# # Dense(64, activation='relu',kernel_regularizer=tf.keras.regularizers.L1L2(0.01,0.05)),  
# BatchNormalization(axis=-1,center=True,scale=True,),

# Sequence classifier: input of 30 rows x 249 features (the shape produced by
# videoProc2), one GRU layer that returns only its final output, batch
# normalisation, dropout and a 64-way softmax.
gru_classifier_layers = [
    tf.keras.Input(shape=(30, 249)),
    GRU(96, return_sequences=False, activation="relu"),
    BatchNormalization(axis=-1, center=True, scale=True),
    Dropout(0.25),
    Dense(64, activation='softmax'),
]
model_lstm_1 = Sequential(gru_classifier_layers)


In [None]:
# Print the layer-by-layer summary of the model defined above.
model_lstm_1.summary()

In [None]:
# Adam optimiser with explicitly spelled-out hyper-parameters; the sparse
# categorical loss takes the integer class ids stored in y_train / y_dev.
adam = tf.keras.optimizers.Adam(
    learning_rate=1e-3,
    beta_1=0.9,
    beta_2=0.999,
    epsilon=1e-07,
)
model_lstm_1.compile(
    optimizer=adam,
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=["accuracy"],
)

In [None]:
# Train for 20 epochs on the training set only; no validation data is passed,
# so the dev set is only looked at by the evaluate cell further down.
model_lstm_1.fit(X_train, y_train, epochs=20, batch_size=128, )#batch_size=64 ,#validation_data=(X_dev, y_dev)

In [None]:
# Loss and accuracy on the training set.
model_lstm_1.evaluate(X_train, y_train)

In [None]:
# Loss and accuracy on the held-out dev set.
model_lstm_1.evaluate(X_dev, y_dev)

In [None]:
# Save the trained model under ./modelArgantine.
# NOTE(review): the path has no file extension; newer Keras releases presumably
# require ".keras" or ".h5" here - confirm against the installed version.
model_lstm_1.save("./modelArgantine/v8_64_4")

In [None]:
# Collect the test clips: `file_list` holds the bare names (used later as
# dictionary keys), `files` the matching paths.
path = "./testdata/"
file_list = sorted(os.listdir(path))

# os.path.join avoids the doubled separator that path + "/" + name produced
# for a directory string that already ends in "/".
files = [os.path.join(path, f) for f in file_list]
files[0],files[1]

In [None]:
# Features for every test clip with c=1 (one frame skipped after each
# processed frame), keyed by file name.
test1 = {}
for j, (i, video_path) in enumerate(zip(file_list, files)):
    v = videoProc2(video_path, c=1, o=0)
    test1[i] = v
    print(j)

In [None]:
# Features for every test clip with c=2 (two frames skipped after each
# processed frame), keyed by file name.
test2 = {}
for j, (i, video_path) in enumerate(zip(file_list, files)):
    v = videoProc2(video_path, c=2, skip=0, o=0)
    test2[i] = v
    print(j)

In [None]:
# reconstructed_model = tf.keras.models.load_model("./modelArgantine/v8.6")

In [None]:
# reconstructed_model.summary()

In [None]:
# Classify each clip of test1: print its name, the highest-scoring class
# index and that class's score.
for l, v in test1.items():
    batch = np.reshape(v, (1, 30, 249))  # single-sample batch
    p = model_lstm_1.predict(batch)
    pp = np.argmax(p)
    print(l, pp, p[:, pp])

In [None]:
# Classify each clip of test2: print its name, the highest-scoring class
# index and that class's score.
for l, v in test2.items():
    batch = np.reshape(v, (1, 30, 249))  # single-sample batch
    p = model_lstm_1.predict(batch)
    pp = np.argmax(p)
    print(l, pp, p[:, pp])

In [None]:
# Largest feature value in row 25 of the "Bright.mp4" entry of test2.
test2["Bright.mp4"][25].max()