# Counts people depending on the colour of the traffic ligth

# Imports and setup 

In [1]:
###################### Imports #######################


###############
# Ultralytics #
###############

# !pip install ultralytics

from IPython import display
display.clear_output()

import ultralytics
ultralytics.checks()

######################
# Set Home Directory #
######################

import os
HOME = os.getcwd()
print(HOME)

######################
# Import yolox modek #
######################

# %cd {HOME}
# !git clone https://github.com/ifzhang/ByteTrack.git
# %cd {HOME}/ByteTrack

# # workaround related to https://github.com/roboflow/notebooks/issues/80
# !sed -i 's/onnx==1.8.1/onnx==1.9.0/g' requirements.txt

# !pip3 install -q -r requirements.txt
# !python3 setup.py -q develop
# !pip install -q cython_bbox
# !pip install -q onemetric
# # workaround related to https://github.com/roboflow/notebooks/issues/112 and https://github.com/roboflow/notebooks/issues/106
# !pip install -q loguru lap thop

from IPython import display
display.clear_output()


import sys
sys.path.append(f"{HOME}/ByteTrack")


import yolox
print("yolox.__version__:", yolox.__version__)

##########################
# Import ByteTrack model #
##########################

from yolox.tracker.byte_tracker import BYTETracker, STrack
from onemetric.cv.utils.iou import box_iou_batch
from dataclasses import dataclass


@dataclass(frozen=True)
class BYTETrackerArgs:
    track_thresh: float = 0.25
    track_buffer: int = 30
    match_thresh: float = 0.8
    aspect_ratio_thresh: float = 3.0
    min_box_area: float = 1.0
    mot20: bool = False


######################
# Import Supervision #
######################
    
# !pip install supervision==0.1.0


from IPython import display
display.clear_output()


import supervision
print("supervision.__version__:", supervision.__version__)

from supervision.draw.color import ColorPalette
from supervision.draw.color import Color
from supervision.geometry.dataclasses import Point
from supervision.video.dataclasses import VideoInfo
from supervision.video.source import get_video_frames_generator
from supervision.video.sink import VideoSink
from supervision.notebook.utils import show_frame_in_notebook
from supervision.tools.detections import Detections, BoxAnnotator
from supervision.tools.line_counter import LineCounter, LineCounterAnnotator

####################### Setups #######################

######################
# ByteTrack settings #
######################

from typing import List

import numpy as np


# converts Detections into format that can be consumed by match_detections_with_tracks function
def detections2boxes(detections: Detections) -> np.ndarray:
    return np.hstack((
        detections.xyxy,
        detections.confidence[:, np.newaxis]
    ))


# converts List[STrack] into format that can be consumed by match_detections_with_tracks function
def tracks2boxes(tracks: List[STrack]) -> np.ndarray:
    return np.array([
        track.tlbr
        for track
        in tracks
    ], dtype=float)


# matches our bounding boxes with predictions
def match_detections_with_tracks(
    detections: Detections,
    tracks: List[STrack]
) -> Detections:
    if not np.any(detections.xyxy) or len(tracks) == 0:
        return np.empty((0,))

    tracks_boxes = tracks2boxes(tracks=tracks)
    iou = box_iou_batch(tracks_boxes, detections.xyxy)
    track2detection = np.argmax(iou, axis=1)

    tracker_ids = [None] * len(detections)

    for tracker_index, detection_index in enumerate(track2detection):
        if iou[tracker_index, detection_index] != 0:
            tracker_ids[detection_index] = tracks[tracker_index].track_id

    return tracker_ids


##################
# YoloX settings #
##################
from ultralytics import YOLO

MODEL = "yolov8x.pt"


model = YOLO(MODEL)
model.fuse()

Ultralytics YOLOv8.1.18 🚀 Python-3.10.12 torch-2.2.1+cu121 CUDA:0 (NVIDIA GeForce RTX 3060, 12042MiB)
Setup complete ✅ (24 CPUs, 31.1 GB RAM, 80.0/878.6 GB disk)


