In [1]:
#import tensorflow as tf
import numpy as np
#import keras_cv
import cv2
#from tensorflow import keras
from matplotlib import pyplot as plt
import carla
#import time
from ultralytics import YOLO
import easyocr
import sys
import os
import paho.mqtt.client as mqtt

# Connect to the CARLA simulator; host and port are hard-coded for a local server.
client = carla.Client('localhost', 2000)
# Timeout for client calls (presumably seconds, per the CARLA client API -- confirm).
client.set_timeout(10.0)
world = client.get_world()
spectator = world.get_spectator()
# Side length, in pixels, of the square image that resize_image() produces.
model_input_shape = 1024
# Plate text -> tracking state dict; read and mutated by update_detected_plates().
detected_plates = {}

def spawn_vehicle(vehicle_index=0, spawn_index=0, pattern='vehicle.*'):
    """Spawn a single actor in the module-level ``world`` and return it.

    Args:
        vehicle_index: Position of the blueprint to use within the blueprints
            matched by ``pattern``.
        spawn_index: Position of the spawn point to use within the list
            returned by the map.
        pattern: Blueprint filter expression passed to the blueprint library.

    Returns:
        The actor returned by ``world.spawn_actor``.
    """
    chosen_blueprint = world.get_blueprint_library().filter(pattern)[vehicle_index]
    chosen_transform = world.get_map().get_spawn_points()[spawn_index]
    return world.spawn_actor(chosen_blueprint, chosen_transform)

def resize_image(image):
    """Fit ``image`` into a square ``model_input_shape`` canvas.

    Images whose longest side exceeds ``model_input_shape`` are shrunk so that
    side equals ``model_input_shape`` (aspect ratio preserved). Images that
    already fit are left at their original size. In both cases the result is
    padded with black pixels on the bottom and right, so the picture content
    stays anchored at the top-left corner.

    Args:
        image: Array whose first two dimensions are (height, width).

    Returns:
        Tuple ``(scale, padded_image)`` where ``scale`` is the factor that was
        applied to the original pixel coordinates (``1.0`` when no shrinking
        was needed). Dividing a coordinate in ``padded_image`` by ``scale``
        gives the coordinate in ``image``.
    """
    src_height, src_width = image.shape[0], image.shape[1]
    longest_side = max(src_height, src_width)

    if longest_side > model_input_shape:
        scale = model_input_shape / longest_side
        if src_height > src_width:
            height = model_input_shape
            width = int(src_width * model_input_shape / src_height)
        else:
            height = int(src_height * model_input_shape / src_width)
            width = model_input_shape
        # Guard against a zero-sized target for extremely elongated inputs.
        width, height = max(1, width), max(1, height)
        # `interpolation` must be passed by keyword: the third positional
        # parameter of cv2.resize is `dst`, not the interpolation flag.
        resized_image = cv2.resize(image, (width, height), interpolation=cv2.INTER_AREA)
    else:
        # Already small enough: only padding is required, coordinates are unchanged.
        scale = 1.0
        resized_image = image

    pad_bottom = model_input_shape - resized_image.shape[0]
    pad_right = model_input_shape - resized_image.shape[1]
    resized_image = cv2.copyMakeBorder(resized_image, 0, pad_bottom, 0, pad_right, cv2.BORDER_CONSTANT, value=[0, 0, 0])
    return scale, resized_image

