# globals



In [1]:
# Number of sign classes; equals the number of labels in `actions` below.
n_classes = 10
# Class labels; the real-time loop maps the arg-max of the model scores to a
# label by indexing into this list.
actions = ['sister','hurry','hungry','meal','brother','tree','heavy','cry','family','wise']
# Three-colour palette. Presumably intended for prob_viz, which cycles through
# whatever palette it is given — TODO confirm (channel order not shown here).
colors = [
    (245,117,16),
    (117,245,16),
    (16,117,245)
]

def softmax(x):
    """Return the softmax of ``x`` taken over all of its elements.

    The maximum is subtracted before exponentiating so that large scores
    cannot overflow ``np.exp`` (which used to turn the whole result into
    ``nan``). The shift cancels out in the ratio, so results are otherwise
    unchanged.

    Args:
        x: Array-like of raw scores (logits).

    Returns:
        ``numpy.ndarray`` with the same shape as ``x`` whose entries sum to 1.
        An empty input yields an empty array.
    """
    x = np.asarray(x)
    if x.size == 0:
        # np.max would raise on an empty array; mirror the old empty result.
        return np.exp(x)
    exps = np.exp(x - np.max(x))
    return exps / np.sum(exps)

def arg_max(array):
    """Return ``(index, value)`` of the largest entry of ``array``.

    The index comes from ``np.argmax`` and the value is read back with
    ``array[index]``.
    """
    best_index = np.argmax(array)
    best_value = array[best_index]
    return best_index, best_value

# PyTorch model

In [2]:
import pandas as pd
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from PIL import Image


def convert_relu_to_swish(model: nn.Module):
    """Replace every ``nn.ReLU`` inside ``model`` with ``nn.SiLU(True)``, in place.

    Direct children that are ReLUs are swapped via ``setattr``; every other
    child is searched recursively. Nothing is returned.
    """
    for name, submodule in model.named_children():
        if not isinstance(submodule, nn.ReLU):
            # Not a ReLU itself: look for ReLUs further down this branch.
            convert_relu_to_swish(submodule)
            continue
        setattr(model, name, nn.SiLU(True))


            
            
class Swish(nn.Module):
    """Swish activation: ``x * sigmoid(x)``."""

    def __init__(self):
        # Was misspelled ``__init``, so this initialiser never ran.
        super().__init__()

    def forward(self, x):
        """Return ``x * sigmoid(x)`` as a new tensor; ``x`` is left untouched.

        The previous body called ``x.mult_``, which is not a tensor method and
        raised ``AttributeError``. The product is computed out of place so the
        caller's tensor is not overwritten.
        """
        return x * torch.sigmoid(x)
    
    
    
class r2plus1d_18(nn.Module):
    """torchvision ``r2plus1d_18`` backbone with a fresh linear classifier.

    The torchvision model is instantiated, its last child is dropped, the
    remaining children are wrapped in ``nn.Sequential`` and their ReLUs are
    replaced via ``convert_relu_to_swish``. A dropout layer followed by
    ``fc1`` produces ``n_classes`` outputs.

    Args:
        pretrained: Forwarded to ``torchvision.models.video.r2plus1d_18``.
        n_classes: Output size of the ``fc1`` layer.
        dropout_p: Probability passed to the (in-place) dropout layer.
    """

    def __init__(self, pretrained=True, n_classes=3, dropout_p=0.5):
        super().__init__()
        self.pretrained = pretrained
        self.n_classes = n_classes

        backbone = torchvision.models.video.r2plus1d_18(pretrained=self.pretrained)
        # Keep every child except the last one; fc1 below is used instead.
        feature_layers = list(backbone.children())[:-1]
        self.r2plus1d_18 = nn.Sequential(*feature_layers)
        convert_relu_to_swish(self.r2plus1d_18)
        self.fc1 = nn.Linear(backbone.fc.in_features, self.n_classes)
        self.dropout = nn.Dropout(dropout_p, inplace=True)

    def forward(self, x):
        """Run the backbone, flatten from dim 1, apply dropout, then ``fc1``.

        ``x`` is passed to the backbone unchanged (no reshaping happens here).
        """
        features = self.r2plus1d_18(x).flatten(1)
        features = self.dropout(features)
        return self.fc1(features)
    
    
