## Imports

In [19]:
import ast
import cv2 as cv
import easyocr
from glob import glob
import numpy as np
import pandas as pd
import string
from ultralytics import YOLO
import torch
import torchvision
easyocr.__version__

'1.7.1'

In [20]:
# regular pre-trained yolov8 model for car recognition
# coco_model = YOLO('yolov8n.pt')
coco_model = YOLO('yolov8s.pt')
# yolov8 model trained to detect number plates
np_model = YOLO('/Users/carter/Desktop/Code/B351FinalProject/B351-Final-Project/model/runs/detect/yolov8n_custom3/weights/best.pt')

In [21]:
# read in test video paths
videos = glob('video_test/*.mp4')
print(videos)

['video_test/helmet-video.mp4']


## License Plate Detection

In [22]:
import os
# read video by index
video = cv.VideoCapture(videos[0])

directory = 'detection_processing/'
os.makedirs(directory, exist_ok=True)

ret = True
frame_number = -1
vehicles = [2,3,5]

# read the 10 first frames
while ret:
    frame_number += 1
    ret, frame = video.read()

    if ret and frame_number < 10:
        
        # vehicle detector
        detections = coco_model.track(frame, persist=True)[0]
        for detection in detections.boxes.data.tolist():
            x1, y1, x2, y2, track_id, score, class_id = detection
            if int(class_id) in vehicles and score > 0.5:
                vehicle_bounding_boxes = []
                vehicle_bounding_boxes.append([x1, y1, x2, y2, track_id, score])
                for bbox in vehicle_bounding_boxes:
                    print(bbox)
                    roi = frame[int(y1):int(y2), int(x1):int(x2)]
                    # debugging check if bbox lines up with detected vehicles (should be identical to save_crops() above
                    # cv.imwrite(str(track_id) + '.jpg', roi)
                    
                    # license plate detector for region of interest
                    license_plates = np_model(roi)[0]
                    # check every bounding box for a license plate
                    for license_plate in license_plates.boxes.data.tolist():
                        plate_x1, plate_y1, plate_x2, plate_y2, plate_score, _ = license_plate
                        # verify detections
                        print(license_plate, 'track_id: ' + str(bbox[4]))
                        plate = roi[int(plate_y1):int(plate_y2), int(plate_x1):int(plate_x2)]
                        file_path = os.path.join(directory, str(track_id) + '.jpg')
                        cv.imwrite(file_path, plate)
                        
video.release()


0: 640x384 2 persons, 2 cars, 1 motorcycle, 1 truck, 174.5ms
Speed: 3.0ms preprocess, 174.5ms inference, 1.5ms postprocess per image at shape (1, 3, 640, 384)
[133.1885986328125, 189.4381103515625, 191.1517791748047, 281.4183349609375, 3.0, 0.7747134566307068]

0: 640x416 1 rider, 79.1ms
Speed: 2.6ms preprocess, 79.1ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 416)
[0.815595269203186, 1.019182562828064, 55.8717155456543, 89.83650207519531, 0.4948098361492157, 2.0] track_id: 3.0
[215.67654418945312, 149.22122192382812, 267.58416748046875, 189.71771240234375, 4.0, 0.7722992897033691]

0: 512x640 (no detections), 82.2ms
Speed: 2.0ms preprocess, 82.2ms inference, 0.4ms postprocess per image at shape (1, 3, 512, 640)
[32.33158874511719, 132.7964630126953, 108.39532470703125, 219.58045959472656, 5.0, 0.6891348361968994]

