# Evitement des collisions - Démonstration (Resnet18)

Dans ce notebook, nous allons utiliser le modèle entrainé en situation réelle !

In [None]:
import tensorflow as tf
import numpy as np
import cv2
import os
import shutil

from tensorflow import keras

# Création du modèle

In [None]:
import classification_models
from classification_models.tfkeras import Classifiers

# Chargement du modèle ResNEt18
ResNet18, preprocess_input = Classifiers.get('resnet18')

# Instanciation du modèle pré-entrainé ResNet18
base_model = ResNet18(input_shape=(224,224,3), weights='imagenet', include_top=False,pooling=False)

# Désactivation des couches pour l'entrainement
for layer in base_model.layers:
    layer.trainable = False
    
# Ajout de l'applatissement des sorties et de la couche dense avec 2 neurones"
x = tf.keras.layers.GlobalAveragePooling2D()(base_model.output)
output = tf.keras.layers.Dense(units=2, activation='softmax')(x)

model = tf.keras.Model(inputs=[base_model.input], outputs=[output])

On charge ensuite les poids sauvegardés lors de l'entrainement :

In [None]:
model.load_weights("meilleur_modele_colab.hdf5")

### Création de l'interface d'acquisition

J'ai créé une classe Image personnelle afin d'optimiser le pipeline avec OpenCV :

In [None]:
import traitlets
import threading
import atexit
import numpy as np


class Camera(traitlets.HasTraits):
    type_camera = traitlets.Unicode("CSI")
    capture_device = traitlets.Integer(default_value=0)
    capture_width = traitlets.Integer(default_value=1280)
    capture_height = traitlets.Integer(default_value=720)
    display_width = traitlets.Integer(default_value=640)
    display_height = traitlets.Integer(default_value=480)
    fps = traitlets.Integer(default_value=30)
    flip = traitlets.Integer(default_value=0)
    image = traitlets.Any()
    video_on = traitlets.Bool(default_value=False)
    
    def __init__(self,*args,**kwargs):
        super(Camera, self).__init__(*args, **kwargs)
        self._running = False
        self.image = np.zeros((self.display_height, self.display_width, 3), dtype=np.uint8)
        
        if self.type_camera.find("CSI")>=0:
            self.cap = cv2.VideoCapture(self._gstreamer_pipeline_CSI(),cv2.CAP_GSTREAMER)
        else:
            self.cap = cv2.VideoCapture(self._gstreamer_pipeline_USB(),cv2.CAP_GSTREAMER)

        if self.cap.isOpened():
            print("Caméra initialisée")
        else:
            print("Erreur d'ouverture du flux vidéo")
        atexit.register(self.cap.release)
    
    # Lecture d'une frame
    def capture_image(self):
        re, image = self.cap.read()
        if re:
            image_resized = cv2.resize(image,(int(self.display_width),int(self.display_height)))
            return image_resized
        else:
            return self.image
    
    # ON/OFF de la capture vidéo
    def capture_video(self,run=False):
        if run is True:
            self.video_on = True
        else:
            self.video_on = False
    
    # Lecture d'un flux vidéo
    def _capture_video(self):
        while True:
            if not self._running:
                break
            self.image = self.capture_image()

            
    # Détachement de la caméra
    def release(self):
        self.cap.release()

    # Définition du pipeline pour la caméra CSI
    def _gstreamer_pipeline_CSI(self):
        return("nvarguscamerasrc sensor-id=%d ! "
                "video/x-raw(memory:NVMM),"
                "width=(int)%d,height=(int)%d,"
                "format=(string)NV12, framerate=(fraction)%d/1 ! "
                "nvvidconv flip-method=%d ! "
                "video/x-raw,"
                "width=(int)%d,height=(int)%d,"
                "format=(string)BGRx ! videoconvert ! "
                "video/x-raw, format=(string)BGR ! "
                "appsink drop=true"
        %(self.capture_device,self.capture_width,self.capture_height,self.fps,self.flip, self.display_width,self.display_height))

    # Définition du pipeline pour la USB
    def _gstreamer_pipeline_USB(self):
        return("v4l2src device=/dev/video%d ! "
               "video/x-raw, width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! "
               "videoflip method=%d ! "
               "videoconvert ! "
               "video/x-raw, format=(string)BGR ! appsink drop=true"
        %(self.capture_device,self.capture_width,self.capture_height,self.fps,self.flip))
    
    # Surveillance de la variable "video_on"
    @traitlets.observe('video_on')
    def _on_running(self, change):
        if change['new'] and not change['old']:
            # not running -> running
            self._running = True
            self.thread = threading.Thread(target=self._capture_video)
            self.thread.start()
        elif change['old'] and not change['new']:
            # running -> not running
            self._running = False
            self.thread.join()

