In [None]:
import airsim
import numpy as np
import cv2
import os
import datetime
from PIL import Image

import nbimporter
import Classifier
import matplotlib.pyplot as plt
import timeit

# --- Pipeline geometry (all values are frame counts) ---
depth_det = 10  # most-recent cropped frames handed to Detector; also the sample_duration used by load_checkpoint
depth_cf = 40   # most-recent frames that are cropped and handed to Classifier.Classify
stacksize = 5   # the human detector runs once every `stacksize` captured frames

# --- Customizable ---
# Gesture labels the evaluation loop reacts to; any other predicted label is skipped.
detectable = [
    "No_gesture",
    "Swiping_Left",
    "Swiping_Right",
    "Stop_Sign",
    "Pulling_Hand_In",
]
# Label(s) counted as a correct prediction for this run.
target = ["Pulling_Hand_In"]  # STOP SIGN TIME = 0.0215, REG=0.14
ShowCropping = True  # forwarded to human_detector as its verbose flag
CreateVideos = True  # forwarded to Classifier.Classify

In [2]:
def GetCurrentFrame():
    """Grab the current scene image from AirSim camera "1" as a 3-channel uint8 array.

    Relies on the module-level ``client`` created by the evaluation cell.

    Returns:
        numpy.ndarray: writable array of shape (height, width, 3).
        NOTE(review): the channel order is whatever AirSim delivers; the
        original comment called it RGB, while CropHuman later converts
        BGR->RGB for display -- confirm against the AirSim version in use.

    Raises:
        ValueError: propagated from ``reshape`` when the payload length does
            not match ``height * width * 3``; the capture loop skips such frames.
    """
    responses = client.simGetImages([
        airsim.ImageRequest("1", airsim.ImageType.Scene, False, False)])  # scene image, not float, not compressed

    response = responses[0]
    # np.fromstring is deprecated for binary input; np.frombuffer is the
    # supported replacement. frombuffer yields a read-only view of the bytes,
    # so copy after reshaping to keep the writable array callers used to get.
    img1d = np.frombuffer(response.image_data_uint8, dtype=np.uint8)
    img_rgb = img1d.reshape(response.height, response.width, 3).copy()

    return img_rgb  # as ndarray

In [3]:
def load_checkpoint(filepath):
    """Load a saved checkpoint into a fresh ``resnet10`` and return it in eval mode.

    Args:
        filepath: path (or file-like object) handed to ``torch.load``.

    Returns:
        The model built by ``resnet.resnet10`` with the checkpoint weights
        loaded, switched to evaluation mode.

    Notes:
        Needs ``torch`` and ``resnet`` to be importable in the kernel; neither
        is imported by the visible import cell.
    """
    checkpoint = torch.load(filepath, map_location='cpu')
    state_dict = checkpoint['state_dict']

    # The original code dropped the first 7 characters of every key, which only
    # works when every key carries a 7-character wrapper prefix.
    # NOTE(review): 'module.' (the prefix torch.nn.DataParallel adds) is assumed
    # to be that prefix -- confirm against the checkpoint. Keys without it are
    # now left untouched instead of being truncated.
    prefix = 'module.'
    for key in list(state_dict.keys()):
        if key.startswith(prefix):
            state_dict[key[len(prefix):]] = state_dict.pop(key)

    model = resnet.resnet10(
        num_classes=2,
        shortcut_type='B',
        cardinality=32,
        sample_size=112,
        sample_duration=depth_det)

    model.load_state_dict(state_dict)

    model.eval()
    return model

In [4]:
def getTransforms():
    """Build the per-frame spatial preprocessing pipeline.

    The pipeline is, in order: ``Scale(112)``, ``CenterCrop(112)``,
    ``ToTensor(norm_value)`` and a ``Normalize`` using ``get_mean(norm_value)``
    with a unit standard deviation.

    Returns:
        The ``spatial_transforms.Compose`` object wrapping those steps.

    Notes:
        ``Normalize``, ``get_mean``, ``norm_value``, ``spatial_transforms``,
        ``Scale``, ``CenterCrop`` and ``ToTensor`` must already exist in the
        kernel; the visible import cell does not provide them.
    """
    # The original also built a 5-entry multi-scale list that was never used;
    # it has been removed as dead code.
    norm_method = Normalize(get_mean(norm_value), [1, 1, 1])

    spatial_transform = spatial_transforms.Compose([
        Scale(112),
        CenterCrop(112),
        ToTensor(norm_value),
        norm_method,
    ])

    return spatial_transform

