In [None]:
%run Scripts/Read_RP_Lidar.ipynb
%run Scripts/Estimate_stages.ipynb
%run Scripts/Calculate_Angles.ipynb
%run Scripts/Env_Class.ipynb

# Variables

In [None]:
# If True you WILL Collect Data
global COLLECT_DATA 
COLLECT_DATA = False 

# If you have a Fan, set to True
global FAN_CONTROL
FAN_CONTROL = False

# Define what angles you want to measure
angles = [x for x in range(180,360,20)]+[x for x in range(0,180,20)]

# A3C Trained Model Path
UNITY_MODEL = "Files/George-1-128.pth"

# Reset Camera + Lidar

In [None]:
import getpass
import os

# Reset Camera
password = 'jetbot'
command = "sudo -S systemctl restart nvargus-daemon" #can be any command but don't forget -S as it enables input from stdin
os.system('echo %s | %s' % (password, command))

# Reset Lidar
command = "sudo -S chmod 666 /dev/ttyUSB0" #can be any command but don't forget -S as it enables input from stdin
os.system('echo %s | %s' % (password, command))

# Load the Pre-Trained Keypoints RCNN

In [None]:
from threading import Thread
import os
import json
import trt_pose.coco

with open('Files/human_pose.json', 'r') as f:
    human_pose = json.load(f)

topology = trt_pose.coco.coco_category_to_topology(human_pose)

In [None]:
import trt_pose.models

num_parts = len(human_pose['keypoints'])
num_links = len(human_pose['skeleton'])

In [None]:
import torch
from jetbot import Robot
import time

WIDTH = 224
HEIGHT = 224

data = torch.zeros((1, 3, HEIGHT, WIDTH)).cuda()

In [None]:
import torch2trt
OPTIMIZED_MODEL = 'Files/resnet18_baseline_att_224x224_A_epoch_249_trt.pth'

from torch2trt import TRTModule
model_trt = TRTModule()
model_trt.load_state_dict(torch.load(OPTIMIZED_MODEL))
print("KeyPoints RCNN Successfully Loaded. ")

# Initialize the Camera

In [None]:
import cv2
import torchvision.transforms as transforms
import PIL.Image
import time

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda()
device = torch.device('cuda')

def preprocess(image):
    global device
    device = torch.device('cuda')
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device)
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

In [None]:
from trt_pose.draw_objects import DrawObjects
from trt_pose.parse_objects import ParseObjects

parse_objects = ParseObjects(topology)
draw_objects = DrawObjects(topology)

In [None]:
from jetcam.csi_camera import CSICamera

camera = CSICamera(width=WIDTH, height=HEIGHT, capture_fps=15)
camera.running = True

# Initialize Paths to Save Collected KeyPoints Data

In [None]:
import numpy as np
import shutil
global folder

camera.running = True

if COLLECT_DATA:
    folder = "/home/jetbot/Desktop/"
    file_path = folder + "temp.csv"
    f = open(file_path, "a")

# Get the KeyPoints (For Angle Calculation)

In [None]:
def get_keypoints(image, human_pose, topology, object_counts, objects, normalized_peaks):
    """Get the keypoints from torch data and put into a dictionary where keys are keypoints
    and values the x,y coordinates. The coordinates will be interpreted on the image given.

    Args:
        image: cv2 image
        human_pose: json formatted file about the keypoints

    Returns:
        dictionary: dictionary where keys are keypoints and values are the x,y coordinates
    """
    height = image.shape[0]
    width = image.shape[1]
    keypoints = {}
    K = topology.shape[0]
    count = int(object_counts[0])

    for i in range(count):
        obj = objects[0][i]
        C = obj.shape[0]
        for j in range(C):
            k = int(obj[j])
            if k >= 0:
                peak = normalized_peaks[0][j][k]
                x = round(float(peak[1]) * width)
                y = round(float(peak[0]) * height)
                keypoints[human_pose["keypoints"][j]] = (x, y)

    return keypoints

# Get KeyPoints (To save in Files)