Fonction pour initialiser la caméra :

In [None]:
def InitCamera():
    camera = Camera(type_camera="CSI",capture_device=0,
                capture_width=224,capture_height=224,
                display_width=224,display_height=224,
                fps=25,flip=0)
    return camera

Création de l'interface :

In [None]:
import ipywidgets
import traitlets
from IPython.display import display

try :
    camera.capture_video(run=False)
    camera.release()
    del camera
except NameError:
    pass

# Initialise la caméra
camera = InitCamera()

In [None]:
import traitlets
from IPython.display import display
import ipywidgets.widgets as widgets

from jetbot import bgr8_to_jpeg

# Lance la caméra
camera.capture_video(run=True)

# Création de l'interface
image = widgets.Image(format='jpeg', width=224, height=224)
slider_bloquer = widgets.FloatSlider(description='bloquer', min=0.0, max=1.0, orientation='vertical')
slider_vitesse = widgets.FloatSlider(description='vitesse', min=0.0, max=0.5, value=0.0, step=0.01, orientation='horizontal')

lien_camera = traitlets.dlink((camera, 'image'), (image, 'value'), transform=bgr8_to_jpeg)

display(widgets.VBox([widgets.HBox([image, slider_bloquer]), slider_vitesse]))

Il faut ensuite créer une instance du robot pour pouvoir le commander :

In [None]:
from jetbot import Robot

robot = Robot()

Ensuite, on créé une fonction qui va être appellée à chaque fois qu'une nouvelle image est disponible. Cette fonction doit réaliser les opérations suivantes :

1. Pré-traitement de l'image issue de la caméra
2. Appliquer le modèle sur l'image récupérée
3. Si le modèle indique que nous sommes bloqué, alors le robot tourne à gauche. Sinon le robot avance.

In [None]:
import time

def mise_a_jour(change):
    global slider_bloquer, robot
    x = change['new']                               # (224,224,3)
    #x = traitement_image(x)                        # Aucun traitement ici
    y = model(tf.expand_dims(x,0))                  # (224,224,3) => (1,224,224,3)
      
    proba_bloquer = float(np.asarray(y[0])[0])
    
    slider_bloquer.value = proba_bloquer
    
    if proba_bloquer < 0.5:
        robot.forward(slider_vitesse.value)
    else:
        robot.left(slider_vitesse.value)
    
    time.sleep(0.001)

In [None]:
mise_a_jour({'new': camera.image})  # Appel la fonction une première fois pour initialiser le réseau

Nous avons donc maintenant notre réseeau de neurones en place, mais il reste à créer le lien entre la caméra et la fonction de mise à jour pour récupérer les images filmées. Cela se fait avec la fonction ``observe``.

In [None]:
camera.observe(mise_a_jour, names='image')

Vous devriez maintenant voir le slider indiquer la probabilité d'être bloqué ou non ! Il suffit d'augmenter la vitesse pour tester le bon fonctionnement du robot en déplacement.

Pour arrêter les expériences, il suffit de rompre le lien avec la fonction de mise à jour :

In [None]:
import time

camera.unobserve(mise_a_jour, names='image')

time.sleep(0.1)  # Petit temps d'attente pour que l'image soit bien traitée par le modèle

robot.stop()

Avant de quitter, il faut fermer la caméra :

In [None]:
try :
    camera.capture_video(run=False)
    camera.release()
    del camera
except NameError:
    pass

### Conclusion

That's it for this live demo!  Hopefully you had some fun and your robot avoided collisions intelligently! 

If your robot wasn't avoiding collisions very well, try to spot where it fails.  The beauty is that we can collect more data for these failure scenarios
and the robot should get even better :)