# Fedbiomed Researcher

Use for developing (autoreloads changes made across packages)

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# Ask the nodes which datasets they share; the value returned by `list`
# (displayed below the logs) maps each node id to its dataset descriptions.
from fedbiomed.researcher.requests import Requests
req = Requests()
req.list(verbose=True)

2021-12-17 15:12:50,071 fedbiomed INFO - Component environment:
2021-12-17 15:12:50,072 fedbiomed INFO - - type = ComponentType.RESEARCHER
2021-12-17 15:12:51,016 fedbiomed INFO - Messaging researcher_860a6809-2bc7-47ed-8f94-423486e71805 successfully connected to the message broker, object = <fedbiomed.common.messaging.Messaging object at 0x106bf64f0>
2021-12-17 15:12:51,125 fedbiomed INFO - Listing available datasets in all nodes... 
2021-12-17 15:12:51,142 fedbiomed INFO - log from: node_4f0f6a9a-a2d6-4e17-a448-7c853bcf561f / DEBUG - Message received: {'researcher_id': 'researcher_860a6809-2bc7-47ed-8f94-423486e71805', 'command': 'list'}
2021-12-17 15:12:51,144 fedbiomed INFO - log from: node_a6c386ed-2df1-40d9-9bdd-89c7606c8581 / DEBUG - Message received: {'researcher_id': 'researcher_860a6809-2bc7-47ed-8f94-423486e71805', 'command': 'list'}
2021-12-17 15:13:01,147 fedbiomed INFO - 
 Node: node_4f0f6a9a-a2d6-4e17-a448-7c853bcf561f | Number of Datasets: 1 
+---------+-------------+--

{'node_4f0f6a9a-a2d6-4e17-a448-7c853bcf561f': [{'name': 'mednist',
   'data_type': 'images',
   'tags': ['mednist'],
   'description': 'mednist',
   'shape': [14738, 3, 64, 64]}],
 'node_a6c386ed-2df1-40d9-9bdd-89c7606c8581': [{'name': 'mednist',
   'data_type': 'images',
   'tags': ['mednist'],
   'description': 'mednist',
   'shape': [14736, 3, 64, 64]}]}

## Setting the client up
A node must be configured beforehand:
1. `./scripts/fedbiomed_run node add`
  * Select option 3 (images) to add MedNIST to the client
  * Confirm default tags by hitting "y" and ENTER
  * Pick the folder where MedNIST is contained
  * The data should now be added (a warning saying that data must be unique means that it has already been added)
  
2. Check that your data has been added by executing `./scripts/fedbiomed_run node list`
3. Run the node using `./scripts/fedbiomed_run node run`. Wait until you get `Connected with result code 0`, which means you are online.

## Create an experiment to train a model on the data found

Declare a `MyMonaiTrainingPlan` class (a torch-based training plan) to send for training on the node

In [3]:
from fedbiomed.researcher.environ import environ
import tempfile

# Temporary directory, created under the researcher's TMP_DIR, that receives
# the training plan source written by the %%writefile cell further down.
tmp_dir_model = tempfile.TemporaryDirectory(dir=f"{environ['TMP_DIR']}/")
model_file = f"{tmp_dir_model.name}/class_export_mednist.py"

In [4]:
from monai.utils import set_determinism, first
from monai.transforms import (
    EnsureChannelFirstD,
    Compose,
    LoadImageD,
    RandRotateD,
    RandZoomD,
    ScaleIntensityRanged,
    EnsureTypeD,
)
from monai.data import DataLoader, Dataset, CacheDataset
from monai.config import print_config, USE_COMPILED
from monai.networks.nets import GlobalNet
from monai.networks.blocks import Warp
from monai.apps import MedNISTDataset

Note : write **only** the code to export in the following cell

In [5]:
%%writefile "$model_file"

import os
import numpy as np
import torch
from torch.nn import MSELoss
import torch.nn as nn
from fedbiomed.common.torchnn import TorchTrainingPlan
from torchvision import datasets, transforms
from typing import Union, List

