<a href="https://colab.research.google.com/github/FarrelAD/Hology-8-2025-Data-Mining-PRIVATE/blob/dev%2Ffarrel/notebooks/vidi/SFCN/notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<a href="https://colab.research.google.com/github/FarrelAD/Hology-8-2025-Data-Mining-PRIVATE/blob/dev%2Fvidi/notebooks/vidi/SFCN/notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Crowd Counting with SFCN + FPN

A new method that actually has better results than the other models we’ve tried so far.

# 1. Project Setup

## Import Libraries

In [None]:
import csv
import re
import json
import math
import random
from glob import glob
from typing import List, Tuple, Any
import numpy as np
import cv2
from scipy.spatial import KDTree
import numpy as np
import pandas as pd
import torch
from torch import Tensor
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Optimizer
from torch.cuda.amp import GradScaler
from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import warnings


# Silence every warning category for the rest of the session.
# NOTE(review): this also hides deprecation and runtime warnings from the libraries in use.
warnings.filterwarnings("ignore")


# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

Device: cuda


## Dataset Download and Setup

In [2]:
# @title Setup Kaggle secret key
# Install the Kaggle CLI used by the dataset download cell.
!pip install -q kaggle

from google.colab import files

# Opens the Colab upload dialog; `uploaded` is indexed by file name below.
uploaded = files.upload()

# Echo each uploaded file name together with the length of its content.
for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

# Then move kaggle.json into the folder where the API expects to find it.
# chmod 600 restricts the credentials file to the current user.
!mkdir -p ~/.kaggle/ && mv kaggle.json ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json
User uploaded file "kaggle.json" with length 64 bytes


In [3]:
# @title Setup dataset in Colab
import zipfile
import os
from google.colab import drive

# Mount Google Drive so the extracted dataset can be kept across sessions.
drive.mount('/content/drive')

# Paths
zip_path = "/content/penyisihan-hology-8-0-2025-data-mining.zip"  # competition archive
drive_extract_path = "/content/drive/MyDrive/PROJECTS/Cognivio/Percobaan Hology 8 2025/dataset"  # backup copy on Drive
local_dataset_path = "/content/dataset"  # for current session

# ---------------------------
# Step 1: Download zip (if not exists in /content)
# ---------------------------
if not os.path.exists(zip_path):
    print("Dataset not found locally, downloading...")
    !kaggle competitions download -c penyisihan-hology-8-0-2025-data-mining -p /content
else:
    print("Dataset already exists, skipping download.")

# ---------------------------
# Step 2: Extract to Google Drive (for backup)
# ---------------------------
os.makedirs(drive_extract_path, exist_ok=True)

# Extraction is skipped whenever the Drive folder already contains anything,
# so a partially extracted folder is NOT repaired by re-running this cell.
if not os.listdir(drive_extract_path):  # Check if folder is empty
    print("Extracting dataset to Google Drive...")
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(drive_extract_path)
    print("Dataset extracted to:", drive_extract_path)
else:
    print("Dataset already extracted at:", drive_extract_path)

# ---------------------------
# Step 3: Copy dataset to local /content (faster training)
# ---------------------------
# The copy is only attempted when the local folder does not exist yet.
if not os.path.exists(local_dataset_path):
    print("Copying dataset to Colab local storage (/content)...")
    !cp -r "$drive_extract_path" "$local_dataset_path"
else:
    print("Dataset already available in Colab local storage.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Dataset already exists, skipping download.
Dataset already extracted at: /content/drive/MyDrive/PROJECTS/Cognivio/Percobaan Hology 8 2025/dataset
Dataset already available in Colab local storage.


## Some Configuration

In [None]:
# ImageNet mean and std for normalisation (used by CrowdDataset's Normalize transform)
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Dataset paths, all relative to the session-local dataset copy
TRAIN_IMG_DIR = os.path.join(local_dataset_path, "train", "images")
TRAIN_LABEL_DIR = os.path.join(local_dataset_path, "train", "labels")
TEST_IMG_DIR  = os.path.join(local_dataset_path, "test", "images")


# Seed for better reproducibility: Python, NumPy and torch (CPU + all CUDA devices)
SEED = 1337 # or 31
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)


# PREPROCESSING CONFIGURATION
# Intended for CrowdDataset's contrast_mode / clahe_clip / clahe_grid arguments;
# apply_contrast_enhancement understands "none", "clahe" and "histeq".
CONTRAST_MODE = "clahe"
CLAHE_CLIP = 2.0
CLAHE_GRID = 8


# TRAINING CONFIGURATION
LEARNING_RATE = 1e-4
EPOCHS = 120
ACCUM_STEPS = 2              # micro-batches accumulated per optimizer step (train_epoch)
EARLY_STOP_PATIENCE = 15     # epochs without Val MAE improvement before stopping
CUDA_AMP = True         # CUDA Automatic Mixed Precision (AMP).

BASE_SIZE = 896              # side of the square letterboxed image
DOWN = 8                     # down-sampling factor between image and density/occupancy maps
PATCH_SIZE = 384             # side of the random training patch (0 disables patching)
PATCHES_PER_IMAGE = 6        # each training image is visited this many times per epoch
AVOID_EMPTY_PATCHES = True   # re-sample a patch (up to 10 tries) when it contains no points
SIGMA_MODE = "adaptive"      # "adaptive" = k-NN based Gaussian sigma, anything else = constant

BATCH = 4
NUM_WORKERS = 6

CRITERION = "huber"          # "mse" or "huber" (SmoothL1) for the density-map loss
# Passed to train_epoch as `count_loss_alpha`; the name reads like a typo of COUNT_LOSS_ALPHA.
COUNT_LESS_ALPHA = 0.2
DET_LOSS_ALPHA = 1.0         # weight of the BCE occupancy (detection) loss
HYBRID_EVAL = True           # combine density and detection outputs when counting
DENS_THRES = 0.25            # density threshold used by the hybrid count
DET_PROB_THRES = 0.5         # detection probability threshold used by the hybrid count

SAVE_MODEL_NAME = "sfcn_best.pth"  # checkpoint file name (loaded after training if it exists)

COMPARE_PRED = True          # print per-image GT vs prediction on the validation set

## Some Helper Functions

