1. Install and import the required libraries


In [None]:
import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip uninstall protobuf -y
!{sys.executable} -m pip uninstall mediapipe -y
!{sys.executable} -m pip uninstall tensorflow -y
!{sys.executable} -m pip uninstall opencv-python -y
!{sys.executable} -m pip uninstall opencv-python-headless -y
!{sys.executable} -m pip install notebook jupyterlab jupyter_server nbclient nbconvert nbformat ipywidgets --user
!{sys.executable} -m pip install opencv-python --user
!{sys.executable} -m pip install matplotlib --user
!{sys.executable} -m pip install protobuf==4.25.3 --user
!{sys.executable} -m pip install mediapipe --user
!{sys.executable} -m pip install tensorflow --user
!{sys.executable} -m pip install opencv-python-headless --user
!{sys.executable} -m pip install opencv-contrib-python --user
!{sys.executable} -m pip install ipython --user
!{sys.executable} -m pip install subprocess32 --user  
!{sys.executable} -m pip install threading --user  
!{sys.executable} -m pip install signal --user
!{sys.executable} -m pip install numpy --user    









In [None]:
import cv2
import numpy as np
import os
import pandas as pd
from IPython.display import display, Image, clear_output
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import time
import mediapipe as mp
import subprocess
import threading
import signal
import io
import ipywidgets as widgets
from PIL import Image
import asyncio


In [None]:
# Short aliases for the MediaPipe solution modules used by the helper functions:
# the holistic model, the drawing utilities and the face-mesh connection sets.
mp_holistic, mp_drawing, mp_face_mesh = (
    mp.solutions.holistic,
    mp.solutions.drawing_utils,
    mp.solutions.face_mesh,
)

In [None]:
# Root folder that receives the exported keypoint arrays.
DATA_PATH = os.path.join('MP_Data')

# Action labels to record; each one gets its own sub-folder under DATA_PATH.
actions = np.array(['hello', 'thanks', 'iloveyou'])

# Number of recordings per action, and number of frames per recording.
no_sequences = sequence_length = 30

In [None]:
# Pre-create DATA_PATH/<action>/<sequence>/ for every action and sequence index.
# exist_ok=True makes re-running this cell harmless when the folders already exist.
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    for sequence in range(no_sequences):
        os.makedirs(os.path.join(action_dir, str(sequence)), exist_ok=True)

