# Data Understanding & Preparation

Config & File Loading

In [None]:
from pathlib import Path
import numpy as np
import cv2
import pandas as pd
from tqdm import tqdm
import re

In [None]:
DATA_ROOT = Path("/kaggle/input/data-science-ara-7-0/dataset/dataset")
TRAIN_IMG_DIR = DATA_ROOT / "train" / "images"
TRAIN_MASK_DIR = DATA_ROOT / "train" / "mask"
TEST_IMG_DIR  = DATA_ROOT / "test" / "images"

IMG_EXTS = {".jpg", ".jpeg", ".png"}

In [None]:
train_images = sorted([p for p in TRAIN_IMG_DIR.iterdir() if p.suffix.lower() in IMG_EXTS])
train_masks  = sorted([p for p in TRAIN_MASK_DIR.iterdir() if p.suffix.lower() in IMG_EXTS])
test_images  = sorted([p for p in TEST_IMG_DIR.iterdir() if p.suffix.lower() in IMG_EXTS])

print(f"[INFO] Train images : {len(train_images)}")
print(f"[INFO] Train masks  : {len(train_masks)}")
print(f"[INFO] Test images  : {len(test_images)}")

Pair Image Mask & Manifest

In [None]:
def extract_index(name: str):
    m = re.search(r"(\d+)", name)
    return m.group(1) if m else None

# Build mask index
mask_index = {}
for m in train_masks:
    idx = extract_index(m.stem)
    if idx is not None:
        mask_index[idx] = m

# Pair image-mask
pairs = []
for img in train_images:
    idx = extract_index(img.stem)
    if idx in mask_index:
        pairs.append({
            "image_path": img,
            "mask_path": mask_index[idx],
            "id": idx
        })

assert len(pairs) > 0, "No valid image-mask pairs found"
print(f"[INFO] Valid image-mask pairs: {len(pairs)}")

# Final manifest
df_manifest = pd.DataFrame({
    "image_path": [str(p["image_path"]) for p in pairs],
    "mask_path":  [str(p["mask_path"]) for p in pairs],
    "id":         [p["id"] for p in pairs],
})

print(f"[INFO] Final training samples: {len(df_manifest)}")

Morphology & Dice Riks Analysis

In [None]:
records = []
all_component_areas = []

for p in tqdm(pairs, desc="Analyzing dataset"):
    mask = cv2.imread(str(p["mask_path"]), cv2.IMREAD_GRAYSCALE)
    h, w = mask.shape
    total_pixels = h * w

    bin_mask = (mask == 255).astype(np.uint8)
    pothole_pixels = bin_mask.sum()
    area_ratio = pothole_pixels / total_pixels

    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
        bin_mask, connectivity=8
    )

    component_areas = stats[1:, cv2.CC_STAT_AREA] if num_labels > 1 else []
    if len(component_areas) > 0:
        all_component_areas.extend(component_areas.tolist())

    records.append({
        "image": p["image_path"].name,
        "height": h,
        "width": w,
        "has_pothole": int(pothole_pixels > 0),
        "area_ratio": area_ratio,
        "total_pothole_pixels": pothole_pixels,
        "num_components": len(component_areas),
        "max_component_ratio": (
            component_areas.max() / total_pixels if len(component_areas) > 0 else 0.0
        ),
        "min_component_pixels": (
            component_areas.min() if len(component_areas) > 0 else 0
        ),
    })

df = pd.DataFrame(records)

Insight, Prior & Feasibility

In [None]:
print("\n[INSIGHT] Pothole presence distribution:")
print(df["has_pothole"].value_counts())

empty_ratio = (df["has_pothole"] == 0).mean()
print(f"\n[INSIGHT] Empty-mask ratio: {empty_ratio:.2%}")

print("\n[INSIGHT] Pothole area ratio:")
print(df["area_ratio"].describe(percentiles=[0.1, 0.25, 0.5, 0.75, 0.9]))

print("\n[INSIGHT] Number of components per image:")
print(df["num_components"].describe())

print("\n[INSIGHT] Dominant component ratio:")
print(df["max_component_ratio"].describe())


In [None]:
# Small-object analysis
comp_series = pd.Series(all_component_areas)

print("\n[INSIGHT] Connected component area (pixels):")
print(comp_series.describe(percentiles=[0.1, 0.25, 0.5, 0.75, 0.9]))

