In [1]:
# author: Zhifei Wang, 11.14, 2021

import socket
import time
import cv2
import numpy as np

import mediapipe as mp
# Short aliases for the MediaPipe solution modules used by the cells below.
mp_drawing = mp.solutions.drawing_utils
# NOTE(review): mp_drawing_styles is not referenced anywhere in the visible code.
mp_drawing_styles = mp.solutions.drawing_styles
mp_hands = mp.solutions.hands

import torch

In [2]:
class Net(torch.nn.Module):
    """Small fully connected classifier: 63 inputs -> 21 -> 5 -> 3 outputs.

    A sigmoid follows each of the first two linear layers; the third layer's
    raw outputs are returned without an activation.
    """

    def __init__(self):
        super().__init__()
        # Attribute names and creation order are deliberately left as
        # l1, l2, l3, sigmoid so saved parameters keep the same keys.
        self.l1 = torch.nn.Linear(in_features=63, out_features=21)
        self.l2 = torch.nn.Linear(in_features=21, out_features=5)
        self.l3 = torch.nn.Linear(in_features=5, out_features=3)
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x):
        """Return the 3 output scores for ``x`` (last dimension of size 63)."""
        hidden = x
        for layer in (self.l1, self.l2):
            hidden = self.sigmoid(layer(hidden))
        return self.l3(hidden)


# Load the trained gesture classifier from disk; it is later called directly
# as model(inputs).
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code -- only load a model.pkl that comes from a trusted source.
# NOTE(review): presumably the whole Net object was pickled (not just a
# state_dict), so the Net class must be defined before this line runs, and
# newer PyTorch releases may require weights_only=False -- confirm against
# the installed version.
model=torch.load('model.pkl')

In [3]:
def StandardScaler(x):
    '''Standardize a 1-D sequence of numbers to zero mean and unit variance.

    The values are first shifted so that element 0 becomes the origin, then
    the mean of the shifted values is removed and the result is divided by
    its population standard deviation (``np.std`` default).

    Args:
        x: sequence of numbers (list, tuple or 1-D array). It is copied and
            never modified.

    Returns:
        A list of floats with the same length as ``x``. An empty input yields
        an empty list; an input whose values are all identical yields all
        zeros instead of the NaNs a division by a zero deviation would give.
    '''
    # Force a float copy: the in-place arithmetic below cannot store float
    # results in an integer array (integer input used to raise).
    x_array = np.array(x, dtype=float)
    if x_array.size == 0:
        return []

    x_array -= x_array[0]

    mean = np.mean(x_array)
    std = np.std(x_array)

    x_array -= mean
    # Skip the division when there is no spread; the centred values are
    # already all zero in that case.
    if std > 0:
        x_array /= std

    return x_array.tolist()

def Landmarks2array(hand_landmarks):
    """Flatten one hand's landmarks into a single standardized feature vector.

    The x, y and z coordinates of every entry in ``hand_landmarks.landmark``
    are each passed through ``StandardScaler`` on their own, then joined in
    the order all-x, all-y, all-z.

    Returns:
        1-D ``np.ndarray`` holding three values per landmark.
    """
    points = hand_landmarks.landmark

    scaled_x = StandardScaler([point.x for point in points])
    scaled_y = StandardScaler([point.y for point in points])
    scaled_z = StandardScaler([point.z for point in points])

    return np.array(scaled_x + scaled_y + scaled_z)

def num_to01(num):
    """Clamp ``num`` to the closed interval [0, 1].

    Values below 0 become 0, values above 1 become 1, and anything in
    between is returned unchanged.
    """
    return min(max(num, 0), 1)



In [4]:
def Init(s, bind_address=None):
    """Bind the socket ``s`` to a local address.

    Args:
        s: unbound socket object.
        bind_address: ``(host, port)`` tuple to bind to. When omitted, the
            module-level ``address`` is used, which keeps existing
            ``Init(s)`` calls working.
    """
    if bind_address is None:
        # Fall back to the global configured by the main script.
        bind_address = address
    s.bind(bind_address)

def Connect(s):
    """Wait for one client on the bound socket ``s`` and return its connection.

    Puts ``s`` into listening mode with a backlog of 1, blocks in ``accept``
    until a peer connects, prints the peer address and returns the connected
    socket object.
    """
    s.listen(1)
    client_conn, client_addr = s.accept()
    print(f'connect from:{client_addr}')
    return client_conn

def recvall(sock, count):
    """Read exactly ``count`` bytes from ``sock``.

    ``recv`` may return fewer bytes than requested, so it is called
    repeatedly until the requested amount has arrived.

    Args:
        sock: connected socket-like object with a ``recv(n)`` method.
        count: number of bytes to read.

    Returns:
        ``bytes`` of length ``count``, or ``None`` if ``recv`` returned no
        data (peer closed the connection) before enough bytes arrived.
    """
    # Collect the pieces and join once at the end; appending to a bytes
    # object would re-copy everything received so far on every recv.
    chunks = []
    remaining = count
    while remaining:
        chunk = sock.recv(remaining)
        if not chunk:
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def _landmark_to_pixel(landmark):
    """Convert one normalized landmark to pixel coordinates in the width x height frame."""
    # Coordinates are clamped to [0, 1] before the conversion.
    # NOTE(review): presumably because the MediaPipe helper returns None for
    # out-of-range values -- confirm against the installed MediaPipe version.
    return mp_drawing._normalized_to_pixel_coordinates(
        num_to01(landmark.x), num_to01(landmark.y), width, height)


