In [1]:
import cv2
import os
import supervision as sv 
from ultralytics import YOLO
import numpy as np 
from enum import Enum

In [2]:
VIDEO_PATH = r"../../data/raw/videos/cctv/LeaveNGoInBus1.mp4"
MODEL_PATH = r"model/yolov8l.pt"
LEG_DETECTOR_PATH = r"../../results/weights/best.pt"
TOLERANCE = 10
REGION = {
    "5": 671,
    "4": 1761,
}

model = YOLO(MODEL_PATH)
tracker = sv.ByteTrack()
colorAnnotator = sv.ColorAnnotator()

class BusStatus(Enum):
    NO_DETECT = "No bus detected" # wont be used, if dont have bus then centroid will be empty 
    LEAVE_BUS = "Passengers on the bus are getting off"
    WAITING = "Wait for passengers to board" # wait when have ppl up / after ppl down / after resting 
    RESTING = "The bus driver is taking a break"
    DETECTING = "Detecting"
    LEAVING = "The bus is leaving for the next stop"
    
    def __str__(self) :
        return self.value

In [3]:
# use to store the bus current location id : {centroid, numberMatch, station, status}
busCentroid = {} 
avgLeg = []

In [4]:
def isBusExist(result) :
    return True if len(result[0].boxes.xyxy) > 0 else False

# clear when the bus is leaving  
def clearBusLocation() : 
    global busCentroid
    busCentroid = {}

def isBusCentroidExist(busId, centroid = busCentroid) : 
    return busId in centroid

# match the tolerance windown
def isWithinRange(currentCentroid, prevCentroid, tolerance = TOLERANCE) : 
    # calculate the euclidean distance between current and previous  
    euclideanDistance = np.linalg.norm(prevCentroid - currentCentroid)
    return euclideanDistance <= tolerance

def updateBusStatus(busId, status) : 
    global busCentroid 
    busCentroid[busId]["status"] = status

def getRegion(centroidX) : 
    for x in REGION :
        if centroidX < REGION[x] : 
            return x
    # outside station 4 and 5 
    return None 

def isLeave(bbox) : 
    # check bus xyxy is outside the x of station 4
    # if outside means already leave 
    # bbox[3] = bottom right corner 
    return bbox[2] >= REGION["4"]

In [5]:
# check filepath exist 
if not os.path.exists(VIDEO_PATH) : 
    # not exist raise an error 
    raise FileNotFoundError("Video doesnot exist")

# read video 
cap = cv2.VideoCapture(VIDEO_PATH)

while cap.isOpened() : 
    ret, frame = cap.read()

    if not ret : 
        # target resource have problem, unable to continue support execute 
        raise FileNotFoundError("Video doesnot exist")

    # detect bus 
    result = model(frame, classes=[5])[0]

    # check bus exist 
    if isBusExist(result) : 
        # bus exist - check moving 
        detections = sv.Detections.from_ultralytics(result[0])
        tracks = tracker.update_with_detections(detections)
        annotatedFrame = colorAnnotator.annotate(scene=frame.copy(), detections=detections)

        # loop track get centroid
        for track in tracks : 
            # get id and boundary box 
            busId = track[3]
            bbox = track[0]

            # calculate center 
            centerX = (bbox[0] + bbox[2])/2
            centerY = (bbox[1] + bbox[3])/2

            currentLocation = np.array([centerX, centerY])

            if not isBusCentroidExist(busId) : 
                # busid doesnot exist in the dictionary - create one 
                # init 
                busCentroid[busId] = {}
                busCentroid[busId]["centroid"] = currentLocation
                busCentroid[busId]["numberMatch"] = 0
                busCentroid[busId]["station"] = None
                updateBusStatus(busId, BusStatus.DETECTING)
            else :  
                # compare busCentroid, if same for 10 frame means stopping, else store the busCentroid and status = moving 
                # counter 
                prevLocation = busCentroid[busId]["centroid"]
                if isWithinRange(currentCentroid=currentLocation, prevCentroid=prevLocation) : 
                    # within the tolerance window 
                    # update the value 
                    busCentroid[busId]["numberMatch"] += 1

                    if busCentroid[busId]["numberMatch"] >= 10 : 
                        # means stopping 
                        updateBusStatus(busId, BusStatus.WAITING)
                    else : 
                        # not reach 10 
                        updateBusStatus(busId, BusStatus.DETECTING)
                else : 
                    # because not match means is moving so clear the numberMatch 
                    busCentroid[busId]["numberMatch"] = 0 
                    busCentroid[busId]["centroid"] = currentLocation

                    # check is leave or just only moving 
                    if busCentroid[busId]["status"] == BusStatus.WAITING : 
                        # if previous data is waiting means now is leave 
                        updateBusStatus(busId, BusStatus.LEAVING)
                    else : 
                        # just moving 
                        updateBusStatus(busId, BusStatus.DETECTING)

            # do station checking
            # if station have value then move means is ready to leave 
            # only will have 3 case , station = None / station == tempStation / station != tempStation 
            tempStation = getRegion(busCentroid[busId]["centroid"][0])
            if busCentroid[busId]["station"] is None or (busCentroid[busId]["status"] != BusStatus.LEAVING and busCentroid[busId]["status"] != BusStatus.WAITING) : 
                # since waiting and leaving wont affect the station 
                # resting also wont
                busCentroid[busId]["station"] = tempStation

            if isLeave(bbox) : 
                # check the xyxy is outside already or not 
                # if yes then clear the busCentroid
                # remove the bus from centroid and get next bus schedule time 
                busCentroid.pop(busId, None)

            print(f"bus > {busCentroid}")
            frame = annotatedFrame
    else : 
        print("nothings")

    # Draw bounding box
    x_min, y_min, x_max, y_max = map(int, bbox)
    cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)  # Green box

    # Display Bus ID, Status, and Station
    text = (f"ID: {busId}, "
            f"Status: {busCentroid[busId]['status']}, "
            f"Station: {busCentroid[busId]['station']}")
    cv2.putText(frame, text, (x_min, y_min - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Display Centroid
    centroid_text = f"Centroid: ({int(centerX)}, {int(centerY)})"
    cv2.putText(frame, centroid_text, (x_min, y_max + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
    cv2.circle(frame, (int(centerX), int(centerY)), 5, (0, 0, 255), -1)  # Red dot for centroid

    cv2.imshow("show", frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break


0: 384x640 1 bus, 523.2ms
Speed: 3.5ms preprocess, 523.2ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)
bus > {np.int64(5): {'centroid': array([     13.699,      497.91], dtype=float32), 'numberMatch': 0, 'station': '5', 'status': <BusStatus.DETECTING: 'Detecting'>}}

0: 384x640 1 bus, 509.2ms
Speed: 4.0ms preprocess, 509.2ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)
bus > {np.int64(5): {'centroid': array([     13.699,      497.91], dtype=float32), 'numberMatch': 1, 'station': '5', 'status': <BusStatus.DETECTING: 'Detecting'>}}

0: 384x640 1 bus, 478.3ms
Speed: 3.0ms preprocess, 478.3ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)
bus > {np.int64(5): {'centroid': array([     18.571,      491.76], dtype=float32), 'numberMatch': 0, 'station': '5', 'status': <BusStatus.DETECTING: 'Detecting'>}}

0: 384x640 1 bus, 443.2ms
Speed: 3.0ms preprocess, 443.2ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)
bus > {n