[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/intel/openfl/blob/develop/openfl-tutorials/interactive_api/numpy_linear_regression/workspace/SingleNotebook.ipynb)

### Preparations in colab:
We need to clone the repository to run a federation because it contains director and envoy configs to start from.

1. Clone the OpenFL repository
2. Install OpenFL 
3. Go to the medmnist workspace

In [1]:
# For right now, install from source, later we would migrate to PyPI install
# !pip install openfl==1.2.1
# !git clone https://github.com/intel/openfl.git
# !cd openfl && pip install .

In [2]:
# import os
# from time import sleep

# os.chdir('./openfl/openfl-tutorials/interactive_api/PyTorch_MedMNIST_ResNet50/workspace')

# MedMNIST3D with PyTorch and OpenFL

In [3]:
!pip install medmnist torchvision==0.8.1 ACSConv

Collecting medmnist
  Using cached medmnist-2.0.2-py3-none-any.whl (21 kB)
Collecting torchvision==0.8.1
  Using cached torchvision-0.8.1-cp38-cp38-manylinux1_x86_64.whl (12.8 MB)
Collecting ACSConv
  Using cached ACSConv-0.1.1-py3-none-any.whl
Collecting pillow>=4.1.1
  Downloading Pillow-9.1.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.3 MB)
     |████████████████████████████████| 4.3 MB 1.7 MB/s            
[?25hCollecting torch==1.7.0
  Using cached torch-1.7.0-cp38-cp38-manylinux1_x86_64.whl (776.8 MB)
Collecting future
  Using cached future-0.18.2-py3-none-any.whl
Collecting dataclasses
  Using cached dataclasses-0.6-py3-none-any.whl (14 kB)
Collecting scikit-image
  Using cached scikit_image-0.19.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.0 MB)
Collecting fire
  Using cached fire-0.4.0-py2.py3-none-any.whl
Collecting matplotlib
  Using cached matplotlib-3.5.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl (11.3 MB)
Collecting termcolor

In [4]:
import os
import argparse
from tqdm import trange
import numpy as np
import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision.transforms as transforms
from tensorboardX import SummaryWriter
from collections import OrderedDict
from models import ResNet18, ResNet50

from acsconv.converters import ACSConverter, Conv3dConverter, Conv2_5dConverter
from utils import model_to_syncbn, Transform3D
import medmnist
from medmnist import INFO, Evaluator

  from .autonotebook import tqdm as notebook_tqdm


The ``converters`` are currently experimental. It may not support operations including (but not limited to) Functions in ``torch.nn.functional`` that involved data dimension


In [5]:
def prepare_data(DataClass, batch_size):
    print('==> Preparing data...')
    
    
    train_transform = Transform3D()
    eval_transform = Transform3D()
     
    train_dataset = DataClass(split='train', transform=train_transform, download=True, as_rgb=False)
    train_dataset_at_eval = DataClass(split='train', transform=eval_transform, download=True, as_rgb=False)
    val_dataset = DataClass(split='val', transform=eval_transform, download=True, as_rgb=False)
    test_dataset = DataClass(split='test', transform=eval_transform, download=True, as_rgb=False)

    
    train_loader = data.DataLoader(dataset=train_dataset,
                                batch_size=batch_size,
                                shuffle=True)
    train_loader_at_eval = data.DataLoader(dataset=train_dataset_at_eval,
                                batch_size=batch_size,
                                shuffle=False)
    val_loader = data.DataLoader(dataset=val_dataset,
                                batch_size=batch_size,
                                shuffle=False)
    test_loader = data.DataLoader(dataset=test_dataset,
                                batch_size=batch_size,
                                shuffle=False)
    return (train_loader, train_loader_at_eval, val_loader, test_loader)

