In [1]:
from ultralytics import YOLO
from ultralytics.solutions import object_counter
import cv2
import pandas as pd
import time

# Load YOLO models
models = [YOLO("yolov8n.pt") for _ in range(4)]

# Open video captures for each lane
caps = [cv2.VideoCapture(f"lane{i + 1}.webm") for i in range(4)]

# Define region points for three lanes spanning half of the screen
region_points = [
    [(20, 400), (540, 400), (520, 740), (20, 740)],  # Left half of the screen
    [(540, 400), (1060, 400), (1040, 740), (540, 740)],  # Middle half of the screen
    [(20, 400), (1060, 400), (1040, 740), (20, 740)]  # Right half of the screen
]

# Classes to count
classes_to_count=[1,2,5,7]
print(models[0].names)
def filter_dict_by_keys(input_dict, keys_list):
    return {key: input_dict[key] for key in keys_list}
 

# Initialize Object Counters for each lane
def initialize_counters(model, region_points):
    counters = []
    for points in region_points:
        counter = object_counter.ObjectCounter()
        counter.set_args(view_img=False, reg_pts=points, classes_names
            =filter_dict_by_keys(model.names,classes_to_count) , draw_tracks=True)
        counters.append(counter)
    return counters

counters = [initialize_counters(model, region_points) for model in models] 

# Define a function to process each lane
def process_lane(cap, model, counters, dist):
    while cap.isOpened():
        success, im0 = cap.read()
        if not success:
            print("Video frame is empty or video processing has been successfully completed.")
        im0 = cv2.resize(im0, (1080, 750))
        timer=0
        for counter, points in zip(counters, region_points):
            tracks = model.track(im0, persist=True, show=False, classes=classes_to_count)
            im0 = counter.start_counting(im0, tracks)
            px = pd.DataFrame(tracks[0].boxes.data.detach().cpu().numpy()).astype("float")
            counts = {0: 0, 1: 0, 2: 0, 3: 0, 5: 0, 7: 0}
            for _, row in px.iterrows():
                class_num = row[6]
                if class_num in classes_to_count:
                    counts[class_num] += 1 
            timer = timer+sum([
                2.5 + dist if counts.get(2, 0) != 0 else 0,
                3.5 + dist if counts.get(3, 0) != 0 else 0,
                2.5 + dist if counts.get(5, 0) != 0 else 0,
                3.5 + dist if counts.get(7, 0) != 0 else 0
            ])
            if success == True: 
            # Display the resulting frame 
                cv2.imshow('Frame', im0) 
          
            # Press Q on keyboard to exit 
            if cv2.waitKey(25) & 0xFF == ord('q'): 
                break
  
            # Break the loop 
            else: 
                break
        print(timer)
        yield timer

# Define a token passing function
def timerTokenPasser(timer_gen):
    while True:
        t1 = next(timer_gen)
        print("Lane t1 is open. Passing token to t2...")
        time.sleep(min(t1/3 ,20) )
        t2 = next(timer_gen)
        print("Lane t2 is open. Passing token to t3...")
        time.sleep(min(t2/3,20))
        t3 = next(timer_gen)
        print("Lane t3 is open. Passing token to t4...")
        time.sleep(min(t3/3,20))
        t4 = next(timer_gen)
        print("Lane t4 is open. Passing token to t1...")
        time.sleep(min(t4/3,20))

# Process each lane and collect the timer values
timer_gens = [process_lane(caps[i], models[i], counters[i], 2 * i)  for i in range(4)]

# Pass the generator functions to timerTokenPasser
for timer_gen in timer_gens:
    timerTokenPasser(timer_gen)

# Release resources
for cap in caps:
    cap.release()

cv2.destroyAllWindows()