0: 640x576 1 rider, 91.0ms
Speed: 3.1ms preprocess, 91.0ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 576)
[1.5993813276290894, 0.9898613

## Preprocessing

In [23]:
# read video by index
import os
video = cv.VideoCapture(videos[0])

directory = 'detection_processing/'
os.makedirs(directory, exist_ok=True)

ret = True
frame_number = -1
vehicles = [2,3,5]

# read the 10 first frames
while ret:
    frame_number += 1
    ret, frame = video.read()

    if ret and frame_number < 100:
        
        # vehicle detector
        detections = coco_model.track(frame, persist=True)[0]
        for detection in detections.boxes.data.tolist():
            x1, y1, x2, y2, track_id, score, class_id = detection
            if int(class_id) in vehicles and score > 0.5:
                vehicle_bounding_boxes = []
                vehicle_bounding_boxes.append([x1, y1, x2, y2, track_id, score])
                for bbox in vehicle_bounding_boxes:
                    print(bbox)
                    roi = frame[int(y1):int(y2), int(x1):int(x2)]
                    
                    # license plate detector for region of interest
                    license_plates = np_model(roi)[0]
                    # process license plate
                    for license_plate in license_plates.boxes.data.tolist():
                        plate_x1, plate_y1, plate_x2, plate_y2, plate_score, _ = license_plate
                        # crop plate from region of interest
                        plate = roi[int(plate_y1):int(plate_y2), int(plate_x1):int(plate_x2)]
                        # de-colorize
                        plate_gray = cv.cvtColor(plate, cv.COLOR_BGR2GRAY)
                        # posterize
                        _, plate_treshold = cv.threshold(plate_gray, 64, 255, cv.THRESH_BINARY_INV)
                        
                        file_path = os.path.join(directory, str(track_id) + '_gray.jpg')
                        cv.imwrite(file_path, plate_gray)
                        
                        file_path = os.path.join(directory, str(track_id) + '_thresh.jpg')
                        cv.imwrite(file_path, plate_treshold)
                        
video.release()




0: 640x384 4 persons, 3 cars, 1 motorcycle, 1 truck, 299.8ms
Speed: 4.6ms preprocess, 299.8ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 384)
[132.41757202148438, 193.11776733398438, 189.8925323486328, 279.964111328125, 3.0, 0.7747134566307068]

0: 640x448 1 rider, 135.2ms
Speed: 3.3ms preprocess, 135.2ms inference, 1.3ms postprocess per image at shape (1, 3, 640, 448)
[216.4652862548828, 149.1117706298828, 268.1229248046875, 188.9789276123047, 4.0, 0.7722992897033691]

0: 480x640 (no detections), 141.7ms
Speed: 2.2ms preprocess, 141.7ms inference, 0.4ms postprocess per image at shape (1, 3, 480, 640)
[13.504748344421387, 133.3866424560547, 108.01368713378906, 220.3870391845703, 5.0, 0.6891348361968994]

0: 608x640 (no detections), 238.1ms
Speed: 5.7ms preprocess, 238.1ms inference, 0.6ms postprocess per image at shape (1, 3, 608, 640)
[336.24139404296875, 156.6688232421875, 398.10687255859375, 209.06312561035156, 9.0, 0.5901923775672913]

0: 576x640 (no detections), 1

## Reading the License Plate

In [24]:
reader = easyocr.Reader(['en'], gpu=False)

Using CPU. Note: This module is much faster with a GPU.


In [25]:
def read_license_plate(license_plate_crop):
    detections = reader.readtext(license_plate_crop)
    for detection in detections:
        bbox, text, score = detection
        
        text = text.upper().replace(' ', '')
        
        return text, score
    
    return None, None

In [26]:
def write_csv(results, output_path):
    
    with open(output_path, 'w') as f:
        f.write('{},{},{},{},{},{},{},{}\n'.format(
            'frame_number', 'track_id', 'car_bbox', 'car_bbox_score',
            'license_plate_bbox', 'license_plate_bbox_score', 'license_plate_number',
            'license_text_score'))

        for frame_number in results.keys():
            for track_id in results[frame_number].keys():
                print(results[frame_number][track_id])
                if 'car' in results[frame_number][track_id].keys() and \
                   'license_plate' in results[frame_number][track_id].keys() and \
                   'number' in results[frame_number][track_id]['license_plate'].keys():
                    f.write('{},{},{},{},{},{},{},{}\n'.format(
                        frame_number,
                        track_id,
                        '[{} {} {} {}]'.format(
                            results[frame_number][track_id]['car']['bbox'][0],
                            results[frame_number][track_id]['car']['bbox'][1],
                            results[frame_number][track_id]['car']['bbox'][2],
                            results[frame_number][track_id]['car']['bbox'][3]
                        ),
                        results[frame_number][track_id]['car']['bbox_score'],
                        '[{} {} {} {}]'.format(
                            results[frame_number][track_id]['license_plate']['bbox'][0],
                            results[frame_number][track_id]['license_plate']['bbox'][1],
                            results[frame_number][track_id]['license_plate']['bbox'][2],
                            results[frame_number][track_id]['license_plate']['bbox'][3]
                        ),
                        results[frame_number][track_id]['license_plate']['bbox_score'],
                        results[frame_number][track_id]['license_plate']['number'],
                        results[frame_number][track_id]['license_plate']['text_score'])
                    )
        f.close()

