In [1]:
import json
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg 
import trt_pose.coco
import math
import os
import numpy as np
import traitlets
import sys


In [2]:
with open('preprocess/hand_pose.json', 'r') as f:
    hand_pose = json.load(f)

topology = trt_pose.coco.coco_category_to_topology(hand_pose)
import trt_pose.models

num_parts = len(hand_pose['keypoints'])
num_links = len(hand_pose['skeleton'])

model = trt_pose.models.resnet18_baseline_att(num_parts, 2 * num_links).cuda().eval()
import torch


WIDTH = 224
HEIGHT = 224
data = torch.zeros((1, 3, HEIGHT, WIDTH)).cuda()

if not os.path.exists('model/hand_pose_resnet18_att_244_244_trt.pth'):
    MODEL_WEIGHTS = 'model/hand_pose_resnet18_att_244_244.pth'
    model.load_state_dict(torch.load(MODEL_WEIGHTS))
    import torch2trt
    model_trt = torch2trt.torch2trt(model, [data], fp16_mode=True, max_workspace_size=1<<25)
    OPTIMIZED_MODEL = 'model/hand_pose_resnet18_att_244_244_trt.pth'
    torch.save(model_trt.state_dict(), OPTIMIZED_MODEL)


OPTIMIZED_MODEL = 'model/hand_pose_resnet18_att_244_244_trt.pth'
from torch2trt import TRTModule

model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))

<All keys matched successfully>

In [3]:
from trt_pose.draw_objects import DrawObjects
from trt_pose.parse_objects import ParseObjects

parse_objects = ParseObjects(topology,cmap_threshold=0.15, link_threshold=0.15)
draw_objects = DrawObjects(topology)

In [4]:
def create_directories_for_classes(no_of_classes, path_dir, dataset_name):
    dir_ = os.path.join(path_dir, dataset_name)
    for i in range(no_of_classes):
        dir_to_create = os.path.join(dir_,"%s" % (i+1))
        try:
            os.makedirs(dir_to_create)
        except FileExistsError:
            print(os.path.join("The following directory was not created because it already exsists", dir_ , ))


In [5]:
dir_datasets = 'dataset/' #give the path to where you want to save you collected data
dataset_name = "hand_dataset_2class" #change this to hand_dataset_test when you are collecting data for test
no_of_classes = 2
create_directories_for_classes(no_of_classes, dir_datasets, dataset_name )

In [6]:
train_or_test = "testing" # training or testing

In [7]:
import ipywidgets.widgets as widgets
dir_ = os.path.join(dir_datasets, dataset_name)
curr_class_no = 1
button_layout = widgets.Layout(width='128px', height='32px')
curr_dir = os.path.join(dir_,'%s'%curr_class_no )
collecting_button = widgets.Button(description= 'Collect Class ' + str(curr_class_no), button_style='success', layout=button_layout)
prev_button = widgets.Button(description='Previous Class', button_style='primary', layout=button_layout)
nxt_button = widgets.Button(description='Next Class', button_style='info', layout=button_layout)

dir_count = widgets.IntText(layout=button_layout, value=len(os.listdir(curr_dir)))
dir_count.continuous_update

False

In [8]:
from uuid import uuid1
def save_snapshot(directory):
    image_path = os.path.join(directory, str(uuid1()) + '.jpg')
    with open(image_path, 'wb') as f:
        f.write(image_s.value)
def save_dir():
    global curr_dir, dir_count
    save_snapshot(curr_dir)
    dir_count.value = len(os.listdir(curr_dir))
def prev_dir():
    global curr_class_no, curr_dir, no_of_classes
    if curr_class_no>1:
        curr_class_no-=1
    curr_dir = os.path.join(dir_,'%s'%curr_class_no )
    collecting_button.description = 'Collect Class ' + str(curr_class_no)
    dir_count.value = len(os.listdir(curr_dir))
    dir_count.continuous_update
def nxt_dir():
    global curr_class_no, curr_dir, no_of_classes
    if curr_class_no<no_of_classes:
        curr_class_no+=1
    curr_dir = os.path.join(dir_,'%s'%curr_class_no )
    collecting_button.description = 'Collect Class ' + str(curr_class_no)
    dir_count.value = len(os.listdir(curr_dir))

        

collecting_button.on_click(lambda x: save_dir())
nxt_button.on_click(lambda x: nxt_dir())
prev_button.on_click(lambda x: prev_dir())

In [9]:

import torchvision.transforms as transforms
import PIL.Image

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()
device = torch.device('cuda')

def preprocess(image):
    global device
    device = torch.device('cuda')
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device)
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

In [10]:
from preprocessdata import preprocessdata
preprocessdata = preprocessdata(topology, num_parts)
from gesture_classifier import gesture_classifier
gesture_classifier = gesture_classifier()