# Paths and other constants

In [11]:
###################### Paths #######################

SOURCE_VIDEO_PATH = "/media/alex/F882-9E28/Videos/00001-converted.mp4"

TARGET_VIDEO_PATH = f"{HOME}/vehicle-counting-result.mp4"

print(f"SOURCE_VIDEO_PATH: {SOURCE_VIDEO_PATH}")
print(f"TARGET_VIDEO_PATH: {TARGET_VIDEO_PATH}")

VideoInfo.from_video_path(SOURCE_VIDEO_PATH)

###################### Class constants #######################

# dict maping class_id to class_name
CLASS_NAMES_DICT = model.model.names
# class_ids of interest - person, car, motorcycle, bus and truck
CLASS_ID = [0, 2, 3, 5, 7]
CLASS_ID_PEOPLE = [0]
CLASS_ID_VEHICLE = [2, 3, 5, 7]
CLASS_ID_TRAFFIC_LIGHT = [9]

SOURCE_VIDEO_PATH: /media/alex/F882-9E28/Videos/00001-converted.mp4
TARGET_VIDEO_PATH: /home/alex/Documents/Licenta-main/Licenta/vehicle-counting-result.mp4


VideoInfo(width=1440, height=1080, fps=25, total_frames=2559)

# Counting algorithm

### Setup of traffic light classifier

In [None]:
# !pip install traffic_light_classifier

import traffic_light_classifier as tlc


model_traffic_light = tlc.Model()
model_traffic_light.compile()

###################### Functions #######################

#####################################################
# HSV color space ranges for red, yellow, and green #
#####################################################

import cv2
import numpy as np

def predict_traffic_light_color(frame):
    # Convert to HSV color space
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
    # Define hue ranges for red, yellow, and green (these ranges might need adjustment)
    red_lower1 = np.array([0, 100, 100])
    red_upper1 = np.array([160, 100, 100])
    red_lower2 = np.array([160, 50, 50])
    red_upper2 = np.array([180, 255, 255])
    yellow_lower = np.array([20, 50, 50])
    yellow_upper = np.array([30, 255, 255])
    green_lower = np.array([40, 50, 50])
    green_upper = np.array([90, 255, 255])
    # Count pixels within each color range
    red_pixels = (cv2.inRange(hsv, red_lower1, red_upper1) > 0).sum() + (cv2.inRange(hsv, red_lower2, red_upper2) > 0).sum()
    yellow_pixels = (cv2.inRange(hsv, yellow_lower, yellow_upper) > 0).sum()
    green_pixels = (cv2.inRange(hsv, green_lower, green_upper) > 0).sum()
    # Determine the most prominent color
    print(f"red_pixels: {red_pixels}, yellow_pixels: {yellow_pixels}, green_pixels: {green_pixels}")
    if red_pixels <= 30 and yellow_pixels <= 30 and green_pixels <= 30:
        return "Unknown"
    if max(red_pixels, yellow_pixels, green_pixels) == red_pixels:
        return "Red"
    elif max(red_pixels, yellow_pixels, green_pixels) == yellow_pixels:
        return "Yellow"
    else:
        return "Green"


#### Select counting lines coodrinates

In [33]:
###################### Select line #######################

import cv2
import numpy as np

# Initialize global variables
points = []  # To store the points where you click
lines = []