def update_detected_plates(plates_list, ticks_threshold=30, n_consecutive_ticks=5):
    """Advance the plate tracker by one tick and report state changes.

    Mutates the module-level ``detected_plates`` dict. A plate becomes
    "confirmed" once it has been seen in ``n_consecutive_ticks`` consecutive
    calls; any tracked plate is dropped once it has been missing for more than
    ``ticks_threshold`` calls. When at least one plate is confirmed or dropped,
    a text message is handed to ``send_mqtt_message`` with an empty topic.

    Args:
        plates_list: Plate strings read during the current tick. Duplicates
            are counted once, so a plate that shows up twice in one tick does
            not reach confirmation faster.
        ticks_threshold: Number of consecutive missed ticks tolerated before a
            plate is removed from tracking.
        n_consecutive_ticks: Number of consecutive ticks a plate must be seen
            before it is confirmed.

    Returns:
        None.
    """
    # De-duplicate while keeping first-seen order so messages stay stable.
    plates_this_tick = list(dict.fromkeys(plates_list))
    seen_this_tick = set(plates_this_tick)
    new_confirmed_plates = []

    # Step 1: Update or add plates in the detected list
    for plate in plates_this_tick:
        if plate not in detected_plates:
            detected_plates[plate] = {
                "ticks_since_last_detection": 0,
                "consecutive_detections": 1,
                "is_confirmed": False
            }
        else:
            detected_plates[plate]["ticks_since_last_detection"] = 0
            detected_plates[plate]["consecutive_detections"] += 1

        # Confirm detection after `n_consecutive_ticks`
        if (not detected_plates[plate]["is_confirmed"] and
                detected_plates[plate]["consecutive_detections"] >= n_consecutive_ticks):
            detected_plates[plate]["is_confirmed"] = True
            new_confirmed_plates.append(plate)

    # Step 2: Update ticks and detect stale plates
    removed_plates = []
    for plate, data in detected_plates.items():
        if plate not in seen_this_tick:
            data["ticks_since_last_detection"] += 1
            data["consecutive_detections"] = 0  # Reset consecutive detections if not seen
        if data["ticks_since_last_detection"] > ticks_threshold:
            removed_plates.append(plate)

    # Deletion happens after the loop: a dict cannot change size while iterated.
    for plate in removed_plates:
        del detected_plates[plate]

    # Step 3: Generate message
    message = ""
    if new_confirmed_plates:
        message += f"New plates confirmed: {', '.join(new_confirmed_plates)}\n"
    if removed_plates:
        message += f"Plates not detected anymore: {', '.join(removed_plates)}"

    if message:
        send_mqtt_message("", message)
    
'''def update_detected_plates(plates_list, ticks_threshold=30, ticks_for_detection = 5):
    new_plates = []
    removed_plates = []
    for plate in plates_list:
        if plate not in detected_plates:
            if plate not in candidates_for_detection: # first detection
                candidates_for_detection[plate] = {"ticks_since_first_detection": 0}
            else: # Subsequent detections
                candidates_for_detection[plate]["ticks_since_first_detection"] = int(candidates_for_detection[plate]["ticks_since_first_detection"]) + 1
        if plate in candidates_for_detection.keys and candidates_for_detection.get(plate)["ticks_since_first_detection"] > ticks_for_detection: # Actual detection
            new_plates.append(plate)
            candidates_for_detection.pop(plate)
        detected_plates[plate] = {"ticks_since_last_detection": 0}
    for key, value in detected_plates.items():
        value["ticks_since_last_detection"] = int(value["ticks_since_last_detection"]) + 1
        if value["ticks_since_last_detection"] > ticks_threshold:
            removed_plates.append(key)
    for plate in candidates_for_detection.keys:
        if plate not in plates_list:
            # detected_plates[plate] = {"ticks_since_last_detection": 0}
            pass
    message = ""
    if len(new_plates) > 0:
        message = "New plates detected: "
        for plate in new_plates:
            message += plate + " "
        message += "\n"
    if len(removed_plates) > 0:
        message += "Plates not detected anymore: "
        for plate in removed_plates:
            detected_plates.pop(plate)
            message += plate + " "
    if len(message) > 0:
        send_mqtt_message("", message)
'''
def send_mqtt_message(topic, message):
    """Placeholder publisher: print ``message`` to stdout.

    ``topic`` is accepted but not used, and nothing is sent over MQTT here.
    """
    print(message)


# RGB camera blueprint; every attribute value is passed to CARLA as a string.
camera_bp = world.get_blueprint_library().find('sensor.camera.rgb')
for attribute_name, attribute_value in (
    ('image_size_x', '1920'),
    ('image_size_y', '1080'),
    ('fov', '45'),
    ('sensor_tick', '0.04'),
    ('motion_blur_intensity', '0.0'),
):
    camera_bp.set_attribute(attribute_name, attribute_value)