from monai.utils import set_determinism, first
from monai.transforms import (
    EnsureChannelFirstD,
    Compose,
    LoadImageD,
    RandRotateD,
    RandZoomD,
    ScaleIntensityRanged,
    EnsureTypeD,
)
from monai.data import DataLoader, Dataset, CacheDataset
from monai.config import print_config, USE_COMPILED
from monai.networks.nets import GlobalNet
from monai.networks.blocks import Warp
from monai.apps import MedNISTDataset

# Here we define the model to be used. 
class MyMonaiTrainingPlan(TorchTrainingPlan):
    """Training plan that trains a MONAI ``GlobalNet`` to register MedNIST hand images.

    Every training sample pairs an image (``fixed_hand``) with a randomly rotated
    and zoomed copy of the same image (``moving_hand``). The network receives both
    images, its output is used to warp the moving image, and the loss is the mean
    squared error between the warped image and the fixed one.
    """

    def __init__(self, kwargs):
        """Create the network, the warp layer, the loss and the optimizer.

        Args:
            kwargs: model arguments supplied by the researcher. They are accepted
                but not read by this training plan.
        """
        super(MyMonaiTrainingPlan, self).__init__()

        # Import statements this class relies on (numpy, typing and the MONAI
        # transforms, datasets and networks), registered through add_dependency.
        deps = ["import numpy as np",
                "import os",
                "from torch.nn import MSELoss",
                "from typing import Union, List",
                "from monai.utils import set_determinism, first",
                "from monai.transforms import (EnsureChannelFirstD,Compose,LoadImageD,RandRotateD,RandZoomD,ScaleIntensityRanged,EnsureTypeD,)",
                "from monai.data import DataLoader, Dataset, CacheDataset",
                "from monai.config import print_config, USE_COMPILED",
                "from monai.networks.nets import GlobalNet",
                "from monai.networks.blocks import Warp",
                "from monai.apps import MedNISTDataset",]
        self.add_dependency(deps)

        self.model = GlobalNet(
            image_size=(64, 64),
            spatial_dims=2,
            in_channels=2,  # moving and fixed
            num_channel_initial=16,
            depth=3).to(self.device)
        self.image_loss = MSELoss()
        # The Warp constructor arguments depend on MONAI's USE_COMPILED flag
        if USE_COMPILED:
            self.warp_layer = Warp(3, "border").to(self.device)
        else:
            self.warp_layer = Warp("bilinear", "border").to(self.device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), 1e-5)

    def training_data(self, batch_size = 20):
        """Build the MONAI ``DataLoader`` of fixed/moving hand image pairs.

        Args:
            batch_size: number of image pairs per batch.

        Returns:
            A shuffling ``DataLoader`` over at most 1000 hand samples wrapped in
            a ``CacheDataset``.
        """
        # NOTE(review): this keeps whatever follows 'MedNIST/' in dataset_path and
        # raises IndexError when that substring is absent - confirm it matches the
        # way the dataset path is registered on the node.
        data_path = self.dataset_path.split('MedNIST/')[1]
        train_data = MedNISTDataset(root_dir=data_path, section="training", download=False, transform=None)
        # Both entries start from the same file; the random transforms below are
        # applied to "moving_hand" only.
        training_datadict = [
            {"fixed_hand": item["image"], "moving_hand": item["image"]}
            for item in train_data.data if item["label"] == 4  # label 4 is for xray hands
        ]
        train_transforms = Compose(
            [
                LoadImageD(keys=["fixed_hand", "moving_hand"]),
                EnsureChannelFirstD(keys=["fixed_hand", "moving_hand"]),
                ScaleIntensityRanged(keys=["fixed_hand", "moving_hand"],
                                     a_min=0., a_max=255., b_min=0.0, b_max=1.0, clip=True,),
                RandRotateD(keys=["moving_hand"], range_x=np.pi/4, prob=1.0, keep_size=True, mode="bicubic"),
                RandZoomD(keys=["moving_hand"], min_zoom=0.9, max_zoom=1.1, prob=1.0, mode="bicubic", align_corners=False),
                EnsureTypeD(keys=["fixed_hand", "moving_hand"]),
            ]
        )
        train_ds = CacheDataset(data=training_datadict[:1000], transform=train_transforms,
                                cache_rate=1.0, num_workers=4)
        train_kwargs = {'batch_size': batch_size, 'shuffle': True, 'num_workers': 2}
        train_loader = DataLoader(train_ds, **train_kwargs)

        return train_loader

    def forward(self, x):
        """Run the registration network on ``x`` and return its output."""
        return self.model(x)

    def training_step(self, moving, fixed):
        """Compute the registration loss for one batch.

        Args:
            moving: batch of moving images.
            fixed: batch of fixed images.

        Returns:
            The MSE loss between the warped moving images and the fixed images.
        """
        # Moving and fixed images are concatenated along dim 1 before the network
        ddf = self.forward(torch.cat((moving, fixed), dim=1))
        pred_image = self.warp_layer(moving, ddf)
        loss = self.image_loss(pred_image, fixed)
        return loss

    def training_routine(self,
                         epochs: int = 2,
                         log_interval: int = 10,
                         lr: Union[int, float] = 1e-3,
                         batch_size: int = 48,
                         batch_maxnum: int = 0,
                         dry_run: bool = False,
                         monitor=None):
        """Train the registration network on the local data.

        Args:
            epochs: number of passes over the training loader.
            log_interval: a log line is emitted every ``log_interval`` batches.
            lr: learning rate, used only when no optimizer exists yet;
                ``__init__`` already creates one, so it is normally ignored.
            batch_size: batch size handed to ``training_data``.
            batch_maxnum: when positive, at most this many batches are
                processed per epoch.
            dry_run: when true, return right after the first logged batch.
            monitor: optional object whose ``add_scalar`` receives the loss at
                every logged batch.
        """
        # `logger` is neither imported by the exported module nor listed in the
        # dependencies, so bring it into scope here to avoid a NameError.
        from fedbiomed.common.logger import logger

        if self.optimizer is None:
            self.optimizer = torch.optim.Adam(self.parameters(), lr=lr)

        self.device = "cpu"

        # The loader does not change between epochs: build it (and its cache) once.
        training_data = self.training_data(batch_size=batch_size)

        for epoch in range(1, epochs + 1):
            self.model.train()
            for batch_idx, batch_data in enumerate(training_data):
                # Stop before training on a batch beyond the cap, so that exactly
                # batch_maxnum batches are used per epoch.
                if (batch_maxnum > 0) and (batch_idx >= batch_maxnum):
                    logger.debug('Reached {} batches for this epoch, ignore remaining data'.format(batch_maxnum))
                    break

                self.optimizer.zero_grad()
                moving = batch_data["moving_hand"].to(self.device)
                fixed = batch_data["fixed_hand"].to(self.device)
                loss = self.training_step(moving, fixed)
                loss.backward()
                self.optimizer.step()

                if batch_idx % log_interval == 0:
                    logger.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                        epoch,
                        batch_idx * len(moving),
                        len(training_data.dataset),
                        100 * batch_idx / len(training_data),
                        loss.item()))

                    # Send scalar values via general/feedback topic
                    if monitor is not None:
                        monitor.add_scalar('Loss', loss.item(), batch_idx, epoch)

                    if dry_run:
                        return

