In [None]:
# LAPTOP_SERVER_URL = "http://192.168.45.247:8485"  # Check Server URL every time we change network
LAPTOP_SERVER_URL = "http://192.168.45.68:8485"  # Check Server URL every time we change network
# LAPTOP_SERVER_URL = "http://192.168.0.8:8485"  # Check Server URL every time we change network

API_DISPLAY = LAPTOP_SERVER_URL + '/display'
API_REGISTER = LAPTOP_SERVER_URL + '/register_car'
API_JETSON_COMMAND = LAPTOP_SERVER_URL + '/jetson_command'

In [None]:
import time
import threading
from collections import deque, Counter
from typing import List, Tuple, Any, Deque, Dict, Union
import cv2
import requests
import torch
import numpy as np
from jetracer.nvidia_racecar import NvidiaRacecar
from jetcam.csi_camera import CSICamera
from yoloDet import YoloTRT
import json
import socket
import math
import base64

IMG_SIZE = 416
END_PIX = IMG_SIZE - 1
MID_PIX = (IMG_SIZE - 1) / 2

CATEGORIES = ['apex']
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
SIGN_HEIGHT: Dict[str, float] = {'stp_sg': 0.096, 'trf_lgt': 0.090, 'prk_sg': 0.095}  # in millimeter


class YoloOutput:
    def __init__(self, cate: str, conf: float, x_min: float, y_min: float, x_max: float, y_max: float):
        self.cate: str = str(cate)
        self.conf: float = float(conf)
        self.x_min: float = float(x_min)
        self.y_min: float = float(y_min)
        self.x_max: float = float(x_max)
        self.y_max: float = float(y_max)
        self.x_cen = (x_min + x_max) / 2
        self.y_cen = (y_min + y_max) / 2

    @staticmethod
    def default() -> 'YoloOutput':
        return YoloOutput('clear', -1, -1, -1, -1, -1)

    def to_dict(self) -> Dict[str, Union[str, float]]:
        return {
            'cate': self.cate,
            'conf': self.conf,
            'x_min': self.x_min,
            'y_min': self.y_min,
            'x_max': self.x_max,
            'y_max': self.y_max
        }

    @staticmethod
    def from_dict(detection: Dict[str, Union[str, float]]) -> 'YoloOutput':
        return YoloOutput(detection['cate'], detection['conf'],
                          detection['x_min'], detection['y_min'],
                          detection['x_max'], detection['y_max'])

    def to_dense_str(self) -> str:
        return f"{self.cate}|{self.conf:0.3f}|{self.x_min:0.3f}|{self.y_min:0.3f}|{self.x_max:0.3f}|{self.y_max:0.3f}"

    @staticmethod
    def from_dense_str(dense_str: str) -> 'YoloOutput':
        if not dense_str:
            return YoloOutput.default()
        parts = dense_str.split('|')
        if len(parts) != 6:
            raise ValueError(f"Invalid dense_str format: {dense_str}")
        return YoloOutput(parts[0], float(parts[1]), float(parts[2]), float(parts[3]), float(parts[4]), float(parts[5]))

    def __repr__(self) -> str:
        return self.to_dense_str()


class YoloDetector:
    """Class for detecting various road objects including dash-segment, traffic signs, and animals using a YOLO model."""
    sign_model_trt = YoloTRT(library="/home/jetson/jetracer_backup/JetsonYolov5/yolov5/build_parking/libmyplugins.so",
                             engine="/home/jetson/jetracer_backup/JetsonYolov5/yolov5/build_parking/best_parking.engine",
                             conf=0.6, yolo_ver="v5")

    categories = ['lane', 'stp_sg', 'spd100', 'spd50', 'trf_lgt', 'prk_sg', 'bird', 'feline', 'herbivore', 'logo']

    @staticmethod
    def detect_sign(bgr8: np.ndarray) -> Tuple[List[YoloOutput], YoloOutput, YoloOutput, List[YoloOutput]]:
        # Check if the image was successfully loaded
        if bgr8 is None:
            print("Error: Could not load image.")
        # Resize the image
        # frame = imutils.resize(bgr8, width=IMG_SIZE)
        # Perform inference using the YOLO model
        detections = YoloDetector.sign_model_trt.detect(bgr8)
        list_lane = []
        highest_conf_sign = YoloOutput.default()  # Initialize with the default which has -1 conf
        list_animal = []
        highest_conf_logo = YoloOutput.default()  # Initialize with the default which has -1 conf

        for detection in detections:
            class_id = detection['clsid']
            output = YoloOutput(
                cate=YoloDetector.categories[class_id],
                conf=detection['conf'],
                x_min=detection['box'][0],
                y_min=detection['box'][1],
                x_max=detection['box'][2],
                y_max=detection['box'][3]
            )
            if class_id == 0:
                list_lane.append(output)
            elif 1 <= class_id <= 5:
                if output.conf > highest_conf_sign.conf:
                    highest_conf_sign = output
            elif 6 <= class_id <= 8:
                list_animal.append(output)
            elif class_id == 9:  # Class ID for logo
                if output.conf > highest_conf_logo.conf:
                    highest_conf_logo = output

        return list_lane, highest_conf_sign, highest_conf_logo, list_animal





