In [None]:
# Clean reset (optional): delete the directories listed below so the notebook starts from scratch
!rm -rf /content/RAFT /content/oflow2d /content/outputs

# Base dependencies (torch is already available in Colab)
!pip -q install opencv-python tqdm matplotlib


In [None]:
# Clone the RAFT repository and run its download_models.sh from inside the checkout
# (the next statements check for the *.pth files under /content/RAFT/models),
# then return to /content so later relative paths resolve there.
!git clone https://github.com/princeton-vl/RAFT /content/RAFT
%cd /content/RAFT
!bash download_models.sh
%cd /content

# Check that the essential files exist: prints a check mark or a cross next to each path
import os
for p in [
    "/content/RAFT/models/raft-small.pth",
    "/content/RAFT/models/raft-sintel.pth",
    "/content/RAFT/core/raft.py",
    "/content/RAFT/core/utils/utils.py",
]:
    print(("✓" if os.path.exists(p) else "✗"), p)


In [None]:

import os, pathlib, textwrap
# Root of the generated `oflow2d` package; the sub-packages are created up front
# so the write_text calls below have a directory to write into.
base = pathlib.Path("/content/oflow2d")
(base/"common").mkdir(parents=True, exist_ok=True)
(base/"adapters").mkdir(parents=True, exist_ok=True)

# __init__.py (package marker that only declares __all__)
(base/"__init__.py").write_text("__all__=['common','adapters']\n")

# common/utils.py
(base/"common"/"utils.py").write_text(textwrap.dedent("""
import cv2, numpy as np

def load_video_frames(video_path, max_frames=None, stride=1):
    '''Decode a video into a list of RGB frames.

    Args:
        video_path: path of the video; converted with str() before opening.
        max_frames: if truthy, stop as soon as this many frames were kept.
        stride: keep one decoded frame every `stride` frames.

    Returns:
        List of frames converted from BGR to RGB.

    Raises:
        AssertionError: the capture could not be opened.
        RuntimeError: fewer than two frames were kept.
    '''
    capture = cv2.VideoCapture(str(video_path))
    assert capture.isOpened(), f'Impossibile aprire {video_path}'
    kept, index = [], 0
    while True:
        grabbed, bgr = capture.read()
        if not grabbed:
            break
        take = (index % stride == 0)
        index += 1
        if not take:
            continue
        kept.append(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
        if max_frames and len(kept) >= max_frames:
            break
    capture.release()
    if len(kept) < 2:
        raise RuntimeError("Servono almeno 2 frame nel video.")
    return kept

def pairwise(frames):
    '''Yield (frames[k-1], frames[k]) for every pair of consecutive items.'''
    position = 1
    total = len(frames)
    while position < total:
        yield frames[position - 1], frames[position]
        position += 1
"""))

# common/viz.py
(base/"common"/"viz.py").write_text(textwrap.dedent("""
import numpy as np, cv2
def flow_to_color(flow, clip_flow=None):
    '''Render a flow field as a colour image: hue encodes direction, brightness encodes magnitude.

    Args:
        flow: array whose last axis stores the two displacement components
            (index 0 and 1); its first two axes give the image size.
        clip_flow: optional upper bound applied to the magnitude before it is
            normalised.

    Returns:
        uint8 array of shape flow.shape[:2] + (3,) with channels in RGB order
        (the BGR result of the conversion is reversed on the last axis).
    '''
    fx = flow[..., 0]
    fy = flow[..., 1]
    rad = (fx**2 + fy**2) ** 0.5
    ang = np.arctan2(fy, fx)
    if clip_flow is not None:
        rad = np.clip(rad, 0, clip_flow)
    # Min-max normalisation over this single flow field; the epsilon avoids 0/0 on constant flow.
    rad_n = (rad - rad.min()) / (rad.max() - rad.min() + 1e-8)
    ang_n = (ang + np.pi) / (2*np.pi)
    hsv = np.zeros((*flow.shape[:2], 3), dtype=np.float32)
    # The buffer is float32, and for float images OpenCV expects hue in degrees (0..360).
    # The 0..179 range is the 8-bit convention and would only cover half of the colour wheel here.
    hsv[..., 0] = ang_n * 360.0
    hsv[..., 1] = 1.0
    hsv[..., 2] = rad_n
    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    return (bgr[:, :, ::-1]*255.0).astype('uint8')
"""))

# common/metrics.py (se userai GT)
(base/"common"/"metrics.py").write_text(textwrap.dedent("""
import numpy as np
def epe(pred, gt, valid_mask=None):
    '''Mean end-point error: average Euclidean distance between pred and gt along the last axis.

    If valid_mask is given, only positions where valid_mask > 0 contribute.
    Returns NaN when no position is left.
    '''
    err = np.sqrt(np.sum(np.square(pred - gt), axis=-1))
    if valid_mask is not None:
        err = err[valid_mask > 0]
    if err.size > 0:
        return float(err.mean())
    return float('nan')

def fl_all(pred, gt, tau=3.0, valid_mask=None):
    '''Percentage (0-100) of positions whose end-point error exceeds tau.

    If valid_mask is given, only positions where valid_mask > 0 are counted.
    Returns NaN when no position is left.
    '''
    err = np.sqrt(np.sum(np.square(pred - gt), axis=-1))
    if valid_mask is not None:
        err = err[valid_mask > 0]
    if err.size == 0:
        return float('nan')
    outlier_fraction = np.mean(err > tau)
    return float(outlier_fraction * 100.0)

def angular_error(pred, gt, eps=1e-6, valid_mask=None):
    '''Mean angle, in degrees, between the 2-D vectors of pred and gt.

    eps is added to both vector norms before dividing, so zero-length vectors
    do not divide by zero. If valid_mask is given, only positions where
    valid_mask > 0 contribute. Returns NaN when no position is left.
    '''
    pred_u, pred_v = pred[..., 0], pred[..., 1]
    gt_u, gt_v = gt[..., 0], gt[..., 1]
    pred_len = np.sqrt(pred_u*pred_u + pred_v*pred_v) + eps
    gt_len = np.sqrt(gt_u*gt_u + gt_v*gt_v) + eps
    # Clipping keeps rounding noise from pushing the cosine outside arccos' domain.
    cosine = np.clip((pred_u*gt_u + pred_v*gt_v) / (pred_len*gt_len), -1, 1)
    angles = np.degrees(np.arccos(cosine))
    if valid_mask is not None:
        angles = angles[valid_mask > 0]
    if angles.size > 0:
        return float(angles.mean())
    return float('nan')
"""))

# adapters/__init__.py
(base/"adapters"/"__init__.py").write_text(textwrap.dedent("""
def make_model(name:str, **kwargs):
    '''Build the adapter registered under `name` (case-insensitive); only 'raft' is known here.

    Extra keyword arguments are forwarded to the adapter constructor.
    Raises ValueError for any other name.
    '''
    name = name.lower()
    if name != 'raft':
        raise ValueError(f"Modello '{name}' non supportato. Usa 'raft'.")
    # Imported lazily so that loading the package does not pull in the adapter module.
    from .raft import RAFTAdapter
    return RAFTAdapter(**kwargs)
"""))

