# 联合学习设置
对于使用TrainConfig的联合学习设置，我们需要不同的参与者：

* 工作者：自己的数据集。

* 协调员：知道工作人员以及每个工作人员中存在的数据集名称的实体。

* 评估器：保存测试数据并跟踪模型性能

每个工作进程由两部分表示，即调度程序本地的代理（Websocket客户端工作进程）和保存数据并执行计算的远程实例。远程部分称为Websocket服务器工作程序。

In [1]:
%load_ext autoreload
%autoreload 2

# inspect模块主要用来查看相关的代码。可以显示源代码。
import inspect

## 1 准备工作：启动WebSocket的worker

因此，首先，我们需要创建远程工作者。为此，您需要在终端中运行（无法从笔记本计算机上运行）：

python start_websocket_servers.py


这是怎么回事？
该脚本将实例化三个工作人员Alice，Bob和Charlie并准备他们的本地数据。每个工作人员都设置为拥有MNIST培训数据集的子集。爱丽丝持有与数字0-3对应的所有图像，鲍勃持有与数字4-6对应的所有图像，查理持有与数字7-9对应的所有图像。

工人	本地数据集中的数字	样品数
爱丽丝	0-3	24754
鲍勃	4-6	17181
查理	7-9	18065

| Worker      | Digits in local dataset | Number of samples |
| ----------- | ----------------------- | ----------------- |
| Alice       | 0-3                     | 24754             |
| Bob         | 4-6                     | 17181             |
| Charlie     | 7-9                     | 18065             |


该评估程序将称为“测试”，并保存整个MNIST测试数据集。

| Evaluator   | Digits in local dataset | Number of samples |
| ----------- | ----------------------- | ----------------- |
| Testing     | 0-9                     | 10000             |


In [13]:
import run_websocket_server
# 用来查看模块内部的代码。
print(inspect.getsource(run_websocket_server.start_websocket_server_worker))

