LIVE STREAM INTO CLIPS

In [None]:
import cv2
import torch
import time
import numpy as np
from ultralytics import YOLO
from datetime import datetime

# Load YOLOv8 model with tracking enabled
model = YOLO("best(3).pt")  # Use yolov8s.pt or larger versions if needed

# Function to create a video writer
def get_video_writer(filename, frame_size, fps=30):
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    return cv2.VideoWriter(filename, fourcc, fps, frame_size)

# Initialize video capture (0 for webcam or replace with RTSP/HTTP stream URL)
cap = cv2.VideoCapture('export6.mp4')

# Object tracking dictionary
active_recordings = {}
frame_size = (int(cap.get(3)), int(cap.get(4)))
frames_per_second = int(cap.get(cv2.CAP_PROP_FPS)) or 30  # Default to 30 if unknown

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    # Run YOLOv8 inference with tracking
    results = model.track(frame, persist=True)
    
    if results[0].boxes.id is not None:
        track_ids = results[0].boxes.id.cpu().numpy()
        detections = results[0].boxes.xyxy.cpu().numpy()
        class_ids = results[0].boxes.cls.cpu().numpy()
    else:
        track_ids, detections, class_ids = [], [], []
    
    detected_objects = set()
    for bbox, cls_id, track_id in zip(detections, class_ids, track_ids):
        label = f"{model.names[int(cls_id)]}_{int(track_id)}"
        detected_objects.add(label)
        
        # Draw bounding boxes
        x1, y1, x2, y2 = map(int, bbox)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        
        # Start new recording if this specific object is not already being recorded
        if label not in active_recordings:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"clip_{label}_{timestamp}.mp4"
            active_recordings[label] = get_video_writer(filename, frame_size, frames_per_second)
            print(f"Started recording: {filename}")
    
    # Write frames to respective video files
    for label in list(active_recordings.keys()):
        if label in detected_objects:
            active_recordings[label].write(frame)
        else:
            print(f"Clip saved for {label}")
            active_recordings[label].release()
            del active_recordings[label]
    
    # Show frame
    cv2.imshow("YOLOv8 Live Stream", frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Cleanup
cap.release()
for writer in active_recordings.values():
    writer.release()
cv2.destroyAllWindows()

# Instantiate the VehicleSpeedEstimator with lane and distance information
speed_estimator = VehicleSpeedEstimator(names=model.names, line1=line1, line2=line2,
                                         distance_between_lines=distance_between_lines, scale_factor=scale_factor)

# Process the video frames
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Run YOLOv8 inference with tracking
    results = model.track(frame, persist=True)
    
    if results[0].boxes.id is not None:
        track_ids = results[0].boxes.id.cpu().numpy()
        detections = results[0].boxes.xyxy.cpu().numpy()
        class_ids = results[0].boxes.cls.cpu().numpy()
    else:
        track_ids, detections, class_ids = [], [], []

    detected_objects = set()
    for bbox, cls_id, track_id in zip(detections, class_ids, track_ids):
        label = f"{model.names[int(cls_id)]}_{int(track_id)}"
        detected_objects.add(label)

        # Draw bounding boxes
        x1, y1, x2, y2 = map(int, bbox)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Pass the current frame and tracks to the VehicleSpeedEstimator
    current_time = time.time()  # Or any other time mechanism you use
    frame = speed_estimator.process_frame(frame, results, current_time)

    # Display the processed frame
    cv2.imshow("Live Output", frame)
    cv2.waitKey(1)

    # Write the processed frame to the output video
    video_writer.write(frame)
    
    # Increment frame counter
    frame_counter += 1

    # Break the loop if the 'q' key is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        print("Processing terminated by user.")
        break

# Release video capture and writer resources
cap.release()
video_writer.release()
cv2.destroyAllWindows()

print(f"Processing complete. Output saved to {output_video}")


RUNNING PIPELINE ON DIRECTORY

reads the frame / add folder trajectory later

In [1]:
import cv2
import numpy as np
from ultralytics import YOLO
from pedestrianpsm.LaneInput import LaneInput  # Class to input lane regions
from carpsm.PSMorg import SpeedEstimator2  # Updated SpeedEstimator2 class with time increment
from carpsm.time_utils import increment_time, get_time_from_xml  # Import utility functions
from volume.volume import VehicleDetectionWithTracks 
from pedestrianpsm import LaneChecker
from pedestrianpsm.PSMorg import PEDESTRIANESTIMATOR
from pedestrianfeatures.PedestrianOrganization import PedestrianSpeedEstimator
from vehicle.PSMorg import SpeedEstimator2
from vehicle.Speed_3 import SpeedExtractor

Pedestrian 1 traveled 0.71 meters at 0.07 m/s.


In [2]:

# Initialize the YOLO model for object detection
model = YOLO(r'E:\learn phython\speed 2\Speed-detection-of-vehicles\best(3).pt')  # Replace with the path to your YOLO model
names = model.model.names  # List of class names for the YOLO model

# Initialize video capture
video_path = r'E:\learn phython\fydp final\export6.mp4'
cap = cv2.VideoCapture(video_path)
assert cap.isOpened(), f"Error reading video file {video_path}"

# Get video properties (width, height, and FPS)
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, 
                                      cv2.CAP_PROP_FRAME_HEIGHT, 
                                      cv2.CAP_PROP_FPS))