In [27]:
results = {}

# read video by index
video = cv.VideoCapture(videos[0])

ret = True
frame_number = -1
vehicles = [2,3,5]

# read the 10 first frames
while ret:
    frame_number += 1
    ret, frame = video.read()

    if ret and frame_number < 100:
        results[frame_number] = {}
        
        # vehicle detector
        detections = coco_model.track(frame, persist=True)[0]
        for detection in detections.boxes.data.tolist():
            x1, y1, x2, y2, track_id, score, class_id = detection
            if int(class_id) in vehicles and score > 0.5:
                vehicle_bounding_boxes = []
                vehicle_bounding_boxes.append([x1, y1, x2, y2, track_id, score])
                for bbox in vehicle_bounding_boxes:
                    print(bbox)
                    roi = frame[int(y1):int(y2), int(x1):int(x2)]
                    
                    # license plate detector for region of interest
                    license_plates = np_model(roi)[0]
                    # process license plate
                    for license_plate in license_plates.boxes.data.tolist():
                        plate_x1, plate_y1, plate_x2, plate_y2, plate_score, _ = license_plate
                        # crop plate from region of interest
                        plate = roi[int(plate_y1):int(plate_y2), int(plate_x1):int(plate_x2)]
                        # de-colorize
                        plate_gray = cv.cvtColor(plate, cv.COLOR_BGR2GRAY)
                        # posterize
                        _, plate_treshold = cv.threshold(plate_gray, 64, 255, cv.THRESH_BINARY_INV)
                        
                        # OCR
                        np_text, np_score = read_license_plate(plate_treshold)
                        # if plate could be read write results
                        if np_text is not None:
                            results[frame_number][track_id] = {
                                'car': {
                                    'bbox': [x1, y1, x2, y2],
                                    'bbox_score': score
                                },
                                'license_plate': {
                                    'bbox': [plate_x1, plate_y1, plate_x2, plate_y2],
                                    'bbox_score': plate_score,
                                    'number': np_text,
                                    'text_score': np_score
                                }
                            }

write_csv(results, './results.csv')
video.release()




0: 640x384 1 person, 1 car, 289.2ms
Speed: 2.5ms preprocess, 289.2ms inference, 1.6ms postprocess per image at shape (1, 3, 640, 384)
[215.8395233154297, 147.2574462890625, 270.61907958984375, 192.97804260253906, 4.0, 0.7722992897033691]

0: 544x640 (no detections), 100.2ms
Speed: 2.7ms preprocess, 100.2ms inference, 0.4ms postprocess per image at shape (1, 3, 544, 640)

0: 640x384 2 persons, 2 cars, 1 motorcycle, 1 truck, 135.0ms
Speed: 2.0ms preprocess, 135.0ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 384)
[221.29647827148438, 149.33387756347656, 270.7431335449219, 190.68141174316406, 4.0, 0.7515806555747986]

0: 544x640 (no detections), 86.8ms
Speed: 2.4ms preprocess, 86.8ms inference, 0.4ms postprocess per image at shape (1, 3, 544, 640)
[136.93309020996094, 188.99301147460938, 194.08404541015625, 281.02716064453125, 23.0, 0.790433406829834]

0: 640x416 1 rider, 79.1ms
Speed: 2.8ms preprocess, 79.1ms inference, 0.7ms postprocess per image at shape (1, 3, 640, 416

## Clean Up

In [28]:
# Mapping dictionaries for character conversion
# characters that can easily be confused can be 
# verified by their location - an `O` in a place
# where a number is expected is probably a `0`
dict_char_to_int = {'O': '0',
                    'I': '1',
                    'J': '3',
                    'A': '4',
                    'G': '6',
                    'S': '5'}

dict_int_to_char = {'0': 'O',
                    '1': 'I',
                    '3': 'J',
                    '4': 'A',
                    '6': 'G',
                    '5': 'S'}

In [29]:
def license_complies_format(text):
    # True if the license plate complies with the format, False otherwise.
    if len(text) != 7:
        return False

    if (text[0] in string.ascii_uppercase or text[0] in dict_int_to_char.keys()) and \
       (text[1] in string.ascii_uppercase or text[1] in dict_int_to_char.keys()) and \
       (text[2] in ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] or text[2] in dict_char_to_int.keys()) and \
       (text[3] in ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] or text[3] in dict_char_to_int.keys()) and \
       (text[4] in string.ascii_uppercase or text[4] in dict_int_to_char.keys()) and \
       (text[5] in string.ascii_uppercase or text[5] in dict_int_to_char.keys()) and \
       (text[6] in string.ascii_uppercase or text[6] in dict_int_to_char.keys()):
        return True
    else:
        return False

