In [3]:
# Signal Detection (Colour detection, Two Phases)

import cv2
import csv
import numpy as np
from collections import deque

# ---------------- Config ----------------
# Input video to analyse and path of the annotated copy written by main().
VIDEO_IN = '/Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/RedfernPittPM-Part3.mp4'
VIDEO_OUT = "/Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/PMredfernpitt-Part3_signal_annotated.mp4"

# One output CSV per signal group; the keys are the same labels used for the
# ROI selection prompts in main().
# NOTE(review): "NS"/"EW" presumably mean north-south / east-west approaches — confirm.
CSV_OUTS = {
    "NS": "/Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/PMP1_signal_logPart3.csv",
    "EW": "/Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/PMP2_signal_logPart3.csv",
}

# Used both as the length of the per-ROI classification history and as the
# number of consecutive analysed frames needed to confirm a state change.
STABLE_FRAMES = 6       # Number of frames to confirm a state
# Index of the frame shown to the user when drawing the signal ROIs.
ROI_FRAME_NUMBER = 7000  # Frame number to select ROI
# Frame-skip divisor: frames whose running index is not a multiple of this
# value are not analysed by main().
FRAME_SKIP = 3

# ---------------- Helpers ----------------
def classify_signal_bgr(roi_bgr, green_ratio=0.1):
    """Classify a cropped signal-head image as 'GREEN' or 'NON-GREEN'.

    The crop is converted to HSV, its V channel is histogram-equalised, and
    pixels are counted as green when they fall in either of two bands:
    a saturated band (H 35-95, S >= 20, V >= 20) or a washed-out bright band
    (H 35-95, S <= 30, V >= 200). A 3x3 morphological opening removes
    isolated pixels before counting.

    Args:
        roi_bgr: BGR image crop (as sliced from an OpenCV frame). ``None`` or
            a crop with zero pixels is accepted.
        green_ratio: Fraction of the crop that must be green for the result
            to be 'GREEN'. Defaults to 0.1, the previously hard-coded value.

    Returns:
        'GREEN' if more than ``green_ratio`` of the pixels are green,
        otherwise 'NON-GREEN'. An empty crop is always 'NON-GREEN'.
    """
    # A cancelled or degenerate ROI yields a crop with no pixels; cvtColor
    # would raise on it, so report "no green" instead of crashing the run.
    if roi_bgr is None or roi_bgr.size == 0:
        return "NON-GREEN"

    hsv = cv2.cvtColor(roi_bgr, cv2.COLOR_BGR2HSV)
    # Equalise brightness so the V thresholds behave similarly in dim and
    # bright footage.
    hsv[..., 2] = cv2.equalizeHist(hsv[..., 2])

    # Saturated green lamp pixels.
    lower_green = np.array([35, 20, 20])
    upper_green = np.array([95, 255, 255])
    green_mask = cv2.inRange(hsv, lower_green, upper_green)

    # Over-exposed lamp pixels: green hue but almost no saturation.
    lower_green_bright = np.array([35, 0, 200])
    upper_green_bright = np.array([95, 30, 255])
    green_mask_bright = cv2.inRange(hsv, lower_green_bright, upper_green_bright)

    green_mask = cv2.bitwise_or(green_mask, green_mask_bright)
    kernel = np.ones((3, 3), np.uint8)
    green_mask = cv2.morphologyEx(green_mask, cv2.MORPH_OPEN, kernel)

    green_pixels = int(cv2.countNonZero(green_mask))
    total_pixels = roi_bgr.shape[0] * roi_bgr.shape[1]

    return "GREEN" if green_pixels > total_pixels * green_ratio else "NON-GREEN"