# Video writer to save the processed video with lane detection
output_video = "lane_detection_output.avi"
video_writer = cv2.VideoWriter(output_video, 
                               cv2.VideoWriter_fourcc(*"mp4v"), 
                               fps, (w, h))
frame_counter = 0
skip_frames = 10  # Number of frames to skip after processing one frame
# Initialize LaneInput to capture lane points

# Capture the first frame from the video to input lane points


# LANE INPUT

In [None]:
lane_input = LaneInput()

success, first_frame = cap.read()
if not success:
    raise ValueError("Failed to read the first frame from the video.")

# Input the lane regions in the first frame
lane_input.input_lanes(first_frame)
lane_regions , lane_centers = lane_input.get_lanes_and_centers()  # Get the captured lane regions
print("Captured Lane Regions:", lane_regions ,"lane Centers:" , lane_centers)

# Load initial time from the XML file
xml_file = r"E:\learn phython\speed 2\Speed-detection-of-vehicles\C0148M01.XML"
current_time = get_time_from_xml(xml_file)

# Initialize SpeedEstimator2 for VehICle PSM
# lane_estimator = SpeedEstimator2(
   # names=names,
   # lane_regions=lane_regions,
   # view_img=True,  # Display image flag
   # target_x=500,   # Define the X-coordinate target for lane estimation
   #  csv_file="lane_log.csv" # Define the CSV file for logging)

# Initialize SpeedEstimator2 for Pedestrian PSM
lane_estimator = PEDESTRIANESTIMATOR(
    names=names,
    lane_regions=lane_regions,
    lane_centers= lane_centers,
    view_img=True,  # Display image flag   # Define the X-coordinate target for lane estimation
    csv_file="lane_log.csv"  # Define the CSV file for logging
)
# Frame processing

pedestrian_speed = PedestrianSpeedEstimator (
    names= names
)

Car_Speed = SpeedEstimator2(
    names=names,
    lane_regions=lane_regions
    current_time = current_time ,
    view_img=True,  # Display image flag   # Define the X-coordinate target for lane estimation
    csv_file="lane_log.csv"  # Define the CSV file for logging
)
# Main loop for processing each video frame


Please input the number of lanes:
No lanes inputted. Using default lane regions.
Default Lane Regions: [((509, 2), (622, 832), (988, 832), (731, 3)), ((733, 4), (987, 832), (1342, 833), (969, 5)), ((969, 5), (1344, 832), (1528, 832), (1233, 2))]
Lane Centers: [(712, 417), (1007, 418), (1268, 417)]
Captured Lane Regions: [((509, 2), (622, 832), (988, 832), (731, 3)), ((733, 4), (987, 832), (1342, 833), (969, 5)), ((969, 5), (1344, 832), (1528, 832), (1233, 2))] lane Centers: [(712, 417), (1007, 418), (1268, 417)]


TypeError: SpeedEstimator2.__init__() missing 1 required positional argument: 'lane_regions'

# FEATURES

In [None]:
while cap.isOpened():
    success, im0 = cap.read()  # Read the next frame
    if not success:
        print("Video processing completed or frame not available.")
        break

    # Process every (skip_frames + 1)-th frame
    if frame_counter % (skip_frames + 1) == 0:
        # Perform object detection and tracking using the YOLO model
        tracks = model.track(im0, persist=True)
        
        # Increment the current time based on the number of frames skipped
        current_time = increment_time(current_time, skip_frames + 1, fps)
        
        #CAR PSM CODE THAT RESULTS IN EXCEL FILES
        # Pass the tracks and the frame to SpeedEstimator2's check_and_log_lanes function
        #im0 = lane_estimator.check_and_log_lanes(im0, tracks, current_time)
        
        #PEDESTRIAN PSM CODE
        #im0 = lane_estimator.process_frame(im0, tracks, current_time)
        # Display the processed frame with lane estimation
       
       #PEDESTRIAN FEATURES
       

       #Vehicle Features
        im0 = Car_Speed.Speed(im0 , tracks , current_time)
        #FLOW
       # = VehicleDetectionWithTracks(tracks, start_time="2025-02-27 08:00:00", frame_rate=30)

# Calculate the traffic flow based on the provided tracks
        #vehicles_per_hour, interval_vehicles, max_flow_interval, max_flow_count = vehicle_detector.calculate_traffic_flow()

# Output the results
    #print("Total vehicles per hour:")
    #for hour, count in vehicles_per_hour.items():
     #print(f"{hour.strftime('%Y-%m-%d %H:%M:%S')} - {count} vehicles")

        # Display the processed frame with lane estimation
    cv2.imshow("Live Output", im0)
    cv2.waitKey(1)
        # Write the processed frame to the output video
    video_writer.write(im0)

    # Increment frame counter
    frame_counter += 1

    # Break the loop if the 'q' key is pressed
    if cv2.waitKey(1) & 0xFF == ord('q'):
        print("Processing terminated by user.")
        break

# Release video capture and writer resources
cap.release()
video_writer.release()
cv2.destroyAllWindows()

print(f"Processing complete. Output saved to {output_video}")


# VEHICLE FEATURES


## PSM