In [2]:
import numpy as np
import tensorflow as tf
from collections import deque
from traffic_count_detection import get_vehicle_counts
from emergency_detection import detect_emergency
from accident_detection import detect_accident

# Load the trained RL model (Keras format) from a path relative to the working directory.
# NOTE(review): `model` is a notebook-global name that the pedestrian-detection cells
# further down rebind to a YOLO detector; re-run this cell before calling
# act(state, model) if any of those cells has been executed in between.
model = tf.keras.models.load_model("../models/rl_trained_model.keras")

# One-hot encoding of the four traffic phases, keyed by phase index:
#   0 -> North-South Straight    1 -> East-West Straight
#   2 -> North-South Left Turn   3 -> East-West Left Turn
PHASE_ENCODING = {
    phase: [1 if slot == phase else 0 for slot in range(4)]
    for phase in range(4)
}

def get_state(in_counts, out_counts, emergency, accident, queue_length, current_phase, phase_history, neighbor_state):
    """Build the flat float32 feature vector that is fed to the RL model.

    The vector is laid out, in order, as:
      1. every entry of ``in_counts`` divided by 10
      2. every entry of ``out_counts`` divided by 10
      3. ``emergency``, ``accident`` and ``queue_length`` (appended unscaled)
      4. the one-hot row ``PHASE_ENCODING[current_phase]``
      5. ``phase_history`` flattened to one dimension
      6. every entry of ``neighbor_state`` divided by 10

    Returns:
        np.ndarray: 1-D array with dtype float32.

    Raises:
        KeyError: if ``current_phase`` is not a key of ``PHASE_ENCODING``.
    """
    scale = 10.0

    features = [count / scale for count in in_counts]
    features += [count / scale for count in out_counts]
    features += [emergency, accident, queue_length]
    features += PHASE_ENCODING[current_phase]
    features += list(np.array(phase_history).flatten())  # history as one flat run
    features += [count / scale for count in neighbor_state]

    return np.array(features, dtype=np.float32)

def act(state, model):
    """Return the index of the largest value the model predicts for ``state``.

    ``state`` is reshaped to a single-row batch, passed to
    ``model.predict(..., verbose=0)``, and the argmax of the first output row
    is returned.
    """
    batch = state.reshape(1, -1)
    first_row = model.predict(batch, verbose=0)[0]
    return np.argmax(first_row)

def get_inputs():
    """Collect one observation from the four hard-coded count videos.

    For each video this calls ``get_vehicle_counts(path, duration=20)``,
    ``detect_emergency(path)`` and ``detect_accident(path)``.

    Returns:
        tuple: ``(in_counts, out_counts, emergency, accident, queue_length,
        neighbor_state)`` where
          * ``in_counts`` / ``out_counts`` hold one entry per video,
          * ``emergency`` / ``accident`` are 1 if any video triggered the
            corresponding detector, else 0,
          * ``queue_length`` is ``sum(in_counts) - sum(out_counts)`` clamped
            at 0,
          * ``neighbor_state`` is a placeholder list of eight zeros.
    """
    video_paths = [
        '../videos/count/count_2.mp4',
        '../videos/count/count_4.mp4',
        '../videos/count/count_6.mp4',
        '../videos/count/count_7.mp4'
    ]

    in_counts = []
    out_counts = []
    emergency = 0
    accident = 0

    for path in video_paths:
        in_count, out_count = get_vehicle_counts(path, duration=20)
        in_counts.append(in_count)
        out_counts.append(out_count)

        # Every detector runs on every video; a single hit latches the flag.
        if detect_emergency(path):
            emergency = 1

        if detect_accident(path):
            accident = 1

    # A queue cannot be negative: clamp when more vehicles left than entered
    # during the window (the other get_inputs variants in this notebook do the same).
    queue_length = max(0, sum(in_counts) - sum(out_counts))
    neighbor_state = [0] * 8  # Placeholder for neighbor data

    return in_counts, out_counts, emergency, accident, queue_length, neighbor_state

def change_phase(phase):
    """Print which signal phase is being switched to.

    ``phase`` indexes the four phase descriptions below; an out-of-range
    index raises ``IndexError``.
    """
    descriptions = (
        "North-South Straight Traffic Green",
        "East-West Straight Traffic Green",
        "North-South Left Turn Green",
        "East-West Left Turn Green",
    )
    selected = descriptions[phase]
    print(f"Changing to: {selected}")

def main_inference():
    """Run one observe -> predict -> announce step with the RL model.

    The phase history (three all-zero rows) and the current phase (0) are
    fixed initial values; they are not updated because only a single
    decision is made.
    """
    phase_history = deque(([0] * 4 for _ in range(3)), maxlen=3)
    current_phase = 0

    # Gather the observation once
    observation = get_inputs()
    in_counts, out_counts, emergency, accident, queue_length, neighbor_state = observation

    # Encode it for the model
    state = get_state(
        in_counts,
        out_counts,
        emergency,
        accident,
        queue_length,
        current_phase,
        phase_history,
        neighbor_state,
    )

    # Choose the next phase and announce it
    change_phase(act(state, model))