# adapters/raft.py — robusto: RAFT-small+FP16, niente alt_cuda_corr, pesi con/ senza 'module.'
(base/"adapters"/"raft.py").write_text(textwrap.dedent("""
import os, sys, torch, numpy as np

class _DotArgs(dict):
    __getattr__ = dict.get
    def __setattr__(self, k, v): self[k] = v
    def __contains__(self, k): return dict.__contains__(self, k)

class RAFTAdapter:
    '''Callable wrapper around the RAFT network that maps two RGB frames to a flow field.

    The RAFT sources are located through the RAFT_ROOT environment variable
    (default /content/RAFT); both that directory and its core/ sub-directory
    are put on sys.path so the RAFT modules can be imported.
    '''

    def __init__(self, weights:str, device:str=None,
                 small:bool=True, mixed_precision:bool=True, alternate_corr:bool=False):
        '''Build the network and load a checkpoint.

        Args:
            weights: path of the checkpoint passed to torch.load.
            device: torch device string; defaults to cuda when available, else cpu.
            small, mixed_precision, alternate_corr: forwarded to RAFT as its args object.

        Raises:
            ImportError: the RAFT modules cannot be imported.
        '''
        raft_root = os.environ.get("RAFT_ROOT", "/content/RAFT")
        core_path = os.path.join(raft_root, "core")
        for p in (core_path, raft_root):
            if p not in sys.path: sys.path.insert(0, p)

        try:
            from core.raft import RAFT as RAFTCore
            # Imported here only to fail early if the helper used by __call__ is missing.
            from core.utils.utils import InputPadder
        except Exception as e:
            raise ImportError("RAFT non trovato. Aggiungi /content/RAFT e /content/RAFT/core a sys.path.") from e

        self.device = torch.device(device) if device else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.args = _DotArgs(small=small, mixed_precision=mixed_precision, alternate_corr=alternate_corr)
        self.net = RAFTCore(self.args)

        state = torch.load(weights, map_location='cpu')
        if isinstance(state, dict) and 'state_dict' in state and isinstance(state['state_dict'], dict):
            state = state['state_dict']
        if isinstance(state, dict):
            # Strip only a leading 'module.' prefix; a key that merely contains
            # 'module.' somewhere in the middle must keep its full name.
            prefix = 'module.'
            state = { (k[len(prefix):] if k.startswith(prefix) else k): v for k, v in state.items() }

        missing, unexpected = self.net.load_state_dict(state, strict=False)
        if missing:    print("[RAFTAdapter] missing:", len(missing))
        if unexpected: print("[RAFTAdapter] unexpected:", len(unexpected))

        self.net.to(self.device).eval()

    @torch.no_grad()
    def __call__(self, im0, im1, iters:int=8):
        '''Estimate the flow from im0 to im1.

        Args:
            im0, im1: HxWx3 arrays with values on the 0..255 scale (uint8 frames).
            iters: number of refinement iterations passed to the network.

        Returns:
            float32 numpy array of shape (H, W, 2).
        '''
        from core.utils.utils import InputPadder
        # Keep the 0..255 range: the reference RAFT forward pass rescales its
        # inputs itself (2 * (image / 255.0) - 1.0), so dividing by 255 here
        # would squash every frame to an almost constant image.
        t0 = torch.from_numpy(np.ascontiguousarray(im0.transpose(2,0,1))).float().unsqueeze(0)
        t1 = torch.from_numpy(np.ascontiguousarray(im1.transpose(2,0,1))).float().unsqueeze(0)
        t0 = t0.to(self.device); t1 = t1.to(self.device)
        padder = InputPadder(t0.shape)
        t0p, t1p = padder.pad(t0, t1)
        _, flow_up = self.net(t0p, t1p, iters=iters, test_mode=True)
        flow = padder.unpad(flow_up)[0].permute(1,2,0).detach().cpu().numpy().astype('float32')
        return flow
"""))

print("Framework creato in /content/oflow2d ✓")


In [None]:
# === RAFT: CONFIG + UTILS =====================================================

import cv2, numpy as np, json, time, torch, os, pathlib
from oflow2d.common.viz import flow_to_color

# High-level configuration for RAFT
MAX_EDGE = 640      # use 512 to be safer, 768 if the GPU can handle it
ITERS    = 8        # 6–12 depending on the quality you want
VIZ_FPS  = None     # None = use the video's fps, otherwise set a number (e.g. 25.0)
SAVE_FLO = True     # also save the flows in the standard .flo format

