# Simple Contribution Measure Model
This notebook implements a simple contribution-measure model based on the example notebook of the DeAI repository. More precisely, it emulates a framework where `num_clients` clients learn MNIST classification and diffuse their parameters over a graph represented by a communication matrix. The experiment below uses the federated variant, where a central server averages the client models.

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
from tqdm import tqdm

# Reproducibility: seed both RNGs the notebook draws from
# (torch for weight init, random_split and DataLoader shuffling; numpy's global RNG).
SEED = 123456789
np.random.seed(SEED)
torch.manual_seed(SEED)

# Print option: 3 decimals, no scientific notation for small values.
np.set_printoptions(suppress=True, precision=3)

In [2]:
# This cell is a copy-paste of the corresponding cell in the DeAI (only without using cuda)

class Net(nn.Module):
    """Small CNN classifier producing 10 log-probabilities per input image.

    Three stride-2, padding-1 convolutions reduce a 28x28 single-channel input
    to 4x4 spatial maps (28 -> 14 -> 7 -> 4), so the flattened feature vector
    has 128 * 4 * 4 = 2048 entries, matching the input size of ``fc1``.
    The output is ``log_softmax`` over classes, meant to be used with
    ``F.nll_loss``.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in a fixed order: weight initialization draws from the
        # global torch RNG, so this order matters for reproducibility.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=2, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1)
        self.fc1 = nn.Linear(2048, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(conv(x))
        hidden = F.relu(self.fc1(torch.flatten(x, 1)))
        return F.log_softmax(self.fc2(hidden), dim=1)


def client_update(client_model, optimizer, train_loader, epoch=5):
    """Train ``client_model`` in place on the ``train_loader`` data.

    Args:
        client_model: module whose output is fed to ``F.nll_loss`` (so it is
            expected to return log-probabilities).
        optimizer: optimizer over ``client_model``'s parameters.
        train_loader: iterable of ``(data, target)`` batches.
        epoch: number of passes over ``train_loader``.

    Returns:
        The NLL loss of the last processed batch as a Python float, or
        ``float('nan')`` when no batch was processed (``epoch <= 0`` or an
        empty loader).
    """
    client_model.train()
    loss = None  # stays None if no batch is ever processed
    for _ in range(epoch):
        # Batches are used as-is (the DeAI original moved them to CUDA here).
        for data, target in train_loader:
            optimizer.zero_grad()
            output = client_model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
    # .item() only once at the end, as in the original, to avoid a per-batch sync.
    return float('nan') if loss is None else loss.item()


def diffuse_params(client_models, communication_matrix):
    """Diffuse the models with their neighbors.

    Model ``i`` is replaced by the weighted average of the models ``j`` with a
    non-zero weight ``communication_matrix[i][j]``. Each row is normalized by
    its own sum, so rows do not need to sum to 1. A model whose row has no
    non-zero entry keeps its parameters.

    Every average is computed from a snapshot of all states taken before any
    model is modified, so the result does not depend on processing order.
    """
    # Snapshot with clone(): state_dict() returns tensors sharing storage with
    # the live parameters, and load_state_dict copies into them in place, so
    # without a copy later models would average already-updated neighbors.
    snapshots = [
        {key: tensor.clone() for key, tensor in model.state_dict().items()}
        for model in client_models
    ]
    for model, weights in zip(client_models, communication_matrix):
        weights = np.asarray(weights, dtype=float)
        neighbors = np.nonzero(weights)[0]
        if neighbors.size == 0:
            # Isolated node: nothing to average with (torch.stack([]) would fail).
            continue
        row_sum = float(weights.sum())
        model.load_state_dict({
            key: torch.stack(
                [float(weights[j]) * snapshots[j][key] for j in neighbors],
                dim=0,
            ).sum(0) / row_sum
            for key in snapshots[0]
        })


def average_models(global_model, client_models):
    """Average models across all clients.

    Overwrites every entry of ``global_model``'s state dict with the unweighted
    element-wise mean of the corresponding entries of ``client_models``.

    Raises:
        ValueError: if ``client_models`` is empty.
    """
    if len(client_models) == 0:
        raise ValueError("average_models needs at least one client model")
    # Build each client's state dict once, not once per key.
    client_states = [model.state_dict() for model in client_models]
    global_dict = global_model.state_dict()
    for key in global_dict:
        global_dict[key] = torch.stack([state[key] for state in client_states], dim=0).mean(dim=0)
    global_model.load_state_dict(global_dict)


def evaluate_model(model, data_loader):
    """Compute loss and accuracy of a single model on a data_loader.

    Args:
        model: module returning log-probabilities (scored with ``F.nll_loss``).
        data_loader: iterable of ``(data, target)`` batches.

    Returns:
        ``(loss, acc)``: mean per-sample NLL loss and the fraction of samples
        whose arg-max prediction equals the target. Both are averaged over the
        samples actually iterated, so loaders using a sampler or ``drop_last``
        are handled correctly. Returns ``(nan, nan)`` if no sample is seen.

    Leaves ``model`` in eval mode.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    seen = 0
    with torch.no_grad():
        for data, target in data_loader:
            output = model(data)
            total_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1)  # index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            seen += target.size(0)

    if seen == 0:
        return float('nan'), float('nan')
    return total_loss / seen, correct / seen

def evaluate_many_models(models, data_loader):
    """Evaluate every model in ``models`` on the same ``data_loader``.

    Returns:
        ``(losses, accuracies)``: two float arrays of length ``len(models)``;
        entry ``i`` holds the loss / accuracy of ``models[i]`` as returned by
        ``evaluate_model`` (no averaging across models is done here).
    """
    scores = [evaluate_model(model, data_loader) for model in models]
    losses = np.array([model_loss for model_loss, _ in scores], dtype=float)
    accuracies = np.array([model_acc for _, model_acc in scores], dtype=float)
    return losses, accuracies

### Federated Learning Contribution Measure
The following cells implement the case where the central server initializes the model and sets the contribution $c_i^{(0)}$ of each user to zero.

Then, for $t=1,...,r$ with $r=$ `num_rounds`:
1. The central server evaluates the accuracy $\hat{\theta}^{(t)}_{prior}$ of the model on a test dataset (theoretically representing the distribution across users, i.e. the overall task),
2. The central server distributes the model to **all** the $n=$ `num_clients` clients,
3. Each client evaluates the accuracy $\theta^{(t)}_{i,prior}$ of the model on its own data (i.e. its own distribution),
4. Each client trains the model on its own data (`epochs` times, using batches of size `batch_size`),
5. Each client evaluates the new accuracy $\theta^{(t)}_{i,post}$ of the model on its own data,
6. Each client sends its trained model, together with the accuracies $\theta^{(t)}_{i,prior}$ and $\theta^{(t)}_{i,post}$, to the central server,
7. The central server evaluates each client model on its test dataset and computes its accuracy $\hat{\theta}^{(t)}_{i,post}$,
8. The central server aggregates the models and updates the contribution of each user:
$$c_i^{(t)} = g\left(\hat{\theta}^{(t)}_{prior}, \{ \hat{\theta}^{(t)}_{i, post}, \theta^{(t)}_{i,prior}, \theta^{(t)}_{i,post}, c_i^{(t-1)}\}_{1\leq i \leq n}\right)$$ where $g$ is the contribution function.


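Multiplicative version of $g(\cdot)$. In the implementation, negative server-side gains $\hat{\theta}^{(t)}_{i,post} - \hat{\theta}^{(t)}_{prior}$ are clipped to $0$, and this clipping also applies inside the penalty term: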
$$c_i^{(t)} = c_i^{(t-1)}  + \theta^{(t)}_{i,prior} (\hat{\theta}^{(t)}_{i,post} - \hat{\theta}^{(t)}_{prior}) \left[ 1 - \frac{|(\theta^{(t)}_{i,post} - \theta^{(t)}_{i,prior}) - (\hat{\theta}^{(t)}_{i,post} - \hat{\theta}^{(t)}_{prior})|}{\sum_{j=1}^n |(\theta^{(t)}_{j,post} - \theta^{(t)}_{j,prior}) - (\hat{\theta}^{(t)}_{j,post} - \hat{\theta}^{(t)}_{prior})|} \right]$$

In [11]:
# contribution functions

def g_mult(theta_prior_s, thetas_post_s, thetas_prior, thetas_post, c_old, tol=1e-6):
    """Multiplicative contribution update for one round.

    Args:
        theta_prior_s: server-side accuracy of the global model before the round.
        thetas_post_s: per-client server-side accuracies of the trained client models.
        thetas_prior: per-client local accuracies of the global model before training.
        thetas_post: per-client local accuracies after training.
        c_old: per-client contributions from the previous round.
        tol: if the total absolute penalty is below ``tol``, the normalized
            penalty is treated as 0 (avoids a 0/0 division).

    Returns:
        New array ``c_old + thetas_prior * delta_global * (1 - |p| / sum|p|)`` with
        ``delta_global = max(thetas_post_s - theta_prior_s, 0)`` and
        ``p = (thetas_post - thetas_prior) - delta_global``. When the total
        penalty vanishes, the bracket is 1, i.e. no client is penalized.
        Inputs are not modified.
    """
    # Only server-side improvements earn contribution; losses are clipped to 0.
    delta_global = np.maximum(np.asarray(thetas_post_s, dtype=float) - theta_prior_s, 0.0)

    delta_clients = np.asarray(thetas_post, dtype=float) - np.asarray(thetas_prior, dtype=float)

    # Mismatch between the locally measured and the server-measured improvement.
    abs_penalty = np.abs(delta_clients - delta_global)
    total_penalty = abs_penalty.sum()

    if total_penalty < tol:
        # Degenerate case: no mismatch anywhere, so the bracket equals 1.
        bracket = np.ones_like(abs_penalty)
    else:
        bracket = 1.0 - abs_penalty / total_penalty

    return np.asarray(c_old, dtype=float) + np.asarray(thetas_prior, dtype=float) * delta_global * bracket

### Loading the data

In [12]:
# Network topology: complete graph with uniform weights 1/num_clients
# (the federated loop below aggregates with `average_models` instead).
num_clients = 5
comm_matrix = np.ones((num_clients, num_clients)) / num_clients

# Training parameters
num_rounds = 5
epochs = 1
batch_size = 32

# Quick code-testing switch: set to an int (e.g. 1000) to keep only
# `subset_per_client * num_clients` random samples of each MNIST split;
# None uses the full splits.
subset_per_client = None

# Shared preprocessing: tensor conversion + normalization (mean 0.1307, std 0.3081).
mnist_transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])


def _maybe_subset(dataset, size):
    """Return a random subset of ``size`` samples of ``dataset``, or ``dataset`` itself if ``size`` is None."""
    if size is None:
        return dataset
    return torch.utils.data.random_split(dataset, [size, len(dataset) - size])[0]


def _split_evenly(dataset, num_parts):
    """Randomly split ``dataset`` into ``num_parts`` shards whose sizes differ by at most one.

    ``random_split`` needs integer lengths summing exactly to ``len(dataset)``;
    the remainder of the division is spread over the first shards, so this works
    even when ``len(dataset)`` is not a multiple of ``num_parts``.
    """
    base, remainder = divmod(len(dataset), num_parts)
    lengths = [base + (1 if k < remainder else 0) for k in range(num_parts)]
    return torch.utils.data.random_split(dataset, lengths)


subset_size = None if subset_per_client is None else subset_per_client * num_clients

# Loading the train data: one shard per client
traindata_full = datasets.MNIST(root='./data',
                                train=True,
                                download=True,
                                transform=mnist_transform)
traindata = _maybe_subset(traindata_full, subset_size)
traindata_split = _split_evenly(traindata, num_clients)
train_loaders = [torch.utils.data.DataLoader(x, batch_size=batch_size, shuffle=True) for x in traindata_split]

# Loading the test data: one local shard per client
testdata_full = datasets.MNIST(root='./data',
                               train=False,
                               download=True,
                               transform=mnist_transform)
testdata = _maybe_subset(testdata_full, subset_size)
testdata_split = _split_evenly(testdata, num_clients)
test_loaders = [torch.utils.data.DataLoader(x, batch_size=10*batch_size, shuffle=True) for x in testdata_split]

# Full test dataset for the server (_s for server)
test_loader_s = torch.utils.data.DataLoader(testdata, batch_size=10*batch_size, shuffle=True)

### Emulating federated learning

In [13]:
# Emulates `num_rounds` rounds of federated learning (steps 1-8 of the
# description above) and tracks the per-client contributions with `g_mult`.

# step 0. Initialization
global_model = Net()
contributions = np.zeros((num_rounds + 1, num_clients))  # row 0 = initial (zero) contributions

# server side
theta_prior_s = np.zeros(num_rounds)
test_loss_prior_s = np.zeros(num_rounds)
thetas_post_s = np.zeros((num_rounds, num_clients))
test_losses_post_s = np.zeros((num_rounds, num_clients))

# client side
thetas_prior = np.zeros((num_rounds, num_clients))
test_losses_prior = np.zeros((num_rounds, num_clients))
thetas_post = np.zeros((num_rounds, num_clients))
test_losses_post = np.zeros((num_rounds, num_clients))
# last-batch training loss of each client, as returned by client_update
train_losses = np.zeros((num_rounds, num_clients))


# Iteration
for r in range(num_rounds):
    print('%d-th round' % (r))

    # step 1. Evaluating the prior model on the global test dataset
    test_loss_prior_s[r], theta_prior_s[r] = evaluate_model(global_model, test_loader_s)
    print("\tPrior performance on server side: ", theta_prior_s[r])

    # step 2: Sending the model (emulating)
    client_models = [Net() for _ in range(num_clients)]
    for model in client_models:
        model.load_state_dict(global_model.state_dict())

    # step 3. Evaluating the prior model on each local test dataset
    for i in range(num_clients):
        test_losses_prior[r, i], thetas_prior[r, i] = evaluate_model(client_models[i], test_loaders[i])

    print("\tPrior performance on client side: ", thetas_prior[r, :])

    # step 4. Learning on client side (fresh SGD optimizer per client and round)
    opt = [optim.SGD(model.parameters(), lr=0.1) for model in client_models]
    for i in range(num_clients):
        train_losses[r, i] = client_update(client_models[i], opt[i], train_loaders[i], epoch=epochs)

    # step 5. Evaluating the posterior model on each local test dataset
    for i in range(num_clients):
        test_losses_post[r, i], thetas_post[r, i] = evaluate_model(client_models[i], test_loaders[i])

    print("\tPosterior performance on client side: ", thetas_post[r, :])

    # step 6. Sending the local models to the server (nothing to do)

    # step 7. Evaluating the posterior models on the global test dataset
    for i in range(num_clients):
        test_losses_post_s[r, i], thetas_post_s[r, i] = evaluate_model(client_models[i], test_loader_s)

    print("\tPosterior performance on server side: ", thetas_post_s[r, :])

    # step 8. Aggregating the models and updating the contributions
    average_models(global_model, client_models)
    contributions[r+1, :] = g_mult(theta_prior_s[r], thetas_post_s[r, :], thetas_prior[r, :], thetas_post[r, :], contributions[r, :])

    # Contributions can sum to 0 (e.g. no server-side gain yet); report NaN
    # instead of dividing by zero.
    total_contribution = contributions[r+1, :].sum()
    if total_contribution != 0:
        normalized = contributions[r+1, :] / total_contribution
    else:
        normalized = np.full(num_clients, np.nan)
    print("\tTotal contributions: {c_tot} \t Normalized: {c}".format(c_tot=contributions[r+1, :], c=normalized))

test_loss_final_s, theta_final_s = evaluate_model(global_model, test_loader_s)
print("\tFinal performance: ", theta_final_s)

0-th round
	Prior performance on server side:  0.114
	Prior performance on client side:  [0.115 0.114 0.117 0.105 0.118]
	Posterior performance on client side:  [0.937 0.941 0.934 0.925 0.957]
	Posterior performance on server side:  [0.938 0.944 0.942 0.929 0.952]
	Total contributions: [0.087 0.082 0.046 0.064 0.098] 	 Normalized: [0.231 0.216 0.122 0.17  0.261]
1-th round
	Prior performance on server side:  0.9539
	Prior performance on client side:  [0.952 0.956 0.951 0.951 0.96 ]
	Posterior performance on client side:  [0.966 0.959 0.962 0.957 0.964]
	Posterior performance on server side:  [0.968 0.962 0.965 0.963 0.959]
	Total contributions: [0.1   0.085 0.056 0.07  0.103] 	 Normalized: [0.241 0.206 0.135 0.169 0.248]
2-th round
	Prior performance on server side:  0.9737
	Prior performance on client side:  [0.974 0.976 0.973 0.971 0.976]
	Posterior performance on client side:  [0.974 0.966 0.971 0.965 0.968]
	Posterior performance on server side:  [0.968 0.965 0.972 0.971 0.967]
	To