In [30]:
def format_license(text):
    license_plate_ = ''
    mapping = {0: dict_int_to_char, 1: dict_int_to_char, 4: dict_int_to_char, 5: dict_int_to_char, 6: dict_int_to_char,
               2: dict_char_to_int, 3: dict_char_to_int}
    for j in [0, 1, 2, 3, 4, 5, 6]:
        if text[j] in mapping[j].keys():
            license_plate_ += mapping[j][text[j]]
        else:
            license_plate_ += text[j]

    return license_plate_

In [31]:
def read_license_plate(license_plate_crop):
    detections = reader.readtext(license_plate_crop)

    for detection in detections:
        bbox, text, score = detection

        text = text.upper().replace(' ', '')

        # verify that text is conform to a standard license plate
        if license_complies_format(text):
            # bring text into the default license plate format
            return format_license(text), score

    return None, None

In [32]:
results = {}

# read video by index
video = cv.VideoCapture(videos[0])

ret = True
frame_number = -1
vehicles = [2,3,5]

# read the entire video
while ret:
    ret, frame = video.read()
    frame_number += 1
    if ret:
        results[frame_number] = {}
        
        # vehicle detector
        detections = coco_model.track(frame, persist=True)[0]
        for detection in detections.boxes.data.tolist():
            x1, y1, x2, y2, track_id, score, class_id = detection
            if int(class_id) in vehicles and score > 0.5:
                vehicle_bounding_boxes = []
                vehicle_bounding_boxes.append([x1, y1, x2, y2, track_id, score])
                for bbox in vehicle_bounding_boxes:
                    print(bbox)
                    roi = frame[int(y1):int(y2), int(x1):int(x2)]
                    
                    # license plate detector for region of interest
                    license_plates = np_model(roi)[0]
                    # process license plate
                    for license_plate in license_plates.boxes.data.tolist():
                        plate_x1, plate_y1, plate_x2, plate_y2, plate_score, _ = license_plate
                        # crop plate from region of interest
                        plate = roi[int(plate_y1):int(plate_y2), int(plate_x1):int(plate_x2)]
                        # de-colorize
                        plate_gray = cv.cvtColor(plate, cv.COLOR_BGR2GRAY)
                        # posterize
                        _, plate_treshold = cv.threshold(plate_gray, 64, 255, cv.THRESH_BINARY_INV)
                        
                        # OCR
                        np_text, np_score = read_license_plate(plate_treshold)
                        # if plate could be read write results
                        if np_text is not None:
                            results[frame_number][track_id] = {
                                'car': {
                                    'bbox': [x1, y1, x2, y2],
                                    'bbox_score': score
                                },
                                'license_plate': {
                                    'bbox': [plate_x1, plate_y1, plate_x2, plate_y2],
                                    'bbox_score': plate_score,
                                    'number': np_text,
                                    'text_score': np_score
                                }
                            }

write_csv(results, './results.csv')
video.release()




0: 640x384 1 person, 1 car, 153.2ms
Speed: 2.2ms preprocess, 153.2ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 384)
[215.8395233154297, 147.2574462890625, 270.61907958984375, 192.97804260253906, 4.0, 0.7722992897033691]

0: 544x640 (no detections), 252.4ms
Speed: 10.5ms preprocess, 252.4ms inference, 0.4ms postprocess per image at shape (1, 3, 544, 640)

0: 640x384 2 persons, 2 cars, 1 motorcycle, 1 truck, 247.5ms
Speed: 3.2ms preprocess, 247.5ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 384)
[221.29647827148438, 149.33387756347656, 270.7431335449219, 190.681396484375, 4.0, 0.7515806555747986]

