### Install necessary libraries

In [None]:
!pip install ultralytics easyocr filterpy lap asyncio websockets nest_asyncio

### Import the necessary libraries

In [None]:
from ultralytics import YOLO
import cv2
from firebase_admin import credentials, storage, firestore, auth
import firebase_admin
import datetime
import os
import math
import imutils
import csv
import sys
import numpy as np
import re
import easyocr
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from PIL import Image
from google.colab.patches import cv2_imshow
import locale
locale.getpreferredencoding = lambda: "UTF-8"
import asyncio
import websockets
import nest_asyncio
nest_asyncio.apply()


sys.path.insert(0, '/content/drive/MyDrive/Colab Notebooks/rlres/utils/codes')
%cd /content/drive/MyDrive/Colab\ Notebooks/rlres/utils/codes

from track_sort import Sort

### Useful functions

In [None]:
def send_recoreded_video_file_to_firebase(output_file_video):
  # Initialize Firebase
  if not firebase_admin._apps:
      cred = credentials.Certificate("/content/drive/MyDrive/Colab Notebooks/rlres/utils/service.json")
      firebase_admin.initialize_app(cred, {
        "storageBucket": "atles-dbabc.appspot.com"
      })

  try:
    # Sign in the with email and password
    user = auth.get_user_by_email("maxoluwatosin@gmail.com")

    # Initialize Firestore
    db = firestore.client()

    bucket = storage.bucket()
    storage_path = f"{user.uid}/videos/{date_folder}/{time_file}.mp4"
    blob = bucket.blob(storage_path)
    blob.upload_from_filename(output_file_video)

    # Get the download URL of the uploaded video
    download_url = blob.generate_signed_url(expiration=datetime.timedelta(seconds=99999999999), method='GET')

    # Create the necessary collections and documents
    all_video_docs = db.collection(user.uid).document("all_videos")

    all_video_docs.set({"Description": "This document contains all the videos captured by this camera."})

    video_docs = all_video_docs.collection('videos').document(date_folder)
    doc = video_docs.get()

    # Add to the documents if as appropiate
    video_docs.set({
          "name": [time_file] + video_docs.get().to_dict().get('name', []) if doc.exists else [time_file],
          "downloadUrl": [download_url] + video_docs.get().to_dict().get('downloadUrl', []) if doc.exists else [download_url]
      }, merge = True)

  except Exception as e:
    print(e)

def send_red_light_violation_to_emails_and_firebase(vehicle_info, date_folder, time_file):
    # Initialize Firebase with your credentials
    if not firebase_admin._apps:
      cred = credentials.Certificate("/content/drive/MyDrive/Colab Notebooks/rlres/utils/service.json")
      firebase_admin.initialize_app(cred, {
        "storageBucket": "atles-dbabc.appspot.com"
      })

    # Sign in the with email and password
    user = auth.get_user_by_email("maxoluwatosin@gmail.com")

    # Access storage
    bucket = storage.bucket()

    # Access Firestore
    db = firestore.client()

    # variables
    image_sent = set()
    plate_data = {}
    data_found = {}

    # Loop through the list of plate numbers
    for key, value in vehicle_info.items():
        plate_numbers = value[1]
        plates_id = str(key) + time_file
        plate_data[plates_id] = plate_numbers
        data_found[plates_id] = []
        image_url = ''

        for plate_number in plate_numbers:

            # Reference to the "plate_numbers" collection
            plate_numbers_collection = db.collection('plate_numbers')

            # Query Firestore for the document with the matching plate number
            doc_ref = plate_numbers_collection.document(plate_number)
            doc = doc_ref.get()

            if doc.exists:
                # Plate number found
                data_found[plates_id].append('Yes')

                # Retrieve owner's information from the document
                owner_data = doc.to_dict()

                # Compose the email message
                subject = "Red Light Violation"
                body = f"Dear {owner_data['name']},\n\nYou are requested to report to the nearest FRSC office for violating a red light on {date_folder} at {time_file}. \n\nKind regards, \n\nFRSC."

                # Attach the image to the email
                image_filename = f"/content/images/{date_folder}/{key}.jpg"
                image_attachment = attach_image(image_filename)

                # Send the email using SMTP
                send_email(owner_data['email'], subject, body, image_attachment)

                # Upload image to firebase storage
                image_url += upload_image_to_storage(user, bucket, image_filename, key)
                image_sent.add(key)

                # Update Firestore with owner's information and image URL
                update_firestore_if_exist(user, db, owner_data, image_url, plate_number)

            else:
                # Data not found
                data_found[plates_id].append('No')

                # Send image only if it has not been sent before
                if key not in image_sent:
                    # Upload image to firebase storage
                    image_filename = f"/content/images/{date_folder}/{key}.jpg"
                    image_url += upload_image_to_storage(user, bucket, image_filename, key)
                    image_sent.add(key)

        # Update Firestore with plate number and image URL
        update_firestore(user, db, image_url, plate_data, plates_id, data_found)

        # Clear variables data
        data_found.clear()
        plate_data.clear()
        image_url = ''