Writing /Users/balelli/ownCloud/INRIA_EPIONE/FedBioMed/fedbiomed/var/tmp/tmpw4t84yo8/class_export_mednist.py


These arguments correspond, respectively, to:
* `model_args`: a dictionary with the arguments related to the model (e.g. number of layers, features, etc.). This will be passed to the model class on the client side.
* `training_args`: a dictionary containing the arguments for the training routine (e.g. batch size, learning rate, epochs, etc.). This will be passed to the routine on the client side.

**NOTE:** typos and/or missing positional (required) arguments will raise an error. 🤓

In [6]:
# Arguments handed to the training plan class.
# NOTE(review): MyMonaiTrainingPlan.__init__ never reads its `kwargs` argument,
# so this key currently has no effect - confirm whether it is still needed.
model_args = {'USE_COMPILED': True}

# Arguments for the training routine.
# NOTE(review): MyMonaiTrainingPlan creates its optimizer in __init__ with a
# fixed learning rate and only uses `lr` when no optimizer exists, so 'lr'
# appears to be ignored - confirm.
training_args = {
    'batch_size': 20, 
    'lr': 1e-5, 
    'epochs': 1, 
    'dry_run': False,  
    'batch_maxnum':250 # Fast pass for development : only use ( batch_maxnum * batch_size ) samples
}

Define an experiment
- search nodes serving data for these `tags`, optionally filter on a list of client ID with `clients`
- run a round of local training on nodes with model defined in `model_path` + federation with `aggregator`
- run for `rounds` rounds, applying the `node_selection_strategy` between the rounds

In [7]:
from fedbiomed.researcher.experiment import Experiment
from fedbiomed.researcher.aggregators.fedavg import FedAverage

# Only nodes serving a dataset with these tags take part in the experiment
tags =  ['mednist']
# Number of federated rounds; later cells use `rounds - 1` as the last round index
rounds = 1

# model_path is the file written by the %%writefile cell, and model_class the
# name of the training plan class defined in it.
exp = Experiment(tags=tags,
                 #clients=None,
                 model_path=model_file,
                 model_args=model_args,
                 model_class='MyMonaiTrainingPlan',
                 training_args=training_args,
                 rounds=rounds,
                 aggregator=FedAverage(),
                 node_selection_strategy=None
                )

2021-12-17 15:13:14,842 fedbiomed INFO - Searching dataset with data tags: ['mednist'] for all nodes
2021-12-17 15:13:14,856 fedbiomed INFO - log from: node_4f0f6a9a-a2d6-4e17-a448-7c853bcf561f / DEBUG - Message received: {'researcher_id': 'researcher_860a6809-2bc7-47ed-8f94-423486e71805', 'tags': ['mednist'], 'command': 'search'}
2021-12-17 15:13:14,861 fedbiomed INFO - log from: node_a6c386ed-2df1-40d9-9bdd-89c7606c8581 / DEBUG - Message received: {'researcher_id': 'researcher_860a6809-2bc7-47ed-8f94-423486e71805', 'tags': ['mednist'], 'command': 'search'}
2021-12-17 15:13:24,860 fedbiomed INFO - Node selected for training -> node_4f0f6a9a-a2d6-4e17-a448-7c853bcf561f
2021-12-17 15:13:24,861 fedbiomed INFO - Node selected for training -> node_a6c386ed-2df1-40d9-9bdd-89c7606c8581
2021-12-17 15:13:24,862 fedbiomed INFO - Checking data quality of federated datasets...
2021-12-17 15:13:24,986 fedbiomed DEBUG - torchnn saved model filename: /Users/balelli/ownCloud/INRIA_EPIONE/FedBioMed/fe

Let's start the experiment.

By default, this function doesn't stop until all the `rounds` are done for all the clients

In [8]:
# Run the experiment.
# NOTE(review): the output saved with this notebook ends in a `MissingSchema`
# error and the cells below have no execution count, so they were apparently
# not run in the saved session - re-run and confirm.
exp.run()

2021-12-17 15:13:27,617 fedbiomed INFO - Sampled nodes in round 0 ['node_4f0f6a9a-a2d6-4e17-a448-7c853bcf561f', 'node_a6c386ed-2df1-40d9-9bdd-89c7606c8581']
2021-12-17 15:13:27,620 fedbiomed INFO - Send message to node node_4f0f6a9a-a2d6-4e17-a448-7c853bcf561f - {'researcher_id': 'researcher_860a6809-2bc7-47ed-8f94-423486e71805', 'job_id': '38893627-7f3a-479c-9cde-f8489e298d6a', 'training_args': {'batch_size': 20, 'lr': 1e-05, 'epochs': 1, 'dry_run': False, 'batch_maxnum': 250}, 'model_args': {'USE_COMPILED': True}, 'command': 'train', 'model_url': 'http://localhost:8844/media/uploads/2021/12/17/my_model_4bc2380e-71b5-474c-b4ae-2c21e24a01b3.py', 'params_url': 'http://localhost:8844/media/uploads/2021/12/17/my_model_298789a9-be40-4a2d-aa9a-bb52df16bdff.pt', 'model_class': 'MyMonaiTrainingPlan', 'training_data': {'node_4f0f6a9a-a2d6-4e17-a448-7c853bcf561f': ['dataset_13af71e1-82a2-428d-8799-411874784dbb']}}
2021-12-17 15:13:27,622 fedbiomed DEBUG - researcher_860a6809-2bc7-47ed-8f94-4234

