In [1]:
# CSC 760 - Deep Learning - Fall 2025 - SD School of Mines And Tech
#
# Professor:
#    Dr. Nirmalya Thakur
# TA:
#    PhD Student Faria Nishat Khan
#
# Project 2 (Submission Nov 7, 2025)
#
# Description: This script implements a computer vision system that:
# 1. Detects traffic signals and trains using YOLO
# 2. Classifies traffic signal colors (red, yellow, green)
# 3. Tracks train states through a state machine
# 4. Outputs timestamped state changes
#
# Team Members: Luke Videckis and Jose David Cortes


# **** Libraries
# TODO
# we should clean and remove the libraries that we are not using on this project
import sys, subprocess, importlib, os
import numpy as np
import cv2

# ----------------------------
# Lightweight dependency setup
# ----------------------------
def _ensure(pkg):
    name = pkg.split("==")[0].split(">=")[0].split("[")[0]
    try:
        importlib.import_module(name)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg, "--quiet"])

# Public packages only (per Project2.pdf notes)
for p in ["ultralytics>=8.2.0", "numpy", "opencv-python"]:
    _ensure(p)

from ultralytics import YOLO

# ----------------------------
# Configuration parameters
# ----------------------------
class Cfg:
    # Model and detection
    yolo_weights = "yolov8n.pt"   # COCO model; includes 'traffic light' & 'train' classes
    yolo_conf = 0.30
    yolo_iou  = 0.50

    # Train-state logic
    min_train_area_px = 8000      # ignore tiny train boxes
    miss_patience = 6             # frames without train before "left_the_frame"
    just_entered_grace = 3        # frames allowed to settle after first appearance
    orb_nfeatures = 1000
    motion_speed_thresh = 1.0     # px/frame; median ORB displacement
    xor_dx_thresh = 1.0           # px; fallback horizontal shift

    # Signal color (BGR-domain, as required)
    red_min   = 140
    green_min = 140
    blue_max_for_yellow = 120
    rg_margin = 20

    # Output/visualization
    annotate_video = True
    out_video_path = "annotated_output.mp4"
    draw_every_n = 1

cfg = Cfg()

# ----------------------------
# YOLO detector wrapper
# ----------------------------
class Detector:
    def __init__(self, weights):
        self.model = YOLO(weights)
        names = self.model.model.names if hasattr(self.model, "model") else self.model.names
        self.names = names
        self.cls_traffic = None
        self.cls_train = None
        for k,v in names.items():
            n = str(v).lower()
            if n == "traffic light":
                self.cls_traffic = k
            if n == "train":
                self.cls_train = k
        if self.cls_traffic is None or self.cls_train is None:
            raise RuntimeError("Required classes not found in model: 'traffic light' and 'train'.")

    def infer(self, frame_bgr):
        res = self.model.predict(
            source=frame_bgr,
            conf=cfg.yolo_conf,
            iou=cfg.yolo_iou,
            classes=[self.cls_traffic, self.cls_train],
            verbose=False,
            device="cpu"
        )[0]
        tl_boxes, tr_boxes = [], []
        if res and res.boxes is not None:
            for b in res.boxes:
                cls_id = int(b.cls.item())
                box = b.xyxy.cpu().numpy().astype(int).ravel().tolist()
                conf = float(b.conf.item())
                if cls_id == self.cls_traffic:
                    tl_boxes.append((box, conf))
                elif cls_id == self.cls_train:
                    tr_boxes.append((box, conf))
        return tl_boxes, tr_boxes

# ----------------------------
# BGR-based signal color
# ----------------------------
def classify_signal_color_bgr(roi_bgr, prev_color=None):
    if roi_bgr is None or roi_bgr.size == 0:
        return prev_color or "unknown"
    B,G,R = cv2.split(roi_bgr)
    red_mask   = (R > cfg.red_min)   & (R > G + cfg.rg_margin) & (R > B + cfg.rg_margin)
    green_mask = (G > cfg.green_min) & (G > R + cfg.rg_margin) & (G > B + cfg.rg_margin)
    yellow_mask = (R > cfg.red_min) & (G > cfg.green_min) & (B < cfg.blue_max_for_yellow)

    def vote(mask, channel):
        cnt = float(mask.sum())
        return 0.0 if cnt <= 0 else float(channel[mask].mean()) * (cnt / mask.size)

    s_red    = vote(red_mask, R)
    s_green  = vote(green_mask, G)
    s_yellow = vote(yellow_mask, (R.astype(np.float32)+G.astype(np.float32))/2.0)

    scores = {"red": s_red, "green": s_green, "yellow": s_yellow}
    color = max(scores.items(), key=lambda kv: kv[1])[0]
    if scores[color] < 1.0:
        return prev_color or "unknown"
    return color

