In [18]:
# --- Clean imports & config ---
from pathlib import Path
import zipfile, urllib.request, random, time

import numpy as np
import pandas as pd
from PIL import Image

import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, datasets
from torchvision.models import resnet18, ResNet18_Weights
from torchvision.models.segmentation import fcn_resnet50, FCN_ResNet50_Weights
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

SEED = 42
random.seed(SEED); torch.manual_seed(SEED); torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.benchmark = True
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

BASE_DIR = Path("./data/jsrt").resolve()
BASE_DIR.mkdir(parents=True, exist_ok=True)

def download_if_needed(name: str, url: str, dst_dir: Path = BASE_DIR) -> Path:
    """Downloads <name>.zip to dst_dir and extracts into dst_dir/<name>."""
    out_dir = dst_dir / name
    zip_path = dst_dir / f"{name}.zip"
    if out_dir.exists() and any(out_dir.iterdir()):
        print(f"[{name}] ready at {out_dir}")
        return out_dir
    if not zip_path.exists():
        print(f"[DL] {name} -> {zip_path}")
        urllib.request.urlretrieve(url, zip_path)
    print(f"[EXTRACT] {zip_path} -> {out_dir}")
    out_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(out_dir)
    return out_dir

# ImageNet-normalized transforms (classification)
weights = ResNet18_Weights.DEFAULT

def get_imagenet_preprocess(weights):
    try:
        return weights.transforms()  # callable; works in newer torchvision
    except Exception:
        # Fallback for older torchvision
        from torchvision.transforms import InterpolationMode
        return transforms.Compose([
            transforms.Resize(232, interpolation=InterpolationMode.BILINEAR),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
        ])

preprocess = get_imagenet_preprocess(weights)

cls_tfms = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),
    preprocess,  # keep as a single callable; don't try to access .transforms
])

# --- Generic training helpers (classification) ---
def build_resnet18(num_classes: int, freeze_backbone: bool = True) -> nn.Module:
    m = resnet18(weights=weights)
    if freeze_backbone:
        for p in m.parameters():
            p.requires_grad = False
    m.fc = nn.Linear(m.fc.in_features, num_classes)
    return m.to(DEVICE)

def run_epoch(model, loader, criterion, optimizer=None):
    train_mode = optimizer is not None
    model.train(train_mode)
    losses, preds_all, targs_all = 0.0, [], []
    for x, y in loader:
        x, y = x.to(DEVICE), y.to(DEVICE)
        if train_mode: optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        if train_mode:
            loss.backward(); optimizer.step()
        losses += loss.item() * x.size(0)
        preds_all.extend(out.argmax(1).detach().cpu().tolist())
        targs_all.extend(y.detach().cpu().tolist())
    return losses / len(loader.dataset), accuracy_score(targs_all, preds_all)

def fit(model, train_loader, val_loader, epochs=5, lr=1e-3):
    crit = nn.CrossEntropyLoss()
    opt = torch.optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=lr)
    best_acc, best = -1.0, None
    for ep in range(1, epochs+1):
        t0 = time.time()
        tr_loss, tr_acc = run_epoch(model, train_loader, crit, optimizer=opt)
        va_loss, va_acc = run_epoch(model, val_loader,   crit, optimizer=None)
        if va_acc > best_acc:
            best_acc, best = va_acc, {k: v.cpu() for k, v in model.state_dict().items()}
        print(f"E{ep:02d} | train {tr_loss:.4f}/{tr_acc:.3f} | val {va_loss:.4f}/{va_acc:.3f} | {time.time()-t0:.1f}s")
    if best is not None:
        model.load_state_dict({k: v.to(DEVICE) for k, v in best.items()})
    return model


In [19]:
# Directions01
DATASETS = {
    "Directions01": "http://imgcom.jsrt.or.jp/imgcom/wp-content/uploads/2018/11/Directions01.zip",
}

