In [4]:
# STAGE 1: First Model Training
# Cell 1: Imports & Config
from pathlib import Path
import math, cv2, warnings
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import albumentations as A
from albumentations.pytorch import ToTensorV2
import timm
from sklearn.model_selection import train_test_split
import os
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
warnings.filterwarnings("ignore")
warnings.filterwarnings("ignore", message="Unexpected keys .*")

# Paths
PROJECT_ROOT = Path(os.getcwd()).parent  

GT99_IMG_DIR = PROJECT_ROOT / "data" / "annotation_batch"
GT99_CSV = GT99_IMG_DIR / "keypoints_normalized_FIXED.csv"

# Hyperparams
IMG_SIZE = 512
HEATMAP_SIZE = 64
SIGMA = 4
BATCH_SIZE = 4
ACCUM_STEPS = 2     # gradient accumulation
LR = 1e-4
EPOCHS_STAGE1 = 80  # supervised
PATIENCE = 15

# Device priority: MPS > CUDA > CPU
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

torch.set_num_threads(4)  # limit CPU threads
LOG_PATH = PROJECT_ROOT / "training_log.csv"


In [None]:
# Cell 2: Utilities
def norberg_angle(p1, p2, p3):
    a, b, c = np.array(p1), np.array(p2), np.array(p3)
    AB, AC = b-a, c-a
    dot = np.dot(AB, AC)
    norms = np.linalg.norm(AB) * np.linalg.norm(AC)
    return 0 if norms == 0 else math.degrees(math.acos(np.clip(dot / norms, -1.0, 1.0)))

def generate_heatmaps(keypoints, heatmap_size=HEATMAP_SIZE, img_size=IMG_SIZE, sigma=SIGMA):
    num_kps = len(keypoints)
    heatmaps = np.zeros((num_kps, heatmap_size, heatmap_size), dtype=np.float32)
    for i, (x, y) in enumerate(keypoints):
        x_h = x * (heatmap_size / img_size)
        y_h = y * (heatmap_size / img_size)
        xx, yy = np.meshgrid(np.arange(heatmap_size), np.arange(heatmap_size))
        heatmaps[i] = np.exp(-((xx - x_h) ** 2 + (yy - y_h) ** 2) / (2 * sigma ** 2))
    return heatmaps

def soft_argmax_2d(heatmaps, output_size=IMG_SIZE, heatmap_size=HEATMAP_SIZE):
    N, K, H, W = heatmaps.shape
    flat = heatmaps.view(N, K, -1)
    flat = torch.softmax(flat, dim=-1)
    coords_x = torch.arange(W).repeat(H, 1).reshape(-1).float().to(flat.device)
    coords_y = torch.arange(H).repeat_interleave(W).float().to(flat.device)
    xs = torch.sum(flat * coords_x, dim=-1)
    ys = torch.sum(flat * coords_y, dim=-1)
    scale_x = output_size / float(heatmap_size)
    scale_y = output_size / float(heatmap_size)
    coords = torch.stack([xs * scale_x, ys * scale_y], dim=-1)
    return coords.view(N, K, 2)


In [None]:
# Cell 3: Dataset
class KeypointDataset(Dataset):
    def __init__(self, df, img_dir, transforms=None):
        self.df = df.reset_index(drop=True)
        self.img_dir = img_dir
        self.transforms = transforms

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image = cv2.cvtColor(cv2.imread(str(self.img_dir / row['image_name'])), cv2.COLOR_BGR2RGB)
        
        keypoints = np.array([
            (row['L_FHC_x'] * IMG_SIZE, row['L_FHC_y'] * IMG_SIZE),
            (row['R_FHC_x'] * IMG_SIZE, row['R_FHC_y'] * IMG_SIZE),
            (row['L_CAR_x'] * IMG_SIZE, row['L_CAR_y'] * IMG_SIZE),
            (row['R_CAR_x'] * IMG_SIZE, row['R_CAR_y'] * IMG_SIZE),
        ], dtype=np.float32)

        if self.transforms:
            # Albumentations will now receive and process a NumPy array
            augmented = self.transforms(image=image, keypoints=keypoints)
            image, keypoints = augmented['image'], augmented['keypoints']
        
        # Keypoints are already a NumPy array, just generate heatmaps
        heatmaps = generate_heatmaps(keypoints.tolist()) # generate_heatmaps needs a list
        return image, torch.tensor(heatmaps), np.array(keypoints, dtype=np.float32), row['image_name']

def custom_collate(batch):
    imgs, hmaps, kps, names = zip(*batch)
    return torch.stack(imgs), torch.stack(hmaps), list(kps), list(names)



In [None]:
# Cell 4: Augmentations
class HorizontalFlipWithKeypointSwap(A.DualTransform):
    def __init__(self, always_apply: bool = False, p: float = 0.5):
        super().__init__(always_apply=always_apply, p=p)

    def apply(self, image, **params):
        return cv2.flip(image, 1)

    def apply_to_keypoints(self, keypoints, **params):
        # Flip x-coordinate
        keypoints[:, 0] = IMG_SIZE - keypoints[:, 0]
        
        # Swap L-R pairs using NumPy's advanced indexing
        # Swap indices 0 and 1 (L_FHC <-> R_FHC)
        keypoints[[0, 1]] = keypoints[[1, 0]]
        # Swap indices 2 and 3 (L_CAR <-> R_CAR)
        keypoints[[2, 3]] = keypoints[[3, 2]]
        
        return keypoints

    
    def get_transform_init_args_names(self):
        return ()

# define transforms
train_tf_no_flip = A.Compose([
    A.ShiftScaleRotate(shift_limit=0.02, scale_limit=0.05, rotate_limit=7, 
                       border_mode=cv2.BORDER_CONSTANT, p=0.9),
    A.RandomBrightnessContrast(p=0.5),
    A.GaussianBlur(blur_limit=3, p=0.2),
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], keypoint_params=A.KeypointParams(format='xy'))

train_tf_flip = A.Compose([
    HorizontalFlipWithKeypointSwap(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.02, scale_limit=0.05, rotate_limit=7,
                       border_mode=cv2.BORDER_CONSTANT, p=0.9),
    A.RandomBrightnessContrast(p=0.5),
    A.GaussianBlur(blur_limit=3, p=0.2),
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], keypoint_params=A.KeypointParams(format='xy'))

val_tf = A.Compose([
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], keypoint_params=A.KeypointParams(format='xy'))


  validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)


