In [None]:
import os
import json
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg 
import math
import numpy as np
import traitlets
import trt_pose.coco

In [None]:
with open('hand_pose.json', 'r') as f:
    hand_pose = json.load(f)

topology = trt_pose.coco.coco_category_to_topology(hand_pose)
import trt_pose.models

num_parts = len(hand_pose['keypoints'])
num_links = len(hand_pose['skeleton'])

model = trt_pose.models.resnet18_baseline_att(num_parts, 2 * num_links).cuda().eval()
import torch


WIDTH = 256
HEIGHT = 256
data = torch.zeros((1, 3, HEIGHT, WIDTH)).cuda()

if not os.path.exists('resnet18_244x224_epoch_4150_trt.pth'):
    MODEL_WEIGHTS = 'resnet18_244x224_epoch_4150.pth'
    model.load_state_dict(torch.load(MODEL_WEIGHTS))
    import torch2trt
    model_trt = torch2trt.torch2trt(model, [data], fp16_mode=True, max_workspace_size=1<<25)
    OPTIMIZED_MODEL = 'resnet18_244x224_epoch_4150_trt.pth'
    torch.save(model_trt.state_dict(), OPTIMIZED_MODEL)


OPTIMIZED_MODEL = 'resnet18_244x224_epoch_4150_trt.pth'
from torch2trt import TRTModule

model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))

In [None]:
from trt_pose.draw_objects import DrawObjects
from trt_pose.parse_objects import ParseObjects

parse_objects = ParseObjects(topology,cmap_threshold=0.35, link_threshold=0.15)
draw_objects = DrawObjects(topology)

This notebook creates a dataset (images and labels as a json file). The dataset created can be used for pose classification.   
In order to create a new dataset for gesture recoginition specify the following parameters 

**no_of_classes** - Number of classes to be created. i.e. For hand pose the number of hand gestures to be created.

**path_dir** - Path to the directory to be created

**dataset_name** - The name of the dataset to be created


In [None]:
def create_directories_for_classes(no_of_classes, path_dir, dataset_name):
    dir_ = os.path.join(path_dir, dataset_name)
    for i in range(no_of_classes):
        dir_to_create = os.path.join(dir_,"%s" % (i+1))
        try:
            os.makedirs(dir_to_create)
        except FileExistsError:
            print(os.path.join("The following directory was not created because it already exsists", dir_ , ))


In [None]:
dir_datasets = '/home/mikyas/mike_dataset/gesture/'
dataset_name = "hand_dataset_test"
no_of_classes = 6
create_directories_for_classes(no_of_classes, dir_datasets, dataset_name )

In [None]:
import ipywidgets.widgets as widgets
dir_ = os.path.join(dir_datasets, dataset_name)
curr_class_no = 1
button_layout = widgets.Layout(width='128px', height='32px')
curr_dir = os.path.join(dir_,'%s'%curr_class_no )
collecting_button = widgets.Button(description= 'Collect Class ' + str(curr_class_no), button_style='success', layout=button_layout)
prev_button = widgets.Button(description='Previous Class', button_style='primary', layout=button_layout)
nxt_button = widgets.Button(description='Next Class', button_style='info', layout=button_layout)

dir_count = widgets.IntText(layout=button_layout, value=len(os.listdir(curr_dir)))
dir_count.continuous_update

In [None]:
from uuid import uuid1
def save_snapshot(directory):
    image_path = os.path.join(directory, str(uuid1()) + '.jpg')
    with open(image_path, 'wb') as f:
        f.write(image_s.value)
def save_dir():
    global curr_dir, dir_count
    save_snapshot(curr_dir)
    dir_count.value = len(os.listdir(curr_dir))
def prev_dir():
    global curr_class_no, curr_dir, no_of_classes
    if curr_class_no>1:
        curr_class_no-=1
    curr_dir = os.path.join(dir_,'%s'%curr_class_no )
    collecting_button.description = 'Collect Class ' + str(curr_class_no)
    dir_count.value = len(os.listdir(curr_dir))
    dir_count.continuous_update
