# Harvard-GF 데이터셋 기반 의료 AI 편향성 분석

이 노트북은 **Harvard Glaucoma Fairness (Harvard-GF)** 데이터셋을 사용하여, 녹내장 진단 AI 모델의 인종 간 편향성을 심층적으로 분석하는 과정을 담고 있습니다.

FairFace 분석 노트북(`AIBiasExperiment.ipynb`)을 템플릿으로 삼아, 의료 영상 데이터셋의 특성에 맞게 데이터 로더와 일부 설정을 수정하였습니다.

**실험 목표:**
1.  **Baseline 모델 구축**: 안구 fundus 이미지로 CNN 모델(ResNet18)을 학습하고, 녹내장 진단에 대한 기본적인 성능과 인종 그룹별 공정성 지표를 측정합니다.
2.  **Micro-Bias Sensitivity Curve 분석**: 데이터셋의 인종 그룹 비율을 미세하게 조정했을 때, 모델의 진단 성능이 얼마나 민감하게 변하는지 측정합니다.
3.  **Over-Correction Damage Index (ODI) 계산**: 편향 완화 기법(Reweighing) 적용 시 발생하는 성능 저하(Trade-off)를 정량적으로 분석합니다.
4.  **Hidden Subgroup Discovery**: 특정 인종과 다른 임상 정보(e.g., 연령대)가 조합된 하위집단에서 발생하는 잠재적 편향을 탐지합니다.

---

## 단계 1: 실험 환경 설정 및 데이터셋 준비

의료 영상 처리에 필요한 라이브러리를 임포트하고, Harvard-GF 데이터셋을 불러올 준비를 합니다. 이미지를 모델이 학습할 수 있는 형태(Tensor)로 변환하는 PyTorch `Dataset`과 `DataLoader`를 정의합니다.


In [None]:
# === 1.1 라이브러리 임포트 ===
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import time
import copy
from tqdm.notebook import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, Sampler
from torchvision import transforms, models
from torchvision.models import ResNet18_Weights
from PIL import Image
from collections import defaultdict

# 경고 메시지 무시
import warnings
warnings.filterwarnings('ignore')

# Matplotlib 스타일 설정
plt.style.use('seaborn-v0_8-whitegrid')

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")


# === 1.2 주요 설정 (Configuration) ===
# TODO: README에 따라 Harvard-GF 데이터셋을 'data/harvard_gf/' 경로에 준비해주세요.
DATA_DIR = './data/harvard_gf/images/' # 실제 이미지 폴더 경로
LABEL_FILE = './data/harvard_gf/labels.csv' # 실제 라벨 파일 경로

# 실험 속성
TARGET_ATTR = 'glaucoma'  # 예측 대상: 녹내장 유무 (Binary)
SENSITIVE_ATTR = 'race' # 민감 속성

# 모델 및 학습 하이퍼파라미터
BATCH_SIZE = 32 # 의료 이미지는 크기가 클 수 있으므로 배치 사이즈 조정
NUM_WORKERS = 0
NUM_EPOCHS = 15 # 의료 데이터는 더 많은 학습이 필요할 수 있음
LEARNING_RATE = 0.001
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 파일 저장 경로
MODEL_SAVE_PATH = 'best_harvardgf_model.pth'
RESULTS_DIR = 'results_harvardgf'
os.makedirs(RESULTS_DIR, exist_ok=True)

print(f"Device: {DEVICE}")
print(f"Target Attribute: {TARGET_ATTR}")
print(f"Sensitive Attribute: {SENSITIVE_ATTR}")


In [None]:
# === 1.3 Harvard-GF 데이터셋 클래스 정의 ===