In [None]:
def derive_json_path(
    label_dir: str,
    img_path: str
) -> str:
    """Derive the corresponding JSON label path for a given image path.

    Lookup order:
      1. ``<label_dir>/<image stem>.json``
      2. ``<label_dir>/<last run of digits in the stem>.json``
      3. the alphabetically first ``<label_dir>/<image stem>*.json``

    Args:
        label_dir: Directory containing the JSON label files.
        img_path: Path of the image whose label is wanted.

    Returns:
        Path of the matching JSON file.

    Raises:
        FileNotFoundError: If none of the three strategies finds a file.
    """
    # Only `glob.glob` is imported at file level, so fetch `escape` locally.
    from glob import escape as glob_escape

    name = os.path.splitext(os.path.basename(img_path))[0]

    exact = os.path.join(label_dir, name + ".json")
    if os.path.exists(exact):
        return exact

    # Try matching the last run of digits in the file name.
    digit_runs = re.findall(r"\d+", name)
    if digit_runs:
        by_digits = os.path.join(label_dir, f"{digit_runs[-1]}.json")
        if os.path.exists(by_digits):
            return by_digits

    # Fallback: any file starting with the same name. The pattern is escaped so
    # that characters such as "[" in a path are matched literally, and the result
    # is sorted because glob() order depends on the filesystem.
    pattern = os.path.join(glob_escape(label_dir), f"{glob_escape(name)}*.json")
    matches = sorted(glob(pattern))
    if matches:
        return matches[0]

    raise FileNotFoundError(f"JSON label not found for {img_path}")

def parse_points_from_json(
    path: str
) -> Tuple[np.ndarray, int]:
    """Parse annotated points from a JSON file.

    Supports several common crowd counting annotation formats. Returns an
    array of shape (N, 2) containing [x, y] coordinates and, if present
    in the JSON, the declared number of people. If no `human_num` or
    `num_human` field is present the count is returned as None.
    """
    with open(path, "r", encoding="utf-8") as f:
        obj = json.load(f)
    
    pts: List[List[float]]
    num = None
    
    if isinstance(obj, dict) and "points" in obj:
        pts = obj["points"]
        # unify possible keys for ground-truth count
        num = obj.get("human_num", obj.get("num_human", None))
        # If points are dicts, extract x/y fields
        if len(pts) > 0 and isinstance(pts[0], dict):
            pts = [[p["x"], p["y"]] for p in pts if "x" in p and "y" in p]
    elif isinstance(obj, dict) and "annotations" in obj:
        pts = [[a["x"], a["y"]] for a in obj["annotations"] if "x" in a and "y" in a]
        num = obj.get("human_num", obj.get("num_human", None))
    elif isinstance(obj, list):
        # direct list of [x,y]
        pts = obj
    else:
        raise ValueError(f"Unknown JSON schema: {path}")
    
    pts_arr = np.array(pts, dtype=np.float32) if len(pts) > 0 else np.zeros((0, 2), np.float32)
    
    return pts_arr, num

def letterbox(
    img: np.ndarray,
    target: int = 512
) -> Tuple[np.ndarray, float, int, int]:
    """Fit an image inside a ``target`` x ``target`` canvas, keeping its aspect ratio.

    The image is scaled by the largest factor that still fits both sides into
    ``target`` and is then centred on a zero-filled 3-channel canvas.

    Returns:
        ``(canvas, scale, left, top)``: the padded image, the scale factor that
        was applied, and the horizontal / vertical offset of the resized image
        inside the canvas.
    """
    src_h, src_w = img.shape[0], img.shape[1]
    scale = min(target / src_h, target / src_w)

    new_w = int(round(src_w * scale))
    new_h = int(round(src_h * scale))
    resized = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

    # Integer division puts any odd leftover pixel on the right / bottom side.
    pad_left = (target - new_w) // 2
    pad_top = (target - new_h) // 2

    canvas = np.zeros((target, target, 3), dtype=resized.dtype)
    canvas[pad_top:pad_top + new_h, pad_left:pad_left + new_w] = resized
    return canvas, scale, pad_left, pad_top


def make_density_map(
    points_xy: np.ndarray,
    grid_size: int,
    down: int = 8,
    sigma_mode: str = "adaptive",
    knn: int = 3,
    beta: float = 0.3,
    const_sigma: float = 2.0,
) -> np.ndarray:
    """Generate a density map on a grid given annotated points.

    Each point contributes one Gaussian that is normalised to sum to 1 inside
    the (border-clipped) window it is drawn in, so the map sums to the number
    of points whose window overlaps the grid.

    Args:
        points_xy: ``(N, 2)`` array of ``[x, y]`` coordinates expressed in a
            ``grid_size`` x ``grid_size`` image. Integer arrays are accepted.
        grid_size: Side length of the (square) image the points live in.
        down: Down-sampling factor; the map is ``grid_size // down`` per side.
        sigma_mode: ``"adaptive"`` derives sigma from the mean distance to the
            ``knn`` nearest neighbours (only when there are more than 3
            points); any other value uses ``const_sigma``.
        knn: Number of neighbours used for the adaptive sigma.
        beta: Multiplier applied to the mean neighbour distance.
        const_sigma: Sigma used when the adaptive rule does not apply.

    Returns:
        float32 array of shape ``(grid_size // down, grid_size // down)``.
    """
    target = grid_size
    dh = dw = target // down
    den = np.zeros((dh, dw), dtype=np.float32)
    if len(points_xy) == 0:
        return den

    # Work on a floating-point copy: scaling an integer array in place would
    # silently truncate the sub-cell position of every point.
    float_dtype = np.result_type(np.asarray(points_xy).dtype, np.float32)
    pts = np.array(points_xy, dtype=float_dtype)

    # Scale points to the density map resolution
    pts[:, 0] = pts[:, 0] * (dw / target)
    pts[:, 1] = pts[:, 1] * (dh / target)

    n_pts = len(pts)
    if sigma_mode == "adaptive" and n_pts > 3:
        # One batched k-NN query instead of one query per point. Column 0 is
        # the point itself (distance 0), hence the [:, 1:] slice.
        dists, _ = KDTree(pts).query(pts, k=min(knn + 1, n_pts))
        sigmas = np.maximum(1.0, dists[:, 1:].mean(axis=1) * beta)
    else:
        sigmas = np.full(n_pts, const_sigma, dtype=np.float64)

    for (x, y), sigma in zip(pts, sigmas):
        sigma = float(sigma)
        cx, cy = float(x), float(y)
        # Draw the Gaussian inside a +-3 sigma window clipped to the map.
        rad = int(max(1, math.ceil(3 * sigma)))
        x0, x1 = max(0, int(math.floor(cx - rad))), min(dw, int(math.ceil(cx + rad + 1)))
        y0, y1 = max(0, int(math.floor(cy - rad))), min(dh, int(math.ceil(cy + rad + 1)))
        if x1 <= x0 or y1 <= y0:
            continue
        xs = np.arange(x0, x1) - cx
        ys = np.arange(y0, y1) - cy
        xx, yy = np.meshgrid(xs, ys)
        g = np.exp(-(xx**2 + yy**2) / (2 * sigma * sigma))
        s = g.sum()
        if s > 0:
            # Normalise so every drawn point adds exactly 1 to the total count.
            den[y0:y1, x0:x1] += (g / s).astype(np.float32)
    return den

