# 联邦学习简单实现 —— FedAvg 算法

本 Notebook 实现了一个简单的联邦学习示例。我们将使用 MNIST 数据集，将训练数据分成多个客户端，每个客户端在本地训练一个简单的神经网络（MLP），然后采用 FedAvg 算法对各客户端的模型参数进行平均聚合，更新全局模型。整个过程分为数据加载与划分、模型定义、局部训练、模型聚合和全局评估等步骤。

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
import torchvision.transforms as transforms
import torchvision.datasets as datasets

# 超参数设置
num_clients = 5        # 客户端数量
local_epochs = 1       # 每个客户端本地训练的轮数
batch_size = 32        # 批次大小
num_rounds = 5         # 联邦学习的通信轮数
device = "cuda:0" if torch.cuda.is_available() else "cpu"
n_classes = 10



## 数据加载与客户端数据划分
我们使用 MNIST 数据集，并将训练数据均匀划分给多个客户端。每个客户端拥有自己独立的数据子集。

In [None]:
# 数据预处理：将图片转换为 tensor 并归一化
transform = transforms.Compose([
    transforms.ToTensor(), 
    transforms.Normalize((0.1307,), (0.3081,))
])

# 下载 MNIST 数据集
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset  = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

# 将训练集均分给各个客户端（这里简单采用均分的方式）
#client_data_sizes = [len(train_dataset) // num_clients] * num_clients
#client_datasets = random_split(train_dataset, client_data_sizes)
#client_loaders = [DataLoader(dataset, batch_size=batch_size, shuffle=True) for dataset in client_datasets]


## 使用 Dirichlet 分布进行非 IID 数据划分并可视化分布情况

下面的代码实现了一个 `dirichlet_partition` 函数，该函数：

1. 固定随机种子（同时设置了 numpy、random、torch 的 seed），保证每次运行结果一致。
2. 对数据集按照类别进行划分：对于每个类别，从 Dirichlet 分布中采样各客户端分配比例，然后将该类别样本按照比例分配到各个客户端。
3. 划分结果生成后，我们统计各客户端中各类别的样本数，并绘制堆叠柱状图展示数据分布情况。

你可以调整 `alpha` 参数来控制各客户端数据分布的“非 IID 程度”（alpha 越小，各客户端类别分布越不均衡）。

In [None]:
import numpy as np
import random
import torch
from torch.utils.data import Subset
import matplotlib.pyplot as plt

def dirichlet_partition(dataset, n_clients, alpha, seed=42):
    """
    使用 Dirichlet 分布将数据集划分给 n_clients 个客户端，保证每次运行结果相同。
    
    参数:
        dataset: torch 数据集，要求 dataset.targets 为 tensor 类型的标签
        n_clients: 客户端数量
        alpha: Dirichlet 分布的参数，控制数据非 IID 程度，alpha 越小分布越极端
        seed: 随机种子，保证结果可重复
        
    返回:
        client_subsets: 长度为 n_clients 的 Subset 列表，每个 Subset 表示分配给对应客户端的数据
    """
    # 固定随机种子，保证结果可复现
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    
    # 获取所有标签（假设 dataset.targets 为 tensor 类型）
    targets = dataset.targets.numpy()
    n_classes = len(np.unique(targets))
    
    # 存储每个客户端的样本索引
    client_indices = {i: [] for i in range(n_clients)}
    
    # 针对每个类别进行划分
    for cls in range(n_classes):
        cls_idx = np.where(targets == cls)[0]
        # 打乱该类别样本顺序
        np.random.shuffle(cls_idx)
        # 从 Dirichlet 分布中采样各客户端分配比例
        proportions = np.random.dirichlet(alpha * np.ones(n_clients))
        # 根据比例计算每个客户端该类别样本数量的分割点
        proportions = (np.cumsum(proportions) * len(cls_idx)).astype(int)[:-1]
        cls_idx_split = np.split(cls_idx, proportions)
        for i, indices in enumerate(cls_idx_split):
            client_indices[i].extend(indices.tolist())
    
    # 根据索引生成 Subset
    client_subsets = [Subset(dataset, client_indices[i]) for i in range(n_clients)]
    return client_subsets

# 使用示例
alpha = 0.1    # Dirichlet 参数，可根据需要调整
n_clients = 5  # 客户端数量
seed = 42      # 固定随机种子

# 假设 train_dataset 已经定义并下载了 MNIST 数据集
client_datasets = dirichlet_partition(train_dataset, n_clients, alpha, seed)
client_loaders = [DataLoader(dataset, batch_size=batch_size, shuffle=True) for dataset in client_datasets]

# %% [markdown]
# ## 可视化各客户端的标签分布
#
# 下面统计每个客户端中各类别（0～9）的样本数量，并绘制堆叠柱状图展示分布情况。

# %%
n_classes = 10  # MNIST 有 10 个类别
client_class_counts = []

for client_subset in client_datasets:
    # 获取当前客户端的样本索引及对应标签
    indices = client_subset.indices
    labels = train_dataset.targets[indices].numpy()
    counts = [np.sum(labels == i) for i in range(n_classes)]
    client_class_counts.append(counts)

client_class_counts = np.array(client_class_counts)  # shape: (n_clients, n_classes)

plt.figure(figsize=(12, 8))
client_ids = np.arange(n_clients)
bottom = np.zeros(n_clients)
colors = plt.cm.tab10.colors  # 10种颜色