# ----------------------------
# ORB motion + XOR fallback
# ----------------------------
def orb_speed(prev_gray, curr_gray, roi):
    x1,y1,x2,y2 = roi
    h,w = prev_gray.shape
    x1,y1 = max(0,x1), max(0,y1)
    x2,y2 = min(w,x2), min(h,y2)
    if x2<=x1 or y2<=y1:
        return 0.0, 0
    g1 = prev_gray[y1:y2, x1:x2]
    g2 = curr_gray[y1:y2, x1:x2]
    if g1.size==0 or g2.size==0:
        return 0.0, 0

    diff = cv2.absdiff(g2, g1)
    _, mask = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
    orb = cv2.ORB_create(nfeatures=cfg.orb_nfeatures)
    k1,d1 = orb.detectAndCompute(g1, mask)
    k2,d2 = orb.detectAndCompute(g2, mask)
    if d1 is None or d2 is None or len(d1)==0 or len(d2)==0:
        return 0.0, 0
    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
    matches = sorted(bf.match(d1, d2), key=lambda m:m.distance)[:80]
    if not matches:
        return 0.0, 0
    P1 = np.float32([k1[m.queryIdx].pt for m in matches])
    P2 = np.float32([k2[m.trainIdx].pt for m in matches])
    speed = float(np.median(np.linalg.norm(P2-P1, axis=1)))
    return speed, len(matches)

def xor_dx(prev_gray, curr_gray, roi):
    x1,y1,x2,y2 = roi
    h,w = prev_gray.shape
    x1,y1 = max(0,x1), max(0,y1)
    x2,y2 = min(w,x2), min(h,y2)
    if x2<=x1 or y2<=y1:
        return 0.0
    g1 = prev_gray[y1:y2, x1:x2]
    g2 = curr_gray[y1:y2, x1:x2]
    E1 = cv2.Canny(g1, 100, 200)
    E2 = cv2.Canny(g2, 100, 200)
    XOR = cv2.bitwise_xor(E1, E2)
    E1m = cv2.bitwise_and(E1, XOR)
    E2m = cv2.bitwise_and(E2, XOR)
    p1 = E1m.sum(axis=0).astype(np.float32)
    p2 = E2m.sum(axis=0).astype(np.float32)
    if np.all(p1==0) or np.all(p2==0):
        return 0.0
    c = np.correlate(p2 - p2.mean(), p1 - p1.mean(), mode='full')
    shift = int(np.argmax(c) - (len(p1) - 1))
    return float(shift)

# ----------------------------
# Train state machine
# ----------------------------
from dataclasses import dataclass

@dataclass
class TrainState:
    mode: str = "not_present"      # not_present | just_entered | moving | left_the_frame
    miss_count: int = 0
    just_counter: int = 0
    had_presence: bool = False
    last_roi: tuple = (0,0,0,0)

def pick_largest_valid_train(tr_boxes):
    best, area = None, 0
    for (x1,y1,x2,y2), conf in tr_boxes:
        w,h = max(0,x2-x1), max(0,y2-y1)
        a = w*h
        if a > area:
            best, area = (x1,y1,x2,y2), a
    if area < cfg.min_train_area_px:
        return None
    return best

def update_train_state(ts: TrainState, prev_gray, curr_gray, roi):
    present = roi is not None
    if not present:
        ts.miss_count += 1
        if ts.had_presence and ts.miss_count >= cfg.miss_patience and ts.mode != "left_the_frame":
            ts.mode = "left_the_frame"
            ts.had_presence = False
            ts.just_counter = 0
        elif not ts.had_presence:
            ts.mode = "not_present"
        return ts

    ts.miss_count = 0
    if not ts.had_presence:
        ts.mode = "just_entered"
        ts.had_presence = True
        ts.just_counter = 0
    else:
        speed, nm = orb_speed(prev_gray, curr_gray, roi)
        if speed < cfg.motion_speed_thresh:
            dx = xor_dx(prev_gray, curr_gray, roi)
            if abs(dx) >= cfg.xor_dx_thresh:
                speed = abs(dx)
        if ts.mode == "just_entered":
            ts.just_counter += 1
            if ts.just_counter >= cfg.just_entered_grace or speed >= cfg.motion_speed_thresh:
                ts.mode = "moving"
        else:
            if speed >= cfg.motion_speed_thresh:
                ts.mode = "moving"
    ts.last_roi = roi
    return ts