def apply_contrast_enhancement(
    img_rgb: np.ndarray, 
    mode: str = "none",
    clahe_clip: float = 2.0, 
    clahe_grid: int = 8
) -> np.ndarray:
    """Optionally enhance the contrast of an RGB image.

    Args:
        img_rgb: Image in RGB channel order.
        mode: ``"clahe"`` applies CLAHE to the L channel in LAB space,
            ``"histeq"`` equalises the Y channel in YCrCb space. ``"none"`` and
            any unrecognised value return the input unchanged.
        clahe_clip: CLAHE clip limit (``"clahe"`` mode only).
        clahe_grid: CLAHE tile grid size per side (``"clahe"`` mode only).

    Returns:
        The enhanced RGB image, or ``img_rgb`` itself when nothing is applied.
    """
    if mode == "clahe":
        lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(img_rgb, cv2.COLOR_RGB2LAB))
        equalizer = cv2.createCLAHE(
            clipLimit=clahe_clip,
            tileGridSize=(clahe_grid, clahe_grid),
        )
        enhanced_lab = cv2.merge([equalizer.apply(lightness), chan_a, chan_b])
        return cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2RGB)

    if mode == "histeq":
        luma, chroma_r, chroma_b = cv2.split(cv2.cvtColor(img_rgb, cv2.COLOR_RGB2YCrCb))
        enhanced_ycrcb = cv2.merge([cv2.equalizeHist(luma), chroma_r, chroma_b])
        return cv2.cvtColor(enhanced_ycrcb, cv2.COLOR_YCrCb2RGB)

    # "none" and every unknown mode: pass the image through untouched.
    return img_rgb


def make_occupancy_map(
    points_xy: np.ndarray,
    grid_size: int,
    down: int = 8,
    radius: int = 1,
) -> np.ndarray:
    """
    Create an occupancy map marking regions around given points.

    Args:
        points_xy: Array of shape (N, 2) with x,y coordinates of points.
            Integer arrays are accepted.
        grid_size: Size of the square grid (image resolution).
        down: Downsampling factor for the grid.
        radius: Radius (in map cells) around each point to mark as occupied.

    Returns:
        A 2D float32 occupancy map of shape (grid_size//down, grid_size//down)
        holding 1.0 in occupied cells and 0.0 elsewhere.
    """
    dh = dw = grid_size // down
    occupancy = np.zeros((dh, dw), dtype=np.float32)

    pts = np.asarray(points_xy)
    if pts.size == 0:
        return occupancy

    # Floating-point copy: an in-place float multiply on an integer array
    # raises a casting error, and the caller's array must not be modified.
    pts = pts.astype(np.result_type(pts.dtype, np.float32), copy=True)

    # Normalize coordinates into grid coordinates
    pts[:, 0] *= dw / grid_size
    pts[:, 1] *= dh / grid_size

    for x, y in pts:
        xi, yi = int(round(x)), int(round(y))

        # Square of side 2 * radius + 1 around the cell, clipped to the map.
        x0, x1 = max(0, xi - radius), min(dw - 1, xi + radius)
        y0, y1 = max(0, yi - radius), min(dh - 1, yi + radius)

        occupancy[y0:y1 + 1, x0:x1 + 1] = 1.0

    return occupancy

# 2. Data Preprocessing

In [6]:
# Build the image list in a stable (sorted) order, then shuffle it with a
# dedicated, freshly seeded RNG. Using the global `random` state here would give
# a different train/val split every time this cell is re-run in the same
# session, leaking validation images into training.
all_imgs = sorted(glob(os.path.join(TRAIN_IMG_DIR, "*.*")))

random.Random(SEED).shuffle(all_imgs)
n_images = len(all_imgs)

if n_images < 2:
    raise ValueError("Need at least 2 images for training and validation")

# Hold out 10% of the images (at least one) for validation.
n_val = max(1, int(0.1 * n_images))
n_train = n_images - n_val

train_imgs = all_imgs[:n_train]
val_imgs = all_imgs[n_train:]

# The two splits must be disjoint and together cover every image.
assert len(train_imgs) + len(val_imgs) == n_images
assert not set(train_imgs) & set(val_imgs)


# TODO: add any further data preprocessing below.

# 3. Dataset Loading