Next, let's define a function that will preprocess the image, which is originally in BGR8 / HWC format.

In [11]:
def draw_joints(image, joints):
    count = 0
    for i in joints:
        if i==[0,0]:
            count+=1
    if count>= 3:
        return 
    for i in joints:
        cv2.circle(image, (i[0],i[1]), 2, (0,0,255), 1)
    cv2.circle(image, (joints[0][0],joints[0][1]), 2, (255,0,255), 1)
    for i in hand_pose['skeleton']:
        if joints[i[0]-1][0]==0 or joints[i[1]-1][0] == 0:
            break
        cv2.line(image, (joints[i[0]-1][0],joints[i[0]-1][1]), (joints[i[1]-1][0],joints[i[1]-1][1]), (0,255,0), 1)

In [12]:
from jetcam.usb_camera import USBCamera
from jetcam.csi_camera import CSICamera
from jetcam.utils import bgr8_to_jpeg

camera = USBCamera(width=WIDTH, height=HEIGHT, capture_fps=30, capture_device=1)
#camera = CSICamera(width=WIDTH, height=HEIGHT, capture_fps=30)

camera.running = True

In [13]:
import ipywidgets
from IPython.display import display


image_w = ipywidgets.Image(format='jpeg', width=224, height=224)
image_s = ipywidgets.Image(format='jpeg', width=224, height=224)


In [14]:
display(image_w)
display(widgets.HBox([dir_count, collecting_button]))
display(widgets.HBox([ nxt_button]))
display(widgets.HBox([ prev_button]))

Image(value=b'', format='jpeg', height='224', width='224')

HBox(children=(IntText(value=0, layout=Layout(height='32px', width='128px')), Button(button_style='success', d…

HBox(children=(Button(button_style='info', description='Next Class', layout=Layout(height='32px', width='128px…

HBox(children=(Button(button_style='primary', description='Previous Class', layout=Layout(height='32px', width…

In [15]:
def execute(change):
    image = change['new']
    image_s.value = bgr8_to_jpeg(image[:, ::-1, :])
    data = preprocess(image)
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)
    joints = preprocessdata.joints_inference(image, counts, objects, peaks)
    draw_joints(image, joints)
    #draw_objects(image, counts, objects, peaks)# try this for multiple hand pose prediction 
    image_w.value = bgr8_to_jpeg(image[:, ::-1, :])

In [16]:
execute({'new': camera.value})

In [17]:
camera.observe(execute, names='value')

In [18]:
camera.unobserve_all()

In [None]:
#camera.running = False

In [19]:
def generate_labels(dir_, dataset_name):
    labels = []
    starting_label = 1
    for i in range(no_of_classes):
        dir_to_check = os.path.join(dir_,"%s" % (i+1))
        for j in range(len(os.listdir(dir_to_check))):
            labels.append(starting_label)
        starting_label+=1
    labels_to_dict = {"labels": labels}
    if train_or_test == "training":
        filename = dir_+'.json'
    elif train_or_test == "testing": 
        filename = dir_+'_testing.json'
    else:
        print("ERROR: check your `train_or_test` variable")
    
    with open(filename, 'w') as outfile:
        json.dump(labels_to_dict, outfile)
    return labels      

In [20]:
def rename_images(dir_):
    overall_count = 0
    #dir_ = dir_+dataset_name
    for i in range(no_of_classes):
        dir_to_check = os.path.join(dir_,"%s" % (i+1))
        dir_to_check+='/'
        for count, filename in enumerate(os.listdir(dir_to_check)):
            dst = "%08d.jpg"% overall_count
            src = dir_to_check+filename
            dst = dir_to_check+dst 
            os.rename(src, dst)
            overall_count+=1

In [21]:
generate_labels(dir_, dataset_name)
rename_images(dir_)

# Pick a cell to execute depending on Training or Testing

In [22]:
import shutil
dir_training = dir_datasets +'/' + train_or_test + '/'#change this to /test/ when you are collecting data for test
try:
    os.makedirs(dir_training)
except FileExistsError:
    print(os.path.join("The following directory was not created because it already exsists", dir_ , ))
for i in range(no_of_classes):
    dir_to_check = os.path.join(dir_,"%s" % (i+1))+'/'
    for count, filename in enumerate(os.listdir(dir_to_check)):
            src = dir_to_check+filename
            shutil.move(src,dir_training)
    os.rmdir(dir_to_check)
shutil.move(dir_training,dir_)
if train_or_test == "training":
    shutil.move(dir_+'.json',dir_)
elif train_or_test == "testing": 
    shutil.move(dir_+'_testing.json',dir_)
else:
    print("ERROR: check your `train_or_test` variable")