0: 544x640 (no detections), 156.1ms
Speed: 3.1ms preprocess, 156.1ms inference, 0.4ms postprocess per image at shape (1, 3, 544, 640)
[136.93309020996094, 188.99301147460938, 194.08404541015625, 281.02716064453125, 40.0, 0.790433406829834]

0: 640x416 1 rider, 91.9ms
Speed: 2.6ms preprocess, 91.9ms inference, 1.1ms postprocess per image at shape (1, 3, 640, 41

ValueError: not enough values to unpack (expected 7, got 6)

In [None]:
results = pd.read_csv('./results.csv')

# show results for tracking ID `1` - sort by OCR prediction confidence
results[results['track_id'] == 1.].sort_values(by='license_text_score', ascending=False)

: 

## Visualization

In [None]:
def draw_border(img, top_left, bottom_right, color=(0, 255, 0), thickness=6, line_length_x=200, line_length_y=200):
    x1, y1 = top_left
    x2, y2 = bottom_right

    cv.line(img, (x1, y1), (x1, y1 + line_length_y), color, thickness)  #-- top-left
    cv.line(img, (x1, y1), (x1 + line_length_x, y1), color, thickness)

    cv.line(img, (x1, y2), (x1, y2 - line_length_y), color, thickness)  #-- bottom-left
    cv.line(img, (x1, y2), (x1 + line_length_x, y2), color, thickness)

    cv.line(img, (x2, y1), (x2 - line_length_x, y1), color, thickness)  #-- top-right
    cv.line(img, (x2, y1), (x2, y1 + line_length_y), color, thickness)

    cv.line(img, (x2, y2), (x2, y2 - line_length_y), color, thickness)  #-- bottom-right
    cv.line(img, (x2, y2), (x2 - line_length_x, y2), color, thickness)

    return img

: 

In [None]:
# read video by index
video = cv.VideoCapture(videos[0])

# get video dims
frame_width = int(video.get(3))
frame_height = int(video.get(4))
size = (frame_width, frame_height)

# Define the codec and create VideoWriter object
fourcc = cv.VideoWriter_fourcc(*'DIVX')
out = cv.VideoWriter('./outputs/processed.avi', fourcc, 20.0, size)

# reset video before you re-run cell below
frame_number = -1
video.set(cv.CAP_PROP_POS_FRAMES, 0)

: 

In [None]:
ret = True

while ret:
    ret, frame = video.read()
    frame_number += 1
    if ret:
        df_ = results[results['frame_number'] == frame_number]
        for index in range(len(df_)):
            # draw car
            vhcl_x1, vhcl_y1, vhcl_x2, vhcl_y2 = ast.literal_eval(df_.iloc[index]['car_bbox'].replace('[ ', '[').replace('   ', ' ').replace('  ', ' ').replace(' ', ','))
            
            draw_border(
                frame, (int(vhcl_x1), int(vhcl_y1)),
                (int(vhcl_x2), int(vhcl_y2)), (0, 255, 0),
                12, line_length_x=200, line_length_y=200)
            
            # draw license plate
            plate_x1, plate_y1, plate_x2, plate_y2 = ast.literal_eval(df_.iloc[index]['license_plate_bbox'].replace('[ ', '[').replace('   ', ' ').replace('  ', ' ').replace(' ', ','))

            # region of interest
            roi = frame[int(vhcl_y1):int(vhcl_y2), int(vhcl_x1):int(vhcl_x2)]
            cv.rectangle(roi, (int(plate_x1), int(plate_y1)), (int(plate_x2), int(plate_y2)), (0, 0, 255), 6)

            # write detected number
            (text_width, text_height), _ = cv.getTextSize(
                df_.iloc[index]['license_plate_number'],
                cv.FONT_HERSHEY_SIMPLEX,
                2,
                6)

            cv.putText(
                frame,
                df_.iloc[index]['license_plate_number'],
                (int((vhcl_x2 + vhcl_x1 - text_width)/2), int(vhcl_y1 - text_height)),
                cv.FONT_HERSHEY_SIMPLEX,
                2,
                (0, 255, 0),
                6
            )

        out.write(frame)
        frame = cv.resize(frame, (1280, 720))

out.release()
video.release()

: 