# Vehicle Detection & Confirmation System
## Version 1
Created by Vikram Anantha \
Continued from Ben Dwyer's code \
Summer 2023 

## Summary

This code is meant to be deployed on Road-Side Systems (RSSs), such as traffic cameras, so that they can communicate with vehicles, especially Autonomous Vehicles (AVs).

### Background
One problem that might arise when AVs communicate with RSSs is that a hacker with malicious intent can join the same channel and communicate with the RSSs as if they were the vehicle. To combat this, the RSS can command the vehicle to confirm its identity by performing a specific task. Examples of this include:
 + Displaying a specific pattern on a screen, like a QR code
 + Flashing headlights in a specific pattern
 + Making a sound in a specific pattern

Once the vehicle performs the task, the RSS can visually confirm that it has been done, thus confirming the identity of the vehicle. This visual confirmation, a form of Two-Factor Authentication (2FA), is the premise of this code.
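
As a rough illustration of the challenge-response idea above, the sketch below has the RSS generate a random on/off pattern and later compare it with the pattern it observed. The names `make_challenge` and `is_confirmed` are hypothetical and are not part of this notebook. A flat, row-major list of 0/1 values is assumed because that is how the `expected` pattern is written in the code further down.

```python
import secrets

def make_challenge(size=3):
    """Return a random size x size on/off pattern as a flat, row-major list of 0/1."""
    return [secrets.randbelow(2) for _ in range(size * size)]

def is_confirmed(expected, observed):
    """The vehicle counts as confirmed only if every cell matches the challenge."""
    return len(expected) == len(observed) and all(
        e == o for e, o in zip(expected, observed)
    )

challenge = make_challenge()   # pattern the RSS asks the vehicle to display
observed = list(challenge)     # stand-in for the pattern decoded from the camera
print(is_confirmed(challenge, observed))
```

Using a fresh random pattern per vehicle is a design assumption here: it means an attacker on the channel cannot simply replay an old pattern.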


### Overview of this code

Ben's initial code did the following: \
Given a [video](https://photos.app.goo.gl/KZE2xpdJrcP1h6hf6) of a [Remote Controlled Car with Arduino Display (RCC+AD)](https://photos.app.goo.gl/JfqdrNorKNP7Z9vz5) coming towards a camera
(the camera acted as a traffic camera), it should:
1. Slice the video into its frames
2. For each frame, detect the vehicle and the pattern shown, draw boxes, and label whether the vehicle is confirmed (this takes a long time)
3. Put all the frames together into a video
4. Save the video

My code builds on his initial code, but instead of using a sample video, it uses a live webcam feed. \
Everything else is the same.

### Shortcomings with this code

Although this version does work, it doesn't work that well.
 + When using the live feed, it takes about a second per frame to detect whether a vehicle is present and to recognize that there is no bounding box. Recognizing the array in the bounding box takes much longer, and this needs to be done for each vehicle.
 + The code runs a for loop to go through each vehicle and detect the pattern, meaning it verifies the vehicles one by one. Because each vehicle takes a long time to verify, the system will take a _very_ long time when multiple vehicles are present.
 + When using a webcam or a dedicated camera for the computer (as opposed to a smartphone camera, which uses HDR to make screens appear normal alongside everything else), the Arduino display appears so bright that the pattern cannot be recognized.
 + That said, the last point doesn't matter much, since in a real environment a screen on the windshield won't be implemented. Instead, the system would most likely leverage the headlights, flashing a pattern across time rather than displaying a pattern across space.

Version 2 should address all of these shortcomings.
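
One possible way to tackle the one-by-one verification problem is to run the per-vehicle checks concurrently, since most of the waiting is on remote inference calls. The sketch below is only an illustration under that assumption: `verify_vehicle` is a hypothetical stand-in that sleeps instead of calling a model, and neither it nor `verify_all` exists in this notebook.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def verify_vehicle(vehicle_id):
    """Hypothetical stand-in for the slow per-vehicle pattern check."""
    time.sleep(0.2)  # pretend this is a slow cloud inference call
    return vehicle_id, True

def verify_all(vehicle_ids, max_workers=4):
    """Run the per-vehicle checks concurrently; results keep the input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(verify_vehicle, vehicle_ids))

start = time.time()
results = verify_all(["car_a", "car_b", "car_c"])
print(results, f"{time.time() - start:.2f} seconds")
```

Threads are used rather than processes because the work is assumed to be I/O-bound (waiting on the network), not CPU-bound.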

## FFMPEG & Roboflow Install
This block only needs to be run once per machine.

In [7]:
# Avoid changing this cell; it is somewhat delicate but should work as-is
# If the os.system lines don't work, run those commands yourself in a terminal
# This cell only needs to be run once on your machine. After that, everything has already been installed

# This cell also isn't needed if you only want to run the live feed functions
from IPython.display import clear_output
import os, urllib.request
import getpass
HOME = os.path.expanduser("~")
pathDoneCMD = f'{HOME}/doneCMD.sh'
# if not os.path.exists(f"{HOME}/.ipython/ttmg.py"):
#     hCode = "https://raw.githubusercontent.com/yunooooo/gcct/master/res/ttmg.py"
#     urllib.request.urlretrieve(hCode, f"{HOME}/.ipython/ttmg.py")

# # from tmg import (
# #     loadingAn,
# #     textAn,
# # )

# # loadingAn(name="lds")
# # textAn("Installing Dependencies...", ty='twg')
os.system('pip install git+git://github.com/AWConant/jikanpy.git')
os.system('sudo add-apt-repository -y ppa:jonathonf/ffmpeg-4')
os.system('sudo apt-get update')
# -y answers the confirmation prompt, which cannot be answered from a notebook
os.system('sudo apt install -y mediainfo')
os.system('sudo apt-get install -y ffmpeg')
# password = getpass.getpass()
# pw_commands = [
#    "sudo -S add-apt-repository -y ppa:jonathonf/ffmpeg-4",
#     "sudo -S apt-get update",
#     "sudo -S apt install mediainfo",
#     "sudo -S apt-get install ffmpeg"
# ]
# for command in pw_commands:
#     os.popen(command, 'w').write(password+'\n')
clear_output()
print('Installation finished.')

Collecting git+git://github.com/AWConant/jikanpy.git
  Cloning git://github.com/AWConant/jikanpy.git to /tmp/pip-req-build-vxf13lf9


  Running command git clone -q git://github.com/AWConant/jikanpy.git /tmp/pip-req-build-vxf13lf9


In [None]:
# Install Roboflow (this is actually important)
!pip3 install roboflow

## Imports and setup

In [1]:
# Imports everything that should be needed

from roboflow import Roboflow
import json
from time import sleep
from PIL import Image, ImageDraw, ImageFont
import io
import base64
import requests
from os.path import exists
import os, sys, re, glob
import time
import cv2
import numpy as np
from ecapture import ecapture as ec

font_dir = "Extras/Product Sans Bold.ttf" # this is for the images created and displayed

print("Imports have been imported")

Imports have been imported


### Roboflow
[Roboflow](https://roboflow.com/) is a platform that hosts a wide variety of models, mainly for computer vision. It is often used because the models are easy to train and prediction happens in the cloud. For object detection, Roboflow supports [YOLOv8](https://blog.roboflow.com/whats-new-in-yolov8/) (You Only Look Once), a fast and accurate architecture.

Ben's code uses 3 models: one for detecting the vehicle, one for detecting the bounding box around the LED array, and one for detecting whether each square inside that bounding box is on or off. To access Roboflow, you need to provide the API key and the project name for each model.
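
The detection models report each box as a center point `(x, y)` plus a `width` and `height`, while drawing and cropping need corner coordinates. The sketch below shows that conversion, clamped to the image bounds. The helper name `to_corners` is hypothetical (the notebook does this inline), and the sample prediction dictionary is made up for illustration.

```python
def to_corners(prediction, image_width, image_height):
    """Convert a center-format box to (x0, y0, x1, y1), clamped to the image."""
    x0 = prediction["x"] - prediction["width"] / 2
    x1 = prediction["x"] + prediction["width"] / 2
    y0 = prediction["y"] - prediction["height"] / 2
    y1 = prediction["y"] + prediction["height"] / 2
    return (
        int(max(0, x0)),
        int(max(0, y0)),
        int(min(image_width, x1)),
        int(min(image_height, y1)),
    )

# Made-up prediction in the same shape as the entries in 'predictions'
sample = {"x": 50, "y": 40, "width": 20, "height": 10, "class": "RC-Car"}
print(to_corners(sample, image_width=100, image_height=100))
```

Clamping matters when a vehicle is partly outside the frame; without it, negative indices would wrap around when slicing a NumPy image.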

In [2]:
# Retrieve the Roboflow models

rf = Roboflow(api_key="vAsGYa1MuHAuaPrRdiar")

project_car = rf.workspace().project("rc-vehicle-detection_v2")
model_car = project_car.version(1).model

### Bounding box for the LED array
project_led = rf.workspace().project("led-signal-detection_v2")
model_led = project_led.version(2).model

### Determines if an LED is on or off
on_off_project = rf.workspace().project("led-signal-detection_v3")
on_off_model = on_off_project.version(4).model

print("Models retreived")

loading Roboflow workspace...
loading Roboflow project...
loading Roboflow workspace...
loading Roboflow project...
loading Roboflow workspace...
loading Roboflow project...
Models retreived


# Helper Code

In [5]:
# Path variables for the video files

# This block isn't actually needed for the live feed, but is kept for legacy
drive = '05'
video_dir = 'Videos/drive_%s/' % drive
input_video = video_dir + 'drive_%s_vikram2023.mp4' % drive
input_video_frames_dir = video_dir + "frames/"
output_video_frames_dir = video_dir + "frames_wboxes/"
output_video = video_dir + 'drive_%s_wboxes_vikram2023.mp4' % drive

# Here is a bunch of formats that are defined in one place, for convenience
extention = ".png"
frames_format = "frame_%04d" + extention
twofa_crop_dir = video_dir + "on_off_crop/"
twofa_frames_dir = input_video_frames_dir
aoi_crop_dir = video_dir + 'aoi_crop/'
aoi_crop_format = 'aoi_crop%s.jpg'

In [6]:
# A bunch of helper functions, related to labeling the image

## Draws boxes on the image
def draw_boxes(box, x0, y0, img, class_name, color=None, weight=5):
    # OPTIONAL - color map, change the key-values for each color to make the
    # class output labels specific to your dataset
    color_map = {
        "RC-Car":"red"
    }

    # create a drawing context, then draw the box outline and the class label
    bbox = ImageDraw.Draw(img) 
    if color is None:
        # fall back to red for classes that are missing from the color map
        outline_color = color_map.get(class_name, "red")
    else:
        outline_color = color
    bbox.rectangle(box, outline = outline_color, width=weight)
    bbox.text((x0, y0), class_name, fill='black', anchor='mm')

    return img

## Puts a label on the image
def label_img(box_pos = (0, 0), img=None, text_label="", box_color=(255, 255, 255), weight=5, font_size=15):
    
    label = ImageDraw.Draw(img)

    # Define the size of the label box (its position comes from box_pos)
    box_size = (500, 100)

    # Draw the filled label box in box_color
    label.rectangle((box_pos, (box_pos[0]+box_size[0], box_pos[1]+box_size[1])), fill=box_color, width=weight)

    # Define the position and font of the text
    text_pos = (box_pos[0]+25, box_pos[1]+40)
    text_font = ImageFont.truetype(font_dir, font_size)
    # Draw the white text
    text_color = (255, 255, 255)
    label.text(text_pos, text_label, fill=text_color, font=text_font)

    return img

## Saves the labeled and boxed image
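def save_with_bbox_renders(img):
    # img must have been opened from a file so that img.filename is set
    file_name = os.path.basename(img.filename)
    os.makedirs(output_video_frames_dir, exist_ok=True)  # make sure the output folder exists
    img.save(output_video_frames_dir + file_name)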




In [None]:
# More helper functions, related to the object detection

## Prepares the image before it is put through the model
def prep_img(file_path):
    image = cv2.imread(file_path)

    # Get the image dimensions
    height, width = image.shape[:2]

    # Define the maximum width and height of the resized image
    max_size = 1000

    # Calculate the aspect ratio of the image
    aspect_ratio = width / height

    # Calculate the new dimensions for the resized image
    # all the training data was square images
    if width > height:
        new_width = max_size
        new_height = int(new_width / aspect_ratio)
    else:
        new_height = max_size
        new_width = int(new_height * aspect_ratio)

    # Resize the image using bilinear interpolation
    img = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LINEAR)
    
    return img

## Gets the predictions, and presents the vars to crop / label
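def make_predictions(model, img):
    predictions = model.predict(img).json()['predictions']
    print("LED array predictions: ", predictions)

    # Only the first prediction is used. If nothing was detected, this raises
    # an IndexError, which twofa() catches to report that no array was found
    prediction = predictions[0]
    x = int(prediction['x'])
    y = int(prediction['y'])
    height = int(prediction['height'])
    width = int(prediction['width'])

    # Predictions give the box center, so convert to corners. Clamp at 0 so a
    # box hanging off the top/left edge doesn't wrap around via negative indices
    top = max(0, y - height // 2)
    left = max(0, x - width // 2)
    cropped_img = img[top:y + height // 2, left:x + width // 2]
    
    return x, y, height, width, cropped_img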

def render_boxed_image(image, verbose=False):
    # INFERENCE
    predictions = model_car.predict(image).json()['predictions']
    newly_rendered_image = Image.open(image)

    # RENDER 
    # for each detection, draw its bounding box on the image
    if verbose: print(predictions)
    for prediction in predictions:
        # get the bounding box coordinates from the current detection
        # note: the model returns the center point of the box as (x, y), plus width and height,
        # ----- but Pillow needs the top left and bottom right points to draw the rectangle
        x0 = prediction['x'] - prediction['width'] / 2
        x1 = prediction['x'] + prediction['width'] / 2
        y0 = prediction['y'] - prediction['height'] / 2
        y1 = prediction['y'] + prediction['height'] / 2
        box = (x0, y0, x1, y1)

        newly_rendered_image = draw_boxes(box, x0, y0, newly_rendered_image, prediction['class'])
    return newly_rendered_image

## Take the bounding box, and crop the image up into just the on/off squares
def crop_thirds(image, row, col, sz):
    # Get the dimensions of the input image
    height, width = image.shape[:2]
    
    # Determine the size of the square to crop
    size = min(height, width)
    
    # Crop the input image to a square
    x = (width - size) // 2
    y = (height - size) // 2
    square = image[y:y+size, x:x+size]
    
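    # Determine the size of each cell in the sz x sz grid
    third_size = size // sz
    x = third_size * (col - 1)
    y = third_size * (row - 1)
    
    # Crop the specified cell of the square (row and col are 1-indexed)
    crop = square[y:y+third_size, x:x+third_size]
    
    # cv2.imwrite fails silently if the folder is missing, so create it first
    os.makedirs(twofa_crop_dir, exist_ok=True)
    cv2.imwrite(twofa_crop_dir + f'{row}{col}.jpg', crop)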


## Decode the pattern
def find_code(directory, model):
    ans = []

    path = directory + '*.jpg'
    # Use the glob function to get a list of file paths
    file_paths = glob.glob(path)

    # Use the sorted function to sort the file paths in ascending order
    sorted_file_paths = sorted(file_paths)

    # Iterate through the sorted file paths
    for path in sorted_file_paths:
        if not os.path.isdir(path):
            on_off = model.predict(path).json()['predictions']

            # take the first listed class for this square and store it as 1 (on) or 0 (off)
            val = on_off[0]['predictions'][0]['class']
            ans.append(1 if val == "On" else 0)
    return ans

def decode_signal(sz, directory, expected, cropped_img):
    
    # Decoding Signal
    for col in range(1, sz+1):
        for row in range(1, sz+1):
            crop_thirds(cropped_img, row, col, sz)

    ans = find_code(directory, on_off_model)
    
    # Both branches used to return the same value; the caller is the one
    # that compares the decoded pattern against `expected`
    return ans

# 2FA Code

### How does 2FA work

For each frame:
 1. Detect if there is a vehicle (the model was technically trained to detect RCC+ADs):
    1. Call `roboflow_model.predict()` to detect whether a vehicle is there and get its coordinates
    2. Draw the box around the vehicle and label the image
    3. Crop the frame so that it shows only the vehicle ([like this](https://photos.app.goo.gl/R4HdeR6yvDfXgNsc9))
 2. Detect if there is an Arduino Display showing a bounding box
    1. Use a Roboflow model
    2. If so, crop the image again so that it shows only the bounding box
    3. If not, report that no array was found
 3. Detect the pattern inside the bounding box
    1. Slice the image (which at this point is just a 3x3 grid of on/off squares) into the 9 small squares
    2. For each square, use Roboflow to predict whether the square is on or off
 4. If the pattern is correct, the identity has been confirmed
 5. Otherwise, it hasn't been confirmed _yet_
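
Steps 3 and 4 above can be sketched without any model at all. The example below slices a synthetic image into a 3x3 grid and decides on/off with a simple brightness threshold. That threshold is an assumption standing in for the Roboflow on/off classifier, and the function names (`split_grid`, `cell_is_on`, `decode_pattern`) are hypothetical. The image is a plain list of pixel rows so that the sketch has no dependencies.

```python
def split_grid(image, size=3):
    """Split an image (a list of pixel rows) into size x size cells, row-major."""
    cell_h = len(image) // size
    cell_w = len(image[0]) // size
    cells = []
    for r in range(size):
        for c in range(size):
            rows = image[r * cell_h:(r + 1) * cell_h]
            cells.append([row[c * cell_w:(c + 1) * cell_w] for row in rows])
    return cells

def cell_is_on(cell, threshold=128):
    """Stand-in classifier (assumption): a cell is 'on' if it is bright on average."""
    pixels = [p for row in cell for p in row]
    return int(sum(pixels) / len(pixels) > threshold)

def decode_pattern(image, size=3):
    """Return the decoded pattern as a flat, row-major list of 0/1."""
    return [cell_is_on(cell) for cell in split_grid(image, size)]

# Synthetic 9x9 grayscale image that displays the sample pattern
expected = [1, 0, 0,
            0, 0, 0,
            0, 1, 1]
image = [[255 * expected[(y // 3) * 3 + (x // 3)] for x in range(9)]
         for y in range(9)]

decoded = decode_pattern(image)
print("confirmed" if decoded == expected else "not confirmed yet")
```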

In [None]:
# Main code that does all the 2 factor Authentication
def twofa(image, model_car, model_led, expected, sz, verified):   
    print()
    img_to_crop = cv2.imread(image)
    newly_rendered_image = Image.open(image)

    # Vehicle Predictions
    # to be replaced with a faster obj detection model?
    
    rc_predictions = model_car.predict(image).json()['predictions']
    
    # print(rc_predictions)

    # RENDER 
    for prediction in rc_predictions:
        x0 = prediction['x'] - prediction['width'] / 2
        x1 = prediction['x'] + prediction['width'] / 2
        y0 = prediction['y'] - prediction['height'] / 2
        y1 = prediction['y'] + prediction['height'] / 2
        box = (x0, y0, x1, y1)

        start = time.time()
        # green box once the vehicle is verified, red box otherwise
        box_color = (0, 255, 0) if verified else (255, 0, 0)
        newly_rendered_image = draw_boxes(box, x0, y0, newly_rendered_image, prediction['class'], box_color, weight=7)
        print(f"{round(time.time()-start, 2)} seconds to draw boxes")

        if verified == False:
            start = time.time()
            AOI_img = img_to_crop.copy()
            height, width = AOI_img.shape[:2]
            x0 = int(max(0, x0))
            x1 = int(min(width, x1))
            y0 = int(max(0, y0))
            y1 = int(min(height, y1))
            AOI_img = AOI_img[y0:y1, x0:x1]
            # file_path = aoi_crop_dir + (aoi_crop_format % i)
            file_path = 'Videos/aoi_frame_crop.jpg'
            cv2.imwrite(file_path, AOI_img)
            print(f"{round(time.time()-start, 2)} seconds to save AOI_img")
            try:     
                # LED Array Predictions
                start = time.time()
                img = prep_img(file_path)
                print(f"{round(time.time()-start, 2)} seconds to prep img")
                start = time.time()
                x, y, height, width, cropped_img = make_predictions(model_led, img)
                print(f"{round(time.time()-start, 2)} seconds to find LED array")
                # print(x, y)
                print("Attempting to verify vehicle: Array Found")

                directory = twofa_crop_dir

                ans = decode_signal(sz, directory, expected, cropped_img)

                print(ans)

                if ans == expected:
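                    # note: this only updates the local flag; the caller's
                    # `verified` variable is not changed by this assignment
                    verified = True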
                    newly_rendered_image = label_img(box_pos = (x0,y0), img=newly_rendered_image, text_label="Vehicle ID Confirmed", box_color=(0, 255, 0), font_size=30)
                else:
                    newly_rendered_image = label_img(box_pos = (x0,y0), img=newly_rendered_image, text_label="Vehicle Unidentified", box_color=(255, 0, 0), font_size=30)

            except IndexError:
                print("Attempting to verify vehicle: No array found.")
        else:
            print("Vehicle has been verified.")
            
    return newly_rendered_image

In [None]:
# This does 2FA on a list of image files (e.g. the frames of a video file)
# It isn't used by the live feed, but the single-image test cell below calls it
def run_on_globbed(globbed_files, model_car, model_led, expected, sz):

    for i, file_path in enumerate(globbed_files, start=1):
        print(i, "/", len(globbed_files))
        newly_rendered_image = twofa(file_path, model_car, model_led, expected, sz, False)

        save_with_bbox_renders(newly_rendered_image)

In [None]:
# Main code that is run for live feed

def use_livefeed_twofa():
  
    # define a video capture object
    vid = cv2.VideoCapture(-1) # use the camera
    vid.set(cv2.CAP_PROP_FRAME_WIDTH, 3840) 
    vid.set(cv2.CAP_PROP_FRAME_HEIGHT, 2160)
    # 4k dimensions
    filename = 'Videos/frame_cv.png'
    
    verified = False
    expected = [
        1, 0, 0,
        0, 0, 0,
        0, 1, 1
    ] # sample pattern
    sz = 3 # how many rows&cols
    
    while(True):

        # Capture the video frame by frame
        startbig = time.time()
        # cv2 buffers roughly the next 5 frames, so what is displayed lags 5 frames behind.
        # When each frame takes about a second to process, that is a 5 second delay.
        # Grabbing (and discarding) 4 frames before reading makes sure that
        # only the current frame is processed.
        for _ in range(4): 
            vid.grab()
        
        ret, frame = vid.read()
        if not ret:
            # no frame could be read (e.g. the camera is unavailable), so stop
            break

        # Zed cameras have two cameras so only use one of them
        frame = np.hsplit(frame, 2)[0]
        
        
        # do vehicle detection
        cv2.imwrite(filename, frame) # roboflow model needs a file that's already saved on the computer
        opencv_boxed_image = twofa(filename, model_car, model_led, expected, sz, verified)
        opencv_boxed_image = np.array(opencv_boxed_image)[:, :, ::-1].copy() # turns the PIL image to a CV2-ready image
        
        # display frame
        cv2.imshow('frame', opencv_boxed_image)
        
        print(f"{round(time.time()-startbig, 2)} seconds overall")
        # the 'q' button is set as the
        # quitting button you may use any
        # desired button of your choice
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # After the loop release the cap object
    vid.release()
    # Destroy all the windows
    cv2.destroyAllWindows()
    

## Actual Main Code that should be run for V1

In [None]:
# MAIN CODE

use_livefeed_twofa()

In [None]:
# Use this cell if you want to test 2FA on a single image

globbed_files = ['Videos/the_car_3.jpg']
expected = [
    1, 0, 0,
    0, 0, 0,
    0, 1, 1
]
run_on_globbed(globbed_files, model_car, model_led, expected, 3)

## Legacy functions (that I don't want to delete)
I'm a very sentimental person \
These functions might also be useful in the future idk

In [None]:
# Video helper functions
# These functions aren't actually used in the live feed, but are there for legacy

def import_video(input_vid_val=input_video, input_frames_val=input_video_frames_dir+frames_format, fps=25):
    # break video down into images - UPDATE THE PATH TO THE FILE!
    os.environ['inputFile'] = input_vid_val

    # fps value: the number of frames to sample per second from the video
    # !ffmpeg  -hide_banner -loglevel error -i "$inputFile" -vf fps=25 "$inputFile_out%04d.png" 
    os.system(f'ffmpeg -hide_banner -loglevel error -i "$inputFile" -vf fps={fps} {input_frames_val}')
    print("Frames have been extracted!")

def export_video_with_boxes(input_val = output_video_frames_dir + frames_format, output_val = output_video, fps=25):
    # stitch the boxed frames together into a video
    # -r sets the frame rate; -crf sets the encoding quality (25, as in the original command below), not the frame rate
    os.system(f"ffmpeg -r {fps} -s 1920x1080 -i {input_val} -vcodec libx264 -crf 25  -pix_fmt yuv420p {output_val}")
    # !ffmpeg -r 25 -s 1920x1080 -i '/Users/bendwyer/Documents/Academic/2022-2023/Thesis/Vehicle Detection/%04d.png' -vcodec libx264 -crf 25  -pix_fmt yuv420p '/Users/bendwyer/Documents/Academic/2022-2023/Thesis/Vehicle Detection/test.mp4'


# Detect vehicle with sample video

In [None]:
import_video()

In [None]:
# perform inference on each image from the split up video

# glob files based on location and file format
globbed_files = sorted(glob.glob(input_video_frames_dir + '*' + extention))
# print(globbed_files)


start = time.time()
for image in globbed_files:
  newly_rendered_image = render_boxed_image(image)
  # WRITE
  save_with_bbox_renders(newly_rendered_image)  # this function takes only the image
end = time.time()


print(f"It took {(end-start)/60} minutes to detect {len(globbed_files)} frames")

In [None]:
export_video_with_boxes()

# Live view object detection

In [None]:
## This code only detects the vehicle, doesn't do any 2 factor authentication

# This code isn't actually used, but is there for legacy

def use_livefeed_objdet():
  
    # define a video capture object
    vid = cv2.VideoCapture(0)
    filename = 'Videos/frame_cv.png'

    while(True):

        # Capture the video frame by frame
        ret, frame = vid.read()
        
        # Zed cameras have two cameras so only use one of them
        frame = np.hsplit(frame, 2)[0]
        
        # do vehicle detection
        cv2.imwrite(filename, frame)
        opencv_boxed_image = np.array(render_boxed_image(filename))
        opencv_boxed_image = opencv_boxed_image[:, :, ::-1].copy() 
        
        # display frame
        cv2.imshow('frame', opencv_boxed_image)

        # the 'q' button is set as the
        # quitting button you may use any
        # desired button of your choice
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # After the loop release the cap object
    vid.release()
    # Destroy all the windows
    cv2.destroyAllWindows()

In [None]:
use_livefeed_objdet()

## Ignore beyond this point

In [None]:
### IGNORE THIS ###

ROBOFLOW_API_KEY = "vAsGYa1MuHAuaPrRdiar"
ROBOFLOW_MODEL = "rc-vehicle-detection_v2" # eg xx-xxxx--#
ROBOFLOW_SIZE = 416

import cv2
import base64
import numpy as np
import requests

upload_url = "".join([
    "https://detect.roboflow.com/",
    ROBOFLOW_MODEL,
    "?access_token=",
    ROBOFLOW_API_KEY,
    "&format=image",
    "&stroke=5"
]) 

In [None]:
### IGNORE THIS ###

def infer(video):
    # Get the current image from the webcam
    ret, img = video.read()

    # Resize (while maintaining the aspect ratio) to improve speed and save bandwidth
    height, width, channels = img.shape
    scale = ROBOFLOW_SIZE / max(height, width)
    img = cv2.resize(img, (round(scale * width), round(scale * height)))

    # Encode image to base64 string
    retval, buffer = cv2.imencode('.jpg', img)
    img_str = base64.b64encode(buffer)

    # Get prediction from Roboflow Infer API
    resp = requests.post(upload_url, data=img_str, headers={
        "Content-Type": "application/x-www-form-urlencoded"
    }, stream=True).raw

    # Parse result image
    image = np.asarray(bytearray(resp.read()), dtype="uint8")
    image = cv2.imdecode(image, cv2.IMREAD_COLOR)
    print(image)
    return image

In [None]:
### IGNORE THIS ###

video = cv2.VideoCapture(-1)

while 1:
    # On "q" keypress, exit
    if(cv2.waitKey(1) == ord('q')):
        break

    # Synchronously get a prediction from the Roboflow Infer API
    image = infer(video)
    # And display the inference results
    try:
        cv2.imshow('image', image)
    except Exception as e:
        print("oops")
    
# Release resources when finished
video.release()
cv2.destroyAllWindows()