# Collision Avoidance - Data Collection

In [9]:
# INITIALISATION DE LA CAMERA ET IMPORTATION DES BIBLIOTHEQUES
import traitlets
import ipywidgets.widgets as widgets
from IPython.display import display
from jetbot import Camera, bgr8_to_jpeg
camera = Camera.instance(width=224, height=224)
image = widgets.Image(format='jpeg', width=224, height=224)  # this width and height doesn't necessarily have to match the camera

camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

#---- CREATION DES DOSSIERS CONTENANT LES DIFFERENTES PHOTOS -----
import os

detectnames = ['blocked','free','stopsign', 'speed1sign','speed2sign','speed3sign','moveright','moveleft','pieton']
detectdirs = []
for i in range(len(detectnames)):
    detectdirs.append('dataset/'+ detectnames[i])

# we have this "try/except" statement because these next functions can throw an error if the directories exist already
try:
    for i in range(len(detectdirs)):
        os.makedirs(detectdirs[i])

except FileExistsError:
    print('Directories not created because they already exist')
    
    

#FORME DES BOUTONS
button_layout = widgets.Layout(width='200px', height='32px')

#BOUTONS
# button_style options : primary/success/info/warning/danger
buttons =[]
buttons.append(widgets.Button(description='add free', button_style='success', layout=button_layout))
buttons.append(widgets.Button(description='add blocked', button_style='danger', layout=button_layout))

for i in range(2,len(detectnames)):
    buttons.append(widgets.Button(description='add '+ detectnames[i], button_style='warning', layout=button_layout))

#----- COMPTEUR D'ELEMENTS -----
compteurs = []
for directory in detectdirs:
    compteurs.append(widgets.IntText(layout=button_layout, value=len(os.listdir(directory))))
    
    
# ----- Evenement on click -----

from uuid import uuid1

def save_snapshot(directory):
    image_path = os.path.join(directory, str(uuid1()) + '.jpg')
    with open(image_path, 'wb') as f:
        f.write(image.value)

def save(i):
    print(i)
    global detectdirs, compteurs
    save_snapshot(detectdirs[i])
    compteurs[i].value = len(os.listdir(detectdirs[i]))    

    
buttons[0].on_click(lambda x: save(0))
buttons[1].on_click(lambda x: save(1))
buttons[2].on_click(lambda x: save(2))
buttons[3].on_click(lambda x: save(3))
buttons[4].on_click(lambda x: save(4))
buttons[5].on_click(lambda x: save(5))
buttons[6].on_click(lambda x: save(6))
buttons[7].on_click(lambda x: save(7))
buttons[8].on_click(lambda x: save(8))

In [10]:
display(image)
for i in range(len(detectnames)):
    display(widgets.HBox([compteurs[i], buttons[i]]))

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

HBox(children=(IntText(value=0, layout=Layout(height='32px', width='200px')), Button(button_style='success', d…

HBox(children=(IntText(value=0, layout=Layout(height='32px', width='200px')), Button(button_style='danger', de…















Again, let's close the camera conneciton properly so that we can use the camera in the later notebook.

In [11]:
camera.stop()

In [2]:
!zip -r -q wb.zip whitebox

^C


zip error: Interrupted (aborting)


# Collision Avoidance - Train Model

### Bibliothèques utilisées

In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision.transforms as transforms

In [None]:
# Si les photos sont importées dans un fichier zip :
!unzip -q dataset.zip

### Create dataset instance

In [None]:
dataset = datasets.ImageFolder(
    root = 'dataset',
    transforms.Compose([
        transforms.ColorJitter(0.1, 0.1, 0.1, 0.1),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
)

# --- Explications ---
#ColorJitter(brightness=0, contrast=0, saturation=0, hue=0) hue = teinte
#

### Split dataset into train and test sets

In [None]:
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [len(dataset) - len(dataset)//2, len(dataset)//2])

### Create data loaders to load data in batches

In [None]:
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)

test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=0
)

### Define the neural network

In [None]:
model = models.alexnet(pretrained=True)

In [None]:
model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, len(detectnames))
device = torch.device('cuda')
model = model.to(device)

### Train the neural network

In [None]:
NUM_EPOCHS = 30
BEST_MODEL_PATH = 'best_model.pth'
best_accuracy = 0.0

optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

for epoch in range(NUM_EPOCHS):
    
    for images, labels in iter(train_loader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()
    
    test_error_count = 0.0
    for images, labels in iter(test_loader):
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        test_error_count += float(torch.sum(torch.abs(labels - outputs.argmax(1))))
    
    test_accuracy = 1.0 - float(test_error_count) / float(len(test_dataset))
    print('%d: %f' % (epoch, test_accuracy))
    if test_accuracy > best_accuracy:
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        best_accuracy = test_accuracy