# Frame size and per-channel normalisation statistics consumed by the
# transforms defined in the next cell.
h, w = 128, 128
mean = [0.43216, 0.394666, 0.37645]
std = [0.22803, 0.22145, 0.216989]

# Choose the device first so the checkpoint can be mapped onto it on load.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

pytorch_model = r2plus1d_18(pretrained=False, n_classes=n_classes)
# The path is built with os.path.join: the old literal relied on "\c" not being
# an escape sequence and only resolved on Windows. map_location lets a
# checkpoint saved on a GPU load on a CPU-only machine.
# NOTE: torch.load unpickles the file — only load checkpoints you trust.
best_checkpoint = torch.load(
    os.path.join("final_weights", "checkpoint_3dcnn_c10_v36.tar"),
    map_location=device,
)
pytorch_model.load_state_dict(best_checkpoint["model_state_dict"])
pytorch_model = pytorch_model.to(device)




In [3]:
# Per-frame preprocessing, applied in this order by PytorchPredictor.add_frame:
# resize to (h, w), convert the PIL image to a tensor, normalise with mean/std.
resize_transform   = transforms.Resize((h, w))
totensor_transform  = transforms.ToTensor()
normalize_transform = transforms.Normalize(mean, std)

class PytorchPredictor:
    """Keeps the 16 most recent preprocessed frames and scores them with ``model``."""

    def __init__(self, model, device):
        self.model = model
        self.device = device
        # Preprocessed frame tensors, oldest first; never longer than 16.
        self.sequence = []

    def can_predict(self):
        """Return True when exactly 16 frames are buffered."""
        buffered = len(self.sequence)
        return buffered == 16

    def add_frame(self, frame):
        """Preprocess one BGR frame and append it, keeping only the last 16."""
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        pil_image = Image.fromarray(rgb)
        tensor = totensor_transform(resize_transform(pil_image))
        tensor = normalize_transform(tensor).to(self.device)

        self.sequence.append(tensor)
        self.sequence = self.sequence[-16:]

    def predict(self):
        """Return softmax scores for the buffered frames as a NumPy array."""
        clip = torch.stack(self.sequence).to(self.device)
        # Add a batch axis, then swap axes 1 and 2 of the stacked frames.
        clip = clip.unsqueeze(0).permute(0, 2, 1, 3, 4)

        with torch.no_grad():
            self.model.eval()
            logits = self.model(clip)
            scores = logits.cpu().detach().numpy()[0]
            return softmax(scores)
        
        
        

# Keras model

In [4]:
import os
import tensorflow as tf
# Ask TensorFlow not to use a GPU: set CUDA_VISIBLE_DEVICES to '-1' and give
# TensorFlow an empty list of visible GPU devices.
# NOTE(review): the recorded outputs further down still report a GPU, so
# confirm these settings take effect when set at this point.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
tf.config.set_visible_devices([], 'GPU')

In [5]:
# Ask TensorFlow for the name of a GPU device; the recorded cell output follows.
tf.test.gpu_device_name()

'/device:GPU:0'

In [6]:
import tensorflow as tf

# Re-apply the empty GPU list, then print what TensorFlow still reports.
tf.config.set_visible_devices([], 'GPU')

print('GPU found' if tf.test.gpu_device_name() else "No GPU found")

print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

import numpy as np
import os
import cv2 
import mediapipe as mp
import tensorflow as tf 
from tensorflow import keras
import matplotlib.pyplot as plt
import time
import pandas as pd

GPU found
Num GPUs Available:  1


In [7]:
# Short aliases for the MediaPipe holistic solution and its drawing helpers.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Module-level Holistic instance; KerasPredictor.add_frame reads this global.
# Note that the real-time cell later rebinds the name `holistic`.
holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)


# Landmark counts used by Real_Time.extract_keypoints to size its zero
# placeholders when a hand or the pose is not detected.
num_hand_marks = 21
num_pose_marks = 33