class HarvardGFDataset(Dataset):
    """Harvard-GF 의료 이미지 데이터셋을 위한 PyTorch Dataset 클래스"""
    
    def __init__(self, data_dir, label_file, transform=None, indices=None):
        self.data_dir = data_dir
        self.transform = transform
        
        try:
            full_df = pd.read_csv(label_file)
            # 제공된 인덱스가 있으면 해당 부분만 사용 (train/val 분할용)
            self.labels_df = full_df.iloc[indices].reset_index(drop=True) if indices is not None else full_df
        except FileNotFoundError:
            print(f"Error: Label file not found at {label_file}")
            self.labels_df = pd.DataFrame()
            return
        
        # TODO: 실제 라벨 파일의 컬럼명 확인 필요
        # 예: 'image_filename', 'glaucoma_diagnosis', 'patient_race' 등
        self.image_col = 'filename' 
        self.target_col = 'glaucoma'
        self.sensitive_col = 'race'
        
        # 라벨 인코딩
        self.glaucoma_map = {label: i for i, label in enumerate(sorted(self.labels_df[self.target_col].unique()))}
        self.race_map = {label: i for i, label in enumerate(sorted(self.labels_df[self.sensitive_col].unique()))}
        self.idx_to_glaucoma = {v: k for k, v in self.glaucoma_map.items()}
        self.idx_to_race = {v: k for k, v in self.race_map.items()}

        print(f"Dataset loaded. Number of samples: {len(self.labels_df)}")

    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        img_path = os.path.join(self.data_dir, self.labels_df.loc[idx, self.image_col])
        try:
            image = Image.open(img_path).convert('RGB')
        except FileNotFoundError:
            return self.__getitem__((idx + 1) % len(self))
            
        glaucoma = self.glaucoma_map[self.labels_df.loc[idx, self.target_col]]
        race = self.race_map[self.labels_df.loc[idx, self.sensitive_col]]

        if self.transform:
            image = self.transform(image)
            
        sample = {
            'image': image, 
            'glaucoma': torch.tensor(glaucoma, dtype=torch.long), 
            'race': torch.tensor(race, dtype=torch.long)
        }
        return sample


In [None]:
# === 1.4 데이터 변환, 분할 및 데이터로더 생성 ===
from sklearn.model_selection import train_test_split

normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
        transforms.ToTensor(),
        normalize
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize
    ]),
}

try:
    full_df = pd.read_csv(LABEL_FILE)
    # train/val 분할
    train_indices, val_indices = train_test_split(
        range(len(full_df)),
        test_size=0.2,
        random_state=42,
        stratify=full_df[TARGET_ATTR]
    )

    image_datasets = {
        'train': HarvardGFDataset(data_dir=DATA_DIR, label_file=LABEL_FILE, transform=data_transforms['train'], indices=train_indices),
        'val': HarvardGFDataset(data_dir=DATA_DIR, label_file=LABEL_FILE, transform=data_transforms['val'], indices=val_indices)
    }

    dataloaders = {
        x: DataLoader(image_datasets[x], batch_size=BATCH_SIZE, shuffle=True if x == 'train' else False, num_workers=NUM_WORKERS)
        for x in ['train', 'val']
    }
    dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

    print(f"DataLoaders created. Train size: {dataset_sizes['train']}, Val size: {dataset_sizes['val']}")
    
    label_maps = {
        'glaucoma': image_datasets['train'].glaucoma_map,
        'race': image_datasets['train'].race_map
    }

except Exception as e:
    print(f"An error occurred: {e}")
    print("Please check file paths and column names in the configuration cell.")
    dataloaders = None


---
## 단계 2: Baseline 모델 구축 및 학습

Harvard-GF 데이터셋 준비가 완료되었으니, 사전 학습된 ResNet18 모델을 기반으로 녹내장 진단 Baseline 모델을 구축하고 fine-tuning을 진행합니다. 이 과정은 FairFace 분석과 동일한 모델 구조와 학습 방식을 사용합니다.


In [None]:
# === 2.1 모델 정의 및 학습/평가 함수 ===

def get_model(num_classes, pretrained=True):
    weights = ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
    model = models.resnet18(weights=weights)
    if pretrained:
        for param in model.parameters():
            param.requires_grad = False
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)
    return model.to(DEVICE)

def train_model(model, criterion, optimizer, dataloaders, num_epochs=10):
    since = time.time()
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}'); print('-' * 10)
        for phase in ['train', 'val']:
            model.train() if phase == 'train' else model.eval()
            running_loss, running_corrects = 0.0, 0
            for batch in tqdm(dataloaders[phase], desc=f"{phase.capitalize()}"):
                inputs = batch['image'].to(DEVICE)
                labels = batch[TARGET_ATTR].to(DEVICE)
                optimizer.zero_grad()
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    if phase == 'train':
                        loss.backward(); optimizer.step()
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]
            print(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc.item())

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                torch.save(model.state_dict(), MODEL_SAVE_PATH)
                print(f"** Best val acc updated: {best_acc:.4f}, Model saved.")

    time_elapsed = time.time() - since
    print(f'\nTraining complete in {time_elapsed//60:.0f}m {time_elapsed%60:.0f}s')
    print(f'Best val Acc: {best_acc:4f}')
    model.load_state_dict(best_model_wts)
    return model, history