In [None]:
# Cell 5: Multi-task Model
class HipNet(nn.Module):
    def __init__(self, num_keypoints=4):
        super().__init__()
        self.backbone = timm.create_model("mobilenetv3_large_100", pretrained=True, features_only=True)
        in_ch = self.backbone.feature_info[-1]['num_chs']
        # Keypoint heatmap head
        self.kp_head = nn.Sequential(
            nn.Conv2d(in_ch, 128, 3, padding=1), nn.ReLU(),
            nn.Conv2d(128, num_keypoints, 1)
        )
        # Angle regression head
        self.angle_head = nn.Sequential(
            nn.AdaptiveAvgPool2d(1), nn.Flatten(),
            nn.Linear(in_ch, 2)  # left & right Norberg angles in deg
        )
        # CHD classification head
        self.cls_head = nn.Sequential(
            nn.AdaptiveAvgPool2d(1), nn.Flatten(),
            nn.Linear(in_ch, 3)  # Normal, Borderline, CHD
        )
    def forward(self, x):
        feats = self.backbone(x)[-1]
        hmap = self.kp_head(torch.nn.functional.interpolate(feats, size=(HEATMAP_SIZE, HEATMAP_SIZE)))
        angles = self.angle_head(feats)
        cls_logits = self.cls_head(feats)
        return hmap, angles, cls_logits


In [None]:
# Cell 6: Losses
weights = torch.tensor([2.0, 2.0, 1.0, 1.0])
def weighted_mse_loss(pred, target):
    return (((pred - target) ** 2) * weights.view(1,-1,1,1).to(pred.device)).mean()

cls_loss_fn = nn.CrossEntropyLoss()
angle_loss_fn = nn.L1Loss()
coord_loss_fn = nn.MSELoss()

In [None]:
# Cell 7: Stage 1: Supervised Pretrain
df_gt99 = pd.read_csv(GT99_CSV)
train_df, val_df = train_test_split(df_gt99, test_size=0.2, random_state=42)

train_loader = DataLoader(KeypointDataset(train_df, GT99_IMG_DIR, train_tf_no_flip),
                          batch_size=BATCH_SIZE, shuffle=True, collate_fn=custom_collate)
val_loader = DataLoader(KeypointDataset(val_df, GT99_IMG_DIR, val_tf),
                        batch_size=BATCH_SIZE, collate_fn=custom_collate)

model = HipNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
best_rmse = float('inf')
patience = 0

for epoch in range(EPOCHS_STAGE1):
    model.train()
    if epoch == 20:  # enable flips
        train_loader = DataLoader(KeypointDataset(train_df, GT99_IMG_DIR, train_tf_flip),
                                  batch_size=BATCH_SIZE, shuffle=True, collate_fn=custom_collate)
    train_loss = 0
    optimizer.zero_grad()
    for step, (imgs, hmaps, gt_coords, names) in enumerate(train_loader):
        imgs, hmaps = imgs.to(device), hmaps.to(device)
        hmap_pred, angle_pred, cls_logits = model(imgs)
        coords_pred = soft_argmax_2d(hmap_pred)
        gt_coords_t = torch.tensor(np.stack(gt_coords), dtype=torch.float32).to(device)
        loss_kp = weighted_mse_loss(hmap_pred, hmaps)
        loss_coord = coord_loss_fn(coords_pred, gt_coords_t)
        loss = loss_kp + 0.05*loss_coord
        loss.backward()
        if (step+1) % ACCUM_STEPS == 0:
            optimizer.step()
            optimizer.zero_grad()
        train_loss += loss.item()
    # Validate
    model.eval()
    val_rmse_list = []
    with torch.no_grad():
        for imgs, _, gt_coords, _ in val_loader:
            imgs = imgs.to(device)
            hmap_pred, _, _ = model(imgs)
            coords_pred = soft_argmax_2d(hmap_pred).cpu().numpy()
            for b in range(len(gt_coords)):
                rmse = np.sqrt(((coords_pred[b] - gt_coords[b]) ** 2).sum(axis=1)).mean()
                val_rmse_list.append(rmse)
    avg_rmse = np.mean(val_rmse_list)
    print(f"[Stage1][Epoch {epoch+1}] TrainLoss {train_loss/len(train_loader):.4f} ValRMSE {avg_rmse:.2f}")
    if avg_rmse < best_rmse:
        best_rmse = avg_rmse
        torch.save(model.state_dict(), PROJECT_ROOT / "outputs" / "model" / "stage1.pth")
        patience = 0
    else:
        patience += 1
        if patience >= PATIENCE: break


Unexpected keys (classifier.bias, classifier.weight, conv_head.bias, conv_head.weight) found while loading pretrained weights. This may be expected if model is being adapted.


[Stage1][Epoch 1] TrainLoss 202.1872 ValRMSE 75.76
[Stage1][Epoch 2] TrainLoss 113.3235 ValRMSE 62.84
[Stage1][Epoch 3] TrainLoss 63.0418 ValRMSE 43.15
[Stage1][Epoch 4] TrainLoss 37.4805 ValRMSE 32.28
[Stage1][Epoch 5] TrainLoss 25.0986 ValRMSE 25.38
[Stage1][Epoch 6] TrainLoss 18.6609 ValRMSE 22.33
[Stage1][Epoch 7] TrainLoss 13.4232 ValRMSE 18.96
[Stage1][Epoch 8] TrainLoss 9.8789 ValRMSE 16.87
[Stage1][Epoch 9] TrainLoss 10.2398 ValRMSE 18.48
[Stage1][Epoch 10] TrainLoss 7.2755 ValRMSE 16.21
[Stage1][Epoch 11] TrainLoss 7.7128 ValRMSE 12.90
[Stage1][Epoch 12] TrainLoss 6.1988 ValRMSE 14.62
[Stage1][Epoch 13] TrainLoss 5.5694 ValRMSE 16.26
[Stage1][Epoch 14] TrainLoss 6.0174 ValRMSE 14.47
[Stage1][Epoch 15] TrainLoss 5.1632 ValRMSE 14.13
[Stage1][Epoch 16] TrainLoss 4.6335 ValRMSE 17.94
[Stage1][Epoch 17] TrainLoss 4.4493 ValRMSE 12.50
[Stage1][Epoch 18] TrainLoss 4.3447 ValRMSE 12.72
[Stage1][Epoch 19] TrainLoss 3.6946 ValRMSE 14.99
[Stage1][Epoch 20] TrainLoss 4.1506 ValRMSE 12.70

In [None]:
#CELL 7-1 saving stage 1 validation split
# Read the same annotations used in Stage 1
df = pd.read_csv(GT99_CSV)

# Match Stage 1's split settings
train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

# Paths to save
train_path = PROJECT_ROOT / "outputs" / "csv" / "train_split_gt99.csv"
val_path = PROJECT_ROOT / "outputs" / "csv" / "val_split_gt99.csv"

train_df.to_csv(train_path, index=False)
val_df.to_csv(val_path, index=False)

print(f"Saved fixed Stage 1 train split at {train_path}")
print(f"Saved fixed Stage 1 validation split at {val_path}")


Saved fixed Stage 1 train split at /Users/aryan078/Desktop/CHD_project/train_split_gt99.csv
Saved fixed Stage 1 validation split at /Users/aryan078/Desktop/CHD_project/val_split_gt99.csv