# Landmark indices kept per group, in the order [pose, left hand, right hand]
# (the order extract_keypoints and draw_updated_styled iterate in).
# 9 + 11 + 11 = 31 landmarks, each contributing (x, y), gives the 62 features
# per frame that get_model's Input(shape=(16,62)) expects.
pose_selected_landmarks = [
    [0,2,5,11,13,15,12,14,16],
    [0,2,4,5,8,9,12,13,16,17,20],
    [0,2,4,5,8,9,12,13,16,17,20],
]

def draw_updated_styled(image,results):
    """Draw the selected pose and hand landmarks onto ``image`` in place.

    For each of the pose, left-hand and right-hand groups that is present in
    ``results``, the landmarks listed in ``pose_selected_landmarks`` are drawn
    as small red circles. Landmarks whose pixel position cannot be computed
    are skipped.

    Args:
        image: Image array of shape (rows, cols, channels); modified in place.
        results: Object exposing ``pose_landmarks``, ``left_hand_landmarks``
            and ``right_hand_landmarks`` (each may be falsy when undetected).
    """
    image_rows, image_cols, _ = image.shape

    landmark_groups = [
        results.pose_landmarks,
        results.left_hand_landmarks,
        results.right_hand_landmarks,
    ]

    for shape, group in enumerate(landmark_groups):
        if not group:
            continue
        points = group.landmark
        for idx in pose_selected_landmarks[shape]:
            point = points[idx]
            landmark_px = mp_drawing._normalized_to_pixel_coordinates(
                point.x, point.y, image_cols, image_rows)
            if landmark_px is None:
                # The helper returns None for coordinates outside the frame;
                # handing that to cv2.circle would raise.
                continue
            cv2.circle(image, landmark_px, 2, (0, 0, 255), 4)
                
def extract_keypoints(results):
    """Flatten the selected pose/hand landmarks of ``results`` into one vector.

    For each group (pose, left hand, right hand) the ``x`` and ``y`` of the
    landmarks listed in ``pose_selected_landmarks`` are collected; a group that
    was not detected contributes zeros of the same length. The three pieces
    are concatenated in that order.
    """
    landmark_groups = (
        results.pose_landmarks,
        results.left_hand_landmarks,
        results.right_hand_landmarks,
    )

    pieces = []
    for group, selected in zip(landmark_groups, pose_selected_landmarks):
        if group:
            points = group.landmark
            coords = np.array([[points[i].x, points[i].y] for i in selected]).flatten()
        else:
            # Missing detection: zero placeholder, two values per landmark.
            coords = np.zeros(len(selected) * 2)
        pieces.append(coords)
    return np.concatenate(pieces)


# Run the holistic model on one BGR frame and return the frame with the results.
def mediapipe_detection(image,model):
    """Process ``image`` with ``model`` and return ``(bgr_image, results)``.

    The frame is converted BGR -> RGB for ``model.process`` and converted back
    to BGR before being returned.
    """
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Read-only while the model runs (presumably to avoid a copy — TODO confirm).
    rgb.flags.writeable = False
    results = model.process(rgb)
    rgb.flags.writeable = True
    bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
    return bgr, results
    
    


def draw_landmark_from_array(image, keyPoints):
    """Draw flat ``[x0, y0, x1, y1, ...]`` normalised key points onto ``image``.

    Pairs in which either coordinate is exactly 0 are skipped (this is how
    zero-filled placeholders for missing detections are ignored). Pairs whose
    pixel position cannot be computed are skipped as well.

    Args:
        image: Image array of shape (rows, cols, channels); modified in place.
        keyPoints: Flat sequence of alternating x and y values.
    """
    image_rows, image_cols, _ = image.shape

    for x, y in zip(keyPoints[0::2], keyPoints[1::2]):
        if x == 0 or y == 0:
            continue
        landmark_px = mp_drawing._normalized_to_pixel_coordinates(
            x, y, image_cols, image_rows)
        if landmark_px is None:
            # The helper returns None for coordinates outside the frame;
            # handing that to cv2.circle would raise.
            continue
        cv2.circle(image, landmark_px, 2, (0, 0, 255), 4)

                


In [8]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense,Input,Dropout
from tensorflow.keras.models import Model