def plot_training_history(history):
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    ax1.plot(history['train_loss'], label='Train Loss'); ax1.plot(history['val_loss'], label='Val Loss')
    ax1.set_title('Model Loss'); ax1.set_xlabel('Epochs'); ax1.set_ylabel('Loss'); ax1.legend()
    ax2.plot(history['train_acc'], label='Train Acc'); ax2.plot(history['val_acc'], label='Val Acc')
    ax2.set_title('Model Accuracy'); ax2.set_xlabel('Epochs'); ax2.set_ylabel('Accuracy'); ax2.legend()
    fig.tight_layout()
    plt.savefig(os.path.join(RESULTS_DIR, 'harvardgf_baseline_training_history.png'))
    plt.show()

print("Model and helper functions are defined.")


In [None]:
# === 2.2 Baseline 모델 학습 실행 ===

if dataloaders:
    # 녹내장 유무는 Binary classification (2 classes)
    NUM_CLASSES = len(label_maps[TARGET_ATTR])
    baseline_model = get_model(num_classes=NUM_CLASSES, pretrained=True)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(baseline_model.fc.parameters(), lr=LEARNING_RATE, momentum=0.9)

    baseline_model, history = train_model(baseline_model, criterion, optimizer, dataloaders, num_epochs=NUM_EPOCHS)
    
    plot_training_history(history)
else:
    print("Dataloaders are not available. Cannot start training.")


---
### 단계 2.3: Baseline 모델 성능 및 공정성 평가

학습된 녹내장 진단 모델의 전반적인 성능과 인종 그룹별 공정성 지표를 평가합니다.


In [None]:
# === 2.3 성능 및 공정성 평가 함수 정의 및 실행 ===

def evaluate_fairness(model, dataloader, criterion):
    model.eval()
    y_true, y_pred, sensitive_attrs = [], [], []
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            inputs = batch['image'].to(DEVICE)
            labels = batch[TARGET_ATTR].to(DEVICE)
            s_attrs = batch[SENSITIVE_ATTR].to(DEVICE)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())
            sensitive_attrs.extend(s_attrs.cpu().numpy())

    y_true, y_pred, sensitive_attrs = np.array(y_true), np.array(y_pred), np.array(sensitive_attrs)
    results = {'overall_acc': np.mean(y_true == y_pred)}
    
    group_accuracies, group_sizes = {}, {}
    s_map = dataloader.dataset.idx_to_race
    for group_idx, group_name in s_map.items():
        mask = (sensitive_attrs == group_idx)
        if mask.sum() > 0:
            group_accuracies[group_name] = np.mean(y_true[mask] == y_pred[mask])
            group_sizes[group_name] = mask.sum()
    
    results['group_accuracies'] = group_accuracies
    results['group_sizes'] = group_sizes
    if group_accuracies:
        results['accuracy_gap'] = max(group_accuracies.values()) - min(group_accuracies.values())
    return results

def plot_fairness_results(results):
    plt.figure(figsize=(12, 6))
    bars = plt.bar(results['group_accuracies'].keys(), results['group_accuracies'].values(), color=sns.color_palette('viridis', len(results['group_accuracies'])))
    plt.title('Harvard-GF Baseline Model Accuracy by Race', fontsize=16)
    plt.xlabel('Race'); plt.ylabel('Accuracy'); plt.ylim(0, 1.0)
    plt.axhline(y=results['overall_acc'], color='r', linestyle='--', label=f"Overall Acc: {results['overall_acc']:.3f}")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(RESULTS_DIR, 'harvardgf_baseline_fairness_evaluation.png'))
    plt.show()

# === 평가 실행 ===
if dataloaders:
    model_to_evaluate = get_model(num_classes=len(label_maps[TARGET_ATTR]), pretrained=False)
    try:
        model_to_evaluate.load_state_dict(torch.load(MODEL_SAVE_PATH))
        baseline_results = evaluate_fairness(model_to_evaluate, dataloaders['val'], nn.CrossEntropyLoss())
        print("\n--- Harvard-GF Baseline Model Evaluation Results ---")
        print(f"Overall Accuracy: {baseline_results['overall_acc']:.4f}")
        print("\nGroup Accuracies:")
        for group, acc in baseline_results['group_accuracies'].items():
            print(f"  - {group}: {acc:.4f} (n={baseline_results['group_sizes'][group]})")
        print(f"\nMaximum Accuracy Gap: {baseline_results.get('accuracy_gap', 0):.4f}")
        plot_fairness_results(baseline_results)
    except FileNotFoundError:
        print(f"Error: Saved model not found. Please run the training cell first.")
