# Grayscale Depth Training (UNet) — Live Curves

Train a single-channel UNet for depth regression with **live loss/metric plots**.

**Features**
- Config cell to tweak params
- Mixed precision on CUDA
- L1 + SiLog loss
- Per-epoch RMSE / AbsRel on val
- **Live charts** updated every epoch
- Save `best.pt`, `last.pt`, `history.csv`, and `curves.png`


In [None]:
# ===== Config (EDIT) =====
# Single dictionary of paths and hyper-parameters read by the cells below.
# Edit the values here, then re-run the notebook from this cell.
config = {
    # Image folders and the folders holding the matching "<image stem>.npy" depth maps.
    "train_images": "data_v2/train/images",
    "train_depths": "data_v2/train/depths",
    "val_images":   "data_v2/val/images",
    "val_depths":   "data_v2/val/depths",
    # Images and depth maps are resized to img_size x img_size by the dataset.
    "img_size": 384,
    # Depth is divided by max_depth to form the [0, 1] target; larger values are masked out.
    "max_depth": 80.0,  # set to your dataset's ~P99
    # DataLoader batch size (used for both the train and val loaders).
    "batch": 16,
    # Number of training epochs; also sets the warm-up/cosine learning-rate schedule length.
    "epochs": 50,
    # AdamW base learning rate and weight decay.
    "lr": 1e-3,
    "weight_decay": 1e-4,
    # Weights of the L1 and SiLog terms passed to DepthLoss.
    "w_l1": 1.0,
    "w_silog": 0.1,
    # Requested device; the notebook falls back to CPU when CUDA is unavailable.
    "device": "cuda",           # 'cuda' on 4080S, else 'cpu'
    # Folder that receives checkpoints, TensorBoard logs and curves.
    "out_dir": "runs/dep_unet_v2"
}
# Bare expression: echoes the dictionary as the cell output.
config

In [None]:
# ===== Imports & utils =====
import os, math, time, random, csv
from pathlib import Path
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
import matplotlib.pyplot as plt
from IPython.display import clear_output
from torch.utils.tensorboard import SummaryWriter
import torchvision.utils as vutils  # 若要偶爾寫入可視化用