# Default-constructed transform.
spawn_point = carla.Transform()

# NOTE: the commented-out lines in this section are an earlier KerasCV-based
# detector setup; the active detector is the Ultralytics model loaded below.
#yolov8_backbone = keras_cv.models.YOLOV8Backbone.from_preset('yolo_v8_s_backbone_coco',load_weights=True)
# Plate detector weights are loaded from a path relative to the working directory.
yolov8_plates_detector = YOLO("yolo_pytorch/best.pt")

#yolov8_plates_detector = keras_cv.models.YOLOV8Detector(backbone=yolov8_backbone,num_classes=1,bounding_box_format='xywh')
#yolov8_plates_detector.compile(optimizer='adam', classification_loss='binary_crossentropy', box_loss='ciou')
#yolov8_plates_detector.load_weights("model_checkpoints_3.8/checkpoint_epoch_24.weights.h5")
# OCR reader restricted to English; gpu=True asks EasyOCR to run on the GPU.
reader = easyocr.Reader(["en"], gpu=True)

In [2]:
# Vehicle that carries the camera, plus a second vehicle that receives no
# control input in this cell.
ego_vehicle = spawn_vehicle(vehicle_index=2, spawn_index=7)
static_vehicle = spawn_vehicle(spawn_index=9)
# Loop flag for the display loop further down in this cell.
running = True

# Camera attached to the ego vehicle, offset and pitched relative to it.
camera = world.spawn_actor(camera_bp, carla.Transform(carla.Location(x=1.7, z=1.2), carla.Rotation(pitch=-10)), attach_to=ego_vehicle)
# Latest frame to display; starts as a black placeholder and is replaced by
# camera_callback() with each annotated frame.
video_output = np.zeros((model_input_shape, model_input_shape, 4), dtype=np.uint8)
# Last ego-vehicle location stored with the 's' key in the display loop.
saved_location = carla.Location(x=0, y=0, z=0)

# 3x3 sharpening kernel applied to each plate crop before OCR, intended to
# improve contrast and the chance of a correct read.
sharpen_kernel = np.array([[0, -1, 0],
                           [-1, 5, -1],
                           [0, -1, 0]])


def camera_callback(image):
    """Detect and read licence plates in one camera frame.

    Converts the raw BGRA buffer to BGR, runs the plate detector on a resized
    copy, OCRs every detected box on the full-resolution frame, forwards the
    accepted readings to ``update_detected_plates`` and publishes an annotated
    frame through the module-level ``video_output``.

    Args:
        image: Camera frame exposing ``raw_data``, ``height`` and ``width``.
    """
    global video_output
    image_array = np.frombuffer(image.raw_data, dtype=np.uint8).reshape((image.height, image.width, 4))
    image_bgr = cv2.cvtColor(image_array, cv2.COLOR_BGRA2BGR)
    frame_height, frame_width = image_bgr.shape[0], image_bgr.shape[1]
    # Draw on a copy so overlays never end up inside a later plate crop.
    annotated_frame = image_bgr.copy()
    current_detected_plates = []

    scale, resized_image = resize_image(image_bgr)
    pred = yolov8_plates_detector.predict(resized_image, verbose=False)
    # Detector boxes are in resized-image pixels; map them back to the frame.
    rescaling_factor = 1 / scale

    for result in pred:
        boxes = result.boxes
        if not boxes:
            continue
        for box in boxes:
            x1, y1, x2, y2 = (int(coord * rescaling_factor) for coord in box.xyxy[0].tolist())
            # Clamp to the frame: a negative index would wrap around when slicing.
            x1, x2 = max(0, min(x1, frame_width)), max(0, min(x2, frame_width))
            y1, y2 = max(0, min(y1, frame_height)), max(0, min(y2, frame_height))
            if x2 <= x1 or y2 <= y1:
                # Degenerate box: an empty crop would make cv2.filter2D raise.
                continue

            plate = image_bgr[y1:y2, x1:x2]
            plate_enhanced = cv2.filter2D(plate, -1, sharpen_kernel)
            plate_reading = reader.readtext(plate_enhanced)

            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
            read_flag = False
            # Each reading is indexed as [1] = text and [2] = confidence.
            for plate_detected in plate_reading:
                confidence = plate_detected[2]
                plate_text = plate_detected[1]
                if confidence > 0.5:
                    read_flag = True
                    cv2.putText(annotated_frame, plate_text, (x1, y1), cv2.FONT_HERSHEY_COMPLEX, 1, (255, 255, 255), 2)
                    current_detected_plates.append(plate_text.upper())
            if not read_flag:
                # Box found but no reading passed the confidence threshold.
                cv2.putText(annotated_frame, "????", (x1, y1), cv2.FONT_HERSHEY_COMPLEX, 1, (255, 255, 255), 2)

    update_detected_plates(current_detected_plates)
    video_output = annotated_frame


