
# IFT3395 Competition 2 - 稳健宽网络 MLP 解决方案

## 项目概述

本项目旨在使用**仅 NumPy 和 Python 标准库**实现一个多层感知机（MLP）分类器，用于糖尿病视网膜病变严重程度的图像分类任务。目标是在 Kaggle 公共排行榜上超越 baseline（0.455）。

## 核心策略

经过多次实验和优化，最终采用以下稳健策略：

1. **更宽的2层网络架构**：避免深层网络的梯度消失/爆炸问题，使用 512-896 维的隐藏层
2. **平衡的正则化**：适中的 dropout (0.1-0.2) 和 L2 正则化 (5e-5 到 9e-5)
3. **He 初始化**：使用 `sqrt(2.0 / input_dim)` 初始化权重，确保更好的梯度流
4. **梯度裁剪**：限制梯度范数 ≤ 5.0，防止训练不稳定
5. **系统化超参搜索**：测试 5 种不同配置，选择验证集表现最佳的模型
6. **数据增强 + TTA**：训练时使用翻转、噪声、亮度调整；测试时使用多种增强模式集成预测
7. **模型集成**：集成 top-3 模型进行软投票，提升泛化能力


## 1. 数据加载与探索

首先加载训练集和测试集数据，了解数据的基本结构。


In [None]:

# ============================================================================
# 导入必要的库
# ============================================================================
# 仅使用 NumPy 和 Python 标准库，符合作业要求（不使用 scikit-learn 等 ML 库）
import csv
import math
import pickle
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np

# 数据目录和随机种子设置
DATA_DIR = Path('data')
SEED = 4040  # 固定随机种子以确保结果可复现
rng = np.random.default_rng(SEED)
np.set_printoptions(precision=4, suppress=True)  # 设置 NumPy 输出格式


## 2. 类别不平衡分析与权重计算

数据集存在明显的类别不平衡问题（类别 0 有 486 个样本，而类别 4 只有 66 个）。为了缓解这个问题，我们计算类别权重，在损失函数中给少数类更高的权重。


In [None]:

# ============================================================================
# 数据加载函数
# ============================================================================
def load_split(split: str) -> Dict[str, np.ndarray]:
    """
    加载 pickle 格式的数据文件
    
    Args:
        split: 'train' 或 'test'
    Returns:
        包含 'images' 键的字典（测试集）或包含 'images' 和 'labels' 的字典（训练集）
    """
    path = DATA_DIR / f"{split}_data.pkl"
    with path.open('rb') as f:
        return pickle.load(f)

# 加载训练集和测试集
train_data = load_split('train')
test_data = load_split('test')

# 提取图像和标签，转换为合适的类型
images = train_data['images'].astype(np.float32)  # 训练图像：(1080, 28, 28, 3)
labels = train_data['labels'].reshape(-1).astype(int)  # 训练标签：(1080,)
test_images = test_data['images'].astype(np.float32)  # 测试图像：(400, 28, 28, 3)
num_classes = len(np.unique(labels))  # 类别数量：5（严重程度 0-4）
print(f"Train: {images.shape}, Test: {test_images.shape}, Classes: {num_classes}")


Train: (1080, 28, 28, 3), Test: (400, 28, 28, 3), Classes: 5


## 3. 数据预处理

将图像展平为一维向量，并进行标准化处理。标准化有助于加速训练收敛并提高数值稳定性。


In [None]:

# ============================================================================
# 类别权重计算
# ============================================================================
# 统计每个类别的样本数量
unique, counts = np.unique(labels, return_counts=True)

# 计算基础权重：总样本数 / 各类别样本数（逆频率）
class_weights_base = counts.sum() / (counts.astype(np.float32) + 1e-6)

# 使用指数缩放（1.2 次方）增强少数类的权重，但不过度激进
# 这比直接使用逆频率更平滑，避免过度关注少数类导致过拟合
class_weights = class_weights_base ** 1.2

# 归一化权重，使其均值为 1，保持数值稳定性
class_weights = class_weights / class_weights.mean()

print('Class counts:', dict(zip(unique, counts)))
print('Class weights:', class_weights)
# 输出示例：类别 4（最少）的权重约为 2.4，类别 0（最多）的权重约为 0.22


