# Stop Detection

## Library

In [1]:
from google.cloud import storage
from google.oauth2 import service_account
import supervision as sv
from config import *
import tempfile
import cv2
import os
import av



## Convert avi to mp4

In [5]:
import os
import subprocess

def convert_avi_to_mp4(input_file, output_file):
    """
    Re-encode a video file into an MP4 container by invoking FFmpeg.

    The video stream is encoded with libx264 and the audio stream with AAC.
    An existing output file is overwritten (``-y``). Problems are reported
    with ``print`` instead of being raised to the caller.

    Args:
        input_file: Path of the source video.
        output_file: Path the converted MP4 is written to.

    Returns:
        bool: True when the FFmpeg process exits successfully; False when the
        input file is missing, FFmpeg exits with a non-zero status, or any
        other exception occurs while launching it.
    """
    converted = False

    if os.path.exists(input_file):
        codec_flags = ['-c:v', 'libx264', '-c:a', 'aac']
        try:
            ffmpeg_call = ['ffmpeg', '-i', input_file, *codec_flags, '-y', output_file]

            print(f"Converting '{input_file}' to '{output_file}' with re-encoding...")
            # check=True turns a non-zero FFmpeg exit status into CalledProcessError.
            subprocess.run(ffmpeg_call, check=True)
            print("Conversion successful!")
            converted = True
        except subprocess.CalledProcessError as ffmpeg_failure:
            print(f"An error occurred during conversion: {ffmpeg_failure}")
        except Exception as unexpected:
            # Covers e.g. the ffmpeg executable not being found.
            print(f"An unexpected error occurred: {unexpected}")
    else:
        print(f"Error: Input file '{input_file}' not found.")

    return converted

if __name__ == '__main__':
    # Example run: re-encode one AVI from the working directory into an MP4.
    input_video = 'video1.avi'  # Source file, expected next to the notebook
    output_video = 'my_video_converted.mp4'

    # Keep the outcome so the cleanup step below can depend on it.
    conversion_ok = convert_avi_to_mp4(input_video, output_video)
    if conversion_ok:
        print(f"Video saved as '{output_video}'")
    else:
        print("Video conversion failed.")

    # Remove the source only after a successful conversion. Deleting it after
    # a failed run would leave neither the original nor a converted copy.
    if conversion_ok and os.path.exists(input_video):
        os.remove(input_video)
        print(f"Cleaned up dummy file '{input_video}'")

Converting 'video1.avi' to 'my_video_converted.mp4' with re-encoding...


ffmpeg version 7.1.1 Copyright (c) 2000-2025 the FFmpeg developers
  built with clang version 19.1.7
  configuration: --prefix=/opt/anaconda3/envs/adaro_ml --cc=arm64-apple-darwin20.0.0-clang --cxx=arm64-apple-darwin20.0.0-clang++ --nm=arm64-apple-darwin20.0.0-nm --ar=arm64-apple-darwin20.0.0-ar --disable-doc --enable-openssl --enable-demuxer=dash --enable-hardcoded-tables --enable-libfreetype --enable-libharfbuzz --enable-libfontconfig --enable-libopenh264 --enable-libdav1d --enable-cross-compile --arch=arm64 --target-os=darwin --cross-prefix=arm64-apple-darwin20.0.0- --host-cc=/Users/runner/miniforge3/conda-bld/ffmpeg_1753272244790/_build_env/bin/x86_64-apple-darwin13.4.0-clang --enable-neon --disable-gnutls --enable-libvpx --enable-libass --enable-pthreads --enable-libopenvino --enable-gpl --enable-libx264 --enable-libx265 --enable-libmp3lame --enable-libaom --enable-libsvtav1 --enable-libxml2 --enable-pic --enable-shared --disable-static --enable-version3 --enable-zlib --enable-lib

Conversion successful!
Video saved as 'my_video_converted.mp4'
Cleaned up dummy file 'video1.avi'