def update_firestore(user, db, image_url, plate_data, plates_id, data_found):

    now = datetime.datetime.now()
    date = now.strftime("%Y-%m-%d")
    time = now.strftime("%H-%M-%S")

    # Create the necessary collections and documents
    all_violators_docs = db.collection(user.uid).document("all_violators")
    all_violators_docs.set({"Description": "This document contains all the violations captured by this camera."})
    violators_docs = all_violators_docs.collection('violators').document(date)
    doc = violators_docs.get()

    # Add to the documents if as appropiate
    violators_docs.set({
          "plate_numbers": [plate_data] + violators_docs.get().to_dict().get('plate_numbers', []) if doc.exists else [plate_data],
          "data_found": [data_found] + violators_docs.get().to_dict().get('data_found', []) if doc.exists else [data_found],
          "image_url": [image_url] + violators_docs.get().to_dict().get('image_url', []) if doc.exists else [image_url],
          "plates_id": [plates_id] + violators_docs.get().to_dict().get('plates_id', []) if doc.exists else [plates_id],
          "time": [time] + violators_docs.get().to_dict().get('time', []) if doc.exists else [time]
      }, merge = True)


def update_firestore_if_exist(user, db, owner_data, image_url, plate_number):

    now = datetime.datetime.now()
    date = now.strftime("%Y-%m-%d")
    time = now.strftime("%H-%M-%S")

    # Reference to the "red_light_violations" collection
    violations_collection = db.collection(user.uid).document('histories')

    violations_collection.set({"Description": "This document contains all the history of cited violations."})

    # Data to be added to the Firestore document
    violation_data = {
        'name': owner_data['name'],
        'email': owner_data['email'],
        'image_url': image_url,
        'plate_number': plate_number,
        'date': date,
        'time': time
    }

    # Add the data to Firestore
    violations_collection.collection('history').document().set(violation_data)

def upload_image_to_storage(user, bucket, image_filename, key):
    image_blob = bucket.blob(f"{user.uid}/violators/{date_folder}/{key}.jpg")
    image_blob.upload_from_filename(image_filename)

    # Get the download URL of the uploaded image
    image_url = image_blob.generate_signed_url(expiration=datetime.timedelta(seconds=99999999999), method='GET')

    return image_url

def attach_image(image_filename):
    # Open and attach the image to the email
    with open(image_filename, 'rb') as image_file:
        image_attachment = MIMEImage(image_file.read())
        image_attachment.add_header('Content-Disposition', f'attachment; filename={image_filename}')
    return image_attachment

def send_email(to_email, subject, body, attachment=None):
    # Configure SMTP server and credentials
    smtp_server = 'smtp.gmail.com'
    smtp_port = 587  # Modify as needed
    sender_email = 'maxoluwatosin@gmail.com'
    sender_password = 'pbuiexnitprzleka'

    # Create a multipart email message
    message = MIMEMultipart()
    message['From'] = sender_email
    message['To'] = to_email
    message['Subject'] = subject

    # Attach the body text
    message.attach(MIMEText(body, 'plain'))

    # Attach the image if provided
    if attachment:
        message.attach(attachment)

    # Send the email using SMTP
    with smtplib.SMTP(smtp_server, smtp_port) as server:
        server.starttls()
        server.login(sender_email, sender_password)
        server.sendmail(sender_email, to_email, message.as_string())

