In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree and print the full path of every file in it.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for full_path in (os.path.join(dirname, name) for name in filenames):
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Location of the BIPED edge data and of the list file describing the training split.
DATA_ROOT = "/kaggle/input/biped-edge-detection/BIPED/edges"
TRAIN_LIST = DATA_ROOT + "/train_rgb.lst"

# Peek at the first five lines of the list file to see how entries are written.
# repr() makes stray whitespace and empty lines visible.
with open(TRAIN_LIST) as list_handle:
    for _ in range(5):
        raw_line = list_handle.readline()
        print(repr(raw_line.strip()))


# Imports & Config

In [None]:
import os
import random
from typing import List, Tuple

import numpy as np
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import torchvision.transforms.functional as TF

# Reproducibility
def set_seed(seed: int = 42):
    """Seed every random number generator this notebook draws from.

    The same value is applied, in this order, to Python's ``random`` module,
    NumPy's global generator, PyTorch's default generator and the generators
    of all CUDA devices.

    Args:
        seed: Integer seed shared by all generators.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

# Fix all RNG seeds before any model or data loader is created.
set_seed(42)

# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", DEVICE)

# Dataset root and the list files that describe the train / test splits.
DATA_ROOT = "/kaggle/input/biped-edge-detection/BIPED/edges"
TRAIN_LIST, TEST_LIST = [
    os.path.join(DATA_ROOT, list_name)
    for list_name in ("train_rgb.lst", "test_rgb.lst")
]

#  Dataset class for BIPED

In [None]:
import os
import random
from typing import Tuple, List

from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.functional as TF

import os
import random
from typing import Tuple, List

from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms.functional as TF


class BIPEDDataset(Dataset):
    """Image / edge-map pairs of the BIPED dataset, resolved from a list file.

    Every non-empty line of the list file names an RGB image and, optionally,
    its edge map. Only the file names are used: the folders written in the list
    are dropped and replaced by fixed sub-directories of ``root_dir`` chosen
    from ``split``.
    """

    def __init__(
        self,
        root_dir: str,
        list_file: str,
        split: str = "train",          # "train" or "test"
        img_size: Tuple[int, int] = (320, 320),
        is_train: bool = True,
    ):
        """Read ``list_file`` and build the image and edge-map path lists.

        Args:
            root_dir: Dataset root, e.g.
                "/kaggle/input/biped-edge-detection/BIPED/edges".
            list_file: Text file with one whitespace-separated entry per line:
                the RGB image path, optionally followed by the edge-map path.
            split: "train" selects the training folders; any other value
                selects the test folders.
            img_size: Size passed unchanged to ``PIL.Image.resize`` (which
                reads it as ``(width, height)``).
            is_train: When true, random flips are applied in ``__getitem__``.
        """
        self.root_dir = root_dir
        self.split = split
        self.img_size = img_size
        self.is_train = is_train

        self.img_paths: List[str] = []
        self.edge_paths: List[str] = []

        with open(list_file, "r") as handle:
            for raw_line in handle:
                entry = raw_line.strip()
                if not entry:
                    continue

                # Old-style entries look like:
                #   rgbr/aug/p1/RGB_001.jpg rgbr/aug/p1/RGB_001.png
                # When no second field is given, the edge name is derived from the image name.
                fields = entry.split()
                rgb_field = fields[0]
                edge_field = fields[1] if len(fields) > 1 else fields[0]

                # Keep ONLY the file name; every folder from the list is discarded.
                rgb_name = os.path.basename(rgb_field)
                # Edge maps are always looked up with a .png extension.
                edge_name = os.path.splitext(os.path.basename(edge_field))[0] + ".png"

                # Folder chain below "imgs" / "edge_maps" for this split.
                if self.split == "train":
                    sub_dirs = ("train", "rgbr", "real")
                else:  # test
                    sub_dirs = ("test", "rgbr")

                self.img_paths.append(
                    os.path.join(self.root_dir, "imgs", *sub_dirs, rgb_name)
                )
                self.edge_paths.append(
                    os.path.join(self.root_dir, "edge_maps", *sub_dirs, edge_name)
                )

        print(
            f"[BIPEDDataset] Loaded {len(self.img_paths)} samples from "
            f"{os.path.basename(list_file)} (split={self.split})"
        )

    def __len__(self):
        """Return the number of image / edge-map pairs."""
        return len(self.img_paths)

    def __getitem__(self, idx: int):
        """Load, resize, optionally flip and tensorize one sample.

        Returns:
            ``(img_tensor, edge_tensor)``: the image as a (3, H, W) float
            tensor and the edge map as a (1, H, W) float tensor binarized to
            0.0 / 1.0 with a 0.5 threshold.
        """
        img_path = self.img_paths[idx]
        edge_path = self.edge_paths[idx]

        # If loading still fails, print img_path / edge_path here to debug.

        img = Image.open(img_path).convert("RGB")
        edge = Image.open(edge_path).convert("L")

        # Bilinear for the photo; nearest-neighbour for the edge map.
        img = img.resize(self.img_size, Image.BILINEAR)
        edge = edge.resize(self.img_size, Image.NEAREST)

        # Augmentation: independent 50% horizontal and vertical flips, always
        # applied to image and edge map together so they stay aligned.
        if self.is_train:
            for flip in (TF.hflip, TF.vflip):
                if random.random() < 0.5:
                    img, edge = flip(img), flip(edge)

        img_tensor = TF.to_tensor(img)                     # (3, H, W)
        edge_tensor = TF.to_tensor(edge).gt(0.5).float()   # (1, H, W), values 0.0 / 1.0

        return img_tensor, edge_tensor



# Resize target for every sample and number of samples per batch.
IMG_SIZE = (320, 320)
BATCH_SIZE = 4

# Training split, with random flips enabled.
train_dataset = BIPEDDataset(
    DATA_ROOT, TRAIN_LIST, split="train", img_size=IMG_SIZE, is_train=True
)

# Test split, without augmentation.
test_dataset = BIPEDDataset(
    DATA_ROOT, TEST_LIST, split="test", img_size=IMG_SIZE, is_train=False
)

# num_workers=0 keeps loading in the main process, which is easier to debug;
# increase it once everything works.
train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=True
)

# The test split is also used as the validation set.
test_loader = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True
)

val_loader = test_loader

# Smoke test: pull one batch from each loader and show the tensor shapes.
x, y = next(iter(train_loader))
print("Batch shape:", x.shape, y.shape)

vx, vy = next(iter(val_loader))
print("Val batch:", vx.shape, vy.shape)

# U-Net-like model for edge detection

In [None]:
class DoubleConv(nn.Module):
    """Two consecutive Conv(3x3) -> BatchNorm -> ReLU stages.

    The first stage maps ``in_ch`` to ``out_ch`` channels, the second keeps
    ``out_ch``. Padding of 1 keeps the spatial size unchanged, and the
    convolutions carry no bias.
    """
    def __init__(self, in_ch, out_ch):
        super().__init__()
        # Layers are created in execution order and stored in one Sequential
        # named ``net`` (indices 0..5), which fixes the state-dict key names.
        stages = []
        for stage_in in (in_ch, out_ch):
            stages += [
                nn.Conv2d(stage_in, out_ch, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
            ]
        self.net = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both conv stages to ``x`` and return the result."""
        features = self.net(x)
        return features


