In [2]:
#!pip install mediapipe

Collecting mediapipe
  Using cached mediapipe-0.9.1.0-cp39-cp39-win_amd64.whl (49.8 MB)
Collecting flatbuffers>=2.0
  Using cached flatbuffers-23.1.21-py2.py3-none-any.whl (26 kB)
Collecting opencv-contrib-python
  Using cached opencv_contrib_python-4.7.0.68-cp37-abi3-win_amd64.whl (44.9 MB)
Installing collected packages: flatbuffers, opencv-contrib-python, mediapipe
  Attempting uninstall: flatbuffers
    Found existing installation: flatbuffers 1.12
    Uninstalling flatbuffers-1.12:
      Successfully uninstalled flatbuffers-1.12
Successfully installed flatbuffers-23.1.21 mediapipe-0.9.1.0 opencv-contrib-python-4.7.0.68


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tensorflow-gpu 2.5.0 requires flatbuffers~=1.12.0, but you have flatbuffers 23.1.21 which is incompatible.
tensorflow-gpu 2.5.0 requires typing-extensions~=3.7.4, but you have typing-extensions 4.4.0 which is incompatible.

[notice] A new release of pip available: 22.3.1 -> 23.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [54]:
# Loading libraries

# To keyboard and mouse emulating
import pydirectinput as py

# To receive and work with data
import time
import mediapipe as mp
import cv2
import csv
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# To model training
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score
import pickle

mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

In [75]:
list_of_classes = ['stay', 'left_hook', 'right_hook', 'block'] # which positions we want to classify
datafile_name = 'PaB_data.csv' # datafile path
pipelines = {
    #'lr':make_pipeline(StandardScaler(), LogisticRegression()),
    #'rc':make_pipeline(StandardScaler(), RidgeClassifier()),
    'rf':make_pipeline(StandardScaler(), RandomForestClassifier()),
    'gb':make_pipeline(StandardScaler(), GradientBoostingClassifier()),
} # which models we want to train
class_dur = 60 # time to train one position
prerec_time = 5 # time to prepare between positions

In [98]:
# Dataset initialization
base = ['class_name']
for i in range(501):
    new_row = [f'x{i}', f'y{i}', f'z{i}', f'v{i}']
    base += new_row
with open(datafile_name, mode='w', newline='') as f:
    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerow(base)

# Var initialization
start_time = time.time()
class_iter = 0
acc_class = list_of_classes[class_iter]

