PREPROCESSING 
   1. Optional cleaning
   2. Converting BGR->RGB
   3. Resizing
   4. Normalization
   5. Converting the image to a tensor
   6. Loading .mat files
   7. Generating density maps 
   8. Downsampling the density map
   9. Converting the density map to a tensor
   10. Saving the .pt files 

In [1]:
import os
from pathlib import Path
import cv2
import numpy as np
import scipy.io as sio
from scipy.ndimage import gaussian_filter
import torch
from torch.utils.data import Dataset, DataLoader

ModuleNotFoundError: No module named 'torch'

In [None]:

# Input locations: the folder of raw training images and the folder holding
# their ground-truth .mat annotation files.
# NOTE(review): these are absolute paths on one specific Windows machine;
# edit them before running elsewhere.
RAW_IMG_DIR = Path(r"C:\Users\mahal\OneDrive\Desktop\DL\archive\ShanghaiTech\part_A\train_data\images")
RAW_GT_DIR  = Path(r"C:\Users\mahal\OneDrive\Desktop\DL\archive\ShanghaiTech\part_A\train_data\ground-truth")

In [None]:
# Output locations: one folder receives the image tensors (.pt) and the other
# the matching density-map tensors (.pt), saved under the same base filename.
# NOTE(review): absolute, machine-specific paths; edit before running elsewhere.
OUT_IMG_TORCH_DIR = Path(r"C:\Users\mahal\OneDrive\Desktop\DL\torch_images_trainA")
OUT_GT_TORCH_DIR  = Path(r"C:\Users\mahal\OneDrive\Desktop\DL\torch_density_trainA")

In [None]:
# Size (in pixels) every image is resized to before normalization.
TARGET_W = 512
TARGET_H = 512

# The density map is built at the target size and then shrunk by this factor.
DOWNSAMPLE_FACTOR = 8

# Standard deviation of the Gaussian used to smooth the head annotations.
GAUSSIAN_SIGMA = 4

# Optional image cleaning: "none", "denoise", "clahe", "denoise+clahe"
CLEAN_METHOD = "none"

# Pixel normalization scheme; "imagenet" recommended
NORMALIZE_METHOD = "imagenet"

# Per-channel statistics applied (in RGB order) when NORMALIZE_METHOD is "imagenet".
IMAGENET_MEAN = np.asarray((0.485, 0.456, 0.406), dtype=np.float32)
IMAGENET_STD = np.asarray((0.229, 0.224, 0.225), dtype=np.float32)

# Lower-case file suffixes that are treated as images.
EXTS = (
    ".jpg",
    ".jpeg",
    ".png",
    ".bmp",
    ".tif",
    ".tiff",
)
# ----------------------------------------------------

In [None]:
# Make sure both output folders exist before any tensors are written;
# existing folders are left untouched.
for _out_dir in (OUT_IMG_TORCH_DIR, OUT_GT_TORCH_DIR):
    _out_dir.mkdir(parents=True, exist_ok=True)

In [None]:
#  robust mat reader
def read_points_from_mat(mat_path):
    """
    Robust reader for ShanghaiTech-style GT .mat files.

    The file is loaded with ``scipy.io.loadmat``. If it has an
    ``image_info`` entry, a few known nestings of that entry are tried
    first; otherwise (or if none of them match) every top-level variable
    is scanned. The first numeric 2-D array with exactly two columns that
    is found is returned.

    Args:
        mat_path: Path of the .mat file (anything ``scipy.io.loadmat`` accepts).

    Returns:
        Nx2 float32 array of (x, y) points, or an empty (0, 2) float32
        array if nothing suitable is found.
    """

    def _is_point_array(value):
        # A usable annotation table is a numeric 2-D array with two columns.
        # Object arrays are excluded here so they get searched, not cast.
        return (
            isinstance(value, np.ndarray)
            and value.ndim == 2
            and value.shape[1] == 2
            and np.issubdtype(value.dtype, np.number)
        )

    def _extract_points(value):
        # Return float32 points from `value` itself, or from the first
        # matching element of an object (cell) array; None if no match.
        if _is_point_array(value):
            return value.astype(np.float32)
        if isinstance(value, np.ndarray) and value.dtype == object:
            for item in value.ravel():
                if _is_point_array(item):
                    return item.astype(np.float32)
        return None

    mat = sio.loadmat(mat_path)

    # If the common 'image_info' key exists, try several known nestings,
    # in the same priority order as before.
    info = mat.get("image_info")
    if info is not None:
        for index_path in ((0, 0, 0, 0), (0, 0, 0, 0, 0), (0, 0)):
            candidate = info
            try:
                for idx in index_path:
                    candidate = candidate[idx]
            except (IndexError, KeyError, TypeError, ValueError):
                # This nesting does not exist in this file; try the next one.
                # Only indexing failures are ignored, other errors propagate.
                continue
            points = _extract_points(candidate)
            if points is not None:
                return points

    # Fallback: scan all variables for a 2-column numeric array, either
    # directly or inside an object array. Non-array metadata is skipped.
    for value in mat.values():
        points = _extract_points(value)
        if points is not None:
            return points

    # Nothing found.
    return np.zeros((0, 2), dtype=np.float32)

