In [None]:
!pip install mediapipe

In [None]:
!pip install tensorflow

In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import tensorflow as tf

# Keypoints using MP Holistic

In [None]:
mp_holistic = mp.solutions.holistic  # Holistic solution module (provides the Holistic model class)
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [None]:
def mediapipe_detection(image,model):
  """Run ``model`` on a single BGR frame.

  Args:
    image: Frame in BGR channel order (it is converted with COLOR_BGR2RGB).
    model: Object exposing ``process(rgb_image)``, e.g. a Holistic instance.

  Returns:
    Tuple ``(image, results)``: the frame converted back to BGR and whatever
    ``model.process`` returned.
  """
  rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
  # Flag the array read-only for the duration of the model call, then restore it.
  rgb_frame.flags.writeable = False
  results = model.process(rgb_frame)
  rgb_frame.flags.writeable = True
  bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
  return bgr_frame, results

In [None]:
def draw_landmarks(image, results):
  """Draw face, pose and both hand landmark sets onto ``image`` with default styling.

  Args:
    image: Frame handed to ``mp_drawing.draw_landmarks``.
    results: Holistic result exposing ``face_landmarks``, ``pose_landmarks``,
      ``left_hand_landmarks`` and ``right_hand_landmarks``.
  """
  # (landmark list, connection set) pairs, drawn in this order.
  overlays = (
      (results.face_landmarks, mp_holistic.FACEMESH_CONTOURS),
      (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
      (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
      (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
  )
  for landmark_list, connections in overlays:
    mp_drawing.draw_landmarks(image, landmark_list, connections)

In [None]:
mp_holistic.FACEMESH_CONTOURS

In [None]:
def draw_styled_landmarks(image, results):
    """Draw face, pose and hand landmarks onto ``image`` with a colour scheme per body part.

    Args:
        image: Frame handed to ``mp_drawing.draw_landmarks``.
        results: Holistic result exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.

    Returns:
        None.
    """
    spec = mp_drawing.DrawingSpec
    # Each entry: (landmark list, connection set, landmark style, connection style).
    layers = (
        # Face
        (results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
         spec(color=(80,110,10), thickness=1, circle_radius=1),
         # Channel was 256, which is outside the 8-bit range; 255 is the maximum.
         spec(color=(80,255,121), thickness=1, circle_radius=1)),
        # Pose
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         spec(color=(80,22,10), thickness=2, circle_radius=4),
         spec(color=(80,44,121), thickness=2, circle_radius=2)),
        # Left hand
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(121,22,76), thickness=2, circle_radius=4),
         spec(color=(121,44,250), thickness=2, circle_radius=2)),
        # Right hand
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         spec(color=(245,117,66), thickness=2, circle_radius=4),
         spec(color=(245,66,230), thickness=2, circle_radius=2)),
    )
    for landmarks, connections, landmark_style, connection_style in layers:
        mp_drawing.draw_landmarks(image, landmarks, connections, landmark_style, connection_style)

In [None]:
# Live preview: run Holistic on webcam frames and show the styled landmarks.
# `image` and `results` from the last processed frame stay available to later cells.
cap = cv2.VideoCapture(0)
try:
  # access mediapipe model
  with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

      # Read feed; stop instead of passing an empty frame to the detector
      ret,frame = cap.read()
      if not ret:
        break
      # make detections
      image,results = mediapipe_detection(frame,holistic)
      draw_styled_landmarks(image, results)
      # show to screen
      cv2.imshow('OpenCV Feed',image)
      # break gracefully: pressing q ends the preview
      if cv2.waitKey(10) & 0xFF == ord('q'):
        break
finally:
  # Always free the camera and close the window, even if a frame raised
  cap.release()
  cv2.destroyAllWindows()

In [None]:
# Number of face landmarks in the last `results` from the webcam loop.
# Fails with AttributeError when face_landmarks is None (the case extract_keypoints guards against).
len(results.face_landmarks.landmark)

In [None]:
# Number of left-hand landmarks in the last `results`.
# Fails with AttributeError when left_hand_landmarks is None (the case extract_keypoints guards against).
len(results.left_hand_landmarks.landmark)

In [None]:
# Draw the landmarks again on the last frame kept from the webcam loop (`image` and `results` are leftover state)
draw_styled_landmarks(image,results)

In [None]:
# `image` is in BGR order (see mediapipe_detection); convert to RGB before handing it to matplotlib
plt.imshow(cv2.cvtColor(image,cv2.COLOR_BGR2RGB))

# Extracting keypoint values

In [None]:
results.pose_landmarks.landmark[0]

In [None]:
# Build an [x, y, z, visibility] vector for each pose landmark.
# `test` is overwritten on every pass, so only the last landmark's vector survives the loop.
for res in results.pose_landmarks.landmark:
  test = np.array([res.x,res.y,res.z,res.visibility])

