import os
# Set the OPENCV_LOG_LEVEL environment variable to 'SILENT' before anything
# else in the file runs.
# NOTE(review): no OpenCV import is visible in this file — presumably this is
# aimed at a transitive import; confirm it is still needed.
os.environ['OPENCV_LOG_LEVEL'] = 'SILENT'
# Option 2 — Multi-region (direct classification)

Direct multi-label classification using MobileNetV2 (no SSL pretraining). It keeps the dataset, the patient-level splits, and the augmentations from the original option2 notebook.

In [None]:
# Imports & config
import os
from pathlib import Path

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from scipy.ndimage import rotate
from sklearn.metrics import roc_auc_score
from torch.utils.data import Dataset, DataLoader
from torchvision import models

class CFG:
    """Run-wide hyperparameters and switches, stored as class attributes."""

    # Which network to build: 'mobilenet_v2' or 'custom'
    encoder_backbone: str = 'mobilenet_v2'

    # Input geometry and batching
    img_size: int = 224
    batch_size: int = 32

    # Optimisation schedule
    epochs: int = 8
    lr: float = 1e-4

    # Optional cap on the number of training rows; None keeps every row
    subset_size = None

    # Prefer the GPU when torch reports one, otherwise run on the CPU
    device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
# Instantiate the shared configuration and report what this run will use.
cfg = CFG()
print(cfg.device)
print(f"Encoder backbone: {cfg.encoder_backbone}")

# Local data folder; change this to your desired location.
CUSTOM_DATA_PATH = "datasets"

# Make sure the folder exists (a no-op when it is already there).
Path(CUSTOM_DATA_PATH).mkdir(parents=True, exist_ok=True)



In [None]:
# Load dataset (same as original option2)
# Try to fetch the resized NIH chest X-ray dataset through kagglehub. If that
# is not possible, fall back to files in the current working directory.
try:
    import kagglehub
    path = kagglehub.dataset_download("khanfashee/nih-chest-x-ray-14-224x224-resized")
    BASE_PATH = Path(path)
except Exception as err:
    # The fallback is deliberately best-effort (kagglehub may be missing or
    # the download may fail), but report it so that a later "file not found"
    # from read_csv is easy to trace back to its cause.
    BASE_PATH = Path('.')
    print(f'kagglehub download unavailable ({err!r}); using {BASE_PATH.resolve()}')

# Metadata table: one row per image, with the image file name in 'Image Index'.
df = pd.read_csv(BASE_PATH / 'Data_Entry_2017.csv')
images_dir = BASE_PATH / 'images-224' / 'images-224'
df['Image Path'] = [str(images_dir / name) for name in df['Image Index'].values]

DISEASE_CATEGORIES = [
    'Atelectasis', 'Cardiomegaly', 'Effusion', 'Infiltration', 'Mass',
    'Nodule', 'Pneumonia', 'Pneumothorax', 'Consolidation', 'Edema',
    'Emphysema', 'Fibrosis', 'Pleural_Thickening', 'Hernia',
]

# One 0/1 indicator column per disease: 1 when the disease name occurs in the
# row's 'Finding Labels' text. The match is a plain substring test, done for
# the whole column at once rather than row by row.
# NOTE(review): substring matching would misfire if one category name were ever
# contained in another label's name; that is not the case for this list.
finding_labels = df['Finding Labels']
for disease in DISEASE_CATEGORIES:
    df[disease] = finding_labels.str.contains(disease, regex=False).astype('int64')

print('Loaded', len(df))

In [None]:
# Patient-level split
from sklearn.model_selection import train_test_split

# Split on patient IDs rather than on images, so that every image of a given
# patient ends up in exactly one of train / val / test.
unique_patients = df['Patient ID'].unique()

# Hold out 2% of the patients for testing ...
train_val_patients, test_patients = train_test_split(
    unique_patients,
    test_size=0.02,
    random_state=42,
)
# ... then 5.2% of the remaining patients for validation.
train_patients, val_patients = train_test_split(
    train_val_patients,
    test_size=0.052,
    random_state=42,
)

# Materialise one independent DataFrame per split.
train_df, val_df, test_df = (
    df[df['Patient ID'].isin(patient_group)].copy()
    for patient_group in (train_patients, val_patients, test_patients)
)

# Optionally keep only the first `subset_size` training rows.
if cfg.subset_size:
    train_df = train_df.head(cfg.subset_size)

print('Train/Val/Test:', len(train_df), len(val_df), len(test_df))