class Down(nn.Module):
    """Encoder stage: 2x2 max pooling followed by a ``DoubleConv``."""
    def __init__(self, in_ch, out_ch):
        super().__init__()
        pool = nn.MaxPool2d(2)
        conv = DoubleConv(in_ch, out_ch)
        # Kept as a two-element Sequential named ``net`` so state-dict keys stay the same.
        self.net = nn.Sequential(pool, conv)

    def forward(self, x):
        """Pool ``x`` and then convolve it from ``in_ch`` to ``out_ch`` channels."""
        downsampled = self.net(x)
        return downsampled


class Up(nn.Module):
    """Decoder stage: bilinear 2x upsampling, skip concatenation, ``DoubleConv``."""
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)
        # in_ch must equal the channel count of the concatenated (skip + upsampled) tensor.
        self.conv = DoubleConv(in_ch, out_ch)

    def forward(self, x1, x2):
        """Upsample ``x1``, align it with the skip tensor ``x2`` and fuse both.

        Args:
            x1: Feature map coming from the deeper level.
            x2: Skip-connection feature map from the encoder.
        """
        upsampled = self.up(x1)

        # Height / width still missing after upsampling. Splitting the amount
        # between both sides keeps the upsampled map centred on the skip tensor.
        gap_h = x2.size(2) - upsampled.size(2)
        gap_w = x2.size(3) - upsampled.size(3)
        left, top = gap_w // 2, gap_h // 2
        upsampled = F.pad(upsampled, [left, gap_w - left, top, gap_h - top])

        # Skip features first, upsampled features second, along the channel axis.
        fused = torch.cat([x2, upsampled], dim=1)
        return self.conv(fused)


