# In [None]:
import queue
from typing import Any
import torch
import numpy as np
import cv2
from time import perf_counter
from ultralytics import YOLO
import yaml
from pathlib import Path
from types import SimpleNamespace
import carla
import random
import os
from deep_sort.deep_sort import DeepSort
from deep_sort.utils.parser import get_config
from PIL import Image, ImageDraw, ImageFont
from pascal_voc_writer import Writer
import find_groundtruth
import json


# Weights file handed to ultralytics.YOLO when the detector is loaded.
YOLO_PATH = 'yolov8n.pt'

# Detector class ids that are kept for tracking, mapped to their display
# labels; every other detected class is ignored.
class_name = {
    1: 'bicycle',
    2: 'car',
    3: 'motorcycle',
    5: 'bus',
    7: 'truck',
}
class_id = list(class_name)

# Camera image size in pixels; image_w / image_h are aliases of the same values.
IM_WIDTH = 4 * 256
IM_HEIGHT = 3 * 256
image_w = IM_WIDTH
image_h = IM_HEIGHT

# Per-channel multipliers used to derive a colour from a track id.
palette = tuple((1 << bits) - 1 for bits in (11, 15, 20))

# Module-level default path of the annotated video.
output_path = "output_video.mp4"

class main:
    """YOLOv8 + DeepSORT vehicle tracking demo driven by a CARLA camera.

    Creating an instance loads the YOLO detector and the DeepSORT tracker.
    Calling the instance connects to a CARLA server on localhost:2000, spawns
    an ego vehicle with an RGB camera plus NPC traffic, and then, for every
    camera frame, saves the image, asks ``find_groundtruth`` for ground truth,
    runs detection + tracking, and shows/records the annotated frame until
    ``q`` is pressed.
    """

    def __init__(self):
        """Load the detector, read the DeepSORT config and build the tracker."""
        print('in init')

        self.model = self.load_model()
        self.save_vid = True
        self.output_path = "output_video.mp4"
        self.cfg = get_config()
        self.cfg.merge_from_file('deep_sort/configs/deep_sort.yaml')
        self.deepsort_weights = "deep_sort/deep/checkpoint/ckpt.t7"
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.deepsort = DeepSort(
            self.deepsort_weights,
            max_age=70
        )

    def load_model(self):
        """Return a YOLO model built from ``YOLO_PATH``."""
        print('in load model')
        model = YOLO(YOLO_PATH)
        return model

    def build_projection_matrix(self, w, h, fov, is_behind_camera=False):
        """Return the 3x3 pinhole intrinsic matrix for a ``w`` x ``h`` image.

        Args:
            w: Image width in pixels.
            h: Image height in pixels.
            fov: Horizontal field of view in degrees.
            is_behind_camera: When true the focal length is negated.

        Returns:
            A ``numpy`` 3x3 matrix with the focal length on the diagonal and
            the principal point at the image centre.
        """
        focal = w / (2.0 * np.tan(fov * np.pi / 360.0))
        K = np.identity(3)
        K[0, 0] = K[1, 1] = -focal if is_behind_camera else focal
        K[0, 2] = w / 2.0
        K[1, 2] = h / 2.0
        return K

    def _spawn_at_free_point(self, world, blueprint, spawn_points):
        """Spawn ``blueprint`` at the first free spawn point, in random order.

        Raises:
            RuntimeError: If no spawn point accepts the actor.
        """
        for spawn_point in random.sample(spawn_points, len(spawn_points)):
            actor = world.try_spawn_actor(blueprint, spawn_point)
            if actor is not None:
                return actor
        raise RuntimeError('could not spawn the ego vehicle: no free spawn point')

    def _detect_vehicles(self, frame):
        """Run YOLO on ``frame`` and keep only detections listed in ``class_id``.

        Returns:
            Three parallel lists: integer ``[x1, y1, x2, y2]`` boxes,
            confidences, and integer class ids.
        """
        bbox_xyxy = []
        conf_score = []
        cls_id = []
        for result in self.model(frame):
            for x1, y1, x2, y2, conf, cls in result.boxes.data.tolist():
                if int(cls) not in class_id:
                    continue
                bbox_xyxy.append([int(x1), int(y1), int(x2), int(y2)])
                conf_score.append(conf)
                cls_id.append(int(cls))
        return bbox_xyxy, conf_score, cls_id

    @staticmethod
    def _track_annotations(tracks):
        """Convert tracker rows into JSON-serialisable annotation dicts.

        Each row is read as ``[x1, y1, x2, y2, ...]``. Values are cast to
        plain ``int`` because numpy scalars cannot be dumped by ``json``.
        """
        annotations = []
        for track in tracks:
            x1, y1, x2, y2 = (int(value) for value in track[:4])
            annotations.append({
                "dco": True,
                "height": y2 - y1,
                "width": x2 - x1,
                "id": "vehicle",  # Replace with actual class name
                "y": y1,
                "x": x1,
            })
        return annotations

    def __call__(self):
        """Run the capture / detect / track loop until ``q`` is pressed.

        Side effects: spawns actors in the CARLA world, writes one PNG per
        frame under ``output/``, writes ``hypothese.json`` with the tracker
        boxes of every processed frame, optionally records ``self.output_path``,
        and opens two OpenCV windows. Every actor spawned here is destroyed and
        the video file is finalised even when the loop ends with an exception.

        Raises:
            RuntimeError: If the ego vehicle or the camera cannot be spawned.
        """
        client = carla.Client('localhost', 2000)
        client.set_timeout(20.0)
        world = client.get_world()
        print('connected to carla')

        spawned_actors = []   # only actors created by this run are destroyed
        camera = None
        video_writer = None
        frames = []           # one JSON entry per processed camera frame

        try:
            bp_lib = world.get_blueprint_library()
            vehicle_bp = bp_lib.find('vehicle.lincoln.mkz_2020')
            print('vehicle')
            spawn_points = world.get_map().get_spawn_points()
            print('spawn points')
            vehicle = self._spawn_at_free_point(world, vehicle_bp, spawn_points)
            spawned_actors.append(vehicle)
            print('spawn vehicle')

            camera_bp = bp_lib.find('sensor.camera.rgb')
            print('set cam attributes')
            fov = 110
            camera_bp.set_attribute('image_size_x', f'{IM_WIDTH}')
            camera_bp.set_attribute('image_size_y', f'{IM_HEIGHT}')
            camera_bp.set_attribute('fov', f'{fov}')
            print('cam settings done')

            camera_init_trans = carla.Transform(carla.Location(z=2))
            print('cam initilized')
            camera = world.try_spawn_actor(camera_bp, camera_init_trans, attach_to=vehicle)
            if camera is None:
                raise RuntimeError('could not spawn the RGB camera')
            spawned_actors.append(camera)
            print('cam attached')

            # A single listener feeds the queue; the detector frame is derived
            # from the dequeued image so detection and ground truth always
            # refer to the same simulation frame.
            image_queue = queue.Queue()
            print('image queue')
            camera.listen(image_queue.put)

            K = self.build_projection_matrix(IM_WIDTH, IM_HEIGHT, fov)
            K_b = self.build_projection_matrix(IM_WIDTH, IM_HEIGHT, fov, is_behind_camera=True)

            print('spawn vehicles')
            car_bps = [
                bp for bp in bp_lib.filter('vehicle')
                if int(bp.get_attribute('number_of_wheels')) == 4
            ]
            for _ in range(30):
                npc = world.try_spawn_actor(random.choice(car_bps), random.choice(spawn_points))
                if npc is not None:
                    spawned_actors.append(npc)
                    npc.set_autopilot(True)

            print('get objects filtered ')
            static_vehicles = (
                world.get_environment_objects(carla.CityObjectLabel.Car)
                + world.get_environment_objects(carla.CityObjectLabel.Truck)
                + world.get_environment_objects(carla.CityObjectLabel.Bus)
            )
            # Disable all static vehicles
            world.enable_environment_objects([obj.id for obj in static_vehicles], False)

            if self.save_vid:
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                video_writer = cv2.VideoWriter(self.output_path, fourcc, 20.0, (IM_WIDTH, IM_HEIGHT))

            vehicle.set_autopilot(True)
            spectator = world.get_spectator()

            print('before while loop')
            while True:
                print('inside while loop')
                # NOTE(review): world.tick() is meant for synchronous mode, which
                # this script never enables — confirm the server settings.
                world.tick()

                # Keep the spectator looking straight down at the ego vehicle.
                transform = carla.Transform(
                    vehicle.get_transform().transform(carla.Location(x=-4, z=50)),
                    carla.Rotation(yaw=-180, pitch=-90),
                )
                spectator.set_transform(transform)

                frame_gt = image_queue.get()
                img = np.reshape(np.copy(frame_gt.raw_data), (frame_gt.height, frame_gt.width, 4))

                gt_file = 'output/%06d' % frame_gt.frame
                frame_gt.save_to_disk(gt_file + '.png')
                timestamp_sec = frame_gt.timestamp

                print('calling ground truth file')
                # NOTE(review): assumes get_groundtruth is defined in the imported
                # find_groundtruth module — confirm the function name.
                find_groundtruth.get_groundtruth(world, camera, vehicle, frame_gt, K, K_b, timestamp_sec)

                # Drop the 4th channel; a contiguous copy is required for the
                # in-place OpenCV drawing calls.
                frame = np.ascontiguousarray(img[:, :, :3])
                bbox_xyxy, conf_score, cls_id = self._detect_vehicles(frame)

                # One tracker update per frame, and only when something was
                # detected (the tracker was never called with an empty list).
                # NOTE(review): boxes are passed as corner coordinates exactly
                # as before; confirm the bundled DeepSort.update expects that
                # format rather than centre/width/height.
                outputs = self.deepsort.update(bbox_xyxy, conf_score, frame) if bbox_xyxy else []

                # NOTE(review): tracks are paired with detections by index, as
                # in the previous code; the tracker does not guarantee that
                # order, so the class label shown may belong to another box.
                tracked = list(zip(outputs, conf_score, cls_id))

                # NOTE(review): the former deepsort_writer.addObject(...) call was
                # removed — the writer was never created or saved and the same
                # boxes are recorded in the JSON below.
                frames.append({
                    "timestamp": timestamp_sec,
                    "num": frame_gt.frame,
                    "class": "frame",
                    "annotations": self._track_annotations(track for track, _, _ in tracked),
                })

                for track, conf, cls in tracked:
                    frame = self.annotation(frame, track, conf, cls)

                cv2.imshow('deepSORT', frame)
                cv2.imshow('ground truth', img)
                if video_writer is not None:
                    video_writer.write(frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Written once at the end instead of being rewritten every frame.
            if frames:
                ds_output = {
                    "frames": frames,
                    "class": "video",
                    "filename": "hypothese.json"
                }
                with open('hypothese.json', 'w') as json_file:
                    json.dump(ds_output, json_file)

            if video_writer is not None:
                video_writer.release()  # finalises the video file
            cv2.destroyAllWindows()

            if camera is not None:
                camera.stop()
            # Destroy in reverse spawn order so the camera goes before the
            # vehicle it is attached to.
            for actor in reversed(spawned_actors):
                actor.destroy()
            print("Vehicles Destroyed.")

    def compute_color_for_labels(self, label):
        """Return a deterministic 3-tuple colour derived from integer ``label``."""
        color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
        return tuple(color)

    def annotation(self, frame, output, conf, cls_id):
        """Draw one track's box and ``"<class> <track id>"`` caption on ``frame``.

        Args:
            frame: Image to draw on; converted to ``numpy.ndarray`` if needed.
            output: Tracker row read as ``[x1, y1, x2, y2, track_id]``.
            conf: Detection confidence (currently not drawn).
            cls_id: Detector class id; ids missing from ``class_name`` give an
                empty class label.

        Returns:
            The annotated frame.
        """
        x1, y1, x2, y2 = map(int, output[0:4])
        track_id = int(output[4])
        label = class_name.get(cls_id, '')
        frame = frame if isinstance(frame, np.ndarray) else np.array(frame)
        color = self.compute_color_for_labels(track_id)
        caption = f'{label} {track_id}'
        # Size the filled background from the text that is actually drawn.
        t_size = cv2.getTextSize(caption, cv2.FONT_HERSHEY_PLAIN, 1, 2)[0]
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 1)
        cv2.rectangle(frame, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
        cv2.putText(frame, caption, (x1, y1 + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 1, [255, 255, 255], 2)
        return frame
    
if __name__ == '__main__':
    # Script entry point: constructing the pipeline loads the detector and
    # the tracker ...
    run = main()
    # ... and calling it starts the CARLA capture / detect / track loop.
    run()

# in grounf truth file
# in init
# in load model
# connected to carla
# vehicle
# spawn points
# spawn vehicle
# set cam attributes
# cam settings done
# cam initilized