def get_model():
    """Build and compile the LSTM classifier over (16, 62) keypoint sequences.

    Three stacked ReLU LSTMs (64, 128, 96 units; only the last one collapses
    the sequence) feed a 64-unit dense layer and a softmax over
    ``len(actions)`` classes. Compiled with Adam and sparse categorical
    cross-entropy.
    """
    inputs = Input(shape=(16,62))

    x = inputs
    for units, keep_sequence in ((64, True), (128, True), (96, False)):
        x = LSTM(units, return_sequences=keep_sequence, activation="relu")(x)
    x = Dense(64, activation="relu")(x)
    outputs = Dense(len(actions), activation="softmax")(x)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer="Adam",
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model



# Build the LSTM model and load its weights from V1.h5 inside final_weights.
keras_weights_dir = "final_weights"
best_model_file_name = os.path.join(keras_weights_dir, "V1.h5")
keras_model = get_model()
keras_model.load_weights(best_model_file_name)

In [9]:
class KerasPredictor:
    """Keeps the 16 most recent keypoint vectors and scores them with ``model``."""

    def __init__(self, model):
        self.model = model
        # Keypoint vectors, oldest first; never longer than 16.
        self.sequence = []

    def can_predict(self):
        """Return True when exactly 16 keypoint vectors are buffered."""
        buffered = len(self.sequence)
        return buffered == 16

    def add_frame(self, frame):
        """Extract keypoints from ``frame`` and append them, keeping the last 16."""
        resized = cv2.resize(frame, (512, 512))
        # Uses the module-level `holistic` instance.
        _, results = mediapipe_detection(resized, holistic)

        self.sequence.append(extract_keypoints(results))
        self.sequence = self.sequence[-16:]

    def predict(self):
        """Return the model output for the buffered sequence (batch of one)."""
        batch = np.expand_dims(self.sequence, axis=0)
        return self.model.predict(batch)[0]
        
        
        

# Real time