def set_seed(seed=42):
    """Seed the Python, NumPy and PyTorch (CPU and all CUDA devices) RNGs.

    Also turns on cuDNN autotuning (``cudnn.benchmark = True``).
    NOTE(review): autotuning may pick non-deterministic kernels, so seeding
    alone does not guarantee bit-identical GPU runs.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    torch.backends.cudnn.benchmark = True

def valid_mask_from(gt: torch.Tensor):
    """Return a boolean mask that is True where ``gt`` is finite and strictly positive."""
    is_finite = torch.isfinite(gt)
    is_positive = gt > 0
    return is_finite & is_positive

def rmse(pred, gt, mask):
    """Root-mean-square error between ``pred`` and ``gt`` over the masked elements.

    Returns a NaN scalar tensor on ``pred``'s device when the mask selects nothing.
    """
    if mask.sum() == 0:
        return torch.tensor(float('nan'), device=pred.device)
    selected_pred = pred[mask]
    selected_gt = gt[mask]
    mean_sq_err = F.mse_loss(selected_pred, selected_gt)
    return torch.sqrt(mean_sq_err)

def abs_rel(pred, gt, mask):
    """Mean absolute relative error ``|pred - gt| / gt`` over the masked elements.

    The denominator is floored at 1e-6 to avoid division by zero. Returns a NaN
    scalar tensor on ``pred``'s device when the mask selects nothing.
    """
    if mask.sum() == 0:
        return torch.tensor(float('nan'), device=pred.device)
    selected_pred = pred[mask]
    selected_gt = gt[mask]
    abs_err = (selected_pred - selected_gt).abs()
    rel_err = abs_err / selected_gt.clamp_min(1e-6)
    return rel_err.mean()

def to_device(batch, device):
    """Move an ``(x, y, m)`` batch to ``device`` and return it as a 3-tuple.

    Transfers are requested with ``non_blocking=True``.
    """
    x, y, m = batch  # unpacking enforces exactly three elements
    return tuple(t.to(device, non_blocking=True) for t in (x, y, m))

# Use CUDA only when it is both requested in the config and actually available.
if config['device'] == 'cuda' and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

In [None]:
import torchvision.transforms.functional as TF

class ImageDepthNPY(Dataset):
    """Paired grayscale-image / depth-map dataset.

    Every image in ``img_dir`` whose extension is in ``allow_exts`` is paired
    with ``dep_dir/<image stem>.npy``; images without a depth file are skipped.

    Each item is a tuple ``(x, y, m)`` of ``[1, img_size, img_size]`` tensors:

    * ``x`` -- grayscale image normalised with mean 0.5 / std 0.5,
    * ``y`` -- depth divided by ``max_depth`` (0 where invalid),
    * ``m`` -- boolean mask of valid pixels (finite, ``> 0`` and ``<= max_depth``).

    With ``aug=True`` one random resized crop and a random horizontal flip are
    applied identically to image, depth and mask.

    Raises:
        FileNotFoundError: if no image/depth pair is found.
    """

    def __init__(self, img_dir, dep_dir, img_size=384, max_depth=80.0, aug=False, allow_exts=None):
        self.img_dir = Path(img_dir)
        self.dep_dir = Path(dep_dir)
        self.img_size = int(img_size)
        self.max_depth = float(max_depth)
        self.aug = bool(aug)
        if allow_exts is None:
            allow_exts = (".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff", ".webp")

        imgs = sorted(
            p for p in self.img_dir.iterdir()
            if p.is_file() and p.suffix.lower() in allow_exts
        )

        # Keep only images that have a matching depth file. ``imgs`` is already
        # sorted, so the pair list needs no second sort.
        candidates = ((p, self.dep_dir / (p.stem + '.npy')) for p in imgs)
        self.samples = [(p, npy) for p, npy in candidates if npy.exists()]

        if not self.samples:
            raise FileNotFoundError(f'No paired samples under {img_dir} & {dep_dir}')

        # Only intensity transforms live here; geometry is handled by hand in
        # __getitem__ so image and depth stay aligned.
        self.to_tensor = T.ToTensor()
        self.norm = T.Normalize(mean=(0.5,), std=(0.5,))

        # RandomResizedCrop parameters.
        self.scale_range = (0.7, 1.0)
        self.ratio_range = (3/4, 4/3)

    def __len__(self):
        return len(self.samples)

    def _random_resized_crop_params(self, w, h):
        """Draw ``(top, left, height, width)`` using torchvision's own sampling logic."""
        i, j, h_out, w_out = T.RandomResizedCrop.get_params(
            img=torch.empty(1, h, w),  # only the size is inspected
            scale=self.scale_range,
            ratio=self.ratio_range
        )
        return i, j, h_out, w_out

    @staticmethod
    def _resize_depth(d, size):
        """Nearest-neighbour resize of a ``[1, H, W]`` depth tensor to ``size`` = (h, w).

        Nearest avoids blending valid depth values with invalid ones.
        """
        return F.interpolate(d.unsqueeze(0), size=size, mode='nearest').squeeze(0)

    def __getitem__(self, idx):
        img_p, dep_p = self.samples[idx]

        # --- load ---
        # convert() returns an independent, fully loaded copy, so the file
        # handle can be released straight away instead of waiting for GC.
        with Image.open(img_p) as im:
            pil = im.convert('L')                    # [H, W]
        d_np = np.load(dep_p).astype(np.float32)
        if d_np.ndim != 2:
            raise ValueError(f'Depth map {dep_p} must be 2-D [H, W], got shape {d_np.shape}')
        d = torch.from_numpy(d_np).unsqueeze(0)      # [1, H, W]

        # --- geometry (kept in sync between image and depth) ---
        out_size = (self.img_size, self.img_size)
        if self.aug:
            # The crop box is expressed in image pixels, so the depth map must
            # be on the same grid before it is cropped.
            if tuple(d.shape[-2:]) != (pil.height, pil.width):
                d = self._resize_depth(d, (pil.height, pil.width))
            # Sample the crop once and apply it to both.
            i, j, h_out, w_out = self._random_resized_crop_params(pil.width, pil.height)
            pil = TF.resized_crop(pil, top=i, left=j, height=h_out, width=w_out,
                                  size=[self.img_size, self.img_size], antialias=True)
            d = self._resize_depth(d[:, i:i+h_out, j:j+w_out], out_size)
        else:
            pil = TF.resize(pil, [self.img_size, self.img_size], antialias=True)
            d = self._resize_depth(d, out_size)

        # --- to tensor & normalize ---
        x = self.to_tensor(pil)          # [1, H, W], 0~1
        x = self.norm(x)

        # --- mask & normalize depth ---
        # Build the validity mask first, then normalise; values above the cap
        # are ignored as well.
        m = torch.isfinite(d) & (d > 0) & (d <= self.max_depth)
        y = torch.zeros_like(d)
        if m.any():
            y[m] = (d[m] / self.max_depth).clamp(0, 1)

        # --- horizontal flip (applied to all three consistently) ---
        if self.aug and random.random() < 0.5:
            x = torch.flip(x, dims=[2])
            y = torch.flip(y, dims=[2])
            m = torch.flip(m, dims=[2])

        return x, y, m