directions_dir = download_if_needed("Directions01", DATASETS["Directions01"])
train_dir_1 = directions_dir / "train"   # expects subfolders: Up, Down, Left, Right
test_dir_1  = directions_dir / "test"

train_ds_1 = datasets.ImageFolder(train_dir_1, transform=cls_tfms)
test_ds_1  = datasets.ImageFolder(test_dir_1,  transform=cls_tfms)
print("Classes (Task 1):", train_ds_1.classes)

num_workers = 2
train_ld_1 = DataLoader(train_ds_1, batch_size=32, shuffle=True,  num_workers=num_workers, pin_memory=True)
test_ld_1  = DataLoader(test_ds_1,  batch_size=64, shuffle=False, num_workers=num_workers, pin_memory=True)

model_1 = build_resnet18(num_classes=4, freeze_backbone=True)
model_1 = fit(model_1, train_ld_1, test_ld_1, epochs=5, lr=1e-3)

test_loss_1, test_acc_1 = run_epoch(model_1, test_ld_1, nn.CrossEntropyLoss(), optimizer=None)
print(f"[Task 1] Test loss: {test_loss_1:.4f} | Test acc: {test_acc_1:.3f}")


[Directions01] ready at /scratch/hdharmen/ASU/CSE 507/data/jsrt/Directions01
Classes (Task 1): ['down', 'left', 'right', 'up']
E01 | train 0.8438/0.744 | val 0.7814/0.750 | 21.8s
E02 | train 0.3535/0.948 | val 0.3101/0.975 | 13.1s
E03 | train 0.2171/0.985 | val 0.1693/0.975 | 10.5s
E04 | train 0.1685/0.981 | val 0.1229/1.000 | 12.6s
E05 | train 0.1270/0.994 | val 0.1141/0.975 | 10.5s
[Task 1] Test loss: 0.1229 | Test acc: 1.000


In [20]:
# Gender01
DATASETS["Gender01"] = "http://imgcom.jsrt.or.jp/imgcom/wp-content/uploads/2018/11/Gender01.zip"

gender_dir = download_if_needed("Gender01", DATASETS["Gender01"])
train_dir_2 = gender_dir / "train"   # subfolders: female, male
test_dir_2  = gender_dir / "test"

train_ds_2 = datasets.ImageFolder(train_dir_2, transform=cls_tfms)
test_ds_2  = datasets.ImageFolder(test_dir_2,  transform=cls_tfms)
print("Classes (Task 2):", train_ds_2.classes)

train_ld_2 = DataLoader(train_ds_2, batch_size=16, shuffle=True,  num_workers=num_workers, pin_memory=True)
test_ld_2  = DataLoader(test_ds_2,  batch_size=32, shuffle=False, num_workers=num_workers, pin_memory=True)

model_2 = build_resnet18(num_classes=2, freeze_backbone=True)
model_2 = fit(model_2, train_ld_2, test_ld_2, epochs=6, lr=1e-3)

test_loss_2, test_acc_2 = run_epoch(model_2, test_ld_2, nn.CrossEntropyLoss(), optimizer=None)
print(f"[Task 2] Test loss: {test_loss_2:.4f} | Test acc: {test_acc_2:.3f}")


[Gender01] ready at /scratch/hdharmen/ASU/CSE 507/data/jsrt/Gender01
Classes (Task 2): ['female', 'male']
E01 | train 0.6466/0.610 | val 0.6249/0.667 | 7.2s
E02 | train 0.5214/0.831 | val 0.7319/0.473 | 1.0s
E03 | train 0.4514/0.805 | val 0.5267/0.806 | 0.9s
E04 | train 0.3908/0.896 | val 0.5954/0.634 | 1.1s
E05 | train 0.3824/0.870 | val 0.4635/0.882 | 0.9s
E06 | train 0.2969/0.955 | val 0.4784/0.806 | 0.8s
[Task 2] Test loss: 0.4635 | Test acc: 0.882