# Callback function for mouse events
def click_event(event, x, y, flags, param):
    global points, img, scaleFactorX, scaleFactorY
    if event == cv2.EVENT_LBUTTONDOWN:  # Left button click
        if len(points) < 2:  # Ensure we only have 2 points
            # Adjust x, y back to original image scale
            origX = int(x * scaleFactorX)
            origY = int(y * scaleFactorY)
            points.append((origX, origY))
            cv2.circle(resized_img, (x, y), 5, (0, 0, 255), -1)  # Draw the dot on resized image
            if len(points) == 2:
                lines.append((points[0], points[1]))
                cv2.line(resized_img, 
                    (int(points[0][0] // scaleFactorX), int(points[0][1] // scaleFactorY)),
                    (int(points[1][0] // scaleFactorX), int(points[1][1] // scaleFactorY)), 
                    (255, 0, 0), 2)  # Draw the line on resized image
                print(f"Point 1: {points[0]}, Point 2: {points[1]}")  # Print coordinates of original points
                cv2.imshow("image", resized_img)  # Show the image with the line
                
    if len(points) == 2:  # Reset after 2 points for new line drawing
        points.clear()



# Create a black image
generator = get_video_frames_generator(SOURCE_VIDEO_PATH)
# create instance of BoxAnnotator
box_annotator = BoxAnnotator(color=ColorPalette(), thickness=4, text_thickness=4, text_scale=2)
# acquire first video frame
iterator = iter(generator)
frame = next(iterator)
# Resize the image to 200x200
resized_img = cv2.resize(frame, (1000, 800))

# Calculate scale factors
originalHeight, originalWidth = frame.shape[:2]
scaleFactorX = originalWidth / 1000
scaleFactorY = originalHeight / 800

cv2.namedWindow("image")
cv2.setMouseCallback("image", click_event)

cv2.imshow("image", resized_img)
cv2.waitKey(0)
cv2.destroyAllWindows()

line_start = Point(lines[0][0][0], lines[0][0][1])
line_end = Point(lines[0][1][0], lines[0][1][1])

line_cars_start = Point(lines[1][0][0], lines[1][0][1])
line_cars_end = Point(lines[1][1][0], lines[1][1][1])

print(f"line_start: {line_start}, line_end: {line_end}")
print(f"line_cars_start: {line_cars_start}, line_cars_end: {line_cars_end}")

Point 1: (325, 880), Point 2: (907, 832)
Point 1: (480, 514), Point 2: (904, 1073)
line_start: Point(x=325, y=880), line_end: Point(x=907, y=832)
line_cars_start: Point(x=480, y=514), line_cars_end: Point(x=904, y=1073)


### Predict on whole video

In [44]:
from tqdm.notebook import tqdm
import cv2
from numpy import argmax


def annotate_with_counts(frame, people_count_green, vehicles_count_green, people_count_red, vehicles_count_red, font_scale=1, thickness=2):
    # Set the position for the annotations on the frame
    position_people_green = (10, 30)
    position_vehicles_green = (10, 60)
    position_people_red = (1000, 30)
    position_vehicles_red = (1000, 60)
    font = cv2.FONT_HERSHEY_SIMPLEX
    cv2.putText(frame, f"People crossed on green: {people_count_green}", position_people_green, font, font_scale, (255, 255, 0), thickness)
    cv2.putText(frame, f"Vehicles crossed on green: {vehicles_count_green}", position_vehicles_green, font, font_scale, (0, 255, 0), thickness)
    cv2.putText(frame, f"People crossed on red: {people_count_red}", position_people_red, font, font_scale, (0, 255, 255), thickness)
    cv2.putText(frame, f"Vehicles crossed on red: {vehicles_count_red}", position_vehicles_red, font, font_scale, (255, 0, 255), thickness)


# create BYTETracker instance
byte_tracker = BYTETracker(BYTETrackerArgs())
# create VideoInfo instance
video_info = VideoInfo.from_video_path(SOURCE_VIDEO_PATH)
# create frame generator
generator = get_video_frames_generator(SOURCE_VIDEO_PATH)
# create LineCounter instance
line_counter_people = LineCounter(start=line_start, end=line_end)
line_counter_vehicles = LineCounter(start=line_cars_start, end=line_cars_end)
line_counter_people_green = LineCounter(start=line_start, end=line_end)
line_counter_people_red = LineCounter(start=line_start, end=line_end)
line_counter_vehicles_green = LineCounter(start=line_cars_start, end=line_cars_end)
line_counter_vehicles_red = LineCounter(start=line_cars_start, end=line_cars_end)
# create instance of BoxAnnotator and LineCounterAnnotator
box_annotator = BoxAnnotator(color=ColorPalette(), thickness=3, text_thickness=4, text_scale=2)
line_annotator_people = LineCounterAnnotator(thickness=3, text_thickness=4, text_scale=1)
line_annotator_vehicle = LineCounterAnnotator(thickness=3, text_thickness=4, text_scale=1, color=Color(0, 255, 0))

# open target video file
with VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    # loop over video frames
    for frame in tqdm(generator, total=video_info.total_frames):
        # model prediction on single frame and conversion to supervision Detections
        results = model(frame)
        detections = Detections(
            xyxy=results[0].boxes.xyxy.cpu().numpy(),
            confidence=results[0].boxes.conf.cpu().numpy(),
            class_id=results[0].boxes.cls.cpu().numpy().astype(int)
        )
        detections_people = Detections(
            xyxy=results[0].boxes.xyxy.cpu().numpy(),
            confidence=results[0].boxes.conf.cpu().numpy(),
            class_id=results[0].boxes.cls.cpu().numpy().astype(int)
        )
        detections_vehicles = Detections(
            xyxy=results[0].boxes.xyxy.cpu().numpy(),
            confidence=results[0].boxes.conf.cpu().numpy(),
            class_id=results[0].boxes.cls.cpu().numpy().astype(int)
        )
        detections_traffic_light = Detections(
            xyxy=results[0].boxes.xyxy.cpu().numpy(),
            confidence=results[0].boxes.conf.cpu().numpy(),
            class_id=results[0].boxes.cls.cpu().numpy().astype(int)
        )
        # filtering out detections with unwanted classes
        mask_people = np.array([class_id in CLASS_ID_PEOPLE for class_id in detections.class_id], dtype=bool)
        mask_vehicle = np.array([class_id in CLASS_ID_VEHICLE for class_id in detections.class_id], dtype=bool)
        mask_traffic_light= np.array([class_id in CLASS_ID_TRAFFIC_LIGHT for class_id in detections.class_id], dtype=bool)
        detections_people.filter(mask=mask_people, inplace=True)
        detections_vehicles.filter(mask=mask_vehicle, inplace=True)
        detections_traffic_light.filter(mask=mask_traffic_light, inplace=True)
        print("DEBUG", len(detections_people.class_id))
        mask = np.array([class_id in CLASS_ID for class_id in detections.class_id], dtype=bool)
        detections.filter(mask=mask, inplace=True)
        # tracking detections
        tracks_people = byte_tracker.update(
            output_results=detections2boxes(detections=detections_people),
            img_info=frame.shape,
            img_size=frame.shape
        )
        tracks_vehicles = byte_tracker.update(
            output_results=detections2boxes(detections=detections_vehicles),
            img_info=frame.shape,
            img_size=frame.shape
        )
        tracks = byte_tracker.update(
            output_results=detections2boxes(detections=detections),
            img_info=frame.shape,
            img_size=frame.shape
        )
        tracker_id = match_detections_with_tracks(detections=detections, tracks=tracks)
        tracker_id_people = match_detections_with_tracks(detections=detections_people, tracks=tracks_people)
        tracker_id_vehicles = match_detections_with_tracks(detections=detections_vehicles, tracks=tracks_vehicles)
        detections.tracker_id = np.array(tracker_id)
        detections_people.tracker_id = np.array(tracker_id_people)
        detections_vehicles.tracker_id = np.array(tracker_id_vehicles)
        # filtering out detections without trackers
        mask = np.array([tracker_id is not None for tracker_id in detections.tracker_id], dtype=bool)
        mask_people = np.array([tracker_id is not None for tracker_id in detections_people.tracker_id], dtype=bool)
        mask_vehicles = np.array([tracker_id is not None for tracker_id in detections_vehicles.tracker_id], dtype=bool)
        detections.filter(mask=mask, inplace=True)
        detections_people.filter(mask=mask_people, inplace=True)
        detections_vehicles.filter(mask=mask_vehicles, inplace=True)

        # Cut the box around the traffic light
        for detection in detections_traffic_light.xyxy:
            %matplotlib inline
            x1, y1, x2, y2 = detection.astype(int)
            frame1 = frame[y1:y2, x1:x2]


            # Predict the color of the traffic light
            colours = ["Red", "Yellow", "Green"]
            color = predict_traffic_light_color(frame1)
            traffic_light_color = "Unknown"
            # print(f"Traffic light color: {color}")
            if color != "Unknown":
                frame1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2RGB)
                frame1 = cv2.resize(frame1, (32, 32))
                if frame1.size == 0:
                    raise ValueError("The cropped image is empty.")
                pred = model_traffic_light.predict(frame1, show_analysis=False)
                print(f"pred: {pred}")
                if colours[argmax(pred)] != color:
                    print(f"!!! hsv: {color}, nn: {colours[argmax(pred)]}")
                print(f"Traffic light color: {colours[argmax(pred)]}")
                traffic_light_color = colours[argmax(pred)]
            else:
                print("Traffic light color: not visible")

        # updating line counter if the traffic light is green
        if traffic_light_color == "Green":
            line_counter_people_green.update(detections=detections_people)
            line_counter_vehicles_red.update(detections=detections_vehicles)
        elif traffic_light_color == "Red":
            line_counter_people_red.update(detections=detections_people)
            line_counter_vehicles_green.update(detections=detections_vehicles)

        line_counter_people.update(detections=detections_people)
        line_counter_vehicles.update(detections=detections_vehicles)


        # annotate and display frame
        line_annotator_people.annotate(frame=frame, line_counter=line_counter_people)
        line_annotator_vehicle.annotate(frame=frame, line_counter=line_counter_vehicles)
        annotate_with_counts(frame, line_counter_people_green.in_count + line_counter_people_green.out_count, 
                             line_counter_vehicles_green.in_count + line_counter_vehicles_green.out_count, 
                             line_counter_people_red.in_count + line_counter_people_red.out_count, 
                             line_counter_vehicles_red.in_count + line_counter_vehicles_red.out_count)
        sink.write_frame(frame)

  0%|          | 0/2559 [00:00<?, ?it/s]




0: 480x640 15 persons, 6 cars, 2 traffic lights, 1 stop sign, 33.9ms
Speed: 13.3ms preprocess, 33.9ms inference, 1.1ms postprocess per image at shape (1, 3, 480, 640)
red_pixels: 175, yellow_pixels: 0, green_pixels: 4
pred: [1, 0, 0]
Traffic light color: Red
red_pixels: 0, yellow_pixels: 0, green_pixels: 0
Traffic light color: not visible

0: 480x640 16 persons, 6 cars, 2 traffic lights, 1 stop sign, 1 suitcase, 33.3ms
Speed: 1.2ms preprocess, 33.3ms inference, 0.7ms postprocess per image at shape (1, 3, 480, 640)
red_pixels: 176, yellow_pixels: 0, green_pixels: 0
pred: [1, 0, 0]
Traffic light color: Red
red_pixels: 0, yellow_pixels: 0, green_pixels: 0
Traffic light color: not visible

0: 480x640 15 persons, 6 cars, 1 traffic light, 1 stop sign, 1 suitcase, 33.0ms
Speed: 1.1ms preprocess, 33.0ms inference, 0.6ms postprocess per image at shape (1, 3, 480, 640)
red_pixels: 161, yellow_pixels: 0, green_pixels: 0
pred: [1, 0, 0]
Traffic light color: Red

0: 480x640 16 persons, 6 cars, 2 tr