In [None]:
class DoubleConv(nn.Module):
    """Two consecutive 3x3 conv -> norm -> ReLU stages.

    The convolutions are bias-free and use padding 1, so spatial size is kept.
    ``norm="bn"`` selects BatchNorm2d; any other value selects GroupNorm with
    32 groups.
    """

    def __init__(self, in_c, out_c, norm="bn"):
        super().__init__()

        def make_norm(channels):
            if norm == "bn":
                return nn.BatchNorm2d(channels)
            return nn.GroupNorm(32, channels)

        layers = []
        for stage_in in (in_c, out_c):
            layers.append(nn.Conv2d(stage_in, out_c, 3, padding=1, bias=False))
            layers.append(make_norm(out_c))
            layers.append(nn.ReLU(inplace=True))
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)

class UpBlock(nn.Module):
    """Decoder stage: 2x bilinear upsample, concatenate the skip, then DoubleConv.

    Args:
        in_c: channels of the tensor coming from the deeper level.
        skip_c: channels of the encoder skip connection.
        out_c: channels produced by the stage.
        norm: normalisation selector forwarded to ``DoubleConv``.
    """

    def __init__(self, in_c, skip_c, out_c, norm="bn"):
        super().__init__()
        self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=False)
        self.conv = DoubleConv(in_c + skip_c, out_c, norm=norm)

    def forward(self, x, skip):
        x = self.up(x)
        # When the encoder level had an odd height/width, pooling floored it and
        # the 2x upsample comes out one pixel short; align to the skip so the
        # concatenation works for any input size.
        if x.shape[-2:] != skip.shape[-2:]:
            x = F.interpolate(x, size=skip.shape[-2:], mode="bilinear", align_corners=False)
        x = torch.cat([x, skip], dim=1)
        return self.conv(x)

class UNetSmall(nn.Module):
    """Four-level UNet (64/128/256/512 channels) with a 1x1 output head.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        norm: normalisation selector forwarded to every ``DoubleConv``.
        final_act: ``"sigmoid"``, ``"relu"`` (ReLU followed by a clamp to
            [0, 1]) or anything else for a linear output.
    """

    def __init__(self, in_ch=1, out_ch=1, norm="bn", final_act="sigmoid"):
        super().__init__()
        c1, c2, c3, c4 = 64, 128, 256, 512

        # Encoder: DoubleConv at each level, 2x max-pool between levels.
        self.d1 = DoubleConv(in_ch, c1, norm)
        self.p1 = nn.MaxPool2d(2)
        self.d2 = DoubleConv(c1, c2, norm)
        self.p2 = nn.MaxPool2d(2)
        self.d3 = DoubleConv(c2, c3, norm)
        self.p3 = nn.MaxPool2d(2)
        self.d4 = DoubleConv(c3, c4, norm)

        # Decoder: each stage consumes the encoder output of the same level.
        self.u3 = UpBlock(c4, c3, c3, norm)
        self.u2 = UpBlock(c3, c2, c2, norm)
        self.u1 = UpBlock(c2, c1, c1, norm)

        self.head = nn.Conv2d(c1, out_ch, 1)
        self.final_act = final_act

        self._init_weights()

    def _init_weights(self):
        """Kaiming-normal (fan_out, ReLU) init for every Conv2d; biases are zeroed."""
        conv_layers = (mod for mod in self.modules() if isinstance(mod, nn.Conv2d))
        for conv in conv_layers:
            nn.init.kaiming_normal_(conv.weight, mode="fan_out", nonlinearity="relu")
            if conv.bias is not None:
                nn.init.zeros_(conv.bias)

    def forward(self, x):
        # Encoder path; s1..s3 are kept as skip connections.
        s1 = self.d1(x)
        s2 = self.d2(self.p1(s1))
        s3 = self.d3(self.p2(s2))
        bottom = self.d4(self.p3(s3))

        # Decoder path.
        out = self.u3(bottom, s3)
        out = self.u2(out, s2)
        out = self.u1(out, s1)
        out = self.head(out)

        if self.final_act == "sigmoid":
            return torch.sigmoid(out)
        if self.final_act == "relu":
            return torch.relu(out).clamp_(0, 1)
        # Linear output: no activation here, the loss is expected to clamp.
        return out


