#### 1) Import the prerequisite packages.

In [1]:
import math
import time
from pathlib import Path

import cv2
import numpy as np
import obd
from pygame import mixer

pygame 2.1.2 (SDL 2.0.18, Python 3.9.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


#### 2) Create a class that is able to track a moving object. 
- Finds the center point of each bounding box.
- Calculates distance between a box's center point in consecutive frames.
- If the distance is less than a specified amount, the box is treated as the same object and keeps its existing ID; otherwise it is assigned a new unique ID.

In [2]:
# Written by Sergio Canu from PySource
class EuclideanDistTracker:
    """Give bounding boxes a persistent integer ID across consecutive frames.

    A box is treated as the same object as one seen in the previous ``update``
    call when the Euclidean distance between their centre points is below
    ``max_distance`` pixels. IDs start at 1 and are never reused.
    """

    def __init__(self, max_distance=10):
        """Create an empty tracker.

        Args:
            max_distance: Upper bound (exclusive, in pixels) on how far a box
                centre may move between two ``update`` calls and still keep
                its ID. Defaults to 10.
        """
        self.max_distance = max_distance
        # Centre point of every object present in the most recent frame, keyed by ID
        self.center_points = {}
        # Next ID to hand out; increases by one each time a new object is detected
        self.id_count = 1

    def update(self, objects_rect):
        """Match the boxes of a new frame against the previous frame.

        Args:
            objects_rect: Iterable of ``(x, y, w, h)`` boxes, where ``(x, y)``
                is the top-left corner.

        Returns:
            A list with one ``[x, y, w, h, object_id]`` entry per input box,
            in the same order as ``objects_rect``.
        """
        objects_bbs_ids = []

        for x, y, w, h in objects_rect:
            # Centre point of the box (integer pixel coordinates)
            cx = (x + x + w) // 2
            cy = (y + y + h) // 2

            # Reuse the ID of the first known centre that is close enough.
            # Objects added earlier in this same call are candidates too.
            matched_id = None
            for object_id, (px, py) in self.center_points.items():
                if math.hypot(cx - px, cy - py) < self.max_distance:
                    matched_id = object_id
                    break

            # Nothing close enough: this is a new object and gets a fresh ID
            if matched_id is None:
                matched_id = self.id_count
                self.id_count += 1

            self.center_points[matched_id] = (cx, cy)
            objects_bbs_ids.append([x, y, w, h, matched_id])

        # Keep only the centres of IDs seen in this frame, so objects that
        # disappeared are forgotten.
        self.center_points = {
            entry[4]: self.center_points[entry[4]] for entry in objects_bbs_ids
        }
        return objects_bbs_ids

#### 3) Initialize the OBD-II sensor and record the speed of the user's vehicle.

In [3]:
# .Async ensures that this runs in a separate thread.
# The settings are kept in one place so every retry uses exactly the same ones.
obdSettings = dict(portstr="COM3", baudrate=9600, fast=False, timeout=30)
connection = obd.Async(**obdSettings)

# Keep reconnecting until the adapter reports at least 100 supported commands.
# NOTE(review): the threshold of 100 looks like an empirical marker for a
# complete handshake with the car - confirm it suits the target vehicle.
while len(connection.supported_commands) < 100:
    # Release the serial port before retrying; otherwise the next attempt can
    # find the port still held by the previous connection object.
    connection.close()
    time.sleep(1)  # brief pause so a missing adapter does not cause a busy loop
    connection = obd.Async(**obdSettings)

# Latest speed of the user's vehicle; overwritten by the speed callback
mySpeed = 0
c1 = obd.commands.SPEED
c2 = obd.commands.RUN_TIME

def speedTracker(s):
    """Callback for speed responses: store the latest reading in ``mySpeed``.

    The reading's magnitude is scaled by 0.621 and truncated to an integer
    (presumably km/h to mph - the value is later displayed with an "mph"
    label). Null responses are ignored, so the previous value is kept.
    """
    global mySpeed
    if s.is_null():
        return
    mySpeed = int(s.value.magnitude * .621)

# Register the speed command on the async connection, with speedTracker as the
# callback that receives each response.
connection.watch(c1, callback=speedTracker)
# Start the connection's background update loop.
# NOTE(review): watch() is kept before start() on purpose - presumably commands
# must be registered before the loop is running; confirm against the python-OBD docs.
connection.start()

#### 4) Implement YOLOv4 object detection in tandem with OpenCV.
- Creates an "alert zone" wherein any vehicle that enters is flagged as being an object of interest.
- Approximates the distance between the oncoming vehicle and the camera.
- Approximates the speed of the oncoming vehicle.
- Alerts if a vehicle is inside the alert zone and has a high speed relative to the user's vehicle.

In [6]:
# Directory that holds the YOLOv4-tiny weights/config, the COCO class list and
# the alarm sound. Change this one constant when running on another machine.
ASSET_DIR = Path(r"C:\Users\rsand\Downloads")

mixer.init()
tracker = EuclideanDistTracker()

# str() because the OpenCV and pygame loaders are given plain string paths
net = cv2.dnn.readNet(str(ASSET_DIR / "yolov4-tiny.weights"), str(ASSET_DIR / "yolov4-tiny.cfg"))
sound = mixer.Sound(str(ASSET_DIR / "alarm.wav"))

# One class label per line; the line index is the class id used for the lookup
with open(ASSET_DIR / "coco.names", "r") as f:
    classes = f.read().splitlines()

cap = cv2.VideoCapture(1, cv2.CAP_DSHOW) # Captures a live feed of secondary webcam
# cap = cv2.VideoCapture(r"Path to video goes here")
if not cap.isOpened():
    # Fail with a clear message instead of letting the frame loop end silently
    raise RuntimeError("Could not open video source (camera index 1)")
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

font = cv2.FONT_HERSHEY_PLAIN
# Random drawing colours, one row per detection slot
colors = np.random.uniform(0, 255, size=(100, 3))

# box id -> list of [distance, timestamp] samples
distanceDictionary = {}
# box id -> list of [relative speed] samples
relSpeedDictionary = {}

mySpeed = 0         # speed of the user's vehicle (displayed as mph)
theirSpeed = 0      # latest speed estimate for a detected vehicle (displayed as mph)
toleranceSpeed = 5  # relative speed that must be exceeded before alerting
frameGap = 10       # number of distance samples spanned by one speed estimate

def curveFit(area):
    """Estimate the distance to an object from its bounding-box area.

    Evaluates the fitted curve ``lower + (upper - lower) / (1 + (area / scale) ** power)``.
    The coefficients vary depending on the camera.

    Args:
        area: Bounding-box area (the caller passes width * height in pixels).

    Returns:
        The estimated distance (the caller displays it as feet).
    """
    upper = 99172160
    lower = -21.24629
    scale = 8.629923e-7
    power = 0.6045811

    denominator = 1 + (area / scale) ** power
    return lower + (upper - lower) / denominator

# Main loop: read a frame, run YOLOv4 on it, track the detected boxes, estimate
# the distance and speed of every vehicle inside the alert zone, and sound the
# alarm when one keeps closing in. Runs until the video source ends or Esc is pressed.

# The output layer names do not change between frames, so look them up once
output_layers_names = net.getUnconnectedOutLayersNames()

# Creating the "alert zone" - varies depending on the position/angle of camera.
# The dtype is pinned to int32 because NumPy's default integer is 64-bit on some
# platforms, which cv2.polylines does not accept.
area_1 = np.array([(250, 673), (850, 673), (700, 420), (630, 420)], dtype=np.int32)

# Class labels that count as vehicles of interest
vehicleLabels = ("car", "bus", "truck", "motorbike")

# Latest speed estimate (mph) per box id, so that one vehicle's speed is never
# drawn on, or used to alert for, a different vehicle.
speedDictionary = {}

try:
    while True:
        ret, img = cap.read()
        if not ret:
            break

        height, width, _ = img.shape

        # Runs YOLOv4 on the frame and creates a list of detected objects and their bounding boxes
        blob = cv2.dnn.blobFromImage(img, 1/255, (416, 416), (0,0,0), swapRB=True, crop=False)
        net.setInput(blob)
        layerOutputs = net.forward(output_layers_names)

        boxes = []
        confidences = []
        class_ids = []

        # Keeps every detection whose best class score is above 0.3
        for output in layerOutputs:
            for detection in output:
                scores = detection[5:]
                class_id = np.argmax(scores)
                confidence = scores[class_id]
                if confidence > 0.3:
                    # Detections are relative to the frame size; convert to pixels
                    center_x = int(detection[0]*width)
                    center_y = int(detection[1]*height)
                    w = int(detection[2]*width)
                    h = int(detection[3]*height)

                    x = int(center_x - w/2)
                    y = int(center_y - h/2)

                    boxes.append([x, y, w, h])
                    confidences.append(float(confidence))
                    class_ids.append(class_id)

        # Eliminates overlapping detections and returns the indexes of the remaining detections
        indexes = cv2.dnn.NMSBoxes(boxes, confidences, 0.2, 0.4)

        cv2.polylines(img, [area_1], True, (255, 0, 0), 3)

        # The tracker returns one entry per box in the same order, so boxes_ids[i]
        # belongs to boxes[i]
        boxes_ids = tracker.update(boxes)
        cv2.putText(img, "My car speed: " + str(mySpeed) + " mph", (800, 100), font, 2, (0,0,255), 2)

        # np.array(...) also copes with the empty result returned when nothing was detected
        for i in np.array(indexes, dtype=int).flatten():
            label = str(classes[class_ids[i]])

            # Skip anything that is not a vehicle
            if label not in vehicleLabels:
                continue

            x, y, w, h = boxes[i] # Dimensions of bounding box
            color = colors[i % len(colors)] # wrap around: there can be more boxes than colours
            box_id = boxes_ids[i][4]

            bottom_x = int(x + (w/2))
            bottom_y = int(y + h)
            cx = bottom_x
            cy = int(y + (h/2))

            # Skip vehicles whose bottom-centre point is outside the alert zone
            if cv2.pointPolygonTest(area_1, (bottom_x, bottom_y), True) < 0:
                continue

            realDistance = round(curveFit(w * h), 2)
            cv2.putText(img, "Distance from car: " + str(int(realDistance)) + " ft", (800, 150), font, 2, (0, 0, 255), 2)

            # Adds the distance sample to the history of the corresponding box id
            history = distanceDictionary.setdefault(box_id, [])
            history.append([realDistance, time.time()])
            length = len(history)

            # A new speed estimate is made once every frameGap samples
            newSample = False
            if length >= 2 and length % frameGap == 0:
                deltaD = history[-1][0] - history[-frameGap][0]
                deltaT = history[-1][1] - history[-frameGap][1]

                if deltaT > 0:
                    mySpeedFtPerSec = (mySpeed * 5280) / 3600
                    # Positive when the gap to the trailing vehicle is shrinking
                    closingRate = -deltaD / deltaT
                    # Their ground speed = our speed + the rate at which they close in.
                    # (The original added deltaD in feet to a speed in ft/s.)
                    theirSpeed = abs(mySpeedFtPerSec + closingRate)
                    theirSpeed /= 1.467 # Converts ft/s to mph
                    theirSpeed = round(theirSpeed, 2) # Speed of the trailing car

                    relSpeed = round(theirSpeed - mySpeed, 2)
                    relSpeedDictionary.setdefault(box_id, []).append([relSpeed])
                    speedDictionary[box_id] = theirSpeed
                    newSample = True

            # Nothing is drawn for a vehicle until it has a non-zero speed estimate
            theirSpeed = speedDictionary.get(box_id, 0)
            if theirSpeed == 0:
                continue

            # Log and check the alert only when a new estimate arrived, so the same
            # samples do not retrigger the alarm on every frame
            if newSample:
                print("theirSpeed: " + str(theirSpeed))
                print("relSpeed: " + str(relSpeed))
                print("\n")

                recent = [sample[0] for sample in relSpeedDictionary[box_id][-3:]]
                if len(recent) == 3:
                    value3, value2, value1 = recent

                    # Alert if trailing vehicle has a greater relative speed and is accelerating
                    if value1 > value2 > value3 > toleranceSpeed:
                        print("realDistance: " + str(realDistance))
                        print("Be alert!")
                        sound.play()

            # Displays graphics onto the frame
            cv2.rectangle(img, (x,y), (x+w, y+h), color, 2)
            cv2.circle(img, (bottom_x, bottom_y), 5, color, -1)
            cv2.circle(img, (cx, cy), 5, color, -1)
            cv2.putText(img, label + " " + str(int(theirSpeed)) + " mph", (x, y+20), font, 2, (255,255,255), 2)

        cv2.imshow('Image', img)
        key = cv2.waitKey(1) & 0xFF

        if key == 27: # Press esc key to stop program
            break

        if key == ord('p'): # Press p key to pause program
            cv2.waitKey(-1)
finally:
    # Always free the camera and close the window, even if processing a frame raised
    cap.release()
    cv2.destroyAllWindows()