In [None]:
# Cell 8: Visualization of Stage 1 Validation Predictions
import matplotlib.pyplot as plt

# Load fixed validation split
val_df = pd.read_csv(PROJECT_ROOT / "coutputs" / "csv" "val_split_gt99.csv")

# DataLoader for validation images
val_dataset = KeypointDataset(val_df, GT99_IMG_DIR, val_tf)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, collate_fn=custom_collate)

# Reload the model and best weights
model = HipNet().to(device)
model.load_state_dict(torch.load(PROJECT_ROOT / "stage1.pth", map_location=device))
model.eval()

# Directory for saving visualizations
VAL_VIS_DIR = PROJECT_ROOT / "outputs" / "viisualizations" / "Stage1_final_vis"
VAL_VIS_DIR.mkdir(exist_ok=True)

# Loop over validation images
with torch.no_grad():
    for imgs, _, gt_coords, names in val_loader:
        imgs = imgs.to(device)
        
        # Predict heatmaps & extract coords
        hmap_pred, _, _ = model(imgs)
        coords_pred = soft_argmax_2d(hmap_pred).cpu().numpy()[0]
        coords_gt = np.array(gt_coords[0])
        
        # Convert tensor image back to NumPy for plotting
        img_np = imgs[0].cpu().permute(1, 2, 0).numpy()
        img_np = (img_np - img_np.min()) / (img_np.max() - img_np.min())  # normalize to [0,1]
        
        # Plot image with GT and predicted points
        plt.figure(figsize=(6, 6))
        plt.imshow(img_np)
        plt.scatter(coords_gt[:,0], coords_gt[:,1], c='g', marker='o', s=40, label='GT')
        plt.scatter(coords_pred[:,0], coords_pred[:,1], c='r', marker='x', s=40, label='Pred')
        plt.title(names[0])
        plt.legend()
        plt.tight_layout()
        
        # Save each visualization
        out_path = VAL_VIS_DIR / f"{names[0]}_pred_vs_gt.png"
        plt.savefig(out_path)
        plt.close()

print(f"Saved validation visualizations to: {VAL_VIS_DIR}")


Unexpected keys (classifier.bias, classifier.weight, conv_head.bias, conv_head.weight) found while loading pretrained weights. This may be expected if model is being adapted.


Saved validation visualizations to: /Users/aryan078/Desktop/CHD_project/val_predictions_vis


In [None]:
#STAGE 2: Fine Tuning
# Paths
GT99_IMG_DIR = PROJECT_ROOT / "data/annotation_batch"
GT99_CSV = GT99_IMG_DIR / "keypoints_normalized_FIXED.csv"
VAL_SPLIT_GT99_CSV = PROJECT_ROOT / "val_split_gt99.csv"
U110_IMG_DIR = PROJECT_ROOT / "data/u110"
U110_CSV = U110_IMG_DIR / "keypoints_normalized_from_annotations5.csv"

STAGE1_MODEL_PATH = PROJECT_ROOT / "outputs" / "model" / "stage1.pth"
STAGE2_MODEL_PATH = PROJECT_ROOT / "outputs" / "model" / "stage2_finetuned.pth"


# Hyperparameters
IMG_SIZE = 512
HEATMAP_SIZE = 128
SIGMA = 8
BATCH_SIZE = 4
ACCUM_STEPS = 2
LR = 1e-4
EPOCHS_STAGE2 = 80
PATIENCE = 15
WARMUP_EPOCHS = 5

# Device
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

torch.set_num_threads(4)


Using device: cpu


In [None]:
# Load datasets
df_gt99 = pd.read_csv(GT99_CSV)
df_u110 = pd.read_csv(U110_CSV)

# Add absolute image paths
df_gt99['image_path'] = df_gt99['image_name'].apply(lambda n: str(GT99_IMG_DIR / n))
df_u110['image_path'] = df_u110['image_name'].apply(lambda n: str(U110_IMG_DIR / n))

# Validation splits
val_gt99_df = pd.read_csv(VAL_SPLIT_GT99_CSV)
val_gt99_names = set(val_gt99_df['image_name'])

train_u110_df, val_u110_df = train_test_split(df_u110, test_size=0.2, random_state=42)
val_u110_names = set(val_u110_df['image_name'])

final_val_names = val_gt99_names.union(val_u110_names)
df_combined = pd.concat([df_gt99, df_u110], ignore_index=True)
final_train_df = df_combined[~df_combined['image_name'].isin(final_val_names)].copy()
final_val_df = df_combined[df_combined['image_name'].isin(final_val_names)].copy()

# Save splits
final_train_df.to_csv(PROJECT_ROOT / "outputs" / "csv" / "train_split_combined.csv", index=False)
final_val_df.to_csv(PROJECT_ROOT / "outputs" / "csv" / "val_split_combined.csv", index=False)

print(f"Total: {len(df_combined)} | Train: {len(final_train_df)} | Val: {len(final_val_df)}")


Total: 215 | Train: 171 | Val: 44


In [67]:
def generate_heatmaps(keypoints, heatmap_size=HEATMAP_SIZE, img_size=IMG_SIZE, sigma=SIGMA):
    num_kps = len(keypoints)
    heatmaps = np.zeros((num_kps, heatmap_size, heatmap_size), dtype=np.float32)
    for i, (x, y) in enumerate(keypoints):
        x_h = x * (heatmap_size / img_size)
        y_h = y * (heatmap_size / img_size)
        xx, yy = np.meshgrid(np.arange(heatmap_size), np.arange(heatmap_size))
        heatmaps[i] = np.exp(-((xx - x_h) ** 2 + (yy - y_h) ** 2) / (2 * sigma ** 2))
    return heatmaps

def soft_argmax_2d(heatmaps, output_size=IMG_SIZE, heatmap_size=HEATMAP_SIZE):
    N, K, H, W = heatmaps.shape
    flat = heatmaps.view(N, K, -1)
    flat = torch.softmax(flat, dim=-1)
    coords_x = torch.arange(W).repeat(H, 1).reshape(-1).float().to(flat.device)
    coords_y = torch.arange(H).repeat_interleave(W).float().to(flat.device)
    xs = torch.sum(flat * coords_x, dim=-1)
    ys = torch.sum(flat * coords_y, dim=-1)
    scale = output_size / float(heatmap_size)
    coords = torch.stack([xs * scale, ys * scale], dim=-1)
    return coords.view(N, K, 2)