In [None]:
class DepthLoss(nn.Module):
    """Weighted sum of masked L1 and scale-invariant log (SiLog) depth losses.

    Args:
        w_l1: weight of the L1 term (skipped when <= 0).
        w_silog: weight of the SiLog term (skipped when <= 0).
        clamp01: clamp predictions to [0, 1] before computing the loss.
    """

    def __init__(self, w_l1=1.0, w_silog=0.1, clamp01=True):
        super().__init__()
        self.w_l1 = float(w_l1)
        self.w_silog = float(w_silog)
        self.clamp01 = clamp01

    def silog(self, pred, gt, mask, eps=1e-6, lam=0.85):
        """Return ``mean(d^2) - lam * mean(d)^2`` with ``d = log(pred) - log(gt)``.

        Only masked pixels are used; both inputs are floored at ``eps`` before
        the log for numerical stability.
        """
        p = pred[mask]
        g = gt[mask]
        d = (p.clamp_min(eps)).log() - (g.clamp_min(eps)).log()
        return torch.mean(d * d) - lam * (torch.mean(d) ** 2)

    def forward(self, pred, gt, mask):
        """Return the scalar loss over the pixels selected by ``mask``.

        When nothing contributes (empty mask, or both weights <= 0) the result
        is a zero that is still attached to ``pred``'s graph, so ``backward()``
        succeeds and simply produces zero gradients.
        """
        if self.clamp01:
            pred = pred.clamp(0, 1)

        loss = None
        if mask.any():
            if self.w_l1 > 0:
                loss = self.w_l1 * F.l1_loss(pred[mask], gt[mask])
            if self.w_silog > 0:
                silog_term = self.w_silog * self.silog(pred, gt, mask)
                loss = silog_term if loss is None else loss + silog_term

        if loss is None:
            # A detached constant would make backward() raise ("does not
            # require grad"); multiply through pred to stay on the graph.
            return pred.sum() * 0.0
        return loss


In [None]:
# ===== Train with tqdm batch bar =====
# Builds the loaders, model, loss, optimiser and scheduler, trains for
# config['epochs'] epochs with mixed precision on CUDA, validates every epoch,
# and writes last.pt / best.pt, TensorBoard scalars, history.csv and curves.png
# into config['out_dir'].
from tqdm import tqdm
import platform

set_seed(42)

# Safest setting for Windows + notebooks: no worker processes, no pinned memory.
num_workers = 0
pin_mem = False
if platform.system() == "Linux" and device.type == "cuda":
    # On a Linux server these can be raised.
    num_workers = 4
    pin_mem = True

train_set = ImageDepthNPY(config['train_images'], config['train_depths'], config['img_size'], config['max_depth'], aug=True)
val_set   = ImageDepthNPY(config['val_images'],   config['val_depths'],   config['img_size'], config['max_depth'], aug=False)

print("train samples:", len(train_set), "val samples:", len(val_set))

train_loader = DataLoader(
    train_set, batch_size=config['batch'], shuffle=True,
    num_workers=num_workers, pin_memory=pin_mem, drop_last=True
)
val_loader   = DataLoader(
    val_set, batch_size=config['batch'], shuffle=False,
    num_workers=num_workers, pin_memory=pin_mem
)
print("batches per epoch -> train:", len(train_loader), "val:", len(val_loader))

model = UNetSmall(in_ch=1).to(device)
loss_fn = DepthLoss(config['w_l1'], config['w_silog'])
opt = torch.optim.AdamW(model.parameters(), lr=config['lr'], weight_decay=config['weight_decay'])