MissingSchema: Invalid URL '': No schema supplied. Perhaps you meant http://?

Local training results for each round and each node are available in `exp.training_replies` (index 0 to (`rounds` - 1) ).

For example you can view the training results for the last round below.

Different timings (in seconds) are reported for each dataset of a node participating in a round :
- `rtime_training` real time (clock time) spent in the training function on the node
- `ptime_training` process time (user and system CPU) spent in the training function on the node
- `rtime_total` real time (clock time) spent in the researcher between sending the request and handling the response, at the `Job()` layer

In [None]:
# Index of the last training round
last_round = rounds - 1

print("\nList the training rounds : ", exp.training_replies.keys())

print("\nList the clients for the last training round and their timings : ")
round_data = exp.training_replies[last_round].data

# One entry per reply: node id followed by its three timings
timing_report = (
    "\t- {id} :"
    "    \n\t\trtime_training={rtraining:.2f} seconds"
    "    \n\t\tptime_training={ptraining:.2f} seconds"
    "    \n\t\trtime_total={rtotal:.2f} seconds"
)
for position in range(len(round_data)):
    reply = round_data[position]
    print(timing_report.format(
        id=reply['node_id'],
        rtraining=reply['timing']['rtime_training'],
        ptraining=reply['timing']['ptime_training'],
        rtotal=reply['timing']['rtime_total'],
    ))
print('\n')

# Last expression of the cell: displayed as the cell output
exp.training_replies[last_round].dataframe

Federated parameters for each round are available in `exp.aggregated_params` (index 0 to (`rounds` - 1) ).

For example you can view the federated parameters for the last round of the experiment :

In [None]:
print("\nList the training rounds : ", exp.aggregated_params.keys())

print("\nAccess the federated params for the last training round :")
# Entry stored for the last round (index rounds - 1)
final_aggregate = exp.aggregated_params[rounds - 1]
print("\t- params_path: ", final_aggregate['params_path'])
print("\t- parameter data: ", final_aggregate['params'].keys())


## Testing


In [None]:
import os
import shutil
import tempfile
import PIL
import torch
import numpy as np
from sklearn.metrics import classification_report

from monai.apps import download_and_extract
from monai.config import print_config
from monai.data import decollate_batch
from monai.metrics import ROCAUCMetric
from monai.networks.nets import DenseNet121
from monai.transforms import (
    Activations,
    AddChannel,
    AsDiscrete,
    Compose,
    LoadImage,
    RandFlip,
    RandRotate,
    RandZoom,
    ScaleIntensity,
    EnsureType,
)
from monai.utils import set_determinism

print_config()

# Folder holding one sub-directory per image class. Set the MEDNIST_DATA_DIR
# environment variable to point elsewhere instead of editing this path; the
# previous hard-coded location remains the default.
data_dir = os.environ.get('MEDNIST_DATA_DIR', '/Users/balelli/Downloads/MedNIST_4')

In [None]:
# `import PIL` alone does not load the Image submodule, so import it explicitly
import PIL.Image

# One label per sub-directory of data_dir, in sorted order
class_names = sorted(x for x in os.listdir(data_dir)
                     if os.path.isdir(os.path.join(data_dir, x)))
num_class = len(class_names)

