# Imports and loading needed libraries

In [None]:
!pip install supervision==0.1.0
from IPython import display
display.clear_output()
import supervision

In [None]:
!git clone https://github.com/DatumLearning/SSD_using_OpenCV.git

fatal: destination path 'SSD_using_OpenCV' already exists and is not an empty directory.


In [None]:
# Record the current working directory and download the sample traffic video into it.
import os
HOME = os.getcwd()
print(HOME)
# HOME was just read from os.getcwd(), so this `cd` leaves the working directory unchanged.
%cd {HOME}
# Google Drive download in two steps:
#   1. the inner wget saves session cookies and `sed` extracts a `confirm=` token from the response;
#   2. the outer wget reuses the cookies plus that token and writes the file to vehicle-counting.mp4.
# The cookie jar is removed afterwards, but only if the download succeeded (`&&`).
# NOTE(review): --no-check-certificate disables TLS verification for the inner request.
!wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=1pz68D1Gsx80MoPg-_q-IbEdESEmyVLm-' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=1pz68D1Gsx80MoPg-_q-IbEdESEmyVLm-" -O vehicle-counting.mp4 && rm -rf /tmp/cookies.txt

/content
/content
--2024-02-11 09:14:57--  https://docs.google.com/uc?export=download&confirm=&id=1pz68D1Gsx80MoPg-_q-IbEdESEmyVLm-
Resolving docs.google.com (docs.google.com)... 74.125.143.100, 74.125.143.139, 74.125.143.102, ...
Connecting to docs.google.com (docs.google.com)|74.125.143.100|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://drive.usercontent.google.com/download?id=1pz68D1Gsx80MoPg-_q-IbEdESEmyVLm-&export=download [following]
--2024-02-11 09:14:58--  https://drive.usercontent.google.com/download?id=1pz68D1Gsx80MoPg-_q-IbEdESEmyVLm-&export=download
Resolving drive.usercontent.google.com (drive.usercontent.google.com)... 142.251.18.132, 2a00:1450:4013:c18::84
Connecting to drive.usercontent.google.com (drive.usercontent.google.com)|142.251.18.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 35345757 (34M) [video/mp4]
Saving to: ‘vehicle-counting.mp4’


2024-02-11 09:14:59 (205 MB/s) - ‘vehicle-counting

In [None]:
import cv2
import numpy as np
from sklearn.cluster import KMeans
import webcolors
from scipy.spatial.distance import cdist
import matplotlib.pyplot as plt
from supervision.draw.color import ColorPalette
from supervision.geometry.dataclasses import Point
from supervision.video.dataclasses import VideoInfo
from supervision.video.source import get_video_frames_generator
from supervision.video.sink import VideoSink
from supervision.notebook.utils import show_frame_in_notebook
from supervision.tools.detections import Detections, BoxAnnotator
from supervision.tools.line_counter import LineCounter, LineCounterAnnotator
from tqdm import tqdm

# Methods to detect color

In [None]:
def get_dominant_color(image, k=5):
    """Return the HSV center of the most populated k-means cluster of ``image``.

    Args:
        image: BGR image (it is converted with ``cv2.COLOR_BGR2HSV``), e.g. a
            crop of a video frame.
        k: Requested number of clusters. It is capped at the number of pixels
            so that very small crops can still be clustered.

    Returns:
        A length-3 float array ``(H, S, V)``: the center of the cluster that
        holds the most pixels. For 8-bit input OpenCV stores H in 0-180 and
        S/V in 0-255.

    Raises:
        ValueError: If ``image`` is ``None`` or contains no pixels.
    """
    # Reject empty crops up front with a readable error instead of failing
    # somewhere inside OpenCV / scikit-learn.
    if image is None or image.size == 0:
        raise ValueError("get_dominant_color() needs a non-empty image")

    image_hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    pixels = image_hsv.reshape((-1, 3))

    # KMeans refuses n_clusters > n_samples, so never ask for more clusters
    # than there are pixels.
    n_clusters = max(1, min(int(k), len(pixels)))

    # A fixed seed makes the result reproducible across runs and frames.
    clt = KMeans(n_clusters=n_clusters, n_init=25, random_state=0)
    clt.fit(pixels)

    # The label histogram tells which cluster owns the most pixels.
    hist = np.bincount(clt.labels_)
    return clt.cluster_centers_[hist.argmax()]