def lr_lambda(e):
    """LR multiplier for epoch ``e``: linear warm-up, then cosine decay to 0.

    Warm-up lasts max(3, 10% of the epochs).
    """
    warm = max(3, int(0.1 * config['epochs']))
    if e < warm:
        return (e + 1) / warm
    prog = (e - warm) / max(1, config['epochs'] - warm)
    return 0.5 * (1 + math.cos(math.pi * prog))


sch = torch.optim.lr_scheduler.LambdaLR(opt, lr_lambda)

# Fallback validation routine: none of the cells shown in this notebook defines
# `evaluate`, so provide one unless an earlier cell already did.
try:
    evaluate
except NameError:
    @torch.no_grad()
    def evaluate(model, loader, device, max_depth):
        """Return ``(rmse, abs_rel)`` over all valid pixels of ``loader``.

        Predictions and targets are rescaled by ``max_depth`` before the errors
        are accumulated, so RMSE is in the same units as ``max_depth``. Both
        values are NaN when the loader contains no valid pixel.
        """
        model.eval()
        sq_err_sum, rel_err_sum, n_valid = 0.0, 0.0, 0
        for batch in loader:
            x, y, m = to_device(batch, device)
            if not m.any():
                continue
            pred = model(x).float().clamp(0, 1)
            p = pred[m] * max_depth
            g = y[m] * max_depth
            sq_err_sum += ((p - g) ** 2).sum().item()
            rel_err_sum += ((p - g).abs() / g.clamp_min(1e-6)).sum().item()
            n_valid += int(m.sum().item())
        if n_valid == 0:
            return float('nan'), float('nan')
        return math.sqrt(sq_err_sum / n_valid), rel_err_sum / n_valid

outdir = Path(config['out_dir']); outdir.mkdir(parents=True, exist_ok=True)
run_name = time.strftime("%Y%m%d_%H%M%S")
tb_dir = outdir / f"tb_{run_name}"
tb_dir.mkdir(parents=True, exist_ok=True)
writer = SummaryWriter(log_dir=str(tb_dir))

# Optional: record the config too, so parameters are easy to look up later.
for k, v in config.items():
    writer.add_text("config", f"{k}: {v}", global_step=0)

best_rmse = float('inf')
scaler = torch.cuda.amp.GradScaler(enabled=(device.type == 'cuda'))

hist = {'train_loss': [], 'val_rmse': [], 'val_absrel': []}

print(f"Device={device}; img_size={config['img_size']} batch={config['batch']}")

for epoch in range(config['epochs']):
    model.train()
    running = 0.0
    seen = 0  # samples that actually contributed (drop_last / skipped batches excluded)
    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{config['epochs']}", leave=True)
    for i, batch in enumerate(pbar):
        x, y, m = to_device(batch, device)
        if not m.any():
            # No valid depth pixel in this batch: nothing to supervise.
            continue
        opt.zero_grad(set_to_none=True)
        with torch.autocast(device_type=device.type,
                            dtype=torch.float16 if device.type == 'cuda' else torch.bfloat16,
                            enabled=(device.type != 'cpu')):
            pred = model(x)
            loss = loss_fn(pred, y, m)
        scaler.scale(loss).backward()
        scaler.step(opt)
        scaler.update()
        running += loss.item() * x.size(0)
        seen += x.size(0)

        # Show the current loss on the progress bar.
        if (i + 1) % 10 == 0:
            pbar.set_postfix(loss=f"{loss.item():.4f}", lr=f"{sch.get_last_lr()[0]:.2e}")

    epoch_lr = sch.get_last_lr()[0]
    sch.step()
    # Average over the samples really seen; len(dataset) over-counts because
    # the train loader drops the last partial batch.
    tr_loss = running / seen if seen else float('nan')

    # Validation.
    val_rmse, val_absrel = (float(v) for v in evaluate(model, val_loader, device, config['max_depth']))
    hist['train_loss'].append(tr_loss)
    hist['val_rmse'].append(val_rmse)
    hist['val_absrel'].append(val_absrel)

    writer.add_scalar("train/loss", tr_loss, epoch + 1)
    writer.add_scalar("train/lr", epoch_lr, epoch + 1)
    writer.add_scalar("val/rmse", val_rmse, epoch + 1)
    writer.add_scalar("val/absrel", val_absrel, epoch + 1)

    # Checkpoints.
    torch.save({'epoch': epoch, 'model': model.state_dict()}, outdir/'last.pt')
    if val_rmse < best_rmse:
        best_rmse = val_rmse
        torch.save({'epoch': epoch, 'model': model.state_dict()}, outdir/'best.pt')

    print(f"Epoch {epoch+1} done. train {tr_loss:.4f} | val RMSE {val_rmse:.3f} AbsRel {val_absrel:.3f} | best RMSE {best_rmse:.3f}")