min_area_candidate = int(comp_series.quantile(0.10))
print(f"\n[RECOMMENDATION] Candidate MIN_AREA: ~{min_area_candidate} pixels")


In [None]:
# Dice feasibility
tiny_image_ratio = (df["area_ratio"] < 0.01).mean()

print("\n[FEASIBILITY CHECK]")
print(f"Images with pothole <1% area: {tiny_image_ratio:.2%}")

if tiny_image_ratio > 0.6:
    feasibility = "HARD"
elif tiny_image_ratio > 0.4:
    feasibility = "MODERATE"
else:
    feasibility = "FAVORABLE"

print(f"[FEASIBILITY STATUS] {feasibility}")

print("\n[THRESHOLD PRIOR]")
print("→ Start sweep in range: 0.30 – 0.45")

# Preprocessing & Data Augmentation

Normalization & Base Config

In [None]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2

IMG_SIZE = 512

IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD  = (0.229, 0.224, 0.225)

Transform Factory

In [None]:
def build_base_transform():
    return [
        A.Resize(IMG_SIZE, IMG_SIZE, interpolation=cv2.INTER_LINEAR),
        A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
        ToTensorV2(),
    ]

Train Augmentation 

In [None]:
train_transform = A.Compose(
    [
        A.Resize(IMG_SIZE, IMG_SIZE, interpolation=cv2.INTER_LINEAR),

        # Geometry (Dice-safe)
        A.HorizontalFlip(p=0.5),

        A.Affine(
            scale=(0.95, 1.07),
            translate_percent=(0.0, 0.04),
            rotate=(-3.0, 3.0),
            shear=(-2.0, 2.0),
            interpolation=cv2.INTER_LINEAR,
            mode=cv2.BORDER_REFLECT_101,
            p=0.45,
        ),

        # Photometric
        A.RandomBrightnessContrast(
            brightness_limit=0.20,
            contrast_limit=0.20,
            p=0.70,
        ),

        A.HueSaturationValue(
            hue_shift_limit=6,
            sat_shift_limit=12,
            val_shift_limit=6,
            p=0.35,
        ),

        # Shadow robustness
        A.RandomShadow(
            shadow_roi=(0, 0.5, 1, 1),
            num_shadows_lower=1,
            num_shadows_upper=2,
            shadow_dimension=5,
            p=0.25,
        ),

        # Mild noise / blur
        A.OneOf(
            [
                A.GaussianBlur(blur_limit=3),
                A.GaussNoise(var_limit=(4.0, 15.0)),
            ],
            p=0.18,
        ),

        # Normalize
        A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
        ToTensorV2(),
    ],
    additional_targets={"mask": "mask"},
)


Valid & Test 

In [None]:
valid_transform = A.Compose(
    build_base_transform(),
    additional_targets={"mask": "mask"},
)

test_transform = A.Compose(
    build_base_transform()
)

# Model Construction & Training

In [None]:
!pip install -q segmentation-models-pytorch==0.3.3 timm

Seed, Device, Split

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
import segmentation_models_pytorch as smp
from sklearn.model_selection import train_test_split
import numpy as np
import cv2
from tqdm import tqdm

SEED = 42
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
np.random.seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# Use df_manifest from STAGE 1
df_train, df_val = train_test_split(
    df_manifest,
    test_size=0.15,
    random_state=SEED,
    shuffle=True
)

print("Train:", len(df_train), "| Val:", len(df_val))

Dataset Class

In [None]:
class PotholeDataset(Dataset):
    def __init__(self, df, transform):
        self.df = df.reset_index(drop=True)
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        img = cv2.imread(self.df.loc[idx, "image_path"])
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        mask = cv2.imread(self.df.loc[idx, "mask_path"], cv2.IMREAD_GRAYSCALE)
        mask = (mask == 255).astype("float32")

        aug = self.transform(image=img, mask=mask)
        return aug["image"], aug["mask"].unsqueeze(0)

Loss, Metric, Model Factory

In [None]:
def dice_coef(prob, target, thr=0.40, eps=1e-7):
    pred = (prob > thr).float()
    inter = (pred * target).sum(dim=(2,3))
    union = pred.sum(dim=(2,3)) + target.sum(dim=(2,3))
    return ((2 * inter + eps) / (union + eps)).mean()

dice_loss = smp.losses.DiceLoss(mode="binary", from_logits=True)
focal_loss = smp.losses.FocalLoss(mode="binary", gamma=2.0)