cap = cv2.VideoCapture(0)
# Initiate holistic model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    while cap.isOpened():
        ret, frame = cap.read()
        
        # Recolor Feed
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False        
        
        # Make Detections
        results = holistic.process(image)
        # print(results.face_landmarks)
        
        # Recolor image back to BGR for rendering
        image.flags.writeable = True   
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        # 1. Draw face landmarks
        mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION, 
                                 mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                                 mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                                 )
        
        # 2. Right hand
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(80,22,10), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(80,44,121), thickness=1, circle_radius=1)
                                 )

        # 3. Left Hand
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(121,22,76), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(121,44,250), thickness=1, circle_radius=1)
                                 )

        # 4. Pose Detections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(245,117,66), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(245,66,230), thickness=1, circle_radius=1)
                                 )
        
        # Class initialization
        iter_time = time.time()
        time_dist = iter_time - start_time
        if time_dist > class_dur:
            if class_iter < len(list_of_classes) - 1:
                class_iter += 1
                start_time = time.time()
            else:
                print('End of recognition')
                break
        acc_class = list_of_classes[class_iter]

        
        try:
            # Show data info
            cv2.rectangle(image, (0,0), (270, 60), (245, 117, 16), -1)
            cv2.putText(image, acc_class, (90, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            cv2.putText(image, str(round(class_dur - time_dist, 1)), (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
            # Giving a few seconds to user to prepair 
            if time_dist > prerec_time: 
                # Get landmarks
                pose = results.pose_landmarks.landmark
                pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
                face = results.face_landmarks.landmark
                face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())
                row = pose_row + face_row
                row.insert(0, acc_class)
                
                # Load landmarks into dataset
                with open(datafile_name, mode='a', newline='') as f:
                    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                    csv_writer.writerow(row)
                
                # Show data info
                w = 'to: ' + str(datafile_name)
                cv2.putText(image, w, (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                cv2.putText(image, 'recording', (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                
            else:
                cv2.putText(image, 'get ready', (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
        except:
            print('Detection error')
            pass
        
        
        cv2.imshow('Raw Webcam Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

Detection error
End of recognition


In [99]:
# load and show data for train
df = pd.read_csv(datafile_name)
display(df)

# Show action space
print('Action space:', df['class_name'].unique())

# Prepair data for training
X = df.drop('class_name', axis=1)
y = df['class_name']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=228)

# Train models
fit_models = {}
for algo, pipeline in pipelines.items():
    model = pipeline.fit(X_train, y_train)
    fit_models[algo] = model

# Return test scores
print('Models scores:')
for algo, model in fit_models.items():
    yhat = model.predict(X_test)
    print(algo, accuracy_score(y_test, yhat))

# Save models
for model_name in fit_models.keys():
    with open(f'{model_name}_PaB.pkl', 'wb') as f:
        pickle.dump(fit_models[model_name], f)

# Load one model for test
with open('gb_PaB.pkl', 'rb') as f:
    model = pickle.load(f)

Unnamed: 0,class_name,x0,y0,z0,v0,x1,y1,z1,v1,x2,...,z498,v498,x499,y499,z499,v499,x500,y500,z500,v500
0,stay,0.495366,0.091722,-0.445066,0.999998,0.502493,0.074127,-0.421138,0.999991,0.507274,...,0.001597,0.0,0.507460,0.076966,0.007747,0.0,0.508643,0.076114,0.007984,0.0
1,stay,0.494587,0.092100,-0.439953,0.999998,0.502183,0.074430,-0.416521,0.999991,0.506914,...,0.001638,0.0,0.507271,0.077659,0.007893,0.0,0.508427,0.076854,0.008126,0.0
2,stay,0.495259,0.092251,-0.419165,0.999998,0.502501,0.074553,-0.394597,0.999991,0.507387,...,0.001331,0.0,0.508537,0.078564,0.007743,0.0,0.509685,0.077852,0.007987,0.0
3,stay,0.495607,0.092321,-0.411438,0.999998,0.503067,0.074749,-0.387374,0.999991,0.507880,...,0.001369,0.0,0.512999,0.079769,0.007215,0.0,0.514217,0.079027,0.007413,0.0
4,stay,0.496590,0.092524,-0.385138,0.999998,0.503864,0.074977,-0.360946,0.999992,0.508496,...,0.001312,0.0,0.515513,0.080882,0.007443,0.0,0.516712,0.080080,0.007665,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2695,block,0.507477,0.126750,-0.207469,0.999811,0.512697,0.109602,-0.184081,0.999760,0.516681,...,-0.000394,0.0,0.518001,0.103964,0.006023,0.0,0.519000,0.102968,0.006214,0.0
2696,block,0.503780,0.126776,-0.222314,0.999805,0.508441,0.109545,-0.199098,0.999754,0.512277,...,-0.000480,0.0,0.513749,0.103947,0.006308,0.0,0.514814,0.102955,0.006529,0.0
2697,block,0.502411,0.126917,-0.221542,0.999802,0.507351,0.109340,-0.198375,0.999753,0.511094,...,-0.000203,0.0,0.512300,0.104467,0.006729,0.0,0.513292,0.103552,0.006959,0.0
2698,block,0.503131,0.126296,-0.220408,0.999794,0.507599,0.108591,-0.197053,0.999742,0.511142,...,0.000122,0.0,0.513108,0.104620,0.007059,0.0,0.514115,0.103805,0.007298,0.0


Action space: ['stay' 'left_hook' 'right_hook' 'block']
Models scores:
rf 1.0
gb 1.0


FileNotFoundError: [Errno 2] No such file or directory: 'gb_PaB(1).pkl'

In [101]:
model

Pipeline(steps=[('standardscaler', StandardScaler()),
                ('gradientboostingclassifier', GradientBoostingClassifier())])

In [102]:
cap = cv2.VideoCapture(0)
# Initiate holistic model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    while cap.isOpened():
        ret, frame = cap.read()
        
        # Recolor Feed
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False        
        
        # Make Detections
        results = holistic.process(image)
        # print(results.face_landmarks)
        
        # face_landmarks, pose_landmarks, left_hand_landmarks, right_hand_landmarks
        
        # Recolor image back to BGR for rendering
        image.flags.writeable = True   
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        # 1. Draw face landmarks
        mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION, 
                                 mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                                 mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                                 )
        
        # 2. Right hand
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(80,22,10), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(80,44,121), thickness=1, circle_radius=1)
                                 )

        # 3. Left Hand
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(121,22,76), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(121,44,250), thickness=1, circle_radius=1)
                                 )

        # 4. Pose Detections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(245,117,66), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(245,66,230), thickness=1, circle_radius=1)
                                 )
        
        
        try:
            # Get landmarks
            pose = results.pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
            face = results.face_landmarks.landmark
            face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())
            row = pose_row + face_row
            
            # Predict pose
            X = pd.DataFrame([row])
            pred = model.predict(X)[0]
            proba = model.predict_proba(X)[0]
            
            # Show predict
            cv2.rectangle(image, (0,0), (250, 60), (245, 117, 16), -1)
            cv2.putText(image, pred.split(' ')[0], (95, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            cv2.putText(image, str(round(proba[np.argmax(proba)], 2)), (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
        except:
            print('Detection error')
            pass
        
        
        cv2.imshow('Raw Webcam Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error
Detection error


KeyboardInterrupt: 

In [103]:
# Contains what we want to emulate for each pose
bind_settings = {
    'stay':      [['ms', 'mouseUp', 'right']],
    'left_hook': [['ms', 'mouseUp', 'right'],['ms', 'mouseDown', 'left'],['ms', 'mouseUp', 'left']],
    'right_hook':[['ms', 'mouseUp', 'right'],['ms', 'mouseDown', 'left'],['ms', 'mouseUp', 'left']],
    'block':     [['ms', 'mouseDown', 'right']],
}

In [104]:
# Emulate keyboard and mouse function
def emulator_request_decoder(predict, binds):
    request = binds[predict]
    for req_index in range(len(request)):
        device, command, button = request[req_index]
        if device == 'kb':
            if command == 'keyUp':
                try:
                    py.keyUp(button)
                except:
                    print(f'Unknown Button Error: {button}')
            elif command == 'keyDown':
                try:
                    py.keyDown(button)
                except:
                    print(f'Unknown Button Error: {button}')
            elif command == 'press':
                try:
                    py.press(button)
                except:
                    print(f'Unknown Button Error: {button}')
            else:
                print(f'Unknown Command Error: {command}. Only "keyUp", "keyDown", "press" commands are available for {device}')
        elif device == 'ms':
            if command == 'click':
                try:
                    py.click(button=button)
                except:
                    print(f'Unknown Button Error: {button}')
            elif command == 'mouseUp':
                try:
                    py.mouseUp(0,0,button)
                except:
                    print(f'Unknown Button Error: {button}')
            elif command == 'mouseDown':
                try:
                    py.mouseDown(0,0,button)
                except:
                    print(f'Unknown Button Error: {button}')
            elif command == 'move':
                try:
                    x, y = button.split('_')
                    py.moveRel(int(x), int(y))
                except:
                    print(f'Unknown Button Error: {button}')
            else:
                print(f'Unknown Command Error: {command}. Only "click", "mouseDown", "mouseUp" commands are available for {device}')
        else:
            print(f'Unknown Device Error: {device}. Only "kb", "ms" devices are available')
        print(predict)

In [106]:
cap = cv2.VideoCapture(0)
# Initiate holistic model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    while cap.isOpened():
        ret, frame = cap.read()
        
        # Recolor Feed
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False        
        
        # Make Detections
        results = holistic.process(image)
        # print(results.face_landmarks)
        
        # Recolor image back to BGR for rendering
        image.flags.writeable = True   
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        # 1. Draw face landmarks
        mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION, 
                                 mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                                 mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                                 )
        
        # 2. Right hand
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(80,22,10), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(80,44,121), thickness=1, circle_radius=1)
                                 )

        # 3. Left Hand
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(121,22,76), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(121,44,250), thickness=1, circle_radius=1)
                                 )

        # 4. Pose Detections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, 
                                 mp_drawing.DrawingSpec(color=(245,117,66), thickness=1, circle_radius=2),
                                 mp_drawing.DrawingSpec(color=(245,66,230), thickness=1, circle_radius=1)
                                 )
        
        
        try:
            # Get landmarks
            pose = results.pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
            face = results.face_landmarks.landmark
            face_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in face]).flatten())
            row = pose_row + face_row
            
            # Predict pose
            X = pd.DataFrame([row])
            pred = model.predict(X)[0]
            proba = model.predict_proba(X)[0]
            
            # Emulate prediction
            emulator_request_decoder(pred, bind_settings)
            
            # Show prediction
            cv2.rectangle(image, (0,0), (250, 60), (245, 117, 16), -1)
            cv2.putText(image, pred.split(' ')[0], (95, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            cv2.putText(image, str(round(proba[np.argmax(proba)], 2)), (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        except:
            print('Detection error')
            pass
        
        
        cv2.imshow('Raw Webcam Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

stay
stay
stay
stay
stay
stay
block
block
block
block
block
block
block
block
block
block
block
block
block
block
block
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
stay
block
block
block
block
block
block
block
right_hook
right_hook
right_hook
block
block
block
block
left_hook
left_hook
left_hook
block
block
block
block
block
block
block
block
block
block
block
block
block
left_hook
left_hook
left_hook
block
block
block
block
block
block
block
block
block
block
block
block
block
block
right_hook
right_hook
right_hook
right_hook
right_hook
right_hook
block
block
block
block
left_hook
left_hook
left_hook
block
block
right_hook
right_hook
right_hook
block
block
block
block
block
block
block
block
block
block
block
block
block
block
left_hook
left_hook
left_hook
block
block
block
block
block
left_hook
left_hook
left_hook
block
block
block
block
block
block
block
left_hook
left_hook
left_hook
block
block
block
block
right_hoo

KeyboardInterrupt: 