# Persist the per-epoch history.
with open(outdir/'history.csv', 'w', newline='') as history_file:
    history_writer = csv.writer(history_file)
    history_writer.writerow(['epoch', 'train_loss', 'val_rmse', 'val_absrel'])
    for ep, row in enumerate(zip(hist['train_loss'], hist['val_rmse'], hist['val_absrel']), start=1):
        history_writer.writerow([ep, *row])

# Build the curves figure here; it used to be created only in commented-out
# code, which made the savefig call below fail with a NameError.
fig, axs = plt.subplots(1, 3, figsize=(12, 3))
for ax, key, title in zip(axs,
                          ('train_loss', 'val_rmse', 'val_absrel'),
                          ('train loss', 'val RMSE', 'val AbsRel')):
    ax.plot(hist[key])
    ax.set_title(title)
fig.tight_layout()
fig.savefig(outdir/'curves.png', dpi=150)

writer.close()
print("Training finished. Best RMSE =", best_rmse)


In [None]:
# === Inference (folder) -> .npy + heatmap PNG (INFERNO) ===
from pathlib import Path
import numpy as np
import torch, torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
from PIL import Image
import cv2
from tqdm import tqdm

# ========= Fill in these parameters =========
ckpt_path      = r"runs/dep_unet_v2/best.pt"     # checkpoint to load (your best.pt)
in_images_dir  = r"0703_image"                   # folder of images to run inference on
out_dir        = r"output_npy_color"                   # output folder (created automatically)
img_size       = 384                             # must match training
max_depth      = 80.0                            # must match training
device_str     = "cuda"                          # "cuda" or "cpu"
save_stack     = True                            # also save the original/heatmap pair stacked vertically as a PNG

# ========= Fallback: define UNetSmall here if the notebook has not defined it yet =========
# The fallback must mirror the training-time model attribute-for-attribute
# (module names, bias-free convs, parameter-free bilinear upsampling);
# otherwise load_state_dict() rejects the checkpoint with missing/unexpected keys.
try:
    UNetSmall
except NameError:
    class DoubleConv(nn.Module):
        """Two bias-free 3x3 conv -> BatchNorm -> ReLU stages, stored under ``block``."""

        def __init__(self, in_c, out_c):
            super().__init__()
            self.block = nn.Sequential(
                nn.Conv2d(in_c, out_c, 3, padding=1, bias=False), nn.BatchNorm2d(out_c), nn.ReLU(inplace=True),
                nn.Conv2d(out_c, out_c, 3, padding=1, bias=False), nn.BatchNorm2d(out_c), nn.ReLU(inplace=True),
            )

        def forward(self, x):
            return self.block(x)

    class UpBlock(nn.Module):
        """2x bilinear upsample, concatenate the skip, then DoubleConv (stored under ``conv``)."""

        def __init__(self, in_c, skip_c, out_c):
            super().__init__()
            self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=False)
            self.conv = DoubleConv(in_c + skip_c, out_c)

        def forward(self, x, skip):
            return self.conv(torch.cat([self.up(x), skip], dim=1))

    class UNetSmall(nn.Module):
        """Four-level UNet (64/128/256/512 channels) with a sigmoid-activated 1x1 head."""

        def __init__(self, in_ch=1, out_ch=1):
            super().__init__()
            ch = [64, 128, 256, 512]
            self.d1 = DoubleConv(in_ch, ch[0]); self.p1 = nn.MaxPool2d(2)
            self.d2 = DoubleConv(ch[0], ch[1]); self.p2 = nn.MaxPool2d(2)
            self.d3 = DoubleConv(ch[1], ch[2]); self.p3 = nn.MaxPool2d(2)
            self.d4 = DoubleConv(ch[2], ch[3])
            self.u3 = UpBlock(ch[3], ch[2], ch[2])
            self.u2 = UpBlock(ch[2], ch[1], ch[1])
            self.u1 = UpBlock(ch[1], ch[0], ch[0])
            self.head = nn.Conv2d(ch[0], out_ch, 1)

        def forward(self, x):
            x1 = self.d1(x)
            x2 = self.d2(self.p1(x1))
            x3 = self.d3(self.p2(x2))
            x4 = self.d4(self.p3(x3))
            y = self.u3(x4, x3)
            y = self.u2(y, x2)
            y = self.u1(y, x1)
            return torch.sigmoid(self.head(y))

