In [2]:
import cv2
import csv
import numpy as np
import os
import pickle
from ultralytics import YOLO  
from sort import *   

In [3]:


# --- Load reference histogram for red light detection ---
def load_histogram(file_path):
    """Read a pickled histogram from disk and return it unchanged.

    Args:
        file_path: Path of the pickle file to read (opened in binary mode).

    Returns:
        The object that was stored in the pickle file.
    """
    # NOTE: unpickling can execute arbitrary code; only point this at
    # histogram files you produced yourself.
    with open(file_path, 'rb') as handle:
        return pickle.load(handle)

def calculate_histogram(image, bbox):
    """Compute a normalized, flattened colour histogram for a region of an image.

    Args:
        image: Three-dimensional image array (rows, columns, channels).
        bbox: ``(x_center, y_center, box_width, box_height)`` expressed as
            fractions of the image width/height.

    Returns:
        A flat array holding an 8x8x8-bin histogram over channels 0, 1 and 2
        of the region, normalized with ``cv2.normalize``'s default settings.
    """
    height, width, _ = image.shape
    x_center, y_center, box_width, box_height = bbox

    # Clamp every bound to the frame. Without this, a box reaching past the
    # top/left edge yields a negative index, which NumPy interprets as
    # "count from the end" and silently selects the wrong region.
    x_min = min(max(int((x_center - box_width / 2) * width), 0), width)
    x_max = min(max(int((x_center + box_width / 2) * width), 0), width)
    y_min = min(max(int((y_center - box_height / 2) * height), 0), height)
    y_max = min(max(int((y_center + box_height / 2) * height), 0), height)

    roi = image[y_min:y_max, x_min:x_max]
    histogram = cv2.calcHist([roi], [0, 1, 2], None, [8, 8, 8], [0, 256, 0, 256, 0, 256])
    histogram = cv2.normalize(histogram, histogram).flatten()
    return histogram

def compare_histograms(hist1, hist2):
    """Return the OpenCV correlation score between two histograms.

    Both histograms are handed to ``cv2.compareHist`` with the
    ``cv2.HISTCMP_CORREL`` method; the resulting score is returned as-is.
    """
    method = cv2.HISTCMP_CORREL
    similarity = cv2.compareHist(hist1, hist2, method)
    return similarity

# Arrow drawing for highlighting violations
def draw_arrow(frame, x1, y1, x2, y2):
    """
    Paint a filled red arrow that points down at the top edge of a box.

    The arrow is centred horizontally on the box (x1..x2). Its tip touches
    y1, the head's base sits 50px above y1 and the shaft runs from 150px
    above y1 down to the head's base. y2 is accepted but not used.
    The frame is modified in place; nothing is returned.
    """
    red = (0, 0, 255)
    head_half_width = 50
    shaft_half_width = 20
    shaft_length = 100

    mid_x = (x1 + x2) // 2
    shaft_top = y1 - 150
    head_base = shaft_top + shaft_length

    # Triangle: tip on the box's top edge, base at the bottom of the shaft.
    head = np.array(
        [
            (mid_x, y1),
            (mid_x - head_half_width, head_base),
            (mid_x + head_half_width, head_base),
        ],
        np.int32,
    )
    # Rectangle listed as top-left, bottom-left, bottom-right, top-right.
    shaft = np.array(
        [
            (mid_x - shaft_half_width, shaft_top),
            (mid_x - shaft_half_width, head_base),
            (mid_x + shaft_half_width, head_base),
            (mid_x + shaft_half_width, shaft_top),
        ],
        np.int32,
    )

    for polygon in (head, shaft):
        cv2.fillPoly(frame, [polygon], red)