In [None]:
# optional cleaning 
def denoise(img_bgr):
    """Return a denoised copy of a colour image using OpenCV non-local means.

    The filter strengths and window sizes are fixed: 10 for both the
    luminance and colour components, a 7-pixel template window and a
    21-pixel search window.
    """
    return cv2.fastNlMeansDenoisingColored(
        img_bgr,
        None,
        h=10,
        hColor=10,
        templateWindowSize=7,
        searchWindowSize=21,
    )
def clahe(img_bgr):
    """Apply CLAHE contrast equalization to the luma channel of a BGR image.

    The image is converted to YCrCb, only the Y channel is equalized
    (clip limit 2.0, 8x8 tile grid), and the result is converted back to BGR.
    """
    luma, red_diff, blue_diff = cv2.split(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YCrCb))
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced = cv2.merge((equalizer.apply(luma), red_diff, blue_diff))
    return cv2.cvtColor(enhanced, cv2.COLOR_YCrCb2BGR)
def clean_image(img_bgr, method):
    """Run the optional cleaning stages selected by `method` on a BGR image.

    Recognized methods are "none", "denoise", "clahe" and "denoise+clahe"
    (denoising runs before CLAHE). "none" and any unrecognized value
    return the input image unchanged.
    """
    wants_denoise = method in ("denoise", "denoise+clahe")
    wants_clahe = method in ("clahe", "denoise+clahe")

    cleaned = img_bgr
    if wants_denoise:
        cleaned = denoise(cleaned)
    if wants_clahe:
        cleaned = clahe(cleaned)
    return cleaned

In [None]:
# helpers 
def list_images(folder, exts=None):
    """Return the sorted file names of the images directly inside `folder`.

    Only regular files are listed, so a sub-directory whose name happens
    to end in an image suffix is not mistaken for an image. Suffix
    matching is case-insensitive.

    Args:
        folder: Directory to scan (``str`` or ``Path``); not recursive.
        exts: Optional iterable of suffixes including the dot
            (e.g. ``(".jpg", ".png")``). Defaults to the module-level EXTS.

    Returns:
        Sorted list of file names (not full paths).
    """
    allowed = {ext.lower() for ext in (EXTS if exts is None else exts)}
    return sorted(
        entry.name
        for entry in Path(folder).iterdir()
        if entry.is_file() and entry.suffix.lower() in allowed
    )

In [None]:
def normalize_image_rgb(img_rgb, method=NORMALIZE_METHOD):
    """Scale pixel values to [0, 1] as float32 and optionally standardize them.

    Args:
        img_rgb: Image array; values are divided by 255. The ImageNet
            statistics are broadcast against the trailing axis, so the
            channels are expected last and in RGB order.
        method: "imagenet" subtracts IMAGENET_MEAN and divides by
            IMAGENET_STD; any other value returns the [0, 1] image as is.

    Returns:
        The normalized float32 array.
    """
    scaled = img_rgb.astype(np.float32) / 255.0
    if method != "imagenet":
        return scaled
    centered = scaled - IMAGENET_MEAN
    return centered / IMAGENET_STD

In [None]:
def generate_density_map(img_h, img_w, points, sigma=GAUSSIAN_SIGMA):
    """Build a Gaussian-smoothed density map from point annotations.

    Each point adds 1.0 at its nearest pixel (coordinates are rounded and
    clamped to the image bounds), then the whole map is blurred with
    ``scipy.ndimage.gaussian_filter``.

    Args:
        img_h: Height of the map in pixels.
        img_w: Width of the map in pixels.
        points: Iterable of points where index 0 is x (column) and
            index 1 is y (row).
        sigma: Standard deviation passed to the Gaussian filter.

    Returns:
        float32 array of shape (img_h, img_w).
    """
    impulses = np.zeros((img_h, img_w), dtype=np.float32)
    last_col = img_w - 1
    last_row = img_h - 1

    for point in points:
        col = max(0, round(point[0]))
        row = max(0, round(point[1]))
        # Clamp so annotations on or past the border still count.
        col = int(min(last_col, col))
        row = int(min(last_row, row))
        impulses[row, col] += 1.0

    return gaussian_filter(impulses, sigma=sigma)

In [None]:
def downsample_density(density, factor):
    """Shrink a density map by `factor` while preserving its total count.

    The map is resized to (H // factor, W // factor) with area
    interpolation, which averages source pixels, and is then rescaled by
    the ratio of source area to destination area so the sum stays the
    same. When H and W are multiples of `factor` that ratio is exactly
    ``factor * factor``; when they are not, using ``factor * factor``
    would under-count, which is why the true area ratio is used.

    Args:
        density: 2-D array (H, W).
        factor: Positive integer downsampling factor.

    Returns:
        2-D array of shape (H // factor, W // factor).

    Raises:
        ValueError: If `factor` is not positive, or is so large that the
            output would have a zero-sized dimension.
    """
    if factor <= 0:
        raise ValueError("Downsample factor must be a positive integer.")
    H, W = density.shape
    new_h, new_w = H // factor, W // factor
    if new_h <= 0 or new_w <= 0:
        raise ValueError("Downsample factor too large for density size.")
    small = cv2.resize(density, (new_w, new_h), interpolation=cv2.INTER_AREA)
    # Averaging divides each cell by the source area it covers; multiply it
    # back in to preserve counts.
    area_ratio = (H * W) / float(new_h * new_w)
    return small * area_ratio