def map_color_to_category(color_hsv):
    """Classify an HSV color into a coarse named category.

    Args:
        color_hsv: Three values ``(H, S, V)`` on OpenCV's 8-bit scale
            (H in 0-180, S and V in 0-255). Floats are accepted; values are
            clipped to 0-255 and truncated to integers before matching.

    Returns:
        ``"red"``, ``"blue"``, ``"green"``, ``"white"`` or ``"black"`` for the
        first range (checked in that order) containing the color, otherwise
        ``"unknown"``.
    """
    # (name, inclusive lower bound, inclusive upper bound); the first match wins.
    color_ranges = (
        ("red", (0, 50, 50), (10, 255, 255)),
        # Hue is circular on the 0-180 scale, so red also occupies the top end.
        ("red", (170, 50, 50), (180, 255, 255)),
        ("blue", (110, 50, 50), (130, 255, 255)),
        ("green", (50, 50, 50), (70, 255, 255)),
        ("white", (0, 0, 200), (180, 25, 255)),
        ("black", (0, 0, 0), (180, 255, 50)),
    )

    # Normalise the input to three uint8 channels (cluster centers are floats).
    hsv = np.asarray(color_hsv, dtype=np.float64).reshape(3)
    hsv = np.clip(hsv, 0, 255).astype(np.uint8)

    for color_name, lower, upper in color_ranges:
        lower = np.array(lower, dtype=np.uint8)
        upper = np.array(upper, dtype=np.uint8)
        # Every channel must lie inside its inclusive [lower, upper] interval.
        if np.all(hsv >= lower) and np.all(hsv <= upper):
            return color_name

    return "unknown"

def get_car_color(frame, bbox):
    """Return the color category of the image region inside ``bbox``.

    Args:
        frame: Image array indexed as ``frame[y, x]`` (BGR, since the crop is
            handed to ``get_dominant_color``).
        bbox: ``(x1, y1, x2, y2)`` pixel coordinates; each value is converted
            with ``int``. Coordinates outside the frame are clamped to it.

    Returns:
        The category name from ``map_color_to_category``, or ``"unknown"``
        when the clamped box contains no pixels.
    """
    frame_h, frame_w = frame.shape[:2]
    x1, y1, x2, y2 = map(int, bbox)

    # Clamp to the frame: negative indices would otherwise wrap around through
    # Python slicing and silently sample the wrong region.
    x1, x2 = max(0, min(x1, frame_w)), max(0, min(x2, frame_w))
    y1, y2 = max(0, min(y1, frame_h)), max(0, min(y2, frame_h))

    # A degenerate box has no pixels to cluster.
    if x2 <= x1 or y2 <= y1:
        return "unknown"

    roi = frame[y1:y2, x1:x2]
    dominant_color = get_dominant_color(roi, k=5)
    return map_color_to_category(dominant_color)

# Loop helper to display detected colors

In [None]:
def update_color_counts_display(frame, color_counts, display_start_pos):
    """Write one ``"<color>: <count>"`` line per entry of ``color_counts`` onto ``frame``.

    The first line is anchored at ``display_start_pos`` (x, y); each further
    line is placed 60 pixels lower. ``frame`` is modified in place and nothing
    is returned.
    """
    white = (255, 255, 255)
    line_spacing = 60
    for row, color_name in enumerate(color_counts):
        caption = f"{color_name}: {color_counts[color_name]}"
        anchor = (display_start_pos[0], display_start_pos[1] + line_spacing * row)
        cv2.putText(frame, caption, anchor, cv2.FONT_HERSHEY_SIMPLEX, 2, white, 2)

