Reset de la caméra :

In [1]:
# Run the camera reset script through sudo (IPython shell escape).
# NOTE(review): presumably sudo must be usable non-interactively from the kernel — confirm.
!sudo /home/jetson/notebook/ResetCam.sh

Mise à jour des règles de droits d'accès aux GPIO :

In [2]:
# Run the GPIO access-rules update script through sudo (IPython shell escape).
# NOTE(review): presumably sudo must be usable non-interactively from the kernel — confirm.
!sudo /home/jetson/notebook/MajReglesGPIO.sh

In [3]:
import tensorrt as trt
import numpy as np
import pycuda.driver as cuda
import threading
import ctypes
import time
import traitlets
import atexit
import cv2
from playsound import playsound
import Jetson.GPIO as GPIO

class TRTInference(threading.Thread):
    """Worker thread: camera capture, MoveNet pose estimation, pose classification.

    For every frame the thread
      1. applies the duty cycle read from ``widget_PWM`` to the PWM output,
      2. grabs a frame and resizes it to the MoveNet input size,
      3. runs the MoveNet engine, then feeds its keypoints to the
         classification engine,
      4. publishes the probability to ``widget_probas`` and one of the two
         JPEG images (``smileOK`` / ``smileNONOK``) to ``widget_jeu``,
      5. draws the skeleton on the frame and publishes it to ``widget_image``.

    Processing only happens while ``camera_on`` is True.  Call ``stop()`` to
    end the loop cooperatively and ``destroy()`` to release the hardware.
    """

    # Probability above which the pose is reported as correct.
    _SEUIL_PROBA_OK = 0.98
    # PWM output: BOARD pin number and frequency in Hz.
    _BROCHE_PWM = 32
    _FREQUENCE_PWM = 1000
    # Side (in pixels) of the square image fed to MoveNet.
    _TAILLE_ENTREE = 256
    # Pause used while the camera is disabled, to avoid a busy loop.
    _PAUSE_INACTIF = 0.01

    # Names of the 17 keypoints, in the order of the MoveNet output rows.
    _NOMS_POINTS = (
        "nez",
        "oeil_gauche", "oeil_droit",
        "oreille_gauche", "oreille_droite",
        "epaule_gauche", "epaule_droite",
        "coude_gauche", "coude_droit",
        "poignet_gauche", "poignet_droit",
        "hanche_gauche", "hanche_droite",
        "genou_gauche", "genou_droit",
        "cheville_gauche", "cheville_droite",
    )
    # Virtual points: (name, keypoint A, keypoint B) -> midpoint of A and B.
    _CENTRES = (
        ("centre_yeux", "oeil_gauche", "oeil_droit"),
        ("centre_epaules", "epaule_gauche", "epaule_droite"),
        ("centre_hanches", "hanche_gauche", "hanche_droite"),
    )
    # Skeleton segments drawn between two (real or virtual) points.
    _SEGMENTS = (
        ("nez", "centre_yeux"),
        ("oeil_gauche", "centre_yeux"),
        ("oeil_droit", "centre_yeux"),
        ("oeil_gauche", "oreille_gauche"),
        ("oeil_droit", "oreille_droite"),
        ("nez", "centre_epaules"),
        ("epaule_droite", "centre_epaules"),
        ("epaule_gauche", "centre_epaules"),
        ("epaule_gauche", "coude_gauche"),
        ("poignet_gauche", "coude_gauche"),
        ("epaule_droite", "coude_droit"),
        ("poignet_droit", "coude_droit"),
        ("centre_hanches", "centre_epaules"),
        ("centre_hanches", "hanche_gauche"),
        ("centre_hanches", "hanche_droite"),
        ("genou_gauche", "hanche_gauche"),
        ("genou_droit", "hanche_droite"),
        ("genou_droit", "cheville_droite"),
        ("genou_gauche", "cheville_gauche"),
    )

    def __init__(self,repertoire_engine_MoveNet, repertoire_engine_Classification, widget_image,widget_probas, widget_PWM, widget_jeu, smileOK, smileNONOK, type_camera,capture_device,capture_width,capture_height,display_width,display_height,fps,flip=0):
        """Open the camera, start the PWM output and load both TensorRT engines.

        Args:
            repertoire_engine_MoveNet: path of the serialized MoveNet engine.
            repertoire_engine_Classification: path of the serialized
                classification engine.
            widget_image: widget whose ``value`` receives the annotated JPEG.
            widget_probas: widget whose ``value`` receives the probability.
            widget_PWM: widget whose ``value`` gives the PWM duty cycle (%).
            widget_jeu: widget whose ``value`` receives smileOK / smileNONOK.
            smileOK: image bytes shown when the probability exceeds 0.98.
            smileNONOK: image bytes shown otherwise.
            type_camera: a string containing "CSI" selects the CSI pipeline,
                anything else selects the USB (v4l2) pipeline.
            capture_device: sensor id (CSI) or /dev/video index (USB).
            capture_width, capture_height: requested capture size.
            display_width, display_height: output size of the CSI pipeline.
            fps: requested frame rate.
            flip: flip method passed to the GStreamer pipeline.

        Raises:
            RuntimeError: if an engine file cannot be deserialized.
        """
        threading.Thread.__init__(self)
        self.widget_image = widget_image
        self.widget_probas = widget_probas
        self.widget_PWM = widget_PWM
        self.widget_jeu = widget_jeu
        self.smile_OK = smileOK
        self.smile_NON_OK = smileNONOK
        self.type_camera = type_camera
        self.capture_device = capture_device
        self.capture_width = capture_width
        self.capture_height = capture_height
        self.display_width = display_width
        self.display_height = display_height
        self.fps = fps
        self.flip = flip
        self.camera_on = False
        self.a = 0                             # not used by this class; kept for external readers
        self._running = False                  # True while run() is executing
        self._stop_event = threading.Event()   # set by stop() to end run()

        # PWM initialisation (50 % duty cycle until the slider is read).
        GPIO.setmode(GPIO.BOARD)
        GPIO.setup(self._BROCHE_PWM, GPIO.OUT)
        self.pwm = GPIO.PWM(self._BROCHE_PWM, self._FREQUENCE_PWM)
        self.pwm.start(50)

        # Placeholder frame returned while no frame could be read.  It must be
        # a (height, width, 3) uint8 BGR image so that it can go through the
        # same resize / JPEG-encoding path as a real frame.
        est_csi = self.type_camera.find("CSI") >= 0
        if est_csi:
            largeur_frame, hauteur_frame = display_width, display_height
        else:
            largeur_frame, hauteur_frame = capture_width, capture_height
        self.image_pour_widget = np.zeros((hauteur_frame, largeur_frame, 3), dtype=np.uint8)
        self.image = np.zeros((self._TAILLE_ENTREE, self._TAILLE_ENTREE, 3), dtype=np.int32)

        # Camera initialisation.
        if est_csi:
            self.cap = cv2.VideoCapture(self._gstreamer_pipeline_CSI(),cv2.CAP_GSTREAMER)
        else:
            self.cap = cv2.VideoCapture(self._gstreamer_pipeline_USB(),cv2.CAP_GSTREAMER)
        if self.cap.isOpened():
            print("Caméra initialisée")
        else:
            print("Erreur d'ouverture du flux vidéo")
        atexit.register(self.cap.release)

        # TensorRT runtime initialisation.
        self.logger = trt.Logger(trt.Logger.INFO)
        trt.init_libnvinfer_plugins(self.logger, namespace="")
        self.runtime = trt.Runtime(self.logger)

        # A single reference on the primary CUDA context is retained here and
        # reused (push / pop) for every allocation and every inference.
        cuda.init()
        self.cudactx = cuda.Device(0).retain_primary_context()

        # MoveNet engine (int32 input).
        print("Chargement du moteur MoveNet...")
        (self.engine_MoveNet,
         self.context_MoveNet,
         self.input_host_mem_MoveNet,
         self.input_device_mem_MoveNet,
         self.output_device_mem_MoveNet,
         self.output_host_mem_MoveNet,
         self.bindings_MoveNet) = self._charge_moteur(repertoire_engine_MoveNet, np.int32)

        # Classification engine (float32 input).
        print("Chargement du moteur de classification...")
        (self.engine_Classification,
         self.context_Classification,
         self.input_host_mem_Classification,
         self.input_device_mem_Classification,
         self.output_device_mem_Classification,
         self.output_host_mem_Classification,
         self.bindings_Classification) = self._charge_moteur(repertoire_engine_Classification, np.float32)
        self.context_Classification.debug_sync = True

    def _charge_moteur(self, chemin_engine, type_entree):
        """Deserialize one engine and allocate its host / device buffers.

        Binding 0 is taken as the single input; every non-input binding is
        treated as an output, in binding order.

        Returns:
            (engine, execution context, pinned host input buffer, device
            input buffer, list of device output buffers, list of host output
            arrays, list of device addresses to pass to ``execute_v2``).

        Raises:
            RuntimeError: if the engine cannot be deserialized.
        """
        with open(chemin_engine, "rb") as f:
            engine = self.runtime.deserialize_cuda_engine(f.read())
        if engine is None:
            raise RuntimeError("Impossible de charger le moteur TensorRT : %s" % chemin_engine)
        context = engine.create_execution_context()

        print("Allocation mémoire...")
        self.cudactx.push()
        try:
            # Input: pinned host buffer + device buffer of the same size.
            taille_entree = trt.volume(engine.get_binding_shape(0)) * engine.max_batch_size
            entree_hote = cuda.pagelocked_empty(taille_entree, type_entree)
            entree_gpu = cuda.mem_alloc(entree_hote.nbytes)

            # Outputs: one device buffer and one host array per output binding.
            sorties_gpu = []
            sorties_hote = []
            for i in range(engine.num_bindings):
                if engine.binding_is_input(i):
                    continue
                forme = engine.get_binding_shape(i)
                type_np = trt.nptype(engine.get_binding_dtype(i))
                nb_octets = trt.volume(forme) * engine.max_batch_size * np.dtype(type_np).itemsize
                sorties_gpu.append(cuda.mem_alloc(int(nb_octets)))
                sorties_hote.append(np.zeros(tuple(forme), type_np))
        finally:
            # Always restore the context stack, even if an allocation fails.
            self.cudactx.pop()

        bindings = [int(entree_gpu)] + [int(memoire) for memoire in sorties_gpu]
        return engine, context, entree_hote, entree_gpu, sorties_gpu, sorties_hote, bindings

    @staticmethod
    def _infere(context, bindings, entree_hote, entree_gpu, sorties_hote, sorties_gpu):
        """Copy the input to the GPU, run the engine, copy the outputs back.

        The CUDA context must already be current on the calling thread.
        """
        cuda.memcpy_htod(entree_gpu, entree_hote)
        context.execute_v2(bindings=bindings)
        for hote, gpu in zip(sorties_hote, sorties_gpu):
            cuda.memcpy_dtoh(hote, gpu)

    # Inference
    def Calcul(self):
        """Run MoveNet on ``self.image`` then classify the detected pose.

        Updates ``output_host_mem_MoveNet`` / ``output_host_mem_Classification``,
        writes the probability to ``widget_probas`` and selects the image
        shown by ``widget_jeu``.
        """
        self.cudactx.push()
        try:
            # MoveNet: the resized frame is flattened into the input buffer.
            np.copyto(self.input_host_mem_MoveNet, self.image.astype(np.int32).ravel())
            self._infere(self.context_MoveNet, self.bindings_MoveNet,
                         self.input_host_mem_MoveNet, self.input_device_mem_MoveNet,
                         self.output_host_mem_MoveNet, self.output_device_mem_MoveNet)

            # Classification: the keypoint tensor is the classifier input.
            positions = self.output_host_mem_MoveNet[0][0]
            np.copyto(self.input_host_mem_Classification, positions.ravel())
            self._infere(self.context_Classification, self.bindings_Classification,
                         self.input_host_mem_Classification, self.input_device_mem_Classification,
                         self.output_host_mem_Classification, self.output_device_mem_Classification)
        finally:
            # Keep the context stack balanced even if the inference raises.
            self.cudactx.pop()

        proba_correcte = float(self.output_host_mem_Classification[0][0][0])
        self.widget_probas.value = proba_correcte

        if proba_correcte > self._SEUIL_PROBA_OK:
            self.widget_jeu.value = self.smile_OK
        else:
            self.widget_jeu.value = self.smile_NON_OK

    # Read one frame
    def capture_image(self):
        """Return the next camera frame, or the last displayed image on failure."""
        re, image = self.cap.read()
        if re:
            return image
        return self.image_pour_widget

    @classmethod
    def _calcule_points(cls, coordonnees, largeur, hauteur):
        """Convert normalized keypoints to pixel coordinates.

        Args:
            coordonnees: 17 rows read as (y, x, ...), with y and x relative to
                the image height / width.  Any further column is ignored
                (presumably the keypoint score — TODO confirm).
            largeur: image width in pixels.
            hauteur: image height in pixels.

        Returns:
            dict mapping each keypoint name, plus the three virtual centres,
            to an ``(x, y)`` tuple of Python ints.
        """
        points = {}
        for nom, ligne in zip(cls._NOMS_POINTS, coordonnees):
            y, x = float(ligne[0]), float(ligne[1])
            points[nom] = (int(x * largeur), int(y * hauteur))
        for nom, nom_a, nom_b in cls._CENTRES:
            (xa, ya), (xb, yb) = points[nom_a], points[nom_b]
            points[nom] = (int(0.5 * (xa + xb)), int(0.5 * (ya + yb)))
        return points

    def _dessine_squelette(self, image, points):
        """Draw the keypoints (green discs) and the skeleton (red lines) in place."""
        for nom in self._NOMS_POINTS:
            cv2.circle(image, points[nom], radius=8, color=(0, 255, 0), thickness=-1)
        for nom_a, nom_b in self._SEGMENTS:
            cv2.line(image, points[nom_a], points[nom_b], color=(0, 0, 255), thickness=2)
        return image

    def run(self):
        """Thread body: process frames while ``camera_on`` is True, until stopped."""
        self._running = True
        try:
            while not self._stop_event.is_set():
                if self.camera_on is not True:
                    # Idle without burning a CPU core.
                    time.sleep(self._PAUSE_INACTIF)
                    continue

                # PWM duty cycle, clamped to [0, 100].
                self.pwm.ChangeDutyCycle(min(100, max(0, self.widget_PWM.value)))

                # Frame acquisition and inference.
                frame = self.capture_image()
                self.image_pour_widget = frame
                self.image = cv2.resize(frame, (self._TAILLE_ENTREE, self._TAILLE_ENTREE))
                self.Calcul()

                # Keypoints are scaled to the size of the frame actually
                # received, which is what they are drawn on.
                coordonnees = self.output_host_mem_MoveNet[0][0][0]
                hauteur, largeur = frame.shape[:2]
                points = self._calcule_points(coordonnees, largeur, hauteur)
                self._dessine_squelette(frame, points)

                self.widget_image.value = bgr8_to_jpeg(frame)
                time.sleep(0.001)
        finally:
            self._running = False

    # CSI camera pipeline
    def _gstreamer_pipeline_CSI(self):
        """Return the GStreamer pipeline string for a CSI camera."""
        return("nvarguscamerasrc sensor-id=%d ! "
                "video/x-raw(memory:NVMM),"
                "width=(int)%d,height=(int)%d,"
                "format=(string)NV12, framerate=(fraction)%d/1 ! "
                "nvvidconv flip-method=%d ! "
                "video/x-raw,"
                "width=(int)%d,height=(int)%d,"
                "format=(string)BGRx ! videoconvert ! "
                "video/x-raw, format=(string)BGR ! "
                "appsink drop=true"
        %(self.capture_device,self.capture_width,self.capture_height,self.fps,self.flip, self.display_width,self.display_height))

    # USB camera pipeline
    def _gstreamer_pipeline_USB(self):
        """Return the GStreamer pipeline string for a USB (v4l2) camera."""
        return("v4l2src device=/dev/video%d ! "
               "video/x-raw, width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! "
               "videoflip method=%d ! "
               "videoconvert ! "
               "video/x-raw, format=(string)BGR ! appsink drop=true"
        %(self.capture_device,self.capture_width,self.capture_height,self.fps,self.flip))

    def stop(self):
        """Ask ``run()`` to return after the current iteration."""
        self._stop_event.set()

    # Forced stop of the thread
    def raise_exception(self):
        """Asynchronously raise SystemExit inside the running thread.

        Does nothing (apart from printing a message) when the thread has not
        been started or has already finished.  Prefer ``stop()``, which lets
        the current iteration finish.
        """
        thread_id = self.ident
        if thread_id is None or not self.is_alive():
            print('Thread not running: nothing to stop')
            return
        # The thread id is passed with an explicit C type so that it is not
        # truncated to a C int on 64-bit platforms.
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread_id),ctypes.py_object(SystemExit))
        if res > 1:
            # More than one thread state was modified: revert the request.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread_id), None)
            print('Exception raise failure')

    def destroy(self):
        """Stop the loop and release PWM, GPIO, camera and the CUDA context."""
        self.stop()
        if self.is_alive() and threading.current_thread() is not self:
            # Give the loop a chance to finish its current frame.
            self.join(timeout=2.0)
        try:
            self.pwm.stop()
            GPIO.cleanup(self._BROCHE_PWM)
            self.cap.release()
        finally:
            self.cudactx.detach()