else:
    print("Dataloaders not available.")


---
## 단계 3: Micro-Bias Sensitivity Curve (편향 민감도 곡선) 분석

데이터셋의 인종 비율 변화에 모델의 진단 성능이 얼마나 민감하게 반응하는지 측정합니다.


In [None]:
# === 3.1 편향 민감도 곡선 분석 ===

# TODO: BIAS_INJECTION_GROUP을 Harvard-GF 데이터셋에 존재하는 인종 그룹명으로 수정해야 합니다.
BIAS_INJECTION_GROUP = 'White' # 예시
BIAS_STEPS = np.arange(0.2, 0.55, 0.05)

class HGFBiasedSampler(Sampler):
    def __init__(self, dataset, target_group_name, desired_group_ratio):
        self.dataset = dataset
        full_df = pd.read_csv(LABEL_FILE).iloc[dataset.indices]
        target_mask = (full_df[SENSITIVE_ATTR] == target_group_name)
        self.target_indices = full_df[target_mask].index.tolist()
        self.other_indices = full_df[~target_mask].index.tolist()
        self.num_target_samples = int(len(dataset) * desired_group_ratio)
        self.num_other_samples = len(dataset) - self.num_target_samples

    def __iter__(self):
        target_samples = np.random.choice(self.target_indices, self.num_target_samples, replace=True)
        other_samples = np.random.choice(self.other_indices, self.num_other_samples, replace=True)
        final_indices = np.concatenate([target_samples, other_samples])
        np.random.shuffle(final_indices)
        return iter(final_indices)
    def __len__(self): return len(self.dataset)

def run_sensitivity_analysis(model, dataset):
    results_list = []
    model.eval()
    for ratio in tqdm(BIAS_STEPS, desc="Analyzing Bias Sensitivity"):
        sampler = HGFBiasedSampler(dataset, BIAS_INJECTION_GROUP, ratio)
        # 중요: Sampler가 전체 데이터셋의 인덱스를 반환하므로, DataLoader의 dataset도 full_dataset을 가리키도록 해야함.
        # 이 경우, Subset이 아닌 원본 Dataset 클래스에 인덱스를 전달하는 방식으로 재구성 필요.
        # 여기서는 편의상 매번 데이터셋을 다시 필터링하여 생성합니다.
        
        # Sampler가 반환하는 인덱스로 새로운 Subset 생성
        sampled_indices = list(iter(sampler))
        sampled_subset = torch.utils.data.Subset(dataset.dataset, sampled_indices)
        dataloader = DataLoader(sampled_subset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)

        eval_results = evaluate_fairness(model, dataloader, nn.CrossEntropyLoss())
        eval_results['bias_ratio'] = ratio
        results_list.append(eval_results)
    return pd.DataFrame(results_list)

# --- 분석 실행 및 시각화 ---
if 'baseline_model' in locals():
    sensitivity_results_df = run_sensitivity_analysis(baseline_model, image_datasets['val'])
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 7))
    fig.suptitle(f"Harvard-GF - Micro-Bias Sensitivity Curve", fontsize=16)
    ax1.plot(sensitivity_results_df['bias_ratio'], sensitivity_results_df['overall_acc'], marker='o', label='Overall Acc')
    ax1.set_xlabel(f"Proportion of '{BIAS_INJECTION_GROUP}' samples"); ax1.set_ylabel('Accuracy')
    ax1.set_title('Accuracy vs. Bias'); ax1.legend()
    ax2.plot(sensitivity_results_df['bias_ratio'], sensitivity_results_df['accuracy_gap'], marker='o', color='r', label='Max Acc Gap')
    ax2.set_xlabel(f"Proportion of '{BIAS_INJECTION_GROUP}' samples"); ax2.set_ylabel('Max Accuracy Gap')
    ax2.set_title('Fairness vs. Bias'); ax2.legend()
    plt.savefig(os.path.join(RESULTS_DIR, 'harvardgf_micro_bias_sensitivity_curve.png'))
    plt.show()
