# Federated PyTorch TinyImageNet Tutorial
## Using low-level Python API

# Long-Living entities update

* We now may have director running on another machine.
* We use Federation API to communicate with Director.
* Federation object should hold a Director's client (for user service)
* Keeping in mind that several API instances may be connacted to one Director.


* We do not think for now how we start a Director.
* But it knows the data shape and target shape for the DataScience problem in the Federation.
* Director holds the list of connected envoys, we do not need to specify it anymore.
* Director and Envoys are responsible for encrypting connections, we do not need to worry about certs.


* Yet we MUST have a cert to communicate to the Director.
* We MUST know the FQDN of a Director.
* Director communicates data and target shape to the Federation interface object.


* Experiment API may use this info to construct a dummy dataset and a `shard descriptor` stub.

In [1]:
!pip install torchvision==0.10.1

You should consider upgrading via the '/home/itrushkin/.virtualenvs/openfl/bin/python -m pip install --upgrade pip' command.[0m


## Connect to the Federation

In [2]:
# Create a federation
from openfl.interface.interactive_api.federation import Federation

# please use the same identificator that was used in signed certificate
client_id = 'api'
cert_dir = 'cert'
director_node_fqdn = 'nnlicv431.inn.intel.com'
# 1) Run with API layer - Director mTLS 
# If the user wants to enable mTLS their must provide CA root chain, and signed key pair to the federation interface
# cert_chain = f'{cert_dir}/root_ca.crt'
# api_certificate = f'{cert_dir}/{client_id}.crt'
# api_private_key = f'{cert_dir}/{client_id}.key'

# federation = Federation(client_id=client_id, director_node_fqdn=director_node_fqdn, director_port='50051',
#                        cert_chain=cert_chain, api_cert=api_certificate, api_private_key=api_private_key)

# --------------------------------------------------------------------------------------------------------------------

# 2) Run with TLS disabled (trusted environment)
# Federation can also determine local fqdn automatically
federation = Federation(client_id=client_id, director_node_fqdn=director_node_fqdn, director_port='50053', tls=False)


In [3]:
federation.target_shape

['300', '400']

In [4]:
shard_registry = federation.get_shard_registry()
shard_registry

{'env_5': {'shard_info': node_info {
    name: "env_5"
  }
  shard_description: "TinyImageNetDataset dataset, shard number 5 out of 10"
  sample_shape: "300"
  sample_shape: "400"
  sample_shape: "3"
  target_shape: "300"
  target_shape: "400",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2022-02-03 15:50:38',
  'current_time': '2022-02-03 15:50:52',
  'valid_duration': seconds: 120,
  'experiment_name': 'ExperimentName Mock'},
 'env_6': {'shard_info': node_info {
    name: "env_6"
  }
  shard_description: "TinyImageNetDataset dataset, shard number 6 out of 10"
  sample_shape: "300"
  sample_shape: "400"
  sample_shape: "3"
  target_shape: "300"
  target_shape: "400",
  'is_online': True,
  'is_experiment_running': False,
  'last_updated': '2022-02-03 15:50:38',
  'current_time': '2022-02-03 15:50:52',
  'valid_duration': seconds: 120,
  'experiment_name': 'ExperimentName Mock'},
 'env_2': {'shard_info': node_info {
    name: "env_2"
  }
  shard_description

In [5]:
# First, request a dummy_shard_desc that holds information about the federated dataset 
dummy_shard_desc = federation.get_dummy_shard_descriptor(size=10)
sample, target = dummy_shard_desc.get_dataset('train')[0]

## Creating a FL experiment using Interactive API

In [6]:
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment

2022-02-03 15:50:52.619103: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-02-03 15:50:52.619129: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


### Register dataset

In [7]:
import torchvision
from torchvision import transforms as T

normalize = T.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])

augmentation = T.RandomApply(
    [T.RandomHorizontalFlip(),
     T.RandomRotation(10),
     T.RandomResizedCrop(64)], 
    p=.8
)

training_transform = T.Compose(
    [T.Lambda(lambda x: x.convert("RGB")),
     augmentation,
     T.ToTensor(),
     normalize]
)

valid_transform = T.Compose(
    [T.Lambda(lambda x: x.convert("RGB")),
     T.ToTensor(),
     normalize]
)


In [8]:
from torch.utils.data import Dataset


class TransformedDataset(Dataset):
    """Image Person ReID Dataset."""

    def __init__(self, dataset, transform=None, target_transform=None):
        """Initialize Dataset."""
        self.dataset = dataset
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Length of dataset."""
        return len(self.dataset)

    def __getitem__(self, index):
        img, label = self.dataset[index]
        label = self.target_transform(label) if self.target_transform else label
        img = self.transform(img) if self.transform else img
        return img, label


In [9]:
class TinyImageNetDataset(DataInterface):
    def __init__(self, **kwargs):
        self.kwargs = kwargs
    
    @property
    def shard_descriptor(self):
        return self._shard_descriptor
        
    @shard_descriptor.setter
    def shard_descriptor(self, shard_descriptor):
        """
        Describe per-collaborator procedures or sharding.

        This method will be called during a collaborator initialization.
        Local shard_descriptor  will be set by Envoy.
        """
        self._shard_descriptor = shard_descriptor
        
        self.train_set = TransformedDataset(
            self._shard_descriptor.get_dataset('train'),
            transform=training_transform
        )
        self.valid_set = TransformedDataset(
            self._shard_descriptor.get_dataset('val'),
            transform=valid_transform
        )
        
    def get_train_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks with optimizer in contract
        """
        return DataLoader(
            self.train_set, num_workers=8, batch_size=self.kwargs['train_bs'], shuffle=True
            )

    def get_valid_loader(self, **kwargs):
        """
        Output of this method will be provided to tasks without optimizer in contract
        """
        return DataLoader(self.valid_set, num_workers=8, batch_size=self.kwargs['valid_bs'])

    def get_train_data_size(self):
        """
        Information for aggregation
        """
        return len(self.train_set)

    def get_valid_data_size(self):
        """
        Information for aggregation
        """
        return len(self.valid_set)
    

In [10]:
fed_dataset = TinyImageNetDataset(train_bs=1, valid_bs=1)

### Describe the model and optimizer

In [11]:
import os
import glob
from torch.utils.data import Dataset, DataLoader
from PIL import Image

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [12]:
torch.manual_seed(0)
np.random.seed(0)

In [13]:
"""
MobileNetV2 model
"""

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.model = torchvision.models.mobilenet_v2(pretrained=True)
        self.model.requires_grad_(False)
        self.model.classifier[1] = torch.nn.Linear(in_features=1280, \
                        out_features=200, bias=True)

    def forward(self, x):
        x = self.model.forward(x)
        return x

model_net = Net()

In [14]:
params_to_update = []
for param in model_net.parameters():
    if param.requires_grad == True:
        params_to_update.append(param)
        
optimizer_adam = optim.Adam(params_to_update, lr=1e-4)

def cross_entropy(output, target):
    """Binary cross-entropy metric
    """
    return F.cross_entropy(input=output,target=target)

### Register model

In [15]:
from copy import deepcopy

framework_adapter = 'openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin'
model_interface = ModelInterface(model=model_net, optimizer=optimizer_adam, framework_plugin=framework_adapter)

# Save the initial model state
initial_model = deepcopy(model_net)

## Define and register FL tasks

In [16]:
task_interface = TaskInterface()
import torch

import tqdm
from openfl.utilities.fedcurv.torch import FedCurv
from openfl.utilities.fedcurv.aggregation_function import FedCurvWeightedAverage

# The Interactive API supports registering functions definied in main module or imported.
def function_defined_in_notebook(some_parameter):
    print(f'Also I accept a parameter and it is {some_parameter}')
    
fedcurv = FedCurv(model_interface.provide_model(), importance=0.01)

# Task interface currently supports only standalone functions.
@task_interface.add_kwargs(**{'some_parameter': 42})
@task_interface.register_fl_task(model='net_model', data_loader='train_loader', \
                     device='device', optimizer='optimizer')
@task_interface.set_aggregation_function(FedCurvWeightedAverage())                          
def train(net_model, train_loader, optimizer, device, loss_fn=cross_entropy, some_parameter=None):
    fedcurv.on_train_begin(net_model)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    
    function_defined_in_notebook(some_parameter)
    
    train_loader = tqdm.tqdm(train_loader, desc="train")
    net_model.train()
    net_model.to(device)

    losses = []

    for data, target in train_loader:
        data, target = torch.tensor(data).to(device), torch.tensor(
            target).to(device) 
        optimizer.zero_grad()
        output = net_model(data)
        loss = loss_fn(output=output, target=target) + fedcurv.get_penalty(net_model)
        loss.backward()
        optimizer.step()
        losses.append(loss.detach().cpu().numpy())
    fedcurv.on_train_end(net_model, train_loader, device)
        
    return {'train_loss': np.mean(losses),}


@task_interface.register_fl_task(model='net_model', data_loader='val_loader', device='device')     
def validate(net_model, val_loader, device):
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    net_model.eval()
    net_model.to(device)
    
    val_loader = tqdm.tqdm(val_loader, desc="validate")
    val_score = 0
    total_samples = 0

    with torch.no_grad():
        for data, target in val_loader:
            samples = target.shape[0]
            total_samples += samples
            data, target = torch.tensor(data).to(device), \
                torch.tensor(target).to(device, dtype=torch.int64)
            output = net_model(data)
            pred = output.argmax(dim=1,keepdim=True)
            val_score += pred.eq(target).sum().cpu().numpy()
            
    return {'acc': val_score / total_samples,}

## Time to start a federated learning experiment

In [17]:
# create an experimnet in federation
experiment_name = f'tinyimagenet_test_experiment_{fedcurv.importance}'
fl_experiment = FLExperiment(federation=federation, experiment_name=experiment_name)

In [18]:
# The following command zips the workspace and python requirements to be transfered to collaborator nodes
fl_experiment.start(
    model_provider=model_interface, 
    task_keeper=task_interface,
    data_loader=fed_dataset,
    rounds_to_train=5,
    opt_treatment='CONTINUE_GLOBAL'
)

In [19]:
# If user want to stop IPython session, then reconnect and check how experiment is going
# fl_experiment.restore_experiment_state(model_interface)

fl_experiment.stream_metrics()