## Imports for the Project and Setup

In [1]:
import torch
import torchvision
from torchvision import transforms , datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchsummary import summary # for model summary
import matplotlib.pyplot as plt # for ploting our data and showing images
import matplotlib.image as mpimg
import numpy as np
import cv2
import PIL.Image
import dataHelper as dH
import os
import time
import random
import ipywidgets
from IPython.display import display
from utils import preprocess
%matplotlib inline

## Model Setup

Here we are letting pytorch know to use the gpu and we are also importing a pretrained resnet18 model that we are modifying to return only 2 outputs. One for X and one for Y.

In [2]:
device = torch.device('cuda')
output_dim = 2
# RESNET 18
model = torchvision.models.resnet18(pretrained=True)
model.fc = torch.nn.Linear(512, output_dim)
model = model.to(device)

## Model Summary
This is the inner workings of resnet18.

In [None]:
summary(model, input_size=(3,224,224))

## Data Collection

### Files Structure Setup

In [3]:
path_raw = os.getcwd() + '/raw_datasets'
try:
    os.mkdir(path_raw)
except OSError:
    pass

In [4]:
 prev_data =  os.scandir(path_raw)
h_index = 0
prev_count = 0
for file in prev_data:
    filename, f_ext = os.path.splitext(file.name)
    if f_ext == '.jpg':
        prev_count += 1
        [num] = dH.label_parser(filename, label_num = 1, ext = False)
        if num > h_index:
            h_index = num
print(prev_count)

150


In [5]:
#config
datacollect = False
dataSetSize = 150
FPScollect = 30
SEC_delay = .5


loopcounter = 0
collectedNum = 0

if datacollect:
    
    image_widget = ipywidgets.Image(format='jpeg')
    captured_frame = ipywidgets.Image(format='jpeg')
    
    # count the index of all images
    prev_data =  os.scandir(path_raw)
    h_index = 0
    prev_count = 0
    for file in prev_data:
        filename, f_ext = os.path.splitext(file.name)
        if f_ext == '.jpg':
            prev_count += 1
            [num] = dH.label_parser(filename, label_num = 1, ext = False)
            if num > h_index:
                h_index = num
    
    collectedNum = prev_count
    h_index += 1
    
    camera.unobserve_all()
    print('Get Ready!')
    frame = camera.read()
    image_widget.value = bgr8_to_jpeg(frame)
    captured_frame.value = bgr8_to_jpeg(frame)
    #plt.imshow(bgr8_to_jpeg(frame))
    time.sleep(6)
    display(ipywidgets.HBox([image_widget, captured_frame]))
    print('Begin!')
    
    while True:
        if collectedNum == dataSetSize:
            break
        frame = camera.read()
        #plt.imshow(frame)
        image_widget.value = bgr8_to_jpeg(frame)
        if loopcounter % (FPScollect * SEC_delay) == 0:
            # write raw images to file 
            captured_frame.value = bgr8_to_jpeg(frame)
            dH.write_raw_frame(path_raw, frame, h_index)
            h_index += 1
            collectedNum += 1
        loopcounter += 1
print('Done!')

Done!


## Data Labeling

An I oop... data is labeled on aonther PC because some libraries are broken

## Training Data Build

In [6]:

REBUILD = True


path_labeled = os.getcwd() + '/labeled_datasets'
data = os.scandir(path_labeled)


#fig = plt.figure()
#ax = fig.add_subplot(1,1,1)
#plt.axis("off")

TRANSFORMS = transforms.Compose([
    transforms.ColorJitter(0.2, 0.2, 0.2, 0.2),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])


fill = np.zeros((224,224,2))

if REBUILD:
    training_data = []
    for i in data:
        if '.jpg' in i.name:
            img = cv2.imread(path_labeled +'/'+ i.name)
            img = PIL.Image.fromarray(img)
            [imgnum, x, y] = dH.label_parser(path_labeled +'/'+ i.name)
            #print(f'{imgnum},{x},{y}')
            #imgplot = plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            x = 2.0 * (x / 224 - 0.5)
            y = 2.0 * (y / 224 - 0.5)
            
            img = TRANSFORMS(img)
            training_data.append([np.array(img),np.array([x,y])])
    np.random.shuffle(training_data)
    np.save('training_data.npy', training_data)



## Model Training