def load_ckpt(ckpt_path, device):
    """Load a checkpoint into a fresh single-channel ``UNetSmall`` in eval mode.

    The file may hold either a bare state dict or a dict with the state dict
    under the ``"model"`` key.
    """
    checkpoint = torch.load(ckpt_path, map_location=device)
    if isinstance(checkpoint, dict) and "model" in checkpoint:
        state = checkpoint["model"]
    else:
        state = checkpoint
    net = UNetSmall(in_ch=1).to(device)
    net.load_state_dict(state)
    net.eval()
    return net

# ========= Inference main =========
# For every image in in_images_dir: predict depth, rescale it to max_depth and
# to the original resolution, then save a float32 .npy, an INFERNO heatmap PNG
# and (optionally) the original image stacked on top of the heatmap.
device = torch.device("cuda" if (device_str == "cuda" and torch.cuda.is_available()) else "cpu")
model  = load_ckpt(ckpt_path, device)

in_dir  = Path(in_images_dir)
out_dir = Path(out_dir); out_dir.mkdir(parents=True, exist_ok=True)
viz_dir = out_dir / "viz"; viz_dir.mkdir(parents=True, exist_ok=True)
stack_dir = out_dir / "stack"
if save_stack:
    stack_dir.mkdir(parents=True, exist_ok=True)

exts = (".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff", ".webp")
imgs = sorted(p for p in in_dir.iterdir() if p.is_file() and p.suffix.lower() in exts)
if not imgs:
    raise FileNotFoundError(f"No images in {in_dir}")

tf = T.Compose([
    T.Resize((img_size, img_size)),
    T.ToTensor(),
    T.Normalize(mean=(0.5,), std=(0.5,)),   # same as training (grayscale)
])

print(f"Device={device}; images={len(imgs)}; saving to {out_dir}")
for p in tqdm(imgs, desc="infer"):
    # convert() returns a loaded copy, so the file handle can be closed at once.
    with Image.open(p) as im:
        pil = im.convert("L")
    W0, H0 = pil.size
    x = tf(pil).unsqueeze(0).to(device)

    # no_grad: inference needs no autograd graph (saves memory and time).
    with torch.no_grad(), torch.autocast(device_type=device.type,
                                         dtype=torch.float16 if device.type == "cuda" else torch.bfloat16,
                                         enabled=(device.type != "cpu")):
        y = model(x)                    # [1,1,h,w], in [0,1]

    # Back to depth units and the original image size. Cast to fp32 first so the
    # scaling and resampling are not done in half precision.
    y = y.float() * max_depth           # [1,1,h,w]
    y = F.interpolate(y, size=(H0, W0), mode="bilinear", align_corners=False)
    depth = y[0, 0].cpu().numpy().astype(np.float32, copy=False)   # [H,W]

    # Save .npy
    np.save(out_dir / f"{p.stem}.npy", depth)

    # Clip to [0, max_depth] -> 0~255 -> INFERNO colormap
    depth_vis = np.clip(depth, 0.0, max_depth)
    depth_u8  = np.round(depth_vis / max_depth * 255.0).astype(np.uint8)
    heatmap   = cv2.applyColorMap(depth_u8, cv2.COLORMAP_INFERNO)
    cv2.imwrite(str(viz_dir / f"{p.stem}_heatmap.png"), heatmap)

    # Optional: stack the original (converted to BGR) on top of the heatmap.
    if save_stack:
        # pil is already W0 x H0, so no resize is needed.
        img_bgr = cv2.cvtColor(np.array(pil), cv2.COLOR_GRAY2BGR)
        stacked = np.vstack([img_bgr, heatmap])
        cv2.imwrite(str(stack_dir / f"{p.stem}_stack.png"), stacked)

print(f"Done. Wrote {len(imgs)} .npy to {out_dir} and heatmaps to {viz_dir}" + (f", stacks to {stack_dir}" if save_stack else ""))