# Variable declarations

In [None]:
# Tracker state: one dict per vehicle seen so far, {'id': ..., 'position': center}.
tracked_cars = []
# Next id to hand out to a newly created track.
car_id_counter = 0

# Color bookkeeping.
color_counts = {}        # color name -> number of vehicles counted with that color
tracked_car_ids = set()  # ids whose color has already been added to color_counts


# Input video. The download cell saves it into the current working directory, so the
# path is built from there instead of assuming the Colab-specific '/content' folder.
TARGET_VIDEO_PATH = os.path.join(os.getcwd(), 'vehicle-counting.mp4')

# Loading and preparing the model

In [None]:
# Load the MobileNet-SSD Caffe model from the cloned SSD_using_OpenCV repository.
proto = "./SSD_using_OpenCV/MobileNetSSD_deploy.prototxt"
weights = "./SSD_using_OpenCV/MobileNetSSD_deploy.caffemodel"

# Fail early with an actionable message when the clone step did not produce the files;
# otherwise the problem only shows up as an opaque OpenCV error.
_missing_model_files = [path for path in (proto, weights) if not os.path.isfile(path)]
if _missing_model_files:
    raise FileNotFoundError(
        "Model file(s) not found: " + ", ".join(_missing_model_files)
        + ". Run the cell that clones SSD_using_OpenCV first."
    )

net = cv2.dnn.readNetFromCaffe(proto, weights)

In [None]:
# Detection classes of interest.
# MobileNetSSD_deploy is the PASCAL-VOC-trained MobileNet-SSD, whose label map is
# 0 background, 1 aeroplane, 2 bicycle, 3 bird, 4 boat, 5 bottle, 6 bus, 7 car, ...
# (there is no "truck" class). COCO-style ids (2 = car, 7 = truck) do not apply here.
# NOTE(review): assumes the cloned repo ships the standard VOC model - confirm.
# "2" stays in the name map so that any code still filtering on id 2 gets a correct label.
class_names_dict = {"2": "bicycle", "6": "bus", "7": "car"}
# Ids treated as vehicles by the counting loop.
class_ids = [6, 7]

# OpenCV preparation

In [None]:
# Open the input video and stop right away if OpenCV could not read it
# (a failed capture would otherwise report 0 for every property below).
cap = cv2.VideoCapture(TARGET_VIDEO_PATH)
if not cap.isOpened():
    raise IOError(f"Could not open input video: {TARGET_VIDEO_PATH}")

# Input video info
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as a float: truncating e.g. 29.97 to 29 would change the
# playback speed and duration of the written video.
fps = cap.get(cv2.CAP_PROP_FPS)

# Output video settings
OUTPUT_VIDEO_PATH = '/content/output_video.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, fps, (frame_width, frame_height))
if not out.isOpened():
    # Without this check every out.write() would silently do nothing.
    cap.release()
    raise IOError(f"Could not open output video for writing: {OUTPUT_VIDEO_PATH}")

# Total frame count, used as the tqdm progress-bar length
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Looping and counting objects
## -Creating and reading the video
## -Tracking objects

In [None]:
# Detect vehicles in every frame, attach each detection to the nearest existing track
# (or start a new one), count each track's color once, and write the annotated frame.

CONFIDENCE_THRESHOLD = 0.1  # minimum SSD score for a detection to be kept
MAX_MATCH_DISTANCE = 100    # max center distance (px) to attach a detection to a track
MIN_BOX_SIDE = 4            # boxes smaller than this after clamping are skipped: too few
                            # pixels to cluster into a meaningful color

# Scales the network's normalised (x1, y1, x2, y2) output to pixel coordinates.
frame_scale = np.array([frame_width, frame_height, frame_width, frame_height])