In [10]:
class Real_Time:
    """Reads frames from a capture device and tracks frame-to-frame movement.

    Each successfully read frame is run through the holistic model; the sums
    of the pose, left-hand and right-hand keypoint values (rounded to two
    decimals) are stored as a per-frame signature. Comparing the current
    signature with the last stored one is how movement is detected. Frames
    selected by the caller are collected in ``listed_frames``.

    Args:
        cap: Capture object exposing ``read()`` returning ``(ret, frame)``.
        holistic: Model passed to ``mediapipe_detection``.
        fsize: ``(width, height)`` every frame is resized to.
    """

    def __init__(self, cap, holistic, fsize=(512, 512)):
        self.cap = cap
        self.fsize = fsize
        self.listed_frames = []
        self.holistic = holistic
        # Signature of the most recently read frame.
        self.frame_pose = None
        self.frame_left_hand = None
        self.frame_right_hand = None
        # Signature stored by the last update_last_frame() call.
        self.last_frame_pose = None
        self.last_frame_left_hand = None
        self.last_frame_right_hand = None

    def read_frame(self):
        """Read one frame, run detection and refresh the current signature.

        Returns:
            ``(frame, ret, image)``: the resized frame, the capture's success
            flag, and the frame with landmarks drawn on it. When the read
            fails, ``(None, ret, None)`` is returned and no state changes.
        """
        ret, frame = self.cap.read()
        if not ret:
            return None, ret, None
        frame = cv2.resize(frame, self.fsize)
        image, results = mediapipe_detection(frame, self.holistic)
        self.draw_styled_landmarks(image, results)
        pose, left_hand, right_hand = self.extract_keypoints(results)
        self.frame_pose = pose.sum().round(2)
        self.frame_left_hand = left_hand.sum().round(2)
        self.frame_right_hand = right_hand.sum().round(2)
        return frame, ret, image

    def update_last_frame(self):
        """Store the current signature as the reference for later comparisons."""
        self.last_frame_pose = self.frame_pose
        self.last_frame_left_hand = self.frame_left_hand
        self.last_frame_right_hand = self.frame_right_hand

    def add_listed_frame(self, frame):
        """Append ``frame`` to the collected frames."""
        self.listed_frames.append(frame)

    @staticmethod
    def _abs_diff(previous, current):
        """Absolute difference of two signature values, rounded to 2 decimals."""
        return np.abs(previous - current).round(2)

    def considered_frame(self, pose_diff_threshold=0.5, right_hand_diff_threshold=0.5, left_hand_diff_threshold=0.5):
        """Return True if any signature moved by at least its threshold.

        The current signature is compared against the one stored by
        ``update_last_frame``. Always returns a ``bool``; previously the method
        could fall through and return ``None``.
        """
        pose_diff = self._abs_diff(self.last_frame_pose, self.frame_pose)
        right_hand_diff = self._abs_diff(self.last_frame_right_hand, self.frame_right_hand)
        left_hand_diff = self._abs_diff(self.last_frame_left_hand, self.frame_left_hand)

        return bool(
            pose_diff >= pose_diff_threshold
            or right_hand_diff >= right_hand_diff_threshold
            or left_hand_diff >= left_hand_diff_threshold
        )

    def extract_keypoints(self, results):
        """Return ``(pose, left_hand, right_hand)`` flat arrays for ``results``.

        Pose landmarks contribute ``x, y, z, visibility``; hand landmarks
        contribute ``x, y``. A group that was not detected is replaced by zeros
        sized from ``num_pose_marks`` / ``num_hand_marks``.
        """
        # extract pose marks
        if results.pose_landmarks:
            pose = np.array([[res.x, res.y, res.z, res.visibility]
                             for res in results.pose_landmarks.landmark]).flatten()
        else:
            pose = np.zeros(num_pose_marks*4)

        # extract left hand
        if results.left_hand_landmarks:
            left_hand = np.array([[res.x, res.y]
                                  for res in results.left_hand_landmarks.landmark]).flatten()
        else:
            left_hand = np.zeros(num_hand_marks*2)

        # extract right hand
        if results.right_hand_landmarks:
            right_hand = np.array([[res.x, res.y]
                                   for res in results.right_hand_landmarks.landmark]).flatten()
        else:
            right_hand = np.zeros(num_hand_marks*2)

        return pose, left_hand, right_hand

    def draw_styled_landmarks(self, image, results):
        """Draw pose and hand landmarks with their connections onto ``image``."""
        # Draw pose connections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                                mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                                )
        # Draw left hand connections
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                                mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                                )
        # Draw right hand connections
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                )

    def get_frames_indices(self, frames_no):
        """Return ``frames_no`` evenly spaced indices into the collected frames.

        Side effect: the first collected frame is dropped before the indices
        are computed, so the indices refer to the shortened list.
        """
        self.listed_frames = self.listed_frames[1:]
        return np.linspace(0, len(self.listed_frames)-1, frames_no, dtype=np.int16)

    def truncate_listed_frames(self):
        """Discard all collected frames."""
        self.listed_frames = []

    def __getitem__(self, idx):
        return self.listed_frames[idx]

    def __len__(self):
        return len(self.listed_frames)

    def get_data(self):
        """Return the current and last signatures plus their differences.

        Keys: ``P``/``LH``/``RH`` are the current pose, left-hand and
        right-hand signatures; ``LP``/``LLH``/``LRH`` are the stored ones;
        ``LP-P``, ``LLH-LH`` and ``LRH-RH`` are the rounded absolute
        differences for pose, left hand and right hand respectively.
        """
        return {
            "P": self.frame_pose,
            "LH": self.frame_left_hand,
            "RH": self.frame_right_hand,
            "LP": self.last_frame_pose,
            "LLH": self.last_frame_left_hand,
            "LRH": self.last_frame_right_hand,
            "LP-P": self._abs_diff(self.last_frame_pose, self.frame_pose),
            # These two used to be swapped: the left-hand key reported the
            # right-hand difference and vice versa.
            "LLH-LH": self._abs_diff(self.last_frame_left_hand, self.frame_left_hand),
            "LRH-RH": self._abs_diff(self.last_frame_right_hand, self.frame_right_hand),
        }