In [5]:
# Turn a sequence of frames into one batched clip tensor for the network.
# NOTE(review): the earlier note here spoke of 37 frames from ./TESTVID and a
# <1,3,32,112,112> result; the frame count actually comes from the external
# `duration` global, so those numbers may be stale -- confirm.
def TransformData(spatial_transform, frameST):
    """Apply ``spatial_transform`` to every frame and stack the results into a clip.

    Args:
        spatial_transform: callable transform exposing ``randomize_parameters()``.
        frameST: iterable of frames accepted by ``torchvision``'s ``ToPILImage``.

    Returns:
        A tensor with a leading batch axis of size 1, followed by the permuted
        (axis 1, axis 0, height, width) view of the stacked frames.

    Notes:
        Reads the module-level ``duration``; it is not defined in the visible
        cells and must exist in the kernel before this is called.
    """
    pil_frames = [torchvision.transforms.ToPILImage()(frame) for frame in frameST]

    # Parameters are drawn once so every frame of the clip is transformed alike.
    spatial_transform.randomize_parameters()
    tensors = [spatial_transform(pil_frame) for pil_frame in pil_frames]

    spatial_dims = tensors[0].size()[-2:]
    stacked = torch.cat(tensors, 0)
    clip = stacked.view((duration, -1) + spatial_dims).permute(1, 0, 2, 3)
    batch = clip[None, :, :, :, :]  # add 5th dimension; batch size = 1
    print(batch.shape)
    return batch

In [6]:
def CropHuman(frame, show=False):
    """Detect people in ``frame`` with OpenCV's default HOG people detector.

    The frame is resized to 768x288 before detection, so the returned
    coordinates are expressed in that resized space.

    Args:
        frame: image array accepted by ``cv2.resize``.
        show: when True, draw the detections on the resized frame and display
            it with matplotlib.

    Returns:
        numpy.ndarray: one ``[x1, y1, x2, y2]`` row per detection; an empty
        array (``size == 0``) when nothing was found.
    """
    hog = cv2.HOGDescriptor()
    hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
    frame = cv2.resize(frame, (768, 288))
    # The original also computed a grayscale copy that was never used; removed.
    rects, _weights = hog.detectMultiScale(frame, winStride=(8, 8))
    # Convert (x, y, w, h) into corner form (x1, y1, x2, y2).
    boxes = np.array([[x, y, x + w, y + h] for (x, y, w, h) in rects])

    if show:
        # Drawing only matters for the preview: the resized frame is local and
        # is never returned, so it is skipped entirely when show is False.
        for (xA, yA, xB, yB) in boxes:
            cv2.rectangle(frame, (int(xA), int(yA)), (int(xB), int(yB)),
                          (0, 255, 0), 2)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        plt.imshow(frame)
        plt.xticks([]), plt.yticks([])  # hide tick values on X and Y axis
        plt.show()

    return boxes

In [7]:
def human_detector(fr, verbose=False):
    """Locate the first person in ``fr`` and return its box as ``(x, y, xF, yF)``.

    Side effect: writes ``fr`` to 'hum_det.png' on every call.

    Args:
        fr: frame to search.
        verbose: forwarded to ``CropHuman`` as its ``show`` flag.

    Returns:
        Four coordinates taken from the first detection, each doubled, or
        ``(0, 0, 0, 0)`` when ``CropHuman`` finds nobody.
    """
    cv2.imwrite('hum_det.png', fr)
    detections = CropHuman(fr, verbose)

    if detections.size != 0:
        first = detections[0]
        # NOTE(review): the x2 presumably maps CropHuman's 768x288 coordinates
        # back onto a capture twice that size -- confirm the camera resolution.
        return first[0] * 2, first[1] * 2, first[2] * 2, first[3] * 2

    return 0, 0, 0, 0

In [9]:
def CropLastX(vstreamshort,x,y,xF,yF):
    """Crop every frame to a fixed sub-window of the box ``(x, y, xF, yF)``.

    Each frame is first cut to the box, then narrowed to rows
    ``height/7 .. height/3`` and columns ``width/9 .. width*4/5`` of that cut
    (bounds rounded with ``round`` and converted to int).

    Args:
        vstreamshort: iterable of image arrays indexed as ``[row, col]``.
        x, y, xF, yF: left, top, right and bottom edges of the box.

    Returns:
        list: the cropped frames, in input order.
    """
    def _sub_window(frame):
        boxed = frame[y:yF, x:xF]
        rows, cols = boxed.shape[0], boxed.shape[1]
        top = int(round(rows / 7))
        bottom = int(round(rows / 3))
        left = int(round(cols / 9))
        right = int(round(cols * 4 / 5))
        # Alternative row band tried earlier: rows/14 .. rows/3.6.
        return boxed[top:bottom, left:right]

    return [_sub_window(frame) for frame in vstreamshort]