In [None]:
# Dataset (same augmentation pattern used previously)
class ClassificationDataset(Dataset):
    """Multi-label image dataset backed by a DataFrame.

    Every row of ``df`` must provide an ``'Image Path'`` column plus one
    numeric column per entry of ``disease_categories``.

    Each item is a pair ``(image, labels)``:

    * ``image``: float32 tensor of shape ``(1, img_size, img_size)``, read as
      8-bit grayscale and scaled to ``[0, 1]``;
    * ``labels``: float32 tensor with one value per disease category, in the
      order given by ``disease_categories``.

    Args:
        df: Table describing the samples (copied and re-indexed from zero).
        disease_categories: Names of the label columns to return.
        img_size: Side length the image is resized to.
        is_training: When true, random augmentation is applied to each image.
    """

    def __init__(self, df, disease_categories, img_size=224, is_training=False):
        # Private, zero-indexed copy so positional lookups with iloc are safe
        # and later changes to the caller's frame do not leak in.
        self.df = df.copy().reset_index(drop=True)
        self.disease_categories = disease_categories
        self.img_size = img_size
        self.is_training = is_training

    def __len__(self):
        return len(self.df)

    @staticmethod
    def _augment(img):
        """Return a randomly augmented copy of a 2-D float image in [0, 1].

        Applied in order: horizontal flip (probability 0.5), brightness
        scaling by a factor in [0.8, 1.2), contrast scaling around the image
        mean by a factor in [0.8, 1.2), rotation by an angle drawn from
        [-10, 10) degrees (probability 0.5, shape kept, zero fill), and
        finally clipping back to [0, 1].
        """
        if np.random.random() > 0.5:
            # fliplr returns a view; copy so later ops get a contiguous array
            img = np.fliplr(img).copy()
        img = img * (0.8 + 0.4 * np.random.random())
        mean = img.mean()
        img = (img - mean) * (0.8 + 0.4 * np.random.random()) + mean
        if np.random.random() > 0.5:
            # `rotate` is scipy.ndimage.rotate (imported at the top of the
            # file); reshape=False keeps the output the same size as the input.
            img = rotate(img, np.random.uniform(-10, 10), reshape=False, mode='constant', cval=0)
        return np.clip(img, 0, 1)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        # Close the file handle deterministically; convert() returns a new,
        # fully loaded image that no longer needs the open file.
        with Image.open(row['Image Path']) as handle:
            img = handle.convert('L')
        img = img.resize((self.img_size, self.img_size), Image.LANCZOS)
        img = np.array(img, dtype=np.float32) / 255.0
        if self.is_training:
            img = self._augment(img)
        # Add the channel dimension: (H, W) -> (1, H, W)
        img = torch.tensor(img, dtype=torch.float32).unsqueeze(0)
        labels = torch.tensor([row[d] for d in self.disease_categories], dtype=torch.float32)
        return img, labels

def _seed_worker(worker_id):
    """Give each DataLoader worker process its own NumPy random stream.

    The training dataset draws its augmentations from NumPy's global RNG.
    PyTorch assigns every worker a distinct torch seed, but it does not
    reseed NumPy, so workers that inherit the parent's NumPy state would all
    produce the same augmentation sequence, and would repeat it every epoch.
    Deriving the NumPy seed from the worker's torch seed avoids that.
    """
    # np.random.seed only accepts values that fit in 32 bits
    np.random.seed(torch.initial_seed() % 2**32)


# One dataset per split; only the training split is augmented.
train_ds = ClassificationDataset(train_df, DISEASE_CATEGORIES, cfg.img_size, is_training=True)
val_ds = ClassificationDataset(val_df, DISEASE_CATEGORIES, cfg.img_size, is_training=False)
test_ds = ClassificationDataset(test_df, DISEASE_CATEGORIES, cfg.img_size, is_training=False)

# The training loader shuffles and reseeds NumPy in every worker. The
# evaluation loaders keep file order and need no seeding because they do not
# augment.
train_loader = DataLoader(
    train_ds,
    batch_size=cfg.batch_size,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
    worker_init_fn=_seed_worker,
)
val_loader = DataLoader(val_ds, batch_size=cfg.batch_size, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_ds, batch_size=cfg.batch_size, shuffle=False, num_workers=2, pin_memory=True)

print('Data ready')