def is_within_path(cx, cy, path_bbox, frame_shape):
    """Tell whether pixel (cx, cy) falls inside a normalized rectangle.

    Args:
        cx, cy: Point to test, in pixels.
        path_bbox: ``(x_center, y_center, box_width, box_height)`` given as
            fractions of the frame width/height.
        frame_shape: ``(height, width, channels)`` of the frame.

    Returns:
        True when the point is inside the rectangle, edges included.
    """
    height, width, _ = frame_shape
    x_center, y_center, box_width, box_height = path_bbox

    half_w = box_width / 2
    half_h = box_height / 2

    # Scale the normalized edges to pixels (truncating toward zero).
    left = int((x_center - half_w) * width)
    right = int((x_center + half_w) * width)
    top = int((y_center - half_h) * height)
    bottom = int((y_center + half_h) * height)

    inside_x = left <= cx <= right
    inside_y = top <= cy <= bottom
    return inside_x and inside_y

def point_side(point, line_start, line_end):
    """
    Classify a point relative to the directed line line_start -> line_end.

    The result is (px - sx) * (ey - sy) - (py - sy) * (ex - sx). It is 0 when
    the point is on the line and has opposite signs on the two sides. With y
    growing downward (image coordinates), a positive value means the point is
    on the left when looking from line_start toward line_end.
    """
    offset_x = point[0] - line_start[0]
    offset_y = point[1] - line_start[1]
    direction_x = line_end[0] - line_start[0]
    direction_y = line_end[1] - line_start[1]
    return offset_x * direction_y - offset_y * direction_x

# --- Initialize model, tracker, and reference data ---
model = YOLO('yolov8n.pt')  # YOLOv8 model (the 'n' = nano variant, chosen for speed)
tracker = Sort()            # SORT tracker used to keep vehicle IDs stable across frames
reference_histogram = load_histogram('./Histogram/modelTown_histogram.pkl')  # reference red-light histogram

# Region of the frame where the traffic light appears, as normalized
# (x_center, y_center, width, height). Calibrate per camera view.
signal_bbox = (0.485879, 0.577487, 0.016630, 0.051067)
# Region of the road in which vehicles are considered (the "path" area),
# in the same normalized (x_center, y_center, width, height) form.
path_bbox   = (0.901061, 0.730515, 0.197877, 0.199434)

# End points, in pixel coordinates, of the lower (stop) line and the upper
# (violation) line. These depend on the camera perspective of the intersection.
upper_line_start = (1389, 699)
upper_line_end = (1859, 666)
lower_line_start = (1912, 842)
lower_line_end = (1332, 1061)

threshold = 0.12  # similarity score above which the signal is considered "Red"

# --- Set up video capture from the mobile camera IP stream ---
ip_camera_url = "http://192.168.10.28:8080/video"  # replace with your phone's IP Webcam URL
cap = cv2.VideoCapture(ip_camera_url)
if not cap.isOpened():
    print("Error: Unable to access the IP camera stream.")
    # exit() is a helper the site module adds for interactive sessions and it
    # reports success; raise SystemExit with a non-zero status instead.
    raise SystemExit(1)

# Prepare video writer to save output video
fps = cap.get(cv2.CAP_PROP_FPS)
if fps is None or fps <= 1:
    fps = 20  # default to 20 FPS if FPS not reported by the stream
frame_width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
os.makedirs('./result', exist_ok=True)
output_path = "./result/violation_output.mp4"
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
if not video_writer.isOpened():
    # Surface the failure now rather than ending up with a missing/empty
    # output file (e.g. when the stream did not report a frame size).
    print(f"Warning: could not open video writer for {output_path} "
          f"({frame_width}x{frame_height} @ {fps} FPS); output will not be saved.")

track_states = {}  # per-vehicle state keyed by track ID: {"entered": bool, "violated": bool}

