In [None]:
import pandas as pd
import numpy as np
import os
import gdown
import h5py
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.utils import compute_sample_weight
from PIL import Image
import io

# --- 1. Data Loading ---
# Locations of the downloaded assets, named once so they are easy to spot.
DOWNLOAD_DIR = 'Download'
SUBJECT_CSV = 'Download/subject_data.csv'
IMAGES_HDF5 = 'Download/images.hdf5'

# Fetch the shared Drive folder only when the target directory is absent.
if os.path.exists(DOWNLOAD_DIR):
    print('Data already downloaded.')
else:
    print('Downloading data...')
    gdown.download_folder(
        'https://drive.google.com/drive/folders/1UKUZZ6uTdEVdGWEKNh0ZRp08pK_AVnrm',
        output=DOWNLOAD_DIR,
    )

# Tabular metadata goes into a DataFrame; the HDF5 handle is kept open
# because later cells read individual images from it on demand.
df = pd.read_csv(SUBJECT_CSV, low_memory=False)
images_file = h5py.File(IMAGES_HDF5, 'r')

print('Data loaded successfully.')
print(f'Tabular data shape: {df.shape}')
print(f'Number of images in HDF5 file: {len(images_file)}')

In [None]:
# --- 2. Feature Selection and Preprocessing ---

# Define feature sets based on notebook exploration
lesion_geometry = [
    'clin_size_long_diam_mm',
    'tbp_lv_areaMM2',
    'tbp_lv_perimeterMM',
    'tbp_lv_minorAxisMM',
    'tbp_lv_eccentricity',
    'tbp_lv_area_perim_ratio',
    'tbp_lv_norm_border',
    'tbp_lv_symm_2axis',
    'tbp_lv_symm_2axis_angle',
]

lesion_color_texture = [
    'tbp_lv_A', 'tbp_lv_Aext', 'tbp_lv_B', 'tbp_lv_Bext',
    'tbp_lv_C', 'tbp_lv_Cext', 'tbp_lv_H', 'tbp_lv_Hext', 'tbp_lv_L',
    'tbp_lv_Lext', 'tbp_lv_color_std_mean', 'tbp_lv_deltaA', 'tbp_lv_deltaB',
    'tbp_lv_deltaL', 'tbp_lv_deltaLBnorm', 'tbp_lv_norm_color',
    'tbp_lv_radial_color_std_max', 'tbp_lv_stdL', 'tbp_lv_stdLExt',
]

numerical_features = ['age_approx'] + lesion_geometry + lesion_color_texture
categorical_features = ['sex', 'tbp_lv_location']

# Handle missing values.
# The filled column is assigned back instead of calling
# `df[col].fillna(..., inplace=True)`: that form is chained assignment, which
# recent pandas versions warn about and which does not update `df` when
# Copy-on-Write is enabled.
df['age_approx'] = df['age_approx'].fillna(df['age_approx'].median())
df['sex'] = df['sex'].fillna('unknown')
# For tbp_lv_location, the notebook showed no NaNs, but we'll add a fillna for robustness
df['tbp_lv_location'] = df['tbp_lv_location'].fillna('unknown')

# Create the preprocessor.
# sparse_threshold=1.0 keeps the stacked output a scipy sparse matrix whenever
# its density is below 1. With the default (0.3) the many dense numeric columns
# would likely make ColumnTransformer return a plain ndarray, and the later
# cells that call `.toarray()` on the features would then fail.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_features),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
    ],
    sparse_threshold=1.0,
)

print('Preprocessing pipeline created.')

# Fit and transform the data.
# NOTE(review): the scaler and encoder are fitted on the full dataframe, i.e.
# before the train/validation/test split performed later in the notebook, so
# held-out rows contribute to the scaling statistics. Consider fitting on the
# training rows only.
X_tabular = preprocessor.fit_transform(df)
y = df['target'].values

print(f'Shape of preprocessed tabular data (X): {X_tabular.shape}')
print(f'Shape of target variable (y): {y.shape}')

In [None]:
# Image accessor handed to the PyTorch Dataset defined later in the notebook
def get_img(isic_id):
    """Return the image stored under ``isic_id`` in the open HDF5 file as an RGB PIL image.

    When the file has no entry for ``isic_id`` (the lookup raises ``KeyError``),
    a solid red 100x100 placeholder image is returned instead.
    """
    try:
        # Each dataset holds the encoded image bytes; wrap them in a file-like
        # buffer so PIL can decode them.
        encoded_bytes = images_file[isic_id][()]
        return Image.open(io.BytesIO(encoded_bytes)).convert('RGB')
    except KeyError:
        # Unknown ID: fall back to the placeholder.
        placeholder = Image.new('RGB', (100, 100), color = 'red')
        return placeholder

In [None]:
# --- 3. Baseline Model (XGBoost on Tabular Data) ---
import xgboost as xgb
from sklearn.metrics import roc_auc_score

print('Starting baseline model training...')

# Calculate scale_pos_weight (negatives / positives) for handling class imbalance
scale_pos_weight = np.sum(y == 0) / np.sum(y == 1)
print(f'Calculated scale_pos_weight: {scale_pos_weight:.2f}')

# Setup Stratified K-Fold Cross-Validation
n_splits = 5
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
auc_scores = []

# Convert to a dense array for XGBoost. The preprocessor may hand back either a
# scipy sparse matrix or an already-dense ndarray, so only call `.toarray()`
# when the object actually provides it.
X_tabular_dense = X_tabular.toarray() if hasattr(X_tabular, 'toarray') else np.asarray(X_tabular)

for fold, (train_index, val_index) in enumerate(skf.split(X_tabular_dense, y)):
    print(f'--- Fold {fold+1}/{n_splits} ---')
    X_train, X_val = X_tabular_dense[train_index], X_tabular_dense[val_index]
    y_train, y_val = y[train_index], y[val_index]

    # Initialize and train the XGBoost model.
    # `use_label_encoder` is no longer passed: it is deprecated and recent
    # XGBoost releases only warn that the parameter is unused.
    model = xgb.XGBClassifier(
        objective='binary:logistic',
        eval_metric='logloss',
        scale_pos_weight=scale_pos_weight,
        random_state=42
    )
    model.fit(X_train, y_train)

    # Predict positive-class probabilities and calculate AUC on the held-out fold
    y_pred_proba = model.predict_proba(X_val)[:, 1]
    auc = roc_auc_score(y_val, y_pred_proba)
    auc_scores.append(auc)
    print(f'AUC for fold {fold+1}: {auc:.4f}')

print('\n--- Cross-Validation Results ---')
print(f'Mean AUC: {np.mean(auc_scores):.4f}')
print(f'Standard Deviation of AUC: {np.std(auc_scores):.4f}')

In [None]:
# --- 4. Advanced Multi-modal Model --- 
# First, let's set up the PyTorch environment, Dataset, and DataLoaders

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as transforms
import timm # PyTorch Image Models library
from sklearn.model_selection import train_test_split

# Define Focal Loss
class FocalLoss(nn.Module):
    """Focal loss built on softmax cross-entropy.

    Per-sample loss: ``alpha_t * (1 - p_t) ** gamma * CE``, where ``p_t`` is the
    predicted probability of the true class.

    Args:
        alpha: Class-balancing weight. For two-class logits it is the weight of
            the positive class (label 1); the negative class (label 0) gets
            ``1 - alpha``. For any other number of classes it is applied as a
            uniform scale factor.
        gamma: Focusing exponent; larger values down-weight well-classified
            samples more strongly.
        reduction: ``'mean'`` or ``'sum'`` reduce over all elements; any other
            value returns the unreduced per-sample loss.
    """

    def __init__(self, alpha=0.25, gamma=2.0, reduction='mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Compute the focal loss for class logits ``inputs`` and integer labels ``targets``."""
        ce_loss = F.cross_entropy(inputs, targets, reduction='none')
        # cross-entropy is -log(p_t), so exp(-CE) recovers p_t.
        pt = torch.exp(-ce_loss)

        if inputs.shape[1] == 2:
            # Class-dependent weight: alpha for positives, 1 - alpha for
            # negatives. A single uniform alpha would merely rescale the loss
            # and could not favour the positive class.
            is_positive = (targets == 1).to(ce_loss.dtype)
            alpha_t = self.alpha * is_positive + (1.0 - self.alpha) * (1.0 - is_positive)
        else:
            alpha_t = self.alpha

        focal = alpha_t * (1 - pt) ** self.gamma * ce_loss

        if self.reduction == 'mean':
            return torch.mean(focal)
        elif self.reduction == 'sum':
            return torch.sum(focal)
        else:
            return focal

# Custom PyTorch Dataset
class MultiModalDataset(Dataset):
    """Dataset yielding ``({'image': ..., 'tabular': ...}, label)`` per lesion.

    Args:
        dataframe: Metadata frame with ``isic_id`` and ``target`` columns; row
            ``i`` must correspond to row ``i`` of ``tabular_data``.
        tabular_data: Preprocessed features, either a scipy sparse matrix or a
            dense array-like with one row per dataframe row.
        image_getter: Callable mapping an ``isic_id`` to an image.
        transform: Optional callable applied to the image.

    Raises:
        ValueError: If ``dataframe`` and ``tabular_data`` have different row counts.
    """

    def __init__(self, dataframe, tabular_data, image_getter, transform=None):
        self.dataframe = dataframe
        # The preprocessor can return sparse or dense output; densify only when
        # the object actually offers `.toarray()`.
        if hasattr(tabular_data, 'toarray'):
            dense_features = tabular_data.toarray()
        else:
            dense_features = np.asarray(tabular_data)
        self.tabular_data = torch.tensor(dense_features, dtype=torch.float32)
        if len(self.tabular_data) != len(dataframe):
            raise ValueError(
                f'dataframe has {len(dataframe)} rows but tabular_data has '
                f'{len(self.tabular_data)}'
            )
        self.image_getter = image_getter
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        # Read just the two needed cells instead of materialising the whole
        # row as a Series on every access.
        isic_id = self.dataframe['isic_id'].iloc[idx]
        image = self.image_getter(isic_id)
        tabular = self.tabular_data[idx]
        label = torch.tensor(int(self.dataframe['target'].iloc[idx]), dtype=torch.long)

        if self.transform:
            image = self.transform(image)

        return {'image': image, 'tabular': tabular}, label

# Data Augmentation and Normalization
# Both pipelines resize to the same resolution and normalise with the same
# per-channel statistics; only the training pipeline adds random augmentation.
_INPUT_SIZE = (224, 224)
_NORMALIZE_KWARGS = dict(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Training: resize -> random flip / rotation / colour jitter -> tensor -> normalise
train_transform = transforms.Compose([
    transforms.Resize(_INPUT_SIZE),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(**_NORMALIZE_KWARGS),
])

# Validation / test: deterministic resize -> tensor -> normalise
val_transform = transforms.Compose([
    transforms.Resize(_INPUT_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(**_NORMALIZE_KWARGS),
])

# Stratified Split for train/val/test
# Step 1: hold out 15% of the rows as the test set.
(
    train_val_df, test_df,
    X_train_val_tab, X_test_tab,
    y_train_val, y_test,
) = train_test_split(
    df, X_tabular, y,
    test_size=0.15, random_state=42, stratify=y,
)
# Step 2: take 18% of the remainder for validation (0.18 * 0.85 = ~0.15 of all rows).
(
    train_df, val_df,
    X_train_tab, X_val_tab,
    y_train, y_val,
) = train_test_split(
    train_val_df, X_train_val_tab, y_train_val,
    test_size=0.18, random_state=42, stratify=y_train_val,
)

# Renumber the rows from 0 so positional lookups line up with the feature matrices.
train_df, val_df, test_df = (
    part.reset_index(drop=True) for part in (train_df, val_df, test_df)
)

# Create Datasets (augmentation only on the training split)
train_dataset = MultiModalDataset(train_df, X_train_tab, get_img, transform=train_transform)
val_dataset = MultiModalDataset(val_df, X_val_tab, get_img, transform=val_transform)
test_dataset = MultiModalDataset(test_df, X_test_tab, get_img, transform=val_transform)

# WeightedRandomSampler for training to handle imbalance.
# `class_weights` holds one 'balanced' weight per training row.
class_weights = compute_sample_weight(class_weight='balanced', y=train_df['target'])
sampler = WeightedRandomSampler(
    torch.as_tensor(class_weights, dtype=torch.double),
    len(class_weights),
    replacement=True,
)

# Create DataLoaders
BATCH_SIZE = 32
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f'Train samples: {len(train_dataset)}, Val samples: {len(val_dataset)}, Test samples: {len(test_dataset)}')
print('DataLoaders created successfully.')

In [None]:
# --- 5. Model Architecture (Cross-Attention) ---

class CrossAttention(nn.Module):
    """Multi-head cross-attention: queries from one stream, keys/values from another.

    Args:
        in_dim_q: Feature size of the query input.
        in_dim_kv: Feature size of the key/value input.
        out_dim: Total projected width (split evenly across heads) and output size.
        num_heads: Number of attention heads; must divide ``out_dim``.

    Raises:
        ValueError: If ``out_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, in_dim_q, in_dim_kv, out_dim, num_heads=4):
        super().__init__()
        if out_dim % num_heads != 0:
            raise ValueError(
                f'out_dim ({out_dim}) must be divisible by num_heads ({num_heads})'
            )
        self.num_heads = num_heads
        self.head_dim = out_dim // num_heads
        self.scale = self.head_dim ** -0.5

        self.to_q = nn.Linear(in_dim_q, out_dim, bias=False)
        self.to_kv = nn.Linear(in_dim_kv, out_dim * 2, bias=False)
        self.to_out = nn.Linear(out_dim, out_dim)

    def forward(self, q, kv):
        """Attend from ``q`` (batch, n_q, in_dim_q) over ``kv`` (batch, n_kv, in_dim_kv).

        Returns a tensor of shape (batch, n_q, out_dim).
        """
        q = self.to_q(q)
        k, v = self.to_kv(kv).chunk(2, dim=-1)

        # The projected tensors are 3-D (batch, tokens, out_dim). Query and
        # key/value streams may have different token counts, so each is
        # reshaped with its own length.
        b, n_q, _ = q.shape
        n_kv = k.shape[1]

        # Split the feature axis into heads: (batch, heads, tokens, head_dim)
        q = q.reshape(b, n_q, self.num_heads, self.head_dim).transpose(1, 2)
        k = k.reshape(b, n_kv, self.num_heads, self.head_dim).transpose(1, 2)
        v = v.reshape(b, n_kv, self.num_heads, self.head_dim).transpose(1, 2)

        attn_scores = (q @ k.transpose(-2, -1)) * self.scale
        attn = F.softmax(attn_scores, dim=-1)

        # Merge the heads back: (batch, n_q, out_dim)
        out = (attn @ v).transpose(1, 2).reshape(b, n_q, -1)

        return self.to_out(out)

class MultiModalModel(nn.Module):
    """Image + tabular classifier with cross-attention fusion.

    Args:
        image_backbone_name: timm model name for the image branch.
        tabular_in_features: Width of the preprocessed tabular feature vector.
        num_classes: Number of output logits.
        attn_dim: Output width of each cross-attention block.
        pretrained: Whether timm should load pretrained backbone weights.
    """

    def __init__(self, image_backbone_name, tabular_in_features, num_classes=2,
                 attn_dim=128, pretrained=True):
        super().__init__()
        # Image Branch
        self.image_backbone = timm.create_model(image_backbone_name, pretrained=pretrained, num_classes=0) # num_classes=0 removes head
        image_out_features = self.image_backbone.num_features

        # Tabular Branch
        tabular_out_features = 256
        self.tabular_mlp = nn.Sequential(
            nn.Linear(tabular_in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, tabular_out_features)
        )

        # Cross-Attention Fusion: first argument is the query width, second the key/value width
        self.img_to_tab_attn = CrossAttention(image_out_features, tabular_out_features, attn_dim)
        self.tab_to_img_attn = CrossAttention(tabular_out_features, image_out_features, attn_dim)

        # Classifier. Its input is the concatenation built in forward():
        # image features, tabular features and the two attention outputs
        # (attn_dim each), so the width is derived rather than hard-coded.
        fused_features = image_out_features + tabular_out_features + 2 * attn_dim
        self.classifier = nn.Sequential(
            nn.Linear(fused_features, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Return class logits for a batch dict with ``'image'`` and ``'tabular'`` tensors."""
        image_features = self.image_backbone(x['image'])
        tabular_features = self.tabular_mlp(x['tabular'])

        # Add sequence dimension for attention.
        # NOTE(review): each stream is a single token here, so the softmax over
        # keys has one entry and every attention weight equals 1; the blocks
        # then reduce to linear projections of the other modality's features.
        img_q = image_features.unsqueeze(1)
        tab_q = tabular_features.unsqueeze(1)

        img_attended = self.tab_to_img_attn(tab_q, img_q).squeeze(1)
        tab_attended = self.img_to_tab_attn(img_q, tab_q).squeeze(1)

        # Fusion
        combined_features = torch.cat([image_features, tabular_features, img_attended, tab_attended], dim=1)

        output = self.classifier(combined_features)
        return output

print('Model architecture defined.')

In [None]:
# --- 6. Training and Evaluation Loop ---
from sklearn.metrics import roc_auc_score, f1_score, precision_score, recall_score
from tqdm.auto import tqdm

def train_and_evaluate(model, train_loader, val_loader, model_name,
                       epochs=5, lr=1e-4, weight_decay=1e-3):
    """Train ``model`` and checkpoint the weights with the best validation AUC.

    Args:
        model: Module called with ``{'image': ..., 'tabular': ...}`` that
            returns class logits.
        train_loader: Iterable of ``(data_dict, target)`` training batches.
        val_loader: Iterable of ``(data_dict, target)`` validation batches.
        model_name: Prefix of the checkpoint file ``'{model_name}_best.pth'``.
        epochs: Number of passes over ``train_loader``.
        lr: AdamW learning rate.
        weight_decay: AdamW weight decay.

    Returns:
        The best validation ROC AUC seen across epochs (0.0 if no epoch beat it).

    Side effects:
        Moves ``model`` to CUDA when available, prints progress, and writes the
        checkpoint file whenever the validation AUC improves.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    criterion = FocalLoss(alpha=0.75, gamma=2.0) # Higher alpha for the positive class
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    # 'max' mode: the learning rate is halved when validation AUC stops rising.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=2, factor=0.5)

    best_val_auc = 0.0

    print(f'\n--- Starting Training for {model_name} ---')

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        train_loss = 0
        progress_bar = tqdm(train_loader, desc=f'Epoch {epoch+1}/{epochs} [Train]', leave=False)
        for data, target in progress_bar:
            image, tabular = data['image'].to(device), data['tabular'].to(device)
            target = target.to(device)

            optimizer.zero_grad()
            output = model({'image': image, 'tabular': tabular})
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            progress_bar.set_postfix({'loss': f'{loss.item():.4f}'})

        # ---- validation pass ----
        model.eval()
        val_preds = []
        val_targets = []
        with torch.no_grad():
            for data, target in tqdm(val_loader, desc=f'Epoch {epoch+1}/{epochs} [Val]', leave=False):
                image, tabular = data['image'].to(device), data['tabular'].to(device)
                output = model({'image': image, 'tabular': tabular})
                # Column 1 of the softmax is the positive-class probability.
                val_preds.extend(output.softmax(1)[:, 1].cpu().numpy())
                val_targets.extend(target.cpu().numpy())

        val_auc = roc_auc_score(val_targets, val_preds)
        scheduler.step(val_auc)

        print(f'Epoch {epoch+1} | Train Loss: {train_loss/len(train_loader):.4f} | Val AUC: {val_auc:.4f}')

        # Keep only the best-scoring weights on disk.
        if val_auc > best_val_auc:
            best_val_auc = val_auc
            torch.save(model.state_dict(), f'{model_name}_best.pth')
            print(f'   -> New best model saved with AUC: {val_auc:.4f}')

    return best_val_auc

# --- 7. Instantiate and Train Models ---
# Read the feature count straight from the matrix shape. This works for sparse
# and dense matrices alike and avoids densifying the whole training matrix
# just to look up its width.
tabular_input_dim = X_train_tab.shape[1]

# Model A: ConvNeXt
convnext_model = MultiModalModel('convnext_tiny.in12k_ft_in1k', tabular_input_dim)
best_auc_convnext = train_and_evaluate(convnext_model, train_loader, val_loader, 'convnext')

# Model B: Vision Transformer (ViT)
vit_model = MultiModalModel('vit_base_patch16_224.augreg_in21k', tabular_input_dim)
best_auc_vit = train_and_evaluate(vit_model, train_loader, val_loader, 'vit')

print('\n--- Training Complete ---')
print(f'Best ConvNeXt AUC: {best_auc_convnext:.4f}')
print(f'Best ViT AUC: {best_auc_vit:.4f}')

In [None]:
# --- 8. Final Evaluation on Test Set ---
from sklearn.metrics import classification_report

def evaluate_final_model(model, loader, device):
    """Run ``model`` over ``loader`` in eval mode without gradient tracking.

    Each batch is ``(data, target)`` where ``data`` has ``'image'`` and
    ``'tabular'`` tensors; both are moved to ``device`` before the forward pass.

    Returns:
        ``(targets, positive_probabilities)`` as two flat lists, where the
        probabilities are column 1 of the softmax over the model's logits.
    """
    model.eval()
    collected_targets, collected_probs = [], []
    with torch.no_grad():
        for batch, batch_targets in loader:
            inputs = {key: batch[key].to(device) for key in ('image', 'tabular')}
            logits = model(inputs)
            positive_probs = torch.softmax(logits, dim=1)[:, 1]
            collected_probs += list(positive_probs.cpu().numpy())
            collected_targets += list(batch_targets.cpu().numpy())
    return collected_targets, collected_probs

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
results = {}


def _to_dense(matrix):
    """Return a dense ndarray for a scipy sparse matrix or a dense array-like."""
    return matrix.toarray() if hasattr(matrix, 'toarray') else np.asarray(matrix)


def _score_predictions(y_true, y_proba, threshold=0.5):
    """Threshold positive-class probabilities and compute the summary metrics.

    Returns ``(y_pred, metrics)`` where ``y_pred`` is the 0/1 prediction array
    (``y_proba > threshold``) and ``metrics`` holds AUC, Precision, Recall and F1.
    """
    y_pred = (np.asarray(y_proba) > threshold).astype(int)
    metrics = {
        'AUC': roc_auc_score(y_true, y_proba),
        'Precision': precision_score(y_true, y_pred),
        'Recall': recall_score(y_true, y_pred),
        'F1': f1_score(y_true, y_pred)
    }
    return y_pred, metrics


# Evaluate ConvNeXt
print('Evaluating ConvNeXt model...')
# map_location lets a checkpoint saved from a GPU run load on the current device.
convnext_model.load_state_dict(torch.load('convnext_best.pth', map_location=device))
convnext_model.to(device)
targets_cn, preds_proba_cn = evaluate_final_model(convnext_model, test_loader, device)
preds_cn, results['ConvNeXt'] = _score_predictions(targets_cn, preds_proba_cn)
print(classification_report(targets_cn, preds_cn, target_names=['Benign', 'Malignant']))

# Evaluate ViT
print('Evaluating ViT model...')
vit_model.load_state_dict(torch.load('vit_best.pth', map_location=device))
vit_model.to(device)
targets_vit, preds_proba_vit = evaluate_final_model(vit_model, test_loader, device)
preds_vit, results['ViT'] = _score_predictions(targets_vit, preds_proba_vit)
print(classification_report(targets_vit, preds_vit, target_names=['Benign', 'Malignant']))

# Evaluate XGBoost Baseline
print('Evaluating XGBoost baseline model...')
# Refit on train+validation rows and score on the same held-out test rows.
# `use_label_encoder` is omitted: it is deprecated and unused by recent XGBoost.
xgb_final = xgb.XGBClassifier(objective='binary:logistic', scale_pos_weight=scale_pos_weight, random_state=42)
xgb_final.fit(_to_dense(X_train_val_tab), y_train_val)
xgb_preds_proba = xgb_final.predict_proba(_to_dense(X_test_tab))[:, 1]
xgb_preds, results['XGBoost'] = _score_predictions(y_test, xgb_preds_proba)
print(classification_report(y_test, xgb_preds, target_names=['Benign', 'Malignant']))

results_df = pd.DataFrame(results).T
print('\n--- Final Model Comparison ---')
print(results_df)

## Conclusion

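Based on the evaluation metrics from the test set, the best-performing model can be selected. Key metrics to consider are AUC, which gives a threshold-independent measure of how well a model ranks malignant cases above benign ones, and Recall, which is critical in this medical context to minimize the number of missed malignant cases (false negatives). The F1-score provides a good balance between Precision and Recall. Note that Precision, Recall, and F1 are computed at a fixed 0.5 probability threshold, so their values depend on that choice.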

The final `results_df` dataframe provides a clear summary to make an informed decision.