else:
    print("Baseline model not available.")


---
## 단계 4: Over-Correction Damage Index (ODI, 과보정 피해 지수) 분석

Reweighing 기법 적용의 효율성을 평가하기 위해 ODI를 계산합니다.


In [None]:
# === 4.1 ODI 계산 ===

if 'baseline_results' in locals():
    # --- 1. Reweighing 가중치 계산 ---
    train_df = pd.read_csv(LABEL_FILE).iloc[image_datasets['train'].indices]
    group_counts = train_df[SENSITIVE_ATTR].value_counts()
    group_weights = {group: len(train_df) / (len(group_counts) * count) for group, count in group_counts.items()}
    sample_weights = train_df[SENSITIVE_ATTR].apply(lambda g: group_weights[g]).to_numpy()
    sampler = WeightedRandomSampler(torch.from_numpy(sample_weights).double(), len(sample_weights))
    reweighed_loader = DataLoader(image_datasets['train'], batch_size=BATCH_SIZE, sampler=sampler, num_workers=NUM_WORKERS)
    reweighed_dataloaders = {'train': reweighed_loader, 'val': dataloaders['val']}

    # --- 2. 모델 재학습 ---
    reweighed_model = get_model(num_classes=len(label_maps[TARGET_ATTR]), pretrained=True)
    optimizer = optim.SGD(reweighed_model.fc.parameters(), lr=LEARNING_RATE, momentum=0.9)
    reweighed_model, _ = train_model(reweighed_model, nn.CrossEntropyLoss(), optimizer, reweighed_dataloaders, num_epochs=NUM_EPOCHS)

    # --- 3. ODI 계산 ---
    reweighed_results = evaluate_fairness(reweighed_model, dataloaders['val'], nn.CrossEntropyLoss())
    accuracy_drop = baseline_results['overall_acc'] - reweighed_results['overall_acc']
    fairness_gain = baseline_results['accuracy_gap'] - reweighed_results['accuracy_gap']
    odi = accuracy_drop / fairness_gain if fairness_gain > 1e-6 else float('inf')
    
    print("\n--- Over-Correction Damage Index (ODI) ---")
    print(f"Accuracy Drop: {accuracy_drop:.4f}")
    print(f"Fairness Gain (Gap Reduction): {fairness_gain:.4f}")
    print(f"ODI (Accuracy Drop / Fairness Gain): {odi:.4f}")
else:
    print("Baseline model results not available.")


---
## 단계 5: Hidden Subgroup Discovery (잠복 하위집단 탐지)

단일 속성(`race`) 분석에서는 드러나지 않는, 여러 속성이 교차하는 특정 하위집단(예: '특정 인종의 특정 성별')에서의 잠재적 편향을 탐지합니다.

**참고**: 이 분석을 위해서는 라벨 파일에 `race` 외에 `gender`와 같은 추가적인 민감 속성 컬럼이 필요합니다. 아래 코드는 해당 컬럼이 존재한다고 가정하고 작성된 예시입니다.


In [None]:
# === 5.1 잠복 하위집단 분석 ===
# TODO: 이 셀을 실행하려면 Harvard-GF 라벨 파일에 'gender'와 같은 추가 속성 컬럼이 필요합니다.

ADDITIONAL_ATTR = 'gender' # 분석에 사용할 추가 속성

def get_predictions_for_subgroup_analysis(model, dataloader):
    # 이 함수는 데이터셋 클래스에 추가 속성 처리가 구현되어 있다고 가정합니다.
    # (현재 HarvardGFDataset에는 'gender' 처리가 미구현)
    # 아래는 개념 증명을 위한 Pseudo-code에 가깝습니다.
    pass # 실제 구현 필요

if 'baseline_model' in locals() and ADDITIONAL_ATTR in pd.read_csv(LABEL_FILE).columns:
    # predictions_df = get_predictions_for_subgroup_analysis(baseline_model, dataloaders['val'])
    # subgroup_analysis = predictions_df.groupby([SENSITIVE_ATTR, ADDITIONAL_ATTR]).agg(...)
    # ... 시각화 코드 ...
    print("Hidden subgroup analysis code would run here.")
    print("Please implement the data handling for the additional attribute first.")
else:
    print(f"'{ADDITIONAL_ATTR}' column not found in the label file or baseline model is not ready.")
    print("Skipping hidden subgroup discovery.")