In [None]:
# Model with encoder backbone selection
class MobileNetV2Encoder(nn.Module):
    """Feature extractor built on torchvision's MobileNetV2.

    The convolutional trunk of MobileNetV2 is reused, its first convolution is
    replaced by one that accepts ``in_channels`` inputs, and a small MLP maps
    the pooled 1280-channel features to a ``feat_dim``-dimensional vector.
    """

    def __init__(self, in_channels=1, feat_dim=256, pretrained=True):
        super().__init__()
        from torchvision import models

        # The stock network expects 3-channel input
        backbone = models.mobilenet_v2(pretrained=pretrained)

        # Build a replacement stem convolution with the same geometry
        # (32 filters, 3x3, stride 2, no bias) for `in_channels` inputs.
        stem_conv = backbone.features[0][0]
        replacement = nn.Conv2d(
            in_channels,
            32,
            kernel_size=3,
            stride=2,
            padding=1,
            bias=False,
        )

        # For pretrained single-channel use, collapse the RGB filters into one
        # channel by averaging; in every other case keep the fresh init.
        reuse_rgb_filters = pretrained and in_channels == 1
        if reuse_rgb_filters:
            averaged = stem_conv.weight.data.mean(dim=1, keepdim=True)
            replacement.weight.data = averaged

        backbone.features[0][0] = replacement

        # Keep only the trunk; the stock classifier head is discarded.
        self.features = backbone.features
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))

        # Projection head: 1280 trunk channels -> 512 -> feat_dim
        head_layers = [
            nn.Linear(1280, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, feat_dim),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return one feature vector per input image."""
        pooled = self.global_pool(self.features(x))
        # (N, C, 1, 1) -> (N, C)
        flat = pooled.view(pooled.size(0), -1)
        return self.fc(flat)


class CustomEncoder(nn.Module):
    """Small four-stage CNN encoder.

    Each stage is Conv3x3 -> BatchNorm -> ReLU followed by a pooling layer:
    2x2 max pooling after the first three stages and global average pooling
    after the last. A two-layer MLP then maps the 256 pooled channels to
    ``feat_dim``.
    """

    def __init__(self, in_channels=1, feat_dim=256):
        super().__init__()

        stage_widths = (32, 64, 128, 256)
        final_stage = len(stage_widths) - 1

        layers = []
        prev_width = in_channels
        for stage, width in enumerate(stage_widths):
            layers.append(nn.Conv2d(prev_width, width, 3, padding=1))
            layers.append(nn.BatchNorm2d(width))
            layers.append(nn.ReLU())
            if stage < final_stage:
                # Halve the spatial resolution between stages
                layers.append(nn.MaxPool2d(2))
            else:
                # Collapse whatever spatial extent is left to 1x1
                layers.append(nn.AdaptiveAvgPool2d((1, 1)))
            prev_width = width
        self.features = nn.Sequential(*layers)

        # Projection head: 256 -> 512 -> feat_dim
        head_layers = [
            nn.Linear(256, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, feat_dim),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return one feature vector per input image."""
        feature_map = self.features(x)
        # (N, 256, 1, 1) -> (N, 256)
        flat = feature_map.view(feature_map.size(0), -1)
        return self.fc(flat)


def get_model(num_classes, backbone='mobilenet_v2', pretrained=True):
    """Build a single-channel image classifier with the requested backbone.

    Args:
        num_classes: Number of output logits.
        backbone: ``'mobilenet_v2'`` for torchvision's MobileNetV2 with an
            adapted first convolution and a new classifier head, or
            ``'custom'`` for ``CustomEncoder`` followed by a small MLP head.
        pretrained: Passed to ``models.mobilenet_v2``; when true, the new
            single-channel stem is initialised from the mean of the RGB
            filters. Not used by the ``'custom'`` backbone.

    Returns:
        An ``nn.Module`` mapping ``(N, 1, H, W)`` inputs to ``(N, num_classes)``
        logits.

    Raises:
        ValueError: If ``backbone`` is not one of the supported names.
    """
    # Reject typos up front instead of silently building the custom network.
    if backbone not in ('mobilenet_v2', 'custom'):
        raise ValueError(
            f"Unknown backbone {backbone!r}; expected 'mobilenet_v2' or 'custom'"
        )

    if backbone == 'mobilenet_v2':
        # NOTE(review): newer torchvision releases prefer the `weights=`
        # argument over `pretrained=`; confirm against the installed version.
        mobilenet = models.mobilenet_v2(pretrained=pretrained)

        # Adapt the first layer for grayscale input
        original_conv = mobilenet.features[0][0]
        new_conv = nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1, bias=False)
        if pretrained:
            new_conv.weight.data = original_conv.weight.data.mean(dim=1, keepdim=True)
        mobilenet.features[0][0] = new_conv

        # Replace the stock classifier with one sized for `num_classes`
        in_features = mobilenet.classifier[1].in_features
        mobilenet.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(in_features, num_classes)
        )
        return mobilenet

    # backbone == 'custom'
    encoder = CustomEncoder(in_channels=1, feat_dim=512)
    classifier = nn.Sequential(
        nn.Linear(512, 256),
        nn.ReLU(),
        nn.Dropout(0.2),
        nn.Linear(256, num_classes)
    )
    return nn.Sequential(encoder, classifier)