def format_timestamp(seconds):
    """Format a non-negative duration in seconds as 'H:MM:SS.mmm'.

    The value is rounded to the nearest millisecond once, and every field is
    then derived from that integer. This avoids float truncation (1.001 s
    used to render as '.000') and lets a rounded-up fraction carry into the
    seconds/minutes/hours fields.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The formatted string, e.g. '1:01:01.500'. Hours are not zero-padded.
    """
    total_ms = int(round(seconds * 1000))
    rest, ms = divmod(total_ms, 1000)
    rest, s = divmod(rest, 60)
    h, m = divmod(rest, 60)
    return f"{h}:{m:02d}:{s:02d}.{ms:03d}"

# ---------------- Main ----------------
def _select_rois(frame, keys):
    """Let the user draw one ROI per key on ``frame``; return {key: (x1, y1, x2, y2)}.

    Raises:
        RuntimeError: if a selection is cancelled or has zero width/height.
    """
    rois = {}
    for key in keys:
        print(f"Select ROI for {key}")
        window = f"Select ROI {key}"
        x, y, w_roi, h_roi = cv2.selectROI(window, frame, fromCenter=False, showCrosshair=True)
        cv2.destroyWindow(window)
        x, y, w_roi, h_roi = int(x), int(y), int(w_roi), int(h_roi)
        # A cancelled selection comes back as an empty rectangle; fail here
        # with a clear message instead of crashing later in the frame loop.
        if w_roi <= 0 or h_roi <= 0:
            raise RuntimeError(f"No ROI selected for {key}")
        rois[key] = (x, y, x + w_roi, y + h_roi)
    return rois


def _majority_state(history, tie_fallback):
    """Return the most frequent state in ``history``; ``tie_fallback`` on a tie."""
    greens = sum(1 for state in history if state == "GREEN")
    others = len(history) - greens
    if greens > others:
        return "GREEN"
    if others > greens:
        return "NON-GREEN"
    return tie_fallback