### Interface de visualisation

In [4]:
import ipywidgets.widgets as widgets
import traitlets
from IPython.display import display
from jetbot import bgr8_to_jpeg
from ipywidgets import Layout, Label

# Load the two JPEG logos shown by the game widget.  The files are read inside
# `with` blocks so that their handles are closed as soon as the bytes are read.
with open("SmileOK.jpg", "rb") as file_logo_win:
    logo_win = file_logo_win.read()

with open("SmileNON_OK.jpg", "rb") as file_logo_failed:
    logo_failed = file_logo_failed.read()

# Build the interface: camera image, probability bar, PWM slider, game logo.
image_widget = widgets.Image(format='jpeg', width=1024, height=768)
slider_probas = widgets.FloatProgress(description='', min=0.0, max=1.0, orientation='horizontal')
# IntSlider expects an integer value (the original passed the float 55.).
slider_PWM = widgets.IntSlider(description='PWM caméra :', min=0, max=100, value=55,
                               orientation='horizontal', style={'description_width': 'initial'},
                               layout=Layout(width='80%', height='80px'))

# The game logo starts in the "failed" state.
image_jeu = widgets.Image(format='jpeg', width=200, height=200)
image_jeu.value = logo_failed

# Display: slider on top, then the image next to the probability bar and logo.
display(widgets.VBox([slider_PWM, widgets.HBox([image_widget, widgets.VBox([Label('Probabilité :'), slider_probas, image_jeu])])]))

VBox(children=(IntSlider(value=55, description='PWM caméra :', layout=Layout(height='80px', width='80%'), styl…

In [5]:
# Build the inference thread: engines, output widgets, then camera settings.
trt_inference_wrapper = TRTInference(
    # Serialized TensorRT engines
    repertoire_engine_MoveNet="model_jetson.engine",
    repertoire_engine_Classification="model_classification.engine",
    # Widgets updated by the thread
    widget_image=image_widget,
    widget_probas=slider_probas,
    widget_PWM=slider_PWM,
    widget_jeu=image_jeu,
    # Images shown by the game widget
    smileOK=logo_win,
    smileNONOK=logo_failed,
    # Camera configuration
    type_camera="USB",
    capture_device=0,
    capture_width=1280,
    capture_height=960,
    display_width=1280,
    display_height=960,
    fps=30,
    flip=0,
)

Caméra initialisée
Chargement du moteur MoveNet...
Allocation mémoire...
Chargement du moteur de classification...
Allocation mémoire...


In [6]:
# Start the worker thread; its run() loop processes nothing until camera_on is True.
trt_inference_wrapper.start()
# Enable frame capture, inference and widget updates in the running loop.
trt_inference_wrapper.camera_on = True