# --- Main loop: read frames and process ---
# The loop runs inside try/finally so the capture, the writer and the preview
# window are always released -- also on Ctrl+C or an unexpected error --
# which lets the output video be finalized.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Stream ended or cannot read frame.")
            break

        h, w, _ = frame.shape

        # 1. Determine traffic light state using histogram comparison.
        #    Done before any overlay is drawn so the ROI pixels are untouched.
        frame_hist = calculate_histogram(frame, signal_bbox)
        score = compare_histograms(frame_hist, reference_histogram)
        is_red = score > threshold

        # 2. If the signal is Red, detect and track vehicles. This also runs
        #    before drawing so the detector sees the camera image rather than
        #    the lines/boxes painted on top of it.
        tracked_objects = []
        if is_red:
            results = model(frame)  # run YOLOv8 on the frame
            boxes = results[0].boxes.data.cpu().numpy()
            # Row layout: x1, y1, x2, y2, conf, class id. Keep cars only
            # (class id 2 in COCO) and pass [x1, y1, x2, y2, conf] to SORT.
            detections = boxes[boxes[:, 5].astype(int) == 2, :5]
            tracked_objects = tracker.update(detections)

        # 3. Draw the static overlays: boundary lines, signal ROI + score, path zone
        cv2.line(frame, upper_line_start, upper_line_end, (0, 255, 255), 2)  # upper line in yellow
        cv2.line(frame, lower_line_start, lower_line_end, (255, 0, 0), 2)    # lower line in blue

        sx, sy, sw, sh = signal_bbox
        sx_min = int((sx - sw/2) * w); sx_max = int((sx + sw/2) * w)
        sy_min = int((sy - sh/2) * h); sy_max = int((sy + sh/2) * h)
        cv2.rectangle(frame, (sx_min, sy_min), (sx_max, sy_max), (255, 255, 255), 2)
        cv2.putText(frame, f"Score: {score:.2f}", (sx_min, sy_max + 20),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        px, py, pw, ph = path_bbox
        px_min = int((px - pw/2) * w); px_max = int((px + pw/2) * w)
        py_min = int((py - ph/2) * h); py_max = int((py + ph/2) * h)
        cv2.rectangle(frame, (px_min, py_min), (px_max, py_max), (0, 255, 0), 2)

        if is_red:
            # 4. Process tracked vehicles for violation logic
            for x1, y1, x2, y2, track_id in tracked_objects:
                track_id = int(track_id)
                # Center of the bounding box
                center_x = int((x1 + x2) / 2)
                center_y = int((y1 + y2) / 2)

                # Only consider vehicles within the defined path/road area
                if not is_within_path(center_x, center_y, path_bbox, frame.shape):
                    continue

                position = (center_x, center_y)
                state = track_states.get(track_id)
                if state is None:
                    # First sighting past the lower (stop) line: mark as "entered"
                    if point_side(position, lower_line_start, lower_line_end) > 0:
                        state = {"entered": True, "violated": False}
                        track_states[track_id] = state
                elif (state["entered"] and not state["violated"]
                        and point_side(position, upper_line_start, upper_line_end) > 0):
                    # Already entered and now past the upper line: violation
                    state["violated"] = True

                violated = state is not None and state["violated"]
                # Bounding box is red once violated, green otherwise
                color = (0, 0, 255) if violated else (0, 255, 0)
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), color, 2)
                if violated:
                    draw_arrow(frame, int(x1), int(y1), int(x2), int(y2))
                    # (Optional: could log or save violation event details here)

            # Only frames from red-signal periods are recorded to the output video.
            video_writer.write(frame)

        # 5. Display the frame with annotations in a window
        cv2.imshow("Red Light Violation Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Clean up resources
    cap.release()
    video_writer.release()
    cv2.destroyAllWindows()

[mjpeg @ 0xf872280] overread 8
[mjpeg @ 0xf872280] overread 8



0: 384x640 7 persons, 8 cars, 4 motorcycles, 241.6ms
Speed: 3.3ms preprocess, 241.6ms inference, 7.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 persons, 8 cars, 4 motorcycles, 1 truck, 1 traffic light, 192.0ms
Speed: 5.1ms preprocess, 192.0ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 persons, 7 cars, 2 motorcycles, 1 truck, 173.5ms
Speed: 4.1ms preprocess, 173.5ms inference, 5.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 persons, 4 cars, 3 motorcycles, 1 truck, 186.6ms
Speed: 6.0ms preprocess, 186.6ms inference, 3.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 8 cars, 2 motorcycles, 211.5ms
Speed: 4.5ms preprocess, 211.5ms inference, 3.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 3 cars, 3 motorcycles, 1 truck, 168.6ms
Speed: 7.1ms preprocess, 168.6ms inference, 2.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 7 persons, 4 cars, 3 mo