def prob_viz(res, actions, input_frame, colors):
    """Return a copy of ``input_frame`` with one labelled bar per score.

    Row ``i`` gets a filled rectangle whose width is ``int(score * 100) * 5``
    pixels (negative scores are clamped to 0), coloured by cycling through
    ``colors``, with ``actions[i]`` written over it. ``input_frame`` itself is
    not modified.
    """
    palette_size = len(colors)
    canvas = input_frame.copy()
    for row, score in enumerate(res):
        score = max(0, score)
        top = 60 + row * 40
        bar_width = int(score * 100) * 5
        cv2.rectangle(canvas, (0, top), (bar_width, top + 30), colors[row % palette_size], -1)
        cv2.putText(canvas, actions[row], (0, top + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)

    return canvas


In [None]:
# Real-time recognition loop.
# Frames are collected while the signer is moving. After three consecutive
# still frames, 16 of the collected frames are sampled and scored by both
# models, and the label with the highest summed score joins the sentence.
pytorch_predictor = PytorchPredictor(model=pytorch_model,device=device)
keras_predictor = KerasPredictor(model=keras_model)

sentence = []
predictions = []
# Set mediapipe model
holistic = mp.solutions.holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)

cap = cv2.VideoCapture(0)
real_time = Real_Time(cap, holistic, fsize=(512, 512))

counter = 0
discarded_frames = 0
# Must exist before the loop: it used to be assigned only inside the
# "considered frame" branch, so a still first frame raised NameError below.
update_flag = False

# (key in real_time.get_data(), text row) for the debug overlay.
overlay_rows = (
    ("LRH", 0), ("RH", 1), ("LRH-RH", 2),
    ("LLH", 4), ("LH", 5), ("LLH-LH", 6),
    ("LP", 8), ("P", 9), ("LP-P", 10),
)

try:
    # Prime the "last frame" signature with the first frame.
    real_time.read_frame()
    real_time.update_last_frame()

    with holistic:
        while cap.isOpened():

            # Read feed
            frame, ret, image = real_time.read_frame()
            if not ret:
                break

            if real_time.last_frame_pose is None:
                # The priming read failed; use this frame as the reference.
                real_time.update_last_frame()

            if real_time.considered_frame():
                counter += 1
                update_flag = True
                discarded_frames = 0
                real_time.add_listed_frame(frame)

            elif not real_time.considered_frame(0.7):
                discarded_frames += 1
                if discarded_frames == 3:
                    counter = 0
                    discarded_frames = 0
                    if len(real_time) >= 16:
                        frame_list = real_time.get_frames_indices(frames_no=16)

                        for frame_idx in frame_list:
                            pytorch_predictor.add_frame(real_time[frame_idx])
                            keras_predictor.add_frame(real_time[frame_idx])

                        res1 = pytorch_predictor.predict()
                        res2 = keras_predictor.predict()
                        res = res1 + res2
                        # Named best_idx so the module-level arg_max()
                        # function is no longer overwritten by an integer.
                        best_idx = np.argmax(res)
                        predictions.append(best_idx)
                        predictions = predictions[-16:]
                        print(predictions)
                        real_time.truncate_listed_frames()
                        sentence.append(actions[best_idx])

                        if len(sentence) > 4:
                            sentence = sentence[-4:]

            data = real_time.get_data()
            frame = image

            cv2.rectangle(frame, (0,0), (640, 40), (245, 117, 16), -1)
            cv2.putText(frame, ' '.join(sentence), (3,30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            for key, row in overlay_rows:
                cv2.putText(frame, key+":"+str(data[key]), (0, 85+row*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (100,250,150), 2, cv2.LINE_8)

            cv2.putText(frame, str(counter), (250, 85+5*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (100,250,150), 2, cv2.LINE_8)
            cv2.putText(frame, str(discarded_frames), (250, 85+6*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (100,250,150), 2, cv2.LINE_8)

            # Updating
            if update_flag:
                real_time.update_last_frame()
            update_flag = False
            cv2.imshow("Real-Time", frame)

            if cv2.waitKey(100) & 0xFF == ord('q'):
                break
finally:
    # Release the camera and close the window even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()

In [28]:
# Stand-alone clean-up cell: release the capture device and close all OpenCV
# windows (presumably for re-running by hand after an interrupted loop).
cap.release()
cv2.destroyAllWindows()