In [None]:
from jetcam.csi_camera import CSICamera
import cv2
import time
import os
import ipywidgets.widgets as widgets
import onnxruntime as ort
import torch
import torchvision.transforms as transforms
import numpy as np
from PIL import Image
import os
import time
from jetracer.nvidia_racecar import NvidiaRacecar

# Configuration
dataset_dir = "dataset_capture_3"  # directory created just below
capture_interval_sec = 5  # pause, in seconds, used by the main loop between iterations

# Create the directory. `exist_ok=True` makes the call idempotent (safe on
# cell re-run) and avoids the check-then-create race of a separate
# `os.path.exists` test. A FileExistsError is still raised if the path exists
# but is not a directory.
os.makedirs(dataset_dir, exist_ok=True)

# NOTE(review): `counter` is not used by any cell visible here; kept for
# compatibility with code that may rely on it.
counter = 0

# Vehicle interface; the control loop of this notebook drives it by
# assigning to `car.throttle`.
car = NvidiaRacecar()

In [None]:
# Model settings
MODEL_PATH = "models/resnet18_15.onnx"
NUM_CLASSES = 2

# Execution providers, in order of preference: TensorRT first, CUDA as fallback.
providers = [
    (
        'TensorrtExecutionProvider',
        dict(
            device_id=0,
            trt_max_workspace_size=2 * 1024 ** 3,  # 2 GiB workspace (2147483648 bytes)
            trt_fp16_enable=True,  # enable FP16
        ),
    ),
    ('CUDAExecutionProvider', dict(device_id=0)),
]
print(f"Loading model {MODEL_PATH} with providers : {providers}")

# Build the ONNX Runtime inference session from the model file.
ort_sess = ort.InferenceSession(MODEL_PATH, providers=providers)
print("Session ONNX loaded")

In [None]:
def default_transform(resolution=(224,224)):
    """Build the image preprocessing pipeline.

    The pipeline applies, in order: resize to `resolution`, center crop to
    `resolution`, conversion to a tensor, and per-channel normalization with
    the mean/std values listed below.

    Args:
        resolution: Size handed to both the resize and the center-crop step.

    Returns:
        A `transforms.Compose` object chaining the four steps.
    """
    channel_mean = [0.485, 0.456, 0.406]
    channel_std = [0.229, 0.224, 0.225]

    steps = [
        transforms.Resize(resolution),
        transforms.CenterCrop(resolution),
        transforms.ToTensor(),
        transforms.Normalize(mean=channel_mean, std=channel_std),
    ]
    return transforms.Compose(steps)


# Pipeline instance shared by the inference cells.
transform = default_transform()

In [None]:
def run_inference(image, session, transform, input_name="input_0",
                  output_name="output_0", assume_bgr=True):
    """Run one forward pass of `session` on `image` using CUDA IO binding.

    Args:
        image: A PIL image, or a NumPy array holding one frame. Arrays are
            converted to a PIL image first, because the resize/crop steps of
            the torchvision pipeline do not accept NumPy arrays.
        session: ONNX Runtime inference session to execute.
        transform: Callable turning the PIL image into a CHW float tensor.
        input_name: Name of the model input to bind. Must match the name
            stored in the ONNX file (it can be checked with netron.app).
        output_name: Name of the model output to bind (same remark).
        assume_bgr: When True, 3-channel NumPy frames are treated as BGR and
            swapped to RGB before preprocessing.
            NOTE(review): presumably the camera delivers BGR frames and the
            model was trained on RGB images -- confirm, and pass False if the
            training data used the same channel order as the camera.

    Returns:
        A tuple `(probas, latency_ms)`: the values of the bound output for
        the single batch element as a list of `NUM_CLASSES` floats (whether
        they are probabilities or raw scores depends on the exported model),
        and the wall-clock duration of the session run in milliseconds.
    """
    # 1. Preprocessing: make sure the transform receives a PIL image.
    if isinstance(image, np.ndarray):
        if assume_bgr and image.ndim == 3 and image.shape[2] == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(image)

    # Add the batch dimension and move to the GPU. `contiguous()` guarantees
    # that the raw pointer handed to ONNX Runtime addresses a dense buffer.
    input_tensor = transform(image).unsqueeze(0).to("cuda:0").contiguous()

    # 2. IO binding: the session reads from / writes to these GPU buffers directly.
    io_binding = session.io_binding()

    io_binding.bind_input(
        name=input_name,
        device_type="cuda",
        device_id=0,
        element_type=np.float32,
        shape=tuple(input_tensor.shape),
        buffer_ptr=input_tensor.data_ptr()
    )

    # Pre-allocated GPU buffer that receives the model output.
    output_tensor = torch.empty((1, NUM_CLASSES), dtype=torch.float32, device="cuda:0").contiguous()

    io_binding.bind_output(
        name=output_name,
        device_type="cuda",
        device_id=0,
        element_type=np.float32,
        shape=tuple(output_tensor.shape),
        buffer_ptr=output_tensor.data_ptr()
    )

    # 3. Execution, timed with a monotonic high-resolution clock so the
    # measurement is not affected by system clock adjustments.
    start = time.perf_counter()
    session.run_with_iobinding(io_binding)
    latency_ms = (time.perf_counter() - start) * 1000

    # 4. Copy the result back to the host and unwrap the batch dimension.
    probas = output_tensor.to("cpu")[0].tolist()

    return probas, latency_ms

In [None]:
# Camera setup: 224x224 frames requested from a 1080x720 capture at 30 fps.
camera = CSICamera(
    width=224,
    height=224,
    capture_width=1080,
    capture_height=720,
    capture_fps=30,
)

# Start the stream only when it is not already running, so that the running
# flag is never re-assigned on an active stream.
if not camera.running:
    camera.running = True

In [None]:
# Control loop: classify the latest camera frame and drive forward only while
# the target class is predicted. Interrupt the kernel to stop.
# NOTE(review): the loop sleeps `capture_interval_sec` seconds between
# iterations (5 in the configuration cell), so the throttle is only
# re-evaluated at that rate -- confirm this is intended for a moving car.
try:
    while True:
        # Latest frame exposed by the camera object.
        image = camera.value

        if image is not None:
            probas, latency = run_inference(image, ort_sess, transform)
            classe_predite = np.argmax(probas)
            label = "CIBLE" if classe_predite == 0 else "NO CIBLE"
            print(f"Prediction : {label}")
            # Class index 0 is the target: move forward, otherwise stop.
            car.throttle = 0.3 if label == "CIBLE" else 0
        else:
            print("Error: No image retrieved from stream.")
            # Fail safe: without a frame there is no prediction, so do not
            # keep the throttle value of a previous iteration.
            car.throttle = 0

        # Pause before the next iteration.
        time.sleep(capture_interval_sec)

except KeyboardInterrupt:
    # Raised when the kernel is interrupted (Jupyter 'Stop' button).
    print("\nCapture stopped by user.")

finally:
    # Always stop the car first, whatever ended the loop; the nested
    # `finally` guarantees the camera is stopped even if that fails.
    try:
        car.throttle = 0
    finally:
        camera.running = False
        print("Camera stopped. Done.")