In [None]:
# --- Task 3: Age estimation (XPAge01_RGB) ---
# Dataset spec: 247 JPEGs (2048×2048) + CSV with ages.  (miniJSRT XPAge01_RGB)

age_dir = download_if_needed(
    "XPAge01_RGB",
    "http://imgcom.jsrt.or.jp/imgcom/wp-content/uploads/2019/07/XPAge01_RGB.zip"
)
csv_path = find_first_csv(age_dir)
df = pd.read_csv(csv_path)

# Try to auto-detect column names
fname_col = next((c for c in df.columns if c.lower().startswith("file")), df.columns[0])
age_col   = next((c for c in df.columns if "age" in c.lower()), df.columns[-1])

def resolve_img(root: Path, fname: str) -> Path:
    p = root / fname
    if p.exists(): return p
    hits = list(root.rglob(fname))
    if hits: return hits[0]
    raise FileNotFoundError(f"Image {fname} not found under {root}")

IMAGENET = ResNet18_Weights.DEFAULT
age_tfms  = build_img_tfms(size=224, grayscale=True)

class AgeDataset(Dataset):
    def __init__(self, table, img_root: Path, fname_col: str, age_col: str, transform=None):
        self.table = table.reset_index(drop=True)
        self.img_root = img_root
        self.fname_col = fname_col
        self.age_col = age_col
        self.transform = transform
    def __len__(self): return len(self.table)
    def __getitem__(self, i):
        row = self.table.iloc[i]
        img = Image.open(resolve_img(self.img_root, str(row[self.fname_col]))).convert("RGB")
        if self.transform: img = self.transform(img)
        age = torch.tensor([float(row[self.age_col])], dtype=torch.float32)
        return img, age

train_df, test_df = train_test_split(df, test_size=0.2, random_state=SEED)
ds_tr = AgeDataset(train_df, age_dir, fname_col, age_col, transform=age_tfms)
ds_te = AgeDataset(test_df,  age_dir, fname_col, age_col, transform=age_tfms)
ld_tr = DataLoader(ds_tr, batch_size=8, shuffle=True,  num_workers=2, pin_memory=True)
ld_te = DataLoader(ds_te, batch_size=8, shuffle=False, num_workers=2, pin_memory=True)

model_age = resnet18(weights=IMAGENET)
for p in model_age.parameters(): p.requires_grad = False   # start by freezing backbone
model_age.fc = nn.Linear(model_age.fc.in_features, 1)
model_age = model_age.to(DEVICE)

fit_regression(model_age, ld_tr, ld_te, epochs=5, lr=1e-4)


NameError: name 'age_dir' is not defined

In [None]:
# --- Task 4: Segmentation01 (lung vs background) ---
# Spec: 256×256 PNGs; train=50, test=10; masks: lung=255, background=0.

seg1_dir = download_if_needed(
    "Segmentation01",
    "http://imgcom.jsrt.or.jp/imgcom/wp-content/uploads/2018/11/Segmentation01.zip"
)
org_train = seg1_dir / "org_train"
lbl_train = seg1_dir / "label_train"
org_test  = seg1_dir / "org_test"
lbl_test  = seg1_dir / "label_test"

img_tfms_seg = build_img_tfms(size=256, grayscale=True)

class LungSegDataset(Dataset):
    def __init__(self, img_dir, mask_dir, transform=None):
        self.imgs  = sorted(list(img_dir.glob("*.*")))
        self.masks = sorted(list(mask_dir.glob("*.*")))
        assert len(self.imgs)==len(self.masks), "Image/mask count mismatch"
        self.transform = transform
    def __len__(self): return len(self.imgs)
    def __getitem__(self, i):
        img  = Image.open(self.imgs[i]).convert("RGB")
        m    = Image.open(self.masks[i]).convert("L")
        img  = self.transform(img) if self.transform else transforms.ToTensor()(img)
        mask = torch.from_numpy((np.array(m)>0).astype(np.int64))   # -> {0,1}
        return img, mask