class KeypointDataset(Dataset):
    def __init__(self, df, transforms=None):
        self.df = df.reset_index(drop=True)
        self.transforms = transforms
    def __len__(self): return len(self.df)
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image = cv2.cvtColor(cv2.imread(row['image_path']), cv2.COLOR_BGR2RGB)
        keypoints = np.array([
            (row['L_FHC_x'] * IMG_SIZE, row['L_FHC_y'] * IMG_SIZE),
            (row['R_FHC_x'] * IMG_SIZE, row['R_FHC_y'] * IMG_SIZE),
            (row['L_CAR_x'] * IMG_SIZE, row['L_CAR_y'] * IMG_SIZE),
            (row['R_CAR_x'] * IMG_SIZE, row['R_CAR_y'] * IMG_SIZE),
        ], dtype=np.float32)
        if self.transforms:
            aug = self.transforms(image=image, keypoints=keypoints)
            image, keypoints = aug['image'], aug['keypoints']
        heatmaps = generate_heatmaps(keypoints.tolist())
        return image, torch.tensor(heatmaps), np.array(keypoints, dtype=np.float32), row['image_name']

def custom_collate(batch):
    imgs, hmaps, kps, names = zip(*batch)
    return torch.stack(imgs), torch.stack(hmaps), list(kps), list(names)

class HorizontalFlipWithKeypointSwap(A.DualTransform):
    def apply(self, image, **params): return cv2.flip(image, 1)
    def apply_to_keypoints(self, keypoints, **params):
        keypoints[:, 0] = IMG_SIZE - keypoints[:, 0]
        keypoints[[0, 1]] = keypoints[[1, 0]]
        keypoints[[2, 3]] = keypoints[[3, 2]]
        return keypoints
    def get_transform_init_args_names(self): return ()


In [68]:
train_tf_flip = A.Compose([
    HorizontalFlipWithKeypointSwap(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.02, scale_limit=0.05, rotate_limit=7, border_mode=cv2.BORDER_CONSTANT, p=0.9),
    A.RandomBrightnessContrast(p=0.5),
    A.GaussianBlur(blur_limit=3, p=0.2),
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], keypoint_params=A.KeypointParams(format='xy'))

val_tf = A.Compose([
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], keypoint_params=A.KeypointParams(format='xy'))


In [None]:
class HipNet(nn.Module):
    def __init__(self, num_keypoints=4):
        super().__init__()
        self.backbone = timm.create_model("mobilenetv3_large_100", pretrained=True, features_only=True)
        in_ch = self.backbone.feature_info[-1]['num_chs']
        self.kp_head = nn.Sequential(nn.Conv2d(in_ch, 128, 3, padding=1), nn.ReLU(),
                                     nn.Conv2d(128, num_keypoints, 1))
    def forward(self, x):
        feats = self.backbone(x)[-1]
        hmap = self.kp_head(torch.nn.functional.interpolate(feats, size=(HEATMAP_SIZE, HEATMAP_SIZE)))
        return hmap, None, None

weights = torch.tensor([2.0, 2.0, 1.0, 1.0])
def weighted_mse_loss(pred, target):
    return (((pred - target) ** 2) * weights.view(1,-1,1,1).to(pred.device)).mean()

coord_loss_fn = nn.MSELoss()


In [None]:
# Dataloaders
train_loader = DataLoader(KeypointDataset(final_train_df, train_tf_flip),
                          batch_size=BATCH_SIZE, shuffle=True, collate_fn=custom_collate,
                          num_workers=0, pin_memory=True)
val_loader = DataLoader(KeypointDataset(final_val_df, val_tf),
                        batch_size=BATCH_SIZE, collate_fn=custom_collate,
                        num_workers=0, pin_memory=True)

# Model init + load Stage 1
model = HipNet().to(device)
state_dict = torch.load(STAGE1_MODEL_PATH, map_location=device)
filtered_state_dict = {k: v for k, v in state_dict.items() if k in model.state_dict()}
model.load_state_dict(filtered_state_dict, strict=False)

print(f"Loaded Stage 1 weights from {STAGE1_MODEL_PATH}")

# Freeze backbone for warm-up
for p in model.backbone.parameters():
    p.requires_grad = False
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=LR)

best_rmse = float('inf')
epochs_without_improvement = 0

for epoch in range(EPOCHS_STAGE2):

    if epoch == WARMUP_EPOCHS:
        print(f"\n[Epoch {epoch+1}] Unfreezing backbone...\n")
        for p in model.backbone.parameters():
            p.requires_grad = True
        optimizer = torch.optim.Adam(model.parameters(), lr=LR)

    # Train
    model.train()
    train_loss = 0
    optimizer.zero_grad()

    for step, (imgs, hmaps, gt_coords, _) in enumerate(train_loader):
        imgs, hmaps = imgs.to(device), hmaps.to(device)
        hmap_pred, _, _ = model(imgs)
        coords_pred = soft_argmax_2d(hmap_pred)
        gt_coords_t = torch.tensor(np.stack(gt_coords), dtype=torch.float32).to(device)
        loss = weighted_mse_loss(hmap_pred, hmaps) + 0.05 * coord_loss_fn(coords_pred, gt_coords_t)
        loss.backward()
        if (step + 1) % ACCUM_STEPS == 0:
            optimizer.step()
            optimizer.zero_grad()
        train_loss += loss.item()

    # Validate
    model.eval()
    val_rmse_list = []
    with torch.no_grad():
        for imgs, _, gt_coords, names in val_loader:
            imgs = imgs.to(device)
            hmap_pred, _, _ = model(imgs)
            coords_pred = soft_argmax_2d(hmap_pred).cpu().numpy()
            for b in range(len(gt_coords)):
                rmse = np.sqrt(((coords_pred[b] - gt_coords[b]) ** 2).sum(axis=1)).mean()
                val_rmse_list.append(rmse)
    avg_rmse = np.mean(val_rmse_list)
    print(f"[Epoch {epoch+1}/{EPOCHS_STAGE2}] TrainLoss: {train_loss/len(train_loader):.4f} | ValRMSE: {avg_rmse:.2f}px")

    if avg_rmse < best_rmse:
        best_rmse = avg_rmse
        torch.save(model.state_dict(), STAGE2_MODEL_PATH)
        print(f"  -> New best model saved! ValRMSE: {best_rmse:.2f}px")
        epochs_without_improvement = 0
    else:
        epochs_without_improvement += 1
        if epochs_without_improvement >= PATIENCE:
            print(f"Early stopping after {PATIENCE} bad epochs")
            break

print(f"Done. Best ValRMSE: {best_rmse:.2f}px | Saved to {STAGE2_MODEL_PATH}")


Unexpected keys (classifier.bias, classifier.weight, conv_head.bias, conv_head.weight) found while loading pretrained weights. This may be expected if model is being adapted.


Loaded Stage 1 weights from /Users/aryan078/Desktop/CHD_project/stage1_best.pth
[Epoch 1/80] TrainLoss: 6.2751 | ValRMSE: 10.10px
  -> New best model saved! ValRMSE: 10.10px
[Epoch 2/80] TrainLoss: 3.6035 | ValRMSE: 10.00px
  -> New best model saved! ValRMSE: 10.00px
