<a href="https://colab.research.google.com/github/XinyiYS/FairAndPrivateFederatedLearning/blob/master/Federated_Dataset_with_Shapley_MNIST.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Federated Dataset with Shapley.ipynb


In [1]:
!pip install 'syft[udacity]'

Collecting syft[udacity]
[?25l  Downloading https://files.pythonhosted.org/packages/1f/8b/dc9a253392908d480322466832d618d85cdb1b66a1781604cf1064b50c32/syft-0.2.4-py3-none-any.whl (341kB)
[K     |█                               | 10kB 27.0MB/s eta 0:00:01[K     |██                              | 20kB 34.7MB/s eta 0:00:01[K     |██▉                             | 30kB 38.7MB/s eta 0:00:01[K     |███▉                            | 40kB 40.2MB/s eta 0:00:01[K     |████▉                           | 51kB 42.7MB/s eta 0:00:01[K     |█████▊                          | 61kB 45.0MB/s eta 0:00:01[K     |██████▊                         | 71kB 44.6MB/s eta 0:00:01[K     |███████▊                        | 81kB 44.6MB/s eta 0:00:01[K     |████████▋                       | 92kB 45.6MB/s eta 0:00:01[K     |█████████▋                      | 102kB 44.1MB/s eta 0:00:01[K     |██████████▋                     | 112kB 44.1MB/s eta 0:00:01[K     |███████████▌                    | 122kB 4

In [1]:
# Colab-only line magic: switch the runtime to TensorFlow 1.x before syft is imported.
# NOTE(review): presumably required because this syft release pulls in tf_encrypted,
# which targets TF 1.x — confirm before removing.
%tensorflow_version 1.x

TensorFlow 1.x selected.


In [2]:
import random
from itertools import permutations
import copy
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

import syft as sy  # <-- NEW: import the Pysyft library
hook = sy.TorchHook(torch)  # <-- NEW: hook PyTorch ie add extra functionalities to support Federated Learning


# bob = sy.VirtualWorker(hook, id="bob")  # <-- NEW: define remote worker bob
# alice = sy.VirtualWorker(hook, id="alice")  # <-- NEW: and alice

Falling back to insecure randomness since the required custom op could not be found for the installed version of TensorFlow. Fix this by compiling custom ops. Missing file was '/usr/local/lib/python3.6/dist-packages/tf_encrypted/operations/secure_random/secure_random_module_tf_1.15.2.so'





In [0]:
class Arguments:
    """Hyper-parameters and runtime switches shared by every cell below.

    Fields tagged with ``#@param`` are exposed as Colab form inputs.
    Constructing an instance also creates ``num_workers`` PySyft virtual
    workers whose ids are the strings "0", "1", ...
    """

    def __init__(self):
        self.batch_size = 10 #@param
        self.test_batch_size = 5000 #@param
        self.epochs = 5 #@param
        self.lr = 0.15 #@param
        # Kept for reference; the optimizers built in this notebook do not use it.
        self.momentum = 0.5
        self.no_cuda = False
        self.seed = 1
        # Print the training loss every `log_interval` batches.
        self.log_interval = 150 #@param
        self.save_model = False
        self.num_workers = 3 #@param
        # One virtual worker per participant, identified by its index as a string.
        worker_ids = [str(index) for index in range(self.num_workers)]
        self.workers = [sy.VirtualWorker(hook, id=worker_id) for worker_id in worker_ids]


args = Arguments()

# Train on the GPU only when CUDA is both permitted by the config and available.
use_cuda = not args.no_cuda and torch.cuda.is_available()

# Seed every RNG the notebook can draw from, so a fresh "Run All" is as
# repeatable as seeding allows: torch, Python's `random` (train_shapley
# shuffles the sampled worker orderings with it) and NumPy.
torch.manual_seed(args.seed)
random.seed(args.seed)
np.random.seed(args.seed)

device = torch.device("cuda" if use_cuda else "cpu")

# Extra DataLoader options that are only enabled when running on a GPU.
kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

In [4]:
# Preprocessing shared by the train and test splits: pad every image by two
# pixels on each side (the models below reshape inputs to 32x32 / 1024
# features), convert to a tensor and normalise.
mnist_transform = transforms.Compose([
    transforms.Pad((2, 2, 2, 2)),
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

# Training split: distributed across all virtual workers, then wrapped in a
# FederatedDataLoader so that every batch lives on exactly one worker.
train_dataset = datasets.MNIST('../data', train=True, download=True,
                               transform=mnist_transform)
federated_train_loader = sy.FederatedDataLoader(
    train_dataset.federate(args.workers),
    batch_size=args.batch_size, shuffle=True, **kwargs)

# Test split: kept local and evaluated with an ordinary DataLoader.
test_dataset = datasets.MNIST('../data', train=False, transform=mnist_transform)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=args.test_batch_size, shuffle=True, **kwargs)


from collections import defaultdict

# Group the federated batches by the id of the worker that holds them, so a
# single worker's data can be replayed on its own later.
worker_counts = defaultdict(int)          # worker id -> number of batches
worker_data_loader = defaultdict(list)    # worker id -> [(data, target), ...]
count = 0                                 # total number of batches seen
for batch_idx, (data, target) in enumerate(federated_train_loader):
    owner_id = data.location.id
    count += 1
    worker_counts[owner_id] += 1
    worker_data_loader[owner_id].append((data, target))

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ../data/MNIST/raw/train-images-idx3-ubyte.gz


HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

Extracting ../data/MNIST/raw/train-images-idx3-ubyte.gz to ../data/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ../data/MNIST/raw/train-labels-idx1-ubyte.gz


HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

Extracting ../data/MNIST/raw/train-labels-idx1-ubyte.gz to ../data/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ../data/MNIST/raw/t10k-images-idx3-ubyte.gz



HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

Extracting ../data/MNIST/raw/t10k-images-idx3-ubyte.gz to ../data/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ../data/MNIST/raw/t10k-labels-idx1-ubyte.gz


HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))

Extracting ../data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ../data/MNIST/raw
Processing...
Done!







In [0]:
# class Net(nn.Module):
#     def __init__(self):
#         super(Net, self).__init__()
#         self.conv1 = nn.Conv2d(1, 20, 5, 1)
#         self.conv2 = nn.Conv2d(20, 50, 5, 1)
#         self.fc1 = nn.Linear(4*4*50, 500)
#         self.fc2 = nn.Linear(500, 10)

#     def forward(self, x):
#         x = F.relu(self.conv1(x))
#         x = F.max_pool2d(x, 2, 2)
#         x = F.relu(self.conv2(x))
#         x = F.max_pool2d(x, 2, 2)
#         x = x.view(-1, 4*4*50)
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return F.log_softmax(x, dim=1)


class CNN_Net(nn.Module):
    """Small convolutional classifier for single-channel 32x32 inputs, 10 classes.

    Shape trace for one sample:
    1x32x32 -> conv 3x3 -> 64x30x30 -> pool 2 -> 64x15x15
            -> conv 7x7 -> 16x9x9   -> pool 2 -> 16x4x4 -> fc 200 -> fc 10.
    """

    def __init__(self):
        super(CNN_Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 64, 3, 1)
        self.conv2 = nn.Conv2d(64, 16, 7, 1)
        self.fc1 = nn.Linear(4*4*16, 200)
        self.fc2 = nn.Linear(200, 10)

    def forward(self, x):
        """Return log-probabilities of shape (batch, 10).

        ``x`` may have any shape that can be viewed as (-1, 1, 32, 32).
        """
        x = x.view(-1, 1, 32, 32)
        # torch.tanh replaces the deprecated F.tanh (same values, no warning).
        x = torch.tanh(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = torch.tanh(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        # Flatten the 16 feature maps of 4x4 for the fully connected head.
        x = x.view(-1, 4*4*16)
        x = torch.tanh(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
    

class MLP_Net(nn.Module):
    """Fully connected classifier: 1024 -> 128 -> 64 -> 10 with ReLU activations."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(1024, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        """Flatten ``x`` into rows of 1024 features and return log-probabilities over 10 classes."""
        flat = x.view(-1, 1024)
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        logits = self.fc3(hidden)
        return F.log_softmax(logits, dim=1)

In [0]:
def train(args, model, device, federated_train_loader, optimizer, epoch):
    """Run one epoch of federated SGD over ``federated_train_loader``.

    For every batch the model is sent to the worker that holds the batch,
    one optimisation step is taken there, and the model is fetched back.
    The loss is printed every ``args.log_interval`` batches.

    Args:
        args: configuration object; reads ``log_interval`` and ``batch_size``.
        model: the network to train (moved between workers batch by batch).
        device: torch device the batch tensors are moved to.
        federated_train_loader: loader yielding ``(data, target)`` pairs whose
            ``data.location`` is the owning worker.
        optimizer: optimizer bound to ``model``'s parameters.
        epoch: epoch number, used only in the log line.
    """
    model.train()
    for batch_idx, (data, target) in enumerate(federated_train_loader):
        # Ship the model to the worker that owns this batch.
        model.send(data.location)
        data = data.to(device)
        target = target.to(device)

        optimizer.zero_grad()
        loss = F.nll_loss(model(data), target)
        loss.backward()
        optimizer.step()

        # Bring the updated model back before the next batch.
        model.get()

        if batch_idx % args.log_interval != 0:
            continue

        # The loss still lives on the remote worker; fetch it for printing.
        loss = loss.get()
        total_batches = len(federated_train_loader)
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, batch_idx * args.batch_size, total_batches * args.batch_size,
            100. * batch_idx / total_batches, loss.item()))

def test(args, model, device, test_loader, verbose=True):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
            pred = output.argmax(1, keepdim=True) # get the index of the max log-probability 
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    if verbose:
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))
    test_acc = 1.* correct / len(test_loader.dataset)
    return test_acc

def averge_parameters(redundant_models):
    """Return a new model whose parameters are the mean of the given models' parameters.

    The result is built from a deep copy of the first model, so it has the
    same architecture and device as the inputs and works for any model
    class. The input models are not modified.

    Args:
        redundant_models: non-empty sequence of models sharing one architecture.

    Returns:
        A fresh model holding the element-wise average of every parameter.

    Raises:
        ValueError: if ``redundant_models`` is empty.
    """
    if not redundant_models:
        raise ValueError("averge_parameters needs at least one model")

    weight = 1. / len(redundant_models)
    final_model = copy.deepcopy(redundant_models[0])
    for i, redundant_model in enumerate(redundant_models):
        for param_final, param_redundant in zip(final_model.parameters(), redundant_model.parameters()):
            if i == 0:
                # Rebinding .data gives the copy its own tensor to accumulate into.
                param_final.data = param_redundant.data * weight
            else:
                param_final.data += param_redundant.data * weight
    return final_model


def add_update_to_model(model, update, weight=1.0):
    """Add ``weight * update`` to ``model``'s parameters in place and return ``model``.

    ``update`` is a sequence of tensors aligned with ``model.parameters()``.
    """
    for layer_param, layer_delta in zip(model.parameters(), update):
        scaled_delta = weight * layer_delta.data
        layer_param.data += scaled_delta
    return model

def compute_grad_update(old_model, new_model):
    """Return the per-parameter difference ``new - old`` as a list of tensors.

    The list follows the order of ``parameters()`` of the two models.
    """
    # maybe later to implement on selected layers/parameters
    deltas = []
    for before, after in zip(old_model.parameters(), new_model.parameters()):
        deltas.append(after.data - before.data)
    return deltas


def add_gradient_updates(grad_update_1, grad_update_2):
    """Return the element-wise sum of two equally long update lists as a new list."""
    assert len(grad_update_1) == len(grad_update_2), "Lengths of the two grad_updates not equal"
    return [first + second for first, second in zip(grad_update_1, grad_update_2)]

def cosine_similarity(old_grad_update, new_grad_update):
    """Return the mean layer-wise cosine similarity between two updates.

    Each pair of tensors is flattened and compared on its own; the result
    is the sum of those similarities divided by the number of layers in
    ``old_grad_update``.
    """
    cos = nn.CosineSimilarity(dim=0)
    layer_similarities = [
        cos(old_layer.data.view(-1), new_layer.data.view(-1))
        for old_layer, new_layer in zip(old_grad_update, new_grad_update)
    ]
    total = 0
    for layer_similarity in layer_similarities:
        total += layer_similarity
    # divide by # layers
    return total / len(old_grad_update)

def train_shapley(args, model, device, worker_data_loader, optimizer, epoch, contributions, Max_num_sequences=20):
    """Run one round of per-worker training and score every worker's update.

    Starting from the current model, each worker trains on its own batches
    and the resulting parameter delta is recorded; the model is reset to the
    starting state between workers. The deltas are then replayed, scaled by
    ``1 / num_workers``, along (sampled) orderings of the workers, and the
    test-accuracy change of each step is credited to the worker added at
    that step.

    Args:
        args: configuration object; reads ``args.workers``.
        model: network to train; updated in place to the start state plus
            the averaged worker updates.
        device: torch device used for batches and evaluation.
        worker_data_loader: mapping from worker id (string) to that worker's
            list of ``(data, target)`` batches.
        optimizer: optimizer bound to ``model``'s parameters.
        epoch: unused; kept for signature compatibility.
        contributions: dict with tensors under ``'shapley'`` and ``'loo'``,
            one entry per worker; both are accumulated in place.
        Max_num_sequences: upper bound on the number of worker orderings
            evaluated; larger permutation sets are randomly sub-sampled.

    Returns:
        The same ``contributions`` dict, updated.

    Notes:
        Evaluation relies on the module-level ``test`` function and
        ``test_loader``. Per-worker results are indexed by ``int(worker.id)``,
        so worker ids are expected to be "0" .. "n-1".
    """
    # Derive the ids locally instead of relying on a notebook-global list.
    worker_ids = [worker.id for worker in args.workers]
    num_workers = len(worker_ids)

    all_sequences = list(permutations([int(worker_id) for worker_id in worker_ids]))
    if len(all_sequences) > Max_num_sequences:
        random.shuffle(all_sequences)
        all_sequences = all_sequences[:Max_num_sequences]

    test_acc_prev_epoch = test(args, model, device, test_loader, verbose=False)

    # Snapshot of the starting state: the reference for the deltas and the
    # base every replayed ordering starts from.
    model_prev_epoch = copy.deepcopy(model)

    model.train()
    # <optional> optimization: for each worker, no longer goes through the entire load: 1. random sampling or 2. organized iteration
    grad_updates = [None] * num_workers
    # gather all the model updates
    for worker_id in worker_ids:
        for data, target in worker_data_loader[worker_id]:
            model.send(data.location)  # train where the batch lives
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            model.get()  # bring the model back

        grad_updates[int(worker_id)] = compute_grad_update(model_prev_epoch, model)
        # Every worker starts from the same state.
        model.load_state_dict(model_prev_epoch.state_dict())
    print("decentralized training complete and all the gradient updates collected")

    # compute the running shapley by evaluating using test acc
    update_weight = 1. / num_workers

    marginal_contributions = torch.zeros(num_workers)
    leave_one_out_contributions = torch.zeros(num_workers)
    # Track which workers already have a leave-one-out score explicitly:
    # 0.0 is a legitimate score and must not be mistaken for "not set".
    loo_recorded = set()

    sequential_running_model = model_prev_epoch
    for sequence in all_sequences:
        curr_contributions = []
        sequential_running_model = copy.deepcopy(model_prev_epoch)

        for position, worker_idx in enumerate(sequence):
            sequential_running_model = add_update_to_model(
                sequential_running_model, grad_updates[worker_idx], weight=update_weight)
            test_acc = test(args, sequential_running_model, device, test_loader, verbose=False)

            # Accuracy before this worker joined: the previous step of this
            # ordering, or the pre-round accuracy for the first worker.
            baseline = curr_contributions[-1] if curr_contributions else test_acc_prev_epoch
            marginal_contributions[worker_idx] += test_acc - baseline

            # The last worker of an ordering joins the coalition of all
            # others, which is exactly its leave-one-out contribution.
            if position == len(sequence) - 1 and worker_idx not in loo_recorded:
                leave_one_out_contributions[worker_idx] = test_acc - baseline
                loo_recorded.add(worker_idx)

            curr_contributions.append(test_acc)

        print(curr_contributions)
    num_sequences = len(all_sequences)

    contributions['shapley'] += marginal_contributions / num_sequences
    contributions['loo'] += leave_one_out_contributions

    print("Marginal contributions this epoch:", marginal_contributions / num_sequences)
    print("LOO contributions this epoch:", leave_one_out_contributions)

    # Every ordering applies all updates, so the last running model is the
    # start state plus the mean update (up to float rounding).
    model.load_state_dict(sequential_running_model.state_dict())

    return contributions

In [10]:
# Approximate each worker's Shapley value by sampling worker orderings:
# every sampled ordering yields one marginal contribution per worker, and the
# average over the sampled orderings is that epoch's Shapley estimate.
# The per-epoch estimates are accumulated in `contributions`.

workerIds = [worker.id for worker in args.workers]

model = MLP_Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=args.lr) # TODO momentum is not supported at the moment

# One running score per worker for each of the two contribution measures.
past_contributions = torch.zeros(len(workerIds))
loo_contributions = torch.zeros(len(workerIds))
contributions = {
    'shapley': past_contributions,
    'loo': loo_contributions,
}

for epoch in range(1, args.epochs + 1):
    contributions = train_shapley(
        args, model, device, worker_data_loader, optimizer, epoch, contributions)
    test(args, model, device, test_loader)
    print(contributions)

if args.save_model:
    torch.save(model.state_dict(), "mnist_mlp.pt")

decentralized training complete and all the gradient updates collected
[0.8872, 0.9116, 0.9124]
[0.8872, 0.8953, 0.9124]
[0.9196, 0.9116, 0.9124]
[0.9196, 0.9187, 0.9124]
[0.9036, 0.8953, 0.9124]
[0.9036, 0.9187, 0.9124]
Marginal contributions this epoch: tensor([0.2548, 0.2826, 0.2665])
LOO contributions this epoch: tensor([-0.0063,  0.0171,  0.0008])

Test set: Average loss: 0.3952, Accuracy: 9124/10000 (91%)

{'shapley': tensor([0.2548, 0.2826, 0.2665]), 'loo': tensor([-0.0063,  0.0171,  0.0008])}
decentralized training complete and all the gradient updates collected
[0.9387, 0.9517, 0.9572]
[0.9387, 0.9501, 0.9572]
[0.9399, 0.9517, 0.9572]
[0.9399, 0.9516, 0.9572]
[0.9377, 0.9501, 0.9572]
[0.9377, 0.9516, 0.9572]
Marginal contributions this epoch: tensor([0.0147, 0.0160, 0.0141])
LOO contributions this epoch: tensor([0.0056, 0.0071, 0.0055])

Test set: Average loss: 0.1401, Accuracy: 9572/10000 (96%)

{'shapley': tensor([0.2694, 0.2987, 0.2806]), 'loo': tensor([-0.0007,  0.0242,  0

In [11]:
# Same experiment as the MLP run above, repeated with the CNN.
workerIds = [worker.id for worker in args.workers]

model = CNN_Net().to(device)
optimizer = optim.SGD(model.parameters(), lr=args.lr) # TODO momentum is not supported at the moment

# One running score per worker for each of the two contribution measures.
past_contributions = torch.zeros(np.array(workerIds).shape)
loo_contributions = torch.zeros(np.array(workerIds).shape)
contributions = {'shapley':past_contributions, 'loo':loo_contributions}

for epoch in range(1, args.epochs + 1):
    contributions = train_shapley(args, model, device, worker_data_loader, optimizer, epoch, contributions)
    test(args, model, device, test_loader)
    print(contributions)

if (args.save_model):
    # Use a CNN-specific file name so the MLP checkpoint is not overwritten.
    torch.save(model.state_dict(), "mnist_cnn.pt")



decentralized training complete and all the gradient updates collected
[0.9435, 0.9444, 0.942]
[0.9435, 0.9367, 0.942]
[0.9402, 0.9444, 0.942]
[0.9402, 0.9453, 0.942]
[0.9419, 0.9367, 0.942]
[0.9419, 0.9453, 0.942]
Marginal contributions this epoch: tensor([0.2925, 0.2952, 0.2921])
LOO contributions this epoch: tensor([-0.0033,  0.0053, -0.0024])

Test set: Average loss: 0.1748, Accuracy: 9420/10000 (94%)

{'shapley': tensor([0.2925, 0.2952, 0.2921]), 'loo': tensor([-0.0033,  0.0053, -0.0024])}
decentralized training complete and all the gradient updates collected
[0.9667, 0.9753, 0.9805]
[0.9667, 0.976, 0.9805]
[0.9666, 0.9753, 0.9805]
[0.9666, 0.9757, 0.9805]
[0.9677, 0.976, 0.9805]
[0.9677, 0.9757, 0.9805]
Marginal contributions this epoch: tensor([0.0127, 0.0125, 0.0134])
LOO contributions this epoch: tensor([0.0048, 0.0045, 0.0052])

Test set: Average loss: 0.0633, Accuracy: 9805/10000 (98%)

{'shapley': tensor([0.3052, 0.3076, 0.3055]), 'loo': tensor([0.0015, 0.0098, 0.0028])}
de

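# `train_shapley` variant intended to support two contribution measures
1. Shapley training: train the model in each epoch and evaluate each worker's contribution by test accuracy.
2. Shapley evaluation based on the cosine similarity between a worker's update and the global historical gradient update.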


In [0]:

# This train_shapley does sequential ("shapley") training, NOT federated learning,
# and scores workers on test accuracy only.
# NOTE: it reuses the name of the earlier train_shapley but expects a tensor
# (not a dict) as its 7th argument; running this cell shadows the earlier one.
def train_shapley(args, model, device, worker_data_loader, optimizer, epoch, past_contributions, Max_num_sequences=20):
    """Train along sampled worker orderings and accumulate marginal contributions.

    For every sampled ordering the model is trained on each worker's batches
    in turn, starting from the same initial state; the test-accuracy change
    after each worker is credited to that worker. After all orderings, the
    model is set to the average of the parameters reached at the end of each
    ordering.

    Args:
        args: configuration object; reads ``args.workers``.
        model: network to train; updated in place.
        device: torch device used for batches and evaluation.
        worker_data_loader: mapping from worker id (string) to that worker's
            list of ``(data, target)`` batches.
        optimizer: optimizer bound to ``model``'s parameters.
        epoch: unused; kept for signature compatibility.
        past_contributions: tensor with one running score per worker,
            indexed by ``int(worker.id)``; accumulated in place.
        Max_num_sequences: upper bound on the number of orderings evaluated;
            larger permutation sets are randomly sub-sampled.

    Returns:
        ``past_contributions`` after adding this round's averaged marginal
        contributions.

    Notes:
        Evaluation relies on the module-level ``test`` function and
        ``test_loader``.
    """
    workerIds = [worker.id for worker in args.workers]
    all_sequences = list(permutations(workerIds))
    if len(all_sequences) > Max_num_sequences:
        random.shuffle(all_sequences)
        all_sequences = all_sequences[:Max_num_sequences]
    num_sequences = len(all_sequences)

    test_acc_prev_epoch = test(args, model, device, test_loader, verbose=False)

    # Snapshot of the starting state; every ordering restarts from it. A deep
    # copy works for any model class.
    model_prev_epoch = copy.deepcopy(model)

    # TODO: second contribution measure — cosine similarity between each
    # worker's update and a global history of gradient updates. Not
    # implemented yet.

    # <optional> optimization: for each worker, no longer goes through the entire load: 1. random sampling or 2. organized iteration

    # Running mean of the parameters reached at the end of each ordering, so
    # the per-ordering models do not all have to be kept in memory.
    averaged_params = [torch.zeros_like(param.data) for param in model.parameters()]
    marginal_contributions = torch.zeros(len(workerIds))

    for sequence in all_sequences:
        previous_acc = test_acc_prev_epoch
        for workerId in sequence:
            # test() leaves the model in eval mode; switch back before training.
            model.train()
            for data, target in worker_data_loader[workerId]:
                model.send(data.location)  # train where the batch lives
                data, target = data.to(device), target.to(device)
                optimizer.zero_grad()
                output = model(data)
                loss = F.nll_loss(output, target)
                loss.backward()
                optimizer.step()
                model.get()  # bring the model back

            test_acc = test(args, model, device, test_loader, verbose=False)
            marginal_contributions[int(workerId)] += test_acc - previous_acc
            previous_acc = test_acc

        for running_mean, param in zip(averaged_params, model.parameters()):
            running_mean += param.data / num_sequences

        # Reset so the next ordering starts from the same state.
        model.load_state_dict(model_prev_epoch.state_dict())

    past_contributions += marginal_contributions / num_sequences
    print("Marginal contributions this epoch:", marginal_contributions / num_sequences)

    # Install the averaged parameters as the model for the next epoch.
    for param, averaged in zip(model.parameters(), averaged_params):
        param.data.copy_(averaged)

    return past_contributions