def ReceiveVideo(conn, hands=None):
    """Receive one encoded frame from ``conn`` and classify the hand gesture in it.

    Wire format read from ``conn``: a 16-byte length field that ``int()`` can
    parse, followed by that many bytes of an encoded image that
    ``cv2.imdecode`` can decode.

    Relies on module-level names: ``model`` (gesture classifier), ``width`` /
    ``height`` (pixel space the normalized landmarks are mapped into) and
    ``start_landmark`` / ``end_landmark`` (landmark indices reported for the
    erase gesture).

    Args:
        conn: connected socket-like object, read through ``recvall``.
        hands: optional, already constructed ``mp_hands.Hands`` instance to
            reuse across calls. When omitted, a new instance is created and
            closed for this single frame, which re-initialises the MediaPipe
            pipeline on every call.

    Returns:
        Tuple ``(gesture, point1x, point1y, point2x, point2y)``. ``gesture``
        is 0 (write), 1 (erase), 2 (any other gesture) or 3 (no hand found).
        For write, point1 is landmark 8; for erase, point1 / point2 are the
        ``start_landmark`` / ``end_landmark`` positions. Points not set by a
        gesture stay 0. When several hands are detected, each hand overwrites
        the values written by the previous one.

    Raises:
        ConnectionError: the peer closed the connection before a complete
            frame arrived.
        ValueError: the length field is not a number, or the payload could
            not be decoded as an image.
    """
    gesture=0
    point1x,point1y=0,0
    point2x,point2y=0,0

    # perf_counter is monotonic and high resolution, so very fast frames do
    # not measure as zero elapsed time.
    start = time.perf_counter() # for FPS

    length = recvall(conn,16)
    if length is None:
        raise ConnectionError('connection closed while reading the frame length')
    stringData = recvall(conn, int(length))
    if stringData is None:
        raise ConnectionError('connection closed while reading the frame data')

    image = cv2.imdecode(np.frombuffer(stringData, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        raise ValueError('received data could not be decoded as an image')

    rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    if hands is None:
        with mp_hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5) as detector:
            results = detector.process(rgb_image)
    else:
        results = hands.process(rgb_image)

    if results.multi_hand_landmarks:
        for hand_landmarks in results.multi_hand_landmarks:

            # The annotated image is only useful if the debug imwrite at the
            # end of this function is enabled.
            mp_drawing.draw_landmarks(image,hand_landmarks,mp_hands.HAND_CONNECTIONS)

            inputs=torch.from_numpy(Landmarks2array(hand_landmarks)).float()
            # Inference only: skip building an autograd graph for every frame.
            with torch.no_grad():
                outputs=model(inputs)
            _,predicted=torch.max(outputs,dim=0)
            label=predicted.item()

            if label==0: # write
                gesture=0
                point1x,point1y=_landmark_to_pixel(hand_landmarks.landmark[8])

            elif label==1: # erase
                gesture=1
                point1x,point1y=_landmark_to_pixel(hand_landmarks.landmark[start_landmark])
                point2x,point2y=_landmark_to_pixel(hand_landmarks.landmark[end_landmark])

            else: # others
                gesture=2

    else:
        gesture=3 #no hands

    seconds = time.perf_counter() - start
    fps = 1/seconds if seconds > 0 else float('inf')
    print('FPS',fps)

    #cv2.imwrite("./test.jpg", cv2.flip(image, 1))

    return gesture,point1x,point1y,point2x,point2y
   

def SendData(conn,gesture,point1x,point1y,point2x,point2y):
    """Send the gesture code and the two pixel points to the peer.

    Wire format: a 16-byte ASCII decimal length field (left aligned, space
    padded) followed by the five values converted to unsigned 32-bit
    integers in NumPy's default (native) byte order, 20 bytes in total.

    Args:
        conn: connected socket-like object with a ``sendall`` method.
        gesture: gesture code.
        point1x, point1y, point2x, point2y: pixel coordinates.
    """
    values = np.array([gesture,point1x,point1y,point2x,point2y]).astype(np.uint32)
    # tobytes() replaces the deprecated ndarray.tostring(), which NumPy 2
    # no longer provides.
    payload = values.tobytes()
    header = str(len(payload)).ljust(16).encode()

    # sendall keeps writing until everything is out; plain send() may stop
    # after a partial write and silently truncate the message.
    conn.sendall(header + payload)
    


In [5]:
if __name__ == '__main__':

    # Pixel space that ReceiveVideo maps the normalized landmarks into.
    height,width=480,640 # image size

    # Landmark indices whose positions ReceiveVideo reports for the erase gesture.
    start_landmark=8
    end_landmark=17 # erase coordinate

    address = ('192.168.137.1', 8001) # change ip to your own
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn = None

    try:
        Init(s)
        conn=Connect(s)

        # Serve frames until the peer disconnects or the cell is interrupted.
        while True:
            gesture,point1x,point1y,point2x,point2y=ReceiveVideo(conn)
            SendData(conn,gesture,point1x,point1y,point2x,point2y)
            print(gesture,'\r\n')
    finally:
        # Always release both sockets so the port is free when the cell is
        # run again after an error or an interrupt.
        if conn is not None:
            conn.close()
        s.close()

        

