# ISDS Traffic AI Full Pipeline

All project modules, scripts, configs, and tests from the ISDS traffic congestion repository are consolidated below as notebook cells so the full pipeline can be executed end-to-end from a single `.ipynb` file.


**Notebook Layout**
- Core package modules (config, data, models, pipeline, utils)
- Training and fine-tuning scripts
- Configuration YAML references
- Test suites as optional smoke checks


In [None]:
# File: src/__init__.py
"Top-level package for the ISDS traffic congestion estimation project."


In [None]:
# File: src/config/__init__.py
from .defaults import load_config, save_config

__all__ = ["load_config", "save_config"]


In [None]:
# File: src/config/defaults.py
from __future__ import annotations

import copy
from pathlib import Path
from typing import Any, Mapping, MutableMapping, Optional

import yaml


def _deep_update(base: MutableMapping[str, Any], updates: Mapping[str, Any]) -> MutableMapping[str, Any]:
    """Recursively merge ``updates`` into ``base``."""
    for key, value in updates.items():
        if isinstance(value, Mapping) and isinstance(base.get(key), Mapping):
            base[key] = _deep_update(dict(base[key]), value)
        else:
            base[key] = copy.deepcopy(value)
    return base


def load_config(path: str | Path, overrides: Optional[Mapping[str, Any]] = None) -> dict[str, Any]:
    """Read a YAML config file and layer optional overrides on top.

    Args:
        path: Location of the YAML file.
        overrides: Optional mapping merged into the loaded data with
            ``_deep_update``; skipped when empty or ``None``.

    Returns:
        The parsed configuration; an empty dict when the file parses to a
        falsy value (for example an empty file).

    Raises:
        FileNotFoundError: If ``path`` does not exist.
    """
    source = Path(path)
    if not source.exists():
        raise FileNotFoundError(f"Config file not found: {source}")

    with source.open("r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)

    config = parsed if parsed else {}
    return _deep_update(config, overrides) if overrides else config


def save_config(config: Mapping[str, Any], path: str | Path) -> None:
    """Write ``config`` to ``path`` as YAML, creating parent directories.

    The mapping is converted to a plain ``dict`` before dumping and key
    order is preserved (``sort_keys=False``).
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as stream:
        plain = dict(config)
        yaml.safe_dump(plain, stream, sort_keys=False)


In [None]:
# File: src/data/__init__.py
from .datasets import SegmentationDataset, load_split_file

__all__ = ["SegmentationDataset", "load_split_file"]


In [None]:
# File: src/data/datasets.py
from __future__ import annotations

import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple

import numpy as np
import torch
from PIL import Image
from torch.utils.data import Dataset
from torchvision.transforms import ColorJitter
from torchvision.transforms import functional as F

In [None]:
@dataclass(frozen=True)
class SegmentationSample:
    """Immutable pairing of an image file with its mask file."""

    # Path to the input image.
    image_path: Path
    # Path to the mask that belongs to ``image_path``.
    mask_path: Path

In [None]:
def load_split_file(file_path: str | Path) -> List[Path]:
    """Return the non-blank lines of a split file as ``Path`` objects.

    Each line is stripped of surrounding whitespace; lines that are empty
    after stripping are ignored.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    listing = Path(file_path)
    if not listing.exists():
        raise FileNotFoundError(f"Split file not found: {listing}")

    collected: List[Path] = []
    with listing.open("r", encoding="utf-8") as stream:
        for raw_line in stream:
            cleaned = raw_line.strip()
            if cleaned:
                collected.append(Path(cleaned))
    return collected

In [None]:
def collect_directory_samples(
    root: str | Path,
    image_dir: str,
    mask_dir: str,
    split_subdir: str,
    image_suffix: str,
    mask_suffix: str,
) -> List[Tuple[Path, Path]]:
    """Pair every image of a split with its mask by file stem.

    Images are taken from ``root/image_dir/split_subdir`` (filtered by
    ``*{image_suffix}`` when a suffix is given) in sorted order. The mask
    for an image is expected at ``root/mask_dir/split_subdir`` under the
    name ``<image stem><mask_suffix>``.

    Returns:
        A list of ``(image_path, mask_path)`` tuples.

    Raises:
        FileNotFoundError: If either split directory or any mask is missing.
        RuntimeError: If no image matched.
    """
    base = Path(root)
    image_root = base / image_dir / split_subdir
    mask_root = base / mask_dir / split_subdir

    if not image_root.exists():
        raise FileNotFoundError(f"Image split directory not found: {image_root}")
    if not mask_root.exists():
        raise FileNotFoundError(f"Mask split directory not found: {mask_root}")

    glob_pattern = "*" if not image_suffix else f"*{image_suffix}"
    pairs: List[Tuple[Path, Path]] = []
    for image_path in sorted(image_root.glob(glob_pattern)):
        stem = image_path.stem
        mask_path = mask_root / f"{stem}{mask_suffix}"
        if mask_path.exists():
            pairs.append((image_path, mask_path))
            continue
        raise FileNotFoundError(f"Missing mask for {image_path.name}: {mask_path}")

    if pairs:
        return pairs
    raise RuntimeError(f"No samples found under {image_root}")

In [None]:
class SegmentationDataset(Dataset):
    """Dataset yielding ``(image_tensor, mask_tensor)`` pairs from file paths.

    Args:
        samples: Sequence of ``(image_path, mask_path)`` tuples.
        num_classes: Stored on the instance; not used by this class itself.
        transform: Optional callable invoked as ``transform(image, mask)`` on
            the PIL objects and expected to return the transformed pair.
        use_binary_mask: When true, every mask value greater than zero is
            mapped to 1 and everything else to 0.
    """

    def __init__(
        self,
        samples: Sequence[Tuple[Path, Path]],
        num_classes: int,
        transform: Optional[Callable] = None,
        use_binary_mask: bool = True,
    ) -> None:
        self.use_binary_mask = use_binary_mask
        self.transform = transform
        self.num_classes = num_classes
        self.samples = samples

    def __len__(self) -> int:
        """Return the number of ``(image, mask)`` pairs."""
        return len(self.samples)

    def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Load, optionally transform, and tensorize the sample at ``index``."""
        image_file, mask_file = self.samples[index]
        picture = Image.open(image_file).convert("RGB")
        annotation = Image.open(mask_file)

        if self.transform:
            picture, annotation = self.transform(picture, annotation)

        pixels = F.to_tensor(picture)
        labels = torch.as_tensor(np.array(annotation), dtype=torch.long)
        if self.use_binary_mask:
            # Collapse every non-zero label into a single foreground class.
            labels = labels.gt(0).long()

        return pixels, labels

In [None]:
def segmentation_collate(
    batch: Iterable[Tuple[torch.Tensor, torch.Tensor]],
) -> Tuple[torch.Tensor, torch.Tensor]:
    """Stack per-sample ``(image, mask)`` tensors along a new leading axis."""
    columns = tuple(zip(*batch))
    image_group, mask_group = columns
    stacked_images = torch.stack(image_group, dim=0)
    stacked_masks = torch.stack(mask_group, dim=0)
    return stacked_images, stacked_masks

In [None]:
# File: src/models/__init__.py


In [None]:
# File: src/models/detection/__init__.py
from .yolo_wrapper import YOLODetector, DetectionResult, TrackEvent

__all__ = ["YOLODetector", "DetectionResult", "TrackEvent"]


In [None]:
# File: src/models/detection/yolo_wrapper.py
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Generator, Iterable, List, Optional

import numpy as np

try:
    from ultralytics import YOLO  # type: ignore
except ImportError as exc:  # pragma: no cover - handled at runtime
    raise RuntimeError("ultralytics package is required for YOLODetector") from exc

In [None]:
@dataclass
class DetectionResult:
    """Detections for one frame, as returned by ``YOLODetector.predict``."""

    # Index of the frame, passed through unchanged from the caller.
    frame_index: int
    # Timestamp passed through from the caller (presumably seconds — confirm with caller).
    timestamp: float
    boxes: np.ndarray  # shape [N, 4] in xyxy format
    scores: np.ndarray  # shape [N]
    class_ids: np.ndarray  # shape [N]

In [None]:
@dataclass
class TrackEvent:
    """One tracked object observed in one frame, as yielded by ``YOLODetector.track``."""

    # Frame index and timestamp are passed through unchanged from the caller.
    frame_index: int
    timestamp: float
    # Identifier reported by the tracker for this object.
    track_id: int
    # Detector class index of the object.
    class_id: int
    # Bounding box as (x1, y1, x2, y2).
    bbox_xyxy: np.ndarray
    # Set by ``YOLODetector.track`` to "confidence above the detector threshold".
    is_confirmed: bool
    # Not populated by any code visible in this file; defaults to None.
    velocity: Optional[np.ndarray] = None

In [None]:
class YOLODetector:
    """Thin wrapper around an Ultralytics ``YOLO`` model.

    Holds the loaded model together with the device and the confidence / IoU
    thresholds that the wrapper methods pass on every call.
    """

    def __init__(
        self,
        weights: str | Path,
        device: str = "cuda",
        conf_threshold: float = 0.25,
        iou_threshold: float = 0.45,
    ) -> None:
        """Load ``weights`` and remember the inference settings."""
        weights_path = str(weights)
        self.model = YOLO(weights_path)
        self.device = device
        self.conf_threshold, self.iou_threshold = conf_threshold, iou_threshold

In [None]:
    def finetune(self, data_cfg: str | Path, epochs: int, batch: int, img_size: int, project_dir: str | Path, name: str, **kwargs) -> None:
        """Fine-tune the detector using Ultralytics training loop."""
        self.model.train(
            data=str(data_cfg),
            epochs=epochs,
            batch=batch,
            imgsz=img_size,
            device=self.device,
            project=str(project_dir),
            name=name,
            conf=self.conf_threshold,
            iou=self.iou_threshold,
            **kwargs,
        )

In [None]:
    def predict(self, frame: np.ndarray, frame_index: int, timestamp: float) -> DetectionResult:
        """Run detection on one frame and package the first result.

        ``frame_index`` and ``timestamp`` are copied into the returned
        ``DetectionResult`` unchanged; boxes, scores and class ids are moved
        to the CPU and converted to NumPy arrays.
        """
        outputs = self.model.predict(
            source=frame,
            device=self.device,
            conf=self.conf_threshold,
            iou=self.iou_threshold,
            verbose=False,
        )
        detections = outputs[0].boxes
        xyxy = detections.xyxy.cpu().numpy()
        confidences = detections.conf.cpu().numpy()
        labels = detections.cls.cpu().numpy().astype(int)
        return DetectionResult(
            frame_index=frame_index,
            timestamp=timestamp,
            boxes=xyxy,
            scores=confidences,
            class_ids=labels,
        )

In [None]:
    def track(
        self,
        frame: np.ndarray,
        frame_index: int,
        timestamp: float,
        tracker_cfg: str | Path,
        classes: Optional[List[int]] = None,
    ) -> Generator[TrackEvent, None, None]:
        """Track objects in one frame and yield a ``TrackEvent`` per track.

        The underlying tracker is called with ``persist=True`` so identities
        carry over between successive calls.

        Args:
            frame: Image to run the tracker on.
            frame_index: Copied into every yielded event.
            timestamp: Copied into every yielded event.
            tracker_cfg: Tracker configuration, passed to Ultralytics as a string.
            classes: Optional class-id filter forwarded to the tracker.

        Yields:
            One ``TrackEvent`` per tracked box. Nothing is yielded when the
            result carries no track ids.
        """
        results = self.model.track(
            source=frame,
            tracker=str(tracker_cfg),
            classes=classes,
            device=self.device,
            conf=self.conf_threshold,
            iou=self.iou_threshold,
            verbose=False,
            persist=True,
        )[0]

        boxes = results.boxes
        if boxes.id is None:
            return

        track_ids = boxes.id.cpu().numpy().astype(int)
        boxes_xyxy = boxes.xyxy.cpu().numpy()
        class_ids = boxes.cls.cpu().numpy().astype(int)
        conf_scores = boxes.conf.cpu().numpy()

        for track_id, class_id, bbox, score in zip(track_ids, class_ids, boxes_xyxy, conf_scores):
            # Cast NumPy scalars to builtins so the event matches its declared
            # field types and stays serializable (json cannot encode np.int64
            # or np.bool_).
            yield TrackEvent(
                frame_index=frame_index,
                timestamp=timestamp,
                track_id=int(track_id),
                class_id=int(class_id),
                bbox_xyxy=bbox,
                is_confirmed=bool(score > self.conf_threshold),
            )

In [None]:
# File: src/models/segmentation/__init__.py


In [None]:
# File: src/models/segmentation/deeplab_seresnet.py
from __future__ import annotations

from typing import Dict

import torch
from torch import nn
from torch.nn import functional as F
from torchvision.models.segmentation.deeplabv3 import ASPP

from .backbones.resnet_se_dw import SEResNet34, seresnet34_backbone

In [None]:
class DeepLabSEResNet34(nn.Module):
    """DeepLabV3+ style head coupled with the SEResNet34 backbone.

    Args:
        num_classes: Number of output channels of the final 1x1 classifier.
        output_stride: Forwarded to the backbone; also selects the ASPP
            atrous rates (doubled for 8, halved for 32, (6, 12, 18) otherwise).
        dropout: Dropout probability applied before the classifier.
    """

    def __init__(self, num_classes: int = 2, output_stride: int = 16, dropout: float = 0.1) -> None:
        super().__init__()

        def conv_bn_relu(in_channels: int, out_channels: int, kernel: int) -> list:
            # Bias-free conv followed by batch norm and ReLU; padding keeps
            # the spatial size for the odd kernels used here (1 -> 0, 3 -> 1).
            return [
                nn.Conv2d(in_channels, out_channels, kernel_size=kernel, padding=kernel // 2, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]

        self.backbone: SEResNet34 = seresnet34_backbone(output_stride=output_stride)

        if output_stride == 8:
            rates = (12, 24, 36)
        elif output_stride == 32:
            rates = (3, 6, 9)
        else:
            rates = (6, 12, 18)
        self.aspp = ASPP(512, rates, out_channels=256)

        self.aspp_proj = nn.Sequential(*conv_bn_relu(256, 256, 1))
        self.low_level_proj = nn.Sequential(*conv_bn_relu(64, 48, 1))
        self.decoder = nn.Sequential(
            *conv_bn_relu(256 + 48, 256, 3),
            *conv_bn_relu(256, 256, 3),
            nn.Dropout(p=dropout, inplace=False),
            nn.Conv2d(256, num_classes, kernel_size=1),
        )

In [None]:
    def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
        input_size = x.shape[-2:]
        features = self.backbone(x)
        low_level = features["low_level"]
        encoder_out = features["out"]

        aspp_out = self.aspp(encoder_out)
        aspp_out = self.aspp_proj(aspp_out)

        low_level = self.low_level_proj(low_level)
        low_level_interp = F.interpolate(
            low_level, size=aspp_out.shape[-2:], mode="bilinear", align_corners=False
        )

        decoder_in = torch.cat([aspp_out, low_level_interp], dim=1)
        decoder_out = self.decoder(decoder_in)
        decoder_out = F.interpolate(decoder_out, size=input_size, mode="bilinear", align_corners=False)

        return {"out": decoder_out}

In [None]:
def build_segmentation_model(num_classes: int, **kwargs) -> DeepLabSEResNet34:
    """Construct a ``DeepLabSEResNet34``; extra keywords go to its constructor."""
    model = DeepLabSEResNet34(num_classes=num_classes, **kwargs)
    return model

In [None]:
# File: src/models/segmentation/backbones/__init__.py


In [None]:
# File: src/models/segmentation/backbones/resnet_se_dw.py
from __future__ import annotations

from collections import OrderedDict
from typing import Dict

import torch
from torch import nn

In [None]:
class DepthwiseSeparableConv(nn.Module):
    """Depthwise separable convolution used inside the modified ResNet blocks.

    A 3x3 per-channel convolution (``groups=in_channels``) followed by a 1x1
    convolution that mixes channels. Padding equals the dilation, so with
    stride 1 the spatial size is preserved.
    """

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1, dilation: int = 1, bias: bool = False) -> None:
        super().__init__()
        spatial_options = {
            "kernel_size": 3,
            "stride": stride,
            "padding": dilation,
            "dilation": dilation,
            "groups": in_channels,
            "bias": bias,
        }
        self.depthwise = nn.Conv2d(in_channels, in_channels, **spatial_options)
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=bias)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the depthwise convolution, then the pointwise convolution."""
        return self.pointwise(self.depthwise(x))

In [None]:
class SEBlock(nn.Module):
    """Squeeze-and-excitation gate that rescales each channel of its input.

    The input is globally average-pooled per channel, passed through a
    two-layer bias-free MLP ending in a sigmoid, and the resulting
    per-channel weights multiply the input. The hidden width is
    ``channels // reduction`` but never less than 8.
    """

    def __init__(self, channels: int, reduction: int = 16) -> None:
        super().__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        bottleneck = channels // reduction
        if bottleneck < 8:
            bottleneck = 8
        self.fc = nn.Sequential(
            nn.Linear(channels, bottleneck, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(bottleneck, channels, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` scaled channel-wise by the learned gate."""
        batch, channels, _height, _width = x.size()
        squeezed = self.avg_pool(x).view(batch, channels)
        gates = self.fc(squeezed).view(batch, channels, 1, 1)
        return x * gates

In [None]:
class SEDWBasicBlock(nn.Module):
    """Residual basic block built from depthwise-separable convs plus an SE gate.

    Main branch: separable conv -> BN -> ReLU -> separable conv -> BN -> SE.
    The shortcut is the input itself, or ``downsample(input)`` when a
    downsample module is supplied. The sum passes through a final ReLU.
    """

    expansion = 1

    def __init__(self, inplanes: int, planes: int, stride: int = 1, downsample: nn.Module | None = None, dilation: int = 1) -> None:
        super().__init__()
        # Only the first conv applies the stride; both share the dilation.
        self.conv1 = DepthwiseSeparableConv(inplanes, planes, stride=stride, dilation=dilation)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = DepthwiseSeparableConv(planes, planes, dilation=dilation)
        self.bn2 = nn.BatchNorm2d(planes)
        self.se = SEBlock(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the residual block on ``x``."""
        branch = self.relu(self.bn1(self.conv1(x)))
        branch = self.se(self.bn2(self.conv2(branch)))

        shortcut = x if self.downsample is None else self.downsample(x)

        branch += shortcut
        return self.relu(branch)

In [None]:
class SEResNet34(nn.Module):
    """ResNet-34-shaped backbone (3/4/6/3 blocks) built from ``SEDWBasicBlock``.

    ``output_stride`` chooses how the last stages trade stride for dilation:
    32 keeps stride 2 in stages 2-4, 16 replaces the stride of stage 4 with
    dilation 2, and 8 replaces the strides of stages 3 and 4 with dilations
    2 and 4.

    Raises:
        ValueError: If ``output_stride`` is not 8, 16 or 32.
    """

    def __init__(self, output_stride: int = 16) -> None:
        super().__init__()
        # output_stride -> (per-stage strides, per-stage dilations)
        stage_layout = {
            32: ((1, 2, 2, 2), (1, 1, 1, 1)),
            16: ((1, 2, 2, 1), (1, 1, 1, 2)),
            8: ((1, 2, 1, 1), (1, 1, 2, 4)),
        }
        if output_stride not in stage_layout:
            raise ValueError(f"Invalid output_stride: {output_stride}")
        (s1, s2, s3, s4), (d1, d2, d3, d4) = stage_layout[output_stride]

        # Stem: 7x7 stride-2 conv followed by a stride-2 max pool.
        self.inplanes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(SEDWBasicBlock, 64, 3, stride=s1, dilation=d1)
        self.layer2 = self._make_layer(SEDWBasicBlock, 128, 4, stride=s2, dilation=d2)
        self.layer3 = self._make_layer(SEDWBasicBlock, 256, 6, stride=s3, dilation=d3)
        self.layer4 = self._make_layer(SEDWBasicBlock, 512, 3, stride=s4, dilation=d4)

In [None]:
    def _make_layer(self, block, planes, blocks, stride=1, dilation=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion),
            )

        layers = [block(self.inplanes, planes, stride, downsample, dilation=1)]
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, dilation=dilation))
        return nn.Sequential(*layers)

In [None]:
    def forward(self, x: torch.Tensor) -> Dict[str, torch.Tensor]:
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x1 = self.layer1(x)
        x2 = self.layer2(x1)
        x3 = self.layer3(x2)
        x4 = self.layer4(x3)

        return OrderedDict([("low_level", x1), ("out", x4)])

In [None]:
def seresnet34_backbone(**kwargs) -> SEResNet34:
    """Factory for ``SEResNet34``; all keywords go to its constructor."""
    backbone = SEResNet34(**kwargs)
    return backbone

In [None]:
# File: src/pipeline/__init__.py


In [None]:
# File: src/pipeline/congestion.py
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from statistics import mean
from typing import Dict, Iterable, List, Optional

import cv2
import numpy as np
import torch

from src.models.detection import TrackEvent, YOLODetector
from src.models.segmentation.deeplab_seresnet import build_segmentation_model
from src.pipeline.roi import LaneROIExtractor, ROILine
from src.pipeline.speed import SpeedEstimator
from src.utils import get_logger
from src.utils.video import iter_video_frames


# Level-of-service grades in list order; CongestionClassifier picks the grade
# with the larger index when combining levels, and uses "F" as the fallback.
LEVEL_ORDER = ["A", "B", "C", "D", "E", "F"]

In [None]:
@dataclass
class LoSThresholds:
    """Threshold tables consumed by ``CongestionClassifier``."""

    # Grade -> density limit; a grade applies when density <= its limit.
    density: Dict[str, float]
    # Grade -> speed floor, used when the vehicle type string contains "two".
    speed_two_wheel: Dict[str, float]
    # Grade -> speed floor, used for every other vehicle type.
    speed_four_wheel: Dict[str, float]
    # Not read by any code visible in this file (presumably a fallback length in metres — confirm).
    default_vehicle_length_m: float = 3.0

In [None]:
@dataclass
class LaneMetrics:
    """Per-lane measurements together with the resulting level of service."""

    # Index of the lane these metrics describe.
    lane_index: int
    # Number of vehicles counted for the lane.
    vehicle_count: int
    # Average speed in km/h (per the field name).
    avg_speed_kph: float
    # Density value; its unit is not defined in the visible code — TODO confirm.
    density: float
    # Presumably one of the LEVEL_ORDER grades — verify against the producer.
    level_of_service: str

In [None]:
@dataclass
class PipelineResult:
    """Lane metrics gathered for one time window."""

    # Start and end of the window (presumably video seconds — confirm with producer).
    window_start: float
    window_end: float
    # One entry per lane.
    lanes: List[LaneMetrics]

In [None]:
class CongestionClassifier:
    """Map density and average speed to a level-of-service grade."""

    def __init__(self, thresholds: LoSThresholds) -> None:
        self.thresholds = thresholds

    def classify(self, density: float, avg_speed_kph: float, vehicle_type: str) -> str:
        """Return the later (in ``LEVEL_ORDER``) of the density and speed grades."""
        density_rank = LEVEL_ORDER.index(self._level_from_density(density))
        speed_rank = LEVEL_ORDER.index(self._level_from_speed(avg_speed_kph, vehicle_type))
        return LEVEL_ORDER[max(density_rank, speed_rank)]

    def _level_from_density(self, density: float) -> str:
        """Return the first grade whose density limit is not exceeded, else "F"."""
        limits = self.thresholds.density
        for grade in LEVEL_ORDER[:-1]:  # "F" is the fallback, never looked up
            limit = limits.get(grade)
            # ``not (a <= b)`` rather than ``a > b`` so NaN keeps falling through.
            if limit is None or not (density <= limit):
                continue
            return grade
        return "F"

    def _level_from_speed(self, speed_kph: float, vehicle_type: str) -> str:
        """Return the first grade whose speed floor is reached, else "F"."""
        if "two" in vehicle_type:
            floors = self.thresholds.speed_two_wheel
        else:
            floors = self.thresholds.speed_four_wheel
        for grade in LEVEL_ORDER[:-1]:
            floor = floors.get(grade)
            if floor is None or not (speed_kph >= floor):
                continue
            return grade
        return "F"

In [None]:
# File: src/pipeline/roi.py
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Sequence, Tuple

import cv2
import numpy as np

In [None]:
@dataclass
class ROILine:
    """Horizontal segment at height ``y`` spanning ``x_left`` to ``x_right``."""

    y: float
    x_left: float
    x_right: float
    pixel_length: float

    @property
    def center(self) -> tuple[float, float]:
        """Midpoint of the segment as ``(x, y)``."""
        mid_x = (self.x_left + self.x_right) * 0.5
        return (mid_x, self.y)

    @property
    def width(self) -> float:
        """Absolute horizontal extent of the segment."""
        span = self.x_right - self.x_left
        return abs(span)

In [None]:
class LaneROIExtractor:
    """Holds the settings used to filter lane-marking components in a mask.

    Args:
        dash_lengths_m: Dash lengths in metres (per the parameter name);
            stored as given.
        y_group_tolerance_px: Stored as given.
        min_component_area: Components with a smaller pixel area are dropped
            during extraction.
        aspect_ratio_range: Inclusive ``(low, high)`` bounds on a component's
            long-side / short-side ratio.
    """

    def __init__(
        self,
        dash_lengths_m: Sequence[float] = (1.0, 2.0),
        y_group_tolerance_px: int = 30,
        min_component_area: int = 40,
        aspect_ratio_range: Tuple[float, float] = (1.0, 15.0),
    ) -> None:
        self.aspect_ratio_range = aspect_ratio_range
        self.min_component_area = min_component_area
        self.y_group_tolerance_px = y_group_tolerance_px
        self.dash_lengths_m = dash_lengths_m

In [None]:
    def extract(self, mask: np.ndarray) -> Tuple[List[ROILine], float]:
        """Find candidate lane-marking components in a 2D segmentation mask.

        Only the component-filtering stage is visible here; the remainder of
        the function is elided in this listing.

        Args:
            mask: 2D array; every value greater than zero counts as foreground.

        Returns:
            ``([], 0.0)`` when no component survives the filters. The result
            for the non-empty case is produced by the elided part.

        Raises:
            ValueError: If ``mask`` is not two-dimensional.
        """
        if mask.ndim != 2:
            raise ValueError("Segmentation mask must be a 2D array")

        # Binarize to 0/255 and apply a 3x3 median filter before labeling.
        binary = (mask > 0).astype(np.uint8) * 255
        binary = cv2.medianBlur(binary, 3)

        num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(binary, connectivity=8)
        components = []
        for i in range(1, num_labels):  # skip background
            area = stats[i, cv2.CC_STAT_AREA]
            if area < self.min_component_area:
                continue
            width = stats[i, cv2.CC_STAT_WIDTH]
            height = stats[i, cv2.CC_STAT_HEIGHT]
            if width == 0 or height == 0:
                continue
            # Long side over short side, so the ratio is always >= 1; a lower
            # bound below 1 in aspect_ratio_range therefore never rejects anything.
            aspect = max(width, height) / max(min(width, height), 1)
            if not (self.aspect_ratio_range[0] <= aspect <= self.aspect_ratio_range[1]):
                continue
            cx, cy = centroids[i]
            # Stored as (centroid y, centroid x, bbox width, bbox height).
            components.append((cy, cx, width, height))

        if not components:
            return [], 0.0

        # ... (rest of the function)

In [None]:
# File: src/pipeline/speed.py
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, Optional

import numpy as np

from src.models.detection.yolo_wrapper import TrackEvent

In [None]:
@dataclass
class SpeedEstimate:
    """Speed of one track as computed by ``SpeedEstimator.update``."""

    # Track the estimate belongs to.
    track_id: int
    # Timestamp of the event that produced the estimate.
    timestamp: float
    # displacement_m / duration_s, multiplied by 3.6.
    speed_kph: float
    # Straight-line distance from the track's first centroid, scaled by meter_per_pixel.
    displacement_m: float
    # Time elapsed since the track's first observation.
    duration_s: float

In [None]:
@dataclass
class _TrackHistory:
    """Per-track state kept by ``SpeedEstimator``."""

    # Timestamp and centroid of the first event seen for the track.
    first_timestamp: float
    first_centroid: np.ndarray
    # Most recent centroid; written by SpeedEstimator.update but not read in the visible code.
    last_centroid: np.ndarray = field(default_factory=lambda: np.zeros(2))

In [None]:
class SpeedEstimator:
    """Estimate per-track speed from bounding-box centroid displacement.

    For every track the first observed centroid and timestamp are
    remembered; each later event is measured against that first
    observation, so the reported speed is the straight-line average since
    the track was first seen.
    """

    def __init__(self, meter_per_pixel: float) -> None:
        self._histories: Dict[int, _TrackHistory] = {}
        self.meter_per_pixel = meter_per_pixel

    def reset(self) -> None:
        """Forget every tracked history."""
        self._histories.clear()

    def update(self, event: TrackEvent) -> Optional[SpeedEstimate]:
        """Feed one track event and return a speed estimate when available.

        Returns ``None`` when the scale is not positive, when the event is
        the first one for its track, or when no time has elapsed since the
        track's first observation.
        """
        if self.meter_per_pixel <= 0:
            return None

        left, top, right, bottom = event.bbox_xyxy
        centroid = np.array([(left + right) * 0.5, (top + bottom) * 0.5], dtype=float)

        key = event.track_id
        if self._histories.get(key) is None:
            # First sighting: remember the starting point, nothing to report yet.
            self._histories[key] = _TrackHistory(
                first_timestamp=event.timestamp,
                first_centroid=centroid,
                last_centroid=centroid,
            )
            return None
        history = self._histories[key]

        elapsed = event.timestamp - history.first_timestamp
        if elapsed <= 0:
            return None

        travelled_px = float(np.linalg.norm(centroid - history.first_centroid))
        travelled_m = travelled_px * self.meter_per_pixel
        kph = travelled_m / elapsed * 3.6
        history.last_centroid = centroid

        return SpeedEstimate(
            track_id=key,
            timestamp=event.timestamp,
            speed_kph=float(kph),
            displacement_m=travelled_m,
            duration_s=elapsed,
        )

In [None]:
# File: src/utils/__init__.py
from .logging import configure_logging, get_logger
from .video import VideoWriter, iter_video_frames

__all__ = ["configure_logging", "get_logger", "VideoWriter", "iter_video_frames"]


In [None]:
# File: src/utils/logging.py
from __future__ import annotations

import logging
from typing import Optional

from rich.logging import RichHandler


# Module-level guard: set to True by configure_logging so setup runs only once.
_LOGGER_INITIALIZED = False


def configure_logging(level: str = "INFO") -> None:
    """Install a Rich logging handler on the root logger, once per process.

    Later calls are no-ops, so the level of the first call wins.

    Args:
        level: Name of a ``logging`` level (case-insensitive). Unknown names
            fall back to ``INFO``.
    """
    global _LOGGER_INITIALIZED
    if _LOGGER_INITIALIZED:
        return

    resolved_level = getattr(logging, level.upper(), logging.INFO)
    if not isinstance(resolved_level, int):
        # Names such as "BASIC_FORMAT" exist on the module but are not levels.
        resolved_level = logging.INFO

    logging.basicConfig(
        level=resolved_level,
        format="%(message)s",
        # strftime pattern for the time column; without the "%" every record
        # would show the literal text "[X]" instead of the time.
        datefmt="[%X]",
        handlers=[RichHandler(rich_tracebacks=True, markup=True)],
    )
    _LOGGER_INITIALIZED = True


def get_logger(name: Optional[str] = None) -> logging.Logger:
    """Return a logger, configuring logging first if that has not happened.

    An empty or missing ``name`` yields the ``"src"`` logger.
    """
    already_configured = bool(_LOGGER_INITIALIZED)
    if not already_configured:
        configure_logging()
    logger_name = name if name else "src"
    return logging.getLogger(logger_name)


In [None]:
# File: src/utils/video.py
from __future__ import annotations

from contextlib import contextmanager
from pathlib import Path
from typing import Any, Generator, Iterator, Tuple

import cv2


def iter_video_frames(path: str | Path) -> Iterator[tuple[int, float, Any]]:
    """Yield (frame_index, timestamp_seconds, frame_bgr).

    Timestamps are derived as ``frame_index / fps`` from the FPS the
    container reports; 30.0 is assumed when that value is missing or not a
    positive number. The capture is released when iteration ends, when the
    generator is closed early, or when an error occurs.

    Raises:
        RuntimeError: If the video cannot be opened.
    """
    capture = cv2.VideoCapture(str(path))
    if not capture.isOpened():
        # Release explicitly so a failed open does not hold on to the handle.
        capture.release()
        raise RuntimeError(f"Failed to open video: {path}")

    try:
        fps = capture.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # covers 0, negative values and NaN
            fps = 30.0

        frame_idx = 0
        while True:
            ok, frame = capture.read()
            if not ok:
                break
            yield frame_idx, frame_idx / fps, frame
            frame_idx += 1
    finally:
        capture.release()


class VideoWriter:
    """Simple wrapper above OpenCV VideoWriter.

    Opens an ``mp4v``-encoded writer on construction and can be used as a
    context manager, which releases the writer on exit.
    """

    def __init__(self, path: str | Path, fps: float, frame_size: Tuple[int, int]) -> None:
        """Open the writer.

        Raises:
            RuntimeError: If OpenCV could not open the output.
        """
        codec = cv2.VideoWriter_fourcc(*"mp4v")
        self._writer = cv2.VideoWriter(str(path), codec, fps, frame_size)
        opened = self._writer.isOpened()
        if not opened:
            raise RuntimeError(f"Failed to create video writer at {path}")

    def write(self, frame) -> None:
        """Append one frame to the output."""
        self._writer.write(frame)

    def close(self) -> None:
        """Release the underlying OpenCV writer."""
        self._writer.release()

    def __enter__(self) -> "VideoWriter":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Returns None, so exceptions raised inside the ``with`` body propagate.
        self.close()


In [None]:
# File: scripts/__init__.py
'''
CLI entrypoints for the ISDS traffic congestion project.
'''


In [None]:
# File: scripts/run_pipeline.py
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import List

from rich.console import Console

from src.config import load_config
from src.pipeline.congestion import CongestionPipeline, LoSThresholds
from src.pipeline.roi import LaneROIExtractor
from src.utils import configure_logging, get_logger

console = Console()


def main() -> None:
    """CLI entry point: parse arguments, load the config, resolve the video path.

    Only the setup stage is visible here; the remainder of the function is
    elided in this listing.

    Raises:
        ValueError: If no video path is given via ``--video`` or the
            ``inputs.video_path`` config entry.
    """
    parser = argparse.ArgumentParser(description="Run the traffic congestion estimation pipeline")
    parser.add_argument("--config", required=True)
    parser.add_argument("--video", default=None)
    args = parser.parse_args()

    configure_logging()
    logger = get_logger("run_pipeline")

    cfg = load_config(args.config)

    # The command-line value takes precedence over the config entry.
    inputs_cfg = cfg["inputs"]
    video_path = args.video or inputs_cfg.get("video_path")
    if video_path is None:
        raise ValueError("Video path must be provided via --video or config")

    # ... (rest of the function)

In [None]:
# File: scripts/train_segmentation.py
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Any, Dict, Sequence, Tuple

import torch
from rich.console import Console
from rich.progress import Progress
from torch.cuda.amp import GradScaler, autocast

from src.config import load_config
from src.data import SegmentationDataset, load_split_file
from src.data.datasets import collect_directory_samples, segmentation_collate
from src.models.segmentation.deeplab_seresnet import build_segmentation_model
from src.utils import configure_logging, get_logger

try:
    import wandb  # type: ignore
except ImportError:  # pragma: no cover - optional dependency
    wandb = None  # type: ignore

console = Console()

In [None]:
def focal_tversky_loss(
    logits: torch.Tensor,
    targets: torch.Tensor,
    alpha: float = 0.7,
    beta: float = 0.3,
    gamma: float = 1.5,
) -> torch.Tensor:
    """Focal Tversky loss for the foreground class, computed over the batch.

    Args:
        logits: ``[B, C, H, W]`` raw scores. With more than one channel the
            softmax probability of channel 1 is used; with a single channel
            the sigmoid is used.
        targets: Binary foreground mask shaped ``[B, H, W]`` or
            ``[B, 1, H, W]``.
        alpha: Weight of false negatives in the Tversky index.
        beta: Weight of false positives in the Tversky index.
        gamma: Exponent applied to ``1 - tversky``.

    Returns:
        A scalar tensor.

    Raises:
        ValueError: If ``targets`` cannot be aligned with the foreground
            probability map.
    """
    eps = 1e-6
    if logits.shape[1] > 1:
        probs = torch.softmax(logits, dim=1)[:, 1:2]
    else:
        probs = torch.sigmoid(logits)

    targets = targets.float()
    if targets.ndim == probs.ndim - 1:
        # Add the channel axis explicitly. Multiplying [B, 1, H, W] by
        # [B, H, W] would otherwise broadcast to [B, B, H, W] and pair every
        # prediction with every sample's mask.
        targets = targets.unsqueeze(1)
    if targets.shape != probs.shape:
        raise ValueError(
            f"targets shape {tuple(targets.shape)} does not match "
            f"foreground probabilities shape {tuple(probs.shape)}"
        )

    true_pos = torch.sum(probs * targets)
    false_neg = torch.sum(targets * (1 - probs))
    false_pos = torch.sum((1 - targets) * probs)
    tversky = (true_pos + eps) / (true_pos + alpha * false_neg + beta * false_pos + eps)
    return torch.pow(1 - tversky, gamma)

In [None]:
def dice_score(preds: torch.Tensor, targets: torch.Tensor, threshold: float = 0.5) -> float:
    """Mean per-sample Dice score of the thresholded foreground prediction.

    Args:
        preds: ``[B, C, H, W]`` raw scores. With more than one channel the
            softmax probability of channel 1 is used; with a single channel
            the sigmoid is used.
        targets: Binary foreground mask shaped ``[B, H, W]`` or
            ``[B, 1, H, W]``.
        threshold: Probability above which a pixel counts as foreground.

    Returns:
        The Dice score averaged over the batch. A sample whose prediction
        and target are both empty scores 1.0.

    Raises:
        ValueError: If ``targets`` cannot be aligned with the prediction.
    """
    eps = 1e-6
    if preds.shape[1] > 1:
        probs = torch.softmax(preds, dim=1)[:, 1]
    else:
        probs = torch.sigmoid(preds[:, 0])
    preds_bin = (probs > threshold).float()

    targets_bin = targets.float()
    if targets_bin.ndim == preds_bin.ndim + 1 and targets_bin.shape[1] == 1:
        # Drop only an explicit channel axis; squeeze(1) would also collapse
        # a height-1 mask that has no channel axis.
        targets_bin = targets_bin[:, 0]
    if targets_bin.shape != preds_bin.shape:
        raise ValueError(
            f"targets shape {tuple(targets_bin.shape)} does not match "
            f"prediction shape {tuple(preds_bin.shape)}"
        )

    intersection = (preds_bin * targets_bin).sum(dim=(1, 2))
    union = preds_bin.sum(dim=(1, 2)) + targets_bin.sum(dim=(1, 2))
    # Smoothing both terms makes an empty-vs-empty sample a perfect match
    # instead of 0 / eps == 0.
    score = (2.0 * intersection + eps) / (union + eps)
    return score.mean().item()

In [None]:
# File: scripts/finetune_yolo.py
from __future__ import annotations

import argparse
from pathlib import Path

import torch
from rich.console import Console

from src.config import load_config
from src.models.detection import YOLODetector
from src.utils import configure_logging, get_logger

console = Console()

In [None]:
def write_dataset_yaml(cfg: dict, output_dir: Path) -> Path:
    """Write an Ultralytics-style ``dataset.yaml`` into ``output_dir``.

    The file holds the ``path``, ``train``, ``val`` and ``names`` entries
    taken from ``cfg['root']``, ``cfg['train_images']``, ``cfg['val_images']``
    and ``cfg['class_names']``; values are interpolated as plain text.

    Returns:
        Path of the written file.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    target = output_dir / "dataset.yaml"
    # The empty first and last items reproduce the leading and trailing newline.
    lines = [
        "",
        f"path: {cfg['root']}",
        f"train: {cfg['train_images']}",
        f"val: {cfg['val_images']}",
        f"names: {cfg['class_names']}",
        "",
    ]
    target.write_text("\n".join(lines), encoding="utf-8")
    return target

In [None]:
def main() -> None:
    """Fine-tune the YOLO detector from a YAML config given on the command line.

    Command-line options:
        --config: Path to the configuration file (required).
        --device: Requested device, default ``"cuda"``. Any value other than
            ``"cpu"`` is replaced by ``"cpu"`` when CUDA is unavailable.

    The config must contain ``data`` and ``model`` sections; ``logging`` is
    optional. The dataset YAML is written into the logging project directory
    via ``write_dataset_yaml`` and passed to ``YOLODetector.finetune``.
    """
    parser = argparse.ArgumentParser(description="Fine-tune YOLO detector using Ultralytics")
    # Option strings must begin with "-": a leading space makes argparse treat
    # the name as a positional argument, which rejects ``required=True``.
    parser.add_argument("--config", required=True)
    parser.add_argument("--device", default="cuda")
    args = parser.parse_args()

    configure_logging()
    logger = get_logger("finetune_yolo")

    cfg = load_config(args.config)
    data_cfg = cfg["data"]
    model_cfg = cfg["model"]
    logging_cfg = cfg.get("logging", {})

    project_dir = Path(logging_cfg.get("project_dir", "outputs/detection"))
    experiment_name = logging_cfg.get("experiment_name", "yolo_finetune")
    dataset_yaml = write_dataset_yaml(data_cfg, project_dir)

    # Honour the requested device only if CUDA is present or CPU was asked for.
    device = args.device if torch.cuda.is_available() or args.device == "cpu" else "cpu"
    detector = YOLODetector(
        weights=model_cfg["pretrained_weights"],
        device=device,
        conf_threshold=model_cfg.get("conf_threshold", 0.25),
        iou_threshold=model_cfg.get("iou_threshold", 0.45),
    )

    train_kwargs = {
        "epochs": model_cfg.get("epochs", 100),
        "batch": model_cfg.get("batch_size", 16),
        "img_size": model_cfg.get("img_size", 1280),
        "project_dir": project_dir,
        "name": experiment_name,
    }
    detector.finetune(data_cfg=dataset_yaml, **train_kwargs)

### configs/pipeline.yaml
```yaml
inputs:
  video_path: null
  output_dir: outputs/pipeline
  window_seconds: 30
  min_objects_per_window: 5

segmentation:
  checkpoint: outputs/segmentation/best.ckpt
  device: cuda
  threshold: 0.5
  smooth_kernel: 5

roi:
  dash_lengths_m: [1.0, 2.0]
  y_group_tolerance_px: 30
  min_component_area: 40
  aspect_ratio_range: [0.2, 5.0]

tracking:
  detector_weights: outputs/detection/yolov11s.pt
  tracker_config: ultralytics/cfg/trackers/botsort.yaml
  conf_threshold: 0.35
  iou_threshold: 0.4
  device: cuda

loss_thresholds:
  density: {A: 5, B: 10, C: 14, D: 24, E: 40}
  speed_two_wheel: {A: 40, B: 35, C: 30, D: 25, E: 20, F: 0}
  speed_four_wheel: {A: 45, B: 40, C: 35, D: 25, E: 15, F: 0}
  default_vehicle_length_m: 3.0

reporting:
  export_tracks: true
  export_json: true
  display: false

```

### configs/detection.yaml
```yaml
data:
  root: data/detection
  train_images: data/detection/images/train
  val_images: data/detection/images/val
  train_labels: data/detection/labels/train
  val_labels: data/detection/labels/val
  class_names: [motorbike, car, bus, truck]
  speed_labels: data/detection/speeds.csv
  format: yolo  # or coco

model:
  architecture: yolo11s
  pretrained_weights: yolov11s.pt
  img_size: 1280
  batch_size: 16
  epochs: 100
  optimizer: sgd
  learning_rate: 0.002
  momentum: 0.937
  weight_decay: 0.0005
  warmup_epochs: 3
  augment:
    hsv: true
    mosaic: true
    mixup: false

tracking:
  tracker_config: ultralytics/cfg/trackers/botsort.yaml
  conf_threshold: 0.35
  iou_threshold: 0.45

logging:
  project_dir: outputs/detection
  experiment_name: yolov11s_finetune

```

### configs/segmentation.yaml
```yaml
dataset:
  root: data/segmentation
  format: directory
  image_dir: images
  mask_dir: labels
  train_subdir: train
  val_subdir: val
  image_suffix: .jpg
  mask_suffix: .png
  palette_csv: data/segmentation/rlmd.csv
  include_class_ids: [4, 5, 6, 7, 8, 9, 10, 16]
  background_class_id: 0
  binary_mask: true
  num_classes: 2
  class_names: [background, marking]
  augmentation:
    horizontal_flip: true
    color_jitter:
      brightness: 0.2
      contrast: 0.2
      saturation: 0.2
      hue: 0.05
    gaussian_noise: false

model:
  backbone:
    pretrained: false
    output_stride: 16
  dropout: 0.1

training:
  epochs: 120
  batch_size: 4
  learning_rate: 0.0005
  weight_decay: 0.0001
  optimizer: adamw
  scheduler:
    type: cosine
    t_initial: 10
    eta_min: 0.00001
  gradient_clip_norm: 1.0
  num_workers: 4
  mixed_precision: true
  checkpoint_dir: outputs/segmentation
  resume_from: null
  validate_interval: 1
  log_interval: 10

loss:
  name: focal_tversky
  focal_gamma: 1.5
  tversky_alpha: 0.7
  tversky_beta: 0.3

metrics:
  evaluate_thresholds: [0.5]

wandb:
  enabled: false
  project: isds-traffic-ai
  run_name: null
  entity: null
  tags: [segmentation]
  notes: null
  watch: true
  watch_log: gradients
  watch_log_freq: 200

```

In [None]:
# File: tests/__init__.py


In [None]:
# File: tests/test_congestion_pipeline.py
from src.pipeline.congestion import (
    CongestionClassifier,
    LoSThresholds,
    PipelineResult,
    WindowAggregator,
)
from src.pipeline.roi import ROILine


def test_congestion_classifier_levels():
    """Check ``classify`` on two samples: a four-wheeler at density 4 / 50 kph
    must yield "A" and a two-wheeler at density 30 / 10 kph must yield "F"."""
    density_limits = {"A": 5, "B": 10, "C": 14, "D": 24, "E": 40}
    two_wheel_speeds = {"A": 40, "B": 35, "C": 30, "D": 25, "E": 20, "F": 0}
    four_wheel_speeds = {"A": 45, "B": 40, "C": 35, "D": 25, "E": 15, "F": 0}
    classifier = CongestionClassifier(
        LoSThresholds(
            density=density_limits,
            speed_two_wheel=two_wheel_speeds,
            speed_four_wheel=four_wheel_speeds,
        )
    )

    expectations = (
        ({"density": 4, "avg_speed_kph": 50, "vehicle_type": "four_wheeler"}, "A"),
        ({"density": 30, "avg_speed_kph": 10, "vehicle_type": "two_wheeler"}, "F"),
    )
    for sample, expected_level in expectations:
        assert classifier.classify(**sample) == expected_level


def test_window_aggregator_outputs_results():
    """A single ``record`` call at timestamp 31.0 (with ``window_seconds=30``)
    is expected to return a ``PipelineResult`` whose first lane counted one
    vehicle and carries a level of service between "A" and "F"."""
    classifier = CongestionClassifier(
        LoSThresholds(
            density={"A": 5, "B": 10, "C": 14, "D": 24, "E": 40},
            speed_two_wheel={"A": 40, "B": 35, "C": 30, "D": 25, "E": 20, "F": 0},
            speed_four_wheel={"A": 45, "B": 40, "C": 35, "D": 25, "E": 15, "F": 0},
        )
    )
    single_lane = ROILine(y=50.0, x_left=0.0, x_right=100.0, pixel_length=100.0)
    aggregator = WindowAggregator(
        roi_lines=[single_lane],
        meter_per_pixel=0.01,
        classifier=classifier,
        window_seconds=30,
        min_objects=1,
        vehicle_type_map={0: "two_wheeler"},
    )

    outcome = aggregator.record(0, class_id=0, speed_kph=42.0, timestamp=31.0)

    assert isinstance(outcome, PipelineResult)
    first_lane = outcome.lanes[0]
    assert first_lane.vehicle_count == 1
    assert first_lane.level_of_service in set("ABCDEF")


In [None]:
# File: tests/test_roi_extractor.py
import numpy as np
import cv2

from src.pipeline.roi import LaneROIExtractor


def test_roi_extraction_produces_lines():
    """Draw four filled rectangles on an empty 120x200 mask and check that
    ``LaneROIExtractor.extract`` returns at least one line, a positive
    meter-per-pixel scale, and lines whose left x is below their right x."""
    mask = np.zeros((120, 200), dtype=np.uint8)
    dash_boxes = (
        ((40, 20), (50, 70)),
        ((150, 25), (160, 75)),
        ((42, 90), (52, 110)),
        ((148, 88), (158, 108)),
    )
    for corner_a, corner_b in dash_boxes:
        # Thickness -1 fills the rectangle with value 255.
        cv2.rectangle(mask, corner_a, corner_b, 255, -1)

    extractor = LaneROIExtractor(dash_lengths_m=(1.0,), y_group_tolerance_px=20, min_component_area=10)
    lines, meter_per_pixel = extractor.extract(mask)

    assert len(lines) >= 1
    assert meter_per_pixel > 0
    assert all(line.x_left < line.x_right for line in lines)


In [None]:
# File: tests/test_segmentation_model.py
import torch

from src.models.segmentation.deeplab_seresnet import build_segmentation_model


def test_deeplab_seresnet_forward():
    """Run a forward and backward pass through the two-class segmentation
    model and check that the ``"out"`` entry matches the input resolution."""
    batch, height, width, num_classes = 2, 256, 256, 2
    model = build_segmentation_model(num_classes=num_classes)

    outputs = model(torch.randn(batch, 3, height, width))

    assert "out" in outputs
    logits = outputs["out"]
    assert logits.shape == (batch, num_classes, height, width)
    # Backpropagating a scalar confirms the output is attached to the graph.
    logits.mean().backward()

**Usage Tips**
- Run the code cells sequentially to define all modules in the notebook session.
- Use the YAML reference cells to load or tweak configuration dynamically (e.g. via `yaml.safe_load`).
- Training and pipeline scripts can be invoked directly once prerequisites (datasets, weights, dependencies) are available.
- The final test cells can be run as quick smoke tests to validate key components.