def main():
    """Log green / non-green changes of the signal ROIs and write an annotated video.

    Steps:
      1. Open VIDEO_IN and show frame ROI_FRAME_NUMBER so the user can draw
         one ROI per key of CSV_OUTS.
      2. Walk the video from the start, analysing every FRAME_SKIP-th frame.
         Each ROI is classified with classify_signal_bgr(); the majority of
         the last STABLE_FRAMES classifications is the candidate state.
      3. A candidate that differs from the last confirmed state for
         STABLE_FRAMES consecutive analysed frames is confirmed and logged
         as [timestamp_s, state].
      4. On exit (end of video, 'q', or an error inside the loop) the
         capture and writer are released and one CSV per key is written.

    Raises:
        RuntimeError: if the video cannot be opened, the ROI frame cannot be
            read, or an ROI selection is cancelled/empty.
    """
    cap = cv2.VideoCapture(VIDEO_IN)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {VIDEO_IN}")

    # --- Setup that may fail: release the capture before propagating ---
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # --- ROI selection ---
        cap.set(cv2.CAP_PROP_POS_FRAMES, ROI_FRAME_NUMBER)
        ret, frame_for_roi = cap.read()
        if not ret:
            raise RuntimeError(f"Could not read frame {ROI_FRAME_NUMBER} for ROI selection")
        rois = _select_rois(frame_for_roi, list(CSV_OUTS))
    except Exception:
        cap.release()
        cv2.destroyAllWindows()
        raise

    # The writer is created only after setup succeeded so a failed run does
    # not leave an empty output file behind.
    # NOTE: only analysed frames are written, at the source fps, so the
    # annotated video plays faster than real time; the burned-in timestamp
    # shows the true video time.
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(VIDEO_OUT, fourcc, fps, (w, h))
    if not writer.isOpened():
        print(f"Warning: cannot open video writer for {VIDEO_OUT}; annotated video will not be saved")

    # --- Initialize CSV buffers ---
    logs = {key: [] for key in CSV_OUTS}

    # --- State tracking ---
    states = {
        key: {
            "last": None,      # last confirmed state
            "count": 0,        # consecutive analysed frames disagreeing with "last"
            "coords": coords,  # (x1, y1, x2, y2)
            "history": deque(maxlen=STABLE_FRAMES),
        }
        for key, coords in rois.items()
    }

    color_map = {"GREEN": (0, 255, 0), "NON-GREEN": (0, 0, 255)}
    step = max(1, int(FRAME_SKIP))  # guard against a zero/negative skip
    frame_idx = 0
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

    try:
        while True:
            # grab() advances exactly one frame without decoding it; only
            # frames that are analysed are decoded with retrieve(). (Calling
            # grab() after read() on skipped iterations dropped two frames
            # per skip.)
            if not cap.grab():
                break
            analyse = frame_idx % step == 0
            frame_idx += 1
            if not analyse:
                continue

            ret, frame = cap.retrieve()
            if not ret:
                break

            # True video time of this frame, shared by the log and the overlay.
            timestamp_s = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000

            # ---- Signal detection per ROI ----
            for key, st in states.items():
                x1, y1, x2, y2 = st["coords"]
                current_state = classify_signal_bgr(frame[y1:y2, x1:x2])

                st["history"].append(current_state)
                # On a tie keep the confirmed state (or the newest reading
                # before anything is confirmed) so the result never depends
                # on set iteration order.
                majority_state = _majority_state(st["history"], st["last"] or current_state)

                if majority_state != st["last"]:
                    st["count"] += 1
                    if st["count"] >= STABLE_FRAMES:
                        logs[key].append([timestamp_s, majority_state])
                        st["last"] = majority_state
                        st["count"] = 0
                else:
                    st["count"] = 0

                # Draw ROI and state; keep the label inside the frame when
                # the ROI touches the top edge.
                color = color_map.get(majority_state, (255, 255, 255))
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(frame, f"{key}: {majority_state}", (x1, max(15, y1 - 10)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

            # Timestamp overlay (true video time)
            timestamp_text = f"Time: {format_timestamp(timestamp_s)}"
            cv2.putText(frame, timestamp_text, (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

            writer.write(frame)

            cv2.imshow("Traffic Signal Logger", frame)
            if cv2.waitKey(10) & 0xFF == ord("q"):
                break

    finally:
        cap.release()
        writer.release()
        cv2.destroyAllWindows()

        # Save CSVs (also on early quit or error, so partial logs are kept)
        for key, path in CSV_OUTS.items():
            with open(path, "w", newline="") as f:
                writer_csv = csv.writer(f)
                writer_csv.writerow(["timestamp_s", "signal_color"])
                writer_csv.writerows(logs[key])

        print(f"Done. CSVs: {', '.join(CSV_OUTS.values())} | Video: {VIDEO_OUT}")

# Entry point: run the signal logger only when this code is executed directly
# (script or notebook cell), not when it is imported.
if __name__ == "__main__":
    main()


Select ROI for NS
Select a ROI and then press SPACE or ENTER button!
Cancel the selection process by pressing c button!
Select ROI for EW
Select a ROI and then press SPACE or ENTER button!
Cancel the selection process by pressing c button!
Done. CSVs: /Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/PMP1_signal_logPart3.csv, /Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/PMP2_signal_logPart3.csv | Video: /Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/PMredfernpitt-Part3_signal_annotated.mp4


In [3]:
# VEH IN CONFLICT ZONES!!

import cv2
import csv
import numpy as np
from ultralytics import YOLO

# CONFIGURATIONS
# Input video; VIDEO_PATH is an alias used by the capture code below.
VIDEO_IN = '/Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/RedfernpittPM-Part4.mp4'
VIDEO_PATH = VIDEO_IN
# Annotated video and per-crossing CSV produced by the processing loop.
OUTPUT_VIDEO = "/Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/PMPART4RedfernPitt-vehicle-conflict.mp4"
OUTPUT_CSV = "/Users/cherrychoy/Library/CloudStorage/OneDrive-TheUniversityofSydney(Students)/Honours Thesis/Videos/3 Redfern Street and Pitt Street/PMPART4RedfernPitt-vehicle-conflict.csv"

# Only frames whose 1-based counter is a multiple of FRAME_SKIP are processed.
FRAME_SKIP = 5
# Index of the frame shown to the user for drawing the counting lines.
FRAME_LINE = 2000
# Displacement threshold (in image pixels) below which a track is not tested
# for line crossings on the current frame.
MIN_MOVE_DIST = 10
# Passed as `conf` to model.track().
CONFIDENCE_THRESHOLD = 0.3

# Detector class ids to keep (passed as `classes` to model.track()) mapped to
# the label written to the CSV.
# NOTE(review): ids look like COCO class indices — confirm against the model.
VEHICLE_CLASSES = {
    1: "bicycle",
    2: "car",
    3: "motorcycle",
    5: "bus",
    7: "truck"
}

# INITIALISE YOLO
# A single model instance is created here and reused for every processed frame.
model = YOLO("yolo11m.pt")

# Open the input video and stop immediately if it cannot be read.
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError(f"Cannot open video: {VIDEO_PATH}")

# Stream properties; 30 fps is assumed when the container reports 0.
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Writer for the annotated copy, same frame size and nominal fps as the input.
out = cv2.VideoWriter(OUTPUT_VIDEO, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

# LINE SELECTIONS
# LINE_POINTS maps a line name to the list of (x, y) points clicked for it;
# it is filled by the select_line mouse callback.
LINE_POINTS = {}
LINE_NAMES = ["P1", "P2", "P3", "P4"]

def select_line(event, x, y, flags, param):
    """Mouse callback that records the two end points of a counting line.

    Each left click appends (x, y) to LINE_POINTS[line_name], draws a red
    dot on the preview image and refreshes the window. After the second
    point the selection window is closed. Clicks that arrive once two points
    are already stored are ignored, so a line never ends up with more than
    two points.

    Args:
        event: OpenCV mouse event code.
        x, y: Click position in image coordinates.
        flags: OpenCV event flags (unused).
        param: Tuple ``(frame_copy, line_name)`` supplied via
            cv2.setMouseCallback.
    """
    if event != cv2.EVENT_LBUTTONDOWN:
        return

    frame_copy, line_name = param
    points = LINE_POINTS.setdefault(line_name, [])
    if len(points) >= 2:
        # Late/queued click after the line is complete: a third point would
        # break the `p1, p2 = LINE_POINTS[name]` unpacking used later.
        return

    points.append((x, y))
    window = f"Select {line_name} line"
    cv2.circle(frame_copy, (x, y), 5, (0, 0, 255), -1)
    cv2.imshow(window, frame_copy)
    if len(points) == 2:
        cv2.destroyWindow(window)

# Show frame FRAME_LINE and let the user click two end points per line.
cap.set(cv2.CAP_PROP_POS_FRAMES, FRAME_LINE)
ret, frame_line = cap.read()
if not ret:
    # Release the capture and the already-opened writer before aborting so
    # the files are not left locked by the kernel.
    cap.release()
    out.release()
    raise RuntimeError(f"Cannot read frame {FRAME_LINE} for line selection")

for name in LINE_NAMES:
    fc = frame_line.copy()  # per-line copy so the dots of one line do not show on the next
    window = f"Select {name} line"
    cv2.imshow(window, fc)
    cv2.setMouseCallback(window, select_line, param=(fc, name))
    # Pump the GUI event loop until the callback has stored two points.
    while len(LINE_POINTS.get(name, ())) < 2:
        if cv2.waitKey(1) & 0xFF == ord('q'):
            # 'q' skips this line; say so instead of leaving it silently incomplete.
            print(f"Warning: selection of line {name} skipped (fewer than two points)")
            break
cv2.destroyAllWindows()
print("Lines selected:", LINE_POINTS)

# ---------------- HELPERS ----------------
def hms_from_seconds(total_seconds_float: float) -> str:
    """Render a duration in seconds as 'H:MM:SS', discarding the fractional part."""
    whole_minutes, secs = divmod(int(total_seconds_float), 60)
    hours, minutes = divmod(whole_minutes, 60)
    return f"{hours}:{minutes:02d}:{secs:02d}"

def segments_intersect(p1, p2, q1, q2) -> bool:
    """Return True when segment p1-p2 crosses segment q1-q2.

    Uses the orientation test: the segments cross when the end points of
    each one lie on opposite sides of the other. Points are (x, y) pairs.
    """
    def _turns_ccw(a, b, c):
        # Strict comparison of the two cross-product terms for a -> b -> c.
        left = (c[1] - a[1]) * (b[0] - a[0])
        right = (b[1] - a[1]) * (c[0] - a[0])
        return left > right

    # p1 and p2 must be on different sides of q1-q2 ...
    if _turns_ccw(p1, q1, q2) == _turns_ccw(p2, q1, q2):
        return False
    # ... and q1 and q2 on different sides of p1-p2.
    return _turns_ccw(p1, p2, q1) != _turns_ccw(p1, p2, q2)

def parse_hms(t_str):
    """Return the total number of seconds encoded by an 'H:MM:SS' string."""
    hours, minutes, seconds = (int(part) for part in t_str.split(":"))
    return (hours * 60 + minutes) * 60 + seconds

# ---------------- PROCESS VIDEO ----------------
# Tracks every FRAME_SKIP-th frame, tests each moving track against the
# counting lines and records the first crossing of each line per track.
vehicle_events = {}  # track_id -> list of crossing events (dicts)
prev_centers = {}    # track_id -> reference box centre (cx, cy)
prev_boxes = {}      # track_id -> reference box (x1, y1, x2, y2)
frame_idx = 0

# Phase first-crossing trackers
first_crossing_phase1 = {"P1": None, "P2": None}
first_crossing_phase2 = {"P3": None, "P4": None}
phase1_green_time = None
phase2_green_time = None

# Use only lines that actually have two end points; a line skipped during
# selection would otherwise crash the loop when unpacked.
active_lines = {}
for name in LINE_NAMES:
    pts = LINE_POINTS.get(name, [])
    if len(pts) >= 2:
        active_lines[name] = (tuple(pts[0]), tuple(pts[1]))
    else:
        print(f"Warning: line {name} has fewer than two points and is ignored")

cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_idx += 1
        if frame_idx % FRAME_SKIP != 0:
            continue

        t_seconds = frame_idx / fps
        t_str = hms_from_seconds(t_seconds)
        cv2.putText(frame, f"Time: {t_str}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)

        for name, (p1, p2) in active_lines.items():
            cv2.line(frame, p1, p2, (0, 0, 255), 2)
            cv2.putText(frame, name, p1, cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

        # YOLO detection + tracking.
        # NOTE(review): no `tracker` argument is passed, so the Ultralytics
        # default tracker is used; pass tracker="bytetrack.yaml" if ByteTrack
        # is what is intended.
        results = model.track(frame, conf=CONFIDENCE_THRESHOLD, iou=0.3,
                              classes=list(VEHICLE_CLASSES.keys()), persist=True)[0]

        for box in results.boxes:
            if box.id is None:
                continue  # detection not (yet) associated with a track

            track_id = int(box.id[0])
            bx1, by1, bx2, by2 = box.xyxy[0].cpu().numpy()
            x1, y1, x2, y2 = int(bx1), int(by1), int(bx2), int(by2)
            cx, cy = int((bx1 + bx2) / 2), int((by1 + by2) / 2)

            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(frame, f"ID:{track_id}", (x1, max(15, y1 - 8)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            ref_center = prev_centers.get(track_id)
            ref_box = prev_boxes.get(track_id)
            if ref_center is None or ref_box is None:
                # First sighting: nothing to compare against yet.
                prev_centers[track_id] = (cx, cy)
                prev_boxes[track_id] = (x1, y1, x2, y2)
                continue

            # Movement is measured centre-to-centre. (Comparing the current
            # centre with a previously stored corner biased dx by half the
            # box width and made the chosen corner flip every frame.)
            dx = cx - ref_center[0]
            dy = cy - ref_center[1]
            if np.hypot(dx, dy) < MIN_MOVE_DIST:
                # Keep the old reference so slow movement accumulates until
                # it exceeds the threshold instead of being reset each frame.
                continue

            # Test the same bottom corner in both boxes: the leading one for
            # rightward motion, the bottom-left one otherwise.
            px1, py1, px2, py2 = ref_box
            if dx > 0:
                prev_point, curr_point = (px2, py2), (x2, y2)  # bottom-right
            else:
                prev_point, curr_point = (px1, py2), (x1, y2)  # bottom-left

            prev_centers[track_id] = (cx, cy)
            prev_boxes[track_id] = (x1, y1, x2, y2)

            # The tracked box carries its own class id.
            veh_type = VEHICLE_CLASSES.get(int(box.cls[0]), "vehicle")

            # CONFIGURE TO INTERSECTION
            # NOTE(review): direction is derived from horizontal motion (dx)
            # for all four lines, including the "North"/"South" labels of
            # P3/P4 — confirm this matches the camera orientation.
            for name, (l1, l2) in active_lines.items():
                if not segments_intersect(prev_point, curr_point, l1, l2):
                    continue

                if name in ["P1", "P2"]:
                    if dx > 0: direction = "East"
                    elif dx < 0: direction = "West"
                    else: direction = "Unknown"
                elif name in ["P3", "P4"]:
                    if dx > 0: direction = "North"
                    elif dx < 0: direction = "South"
                    else: direction = "Unknown"
                else:
                    direction = "Unknown"

                events = vehicle_events.setdefault(track_id, [])
                # Record only the first crossing of each line per track.
                if not any(e["line"] == name for e in events):
                    events.append({
                        "time": t_str, "seconds": t_seconds, "type": veh_type,
                        "line": name, "direction": direction
                    })
                    print(f"[{t_str}] Track {track_id} ({veh_type}) crossed {name} → {direction}")

        out.write(frame)
        cv2.imshow("Tracking", frame)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            print("Quit pressed; saving progress...")
            break

finally:
    cap.release()
    out.release()
    cv2.destroyAllWindows()

    # ---------------- Save vehicle events ----------------
    # Sort by line, then by numeric time (sorting the 'H:MM:SS' strings is
    # lexicographic and misorders once hours reach two digits).
    keyed_rows = []
    for tid, entries in vehicle_events.items():
        for e in entries:
            keyed_rows.append((e["line"], e["seconds"],
                               (tid, e["time"], e["type"], e["line"], e["direction"])))
    keyed_rows.sort(key=lambda r: (r[0], r[1]))
    flat = [row for _, _, row in keyed_rows]

    with open(OUTPUT_CSV, "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(["VehicleID", "Time(H:MM:SS)", "Type", "Line", "Direction"])
        w.writerows(flat)

    print(f"Vehicle crossings saved to {OUTPUT_CSV}")
    print(f"Annotated video saved to {OUTPUT_VIDEO}")


Lines selected: {'P1': [(921, 375), (1042, 456)], 'P2': [(309, 407), (557, 373)], 'P3': [(255, 409), (885, 507)], 'P4': [(625, 372), (893, 378)]}

0: 384x640 5 cars, 363.3ms
Speed: 2.8ms preprocess, 363.3ms inference, 1.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 cars, 359.7ms
Speed: 3.0ms preprocess, 359.7ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 cars, 362.9ms
Speed: 1.7ms preprocess, 362.9ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 5 cars, 372.2ms
Speed: 1.9ms preprocess, 372.2ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 351.8ms
Speed: 1.5ms preprocess, 351.8ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 350.2ms
Speed: 1.5ms preprocess, 350.2ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 358.3ms
Speed: 1.6ms preprocess, 358.3ms inference, 0.9ms postprocess per imag