def format_event(t_s, signal_color, train_mode):
    return f"t = {t_s:.2f}s, signal = {signal_color}, train = {train_mode}"

# ----------------------------
# Main pipeline
# ----------------------------
def run_pipeline(video_path, write_video=cfg.annotate_video):
    det = Detector(cfg.yolo_weights)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise SystemExit(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    W  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
    H  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 360)

    writer = None
    if write_video:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(cfg.out_video_path, fourcc, fps, (W, H))

    ok, frame0 = cap.read()
    if not ok:
        raise SystemExit("Empty video.")
    gray_prev = cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY)

    # Process first frame at t=0
    tl_boxes, tr_boxes = det.infer(frame0)
    tl_roi = None
    if tl_boxes:
        (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
        x1,y1 = max(0,x1), max(0,y1)
        x2,y2 = min(W-1,x2), min(H-1,y2)
        if x2>x1 and y2>y1:
            tl_roi = frame0[y1:y2, x1:x2]
    last_signal = classify_signal_color_bgr(tl_roi, prev_color=None)
    ts = TrainState()
    train_roi = pick_largest_valid_train(tr_boxes)
    ts = update_train_state(ts, gray_prev, gray_prev, train_roi)  # prev==curr at t=0
    last_reported_signal = None
    last_reported_train  = None

    # Emit initial line
    line0 = format_event(0.0, last_signal, ts.mode)
    print(line0)
    last_reported_signal = last_signal
    last_reported_train  = ts.mode

    # Annotate first frame
    if writer is not None:
        draw = frame0.copy()
        if tl_boxes:
            (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
            cv2.rectangle(draw, (x1,y1), (x2,y2), (0,255,255), 2)
            cv2.putText(draw, f"signal:{last_signal}", (x1, max(20,y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
        if train_roi is not None:
            x1,y1,x2,y2 = train_roi
            cv2.rectangle(draw, (x1,y1), (x2,y2), (0,128,255), 3)
            cv2.putText(draw, f"train:{ts.mode}", (x1, min(H-6,y2+18)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,128,255), 2)
        cv2.putText(draw, "t=0.00s", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
        writer.write(draw)

    # Iterate remaining frames
    i = 1
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        t_s = i / float(fps)

        tl_boxes, tr_boxes = det.infer(frame)
        tl_roi = None
        if tl_boxes:
            (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
            x1,y1 = max(0,x1), max(0,y1)
            x2,y2 = min(W-1,x2), min(H-1,y2)
            if x2>x1 and y2>y1:
                tl_roi = frame[y1:y2, x1:x2]
        sig = classify_signal_color_bgr(tl_roi, prev_color=last_signal)

        train_roi = pick_largest_valid_train(tr_boxes)
        ts = update_train_state(ts, gray_prev, gray, train_roi)

        changed = (sig != last_reported_signal) or (ts.mode != last_reported_train)
        if changed:
            print(format_event(t_s, sig, ts.mode))
            last_reported_signal = sig
            last_reported_train  = ts.mode

        # Annotate if requested
        if writer is not None:
            draw = frame.copy()
            if tl_boxes:
                (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
                cv2.rectangle(draw, (x1,y1), (x2,y2), (0,255,255), 2)
                cv2.putText(draw, f"signal:{sig}", (x1, max(20,y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
            if train_roi is not None:
                x1,y1,x2,y2 = train_roi
                cv2.rectangle(draw, (x1,y1), (x2,y2), (0,128,255), 3)
                cv2.putText(draw, f"train:{ts.mode}", (x1, min(H-6,y2+18)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,128,255), 2)
            cv2.putText(draw, f"t={t_s:.2f}s", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
            writer.write(draw)

        gray_prev = gray
        last_signal = sig
        i += 1

    cap.release()
    if writer is not None:
        writer.release()

# ----------------------------
# Optional diagnostic: XOR shift with provided images
# ----------------------------
def test_xor_dx_with_images(imgA="real_car_left_1.jpg", imgB="real_car_left_2.jpg"):
    if not (os.path.exists(imgA) and os.path.exists(imgB)):
        print("Sample images not found; skipping.")
        return
    A = cv2.imread(imgA, cv2.IMREAD_GRAYSCALE)
    B = cv2.imread(imgB, cv2.IMREAD_GRAYSCALE)
    if A is None or B is None:
        print("Unable to read test images.")
        return
    dx = xor_dx(A, B, (0,0,A.shape[1],A.shape[0]))
    direction = "Right" if dx > 0.5 else ("Left" if dx < -0.5 else "None")
    print(f"XOR dx ≈ {dx:.2f} px, direction = {direction}")

# ----------------------------
# Entry point (set your path)
# ----------------------------
# Optional: handle Google Colab vs local
try:
    from google.colab import drive
    drive.mount('/content/drive')
    DRIVE_PATH = "/content/drive/MyDrive/Deep Learning/Projects/Project2"
except ImportError:
    print("Running outside Colab, skipping Google Drive mount")
    DRIVE_PATH = None


VIDEO_PATH = "/content/drive/MyDrive/videos/Input Video.mp4"
run_pipeline(VIDEO_PATH, write_video=cfg.annotate_video)
# To test XOR fallback with the provided context images, optionally run:
# test_xor_dx_with_images()
# ============================================================
# Notes:
# - The output lines are printed only when either the signal color changes
#   or the train state changes, with timestamps t computed from FPS, in the
#   exact textual format required by Project2.pdf [Sections 3 & 6].
# - Signal color is computed directly from BGR pixels in the YOLO traffic
#   light ROI as permitted; no colorspace conversion is used.
# - Train state transitions are determined by ORB feature motion within the
#   YOLO train ROI, with a moving-edges XOR correlation fallback for low-texture
#   frames, inspired by the provided class demos.
# ============================================================


Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
Mounted at /content/drive
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 104.0MB/s 0.1s
t = 0.00s, signal = red, train = not_present
t = 1.90s, signal = red, train = just_entered
t = 1.95s, signal = red, train = moving
t = 6.00s, signal = red, train = left_the_frame
t = 6.02s, signal = red, train = not_present


# Task
Modify the existing code to detect the direction of the train and include this information in the output and the annotated video.

## Modify the `update train state` function

### Subtask:
Enhance the train state machine to incorporate direction detection based on the change in the train's bounding box over time or using the motion information from ORB features or the XOR fallback.


**Reasoning**:
Modify the `TrainState` dataclass and the `update_train_state` function to include and update the train's direction based on calculated movement.



In [2]:
from dataclasses import dataclass

@dataclass
class TrainState:
    mode: str = "not_present"      # not_present | just_entered | moving | left_the_frame
    direction: str = "unknown"     # unknown | left | right | stopped
    miss_count: int = 0
    just_counter: int = 0
    had_presence: bool = False
    last_roi: tuple = (0,0,0,0)
    direction_history: list = None # For smoothing

    def __post_init__(self):
        if self.direction_history is None:
            self.direction_history = []

def update_train_state(ts: TrainState, prev_gray, curr_gray, roi):
    present = roi is not None
    current_direction = "unknown"

    if not present:
        ts.miss_count += 1
        if ts.had_presence and ts.miss_count >= cfg.miss_patience and ts.mode != "left_the_frame":
            ts.mode = "left_the_frame"
            ts.had_presence = False
            ts.just_counter = 0
            current_direction = "unknown"
        elif not ts.had_presence:
            ts.mode = "not_present"
            current_direction = "unknown"
    else:
        ts.miss_count = 0
        if not ts.had_presence:
            ts.mode = "just_entered"
            ts.had_presence = True
            ts.just_counter = 0
            current_direction = "unknown"
        else:
            speed, nm = orb_speed(prev_gray, curr_gray, roi)
            dx = xor_dx(prev_gray, curr_gray, roi)

            if speed >= cfg.motion_speed_thresh:
                if dx > 0.5: # Using a small threshold for significant movement
                    current_direction = "right"
                elif dx < -0.5:
                    current_direction = "left"
                else:
                    current_direction = "stopped"
            else:
                # Maintain direction for a few frames if in moving state and speed is low
                if ts.mode == "moving":
                    if len(ts.direction_history) > 0:
                        last_dir = ts.direction_history[-1]
                        if last_dir in ["left", "right"]:
                             current_direction = last_dir
                        else:
                            current_direction = "stopped"
                    else:
                        current_direction = "stopped" # Should not happen if in moving state
                else:
                    current_direction = "stopped"


            if ts.mode == "just_entered":
                ts.just_counter += 1
                if ts.just_counter >= cfg.just_entered_grace or speed >= cfg.motion_speed_thresh:
                    ts.mode = "moving"


    # Smoothing the direction
    ts.direction_history.append(current_direction)
    # Keep only the last few directions (e.g., 5 frames) for smoothing
    smoothing_window = 5
    if len(ts.direction_history) > smoothing_window:
        ts.direction_history = ts.direction_history[-smoothing_window:]

    # Determine the final direction based on history (simple majority vote or last consistent direction)
    # For simplicity, let's use the last consistent direction if available, otherwise the most frequent in history
    if len(set(ts.direction_history)) == 1:
         ts.direction = ts.direction_history[-1]
    elif len(ts.direction_history) > 0 and ts.direction_history[-1] != "unknown":
         ts.direction = ts.direction_history[-1] # Prioritize the most recent if not unknown
    else:
        # Simple majority vote
        from collections import Counter
        direction_counts = Counter(ts.direction_history)
        most_common_direction = direction_counts.most_common(1)
        if most_common_direction:
            ts.direction = most_common_direction[0][0]
        else:
            ts.direction = "unknown"


    ts.last_roi = roi
    return ts

def format_event(t_s, signal_color, train_mode, train_direction):
    return f"t = {t_s:.2f}s, signal = {signal_color}, train = {train_mode}, direction = {train_direction}"

# Modify the main pipeline to use the updated format_event and TrainState
def run_pipeline(video_path, write_video=cfg.annotate_video):
    det = Detector(cfg.yolo_weights)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise SystemExit(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    W  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
    H  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 360)

    writer = None
    if write_video:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(cfg.out_video_path, fourcc, fps, (W, H))

    ok, frame0 = cap.read()
    if not ok:
        raise SystemExit("Empty video.")
    gray_prev = cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY)

    # Process first frame at t=0
    tl_boxes, tr_boxes = det.infer(frame0)
    tl_roi = None
    if tl_boxes:
        (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
        x1,y1 = max(0,x1), max(0,y1)
        x2,y2 = min(W-1,x2), min(H-1,y2)
        if x2>x1 and y2>y1:
            tl_roi = frame0[y1:y2, x1:x2]
    last_signal = classify_signal_color_bgr(tl_roi, prev_color=None)
    ts = TrainState()
    train_roi = pick_largest_valid_train(tr_boxes)
    ts = update_train_state(ts, gray_prev, gray_prev, train_roi)  # prev==curr at t=0
    last_reported_signal = None
    last_reported_train_mode  = None
    last_reported_train_direction = None


    # Emit initial line
    line0 = format_event(0.0, last_signal, ts.mode, ts.direction)
    print(line0)
    last_reported_signal = last_signal
    last_reported_train_mode  = ts.mode
    last_reported_train_direction = ts.direction

    # Annotate first frame
    if writer is not None:
        draw = frame0.copy()
        if tl_boxes:
            (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
            cv2.rectangle(draw, (x1,y1), (x2,y2), (0,255,255), 2)
            cv2.putText(draw, f"signal:{last_signal}", (x1, max(20,y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
        if train_roi is not None:
            x1,y1,x2,y2 = train_roi
            cv2.rectangle(draw, (x1,y1), (x2,y2), (0,128,255), 3)
            cv2.putText(draw, f"train:{ts.mode} ({ts.direction})", (x1, min(H-6,y2+18)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,128,255), 2)
        cv2.putText(draw, f"t=0.00s", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
        writer.write(draw)

    # Iterate remaining frames
    i = 1
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        t_s = i / float(fps)

        tl_boxes, tr_boxes = det.infer(frame)
        tl_roi = None
        if tl_boxes:
            (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
            x1,y1 = max(0,x1), max(0,y1)
            x2,y2 = min(W-1,x2), min(H-1,y2)
            if x2>x1 and y2>y1:
                tl_roi = frame[y1:y2, x1:x2]
        sig = classify_signal_color_bgr(tl_roi, prev_color=last_signal)

        train_roi = pick_largest_valid_train(tr_boxes)
        ts = update_train_state(ts, gray_prev, gray, train_roi)

        changed = (sig != last_reported_signal) or (ts.mode != last_reported_train_mode) or (ts.direction != last_reported_train_direction)
        if changed:
            print(format_event(t_s, sig, ts.mode, ts.direction))
            last_reported_signal = sig
            last_reported_train_mode  = ts.mode
            last_reported_train_direction = ts.direction

        # Annotate if requested
        if writer is not None:
            draw = frame.copy()
            if tl_boxes:
                (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
                cv2.rectangle(draw, (x1,y1), (x2,y2), (0,255,255), 2)
                cv2.putText(draw, f"signal:{sig}", (x1, max(20,y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
            if train_roi is not None:
                x1,y1,x2,y2 = train_roi
                cv2.rectangle(draw, (x1,y1), (x2,y2), (0,128,255), 3)
                cv2.putText(draw, f"train:{ts.mode} ({ts.direction})", (x1, min(H-6,y2+18)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,128,255), 2)
            cv2.putText(draw, f"t={t_s:.2f}s", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
            writer.write(draw)

        gray_prev = gray
        last_signal = sig
        i += 1

    cap.release()
    if writer is not None:
        writer.release()


## Update the `trainstate` dataclass

### Subtask:
Add a field to store the detected train direction (e.g., "left", "right", "stopped", "unknown").


## Modify the output format

### Subtask:
Modify the printed output lines to include the detected train direction when the train state changes.


**Reasoning**:
Modify the print statement in the run_pipeline function to include the train direction, as per the instructions.



In [3]:
def run_pipeline(video_path, write_video=cfg.annotate_video):
    det = Detector(cfg.yolo_weights)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise SystemExit(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    W  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
    H  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 360)

    writer = None
    if write_video:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(cfg.out_video_path, fourcc, fps, (W, H))

    ok, frame0 = cap.read()
    if not ok:
        raise SystemExit("Empty video.")
    gray_prev = cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY)

    # Process first frame at t=0
    tl_boxes, tr_boxes = det.infer(frame0)
    tl_roi = None
    if tl_boxes:
        (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
        x1,y1 = max(0,x1), max(0,y1)
        x2,y2 = min(W-1,x2), min(H-1,y2)
        if x2>x1 and y2>y1:
            tl_roi = frame0[y1:y2, x1:x2]
    last_signal = classify_signal_color_bgr(tl_roi, prev_color=None)
    ts = TrainState()
    train_roi = pick_largest_valid_train(tr_boxes)
    ts = update_train_state(ts, gray_prev, gray_prev, train_roi)  # prev==curr at t=0
    last_reported_signal = None
    last_reported_train_mode  = None
    last_reported_train_direction = None

    # Emit initial line
    line0 = format_event(0.0, last_signal, ts.mode, ts.direction)
    print(line0)
    last_reported_signal = last_signal
    last_reported_train_mode  = ts.mode
    last_reported_train_direction = ts.direction

    # Annotate first frame
    if writer is not None:
        draw = frame0.copy()
        if tl_boxes:
            (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
            cv2.rectangle(draw, (x1,y1), (x2,y2), (0,255,255), 2)
            cv2.putText(draw, f"signal:{last_signal}", (x1, max(20,y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
        if train_roi is not None:
            x1,y1,x2,y2 = train_roi
            cv2.rectangle(draw, (x1,y1), (x2,y2), (0,128,255), 3)
            cv2.putText(draw, f"train:{ts.mode} ({ts.direction})", (x1, min(H-6,y2+18)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,128,255), 2)
        cv2.putText(draw, f"t=0.00s", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
        writer.write(draw)

    # Iterate remaining frames
    i = 1
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        t_s = i / float(fps)

        tl_boxes, tr_boxes = det.infer(frame)
        tl_roi = None
        if tl_boxes:
            (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
            x1,y1 = max(0,x1), max(0,y1)
            x2,y2 = min(W-1,x2), min(H-1,y2)
            if x2>x1 and y2>y1:
                tl_roi = frame[y1:y2, x1:x2]
        sig = classify_signal_color_bgr(tl_roi, prev_color=last_signal)

        train_roi = pick_largest_valid_train(tr_boxes)
        ts = update_train_state(ts, gray_prev, gray, train_roi)

        changed = (sig != last_reported_signal) or (ts.mode != last_reported_train_mode) or (ts.direction != last_reported_train_direction)
        if changed:
            print(format_event(t_s, sig, ts.mode, ts.direction)) # Modified print statement
            last_reported_signal = sig
            last_reported_train_mode  = ts.mode
            last_reported_train_direction = ts.direction

        # Annotate if requested
        if writer is not None:
            draw = frame.copy()
            if tl_boxes:
                (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
                cv2.rectangle(draw, (x1,y1), (x2,y2), (0,255,255), 2)
                cv2.putText(draw, f"signal:{sig}", (x1, max(20,y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
            if train_roi is not None:
                x1,y1,x2,y2 = train_roi
                cv2.rectangle(draw, (x1,y1), (x2,y2), (0,128,255), 3)
                cv2.putText(draw, f"train:{ts.mode} ({ts.direction})", (x1, min(H-6,y2+18)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,128,255), 2)
            cv2.putText(draw, f"t={t_s:.2f}s", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
            writer.write(draw)

        gray_prev = gray
        last_signal = sig
        i += 1

    cap.release()
    if writer is not None:
        writer.release()

# Rerun the pipeline with the modified function
VIDEO_PATH = "/content/drive/MyDrive/videos/Input Video.mp4"
run_pipeline(VIDEO_PATH, write_video=cfg.annotate_video)

t = 0.00s, signal = red, train = not_present, direction = unknown
t = 1.90s, signal = red, train = just_entered, direction = unknown
t = 1.95s, signal = red, train = moving, direction = left
t = 1.97s, signal = red, train = moving, direction = unknown
t = 2.00s, signal = red, train = moving, direction = left
t = 2.02s, signal = red, train = moving, direction = unknown
t = 2.07s, signal = red, train = moving, direction = left
t = 3.67s, signal = red, train = moving, direction = right
t = 3.68s, signal = red, train = moving, direction = left
t = 3.72s, signal = red, train = moving, direction = right
t = 3.73s, signal = red, train = moving, direction = left
t = 3.75s, signal = red, train = moving, direction = right
t = 3.78s, signal = red, train = moving, direction = left
t = 3.80s, signal = red, train = moving, direction = right
t = 3.83s, signal = red, train = moving, direction = left
t = 3.87s, signal = red, train = moving, direction = right
t = 3.88s, signal = red, train = moving, dir

## Modify the annotation

### Subtask:
Update the video annotation to display the detected train direction along with the train state.


**Reasoning**:
Modify the cv2.putText calls in the run_pipeline function to include the train direction in the annotation text for both the initial frame and the main loop.



In [4]:
# Modify the main pipeline to use the updated format_event and TrainState
def run_pipeline(video_path, write_video=cfg.annotate_video):
    det = Detector(cfg.yolo_weights)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise SystemExit(f"Cannot open video: {video_path}")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    W  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 640)
    H  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 360)

    writer = None
    if write_video:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(cfg.out_video_path, fourcc, fps, (W, H))

    ok, frame0 = cap.read()
    if not ok:
        raise SystemExit("Empty video.")
    gray_prev = cv2.cvtColor(frame0, cv2.COLOR_BGR2GRAY)

    # Process first frame at t=0
    tl_boxes, tr_boxes = det.infer(frame0)
    tl_roi = None
    if tl_boxes:
        (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
        x1,y1 = max(0,x1), max(0,y1)
        x2,y2 = min(W-1,x2), min(H-1,y2)
        if x2>x1 and y2>y1:
            tl_roi = frame0[y1:y2, x1:x2]
    last_signal = classify_signal_color_bgr(tl_roi, prev_color=None)
    ts = TrainState()
    train_roi = pick_largest_valid_train(tr_boxes)
    ts = update_train_state(ts, gray_prev, gray_prev, train_roi)  # prev==curr at t=0
    last_reported_signal = None
    last_reported_train_mode  = None
    last_reported_train_direction = None


    # Emit initial line
    line0 = format_event(0.0, last_signal, ts.mode, ts.direction)
    print(line0)
    last_reported_signal = last_signal
    last_reported_train_mode  = ts.mode
    last_reported_train_direction = ts.direction

    # Annotate first frame
    if writer is not None:
        draw = frame0.copy()
        if tl_boxes:
            (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
            cv2.rectangle(draw, (x1,y1), (x2,y2), (0,255,255), 2)
            cv2.putText(draw, f"signal:{last_signal}", (x1, max(20,y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
        if train_roi is not None:
            x1,y1,x2,y2 = train_roi
            cv2.rectangle(draw, (x1,y1), (x2,y2), (0,128,255), 3)
            cv2.putText(draw, f"train:{ts.mode} ({ts.direction})", (x1, min(H-6,y2+18)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,128,255), 2) # Modified annotation
        cv2.putText(draw, f"t=0.00s", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
        writer.write(draw)

    # Iterate remaining frames
    i = 1
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        t_s = i / float(fps)

        tl_boxes, tr_boxes = det.infer(frame)
        tl_roi = None
        if tl_boxes:
            (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
            x1,y1 = max(0,x1), max(0,y1)
            x2,y2 = min(W-1,x2), min(H-1,y2)
            if x2>x1 and y2>y1:
                tl_roi = frame[y1:y2, x1:x2]
        sig = classify_signal_color_bgr(tl_roi, prev_color=last_signal)

        train_roi = pick_largest_valid_train(tr_boxes)
        ts = update_train_state(ts, gray_prev, gray, train_roi)

        changed = (sig != last_reported_signal) or (ts.mode != last_reported_train_mode) or (ts.direction != last_reported_train_direction)
        if changed:
            print(format_event(t_s, sig, ts.mode, ts.direction))
            last_reported_signal = sig
            last_reported_train_mode  = ts.mode
            last_reported_train_direction = ts.direction

        # Annotate if requested
        if writer is not None:
            draw = frame.copy()
            if tl_boxes:
                (x1,y1,x2,y2), _ = sorted(tl_boxes, key=lambda x:x[1], reverse=True)[0]
                cv2.rectangle(draw, (x1,y1), (x2,y2), (0,255,255), 2)
                cv2.putText(draw, f"signal:{sig}", (x1, max(20,y1-6)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2)
            if train_roi is not None:
                x1,y1,x2,y2 = train_roi
                cv2.rectangle(draw, (x1,y1), (x2,y2), (0,128,255), 3)
                cv2.putText(draw, f"train:{ts.mode} ({ts.direction})", (x1, min(H-6,y2+18)), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,128,255), 2) # Modified annotation
            cv2.putText(draw, f"t={t_s:.2f}s", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255,255,255), 2)
            writer.write(draw)

        gray_prev = gray
        last_signal = sig
        i += 1

    cap.release()
    if writer is not None:
        writer.release()

# Rerun the pipeline with the modified function
VIDEO_PATH = "/content/drive/MyDrive/videos/Input Video.mp4"
run_pipeline(VIDEO_PATH, write_video=cfg.annotate_video)

t = 0.00s, signal = red, train = not_present, direction = unknown
t = 1.90s, signal = red, train = just_entered, direction = unknown
t = 1.95s, signal = red, train = moving, direction = left
t = 1.97s, signal = red, train = moving, direction = unknown
t = 2.00s, signal = red, train = moving, direction = left
t = 2.02s, signal = red, train = moving, direction = unknown
t = 2.07s, signal = red, train = moving, direction = left
t = 3.67s, signal = red, train = moving, direction = right
t = 3.68s, signal = red, train = moving, direction = left
t = 3.72s, signal = red, train = moving, direction = right
t = 3.73s, signal = red, train = moving, direction = left
t = 3.75s, signal = red, train = moving, direction = right
t = 3.78s, signal = red, train = moving, direction = left
t = 3.80s, signal = red, train = moving, direction = right
t = 3.83s, signal = red, train = moving, direction = left
t = 3.87s, signal = red, train = moving, direction = right
t = 3.88s, signal = red, train = moving, dir

## Summary:

### Data Analysis Key Findings

*   The `TrainState` dataclass was successfully updated to include a `direction` attribute to store the detected train direction.
*   The `update_train_state` function was enhanced to detect the train's direction ("left", "right", "stopped", or "unknown") based on motion analysis (ORB features and XOR fallback) and speed thresholds. A smoothing mechanism was implemented using a direction history list to improve the robustness of the direction detection.
*   The `format_event` function and the main `run_pipeline` function were modified to include the detected train direction in the printed output logs. The output now includes the direction when the train state changes.
*   The video annotation in the `run_pipeline` function was updated to display the detected train direction alongside the train state in the format "train:{ts.mode} ({ts.direction})".

### Insights or Next Steps

*   Further refinement of the direction smoothing algorithm could be explored to handle edge cases or rapid changes in direction more effectively.
*   Consider evaluating the performance of the direction detection using different motion analysis techniques or larger smoothing windows to optimize accuracy and responsiveness.