[Epoch 3/80] TrainLoss: 3.4837 | ValRMSE: 9.72px
  -> New best model saved! ValRMSE: 9.72px
[Epoch 4/80] TrainLoss: 3.7704 | ValRMSE: 9.77px
[Epoch 5/80] TrainLoss: 5.4999 | ValRMSE: 9.92px

[Epoch 6] Unfreezing backbone...

[Epoch 6/80] TrainLoss: 3.7345 | ValRMSE: 10.12px
[Epoch 7/80] TrainLoss: 2.3545 | ValRMSE: 10.84px
[Epoch 8/80] TrainLoss: 2.4060 | ValRMSE: 9.99px
[Epoch 9/80] TrainLoss: 2.2650 | ValRMSE: 10.18px
[Epoch 10/80] TrainLoss: 2.0522 | ValRMSE: 9.62px
  -> New best model saved! ValRMSE: 9.62px
[Epoch 11/80] TrainLoss: 2.1184 | ValRMSE: 9.06px
  -> New best model saved! ValRMSE: 9.06px
[Epoch 12/80] TrainLoss: 1.8706 | ValRMSE: 8.90px
  -> New best model saved! ValRMSE: 8.90px
[Epoch 13/80] TrainLoss: 1.69

In [None]:
# Paths
GT99_IMG_DIR = PROJECT_ROOT / "data/annotation_batch"
GT99_CSV = GT99_IMG_DIR / "keypoints_normalized_FIXED.csv"
VAL_SPLIT_GT99_CSV = PROJECT_ROOT / "outputs" / "csv" / "val_split_gt99.csv"
U110_IMG_DIR = PROJECT_ROOT / "data/u110"
U110_CSV = U110_IMG_DIR / "keypoints_normalized_from_annotations5.csv"

STAGE2_MODEL_PATH = PROJECT_ROOT / "outputs" / "model" / "stage2_finetuned.pth"
STAGE2_CPU_PATH = PROJECT_ROOT / "outputs" / "model" / "stage2_finetuned_cpu_best.pth"
VAL_VIZ_DIR = PROJECT_ROOT / "outputs" / "visualizations" / "stage2_cpu_val_viz"
VAL_VIZ_DIR.mkdir(exist_ok=True, parents=True)

# Hyperparameters
IMG_SIZE = 512
HEATMAP_SIZE = 256
SIGMA = 8
BATCH_SIZE = 1
ACCUM_STEPS = 4
LR = 1e-4
EPOCHS = 8
WARMUP_EPOCHS = 2
HEAT_W = 1.0
COORD_W = 0.12

device = torch.device("cpu")
torch.set_num_threads(4)

# Utils
def generate_heatmaps(keypoints, heatmap_size=HEATMAP_SIZE, img_size=IMG_SIZE, sigma=SIGMA):
    num_kps = len(keypoints)
    heatmaps = np.zeros((num_kps, heatmap_size, heatmap_size), dtype=np.float32)
    for i, (x, y) in enumerate(keypoints):
        x_h = x * (heatmap_size / img_size)
        y_h = y * (heatmap_size / img_size)
        xx, yy = np.meshgrid(np.arange(heatmap_size), np.arange(heatmap_size))
        heatmaps[i] = np.exp(-((xx - x_h)**2 + (yy - y_h)**2) / (2 * sigma**2))
    return heatmaps

def soft_argmax_2d(heatmaps, output_size=IMG_SIZE, heatmap_size=HEATMAP_SIZE):
    N, K, H, W = heatmaps.shape
    flat = heatmaps.view(N, K, -1)
    flat = torch.softmax(flat, dim=-1)
    coords_x = torch.arange(W).repeat(H, 1).reshape(-1).float().to(flat.device)
    coords_y = torch.arange(H).repeat_interleave(W).float().to(flat.device)
    xs = torch.sum(flat * coords_x, dim=-1)
    ys = torch.sum(flat * coords_y, dim=-1)
    scale = output_size / float(heatmap_size)
    coords = torch.stack([xs * scale, ys * scale], dim=-1)
    return coords.view(N, K, 2)

class KeypointDataset(Dataset):
    def __init__(self, df, transforms=None):
        self.df = df.reset_index(drop=True)
        self.transforms = transforms
    def __len__(self): return len(self.df)
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image = cv2.cvtColor(cv2.imread(row["image_path"]), cv2.COLOR_BGR2RGB)
        keypoints = np.array([
            (row['L_FHC_x'] * IMG_SIZE, row['L_FHC_y'] * IMG_SIZE),
            (row['R_FHC_x'] * IMG_SIZE, row['R_FHC_y'] * IMG_SIZE),
            (row['L_CAR_x'] * IMG_SIZE, row['L_CAR_y'] * IMG_SIZE),
            (row['R_CAR_x'] * IMG_SIZE, row['R_CAR_y'] * IMG_SIZE),
        ], dtype=np.float32)
        if self.transforms:
            aug = self.transforms(image=image, keypoints=keypoints)
            image, keypoints = aug['image'], aug['keypoints']
        heatmaps = generate_heatmaps(keypoints.tolist())
        return image, torch.tensor(heatmaps), np.array(keypoints, dtype=np.float32), row["image_name"]

def custom_collate(batch):
    imgs, hmaps, kps, names = zip(*batch)
    return torch.stack(imgs), torch.stack(hmaps), list(kps), list(names)

class HorizontalFlipWithKeypointSwap(A.DualTransform):
    def apply(self, image, **params): return cv2.flip(image, 1)
    def apply_to_keypoints(self, keypoints, **params):
        keypoints[:, 0] = IMG_SIZE - keypoints[:, 0]
        keypoints[[0, 1]] = keypoints[[1, 0]]
        keypoints[[2, 3]] = keypoints[[3, 2]]
        return keypoints
    def get_transform_init_args_names(self): return ()

train_tf = A.Compose([
    HorizontalFlipWithKeypointSwap(p=0.5),
    A.ShiftScaleRotate(shift_limit=0.02, scale_limit=0.05, rotate_limit=7, border_mode=cv2.BORDER_CONSTANT, p=0.9),
    A.CLAHE(clip_limit=2.0, tile_grid_size=(8,8), p=0.25),
    A.RandomGamma(gamma_limit=(80,120), p=0.25),
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], keypoint_params=A.KeypointParams(format='xy'))

val_tf = A.Compose([
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], keypoint_params=A.KeypointParams(format='xy'))

class HipNet(nn.Module):
    def __init__(self, num_keypoints=4):
        super().__init__()
        self.backbone = timm.create_model("mobilenetv3_large_100", pretrained=True, features_only=True)
        in_ch = self.backbone.feature_info[-1]['num_chs']
        self.kp_head = nn.Sequential(
            nn.Conv2d(in_ch, 128, 3, padding=1), nn.ReLU(),
            nn.Conv2d(128, num_keypoints, 1)
        )
    def forward(self, x):
        feats = self.backbone(x)[-1]
        hmap = self.kp_head(torch.nn.functional.interpolate(feats, size=(HEATMAP_SIZE, HEATMAP_SIZE)))
        return hmap, None, None