In [10]:
def Detector(frameST):
    """Placeholder gesture detector: reports a gesture for every input.

    Args:
        frameST: frames to inspect; currently ignored.

    Returns:
        bool: always True.
    """
    return True

In [11]:
# Closed-loop evaluation against a running AirSim car simulation.
# Frames are captured continuously; once `depth_cf` frames are buffered, every
# `stacksize`-th frame triggers: human detection -> crop -> Detector ->
# Classifier.Classify, and the running statistics are printed.

def _mean_or_zero(values):
    """Average of ``values``, or 0.0 when the list is empty."""
    return sum(values) / float(len(values)) if values else 0.0


max_stacks = 2040     # number of evaluated stacks before the run stops
ranked_to_scan = 26   # how many ranked predictions are inspected per stack

# Statistics live at module level so later cells can inspect them, and are
# created before connecting so they exist even when the connection fails.
timingarr = []
percents = []
correct = 0
total = 0

stopcalled = 0
comecalled = 0
rightcalled = 0
leftcalled = 0
ngcalled = 0

client = None
try: #24to18lux
    client = airsim.CarClient()
    client.confirmConnection()
    client.enableApiControl(True)
    car_controls = airsim.CarControls()

    Classifier.ClearDir('gesturevideos','avi')
    start = timeit.default_timer()

    # Only the newest `depth_cf` frames are ever read, so the buffer is capped
    # at that size instead of holding every frame of the run in memory.
    videostream = []
    frames_seen = 0
    while total < max_stacks:
        try:
            frame = GetCurrentFrame()
        except ValueError:
            continue #skip to next frame
        videostream.append(frame)
        frames_seen += 1
        if len(videostream) > depth_cf:
            del videostream[0]

        if frames_seen >= depth_cf:
            if frames_seen % stacksize == 0: #every X frames run human_detector
                total += 1
                print('STACK ' + str(total))
                x,y,xF,yF = human_detector(videostream[-1], ShowCropping)
                if x or y or xF or yF: #Only continue if human detected
                    croppedstream_recent = CropLastX(list(videostream),x,y,xF,yF) #Crop buffered frames
                    exists = Detector(croppedstream_recent[-depth_det:]) #See if gesture detected
                    if exists:
                        classes, indices, percentages = Classifier.Classify(croppedstream_recent, CreateVideos)
                        # Walk the ranking until the first label this run cares
                        # about; clamp so a shorter ranking cannot raise IndexError.
                        for i in range(min(ranked_to_scan, len(indices[0]))):
                            gesture = classes[indices[0][i]]
                            if gesture not in detectable:
                                continue
                            if gesture in target:
                                correct += 1
                                percents.append(percentages[indices[0][i]].item())
                            elif gesture == 'No_gesture': #predicted a class, but wrong one
                                ngcalled += 1
                            elif gesture == 'Swiping_Left':
                                leftcalled += 1
                            elif gesture == 'Swiping_Right':
                                rightcalled += 1
                            elif gesture == 'Stop_Sign':
                                stopcalled += 1
                            elif gesture == 'Pulling_Hand_In':
                                comecalled += 1
                            break
                print("Accuracy: " + str(correct) + "/" + str(total))
                print("Time per Frame (Avg): " + str(_mean_or_zero(timingarr)))
                print("Percentage (Avg): " + str(_mean_or_zero(percents)))
                print(ngcalled)
                print(leftcalled)
                print(rightcalled)
                print(stopcalled)
                print(comecalled)
            end = timeit.default_timer()
            timingarr.append(end-start) #time per frame taken
            start = timeit.default_timer()
except KeyboardInterrupt:
    pass  # manual stop: fall through to the cleanup below
finally:
    # Always hand control back to the simulator, whatever ended the run.
    # `client` stays None when CarClient() itself failed.
    if client is not None:
        try:
            client.reset()
            client.enableApiControl(False)
        except Exception as cleanup_error:
            # Do not let a failed cleanup (e.g. no connection) mask the original error.
            print("AirSim cleanup failed: " + str(cleanup_error))

TransportError: Retry connection over the limit

In [None]:
# Mean seconds per captured frame over the run; NaN when no timings were
# recorded (e.g. the simulator connection failed before the first frame).
sum(timingarr)/len(timingarr) if timingarr else float('nan')