In [None]:
# main preprocessing
def preprocess_save_torch(
    raw_img_dir=RAW_IMG_DIR,
    raw_gt_dir=RAW_GT_DIR,
    out_img_torch_dir=OUT_IMG_TORCH_DIR,
    out_gt_torch_dir=OUT_GT_TORCH_DIR,
    target_w=TARGET_W,
    target_h=TARGET_H,
    clean_method=CLEAN_METHOD,
    normalize_method=NORMALIZE_METHOD,
    downsample_factor=DOWNSAMPLE_FACTOR,
    gaussian_sigma=GAUSSIAN_SIGMA,
):
    """Convert raw images and .mat annotations into saved torch tensors.

    For every image in `raw_img_dir` that has a ground-truth file
    (``GT_<name>.mat`` or ``<name>.mat``) in `raw_gt_dir`:

    1. read the image, optionally clean it, resize it to
       (target_w, target_h), convert BGR -> RGB and normalize it;
    2. read the annotated points, scale them to the resized image,
       build a Gaussian density map and downsample it;
    3. save the image as a [C, H, W] tensor and the density map as a
       [1, H_down, W_down] tensor, both named ``<name>.pt``.

    Images without ground truth, or that cannot be read, are reported
    and skipped.

    Args:
        raw_img_dir: Directory of input images (``str`` or ``Path``).
        raw_gt_dir: Directory of ground-truth .mat files.
        out_img_torch_dir: Where image tensors are written (created if missing).
        out_gt_torch_dir: Where density tensors are written (created if missing).
        target_w: Width every image is resized to.
        target_h: Height every image is resized to.
        clean_method: Cleaning mode passed to ``clean_image``.
        normalize_method: Normalization mode passed to ``normalize_image_rgb``.
        downsample_factor: Factor passed to ``downsample_density``.
        gaussian_sigma: Sigma passed to ``generate_density_map``. Previously
            the module-level GAUSSIAN_SIGMA was always used.
    """
    # Accept plain strings as well as Path objects.
    raw_img_dir = Path(raw_img_dir)
    raw_gt_dir = Path(raw_gt_dir)
    out_img_torch_dir = Path(out_img_torch_dir)
    out_gt_torch_dir = Path(out_gt_torch_dir)

    # Callers may pass custom output folders that do not exist yet.
    out_img_torch_dir.mkdir(parents=True, exist_ok=True)
    out_gt_torch_dir.mkdir(parents=True, exist_ok=True)

    img_files = list_images(raw_img_dir)
    total = len(img_files)
    print(f"Found {total} images in {raw_img_dir}")

    for i, fname in enumerate(img_files, 1):
        progress = f"[{i}/{total}]"
        base = Path(fname).stem
        img_path = raw_img_dir / fname

        # Try the GT filename patterns used by ShanghaiTech, prefixed form first.
        gt_path1 = raw_gt_dir / f"GT_{base}.mat"
        gt_path2 = raw_gt_dir / f"{base}.mat"
        gt_path = next((p for p in (gt_path1, gt_path2) if p.exists()), None)

        if gt_path is None:
            print(f"{progress} Missing GT for {fname} (expected {gt_path1.name} or {gt_path2.name}) -> skipping")
            continue

        img_bgr = cv2.imread(str(img_path))
        if img_bgr is None:
            print(f"{progress} Failed to read image {img_path} -> skipping")
            continue

        # Scale factors that map original pixel coordinates to the resized image.
        orig_h, orig_w = img_bgr.shape[:2]
        sx = target_w / float(orig_w)
        sy = target_h / float(orig_h)

        # Optional cleaning, then resize and switch to RGB channel order.
        img_bgr = clean_image(img_bgr, clean_method)
        resized_bgr = cv2.resize(img_bgr, (target_w, target_h), interpolation=cv2.INTER_AREA)
        resized_rgb = cv2.cvtColor(resized_bgr, cv2.COLOR_BGR2RGB)

        # Normalize and convert to a torch tensor [C, H, W].
        img_norm = normalize_image_rgb(resized_rgb, method=normalize_method)
        img_t = torch.from_numpy(img_norm.astype(np.float32)).permute(2, 0, 1).contiguous()

        # Read the points and move them into the resized coordinate frame.
        # Broadcasting also handles the empty (0, 2) case.
        pts = read_points_from_mat(str(gt_path))
        pts_resized = pts * np.array([sx, sy], dtype=np.float32)

        # Generate the density map at the target size, then downsample it.
        density_full = generate_density_map(target_h, target_w, pts_resized, sigma=gaussian_sigma)
        density_down = downsample_density(density_full, downsample_factor)

        # To torch density tensor [1, H_down, W_down].
        dens_t = torch.from_numpy(density_down.astype(np.float32)).unsqueeze(0).contiguous()

        # Save both tensors under the same base name.
        img_out = out_img_torch_dir / (base + ".pt")
        gt_out = out_gt_torch_dir / (base + ".pt")
        torch.save(img_t, str(img_out))
        torch.save(dens_t, str(gt_out))

        print(f"{progress} Saved {base}  pts:{pts_resized.shape[0]}  img:{img_t.shape} gt:{dens_t.shape}")

    print("\nâœ” Preprocessing completed.")
    print("Images saved to:", out_img_torch_dir)
    print("GTs saved to   :", out_gt_torch_dir)

In [None]:

if __name__ == "__main__":
    # Convert the whole dataset using the module-level configuration.
    preprocess_save_torch()

    # Sanity check: reload one saved image/density pair and report the shapes.
    sample_files = list(OUT_IMG_TORCH_DIR.glob("*.pt"))

    if sample_files:
        sample_path = sample_files[0]  # first .pt file the glob returned
        base = sample_path.stem
        pt_name = f"{base}.pt"

        # The image and its density map share the same file name.
        img_t = torch.load(OUT_IMG_TORCH_DIR / pt_name)
        gt_t = torch.load(OUT_GT_TORCH_DIR / pt_name)

        print("\n=== SHAPE CHECK ===")
        print("Image tensor shape :", img_t.shape)    # [3, H, W]
        print("GT density shape   :", gt_t.shape)     # [1, H_down, W_down]
        print("Sample file        :", base)
    else:
        print("No .pt files found in output directory.")