{0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone', 68: 'microw

error: OpenCV(4.9.0) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\resize.cpp:4152: error: (-215:Assertion failed) !ssize.empty() in function 'cv::resize'


In [4]:
from ultralytics import YOLO
from ultralytics.solutions import object_counter
import cv2
import pandas as pd
import time

# Load YOLO models
models = [YOLO("yolov8n.pt") for _ in range(4)]

# Open video captures for each lane
caps = [cv2.VideoCapture("highway.mp4") for _ in range(4)]

# Check if video captures are opened successfully
for cap in caps:
    assert cap.isOpened(), "Error reading video file"

# Define region points for three lanes spanning half of the screen
region_points = [
    [(20, 400), (540, 400), (520, 740), (20, 740)],     # Left half of the screen
    [(540, 400), (1060, 400), (1040, 740), (540, 740)], # Middle half of the screen
    [(20, 400), (1060, 400), (1040, 740), (20, 740)]    # Right half of the screen
]

# Initialize Object Counters for each lane and each region
counters = [[[object_counter.ObjectCounter() for _ in range(3)] for _ in range(3)] for _ in range(4)]
for i in range(4):
    for j in range(3):
        for k in range(3):
            counters[i][j][k].set_args(view_img=False, reg_pts=region_points[k], classes_names=models[i].names, draw_tracks=True)

# Define classes to count
classes_to_count = [2, 5, 7]  # Car, Motorcycle, Truck

# Define a function to process each lane
def process_lane(cap, models, counters):
    while cap.isOpened():
        success, im0 = cap.read()
        im0 = cv2.resize(im0, (1080, 750))
        if not success:
            print("Video frame is empty or video processing has been successfully completed.")
            break

        timers = []

        # Iterate over models, counters, and regions
        for i, (model, cnts) in enumerate(zip(models, counters)):
            for j, cnts_region in enumerate(cnts):
                counts = {0: 0, 1: 0, 2: 0, 3: 0, 5: 0, 7: 0}  # Initialize counters for the current region

                # Process frame with the current model and counter
                tracks = model.track(im0, persist=True, show=False, classes=classes_to_count)
                im0 = cnts_region[j].start_counting(im0, tracks)
                boxes_data = tracks[0].boxes.data.detach().cpu().numpy()

                # Iterate over detected objects and update counts
                for box in boxes_data:
                    class_num = int(box[6])
                    if class_num in classes_to_count:
                        counts[class_num] += 1

                # Calculate timer based on counts
                timer = (counts.get(2, 0) * 7) + (counts.get(3, 0) * 5) + (counts.get(5, 0) + counts.get(7, 0)) * 10
                timers.append(timer)

                print(f"Lane {i+1}, Region {j+1} Counts:", counts)

        # Yield timer value
        yield timers

# Define a token passing function
def token_passer(timer_gens):
    while True:
        for timer_gen in timer_gens:
            timers = next(timer_gen)
            total_time = sum(timers)
            print(f"Total time for passing token: {total_time} seconds")
            time.sleep(total_time)

# Generate timer generators for each lane
timer_gens = [process_lane(caps[i], models, counters[i]) for i in range(4)]

# Start token passing
# token_passer(timer_gens)

# Release resources
for cap in caps:
    cap.release()

cv2.destroyAllWindows()


Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.
Region Counter Initiated.


In [6]:
import cv2
import numpy as np

# Given coordinates
x1 = 1200
y1 = 1024

# Calculate the center of the screen
center_x = x1 // 2
center_y = y1 // 2

# Calculate the coordinates for dividing the quarter
div_x1 = center_x
div_x2 = x1

# Calculate the height of the quarter
quarter_height = y1 - center_y

# Calculate the height of each part
part_height = quarter_height // 3

# Define region points for the lower-right quarter and its divisions
region_points = [(center_x, center_y), (x1, center_y), (x1, y1), (center_x, y1)]
division1_points = [(center_x, center_y), (x1, center_y), (x1, center_y + part_height), (center_x, center_y + part_height)]
division2_points = [(center_x, center_y + part_height), (x1, center_y + part_height), (x1, center_y + 2 * part_height), (center_x, center_y + 2 * part_height)]
division3_points = [(center_x, center_y + 2 * part_height), (x1, center_y + 2 * part_height), (x1, y1), (center_x, y1)]

# Create an image to visualize the regions
img = np.zeros((y1, x1, 3), dtype=np.uint8)

# Draw the region rectangles
cv2.polylines(img, [np.array(region_points)], isClosed=True, color=(255, 0, 0), thickness=2)
cv2.polylines(img, [np.array(division1_points)], isClosed=True, color=(0, 255, 0), thickness=2)
cv2.polylines(img, [np.array(division2_points)], isClosed=True, color=(0, 0, 255), thickness=2)
cv2.polylines(img, [np.array(division3_points)], isClosed=True, color=(255, 255, 0), thickness=2)

# Display the image
cv2.imshow("Quarter with Divisions", img)
cv2.waitKey(0)
cv2.destroyAllWindows()