def license_complies_format(text):
  """
  Check if the license plate text complies with the required format.

  Args:
      text (str): License plate text.

  Returns:
      bool: True if the license plate complies with the format, False otherwise.
  """

  # Define regular expressions for the two formats
  format1 = r'^[A-Za-z]{3}-\d{2,3}[A-Za-z]{2}$'
  format2 = r'^[A-Za-z]{2}\d{3}-[A-Za-z]{3}$'
  format3 = r'^[A-Za-z]{3}-[0-9]{4}[A-Za-z]{1}$'


  # Check if the input string matches either of the formats
  if re.match(format1, text) or re.match(format2, text) or re.match(format3, text):
      return True
  else:
      return False

def write_csv(results, output_path):
    """
    Write the results to a CSV file.

    Args:
        results (dict): Dictionary containing the results.
        output_path (str): Path to the output CSV file.
    """
    with open(output_path, 'w') as f:
        f.write('{},{},{},{},{},{},{}\n'.format('frame_nmr', 'car_id', 'car_bbox',
                                                'license_plate_bbox', 'license_plate_bbox_score', 'license_number',
                                                'license_number_score'))

        for frame_nmr in results.keys():
            for car_id in results[frame_nmr].keys():
                print(results[frame_nmr][car_id])
                if 'car' in results[frame_nmr][car_id].keys() and \
                   'license_plate' in results[frame_nmr][car_id].keys() and \
                   'text' in results[frame_nmr][car_id]['license_plate'].keys():
                    f.write('{},{},{},{},{},{},{}\n'.format(frame_nmr,
                                                            car_id,
                                                            '[{} {} {} {}]'.format(
                                                                results[frame_nmr][car_id]['car']['bbox'][0],
                                                                results[frame_nmr][car_id]['car']['bbox'][1],
                                                                results[frame_nmr][car_id]['car']['bbox'][2],
                                                                results[frame_nmr][car_id]['car']['bbox'][3]),
                                                            '[{} {} {} {}]'.format(
                                                                results[frame_nmr][car_id]['license_plate']['bbox'][0],
                                                                results[frame_nmr][car_id]['license_plate']['bbox'][1],
                                                                results[frame_nmr][car_id]['license_plate']['bbox'][2],
                                                                results[frame_nmr][car_id]['license_plate']['bbox'][3]),
                                                            results[frame_nmr][car_id]['license_plate']['bbox_score'],
                                                            results[frame_nmr][car_id]['license_plate']['text'],
                                                            results[frame_nmr][car_id]['license_plate']['text_score'])
                            )
        f.close()

def read_license_plate(license_plate_crop):
  """
  Read the license plate text from the given cropped image.

  Args:
      license_plate_crop (PIL.Image.Image): Cropped image containing the license plate.

  Returns:
      tuple: Tuple containing the formatted license plate text and its confidence score.
  """

  reader = easyocr.Reader(['en'])

  detections = reader.readtext(license_plate_crop)

  for detection in detections:
      bbox, text, score = detection
      print(text)
      if ' ' in text:
          si = text.index(' ')
          if si == 3 or si == 5:
              text = text.upper().replace(' ', '-')
          else:
              text = text.upper().replace(' ', '')
      if ':' in text:
          si = text.index(':')
          if si == 3 or si == 5:
              text = text.upper().replace(':', '-')
          else:
              text = text.upper().replace(' ', '')
      if license_complies_format(text):
          print(text)
          return text, score

  return None, None