ds_tr4 = LungSegDataset(org_train, lbl_train, img_tfms_seg)
ds_te4 = LungSegDataset(org_test,  lbl_test,  img_tfms_seg)
ld_tr4 = DataLoader(ds_tr4, batch_size=4, shuffle=True,  num_workers=2, pin_memory=True)
ld_te4 = DataLoader(ds_te4, batch_size=4, shuffle=False, num_workers=2, pin_memory=True)

model_seg_bin = fcn_resnet50(weights=FCN_ResNet50_Weights.DEFAULT, num_classes=2).to(DEVICE)
fit_segmentation(model_seg_bin, ld_tr4, epochs=3, lr=1e-4)

miou4 = mean_iou(model_seg_bin, ld_te4, num_classes=2)
print(f"[Task 4] mean IoU: {miou4:.3f}")


[Segmentation01] Downloading -> /scratch/hdharmen/ASU/CSE 507/data/jsrt/Segmentation01.zip
[Segmentation01] Extracting...
[Segmentation01] Ready at: /scratch/hdharmen/ASU/CSE 507/data/jsrt/Segmentation01


NameError: name 'age_tfms' is not defined

In [None]:
# --- Task 5: Segmentation02 (multi-class organs) ---
# Spec: 256×256 BMP inputs; label PNG with values {0 (outside body), 170 (chest area), 255 (lungs), 85 (heart)}.
# Map to contiguous class IDs {0,1,2,3} for CrossEntropyLoss.

seg2_dir = download_if_needed(
    "Segmentation02",
    "http://imgcom.jsrt.or.jp/imgcom/wp-content/uploads/2019/07/segmentation02.zip"
)
org_train2 = seg2_dir / "org_train"
lbl_train2 = seg2_dir / "label_train"
org_test2  = seg2_dir / "org_test"
lbl_test2  = seg2_dir / "label_test"

VAL2IDX = {0:0, 170:1, 255:2, 85:3}

img_tfms_seg2 = build_img_tfms(size=256, grayscale=True)

def remap_mask(arr: np.ndarray, mapping=VAL2IDX) -> np.ndarray:
    out = np.zeros_like(arr, dtype=np.int64)
    for v,i in mapping.items():
        out[arr==v] = i
    return out

class OrganSegDataset(Dataset):
    def __init__(self, img_dir, mask_dir, transform=None):
        self.imgs  = sorted(list(img_dir.glob("*.*")))   # BMP inputs
        self.masks = sorted(list(mask_dir.glob("*.*")))  # PNG labels
        assert len(self.imgs)==len(self.masks), "Image/mask count mismatch"
        self.transform = transform
    def __len__(self): return len(self.imgs)
    def __getitem__(self, i):
        img  = Image.open(self.imgs[i]).convert("RGB")
        m    = Image.open(self.masks[i]).convert("L")
        img  = self.transform(img) if self.transform else transforms.ToTensor()(img)
        mask = torch.from_numpy(remap_mask(np.array(m, dtype=np.int64)))  # -> {0..3}
        return img, mask

ds_tr5 = OrganSegDataset(org_train2, lbl_train2, img_tfms_seg2)
ds_te5 = OrganSegDataset(org_test2,  lbl_test2,  img_tfms_seg2)
ld_tr5 = DataLoader(ds_tr5, batch_size=2, shuffle=True,  num_workers=2, pin_memory=True)
ld_te5 = DataLoader(ds_te5, batch_size=2, shuffle=False, num_workers=2, pin_memory=True)

model_seg_multi = fcn_resnet50(weights=FCN_ResNet50_Weights.DEFAULT, num_classes=4).to(DEVICE)
fit_segmentation(model_seg_multi, ld_tr5, epochs=3, lr=1e-4)

miou5 = mean_iou(model_seg_multi, ld_te5, num_classes=4)
print(f"[Task 5] mean IoU: {miou5:.3f}")