Class counts: {np.int64(0): np.int64(486), np.int64(1): np.int64(128), np.int64(2): np.int64(206), np.int64(3): np.int64(194), np.int64(4): np.int64(66)}
Class weights: [0.2198 1.0898 0.6157 0.6617 2.413 ]


## 4. 数据增强与测试时增强（TTA）

为了提高模型的泛化能力，我们实现了两种增强策略：

1. **训练时数据增强**：随机水平/垂直翻转、添加高斯噪声、调整亮度
2. **测试时增强（TTA）**：对测试图像应用多种变换，然后对预测结果进行平均


In [None]:

# ============================================================================
# 数据预处理函数
# ============================================================================
def flatten_and_normalize(imgs: np.ndarray) -> np.ndarray:
    """
    将图像展平为一维向量并归一化到 [0, 1]
    
    Args:
        imgs: 形状为 (N, 28, 28, 3) 的图像数组
    Returns:
        形状为 (N, 2352) 的展平数组，值域 [0, 1]
    """
    flat = imgs.reshape(len(imgs), -1).astype(np.float32)
    return flat / 255.0


class StandardScaler:
    """
    标准缩放器：将数据标准化为均值 0、标准差 1
    这有助于神经网络训练，因为梯度更新更稳定
    """
    def __init__(self):
        self.mean_: Optional[np.ndarray] = None
        self.std_: Optional[np.ndarray] = None

    def fit(self, x: np.ndarray) -> 'StandardScaler':
        """在训练集上拟合，计算均值和标准差"""
        self.mean_ = x.mean(axis=0, keepdims=True)
        self.std_ = x.std(axis=0, keepdims=True) + 1e-5  # 添加小值防止除零
        return self

    def transform(self, x: np.ndarray) -> np.ndarray:
        """应用标准化变换"""
        if self.mean_ is None or self.std_ is None:
            raise ValueError('Scaler not fitted')
        return (x - self.mean_) / self.std_


# 预处理训练和测试数据
x_norm = flatten_and_normalize(images)  # 归一化到 [0, 1]：(1080, 2352)
x_test_norm = flatten_and_normalize(test_images)  # (400, 2352)

# 在训练集上拟合标准化器，然后应用到训练集和测试集
scaler = StandardScaler().fit(x_norm)
x_std = scaler.transform(x_norm)  # 标准化后的训练数据
x_test_std = scaler.transform(x_test_norm)  # 标准化后的测试数据

# 保存标准化参数，用于后续的数据增强
SCALE_MEAN = scaler.mean_
SCALE_STD = scaler.std_


def standardize_raw(raw_batch: np.ndarray) -> np.ndarray:
    """
    将归一化后的原始数据（[0, 1]）转换为标准化数据（均值 0，标准差 1）
    用于数据增强后的标准化
    """
    return (raw_batch - SCALE_MEAN) / SCALE_STD


def train_val_split_indices(n: int, val_ratio: float, seed: int) -> Tuple[np.ndarray, np.ndarray]:
    """
    将数据集划分为训练集和验证集
    
    Args:
        n: 总样本数
        val_ratio: 验证集比例（如 0.2 表示 20%）
        seed: 随机种子
    Returns:
        (train_indices, val_indices) 索引数组
    """
    rng = np.random.default_rng(seed)
    idx = rng.permutation(n)
    val_size = int(n * val_ratio)
    return idx[val_size:], idx[:val_size]


## 5. 模型实现：RobustMLP

实现一个稳健的 2 层 MLP，包含以下关键特性：

- **He 初始化**：确保梯度流稳定
- **ReLU 激活**：隐藏层使用 ReLU
- **Dropout 正则化**：防止过拟合
- **梯度裁剪**：防止梯度爆炸
- **加权损失函数**：处理类别不平衡
- **余弦学习率调度**：平滑的学习率衰减
- **早停机制**：自动选择最佳模型


In [None]:

# ============================================================================
# 数据增强函数
# ============================================================================
def augment_batch(raw_batch: np.ndarray, p_flip: float = 0.5, noise_std: float = 0.02, brightness: float = 0.1) -> np.ndarray:
    """
    训练时数据增强：随机应用多种变换
    
    Args:
        raw_batch: 归一化后的图像批次 (N, 2352)
        p_flip: 翻转概率
        noise_std: 高斯噪声标准差
        brightness: 亮度调整范围
    Returns:
        增强后的图像批次 (N, 2352)
    """
    # 重塑为图像格式
    imgs = raw_batch.reshape(len(raw_batch), 28, 28, 3).copy()
    
    # 随机水平翻转（50% 概率）
    if np.random.rand() < p_flip:
        imgs = imgs[:, :, ::-1, :]
    
    # 随机垂直翻转（25% 概率）
    if np.random.rand() < p_flip * 0.5:
        imgs = imgs[:, ::-1, :, :]
    
    # 添加高斯噪声（模拟传感器噪声）
    imgs += np.random.normal(0.0, noise_std, size=imgs.shape)
    
    # 随机亮度调整（模拟光照变化）
    imgs += (np.random.rand() - 0.5) * brightness
    
    # 裁剪到 [0, 1] 范围
    imgs = np.clip(imgs, 0.0, 1.0)
    
    return imgs.reshape(len(raw_batch), -1)


# 测试时增强（TTA）模式列表
TTA_MODES = ['identity', 'hflip', 'vflip', 'bright+', 'bright-']


def apply_tta(raw_batch: np.ndarray, mode: str) -> np.ndarray:
    """
    测试时增强：对测试图像应用特定变换
    
    Args:
        raw_batch: 归一化后的图像批次 (N, 2352)
        mode: 增强模式
    Returns:
        增强后的图像批次 (N, 2352)
    """
    imgs = raw_batch.reshape(len(raw_batch), 28, 28, 3)
    
    if mode == 'hflip':
        aug = imgs[:, :, ::-1, :]  # 水平翻转
    elif mode == 'vflip':
        aug = imgs[:, ::-1, :, :]  # 垂直翻转
    elif mode == 'bright+':
        aug = np.clip(imgs + 0.05, 0.0, 1.0)  # 亮度增加
    elif mode == 'bright-':
        aug = np.clip(imgs - 0.05, 0.0, 1.0)  # 亮度减少
    else:
        aug = imgs  # 原始图像
    
    return aug.reshape(len(raw_batch), -1)


In [None]:

class RobustMLP:
    """
    稳健的 2 层多层感知机（MLP）
    
    架构：输入层 → 隐藏层（ReLU） → 输出层（Softmax）
    特性：He 初始化、Dropout、梯度裁剪、加权损失、余弦学习率调度、早停
    """
    def __init__(self, input_dim: int, num_classes: int, hidden_dim: int, lr: float, l2: float, dropout: float, seed: int = 0, clip_grad: float = 5.0):
        """
        初始化模型
        
        Args:
            input_dim: 输入维度（2352）
            num_classes: 类别数（5）
            hidden_dim: 隐藏层维度（512-896）
            lr: 初始学习率
            l2: L2 正则化系数
            dropout: Dropout 概率
            seed: 随机种子
            clip_grad: 梯度裁剪阈值
        """
        self.hidden_dim = hidden_dim
        self.lr = lr
        self.base_lr = lr  # 保存初始学习率用于调度
        self.l2 = l2
        self.dropout = dropout
        self.clip_grad = clip_grad
        
        # He 初始化：适合 ReLU 激活函数
        rng = np.random.default_rng(seed)
        scale1 = np.sqrt(2.0 / input_dim)  # 第一层初始化尺度
        scale2 = np.sqrt(2.0 / hidden_dim)  # 第二层初始化尺度
        
        # 初始化权重和偏置
        self.w1 = rng.normal(0.0, scale1, size=(input_dim, hidden_dim))
        self.b1 = np.zeros(hidden_dim, dtype=np.float32)
        self.w2 = rng.normal(0.0, scale2, size=(hidden_dim, num_classes))
        self.b2 = np.zeros(num_classes, dtype=np.float32)

    def _softmax(self, logits: np.ndarray) -> np.ndarray:
        logits = logits - logits.max(axis=1, keepdims=True)
        exp = np.exp(logits)
        return exp / exp.sum(axis=1, keepdims=True)

    def _clip_gradients(self, grad_w1: np.ndarray, grad_b1: np.ndarray, grad_w2: np.ndarray, grad_b2: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        norm = np.sqrt(np.sum(grad_w1 * grad_w1) + np.sum(grad_b1 * grad_b1) + np.sum(grad_w2 * grad_w2) + np.sum(grad_b2 * grad_b2))
        if norm > self.clip_grad:
            scale = self.clip_grad / norm
            grad_w1 = grad_w1 * scale
            grad_b1 = grad_b1 * scale
            grad_w2 = grad_w2 * scale
            grad_b2 = grad_b2 * scale
        return grad_w1, grad_b1, grad_w2, grad_b2

    def _forward(self, x: np.ndarray, train: bool = False) -> Tuple[np.ndarray, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
        if train and self.dropout > 0.0:
            mask = (np.random.random(size=x.shape) >= self.dropout).astype(np.float32)
            x = x * mask
        z1 = x @ self.w1 + self.b1
        h1 = np.maximum(0.0, z1)
        if train and self.dropout > 0.0:
            mask_h = (np.random.random(size=h1.shape) >= self.dropout).astype(np.float32)
            h1 = h1 * mask_h
        logits = h1 @ self.w2 + self.b2
        return logits, (x, z1, h1)

    def _loss_and_grads(self, x: np.ndarray, y: np.ndarray, class_weights: np.ndarray) -> Tuple[float, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        logits, cache = self._forward(x, train=True)
        probs = self._softmax(logits)
        batch = x.shape[0]
        sample_w = class_weights[y]
        loss = -np.sum(sample_w * np.log(probs[np.arange(batch), y] + 1e-8)) / sample_w.sum()
        loss += 0.5 * self.l2 * (np.sum(self.w1 * self.w1) + np.sum(self.w2 * self.w2))
        grad_logits = probs
        grad_logits[np.arange(batch), y] -= 1.0
        grad_logits *= (sample_w[:, None] / sample_w.sum())
        x_cache, z1, h1 = cache
        grad_w2 = h1.T @ grad_logits + self.l2 * self.w2
        grad_b2 = grad_logits.sum(axis=0)
        grad_hidden = grad_logits @ self.w2.T
        grad_hidden[z1 <= 0.0] = 0.0
        grad_w1 = x_cache.T @ grad_hidden + self.l2 * self.w1
        grad_b1 = grad_hidden.sum(axis=0)
        grad_w1, grad_b1, grad_w2, grad_b2 = self._clip_gradients(grad_w1, grad_b1, grad_w2, grad_b2)
        return loss, grad_w1, grad_b1, grad_w2, grad_b2

    def fit(self, x: np.ndarray, y: np.ndarray, *, class_weights: np.ndarray, epochs: int, batch_size: int, val_data: Optional[Tuple[np.ndarray, np.ndarray]] = None, raw_data: Optional[np.ndarray] = None, augment: bool = False, patience: int = 40) -> Dict[str, List[float]]:
        history = {'train_acc': [], 'val_acc': []}
        best_state = self.get_state()
        best_val = -np.inf
        wait = 0
        num_samples = x.shape[0]
        for epoch in range(epochs):
            lr_scale = 0.5 * (1 + math.cos(math.pi * epoch / epochs))
            self.lr = self.base_lr * lr_scale
            indices = np.random.permutation(num_samples)
            for start in range(0, num_samples, batch_size):
                end = start + batch_size
                idx = indices[start:end]
                xb = x[idx]
                if augment and raw_data is not None:
                    raw_aug = augment_batch(raw_data[idx])
                    xb = standardize_raw(raw_aug)
                yb = y[idx]
                _, grad_w1, grad_b1, grad_w2, grad_b2 = self._loss_and_grads(xb, yb, class_weights)
                self.w1 -= self.lr * grad_w1
                self.b1 -= self.lr * grad_b1
                self.w2 -= self.lr * grad_w2
                self.b2 -= self.lr * grad_b2
            train_acc = self.accuracy(x, y)
            history['train_acc'].append(train_acc)
            if val_data is not None:
                val_acc = self.accuracy(*val_data)
                history['val_acc'].append(val_acc)
                if val_acc > best_val + 1e-4:
                    best_val = val_acc
                    best_state = self.get_state()
                    wait = 0
                else:
                    wait += 1
                    if wait >= patience:
                        break
        if val_data is not None:
            self.load_state(best_state)
        return history

    def predict_proba(self, x: np.ndarray) -> np.ndarray:
        logits, _ = self._forward(x, train=False)
        return self._softmax(logits)

    def predict(self, x: np.ndarray) -> np.ndarray:
        return np.argmax(self.predict_proba(x), axis=1)

    def accuracy(self, x: np.ndarray, y: np.ndarray) -> float:
        return float((self.predict(x) == y).mean())

    def get_state(self):
        return {
            'w1': self.w1.copy(),
            'b1': self.b1.copy(),
            'w2': self.w2.copy(),
            'b2': self.b2.copy(),
        }

    def load_state(self, state):
        self.w1 = state['w1'].copy()
        self.b1 = state['b1'].copy()
        self.w2 = state['w2'].copy()
        self.b2 = state['b2'].copy()


## 6. 数据集划分

将训练集划分为训练集（80%）和验证集（20%），用于模型评估和超参数选择。


## 7. 超参数搜索

测试 5 种不同的超参数配置，选择验证集表现最佳的配置。每个配置都使用数据增强和早停机制。


In [None]:

train_idx, val_idx = train_val_split_indices(len(x_std), val_ratio=0.2, seed=SEED)
x_train_std = x_std[train_idx]
x_val_std = x_std[val_idx]
x_train_raw = x_norm[train_idx]
x_val_raw = x_norm[val_idx]
y_train = labels[train_idx]
y_val = labels[val_idx]
print(f"Train: {x_train_std.shape}, Val: {x_val_std.shape}")


Train: (864, 2352), Val: (216, 2352)


## 8. 模型集成

选择验证集表现最好的 top-3 配置，在全部训练数据上重新训练，用于最终预测。集成多个模型可以提高预测的稳定性和准确性。


In [None]:

search_space = [
    {'hidden_dim': 512, 'lr': 0.05, 'l2': 6e-5, 'dropout': 0.15, 'epochs': 350, 'batch_size': 112, 'patience': 50},
    {'hidden_dim': 640, 'lr': 0.055, 'l2': 8e-5, 'dropout': 0.18, 'epochs': 380, 'batch_size': 96, 'patience': 55},
    {'hidden_dim': 768, 'lr': 0.048, 'l2': 7e-5, 'dropout': 0.12, 'epochs': 400, 'batch_size': 128, 'patience': 60},
    {'hidden_dim': 896, 'lr': 0.052, 'l2': 9e-5, 'dropout': 0.2, 'epochs': 360, 'batch_size': 104, 'patience': 50},
    {'hidden_dim': 640, 'lr': 0.045, 'l2': 5e-5, 'dropout': 0.1, 'epochs': 420, 'batch_size': 120, 'patience': 65},
]

results = []
for i, params in enumerate(search_space, 1):
    print(f"Config {i}/{len(search_space)}: hidden_dim={params['hidden_dim']}, lr={params['lr']:.3f}, l2={params['l2']:.2e}, dropout={params['dropout']:.2f}")
    model = RobustMLP(
        input_dim=x_train_std.shape[1],
        num_classes=num_classes,
        hidden_dim=params['hidden_dim'],
        lr=params['lr'],
        l2=params['l2'],
        dropout=params['dropout'],
        seed=SEED + i
    )
    history = model.fit(
        x_train_std,
        y_train,
        class_weights=class_weights,
        epochs=params['epochs'],
        batch_size=params['batch_size'],
        val_data=(x_val_std, y_val),
        raw_data=x_train_raw,
        augment=True,
        patience=params['patience']
    )
    train_acc = history['train_acc'][-1]
    val_acc = history['val_acc'][-1]
    results.append({**params, 'train_acc': train_acc, 'val_acc': val_acc})
    print(f"  train acc={train_acc:.4f}, val acc={val_acc:.4f}\n")

best = max(results, key=lambda r: r['val_acc'])
print('Best config:', {k: v for k, v in best.items() if k not in {'train_acc', 'val_acc'}})


Config 1/5: hidden_dim=512, lr=0.050, l2=6.00e-05, dropout=0.15
  train acc=0.5914, val acc=0.5000

Config 2/5: hidden_dim=640, lr=0.055, l2=8.00e-05, dropout=0.18
  train acc=0.5856, val acc=0.4769

Config 3/5: hidden_dim=768, lr=0.048, l2=7.00e-05, dropout=0.12
  train acc=0.6273, val acc=0.5046

Config 4/5: hidden_dim=896, lr=0.052, l2=9.00e-05, dropout=0.20
  train acc=0.5289, val acc=0.4444

Config 5/5: hidden_dim=640, lr=0.045, l2=5.00e-05, dropout=0.10
  train acc=0.5995, val acc=0.4676

Best config: {'hidden_dim': 768, 'lr': 0.048, 'l2': 7e-05, 'dropout': 0.12, 'epochs': 400, 'batch_size': 128, 'patience': 60}


In [None]:

TOP_K = 3
sorted_configs = sorted(results, key=lambda r: r['val_acc'], reverse=True)[:TOP_K]
models: List[RobustMLP] = []
for idx, cfg in enumerate(sorted_configs):
    cfg_full = cfg.copy()
    cfg_full['epochs'] = cfg['epochs'] + 50
    cfg_full['patience'] = cfg['patience'] + 15
    print(f"Retraining top config {idx + 1}: hidden_dim={cfg_full['hidden_dim']}, lr={cfg_full['lr']:.3f}, l2={cfg_full['l2']:.2e}, dropout={cfg_full['dropout']:.2f}")
    model = RobustMLP(
        input_dim=x_std.shape[1],
        num_classes=num_classes,
        hidden_dim=cfg_full['hidden_dim'],
        lr=cfg_full['lr'],
        l2=cfg_full['l2'],
        dropout=cfg_full['dropout'],
        seed=SEED * 5 + idx
    )
    train_idx_full, val_idx_full = train_val_split_indices(len(x_std), val_ratio=0.1, seed=SEED * 6 + idx)
    model.fit(
        x_std[train_idx_full],
        labels[train_idx_full],
        class_weights=class_weights,
        epochs=cfg_full['epochs'],
        batch_size=cfg_full['batch_size'],
        val_data=(x_std[val_idx_full], labels[val_idx_full]),
        raw_data=x_norm[train_idx_full],
        augment=True,
        patience=cfg_full['patience']
    )
    models.append(model)
print(f"Total models trained: {len(models)}")


Retraining top config 1: hidden_dim=768, lr=0.048, l2=7.00e-05, dropout=0.12
Retraining top config 2: hidden_dim=512, lr=0.050, l2=6.00e-05, dropout=0.15
Retraining top config 3: hidden_dim=640, lr=0.055, l2=8.00e-05, dropout=0.18
Total models trained: 3


## 9. 测试集预测与提交文件生成

使用集成模型和测试时增强（TTA）对测试集进行预测：

1. 对每个测试图像应用 5 种 TTA 模式
2. 每个模型对每种模式预测概率
3. 对所有预测概率求平均
4. 选择概率最大的类别作为最终预测
5. 生成符合 Kaggle 格式的提交文件


In [19]:

probs_accum = np.zeros((len(x_test_std), num_classes), dtype=np.float32)
for model in models:
    for mode in TTA_MODES:
        raw_aug = apply_tta(x_test_norm, mode)
        std_aug = standardize_raw(raw_aug)
        probs_accum += model.predict_proba(std_aug)

probs_accum /= (len(models) * len(TTA_MODES))
test_preds = np.argmax(probs_accum, axis=1)

ids = [str(i) for i in range(1, len(test_preds) + 1)]
submission_path = Path('submission_robust_wide.csv')
with submission_path.open('w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['ID', 'Label'])
    for idx, label in zip(ids, test_preds):
        writer.writerow([idx, int(label)])

print(f'Submission saved to {submission_path.resolve()}')


Submission saved to C:\Users\yudim\Downloads\IFT3395_Competition2\submission_robust_wide.csv