# Build the classifier for the configured backbone, then move it to the
# target device (Module.to returns the same module).
model = get_model(len(DISEASE_CATEGORIES), cfg.encoder_backbone, pretrained=True)
model = model.to(cfg.device)

# One independent sigmoid/BCE term per disease logit; Adam over all parameters.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(params=model.parameters(), lr=cfg.lr)

print(f"Model created with backbone: {cfg.encoder_backbone}")
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")

def train_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader`` and return the mean loss.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable yielding ``(imgs, targets)`` tensor batches.
        criterion: Loss called as ``criterion(outputs, targets)``; expected to
            return the mean loss of the batch.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to.

    Returns:
        The per-sample average loss over the samples actually processed, or
        ``0.0`` when ``loader`` yields no batches.
    """
    model.train()
    loss_sum = 0.0
    samples_seen = 0
    for imgs, targets in loader:
        # non_blocking lets copies from pinned host memory overlap with
        # compute; it has no effect on plain CPU tensors.
        imgs = imgs.to(device, non_blocking=True)
        targets = targets.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so a short final batch counts fairly.
        batch_size = imgs.size(0)
        loss_sum += loss.item() * batch_size
        samples_seen += batch_size

    # Normalise by what was really seen rather than len(loader.dataset): the
    # two differ when the loader drops the last batch or uses a sampler.
    if samples_seen == 0:
        return 0.0
    return loss_sum / samples_seen

@torch.no_grad()
def validate(model, loader, device):
    """Evaluate ``model`` on ``loader`` and return per-class ROC AUCs.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        loader: Iterable yielding ``(imgs, targets)`` tensor batches, with
            ``targets`` of shape ``(batch, num_classes)``.
        device: Device the images are moved to for the forward pass.

    Returns:
        ``(mean_auc, aucs)`` where ``aucs`` holds one ROC AUC per class
        (``nan`` for a class whose AUC cannot be computed) and ``mean_auc`` is
        their mean ignoring ``nan`` entries. When no class yields a score, or
        the loader is empty, ``mean_auc`` is ``nan``.
    """
    model.eval()
    pred_chunks, target_chunks = [], []
    for imgs, targets in loader:
        outputs = model(imgs.to(device))
        # Logits -> independent per-class probabilities
        pred_chunks.append(torch.sigmoid(outputs).cpu().numpy())
        target_chunks.append(targets.cpu().numpy())

    # np.vstack rejects an empty list, so handle "nothing to evaluate" here.
    if not pred_chunks:
        return float('nan'), []

    all_preds = np.vstack(pred_chunks)
    all_targets = np.vstack(target_chunks)

    aucs = []
    for i in range(all_targets.shape[1]):
        class_targets = all_targets[:, i]
        # ROC AUC is undefined unless both outcomes occur; rare findings can
        # be entirely absent from a small validation split.
        if np.unique(class_targets).size < 2:
            aucs.append(np.nan)
            continue
        try:
            aucs.append(roc_auc_score(class_targets, all_preds[:, i]))
        except ValueError:
            # Keep evaluation best-effort for inputs the metric rejects, but
            # let programming errors (other exception types) surface.
            aucs.append(np.nan)

    # np.nanmean warns on an all-nan input; return nan explicitly instead.
    if np.all(np.isnan(aucs)):
        return float('nan'), aucs
    return np.nanmean(aucs), aucs

# Train for the configured number of epochs and checkpoint the weights every
# time the validation AUC beats the best value seen so far.
best_auc = 0.0
for epoch in range(cfg.epochs):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, cfg.device)
    val_auc = validate(model, val_loader, cfg.device)[0]
    print(f'Epoch {epoch+1}/{cfg.epochs} - loss {train_loss:.4f} - val AUC {val_auc:.4f}')

    # No improvement (this also covers a nan AUC): keep the old checkpoint.
    if not val_auc > best_auc:
        continue

    best_auc = val_auc
    torch.save(model.state_dict(), f'option2_{cfg.encoder_backbone}_best.pth')

print('Done. Best val AUC:', best_auc)