def resize_pair(f0, f1, max_edge=640):
    """
    Downscale a pair of frames so that the longest side is at most max_edge.

    When downscaling happens, both target sides are rounded down to a multiple
    of 8 (and never below 8). Frames that already fit are returned untouched,
    whatever their size.

    Args:
        f0, f1: frames as arrays; the size is read from f0.shape[:2].
        max_edge: maximum allowed length of the longest side.

    Returns:
        (f0s, f1s, scale, (H_orig, W_orig)) where scale is the nominal factor
        min(1, max_edge / longest side). The resized sides may deviate slightly
        from it because of the rounding to multiples of 8.
    """
    h, w = f0.shape[:2]
    scale = min(1.0, max_edge / max(h, w))
    if scale < 1.0:
        # The small epsilon stops a product such as 639.9999999 (floating-point
        # noise around an exact multiple of 8) from being floored 8 pixels too low.
        # max(8, ...) prevents a zero-sized target for extremely thin frames.
        new_w = max(8, int((w * scale + 1e-6) // 8) * 8)
        new_h = max(8, int((h * scale + 1e-6) // 8) * 8)
        f0s = cv2.resize(f0, (new_w, new_h), interpolation=cv2.INTER_AREA)
        f1s = cv2.resize(f1, (new_w, new_h), interpolation=cv2.INTER_AREA)
        return f0s, f1s, scale, (h, w)
    return f0, f1, 1.0, (h, w)

def upsample_flow(flow, orig_hw, scale):
    """
    Bring a flow field computed on downscaled frames back to the original size.

    Args:
        flow: array of shape (h, w, 2); channel 0 is rescaled with the width
            ratio and channel 1 with the height ratio.
        orig_hw: (H, W) of the original frames.
        scale: nominal scale used when downscaling; 1.0 means the flow is
            returned unchanged.

    Returns:
        Flow of shape (H, W, 2) when scale != 1.0, otherwise the input array.
    """
    H, W = orig_hw
    if scale != 1.0:
        small_h, small_w = flow.shape[:2]
        flow = cv2.resize(flow, (W, H), interpolation=cv2.INTER_LINEAR)
        # Rescale each component by the ratio actually applied on its axis:
        # the downscaled size is rounded to multiples of 8, so it generally
        # differs from `scale` and is not the same for width and height.
        flow = flow * np.array([W / small_w, H / small_h], dtype=np.float32)
    return flow

def save_flo(path, flow):
    """
    Write a flow field to `path` in .flo layout: the 4-byte tag PIEH, then
    width and height as int32, then the values as float32 in C order.
    """
    with open(path, "wb") as out:
        out.write(b"PIEH")
        # width comes first, then height
        dims = np.array([flow.shape[1], flow.shape[0]], np.int32)
        out.write(dims.tobytes())
        out.write(flow.astype(np.float32).tobytes())

def video_fps(path, default=25.0):
    """
    Return the frame rate reported for the video at `path`, or `default`
    when the reported value is missing, zero or not positive.
    """
    capture = cv2.VideoCapture(path)
    reported = capture.get(cv2.CAP_PROP_FPS)
    capture.release()
    candidate = reported if reported else default
    if candidate and candidate > 0:
        return float(candidate)
    return default

def process_video(in_path, out_dir, model,
                  max_edge=MAX_EDGE, iters=ITERS,
                  viz_fps=None, save_flo_flag=True):
    """
    Run a flow model (RAFT or any compatible adapter) on every pair of
    consecutive frames of a video.

    Frames are streamed: only the two frames of the current pair are kept in
    memory and each colour-coded frame is written to the output video as soon
    as it is produced, so memory use does not grow with the video length.

    Args:
        in_path: path of the input video.
        out_dir: output directory (created if missing).
        model: callable invoked as model(frame0, frame1, iters=iters) that
            returns an (h, w, 2) flow array.
        max_edge: longest side allowed for the frames given to the model.
        iters: forwarded to the model.
        viz_fps: frame rate of the output video; None reads it from the input.
        save_flo_flag: also write each flow as a .flo file next to the .npy.

    Returns:
        (fps_model, out_mp4): pairs per second measured around the model call,
        and the path of the colour-coded video.

    Raises:
        AssertionError: the video has fewer than 2 readable frames.
    """
    os.makedirs(out_dir, exist_ok=True)

    fps_out = viz_fps if viz_fps is not None else video_fps(in_path)
    out_mp4 = f"{out_dir}/flow_viz.mp4"

    times = []
    writer = None
    cap = cv2.VideoCapture(in_path)
    try:
        # 1) Prime the stream with the first frame (RGB)
        ok, frame = cap.read()
        prev = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if ok else None

        # 2) Loop over consecutive pairs, reading one new frame per iteration
        i = 0
        while prev is not None:
            ok, frame = cap.read()
            if not ok:
                break
            curr = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            f0s, f1s, scale, orig_hw = resize_pair(prev, curr, max_edge)

            t0 = time.perf_counter()
            flow = model(f0s, f1s, iters=iters)
            times.append(time.perf_counter() - t0)

            flow = upsample_flow(flow, orig_hw, scale)

            # also saved as .npy for later analysis
            np.save(f"{out_dir}/flow_{i:06d}.npy", flow)
            if save_flo_flag:
                save_flo(f"{out_dir}/flow_{i:06d}.flo", flow)

            # 3) Append the colour-coded frame to the output video; the writer
            #    is created lazily because its size comes from the first frame.
            viz = flow_to_color(flow)
            if writer is None:
                h, w, _ = viz.shape
                writer = cv2.VideoWriter(
                    out_mp4,
                    cv2.VideoWriter_fourcc(*"mp4v"),
                    fps_out,
                    (w, h),
                )
            writer.write(cv2.cvtColor(viz, cv2.COLOR_RGB2BGR))

            if torch.cuda.is_available() and i % 10 == 0:
                torch.cuda.empty_cache()

            prev = curr
            i += 1
    finally:
        # Release the capture and finalise the video even if the model raised.
        cap.release()
        if writer is not None:
            writer.release()

    assert times, "Il video deve avere almeno 2 frame."
    fps_model = 1.0 / np.mean(times)

    # 4) Small JSON file with run statistics
    with open(f"{out_dir}/metrics.json", "w") as f:
        json.dump(
            {"pairs": len(times), "fps_model": float(fps_model)},
            f,
            indent=2,
        )

    return fps_model, out_mp4


In [None]:
# === RAFT: MODEL CREATION ====================================================
from oflow2d.adapters import make_model

def create_raft_model():
    """
    Build a RAFTAdapter from the checkpoints stored in /content/RAFT/models.

    The small checkpoint is preferred when its file exists; otherwise the
    Sintel checkpoint is used and the `small` flag is switched off to match.
    """
    small_ckpt = "/content/RAFT/models/raft-small.pth"
    sintel_ckpt = "/content/RAFT/models/raft-sintel.pth"
    has_small = os.path.exists(small_ckpt)
    if has_small:
        chosen_ckpt = small_ckpt
    else:
        chosen_ckpt = sintel_ckpt

    adapter_options = dict(
        weights=chosen_ckpt,
        small=bool(has_small),
        mixed_precision=True,    # passed through to the adapter's mixed_precision option
        alternate_corr=False,    # passed through to the adapter's alternate_corr option
    )
    model = make_model("raft", **adapter_options)
    print("RAFT model ready. small:", has_small)
    return model

# Build the adapter once; the demo cell below passes `raft_model` to the runner.
raft_model = create_raft_model()


In [None]:
# === RAFT: DEMO SU VIDEO CARICATI ============================================

from google.colab import files
from IPython.display import Video, display

def run_raft_on_uploaded_videos(model,
                                max_edge=MAX_EDGE,
                                iters=ITERS,
                                viz_fps=VIZ_FPS,
                                save_flo=SAVE_FLO):
    """
    Interactive demo: open the Colab file picker and, for every uploaded video,
    run process_video with `model`, print the measured model FPS and display
    the resulting flow video inline.

    Outputs go to /content/outputs/<video stem>_raft.
    """
    uploaded = files.upload()  # pick one or more .mp4/.mov/.mkv files
    os.makedirs("/content/outputs", exist_ok=True)

    # Options shared by every video of this run.
    shared_options = dict(
        max_edge=max_edge,
        iters=iters,
        viz_fps=viz_fps,
        save_flo_flag=save_flo,
    )
    for name in uploaded:
        base = pathlib.Path(name).stem
        fps, mp4 = process_video(
            f"/content/{name}",
            f"/content/outputs/{base}_raft",
            model,
            **shared_options,
        )
        print(f"{name}  ->  FPS modello: {fps:.2f}  |  {mp4}")
        display(Video(mp4, embed=True))

# actually launch the demo (opens the upload dialog):
run_raft_on_uploaded_videos(raft_model)


In [None]:
# SPyNet: prova PyPI, se fallisce usa la repo GitHub
import sys, subprocess
def pip_install(spec):
    """Install or upgrade `spec` quietly with the pip of the running interpreter; raises if pip fails."""
    command = [sys.executable, "-m", "pip", "install", "-qU", spec]
    subprocess.run(command, check=True)

# Try the PyPI distribution first; if that install raises, fall back to
# installing the same package straight from its GitHub repository.
try:
    pip_install("spynet-pytorch")
    print("✓ spynet-pytorch da PyPI")
except Exception:
    pip_install("git+https://github.com/Guillem96/spynet-pytorch")
    print("✓ spynet-pytorch da GitHub")


In [None]:
# /content/oflow2d/adapters/spynet_pkg.py
import pathlib, textwrap, importlib, shutil, os
pathlib.Path("/content/oflow2d/adapters").mkdir(parents=True, exist_ok=True)

pathlib.Path("/content/oflow2d/adapters/spynet_pkg.py").write_text(textwrap.dedent("""
import torch, numpy as np, torch.nn.functional as F

def _to_tensor(img):
    # img: HxWx3 uint8 RGB -> (1,3,H,W) float in [0,1]
    t = torch.from_numpy(img).permute(2,0,1).float().div(255.0)
    return t.unsqueeze(0)

class SPyNetPkgAdapter:
    '''Callable wrapper around the SpyNet model of the spynet-pytorch package.

    Maps two HxWx3 uint8 frames to an (H, W, 2) float32 flow field.
    '''

    # Checkpoint names tried with SpyNet.from_pretrained.
    # NOTE(review): names taken from the spynet-pytorch README - confirm against the installed version.
    _PRETRAINED_NAMES = ('sentinel', 'kitti')

    def __init__(self, device=None, k: int = 5):
        '''Instantiate the network, preferring pre-trained weights.

        Args:
            device: torch device string; defaults to cuda when available, else cpu.
            k: passed to the SpyNet constructor when no pre-trained model can be built.

        Raises:
            ImportError: the spynet package cannot be imported.
        '''
        try:
            from spynet.model import SpyNet as Net
        except Exception as e:
            raise ImportError("Pacchetto 'spynet-pytorch' non importabile.") from e

        self.device = torch.device(device) if device else torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # Package versions differ slightly, so several ways of obtaining
        # pre-trained weights are tried in order; the first that works wins.
        attempts = [lambda: Net(pretrained=True)]
        attempts += [(lambda n=n: Net.from_pretrained(n)) for n in self._PRETRAINED_NAMES]
        net = None
        for attempt in attempts:
            try:
                net = attempt()
                break
            except Exception:
                continue

        if net is None:
            # Last resort: a freshly initialised network. Say so explicitly,
            # because its output is not a usable flow until weights are loaded.
            import warnings
            warnings.warn(
                "SPyNet: nessun peso pre-addestrato caricato; il modello usa pesi casuali e il flow prodotto non ha significato.",
                RuntimeWarning,
            )
            net = Net(k=k)

        self.net = net.to(self.device).eval()

    @torch.no_grad()
    def __call__(self, im0, im1, iters=None):
        '''Estimate the flow from im0 to im1; `iters` is accepted for interface parity and ignored.'''
        t0 = _to_tensor(im0).to(self.device)
        t1 = _to_tensor(im1).to(self.device)

        # Some implementations require sizes that are multiples of 32:
        # pad on the right/bottom by replicating the border pixels.
        H, W = t0.shape[-2], t0.shape[-1]
        Hp = (H + 31)//32*32; Wp = (W + 31)//32*32
        pad = (0, Wp-W, 0, Hp-H)
        t0p = F.pad(t0, pad, mode='replicate')
        t1p = F.pad(t1, pad, mode='replicate')

        # Forward: call signatures differ between versions, try them in order
        out = None
        try:
            out = self.net([t0p, t1p], limit_k=-1)
        except Exception:
            try:
                out = self.net([t0p, t1p])
            except Exception:
                try:
                    out = self.net((t0p, t1p))
                except Exception:
                    out = self.net(t0p, t1p)

        # Some versions return a list/tuple; the first element is used
        if isinstance(out, (list, tuple)):
            out = out[0]
        # Crop the padding away and convert to (H, W, 2) float32
        flow = out[:, :, :H, :W][0].permute(1,2,0).detach().cpu().numpy().astype("float32")
        return flow
"""))

# Update the framework factory so that it also knows 'spynet_pkg'.
# The file is rewritten only when it is missing or does not mention the adapter yet.
factory = "/content/oflow2d/adapters/__init__.py"
factory_needs_update = True
if os.path.exists(factory):
    with open(factory, "r") as fh:
        factory_needs_update = "spynet_pkg" not in fh.read()
if factory_needs_update:
    # The context manager closes (and therefore flushes) the file before the
    # module is reloaded further down in this cell.
    with open(factory, "w") as fh:
        fh.write("""
def make_model(name: str, **kwargs):
    name = name.lower()
    if name == 'raft':
        from .raft import RAFTAdapter
        return RAFTAdapter(**kwargs)
    if name in ('spynet', 'spynet_pkg', 'spynet-niklaus'):
        from .spynet_pkg import SPyNetPkgAdapter
        return SPyNetPkgAdapter(**kwargs)
    raise ValueError(f"Modello '{name}' non supportato. Usa 'raft' o 'spynet'.")
""")

# clear the Python bytecode caches and reload the package so the rewritten factory is picked up
shutil.rmtree("/content/oflow2d/__pycache__", ignore_errors=True)
shutil.rmtree("/content/oflow2d/adapters/__pycache__", ignore_errors=True)
import oflow2d.adapters as adapters; importlib.reload(adapters)
print("Adapter SPyNetPkg scritto e factory aggiornato ✓")


In [None]:
# === SPyNet: CREAZIONE MODELLO + SMOKE TEST ===================================

import numpy as np
from oflow2d.adapters import make_model

def create_spynet_model(device=None):
    """
    Build the SPyNet adapter through make_model('spynet') and smoke-test it.

    `device` is forwarded to the factory only when it is not None. The smoke
    test runs the model on two synthetic 240x320 frames and prints the shape,
    dtype and finiteness of the result.
    """
    cfg = {} if device is None else {"device": device}
    spynet = make_model("spynet", **cfg)
    print("SPyNet model ready.")

    # --- minimal smoke test ---
    blank = np.zeros((240, 320, 3), np.uint8)
    with_band = blank.copy()
    with_band[:, :10] = 255  # white band so the two frames are not identical
    flow = spynet(blank, with_band, iters=None)
    print("Smoke test SPyNet:", flow.shape, flow.dtype, np.isfinite(flow).all())

    return spynet

# Build the adapter once; the demo cell below passes `spynet_model` to the runner.
spynet_model = create_spynet_model()


In [None]:
# === SPyNet: DEMO SU VIDEO CARICATI ===========================================

from google.colab import files
from IPython.display import Video, display
import pathlib, os

def run_spynet_on_uploaded_videos(model,
                                  max_edge=MAX_EDGE,
                                  iters=ITERS,
                                  viz_fps=VIZ_FPS,
                                  save_flo=SAVE_FLO):
    """
    Interactive demo: open the Colab file picker and, for every uploaded video,
    run process_video with `model`, print the measured model FPS and display
    the colour-coded flow video inline.

    Outputs go to /content/outputs_spynet/<video stem>_SPyNet.
    """
    uploaded = files.upload()  # pick one or more videos from your computer
    os.makedirs("/content/outputs_spynet", exist_ok=True)

    # Options shared by every video of this run.
    shared_options = dict(
        max_edge=max_edge,
        iters=iters,
        viz_fps=viz_fps,
        save_flo_flag=save_flo,
    )
    for name in uploaded:
        base = pathlib.Path(name).stem
        fps, mp4 = process_video(
            f"/content/{name}",
            f"/content/outputs_spynet/{base}_SPyNet",
            model,
            **shared_options,
        )
        print(f"[SPyNet] {name} -> {fps:.2f} FPS  |  {mp4}")
        display(Video(mp4, embed=True))

# actually launch the SPyNet demo (opens the upload dialog):
run_spynet_on_uploaded_videos(spynet_model)


In [None]:
# === FlowNet2: dependencies in an isolated site directory ====================
FLOW_SITE = "/content/_flow_site"

# create the folder and install the pinned packages into it (pip -t installs into that target directory)
!mkdir -p "$FLOW_SITE"
!python -m pip install -q -t "$FLOW_SITE" \
  "numpy==1.26.4" "scipy==1.11.4" \
  "torchmetrics==1.3.2" "lightning>=2,<2.5" \
  "opencv-python-headless==4.8.1.78" "loguru>=0.7" "jsonargparse>=4.27" \
  "requests==2.31" "einops>=0.6" "ptlflow==0.4.1" "timm>=0.9.12"

# register the isolated directory as a site directory of the running interpreter
import site, importlib.util
site.addsitedir(FLOW_SITE)

# sanity check: the package must be importable and report its version
import ptlflow
print("ptlflow visibile?", importlib.util.find_spec("ptlflow") is not None)
print("PTLFlow version:", getattr(ptlflow, "__version__", "unknown"))


In [None]:
# === Context manager: aggiunge/rimuove il path di FlowNet2 temporaneamente ===
import sys, contextlib

FLOW_SITE = "/content/_flow_site"


@contextlib.contextmanager
def use_flow_site():
    """Put FLOW_SITE at the front of sys.path for the duration of the with-block.

    On exit (normal or through an exception) one occurrence of FLOW_SITE is
    removed again, if any is present.
    """
    sys.path.insert(0, FLOW_SITE)
    try:
        yield
    finally:
        try:
            sys.path.remove(FLOW_SITE)
        except ValueError:
            # Already removed by the code inside the block: nothing to undo.
            pass


In [None]:
# === FlowNet2Adapter + factory make_model (RAFT, SPyNet, FlowNet2) ===========
import os, textwrap, pathlib, importlib, shutil

base = "/content/oflow2d"
adp  = f"{base}/adapters"
os.makedirs(adp, exist_ok=True)
# Opening in append mode creates the package markers when they are missing
# and leaves any existing content untouched.
open(f"{base}/__init__.py", "a").close()
open(f"{adp}/__init__.py", "a").close()

# Source of oflow2d/adapters/flownet2_adapter.py. Kept as a plain raw string
# (like the PWC-Net adapter cell) so braces in the code need no escaping.
adapter_code = r'''
import torch, numpy as np

# Uses the context manager defined in the notebook (imported from __main__);
# when that is not available, an equivalent local fallback is defined.
try:
    from __main__ import use_flow_site
except Exception:
    import sys
    FLOW_SITE = "/content/_flow_site"
    def use_flow_site():
        class _Ctx:
            def __enter__(self):
                sys.path.insert(0, FLOW_SITE)
            def __exit__(self, *args):
                if FLOW_SITE in sys.path:
                    sys.path.remove(FLOW_SITE)
        return _Ctx()

class FlowNet2Adapter:
    """Callable wrapper around the PTLFlow 'flownet2' model."""

    def __init__(self, device=None, ckpt="things"):
        self.device = torch.device(device) if device else torch.device(
            "cuda" if torch.cuda.is_available() else "cpu"
        )
        with use_flow_site():
            import ptlflow
            from ptlflow.utils.io_adapter import IOAdapter
            self._ptlflow = ptlflow
            self.model = ptlflow.get_model("flownet2", ckpt_path=ckpt).to(self.device).eval()
            self.IOAdapter = IOAdapter
        # The IOAdapter is built for one frame size; remember which one.
        self._io = None
        self._io_size = None

    @torch.inference_mode()
    def __call__(self, img0_uint8, img1_uint8, iters=None):
        """img0_uint8, img1_uint8: HxWx3 uint8 (RGB or BGR, as long as both agree).
        Returns the flow as a float32 array with the flow components on the last axis.
        """
        assert img0_uint8.dtype == np.uint8 and img1_uint8.dtype == np.uint8
        h, w = img0_uint8.shape[:2]

        # Rebuild the IOAdapter whenever the frame size changes: it is created
        # for a specific (h, w), and the smoke test and the videos differ in size.
        if self._io is None or self._io_size != (h, w):
            with use_flow_site():
                self._io = self.IOAdapter(self.model, (h, w))
            self._io_size = (h, w)

        with use_flow_site():
            inputs = self._io.prepare_inputs([img0_uint8, img1_uint8])  # {'images': (1,2,3,H,W)}
        for k in inputs:
            inputs[k] = inputs[k].to(self.device, non_blocking=True)

        with use_flow_site():
            preds = self.model(inputs)

        flows = preds.get("flows", preds.get("flow", None))
        if flows is None:
            raise RuntimeError("PTLFlow: output 'flows'/'flow' non trovato.")
        ft = flows[-1] if isinstance(flows, (list, tuple)) else flows
        if ft.dim() == 5:  # (B,1,2,H,W) -> (B,2,H,W)
            ft = ft[:, 0]
        flow = ft[0].permute(1, 2, 0).detach().cpu().float().numpy()
        return flow  # HxWx2 float32
'''

# Write the adapter source (after dedent) to oflow2d/adapters/flownet2_adapter.py
with open(f"{adp}/flownet2_adapter.py", "w") as f:
    f.write(textwrap.dedent(adapter_code))
print("Scritto:", f"{adp}/flownet2_adapter.py")

# Factory unica RAFT + SPyNet + FlowNet2
from pathlib import Path

Path(f"{adp}/__init__.py").write_text(r"""
def make_model(name: str, **kwargs):
    '''Build the adapter registered under `name` (case-insensitive; None counts as an empty name).

    Keyword arguments are forwarded to the adapter constructor. Adapter
    modules are imported lazily, only for the family that was requested.
    Raises ValueError when the name matches no family.
    '''
    name = (name or "").lower()

    raft_names = {"raft", "raft_small", "raft_sintel"}
    spynet_names = {"spynet", "spynet_pkg", "spynet-niklaus", "spy"}
    flownet2_names = {"flownet2", "flownet", "fn2"}

    if name in raft_names:
        from .raft import RAFTAdapter
        return RAFTAdapter(**kwargs)
    elif name in spynet_names:
        # Prefer the dedicated adapter; on any failure use the package-based one.
        try:
            from .spynet_adapter import SPyNetAdapter
            return SPyNetAdapter(**kwargs)
        except Exception:
            from .spynet_pkg import SPyNetPkgAdapter
            return SPyNetPkgAdapter(**kwargs)
    elif name in flownet2_names:
        from .flownet2_adapter import FlowNet2Adapter
        return FlowNet2Adapter(**kwargs)
    else:
        raise ValueError(f"Modello '{name}' non supportato: {name}")
""")
print("Factory aggiornata (RAFT, SPyNet, FlowNet2).")

# clear the Python bytecode caches and reload the adapters module
shutil.rmtree(f"{base}/__pycache__", ignore_errors=True)
shutil.rmtree(f"{adp}/__pycache__", ignore_errors=True)

# reload so that `make_model` reflects the factory file written above
import oflow2d.adapters as adapters
importlib.reload(adapters)
print("Modulo oflow2d.adapters ricaricato.")


In [None]:
# === FlowNet2: creazione modello + smoke test ================================
import numpy as np, torch
from oflow2d.adapters import make_model

def create_flownet2_model(device=None, ckpt="things"):
    """
    Build the FlowNet2 adapter through make_model('flownet2') and smoke-test it.

    When `device` is None, cuda is used if available, otherwise cpu. The smoke
    test runs the model on two synthetic 240x320 frames and prints the shape,
    dtype and finiteness of the result.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    factory_options = {"device": device, "ckpt": ckpt}
    model = make_model("flownet2", **factory_options)
    print(f"FlowNet2 model ready on {device}, ckpt={ckpt}")

    # Quick smoke test: a black frame against one that is white from column 15 on
    black = np.zeros((240, 320, 3), np.uint8)
    partly_white = black.copy()
    partly_white[:, 15:] = 255
    flow = model(black, partly_white)
    print("Smoke FlowNet2:", flow.shape, flow.dtype, np.isfinite(flow).all())

    return model

# Build the adapter once; the demo cell below passes `flownet2_model` to the runner.
flownet2_model = create_flownet2_model()


In [None]:
# === FlowNet2: DEMO SU VIDEO CARICATI (solo flow color-coded) ================
from google.colab import files
from IPython.display import Video, display
import pathlib, os

def run_flownet2_on_uploaded_videos(model,
                                    max_edge=MAX_EDGE,
                                    iters=ITERS,      # forwarded to process_video; the model is called with it
                                    viz_fps=VIZ_FPS,
                                    save_flo=SAVE_FLO):
    """
    Interactive demo: open the Colab file picker and, for every uploaded video,
    run process_video with `model`, print the measured model FPS and display
    the flow video inline (the only output shown).

    Outputs go to /content/outputs_flownet2/<video stem>_FlowNet2.
    """
    uploaded = files.upload()  # pick one or more videos from your computer
    os.makedirs("/content/outputs_flownet2", exist_ok=True)

    # Options shared by every video of this run.
    shared_options = dict(
        max_edge=max_edge,
        iters=iters,
        viz_fps=viz_fps,
        save_flo_flag=save_flo,
    )
    for name in uploaded:
        base = pathlib.Path(name).stem
        fps, mp4 = process_video(
            f"/content/{name}",
            f"/content/outputs_flownet2/{base}_FlowNet2",
            model,
            **shared_options,
        )
        print(f"[FlowNet2] {name} -> {fps:.2f} FPS  |  {mp4}")
        display(Video(mp4, embed=True))

# LAUNCH THE DEMO (opens the upload dialog)
run_flownet2_on_uploaded_videos(flownet2_model)


In [None]:
# === PWCNetAdapter basato su PTLFlow (solo flow) =============================
import os, textwrap, pathlib, importlib, shutil

base = "/content/oflow2d"
adp  = f"{base}/adapters"
os.makedirs(adp, exist_ok=True)
# touch() creates the package markers when they are missing and keeps existing content
pathlib.Path(f"{base}/__init__.py").touch()
pathlib.Path(f"{adp}/__init__.py").touch()

pwc_code = r'''
import torch, numpy as np

# usa il context manager del notebook per caricare PTLFlow dal site isolato
try:
    from __main__ import use_flow_site
except Exception:
    import sys
    FLOW_SITE = "/content/_flow_site"
    def use_flow_site():
        class _Ctx:
            def __enter__(self):
                sys.path.insert(0, FLOW_SITE)
            def __exit__(self, *a):
                if FLOW_SITE in sys.path:
                    sys.path.remove(FLOW_SITE)
        return _Ctx()

class PWCNetAdapter:
    """Callable wrapper around the PTLFlow PWC-Net model (flow output only)."""

    def __init__(self, device=None, ckpt="things"):
        """Load the model, trying the names 'pwcnet' and 'pwc' in that order.

        Raises RuntimeError (carrying the last underlying error) when neither
        name can be loaded.
        """
        self.device = torch.device(device) if device else torch.device(
            "cuda" if torch.cuda.is_available() else "cpu"
        )
        with use_flow_site():
            import ptlflow
            last_err = None
            # try a few common names
            for name in ("pwcnet", "pwc"):
                try:
                    self.model = ptlflow.get_model(name, ckpt_path=ckpt).to(self.device).eval()
                    self._model_name = name
                    break
                except Exception as e:
                    last_err = e
            if not hasattr(self, "model"):
                raise RuntimeError(
                    f"PWCNet non trovato in PTLFlow: {type(last_err).__name__}: {last_err}"
                )

            from ptlflow.utils.io_adapter import IOAdapter
            self.IOAdapter = IOAdapter

        # The IOAdapter is built for one frame size; remember which one.
        self._io = None
        self._io_size = None

    @torch.inference_mode()
    def __call__(self, img0_uint8, img1_uint8, iters=None):
        """img0_uint8, img1_uint8: HxWx3 uint8 (RGB or BGR, but consistent).
        Returns the flow as a float32 array with the flow components on the last axis.
        `iters` is accepted for interface parity and ignored.
        """
        assert img0_uint8.dtype == np.uint8 and img1_uint8.dtype == np.uint8, (
            "Servono frame uint8"
        )

        h, w = img0_uint8.shape[:2]

        # Rebuild the IOAdapter whenever the frame size changes: it is created
        # for a specific (h, w), and the smoke test and the videos differ in size.
        if self._io is None or getattr(self, "_io_size", None) != (h, w):
            with use_flow_site():
                self._io = self.IOAdapter(self.model, (h, w))
            self._io_size = (h, w)

        with use_flow_site():
            x = self._io.prepare_inputs([img0_uint8, img1_uint8])  # {'images': (1,2,3,H,W)}
            for k in x:
                x[k] = x[k].to(self.device, non_blocking=True)
            preds = self.model(x)

        flows = preds.get("flows", preds.get("flow", None))
        if flows is None:
            raise RuntimeError("PTLFlow/PWCNet: output 'flows'/'flow' non trovato.")

        ft = flows[-1] if isinstance(flows, (list, tuple)) else flows
        if ft.dim() == 5:   # (B,1,2,H,W) -> (B,2,H,W)
            ft = ft[:, 0]

        flow = ft[0].permute(1, 2, 0).detach().cpu().float().numpy()  # HxWx2
        return flow
'''

# Write the adapter source to disk; the context manager closes (and flushes)
# the file deterministically, matching how the FlowNet2 adapter is written.
with open(f"{adp}/pwcnet_adapter.py", "w") as f:
    f.write(textwrap.dedent(pwc_code))
print("Scritto:", f"{adp}/pwcnet_adapter.py")


In [None]:
# === Factory make_model unica (RAFT, SPyNet, FlowNet2, PWCNet) ===============
from pathlib import Path

adp = "/content/oflow2d/adapters"

factory_txt = r"""
def make_model(name: str, **kwargs):
    '''Build the adapter registered under `name` (case-insensitive; None counts as an empty name).

    Families: RAFT, SPyNet, FlowNet2 and PWC-Net. Keyword arguments are
    forwarded to the adapter constructor, and adapter modules are imported
    lazily, only for the family that was requested.
    Raises ValueError when the name matches no family.
    '''
    name = (name or "").lower()

    raft_names = {"raft", "raft_small", "raft_sintel"}
    spynet_names = {"spynet", "spynet_pkg", "spynet-niklaus", "spy"}
    flownet2_names = {"flownet2", "flownet", "fn2"}
    pwcnet_names = {"pwcnet", "pwc", "pwc-net"}

    if name in raft_names:
        # RAFT
        from .raft import RAFTAdapter
        return RAFTAdapter(**kwargs)
    elif name in spynet_names:
        # SPyNet: prefer the dedicated adapter; on any failure use the package-based one
        try:
            from .spynet_adapter import SPyNetAdapter
            return SPyNetAdapter(**kwargs)
        except Exception:
            from .spynet_pkg import SPyNetPkgAdapter
            return SPyNetPkgAdapter(**kwargs)
    elif name in flownet2_names:
        # FlowNet2
        from .flownet2_adapter import FlowNet2Adapter
        return FlowNet2Adapter(**kwargs)
    elif name in pwcnet_names:
        # PWC-Net
        from .pwcnet_adapter import PWCNetAdapter
        return PWCNetAdapter(**kwargs)
    else:
        raise ValueError(f"Modello '{name}' non supportato: {name}")
"""

# Overwrite the adapters package __init__ with the factory source defined above
Path(f"{adp}/__init__.py").write_text(factory_txt)
print("Factory adapters riscritta.")

# clear the bytecode caches and reload the adapters module
import shutil, importlib
shutil.rmtree("/content/oflow2d/__pycache__", ignore_errors=True)
shutil.rmtree("/content/oflow2d/adapters/__pycache__", ignore_errors=True)

# reload so that `make_model` reflects the file written above
import oflow2d.adapters as adapters
importlib.reload(adapters)
print("Modulo oflow2d.adapters ricaricato.")


In [None]:
# === PWCNet: creazione modello + smoke test ==================================
import numpy as np, torch
from oflow2d.adapters import make_model

def create_pwcnet_model(device=None, ckpt="things"):
    """
    Build the PWC-Net adapter through make_model('pwcnet') and smoke-test it.

    When `device` is None, cuda is used if available, otherwise cpu. The smoke
    test runs the model on two synthetic 240x320 frames and prints the shape,
    dtype and finiteness of the result.
    """
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"

    factory_options = {"device": device, "ckpt": ckpt}
    model = make_model("pwcnet", **factory_options)
    print(f"PWCNet model ready on {device}, ckpt={ckpt}")

    # quick smoke test: a black frame against one that is white from column 15 on
    black = np.zeros((240, 320, 3), np.uint8)
    partly_white = black.copy()
    partly_white[:, 15:] = 255
    flow = model(black, partly_white)
    print("Smoke PWCNet:", flow.shape, flow.dtype, np.isfinite(flow).all())

    return model

# Build the adapter once; the demo cell below passes `pwcnet_model` to the runner.
pwcnet_model = create_pwcnet_model()


In [None]:
# === PWCNet: DEMO SU VIDEO CARICATI (solo flow color-coded) ==================
from google.colab import files
from IPython.display import Video, display
import pathlib, os

def run_pwcnet_on_uploaded_videos(model,
                                  max_edge=MAX_EDGE,
                                  iters=ITERS,      # forwarded to process_video; the model is called with it
                                  viz_fps=VIZ_FPS,
                                  save_flo=SAVE_FLO):
    """
    Interactive demo: open the Colab file picker and, for every uploaded video,
    run process_video with `model`, print the measured model FPS and display
    the flow video inline (the only output shown).

    Outputs go to /content/outputs_pwcnet/<video stem>_PWCNet.
    """
    uploaded = files.upload()
    os.makedirs("/content/outputs_pwcnet", exist_ok=True)

    # Options shared by every video of this run.
    shared_options = dict(
        max_edge=max_edge,
        iters=iters,
        viz_fps=viz_fps,
        save_flo_flag=save_flo,
    )
    for name in uploaded:
        base = pathlib.Path(name).stem
        fps, mp4 = process_video(
            f"/content/{name}",
            f"/content/outputs_pwcnet/{base}_PWCNet",
            model,
            **shared_options,
        )
        print(f"[PWCNet] {name} -> {fps:.2f} FPS  |  {mp4}")
        display(Video(mp4, embed=True))

# LAUNCH THE DEMO (opens the upload dialog)
run_pwcnet_on_uploaded_videos(pwcnet_model)


In [None]:
import pathlib, textwrap
# Root of the generated package.
base = pathlib.Path("/content/oflow2d")
base.joinpath("pipelines").mkdir(parents=True, exist_ok=True)
# Append mode creates __init__.py when it is missing and never truncates an existing one.
with open(base.joinpath("pipelines", "__init__.py"), "a"):
    pass

# Destination of the module source written below.
path = base.joinpath("pipelines", "video_demo.py")
path.write_text(textwrap.dedent("""
import os, json, time
import cv2
import numpy as np
import torch

from oflow2d.common.viz import flow_to_color
from oflow2d.adapters import make_model

# Defaults (per the original note, the same values used in the notebook).
MAX_EDGE = 640   # upper bound for the longer frame side, used by resize_pair
ITERS    = 8     # forwarded to the model call as `iters=`
VIZ_FPS  = None  # None -> process_video falls back to the source video's FPS
SAVE_FLO = True  # when true, process_video also writes a .flo file per pair

def resize_pair(f0, f1, max_edge=MAX_EDGE):
    '''Downscale a frame pair so that the longer side is at most ``max_edge``.

    The pair is never upscaled. When shrinking, each target dimension is
    floored to a multiple of 8 and clamped to at least 8 pixels, so very thin
    or very small frames cannot produce a 0-sized target (which makes
    ``cv2.resize`` raise).

    Args:
        f0: First frame; its ``shape[:2]`` defines the size used for both frames.
        f1: Second frame, resized to the same target size as ``f0``.
        max_edge: Maximum allowed length of the longer side.

    Returns:
        ``(f0_out, f1_out, scale, (h, w))`` where ``scale`` is the nominal
        factor ``min(1.0, max_edge / max(h, w))`` and ``(h, w)`` is the
        original size of ``f0``. When no shrinking is needed the input frames
        are returned untouched with ``scale == 1.0``.
    '''
    h, w = f0.shape[:2]
    scale = min(1.0, max_edge / max(h, w))
    if scale >= 1.0:
        return f0, f1, 1.0, (h, w)

    # Floor to a multiple of 8, but never below 8 pixels.
    new_w = max(8, int((w * scale) // 8 * 8))
    new_h = max(8, int((h * scale) // 8 * 8))
    target = (new_w, new_h)  # cv2 expects (width, height)
    f0s = cv2.resize(f0, target, interpolation=cv2.INTER_AREA)
    f1s = cv2.resize(f1, target, interpolation=cv2.INTER_AREA)
    return f0s, f1s, scale, (h, w)

def upsample_flow(flow, orig_hw, scale):
    '''Resize a flow field to the original frame size and rescale its vectors.

    The displacement values are multiplied by the real per-axis size ratios
    (``W / w`` for channel 0, ``H / h`` for channel 1) instead of ``1 / scale``.
    ``resize_pair`` floors width and height to multiples of 8 independently,
    so the effective factors generally differ from the nominal ``scale`` and
    from each other.

    Args:
        flow: Array of shape ``(h, w, 2)``; channel 0 is taken as the
            horizontal and channel 1 as the vertical displacement.
        orig_hw: ``(H, W)`` size to return to.
        scale: Nominal scale returned by ``resize_pair``. Kept for interface
            compatibility; the factors are derived from the array shapes.

    Returns:
        The input array itself when it already has size ``(H, W)``, otherwise
        a new ``(H, W, 2)`` array.
    '''
    H, W = orig_hw
    h, w = flow.shape[:2]
    if (h, w) == (H, W):
        return flow

    resized = cv2.resize(flow, (W, H), interpolation=cv2.INTER_LINEAR)
    # Keep float32 flows in float32; promote only if the input was not floating point.
    gain_dtype = np.result_type(resized.dtype, np.float32)
    gain = np.array([W / w, H / h], dtype=gain_dtype)
    return resized * gain

def save_flo(path, flow):
    '''Write ``flow`` to ``path`` as a .flo file.

    Layout: the 4-byte tag ``PIEH``, then width and height as int32, then the
    flow values converted to float32 in C order.

    Args:
        path: Destination file path (opened in binary write mode).
        flow: Array whose first two axes are (height, width).
    '''
    rows, cols = flow.shape[0], flow.shape[1]
    width_field = np.array([cols], np.int32)
    height_field = np.array([rows], np.int32)
    payload = flow.astype(np.float32)
    with open(path, "wb") as out:
        out.write(b"PIEH")
        out.write(width_field.tobytes())   # width first
        out.write(height_field.tobytes())  # then height
        out.write(payload.tobytes())

def video_fps(path, default=25.0):
    '''Return the frame rate reported for the video at ``path``.

    Args:
        path: Video path handed to ``cv2.VideoCapture``.
        default: Value used when the reported rate is missing, zero or negative.

    Returns:
        The rate as a float when it is positive, otherwise ``default``.
    '''
    capture = cv2.VideoCapture(path)
    reported = capture.get(cv2.CAP_PROP_FPS)
    capture.release()

    # A falsy reading (e.g. 0.0) is replaced by the fallback before validation.
    if not reported:
        reported = default
    if reported and reported > 0:
        return float(reported)
    return default

def process_video(in_path, out_dir, model,
                  max_edge=MAX_EDGE, iters=ITERS,
                  viz_fps=VIZ_FPS, save_flo_flag=SAVE_FLO):
    '''Video -> model -> per-pair flow files + colour-coded flow video.

    Frames are decoded and processed one consecutive pair at a time and each
    visualisation frame is written to the output video immediately, so memory
    use does not grow with the length of the video.

    Files written into ``out_dir``:
      * ``flow_{i:06d}.npy`` for every frame pair
      * ``flow_{i:06d}.flo`` for every frame pair when ``save_flo_flag`` is true
      * ``flow_viz.mp4`` with the colour-coded flow
      * ``metrics.json`` with the number of pairs and the model FPS

    Args:
        in_path: Path of the input video.
        out_dir: Output directory (created if missing).
        model: Callable invoked as ``model(frame0, frame1, iters=iters)`` on
            RGB frames; its result is treated as the flow array.
        max_edge: Forwarded to ``resize_pair``.
        iters: Forwarded to the model call.
        viz_fps: Frame rate of the output video; ``None`` uses ``video_fps(in_path)``.
        save_flo_flag: Whether to also write .flo files.

    Returns:
        ``(fps_model, out_mp4)``: the reciprocal of the mean per-pair model
        time, and the path of the written video.

    Raises:
        AssertionError: If fewer than two frames could be read from the video.
    '''
    os.makedirs(out_dir, exist_ok=True)
    out_mp4 = os.path.join(out_dir, "flow_viz.mp4")
    fps_out = viz_fps if viz_fps is not None else video_fps(in_path)

    times = []
    writer = None
    cap = cv2.VideoCapture(in_path)
    try:
        ok, frame = cap.read()
        prev = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if ok else None
        i = 0
        while prev is not None:
            ok, frame = cap.read()
            if not ok:
                break
            cur = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            f0s, f1s, scale, orig_hw = resize_pair(prev, cur, max_edge=max_edge)

            # Only the model call is timed; perf_counter is the monotonic
            # high-resolution clock intended for measuring short durations.
            t0 = time.perf_counter()
            flow = model(f0s, f1s, iters=iters)
            times.append(time.perf_counter() - t0)

            flow = upsample_flow(flow, orig_hw, scale)

            np.save(os.path.join(out_dir, f"flow_{i:06d}.npy"), flow)
            if save_flo_flag:
                save_flo(os.path.join(out_dir, f"flow_{i:06d}.flo"), flow)

            viz = flow_to_color(flow)
            if writer is None:
                # The writer is opened lazily because the output size is only
                # known once the first visualisation frame exists.
                h, w = viz.shape[:2]
                writer = cv2.VideoWriter(
                    out_mp4,
                    cv2.VideoWriter_fourcc(*"mp4v"),
                    fps_out,
                    (w, h),
                )
            writer.write(cv2.cvtColor(viz, cv2.COLOR_RGB2BGR))

            if torch.cuda.is_available() and i % 10 == 0:
                torch.cuda.empty_cache()

            prev = cur
            i += 1
    finally:
        # Release the decoder and the encoder even if the model call fails.
        cap.release()
        if writer is not None:
            writer.release()

    # Raised explicitly (same type and message as before) so the check is not
    # stripped when Python runs with -O.
    if not times:
        raise AssertionError("Il video deve avere almeno 2 frame.")

    fps_model = 1.0 / np.mean(times)

    with open(os.path.join(out_dir, "metrics.json"), "w") as f:
        json.dump({"pairs": len(times), "fps_model": float(fps_model)}, f, indent=2)

    return fps_model, out_mp4

# --------- wrapper per creare i modelli -------------------------------------

def create_raft_model():
    '''Build the RAFT adapter, preferring the small checkpoint when it exists.

    Returns:
        The object returned by ``make_model("raft", ...)``.
    '''
    small_ckpt = "/content/RAFT/models/raft-small.pth"
    sintel_ckpt = "/content/RAFT/models/raft-sintel.pth"

    # The small variant is selected purely by the presence of its weight file.
    use_small = os.path.exists(small_ckpt)
    if use_small:
        chosen = small_ckpt
    else:
        chosen = sintel_ckpt

    raft_options = {
        "weights": chosen,
        "small": bool(use_small),
        "mixed_precision": True,
        "alternate_corr": False,
    }
    model = make_model("raft", **raft_options)
    print("RAFT model ready. small:", use_small)
    return model

def create_spynet_model(device=None):
    '''Build the SPyNet adapter.

    Args:
        device: Device string forwarded to ``make_model``. When ``None``,
            "cuda" is used if ``torch.cuda.is_available()`` and "cpu" otherwise.

    Returns:
        The object returned by ``make_model("spynet", ...)``.
    '''
    if device is None:
        if torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"
    spynet = make_model("spynet", device=device)
    print("SPyNet model ready on", device)
    return spynet

def create_flownet2_model(device=None, ckpt="things"):
    '''Build the FlowNet2 adapter.

    Args:
        device: Device string forwarded to ``make_model``. When ``None``,
            "cuda" is used if ``torch.cuda.is_available()`` and "cpu" otherwise.
        ckpt: Checkpoint identifier forwarded unchanged to ``make_model``.

    Returns:
        The object returned by ``make_model("flownet2", ...)``.
    '''
    # CUDA availability is only queried when the caller gave no device.
    device = device if device is not None else (
        "cuda" if torch.cuda.is_available() else "cpu"
    )
    flownet = make_model("flownet2", device=device, ckpt=ckpt)
    print("FlowNet2 model ready on", device, "ckpt=", ckpt)
    return flownet

def create_pwcnet_model(device=None, ckpt="things"):
    '''Build the PWCNet adapter.

    Args:
        device: Device string forwarded to ``make_model``. When ``None``,
            "cuda" is used if ``torch.cuda.is_available()`` and "cpu" otherwise.
        ckpt: Checkpoint identifier forwarded unchanged to ``make_model``.

    Returns:
        The object returned by ``make_model("pwcnet", ...)``.
    '''
    if device is not None:
        target = device
    elif torch.cuda.is_available():
        target = "cuda"
    else:
        target = "cpu"
    pwcnet = make_model("pwcnet", device=target, ckpt=ckpt)
    print("PWCNet model ready on", target, "ckpt=", ckpt)
    return pwcnet
"""))
# Confirmation message printed after the module file has been written.
print("Scritto /content/oflow2d/pipelines/video_demo.py")


In [None]:
# Rebuild the archive from scratch: `zip -r` on an existing oflow2d.zip only adds or
# updates entries, so files removed from the package would linger across re-runs.
# Bytecode caches (__pycache__) are left out of the archive.
!cd /content && rm -f oflow2d.zip && zip -r oflow2d.zip oflow2d -x "*/__pycache__/*"