In [None]:
test

In [None]:
# Up to here `test` held a single landmark; now collect one vector per pose landmark in a list
# (still a list of arrays - flattening happens in the next cell)
pose = []
for res in results.pose_landmarks.landmark:
  test = np.array([res.x,res.y,res.z,res.visibility])
  pose.append(test)

In [None]:
# Refactoring the loop above into one expression: flattened pose vector,
# or 33*4 zeros when results.pose_landmarks is falsy
pose = np.array([[res.x,res.y,res.z,res.visibility]for res in results.pose_landmarks.landmark]).flatten()  if results.pose_landmarks else np.zeros(33*4)

In [None]:
# Flattened [x, y, z] per face landmark, or 468*3 zeros when results.face_landmarks is falsy
face = np.array([[res.x,res.y,res.z]for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)

In [None]:
# Flattened [x, y, z] per left-hand landmark, or 21*3 zeros when results.left_hand_landmarks is falsy
lh = np.array([[res.x,res.y,res.z]for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)

In [None]:
# Flattened [x, y, z] per right-hand landmark, or 21*3 zeros when results.right_hand_landmarks is falsy
rh = np.array([[res.x,res.y,res.z]for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)

In [None]:
# Combine the per-part extraction into one reusable function
def _flatten_landmarks(landmark_list, fields, empty_size):
  """Return the named attributes of every landmark as one flat array, or ``empty_size`` zeros if ``landmark_list`` is falsy."""
  if not landmark_list:
    return np.zeros(empty_size)
  return np.array([[getattr(lm, name) for name in fields] for lm in landmark_list.landmark]).flatten()


def extract_keypoints(results):
  """Flatten one Holistic result into a single feature vector.

  The vector is the concatenation, in this order, of:
    pose        [x, y, z, visibility] per landmark (33*4 zeros if missing)
    face        [x, y, z] per landmark             (468*3 zeros if missing)
    left hand   [x, y, z] per landmark             (21*3 zeros if missing)
    right hand  [x, y, z] per landmark             (21*3 zeros if missing)

  Args:
    results: Object exposing ``pose_landmarks``, ``face_landmarks``,
      ``left_hand_landmarks`` and ``right_hand_landmarks``; each is either
      falsy or has a ``.landmark`` sequence.

  Returns:
    1-D numpy array; length 1662 when every part is missing or has the
    landmark counts listed above.
  """
  xyz = ('x', 'y', 'z')
  pose = _flatten_landmarks(results.pose_landmarks, xyz + ('visibility',), 33*4)
  face = _flatten_landmarks(results.face_landmarks, xyz, 468*3)
  lh = _flatten_landmarks(results.left_hand_landmarks, xyz, 21*3)
  rh = _flatten_landmarks(results.right_hand_landmarks, xyz, 21*3)
  return np.concatenate([pose,face,lh,rh])

In [None]:
# Expected (1662,) = 33*4 + 468*3 + 21*3 + 21*3, given the landmark counts assumed in extract_keypoints
extract_keypoints(results).shape

# Setup folders for Collection

In [None]:
# Root folder for the exported keypoint arrays (.npy files), relative to the working directory
DATA_PATH = os.path.join('MP_Data')

# Actions (class names) that we try to detect
actions = np.array(['hello','thanks','iloveyou'])

# Number of videos (sequences) recorded per action
no_sequences = 30

# Number of frames in each video
sequence_length = 30


In [None]:
# Target folder layout: DATA_PATH/<action>/<sequence index>/
# hello
## 0
## 1
## ..
## ..
## 29

# thanks
## 0
## ..
## 29

# iloveyou
## 0
## ..
## 29

In [None]:
# Create DATA_PATH/<action>/<sequence index> for every action and sequence.
for action in actions:
  for sequence in range(no_sequences):
    # exist_ok=True makes re-running the cell harmless, while real failures
    # (permissions, invalid path) still surface instead of being swallowed
    os.makedirs(os.path.join(DATA_PATH,action,str(sequence)), exist_ok=True)

In [None]:
# To recap: we collect 30 videos (sequences) per action, i.e. hello, thanks, iloveyou.
# Each of those video sequences contains 30 frames of data.
# Each frame holds 1662 landmark values, i.e. 3 actions * 30 sequences * 30 frames * 1662 values.

# Collect keypoint Values for Training and Testing

In [None]:
# Record keypoints: for every action, capture `no_sequences` videos of
# `sequence_length` frames and save each frame's keypoint vector as
# DATA_PATH/<action>/<sequence>/<frame_num>.npy
cap = cv2.VideoCapture(0)
stop_requested = False  # set when q is pressed, so all three loops exit
try:
  # access mediapipe model
  with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    #loop through actions
    for action in actions:
      #loop through sequences aka videos
      for sequence in range(no_sequences):
        #loop through video length aka sequence_length
        for frame_num in range(sequence_length):

          # Read feed; a missing frame would otherwise be saved as garbage or crash the detector
          ret,frame = cap.read()
          if not ret:
            raise RuntimeError('Could not read a frame from the webcam')
          # make detections
          image,results = mediapipe_detection(frame,holistic)
          draw_styled_landmarks(image, results)

          # Status line shown on every frame
          #passing image,text,position,font,font scale,colour,line width,line type
          cv2.putText(image,'Collecting frames for {} video number {}'.format(action,sequence),(15,12),
                      cv2.FONT_HERSHEY_SIMPLEX,0.5,(0,0,255),1,cv2.LINE_AA)
          if frame_num==0:
            cv2.putText(image,'STARTING COLLECTION',(120,200),
                        cv2.FONT_HERSHEY_SIMPLEX,1,(0,255,0),4,cv2.LINE_AA)

          # show to screen BEFORE pausing, so the banner is visible during the wait
          cv2.imshow('OpenCV Feed',image)
          if frame_num==0:
            # pause at the start of each video so the subject can get into position
            cv2.waitKey(2000)

          # export keypoints for this frame
          keypoints = extract_keypoints(results)
          npy_path = os.path.join(DATA_PATH,action,str(sequence),str(frame_num))
          np.save(npy_path,keypoints)

          # break gracefully: pressing q aborts the whole collection run
          if cv2.waitKey(10) & 0xFF == ord('q'):
            stop_requested = True
            break
        if stop_requested:
          break
      if stop_requested:
        break
finally:
  # Always free the camera and close the window, even on error
  cap.release()
  cv2.destroyAllWindows()

# Preprocess data and create labels and features

In [None]:
!pip install scikit-learn

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [None]:
# Map each action name to its integer class index (its position in `actions`)
label_map = {label:num for num,label in enumerate(actions)}

In [None]:
label_map

In [None]:
# Load the saved keypoint files back: one window (list of per-frame arrays,
# frames in order 0..sequence_length-1) and one integer label per recorded video.
sequences,labels =[],[]
for action in actions:
  for sequence in range(no_sequences):
    sequence_dir = os.path.join(DATA_PATH,action,str(sequence))
    frame_files = ["{}.npy".format(frame_num) for frame_num in range(sequence_length)]
    sequences.append([np.load(os.path.join(sequence_dir,name)) for name in frame_files])
    labels.append(label_map[action])

In [None]:
np.array(sequences).shape

In [None]:
np.array(labels).shape

In [None]:
# Stack the loaded windows into one feature array
# (expected shape (90, 30, 1662) with the settings above - check with x.shape)
x = np.array(sequences)

In [None]:
x.shape

In [None]:
# One-hot encode the integer labels (categorical_crossentropy expects this format)
y= to_categorical(labels)

In [None]:
y.shape

In [None]:
# Split the data into train and test sets (5% held out).
# random_state pins the shuffle so the same samples land in the test set on every run.
X_train, X_test, y_train, y_test = train_test_split(x,y,test_size=0.05,random_state=42)

In [None]:
y_train.shape

# Build and Train LSTM Neural Network

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM,Dense
from tensorflow.keras.callbacks import TensorBoard

In [None]:
# set random seeds for Python, NumPy and TensorFlow so runs are repeatable
import random
random.seed(42)
np.random.seed(42)
tf.random.set_seed(42)

# create our model: three stacked LSTM layers followed by a dense classifier head.
# input_shape is taken from the training data (frames per sequence, values per frame)
# so it cannot drift from sequence_length or the keypoint vector size.
model = Sequential([
    LSTM(64,return_sequences=True,activation="relu",input_shape=X_train.shape[1:]),
    LSTM(128,return_sequences=True,activation="relu"),
    LSTM(64,return_sequences=False,activation="relu"),
    Dense(64,activation="relu"),
    Dense(32,activation="relu"),
    # one softmax output per action
    Dense(actions.shape[0],activation="softmax"),

])


In [None]:
# Multi-class setup: categorical cross-entropy loss on the one-hot labels,
# Adam optimizer, and categorical accuracy as the reported metric
model.compile(loss="categorical_crossentropy",
              optimizer="Adam",
              metrics=["categorical_accuracy"])

In [None]:
# Train for 150 epochs on the training split only; no validation data is passed here
history = model.fit(X_train,y_train,epochs=150)

In [None]:
pip install wget

In [None]:
!python -m wget https://raw.githubusercontent.com/code1ayush/DeepLearning_helper_functions/main/All_in_one_function.py

In [None]:
# Plot training (and, when present, validation) curves in separate figures
def plot_loss_curves(history):
  """
  Plot loss and categorical-accuracy curves, one figure each.

  Args:
    history: Object with a ``history`` dict (as returned by ``model.fit``)
      holding per-epoch lists under 'loss' and 'categorical_accuracy'.
      If 'val_loss' / 'val_categorical_accuracy' are present they are drawn
      on the same axes as their training counterparts.

  Returns:
    None; the curves are drawn with matplotlib.
  """
  metrics = history.history
  loss = metrics['loss']
  accuracy = metrics['categorical_accuracy']

  epochs = range(len(loss))

  # Plot loss on its own figure so it never lands on a figure that is already open
  plt.figure()
  plt.plot(epochs, loss, label='training_loss')
  if 'val_loss' in metrics:
    plt.plot(epochs, metrics['val_loss'], label='val_loss')
  plt.title('Loss')
  plt.xlabel('Epochs')
  plt.legend()

  # Plot accuracy
  plt.figure()
  plt.plot(epochs, accuracy, label='training_accuracy')
  # validation key follows the metric name used in model.compile
  if 'val_categorical_accuracy' in metrics:
    plt.plot(epochs, metrics['val_categorical_accuracy'], label='val_accuracy')
  plt.title('Accuracy')
  plt.xlabel('Epochs')
  plt.legend();

In [None]:
plot_loss_curves(history)

# Make Predictions

In [None]:
# Predict on the held-out sequences; each row is the softmax output over `actions`
res = model.predict(X_test)

In [None]:
# Predicted action for test sample 4 (IndexError if the test split has fewer than 5 samples)
actions[np.argmax(res[4])]

In [None]:
# Ground-truth action for the same test sample, to compare with the prediction above
actions[np.argmax(y_test[4])]

# Save the model

In [None]:
# Write the trained model to action.h5 in the current working directory
model.save('action.h5')

In [None]:
# model = load_model('action.h5')

# Evaluate the confusion matrix

In [None]:
from All_in_one_function import make_confusion_matrix

In [None]:
# Softmax outputs for the test split (same call as `res` in the prediction section)
y_pred = model.predict(X_test)

In [None]:
# Convert one-hot labels and softmax outputs to class indices.
# NOTE(review): y_pred is overwritten with a 1-D list here, so re-running this cell
# without re-running the model.predict cell first will fail on axis=1.
y_true = np.argmax(y_test,axis=1).tolist()
y_pred = np.argmax(y_pred,axis=1).tolist()

In [None]:
# Helper imported from the downloaded All_in_one_function.py; presumably renders a
# confusion matrix of true vs predicted class indices - verify against that script
make_confusion_matrix(y_true,y_pred)

# Test in real time

In [None]:
# Real-time test: keep a rolling window of the latest keypoint vectors,
# classify it once it is full, and overlay the detected action on the feed.
sequence = []    # most recent keypoint vectors, oldest first
sentence = ''    # text currently shown on screen (empty until a confident prediction)
threshold =0.4   # minimum class probability needed to update the displayed action

cap = cv2.VideoCapture(0)
try:
  # access mediapipe model
  with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

      # Read feed; stop instead of passing an empty frame to the detector
      ret,frame = cap.read()
      if not ret:
        break
      # make detections
      image,results = mediapipe_detection(frame,holistic)
      draw_styled_landmarks(image, results)

      # predict logic: append in chronological order, the same order in which
      # the training windows were loaded (frame 0 first), and keep the newest frames
      keypoints = extract_keypoints(results)
      sequence.append(keypoints)
      sequence = sequence[-sequence_length:]

      if len(sequence)==sequence_length:
        # add the batch axis the model expects; verbose=0 avoids a progress bar per frame
        res = model.predict(np.expand_dims(sequence,axis=0),verbose=0)[0]
        best = np.argmax(res)
        print(actions[best])

        # video logic: only evaluated once a prediction for this window exists
        if res[best]>threshold:
          # plain str so cv2.putText always receives text
          sentence = str(actions[best])

      cv2.rectangle(image,(0,0),(110,40),(0,255,0),-1)
      cv2.putText(image,sentence,(3,30),
                  cv2.FONT_HERSHEY_SIMPLEX,1,(255,255,255),2,cv2.LINE_AA)
      # show to screen
      cv2.imshow('OpenCV Feed',image)
      # break gracefully: pressing q ends the session
      if cv2.waitKey(10) & 0xFF == ord('q'):
        break
finally:
  # Always free the camera and close the window, even if a frame raised
  cap.release()
  cv2.destroyAllWindows()

In [None]:
  # Manual cleanup: release the camera and close any OpenCV windows (e.g. after an interrupted loop)
  cap.release()
  cv2.destroyAllWindows()

In [None]:
X_test[0].shape

In [None]:
# A single sample lacks the batch axis; add one in front so it has the (1, 30, 1662) form passed to model.predict
np.expand_dims(X_test[0],axis=0).shape