def nxt_dir():
    global curr_class_no, curr_dir, no_of_classes
    if curr_class_no<no_of_classes:
        curr_class_no+=1
    curr_dir = os.path.join(dir_,'%s'%curr_class_no )
    collecting_button.description = 'Collect Class ' + str(curr_class_no)
    dir_count.value = len(os.listdir(curr_dir))

        

collecting_button.on_click(lambda x: save_dir())
nxt_button.on_click(lambda x: nxt_dir())
prev_button.on_click(lambda x: prev_dir())



In [None]:
from jetcam.usb_camera import USBCamera
from jetcam.csi_camera import CSICamera
from jetcam.utils import bgr8_to_jpeg
WIDTH = 256
HEIGHT = 256
camera = USBCamera(width=WIDTH, height=HEIGHT, capture_fps=30, capture_device=1)
#camera = CSICamera(width=WIDTH, height=HEIGHT, capture_fps=50)

camera.running = True

In [None]:
import ipywidgets
from IPython.display import display

image_w = ipywidgets.Image(format='jpeg', width=256, height=256)
image_s = ipywidgets.Image(format='jpeg', width=256, height=256)


In [None]:
import torchvision.transforms as transforms
import PIL.Image

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()
device = torch.device('cuda')

def preprocess(image):
    global device
    device = torch.device('cuda')
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device)
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

In [None]:
display(image_w)
display(widgets.HBox([dir_count, collecting_button]))
display(widgets.HBox([ nxt_button]))
display(widgets.HBox([ prev_button]))

In [None]:
def execute(change):
    image = change['new']
    #image_s = image
    image_s.value = bgr8_to_jpeg(image[:, ::-1, :])
    data = preprocess(image)
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)#, cmap_threshold=0.15, link_threshold=0.15)
    draw_objects(image, counts, objects, peaks)
    image_w.value = bgr8_to_jpeg(image[:, ::-1, :])
    #image_w.value = bgr8_to_jpeg(image)

In [None]:
execute({'new': camera.value})

In [None]:
camera.observe(execute, names='value')

In [None]:
camera.unobserve_all()

In [None]:
camera.running = False

In [None]:
def generate_labels(dir_, dataset_name):
    labels = []
    starting_label = 1
    for i in range(len(os.listdir(dir_))):
        dir_to_check = os.path.join(dir_,"%s" % (i+1))
        for j in range(len(os.listdir(dir_to_check))):
            labels.append(starting_label)
        starting_label+=1
    labels_to_dict = {"labels": labels}
    with open((dir_+'.json'), 'w') as outfile:
        json.dump(labels_to_dict, outfile)
    return labels      

In [None]:
def rename_images(dir_):
    overall_count = 0
    #dir_ = dir_+dataset_name
    for i in range(len(os.listdir(dir_))):
        dir_to_check = os.path.join(dir_,"%s" % (i+1))
        dir_to_check+='/'
        for count, filename in enumerate(os.listdir(dir_to_check)):
            dst = "%08d.jpg"% overall_count
            src = dir_to_check+filename
            dst = dir_to_check+dst 
            os.rename(src, dst)
            overall_count+=1

In [None]:
generate_labels(dir_, dataset_name)
rename_images(dir_)

In [None]:
import shutil
dir_training = dir_datasets +'/testing/'
try:
    os.makedirs(dir_training)
except FileExistsError:
    print(os.path.join("The following directory was not created because it already exsists", dir_ , ))
for i in range(len(os.listdir(dir_))):
    dir_to_check = os.path.join(dir_,"%s" % (i+1))+'/'
    for count, filename in enumerate(os.listdir(dir_to_check)):
            src = dir_to_check+filename
            shutil.move(src,dir_training)
    os.rmdir(dir_to_check)
shutil.move(dir_training,dir_)
shutil.move(dir_+'.json',dir_)