class EdgeUNet(nn.Module):
    """Compact U-Net that outputs per-pixel edge logits.

    Encoder widths are 32 -> 64 -> 128 -> 256; the decoder mirrors them and
    consumes the matching encoder output as skip connection at every level.
    The final 1x1 convolution yields ``n_classes`` channels of raw logits
    (no sigmoid is applied here).
    """
    def __init__(self, n_channels=3, n_classes=1):
        super().__init__()
        c1, c2, c3, c4 = 32, 64, 128, 256

        # Encoder
        self.inc = DoubleConv(n_channels, c1)
        self.down1 = Down(c1, c2)
        self.down2 = Down(c2, c3)
        self.down3 = Down(c3, c4)

        # Decoder: input width = deeper features + skip features.
        self.up1 = Up(c4 + c3, c3)
        self.up2 = Up(c3 + c2, c2)
        self.up3 = Up(c2 + c1, c1)

        self.outc = nn.Conv2d(c1, n_classes, kernel_size=1)

    def forward(self, x):
        """Return logits of shape (B, n_classes, H, W) for the input batch ``x``."""
        skip1 = self.inc(x)            # 32 channels
        skip2 = self.down1(skip1)      # 64 channels
        skip3 = self.down2(skip2)      # 128 channels
        decoded = self.down3(skip3)    # 256 channels (bottleneck)

        # Walk back up, pairing each decoder stage with its encoder skip.
        for up_stage, skip in ((self.up1, skip3), (self.up2, skip2), (self.up3, skip1)):
            decoded = up_stage(decoded, skip)

        return self.outc(decoded)      # (B, 1, H, W) with the default n_classes


# Build the network, move it to the selected device and print its layer structure.
model = EdgeUNet()
model.to(DEVICE)
print(model)


#  Loss, metrics & optimizer

In [None]:
# Training hyperparameters.
IMG_SIZE = (320, 320)
BATCH_SIZE = 4
EPOCHS = 5
LR = 1e-4

# Binary cross-entropy on raw logits (the sigmoid is applied inside the loss).
criterion = nn.BCEWithLogitsLoss()

# Adam over all model parameters.
optimizer = torch.optim.Adam(params=model.parameters(), lr=LR)


def compute_iou(pred: torch.Tensor, target: torch.Tensor, thresh: float = 0.5) -> float:
    """Mean intersection-over-union of binarized predictions against targets.

    Args:
        pred: Predicted probabilities, shape (B, 1, H, W), values in [0, 1].
        target: Ground-truth maps of the same shape; binarized at 0.5.
        thresh: Probability above which a predicted pixel counts as positive.

    Returns:
        The per-sample IoU averaged over the batch, as a Python float. A small
        epsilon in the denominator avoids division by zero, so a sample whose
        prediction and target are both empty scores 0.
    """
    per_sample_dims = (1, 2, 3)

    pred_mask = pred.gt(thresh).float()
    target_mask = target.gt(0.5).float()

    overlap = torch.sum(pred_mask * target_mask, dim=per_sample_dims)
    covered = pred_mask.sum(dim=per_sample_dims) + target_mask.sum(dim=per_sample_dims)
    union = covered - overlap + 1e-6

    return (overlap / union).mean().item()


# Training & Evaluation

In [None]:
def train_one_epoch(epoch, model, train_loader, optimizer, device, criterion):
    """Run one optimization pass over ``train_loader``.

    For every batch the model is updated with one optimizer step and the batch
    loss and IoU are accumulated. Running averages are printed every 50 steps
    and a summary line at the end.

    Args:
        epoch: Epoch number, used only in the log lines.
        model: Network that maps images to edge logits.
        train_loader: Iterable of ``(imgs, edges)`` batches; must support ``len``.
        optimizer: Optimizer bound to ``model``'s parameters.
        device: Device the batches are moved to.
        criterion: Loss called as ``criterion(logits, edges)``.

    Returns:
        ``(epoch_loss, epoch_iou)``: mean batch loss and mean batch IoU.
    """
    model.train()
    loss_sum, iou_sum = 0.0, 0.0

    for batch_no, (imgs, edges) in enumerate(train_loader, start=1):
        imgs, edges = imgs.to(device), edges.to(device)

        optimizer.zero_grad()
        logits = model(imgs)
        loss = criterion(logits, edges)

        loss.backward()
        optimizer.step()

        # The metric is for monitoring only, so it is computed without gradient tracking.
        with torch.no_grad():
            batch_iou = compute_iou(torch.sigmoid(logits), edges)

        loss_sum += loss.item()
        iou_sum += batch_iou

        if batch_no % 50 == 0:
            print(
                f"Epoch [{epoch}] Step [{batch_no}/{len(train_loader)}] "
                f"Loss: {loss_sum/batch_no:.4f}  IoU: {iou_sum/batch_no:.4f}"
            )

    epoch_loss = loss_sum / len(train_loader)
    epoch_iou = iou_sum / len(train_loader)
    print(f"[Train] Epoch {epoch}: Loss={epoch_loss:.4f}, IoU={epoch_iou:.4f}")
    return epoch_loss, epoch_iou