In [None]:
def get_keypoint(humans, hnum, peaks):
    #check invalid human index
    kpoint = []
    human = humans[0][hnum]
    C = human.shape[0]
    for j in range(C):
        k = int(human[j])
        if k >= 0:
            peak = peaks[0][j][k]   # peak[1]:width, peak[0]:height
            peak = [j, float(peak[0]), float(peak[1])]
            kpoint.append(peak)
        else:    
            peak = [j, -1, -1] # -1 = None
            kpoint.append(peak)

    return kpoint

# Define Speed Control

In [None]:
def fan_control(speed):
    if speed >= 0 and speed <=100:
        print("Changing speed of fan to: " + str(speed))
        cmd = "echo " + str(speed) + " > /sys/devices/pwm-fan/target_pwm"
        os.system(cmd)
        print("Speed of fan changed")
    else:
        print("Speed value incorrect: " + str(speed) + ". Not changing fan speed")

# Load Unity Trained Model and defines more variables

In [None]:
model = torch.load(UNITY_MODEL)
model.eval()

# Initialize Environment

In [None]:
robot = Robot()
env = Environment(model, angles, robot)

# Defines Thread that handles running the Environment

In [None]:
from IPython.display import clear_output as cls

def RUN_ENV():
    
    global environment_running
    
    environment_running = True
    
    for i in range (100):
        #cls(wait=True)
        print(f'\n\nRunning for iteration #: {i}')
        observation = env.observe()
        # Normalize RPLIDAR
        observation2 = observation[0:4] + [x/12000 for x in observation[4:]]
        print('Observation collected: ', observation2)

        action = env.sample_action(observation2)
        print('Action sampled: ', action)
#         env.step(action)
#         print('Action Taken')  

# Defines Execute used to calculate angle

In [None]:
def execute2(change, visualize=True):
    image = change['new']
    data = preprocess(image)
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)#, cmap_threshold=0.15, link_threshold=0.15)
    keypoints = get_keypoints(image, human_pose, topology, counts, objects, peaks)

    return keypoints, image, counts, objects, peaks

# Defines Execute to visualize Camera

In [None]:
def execute(change):
    image = change['new']
    data = preprocess(image)
    cmap, paf = model_trt(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    counts, objects, peaks = parse_objects(cmap, paf)#, cmap_threshold=0.15, link_threshold=0.15)
    keypoints = get_keypoint(objects, 0, peaks) # 0 for just the first human recognized in the frame

    draw_objects(image, counts, objects, peaks)
    image_w.value = bgr8_to_jpeg(image[:, ::-1, :])
    
    if COLLECT_DATA:
        np.savetxt(f, keypoints, delimiter=',', fmt='%.4f', newline='\n')
    
        global kp_sets_saved
        global files_copied
        kp_sets_saved = kp_sets_saved + 1
        print(str(int(time.time())) + " kp sets saved: " + str(kp_sets_saved))
        if kp_sets_saved == 100: # save every 100 sets of keypoints
            current_epoch = int(time.time())
            new_file = folder + str(current_epoch) + ".csv"
            print(str(int(time.time())) + " temp file copied to: " + new_file)
            shutil.copy(file_path, new_file)
            files_copied = files_copied + 1
            kp_sets_saved = 0
            print(str(int(time.time())) + " temp file truncated")
            f.truncate()
            
        if files_copied == 20 and upload_running == 0:
            thread = Thread(target = upload) 
            thread.start()
        
    if environment_running == False:
        thread2 = Thread(target = RUN_ENV)
        thread2.start()

In [None]:
if COLLECT_DATA:
    kp_sets_saved = 0
    files_copied = 0
    upload_running = 0

# activate fan on low speed before starting processing images
if FAN_CONTROL:
    fan_control(64) # 1/4 of full speed, 255 is full speeed
    
environment_running = False

if COLLECT_DATA:
    upload()

In [None]:
import ipywidgets
from IPython.display import display
from jetcam.utils import bgr8_to_jpeg

image_w = ipywidgets.Image(format='jpeg', width=WIDTH, height=HEIGHT)
display(image_w)

In [None]:
execute({'new': camera.value})
camera.observe(execute, names='value')

In [None]:
if COLLECT_DATA: upload()
if FAN_CONTROL: fan_control(0)   # Fan off