def get_car(license_plate, vehicle_track_ids):
    """
    Retrieve the vehicle coordinates and ID based on the license plate coordinates.

    Args:
        license_plate (tuple): Tuple containing the coordinates of the license plate (x1, y1, x2, y2, score, class_id).
        vehicle_track_ids (list): List of vehicle track IDs and their corresponding coordinates.

    Returns:
        tuple: Tuple containing the vehicle coordinates (x1, y1, x2, y2) and ID.
    """
    x1, y1, x2, y2, score, class_id = license_plate

    foundIt = False
    for j in range(len(vehicle_track_ids)):
        xcar1, ycar1, xcar2, ycar2, car_id = vehicle_track_ids[j]

        if x1 > xcar1 and y1 > ycar1 and x2 < xcar2 and y2 < ycar2:
            car_indx = j
            foundIt = True
            break

    if foundIt:
        return vehicle_track_ids[car_indx]

    return -1, -1, -1, -1, -1

def preocess(frame):
  # Define variables
  vehicles = [3, 4, 6, 8]
  vehicles_info = {}
  mot_tracker = Sort()
  temporary_plates = []
  frame_nmr = -1

  # load models
  model = YOLO("yolov8s.pt")
  license_plate_detector = YOLO('/content/drive/MyDrive/Colab Notebooks/rlres/runs/detect/version8n/weights/version8n.pt')

  # Get the current date and time
  now = datetime.datetime.now()
  date_folder = now.strftime("%Y-%m-%d")
  time_file = now.strftime("%H-%M-%S")

  # Define the folder structure for local storage
  local_storage_path_images = os.path.join("/content/images", date_folder)
  local_storage_path_videos = os.path.join("/content/recorded_videos", date_folder)
  video_output_file = os.path.join(local_storage_path_videos, f"{time_file}.mp4")
  os.makedirs(local_storage_path_images, exist_ok=True)
  os.makedirs(local_storage_path_videos, exist_ok=True)

  # Create a VideoWriter to save the video file
  out = cv2.VideoWriter(video_output_file, cv2.VideoWriter_fourcc(*'mp4v'), 25, (640, 480))

  # detect vehicles
  detections = model(frame)[0]
  detections_ = []

  for detection in detections.boxes.data.tolist():
      x1, y1, x2, y2, score, class_id = detection
      if int(class_id) in vehicles:
          detections_.append([x1, y1, x2, y2, score])

  track_ids = mot_tracker.update(np.asarray(detections_))

  # detect license plates
  license_plates = license_plate_detector(frame)[0]
  for license_plate in license_plates.boxes.data.tolist():
      x1, y1, x2, y2, score, class_id = license_plate

      if int(class_id) == 1:
          continue

      # assign license plate to car
      xcar1, ycar1, xcar2, ycar2, car_id = get_car(license_plate, track_ids)
      car_id = int(car_id)

      if car_id != -1:

          # crop license plate
          license_plate_crop = resized_frame[int(y1):int(y2), int(x1):int(x2), :]

          # process license plate
          license_plate_crop_gray = cv2.cvtColor(license_plate_crop, cv2.COLOR_BGR2GRAY)
          _, license_plate_crop_thresh = cv2.threshold(license_plate_crop_gray, 200, 255, cv2.THRESH_BINARY)

          # read license plate number
          license_plate_text, license_plate_text_score = read_license_plate(license_plate_crop_thresh)

          # make decisions based on certain conditions
          if license_plate_text is not None and license_plate_text not in temporary_plates:
              temporary_plates.append(license_plate_text)
              print(license_plate_text)

              # checks if a car ID is present. Creates a dictionary if not, updates it if yes
              if car_id not in vehicles_info:
                  vehicles_info[car_id] = [score, [license_plate_text]]
              else:
                  vehicles_info[car_id][1].append(license_plate_text)
                  vehicles_info[car_id][0] = score

              # saves a single image per vehicle found based on highest confidence score
              if score >= vehicles_info[car_id][0]:
                  # crop vehicle
                  vehicle_crop = resized_frame[int(ycar1):int(ycar2), int(xcar1):int(xcar2), :]
                  filename = os.path.join(local_storage_path_images, f"{car_id}.jpg")

                  # save the cropped vehicle
                  cv2.imwrite(filename, vehicle_crop)

  out.write(frame)

  # Release resources
  out.release()
  cap.release()

  # Process extracted information
  send_red_light_violation_to_emails_and_firebase(vehicles_info, date_folder, time_file)

  # Send the video stream to firebase
  send_recoreded_video_file_to_firebase(video_output_file)