In [None]:
def mediapipe_detection(image, model):
    """Run `model.process` on a BGR frame and return the frame with the results.

    Args:
        image: Frame in BGR channel order (it is converted with COLOR_BGR2RGB).
        model: Object exposing `process(image)`; collect_data passes a
            MediaPipe Holistic instance.

    Returns:
        (image, results): the frame converted back to BGR, and whatever
        `model.process` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # colour conversion BGR -> RGB
    # Mark the array read-only while the model runs, then make it writeable again.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)  # colour conversion RGB -> BGR
    return bgr_frame, results

In [None]:
def draw_landmarks(image, results):
    """Draw face, pose and both hand landmark sets using the default drawing style.

    Args:
        image: Frame handed to `mp_drawing.draw_landmarks`.
        results: Detection results exposing `face_landmarks`, `pose_landmarks`,
            `left_hand_landmarks` and `right_hand_landmarks`.

    Returns:
        The same `image` object that was passed in.
    """
    # (landmark list, connection set) pairs, in drawing order.
    layers = (
        (results.face_landmarks, mp_holistic.FACEMESH_TESSELATION),
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),
    )
    for landmark_list, connections in layers:
        mp_drawing.draw_landmarks(image, landmark_list, connections)
    return image

In [None]:
# Keypoint extraction
def extract_keypoints(results):
    """Flatten all detected landmarks into one 1-D feature vector.

    The vector is the concatenation, in this order, of:
      * pose:       x, y, z, visibility per landmark (zeros(33*4) if missing)
      * face:       x, y, z per landmark             (zeros(468*3) if missing)
      * left hand:  x, y, z per landmark             (zeros(21*3) if missing)
      * right hand: x, y, z per landmark             (zeros(21*3) if missing)

    Args:
        results: Object exposing `pose_landmarks`, `face_landmarks`,
            `left_hand_landmarks` and `right_hand_landmarks`; each is either
            falsy or has a `.landmark` iterable of points.

    Returns:
        A 1-D numpy array; a falsy landmark group contributes a zero block of
        the size listed above.
    """
    def _flatten(group, fields, fallback_size):
        # A missing body part is replaced by a zero block of fixed size.
        if not group:
            return np.zeros(fallback_size)
        rows = [[getattr(point, field) for field in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    segments = [
        _flatten(results.pose_landmarks, xyz + ('visibility',), 33*4),
        _flatten(results.face_landmarks, xyz, 468*3),
        _flatten(results.left_hand_landmarks, xyz, 21*3),
        _flatten(results.right_hand_landmarks, xyz, 21*3),
    ]
    return np.concatenate(segments)

In [None]:
def draw_styled_landmarks(image, results):
    """Draw the detected face, pose and hand landmarks with per-part styling.

    Each landmark group is drawn only when it is present on `results`. The
    first DrawingSpec of every call styles the landmark points and the second
    styles the connections between them. Colour channels are 8-bit values
    (0-255).

    Args:
        image: Frame handed to `mp_drawing.draw_landmarks`.
        results: Detection results exposing `face_landmarks`, `pose_landmarks`,
            `left_hand_landmarks` and `right_hand_landmarks`.

    Returns:
        None.
    """
    # Draw face landmarks
    if results.face_landmarks:
        mp_drawing.draw_landmarks(image, results.face_landmarks, mp_face_mesh.FACEMESH_TESSELATION,
                                  mp_drawing.DrawingSpec(color=(80, 110, 10), thickness=1, circle_radius=1),
                                  # Was 256, which is outside the 8-bit channel range.
                                  mp_drawing.DrawingSpec(color=(80, 255, 121), thickness=1, circle_radius=1))

    # Draw pose landmarks
    if results.pose_landmarks:
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                                  mp_drawing.DrawingSpec(color=(80, 22, 10), thickness=2, circle_radius=4),
                                  mp_drawing.DrawingSpec(color=(80, 44, 121), thickness=2, circle_radius=2))

    # Draw left hand landmarks
    if results.left_hand_landmarks:
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                  mp_drawing.DrawingSpec(color=(121, 22, 76), thickness=2, circle_radius=4),
                                  mp_drawing.DrawingSpec(color=(121, 44, 250), thickness=2, circle_radius=2))

    # Draw right hand landmarks
    if results.right_hand_landmarks:
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                  mp_drawing.DrawingSpec(color=(121, 44, 250), thickness=2, circle_radius=4),
                                  mp_drawing.DrawingSpec(color=(121, 22, 76), thickness=2, circle_radius=2))

In [None]:
# Image Widget for Display
image_widget = widgets.Image()
display(image_widget)

async def collect_data():
    """Record keypoint sequences from camera 0 for every action.

    For each action, `no_sequences` recordings of `sequence_length` frames are
    captured. Every frame is passed through `mediapipe_detection`, annotated,
    shown in `image_widget` as a JPEG, and its keypoint vector is written with
    `np.save` to DATA_PATH/<action>/<sequence>/<frame_num> (np.save appends the
    ".npy" extension).

    The first frame of each recording also shows a 'STARTING COLLECTION' banner
    and pauses for 2 seconds so the subject can get into position.

    A failed camera read ends the current recording early and moves on to the
    next one.

    The camera is released in a `finally` block, so it is freed when the
    coroutine raises or the task is cancelled.
    """
    cap = cv2.VideoCapture(0)
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        cap.set(cv2.CAP_PROP_FPS, 60)
        # Inside a coroutine the running loop is the one to use.
        loop = asyncio.get_running_loop()

        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            for action in actions:
                for sequence in range(no_sequences):
                    for frame_num in range(sequence_length):
                        # cap.read blocks, so run it in the default executor to
                        # keep the event loop responsive.
                        ret, frame = await loop.run_in_executor(None, cap.read)
                        if not ret:
                            break

                        frame, results = mediapipe_detection(frame, holistic)
                        draw_styled_landmarks(frame, results)

                        first_frame = frame_num == 0
                        if first_frame:
                            cv2.putText(frame, 'STARTING COLLECTION', (120,200), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv2.LINE_AA)
                        cv2.putText(frame, f'Collecting frames for {action} Video Number {sequence}', (15,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

                        # Update Display
                        _, buffer = cv2.imencode('.jpg', frame)
                        image_widget.value = buffer.tobytes()
                        if first_frame:
                            await asyncio.sleep(2)  # Display 'STARTING COLLECTION' for 2 seconds

                        keypoints = extract_keypoints(results)
                        npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                        np.save(npy_path, keypoints)

                        await asyncio.sleep(0.05)  # Ensures a slight break between frames
    finally:
        # Always free the camera, including on errors and task cancellation.
        cap.release()

In [None]:
# Run Data Collection
# asyncio.create_task needs an already-running event loop (presumably the one
# provided by the notebook kernel - confirm when running elsewhere).
# The Task is kept in `task` so it stays referenced while it runs and can be
# stopped early with task.cancel().
task = asyncio.create_task(collect_data())

In [None]:
# from sklearn.model_selection import train_test_split
# from tensorflow.keras.utils import to_categorical

In [None]:
# label_map = {label:num for num, label in enumerate(actions)}

In [None]:
# label_map

In [None]:
# sequences, labels = [], []
# for action in actions:
#     for sequence in range(no_sequences):
#         window = []
#         for frame_num in range(sequence_length):
#             res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
#             window.append(res)
#         sequences.append(window)
#         labels.append(label_map[action])

In [None]:
# np.array(sequences).shape

In [None]:
# np.array(labels).shape

In [None]:
# X=np.array(sequences)

In [None]:
# X.shape

In [None]:
# y= to_categorical(labels).astype(int)

In [None]:
# y

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

In [None]:
# y_test.shape

In [None]:
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import LSTM, Dense
# from tensorflow.keras.callbacks import TensorBoard

In [None]:
# log_dir = os.path.join('Logs')
# tb_callback = TensorBoard(log_dir=log_dir)

In [None]:
# model = Sequential()
# model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30,1662)))
# model.add(LSTM(128, return_sequences=True, activation='relu'))
# model.add(LSTM(64, return_sequences=False, activation='relu'))
# model.add(Dense(64, activation='relu'))
# model.add(Dense(32, activation='relu'))
# model.add(Dense(actions.shape[0], activation='softmax'))

In [None]:
# res=[0.7, 0.2, 0.1]

In [None]:
# actions[np.argmax(res)]

In [None]:
# model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])

In [None]:
# model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])

In [None]:
# model.summary()

In [None]:
# res = model.predict(X_test)

In [None]:
# actions[np.argmax(res[4])]

In [None]:
# actions[np.argmax(y_test[4])]

In [None]:
# model.save('action.h5')

In [None]:
# del model

In [None]:
# model.load_weights('action.h5')

In [None]:
# from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

In [None]:
# yhat = model.predict(X_train)

In [None]:
# ytrue = np.argmax(y_train, axis=1).tolist()
# yhat = np.argmax(yhat, axis=1).tolist()

In [None]:
# multilabel_confusion_matrix(ytrue, yhat)

In [None]:
# accuracy_score(ytrue, yhat)

In [None]:
# sequence.reverse()

In [None]:
# len(sequence)

In [None]:
# sequence.append('def')

In [None]:
# sequence.reverse()

In [None]:
# sequence[30:]

In [None]:
# # Image Widget for Display
# image_widget = widgets.Image()
# display(image_widget)

# sequence = []
# sentence = []
# predictions = []
# threshold = 0.5

# async def collect_data():
#     cap = cv2.VideoCapture(0)
#     cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
#     cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
#     cap.set(cv2.CAP_PROP_FPS, 60)
#     loop = asyncio.get_event_loop()
    
#     with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
#         for action in actions:
#             for sequence_num in range(no_sequences):
#                 for frame_num in range(sequence_length):
#                     ret, frame = await loop.run_in_executor(None, cap.read)
#                     if not ret:
#                         break
                    
#                     frame, results = mediapipe_detection(frame, holistic)
#                     draw_styled_landmarks(frame, results)
                    
#                     keypoints = extract_keypoints(results)
#                     sequence.append(keypoints)
#                     sequence = sequence[-30:]

#                     if len(sequence) == 30:
#                         res = model.predict(np.expand_dims(sequence, axis=0))[0]
#                         print(actions[np.argmax(res)])
                        
#                     if res[np.argmax(res)] > threshold:
#                         if len(sentence) > 0:
#                             if actions[np.argmax(res)] != sentence[-1]:
#                                 sentence.append(actions[np.argmax(res)])
#                         else:    
#                             sentence.append(actions[np.argmax(res)])
                    
#                     if len(sentence) > 5:
#                         sentence = sentence[-5:]

#                     cv2.rectangle(frame, (0,0), (640, 40), (245, 117, 16), -1)
#                     cv2.putText(frame, ' '.join(sentence), (3,30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

#                     # Update Display
#                     _, buffer = cv2.imencode('.jpg', frame)
#                     image_widget.value = buffer.tobytes()
                    
#                     npy_path = os.path.join(DATA_PATH, action, str(sequence_num), str(frame_num))
#                     np.save(npy_path, keypoints)
                    
#                     await asyncio.sleep(0.05)
#     cap.release()

# # Run Data Collection
# task = asyncio.create_task(collect_data())


In [None]:
# Run Data Collection
# task = asyncio.create_task(collect_data())

In [None]:
# # np.expand_dims(sequence, axis=0).shape
# res[np.argmax(res)] > threshold

In [None]:
# (num_sequences, 30, 1662)

In [None]:
# model.predict(np.expand_dims(X_test[0], axis=0))