In [7]:
optimizer = optim.Adam(model.parameters(), lr = 0.0001)
loss_func = nn.MSELoss()

TRAIN = True

EPOCHS = 25
BATCH_SIZE = 1

training_data = np.load('training_data.npy', allow_pickle=True)
training_img = torch.Tensor([i[0] for i in training_data]).view(-1,3,224,224).to(device)
training_label = torch.Tensor([i[1] for i in training_data]).view(-1,2).to(device)

if False:
    model = torch.load('test_run_1')

if TRAIN:
    model = model.train()
    for epoch in range(EPOCHS):
        # randomize each epoch
        #rand_idx = torch.randperm(len(training_data))
        
        for i in range(0, len(training_data), BATCH_SIZE):
            model.zero_grad()
            
            #output = model(training_img[rand_idx[i:i+BATCH_SIZE]].view(-1,3,224,224))
            output = model(training_img[i:i+BATCH_SIZE].view(-1,3,224,224))
            
            #computed_output = 224 * (output / 2.0 + 0.5)
            #processed_labels = 2.0 * (training_label[rand_idx[i:i+BATCH_SIZE]].view(-1,2) / 224 - 0.5)
            #processed_labels = 2.0 * (training_label[i:i+BATCH_SIZE].view(-1,2) / 224 - 0.5)
            
            loss = loss_func(output,training_label[i:i+BATCH_SIZE].view(-1,2)) #computed_output,training_label[rand_idx[i:i+BATCH_SIZE]].view(-1,2)
            loss.backward()
            optimizer.step()
            #break
        #break
        #print(output)
        #print(processed_labels)
        print(loss)
    '''
    print('testing')
    print(output)
    print(computed_output)
    #plt.imshow(check.cpu().view(224,224,3).numpy().astype(int))
    '''

tensor(0.1759, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0196, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0017, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0100, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0091, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0007, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0322, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(7.2550e-05, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0070, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0131, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0101, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0042, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0004, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0015, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0016, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0100, device='cuda:0', grad_fn=<MseLossBackward>)
tensor(0.0163, device='cuda:0', grad_fn=<MseLossBack

In [11]:
torch.save(model, 'demo_run_try2')

In [12]:
test_idx = 44
o = model(training_img[test_idx].view(-1,3,224,224))
c_output = 224 * (o / 2.0 + 0.5)
L = training_label[test_idx]
L = 224 * (L / 2.0 + 0.5)
print(c_output)
print(L)

tensor([[201.9026,  28.3197]], device='cuda:0', grad_fn=<MulBackward0>)
tensor([207.0000,  16.0000], device='cuda:0')


## Evaluation

In [None]:
cap = cv2.VideoCapture(0)
Eval = True
#camera.running = False
if Eval:
    model = model.eval()
    #camera.unobserve_all()
    eval_iwdget = ipywidgets.Image(format='jpeg')
    
    image = cap.read()
    input_tensor = torch.Tensor(image).view(-1,3,224,224).to(device)
    output = model(input_tensor).detach().cpu().numpy().flatten()
    #print(224 * (output / 2.0 + 0.5))
    image = cv2.circle(image, (int(224 * (output[0] / 2.0 + 0.5)), int(224 * (output[1] / 2.0 + 0.5))), 8, (255, 0, 0), 3)
    eval_iwdget.value = bgr8_to_jpeg(image)
    display(eval_iwdget)
    
    
    while True:
        image = cap.read()
        #input_tensor = torch.Tensor(image).view(-1,3,224,224).to(device)
        processed = preprocess(image)
        output = model(processed).detach().cpu().numpy().flatten()
        image = cv2.circle(image, (int(224 * (output[0] / 2.0 + 0.5)), int(224 * (output[1] / 2.0 + 0.5))), 8, (255, 0, 0), 3)
        eval_iwdget.value = bgr8_to_jpeg(image)
    
    
    #prediction = cv2.circle(prediction, (x, y), 8, (255, 0, 0), 3)

In [16]:
cap = cv2.VideoCapture(0)
from utils import preprocess
image = cap.read()
cap.release()


In [18]:
type(image)

tuple

In [14]:
processed = preprocess(image)
p = processed.cpu().numpy()
np.reshape(p,(224,224,3))
#imgplot = plt.imshow(p)

AttributeError: 'tuple' object has no attribute '__array_interface__'

In [17]:
cap.release()