[af#0:1 @ 0x131f84600] No filtered frames for output stream, trying to initialize anyway.
[aac @ 0x131f83580] Too many bits 8832.000000 > 6144 per frame requested, clamping to max
Output #0, mp4, to 'my_video_converted.mp4':
  Metadata:
    VGN0            : CAM SIMPANG HILL 11
    VGT0            : � o2�?
    VGT1            : ?�s2�?
    encoder         : Lavf61.7.100
  Stream #0:0: Video: h264 (avc1 / 0x31637661), yuvj420p(pc, bt709, progressive), 800x450 [SAR 1:1 DAR 16:9], q=2-31, 30 fps, 15360 tbn
      Metadata:
        encoder         : Lavc61.19.101 libx264
      Side data:
        cpb: bitrate max/min/avg: 0/0/0 buffer size: 0 vbv_delay: N/A
  Stream #0:1: Audio: aac (LC) (mp4a / 0x6134706D), 8000 Hz, mono, fltp, 48 kb/s
      Metadata:
        encoder         : Lavc61.19.101 aac
[out#0/mp4 @ 0x131f19830] video:13560KiB audio:0KiB subtitle:0KiB other streams:0KiB global headers:0KiB muxing overhead: 0.736782%
frame= 8536 fps=967 q=-1.0 Lsize=   13660KiB time=N/A bitrate=N/A sp

## Read Data from GCS

In [3]:
def download_gcs_video_with_specific_key(key_file_path, bucket_name, source_blob_name, destination_file_name):
    """
    Download one object from Google Cloud Storage using a service-account key file.

    Args:
        key_file_path: Path to the service-account JSON key used to build the client.
        bucket_name: Name of the bucket that holds the object.
        source_blob_name: Object name inside the bucket.
        destination_file_name: Local path the object is written to.

    Returns:
        bool: True when the download call completes without raising; False when
        any exception is raised. The error, and a hint for recognised failure
        classes, is printed rather than propagated.
    """
    try:
        # Build the client from the explicit key file rather than ambient credentials.
        storage_client = storage.Client.from_service_account_json(key_file_path)

        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)

        blob.download_to_filename(destination_file_name)
        print(f"Video '{source_blob_name}' downloaded to '{destination_file_name}' using service account key.")
        return True
    except Exception as e:
        print(f"Error downloading video '{source_blob_name}' using service account key: {e}")

        # Classify the failure by a numeric status when the exception carries one,
        # and fall back to the original substring checks otherwise.
        # NOTE(review): google-api-core errors are expected to expose the HTTP
        # status as `.code` - confirm against the installed library version.
        status = getattr(e, "code", None)
        message = str(e)

        permission_problem = (
            status in (401, 403)
            or "401 Unauthorized" in message
            or "access denied" in message.lower()
        )
        missing_object = status == 404 or "404 Not Found" in message

        if permission_problem:
            print("Hint: Check if the service account has the necessary permissions (e.g., Storage Object Viewer).")
        elif missing_object:
            print("Hint: The bucket name or object name might be incorrect, or the object does not exist.")
        return False


In [4]:
# Service-account key file, resolved relative to the working directory.
SERVICE_ACCOUNT_KEY_PATH = "adaro-vision-poc-e36a1eaf86f5.json"

# Bucket, object name and local target for the sample download.
GCS_BUCKET_NAME = "vision-poc-bucket-adaro"
your_video_blob_name = "2025-07-22/14/2025-07-22 14-00-00~14-05-03.avi"  # Exact object name in the bucket
local_download_path = "downloaded_video_from_sa_key.avi"

# Download the object and sanity-check it by opening it with OpenCV.
if os.path.exists(SERVICE_ACCOUNT_KEY_PATH):
    print(f"Attempting to download video '{your_video_blob_name}' from bucket '{GCS_BUCKET_NAME}'...")
    print(f"Using service account key: {SERVICE_ACCOUNT_KEY_PATH}")

    if download_gcs_video_with_specific_key(
        key_file_path=SERVICE_ACCOUNT_KEY_PATH,
        bucket_name=GCS_BUCKET_NAME,
        source_blob_name=your_video_blob_name,
        destination_file_name=local_download_path,
    ):
        print("\nDownload complete. Now attempting to open with OpenCV...")
        cap = cv2.VideoCapture(local_download_path)
        if cap.isOpened():
            print("Video opened successfully with OpenCV for further processing!")
            # Read the basic stream properties; each is truncated to an int.
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = int(cap.get(cv2.CAP_PROP_FPS))
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            print(f"Video Resolution: {frame_width}x{frame_height}")
            print(f"FPS: {fps}")
            print(f"Total Frames: {frame_count}")

            cap.release()
        else:
            print("Error: Could not open the downloaded video with OpenCV.")
            print("This might happen if the download was incomplete or the file is corrupted.")
else:
    print(f"Error: Service account key file not found at '{SERVICE_ACCOUNT_KEY_PATH}'")
    print("Please ensure the JSON key file is in the same directory as this script, or provide its full path.")

Attempting to download video '2025-07-22/14/2025-07-22 14-00-00~14-05-03.avi' from bucket 'vision-poc-bucket-adaro'...
Using service account key: adaro-vision-poc-e36a1eaf86f5.json
Video '2025-07-22/14/2025-07-22 14-00-00~14-05-03.avi' downloaded to 'downloaded_video_from_sa_key.avi' using service account key.

Download complete. Now attempting to open with OpenCV...
Video opened successfully with OpenCV for further processing!
Video Resolution: 800x450
FPS: 30
Total Frames: 9257


[h264 @ 0x150ec0060] missing picture in access unit with size 56


In [20]:
def get_first_frame_from_gcs(gcs_blob_path,video_name,SERVICE_ACCOUNT_KEY_PATH,GCS_BUCKET_NAME):
    """
    Download a video from GCS to a temporary file and save its first frame as a PNG.

    The frame is written to ``first_frames/<video_name>.png``. The temporary
    download is removed before the function returns, whether or not the
    extraction succeeded. Errors are printed, not raised.

    Args:
        gcs_blob_path: Object name of the video inside the bucket.
        video_name: Base name (without extension) for the output PNG.
        SERVICE_ACCOUNT_KEY_PATH: Path to the service-account JSON key.
        GCS_BUCKET_NAME: Name of the bucket that holds the video.

    Returns:
        None
    """
    try:
        storage_client = storage.Client.from_service_account_json(SERVICE_ACCOUNT_KEY_PATH)
        bucket = storage_client.bucket(GCS_BUCKET_NAME)
        blob = bucket.blob(gcs_blob_path)
    except Exception as e:
        print(f"Error initializing GCS client or blob: {e}")
        print(f"Please check if '{SERVICE_ACCOUNT_KEY_PATH}' exists and is a valid JSON key.")
        return

    temp_video_path = None
    try:
        # Reserve a unique path only. delete=False means nothing is cleaned up
        # automatically, so the file is removed explicitly in the `finally` below.
        # The handle is closed before the download writes to the same path.
        with tempfile.NamedTemporaryFile(suffix=".avi", delete=False) as temp_video_file:
            temp_video_path = temp_video_file.name

        print(f"Downloading '{gcs_blob_path}' from GCS to '{temp_video_path}'...")
        blob.download_to_filename(temp_video_path)
        print("Download complete.")

        # Read the downloaded file frame by frame and keep only the first frame.
        frames_generator = sv.get_video_frames_generator(source_path=temp_video_path)
        first_frame = next(iter(frames_generator))

        # Save the first frame.
        output_dir = "first_frames/"
        os.makedirs(output_dir, exist_ok=True)
        output_path = f"{output_dir}{video_name}.png"
        cv2.imwrite(output_path, first_frame)
        print(f"First frame saved to '{output_path}'")

    except StopIteration:
        # The generator yielded nothing. StopIteration has an empty message, so
        # report it explicitly, the same way get_first_frame_from_local does.
        print(f"Error: Could not read any frames from '{gcs_blob_path}'. The video might be empty or corrupted.")
    except Exception as e:
        print(f"An error occurred during video processing: {e}")
        if "404 Not Found" in str(e):
            print(f"Error: The video object '{gcs_blob_path}' was not found in bucket '{GCS_BUCKET_NAME}'.")
            print("Please double-check the 'video_name' and the constructed GCS path.")
        elif "access denied" in str(e).lower() or "unauthorized" in str(e).lower():
            print("Error: Service account does not have sufficient permissions to access the video.")
            print("Ensure the service account has 'Storage Object Viewer' role on the bucket.")
    finally:
        # Always remove the temporary download, on success and on failure.
        if temp_video_path and os.path.exists(temp_video_path):
            os.remove(temp_video_path)
            print(f"Cleaned up temporary file: {temp_video_path}")

In [21]:
# Extract the first frame of the sample GCS video into first_frames/.
get_first_frame_from_gcs(
    gcs_blob_path=your_video_blob_name,
    video_name='14-00-00~14-05-03',
    SERVICE_ACCOUNT_KEY_PATH=SERVICE_ACCOUNT_KEY_PATH,
    GCS_BUCKET_NAME=GCS_BUCKET_NAME,
)

Downloading '2025-07-22/14/2025-07-22 14-00-00~14-05-03.avi' from GCS to '/var/folders/3j/7f88lb_d7nxccf5l_r8b02bh0000gn/T/tmpo34gsibl.avi'...
Download complete.
First frame saved to 'first_frames/14-00-00~14-05-03.png'
Cleaned up temporary file: /var/folders/3j/7f88lb_d7nxccf5l_r8b02bh0000gn/T/tmpo34gsibl.avi


[h264 @ 0x12cc437c0] missing picture in access unit with size 56


In [9]:
def get_first_frame_from_local(local_video_path, video_name):
    """
    Save the first frame of a local video as ``first_frames/<video_name>.png``.

    Problems (missing file, no readable frames, any other exception) are
    printed instead of raised.

    Args:
        local_video_path (str): Path to the local video file.
        video_name (str): Base name (without extension) for the output PNG.

    Returns:
        None
    """
    if os.path.exists(local_video_path):
        try:
            # Pull exactly one frame from the lazy frame generator.
            frame_source = iter(sv.get_video_frames_generator(source_path=local_video_path))
            opening_frame = next(frame_source)

            # Make sure the target folder exists, then write the PNG.
            frames_dir = "first_frames/"
            os.makedirs(frames_dir, exist_ok=True)
            png_path = f"{frames_dir}{video_name}.png"

            cv2.imwrite(png_path, opening_frame)
            print(f"✅ First frame saved to '{png_path}'")
        except StopIteration:
            # The generator yielded nothing at all.
            print(f"Error: Could not read any frames from '{local_video_path}'. The video might be empty or corrupted.")
        except Exception as processing_error:
            print(f"An error occurred during video processing: {processing_error}")
    else:
        print(f"Error: Video file not found at '{local_video_path}'")

In [10]:
# Extract the first frame of the local sample video into first_frames/.
get_first_frame_from_local(
    local_video_path='video2.mp4',
    video_name='video2',
)

✅ First frame saved to 'first_frames/video2.png'


## Process Video

In [3]:
class StopVideo:
    def __init__(self, video_name,key_file_path,gcs_bucket_name,video_blob_name, local_video_path):
        self.video_name = video_name
        self.output_path = f"output/{video_name}"
        os.makedirs(self.output_path, exist_ok=True)
        #GCS Video
        # self.credentials = service_account.Credentials.from_service_account_file(key_file_path)
        self.storage_client = storage.Client.from_service_account_json(key_file_path)
        self.gcs_bucket_name = gcs_bucket_name
        self.video_blob_name = video_blob_name
        self.bucket = self.storage_client.bucket(self.gcs_bucket_name)
        self.local_video_path = local_video_path
        
        # Load video data
        self.video_info, self.frames_generator = self.load_video_local()
        self.stopzone = STOP_ZONE
        self.outzone = OUT_ZONE
        self.road = ROAD
        self.point = POINT
        # Load detection models
        self.model_wheels = get_model("wheels-detection-fgbtv/1") #api_key="3ygJM8lQ7lQrTCOVXwmQ"
        self.model_yolo = get_model("yolov8s-640") #https://inference.roboflow.com/quickstart/aliases/#supported-pre-trained-models
    
        # Initialize trackers
        self.tracker_yolo = sv.ByteTrack(frame_rate=self.video_info.fps)
        self.tracker_wheel = sv.ByteTrack(frame_rate=self.video_info.fps)

        # Box annotators
        self.box_annotator_green = sv.BoxAnnotator(color=sv.Color.GREEN)
        self.box_annotator_gray = sv.BoxAnnotator(color=sv.Color(r=176, g=178, b=181))
        self.box_annotator_red = sv.BoxAnnotator(color=sv.Color.RED)
        self.box_annotator_white = sv.BoxAnnotator(color=sv.Color.WHITE)

        # Label annotators
        scale = 1.05
        self.label_annotator_green = sv.LabelAnnotator(color=sv.Color.GREEN, text_scale=scale)
        self.label_annotator_gray = sv.LabelAnnotator(color=sv.Color(r=176, g=178, b=181), text_scale=scale)
        self.label_annotator_red = sv.LabelAnnotator(color=sv.Color.RED, text_scale=scale)

        # Zones
        self.stopzone, self.stopzone_annotator = self.add_zone(sv.Color.GREEN, STOP_ZONE)
        self.outzone, self.outzone_annotator = self.add_zone(sv.Color.RED, OUT_ZONE)   
        self.road, self.road_annotator = self.add_zone(sv.Color.BLUE, ROAD)             

        self.point = POINT

    def inference(self):
        """
        Runs vehicle and wheel detection over every frame read from the capture.

        Frames are read from self.frames_generator with .read() until it reports
        no more frames. For each frame the method detects and tracks vehicles,
        keeps the single vehicle on the road zone that is closest to self.point,
        runs wheel detection on that vehicle's crop, and builds a per-frame
        report with construct_report().

        Side effects:
            (Re)initialises and fills self.car_detections, self.wheel_detections,
            self.report, self.car_center_history and self.first_last_frames.
            The capture is consumed and is not rewound or released here.

        Returns:
            dict: self.report, keyed by frame number.
        """
        # Bbox detections of car locations. Key is frame number, value is the detections
        self.car_detections = {}
        # Bbox detections of wheel locations. Key is frame number, value is the detections
        self.wheel_detections = {}
        # Report for each frame. Key is frame number, value is the report (created in construct_report())
        self.report = {}
        # Center positions of each car. Key is car id, value is the list of centers
        # (filled in construct_report()). Used to calculate speed.
        self.car_center_history = {}
        # First and last frame number in which each tracked vehicle id was seen.
        self.first_last_frames = {}
        video_capture = self.frames_generator
        frame_no = 0
        with tqdm(total=self.video_info.total_frames, desc="Processing Frames") as pbar:
            while True:
                ret, frame = video_capture.read()
                # If 'ret' is False, we've reached the end of the video
                if not ret:
                    break
                #### 1. For vehicle detection ####
                # Perform inference on the frame - This detects all objects defined by the model
                results_yolo = self.model_yolo.infer(frame)[0]
                detections_yolo_all = sv.Detections.from_inference(results_yolo)
                # Filter to only include vehicles
                detections_yolo_vehicle = detections_yolo_all[self.is_vehicle(detections_yolo_all)]

                # Update the tracker with the detections - to get ID of the vehicle
                detections_yolo_vehicle = self.tracker_yolo.update_with_detections(detections_yolo_vehicle)

                # Record first/last sighting for every tracked vehicle, before the
                # road and closest-to-point filters below are applied.
                car_ids = self.get_ids(detections_yolo_vehicle)
                for id in car_ids:
                    if id not in self.first_last_frames:
                        self.first_last_frames[id] = {"first": frame_no, "last": frame_no}
                    else:
                        self.first_last_frames[id]["last"] = frame_no
                
                # Filter to only include vehicles on the road of the stop sign
                on_road_mask = self.road.trigger(detections_yolo_vehicle)
                detections_yolo_vehicle = detections_yolo_vehicle[on_road_mask]

                # Filter to only include the vehicle closest to the point
                detections_yolo_vehicle = self.return_target(detections_yolo_vehicle)
                
                
                # Extract the vehicle from the frame
                vehicle, vehicle_coords = self.extract_vehicle(frame, detections_yolo_vehicle)
                
                # Save car detections - if we have any
                if len(detections_yolo_vehicle) > 0:
                    self.car_detections[frame_no] = detections_yolo_vehicle

                #### 2. For wheel detection ####
                # If there is a vehicle in the frame, we perform wheel detection
                if vehicle is not None:
                    # Perform inference on the vehicle
                    results_wheel = self.model_wheels.infer(np.array(vehicle))[0]
                    detections_wheel = sv.Detections.from_inference(results_wheel)

                    # If we found a wheel in the sliced image, we need to update the coordinates to the full image
                    if len(detections_wheel) > 0:
                        detections_wheel = self.update_wheel_coords(detections_wheel, vehicle_coords)
                    # The wheel tracker is only updated on frames that contain a target vehicle.
                    detections_wheel_id = self.tracker_wheel.update_with_detections(detections_wheel)
                    # Save wheel detection if we have any
                    if len(detections_wheel_id) > 0:
                        self.wheel_detections[frame_no] = detections_wheel_id

                # Construct frame report
                self.report[frame_no] = self.construct_report(frame_no)
                pbar.update(1)  # Manually update the progress bar
                frame_no += 1
        return self.report
    def is_vehicle(self, detections):
        """ Filters detections to only include vehicles"""
        vehicle_class_ids = [2, 3, 5, 7] # ["car", "motorcycle", "bus", "truck"]
        return [True if x in vehicle_class_ids else False for x in detections.class_id]
    
    def get_ids(self, detections):
        """ Returns the ids of the detections (car or wheel ids) """
        return [detection[4] for detection in detections]
    
    def return_target(self, detections):
        """
        Filters detections to only include the vehicle closest to the point.
        This is helpful if there is a queue of cars and we only want to track the car closest to the stopline.
        """
        if len(detections) == 0:
            return detections
        else:
            distances = []
            for detection in detections:
                distance_to_point = self.get_distance_to_point(detection)
                distances.append(distance_to_point)
            
            # Find the index of the detection with the minimum distance
            min_dist_index = np.argmin(distances)
            
            # Return the detection with the minimum distance
            closest_detection = detections[min_dist_index:min_dist_index+1]
            return closest_detection
        
    def get_distance_to_point(self, detection):
        """ Returns the distance from the center of the detection to the point """
        x1, y1, x2, y2 = detection[0]
        center = ((x1 + x2) / 2, (y1 + y2) / 2)
        return ((center[0] - self.point[0]) ** 2 + (center[1] - self.point[1]) ** 2) ** 0.5
        
    def extract_vehicle(self, frame, detections):
        """
        Extracts the vehicle from the frame

        Args:
            frame: The frame to extract the vehicle from
            detections: The detections of the vehicle

        Returns:
            vehicle: The vehicle extracted from the frame
            vehicle_coords: The coordinates of the vehicle in the frame
        """
        if len(detections) == 0:
            return None, None
        # Cut the frame to the bounding box of the vehicle and return it. We will alwas one have one vehicle in the frame
        for detection in detections:
            # define the bounding box
            x1, y1, x2, y2 = detection[0]
            vehicle = frame[int(y1):int(y2), int(x1):int(x2)]
            vehicle_coords = (int(x1), int(y1), int(x2), int(y2))
            return vehicle, vehicle_coords

    def update_wheel_coords(self, wheel_detections, vehicle_coords):
        """ Adjusts the wheel coordinates to the full frame"""
        x1, y1, x2, y2 = vehicle_coords
        for i, xyxy in enumerate(wheel_detections.xyxy):
            w_x1, w_y1, w_x2, w_y2 = xyxy
            nw_x1 = w_x1 + x1
            nw_y1 = w_y1 + y1
            nw_x2 = w_x2 + x1
            nw_y2 = w_y2 + y1
            wheel_detections.xyxy[i] = [nw_x1, nw_y1, nw_x2, nw_y2]
        return wheel_detections

    def construct_report(self, frame_no):
        """
        Construct a summary report for a given frame.

        Args:
            frame_no (int): The frame number to construct the report for.

        Returns:
            dict: A nested dictionary with the following structure:
                {
                    car_id: {
                        "wheel": [
                            {
                                wheel_id: {
                                    "stopzone": bool,
                                    "outzone": bool
                                }
                            },
                            ...
                        ],
                        "speed": float,
                        "car_center": tuple
                    }
                }
                If no car is detected in the frame, returns an empty dictionary.
                If a car is detected but no wheels are detected, returns a dictionary with car_id and an empty dictionary for wheels.
        """
        # 1. If we dont have a car in frame
        if frame_no not in self.car_detections:
            return {}
        
        car_id = self.get_ids(self.car_detections[frame_no])[0]

        # 2. If we dont have a wheel in frame
        if frame_no not in self.wheel_detections:
            return {car_id:{}}

        # 3. If we have a car and a wheel in frame
        wheel_ids = self.get_ids(self.wheel_detections[frame_no])

        in_stopping_zone = self.stopzone.trigger(self.wheel_detections[frame_no])
        in_outzone = self.outzone.trigger(self.wheel_detections[frame_no])
        wheel_lst = []
        for wheel, stop, out in zip(wheel_ids, in_stopping_zone, in_outzone):
            wheel_dict = {wheel:{"stopzone":stop, "outzone": out}}
            wheel_lst.append(wheel_dict)

        ### Speed
        car_center = self.get_car_center(frame_no)
        if car_id not in self.car_center_history:
            self.car_center_history[car_id] = [None, car_center]
        else:
            self.car_center_history[car_id].append(car_center)
        
        if self.car_center_history[car_id][-2] is not None:
            distance_moved = self.get_distance(self.car_center_history[car_id][-2], car_center)
            current_speed = distance_moved * self.video_info.fps
        else:
            current_speed = None
        
        # Return the dict for the frame
        return {car_id:{"wheel":wheel_lst, "speed":current_speed, "car_center":car_center}}
    
    def get_car_center(self, frame_no):
        """ Returns the center of the car in the frame """
        x1, y1, x2, y2 = self.car_detections[frame_no].xyxy[0]
        center = (x1+x2)/2, (y1+y2)/2
        return center

    def get_distance(self, prev_center, current_center):
        """
        Returns the distance between two points.
        Used to calculate speed.
        """
        return ((prev_center[0]-current_center[0])**2 + (prev_center[1]-current_center[1])**2)**0.5


    def load_video(self):
        """
        Downloads the video from Google Cloud Storage to a temporary file,
        then loads its information and returns the cv2.VideoCapture object.

        Returns:
            tuple: A tuple containing video_info, cv2.VideoCapture object,
                   stopzone, outzone, point, road.
        """
        print(f"Downloading video '{self.video_blob_name}' from bucket '{self.gcs_bucket_name}'...")
        blob = self.bucket.blob(self.video_blob_name)

        with tempfile.NamedTemporaryFile(suffix=".avi", delete=False) as temp_file:
            self._temp_video_path = temp_file.name
            blob.download_to_filename(self._temp_video_path)

        print(f"Video downloaded to temporary file: '{self._temp_video_path}'")

        try:
            # Get video info using supervision
            video_info = sv.VideoInfo.from_video_path(video_path=self._temp_video_path)
            
            # Create the OpenCV VideoCapture object
            cap = cv2.VideoCapture(self._temp_video_path)

            if not cap.isOpened():
                print(f"ERROR: Failed to open video file with OpenCV: {self._temp_video_path}")
                # Ensure cleanup if video can't even be opened locally
                self.cleanup_temp_file()
                raise IOError("Could not open video file with OpenCV.")

            print(f"Video opened successfully from: {self._temp_video_path}")
            print(f"Video Resolution: {video_info.width}x{video_info.height}")
            print(f"FPS: {video_info.fps}")
            print(f"Total Frames: {video_info.total_frames}")

            # Return the cap object along with other necessary data
            return video_info, cap

        except Exception as e:
            print(f"Error in load_video: {e}")
            self.cleanup_temp_file()
            raise
        
    def load_video_local(self):
        """
        Loads a local video file, gets its information, and returns the
        cv2.VideoCapture object.

        Returns:
            tuple: A tuple containing video_info and the cv2.VideoCapture object.
        """
        # 1. Use the local video path directly
        print(f"Loading video from local path: '{self.local_video_path}'")

        try:
            # 2. Get video info using supervision from the local path
            video_info = sv.VideoInfo.from_video_path(video_path=self.local_video_path)
            
            # 3. Create the OpenCV VideoCapture object from the local path
            cap = cv2.VideoCapture(self.local_video_path)

            # Check if the video was opened successfully
            if not cap.isOpened():
                raise IOError(f"Error: Failed to open video file with OpenCV at '{self.local_video_path}'")

            print("Video opened successfully.")
            print(f"Resolution: {video_info.width}x{video_info.height}, FPS: {video_info.fps}, Total Frames: {video_info.total_frames}")

            # 4. Return the video info and the capture object
            return video_info, cap

        except Exception as e:
            print(f"An error occurred in load_video: {e}")
            raise

    def add_zone(self, color, zone_coords):
        """Adds a zone to the video, used to detect if a vehicle is in the stopzone, outzone or road"""
        zone = sv.PolygonZone(zone_coords)
        zone_annotator = sv.PolygonZoneAnnotator(
            display_in_zone_count=False,
            zone=zone,
            color=color)
        return zone, zone_annotator
    

    def annotate_image_with_zones(self, input_image_path: str):
        print(f"Attempting to load image from: {input_image_path}")
        image = cv2.imread(input_image_path)

        if image is None:
            print(f"ERROR: Could not load image from {input_image_path}. Please check the path and file integrity.")
            return

        if self.video_info and (image.shape[1] != self.video_info.width or image.shape[0] != self.video_info.height):
             print(f"Resizing image from {image.shape[1]}x{image.shape[0]} to {self.video_info.width}x{self.video_info.height}")
             image = cv2.resize(image, (self.video_info.width, self.video_info.height))


        # Annotate stop zone
        annotated_stopzone_image = self.stopzone_annotator.annotate(
            scene=image.copy(), # Use a copy to avoid modifying the original image for subsequent annotations
            # No 'detections' argument needed here
        )
        stopzone_output_path = os.path.join( f"{self.video_name}_annotated_stopzone.png")
        cv2.imwrite(stopzone_output_path, annotated_stopzone_image)
        print(f"Annotated stop zone image saved to: {stopzone_output_path}")

        # Annotate out zone
        annotated_outzone_image = self.outzone_annotator.annotate(
            scene=image.copy(),
            # No 'detections' argument needed here
        )
        outzone_output_path = os.path.join(f"{self.video_name}_annotated_outzone.png")
        cv2.imwrite(outzone_output_path, annotated_outzone_image)
        print(f"Annotated out zone image saved to: {outzone_output_path}")

        # Annotate road zone
        annotated_road_image = self.road_annotator.annotate(
            scene=image.copy(),
            # No 'detections' argument needed here
        )
        road_output_path = os.path.join(f"{self.video_name}_annotated_road.png")
        cv2.imwrite(road_output_path, annotated_road_image)
        print(f"Annotated road zone image saved to: {road_output_path}")

        # Optional: Save an image with ALL zones combined
        all_zones_image = image.copy()
        all_zones_image = self.stopzone_annotator.annotate(scene=all_zones_image)
        all_zones_image = self.outzone_annotator.annotate(scene=all_zones_image)
        all_zones_image = self.road_annotator.annotate(scene=all_zones_image)
        all_zones_path = os.path.join(f"{self.video_name}_annotated_all_zones.png")
        cv2.imwrite(all_zones_path, all_zones_image)
        print(f"Annotated all zones image saved to: {all_zones_path}")
                        

    def process_video(self):
        """
        Loops through video frames, applies annotations, and saves the result
        to a new video file without displaying it live.
        """
        video_capture = self.frames_generator
        
        # 1. Define the output path for the new video
        output_video_path = f"{self.output_path}/annotated_video_v2.mp4"
        print(f"Processing video. Output will be saved to: {output_video_path}")

        # 2. Use sv.VideoSink to handle video writing
        with sv.VideoSink(target_path=output_video_path, video_info=self.video_info) as sink:
            while video_capture.isOpened():
                ret, frame = video_capture.read()
                if not ret:
                    break

                # --- Annotation logic (remains the same) ---
                annotated_frame = frame.copy()
                annotated_frame = self.stopzone_annotator.annotate(scene=annotated_frame)
                annotated_frame = self.outzone_annotator.annotate(scene=annotated_frame)
                annotated_frame = self.road_annotator.annotate(scene=annotated_frame)

                # 3. Write the annotated frame to the video file
                sink.write_frame(frame=annotated_frame)

        # Release the video capture object
        video_capture.release()
        print(f"✅ Video processing complete. File saved to {output_video_path}")
            
    def cleanup_temp_file(self):
        """
        Remove the temporary video file from disk if it is still there.

        Does nothing when no temporary path is recorded or the file no longer
        exists. After a removal the stored path is reset to ``None``.
        """
        temp_path = self._temp_video_path
        if not temp_path or not os.path.exists(temp_path):
            return

        print(f"Cleaning up temporary file: {temp_path}")
        os.remove(temp_path)
        # Forget the path now that the file is gone
        self._temp_video_path = None

    def analyze(self, speed_threshold=15):
        """
        Classify every tracked car as "Stopped" or "Failed to stop".

        Walks ``self.report`` (frame id -> car id -> vehicle info) in iteration
        order. For each car it accumulates the wheel ids that have been flagged
        ``"stopzone"`` and the wheel ids that have been flagged ``"outzone"``.
        From the first frame in which the car has a stop-zone wheel, its speed
        is recorded for every subsequent non-empty frame. As soon as a wheel
        that was seen in the stop zone has also been seen in the out zone, the
        car is classified from the lowest speed recorded so far.

        Args:
            speed_threshold: a car whose lowest recorded speed is strictly
                below this value counts as "Stopped". Defaults to 15.

        Populates:
            self.car_speeds: car id -> list of ``{"frame_no", "speed"}`` dicts.
            self.reported_wheels: car id -> the wheel id that triggered the
                classification.
            self.car_left_zone: car id -> frame of the lowest speed when the
                car stopped, otherwise the frame where the crossing was found.
            self.analysis: car id -> ``{"First Entrance", "Last Exit",
                "Status"}`` for classified cars only; the first two values are
                read from ``self.first_last_frames``.
        """
        wheels_in_stopzone = {}
        wheels_in_outzone = {}
        self.car_left_zone = {}
        self.car_speeds = {}
        stopped = {}
        # dict used as an insertion-ordered set of every car id encountered
        car_id_history = {}

        self.reported_wheels = {}
        self.analysis = {}

        for frame_id, frame_dict in self.report.items():
            for car_id, vehicle_dict in frame_dict.items():
                car_id_history[car_id] = None

                if len(vehicle_dict) == 0:
                    continue

                # setdefault (rather than "create, else update") so the wheels
                # of the car's first non-empty frame are recorded too
                stop_wheels = wheels_in_stopzone.setdefault(car_id, [])
                out_wheels = wheels_in_outzone.setdefault(car_id, [])

                for wheel_dict in vehicle_dict["wheel"]:
                    for wheel_id, wheel_info in wheel_dict.items():
                        if wheel_info["stopzone"] and wheel_id not in stop_wheels:
                            stop_wheels.append(wheel_id)
                        if wheel_info["outzone"] and wheel_id not in out_wheels:
                            out_wheels.append(wheel_id)

                # Save speed once the car has had a wheel in the stopzone
                if stop_wheels:
                    self.car_speeds.setdefault(car_id, []).append(
                        {"frame_no": frame_id, "speed": vehicle_dict['speed']}
                    )

                if car_id in stopped:
                    # Already classified; the first verdict stands
                    continue

                # Detect a wheel that has been in the stopzone and also in the outzone
                for wheel in stop_wheels:
                    if wheel not in out_wheels:
                        continue

                    known_speeds = [x for x in self.car_speeds[car_id] if x["speed"] is not None]
                    if not known_speeds:
                        # No usable speed yet: retry on a later frame instead
                        # of failing on min() of an empty list
                        break

                    min_speed_in_stopzone = min(known_speeds, key=lambda x: x['speed'])
                    self.reported_wheels[car_id] = wheel
                    if min_speed_in_stopzone["speed"] < speed_threshold:
                        stopped[car_id] = True
                        self.car_left_zone[car_id] = min_speed_in_stopzone["frame_no"]
                    else:
                        stopped[car_id] = False
                        self.car_left_zone[car_id] = frame_id
                    break

        # Construct the analysis; cars that were never classified are left out
        for car_id in car_id_history:
            if car_id not in stopped:
                continue
            self.analysis[car_id] = {
                "First Entrance": self.first_last_frames[car_id]["first"],
                "Last Exit": self.first_last_frames[car_id]["last"],
                "Status": "Stopped" if stopped[car_id] else "Failed to stop",
            }
                
    def render_video_simple(self):
        """
        Render ``<self.output_path>/inference_simple.mp4``: the source video
        with a coloured box and status label drawn on the detected car.

        The video at ``self.local_video_path`` is re-opened and read from the
        start. For each frame that has an entry in ``self.car_detections`` the
        first id returned by ``self.get_ids`` is used; the frame is annotated
        only if that car is in ``self.reported_wheels``. Green, red or gray
        annotators are chosen from the label via ``self.get_color``. Every
        frame, annotated or not, is written to the output.

        The capture is released when the method exits, including on error.

        NOTE(review): frames are counted from 1 here — confirm that
        ``self.car_detections`` is keyed the same way.
        """
        # We need to open the video file again to read it from the start
        video_capture = cv2.VideoCapture(self.local_video_path)
        frame_no = 1

        try:
            with sv.VideoSink(target_path=f"{self.output_path}/inference_simple.mp4", video_info=self.video_info) as sink, \
                    tqdm(total=self.video_info.total_frames, desc="Rendering Video") as pbar:
                while True:
                    ret, frame = video_capture.read()
                    if not ret:
                        break  # End of video

                    annotated_frame = frame.copy()

                    if frame_no in self.car_detections:
                        car_detections = self.car_detections[frame_no]
                        car_id = self.get_ids(car_detections)[0]

                        # Cars without a reported wheel are written unannotated
                        if car_id in self.reported_wheels:
                            label = self.add_label(frame_no)
                            color = self.get_color(label)

                            if color == "green":
                                annotated_frame = self.label_annotator_green.annotate(annotated_frame, car_detections, labels=label)
                                annotated_frame = self.box_annotator_green.annotate(annotated_frame, car_detections)
                            elif color == "red":
                                annotated_frame = self.label_annotator_red.annotate(annotated_frame, car_detections, labels=label)
                                annotated_frame = self.box_annotator_red.annotate(annotated_frame, car_detections)
                            else:
                                annotated_frame = self.label_annotator_gray.annotate(annotated_frame, car_detections, labels=label)
                                annotated_frame = self.box_annotator_gray.annotate(annotated_frame, car_detections)

                    # Single exit point per frame: write, tick the bar, advance the counter
                    sink.write_frame(annotated_frame)
                    pbar.update(1)
                    frame_no += 1
        finally:
            # Release the capture even if annotation or writing raised
            video_capture.release()

        print("Video rendering complete.")

    def render_video_advanced(self):
        """
        Render ``<self.output_path>/inference_advanced.mp4``: the source video
        with zone overlays, the reference point, and per-car diagnostics.

        Every frame gets the stop zone, out zone and road overlays plus a filled
        circle at ``self.point``. When the frame has car detections and the car
        (first id from ``self.get_ids``) is in ``self.reported_wheels``, the
        method also draws the car centre, a line from the reference point to
        it, and a coloured box and label chosen via ``self.get_color``. When
        the frame has wheel detections, the wheel recorded for the car in
        ``self.reported_wheels`` is boxed in white.

        The capture is released when the method exits, including on error.

        NOTE(review): ``car_id`` deliberately carries over from the most recent
        frame that had car detections, so wheel boxes can be drawn on frames
        without a car detection — this matches the previous behaviour; confirm
        it is intended. Frames are counted from 1; confirm the detection
        dictionaries are keyed the same way.
        """
        # We must open the video file again to read it from the start
        video_capture = cv2.VideoCapture(self.local_video_path)
        frame_no = 1
        # Most recent car id seen; None until the first frame with a car
        car_id = None

        try:
            with sv.VideoSink(target_path=f"{self.output_path}/inference_advanced.mp4", video_info=self.video_info) as sink, \
                    tqdm(total=self.video_info.total_frames, desc="Rendering Advanced Video") as pbar:
                while True:
                    ret, frame = video_capture.read()
                    if not ret:
                        break  # End of video

                    annotated_frame = frame.copy()
                    annotated_frame = self.stopzone_annotator.annotate(scene=annotated_frame)
                    annotated_frame = self.outzone_annotator.annotate(scene=annotated_frame)
                    annotated_frame = self.road_annotator.annotate(scene=annotated_frame)

                    point_x, point_y = self.point
                    cv2.circle(annotated_frame, (int(point_x), int(point_y)), 10, (0, 255, 0), -1)

                    if frame_no in self.car_detections:
                        car_detections = self.car_detections[frame_no]
                        car_id = self.get_ids(car_detections)[0]

                        # Cars without a reported wheel get the overlays only
                        if car_id in self.reported_wheels:
                            car_center = self.get_car_center(frame_no)
                            cv2.circle(annotated_frame, (int(car_center[0]), int(car_center[1])), 10, (0, 255, 0), -1)
                            cv2.line(annotated_frame, (int(point_x), int(point_y)), (int(car_center[0]), int(car_center[1])), (0, 255, 0), 2, cv2.LINE_AA)

                            label = self.add_label(frame_no)
                            color = self.get_color(label)
                            if color == "green":
                                annotated_frame = self.label_annotator_green.annotate(annotated_frame, car_detections, labels=label)
                                annotated_frame = self.box_annotator_green.annotate(annotated_frame, car_detections)
                            elif color == "red":
                                annotated_frame = self.label_annotator_red.annotate(annotated_frame, car_detections, labels=label)
                                annotated_frame = self.box_annotator_red.annotate(annotated_frame, car_detections)
                            else:
                                annotated_frame = self.label_annotator_gray.annotate(annotated_frame, car_detections, labels=label)
                                annotated_frame = self.box_annotator_gray.annotate(annotated_frame, car_detections)

                    # An unreported car fails the membership test below, so its
                    # wheels are skipped exactly as before
                    if frame_no in self.wheel_detections and car_id is not None and car_id in self.reported_wheels:
                        wheel_detections = self.wheel_detections[frame_no]
                        wheel_detections = self.filter_detections(wheel_detections, self.reported_wheels[car_id])
                        annotated_frame = self.box_annotator_white.annotate(annotated_frame, wheel_detections)

                    # Single exit point per frame: write, tick the bar, advance the counter
                    sink.write_frame(annotated_frame)
                    pbar.update(1)
                    frame_no += 1
        finally:
            # Release the capture even if annotation or writing raised
            video_capture.release()

        print("Advanced video rendering complete.")

                
    def filter_detections(self, detections, id):
        """
        Keep only the detections whose fifth field equals ``id``.

        Builds one boolean per detection (in iteration order) and indexes
        ``detections`` with that list. Used to show a single wheel.

        NOTE(review): index 4 is presumably the tracker id of each detection
        when iterating ``sv.Detections`` — confirm against the library version
        in use.
        """
        keep = []
        for detection in detections:
            keep.append(bool(detection[4] == id))
        return detections[keep]

    def get_color(self, label):
        """
        Map a label to the name of the colour it should be drawn in.

        Only the first element of ``label`` is inspected: text containing
        "Stopped" gives "green", text containing "Failed" gives "red", and
        anything else gives "gray". "Stopped" is checked first.
        """
        text = label[0]
        for keyword, color in (("Stopped", "green"), ("Failed", "red")):
            if keyword in text:
                return color
        return "gray"

    def add_label(self, frame_no):
        """
        Build the label shown next to the car in a given frame.

        Args:
            frame_no: frame number used as the key into ``self.car_detections``.

        Returns:
            A one-element list. From the frame stored in ``self.car_left_zone``
            onwards it is ``"<Status> (id:<car_id>)"``, with the status read
            from ``self.analysis``; before that frame, or for a car with no
            ``self.car_left_zone`` entry, it is ``"Detected (id:<car_id>)"``.
            Returns ``None`` when the frame has no car detections.
        """
        if frame_no not in self.car_detections:
            # No car in this frame: nothing to label
            return None

        car_id = self.get_ids(self.car_detections[frame_no])[0]

        if car_id in self.car_left_zone and frame_no >= self.car_left_zone[car_id]:
            # Look the verdict up only when it is displayed: cars that were
            # never classified have no entry in self.analysis
            status = self.analysis[car_id]['Status']  # "Stopped" or "Failed to stop"
            return [f"{status} (id:{car_id})"]

        return [f"Detected (id:{car_id})"]

In [4]:
first_video = StopVideo('video1',SERVICE_ACCOUNT_KEY_PATH,GCS_BUCKET_NAME,your_video_blob_name,'video1.avi')


Loading video from local path: 'video1.avi'
Video opened successfully.
Resolution: 800x450, FPS: 30, Total Frames: 9257


[h264 @ 0x12e3a2110] missing picture in access unit with size 56
[h264 @ 0x12e8355a0] missing picture in access unit with size 56
Specified provider 'CUDAExecutionProvider' is not in available provider names.Available providers: 'CoreMLExecutionProvider, AzureExecutionProvider, CPUExecutionProvider'
Specified provider 'OpenVINOExecutionProvider' is not in available provider names.Available providers: 'CoreMLExecutionProvider, AzureExecutionProvider, CPUExecutionProvider'


In [5]:
first_video.video_info

VideoInfo(width=800, height=450, fps=30, total_frames=9257)

In [6]:
first_video.frames_generator

< cv2.VideoCapture 0x11885dc50>

In [7]:
first_video.process_video()


Processing video. Output will be saved to: output/video1/annotated_video_v2.mp4
✅ Video processing complete. File saved to output/video1/annotated_video_v2.mp4


[h264 @ 0x137a9b9b0] deblocking_filter_idc 32 out of range
[h264 @ 0x137a9b9b0] decode_slice_header error
[h264 @ 0x137a42fa0] No start code is found.
[h264 @ 0x137a42fa0] Error splitting the input into NAL units.


In [7]:
first_video.inference()

Processing Frames:   0%|          | 24/9257 [00:00<02:03, 75.01it/s][h264 @ 0x12e888ae0] deblocking_filter_idc 32 out of range
[h264 @ 0x12e888ae0] decode_slice_header error
Processing Frames:   1%|          | 63/9257 [00:00<01:58, 77.89it/s][h264 @ 0x12e83b690] No start code is found.
[h264 @ 0x12e83b690] Error splitting the input into NAL units.
Processing Frames:   1%|          | 73/9257 [00:00<02:05, 73.12it/s]


{0: {},
 1: {},
 2: {},
 3: {},
 4: {},
 5: {},
 6: {},
 7: {},
 8: {},
 9: {},
 10: {},
 11: {},
 12: {},
 13: {},
 14: {},
 15: {},
 16: {},
 17: {},
 18: {},
 19: {},
 20: {},
 21: {},
 22: {},
 23: {},
 24: {},
 25: {},
 26: {},
 27: {},
 28: {},
 29: {},
 30: {},
 31: {},
 32: {},
 33: {},
 34: {},
 35: {},
 36: {},
 37: {},
 38: {},
 39: {},
 40: {},
 41: {},
 42: {},
 43: {},
 44: {},
 45: {},
 46: {},
 47: {},
 48: {},
 49: {},
 50: {},
 51: {},
 52: {},
 53: {},
 54: {},
 55: {},
 56: {},
 57: {},
 58: {},
 59: {},
 60: {np.int64(3): {}},
 61: {np.int64(3): {}},
 62: {np.int64(3): {}},
 63: {np.int64(3): {}},
 64: {np.int64(3): {}},
 65: {np.int64(3): {}},
 66: {np.int64(3): {}},
 67: {np.int64(3): {}},
 68: {np.int64(3): {}},
 69: {np.int64(3): {}},
 70: {np.int64(3): {}},
 71: {np.int64(3): {}},
 72: {np.int64(3): {}}}

In [None]:
first_video.wheel_detections

{88: Detections(xyxy=array([[708., 142., 715., 156.],
        [733., 151., 743., 167.]]), mask=None, confidence=array([0.91455078, 0.90722656]), class_id=array([0, 0]), tracker_id=array([1, 2]), data={'class_name': array(['wheel', 'wheel'], dtype='<U5')}, metadata={}),
 89: Detections(xyxy=array([[711., 143., 719., 157.],
        [737., 151., 746., 168.]]), mask=None, confidence=array([0.90039062, 0.87255859]), class_id=array([0, 0]), tracker_id=array([1, 2]), data={'class_name': array(['wheel', 'wheel'], dtype='<U5')}, metadata={}),
 90: Detections(xyxy=array([[714., 143., 721., 158.],
        [741., 153., 749., 170.]]), mask=None, confidence=array([0.91357422, 0.87060547]), class_id=array([0, 0]), tracker_id=array([1, 2]), data={'class_name': array(['wheel', 'wheel'], dtype='<U5')}, metadata={}),
 91: Detections(xyxy=array([[717., 145., 725., 159.],
        [744., 155., 753., 171.]]), mask=None, confidence=array([0.89648438, 0.88671875]), class_id=array([0, 0]), tracker_id=array([1, 

In [8]:
len(first_video.report)

9088

In [9]:
first_video.analyze()

In [10]:
analysis = first_video.analysis

In [11]:
first_video.render_video_simple()

Rendering Video: 100%|██████████| 9088/9088 [00:42<00:00, 215.84it/s]

Video rendering complete.





In [13]:
first_video.render_video_advanced()

Rendering Advanced Video: 100%|██████████| 9088/9088 [00:43<00:00, 209.52it/s]

Advanced video rendering complete.





In [12]:
first_video.plot_car_speed(ylim=(0, 800), type="all")

AttributeError: 'StopVideo' object has no attribute 'plot_car_speed'