# Register the per-frame callback. The lambda looks up camera_callback on each
# call, so re-running the cell that defines it takes effect without
# re-registering the listener.
camera.listen(lambda image: camera_callback(image))

# Apply a constant light throttle to the ego vehicle.
ego_vehicle_controls = carla.VehicleControl()
ego_vehicle_controls.throttle = 0.2
ego_vehicle.apply_control(ego_vehicle_controls)
#ego_vehicle.set_autopilot(True)
#static_vehicle.set_autopilot(True)
#print(static_vehicle.attributes.items)

# Window used by the display loop to show the annotated frames.
cv2.namedWindow('RGB Camera', cv2.WINDOW_AUTOSIZE)
#print(ego_vehicle.attributes)


# Display loop: shows the latest annotated frame until 'q' is pressed;
# 's' stores the ego vehicle's current location in `saved_location`.
try:
    while running:
        if video_output is not None:
            cv2.imshow('RGB Camera', video_output)
        # Poll the keyboard once per iteration. Calling waitKey separately for
        # each key lets one call swallow a press meant for the other check.
        key = cv2.waitKey(1) & 0xFF
        if key == ord('q'):
            running = False
            break
        if key == ord('s'):
            saved_location = ego_vehicle.get_location()
            print("Saved location: ", saved_location)
except Exception as e:
    print(f"Error occurred: {e}")
finally:
    print("Shutting down...")
    cv2.destroyAllWindows()
    if 'camera' in locals() and camera.is_alive:
        # Stop the data stream first so the callback is not invoked during teardown.
        camera.stop()
        camera.destroy()
    # Removes every vehicle and camera sensor in the world, not only the
    # actors spawned by this notebook.
    for actor in world.get_actors():
        if actor.type_id.startswith('vehicle.') or actor.type_id.startswith('sensor.camera'):
            actor.destroy()


New plates confirmed: CARLA

Plates not detected anymore: CARLA
Shutting down...


In [37]:
# Manual cleanup: destroy every actor in the world whose type matches 'vehicle.*'.
for vehicle in world.get_actors().filter('vehicle.*'):
    vehicle.destroy()


In [50]:
# Switch the simulation to the built-in ClearNight weather preset.
world.set_weather(carla.WeatherParameters.ClearNight)

In [10]:
# Environment check: report CUDA availability and the CUDA, cuDNN and
# torchvision versions seen by this kernel.
import torch, torchvision
print(torch.cuda.is_available())
print(torch.version.cuda)
print(torch.backends.cudnn.version())
print(torchvision.__version__)


In [9]:
blueprint_library = world.get_blueprint_library()

# Filter vehicle blueprints
vehicles_with_plates = []  # Never populated; only the commented-out code below reads it.

blueprints = blueprint_library.filter('vehicle.*')
# Print the position of the Audi TT inside the 'vehicle.*' filter result, i.e.
# the value to pass as `vehicle_index` to spawn_vehicle(). enumerate() keeps
# the index in step with the iteration; a hand-rolled counter that is not
# advanced on a match would report later matches one position too low.
for i, blueprint in enumerate(blueprints):
    if blueprint.id == "vehicle.audi.tt":
        print(i)

# Print vehicles that support license plate changes
print("Vehicles with customizable license plates:")
#for vehicle_id in vehicles_with_plates:
#    print(vehicle_id)
#print(len(vehicles_with_plates))

2
Vehicles with customizable license plates:
