# PyTorchとOpenFLを用いたFederated Learningのチュートリアル

（注）このチュートリアルは[公式サンプル](https://github.com/intel/openfl/blob/v1.5/openfl-tutorials/interactive_api/PyTorch_TinyImageNet/workspace/pytorch_tinyimagenet.ipynb)を元ネタとして、より分かりやすくするために一部を編集しております。

## ライブラリーのインストール

In [None]:
!pip install torch==1.13.1
!pip install torchvision==0.14.1
!pip install setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability
!pip install wheel>=0.38.0 # not directly required, pinned by Snyk to avoid a vulnerability

## ライブラリーのインポート

In [None]:
import os
import glob

from PIL import Image

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from openfl.interface.interactive_api.federation import Federation
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment
from copy import deepcopy
import torchvision
from torchvision import transforms as T
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import tqdm

torch.manual_seed(0)
np.random.seed(0)

---

## PyTorchベースのモデル学習スクリプト作成
ただし、一部OpenFLのお作法に則り実装すべき個所（【OpenFL独自コード】という部分）がある。

## データセットの定義

通常のPyTorchプログラミングと同様にDatasetとデータ加工処理を定義する。

In [None]:
normalize = T.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225]
)

augmentation = T.RandomApply(
    [T.RandomHorizontalFlip(),
     T.RandomRotation(10),
     T.RandomResizedCrop(64)], 
    p=.8
)

training_transform = T.Compose(
    [T.Lambda(lambda x: x.convert("RGB")),
     T.ToTensor(),
     augmentation,
     normalize]
)

valid_transform = T.Compose(
    [T.Lambda(lambda x: x.convert("RGB")),
     T.ToTensor(),
     normalize]
)

class TransformedDataset(Dataset):
    """Image Person ReID Dataset."""

    def __init__(self, dataset, transform=None, target_transform=None):
        """Initialize Dataset."""
        self.dataset = dataset
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Length of dataset."""
        return len(self.dataset)

    def __getitem__(self, index):
        img, label = self.dataset[index]
        label = self.target_transform(label) if self.target_transform else label
        img = self.transform(img) if self.transform else img
        return img, label


【OpenFL独自コード】
OpenFLが提供するDataInterfaceクラスを継承して、下記サンプルのお作法通りに実装する。
このクラスが本ソースコードと各コラボレーター上のShard Descriptorとのインターフェースになる。

In [None]:
class TinyImageNetDataset(DataInterface):
    def __init__(self, **kwargs):
        self.kwargs = kwargs
    
    @property
    def shard_descriptor(self):
        return self._shard_descriptor
        
    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor  will be set by Envoy.
        """
        self._shard_descriptor = shard_descriptor
        
        self.train_set = TransformedDataset(
            self._shard_descriptor.get_dataset('train'),
            transform=training_transform
        )
        self.valid_set = TransformedDataset(
            self._shard_descriptor.get_dataset('val'),
            transform=valid_transform
        )
        
    def get_train_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        generator=torch.Generator()
        generator.manual_seed(0)
        return DataLoader(
            self.train_set, batch_size=self.kwargs['train_bs'], shuffle=True, generator=generator
            )

    def get_valid_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        return DataLoader(self.valid_set, batch_size=self.kwargs['valid_bs'])

    def get_train_data_size(self):
        """
        Information for aggregation
        """
        return len(self.train_set)

    def get_valid_data_size(self):
        """
        Information for aggregation
        """
        return len(self.valid_set)
    

In [None]:
fed_dataset = TinyImageNetDataset(train_bs=64, valid_bs=64)

## モデルの定義

通常のPyTorchプログラミングと同様にモデルを定義する。本デモではTorchvisionのMobileNet V2をファインチューニングしていく。

In [None]:
"""
MobileNetV2 model
"""

class Net(nn.Module):
    def __init__(self):
        torch.manual_seed(0)
        super(Net, self).__init__()
        self.model = torchvision.models.mobilenet_v2(pretrained=True)
        self.model.requires_grad_(False)
        self.model.classifier[1] = torch.nn.Linear(in_features=1280, \
                        out_features=200, bias=True)

    def forward(self, x):
        x = self.model.forward(x)
        return x

model_net = Net()

こちらも通常のPyTorchプログラミングと同様にOptimizerと損失関数を定義する

In [None]:
params_to_update = []
for param in model_net.parameters():
    if param.requires_grad == True:
        params_to_update.append(param)
        
optimizer_adam = optim.Adam(params_to_update, lr=1e-4)

def cross_entropy(output, target):
    """Binary cross-entropy metric
    """
    return F.cross_entropy(input=output,target=target)

【OpenFL独自コード】OpenFLが提供するModelInterfaceクラスのインスタンスを作成し、モデルとOptimizerのインスタンスをセットする。
このModelInterfaceによりモデルデータがコラボレーターに転送される。

In [None]:
framework_adapter = 'openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin'
model_interface = ModelInterface(model=model_net, optimizer=optimizer_adam, framework_plugin=framework_adapter)

# Save the initial model state
initial_model = deepcopy(model_net)

## 学習処理（Train）と検証処理（Validation）の定義

【OpenFL独自コード】TrainとValの処理自体は通常のPyTorchのお作法で実装できるが、その関数をOpenFLのTaskInterfaceにタスクとして登録する必要がある。書き方は以下の通り、少し癖があるが慣れるしかない。

In [None]:
task_interface = TaskInterface()


# The Interactive API supports registering functions definied in main module or imported.
def function_defined_in_notebook(some_parameter):
    print(f'Also I accept a parameter and it is {some_parameter}')

# Task interface currently supports only standalone functions.
@task_interface.add_kwargs(**{'some_parameter': 42})
@task_interface.register_fl_task(model='net_model', data_loader='train_loader', \
                     device='device', optimizer='optimizer')     
def train(net_model, train_loader, optimizer, device, loss_fn=cross_entropy, some_parameter=None):
    torch.manual_seed(0)
    device='cpu'
    function_defined_in_notebook(some_parameter)
    
    train_loader = tqdm.tqdm(train_loader, desc="train")
    net_model.train()
    net_model.to(device)

    losses = []

    for data, target in train_loader:
        data, target = torch.tensor(data).to(device), torch.tensor(
            target).to(device)
        optimizer.zero_grad()
        output = net_model(data)
        loss = loss_fn(output=output, target=target)
        loss.backward()
        optimizer.step()
        losses.append(loss.detach().cpu().numpy())
        
    return {'train_loss': np.mean(losses),}


@task_interface.register_fl_task(model='net_model', data_loader='val_loader', device='device')     
def validate(net_model, val_loader, device):
    torch.manual_seed(0)
    device = torch.device('cpu')
    net_model.eval()
    net_model.to(device)
    
    val_loader = tqdm.tqdm(val_loader, desc="validate")
    val_score = 0
    total_samples = 0

    with torch.no_grad():
        for data, target in val_loader:
            samples = target.shape[0]
            total_samples += samples
            data, target = torch.tensor(data).to(device), \
                torch.tensor(target).to(device, dtype=torch.int64)
            output = net_model(data)
            pred = output.argmax(dim=1,keepdim=True)
            val_score += pred.eq(target).sum().cpu().numpy()
            
    return {'acc': val_score / total_samples,}

ここまででモデルの学習に関連するPyTorchの実装は完了です。通常のPyTorchプログラミングに加えてOpenFL独自のコードを多少加える必要があります。

---

## 連合（Federation）へ接続

In [None]:
client_id = 'api'
director_node_fqdn = 'localhost'

# 1) TLS無しで接続（検証、PoC向け）
federation = Federation(
    client_id=client_id, 
    director_node_fqdn=director_node_fqdn, 
    director_port='50051', 
    tls=False)

# --------------------------------------------------------------------------------------------------------------------
# please use the same identificator that was used in signed certificate
# 2) mTLS有りで接続（本番環境向け）
# ユーザーがmTLSを有効にする場合、CAルートチェーンと署名されたキーペアをフェデレーションインターフェースに提供する必要があります。
# cert_dir = 'cert'
# cert_chain = f'{cert_dir}/root_ca.crt'
# api_certificate = f'{cert_dir}/{client_id}.crt'
# api_private_key = f'{cert_dir}/{client_id}.key'
# federation = Federation(
#     client_id=client_id, 
#     director_node_fqdn=director_node_fqdn, 
#     director_port='50051',
#     cert_chain=cert_chain, 
#     api_cert=api_certificate, 
#     api_private_key=api_private_key)

接続できたことを確認するため、Directorから現在接続されているコラボレーターの情報を取得する

In [None]:
shard_registry = federation.get_shard_registry()
shard_registry

## 連合学習の開始

まずは学習の単位を表す概念である「実験（Experiment）」を作成する。実験にはFederationインスタンスとユニークな名前をセットする。

In [None]:
# create an experimnet in federation
experiment_name = 'tinyimagenet_test_experiment'
fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name)

その上で、実験のstartメソッドを呼び出す。引数としてこれまで定義してきたDataInterface、ModelInterface、TaskInterfaceをそれぞれセットする。rounds_to_trainはDirector⇔Collaboratorの一連のやり取りを何回実施するかを指定する。

In [None]:
# The following command zips the workspace and python requirements to be transfered to collaborator nodes
fl_experiment.start(
    model_provider=model_interface, 
    task_keeper=task_interface,
    data_loader=fed_dataset,
    rounds_to_train=5,
    opt_treatment='CONTINUE_GLOBAL',
    override_config={'network.settings.agg_port': 50002}
)

## 学習の途中経過のモニター

In [None]:
# If user want to stop IPython session, then reconnect and check how experiment is going
# fl_experiment.restore_experiment_state(model_interface)

fl_experiment.stream_metrics(tensorboard_logs=False)

## 学習後の結果の取得

In [None]:
best_model = fl_experiment.get_best_model()
torch.save(best_model.state_dict(), 'best_model.pth')

new_model = Net()
new_model.load_state_dict(torch.load('best_model.pth'))

## 実験データの削除

In [None]:
fl_experiment.remove_experiment_data()