In [6]:
def get_device(gpu_ids):
    str_ids = gpu_ids.split(',')
    gpu_ids = []
    for str_id in str_ids:
        id = int(str_id)
        if id >= 0:
            gpu_ids.append(id)
    if len(gpu_ids) > 0:
        os.environ["CUDA_VISIBLE_DEVICES"]=str(gpu_ids[0])

    return torch.device('cuda:{}'.format(gpu_ids[0])) if gpu_ids else torch.device('cpu')

In [7]:
def build_model(n_channels, num_classes, conv, device):
    print('==> Building model...')

    model = ResNet50(in_channels=n_channels, num_classes=num_classes)

    if conv=='ACSConv':
        model = model_to_syncbn(ACSConverter(model))
    if conv=='Conv2_5d':
        model = model_to_syncbn(Conv2_5dConverter(model))
    if conv=='Conv3d':
        if pretrained_3d == 'i3d':
            model = model_to_syncbn(Conv3dConverter(model, i3d_repeat_axis=-3))
        else:
            model = model_to_syncbn(Conv3dConverter(model, i3d_repeat_axis=None))
    
    model = model.to(device)
    return model

In [8]:
def train(model, train_loader, criterion, optimizer, device, writer):
    total_loss = []
    global iteration

    model.train()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model(inputs.to(device))

        targets = torch.squeeze(targets, 1).long().to(device)
        loss = criterion(outputs, targets)

        total_loss.append(loss.item())
        writer.add_scalar('train_loss_logs', loss.item(), iteration)
        iteration += 1

        loss.backward()
        optimizer.step()
    
    epoch_loss = sum(total_loss)/len(total_loss)
    return epoch_loss

In [9]:
def test(model, evaluator, data_loader, criterion, device, run, save_folder=None):

    model.eval()

    total_loss = []
    y_score = torch.tensor([]).to(device)

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(data_loader):
            outputs = model(inputs.to(device))
        
            targets = torch.squeeze(targets, 1).long().to(device)
            loss = criterion(outputs, targets)
            m = nn.Softmax(dim=1)
            outputs = m(outputs).to(device)
            targets = targets.float().resize_(len(targets), 1)

            total_loss.append(loss.item())

            y_score = torch.cat((y_score, outputs), 0)

        y_score = y_score.detach().cpu().numpy()
        auc, acc = evaluator.evaluate(y_score, save_folder, run)

        test_loss = sum(total_loss) / len(total_loss)

        return [test_loss, auc, acc]

In [10]:
def run_experiment(
    data_flag='organmnist3d',
    output_root='./output', # output root, where to save models
    num_epochs=100, # num of epochs of training, the script would only test model if set num_epochs to 0
    gpu_ids='0',
    batch_size=32,
    conv='ACSConv', # choose converter from Conv2_5d, Conv3d, ACSConv
    pretrained_3d='i3d',
    download=False,
    as_rgb=False, # to copy channels, tranform shape 1x28x28x28 to 3x28x28x28
    shape_transform=False, # for shape dataset, whether multiply 0.5 at eval
    model_path=None, # root of the pretrained model to test
    run='model1' # to name a standard evaluation csv file, named as {flag}_{split}_[AUC]{auc:.3f}_[ACC]{acc:.3f}@{run}.csv
):
    lr = 0.001
    gamma=0.1
    milestones = [0.5 * num_epochs, 0.75 * num_epochs]

    info = INFO[data_flag]
    task = info['task']
    n_channels = 3 if as_rgb else info['n_channels']
    n_classes = len(info['label'])

    DataClass = getattr(medmnist, info['python_class'])
    
    (train_loader, train_loader_at_eval, val_loader, test_loader) = prepare_data(DataClass, batch_size)
    
    device = get_device(gpu_ids)
        
    output_root = os.path.join(output_root, data_flag, time.strftime("%y%m%d_%H%M%S"))
    if not os.path.exists(output_root):
        os.makedirs(output_root)

    model = build_model(n_channels, n_classes, conv, device)

    train_evaluator = medmnist.Evaluator(data_flag, 'train')
    val_evaluator = medmnist.Evaluator(data_flag, 'val')
    test_evaluator = medmnist.Evaluator(data_flag, 'test')

    criterion = nn.CrossEntropyLoss()

    if model_path is not None:
        model.load_state_dict(torch.load(model_path, map_location=device)['net'], strict=True)
        train_metrics = test(model, train_evaluator, train_loader_at_eval, criterion, device, run, output_root)
        val_metrics = test(model, val_evaluator, val_loader, criterion, device, run, output_root)
        test_metrics = test(model, test_evaluator, test_loader, criterion, device, run, output_root)

        print('train  auc: %.5f  acc: %.5f\n' % (train_metrics[1], train_metrics[2]) + \
              'val  auc: %.5f  acc: %.5f\n' % (val_metrics[1], val_metrics[2]) + \
              'test  auc: %.5f  acc: %.5f\n' % (test_metrics[1], test_metrics[2]))

    if num_epochs == 0:
        return


    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=milestones, gamma=gamma)
    
    logs = ['loss', 'auc', 'acc']
    train_logs = ['train_'+log for log in logs]
    val_logs = ['val_'+log for log in logs]
    test_logs = ['test_'+log for log in logs]
    log_dict = OrderedDict.fromkeys(train_logs+val_logs+test_logs, 0)
    
    writer = SummaryWriter(log_dir=os.path.join(output_root, 'Tensorboard_Results'))

    best_auc = 0
    best_epoch = 0
    best_model = model

    global iteration
    iteration = 0

    for epoch in trange(num_epochs):
        
        train_loss = train(model, train_loader, criterion, optimizer, device, writer)
        
        train_metrics = test(model, train_evaluator, train_loader_at_eval, criterion, device, run)
        val_metrics = test(model, val_evaluator, val_loader, criterion, device, run)
        test_metrics = test(model, test_evaluator, test_loader, criterion, device, run)
        
        scheduler.step()
        
        for i, key in enumerate(train_logs):
            log_dict[key] = train_metrics[i]
        for i, key in enumerate(val_logs):
            log_dict[key] = val_metrics[i]
        for i, key in enumerate(test_logs):
            log_dict[key] = test_metrics[i]

        for key, value in log_dict.items():
            writer.add_scalar(key, value, epoch)
            
        cur_auc = val_metrics[1]
        if cur_auc > best_auc:
            best_epoch = epoch
            best_auc = cur_auc
            best_model = model

            print('cur_best_auc:', best_auc)
            print('cur_best_epoch', best_epoch)

    state = {
        'net': model.state_dict(),
    }

    path = os.path.join(output_root, 'best_model.pth')
    torch.save(state, path)

    train_metrics = test(best_model, train_evaluator, train_loader_at_eval, criterion, device, run, output_root)
    val_metrics = test(best_model, val_evaluator, val_loader, criterion, device, run, output_root)
    test_metrics = test(best_model, test_evaluator, test_loader, criterion, device, run, output_root)

    train_log = 'train  auc: %.5f  acc: %.5f\n' % (train_metrics[1], train_metrics[2])
    val_log = 'val  auc: %.5f  acc: %.5f\n' % (val_metrics[1], val_metrics[2])
    test_log = 'test  auc: %.5f  acc: %.5f\n' % (test_metrics[1], test_metrics[2])

    log = '%s\n' % (data_flag) + train_log + val_log + test_log + '\n'
    print(log)
    
    with open(os.path.join(output_root, '%s_log.txt' % (data_flag)), 'a') as f:
        f.write(log)        
            
    writer.close()

In [11]:
run_experiment(download=True, batch_size=16)

==> Preparing data...
Using downloaded and verified file: /home/itrushkin/.medmnist/organmnist3d.npz
Using downloaded and verified file: /home/itrushkin/.medmnist/organmnist3d.npz
Using downloaded and verified file: /home/itrushkin/.medmnist/organmnist3d.npz
Using downloaded and verified file: /home/itrushkin/.medmnist/organmnist3d.npz
==> Building model...


  0%|                                                                                                                                                                                             | 0/100 [00:37<?, ?it/s]


KeyboardInterrupt: 

# Now we run the same training on federated data

## 1. Start the Director service and several envoys with generated data

In [12]:
# Here are the main parameters for our Federation
n_cols=7

In [13]:
import os
from time import sleep
from pathlib import Path
import yaml
from typing import Dict, List, Union
from models import ResNet18, ResNet50
from torch import nn
import torch
from torch.utils.data import DataLoader
from utils import model_to_syncbn
from acsconv.converters import ACSConverter
from sklearn.metrics import roc_auc_score
from sklearn.metrics import accuracy_score
import numpy as np

### Start the Director service

In [14]:
cwd = Path.cwd()
director_workspace_path = Path('../director/').absolute()
director_config_file = director_workspace_path / 'director_config.yaml'
director_logfile = director_workspace_path / 'director.log'
if director_logfile.is_file(): director_logfile.unlink()

os.environ['main_folder'] = str(cwd)
os.environ['director_workspace_path'] = str(director_workspace_path)
os.environ['director_logfile'] = str(director_logfile)
os.environ['director_config_file'] = str(director_config_file)

In [15]:
%%script /bin/bash --bg
cd $director_workspace_path
fx director start --disable-tls -c $director_config_file > $director_logfile &
cd $main_folder

## Start Envoys

#### First, we create several envoy config files 

In [16]:
# Read the original envoy config file content
with open(Path('../envoy/envoy_config.yaml'), "r") as stream:
    orig_config = yaml.safe_load(stream)

def generate_envoy_configs(config: Dict,
                           save_path: Union[str, Path] = '../envoy/',
                           n_cols: int = 10,) -> List[Path]:
    
    config_paths = [(Path(save_path) / f'{i}_envoy_config.yaml').absolute()
                for i in range(1, n_cols + 1)]

    for i, path in enumerate(config_paths):
        config['params']['cuda_devices'] = [i%3]
        config['shard_descriptor']['params']['rank'] = i
        config['shard_descriptor']['params']['worldsize'] = n_cols
        with open(path, "w") as stream:
            yaml.safe_dump(config, stream)
            
    return config_paths
            
def remove_configs(config_paths):
    for path in config_paths:
        path.unlink()

In [17]:
config_paths = generate_envoy_configs(orig_config,
                                      n_cols=n_cols)
# remove_configs(config_paths)

#### Now start Envoy processes in a loop

In [18]:
# envoy_workspace_path = Path('../envoy/').absolute()
def start_envoys(config_paths: List[Path]) -> None:
    envoy_workspace_path = config_paths[0].parent
    cwd = Path.cwd()
    os.chdir(envoy_workspace_path)
    for i, path in enumerate(config_paths):
        env_name = f'env_{i + 1}'
        print(f'Starting {env_name}...')
        os.system(f'fx envoy start -n {env_name} --disable-tls '
                  f'--envoy-config-path {path} -dh localhost -dp 50049 '
                  f'>env_{i + 1}.log &')
    os.chdir(cwd)

sleep(5)

start_envoys(config_paths)

sleep(25)

Starting env_1...
Starting env_2...
Starting env_3...
Starting env_4...
Starting env_5...
Starting env_6...
Starting env_7...


## 2. Connect to the Director service of out Federation as Data scientist

In [19]:
# Create a federation
from openfl.interface.interactive_api.federation import Federation

# please use the same identificator that was used in signed certificate
client_id = 'frontend'
director_node_fqdn = 'localhost'
director_port = 50049

federation = Federation(
    client_id=client_id,
    director_node_fqdn=director_node_fqdn,
    director_port=director_port,
    tls=False
)

In [20]:
# Data scientist may request a list of connected envoys
shard_registry = federation.get_shard_registry()

# WARNING!

# Make sure shard registry contains all the envoys you started!
# In other case try rereconnecting to the Director (the cell above).
shard_registry

{'env_2': {'shard_info': node_info {
    name: "env_2"
    cuda_devices {
      index: 1
    }
  }
  shard_description: "Allowed dataset types are `train` and `val`"
  sample_shape: "1"
  sample_shape: "28"
  sample_shape: "28"
  sample_shape: "28"
  target_shape: "1",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2022-04-04 10:52:27',
  'current_time': '2022-04-04 10:52:30',
  'valid_duration': seconds: 10,
  'experiment_name': 'ExperimentName Mock'},
 'env_1': {'shard_info': node_info {
    name: "env_1"
    cuda_devices {
    }
  }
  shard_description: "Allowed dataset types are `train` and `val`"
  sample_shape: "1"
  sample_shape: "28"
  sample_shape: "28"
  sample_shape: "28"
  target_shape: "1",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2022-04-04 10:52:28',
  'current_time': '2022-04-04 10:52:30',
  'valid_duration': seconds: 10,
  'experiment_name': 'ExperimentName Mock'},
 'env_3': {'shard_info': node_info {
    name

### Now we will prepare an FL experimnet using OpenFL Python API

### Data

In [21]:
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment

class MedMnistDataSet(DataInterface):
    def __init__(self, **kwargs):
        """Initialize DataLoader."""
        self.kwargs = kwargs
        pass

    @property
    def shard_descriptor(self):
        """Return shard descriptor."""
        return self._shard_descriptor
    
    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor  will be set by Envoy.
        """
        self._shard_descriptor = shard_descriptor
        self.train_set = shard_descriptor.get_dataset("train")
        self.val_set = shard_descriptor.get_dataset("val")

    def get_train_loader(self, **kwargs):
        """Output of this method will be provided to tasks with optimizer in contract."""
        return DataLoader(self.train_set, batch_size=self.kwargs['train_bs'], shuffle=True)

    def get_valid_loader(self, **kwargs):
        """Output of this method will be provided to tasks without optimizer in contract."""
        return DataLoader(self.val_set, batch_size=self.kwargs['valid_bs'], shuffle=True)

    def get_train_data_size(self):
        """Information for aggregation."""
        return len(self.train_set)

    def get_valid_data_size(self):
        """Information for aggregation."""
        return len(self.val_set)
    
lin_reg_dataset = MedMnistDataSet(train_bs=4, valid_bs=4)

### Model

In [22]:
framework_adapter = 'openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin'
fed_model = ResNet50(in_channels=1, num_classes=11)
lr = 0.001
optimizer = torch.optim.Adam(fed_model.parameters(), lr=lr)
MI = ModelInterface(model=fed_model, optimizer=optimizer, framework_plugin=framework_adapter)

# Save the initial model state
initial_model = ResNet50(in_channels=1, num_classes=11)
fed_model = model_to_syncbn(ACSConverter(fed_model))

### Tasks
Using an Optimizer does not make sense for this experiment. Yet it is a required part of a training task contract in the current version of OpenFL, so we just pass None.
We need to employ a trick reporting metrics. OpenFL decides which model is the best based on an *increasing* metric.

In [23]:
TI = TaskInterface()

@TI.register_fl_task(model='my_model', data_loader='train_loader', \
                     device='device', optimizer='optimizer')     
def train(my_model, train_loader, optimizer, device, criterion=nn.CrossEntropyLoss()):
    total_loss = []
    my_model = my_model.to(device)
    my_model.train()
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = my_model(torch.tensor(inputs).to(device))

        targets = torch.squeeze(targets, 1).long().to(device)
        loss = criterion(outputs, targets)

        total_loss.append(loss.item())

        loss.backward()
        optimizer.step()
    
    epoch_loss = sum(total_loss)/len(total_loss)
    return {'loss': epoch_loss}

@TI.register_fl_task(model='my_model', data_loader='val_loader', device='device')     
def validate(my_model, val_loader, device, criterion=nn.CrossEntropyLoss()):
    def get_auc(y_true, y_score):
        auc = 0
        for i in range(y_score.shape[1]):
            y_true_binary = (y_true == i).astype(float)
            y_score_binary = y_score[:, i]
            auc += roc_auc_score(y_true_binary, y_score_binary)
        ret = auc / y_score.shape[1]
        
        return ret
    
    def get_acc(y_true, y_score):
        ret = accuracy_score(y_true, np.argmax(y_score, axis=-1))

        return ret
    
    my_model = my_model.to(device)
    my_model.eval()

    total_loss = []
    y_true = []
    y_score = torch.tensor([]).to(device)

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(val_loader):
            outputs = my_model(inputs.to(device))
            y_true += targets
            targets = torch.squeeze(targets, 1).long().to(device)
            loss = criterion(outputs, targets)
            m = nn.Softmax(dim=1)
            outputs = m(outputs).to(device)
            targets = targets.float().resize_(len(targets), 1)

            total_loss.append(loss.item())

            y_score = torch.cat((y_score, outputs), 0)
        
        y_true = np.stack(y_true)
        y_score = y_score.detach().cpu().numpy()
        try:
            auc = get_auc(y_true, y_score)
        except:
            print(f'{y_true=}')
            raise
        acc = get_acc(y_true, y_score)

        val_loss = sum(total_loss) / len(total_loss)

        return {'val_loss':val_loss, 'auc':auc, 'acc':acc}

### Run

In [24]:
experiment_name = 'medmnist_experiment'
fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name,
                            )

In [25]:
fl_experiment.start(model_provider=MI, 
                    task_keeper=TI,
                    data_loader=lin_reg_dataset,
                    rounds_to_train=10,
                    device_assignment_policy='CUDA_PREFERRED')



In [26]:
# This method not only prints messages recieved from the director, 
# but also saves logs in the tensorboard format (by default)
fl_experiment.stream_metrics()

You should consider upgrading via the '/home/itrushkin/.virtualenvs/director/bin/python -m pip install --upgrade pip' command.
You should consider upgrading via the '/home/itrushkin/.virtualenvs/director/bin/python -m pip install --upgrade pip' command.
You should consider upgrading via the '/home/itrushkin/.virtualenvs/director/bin/python -m pip install --upgrade pip' command.
You should consider upgrading via the '/home/itrushkin/.virtualenvs/director/bin/python -m pip install --upgrade pip' command.
You should consider upgrading via the '/home/itrushkin/.virtualenvs/director/bin/python -m pip install --upgrade pip' command.
You should consider upgrading via the '/home/itrushkin/.virtualenvs/director/bin/python -m pip install --upgrade pip' command.
You should consider upgrading via the '/home/itrushkin/.virtualenvs/director/bin/python -m pip install --upgrade pip' command.


### Optional: start tensorboard

Locally, start tensorboard in background and open localhost:6006 in your browser:

In [27]:
%%script /bin/bash --bg
tensorboard --host $(hostname --all-fqdns | awk '{print $1}') --logdir logs

In Google Colab you may use the inline extension 

In [28]:
%load_ext tensorboard
%tensorboard --logdir logs

Reusing TensorBoard on port 6007 (pid 7900), started 2 days, 16:51:46 ago. (Use '!kill 7900' to kill it.)

### 3. Retrieve the trained model from the Director

In [29]:
last_model = fl_experiment.get_last_model()
best_model = fl_experiment.get_best_model()
print(best_model.weights)
print(last_model.weights)
print(f"last model MSE: {last_model.mse(x,y)}")
print(f"best model MSE: {best_model.mse(x,y)}")

ModuleAttributeError: 'ResNet' object has no attribute 'weights'

## Cleaning

In [None]:
# To stop all services run
!pkill fx

In [None]:
remove_configs(config_paths)