def build_model(name):
    if name == "unetpp":
        return smp.UnetPlusPlus(
            encoder_name="efficientnet-b4",
            encoder_weights="imagenet",
            in_channels=3,
            classes=1,
        )
    elif name == "deeplab":
        return smp.DeepLabV3Plus(
            encoder_name="resnet101",
            encoder_weights="imagenet",
            in_channels=3,
            classes=1,
        )
    else:
        raise ValueError("Unknown model name")

Train Loop

In [None]:
def train_one_model(name, max_epoch):

    print(f"\n===== TRAINING {name.upper()} =====")

    model = build_model(name).to(device)

    optimizer = AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
    scheduler = CosineAnnealingLR(optimizer, T_max=max_epoch)

    train_loader = DataLoader(
        PotholeDataset(df_train, train_transform),
        batch_size=4,
        shuffle=True,
        num_workers=2,
        pin_memory=True
    )

    val_loader = DataLoader(
        PotholeDataset(df_val, valid_transform),
        batch_size=4,
        shuffle=False,
        num_workers=2,
        pin_memory=True
    )

    best_val = 0.0

    for epoch in range(max_epoch):

        # -------- TRAIN --------
        model.train()
        total_loss = 0.0

        for imgs, masks in tqdm(train_loader, desc=f"{name} | Epoch {epoch+1}"):
            imgs, masks = imgs.to(device), masks.to(device)

            optimizer.zero_grad()

            logits = model(imgs)
            loss = dice_loss(logits, masks) + 0.6 * focal_loss(logits, masks)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        scheduler.step()
        avg_loss = total_loss / len(train_loader)

        # -------- VALID --------
        model.eval()
        dices = []

        with torch.no_grad():
            for imgs, masks in val_loader:
                imgs, masks = imgs.to(device), masks.to(device)
                prob = torch.sigmoid(model(imgs))
                dices.append(dice_coef(prob, masks).item())

        val_dice = float(np.mean(dices))

        print(
            f"{name} | Epoch {epoch+1:02d} | "
            f"TrainLoss {avg_loss:.4f} | ValDice {val_dice:.4f}"
        )

        if val_dice > best_val:
            best_val = val_dice
            torch.save(model.state_dict(), f"/kaggle/working/best_{name}.pt")
            print(f">> Best {name} saved")

    print(f"[DONE] {name} best Val Dice: {best_val:.4f}")

Run Training 

In [None]:
train_one_model("unetpp", max_epoch=28)
train_one_model("deeplab", max_epoch=20)

# Optimization, Validation & Refinement

In [None]:
!pip install -q optuna

Setup

In [None]:
import optuna
import numpy as np
import torch
import cv2
from tqdm import tqdm

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)

# Use df_val from STAGE 3
print("[INFO] Validation samples:", len(df_val))

Validation Loader 

In [None]:
val_loader = DataLoader(
    PotholeDataset(df_val, valid_transform),
    batch_size=4,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)

Load Models

In [None]:
unetpp = build_model("unetpp").to(DEVICE)
deeplab = build_model("deeplab").to(DEVICE)

unetpp.load_state_dict(torch.load("/kaggle/working/best_unetpp.pt", map_location=DEVICE))
deeplab.load_state_dict(torch.load("/kaggle/working/best_deeplab.pt", map_location=DEVICE))

unetpp.eval()
deeplab.eval()

print("Models loaded")

Metric & Postprocess

In [None]:
def dice_score(pred, target, eps=1e-7):
    inter = (pred * target).sum()
    union = pred.sum() + target.sum()
    return (2 * inter + eps) / (union + eps)

def remove_small_objects(mask, min_area):
    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
        mask.astype(np.uint8), connectivity=8
    )
    clean = np.zeros_like(mask, dtype=np.uint8)
    for i in range(1, num_labels):
        if stats[i, cv2.CC_STAT_AREA] >= min_area:
            clean[labels == i] = 1
    return clean

Optuna Objective 

