In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file found beneath the Kaggle input directory.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for full_path in (os.path.join(dirname, filename) for filename in filenames):
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import sys, subprocess

def pip_install(pkgs):
    """Quietly install packages with pip, using the running interpreter.

    Args:
        pkgs: Requirement specifiers to install. May be any iterable of
            strings (list, tuple, generator, ...) or a single string.

    Returns:
        None. Nothing is executed when ``pkgs`` is empty.

    Raises:
        subprocess.CalledProcessError: If the pip process exits non-zero.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(pkgs, str):
        pkgs = [pkgs]
    requirements = list(pkgs)

    # "pip install" with no requirement exits with an error, so skip the call.
    if not requirements:
        return

    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "-q", *requirements]
    )

# Install the third-party packages used by this notebook before importing them.
# NOTE(review): no version specifiers are given, so the installed versions are
# whatever the environment already has or pip selects at run time.
pip_install([
    "ultralytics",
    "opencv-python",
    "torchvision",
    "numpy",
    "pandas"
])

import os, cv2, json, time
import numpy as np
import torch
import torch.nn.functional as F

from dataclasses import dataclass, asdict
from typing import Dict, List, Tuple

from ultralytics import YOLO
from torchvision.models.segmentation import deeplabv3_resnet50


In [None]:
# ====== PATHS ======
# Input clips processed by the run_video(...) calls at the end of the notebook.
VIDEO_IN_1 = "/kaggle/input/demo_video/demo_vid.mov"
VIDEO_IN_2 = "/kaggle/input/demo_video/demo_vid2.mov"

# Weight files: YOLO_W is passed to YOLO(...), DEEPLAB_W to DeepLab6(...).
YOLO_W = "/kaggle/input/yolo-bdd100k-best/best.pt"
DEEPLAB_W = "/kaggle/input/deeplabv3-bdd100k-best/best.pt"

# Directory receiving the annotated videos and event logs; created if missing.
OUT_DIR = "/kaggle/working/outputs"
os.makedirs(OUT_DIR, exist_ok=True)

# ====== CLASS DEFINITIONS ======
# Detector class names, indexed by the integer class id returned by YOLO.
# NOTE(review): presumably this order must match the label order the YOLO
# checkpoint was trained with -- confirm against the training config.
YOLO_CLASSES = ["car", "person", "truck", "bus", "traffic_control"]

# Integer ids compared against the per-pixel output of the 6-class segmenter.
# Only SEG_ROAD and SEG_SIDEWALK are used by the code visible in this notebook.
SEG_OTHER = 0
SEG_ROAD = 1
SEG_SIDEWALK = 2
SEG_PERSON = 3
SEG_RIDER = 4
SEG_VEHICLE = 5


In [None]:
@dataclass
class Config:
    """Tunable settings for the detection + segmentation video pipeline.

    Every field has a default, so ``Config()`` yields a ready-to-use
    configuration. ``run_video`` reads these values from the module-level
    ``cfg`` instance created below.
    """

    # Torch device string; the default expression is evaluated once, when the
    # class body is executed.
    device: str = "cuda" if torch.cuda.is_available() else "cpu"

    # YOLO
    # Forwarded to yolo.track(...) as imgsz / conf / iou / tracker.
    yolo_imgsz: int = 768
    yolo_conf: float = 0.25
    yolo_iou: float = 0.50
    yolo_tracker: str = "bytetrack.yaml"

    # DeepLab
    # (height, width) the frame is resized to before segmentation.
    seg_in_size: Tuple[int,int] = (512, 512)
    seg_every_n: int = 5          # segmentation does not need to run on every frame
    morph_kernel: int = 7         # side of the square kernel used to smooth masks

    # Rule thresholds
    # An event is logged when the mask coverage of the lower part of a box
    # exceeds the threshold (person vs. road, vehicle vs. sidewalk).
    person_road_thr: float = 0.25
    vehicle_sidewalk_thr: float = 0.20

    # Visualization
    # Overlay starts at this fraction of the frame height and runs to the bottom.
    bottom_roi_start: float = 0.6
    # Blend weight of the mask heat map; the frame gets 1 - seg_alpha.
    seg_alpha: float = 0.35

# Shared configuration instance used by run_video.
cfg = Config()


In [None]:
class DeepLab6:
    """DeepLabV3-ResNet50 wrapper with a 6-class output head.

    Loads weights from a checkpoint file and exposes ``infer`` to obtain a
    per-pixel class-id map at the resolution of the input frame.
    """

    def __init__(self, ckpt_path: str, device: str):
        """Build the network, load the checkpoint and switch to eval mode.

        Args:
            ckpt_path: File readable by ``torch.load``. Either a state dict or
                a dict that stores one under the ``"state_dict"`` key.
            device: Torch device string the model and its inputs are moved to.

        Warns:
            UserWarning: When the checkpoint does not cover every model
                parameter, or contains keys the model does not have.
        """
        import warnings  # local import: only needed to report partial loads

        self.device = device
        self.model = deeplabv3_resnet50(weights=None, num_classes=6)

        ckpt = torch.load(ckpt_path, map_location="cpu")
        if "state_dict" in ckpt:
            ckpt = ckpt["state_dict"]

        # Remove "module." from key names (presumably left by a
        # DataParallel-style wrapper) so they line up with the bare model.
        ckpt = {k.replace("module.", ""): v for k, v in ckpt.items()}

        # strict=False tolerates key mismatches, which would otherwise leave
        # part (or all) of the network randomly initialised without any hint.
        # Surface the mismatch instead of discarding it.
        load_result = self.model.load_state_dict(ckpt, strict=False)
        if load_result.missing_keys:
            missing = load_result.missing_keys
            warnings.warn(
                f"DeepLab6: {len(missing)} model parameter(s) not found in "
                f"{ckpt_path} and left at their initial values, e.g. {missing[:5]}"
            )
        if load_result.unexpected_keys:
            unexpected = load_result.unexpected_keys
            warnings.warn(
                f"DeepLab6: {len(unexpected)} checkpoint key(s) in {ckpt_path} "
                f"were ignored, e.g. {unexpected[:5]}"
            )

        self.model.to(self.device).eval()

    @torch.inference_mode()
    def infer(self, frame_bgr: np.ndarray, size_hw: Tuple[int,int]):
        """Segment one BGR frame.

        Args:
            frame_bgr: Image array of shape (H, W, 3) in BGR channel order.
            size_hw: (height, width) the frame is resized to for the network.

        Returns:
            np.ndarray of shape (H, W) and dtype int64 holding the arg-max
            class id per pixel, at the original frame resolution.
        """
        H, W = frame_bgr.shape[:2]
        h, w = size_hw

        img = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (w, h))  # cv2 expects (width, height)
        # NOTE(review): pixels are only scaled to [0, 1]; no mean/std
        # normalisation is applied -- confirm this matches training.
        x = torch.from_numpy(img).permute(2,0,1).float() / 255.0
        x = x.unsqueeze(0).to(self.device)

        out = self.model(x)["out"]
        seg = out.argmax(1)

        # Nearest-neighbour upsampling keeps the values valid class ids.
        seg = F.interpolate(
            seg.unsqueeze(1).float(),
            size=(H, W),
            mode="nearest"
        ).squeeze(1).long()

        return seg[0].cpu().numpy()


In [None]:
def bbox_area(x1,y1,x2,y2):
    """Return the area of the box (x1, y1)-(x2, y2); 0 if it is inverted or empty."""
    width = x2 - x1
    height = y2 - y1
    # Negative extents (inverted corners) are clamped to zero.
    width = width if width > 0 else 0
    height = height if height > 0 else 0
    return width * height

def lower_bbox(x1,y1,x2,y2, frac=0.33):
    """Return the bottom ``frac`` portion of a box as (x1, top, x2, y2).

    The new top edge is truncated to an int; the other three coordinates are
    passed through unchanged.
    """
    box_height = y2 - y1
    top = int(y2 - box_height * frac)
    return (x1, top, x2, y2)

def intersection_ratio(mask: np.ndarray, box):
    """Return the fraction of a box's area that is covered by ``mask``.

    Args:
        mask: 2-D array of shape (H, W); non-zero / True entries count as
            covered.
        box: (x1, y1, x2, y2) integer pixel coordinates.

    Returns:
        Covered pixels inside the box divided by the full box area, as a
        float. Parts of the box lying outside the mask count as not covered.
        Returns 0.0 for an empty or inverted box.
    """
    x1, y1, x2, y2 = box
    area = max(0, x2 - x1) * max(0, y2 - y1)
    if area == 0:
        return 0.0

    # Clamp the slice to the mask: a negative start index would otherwise be
    # interpreted by numpy as "counted from the end" and select the wrong
    # region (or nothing at all).
    mask_h, mask_w = mask.shape[:2]
    left, right = max(0, x1), min(mask_w, x2)
    top, bottom = max(0, y1), min(mask_h, y2)
    if left >= right or top >= bottom:
        return 0.0

    return np.count_nonzero(mask[top:bottom, left:right]) / area

def smooth_mask(mask: np.ndarray, k: int):
    """Clean up a binary mask with a morphological opening, then a closing.

    Args:
        mask: Boolean (or 0/1) array to smooth.
        k: Side length of the square all-ones structuring element.

    Returns:
        Boolean array, True where the processed mask is non-zero.
    """
    structuring = np.ones((k, k), np.uint8)
    cleaned = mask.astype(np.uint8) * 255
    # Opening first, closing second.
    for operation in (cv2.MORPH_OPEN, cv2.MORPH_CLOSE):
        cleaned = cv2.morphologyEx(cleaned, operation, structuring)
    return cleaned > 0


In [None]:
def _log_event(log_f, frame_idx, track_id, event, ratio):
    """Append one event record to the log as a single JSON line."""
    log_f.write(json.dumps({
        "frame": frame_idx,
        "track_id": int(track_id),
        "event": event,
        "ratio": ratio
    }) + "\n")


def run_video(video_in, video_out, log_path):
    """Annotate a video and log rule-based events, one JSON object per line.

    For every frame the function overlays the road/sidewalk masks on the
    bottom part of the image, draws the tracked detections, and writes the
    result to ``video_out``. Segmentation is refreshed every
    ``cfg.seg_every_n`` frames and reused in between. An event is logged when
    the lower part of a person box overlaps the road mask, or the lower part
    of a car/truck/bus box overlaps the sidewalk mask, by more than the
    configured threshold.

    Args:
        video_in: Path of the input video.
        video_out: Path of the annotated output video (written with "mp4v").
        log_path: Path of the JSON-lines event log (overwritten).

    Raises:
        RuntimeError: If the input video or the output writer cannot be opened.
    """
    yolo = YOLO(YOLO_W)
    deeplab = DeepLab6(DEEPLAB_W, cfg.device)

    cap = cv2.VideoCapture(video_in)
    writer = None
    try:
        # Fail loudly instead of silently producing an empty output.
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open input video: {video_in}")

        fps = cap.get(cv2.CAP_PROP_FPS) or 30  # fall back when FPS is reported as 0
        W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        writer = cv2.VideoWriter(
            video_out,
            cv2.VideoWriter_fourcc(*"mp4v"),
            fps,
            (W, H)
        )
        if not writer.isOpened():
            raise RuntimeError(f"Cannot open output video for writing: {video_out}")

        # Loop-invariant values.
        seg_every_n = max(1, int(cfg.seg_every_n))  # guard against modulo by zero
        y0 = int(H * cfg.bottom_roi_start)          # first row of the overlay region
        vehicle_classes = ("car", "truck", "bus")

        # Masks are refreshed together with the segmentation and reused on
        # the frames in between, instead of being re-smoothed on every frame.
        road_mask = np.zeros((H, W), dtype=bool)
        sidewalk_mask = np.zeros((H, W), dtype=bool)

        with open(log_path, "w", encoding="utf-8") as log_f:
            frame_idx = 0
            while True:
                ok, frame = cap.read()
                if not ok:
                    break

                # --- segmentation every N frames
                if frame_idx % seg_every_n == 0:
                    seg = deeplab.infer(frame, cfg.seg_in_size)
                    road_mask = smooth_mask(seg == SEG_ROAD, cfg.morph_kernel)
                    sidewalk_mask = smooth_mask(seg == SEG_SIDEWALK, cfg.morph_kernel)

                out = frame.copy()

                # --- overlay DeepLab masks (bottom ROI only)
                heat = np.maximum(
                    road_mask[y0:]*255,
                    sidewalk_mask[y0:]*180
                ).astype(np.uint8)

                heat_color = cv2.applyColorMap(heat, cv2.COLORMAP_JET)
                out[y0:] = cv2.addWeighted(
                    out[y0:], 1-cfg.seg_alpha,
                    heat_color, cfg.seg_alpha, 0
                )

                # --- YOLO detection + tracking (runs on the clean frame)
                res = yolo.track(
                    source=frame,
                    imgsz=cfg.yolo_imgsz,
                    conf=cfg.yolo_conf,
                    iou=cfg.yolo_iou,
                    persist=True,
                    tracker=cfg.yolo_tracker,
                    verbose=False
                )[0]

                if res.boxes is not None:
                    # Detections without a tracker id are labelled -1.
                    if res.boxes.id is not None:
                        track_ids = res.boxes.id.cpu().numpy()
                    else:
                        track_ids = [-1]*len(res.boxes)

                    for box, cls_id, tid in zip(
                        res.boxes.xyxy.cpu().numpy(),
                        res.boxes.cls.cpu().numpy().astype(int),
                        track_ids
                    ):
                        x1,y1,x2,y2 = map(int, box)
                        # Tolerate class ids outside YOLO_CLASSES instead of
                        # aborting the whole run with an IndexError.
                        if 0 <= cls_id < len(YOLO_CLASSES):
                            cls = YOLO_CLASSES[cls_id]
                        else:
                            cls = f"cls{cls_id}"

                        # Rules look only at the lower part of the box.
                        bx = lower_bbox(x1,y1,x2,y2)

                        if cls == "person":
                            r = intersection_ratio(road_mask, bx)
                            if r > cfg.person_road_thr:
                                _log_event(log_f, frame_idx, tid, "PedestrianOnRoad", r)

                        if cls in vehicle_classes:
                            r = intersection_ratio(sidewalk_mask, bx)
                            if r > cfg.vehicle_sidewalk_thr:
                                _log_event(log_f, frame_idx, tid, "VehicleOnSidewalk", r)

                        cv2.rectangle(out, (x1,y1), (x2,y2), (0,255,0), 2)
                        cv2.putText(
                            out,
                            f"{cls}#{int(tid)}",
                            (x1, y1-5),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            0.6,
                            (255,255,255),
                            2
                        )

                writer.write(out)
                frame_idx += 1
    finally:
        # Release the video handles even when processing fails midway.
        if writer is not None:
            writer.release()
        cap.release()


In [None]:
# (input video, annotated output video, event log) for each demo clip.
demo_jobs = [
    (VIDEO_IN_1, f"{OUT_DIR}/demo1_out.mp4", f"{OUT_DIR}/demo1_events.jsonl"),
    (VIDEO_IN_2, f"{OUT_DIR}/demo2_out.mp4", f"{OUT_DIR}/demo2_events.jsonl"),
]

# Process the clips one after another, in the order listed.
for demo_job in demo_jobs:
    run_video(*demo_job)

print("DONE")