try:
    for _ in tqdm(range(total_frames), desc="Processing Video Frames"):
        ret, frame = cap.read()
        if not ret:
            break

        frame_resized = cv2.resize(frame, (300, 300))
        blob = cv2.dnn.blobFromImage(frame_resized, 0.007843, (300, 300), (127.5, 127.5, 127.5), False)
        net.setInput(blob)
        detections = net.forward()

        # Each entry is (center, class_id, (x1, y1, x2, y2)). The box is stored with the
        # detection because positions in this filtered list do not line up with
        # indices into the raw `detections` tensor.
        current_frame_detections = []

        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence <= CONFIDENCE_THRESHOLD:
                continue
            class_id = int(detections[0, 0, i, 1])
            if class_id not in class_ids:
                continue

            raw_box = (detections[0, 0, i, 3:7] * frame_scale).astype("int")
            rx1, ry1, rx2, ry2 = (int(v) for v in raw_box)
            center = np.array([(rx1 + rx2) / 2, (ry1 + ry2) / 2])

            # Clamp to the frame so cropping and drawing never use out-of-range
            # (or negative, i.e. wrapping) coordinates.
            x1, x2 = max(0, min(rx1, frame_width)), max(0, min(rx2, frame_width))
            y1, y2 = max(0, min(ry1, frame_height)), max(0, min(ry2, frame_height))
            if x2 - x1 < MIN_BOX_SIDE or y2 - y1 < MIN_BOX_SIDE:
                continue

            current_frame_detections.append((center, class_id, (x1, y1, x2, y2)))

        if tracked_cars:
            tracked_positions = np.array([car['position'] for car in tracked_cars]).reshape(-1, 2)
            detected_positions = np.array([det[0] for det in current_frame_detections]).reshape(-1, 2)

            # distances[d, t] = distance between detection d and track t. Tracks created
            # further down in this frame are not part of the matrix.
            distances = cdist(detected_positions, tracked_positions)

            # Colors are sampled from an unannotated copy so rectangles and labels
            # drawn below never leak into a crop.
            clean_frame = frame.copy()

            for det_idx, (center, class_id, box) in enumerate(current_frame_detections):
                x1, y1, x2, y2 = box
                # Computed once per detection and used for both counting and labelling.
                car_color = get_car_color(clean_frame, box)

                closest_tracked_idx = int(np.argmin(distances[det_idx]))
                if distances[det_idx, closest_tracked_idx] < MAX_MATCH_DISTANCE:
                    track = tracked_cars[closest_tracked_idx]
                    track['position'] = center
                    car_id = track['id']
                    # Count a track's color the first time it is re-identified.
                    if car_id not in tracked_car_ids:
                        color_counts[car_color] = color_counts.get(car_color, 0) + 1
                        tracked_car_ids.add(car_id)
                else:
                    car_id = car_id_counter
                    car_id_counter += 1
                    tracked_cars.append({'id': car_id, 'position': center})

                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                class_name = class_names_dict.get(str(class_id), str(class_id))
                label = f"{class_name} ID:{car_id} Color:{car_color}"
                cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 3, (0, 255, 0), 2)

        else:
            # No tracks yet: every detection starts a new track (nothing is drawn
            # for it until a later frame).
            for center, _class_id, _box in current_frame_detections:
                car_id = car_id_counter
                car_id_counter += 1
                tracked_cars.append({'id': car_id, 'position': center})

        cv2.putText(frame, f"Cars Detected: {len(tracked_cars)}", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 255, 0), 2)
        update_color_counts_display(frame, color_counts, display_start_pos=((frame_width - 400, 50)))

        out.write(frame)
finally:
    # Release the reader/writer even when a frame fails, so the output file is
    # finalised and the capture handle is not leaked.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

Processing Video Frames: 100%|██████████| 538/538 [51:54<00:00,  5.79s/it]