def validate(epoch, model, val_loader, device, criterion):
    """Evaluate ``model`` on ``val_loader`` without updating it.

    Args:
        epoch: Epoch number, used only in the log line.
        model: Network that maps images to edge logits.
        val_loader: Iterable of ``(imgs, edges)`` batches; must support ``len``.
        device: Device the batches are moved to.
        criterion: Loss called as ``criterion(logits, edges)``.

    Returns:
        ``(val_loss, val_iou)``: mean batch loss and mean batch IoU.
    """
    model.eval()
    loss_sum, iou_sum = 0.0, 0.0

    with torch.no_grad():
        for imgs, edges in val_loader:
            imgs, edges = imgs.to(device), edges.to(device)

            logits = model(imgs)
            batch_loss = criterion(logits, edges)
            batch_iou = compute_iou(torch.sigmoid(logits), edges)

            loss_sum += batch_loss.item()
            iou_sum += batch_iou

    val_loss = loss_sum / len(val_loader)
    val_iou = iou_sum / len(val_loader)
    print(f"[Val]   Epoch {epoch}: Loss={val_loss:.4f}, IoU={val_iou:.4f}")
    return val_loss, val_iou



In [None]:
import os
import torch
import matplotlib.pyplot as plt

# Where the best checkpoint is written, and the default number of epochs.
BEST_CKPT_PATH = "/kaggle/working/biped_edge_unet_best.pth"
NUM_EPOCHS = 5

def run_training(model, train_loader, val_loader, optimizer, device, num_epochs = NUM_EPOCHS, save_path=BEST_CKPT_PATH, loss_fn=None):
    """Train for ``num_epochs`` epochs and keep the checkpoint with the best validation IoU.

    After every epoch the model is validated; whenever the validation IoU
    improves on the best value seen so far, a checkpoint is written to
    ``save_path``. The first epoch always produces a checkpoint, so a run with
    at least one epoch never ends without a saved model, even if the IoU is
    0.0 throughout.

    Args:
        model: Network to train.
        train_loader: Loader passed to ``train_one_epoch``.
        val_loader: Loader passed to ``validate``.
        optimizer: Optimizer bound to ``model``'s parameters.
        device: Device the batches are moved to.
        num_epochs: Number of epochs to run.
        save_path: Destination of the best checkpoint. Its parent directory is
            created if it does not exist.
        loss_fn: Loss used for training and validation. Defaults to the
            notebook-level ``criterion``.

    The checkpoint is a dict with the keys "epoch", "model", "optimizer",
    "val_loss" and "val_iou".
    """
    if loss_fn is None:
        # Fall back to the loss defined next to the optimizer in this notebook.
        loss_fn = criterion

    # None means "nothing saved yet". Starting from 0.0 with a strict ">" would
    # skip saving entirely when the IoU never rises above zero.
    best_val_iou = None
    best_epoch = 0

    for epoch in range(1, num_epochs + 1):
        train_one_epoch(epoch, model, train_loader, optimizer, device, loss_fn)
        val_loss, val_iou = validate(epoch, model, val_loader, device, loss_fn)

        if best_val_iou is None or val_iou > best_val_iou:
            best_val_iou = val_iou
            best_epoch = epoch

            save_dir = os.path.dirname(save_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)

            torch.save({
                "epoch": epoch,
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "val_loss": val_loss,
                "val_iou": val_iou
            }, save_path)

    # With zero epochs nothing was evaluated; report 0.0 as before.
    reported_iou = 0.0 if best_val_iou is None else best_val_iou
    print(f"Training finished. Best IoU={reported_iou:.4f} (epoch {best_epoch})")


In [None]:
# Train the model; the test loader is used as the validation loader.
run_training(model, train_loader, test_loader, optimizer, DEVICE)



In [None]:
import os
import torch