### EasyOCR

In [None]:
reader = easyocr.Reader(['en'], gpu=False)

# Load the image
img = cv2.imread('/content/drive/MyDrive/Colab Notebooks/rlres/test2.png')

# Convert it to greyscale and save to current directory
grey_img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
cv2.imwrite("grey_image.jpg", grey_img)

# Convert to B&W and save to current directory
_, threshold_img = cv2.threshold(grey_img, 127, 255, cv2.THRESH_BINARY)
cv2.imwrite("/content/thresh_image.jpg", threshold_img)

# Add empty space around the image (e.g., 50 pixels of padding)
padding = 50
image_with_padding = cv2.copyMakeBorder(threshold_img, padding, padding, padding, padding, cv2.BORDER_CONSTANT, value=[255, 255, 255])

# Read the text on the image
results = reader.readtext(image_with_padding)

# Copy the padded image
annotated_image = image_with_padding.copy()

for (bbox, text, prob) in results:
    (top_left, top_right, bottom_right, bottom_left) = bbox
    top_left = tuple(map(int, top_left))
    bottom_right = tuple(map(int, bottom_right))

    # Draw a bounding box around the text
    cv2.rectangle(annotated_image, top_left, bottom_right, (0, 0, 255), 2)

    # Put the extracted text on the image
    cv2.putText(annotated_image, text, (top_left[0], top_left[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

cv2.imwrite('/content/annotated_image.jpg', annotated_image)

### Model training

In [None]:
!scp -r /content/runs/ "/content/drive/MyDrive/Colab Notebooks/rlres/"

In [None]:
# Load a model
model = YOLO("yolov5mu.pt")  # build a new model from scratch

file_path = "/content/drive/MyDrive/Colab Notebooks/rlres/data/config.yaml"

# Use the model
results = model.train(data = file_path, epochs = 120)  # train the model

### This section does the following:
* Process each video frames
* Extract any detected license plate number
* Crop the vehicle
* Email offenders if found in the database and send their information to firebase
* if not found in the database, send the vehicle's image, capture time, date, and extracted plate number to firebase

In [None]:
# Define variables
vehicles = [2, 3, 4, 6, 8]
vehicles_info = {}
mot_tracker = Sort()
temporary_plates = []
frame_nmr = -1

# load models
model = YOLO("yolov8s.pt")
license_plate_detector = YOLO('/content/drive/MyDrive/Colab Notebooks/rlres/runs/detect/version8n/weights/version8n.pt')

# load video
video_url = "/content/drive/MyDrive/Colab Notebooks/rlres/utils/test_videos/test11.mp4"
cap = cv2.VideoCapture(video_url)

# Width and height of frames to be processed
new_width = 640
new_height = 480

# Get the current date and time
now = datetime.datetime.now()
date_folder = now.strftime("%Y-%m-%d")
time_file = now.strftime("%H-%M-%S")

# Define the folder structure for local storage
local_storage_path_images = os.path.join("/content/images", date_folder)
local_storage_path_videos = os.path.join("/content/recorded_videos", date_folder)
video_output_file = os.path.join(local_storage_path_videos, f"{time_file}.mp4")
os.makedirs(local_storage_path_images, exist_ok=True)
os.makedirs(local_storage_path_videos, exist_ok=True)

# Create a VideoWriter to save the video file
fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(3))
frame_height = int(cap.get(4))
out = cv2.VideoWriter(video_output_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

while cap.isOpened():

  response_, frame = cap.read()

  if not response_:
    break

  # detect vehicles
  detections = model(frame)[0]
  detections_ = []
  for detection in detections.boxes.data.tolist():
      x1, y1, x2, y2, score, class_id = detection
      if int(class_id) in vehicles:
          detections_.append([x1, y1, x2, y2, score])


  # track vehicles
  track_ids = mot_tracker.update(np.asarray(detections_))

  # detect license plates
  license_plates = license_plate_detector(frame)[0]
  for license_plate in license_plates.boxes.data.tolist():
      x1, y1, x2, y2, score, class_id = license_plate

      if int(class_id) == 1:
          continue

      # assign license plate to car
      xcar1, ycar1, xcar2, ycar2, car_id = get_car(license_plate, track_ids)

      if car_id != -1:

          # crop license plate
          license_plate_crop = frame[int(y1):int(y2), int(x1): int(x2), :]

          # process license plate
          license_plate_crop_gray = cv2.cvtColor(license_plate_crop, cv2.COLOR_BGR2GRAY)
          _, license_plate_crop_thresh = cv2.threshold(license_plate_crop_gray, 127, 255, cv2.THRESH_BINARY_INV)

          # read license plate number
          license_plate_text, license_plate_text_score = read_license_plate(license_plate_crop_thresh)

          # make decisions based on certain conditions
          if license_plate_text is not None and license_plate_text not in temporary_plates:
              temporary_plates.append(license_plate_text)
              print(license_plate_text)

              # checks if a car ID is present. Creates a dictionary if not, updates it if yes
              if car_id not in vehicles_info:
                  vehicles_info[car_id] = [score, [license_plate_text]]
              else:
                  vehicles_info[car_id][1].append(license_plate_text)
                  vehicles_info[car_id][0] = score

              # saves a single image per vehicle found based on highest confidence score
              if score >= vehicles_info[car_id][0]:
                  # crop vehicle
                  vehicle_crop = frame[int(ycar1):int(ycar2), int(xcar1):int(xcar2), :]
                  if vehicle_crop.size != 0:
                      filename = os.path.join(local_storage_path_images, f"{car_id}.jpg")

                      # save the cropped vehicle
                      cv2.imwrite(filename, vehicle_crop)

  out.write(frame)

# Release resources
out.release()
cap.release()

# Process extracted information
send_red_light_violation_to_emails_and_firebase(vehicles_info, date_folder, time_file)

# Send the video stream to firebase
send_recoreded_video_file_to_firebase(video_output_file)

### Grab the frame data sent from esp32 cam and open it using openCV for processing.

In [None]:
async def receive_and_display_frames():
    # WebSocket server URL (replace with your ESP32 IP and port)
    ws_url = "ws://172.28.0.12:6000"

    # Open a WebSocket connection
    async with websockets.connect(ws_url) as websocket:
        while True:
            try:
                frame_data = await websocket.recv()
                frame_bytes = np.frombuffer(frame_data, dtype=np.uint8)
                frame = cv2.imdecode(frame_bytes, cv2.IMREAD_COLOR)

                if frame is not None:
                    # Display the frame using OpenCV (you can replace this with your processing logic)
                    print(frame_data)

            except websockets.exceptions.ConnectionClosed as e:
                print(f"Connection closed: {e}")
                break

            except Exception as e:
                print(f"Error: {e}")

loop = asyncio.get_event_loop()
loop.run_until_complete(receive_and_display_frames())


In [None]:
import csv
import numpy as np
from scipy.interpolate import interp1d


def interpolate_bounding_boxes(data):
    # Extract necessary data columns from input data
    frame_numbers = np.array([int(row['frame_nmr']) for row in data])
    car_ids = np.array([int(float(row['car_id'])) for row in data])
    car_bboxes = np.array([list(map(float, row['car_bbox'][1:-1].split())) for row in data])
    license_plate_bboxes = np.array([list(map(float, row['license_plate_bbox'][1:-1].split())) for row in data])

    interpolated_data = []
    unique_car_ids = np.unique(car_ids)
    for car_id in unique_car_ids:

        frame_numbers_ = [p['frame_nmr'] for p in data if int(float(p['car_id'])) == int(float(car_id))]
        print(frame_numbers_, car_id)

        # Filter data for a specific car ID
        car_mask = car_ids == car_id
        car_frame_numbers = frame_numbers[car_mask]
        car_bboxes_interpolated = []
        license_plate_bboxes_interpolated = []

        first_frame_number = car_frame_numbers[0]
        last_frame_number = car_frame_numbers[-1]

        for i in range(len(car_bboxes[car_mask])):
            frame_number = car_frame_numbers[i]
            car_bbox = car_bboxes[car_mask][i]
            license_plate_bbox = license_plate_bboxes[car_mask][i]

            if i > 0:
                prev_frame_number = car_frame_numbers[i-1]
                prev_car_bbox = car_bboxes_interpolated[-1]
                prev_license_plate_bbox = license_plate_bboxes_interpolated[-1]

                if frame_number - prev_frame_number > 1:
                    # Interpolate missing frames' bounding boxes
                    frames_gap = frame_number - prev_frame_number
                    x = np.array([prev_frame_number, frame_number])
                    x_new = np.linspace(prev_frame_number, frame_number, num=frames_gap, endpoint=False)
                    interp_func = interp1d(x, np.vstack((prev_car_bbox, car_bbox)), axis=0, kind='linear')
                    interpolated_car_bboxes = interp_func(x_new)
                    interp_func = interp1d(x, np.vstack((prev_license_plate_bbox, license_plate_bbox)), axis=0, kind='linear')
                    interpolated_license_plate_bboxes = interp_func(x_new)

                    car_bboxes_interpolated.extend(interpolated_car_bboxes[1:])
                    license_plate_bboxes_interpolated.extend(interpolated_license_plate_bboxes[1:])

            car_bboxes_interpolated.append(car_bbox)
            license_plate_bboxes_interpolated.append(license_plate_bbox)

        for i in range(len(car_bboxes_interpolated)):
            frame_number = first_frame_number + i
            row = {}
            row['frame_nmr'] = str(frame_number)
            row['car_id'] = str(car_id)
            row['car_bbox'] = ' '.join(map(str, car_bboxes_interpolated[i]))
            row['license_plate_bbox'] = ' '.join(map(str, license_plate_bboxes_interpolated[i]))

            if str(frame_number) not in frame_numbers_:
                # Imputed row, set the following fields to '0'
                row['license_plate_bbox_score'] = '0'
                row['license_number'] = '0'
                row['license_number_score'] = '0'
            else:
                # Original row, retrieve values from the input data if available
                original_row = [p for p in data if int(p['frame_nmr']) == frame_number and int(float(p['car_id'])) == int(float(car_id))][0]
                row['license_plate_bbox_score'] = original_row['license_plate_bbox_score'] if 'license_plate_bbox_score' in original_row else '0'
                row['license_number'] = original_row['license_number'] if 'license_number' in original_row else '0'
                row['license_number_score'] = original_row['license_number_score'] if 'license_number_score' in original_row else '0'

            interpolated_data.append(row)

    return interpolated_data


# Load the CSV file
with open('/content/testing.csv', 'r') as file:
    reader = csv.DictReader(file)
    data = list(reader)

# Interpolate missing data
interpolated_data = interpolate_bounding_boxes(data)

# Write updated data to a new CSV file
header = ['frame_nmr', 'car_id', 'car_bbox', 'license_plate_bbox', 'license_plate_bbox_score', 'license_number', 'license_number_score']
with open('/content/test_interpolated.csv', 'w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=header)
    writer.writeheader()
    writer.writerows(interpolated_data)

In [None]:
import ast

import cv2
import numpy as np
import pandas as pd


def draw_border(img, top_left, bottom_right, color=(0, 255, 0), thickness=10, line_length_x=200, line_length_y=200):
    x1, y1 = top_left
    x2, y2 = bottom_right

    cv2.line(img, (x1, y1), (x1, y1 + line_length_y), color, thickness)  #-- top-left
    cv2.line(img, (x1, y1), (x1 + line_length_x, y1), color, thickness)

    cv2.line(img, (x1, y2), (x1, y2 - line_length_y), color, thickness)  #-- bottom-left
    cv2.line(img, (x1, y2), (x1 + line_length_x, y2), color, thickness)

    cv2.line(img, (x2, y1), (x2 - line_length_x, y1), color, thickness)  #-- top-right
    cv2.line(img, (x2, y1), (x2, y1 + line_length_y), color, thickness)

    cv2.line(img, (x2, y2), (x2, y2 - line_length_y), color, thickness)  #-- bottom-right
    cv2.line(img, (x2, y2), (x2 - line_length_x, y2), color, thickness)

    return img


results = pd.read_csv('/content/test_interpolated.csv')

# load video
video_path = "/content/drive/MyDrive/Colab Notebooks/rlres/utils/test_videos/test11.mp4"
cap = cv2.VideoCapture(video_path)

fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Specify the codec
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter('/content/out.mp4', fourcc, 25, (width, height))

license_plate = {}
for car_id in np.unique(results['car_id']):
    max_ = np.amax(results[results['car_id'] == car_id]['license_number_score'])
    license_plate[car_id] = {'license_crop': None,
                             'license_plate_number': results[(results['car_id'] == car_id) &
                                                             (results['license_number_score'] == max_)]['license_number'].iloc[0]}
    cap.set(cv2.CAP_PROP_POS_FRAMES, results[(results['car_id'] == car_id) &
                                             (results['license_number_score'] == max_)]['frame_nmr'].iloc[0])
    ret, frame = cap.read()

    x1, y1, x2, y2 = ast.literal_eval(results[(results['car_id'] == car_id) &
                                              (results['license_number_score'] == max_)]['license_plate_bbox'].iloc[0].replace('[ ', '[').replace('   ', ' ').replace('  ', ' ').replace(' ', ','))

    license_crop = frame[int(y1):int(y2), int(x1):int(x2), :]
    license_crop = cv2.resize(license_crop, (int((x2 - x1) * 400 / (y2 - y1)), 400))

    license_plate[car_id]['license_crop'] = license_crop


frame_nmr = -1

cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

# read frames
ret = True
while ret:
    ret, frame = cap.read()
    frame_nmr += 1
    if ret:
        df_ = results[results['frame_nmr'] == frame_nmr]
        for row_indx in range(len(df_)):
            # draw car
            car_x1, car_y1, car_x2, car_y2 = ast.literal_eval(df_.iloc[row_indx]['car_bbox'].replace('[ ', '[').replace('   ', ' ').replace('  ', ' ').replace(' ', ','))
            draw_border(frame, (int(car_x1), int(car_y1)), (int(car_x2), int(car_y2)), (0, 255, 0), 10,
                        line_length_x=200, line_length_y=200)

            # draw license plate
            x1, y1, x2, y2 = ast.literal_eval(df_.iloc[row_indx]['license_plate_bbox'].replace('[ ', '[').replace('   ', ' ').replace('  ', ' ').replace(' ', ','))
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 5)

            # crop license plate
            license_crop = license_plate[df_.iloc[row_indx]['car_id']]['license_crop']

            H, W, _ = license_crop.shape

            try:
                frame[int(car_y1) - H - 100:int(car_y1) - 100,
                      int((car_x2 + car_x1 - W) / 2):int((car_x2 + car_x1 + W) / 2), :] = license_crop

                frame[int(car_y1) - H - 100:int(car_y1) - H - 100,
                      int((car_x2 + car_x1 - W) / 2):int((car_x2 + car_x1 + W) / 2), :] = (255, 255, 255)

                (text_width, text_height), _ = cv2.getTextSize(
                    license_plate[df_.iloc[row_indx]['car_id']]['license_plate_number'],
                    cv2.FONT_HERSHEY_SIMPLEX,
                    3.0,
                    13)

                cv2.putText(frame,
                            license_plate[df_.iloc[row_indx]['car_id']]['license_plate_number'],
                            (int((car_x2 + car_x1 - text_width) / 2), int(car_y1 - H - 200 + (text_height / 2))),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            3.0,
                            (0, 255, 0),
                            13)

            except:
                pass

        out.write(frame)
        frame = cv2.resize(frame, (1280, 720))

        # cv2.imshow('frame', frame)
        # cv2.waitKey(0)

out.release()
cap.release()