weights = torch.tensor([2.0, 2.0, 1.0, 1.0])
def weighted_mse_loss(pred, target):
    return (((pred - target)**2) * weights.view(1,-1,1,1).to(pred.device)).mean()
coord_loss_fn = nn.MSELoss()

# Prepare Data (same split as Stage 2)
df_gt99 = pd.read_csv(GT99_CSV)
df_u110 = pd.read_csv(U110_CSV)
df_gt99['image_path'] = df_gt99['image_name'].apply(lambda n: str(GT99_IMG_DIR / n))
df_u110['image_path'] = df_u110['image_name'].apply(lambda n: str(U110_IMG_DIR / n))
def filter_missing(df):
    mask = df['image_path'].apply(lambda p: Path(p).exists())
    missing = df[~mask]
    if len(missing) > 0:
        print(f"⚠ {len(missing)} missing files will be dropped:")
        print(missing[['image_name', 'image_path']].head(10))
    return df[mask]

df_gt99 = filter_missing(df_gt99)
df_u110 = filter_missing(df_u110)

val_gt99_names = set(pd.read_csv(VAL_SPLIT_GT99_CSV)['image_name'])
train_u110_df, val_u110_df = train_test_split(df_u110, test_size=0.2, random_state=42)
val_u110_names = set(val_u110_df['image_name'])

final_val_names = val_gt99_names.union(val_u110_names)
df_combined = pd.concat([df_gt99, df_u110], ignore_index=True)
final_train_df = df_combined[~df_combined['image_name'].isin(final_val_names)]
final_val_df = df_combined[df_combined['image_name'].isin(final_val_names)]

train_loader = DataLoader(KeypointDataset(final_train_df, train_tf), batch_size=BATCH_SIZE,
                          shuffle=True, collate_fn=custom_collate)
val_loader = DataLoader(KeypointDataset(final_val_df, val_tf), batch_size=1,
                        collate_fn=custom_collate)

# Load Model from Stage2
model = HipNet().to(device)
model.load_state_dict(torch.load(STAGE2_MODEL_PATH, map_location=device))

# Freeze backbone for warmup
for p in model.backbone.parameters(): p.requires_grad = False
optimizer = torch.optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()),
                               lr=LR, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min',
                                                       factor=0.5, patience=2, min_lr=1e-6)
best_rmse = float('inf')

# Training Loop
for epoch in range(EPOCHS):
    if epoch == WARMUP_EPOCHS:
        for p in model.backbone.parameters(): p.requires_grad = True
        optimizer = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=1e-4)

    # Train
    model.train()
    train_loss = 0
    optimizer.zero_grad()
    for step, (imgs, hmaps, gt_coords, _) in enumerate(train_loader):
        imgs, hmaps = imgs.to(device), hmaps.to(device)
        hmap_pred, _, _ = model(imgs)
        coords_pred = soft_argmax_2d(hmap_pred)
        gt_coords_t = torch.tensor(np.stack(gt_coords), dtype=torch.float32).to(device)
        loss = HEAT_W * weighted_mse_loss(hmap_pred, hmaps) + COORD_W * coord_loss_fn(coords_pred, gt_coords_t)
        loss.backward()
        if (step+1) % ACCUM_STEPS == 0:
            optimizer.step()
            optimizer.zero_grad()
        train_loss += loss.item()

    # Validate with TTA
    model.eval()
    val_rmse_list = []
    with torch.no_grad():
        for imgs, _, gt_coords, names in val_loader:
            # Original
            hmap_pred, _, _ = model(imgs.to(device))
            coords1 = soft_argmax_2d(hmap_pred).cpu().numpy()
            # Flipped
            imgs_flip = torch.flip(imgs, dims=[3])
            hmap_flip, _, _ = model(imgs_flip.to(device))
            hmap_flip = torch.flip(hmap_flip, dims=[3])
            coords2 = soft_argmax_2d(hmap_flip).cpu().numpy()
            coords2[:, [0,1]] = coords2[:, [1,0]]
            coords2[:, [2,3]] = coords2[:, [3,2]]
            coords_avg = (coords1 + coords2) / 2.0
            for b in range(len(gt_coords)):
                rmse = np.sqrt(((coords_avg[b] - gt_coords[b])**2).sum(axis=1)).mean()
                val_rmse_list.append(rmse)

    avg_rmse = np.mean(val_rmse_list)
    print(f"[CPU Finetune][Epoch {epoch+1}/{EPOCHS}] TrainLoss {train_loss/len(train_loader):.4f} | ValRMSE {avg_rmse:.2f}")
    scheduler.step(avg_rmse)

    if avg_rmse < best_rmse:
        best_rmse = avg_rmse
        torch.save(model.state_dict(), STAGE2_CPU_PATH)
        print(f"  -> Saved new best model with RMSE {best_rmse:.2f}")

print(f"Best Val RMSE: {best_rmse:.2f} px, saved at {STAGE2_CPU_PATH}")


⚠ 1 missing files will be dropped:
               image_name                                         image_path
100  Radiograph4_hip0.jpg  /Users/aryan078/Desktop/CHD_project/data/u110/...


Unexpected keys (classifier.bias, classifier.weight, conv_head.bias, conv_head.weight) found while loading pretrained weights. This may be expected if model is being adapted.


[CPU Finetune][Epoch 1/8] TrainLoss 1.7167 | ValRMSE 3.88
  -> Saved new best model with RMSE 3.88
[CPU Finetune][Epoch 2/8] TrainLoss 1.7150 | ValRMSE 4.89
[CPU Finetune][Epoch 3/8] TrainLoss 1.9894 | ValRMSE 4.36
[CPU Finetune][Epoch 4/8] TrainLoss 1.6133 | ValRMSE 4.95
[CPU Finetune][Epoch 5/8] TrainLoss 1.5816 | ValRMSE 3.48
  -> Saved new best model with RMSE 3.48
[CPU Finetune][Epoch 6/8] TrainLoss 1.4329 | ValRMSE 3.51
[CPU Finetune][Epoch 7/8] TrainLoss 1.2541 | ValRMSE 4.83
[CPU Finetune][Epoch 8/8] TrainLoss 1.2707 | ValRMSE 3.21
  -> Saved new best model with RMSE 3.21
Best Val RMSE: 3.21 px, saved at /Users/aryan078/Desktop/CHD_project/stage2_finetuned_cpu_best.pth


In [None]:
# Visualization & Save for Stage2 CPU Best
model = HipNet().to(device)
model.load_state_dict(torch.load(STAGE2_CPU_PATH, map_location=device))
model.eval()

