In [1]:
%config IPCompleter.greedy=True

import tensorflow as tf
import numpy as np
import cv2
import os
import shutil

from tensorflow import keras
from matplotlib import pyplot as plt

# Mise en place de la caméra

### Recherche de la caméra

In [2]:
!ls -ltrh /dev/video*

crw-rw----+ 1 root video 81, 0 sept.  4 12:10 /dev/video0
crw-rw----+ 1 root video 81, 4 sept.  4 12:10 /dev/video1


### Définition de la classe Camera

In [3]:
import traitlets
import threading
import atexit
import numpy as np


class Camera(traitlets.HasTraits):
    type_camera = traitlets.Unicode("CSI")
    capture_device = traitlets.Integer(default_value=0)
    capture_width = traitlets.Integer(default_value=1280)
    capture_height = traitlets.Integer(default_value=720)
    display_width = traitlets.Integer(default_value=640)
    display_height = traitlets.Integer(default_value=480)
    fps = traitlets.Integer(default_value=30)
    flip = traitlets.Integer(default_value=0)
    image = traitlets.Any()
    video_on = traitlets.Bool(default_value=False)
    
    def __init__(self,*args,**kwargs):
        super(Camera, self).__init__(*args, **kwargs)
        self._running = False
        self.image = np.empty((self.display_height, self.display_width, 3), dtype=np.uint8)
        
        if self.type_camera.find("CSI")>=0:
            self.cap = cv2.VideoCapture(self._gstreamer_pipeline_CSI(),cv2.CAP_GSTREAMER)
        else:
            self.cap = cv2.VideoCapture(self._gstreamer_pipeline_USB(),cv2.CAP_GSTREAMER)

        if self.cap.isOpened():
            print("Caméra initialisée")
        else:
            print("Erreur d'ouverture du flux vidéo")
        atexit.register(self.cap.release)
    
    # Lecture d'une frame
    def capture_image(self):
        re, image = self.cap.read()
        if re:
            image_resized = cv2.resize(image,(int(self.display_width),int(self.display_height)))
        return image_resized
    
    # ON/OFF de la capture vidéo
    def capture_video(self,run=False):
        if run is True:
            self.video_on = True
        else:
            self.video_on = False
    
    # Lecture d'un flux vidéo
    def _capture_video(self):
        while True:
            if not self._running:
                break
            self.image = self.capture_image()

            
    # Détachement de la caméra
    def release(self):
        self.cap.release()

    # Définition du pipeline pour la caméra CSI
    def _gstreamer_pipeline_CSI(self):
        return("nvarguscamerasrc sensor-id=%d ! "
                "video/x-raw(memory:NVMM),"
                "width=(int)%d,height=(int)%d,"
                "format=(string)NV12, framerate=(fraction)%d/1 ! "
                "nvvidconv flip-method=%d ! "
                "video/x-raw,"
                "width=(int)%d,height=(int)%d,"
                "format=(string)BGRx ! videoconvert ! "
                "video/x-raw, format=(string)BGR ! "
                "appsink drop=true"
        %(self.capture_device,self.capture_width,self.capture_height,self.fps,self.flip, self.display_width,self.display_height))

    # Définition du pipeline pour la USB
    def _gstreamer_pipeline_USB(self):
        return("v4l2src device=/dev/video%d ! "
               "video/x-raw, width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! "
               "videoflip method=%d ! "
               "videoconvert ! "
               "video/x-raw, format=(string)BGR ! appsink drop=true"
        %(self.capture_device,self.capture_width,self.capture_height,self.fps,self.flip))
    
    # Surveillance de la variable "video_on"
    @traitlets.observe('video_on')
    def _on_running(self, change):
        if change['new'] and not change['old']:
            # not running -> running
            self._running = True
            self.thread = threading.Thread(target=self._capture_video)
            self.thread.start()
        elif change['old'] and not change['new']:
            # running -> not running
            self._running = False
            self.thread.join()

### Instanciation de la classe Camera

In [4]:
def InitCamera():
    camera = Camera(type_camera="USB",capture_device=1,
                capture_width=640,capture_height=480,
                display_width=320,display_height=200,
                fps=30,flip=0)
    return camera

# Création des données d'entrainement

### Acquisition des images

On commence par créer une interface à l'aide des widgets permettant de récupérer les données et de les sauvegarder dans un dictionnaire :

In [5]:
def bgr8_to_jpeg(value, quality=75):
    return bytes(cv2.imencode('.jpg', value)[1])

In [None]:
TACHE = # à compléter
CATEGORIES = # à compléter
datasets = {}
for name in CATEGORIES:
    datasets[name] = []

In [None]:
datasets

In [None]:
import ipywidgets
import traitlets
from IPython.display import display


# Initialise la caméra
try :
    camera.capture_video(run=False)
    camera.release()
    del camera
except NameError:
    pass

camera = InitCamera()