# image_files[i] holds the file paths found in the folder of class i.
# os.listdir returns entries in arbitrary order, so sort them to get the same
# file order on every machine.
image_files = [
    [
        os.path.join(data_dir, class_name, file_name)
        for file_name in sorted(os.listdir(os.path.join(data_dir, class_name)))
    ]
    for class_name in class_names
]

num_each = [len(class_files) for class_files in image_files]

# Flatten into two parallel lists: file path and integer class label
image_files_list = []
image_class = []
for label, class_files in enumerate(image_files):
    image_files_list.extend(class_files)
    image_class.extend([label] * len(class_files))
num_total = len(image_class)

# Read the size of the first image; the context manager closes the file handle
with PIL.Image.open(image_files_list[0]) as first_image:
    image_width, image_height = first_image.size

print(f"Total image count: {num_total}")
print(f"Image dimensions: {image_width} x {image_height}")
print(f"Label names: {class_names}")
print(f"Label counts: {num_each}")

In [None]:
# Fixed seed so that the same held-out subset is drawn on every run
SPLIT_SEED = 0

length = len(image_files_list)
indices = np.random.default_rng(SPLIT_SEED).permutation(length)

# Keep 10% of the shuffled files for testing
test_split = int(0.1 * length)
test_indices = indices[:test_split]

test_x = [image_files_list[i] for i in test_indices]
test_y = [image_class[i] for i in test_indices]

# Transform chain applied to every test image by the dataset defined below
val_transforms = Compose(
    [LoadImage(image_only=True), AddChannel(), ScaleIntensity(), EnsureType()])

# Post-processing chains for predictions (softmax) and labels (one-hot)
y_pred_trans = Compose([EnsureType(), Activations(softmax=True)])
y_trans = Compose([EnsureType(), AsDiscrete(to_onehot=num_class)])

In [None]:
class MedNISTDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing each image file with its label.

    Indexing returns ``(transforms(image_file), label)`` for the given position.
    """

    def __init__(self, image_files, labels, transforms):
        """Store the image files, their labels and the transform callable."""
        self.image_files, self.labels, self.transforms = image_files, labels, transforms

    def __len__(self):
        """Return the number of image files."""
        return len(self.image_files)

    def __getitem__(self, index):
        """Return the transformed item at ``index`` together with its label."""
        transformed = self.transforms(self.image_files[index])
        return transformed, self.labels[index]


# Wrap the held-out files and labels in the dataset defined above and iterate
# over them in batches of 300.
test_ds = MedNISTDataset(test_x, test_y, val_transforms)
test_loader = torch.utils.data.DataLoader(
    test_ds, batch_size=300)

In [None]:
# Use the GPU when one is available; the test batches are moved to this device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NOTE(review): `model` is rebound to exp.model_instance in the following cell,
# and loss_function, optimizer, max_epochs, val_interval and auc_metric are not
# referenced in the rest of the visible notebook - confirm whether this setup
# is still needed.
model = DenseNet121(spatial_dims=2, in_channels=1,
                    out_channels=num_class).to(device)
loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), 1e-5)
max_epochs = 4
val_interval = 1
auc_metric = ROCAUCMetric()

In [None]:
# Take the experiment's model and move it to the device the test batches are
# sent to, so that inputs and weights live on the same device.
model = exp.model_instance.to(device)
# Inference mode: layers such as dropout or batch norm stop behaving as in training
model.eval()
# Load the parameters aggregated in the last round (kept as the last expression
# so the result of load_state_dict is still shown as the cell output).
model.load_state_dict(exp.aggregated_params[rounds - 1]['params'])

In [None]:
# Collect the true and predicted class index of every test sample
y_true, y_pred = [], []
with torch.no_grad():
    for batch in test_loader:
        images = batch[0].to(device)
        labels = batch[1].to(device)
        # Predicted class = index of the largest value along dim 1
        predicted = model(images).argmax(dim=1)
        for label, guess in zip(labels, predicted):
            y_true.append(label.item())
            y_pred.append(guess.item())


In [None]:
# Per-class metrics computed from the lists filled in by the previous cell,
# with values printed to 4 digits.
print(classification_report(
    y_true, y_pred, target_names=class_names, digits=4))