if __name__ == "__main__":
    main_inference()



0: 320x640 4 persons, 8 cars, 4 motorcycles, 1 traffic light, 58.2ms
Speed: 10.5ms preprocess, 58.2ms inference, 99.6ms postprocess per image at shape (1, 3, 320, 640)

0: 320x640 4 persons, 8 cars, 4 motorcycles, 1 traffic light, 16.7ms
Speed: 2.2ms preprocess, 16.7ms inference, 2.9ms postprocess per image at shape (1, 3, 320, 640)

0: 320x640 3 persons, 8 cars, 4 motorcycles, 1 traffic light, 20.9ms
Speed: 0.0ms preprocess, 20.9ms inference, 0.0ms postprocess per image at shape (1, 3, 320, 640)

0: 320x640 4 persons, 8 cars, 4 motorcycles, 1 traffic light, 14.7ms
Speed: 0.0ms preprocess, 14.7ms inference, 2.3ms postprocess per image at shape (1, 3, 320, 640)

0: 320x640 4 persons, 8 cars, 4 motorcycles, 1 traffic light, 5.9ms
Speed: 4.4ms preprocess, 5.9ms inference, 0.0ms postprocess per image at shape (1, 3, 320, 640)

0: 320x640 3 persons, 7 cars, 4 motorcycles, 1 traffic light, 18.4ms
Speed: 0.0ms preprocess, 18.4ms inference, 0.0ms postprocess per image at shape (1, 3, 320, 640

In [None]:
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import tensorflow as tf
from collections import deque
from traffic_count_detection import get_vehicle_counts
from emergency_detection import detect_emergency
from accident_detection import detect_accident

# Load the trained RL model (Keras format) from a path relative to the working directory.
# NOTE(review): `model` is a notebook-global name that the pedestrian-detection cells
# further down rebind to a YOLO detector; re-run this cell before calling
# act(state, model) if any of those cells has been executed in between.
model = tf.keras.models.load_model("../models/rl_trained_model.keras")

# One-hot encoding of the four traffic phases, keyed by phase index 0..3.
PHASE_ENCODING = {
    phase: [1 if slot == phase else 0 for slot in range(4)]
    for phase in range(4)
}

def get_state(in_counts, out_counts, emergency, accident, queue_length, current_phase, phase_history, neighbor_state):
    """Build the flat float32 feature vector that is fed to the RL model.

    Order of the features: scaled ``in_counts``, scaled ``out_counts``,
    ``emergency``, ``accident``, ``queue_length`` (all three unscaled), the
    one-hot row for ``current_phase``, the flattened ``phase_history`` and
    finally the scaled ``neighbor_state``. "Scaled" means divided by 10.

    Returns:
        np.ndarray: 1-D array with dtype float32.

    Raises:
        KeyError: if ``current_phase`` is not a key of ``PHASE_ENCODING``.
    """
    scale = 10.0

    features = [count / scale for count in in_counts]
    features += [count / scale for count in out_counts]
    features += [emergency, accident, queue_length]
    features += PHASE_ENCODING[current_phase]
    features += list(np.array(phase_history).flatten())
    features += [count / scale for count in neighbor_state]

    return np.array(features, dtype=np.float32)

def act(state, model):
    """Return the index of the largest value the model predicts for ``state``.

    ``state`` is reshaped to a single-row batch, passed to
    ``model.predict(..., verbose=0)``, and the argmax of the first output row
    is returned.
    """
    batch = state.reshape(1, -1)
    first_row = model.predict(batch, verbose=0)[0]
    return np.argmax(first_row)

def process_models(video_path):
    """Run the three detectors for one video on a three-worker thread pool.

    Submits ``get_vehicle_counts(video_path, 20)``,
    ``detect_emergency(video_path)`` and ``detect_accident(video_path)`` and
    waits for all of them.

    Returns:
        tuple: ``(in_count, out_count, emergency, accident)`` with the two
        detector results converted to ``int``.
    """
    with ThreadPoolExecutor(max_workers=3) as pool:
        counts_task = pool.submit(get_vehicle_counts, video_path, 20)
        emergency_task = pool.submit(detect_emergency, video_path)
        accident_task = pool.submit(detect_accident, video_path)

        # Collect in submission order; .result() re-raises worker exceptions.
        in_count, out_count = counts_task.result()
        emergency_hit = emergency_task.result()
        accident_hit = accident_task.result()

    flags = (int(emergency_hit), int(accident_hit))
    return (in_count, out_count) + flags

def get_inputs():
    """Collect one observation from the four hard-coded count videos.

    Each video is handed to ``process_models``; the per-video results are
    then merged.

    Returns:
        tuple: ``(in_counts, out_counts, emergency, accident, queue_length,
        neighbor_state)``. ``emergency`` / ``accident`` are the bitwise OR of
        the per-video flags, ``queue_length`` is the in/out difference
        clamped at 0, and ``neighbor_state`` is a placeholder of eight zeros.
    """
    video_paths = [
        '../videos/count/count_2.mp4',
        '../videos/count/count_4.mp4',
        '../videos/count/count_6.mp4',
        '../videos/count/count_7.mp4'
    ]

    per_video = [process_models(path) for path in video_paths]

    in_counts = []
    out_counts = []
    emergency = False
    accident = False
    for in_count, out_count, emergency_flag, accident_flag in per_video:
        in_counts.append(in_count)
        out_counts.append(out_count)
        emergency = emergency | emergency_flag
        accident = accident | accident_flag

    balance = sum(in_counts) - sum(out_counts)
    queue_length = max(0, balance)
    neighbor_state = [0] * 8
    return in_counts, out_counts, emergency, accident, queue_length, neighbor_state

def change_phase(phase):
    """Print which signal phase is being switched to.

    ``phase`` indexes the four phase descriptions below; an out-of-range
    index raises ``IndexError``.
    """
    descriptions = (
        "North-South Straight Traffic Green",
        "East-West Straight Traffic Green",
        "North-South Left Turn Green",
        "East-West Left Turn Green",
    )
    selected = descriptions[phase]
    print(f"Changing to: {selected}")

def main_inference():
    """Run one observe -> predict -> announce step with the RL model.

    The phase history (three all-zero rows) and the current phase (0) are
    fixed initial values; they are not updated because only a single
    decision is made.
    """
    phase_history = deque(([0] * 4 for _ in range(3)), maxlen=3)
    current_phase = 0

    observation = get_inputs()
    in_counts, out_counts, emergency, accident, queue_length, neighbor_state = observation

    state = get_state(
        in_counts,
        out_counts,
        emergency,
        accident,
        queue_length,
        current_phase,
        phase_history,
        neighbor_state,
    )

    change_phase(act(state, model))

if __name__ == "__main__":
    main_inference()




0: 384x640 1 noaccident, 80.4ms
Speed: 0.0ms preprocess, 80.4ms inference, 100.9ms postprocess per image at shape (1, 3, 384, 640)

0: 320x640 4 persons, 8 cars, 4 motorcycles, 1 traffic light, 204.4ms
Speed: 10.0ms preprocess, 204.4ms inference, 8.0ms postprocess per image at shape (1, 3, 320, 640)
0: 384x640 1 ER, 212.4ms
Speed: 10.0ms preprocess, 212.4ms inference, 4.3ms postprocess per image at shape (1, 3, 384, 640)


0: 384x640 1 noaccident, 47.2ms
Speed: 0.0ms preprocess, 47.2ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 ER, 20.4ms
Speed: 8.5ms preprocess, 20.4ms inference, 14.2ms postprocess per image at shape (1, 3, 384, 640)

0: 320x640 4 persons, 8 cars, 4 motorcycles, 1 traffic light, 33.0ms
Speed: 1.6ms preprocess, 33.0ms inference, 9.0ms postprocess per image at shape (1, 3, 320, 640)
0: 384x640 1 noaccident, 29.0ms
0: 384x640 1 ER, 20.5ms

Speed: 5.5ms preprocess, 29.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 64

In [1]:
import numpy as np
import tensorflow as tf
from collections import deque
from traffic_count_detection import get_vehicle_counts_at_second
from emergency_detection import detect_emergency_at_second
from accident_detection import detect_accident_at_second

# Load the trained RL model (Keras format) from a path relative to the working directory.
# NOTE(review): `model` is a notebook-global name that the pedestrian-detection cells
# further down rebind to a YOLO detector; re-run this cell before calling
# act(state, model) if any of those cells has been executed in between.
model = tf.keras.models.load_model("../models/rl_trained_model.keras")

# One-hot encoding of the four traffic phases, keyed by phase index:
#   0 -> North-South Straight    1 -> East-West Straight
#   2 -> North-South Left Turn   3 -> East-West Left Turn
PHASE_ENCODING = {
    phase: [1 if slot == phase else 0 for slot in range(4)]
    for phase in range(4)
}

def get_state(in_counts, out_counts, emergency, accident, queue_length, current_phase, phase_history, neighbor_state):
    """Build the flat float32 feature vector that is fed to the RL model.

    Order of the features: scaled ``in_counts``, scaled ``out_counts``,
    ``emergency``, ``accident``, ``queue_length`` (all three unscaled), the
    one-hot row for ``current_phase``, the flattened ``phase_history`` and
    finally the scaled ``neighbor_state``. "Scaled" means divided by 10.

    Returns:
        np.ndarray: 1-D array with dtype float32.

    Raises:
        KeyError: if ``current_phase`` is not a key of ``PHASE_ENCODING``.
    """
    scale = 10.0

    features = [count / scale for count in in_counts]
    features += [count / scale for count in out_counts]
    features += [emergency, accident, queue_length]
    features += PHASE_ENCODING[current_phase]
    features += list(np.array(phase_history).flatten())  # history as one flat run
    features += [count / scale for count in neighbor_state]

    return np.array(features, dtype=np.float32)

def act(state, model):
    """Return the index of the largest value the model predicts for ``state``.

    ``state`` is reshaped to a single-row batch, passed to
    ``model.predict(..., verbose=0)``, and the argmax of the first output row
    is returned.
    """
    batch = state.reshape(1, -1)
    first_row = model.predict(batch, verbose=0)[0]
    return np.argmax(first_row)

def get_inputs():
    """Collect one observation by sampling the four count videos at second 10.

    For every video the vehicle counter and both detectors are queried at
    the same ``target_second``.

    Returns:
        tuple: ``(in_counts, out_counts, emergency, accident, queue_length,
        neighbor_state)``. ``emergency`` / ``accident`` are 1 if any video
        triggered the corresponding detector, else 0; ``queue_length`` is the
        in/out difference clamped at 0; ``neighbor_state`` is a placeholder
        of eight zeros.
    """
    video_paths = [
        '../videos/count/count_2.mp4',
        '../videos/count/count_4.mp4',
        '../videos/count/count_6.mp4',
        '../videos/count/count_7.mp4'
    ]

    # Second of each video that all three detectors look at
    target_second = 10

    in_counts, out_counts = [], []
    emergency = 0
    accident = 0

    for path in video_paths:
        in_count, out_count = get_vehicle_counts_at_second(path, target_second)
        in_counts.append(in_count)
        out_counts.append(out_count)

        # Both detectors run on every video; one hit is enough to set a flag.
        if detect_emergency_at_second(path, target_second):
            emergency = 1
        if detect_accident_at_second(path, target_second):
            accident = 1

    # Never report a negative queue
    balance = sum(in_counts) - sum(out_counts)
    queue_length = max(0, balance)

    # Neighbor data is not wired in yet
    neighbor_state = [0] * 8

    return in_counts, out_counts, emergency, accident, queue_length, neighbor_state

def change_phase(phase):
    """Print which signal phase is being switched to.

    ``phase`` indexes the four phase descriptions below; an out-of-range
    index raises ``IndexError``.
    """
    descriptions = (
        "North-South Straight Traffic Green",
        "East-West Straight Traffic Green",
        "North-South Left Turn Green",
        "East-West Left Turn Green",
    )
    selected = descriptions[phase]
    print(f"Changing to: {selected}")

def main_inference():
    """Run one observe -> predict -> announce step with the RL model.

    The phase history (three all-zero rows) and the current phase (0) are
    fixed initial values; they are not updated because only a single
    decision is made.
    """
    # Last three phases, seeded with all-zero rows
    phase_history = deque(([0] * 4 for _ in range(3)), maxlen=3)
    current_phase = 0

    # Observe
    observation = get_inputs()
    in_counts, out_counts, emergency, accident, queue_length, neighbor_state = observation

    # Encode
    state = get_state(
        in_counts,
        out_counts,
        emergency,
        accident,
        queue_length,
        current_phase,
        phase_history,
        neighbor_state,
    )

    # Decide and announce
    change_phase(act(state, model))

if __name__ == "__main__":
    main_inference()



0: 320x640 2 persons, 6 cars, 91.9ms
Speed: 0.0ms preprocess, 91.9ms inference, 87.3ms postprocess per image at shape (1, 3, 320, 640)

0: 384x640 (no detections), 60.2ms
Speed: 0.0ms preprocess, 60.2ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 noaccident, 46.2ms
Speed: 0.0ms preprocess, 46.2ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 320x640 10 cars, 1 train, 1 truck, 35.8ms
Speed: 0.0ms preprocess, 35.8ms inference, 8.0ms postprocess per image at shape (1, 3, 320, 640)

0: 384x640 1 ER, 36.0ms
Speed: 10.0ms preprocess, 36.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 noaccident, 33.6ms
Speed: 0.0ms preprocess, 33.6ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 320x640 1 car, 81.2ms
Speed: 15.6ms preprocess, 81.2ms inference, 0.0ms postprocess per image at shape (1, 3, 320, 640)

0: 384x640 1 ER, 97.1ms
Speed: 0.0ms preprocess, 97.1ms inference, 3.8ms postpro

In [15]:
import time

# Traffic Signal Optimization Function
def optimize_signals(in_count, out_count, emergency, accident, neighbor_states):
    """
    Pick which approach of an intersection gets the green signal.

    Args:
        in_count (list): Traffic approaching each signal [South, North, East, West].
        out_count (list): Traffic exiting each signal [South, North, East, West].
            Accepted for interface symmetry; not used in the decision.
        emergency (str): Direction of an emergency vehicle; any value that is
            not one of the four direction names means "no emergency".
        accident (str): Direction of an accident; any value that is not one of
            the four direction names means "no accident".
        neighbor_states (list): Outbound traffic from neighboring intersections
            [South, North, East, West].

    Returns:
        dict: {"green": <direction>, "red": [<the other three directions>]},
        or None when the best priority equals the exclusion marker -1.
    """
    phases = ["South", "North", "East", "West"]

    def _plan(green):
        # Everything except the chosen direction stays red, in phase order.
        return {"green": green, "red": [d for d in phases if d != green]}

    # An emergency vehicle overrides every other consideration.
    if emergency in phases:
        return _plan(emergency)

    # Score each direction: approaching traffic plus half the neighbor
    # outflow; the accident direction is marked with -1 so it cannot win.
    priorities = [
        -1 if phase == accident else in_count[i] + 0.5 * neighbor_states[i]
        for i, phase in enumerate(phases)
    ]

    best = max(priorities)
    if best == -1:
        return None  # nothing scored above the exclusion marker

    # index() returns the first maximum, so ties go to the earlier direction.
    return _plan(phases[priorities.index(best)])


# Traffic Signal Control Simulation
def traffic_signal_control():
    """
    Run the signal optimizer once on hard-coded example inputs and print the result.

    Prints the green direction and the red directions, or a single
    "all signals remain red" message when the optimizer returns None.
    """
    # Example inputs
    in_count = [10, 15, 20, 5]  # Traffic approaching [South, North, East, West]
    out_count = [5, 10, 8, 12]  # Traffic exiting [South, North, East, West]
    neighbor_states = [7, 9, 5, 6]  # Outbound traffic from neighbors [South, North, East, West]

    # Dynamic conditions: the string "None" (or any non-direction value) means
    # "no emergency" / "no accident"; otherwise "South", "North", "East" or "West".
    emergency = "None"
    accident = "None"

    # Optimize signals based on the current state
    signal_states = optimize_signals(in_count, out_count, emergency, accident, neighbor_states)

    if signal_states is None:
        print("No valid phases available. All signals remain red.")
        # Stop here: there is no dict to read 'green'/'red' from, and falling
        # through would raise TypeError on signal_states['green'].
        return

    # Display the signal states
    print(f"Green Signal: {signal_states['green']}")
    print(f"Red Signals: {', '.join(signal_states['red'])}")
    print("-" * 50)


# Main Function
if __name__ == "__main__":
    traffic_signal_control()


Green Signal: East
Red Signals: South, North, West
--------------------------------------------------


In [None]:
import time
from collections import deque
import numpy as np
from traffic_count_detection import get_vehicle_counts_at_second
from emergency_detection import detect_emergency_at_second
from accident_detection import detect_accident_at_second


# Traffic Signal Optimization Function
def optimize_signals(in_count, out_count, emergency, accident, neighbor_states):
    """Pick which approach of an intersection gets the green signal.

    All list arguments are indexed in the order [South, North, East, West].
    ``out_count`` is accepted but not used in the decision.

    Returns:
        dict: {"green": <direction>, "red": [<the other three directions>]},
        or None when the best priority equals the exclusion marker -1.
    """
    phases = ["South", "North", "East", "West"]

    def _plan(green):
        # Everything except the chosen direction stays red, in phase order.
        return {"green": green, "red": [d for d in phases if d != green]}

    # An emergency vehicle overrides every other consideration.
    if emergency in phases:
        return _plan(emergency)

    # Score each direction: approaching traffic plus half the neighbor
    # outflow; the accident direction is marked with -1 so it cannot win.
    priorities = [
        -1 if phase == accident else in_count[i] + 0.5 * neighbor_states[i]
        for i, phase in enumerate(phases)
    ]

    best = max(priorities)
    if best == -1:
        return None  # nothing scored above the exclusion marker

    # index() returns the first maximum, so ties go to the earlier direction.
    return _plan(phases[priorities.index(best)])


# Function to Get Inputs for a Specific Target Second
def get_inputs(target_second):
    """Sample the four lane videos at ``target_second`` and build optimizer inputs.

    The i-th video is treated as the i-th approach in the order
    [South, North, East, West], which is the order ``optimize_signals`` uses
    to index its count lists.
    NOTE(review): confirm that lane_1..lane_4 were actually filmed in that order.

    Args:
        target_second: Second of each video to analyze.

    Returns:
        tuple: ``(in_counts, out_counts, emergency, accident, neighbor_states)``.
        ``emergency`` / ``accident`` are the direction name of the first
        video whose detector fired, or the string ``"None"`` if none did.
        ``neighbor_states`` is a fixed placeholder.
    """
    directions = ["South", "North", "East", "West"]

    # Alternative clips used previously: '../videos/count/count_{2,4,6,7}.mp4'
    video_paths = [
        '../videos/cut/lane_1.mp4',
        '../videos/cut/lane_2.mp4',
        '../videos/cut/lane_3.mp4',
        '../videos/cut/lane_4.mp4'
    ]

    in_counts = []
    out_counts = []

    # "None" (a string) is the sentinel for "nothing detected".
    emergency = "None"
    accident = "None"

    for direction, path in zip(directions, video_paths):
        # Get vehicle counts at the target second
        in_count, out_count = get_vehicle_counts_at_second(path, target_second)
        in_counts.append(in_count)
        out_counts.append(out_count)

        # Report the direction of a detection instead of discarding it; the
        # detector is still run on every video, and the first hit wins.
        if detect_emergency_at_second(path, target_second) and emergency == "None":
            emergency = direction

        if detect_accident_at_second(path, target_second) and accident == "None":
            accident = direction

    # Placeholder neighbor states (this can be modified as needed)
    neighbor_states = [5, 7, 3, 4]

    return in_counts, out_counts, emergency, accident, neighbor_states


# Traffic Signal Control Simulation
def traffic_signal_control():
    """Loop forever: sample the videos, choose a green direction, print it, wait.

    Every iteration analyzes ``target_second``, then advances it by
    ``interval`` seconds and sleeps for the same amount. The loop has no
    exit condition; interrupt the kernel to stop it.
    """
    # Rolling record of the last three decisions, seeded with all-zero rows
    phase_history = deque(([0] * 4 for _ in range(3)), maxlen=3)
    current_phase = 0
    interval = 10       # seconds between samples
    target_second = 10  # first second to analyze

    while True:
        in_counts, out_counts, emergency, accident, neighbor_state = get_inputs(target_second)
        signal_states = optimize_signals(in_counts, out_counts, emergency, accident, neighbor_state)

        # Report the decision
        if signal_states is None:
            print("No valid phases available. All signals remain red.")
        else:
            print(f"Green Signal: {signal_states['green']}")
            print(f"Red Signals: {', '.join(signal_states['red'])}")
            print("-" * 50)

        # Record it; current_phase is left unchanged when there is no decision
        if signal_states:
            green = signal_states['green']
            phase_history.append([green] * 4)
            current_phase = green
        else:
            phase_history.append(["None"] * 4)

        # Move on to the next sample point and wait until it is due
        target_second += interval
        time.sleep(interval)


# Main Function
if __name__ == "__main__":
    traffic_signal_control()


0: 320x640 9 cars, 1 train, 1 truck, 58.2ms
Speed: 0.0ms preprocess, 58.2ms inference, 70.0ms postprocess per image at shape (1, 3, 320, 640)

0: 384x640 1 ER, 64.9ms
Speed: 1.1ms preprocess, 64.9ms inference, 8.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 noaccident, 41.6ms
Speed: 0.0ms preprocess, 41.6ms inference, 9.0ms postprocess per image at shape (1, 3, 384, 640)

0: 320x640 6 cars, 1 train, 3 trucks, 18.2ms
Speed: 0.0ms preprocess, 18.2ms inference, 0.0ms postprocess per image at shape (1, 3, 320, 640)

0: 384x640 (no detections), 8.0ms
Speed: 0.0ms preprocess, 8.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 noaccident, 12.2ms
Speed: 0.0ms preprocess, 12.2ms inference, 11.8ms postprocess per image at shape (1, 3, 384, 640)

0: 320x640 11 cars, 2 trucks, 10.7ms
Speed: 2.4ms preprocess, 10.7ms inference, 1.5ms postprocess per image at shape (1, 3, 320, 640)

0: 384x640 (no detections), 10.0ms
Speed: 2.5ms preprocess, 10.0

In [3]:
from ultralytics import YOLO
import cv2

# Load the YOLOv8m model pre-trained on COCO (which includes the 'person' class)
# NOTE(review): this rebinds the notebook-global `model`, which the RL cells above
# bind to a Keras model and pass to act(state, model); re-run the relevant cell
# when switching between the two workflows.
model = YOLO("yolov8m.pt")

def detect_pedestrians(video_path, target_second, confidence_threshold=0.5, show_output=True):
    """
    Detect pedestrians in one frame of a video using the global YOLO ``model``.

    Parameters:
        video_path (str): Path to the video file.
        target_second (int): The second at which to analyze the frame.
        confidence_threshold (float): Minimum confidence score to consider a detection valid.
        show_output (bool): Whether to display the annotated frame in an OpenCV
            window; the call then blocks until a key is pressed.

    Returns:
        bool: True if at least one class-0 ('person') detection meets the
        threshold, False otherwise (including when the frame cannot be read).
    """
    cap = cv2.VideoCapture(video_path)
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)  # Frames per second of the video
        frame_number = int(target_second * fps)  # Frame index to extract

        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)  # Jump to the target frame
        ret, frame = cap.read()
    finally:
        # Only one frame is needed, so give the video handle back right away
        # instead of holding it while inference runs or while waitKey blocks;
        # this also covers the case where an exception is raised.
        cap.release()

    if not ret:
        print(f"Error: Unable to read the frame at second {target_second}")
        return False  # Return False if frame cannot be read

    # Perform pedestrian detection
    results = model(frame)  # YOLOv8 inference
    # Each row is unpacked below as (x1, y1, x2, y2, confidence, class_id)
    detections = results[0].boxes.data.cpu().numpy()

    person_detected = False
    for x1, y1, x2, y2, confidence, class_id in detections:
        if int(class_id) == 0 and confidence >= confidence_threshold:  # Class ID 0 is 'person'
            person_detected = True
            # Draw the bounding box and its confidence label
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            label = f"Person: {confidence:.2f}"
            cv2.putText(frame, label, (int(x1), int(y1) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Display the frame with detections
    if show_output:
        cv2.imshow("Pedestrian Detection", frame)
        cv2.waitKey(0)  # Wait for a key press to close the window
        cv2.destroyAllWindows()

    return person_detected

In [4]:
# Example usage: look at the frame at second 10 of one pedestrian clip and
# report the signal recommendation.
video_path = "../videos/pedestrians/lane_1.mp4"
target_second = 10
pedestrian_present = detect_pedestrians(video_path, target_second)

print(
    "Pedestrians detected. Activate green signal."
    if pedestrian_present
    else "No pedestrians detected. Keep signal red."
)


0: 384x640 24 persons, 2 handbags, 17.0ms
Speed: 0.0ms preprocess, 17.0ms inference, 0.0ms postprocess per image at shape (1, 3, 384, 640)
Pedestrians detected. Activate green signal.


In [11]:
import cv2
from ultralytics import YOLO
from tracker import Tracker

# Initialize YOLO model
# NOTE(review): this rebinds the notebook-global `model`, which the RL cells above
# bind to a Keras model and pass to act(state, model); re-run the relevant cell
# when switching between the two workflows.
model = YOLO('../models/yolov8m.pt')

# Function to process video and return pedestrian counts at a specific second
def get_pedestrian_counts_at_second(video_path, target_second=20):
    """Count people on either side of the crossing area in one video frame.

    The frame at ``target_second`` is resized to 1020x500, run through the
    global YOLO ``model``, and class-0 boxes are passed to a fresh ``Tracker``.
    Only a single frame is examined, so the counts reflect where people are
    standing in that frame rather than crossing events over time.

    The annotated frame is shown in an OpenCV window and the call blocks
    until a key is pressed.

    Args:
        video_path: Path to the video file.
        target_second: Second of the video to analyze.

    Returns:
        tuple: ``(in_count, out_count)`` where ``in_count`` is the number of
        tracked boxes centered right of the right-hand line (3/4 of the frame
        width) and ``out_count`` the number centered left of the left-hand
        line (1/4 of the frame width). ``(0, 0)`` if the video cannot be
        opened or the frame cannot be read.
    """
    tracker = Tracker()
    cap = cv2.VideoCapture(video_path)

    try:
        if not cap.isOpened():
            print(f"Error: Could not open video at {video_path}")
            return 0, 0

        # Multiply by the exact frame rate before truncating; truncating the
        # rate first (e.g. 29.97 -> 29) makes the seek land progressively
        # earlier than the requested second.
        target_frame = int(target_second * cap.get(cv2.CAP_PROP_FPS))
        cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)

        ret, frame = cap.read()
    finally:
        # Only one frame is needed; release on every path, including the
        # early return above and any exception.
        cap.release()

    if not ret:
        print(f"Failed to read the frame at second {target_second}. Returning counts as (0, 0).")
        return 0, 0

    in_count, out_count = 0, 0
    in_ids, out_ids = set(), set()

    resized_width, resized_height = 1020, 500
    frame = cv2.resize(frame, (resized_width, resized_height))

    # Vertical lines bounding the pedestrian crossing area
    vertical_line_x_left = resized_width // 4  # 25% of the frame width
    vertical_line_x_right = 3 * resized_width // 4  # 75% of the frame width

    results = model.predict(frame)
    objects_rects = []

    for result in results:
        for box in result.boxes.data.cpu().numpy():
            x1, y1, x2, y2, conf, class_id = map(float, box[:6])
            # Class 0 is 'person' in the COCO label set; change this if the
            # loaded weights use a different class ID for pedestrians.
            if class_id == 0:
                objects_rects.append([x1, y1, x2 - x1, y2 - y1])  # (x, y, w, h)

    bbox_ids = tracker.update(objects_rects)

    for bbox in bbox_ids:
        x, y, w, h, obj_id = bbox
        cx = x + w // 2  # Horizontal center of the bounding box

        # Tally each tracked ID at most once per side
        if cx < vertical_line_x_left:
            if obj_id not in out_ids:  # Left of the crossing area
                out_count += 1
                out_ids.add(obj_id)
        elif cx > vertical_line_x_right:
            if obj_id not in in_ids:  # Right of the crossing area
                in_count += 1
                in_ids.add(obj_id)

        # Draw bounding box and ID for visualization
        cv2.rectangle(frame, (int(x), int(y)), (int(x + w), int(y + h)), (0, 255, 0), 2)
        cv2.putText(frame, f"ID: {obj_id}", (int(x), int(y) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Draw vertical lines for left and right pedestrian crossing areas
    cv2.line(frame, (vertical_line_x_left, 0), (vertical_line_x_left, resized_height), (255, 0, 0), 2)
    cv2.line(frame, (vertical_line_x_right, 0), (vertical_line_x_right, resized_height), (255, 0, 0), 2)

    # Display the processed frame (blocks until a key is pressed)
    cv2.imshow("Pedestrian Detection at Target Second", frame)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

    # Return counts
    return in_count, out_count

# Test with a video
video_path = "../videos/cut/lane_1.mp4"
in_count, out_count = get_pedestrian_counts_at_second(video_path, target_second=10)
# in_count tallies boxes right of the right-hand line and out_count those left
# of the left-hand line, so the left-side figure is out_count.
print(f"Left count: {out_count}, Right count: {in_count}")



0: 320x640 9 cars, 1 truck, 25.4ms
Speed: 0.0ms preprocess, 25.4ms inference, 0.8ms postprocess per image at shape (1, 3, 320, 640)
Left count: 0, Right count: 0


In [17]:
import cv2
from ultralytics import YOLO
from tracker import Tracker

# Initialize YOLO model
# NOTE(review): this rebinds the notebook-global `model`, which the RL cells above
# bind to a Keras model and pass to act(state, model); re-run the relevant cell
# when switching between the two workflows.
model = YOLO('../models/yolov8m.pt')

# Function to process video and return pedestrian counts at a specific second
def get_pedestrian_counts_at_second(video_path, target_second=20):
    """Count people near the left and right frame edges in one video frame.

    The frame at ``target_second`` is resized to 1020x500, run through the
    global YOLO ``model``, and class-0 boxes are passed to a fresh ``Tracker``.
    Only a single frame is examined, so the counts reflect where people are
    standing in that frame rather than crossing events over time.

    The annotated frame is shown in an OpenCV window and the call blocks
    until a key is pressed.

    Args:
        video_path: Path to the video file.
        target_second: Second of the video to analyze.

    Returns:
        tuple: ``(in_count, out_count)`` where ``in_count`` is the number of
        tracked boxes centered right of the right-hand line (90% of the frame
        width) and ``out_count`` the number centered left of the left-hand
        line (10% of the frame width). ``(0, 0)`` if the video cannot be
        opened or the frame cannot be read.
    """
    tracker = Tracker()
    cap = cv2.VideoCapture(video_path)

    try:
        if not cap.isOpened():
            print(f"Error: Could not open video at {video_path}")
            return 0, 0

        # Multiply by the exact frame rate before truncating; truncating the
        # rate first (e.g. 29.97 -> 29) makes the seek land progressively
        # earlier than the requested second.
        target_frame = int(target_second * cap.get(cv2.CAP_PROP_FPS))
        cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)

        ret, frame = cap.read()
    finally:
        # Only one frame is needed; release on every path, including the
        # early return above and any exception.
        cap.release()

    if not ret:
        print(f"Failed to read the frame at second {target_second}. Returning counts as (0, 0).")
        return 0, 0

    in_count, out_count = 0, 0
    in_ids, out_ids = set(), set()

    resized_width, resized_height = 1020, 500
    frame = cv2.resize(frame, (resized_width, resized_height))

    # Vertical counting lines placed close to the left and right frame edges
    vertical_line_x_left = int(resized_width * 0.1)  # 10% of the frame width
    vertical_line_x_right = int(resized_width * 0.9)  # 90% of the frame width

    results = model.predict(frame)
    objects_rects = []

    for result in results:
        for box in result.boxes.data.cpu().numpy():
            x1, y1, x2, y2, conf, class_id = map(float, box[:6])
            # Class 0 is 'person' in the COCO label set; change this if the
            # loaded weights use a different class ID for pedestrians.
            if class_id == 0:
                objects_rects.append([x1, y1, x2 - x1, y2 - y1])  # (x, y, w, h)

    bbox_ids = tracker.update(objects_rects)

    for bbox in bbox_ids:
        x, y, w, h, obj_id = bbox
        cx = x + w // 2  # Horizontal center of the bounding box

        # Tally each tracked ID at most once per side
        if cx < vertical_line_x_left:
            if obj_id not in out_ids:  # Left of the left-hand line
                out_count += 1
                out_ids.add(obj_id)
        elif cx > vertical_line_x_right:
            if obj_id not in in_ids:  # Right of the right-hand line
                in_count += 1
                in_ids.add(obj_id)

        # Draw bounding box and ID for visualization
        cv2.rectangle(frame, (int(x), int(y)), (int(x + w), int(y + h)), (0, 255, 0), 2)
        cv2.putText(frame, f"ID: {obj_id}", (int(x), int(y) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Draw vertical lines for left and right pedestrian crossing areas
    cv2.line(frame, (vertical_line_x_left, 0), (vertical_line_x_left, resized_height), (255, 0, 0), 2)
    cv2.line(frame, (vertical_line_x_right, 0), (vertical_line_x_right, resized_height), (255, 0, 0), 2)

    # Display the processed frame (blocks until a key is pressed)
    cv2.imshow("Pedestrian Detection at Target Second", frame)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

    # Return counts
    return in_count, out_count

# Test with a video
video_path = "../videos/cut/lane_1.mp4"
in_count, out_count = get_pedestrian_counts_at_second(video_path, target_second=10)
# in_count tallies boxes right of the right-hand line and out_count those left
# of the left-hand line, so the left-side figure is out_count.
print(f"Left count: {out_count}, Right count: {in_count}")



0: 320x640 9 cars, 1 truck, 12.4ms
Speed: 0.4ms preprocess, 12.4ms inference, 0.0ms postprocess per image at shape (1, 3, 320, 640)
Left count: 0, Right count: 0