In [None]:

    w_u, w_d = w_u / s, w_d / s

    threshold = trial.suggest_float("threshold", 0.30, 0.45)
    min_area  = trial.suggest_int("min_area", 100, 400, step=20)

    dices = []

    with torch.no_grad():
        for imgs, masks in val_loader:

            imgs = imgs.to(DEVICE)
            gt = masks.numpy()

            pu = torch.sigmoid(unetpp(imgs)).cpu().numpy()
            pd = torch.sigmoid(deeplab(imgs)).cpu().numpy()

            prob = w_u * pu + w_d * pd

            for i in range(prob.shape[0]):

                pred = (prob[i, 0] > threshold).astype(np.uint8)
                pred = remove_small_objects(pred, min_area)

                dices.append(dice_score(pred, gt[i, 0]))

    return float(np.mean(dices))

Run Optuna 

In [None]:
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=40, show_progress_bar=True)

best = study.best_params
best_dice = study.best_value

# normalize weights
ws = best["w_unetpp"] + best["w_deeplab"]
best["w_unetpp"] /= ws
best["w_deeplab"] /= ws

OPT_CONFIG = {
    "weights": {
        "unetpp": best["w_unetpp"],
        "deeplab": best["w_deeplab"],
    },
    "threshold": best["threshold"],
    "min_area": best["min_area"],
}

print("\n[OPTUNA BEST CONFIG]")
for k, v in OPT_CONFIG.items():
    print(k, ":", v)

print(f"Validation Dice: {best_dice:.4f}")

# Inference, Encoding & Submission

Setup

In [None]:
import pandas as pd
from pathlib import Path

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

TEST_IMG_DIR = DATA_ROOT / "test" / "images"
SAMPLE_SUB = Path("/kaggle/input/data-science-ara-7-0/sample_submission.csv")

W_U = OPT_CONFIG["weights"]["unetpp"]
W_D = OPT_CONFIG["weights"]["deeplab"]
THRESHOLD = OPT_CONFIG["threshold"]
MIN_AREA = OPT_CONFIG["min_area"]

print("Using optimized ensemble config")

Load Models 

In [None]:
unetpp = build_model("unetpp").to(DEVICE)
deeplab = build_model("deeplab").to(DEVICE)

unetpp.load_state_dict(torch.load("/kaggle/working/best_unetpp.pt", map_location=DEVICE))
deeplab.load_state_dict(torch.load("/kaggle/working/best_deeplab.pt", map_location=DEVICE))

unetpp.eval()
deeplab.eval()

print("Models loaded")

Test Transform

In [None]:
test_transform = A.Compose(
    build_base_transform()
)

RLE

In [None]:
def encode_rle(mask: np.ndarray) -> str:
    binary = (mask == 1).astype(np.uint8)
    pixels = binary.T.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[0::2]
    return " ".join(str(x) for x in runs)

Final Ensemble INference

In [None]:
test_images = sorted(TEST_IMG_DIR.glob("*.jpg"))
records = []

with torch.no_grad():
    for img_path in tqdm(test_images, desc="Final Inference"):

        img_name = img_path.name

        img = cv2.imread(str(img_path))
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h0, w0 = img.shape[:2]

        # apply same transform as training
        aug = test_transform(image=img)
        x = aug["image"].unsqueeze(0).to(DEVICE)

        x_flip = torch.flip(x, dims=[3])

        # forward
        p_u = torch.sigmoid(unetpp(x))
        p_d = torch.sigmoid(deeplab(x))

        p_u_f = torch.flip(torch.sigmoid(unetpp(x_flip)), dims=[3])
        p_d_f = torch.flip(torch.sigmoid(deeplab(x_flip)), dims=[3])

        p_u = (p_u + p_u_f) / 2.0
        p_d = (p_d + p_d_f) / 2.0

        prob = (W_U * p_u + W_D * p_d)[0, 0].cpu().numpy()

        pred = (prob > THRESHOLD).astype(np.uint8)
        pred = remove_small_objects(pred, MIN_AREA)

        # resize back
        pred = cv2.resize(pred, (w0, h0), interpolation=cv2.INTER_NEAREST)

        rle = "" if pred.sum() == 0 else encode_rle(pred)

        records.append({
            "ImageId": img_name,
            "rle": rle
        })

Submission

In [None]:
df_sub = pd.DataFrame(records)
df_sample = pd.read_csv(SAMPLE_SUB)

df_sub = df_sub.set_index("ImageId").loc[df_sample["ImageId"]].reset_index()

OUT_SUB = "/kaggle/working/submission.csv"
df_sub.to_csv(OUT_SUB, index=False)

print("Saved to:", OUT_SUB)
print("Rows:", len(df_sub))
print("Empty RLE:", (df_sub["rle"] == "").sum())
print(df_sub.head())