def start_websocket_server_worker(id, host, port, hook, verbose, keep_labels=None, training=True):
    """Helper function for spinning up a websocket server and setting up the local datasets."""

    server = websocket_server.WebsocketServerWorker(
        id=id, host=host, port=port, hook=hook, verbose=verbose
    )

    # Setup toy data (mnist example)
    mnist_dataset = datasets.MNIST(
        root="../../官方教程/data",
        train=training,
        download=True,
        transform=transforms.Compose(
            [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
        ),
    )

    if training:
        indices = np.isin(mnist_dataset.targets, keep_labels).astype("uint8")
        logger.info("number of true indices: %s", indices.sum())
        selected_data = (
            torch.native_masked_select(mnist_dataset.data.transpose(0, 2), torch.tensor(indices))
            .view(28, 28, -1)
            .transpose(2, 0)
        )
        logger.info("after selection: %

在继续之前，我们首先需要导入依赖项，设置所需的参数并配置日志记录。

In [3]:
# 导入模块
import sys
# python中异步IO的实现方法。提供了websocket一种应用层全双工的异步、非阻塞通信方式，通过消息响应实现通信。
import asyncio

# syft模块主要封装实现了基于websocket的异步通信。
import syft as sy
from syft.workers.websocket_client import WebsocketClientWorker
from syft.frameworks.torch.fl import utils

# torch主要提供了机器学习的算法。
import torch
from torchvision import datasets, transforms
import numpy as np

# rwc提供了客户端运行的主要方法。
import run_websocket_client as rwc


In [4]:
# 将syft与torch建立联系
hook = sy.TorchHook(torch)

In [5]:
# 配置训练过程中的相关列参数。
# batch_size batch大小
# cuda 是否启用GPU
# federate_after_n_batches多少轮之后进行联邦平均
# lr学习率
# test_batch_size测试数据集
# training_round worker上训练的次数。
# verbose 概要？用来做什么的不清楚。

args = rwc.define_and_get_arguments(args=[])
use_cuda = args.cuda and torch.cuda.is_available()
torch.manual_seed(args.seed)
device = torch.device("cuda" if use_cuda else "cpu")
print(args)

Namespace(batch_size=32, cuda=False, federate_after_n_batches=10, lr=0.1, save_model=False, seed=1, test_batch_size=128, training_rounds=40, verbose=False)


In [14]:
# 配置一个日志模块。使用python原本的logging模块。
import logging

# 获得一个命名的记录器
logger = logging.getLogger("run_websocket_client")

if not len(logger.handlers):
    # print(123)
    FORMAT = "%(asctime)s - %(message)s"
    DATE_FMT = "%H:%M:%S"
    formatter = logging.Formatter(FORMAT, DATE_FMT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.propagate = False
LOG_LEVEL = logging.DEBUG
logger.setLevel(LOG_LEVEL)

现在，让我们实例化websocket客户端工作程序，即远程工作程序的本地代理。请注意，如果websocket服务器工作程序未在运行，则此步骤将失败。

工人Alice，Bob和Charlie将进行培训，然后由测试人员托管测试数据并进行评估。

In [7]:
# 在客户端定义服务端的句柄。通过websocketclientworker类，建立通信。每一个类维护一个通信链接。
# 将客户端websocket与启动worker服务端的websocket建立一对一链接。
# pysyft通过设置，将通信模块单独剥离出来。
kwargs_websocket = {"host": "127.0.0.1", "hook": hook, "verbose": args.verbose}
alice = WebsocketClientWorker(id="alice", port=8777, **kwargs_websocket)
bob = WebsocketClientWorker(id="bob", port=8778, **kwargs_websocket)
charlie = WebsocketClientWorker(id="charlie", port=8779, **kwargs_websocket)
testing = WebsocketClientWorker(id="testing", port=8780, **kwargs_websocket)

# 用来试下通信的句柄。
worker_instances = [alice, bob, charlie]

## 2 设置培训

让我们实例化机器学习模型。这是一个具有2个卷积层和2个完全连接层的小型神经网络。它使用ReLU激活和最大池化。

In [8]:
# 输出模型。
print(inspect.getsource(rwc.Net))

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4 * 4 * 50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4 * 4 * 50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)



In [9]:
model = rwc.Net().to(device)
print(model)

Net(
  (conv1): Conv2d(1, 20, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(20, 50, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=800, out_features=500, bias=True)
  (fc2): Linear(in_features=500, out_features=10, bias=True)
)


### 使模型可序列化
为了将模型发送给工作人员，我们需要模型可序列化，为此我们使用jit。

In [10]:
# 将需要训练的模型进行序列化。
# jit提供了一种不依赖Python环境的执行方法。这样在发送到客户端之后，即是没有导入相关的包。也能运行模型，进行梯度下降。
traced_model = torch.jit.trace(model, torch.zeros([1, 1, 28, 28], dtype=torch.float))

## 3  让我们开始训练
现在我们准备开始联合培训。我们将分别对每个工人进行给定数量的批次培训，然后计算所得模型的联合平均值。

每隔10轮培训，我们将评估工人返回的模型以及通过联合平均获得的模型的性能。

性能将作为准确性（正确预测的比率）和预测数字的直方图给出。这很有趣，因为每个工人仅拥有数字的一个子集。因此，在开始时，每个工作人员将仅预测他们的人数，并且仅通过联合平均过程知道其他人数。

培训以异步方式完成。这意味着调度程序仅告诉工人进行培训，而不会阻止与下一个工人交谈之前等待培训的结果。

训练的参数在参数中给出。每个工作人员将按照给定数量的批次进行培训，该数量由federate_after_n_batches的值给出。还配置了培训批次大小和学习率。

In [11]:
print("Federate_after_n_batches: " + str(args.federate_after_n_batches))
print("Batch size: " + str(args.batch_size))
print("Initial learning rate: " + str(args.lr))

Federate_after_n_batches: 10
Batch size: 32
Initial learning rate: 0.1


In [12]:
learning_rate = args.lr
device = "cpu"  #torch.device("cpu")
traced_model = torch.jit.trace(model, torch.zeros([1, 1, 28, 28], dtype=torch.float))
for curr_round in range(1, args.training_rounds + 1):
    logger.info("Training round %s/%s", curr_round, args.training_rounds)

    # 异步调用多个客户端执行并行训练。await等待多个异步调用执行完成。
    # 这里包含了模型的发送过程和取回过程。
    results = await asyncio.gather(
        *[
            rwc.fit_model_on_worker(
                worker=worker,
                traced_model=traced_model,
                batch_size=args.batch_size,
                curr_round=curr_round,
                max_nr_batches=args.federate_after_n_batches,
                lr=learning_rate,
            )
            for worker in worker_instances
        ]
    )
    models = {}
    loss_values = {}
    
    # 每10轮进行一次test。使用test客户端检验当前结果的准确性。
    # 这里主要测试，每个客户端发过来的模型的准确率。
    test_models = curr_round % 10 == 1 or curr_round == args.training_rounds
    if test_models:
        logger.info("Evaluating models")
        np.set_printoptions(formatter={"float": "{: .0f}".format})
        for worker_id, worker_model, _ in results:
            rwc.evaluate_model_on_worker(
                model_identifier="Model update " + worker_id,
                worker=testing,
                dataset_key="mnist_testing",
                model=worker_model,
                nr_bins=10,
                batch_size=128,
                print_target_hist=False,
                device=device
            )

    # 将并行执行的多个客户端训练的结果，进行聚合。
    for worker_id, worker_model, worker_loss in results:
        if worker_model is not None:
            models[worker_id] = worker_model
            loss_values[worker_id] = worker_loss

    # 调用联邦平均算法，对分布式models进行聚合。
    traced_model = utils.federated_avg(models)

    # 每10轮进行一次test。使用test客户端检验当前结果的准确性。
    # 这里主要测试，模型聚合后，模型的准确率。
    if test_models:
        rwc.evaluate_model_on_worker(
            model_identifier="Federated model",
            worker=testing,
            dataset_key="mnist_testing",
            model=traced_model,
            nr_bins=10,
            batch_size=128,
            print_target_hist=False,
            device=device
        )

    # decay learning rate
    learning_rate = max(0.98 * learning_rate, args.lr * 0.01)

if args.save_model:
    torch.save(model.state_dict(), "mnist_cnn.pt")

20:21:34 - Training round 1/40
20:21:42 - Evaluating models
20:21:45 - Model update alice: Percentage numbers 0-3: 100%, 4-6: 0%, 7-9: 0%
20:21:45 - Model update alice: Average loss: 0.0216, Accuracy: 1498/10000 (14.98%)
20:21:49 - Model update bob: Percentage numbers 0-3: 0%, 4-6: 100%, 7-9: 0%
20:21:49 - Model update bob: Average loss: 0.0441, Accuracy: 892/10000 (8.92%)
20:21:52 - Model update charlie: Percentage numbers 0-3: 0%, 4-6: 0%, 7-9: 100%
20:21:52 - Model update charlie: Average loss: 0.0323, Accuracy: 1092/10000 (10.92%)
20:21:56 - Federated model: Percentage numbers 0-3: 0%, 4-6: 99%, 7-9: 0%
20:21:56 - Federated model: Average loss: 0.0177, Accuracy: 892/10000 (8.92%)
20:21:56 - Training round 2/40
20:22:02 - Training round 3/40
20:22:10 - Training round 4/40
20:22:17 - Training round 5/40
20:22:24 - Training round 6/40
20:22:32 - Training round 7/40
20:22:39 - Training round 8/40
20:22:46 - Training round 9/40
20:22:53 - Training round 10/40
20:23:00 - Training round 1

经过40轮训练，我们在整个测试数据集上的准确率均达到95％以上。鉴于没有工人能使用超过4位数字，这给人留下了深刻的印象！