In [None]:

class LaneCalculations:

    @staticmethod
    def calc_lane_average(list_lane: List[YoloOutput], last_lane_avg: Tuple[float, float]) -> Tuple[float, float]:
        result = last_lane_avg
        if list_lane:
            total_x = sum(lane.x_min + lane.x_max for lane in list_lane)
            total_y = sum(lane.y_min + lane.y_max for lane in list_lane)
            avg_x = total_x / 2 / len(list_lane)
            avg_y = total_y / 2 / len(list_lane)
            result = (avg_x, avg_y)
        return result

    @staticmethod
    def calculate_reference_lane(list_lane: List[YoloOutput]) -> Tuple[Tuple[float, float], Tuple[float, float]]:
        list_lane.sort(key=lambda _lane: _lane.y_cen)

        if len(list_lane) >= 2:
            start = (list_lane[0].x_cen, list_lane[0].y_cen)
            end = (list_lane[1].x_cen, list_lane[1].y_cen)
            lines = (start, end)
        elif len(list_lane) == 1:
            end = (list_lane[0].x_cen, list_lane[0].y_cen)
            lines = ((MID_PIX, END_PIX), end)
        else:
            lines = ((MID_PIX, END_PIX), (MID_PIX, 0))

        return lines

    @staticmethod
    def distance_point_to_line(point: Tuple[float, float], line: Tuple[Tuple[float, float], Tuple[float, float]]) -> float:
        (x0, y0) = point
        (x1, y1), (x2, y2) = line

        numerator = (y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1
        denominator = math.sqrt((y2 - y1)**2 + (x2 - x1)**2)

        if denominator < 0.01 * IMG_SIZE:
            # The two points defining the line are the same
            return 0

        return numerator / denominator

    @staticmethod
    def calculate_intersection_with_center_line(lane_avg: Tuple[float, float]) -> float:
        avg_x, avg_y = lane_avg
        # Convert to new coordinate system where (0,0) is (MID_PIX, END_PIX)
        # Normalized such that up-right is positive
        normalized_x = (avg_x - MID_PIX) / END_PIX
        normalized_y = (END_PIX - avg_y) / END_PIX

        # If y1 is 0, return extreme values for left and right
        if normalized_y == 0:
            return -1.5 if normalized_x < 0 else 1.5

        # Calculate the intersection point with y = 0.5
        x_intersect = 0.5 * normalized_x / normalized_y
        x_intersect = max(-1.5, min(x_intersect, 1.5))
        return x_intersect

    @staticmethod
    def calculate_angle_to_y_axis(lane_avg: Tuple[float, float]) -> float:
        avg_x, avg_y = lane_avg
        # Convert to new coordinate system where (0,0) is (MID_PIX, END_PIX)
        # Normalized such that up-right is positive
        normalized_x = (avg_x - MID_PIX) / END_PIX
        normalized_y = (END_PIX - avg_y) / END_PIX

        # Calculate the angle in radians with respect to the positive x-axis
        angle_radians = math.atan2(normalized_y, abs(normalized_x))

        # Convert radians to degrees
        angle_degrees = math.degrees(angle_radians)

        # Calculate the angle relative to the positive y-axis
        angle_to_y_axis = angle_degrees - 90

        # Normalize the angle to be between -1 and 1
        normalized_angle = angle_to_y_axis / 90
        normalized_angle = -normalized_angle if normalized_x < 0 else normalized_angle
        return normalized_angle

    @staticmethod
    def calculate_intersection_with_slope_line(lane_avg: Tuple[float, float]) -> float:
        avg_x, avg_y = lane_avg
        # Convert to new coordinate system where (0,0) is (MID_PIX, END_PIX)
        # Normalized such that up-right is positive
        normalized_x = (avg_x - MID_PIX) / END_PIX
        normalized_y = (END_PIX - avg_y) / END_PIX

        # Calculate the intersection point of the line passing through (0, 0) and (normalized_x, normalized_y)
        # with the line y = 1 - (2/3)x.
        if abs(normalized_x) < 0.01:
            return 0
        x_intersect = 1 / (normalized_y / abs(normalized_x) + 2 / 3)
        x_intersect = -x_intersect if normalized_x < 0 else x_intersect
        return x_intersect


class BgrConverter:

    @staticmethod
    def bgr8_to_jpeg(image: np.ndarray) -> bytes:
        return bytes(cv2.imencode('.jpg', image)[1])

    @staticmethod
    def bgr8_to_base64(image: np.ndarray) -> str:
        buffer = cv2.imencode('.jpg', image)[1]
        jpg_as_text = base64.b64encode(buffer).decode('utf-8')
        return jpg_as_text


class Observation:
    """Class for storing observation data."""
    QUEUE_SIZE_ANIM = 8
    COUNT_NEED_ANIM = 3
    QUEUE_SIZE_SIGN = 12
    COUNT_NEED_SIGN = 5

    def __init__(self) -> None:
        self.img_base64: str = ''
        self.current_time: float = -1
        self.lane_avg: Tuple[float, float] = (MID_PIX, MID_PIX)
        self.steer_calc: float = 0.0
        self.list_lane: List[YoloOutput] = []
        self.queue_sign: Deque[YoloOutput] = deque(maxlen=Observation.QUEUE_SIZE_SIGN)
        self.queue_anim: Deque[List[YoloOutput]] = deque(maxlen=Observation.QUEUE_SIZE_ANIM)
        self.last_anim: YoloOutput = YoloOutput.default()
        self.park_zone: YoloOutput = YoloOutput.default()
        self.counter: Counter = Counter()
        self.frame_index: int = 0
        self.anim_last_left_most: float = 0
        self.anim_last_right_most: float = 0
        self.timer: Timer = Timer()
        self.flag: Flag = Flag()

    def reset(self) -> None:
        """Reset the observation data."""
        self.img_base64: str = ''
        self.current_time = -1
        self.lane_avg = (MID_PIX, MID_PIX)
        self.steer_calc = 0.0
        self.list_lane = []
        self.queue_sign = deque(maxlen=Observation.QUEUE_SIZE_SIGN)
        self.queue_anim = deque(maxlen=Observation.QUEUE_SIZE_ANIM)
        self.last_anim = YoloOutput.default()
        self.park_zone: YoloOutput = YoloOutput.default()
        self.counter = Counter()
        self.frame_index = 0
        self.anim_last_left_most = 0
        self.anim_last_right_most = 0
        self.timer.reset()
        self.flag.reset()

    def display_cam(self, throttle_value: float, steer_real: float) -> None:
        sign_thread = threading.Thread(target=WebDisplayer.display_cam,
                                       args=(self.img_base64, throttle_value, steer_real, self.steer_calc,
                                             self.list_lane, self.queue_sign[-1], self.park_zone, self.queue_anim[-1]))
        sign_thread.start()

    def evaluate_observation(self, throttle_value: float, steer_real: float) -> None:
        """Evaluate the current observation for road and traffic sign detection."""
        self.img_base64 = BgrConverter.bgr8_to_base64(CamWrapper.bgr8)
        self.list_lane, sign, self.park_zone, list_animal = YoloDetector.detect_sign(CamWrapper.bgr8)
        self.queue_sign.append(sign)
        self.queue_anim.append(list_animal)
        self.current_time = time.time()
        self.update_flags(throttle_value, steer_real)

    def get_last_list_anim(self) -> List[YoloOutput]:
        """Get the last non-empty list from the queue."""
        for list_anim in reversed(self.queue_anim):
            if list_anim:  # Check if the list is not empty
                return list_anim
        return []  # Return an empty list if all are empty

    def update_flags(self, throttle_value: float, steer_real: float) -> None:
        """Update flags based on detection counts."""
        if self.flag.park_phrase_2:
            # steering now base on parking zone
            steer_to_this = (self.park_zone.x_cen,self.park_zone.y_cen)
            self.steer_calc = LaneCalculations.calculate_intersection_with_slope_line(steer_to_this)
            self.display_cam(throttle_value, steer_real)
            return # ignore all others flag when parking
        if self.flag.park_phrase_3:
            return  # ignore all others flag when parking

        self.counter = Counter(sign.cate for sign in self.queue_sign if sign)

        self.flag.s100_seeing = self.counter['spd100'] >= Observation.COUNT_NEED_SIGN and self.timer.s100_ignore < self.current_time
        self.flag.s50_seeing = self.counter['spd50'] >= Observation.COUNT_NEED_SIGN and self.timer.s50_ignore < self.current_time

        if self.flag.stp_sg_waiting:  # if car is waiting, ignore stp_sg flags
            self.flag.stp_sg_seeing = False
            self.flag.stp_sg_marked = False
        else:
            self.flag.stp_sg_seeing = self.counter['stp_sg'] >= Observation.COUNT_NEED_SIGN and \
                                       self.timer.stp_sg_ignore < self.current_time
            if self.flag.stp_sg_seeing and not self.flag.stp_sg_marked:  # 1st frame stp_sg_seeing
                self.flag.stp_sg_marked = True

        self.flag.prk_sg_seeing = self.counter['prk_sg'] >= Observation.COUNT_NEED_SIGN
        if self.flag.prk_sg_seeing and not self.flag.prk_sg_marked:  # 1st frame prk_sg_seeing
            self.flag.prk_sg_marked = True

        if self.counter['trf_lgt'] >= Observation.COUNT_NEED_SIGN:
            self.flag.trf_lgt_seeing = True
        elif self.flag.trf_lgt_seeing:
            self.flag.trf_lgt_seeing = False
            self.flag.resume = True

        frames_with_animal = sum(1 for list_anim in self.queue_anim if list_anim)
        if frames_with_animal >= Observation.COUNT_NEED_ANIM:
            self.flag.anim_seeing = True
        else:
            self.flag.anim_seeing = False

        self.lane_avg = LaneCalculations.calc_lane_average(self.list_lane, self.lane_avg)
        self.steer_calc = LaneCalculations.calculate_intersection_with_slope_line(self.lane_avg)
        if self.flag.anim_seeing or self.current_time < self.timer.anim_last_dis_timer:
            if self.flag.anim_seeing:
                list_anim = self.get_last_list_anim()
                # Only evade animal with top conf -> usually also the closest one
                anim = list_anim[0]
                self.last_anim = anim
            else:
                anim = self.last_anim
            lane_line = LaneCalculations.calculate_reference_lane(self.list_lane)

            distance_to_cam = 1 - (anim.y_max / END_PIX)

            # if self.flag.anim_seeing:
            #     anim_left_most_to_lane = LaneCalculations.distance_point_to_line((anim.x_min, anim.y_max), lane_line) / END_PIX
            #     anim_right_most_to_lane = LaneCalculations.distance_point_to_line((anim.x_max, anim.y_max), lane_line) / END_PIX
            #     self.timer.anim_last_dis_timer = self.current_time + 1.2
            #     self.anim_last_left_most = anim_left_most_to_lane
            #     self.anim_last_right_most = anim_right_most_to_lane
            # else: # still try to avoid anim base on memory
            #     anim_left_most_to_lane = self.anim_last_left_most
            #     anim_right_most_to_lane = self.anim_last_right_most
            #
            # if anim_left_most_to_lane < 0 and anim_right_most_to_lane < -0.05: # If animal CLEARLY on the left, steer right.
            #     self.steer_calc += (0.6 - abs(anim_right_most_to_lane) * 1.1)
            # elif 0 < anim_right_most_to_lane and 0.05 < anim_left_most_to_lane: # If animal CLEARLY on the right, steer left.
            #     self.steer_calc -= (0.6 - abs(anim_right_most_to_lane) * 1.1)
            # else:
            #     # If animal is in the middle, stop if close
            #     if distance_to_cam < 0.3:
            #         self.flag.anim_close = True
            #     else:
            #         self.flag.anim_close = False


            anim_left_most = anim.x_min / END_PIX
            anim_right_most = anim.x_max / END_PIX
            if 0.6 < anim_left_most and distance_to_cam < 0.6:  # If animal on the right, steer left.
                anim_right_most_to_lane = LaneCalculations.distance_point_to_line((anim.x_min, anim.y_max), lane_line) / END_PIX
                # the closer to lane, the more steer
                self.steer_calc -= (0.7 - abs(anim_right_most_to_lane) * 1.05)
            elif anim_right_most < 0.4 and distance_to_cam < 0.6:  # If animal on the left, steer right.
                # the closer to lane, the more steer
                anim_right_most_to_lane = LaneCalculations.distance_point_to_line((anim.x_max, anim.y_max), lane_line) / END_PIX
                self.steer_calc += (0.7 - abs(anim_right_most_to_lane) * 1.05)
            else:
                # If animal is in the middle, stop if close
                if distance_to_cam < 0.3:
                    self.flag.anim_close = True
                else:
                    self.flag.anim_close = False


            if distance_to_cam < 0.7:
                self.flag.anim_slowing = True
                self.timer.slow_to_anim_timer = self.current_time + 4

            # Ignore some traffic sign due to animal
            self.flag.prk_sg_marked = False
            self.flag.prk_sg_seeing = False

        self.display_cam(throttle_value, steer_real)


class Timer:
    """Class for managing timers for different actions."""

    def __init__(self) -> None:
        self.s100_ignore: float = -1
        self.s50_ignore: float = -1
        self.stp_sg_wait: float = -1
        self.stp_sg_ignore: float = -1
        self.slow_to_anim_timer: float = -1
        self.anim_last_dis_timer: float = -1
        self.prk_sg_ending: float = -1
    def reset(self):
        self.__init__()


class Flag:
    """Class for managing flags indicating various detections."""

    def __init__(self) -> None:
        self.prk_sg_seeing: bool = False
        self.prk_sg_marked: bool = False
        self.park_phrase_2: bool = False
        self.park_phrase_3: bool = False
        self.s100_seeing: bool = False
        self.s50_seeing: bool = False
        self.trf_lgt_seeing: bool = False
        self.resume: bool = False
        self.stp_sg_marked: bool = False
        self.stp_sg_seeing: bool = False
        self.stp_sg_waiting: bool = False
        self.anim_seeing: bool = False
        self.anim_slowing: bool = False
        self.anim_close: bool = False

    def reset(self):
        self.__init__()


class CarWrapper:
    """Wrapper class for managing the car's status and actions."""
    # THROTTLE - STEERING_MULTIPLIER
    SPD_FULL = (0.400, 2.000)
    SPD_FAST = (0.350, 2.000)
    SPD_MIDD = (0.240, 2.000)
    SPD_SLOW = (0.200, 2.000)
    SPD_PARK = (0.190, 2.000)
    SPD_ANIM = (0.185, 1.000)
    SPD_ZERO = (0.000, 0.000)
    STEERING_BIAS = 0.05

    def __init__(self, car: Any, observation: Observation) -> None:
        self.car: NvidiaRacecar = car
        self.obs: Observation = observation
        self.is_car_running: bool = False
        self.is_pause: bool = True
        self.speed_tuple: Tuple[float, float] = CarWrapper.SPD_ZERO
        self.steering_multiplier: float = 0.0

    def reset(self) -> None:
        self.obs.reset()
        self.is_car_running = False
        self.is_pause = True
        self.speed_tuple = CarWrapper.SPD_ZERO
        self.steering_multiplier = 0.0

    # For some reason, negative throttle means go forward, so we have to flip to avoid confusion.
    # Now negative means backward, positive means forward.
    def set_throttle(self, throttle_value: float) -> None:
        if -self.car.throttle != throttle_value:
            self.car.throttle = -throttle_value

    def get_throttle(self) -> float:
        return -self.car.throttle

    # Steering also needs flipping
    # Now negative means left, positive means right
    def set_steering(self, steer_calc: float) -> None:
        steer_real = steer_calc * self.steering_multiplier + CarWrapper.STEERING_BIAS
        if -self.car.steering != steer_real:
            self.car.steering = -steer_real

    def get_steering(self) -> float:
        steer_real = -self.car.steering
        return steer_real

    def set_speed(self, speed_tuple: Tuple[float, float], is_permanent: bool = False) -> None:
        self.set_throttle(speed_tuple[0])
        self.steering_multiplier = speed_tuple[1]
        if is_permanent:
            self.speed_tuple = speed_tuple

    def resume_speed(self) -> None:
        """Resume the car's speed."""
        self.set_speed(self.speed_tuple, is_permanent=False)

    def move_to_sign(self, sign_name: str, trigger_if_close: callable, trigger_if_pass: callable) -> None:
        """Move the car towards the detected sign."""
        newest_sign = self.obs.queue_sign[-1]
        x_min_pct = newest_sign.x_min / END_PIX
        x_max_pct = newest_sign.x_max / END_PIX
        sign_height = (newest_sign.y_max - newest_sign.y_min + 1) / IMG_SIZE
        if 0.24 <= x_min_pct and x_max_pct <= 0.76:
            # Forward with current speed
            pass
        elif 0.19 <= x_min_pct < 0.24 or 0.76 < x_max_pct <= 0.81:
            # Slow down
            self.set_speed(CarWrapper.SPD_SLOW, is_permanent=False)
        elif 0 <= x_min_pct < 0.19 or 0.81 < x_max_pct <= 1:
            # Execute sign action
            # trigger_if_close()
            if SIGN_HEIGHT[sign_name] < sign_height:  # estimate distance from camera to sign by
                trigger_if_close()
            else:
                self.set_speed(CarWrapper.SPD_ANIM, is_permanent=False)
        else:
            # This means traffic sign is not detected
            if trigger_if_pass:
                trigger_if_pass()

    def func_sign_trf_lgt(self) -> None:
        self.set_speed(CarWrapper.SPD_ZERO, is_permanent=False)
        time.sleep(0.3)

    def trigger_stp_sg_wait(self) -> None:
        self.set_speed(CarWrapper.SPD_ZERO, is_permanent=False)
        self.obs.flag.stp_sg_waiting = True
        self.obs.flag.stp_sg_seeing = False
        self.obs.flag.stp_sg_marked = False
        self.obs.timer.stp_sg_wait = self.obs.current_time + 3
        self.obs.timer.stp_sg_ignore = self.obs.current_time + 3 + (1 / self.speed_tuple[0])


    def park_phrase_1(self) -> None:
        # Back 1 sec
        self.car.steering = -0.9
        self.car.throttle = 0.24
        time.sleep(0.8)

        # Steer to the left
        self.car.steering = 0.9
        self.car.throttle = -0.24
        time.sleep(1.5)

        # Move backward while steering to the right
        self.car.steering = -0.9
        self.car.throttle = 0.24
        time.sleep(1.9)

        # self.car.steering = 1
        # self.car.throttle = -0.28
        # time.sleep(0.5)

        self.car.steering = 0
        self.car.throttle = 0.0
        self.steering_multiplier = CarWrapper.SPD_PARK[1]
        self.obs.reset() # stop all calculation beside parking
        self.obs.flag.park_phrase_2 = True
        time.sleep(0.2)
        ## REMOVE THIS
        # self.is_car_running = False


    def take_action(self) -> None:
        if self.is_car_running and GlobalObj.web_command['racer'] != 'pause' and not self.obs.flag.park_phrase_3:
            self.set_steering(self.obs.steer_calc)

        if self.obs.flag.park_phrase_2:
            if self.obs.park_zone.cate == 'logo':
                self.set_speed(CarWrapper.SPD_PARK, is_permanent=True)
                distance_to_cam = 1 - (self.obs.park_zone.y_max / END_PIX)
                if distance_to_cam < 0.05:
                    self.obs.flag.park_phrase_2 = False
                    self.obs.flag.park_phrase_3 = True
                    self.obs.timer.park_ending = self.obs.current_time + 1.45 # run for extra 1 sec then stop
                    self.set_steering(0)
            else:
                # park zone not found when still in phrase 2
                self.set_speed(CarWrapper.SPD_ZERO, is_permanent=True)
                self.is_car_running = False
            return
        if self.obs.flag.park_phrase_3 and self.obs.timer.park_ending < self.obs.current_time:
            self.set_speed(CarWrapper.SPD_ZERO, is_permanent=True)
            self.is_car_running = False

        # React to traffic sign detections based on flags
        if self.obs.flag.anim_slowing:
            if self.obs.flag.anim_seeing:
                if self.obs.flag.anim_close:
                    self.set_speed(CarWrapper.SPD_ZERO, is_permanent=False)
                else:
                    self.set_speed(CarWrapper.SPD_ANIM, is_permanent=False)
            else:
                if self.obs.timer.slow_to_anim_timer < self.obs.current_time or \
                        (self.get_throttle() < CarWrapper.SPD_ANIM[0] and self.obs.timer.slow_to_anim_timer - 2.5 < self.obs.current_time):
                    self.obs.flag.anim_slowing = False
                    self.resume_speed()

        if self.obs.flag.s100_seeing:
            if self.obs.flag.anim_seeing:
                self.speed_tuple = CarWrapper.SPD_FAST  # will resume speed when self.obs.flag.slow_to_anim = False
            else:
                self.set_speed(CarWrapper.SPD_FAST, is_permanent=True)
                self.obs.timer.s100_ignore = self.obs.current_time + (1 / self.speed_tuple[0])
        elif self.obs.flag.s50_seeing:
            if self.obs.flag.anim_seeing:
                self.speed_tuple = CarWrapper.SPD_MIDD  # will resume speed when self.obs.flag.anim_slowing = False
            else:
                self.set_speed(CarWrapper.SPD_MIDD, is_permanent=True)
                self.obs.timer.s50_ignore = self.obs.current_time + (1 / self.speed_tuple[0])
        elif self.obs.flag.prk_sg_marked:
            if self.obs.flag.anim_seeing:
                pass
            elif self.obs.flag.prk_sg_seeing:
                self.move_to_sign('prk_sg', self.park_phrase_1, None)
            else:  # car move too fast and past prk_sg
                self.park_phrase_1()
        elif self.obs.flag.trf_lgt_seeing:
            self.move_to_sign('trf_lgt', self.func_sign_trf_lgt, None)
        elif self.obs.flag.stp_sg_waiting:
            if self.obs.current_time < self.obs.timer.stp_sg_wait:
                self.set_speed(CarWrapper.SPD_ZERO, is_permanent=False)  # Keep waiting
                time.sleep(0.3)
            else:
                self.obs.flag.stp_sg_waiting = False  # Done waiting
                self.obs.timer.stp_sg_ignore = self.obs.current_time + (1 / self.speed_tuple[0])
                self.resume_speed()
        elif self.obs.flag.stp_sg_marked:
            if self.obs.flag.stp_sg_seeing:
                self.move_to_sign('stp_sg', self.trigger_stp_sg_wait, None)
            else:  # car move too fast and past stp_sg
                self.trigger_stp_sg_wait()
        elif self.obs.flag.resume:  # No longer detect "trf_lgt" sign
            self.resume_speed()
            self.obs.flag.resume = False
        else:
            # No traffic sign detected action
            pass

    def run_loop(self) -> None:
        try:
            GlobalObj.web_command['racer'] = 'pause'
            WebDisplayer.set_jetson_command('pause')
            self.reset()
            self.set_throttle(0)
            self.steering_multiplier = 0
            self.speed_tuple = CarWrapper.SPD_FAST
            self.is_car_running = True
            while self.is_car_running and GlobalObj.web_command['racer'] != 'end':
                if GlobalObj.web_command['racer'] == 'pause':
                    self.obs.evaluate_observation(self.get_throttle(), self.get_steering())
                    self.set_throttle(0)
                    time.sleep(0.1)
                elif GlobalObj.web_command['racer'] == 'end':
                    pass
                elif GlobalObj.web_command['racer'] == 'resume':
                    self.obs.reset()
                    self.obs.evaluate_observation(self.get_throttle(), self.get_steering())
                    self.resume_speed()
                    self.take_action()
                    GlobalObj.web_command['racer'] = 'run'
                else:
                    self.obs.evaluate_observation(self.get_throttle(), self.get_steering())
                    self.take_action()

        except Exception as _ex:
            raise _ex
        finally:
            self.is_car_running = False
            self.set_speed(CarWrapper.SPD_ZERO, is_permanent=True)
            self.set_steering(0)
            GlobalObj.stop_obj()


class CamWrapper:
    """Class for managing the camera operations."""
    camera: CSICamera = None
    bgr8: np.array = None

    @staticmethod
    def init_cam() -> None:
        """Initialize a new camera."""
        if CamWrapper.camera:
            try:
                CamWrapper.stop_cam()
            except Exception as _ex:
                print(_ex)
        CamWrapper.camera = CSICamera(width=IMG_SIZE, height=IMG_SIZE, capture_fps=20)

    @staticmethod
    def update_image(change: Any) -> None:
        CamWrapper.bgr8 = change['new']

    @staticmethod
    def observe() -> None:
        CamWrapper.camera.running = True
        CamWrapper.camera.observe(CamWrapper.update_image, names='value')

    @staticmethod
    def unobserve() -> None:
        CamWrapper.camera.running = False
        CamWrapper.camera.unobserve(CamWrapper.update_image, names='value')

    @staticmethod
    def stop_cam() -> None:
        """Stop the camera and release resources."""
        try:
            CamWrapper.camera.running = False
            CamWrapper.camera.cap.release()  # Release the camera resource if applicable
        except Exception as _ex:
            print(_ex)
        finally:
            CamWrapper.camera = None


class WebDisplayer:
    """Class for broadcast img."""

    @staticmethod
    def display_cam(img_base64: str, throttle: float, steer_real: float, steer_calc: float, list_lane: List[YoloOutput],
                    sign: YoloOutput, park_zone: YoloOutput, list_animal: List[YoloOutput]) -> None:
        upload_data = {
            'img_base64': img_base64,
            'throttle': throttle,
            'steer_real': steer_real,
            'steer_calc': steer_calc,
            'list_lane': json.dumps([lane.to_dense_str() for lane in list_lane]),
            'sign': sign.to_dense_str(),
            'logo': park_zone.to_dense_str(),
            'list_anim': json.dumps([animal.to_dense_str() for animal in list_animal])
        }
        try:
            response = requests.post(API_DISPLAY, data=upload_data, timeout=5)
            response.raise_for_status()  # Raise an exception for HTTP errors
            command = response.json()['racer']
            if command in ["run", "pause", "end"]:
                if command == 'run':
                    # ignore if current web_command['racer'] is 'run' or 'resume'
                    if GlobalObj.web_command['racer'] == 'pause':
                        GlobalObj.web_command['racer'] = 'resume'
                else:
                    GlobalObj.web_command['racer'] = command
        except requests.exceptions.Timeout:
            print("The request timed out after 5 seconds.")
            GlobalObj.car_wrapper.is_car_running = False
            GlobalObj.web_command['racer'] = 'end'
        except requests.exceptions.RequestException as _ex:
            print("An error occurred:", _ex)
            GlobalObj.car_wrapper.is_car_running = False
            GlobalObj.web_command['racer'] = 'end'

    @staticmethod
    def send_zero(img_base64: str) -> None:
        """Send a zero command to the server."""
        WebDisplayer.display_cam(img_base64, 0, 0, 0.0, [], YoloOutput.default(), YoloOutput.default(), [])

    @staticmethod
    def set_jetson_command(racer_command: str) -> None:
        params = {
            'racer': racer_command
        }
        response = requests.get(API_JETSON_COMMAND, params=params, timeout=5)
        response.raise_for_status()  # Raise an exception for HTTP errors


class GlobalObj:
    racecar: NvidiaRacecar = None
    obs: Observation = None
    car_wrapper: CarWrapper = None
    web_command: Dict[str, str] = {'racer': 'pause'}

    @staticmethod
    def init_and_reset_obj():
        is_ok = True
        if not GlobalObj.racecar:
            GlobalObj.racecar = NvidiaRacecar()

        if not GlobalObj.obs:
            GlobalObj.obs = Observation()
        if not GlobalObj.car_wrapper:
            GlobalObj.car_wrapper = CarWrapper(GlobalObj.racecar, GlobalObj.obs)
        else:
            GlobalObj.car_wrapper.reset()

        if is_ok:
            CamWrapper.init_cam()
            CamWrapper.observe()
            print("Car is ready")
        else:
            print("NOT OK")

    @staticmethod
    def stop_obj():
        CamWrapper.unobserve()
        CamWrapper.stop_cam()


#### Init Environment

In [None]:
GlobalObj.init_and_reset_obj()

#### Run this loop

In [None]:
GlobalObj.car_wrapper.run_loop()

#### Stop Camera before re-run or closing notebook

In [None]:
GlobalObj.stop_obj()