with torch.no_grad():
    for imgs, _, gt_coords, names in val_loader:
        out = model(imgs.to(device))
        if isinstance(out, tuple):
            hmap_pred = out[0]
        else:
            hmap_pred = out

        coords_pred = soft_argmax_2d(hmap_pred).cpu().numpy()[0]

        img_np = imgs[0].cpu().permute(1, 2, 0).numpy()
        img_np = (img_np - img_np.min()) / (img_np.max() - img_np.min())

        plt.figure()
        plt.imshow(img_np)
        plt.scatter(gt_coords[0][:,0], gt_coords[0][:,1], c='g', s=15, label="GT")
        plt.scatter(coords_pred[:,0], coords_pred[:,1], c='r', s=15, label="Pred")
        plt.legend()
        plt.title(names[0])
        plt.axis('off')

        out_path = VAL_VIZ_DIR / f"{names[0]}_pred_vs_gt.png"
        plt.savefig(out_path, dpi=300, bbox_inches='tight')
        plt.close()

print(f"✅ Visualization images saved to: {VAL_VIZ_DIR}")


Unexpected keys (classifier.bias, classifier.weight, conv_head.bias, conv_head.weight) found while loading pretrained weights. This may be expected if model is being adapted.


✅ Visualization images saved to: /Users/aryan078/Desktop/CHD_project/stage2_cpu_val_viz


In [9]:
# STAGE 3: EVALUATION (with extra plots & visualizations)

import math, cv2, torch, timm
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.metrics import confusion_matrix
import albumentations as A
from albumentations.pytorch import ToTensorV2
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns

# config
IMG_SIZE = 512
HEATMAP_SIZE = 256
SIGMA = 8
ANGLE_THR = 105.0
device = torch.device("cpu")

# paths
MODEL_PATH = PROJECT_ROOT / "outputs" / "model" / "stage2_finetuned_cpu_best.pth"
input_dir = PROJECT_ROOT / "data" / "check"
gt_csv_path = PROJECT_ROOT / "outputs" / "csv" / "val_split_combined.csv" 
out_csv_path = PROJECT_ROOT / "outputs" / "csv" / "val_predictions_with_metrics.csv"
annot_dir = PROJECT_ROOT / "outputs" / "visualizations" / "annotated_preds"
plots_dir = PROJECT_ROOT / "outputs" / "visualizations" / "evaluation_plots"
annot_dir.mkdir(exist_ok=True)
plots_dir.mkdir(exist_ok=True)

# utilities
def compute_norberg_angles(coords):
    def angle(a, b, c):
        AB = a - b
        CB = c - b
        dot = np.dot(AB, CB)
        denom = np.linalg.norm(AB) * np.linalg.norm(CB)
        return math.degrees(math.acos(np.clip(dot / denom, -1, 1))) if denom > 0 else 0
    angL = angle(coords[2], coords[0], coords[1])
    angR = angle(coords[3], coords[1], coords[0])
    return angL, angR

def soft_argmax_2d(hmaps):
    N, K, H, W = hmaps.shape
    flat = hmaps.view(N, K, -1)
    flat = torch.softmax(flat, dim=-1)
    coords_x = torch.arange(W).repeat(H, 1).reshape(-1).float().to(flat.device)
    coords_y = torch.arange(H).repeat_interleave(W).float().to(flat.device)
    xs = torch.sum(flat * coords_x, dim=-1)
    ys = torch.sum(flat * coords_y, dim=-1)
    scale = IMG_SIZE / float(HEATMAP_SIZE)
    coords = torch.stack([xs * scale, ys * scale], dim=-1)
    return coords.view(N, K, 2)

def draw_angle_arc(img, center, p1, p2, angle_val, color=(0,255,0), radius=50, thickness=2):
    def _angle(v1, v2):
        dot = np.dot(v1, v2)
        return math.degrees(math.acos(np.clip(dot / (np.linalg.norm(v1)*np.linalg.norm(v2)), -1, 1)))
    v1 = np.array(p1) - np.array(center)
    v2 = np.array(p2) - np.array(center)
    ang = _angle(v1, v2)
    start_angle = math.degrees(math.atan2(-v1[1], v1[0]))
    end_angle = start_angle + ang
    cv2.ellipse(img, (int(center[0]), int(center[1])), (radius, radius), 0, start_angle, end_angle, color, thickness)
    label_pos = (int(center[0] + radius/2), int(center[1] - radius/2))
    cv2.putText(img, f"{angle_val:.1f}", label_pos, cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)
    return img

# transforms
val_tf = A.Compose([
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], keypoint_params=A.KeypointParams(format='xy'))

# model
class HipNet(nn.Module):
    def __init__(self, num_keypoints=4):
        super().__init__()
        self.backbone = timm.create_model("mobilenetv3_large_100", pretrained=True, features_only=True)
        in_ch = self.backbone.feature_info[-1]['num_chs']
        self.kp_head = nn.Sequential(
            nn.Conv2d(in_ch, 128, 3, padding=1), nn.ReLU(),
            nn.Conv2d(128, num_keypoints, 1)
        )
    def forward(self, x):
        feats = self.backbone(x)[-1]
        hmap = self.kp_head(torch.nn.functional.interpolate(feats, size=(HEATMAP_SIZE, HEATMAP_SIZE)))
        return hmap

model = HipNet().to(device)
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
model.eval()

# inference
gt_df = pd.read_csv(gt_csv_path)
results = []