In [None]:
class CrowdDataset(Dataset):
    """Crowd-counting dataset yielding image, density map, count and occupancy map.

    Every image is loaded as RGB, optionally contrast-enhanced and augmented,
    letterboxed to ``base_size`` x ``base_size`` and, in patch-training mode,
    randomly cropped to ``patch_size`` x ``patch_size``. Point annotations are
    carried through every geometric step so the targets stay aligned with the
    pixels.

    Args:
        img_dir: Directory that is globbed (``*.*``) for images.
        label_dir: Directory containing the JSON point annotations.
        base_size: Side of the square letterboxed image.
        down: Down-sampling factor of the density / occupancy maps.
        aug: Enable augmentation (only effective when ``mode == "train"``).
        mode: ``"train"`` enables augmentation and patch sampling.
        patch_size: Side of the random training patch; ``0`` disables patching.
        patches_per_image: How many times each image is visited per epoch in
            patch-training mode.
        sigma_mode: Forwarded to ``make_density_map``.
        avoid_empty_patches: Re-sample a patch (up to 10 tries) when it
            contains no points.
        contrast_mode, clahe_clip, clahe_grid: Forwarded to
            ``apply_contrast_enhancement``.

    Raises:
        ValueError: If ``img_dir`` contains no files.
    """

    def __init__(
        self,
        img_dir: str,
        label_dir: str,
        base_size: int = 768,
        down: int = 8,
        aug: bool = True,
        mode: str = "train",
        patch_size: int = 0,
        patches_per_image: int = 1,
        sigma_mode: str = "adaptive",
        avoid_empty_patches: bool = False,
        contrast_mode: str = "none",
        clahe_clip: float = 2.0,
        clahe_grid: int = 8,
    ) -> None:
        super().__init__()

        self.img_paths: List[str] = sorted(glob(os.path.join(img_dir, "*.*")))
        if not self.img_paths:
            raise ValueError(f"No images found in {img_dir}")

        self.label_dir = label_dir
        self.base_size = base_size
        self.down = down
        self.aug = aug
        self.mode = mode
        self.patch_size = patch_size
        self.patches_per_image = max(1, int(patches_per_image))
        self.sigma_mode = sigma_mode
        self.avoid_empty_patches = avoid_empty_patches
        self.contrast_mode = contrast_mode
        self.clahe_clip = clahe_clip
        self.clahe_grid = clahe_grid

        # Transforms
        self.to_tensor = transforms.ToTensor()
        self.normalize = transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD)
        self.color_jit = transforms.ColorJitter(0.1, 0.1, 0.1, 0.05)
        # Photometric-only extras. The random resized crop is NOT part of this
        # pipeline: it moves pixels, so it is applied by hand in
        # `_apply_augmentations` where the points can be moved with them.
        self.extra_aug = transforms.Compose([
            transforms.RandomApply([transforms.RandomGrayscale(p=1.0)], p=0.15),
            transforms.RandomApply([transforms.GaussianBlur(kernel_size=3, sigma=(0.1, 2.0))], p=0.30),
        ])
        self.crop_scale = (0.80, 1.00)  # fraction of the image area kept by the crop
        self.crop_ratio = (0.90, 1.10)  # aspect-ratio range of the crop
        self.rand_erase = transforms.RandomErasing(
            p=0.25, scale=(0.02, 0.20), ratio=(0.3, 3.3), value=0
        )

        # Dataset length logic
        is_training_with_patches = (
            self.mode == "train"
            and self.patch_size > 0
            and self.patches_per_image > 1
        )
        self.effective_len = (
            len(self.img_paths) * self.patches_per_image
            if is_training_with_patches else len(self.img_paths)
        )

    def __len__(self) -> int:
        return self.effective_len

    def __getitem__(self, index: int) -> tuple[Any, Tensor, Tensor, Tensor]:
        """Return ``(image, density, count, occupancy)`` for one sample."""
        idx_base = self._get_base_index(index)
        img, points = self._load_img_and_points(idx_base)

        if self.mode == "train" and self.patch_size > 0:
            img, points, grid = self._sample_patch(img, points)
        else:
            grid = self.base_size

        den = make_density_map(points, grid_size=grid, down=self.down, sigma_mode=self.sigma_mode)
        occ = make_occupancy_map(points, grid_size=grid, down=self.down, radius=1)

        tensor_img = self.to_tensor(img)
        if self.mode == "train" and self.aug:
            # Erasing is done on the tensor, before normalisation, so the
            # erased area holds the fill value 0 in [0, 1] pixel space.
            tensor_img = self.rand_erase(tensor_img)
        tensor_img = self.normalize(tensor_img)

        density = torch.from_numpy(den).unsqueeze(0)
        occupancy = torch.from_numpy(occ).unsqueeze(0)
        count = torch.tensor([float(len(points))], dtype=torch.float32)

        return tensor_img, density, count, occupancy

    def _get_base_index(self, index: int) -> int:
        """Convert effective dataset index into base image index."""
        if (
            self.mode == "train"
            and self.patch_size > 0
            and self.patches_per_image > 1
        ):
            index //= self.patches_per_image
        return index % len(self.img_paths)

    def _load_img_and_points(
        self, 
        idx: int
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Load an image (as RGB) and its points, augmented and letterboxed.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        img_path = self.img_paths[idx]
        img = cv2.imread(img_path, cv2.IMREAD_COLOR)
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        # OpenCV decodes to BGR; contrast enhancement, the PIL-based
        # augmentations and the ImageNet normalisation all expect RGB.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        w = img.shape[1]

        img = apply_contrast_enhancement(
            img, self.contrast_mode, self.clahe_clip, self.clahe_grid
        )

        lbl_path = derive_json_path(self.label_dir, img_path)
        points, _ = parse_points_from_json(lbl_path)

        # Augmentations
        if self.mode == "train" and self.aug:
            img, points = self._apply_augmentations(img, points, w)

        # Letterbox resize
        canvas, scale, left, top = letterbox(img, target=self.base_size)
        points = self._transform_points(points, scale, left, top)

        return canvas, points

    def _apply_augmentations(
        self, img: np.ndarray, points: np.ndarray, width: int
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Apply random augmentations to image and points.

        Geometric steps (flip, random resized crop) update the points as well;
        photometric steps (colour jitter, grayscale, blur) leave them alone.
        """
        if random.random() < 0.5:  # Horizontal flip
            img = img[:, ::-1, :].copy()
            if points.size > 0:
                points = points.copy()
                points[:, 0] = (width - 1) - points[:, 0]

        if random.random() < 0.5:  # Color jitter
            pil_img = transforms.ToPILImage()(img)
            img = np.array(self.color_jit(pil_img))

        # Photometric extras (grayscale / blur)
        pil_img = self.extra_aug(transforms.ToPILImage()(img))

        # Random resized crop to base_size x base_size, applied to BOTH the
        # image and the points. get_params returns (top, left, height, width).
        top, left, crop_h, crop_w = transforms.RandomResizedCrop.get_params(
            pil_img, scale=list(self.crop_scale), ratio=list(self.crop_ratio)
        )
        cropped = np.ascontiguousarray(
            np.array(pil_img)[top:top + crop_h, left:left + crop_w]
        )
        img = cv2.resize(
            cropped, (self.base_size, self.base_size), interpolation=cv2.INTER_LINEAR
        )
        points = self._map_points_to_crop(points, top, left, crop_h, crop_w)

        return img, points

    def _map_points_to_crop(
        self, points: np.ndarray, top: int, left: int, crop_h: int, crop_w: int
    ) -> np.ndarray:
        """Map points into a crop that is resized to ``base_size`` x ``base_size``.

        Points falling outside the crop are dropped.
        """
        if points.size == 0:
            return np.zeros((0, 2), np.float32)

        pts = points.copy()
        pts[:, 0] = (pts[:, 0] - left) * (self.base_size / crop_w)
        pts[:, 1] = (pts[:, 1] - top) * (self.base_size / crop_h)
        return self._keep_inside(pts, self.base_size)

    @staticmethod
    def _keep_inside(pts: np.ndarray, size: int) -> np.ndarray:
        """Keep only the points with ``0 <= x < size`` and ``0 <= y < size``."""
        mask = (
            (0 <= pts[:, 0]) & (pts[:, 0] < size) &
            (0 <= pts[:, 1]) & (pts[:, 1] < size)
        )
        return pts[mask]

    def _transform_points(
        self, points: np.ndarray, scale: float, left: int, top: int
    ) -> np.ndarray:
        """Scale and shift points after letterboxing."""
        if points.size == 0:
            return np.zeros((0, 2), np.float32)

        pts = points.copy()
        pts[:, 0] = pts[:, 0] * scale + left
        pts[:, 1] = pts[:, 1] * scale + top
        return self._keep_inside(pts, self.base_size)

    def _sample_patch(
        self, img: np.ndarray, points: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray, int]:
        """Sample a random patch from image and points.

        Returns ``(patch, points_in_patch, patch_size)``. With
        ``avoid_empty_patches`` up to 10 offsets are tried and the last one is
        accepted even if it is empty.
        """
        ps = self.patch_size
        max_off = self.base_size - ps

        attempts = range(10) if self.avoid_empty_patches else [0]
        for attempt in attempts:
            ox = random.randint(0, max_off) if max_off > 0 else 0
            oy = random.randint(0, max_off) if max_off > 0 else 0

            crop = img[oy:oy + ps, ox:ox + ps, :]
            cropped_points = self._crop_points(points, ox, oy, ps)

            if not self.avoid_empty_patches or cropped_points.size > 0 or attempt == 9:
                return crop, cropped_points, ps

        return img, points, ps  # Fallback

    def _crop_points(
        self, points: np.ndarray, ox: int, oy: int, patch_size: int
    ) -> np.ndarray:
        """Shift and filter points inside a patch."""
        if points.size == 0:
            return np.zeros((0, 2), np.float32)

        pts = points.copy()
        pts[:, 0] -= ox
        pts[:, 1] -= oy
        return self._keep_inside(pts, patch_size)

In [8]:
# Instantiate datasets. Both read from the training directory; the actual
# train/val membership is set right below by overriding `img_paths`.
# The contrast settings from the configuration cell are passed to BOTH
# datasets so training and validation images are preprocessed identically.
train_ds = CrowdDataset(
    img_dir=TRAIN_IMG_DIR,
    label_dir=TRAIN_LABEL_DIR,
    base_size=BASE_SIZE,
    down=DOWN,
    aug=True,
    mode="train",
    patch_size=PATCH_SIZE,
    patches_per_image=PATCHES_PER_IMAGE,
    sigma_mode=SIGMA_MODE,
    avoid_empty_patches=AVOID_EMPTY_PATCHES,
    contrast_mode=CONTRAST_MODE,
    clahe_clip=CLAHE_CLIP,
    clahe_grid=CLAHE_GRID,
)
val_ds = CrowdDataset(
    img_dir=TRAIN_IMG_DIR,
    label_dir=TRAIN_LABEL_DIR,
    base_size=BASE_SIZE,
    down=DOWN,
    aug=False,
    mode="val",
    patch_size=0,
    patches_per_image=1,
    sigma_mode=SIGMA_MODE,
    avoid_empty_patches=False,
    contrast_mode=CONTRAST_MODE,
    clahe_clip=CLAHE_CLIP,
    clahe_grid=CLAHE_GRID,
)


# Override image paths after shuffling
train_ds.img_paths = train_imgs
val_ds.img_paths = val_imgs

# Recompute effective lengths for patch training: the constructor derived them
# from the full directory listing, which no longer matches `img_paths`.
if train_ds.mode == "train" and train_ds.patch_size > 0 and train_ds.patches_per_image > 1:
    train_ds.effective_len = len(train_ds.img_paths) * train_ds.patches_per_image
else:
    train_ds.effective_len = len(train_ds.img_paths)
val_ds.effective_len = len(val_ds.img_paths)


# Data loaders
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=True,
    drop_last=True,
)
# Validation order must stay fixed (shuffle=False): the comparison printout maps
# samples back to file names by position.
val_loader = DataLoader(
    val_ds,
    batch_size= BATCH,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True,
)

# 4. Model Architecture

In [None]:
class SpatialEncoder(nn.Module):
    """
    Depthwise spatial encoder using separate horizontal and vertical convolutions.
    """

    def __init__(self, channels: int, k: int = 9) -> None:
        super().__init__()

        padding = k // 2

        # Depthwise horizontal convolutions
        self.h1: nn.Conv2d = nn.Conv2d(
            in_channels=channels,
            out_channels=channels,
            kernel_size=(1, k),
            padding=(0, padding),
            groups=channels,
            bias=False
        )

        self.h2: nn.Conv2d = nn.Conv2d(
            in_channels=channels,
            out_channels=channels,
            kernel_size=(1, k),
            padding=(0, padding),
            groups=channels,
            bias=False
        )

        # Depthwise vertical convolutions
        self.v1: nn.Conv2d = nn.Conv2d(
            in_channels=channels,
            out_channels=channels,
            kernel_size=(k, 1),
            padding=(padding, 0),
            groups=channels,
            bias=False
        )

        self.v2: nn.Conv2d = nn.Conv2d(
            in_channels=channels,
            out_channels=channels,
            kernel_size=(k, 1),
            padding=(padding, 0),
            groups=channels,
            bias=False
        )

        # Projection to original channel size
        self.proj: nn.Conv2d = nn.Conv2d(
            in_channels=channels * 4,
            out_channels=channels,
            kernel_size=1,
            bias=False
        )

        # Activation
        self.act: nn.ReLU = nn.ReLU(inplace=True)

    def forward(self, x: Tensor) -> Tensor:
        h1_out: Tensor = self.h1(x)
        h2_out: Tensor = self.h2(x)
        v1_out: Tensor = self.v1(x)
        v2_out: Tensor = self.v2(x)

        cat_out: Tensor = torch.cat([h1_out, h2_out, v1_out, v2_out], dim=1)
        proj_out: Tensor = self.proj(cat_out)
        act_out: Tensor = self.act(proj_out)

        return act_out


class SFCN_VGG_FPN(nn.Module):
    """
    VGG16-BN backbone with an FPN top-down path and a stride-8 head.

    C3 and C4 (outputs of the 3rd and 4th max-pool) are merged into P3, which
    feeds the optional spatial encoder and the two heads.
    Outputs (density_map, detection_logits).

    Args:
        pretrained: Load ImageNet weights into the VGG16-BN backbone.
        use_spatial_encoder: Insert a SpatialEncoder before the heads.
    """

    def __init__(
        self, 
        pretrained: bool = True, 
        use_spatial_encoder: bool = True
    ) -> None:
        super().__init__()

        # VGG16-BN backbone
        weights = models.VGG16_BN_Weights.IMAGENET1K_V1 if pretrained else None
        vgg = models.vgg16_bn(weights=weights)
        self.features = vgg.features

        self.use_spatial = use_spatial_encoder
        self.senc = SpatialEncoder(256, k=9) if self.use_spatial else nn.Identity()

        # FPN lateral 1x1 convolutions
        # lat_c2 / smooth2 are not used by forward(); they are kept so that the
        # parameter set (and therefore any saved state_dict) stays unchanged.
        self.lat_c2 = nn.Conv2d(128, 256, kernel_size=1)
        self.lat_c3 = nn.Conv2d(256, 256, kernel_size=1)
        self.lat_c4 = nn.Conv2d(512, 256, kernel_size=1)

        # FPN smooth 3x3 convolutions
        self.smooth2 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.smooth3 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.smooth4 = nn.Conv2d(256, 256, kernel_size=3, padding=1)

        # Density head
        self.density_head = nn.Sequential(
            nn.Conv2d(256, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 1, kernel_size=1)
        )

        # Detection head (logits)
        self.detect_head = nn.Conv2d(256, 1, kernel_size=1)

    def _forward_backbone(
        self, 
        x: Tensor
    ) -> tuple[Tensor, Tensor, Tensor]:
        """Forward pass through VGG16 backbone, returning C2, C3, C4 features.

        C2, C3 and C4 are the outputs of the 2nd, 3rd and 4th max-pool layer.
        The loop stops right after C4: the remaining backbone layers only
        produced an output that was discarded.
        """
        c2 = c3 = c4 = None
        pool_count = 0

        for layer in self.features:
            x = layer(x)
            if isinstance(layer, nn.MaxPool2d):
                pool_count += 1
                if pool_count == 2:
                    c2 = x
                elif pool_count == 3:
                    c3 = x
                elif pool_count == 4:
                    c4 = x
                    break  # nothing after C4 is consumed

        return c2, c3, c4 # type: ignore

    def _upsample_add(
        self, 
        x: Tensor, 
        y: Tensor
    ) -> Tensor:
        """Upsample x to y's size and add."""
        return F.interpolate(x, size=y.shape[-2:], mode="nearest") + y

    def forward(
        self, 
        x: Tensor
    ) -> tuple[Tensor, Tensor]:
        """Return ``(density, detection_logits)`` at stride 8.

        The density map is passed through softplus and is therefore
        non-negative; the detection output is raw logits.
        """
        _, c3, c4 = self._forward_backbone(x)

        # FPN lateral + smooth (top-down: P4 -> P3). The P2 level is not
        # computed because the heads only consume the stride-8 P3 features.
        p4 = self.smooth4(self.lat_c4(c4))
        p3 = self.smooth3(self._upsample_add(p4, self.lat_c3(c3)))

        # Use stride-8 features
        f = self.senc(p3)

        # Heads
        dens = F.softplus(self.density_head(f))
        det = self.detect_head(f)

        return dens, det


# Model, optimizer, scaler, and scheduler
model = SFCN_VGG_FPN(pretrained=True).to(device)
# AdamW over every model parameter, with the learning rate from the configuration cell.
opt = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
# A gradient scaler is only created when AMP is requested AND a CUDA device is in use;
# train_epoch treats `None` as "train without mixed precision".
scaler = torch.cuda.amp.GradScaler() if (CUDA_AMP and device.type == "cuda") else None
# Cosine annealing across the full epoch budget; stepped once per epoch in the training loop.
sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=EPOCHS)

# 5. Evaluation Function

In [None]:
@torch.no_grad()
def evaluate(
    model: torch.nn.Module,
    loader: DataLoader,
    device: torch.device,
    dens_thres: float = 0.25,
    det_prob_thres: float = 0.5,
    hybrid: bool = True,
) -> Tuple[float, float]:
    """
    Compute count MAE and RMSE of ``model`` over ``loader``.

    The ground-truth count of an image is the sum of its density map. With
    ``hybrid=True`` the predicted map is blended with the detection
    probability wherever the predicted density is below ``dens_thres``
    (blend weight = detection probability); otherwise the predicted density
    map is summed as is.

    Args:
        model: Network returning ``(density_map, detection_logits)``.
        loader: DataLoader yielding ``(images, density, count, occupancy)``.
        device: Device to run inference on.
        dens_thres: Density threshold below which the hybrid blend is applied.
        det_prob_thres: Accepted for interface compatibility; not used here.
        hybrid: If True, combine density and detection for counting.

    Returns:
        Tuple of (MAE, RMSE) over the dataset.
    """
    model.eval()

    abs_err_total = 0.0
    sq_err_total = 0.0
    n_seen = 0

    for imgs, dens, _, _occ in tqdm(loader, desc="Val", leave=False):
        imgs = imgs.to(device)
        dens = dens.to(device)

        pred_den, pred_det = model(imgs)

        if not hybrid:
            count_map = pred_den
        else:
            prob = torch.sigmoid(pred_det)
            # Soft gate: only low-density cells are blended towards `prob`.
            blend = (pred_den < dens_thres).float() * prob
            count_map = (1.0 - blend) * pred_den + blend * prob

        pred_count = count_map.sum((1, 2, 3))
        gt_count = dens.sum((1, 2, 3))

        err = (pred_count - gt_count).detach().cpu().numpy()
        abs_err_total += np.abs(err).sum()
        sq_err_total += (err**2).sum()
        n_seen += imgs.size(0)

    denom = max(1, n_seen)  # guards against an empty loader
    return abs_err_total / denom, math.sqrt(sq_err_total / denom)

# 6. Model Training

In [None]:
def train_epoch(
    model: nn.Module,
    loader: DataLoader,
    device: torch.device,
    optimizer: Optimizer,
    scaler: GradScaler | None = None,
    accum_steps: int = 1,
    criterion: str = "mse",
    count_loss_alpha: float = 0.0,
    det_loss_alpha: float = 1.0,
) -> float:
    """Train model for one epoch and return MAE over the dataset.

    The loss is ``map_loss + det_loss_alpha * BCE(detection, occupancy)`` plus,
    when ``count_loss_alpha > 0``, ``count_loss_alpha * MSE`` between the
    predicted and ground-truth density sums. Gradients are accumulated over
    ``accum_steps`` micro-batches before each optimizer step.

    Args:
        model: Network returning ``(density_map, detection_logits)``.
        loader: DataLoader yielding ``(images, density, count, occupancy)``.
        device: Device the batches are moved to.
        optimizer: Optimizer updating ``model``.
        scaler: Gradient scaler; when given, the forward pass runs under CUDA
            float16 autocast. ``None`` trains in full precision.
        accum_steps: Number of micro-batches per optimizer step.
        criterion: ``"mse"`` or ``"huber"`` (SmoothL1) for the density map.
        count_loss_alpha: Weight of the count loss (0 disables it).
        det_loss_alpha: Weight of the detection (occupancy) loss.

    Returns:
        Mean absolute count error over all training samples of the epoch.

    Raises:
        ValueError: If ``criterion`` is neither ``"mse"`` nor ``"huber"``.
    """
    from contextlib import nullcontext

    model.train()

    if criterion == "mse":
        crit = nn.MSELoss()
    elif criterion == "huber":
        crit = nn.SmoothL1Loss()
    else:
        raise ValueError("criterion must be 'mse' or 'huber'")

    bce = nn.BCEWithLogitsLoss()
    use_amp = scaler is not None
    running_mae = 0.0
    nimg = 0
    last_step = 0
    optimizer.zero_grad(set_to_none=True)

    def _optimizer_step() -> None:
        # Apply the accumulated gradients and reset them.
        if use_amp:
            scaler.step(optimizer)
            scaler.update()
        else:
            optimizer.step()
        optimizer.zero_grad(set_to_none=True)

    for step, (imgs, dens, _, occ) in enumerate(tqdm(loader, desc="Train", leave=False), 1):
        imgs = imgs.to(device)
        dens = dens.to(device)
        occ = occ.to(device)

        # ---------------- Forward + Loss ---------------- #
        # Without AMP the context must be a true no-op: torch.no_grad() would
        # disable autograd and make backward() fail.
        if use_amp:
            forward_ctx = torch.autocast(device_type="cuda", dtype=torch.float16)
        else:
            forward_ctx = nullcontext()

        with forward_ctx:
            pred_den, pred_det = model(imgs)
            map_loss = crit(pred_den, dens)
            det_loss = bce(pred_det, occ)
            total_loss = map_loss + det_loss_alpha * det_loss

            if count_loss_alpha > 0.0:
                count_loss = F.mse_loss(pred_den.sum((1, 2, 3)), dens.sum((1, 2, 3)))
                total_loss = total_loss + count_loss_alpha * count_loss

            # Average over the accumulation window.
            total_loss = total_loss / accum_steps

        # ---------------- Backward ---------------- #
        if use_amp:
            scaler.scale(total_loss).backward()
        else:
            total_loss.backward()

        if step % accum_steps == 0:
            _optimizer_step()
        last_step = step

        # ---------------- Metric ---------------- #
        with torch.no_grad():
            pred_count = pred_den.sum((1, 2, 3)).detach().cpu().numpy()
            gt_count = dens.sum((1, 2, 3)).detach().cpu().numpy()
            running_mae += np.abs(pred_count - gt_count).sum()
            nimg += imgs.size(0)

    # Flush gradients left over when the number of batches is not a multiple of
    # accum_steps; otherwise those batches would never influence the weights.
    if last_step % accum_steps != 0:
        _optimizer_step()

    return running_mae / max(1, nimg)

In [None]:
def _print_val_comparison(
    model: nn.Module,
    loader: DataLoader,
    img_paths: List[str],
    device: torch.device,
) -> None:
    """Print the ground-truth and predicted count of every validation image.

    With HYBRID_EVAL enabled, cells whose predicted density is below DENS_THRES
    and whose detection probability is at least DET_PROB_THRES are replaced by
    the detection probability before summing.
    NOTE(review): `evaluate` weights low-density cells by the probability itself
    (soft gate) and ignores DET_PROB_THRES, so the counts printed here can
    differ from the ones behind the reported Val MAE.

    `loader` must not shuffle: samples are mapped to file names by position.
    """
    model.eval()
    idx = 0
    with torch.no_grad():
        for imgs, dens, _, _ in loader:
            imgs = imgs.to(device)
            dens = dens.to(device)
            pred_den, pred_det = model(imgs)

            if HYBRID_EVAL:
                prob = torch.sigmoid(pred_det)
                gate_on = (prob >= DET_PROB_THRES).float()
                w = (pred_den < DENS_THRES).float() * gate_on
                comb = (1.0 - w) * pred_den + w * prob
                pred_cnt_batch = comb.sum((1, 2, 3))
            else:
                pred_cnt_batch = pred_den.sum((1, 2, 3))
            gt_cnt_batch = dens.sum((1, 2, 3))

            for j in range(pred_cnt_batch.size(0)):
                # Fall back to a positional name if the index runs past the path list.
                if idx < len(img_paths):
                    image_name = os.path.basename(img_paths[idx])
                else:
                    image_name = f"sample_{idx}"
                print(f"{image_name}: GT {gt_cnt_batch[j].item():.2f}, Pred {pred_cnt_batch[j].item():.2f}")
                idx += 1


best_mae = float("inf")
patience = EARLY_STOP_PATIENCE
bad_epochs = 0
for epoch in range(1, EPOCHS + 1):
    print(f"\nEpoch {epoch}/{EPOCHS}")
    tr_mae = train_epoch(
        model,
        loader=train_loader,
        device=device,
        optimizer=opt,
        scaler=scaler,
        accum_steps=ACCUM_STEPS,
        criterion=CRITERION,
        count_loss_alpha=COUNT_LESS_ALPHA,
        det_loss_alpha=DET_LOSS_ALPHA
    )
    va_mae, va_rmse = evaluate(
        model, 
        loader=val_loader, 
        device=device,
        dens_thres=DENS_THRES,
        det_prob_thres=DET_PROB_THRES,
        hybrid=HYBRID_EVAL
    )
    sched.step()
    print(
        f"Train MAE: {tr_mae:.3f} | Val MAE: {va_mae:.3f} | Val RMSE: {va_rmse:.3f}"
    )
    
    if COMPARE_PRED:
        print(f"Perbandingan ground truth vs prediksi pada validation set untuk epoch {epoch}:")
        # Per-image report with the current weights (train_epoch switches the
        # model back to train mode at the start of the next epoch).
        _print_val_comparison(model, val_loader, val_ds.img_paths, device)
    
    
    # Improvements smaller than 1e-6 do not reset the patience counter.
    if va_mae + 1e-6 < best_mae:
        best_mae = va_mae
        bad_epochs = 0

        # Save the model to .pth file if necessary
        # NOTE: while this stays commented out, no "best" checkpoint is written
        # by this run; the weights left in `model` are those of the last epoch.
        # torch.save(
        #     {
        #         "model": model.state_dict(),
        #         "epoch": epoch,
        #         "val_mae": va_mae,
        #     },
        #     SAVE_MODEL_NAME,
        # )
        # print(f"✅ New best: {SAVE_MODEL_NAME} (Val MAE={va_mae:.3f})")
    else:
        bad_epochs += 1
        print(f"No improvement for {bad_epochs} epoch(s).")
        if bad_epochs >= patience:
            print(
                f"Early stopping at epoch {epoch}. Best Val MAE: {best_mae:.3f}"
            )
            break
print("Training finished. Best Val MAE:", best_mae)

if COMPARE_PRED:
    print("Perbandingan ground truth vs prediksi pada validation set:")
    # Load the checkpoint if one exists so the report uses the saved weights.
    # NOTE(review): a file left over from an earlier run is loaded as well.
    if os.path.exists(SAVE_MODEL_NAME):
        ckpt = torch.load(SAVE_MODEL_NAME, map_location=device)
        # Some checkpoints save under 'model' key, others are raw state_dict
        if isinstance(ckpt, dict) and "model" in ckpt:
            model.load_state_dict(ckpt["model"])
        else:
            model.load_state_dict(ckpt)
    _print_val_comparison(model, val_loader, val_ds.img_paths, device)

# 7. Test Set Prediction and Submit Result

In [None]:
class TestDataset(Dataset):
    """Unlabelled test-image dataset that yields ``(image, file_name)`` pairs.

    Only files whose name ends in ``.jpg`` are used, and the part of the name
    before the extension must parse as an integer: files are ordered by that
    number, so ``2.jpg`` comes before ``10.jpg``.

    Args:
        image_dir: Directory that contains the test images.
        transform: Optional callable invoked as ``transform(image=rgb_array)``
            whose result is indexed with ``["image"]`` (the keyword-call
            convention used by e.g. Albumentations). When it is ``None`` the
            RGB array produced by OpenCV is returned unchanged.

    Raises:
        ValueError: From ``__init__`` when a ``.jpg`` file name is not numeric.
        OSError: From ``__getitem__`` when OpenCV cannot read an image file.
    """

    def __init__(
        self,
        image_dir: str,
        transform=None
    ):
        self.image_dir = image_dir
        self.transform = transform
        jpg_files = [f for f in os.listdir(image_dir) if f.endswith('.jpg')]
        # Numeric sort on the file stem; a plain string sort would put
        # "10.jpg" before "2.jpg".
        self.image_files = sorted(
            jpg_files,
            key=lambda name: int(os.path.splitext(name)[0]),
        )

    def __len__(self) -> int:
        """Return the number of test images found in ``image_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx) -> Tuple[Any, str]:
        """Load image ``idx`` as RGB and return it with its file name."""
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)

        image = cv2.imread(img_path)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cvtColor.
        if image is None:
            raise OSError(f"Could not read image: {img_path}")
        # OpenCV decodes to BGR channel order; convert to RGB.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transform:
            augmented = self.transform(image=image)
            image = augmented['image']

        return image, img_name

# --- Prediction Function ---
def generate_sfcn_predictions(
    model,
    test_dir: str,
    output_file: str = "sfcn_submission.csv",
    batch_size: int = 16,
    transform=None,
    num_workers: int = 2,
) -> pd.DataFrame:
    """Predict a crowd count for every test image and write a submission CSV.

    The model is called as ``density, detection = model(images)``. The count of
    an image is the sum of its density map over the channel and spatial axes,
    which is the same rule the validation code applies when hybrid gating is
    off. The second output (detection logits) is a map, not a count, and is
    not used here.

    Args:
        model: Network returning ``(density_map, detection_logits)`` for a batch.
        test_dir: Directory with the test images (see ``TestDataset``).
        output_file: Path of the CSV file to write.
        batch_size: Number of images per forward pass.
        transform: Optional transform forwarded to ``TestDataset``.
        num_workers: Worker processes used by the ``DataLoader``.

    Returns:
        DataFrame with columns ``image_id`` and ``predicted_count``, sorted by
        the numeric value of the file stem.
    """

    print("Generating test predictions with SFCN model...")

    # Dataset + DataLoader
    test_dataset = TestDataset(test_dir, transform)
    test_loader = DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
    )

    model.eval()
    predictions = []
    image_names = []

    with torch.no_grad():
        for images, names in test_loader:
            images = images.to(device)
            pred_density, _pred_detection = model(images)

            # One count per image: integrate the density map over (C, H, W).
            batch_counts = pred_density.sum((1, 2, 3)).cpu().tolist()
            # Counts are reported as non-negative integers.
            predictions.extend(max(0, int(round(count))) for count in batch_counts)
            image_names.extend(names)

    # Create submission DataFrame
    submission_df = pd.DataFrame({
        'image_id': image_names,
        'predicted_count': predictions
    })

    # Sort numerically by file stem so "2.jpg" precedes "10.jpg".
    submission_df['sort_key'] = submission_df['image_id'].apply(lambda x: int(os.path.splitext(x)[0]))
    submission_df = submission_df.sort_values('sort_key').drop('sort_key', axis=1).reset_index(drop=True)

    # Save submission
    submission_df.to_csv(output_file, index=False)

    print(f"Submission saved to {output_file}")
    print(f"Predictions for {len(submission_df)} test images")

    # Statistics (skipped for an empty test set, where min/max are undefined)
    pred_counts = submission_df['predicted_count'].values
    if len(pred_counts) > 0:
        print("\nTest Predictions Statistics:")
        print(f"Min: {pred_counts.min()}")
        print(f"Max: {pred_counts.max()}")
        print(f"Mean: {pred_counts.mean():.2f}")
        print(f"Median: {np.median(pred_counts):.2f}")

    return submission_df

# Run predictions on the test directory and keep the resulting submission table.
# NOTE(review): the `transform` argument below is commented out, so it stays at
# its default (None) and TestDataset returns the raw OpenCV RGB arrays without
# any preprocessing. Confirm the model can consume that input, or re-enable the
# transform that matches the one used for validation.
submission_df = generate_sfcn_predictions(
    model,
    test_dir=TEST_IMG_DIR,
    batch_size=16,
    # transform=test_transforms
)
# Preview the first 50 rows of the submission as the cell output.
submission_df.head(50)