def load_trained_model(
    model,
    device,
    local_ckpt="/kaggle/working/biped_edge_unet_best.pth",
    filename_in_input="biped_edge_unet_best.pth",
):
    """Load the best available checkpoint into ``model`` and prepare it for inference.

    Lookup order:
      1. ``local_ckpt`` (the checkpoint written in /kaggle/working when the
         model was trained in this run).
      2. Otherwise, the first file named ``filename_in_input`` found anywhere
         under /kaggle/input (datasets + models).
      3. If neither exists, ``model`` keeps its current weights.

    In every case the model is moved to ``device`` and switched to eval mode
    before it is returned.

    Args:
        model: Module whose architecture matches the checkpoint.
        device: Target device; also used as ``map_location`` when loading.
        local_ckpt: Path of the preferred checkpoint.
        filename_in_input: File name searched for under /kaggle/input.

    Returns:
        The same ``model`` object.

    Raises:
        KeyError: If the checkpoint has neither a "model_state_dict" nor a
            "model" entry.
    """

    # 1) Prefer the checkpoint in /kaggle/working.
    if os.path.exists(local_ckpt):
        print(f"🔹 Loading LOCAL checkpoint: {local_ckpt}")
        ckpt_path = local_ckpt
    else:
        # 2) Otherwise search /kaggle/input and take the first match.
        print("🔍 Không thấy local checkpoint, đang tìm trong /kaggle/input ...")
        ckpt_path = None
        for root, _dirs, files in os.walk("/kaggle/input"):
            if filename_in_input in files:
                ckpt_path = os.path.join(root, filename_in_input)
                break

        if ckpt_path is None:
            # 3) Nothing found: hand back the model as it is, but in the same
            # state (on device, eval mode) as after a successful load.
            print("⚠️ Không tìm thấy checkpoint trong /kaggle/input — dùng model khởi tạo.")
            model.to(device)
            model.eval()
            return model

    print(f"✅ Loading checkpoint from: {ckpt_path}")
    # NOTE(security): torch.load unpickles the file. A checkpoint picked up
    # from /kaggle/input may come from a third party; only attach inputs you
    # trust (or load with weights_only=True where the PyTorch version allows).
    ckpt = torch.load(ckpt_path, map_location=device)

    # Accept both naming conventions for the weights entry.
    if "model_state_dict" in ckpt:
        state_dict = ckpt["model_state_dict"]
    elif "model" in ckpt:
        state_dict = ckpt["model"]
    else:
        raise KeyError(f"Checkpoint không có key 'model_state_dict' hoặc 'model'. Keys: {ckpt.keys()}")

    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()
    print(f"✨ Loaded model, epoch = {ckpt.get('epoch', 'N/A')}, val_iou = {ckpt.get('val_iou', 'N/A')}")
    return model


In [None]:
def visualize_predictions(model,
                          val_loader,
                          device,
                          num_samples: int = 3,
                          ckpt_path: str = BEST_CKPT_PATH):
    """Show input, ground-truth edges and predicted edges for a few samples.

    The checkpoint is loaded into ``model`` through ``load_trained_model``.
    The first batch of ``val_loader`` is then run through the network, and up
    to ``num_samples`` of its items are drawn as one three-panel figure each.

    Args:
        model: Network to visualize; its weights are overwritten when a
            checkpoint is found.
        val_loader: Iterable of ``(imgs, edges)`` batches.
        device: Device used for inference.
        num_samples: Maximum number of items to plot (capped by the batch size).
        ckpt_path: Preferred checkpoint path.
    """
    # Use the local checkpoint when present, otherwise search /kaggle/input.
    model = load_trained_model(
        model,
        device,
        local_ckpt=ckpt_path,
        filename_in_input="biped_edge_unet_best.pth",  # must match the uploaded file's name
    )
    model.eval()

    imgs, edges = next(iter(val_loader))
    imgs, edges = imgs.to(device), edges.to(device)

    with torch.no_grad():
        probs = torch.sigmoid(model(imgs))

    shown = min(num_samples, imgs.size(0))

    for sample_idx in range(shown):
        # (title, pixels, colormap) for the three panels, left to right.
        panels = (
            ("Input", imgs[sample_idx].cpu().permute(1, 2, 0).numpy(), None),
            ("GT edge", edges[sample_idx].cpu().squeeze(0).numpy(), "gray"),
            ("Predicted edge", probs[sample_idx].cpu().squeeze(0).numpy(), "gray"),
        )

        plt.figure(figsize=(12, 4))
        for column, (title, pixels, cmap) in enumerate(panels, start=1):
            plt.subplot(1, 3, column)
            plt.title(title)
            plt.imshow(pixels, cmap=cmap)
            plt.axis("off")

        plt.show()


In [None]:

# Plot a few predictions on the test split using the best checkpoint.
visualize_predictions(model, test_loader, DEVICE)