for cls in range(n_classes):
    plt.bar(client_ids, client_class_counts[:, cls], bottom=bottom, 
            color=colors[cls], label=f"Class {cls}")
    bottom += client_class_counts[:, cls]

plt.xlabel("Client ID")
plt.ylabel("Number of samples")
plt.title(f"Label Distribution per Client (Dirichlet Partition, alpha={alpha})")
plt.xticks(client_ids, [f"Client {i}" for i in client_ids])
plt.legend(title="Class")
plt.show()


## 定义模型
这里我们定义一个简单的全连接神经网络（MLP），用于 MNIST 手写数字分类。模型由一个隐藏层构成。

In [None]:
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(28*28, 128)
        self.fc2 = nn.Linear(128, 10)
    
    def forward(self, x):
        # 将 28x28 的图像展平为 784 维向量
        x = x.view(-1, 28*28)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


## 局部训练函数
定义每个客户端的本地训练过程。每个客户端在自己的数据上训练若干个 epoch，并返回训练后的模型参数。

In [None]:
def local_train(model, dataloader, epochs, device):
    model.train()
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()
    for epoch in range(epochs):
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
    # 返回训练后的模型参数
    return model.state_dict()


## FedAvg 聚合函数（数据量加权）

在实际应用中，各客户端的数据量可能不一致，因此在聚合时需要根据每个客户端的数据量进行加权：
$$
\theta_{global} = \sum_i \frac{n_i}{N_{total}} \theta_i
$$
其中$ n_i $是客户端$ i $的数据量，$ N_{total} $是所有客户端数据量之和。


In [None]:
def fed_avg(global_model, client_state_dicts, client_data_counts):
    global_state_dict = global_model.state_dict()
    # 初始化全局模型参数为零
    for key in global_state_dict.keys():
        global_state_dict[key] = torch.zeros_like(global_state_dict[key])
    total_samples = sum(client_data_counts)
    # 对每个客户端的参数按数据量权重累加
    for client_state, n_samples in zip(client_state_dicts, client_data_counts):
        weight = n_samples / total_samples
        for key in global_state_dict.keys():
            global_state_dict[key] += client_state[key] * weight
    # 更新全局模型
    global_model.load_state_dict(global_state_dict)
    return global_model


## 联邦学习主流程及每轮局部模型测试
在每一轮全局通信中，我们让每个客户端先在自己的本地训练后：
- 使用其本地模型在整个测试集上进行评估（计算总体准确率和各类别准确率）。
- 最后，将所有客户端的模型按照各自数据量的权重使用 FedAvg 进行聚合，形成更新后的全局模型。

这样可以帮助我们观察每个客户端在局部训练后的性能，同时也能追踪全局模型在聚合后的表现。

In [None]:
def evaluate_model(model, device, test_loader, n_classes=10):
    """
    在测试集上评估给定模型的表现，返回总体准确率及各类别准确率。
    """
    model.eval()
    correct = 0
    total = 0
    class_correct = [0 for _ in range(n_classes)]
    class_total = [0 for _ in range(n_classes)]
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            _, predicted = torch.max(output, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
            for i in range(target.size(0)):
                label = target[i].item()
                class_total[label] += 1
                if predicted[i].item() == label:
                    class_correct[label] += 1
                    
    overall_accuracy = 100.0 * correct / total
    class_accuracies = [100.0 * c / t if t > 0 else 0.0 for c, t in zip(class_correct, class_total)]
    return overall_accuracy, class_accuracies

# 定义测试数据加载器
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

## 联邦学习主流程（使用数据量加权的 FedAvg）
这里首先计算每个客户端的数据量，然后在每一轮通信中：

- 将全局模型下发到各客户端进行本地训练；
- 收集各客户端更新后的模型参数；
- 使用上述加权 FedAvg 方法聚合各客户端模型参数，更新全局模型。

In [None]:
# 计算每个客户端数据量
client_data_counts = [len(dataset) for dataset in client_datasets]
print("每个客户端数据量:", client_data_counts)

global_model = SimpleNN().to(device)


for r in range(num_rounds):
    print(f"==== 第 {r+1} 轮通信 ====")
    client_state_dicts = []
    local_models = []
    
    # 遍历每个客户端，进行本地训练和测试
    for c_id, client_loader in enumerate(client_loaders):
        local_model = SimpleNN().to(device)
        # 同步全局模型参数到客户端
        local_model.load_state_dict(global_model.state_dict())
        # 客户端本地训练
        local_state = local_train(local_model, client_loader, local_epochs, device)
        client_state_dicts.append(local_state)
        local_models.append(local_model)
        
        # 测试当前客户端本地模型在整个测试集上的表现
        overall_acc, class_acc = evaluate_model(local_model, device, test_loader, n_classes)
        print(f"Client {c_id} local model test accuracy: {overall_acc:.2f}%")
        for i in range(n_classes):
            print(f"  Class {i} accuracy: {class_acc[i]:.2f}%")
    
    # 使用数据量加权的 FedAvg 聚合各客户端模型
    global_model = fed_avg(global_model, client_state_dicts, client_data_counts)
    
    # 选择性：测试聚合后的全局模型
    global_acc, global_class_acc = evaluate_model(global_model, device, test_loader, n_classes)
    print(f"Global model after round {r+1} test accuracy: {global_acc:.2f}%")
    for i in range(n_classes):
        print(f"  Global model Class {i} accuracy: {global_class_acc[i]:.2f}%")
    print("\n")
