In [1]:

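# 0) Install deps (ultralytics, deep_sort_realtime and opencv are pinned; transformers is lower-bounded; torch, torchvision, timm, accelerate and ffmpeg-python are unpinned)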
!pip -q install "ultralytics==8.3.40" deep_sort_realtime==1.3.2 \
                 "opencv-python-headless==4.10.0.84" torch torchvision \
                 "transformers>=4.44.0" timm accelerate ffmpeg-python

# 1) Imports
import os, sys, json, time, warnings, shutil, tempfile, subprocess, math
from pathlib import Path
from typing import List, Tuple, Dict, Optional

import numpy as np
import cv2
import torch

from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from google.colab import files
from contextlib import nullcontext

from transformers import (
    SegformerImageProcessor, SegformerForSemanticSegmentation,
    AutoImageProcessor, UperNetForSemanticSegmentation
)
import torch.nn as nn
from tqdm import tqdm

# Silence every Python warning for the rest of the session (this also hides
# deprecation notices from the libraries imported above).
warnings.filterwarnings("ignore")

# 2) User flags (stage toggles)
# RUN_DENOISING is read inside run_tracking_fullvideo to decide whether the
# detection frames are median-blurred. The remaining flags are only consulted
# by the (currently commented-out) final runner cell.
RUN_DENOISING    = True
RUN_TRACKING     = True
RUN_OUTLINES     = True
RUN_BG_ENSEMBLE  = True
SAVE_TO_DRIVE    = False

# 3) Config

# CLAHE settings
# Default clip limit / tile grid for apply_clahe_bgr.
CLAHE_CLIP_LIMIT  = 2.0
CLAHE_TILE_GRID   = (8, 8)

# Denoising config (ONLY for detection frames)
# MEDIAN_KERNEL_SIZE is handed to apply_median_denoise; DENOISE_EVERY_NTH
# selects which frame indices get denoised (<= 1 means every frame).
MEDIAN_KERNEL_SIZE = 5
DENOISE_EVERY_NTH  = 1

# Stage A (Tracking) config
# yolo_conf / yolo_iou / yolo_imgsz are forwarded to model.predict().
# working_width is the width frames are resized to before detection and writing.
# batch_size is how many frames are accumulated before one batched YOLO call.
# vid_stride: only frames whose index is a multiple of this value are processed.
yolo_det_weights = "yolov8s.pt"
yolo_conf        = 0.28
yolo_iou         = 0.65
yolo_imgsz       = 896
working_width    = 960
batch_size       = 24
vid_stride       = 1

# COCO subset for tracking
# Maps class name -> numeric class id as produced by the detector.
COCO = {
    "person":0, "bicycle":1, "car":2, "motorcycle":3, "bus":5, "truck":7,
    "bird":14, "cat":15, "dog":16, "horse":17, "sheep":18, "cow":19
}
# Set of class ids the tracker keeps; contains every id listed in COCO above.
CLASS_FILTER = {
    COCO["person"],COCO["bicycle"],COCO["car"],COCO["motorcycle"],
    COCO["bus"],COCO["truck"],
    COCO["cat"],COCO["dog"],COCO["horse"],COCO["sheep"],COCO["cow"],COCO["bird"]
}

# Colors for drawing tracked bounding boxes
# Keyed by the bucket names returned by class_bucket(); the tuples are passed
# straight to cv2.rectangle / cv2.putText in draw_track.
COLOR = {
    "person": (0,220,0),
    "vehicle": (0,160,255),
    "pet": (255,120,0),
    "other": (200,200,200)
}

# Stage B: outlines config
yolo_seg_weights = "yolov8x-seg.pt"

# Stage C: BG semantic ensemble config
# PROCESS_SCALE:  factor applied to the frame size before semantic segmentation.
# FRAME_STRIDE:   only frames whose index is a multiple of this value are processed.
# CONF_THRESH_BG: pixels whose best class probability is below this are left unlabeled.
# MIN_AREA_FRAC:  connected components smaller than this fraction of the frame are dropped.
# OPEN_K / CLOSE_K / MERGE_PAD_PX: passed to morph() as open_k / close_k / dilate_k.
PROCESS_SCALE    = 0.6
FRAME_STRIDE     = 1
CONF_THRESH_BG   = 0.35
MIN_AREA_FRAC    = 0.0010
OPEN_K           = 3
CLOSE_K          = 5
MERGE_PAD_PX     = 3

# Foreground suppression config
# DET_IMGSZ / DET_CONF / FG_CLASSES are the defaults of ForegroundSuppressor.get_fg_mask.
# FG_CLASSES holds the same ids as person/bicycle/car/motorcycle/bus/truck in COCO above.
# FG_SOFT_BLUR_SIGMA is the Gaussian sigma used to soften the foreground mask;
# FG_SUPPRESS_GAMMA is the exponent applied to (1 - foreground probability).
DET_IMGSZ        = 960
DET_CONF         = 0.25
FG_CLASSES       = {0,1,2,3,5,7}
FG_SUPPRESS_GAMMA = 2.0
FG_SOFT_BLUR_SIGMA = 3.0
# 4) Device sanity (GPU required)
# Fail fast: the stages below hard-code device="cuda", so there is no CPU path.
if not torch.cuda.is_available():
    raise RuntimeError("CUDA not available. In Colab: Runtime -> Change runtime type -> GPU, then rerun.")

DEVICE_STR = "cuda"
# Make GPU index 0 the current CUDA device.
torch.cuda.set_device(0)

# Best-effort: the call is wrapped so that a torch build without this function
# (or rejecting the value) does not abort the cell.
try:
    torch.set_float32_matmul_precision("high")
except Exception:
    pass
# Let cuDNN benchmark kernels per input shape.
torch.backends.cudnn.benchmark = True

# Use one OpenCV thread per reported CPU core (at least one) and turn OpenCL off.
cv2.setNumThreads(max(1, os.cpu_count() or 1))
cv2.ocl.setUseOpenCL(False)

print(f"[INFO] Using: {DEVICE_STR} ({torch.cuda.get_device_name(0)})")

# # 5) Upload video
# print("Choose a video file from your computer (mp4/mov/etc.)...")
# uploaded = files.upload()
# if not uploaded:
#     raise RuntimeError("No video uploaded.")
# INPUT_VIDEO = "/content/" + next(iter(uploaded.keys()))
# print("[INFO] Uploaded:", INPUT_VIDEO)


[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/898.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m [32m890.9/898.5 kB[0m [31m49.9 MB/s[0m eta [36m0:00:01[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m898.5/898.5 kB[0m [31m25.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m8.4/8.4 MB[0m [31m97.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m49.9/49.9 MB[0m [31m18.6 MB/s[0m eta [36m0:00:00[0m


In [2]:
# SHARED HELPERS + PREPROCESSING PIPELINE


def class_bucket(cid:int)->str:
    """Translate a COCO class id into the coarse bucket used for box colours.

    Returns one of "person", "vehicle", "pet", or "other" (for any id that is
    not part of the groups below).
    """
    # Buckets are checked in this order; the groups do not overlap.
    groups = (
        ("person",  ("person",)),
        ("vehicle", ("bicycle", "car", "motorcycle", "bus", "truck")),
        ("pet",     ("cat", "dog", "horse", "sheep", "cow", "bird")),
    )
    for bucket, names in groups:
        if cid in {COCO[name] for name in names}:
            return bucket
    return "other"

def draw_track(frame, tlbr, track_id, bucket):
    """Draw one track's box and a "<bucket> #<id>" caption onto ``frame`` in place.

    ``tlbr`` is (x1, y1, x2, y2); coordinates are truncated to int. The colour
    comes from COLOR[bucket], falling back to COLOR["other"].
    """
    left, top, right, bottom = (int(v) for v in tlbr)
    colour = COLOR[bucket] if bucket in COLOR else COLOR["other"]
    caption = f"{bucket} #{track_id}"
    # Keep the caption baseline inside the image when the box touches the top edge.
    caption_origin = (left, max(0, top - 6))

    cv2.rectangle(frame, (left, top), (right, bottom), colour, 2)
    cv2.putText(frame, caption, caption_origin,
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, colour, 2)

def resize_keep_aspect(img, target_w):
    """Resize ``img`` to width ``target_w``, scaling the height by the same factor.

    Returns ``(image, (width, height))``. When ``target_w`` is falsy or already
    equals the image width, the input image object itself is returned untouched.
    """
    src_h, src_w = img.shape[:2]
    needs_resize = bool(target_w) and src_w != target_w
    if not needs_resize:
        return img, (src_w, src_h)

    scale = float(target_w) / float(src_w)
    out_size = (target_w, int(round(src_h * scale)))
    resized = cv2.resize(img, out_size, interpolation=cv2.INTER_AREA)
    return resized, out_size

def apply_clahe_bgr(bgr: np.ndarray,
                    clip_limit: float = CLAHE_CLIP_LIMIT,
                    grid_size: Tuple[int,int] = CLAHE_TILE_GRID) -> np.ndarray:
    """Apply CLAHE to the lightness channel of a BGR image and return a new BGR image.

    The image is converted to LAB, only the first (L) channel is equalised with
    the given clip limit and tile grid, and the result is converted back.
    """
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(bgr, cv2.COLOR_BGR2LAB))
    tiles = (int(grid_size[0]), int(grid_size[1]))
    equalizer = cv2.createCLAHE(clipLimit=float(clip_limit), tileGridSize=tiles)
    lightness = equalizer.apply(lightness)
    merged = cv2.merge([lightness, chan_a, chan_b])
    return cv2.cvtColor(merged, cv2.COLOR_LAB2BGR)

def ffmpeg_to_h264(inp, outp, fps):
    """Re-encode ``inp`` to an H.264 / yuv420p MP4 at ``outp`` using the ffmpeg CLI.

    Args:
        inp:  path of the source video (str or path-like).
        outp: path of the file to write; overwritten if it exists (``-y``).
        fps:  output frame rate, formatted with three decimals for ``-r``.

    Returns:
        True when ffmpeg exits with status 0, False otherwise. On failure the
        exit code and the last lines of ffmpeg's stderr are printed, so a broken
        encode is no longer silent. Callers that ignore the return value keep
        working as before.

    Raises:
        FileNotFoundError: if the ``ffmpeg`` executable is not on PATH
            (propagated from ``subprocess.run``).
    """
    cmd = [
        "ffmpeg", "-y", "-i", str(inp), "-movflags", "+faststart", "-vcodec", "libx264",
        "-pix_fmt", "yuv420p", "-r", f"{fps:.3f}", "-preset", "veryfast", str(outp),
    ]
    # stdout carries nothing useful here; stderr is kept only for diagnostics.
    proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        err_text = (proc.stderr or b"").decode("utf-8", errors="replace")
        print(f"[WARN] ffmpeg exited with code {proc.returncode} while writing {outp}")
        for line in err_text.strip().splitlines()[-5:]:
            print("       " + line)
        return False
    return True

def morph(mask, open_k=0, close_k=0, dilate_k=0):
    """Clean up a binary mask with optional open, close and dilate passes.

    Opening and closing run only for kernel sizes greater than 1, dilation for
    sizes greater than 0. All passes use an elliptical structuring element.
    With every size left at 0 the input mask is returned unchanged.
    """
    def _ellipse(size):
        return cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (size, size))

    result = mask
    wants_open = bool(open_k) and open_k > 1
    wants_close = bool(close_k) and close_k > 1
    wants_dilate = bool(dilate_k) and dilate_k > 0

    if wants_open:
        result = cv2.morphologyEx(result, cv2.MORPH_OPEN, _ellipse(open_k))
    if wants_close:
        result = cv2.morphologyEx(result, cv2.MORPH_CLOSE, _ellipse(close_k))
    if wants_dilate:
        result = cv2.dilate(result, _ellipse(dilate_k), iterations=1)
    return result

def apply_median_denoise(frame, kernel_size=5):
    """Median-blur ``frame`` with a kernel size that OpenCV will accept.

    ``cv2.medianBlur`` requires an odd aperture greater than 1, so:
      * sizes <= 1 mean "no denoising" and the input frame is returned as-is
        (same object, not a copy);
      * even sizes are bumped to the next odd value instead of raising.

    Args:
        frame: image to denoise.
        kernel_size: requested aperture; converted with ``int()``.

    Returns:
        The blurred image, or ``frame`` itself when the size is <= 1.
    """
    ksize = int(kernel_size)
    if ksize <= 1:
        return frame
    if ksize % 2 == 0:
        ksize += 1  # e.g. 4 -> 5; an even aperture would make medianBlur fail
    return cv2.medianBlur(frame, ksize)

# frame utilities
def _save_frame_at_ratio(video_path: str, out_path: str, ratio: float = 0.5) -> bool:
    try:
        if not os.path.exists(video_path) or os.path.getsize(video_path) < 1000:
            return False
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return False
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        if total <= 0:
            cap.release()
            return False
        r = min(max(ratio, 0.0), 1.0)
        idx = min(max(int(round((total - 1) * r)), 0), max(total - 1, 0))
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ok, frame = cap.read()
        cap.release()
        if not ok or frame is None:
            return False
        os.makedirs(os.path.dirname(out_path), exist_ok=True)
        return cv2.imwrite(out_path, frame)
    except Exception:
        return False

def save_best_tracking_paper_frame(
    out_path: str = "/content/paper_frame_tracking.jpg",
    prefer_list: Optional[List[str]] = None,
    fallback_video: str = "/content/output_tracked.mp4",
    fallback_ratio: float = 0.5,
) -> bool:
    """Write one still image representing the tracking run to ``out_path``.

    Candidate images in ``prefer_list`` are tried in order; the first one that
    exists, is larger than 1000 bytes and can be decoded is re-saved to
    ``out_path``. If none qualifies, a frame is grabbed from ``fallback_video``
    at ``fallback_ratio`` via ``_save_frame_at_ratio``.

    Args:
        out_path:       destination image path.
        prefer_list:    candidate image paths; ``None`` (the default) selects
                        the three debug frames written by the tracking stage.
                        ``None`` replaces the former list literal so the
                        default is no longer a shared mutable object.
        fallback_video: video used when no candidate image is usable.
        fallback_ratio: relative position of the fallback frame.

    Returns:
        True if an image was written, False otherwise.
    """
    if prefer_list is None:
        prefer_list = [
            "/content/debug_frame.jpg",
            "/content/debug_frame_midpoint.jpg",
            "/content/debug_frame_end.jpg",
        ]

    for p in prefer_list:
        if os.path.exists(p) and os.path.getsize(p) > 1000:
            try:
                img = cv2.imread(p)
                if img is not None:
                    return bool(cv2.imwrite(out_path, img))
            except Exception:
                # Best effort: an unreadable candidate just moves on to the next one.
                continue
    return _save_frame_at_ratio(fallback_video, out_path, ratio=fallback_ratio)

# Denoising stage
def run_video_denoising(input_path: str, output_path: str):
    """Produce the "denoised" stage output, which is a plain copy of the input.

    No filtering happens here; per the messages printed below, median blur is
    applied only to the detection frames inside the tracking stage.

    Args:
        input_path:  source video.
        output_path: destination path for the copy.

    Returns:
        ``output_path``. When both paths refer to the same file the copy is
        skipped instead of raising ``shutil.SameFileError``.
    """
    print("\n[DENOISING] Enabled internally for detection only (median blur).")
    print("[DENOISING] Outputs remain sharp; file copy is used here.")
    try:
        shutil.copy(input_path, output_path)
    except shutil.SameFileError:
        # Input already lives at the output location; nothing to do.
        pass
    return output_path

print("[INFO] Helper functions loaded successfully")


[INFO] Helper functions loaded successfully


In [3]:
# STAGE A (YOLOv8 + DEEPSORT TRACKING)

def run_tracking_fullvideo(input_path:str, save_to_drive:bool=False):
    """Stage A: YOLO detection + DeepSORT tracking over a whole video.

    Frames are resized to ``working_width`` and CLAHE-enhanced. Detection runs
    in batches of ``batch_size`` on an (optionally median-denoised) copy, while
    boxes are drawn on the non-denoised frame that is written out. The raw
    mp4v file is re-encoded to H.264 at ``/content/output_tracked.mp4`` and run
    statistics are written to ``/content/run_stats.json``.

    Differences from the previous implementation:
      * The tracker is updated on every processed frame, including frames with
        no detections, so tracks age and expire instead of freezing.
      * The midpoint debug frame is written once, not on every later frame.
      * The end debug frame is written even when the frame count is an exact
        multiple of ``batch_size``.
      * The temporary working directory is removed, and captures/writer are
        released, even when an error occurs.
      * A ``vid_stride`` below 1 is treated as 1.
      * If the H.264 re-encode fails, the raw mp4v file is used as the output.

    Args:
        input_path:    video to process.
        save_to_drive: when True, also copy the result to Google Drive
                       (failures are reported and otherwise ignored).

    Returns:
        dict of run statistics (the same content as the JSON file).

    Raises:
        RuntimeError: if the video cannot be opened or read, or the
            intermediate video writer cannot be opened.
    """
    COCO_FILTER_SORTED = sorted(CLASS_FILTER)
    stride = max(1, int(vid_stride))  # avoids modulo-by-zero for stride 0

    model = YOLO(yolo_det_weights)

    tracker = DeepSort(
        max_age=60, n_init=3, max_iou_distance=0.7, nms_max_overlap=1.0,
        embedder="mobilenet", embedder_gpu=True, half=True, bgr=True
    )

    # Probe the original file for metadata before working on a local copy.
    cap_probe = cv2.VideoCapture(input_path)
    try:
        if not cap_probe.isOpened():
            raise RuntimeError("Cannot open video (probe).")
        fps_in = cap_probe.get(cv2.CAP_PROP_FPS) or 25.0
        total_frames = int(cap_probe.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
        ok, _first_frame = cap_probe.read()
        if not ok:
            raise RuntimeError("Cannot read first frame (probe).")
    finally:
        cap_probe.release()

    final_path = "/content/output_tracked.mp4"
    midpoint_frame_index = max(1, total_frames//2)

    # Mutable run state shared with flush_batch() below.
    frames_written = 0
    detections_total = 0
    valid_debug_frame_saved = False
    midpoint_saved = False
    last_written = None
    cap = None
    writer_raw = None

    batch_bgr_detect = []   # frames fed to the detector / embedder
    batch_bgr_out    = []   # frames that get annotated and written
    batch_src_idx    = []   # source frame index of each batch entry

    def run_yolo(frames_bgr):
        if not frames_bgr:
            return []
        return model.predict(
            source=frames_bgr, conf=yolo_conf, iou=yolo_iou, imgsz=yolo_imgsz,
            device="cuda", half=True, batch=len(frames_bgr), verbose=False, classes=COCO_FILTER_SORTED
        )

    def save_debug_frame(img_bgr, out_path):
        # Best effort: a failed debug image must never abort the run.
        try:
            if cv2.imwrite(out_path, img_bgr):
                print("[INFO] Saved debug frame to", out_path)
        except Exception:
            pass

    def flush_batch():
        """Detect, track, annotate and write every frame currently batched."""
        nonlocal frames_written, detections_total
        nonlocal valid_debug_frame_saved, midpoint_saved, last_written

        if not batch_bgr_detect:
            return
        results = run_yolo(batch_bgr_detect)

        for r, f_bgr_detect, f_bgr, src_idx in zip(
            results, batch_bgr_detect, batch_bgr_out, batch_src_idx
        ):
            det = []
            r_boxes = getattr(r, "boxes", None)
            if r_boxes is not None and len(r_boxes) > 0:
                boxes = r_boxes.xyxy.detach().cpu().numpy()
                confs = r_boxes.conf.detach().cpu().numpy()
                clses = r_boxes.cls.detach().cpu().numpy().astype(int)
                for (x1,y1,x2,y2), cf, cid in zip(boxes, confs, clses):
                    if cf < yolo_conf or cid not in CLASS_FILTER:
                        continue
                    # DeepSort expects ([left, top, width, height], confidence, class).
                    det.append(([float(x1), float(y1), float(x2-x1), float(y2-y1)], float(cf), int(cid)))

            # Update on every frame, even with an empty detection list, so
            # unmatched tracks accumulate misses and are eventually dropped.
            tracks = tracker.update_tracks(det, frame=f_bgr_detect)
            detections_total += len(det)

            had_det = False
            for t in tracks:
                if not t.is_confirmed() or t.time_since_update > 0:
                    continue
                det_cls = getattr(t, "det_class", None)
                if det_cls is None:
                    det_cls = getattr(t, "cls", None)
                if det_cls is None:
                    det_cls = COCO["person"]
                draw_track(f_bgr, t.to_ltrb(), t.track_id, class_bucket(int(det_cls)))
                had_det = True

            writer_raw.write(f_bgr)
            frames_written += 1
            last_written = f_bgr

            if had_det and (not valid_debug_frame_saved):
                save_debug_frame(f_bgr, "/content/debug_frame.jpg")
                valid_debug_frame_saved = True

            if (not valid_debug_frame_saved) and (not midpoint_saved) \
                    and (src_idx >= midpoint_frame_index):
                save_debug_frame(f_bgr, "/content/debug_frame_midpoint.jpg")
                midpoint_saved = True

        batch_bgr_detect.clear()
        batch_bgr_out.clear()
        batch_src_idx.clear()

    tmpdir = Path(tempfile.mkdtemp(prefix="yolo_track_"))
    try:
        # Work on a local copy of the input inside the temp directory.
        local_in = str(tmpdir / Path(input_path).name)
        shutil.copy(input_path, local_in)

        cap = cv2.VideoCapture(local_in)
        if not cap.isOpened():
            raise RuntimeError("Cannot open video (processing).")

        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        ok, fr0 = cap.read()
        if not ok:
            raise RuntimeError("Cannot read first frame (processing).")

        _, (W,H) = resize_keep_aspect(fr0, working_width)

        raw_out_path = str(tmpdir / "temp_raw.mp4")
        writer_raw = cv2.VideoWriter(
            raw_out_path, cv2.VideoWriter_fourcc(*"mp4v"), max(1.0, fps), (W,H)
        )
        if not writer_raw.isOpened():
            raise RuntimeError("Cannot open video writer (processing).")

        # Warm-up inference on a blank frame of the working size.
        _ = model.predict(
            source=[np.zeros((int(H),int(W),3), np.uint8)],
            conf=yolo_conf, iou=yolo_iou, imgsz=yolo_imgsz,
            device="cuda", half=True, batch=1, verbose=False, classes=COCO_FILTER_SORTED
        )
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        start_t = time.time()
        frame_idx = 0

        while True:
            ok, frame = cap.read()
            if not ok:
                break

            if (frame_idx % stride) != 0:
                frame_idx += 1
                continue

            resized_original, _ = resize_keep_aspect(frame, working_width)
            out_frame = apply_clahe_bgr(resized_original)

            denoise_this = RUN_DENOISING and (
                DENOISE_EVERY_NTH <= 1 or (frame_idx % int(DENOISE_EVERY_NTH) == 0)
            )
            if denoise_this:
                det_src = apply_median_denoise(frame, MEDIAN_KERNEL_SIZE)
                resized_det, _ = resize_keep_aspect(det_src, working_width)
                det_frame = apply_clahe_bgr(resized_det)
            else:
                # Same pixels as out_frame; copy so drawing never touches it.
                det_frame = out_frame.copy()

            batch_bgr_detect.append(det_frame)
            batch_bgr_out.append(out_frame)
            batch_src_idx.append(frame_idx)

            if len(batch_bgr_detect) >= batch_size:
                flush_batch()

            frame_idx += 1

        flush_batch()  # leftover frames that did not fill a whole batch

        if (not valid_debug_frame_saved) and last_written is not None:
            save_debug_frame(last_written, "/content/debug_frame_end.jpg")

        # Finalise the raw file before ffmpeg reads it.
        cap.release()
        writer_raw.release()

        # Drop a stale result so a failed encode cannot be mistaken for success.
        if os.path.exists(final_path):
            os.remove(final_path)
        try:
            ffmpeg_to_h264(raw_out_path, final_path, fps_in)
        except Exception as e:
            print("[WARN] H.264 re-encode raised:", e)
        if not os.path.exists(final_path) or os.path.getsize(final_path) == 0:
            print("[WARN] H.264 re-encode failed; using the raw mp4v file instead.")
            shutil.copy(raw_out_path, final_path)
    finally:
        if cap is not None:
            cap.release()
        if writer_raw is not None:
            writer_raw.release()
        shutil.rmtree(tmpdir, ignore_errors=True)

    drive_copy = None
    if save_to_drive:
        try:
            from google.colab import drive
            drive.mount("/content/drive", force_remount=False)
            drive_dir = Path("/content/drive/MyDrive/colab_videos")
            drive_dir.mkdir(parents=True, exist_ok=True)
            drive_copy = str(drive_dir / Path(final_path).name)
            shutil.copy(final_path, drive_copy)
        except Exception as e:
            print("[WARN] Drive copy failed:", e)

    elapsed = time.time() - start_t
    eff_fps = frames_written / max(1e-6, elapsed)

    stats = {
        "device": DEVICE_STR,
        "gpu_name": torch.cuda.get_device_name(0),
        "mode_used": "paper_clahe + denoise_for_detection_only",
        "vid_stride_used": stride,
        "frames_written": frames_written,
        "detections_total": detections_total,
        "elapsed_sec_total_pipeline": round(elapsed, 2),
        "effective_processing_fps": round(eff_fps, 2),
        "input_fps": fps_in,
        "input_frames": total_frames,
        "writer_fps": fps_in,
        "yolo_weights": yolo_det_weights,
        "yolo_conf": yolo_conf,
        "yolo_imgsz": yolo_imgsz,
        "output_file": final_path,
        "debug_frame_primary": "/content/debug_frame.jpg",
        "debug_frame_mid": "/content/debug_frame_midpoint.jpg",
        "debug_frame_end": "/content/debug_frame_end.jpg",
        "drive_copy": drive_copy,
        "clahe_enabled": True,
        "clahe_clip_limit": CLAHE_CLIP_LIMIT,
        "clahe_tile_grid": CLAHE_TILE_GRID,
        "denoising_enabled_for_detection": bool(RUN_DENOISING),
        "median_kernel_size": int(MEDIAN_KERNEL_SIZE),
    }
    with open("/content/run_stats.json","w") as f:
        json.dump(stats, f, indent=2)

    return stats


In [4]:
# STAGE B (SEGMENTATION OUTLINES)


def run_outlines(input_path:str, out_path:str):
    """Stage B: draw instance-segmentation outlines on every frame of a video.

    Each frame is CLAHE-enhanced, segmented with the ``yolo_seg_weights``
    model, and every mask contour is drawn as a green closed polyline before
    the frame is written to ``out_path`` (mp4v, source size and fps).

    Empty polygons, which the segmentation result may contain, are skipped
    rather than passed to ``cv2.polylines``. The capture and writer are
    released even if processing fails midway.

    Raises:
        RuntimeError: if ``input_path`` cannot be opened.
    """
    model = YOLO(yolo_seg_weights)
    cap = cv2.VideoCapture(input_path)
    writer = None
    try:
        if not cap.isOpened():
            raise RuntimeError("Could not open video for outlines.")
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w,h))

        # A non-positive frame count means "unknown"; let tqdm run open-ended.
        bar_total = total_frames if total_frames > 0 else None
        with tqdm(total=bar_total, desc="Outlines", unit="frame") as pbar:
            while True:
                ok, frame = cap.read()
                if not ok:
                    break

                frame_vis = apply_clahe_bgr(frame)

                res = model.predict(frame_vis, verbose=False, device="cuda")
                r = res[0]

                masks_xy = []
                r_masks = getattr(r, "masks", None)
                if r_masks is not None:
                    if getattr(r_masks, "xy", None) is not None:
                        masks_xy = r_masks.xy
                    elif getattr(r_masks, "data", None) is not None:
                        # Fallback: rebuild contours from the raw mask tensors.
                        H,W = frame_vis.shape[:2]
                        for mi in r_masks.data.detach().cpu().numpy():
                            m = (mi>0.5).astype(np.uint8)*255
                            m = cv2.resize(m,(W,H), interpolation=cv2.INTER_NEAREST)
                            cnts,_ = cv2.findContours(m, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
                            for c in cnts:
                                masks_xy.append(c.reshape(-1,2))

                for pts in masks_xy:
                    pts = np.asarray(pts, dtype=np.int32)
                    if pts.size == 0:
                        continue  # an empty polygon would make cv2.polylines raise
                    if pts.ndim == 2:
                        pts = pts.reshape(-1,1,2)
                    cv2.polylines(frame_vis, [pts], isClosed=True, color=(0,255,0), thickness=2)

                writer.write(frame_vis)
                pbar.update(1)
    finally:
        cap.release()
        if writer is not None:
            writer.release()
    print("[OK] Outlines saved ->", out_path)


In [5]:
#STAGE C + FINAL RUNNER

class ADE20KEnsembler:
    """Two-model semantic-segmentation ensemble reduced to six background groups.

    A SegFormer and a UperNet checkpoint are loaded; each model's class labels
    are mapped onto the groups water / vegetation / sky / building / road /
    grass by keyword, and ``predict_probs`` returns the per-pixel probability
    of each group averaged over the two models.

    Label matching is done on whole words. Plain substring matching would put
    a "skyscraper" label into the sky group (it contains "sky") and anything
    containing "street" into vegetation (it contains "tree").

    The vegetation group excludes every id that belongs to the grass group.
    The vegetation keywords include "grass", "meadow" and "field"; without the
    exclusion the vegetation probability is always >= the grass probability
    for those labels, so a downstream argmax could never select grass.
    """

    @staticmethod
    def _ids_for(tokens, id2label):
        """Return the set of int class ids whose label contains any token as a whole word.

        Labels are lower-cased and split on every non-alphanumeric character.
        """
        wanted = {str(tok).lower() for tok in tokens}
        hits = set()
        for cid, name in id2label.items():
            cleaned = "".join(ch if ch.isalnum() else " " for ch in str(name).lower())
            if wanted.intersection(cleaned.split()):
                hits.add(int(cid))
        return hits

    def __init__(self, device="cuda"):
        """Load both checkpoints onto ``device`` and build the per-group id sets."""
        self.device = device

        self.proc1 = SegformerImageProcessor.from_pretrained("nvidia/segformer-b0-finetuned-ade-512-512")
        self.m1    = SegformerForSemanticSegmentation.from_pretrained("nvidia/segformer-b0-finetuned-ade-512-512").to(device).eval()

        self.proc2 = AutoImageProcessor.from_pretrained("openmmlab/upernet-convnext-tiny")
        self.m2    = UperNetForSemanticSegmentation.from_pretrained("openmmlab/upernet-convnext-tiny").to(device).eval()

        self.id2label_1 = self.m1.config.id2label
        self.id2label_2 = self.m2.config.id2label

        water_tokens = {"water","river","sea","ocean","lake","pond","canal","fountain","waterfall","pool","swimming"}
        veg_tokens   = {"tree","grass","plant","vegetation","forest","bush","shrub","hedge","branch","palm","leaf","leaves","meadow","field"}
        sky_tokens   = {"sky","cloud","clouds"}
        building_tokens = {"building","house","skyscraper","edifice","tower","structure","construction","architecture"}
        road_tokens  = {"road","street","path","highway","pavement","sidewalk","lane","asphalt","pathway"}
        grass_tokens = {"grass","lawn","turf","meadow","field"}

        ids_for = self._ids_for

        self.water_ids_1 = ids_for(water_tokens, self.id2label_1)
        self.grass_ids_1 = ids_for(grass_tokens, self.id2label_1)
        # Grass ids are removed from vegetation so the two groups are disjoint.
        self.veg_ids_1   = ids_for(veg_tokens,   self.id2label_1) - self.grass_ids_1
        self.sky_ids_1   = ids_for(sky_tokens,   self.id2label_1)
        self.building_ids_1 = ids_for(building_tokens, self.id2label_1)
        self.road_ids_1  = ids_for(road_tokens,  self.id2label_1)

        self.water_ids_2 = ids_for(water_tokens, self.id2label_2)
        self.grass_ids_2 = ids_for(grass_tokens, self.id2label_2)
        self.veg_ids_2   = ids_for(veg_tokens,   self.id2label_2) - self.grass_ids_2
        self.sky_ids_2   = ids_for(sky_tokens,   self.id2label_2)
        self.building_ids_2 = ids_for(building_tokens, self.id2label_2)
        self.road_ids_2  = ids_for(road_tokens,  self.id2label_2)

    @torch.inference_mode()
    def predict_probs(self, bgr: np.ndarray, amp=True):
        """Return six probability maps for one BGR frame.

        Args:
            bgr: input image in BGR channel order.
            amp: run the forward passes under float16 autocast when the
                 ensemble currently lives on a CUDA device.

        Returns:
            Tuple ``(water, vegetation, sky, building, road, grass)`` of numpy
            arrays with the input's height and width. Each map is the mean over
            both models of the summed softmax probability of that group's ids.
        """
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

        inp1 = self.proc1(images=rgb, return_tensors="pt").to(self.device)
        inp2 = self.proc2(images=rgb, return_tensors="pt").to(self.device)

        # str() also covers torch.device objects and indexed names such as "cuda:0".
        use_amp = bool(amp) and str(self.device).startswith("cuda")
        amp_ctx = torch.autocast(device_type="cuda", dtype=torch.float16) if use_amp else nullcontext()
        with amp_ctx:
            out1 = self.m1(**inp1).logits
            out2 = self.m2(**inp2).logits

        H,W = rgb.shape[:2]

        # Upsample logits to the frame size, then softmax over the class axis in float32.
        p1 = nn.functional.interpolate(out1, size=(H,W), mode="bilinear", align_corners=False).float().softmax(dim=1)[0]
        p2 = nn.functional.interpolate(out2, size=(H,W), mode="bilinear", align_corners=False).float().softmax(dim=1)[0]

        def sum_probs(pmap, ids:set):
            # Sum the probability planes of the given class ids; ids outside
            # the model's class range are ignored.
            if not ids:
                return torch.zeros_like(pmap[0])
            idx = torch.tensor(sorted(ids), device=pmap.device, dtype=torch.long)
            idx = idx[(idx>=0) & (idx<pmap.shape[0])]
            return pmap.index_select(0, idx).sum(dim=0) if idx.numel() else torch.zeros_like(pmap[0])

        id_pairs = (
            (self.water_ids_1,    self.water_ids_2),
            (self.veg_ids_1,      self.veg_ids_2),
            (self.sky_ids_1,      self.sky_ids_2),
            (self.building_ids_1, self.building_ids_2),
            (self.road_ids_1,     self.road_ids_2),
            (self.grass_ids_1,    self.grass_ids_2),
        )
        return tuple(
            ((sum_probs(p1, ids1) + sum_probs(p2, ids2)) * 0.5).detach().cpu().numpy()
            for ids1, ids2 in id_pairs
        )

class ForegroundSuppressor:
    """Builds a binary foreground mask from YOLO instance segmentation."""

    def __init__(self, device="cuda"):
        """Load the ``yolov8n-seg.pt`` model; ``device`` "cuda" selects GPU 0, anything else the CPU."""
        self.model = YOLO("yolov8n-seg.pt")
        self.device = 0 if device=="cuda" else "cpu"

    @torch.inference_mode()
    def get_fg_mask(self, frame_bgr: np.ndarray, classes=FG_CLASSES, imgsz=DET_IMGSZ, conf=DET_CONF):
        """Return a uint8 mask (255 = foreground) with the frame's height and width.

        Args:
            frame_bgr: frame to segment.
            classes:   class ids that count as foreground.
            imgsz:     inference size passed to the model.
            conf:      confidence threshold passed to the model.

        Returns:
            An all-zero mask when nothing is detected. Otherwise the union of
            the selected instances, followed by a 3x3 elliptical morphological
            close. Polygon masks are used when available; raw mask tensors are
            the fallback. Polygons with fewer than three points are skipped so
            ``cv2.fillPoly`` is never handed an empty contour.
        """
        H,W = frame_bgr.shape[:2]
        fg = np.zeros((H,W), np.uint8)

        res = self.model.predict(frame_bgr, imgsz=imgsz, conf=conf, device=self.device, verbose=False)
        if not res or len(res)==0:
            return fg
        r = res[0]
        if r.masks is None or r.boxes is None:
            return fg

        clses = r.boxes.cls.cpu().numpy().astype(int)

        polygons = getattr(r.masks, "xy", None)
        if polygons is not None:
            for poly, cls_id in zip(polygons, clses):
                if cls_id not in classes:
                    continue
                pts = np.asarray(poly, dtype=np.int32)
                if pts.size == 0 or pts.reshape(-1,2).shape[0] < 3:
                    continue  # degenerate polygon: nothing to fill
                if pts.ndim == 2:
                    pts = pts.reshape(-1,1,2)
                cv2.fillPoly(fg, [pts], 255)
        else:
            data = []
            if getattr(r.masks, "data", None) is not None:
                data = r.masks.data.cpu().numpy()
            for mi, cls_id in zip(data, clses):
                if cls_id not in classes:
                    continue
                mask = (mi>0.5).astype(np.uint8)*255
                mask = cv2.resize(mask,(W,H), interpolation=cv2.INTER_NEAREST)
                fg = np.maximum(fg, mask)

        k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(3,3))
        return cv2.morphologyEx(fg, cv2.MORPH_CLOSE, k, iterations=1)

def run_bg_ensemble(input_path:str, out_path:str):
    """Stage C: label background regions (six groups) with boxes on every processed frame.

    Per processed frame:
      1. CLAHE-enhance the frame.
      2. Build a blurred foreground probability map and turn it into a
         suppression factor ``(1 - fg) ** FG_SUPPRESS_GAMMA``.
      3. Run the semantic ensemble on a downscaled copy, resize its six
         probability maps back to full size and multiply by the factor.
      4. Take the per-pixel argmax; pixels below ``CONF_THRESH_BG`` stay unlabeled.
      5. Per group: clean the mask with ``morph``, box each connected component
         of at least ``MIN_AREA_FRAC`` of the frame, and draw box + label.
         The score shown is the mean suppressed probability over the box area.

    Frames skipped by ``FRAME_STRIDE`` are not written to the output. A stride
    below 1 is treated as 1. The capture and writer are released even when
    processing raises.

    Raises:
        RuntimeError: if ``input_path`` cannot be opened.
    """
    # (label, box colour, HUD abbreviation), in the order predict_probs returns the maps.
    bg_classes = (
        ("water",      (0,0,255),   "W"),
        ("vegetation", (0,180,0),   "V"),
        ("sky",        (0,255,255), "S"),
        ("building",   (255,0,0),   "B"),
        ("road",       (128,0,128), "R"),
        ("grass",      (0,255,128), "G"),
    )
    unknown_cls = len(bg_classes)  # class index meaning "below threshold"
    stride = max(1, int(FRAME_STRIDE))

    def draw_label_smart(img, x1,y1,x2,y2, text, color, font_scale=0.55, thick=2, pad=6):
        # Place the label above the box if it fits, else below, else to the right.
        (tw,th),_ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thick)
        H,W = img.shape[:2]

        ay2 = y1 - pad
        ay1 = ay2 - th - pad
        if ay1 >= 0:
            bx1 = max(0, min(W-(tw+2*pad), x1)); bx2 = min(W, bx1+tw+2*pad)
            cv2.rectangle(img, (bx1,ay1), (bx2,ay2), color, -1)
            cv2.putText(img, text, (bx1+pad, ay2-pad), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0,0,0), thick)
            return

        by1 = y2 + pad
        by2 = by1 + th + pad
        if by2 <= H:
            bx1 = max(0, min(W-(tw+2*pad), x1)); bx2 = min(W, bx1+tw+2*pad)
            cv2.rectangle(img, (bx1,by1), (bx2,by2), color, -1)
            cv2.putText(img, text, (bx1+pad, by2-pad), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0,0,0), thick)
            return

        rx1 = x2 + pad
        rx2 = rx1 + tw + 2*pad
        if rx2 > W:
            shift = rx2 - W
            rx1 -= shift; rx2 -= shift
            rx1 = max(0, rx1)
        ry1 = int((y1 + y2 - th - 2*pad) / 2)
        ry1 = max(0, min(H - (th + 2*pad), ry1))
        ry2 = ry1 + th + 2*pad
        cv2.rectangle(img,(rx1,ry1),(rx2,ry2), color, -1)
        cv2.putText(img, text, (rx1+pad, ry2-pad), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0,0,0), thick)

    def comps_to_boxes(mask, prob_full, area_min):
        # One [x1, y1, x2, y2] box per connected component with area >= area_min;
        # the score is the mean of prob_full over the box.
        num, lbl, stats, _ = cv2.connectedComponentsWithStats(mask, connectivity=8)
        boxes=[]; scores=[]
        for cid in range(1, num):  # component 0 is the background
            x,y,w,h,area = stats[cid]
            if area < area_min:
                continue
            boxes.append([x,y,x+w,y+h])
            roi = (slice(y,y+h), slice(x,x+w))
            scores.append(float(np.mean(prob_full[roi])))
        return boxes, scores

    def draw_many(vis, boxes, scores, color, name):
        for b, sc in zip(boxes, scores):
            x1,y1,x2,y2 = map(int, b)
            cv2.rectangle(vis, (x1,y1), (x2,y2), color, 2)
            draw_label_smart(vis, x1,y1,x2,y2, f"{name} {sc:.2f}",
                             color=(int(color[0]),int(color[1]),int(color[2])))

    cap = cv2.VideoCapture(input_path)
    writer = None
    try:
        if not cap.isOpened():
            raise RuntimeError("Could not open video")

        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        W0 = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H0 = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        Wp = max(64, int(W0*PROCESS_SCALE))
        Hp = max(64, int(H0*PROCESS_SCALE))
        area_min = int(MIN_AREA_FRAC * W0 * H0)

        writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (W0,H0))

        ens = ADE20KEnsembler(device="cuda")
        fgnet = ForegroundSuppressor(device="cuda")

        i = 0
        t0 = time.time()

        # A non-positive frame count means "unknown"; let tqdm run open-ended.
        bar_total = total_frames if total_frames > 0 else None
        with tqdm(total=bar_total, desc="BG Ensemble", unit="frame") as pbar:
            while True:
                ok, frame_full = cap.read()
                if not ok:
                    break

                if (i % stride) != 0:
                    i += 1
                    pbar.update(1)
                    continue

                frame_full = apply_clahe_bgr(frame_full)

                fg_mask_u8 = fgnet.get_fg_mask(frame_full)
                fg_prob = fg_mask_u8.astype(np.float32) / 255.0

                if FG_SOFT_BLUR_SIGMA and FG_SOFT_BLUR_SIGMA > 0:
                    fg_prob = cv2.GaussianBlur(fg_prob, (0,0), float(FG_SOFT_BLUR_SIGMA))
                    fg_prob = np.clip(fg_prob, 0.0, 1.0)

                suppress_factor = np.power(1.0 - fg_prob, float(FG_SUPPRESS_GAMMA))

                frame_proc = cv2.resize(frame_full, (Wp,Hp), interpolation=cv2.INTER_AREA)

                try:
                    prob_maps = ens.predict_probs(frame_proc, amp=True)
                except torch.cuda.OutOfMemoryError:
                    # Move the ensemble to the CPU for this and all later frames.
                    print("[WARN] CUDA OOM -> ensemble on CPU...")
                    ens.device = "cpu"; ens.m1=ens.m1.to("cpu"); ens.m2=ens.m2.to("cpu")
                    prob_maps = ens.predict_probs(frame_proc, amp=False)

                # Back to full resolution, then damp wherever foreground was found.
                prob_maps = [
                    cv2.resize(p, (W0,H0), interpolation=cv2.INTER_LINEAR) * suppress_factor
                    for p in prob_maps
                ]

                probs = np.stack(prob_maps, axis=0)
                maxprob = probs.max(axis=0)
                cls = probs.argmax(axis=0).astype(np.int32)
                cls[maxprob < CONF_THRESH_BG] = unknown_cls

                vis = frame_full.copy()
                counts = []
                for k, (name, color, abbr) in enumerate(bg_classes):
                    mask = (cls==k).astype(np.uint8)*255
                    mask = morph(mask, OPEN_K, CLOSE_K, MERGE_PAD_PX)
                    boxes, scores = comps_to_boxes(mask, prob_maps[k], area_min)
                    draw_many(vis, boxes, scores, color, name)
                    counts.append(f"{abbr}:{len(boxes)}")

                cv2.putText(
                    vis,
                    f"{' '.join(counts)} | s={PROCESS_SCALE} | gamma={FG_SUPPRESS_GAMMA}",
                    (12, H0-12),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2
                )

                writer.write(vis)
                i += 1
                pbar.update(1)
    finally:
        cap.release()
        if writer is not None:
            writer.release()

    elapsed = time.time() - t0
    print(f"[OK] BG boxes saved -> {out_path} ({i} frames in {elapsed:.1f}s)")

print("[INFO] Processing functions loaded successfully")



[INFO] Processing functions loaded successfully


In [6]:
# # ============================================================
# # FINAL RUNNER (RUN STAGES + SAVES + DOWNLOADS)
# # ============================================================

# if RUN_DENOISING:
#     denoised_video = "/content/video_denoised.mp4"
#     run_video_denoising(INPUT_VIDEO, denoised_video)
#     PROCESSING_VIDEO = denoised_video
# else:
#     PROCESSING_VIDEO = INPUT_VIDEO

# stats = None

# if RUN_TRACKING:
#     stats = run_tracking_fullvideo(PROCESSING_VIDEO, save_to_drive=SAVE_TO_DRIVE)
#     print(json.dumps(stats, indent=2))
#     try:
#         ok_pf = save_best_tracking_paper_frame("/content/paper_frame_tracking.jpg")
#         print("[paper] tracking still saved:", ok_pf)
#     except Exception as e:
#         print("[paper] tracking still failed:", e)

# if RUN_OUTLINES:
#     run_outlines(PROCESSING_VIDEO, "/content/outlined_output.mp4")
#     try:
#         ok_pf2 = _save_frame_at_ratio("/content/outlined_output.mp4",
#                                       "/content/paper_frame_outlines.jpg",
#                                       ratio=0.5)
#         print("[paper] outlines still saved:", ok_pf2)
#     except Exception as e:
#         print("[paper] outlines still failed:", e)

# if RUN_BG_ENSEMBLE:
#     run_bg_ensemble(PROCESSING_VIDEO, "/content/bg_boxes_clean_colors.mp4")
#     try:
#         ok_pf3 = _save_frame_at_ratio("/content/bg_boxes_clean_colors.mp4",
#                                       "/content/paper_frame_bg.jpg",
#                                       ratio=0.5)
#         print("[paper] bg still saved:", ok_pf3)
#     except Exception as e:
#         print("[paper] bg still failed:", e)

# print("\n=== ENVIRONMENT PROOF ===")
# import ultralytics, deep_sort_realtime
# print("Python:", sys.version)
# print("Torch:", torch.__version__)
# print("Torch CUDA available:", torch.cuda.is_available())
# print("CUDA device:", torch.cuda.get_device_name(0))
# print("Ultralytics version:", ultralytics.__version__)
# print("OpenCV version:", cv2.__version__)
# print("deep_sort_realtime version:", deep_sort_realtime.__version__)

# print("\nPreparing downloads...")
# def try_dl(p):
#     if os.path.exists(p):
#         try:
#             files.download(p)
#         except Exception as e:
#             print("download failed:", p, e)

# for p in [
#     "/content/video_denoised.mp4",
#     "/content/run_stats.json",
#     "/content/debug_frame.jpg",
#     "/content/debug_frame_midpoint.jpg",
#     "/content/debug_frame_end.jpg",
#     "/content/output_tracked.mp4",
#     "/content/outlined_output.mp4",
#     "/content/bg_boxes_clean_colors.mp4",
#     "/content/paper_frame_tracking.jpg",
#     "/content/paper_frame_outlines.jpg",
#     "/content/paper_frame_bg.jpg",
# ]:
#     try_dl(p)

# print("\n✅ PIPELINE COMPLETE!")
# print("=" * 60)
# print("OUTPUTS:")
# if RUN_DENOISING:
#     print("  - Denoised video (copied): /content/video_denoised.mp4")
# if RUN_TRACKING:
#     print("  - Tracked video: /content/output_tracked.mp4")
# if RUN_OUTLINES:
#     print("  - Outlined video: /content/outlined_output.mp4")
# if RUN_BG_ENSEMBLE:
#     print("  - BG detection (6 classes): /content/bg_boxes_clean_colors.mp4")
# print("=" * 60)


In [None]:
!pip -q install -U gradio

import os, shutil, time, socket, threading
import gradio as gr

# ============================================================
# NEURO_VISION — Gradio (Colab-friendly uploader + live logs)
# Outputs: 4 tiles (Denoise, Tracking/FG, Outlines, Background)
# ============================================================

# ---- 1) Choose a free port (fixes "Cannot find empty port 7860") ----
def find_free_port(start=7860, tries=50):
    """Return a TCP port number that could be bound on all interfaces just now.

    Candidates ``start``, ``start + 1``, ... ``start + tries - 1`` are probed in
    order by binding a throw-away socket to ``0.0.0.0``; the first one that
    binds is returned.

    If none of the candidates is available, the OS is asked for an ephemeral
    port (bind to port 0) instead of handing back a number that was never
    probed.

    Args:
        start: First port to probe.
        tries: Number of consecutive ports to probe, beginning at ``start``.

    Returns:
        int: A port that was bindable at probe time. The probe socket is closed
        before returning, so another process may still grab the port before the
        caller uses it.
    """
    # Ports above 65535 make bind() raise OverflowError rather than OSError,
    # so never probe past the valid range.
    last_candidate = min(start + tries, 65536)
    for candidate in range(start, last_candidate):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            try:
                probe.bind(("0.0.0.0", candidate))
            except OSError:
                continue
            return candidate

    # Every candidate was busy: let the OS choose a free port.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind(("0.0.0.0", 0))
            return probe.getsockname()[1]
    except OSError:
        # Last resort keeps the historical non-raising contract.
        return start + tries

# Resolved once when this cell runs and later passed to demo.launch(); the
# port is not held open in between, so it could in principle be taken again.
SERVER_PORT = find_free_port(7860, 50)

# ---- 2) Live log capture ----
class LiveLogger:
    """Lock-protected in-memory log buffer that also echoes to stdout.

    Messages are appended from the pipeline worker thread and read back by the
    UI generator, so every access to ``lines`` goes through ``lock``.

    Attributes:
        lines: All messages logged so far (one string per ``log`` call).
        lock: ``threading.Lock`` guarding ``lines``.
        max_lines: How many trailing messages ``get_text`` returns, or ``None``
            for all of them.
    """

    def __init__(self, max_lines=350):
        """Create an empty logger.

        Args:
            max_lines: Size of the tail returned by ``get_text``; ``None``
                disables truncation. Defaults to 350.

        Raises:
            ValueError: If ``max_lines`` is given but smaller than 1.
        """
        if max_lines is not None and max_lines < 1:
            raise ValueError("max_lines must be a positive integer or None")
        self.lines = []
        self.lock = threading.Lock()
        self.max_lines = max_lines

    def log(self, *args):
        """Join ``args`` with spaces, store the message, and print it."""
        msg = " ".join(str(a) for a in args)
        with self.lock:
            self.lines.append(msg)
        # Printed outside the lock so slow stdout never blocks readers.
        print(msg)

    def clear(self):
        """Drop all stored messages while holding the lock."""
        with self.lock:
            self.lines = []

    def get_text(self):
        """Return the last ``max_lines`` messages joined by newlines."""
        with self.lock:
            if self.max_lines is None:
                tail = self.lines
            else:
                tail = self.lines[-self.max_lines:]
            return "\n".join(tail)

# Single module-level logger shared by run_pipeline (writer) and
# gradio_runner (reader).
LOGGER = LiveLogger()

# ============================================================
# 3) OPTIONAL: Safe H.264 re-encode (only if helper exists)
#    If your notebook already defines ffmpeg_to_h264(), we use it.
# ============================================================
def _maybe_h264(src_path, dst_path, fps=25.0):
    if not src_path or not os.path.exists(src_path):
        return None
    if "ffmpeg_to_h264" in globals() and callable(globals()["ffmpeg_to_h264"]):
        try:
            globals()["ffmpeg_to_h264"](src_path, dst_path, fps)
            return dst_path if os.path.exists(dst_path) else src_path
        except Exception:
            return src_path
    return src_path

# ============================================================
# 4) Pipeline runner
#    IMPORTANT: This uses YOUR existing notebook functions:
#      - run_video_denoising(in_path, out_path)
#      - run_tracking_fullvideo(in_path, save_to_drive=False)  [your signature]
#      - run_outlines(in_path, out_path)                      [your signature]
#      - run_bg_ensemble(in_path, out_path)                   [your signature]
#
#    And your notebook save paths are:
#      /content/video_denoised.mp4
#      /content/output_tracked.mp4
#      /content/outlined_output.mp4
#      /content/bg_boxes_clean_colors.mp4
# ============================================================
def _finish_stage(tag, raw_path, h264_path, fps):
    """Pass one stage's output through ``_maybe_h264`` and log if the H.264 copy is used.

    Returns whatever ``_maybe_h264`` returns for ``raw_path``.
    """
    final_path = _maybe_h264(raw_path, h264_path, fps)
    if final_path and final_path.endswith("_h264.mp4"):
        LOGGER.log(f"[{tag}] re-encoded ->", final_path)
    return final_path


def run_pipeline(video_path, do_denoise, do_fg, do_outline, do_bg):
    """Run the selected stages on ``video_path`` and return their output paths.

    The input is first copied to ``/content/input_video.mp4``. Each enabled
    stage calls the corresponding notebook function (``run_video_denoising``,
    ``run_tracking_fullvideo``, ``run_outlines``, ``run_bg_ensemble``) and then
    offers its output to ``_maybe_h264``. When denoising is enabled and produces
    a file, later stages read that file instead of the raw input.

    Side effects: clears ``LOGGER``, overwrites the notebook-level flags
    ``RUN_DENOISING``, ``RUN_TRACKING``, ``RUN_OUTLINES`` and
    ``RUN_BG_ENSEMBLE``, and writes files under ``/content``.

    Args:
        video_path: Path of the uploaded video.
        do_denoise: Truthy to run the denoising stage.
        do_fg: Truthy to run the tracking stage.
        do_outline: Truthy to run the outlines stage.
        do_bg: Truthy to run the background-ensemble stage.

    Returns:
        tuple: ``(out_denoise, out_track, out_outline, out_bg)``. Each entry is
        a file path, or ``None`` if the stage was skipped or left no file.

    Raises:
        Whatever the copy step or any stage function raises; nothing is caught
        here.
    """
    global RUN_DENOISING, RUN_TRACKING, RUN_OUTLINES, RUN_BG_ENSEMBLE

    # NOTE(review): 25.0 is assumed for every stage that does not report its
    # own fps -- confirm against what ffmpeg_to_h264 does with this argument.
    default_fps = 25.0

    # Reset under the lock: the UI thread may be reading the buffer right now.
    with LOGGER.lock:
        LOGGER.lines = []
    t0 = time.time()

    # stable input copy (skipped when the upload already is that file, which
    # would otherwise raise shutil.SameFileError)
    in_path = "/content/input_video.mp4"
    if os.path.abspath(video_path) != os.path.abspath(in_path):
        shutil.copy(video_path, in_path)
    LOGGER.log("[INFO] Input copied ->", in_path)

    # set the notebook's global switches (in case stage code reads them)
    RUN_DENOISING   = bool(do_denoise)
    RUN_TRACKING    = bool(do_fg)
    RUN_OUTLINES    = bool(do_outline)
    RUN_BG_ENSEMBLE = bool(do_bg)

    out_denoise = None
    out_track   = None
    out_outline = None
    out_bg      = None

    # -----------------------------
    # STAGE 1: Denoising
    # -----------------------------
    proc_in = in_path
    if RUN_DENOISING:
        denoised = "/content/video_denoised.mp4"
        LOGGER.log("[DENOISING] starting...")
        run_video_denoising(in_path, denoised)
        LOGGER.log("[DENOISING] done ->", denoised)

        out_denoise = _finish_stage(
            "DENOISING", denoised, "/content/video_denoised_h264.mp4", default_fps
        )
        if out_denoise:
            # Later stages consume the denoised (possibly re-encoded) file.
            proc_in = out_denoise
        else:
            # No file on disk: do not point later stages at a missing path.
            LOGGER.log("[DENOISING] no output file found; later stages use the original input")

    # -----------------------------
    # STAGE 2: Foreground / Tracking
    # -----------------------------
    if RUN_TRACKING:
        LOGGER.log("[TRACKING] starting...")
        stats = run_tracking_fullvideo(proc_in, save_to_drive=False)
        # the stage may return a dict with output_file / input_fps
        tracked = "/content/output_tracked.mp4"
        fps = default_fps
        if isinstance(stats, dict):
            tracked = stats.get("output_file") or tracked
            # Guard against a missing, None or zero fps entry.
            fps = stats.get("input_fps") or default_fps
        LOGGER.log("[TRACKING] done ->", tracked)

        out_track = _finish_stage(
            "TRACKING", tracked, "/content/output_tracked_h264.mp4", fps
        )

    # -----------------------------
    # STAGE 3: Contour Outlines
    # -----------------------------
    if RUN_OUTLINES:
        LOGGER.log("[OUTLINES] starting...")
        outlined = "/content/outlined_output.mp4"
        run_outlines(proc_in, outlined)
        LOGGER.log("[OUTLINES] done ->", outlined)

        out_outline = _finish_stage(
            "OUTLINES", outlined, "/content/outlined_output_h264.mp4", default_fps
        )

    # -----------------------------
    # STAGE 4: Background Ensemble
    # -----------------------------
    if RUN_BG_ENSEMBLE:
        LOGGER.log("[BG] starting...")
        bg_out = "/content/bg_boxes_clean_colors.mp4"
        run_bg_ensemble(proc_in, bg_out)
        LOGGER.log("[BG] done ->", bg_out)

        out_bg = _finish_stage("BG", bg_out, "/content/bg_boxes_h264.mp4", default_fps)

    LOGGER.log("[DONE] total_sec =", round(time.time() - t0, 2))
    return out_denoise, out_track, out_outline, out_bg

# ============================================================
# 5) Generator fn: keeps connection alive by yielding logs
# ============================================================
def gradio_runner(video, do_denoise, do_fg, do_outline, do_bg):
    """Generator event handler: run the pipeline in a thread and stream the log.

    ``run_pipeline`` executes in a daemon thread while this generator yields
    the current log text roughly every 0.7 s, which keeps the client connection
    alive during long runs.

    Args:
        video: Uploaded video as passed by the ``gr.Video`` input (used as a
            file path); ``None`` when nothing was uploaded.
        do_denoise, do_fg, do_outline, do_bg: Stage toggles forwarded to
            ``run_pipeline``.

    Yields:
        tuple: ``(denoise, track, outline, bg, log_text)``. The four paths stay
        ``None`` until the pipeline finishes. If the pipeline raised, the final
        tuple has ``None`` for all paths and the log text ends with an
        ``[ERROR]`` line naming the exception type and message.
    """
    if video is None:
        yield None, None, None, None, "Upload a video first."
        return

    video_path = video  # the uploader hands over a filepath string

    result = {"denoise": None, "track": None, "outline": None, "bg": None, "err": None}

    def _worker():
        try:
            d, t, o, b = run_pipeline(video_path, do_denoise, do_fg, do_outline, do_bg)
            result["denoise"], result["track"], result["outline"], result["bg"] = d, t, o, b
        except Exception as e:
            # Include the type: str(e) alone is empty for message-less
            # exceptions, which would make the failure look like success.
            result["err"] = f"{type(e).__name__}: {e}"

    th = threading.Thread(target=_worker, daemon=True)
    th.start()

    while th.is_alive():
        yield result["denoise"], result["track"], result["outline"], result["bg"], LOGGER.get_text()
        # Wakes as soon as the worker ends instead of always sleeping 0.7 s.
        th.join(timeout=0.7)

    if result["err"] is not None:
        yield None, None, None, None, LOGGER.get_text() + "\n\n[ERROR] " + result["err"]
    else:
        yield result["denoise"], result["track"], result["outline"], result["bg"], LOGGER.get_text()

# ============================================================
# 6) UI — NEURO_VISION (2x2 outputs + logs)
# ============================================================
# Build the UI: uploader and stage toggles on top, live log box, then a 2x2
# grid of output videos. The labels previously contained mis-decoded
# characters (mojibake); they are restored to the intended emoji/symbols.
with gr.Blocks(css="""
.gradio-container { background: #0f0f0f !important; color: #fff !important; }
h1, h2, h3 { color: #22c55e !important; }
""") as demo:

    gr.HTML("<h1 style='text-align:center; font-family:monospace;'>🎬 NEURO_VISION</h1>")
    gr.Markdown("**Colab-friendly uploader + live logs + 4 output tiles (2×2)**")

    with gr.Row():
        video_in = gr.Video(label="Upload Video", format="mp4")  # keep reference uploader behavior
        with gr.Column():
            # One checkbox per pipeline stage; all enabled by default.
            opt_denoise = gr.Checkbox(value=True,  label="✨ Denoising")
            opt_fg      = gr.Checkbox(value=True,  label="🎯 Foreground/Tracking (Stage A)")
            opt_outline = gr.Checkbox(value=True,  label="📐 Contour Outlining (Stage B)")
            opt_bg      = gr.Checkbox(value=True,  label="🎨 Background (Stage C)")
            run_btn     = gr.Button("▶ RUN NEURO_VISION", variant="primary")

    logs = gr.Textbox(label="Live Logs", lines=20)

    gr.Markdown("### Outputs (2×2)")
    with gr.Row():
        out_denoise = gr.Video(label="Denoised", format="mp4")
        out_track   = gr.Video(label="Tracking / FG", format="mp4")
    with gr.Row():
        out_outline = gr.Video(label="Outlines", format="mp4")
        out_bg      = gr.Video(label="Background", format="mp4")

    # Output order must match the 5-tuples yielded by gradio_runner.
    run_btn.click(
        fn=gradio_runner,
        inputs=[video_in, opt_denoise, opt_fg, opt_outline, opt_bg],
        outputs=[out_denoise, out_track, out_outline, out_bg, logs],
        concurrency_limit=1
    )

# Queue (version-safe): try the keyword form first and fall back to a bare
# queue() if this Gradio build rejects those keyword arguments with TypeError.
try:
    demo.queue(max_size=8, default_concurrency_limit=1)
except TypeError:
    demo.queue()

# share=True publishes a public *.gradio.live URL (see the cell output), and
# debug=True keeps this cell running so errors and logs stay visible in Colab.
# NOTE(review): the share link has no auth configured here -- confirm that is
# acceptable for whoever receives the URL.
demo.launch(share=True, server_port=SERVER_PORT, debug=True, max_threads=2)


[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m23.0/23.0 MB[0m [31m58.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m55.6/55.6 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25hColab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://cd471127b0f5377a00.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