# Création du widget de la vidéo
camera_widget = ipywidgets.Image()
traitlets.dlink((camera, 'image'), (camera_widget, 'value'), transform=bgr8_to_jpeg)

# Création des widgets de l'interface
category_widget = ipywidgets.Dropdown(options=CATEGORIES, description='Catégorie')
count_widget = ipywidgets.IntText(description='Nombre')
save_widget = ipywidgets.Button(description='Ajouter')

# Mise à jour du nombre de données dans les catégories
def update_counts(change):
    count_widget.value = len(datasets[change['new']])
count_widget.value = len(datasets[category_widget.value])
category_widget.observe(update_counts, names='value')

# Prise d'une image
def save(c):
    datasets[category_widget.value].append(camera.image)
    count_widget.value = len(datasets[category_widget.value])
save_widget.on_click(save)

data_collection_widget = ipywidgets.VBox([
    ipywidgets.HBox([camera_widget]), category_widget, count_widget, save_widget])

# Lancement de la vidéo
camera.capture_video(run=True)
camera_link = traitlets.dlink((camera, 'image'), (camera_widget, 'value'), transform=bgr8_to_jpeg)

# Affiche l'interface
display(data_collection_widget)

### Sauvegarde des images

Les données sont sauvegardées en respectant la structure requise par la fonction tf.keras.preprocessing.image_dataset_from_directory : https://www.tensorflow.org/api_docs/python/tf/keras/utils/image_dataset_from_directory

In [None]:
repertoire_courant = os.getcwd()

In [None]:
repertoire_courant

In [None]:
# Création des répertoires de sauvegarde
try:
    shutil.rmtree(repertoire_courant+"/projet_classification/"+TACHE)
except FileNotFoundError:
    pass