for _, row in gt_df.iterrows():
    img_path = Path(input_dir) / row["image_name"]
    img_bgr = cv2.imread(str(img_path))
    if img_bgr is None:
        print(f"⚠ Missing image: {img_path}")
        continue
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

    tf_img = val_tf(image=img_rgb, keypoints=[(0,0)]*4)
    img_tensor = tf_img["image"].unsqueeze(0).to(device)

    with torch.no_grad():
        coords1 = soft_argmax_2d(model(img_tensor)).cpu().numpy()
        img_flip = torch.flip(img_tensor, dims=[3])
        hmap_flip = model(img_flip)
        hmap_flip = torch.flip(hmap_flip, dims=[3])
        coords2 = soft_argmax_2d(hmap_flip).cpu().numpy()
        coords2[:, [0,1]] = coords2[:, [1,0]]
        coords2[:, [2,3]] = coords2[:, [3,2]]
        coords_avg = (coords1 + coords2) / 2.0
        coords_avg = coords_avg[0]

    pred_L, pred_R = compute_norberg_angles(coords_avg)
    pred_class = "Normal" if min(pred_L, pred_R) >= ANGLE_THR else "Dysplasia"

    gt_coords = np.array([
        (row['L_FHC_x']*IMG_SIZE, row['L_FHC_y']*IMG_SIZE),
        (row['R_FHC_x']*IMG_SIZE, row['R_FHC_y']*IMG_SIZE),
        (row['L_CAR_x']*IMG_SIZE, row['L_CAR_y']*IMG_SIZE),
        (row['R_CAR_x']*IMG_SIZE, row['R_CAR_y']*IMG_SIZE),
    ])
    gt_L, gt_R = compute_norberg_angles(gt_coords)
    gt_class = "Normal" if min(gt_L, gt_R) >= ANGLE_THR else "Dysplasia"

    results.append({
        "filename": row["image_name"],
        "GT_angle_L": gt_L, "GT_angle_R": gt_R, "GT_class": gt_class,
        "Pred_angle_L": pred_L, "Pred_angle_R": pred_R, "Pred_class": pred_class
    })

    img_annot = img_rgb.copy()
    # GT arcs
    cv2.line(img_annot, tuple(map(int, gt_coords[0])), tuple(map(int, gt_coords[2])), (0,255,0), 2)
    cv2.line(img_annot, tuple(map(int, gt_coords[0])), tuple(map(int, gt_coords[1])), (0,255,0), 2)
    img_annot = draw_angle_arc(img_annot, gt_coords[0], gt_coords[2], gt_coords[1], gt_L, (0,255,0))
    cv2.line(img_annot, tuple(map(int, gt_coords[1])), tuple(map(int, gt_coords[3])), (0,255,0), 2)
    cv2.line(img_annot, tuple(map(int, gt_coords[1])), tuple(map(int, gt_coords[0])), (0,255,0), 2)
    img_annot = draw_angle_arc(img_annot, gt_coords[1], gt_coords[3], gt_coords[0], gt_R, (0,255,0))
    # Pred arcs
    cv2.line(img_annot, tuple(map(int, coords_avg[0])), tuple(map(int, coords_avg[2])), (0,0,255), 2)
    cv2.line(img_annot, tuple(map(int, coords_avg[0])), tuple(map(int, coords_avg[1])), (0,0,255), 2)
    img_annot = draw_angle_arc(img_annot, coords_avg[0], coords_avg[2], coords_avg[1], pred_L, (0,0,255))
    cv2.line(img_annot, tuple(map(int, coords_avg[1])), tuple(map(int, coords_avg[3])), (0,0,255), 2)
    cv2.line(img_annot, tuple(map(int, coords_avg[1])), tuple(map(int, coords_avg[0])), (0,0,255), 2)
    img_annot = draw_angle_arc(img_annot, coords_avg[1], coords_avg[3], coords_avg[0], pred_R, (0,0,255))
    # Keypoints
    for (x,y) in gt_coords: cv2.circle(img_annot, (int(x),int(y)), 5, (0,255,0), -1)
    for (x,y) in coords_avg: cv2.circle(img_annot, (int(x),int(y)), 5, (0,0,255), -1)
    # Labels
    cv2.putText(img_annot, f"GT: {gt_class}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)
    cv2.putText(img_annot, f"PR: {pred_class}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,255), 2)

    save_path = annot_dir / f"{row['image_name'].rsplit('.',1)[0]}_annot.png"
    cv2.imwrite(str(save_path), cv2.cvtColor(img_annot, cv2.COLOR_RGB2BGR))

# metrics & plots
res_df = pd.DataFrame(results)
res_df.to_csv(out_csv_path, index=False)
print(f"✅ Results saved to: {out_csv_path}")
print(f"✅ Annotated images saved to: {annot_dir}")

label_map = {"Normal": 0, "Dysplasia": 1}
y_true = res_df["GT_class"].map(label_map)
y_pred = res_df["Pred_class"].map(label_map)
cm = confusion_matrix(y_true, y_pred, labels=[0,1])
TN, FP, FN, TP = cm.ravel()
accuracy = (TP + TN) / cm.sum()
sensitivity = TP / (TP + FN) if (TP+FN) > 0 else 0
specificity = TN / (TN + FP) if (TN+FP) > 0 else 0

print("Confusion Matrix:\n", cm)
print(f"Accuracy   : {accuracy:.3f}")
print(f"Sensitivity: {sensitivity:.3f}")
print(f"Specificity: {specificity:.3f}")

# Confusion matrix heatmap
plt.figure(figsize=(5,4))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Normal","Dysplasia"],
            yticklabels=["Normal","Dysplasia"])
plt.xlabel("Predicted"); plt.ylabel("True"); plt.title("Confusion Matrix")
plt.tight_layout()
plt.savefig(plots_dir / "confusion_matrix.png", dpi=300)
plt.close()

# Bar chart for metrics
metric_names = ["Accuracy", "Sensitivity", "Specificity"]
metric_values = [accuracy, sensitivity, specificity]
plt.figure(figsize=(5,4))
sns.barplot(x=metric_names, y=metric_values, palette="viridis")
plt.ylim(0,1)
for i, val in enumerate(metric_values):
    plt.text(i, val+0.01, f"{val:.2f}", ha='center', va='bottom')
plt.ylabel("Score"); plt.title("Evaluation Metrics")
plt.tight_layout()
plt.savefig(plots_dir / "metrics_barplot.png", dpi=300)
plt.close()

# Histogram of angles (GT vs Pred L/R)
for side in ["L", "R"]:
    plt.figure(figsize=(6,4))
    plt.hist(res_df[f"GT_angle_{side}"], bins=20, alpha=0.5, label=f"GT {side}")
    plt.hist(res_df[f"Pred_angle_{side}"], bins=20, alpha=0.5, label=f"Pred {side}")
    plt.axvline(ANGLE_THR, color='red', linestyle='--', label="Threshold")
    plt.xlabel("Angle (deg)"); plt.ylabel("Count")
    plt.title(f"Norberg Angle Distribution - {side}")
    plt.legend()
    plt.tight_layout()
    plt.savefig(plots_dir / f"angle_dist_{side}.png", dpi=300)
    plt.close()

print(f" Plots saved to: {plots_dir}")


Unexpected keys (classifier.bias, classifier.weight, conv_head.bias, conv_head.weight) found while loading pretrained weights. This may be expected if model is being adapted.


⚠ Missing image: /Users/aryan078/Desktop/CHD_project/data/check/Radiograph02_hip0.jpg


[ WARN:0@1829.234] global loadsave.cpp:275 findDecoder imread_('/Users/aryan078/Desktop/CHD_project/data/check/Radiograph02_hip0.jpg'): can't open/read file: check file path/integrity


✅ Results saved to: /Users/aryan078/Desktop/CHD_project/outputs/csv/val_predictions_with_metrics.csv
✅ Annotated images saved to: /Users/aryan078/Desktop/CHD_project/outputs/visualizations/annotated_preds
Confusion Matrix:
 [[ 7  6]
 [ 4 24]]
Accuracy   : 0.756
Sensitivity: 0.857
Specificity: 0.538
 Plots saved to: /Users/aryan078/Desktop/CHD_project/outputs/visualizations/evaluation_plots