for i in range(len(CATEGORIES):
    try:
        shutil.rmtree(repertoire_courant+"/projet_classification/"+TACHE+"/"+CATEGORIES[i])
    except FileNotFoundError:
        pass
    os.makedirs(repertoire_courant+"/projet_classification/"+TACHE+"/"+CATEGORIES[i])

In [None]:
# Enregistrement des images
for i in range (len(CATEGORIES)):
    n = 0
    for image in datasets[CATEGORIES[i]]:
        n = n+1
        cv2.imwrite(repertoire_courant+"/projet_classification/"+TACHE+"/"+CATEGORIES[i]+"/"+CATEGORIES[i]+"_image_%d.jpg" %n,image)

# Création des datasets

### Chargement des images dans le dataset d'entrainement

A partir des images sauvegardées dans le répertoire de travail, on peut maintenant créer notre dataset :

In [None]:
dataset_entrainement = tf.keras.preprocessing.image_dataset_from_directory(
    repertoire_courant+"/projet_classification/"+TACHE,
    validation_split=0.0,
    image_size=(224, 224),
    batch_size=5,
    label_mode='categorical')

In [None]:
class_names = dataset_entrainement.class_names
print(class_names)

Regardons le format du tenseur contenu dans le dataset :

In [None]:
for image,label in dataset_entrainement.take(2):
    print(image.shape)
    print(label.shape)

Regardons comment est codée une image :

In [None]:
for image,label in dataset_entrainement.take(1):
    print(image[0])

Affichons quelques labels codé de manière "categorical" et leur valeur équivalente "binaire" :

In [None]:
for image,label in dataset_entrainement.take(3):
    print("Label categorical : %s" %label[0])
    print("Label binaire correspondant : %s" %np.argmax(label[0], axis=None, out=None))
    print("Classe correspondante : %s" %class_names[np.argmax(label[0], axis=None, out=None)])
    print("")

Affichons maintenant quelques images :

In [None]:
iterator = iter(dataset_entrainement)

plt.figure(figsize=(10, 10))
for i in range(8):
    ax = plt.subplot(4, 4, i + 1)
    image, label = iterator.get_next()
    plt.imshow(image[0].numpy().astype("uint8"))
    plt.title(class_names[np.argmax(label[0], axis=None, out=None)])
    plt.axis("off")

### Traitement des images

On applique les transformations suivantes sur les images :
    - Pour le Resnet18 : Il ne faut rien faire !
    - Pour le Resnet50 : Il faut utiliser la méthode spécifique au Resnet50 de Keras

##### Resnet18

On ne convertit pas les valeurs !

##### Resnet50

On utilise la fonction https://www.tensorflow.org/api_docs/python/tf/keras/applications/resnet50/preprocess_input :

In [None]:
dataset_entrainement = dataset_entrainement.map(
    lambda x,y: (tf.keras.applications.resnet50.preprocess_input(x),y))

Regardons à quoi ressembles les images :

In [None]:
for image,label in dataset_entrainement.take(1):
    print(image[0])

# Création du modèle

### Resnet50

Pour utiliser le modèle ResNet50, on utilise les applications disponnibles dans Keras : https://keras.io/api/applications/

In [None]:
base_model = tf.keras.applications.resnet50.ResNet50(weights='imagenet',
                                                     include_top=False,
                                                     input_shape=(224,224,3),
                                                     pooling=False)
model = tf.keras.models.Model(inputs=base_model.input, outputs=base_model.output)
model.summary()

In [None]:
# Désactivation des couches poru l'entrainement
for layer in base_model.layers:
    layer.trainable = False

model.summary()

In [None]:
# Ajout de l'applatissement des sorties et de la couche dense avec 2 neurones"
x = tf.keras.layers.GlobalAveragePooling2D()(base_model.output)
output = tf.keras.layers.Dense(units=2, activation='softmax')(x)
model = tf.keras.Model(inputs=[base_model.input], outputs=[output])
model.summary()

In [None]:
# Liste des couches du modèle
for i, layer in enumerate(model.layers):
   print(i, layer.name)

In [None]:
for layer in model.layers[165:]:
   layer.trainable = True

model.summary()

### Restnet18

Pour utiliser le modèle RestNet18, nous allons utiliser le package Image-classifiers disponnible sur le github : https://github.com/AlexandreBourrieau/classification_models

In [None]:
import classification_models
from classification_models.tfkeras import Classifiers

In [None]:
# Chargement du modèle ResNEt18
ResNet18, preprocess_input = Classifiers.get('resnet18')

# Instanciation du modèle pré-entrainé ResNet18
base_model = ResNet18(input_shape=(224,224,3), weights='imagenet', include_top=False,pooling=False)

model = tf.keras.models.Model(inputs=base_model.input, outputs=base_model.output)
model.summary()

In [None]:
# Désactivation des couches pour l'entrainement
for layer in base_model.layers:
    layer.trainable = False

model.summary()

In [None]:
# Ajout de l'applatissement des sorties et de la couche dense avec 2 neurones"
x = tf.keras.layers.GlobalAveragePooling2D()(base_model.output)
output = # à compléter
model = tf.keras.Model(inputs=[base_model.input], outputs=[output])
model.summary()

In [None]:
# Liste des couches du modèle
for i, layer in enumerate(model.layers):
   print(i, layer.name)

In [None]:
for layer in model.layers[68:]:
   layer.trainable = True

model.summary()

# Entrainement du modèle

In [None]:
# Nombre de périodes d'entrainement
periodes = 5

# Entrainement du modèle
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(dataset_entrainement,verbose=1,epochs=periodes)

# Expérimentations et évaluations

### Fonction de pré-traitement de l'image

La fonction ci-dessous permet de traiter l'image avant de réaliser la prédiction avec le modèle :

In [None]:
def traitement_image(image):
    image = tf.image.resize(image,[224,224])
#    image = tf.keras.applications.resnet50.preprocess_input(image)
    return image

### Création de l'interface d'acquisition

In [None]:
# Initialise la caméra
try :
    camera.capture_video(run=False)
    camera.release()
    del camera
except NameError:
    pass

camera = InitCamera()

# Création du widget de la vidéo
camera_widget = ipywidgets.Image()
traitlets.dlink((camera, 'image'), (camera_widget, 'value'), transform=bgr8_to_jpeg)

# Lancement de la vidéo
camera.capture_video(run=True)
camera_link = traitlets.dlink((camera, 'image'), (camera_widget, 'value'), transform=bgr8_to_jpeg)

In [None]:
# Création des widgets
prediction_widget = ipywidgets.Text(description='prediction')
state_widget = ipywidgets.ToggleButtons(options=['stop', 'live'], description='state', value='stop')

# Initialisation des scores dans le widgets Slider
score_widgets = []
for categorie in CATEGORIES:
    score_widget = ipywidgets.FloatSlider(min=0.0, max=1.0, description=categorie, orientation='vertical')
    score_widgets.append(score_widget)


# Fonction de traitement des actions du widget "state_widget"
def live(state_widget, model, camera, prediction_widget, score_widget):
    global dataset
    while state_widget.value == 'live':
        image = camera.image
        image = traitement_image(image)
        output = model(tf.expand_dims(image,0))
        score_widgets[1].value = # à coppléter
        # à compléter
        # ...
       
def start_live(change):
    if change['new'] == 'live':
        global execute_thread
        execute_thread = threading.Thread(target=live, args=(state_widget, model, camera, prediction_widget, score_widget))
        execute_thread.start()
    else:
        execute_thread.join()
    
       
state_widget.observe(start_live, names='value')

live_execution_widget = ipywidgets.VBox([
    ipywidgets.HBox(score_widgets),
    prediction_widget,
    state_widget
])

all_widget = ipywidgets.VBox([
    ipywidgets.HBox([camera_widget,live_execution_widget])])

display(all_widget)

# Fermeture de la caméra et du kernel

In [None]:
# Extinction de la caméra
try :
    camera.capture_video(run=False)
    camera.release()
    del camera
except NameError:
    pass

os._exit(00)