# Configuration

Import all required modules, hook PySyft, declare functions for simulating FL environment (Star Architecture)

In [2]:
%load_ext autoreload
%load_ext tensorboard
%autoreload 2

####################
# Required Modules #
####################

# Generic
import copy
import math
import os
import random
import sys
import time
from collections import OrderedDict
from pathlib import Path
from collections import defaultdict
import json

# Libs
import sklearn as skl
from sklearn import preprocessing
import sklearn.datasets as skld
from sklearn.metrics import mean_squared_error, accuracy_score, roc_auc_score
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
import numpy as np
import pandas as pd
import missingno as msno
import matplotlib.pyplot as plt
import seaborn as sbn
import syft as sy
import torch as th
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm, tnrange, tqdm_notebook
from tqdm.notebook import trange
from IPython.display import display

# Custom
from src.datapipeline import Preprocessor


In [None]:
data_proportions = [[0.2, 0.2, 0.2, 0.2, 0.2],
                   [0.3, 0.3, 0.13, 0.13, 0.13], 
                   [0.4, 0.3, 0.2, 0.05, 0.05],
                   [0.7, 0.2, 0.03, 0.03, 0.03]]

DGPs = [[(0, 0.1), (0, 0.1), (0, 0.1), (0, 0.1), (0, 0.1)],
       [(0, 0.2), (0, 0.2), (0, 0.1), (0, 0.1), (0, 0.1)], 
       [(0, 0.3), (0, 0.2), (0, 0.1), (0, 0.1), (0, 0.1)],
       [(0, 0.1), (0, 0.1), (0, 0.1), (0, 0.1), (0, 0.1)]]

distribution_of_labels = [[(0.5, 0.5), (0.5, 0.5), (0.5, 0.5), (0.5, 0.5), (0.5, 0.5)],
                           [(0.7, 0.3), (0.6, 0.4), (0.6, 0.4), (0.5, 0.5), (0.5, 0.5)], 
                           [(0.85, 0.15), (0.8, 0.2), (0.7, 0.3), (0.6, 0.4), (0.5, 0.5)],
                           [(1, 0), (0, 1), (1, 0), (0, 1), (0, 1)]]

In [427]:
import numpy as np

from scipy.special import softmax

NUM_DIM = 20
class SyntheticDataset:

    def __init__(
            self,
            num_classes=2,
            seed=round(np.random.random() * 100),
            num_dim=NUM_DIM,
            prob_clusters=[0.5, 0.5]):

        np.random.seed(seed)

        self.num_classes = num_classes
        self.num_dim = num_dim
        self.num_clusters = len(prob_clusters)
        self.prob_clusters = prob_clusters

        self.side_info_dim = self.num_clusters

        self.Q = np.random.normal(
            loc=0.0, scale=1.0, size=(self.num_dim + 1, self.num_classes, self.side_info_dim))

        self.Sigma = np.zeros((self.num_dim, self.num_dim))
        for i in range(self.num_dim):
            self.Sigma[i, i] = (i + 1)**(-1.2)

        self.means = self._generate_clusters()

    def get_task(self, num_samples):
        cluster_idx = np.random.choice(
            range(self.num_clusters), size=None, replace=True, p=self.prob_clusters)
        new_task = self._generate_task(self.means[cluster_idx], cluster_idx, num_samples)
        return new_task

    def _generate_clusters(self):
        means = []
        for i in range(self.num_clusters):
            loc = np.random.normal(loc=0, scale=1., size=None)
            mu = np.random.normal(loc=loc, scale=1., size=self.side_info_dim)
            means.append(mu)
        return means

    def _generate_x(self, num_samples):
        B = np.random.normal(loc=0.0, scale=1.0, size=None)
        loc = np.random.normal(loc=B, scale=1.0, size=self.num_dim)

        samples = np.ones((num_samples, self.num_dim + 1))
        samples[:, 1:] = np.random.multivariate_normal(
            mean=loc, cov=self.Sigma, size=num_samples)

        return samples

    def _generate_y(self, x, cluster_mean):
        model_info = np.random.normal(loc=cluster_mean, scale=0.1, size=cluster_mean.shape)
        w = np.matmul(self.Q, model_info)
        
        num_samples = x.shape[0]
        prob = softmax(np.matmul(x, w) + np.random.normal(loc=0., scale=0.1, size=(num_samples, self.num_classes)), axis=1)
                
        y = np.argmax(prob, axis=1)
        return y, w, model_info

    def _generate_task(self, cluster_mean, cluster_id, num_samples):
        x = self._generate_x(num_samples)
        y, w, model_info = self._generate_y(x, cluster_mean)

        # now that we have y, we can remove the bias coeff
        x = x[:, 1:]

        return {'x': x, 'y': y, 'w': w, 'model_info': model_info, 'cluster': cluster_id}
    

In [428]:
synth_gen = SyntheticDataset()
synth_gen.get_task(100)

{'x': array([[-2.19375009,  1.32144189,  0.48307649, ..., -1.53719885,
         -0.17183379, -1.45830464],
        [-1.04096292,  0.74812346,  0.69908505, ..., -2.05817883,
         -0.22098152, -1.20791264],
        [-2.54963164,  0.06346844,  0.32003966, ..., -1.74613894,
         -0.17518618, -0.96918059],
        ...,
        [-2.22060479, -0.04840877,  0.80082354, ..., -1.65613586,
         -0.28828986, -1.2509536 ],
        [-2.64532884, -0.63004091,  0.58311658, ..., -1.6780055 ,
         -0.34086216, -1.04046092],
        [-2.19337052, -0.38636502,  0.26911878, ..., -2.0223011 ,
         -0.07011181, -1.17605659]]),
 'y': array([0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0], dtype=int64),
 'w': ar

In [3]:
def secret_share(tensor, workers, crypto_provider, precision_fractional):
    """ Transform to fixed precision and secret share a tensor 
    
    Args:
        tensor             (PointerTensor): Pointer to be shared
        workers   (list(sy.VirtualWorker)): Involved workers of the grid
        crypto_provider (sy.VirtualWorker): Arbiter (i.e. TTP) of the grid
        precision_fractional (int): Precision for casting integer ring arithimetic
    """
    return (
        tensor
        .fix_precision(precision_fractional=precision_fractional)
        .share(
            *workers, 
            crypto_provider=crypto_provider, 
            requires_grad=True
        )
    )

def setup_FL_env(training_datasets, validation_datasets, 
                 testing_dataset, is_shared=False):
    """ Sets up a basic federated learning environment using virtual workers,
        with a allocated arbiter (i.e. TTP) to faciliate in model development
        & utilisation, and deploys datasets to their respective workers
        
    Args:

        training_datasets   (dict(tuple(th.Tensor))): Datasets to be used for training
        validation_datasets (dict(tuple(th.Tensor))): Datasets to be used for validation
        testing_dataset           (tuple(th.Tensor)): Datasets to be used for testing
        is_shared (bool): Toggles if SMPC encryption protocols are active
    Returns:
        training_pointers  (dict(sy.BaseDataset))
        validation_pointer (dict(sy.BaseDataset))
        testing_pointer    (sy.BaseDataset)
        workers            (list(sy.VirtualWorker))
        crypto_provider    (sy.VirtualWorker)
    """
    # Simulate FL computation amongst K worker nodes, 
    # where K is the no. of datasets to be federated
    workers = connect_to_workers(n_workers=len(training_datasets))
    
    # Allow for 1 exchanger/Arbiter (i.e. TTP)
    crypto_provider = connect_to_crypto_provider()
    crypto_provider.clear_objects()
    
    assert (len(crypto_provider._objects) == 0)
    
    # Send training & validation datasets to their respective workers
    training_pointers = {}
    validation_pointers = {}
    for w_idx in range(len(workers)):

        # Retrieve & prepare worker for receiving dataset
        curr_worker = workers[w_idx]
        curr_worker.clear_objects()

        assert (len(curr_worker._objects) == 0)

        train_data = training_datasets[w_idx]
        validation_data = validation_datasets[w_idx]
        
        # Cast dataset into a Tensor & send it to the relevant worker
        train_pointer = sy.BaseDataset(*train_data).send(curr_worker)
        validation_pointer = sy.BaseDataset(*validation_data).send(curr_worker)
        
        # Store data pointers for subsequent reference
        training_pointers[curr_worker] = train_pointer
        validation_pointers[curr_worker] = validation_pointer
    
    # 'Me' serves as the client -> test pointer stays with me, but is shared via SMPC
    testing_pointer = sy.BaseDataset(*testing_dataset).send(crypto_provider)
    
    return training_pointers, validation_pointers, testing_pointer, workers, crypto_provider

In [100]:
def convert_to_FL_batches(model_hyperparams, train_pointers, validation_pointers, test_pointer): 
    """ Supplementary function to convert initialised datasets into their
        SGD compatible dataloaders in the context of PySyft's federated learning
        (NOTE: This is based on the assumption that querying database size does
               not break FL abstraction (i.e. not willing to share quantity))
    Args:
        model_hyperparams                      (model_hyperparams): Parameters defining current experiment
        train_pointers      (dict(sy.BaseDataset)): Distributed datasets for training
        validation_pointers (dict(sy.BaseDataset)): Distributed datasets for model calibration
        test_pointer              (sy.BaseDataset): Distributed dataset for verifying performance
    Returns:
        train_loaders     (sy.FederatedDataLoader)
        validation_loader (sy.FederatedDataLoader)
        test_loader       (sy.FederatedDataLoader)
    """
    
    def construct_FL_loader(data_pointers, **kwargs):
        """ Cast paired data & labels into configured tensor dataloaders
        Args:
            dataset (list(sy.BaseDataset)): A tuple of X features & y labels
            kwargs: Additional parameters to configure PyTorch's Dataloader
        Returns:
            Configured dataloader (th.utils.data.DataLoader)
        """
        federated_dataset = sy.FederatedDataset(data_pointer)
        
#         print(federated_dataset)
        
        federated_data_loader = sy.FederatedDataLoader(
            federated_dataset, 
            batch_size=(
                model_hyperparams['batch_size']
                if model_hyperparams['batch_size'] 
                else len(federated_dataset)
            ), 
            shuffle=True,
            iter_per_worker=True, # for subsequent parallelization
            **kwargs
        )
        
        return federated_data_loader
        
        
    # Load training pointers into a configured federated dataloader
    train_loader = construct_FL_loader(train_pointers.values())
    
    # Load validation pointer into a configured federated dataloader
    validation_loader = construct_FL_loader(validation_pointers.values())
    
    # Load testing dataset into a configured federated dataloader
    test_loader = construct_FL_loader([test_pointer])
    
    return train_loader, validation_loader, test_loader



In [162]:
(training_pointers, 
 validation_pointers, 
 testing_pointer, 
 workers, 
 crypto_provider) = setup_FL_env(
    training_datasets,
    validation_datasets, 
    testing_dataset
)


In [160]:
for batch_idx, batch in enumerate(train_loader):
    print(batch[0].shape)
#     for worker, (data, labels) in batch.items():
# #         print("========")
# #         print(batch_idx, batch)
#         print(worker)
#         print(data.shape)

torch.Size([45, 20])
torch.Size([9, 20])


In [177]:
workers

[<VirtualWorker id:worker1 #objects:2>, <VirtualWorker id:worker2 #objects:4>]

In [157]:
one_client_key = list(training_pointers.keys())[0]

In [158]:
train = {one_client_key:training_pointers[one_client_key]}
val = {one_client_key:validation_pointers[one_client_key]}

In [166]:

trainloaders = {}
for worker_id in list(training_pointers.keys()):
    train = {worker_id:training_pointers[worker_id]}
    val = {worker_id:validation_pointers[worker_id]}
    #============
    # Convert training datasets into syft dataloaders. 
    train_loader, validation_loader, test_loader = convert_to_FL_batches(
        binary_model_hyperparams,
        train, 
        val, 
        testing_pointer
    )
    trainloaders.update({worker_id : train_loader})
    
print(trainloaders)

{<VirtualWorker id:worker1 #objects:2>: <syft.frameworks.torch.fl.dataloader.FederatedDataLoader object at 0x000001B868743748>, <VirtualWorker id:worker2 #objects:2>: <syft.frameworks.torch.fl.dataloader.FederatedDataLoader object at 0x000001B854829EC8>}


In [191]:
for key in trainloaders: 
    for batch_idx, batch in enumerate(trainloaders[key]):
        print(batch[1].shape)
#         for worker, (data, labels) in batch.items():
#             print(worker)

torch.Size([45, 1])
torch.Size([9, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([45, 1])
torch.Size([20, 1])


In [210]:
def perform_FL_training(model_hyperparams, 
                        model_structure,
                        datasets, 
                        workers, 
                        crypto_provider,
                        optimizer=th.optim.SGD):
    """ 
    Simulates a PySyft federated learning cycle using PyTorch, in order
    to prove that it can be done conceptually using the PyTorch interface
        
    Args:
        model_hyperparams (model_hyperparams): 
                                            Parameters defining current experiment
        datasets  (sy.FederatedDataLoader): 
                                        Distributed training datasets
        workers   (list(sy.VirtualWorker)): 
                                        Workers involved in training
        crypto_provider (sy.VirtualWorker): 
                                        Arbiter supervising training
        model     (nn.Module): 
                            Current PyTorch model to train
        optimizer (th.optim): 
                            Optimizer to use
    Returns:
        global_model (nn.Module) : The trained model 
        global_states (dict)
                        {timestep: nn.Module}
                        : The record of trained global models at each timestep.
        client_states (dict)
                        {timestep {worker_id: nn.Module}}
                        : The record of trained models for each worker at each
                          timestep. 
        scale_coeffs (dict)
                        {worker_id: float}
                        : A dictionary of the update weightings for each worker
                          based on individual dataset size. 
    """
    
    criterion = model_hyperparams['criterion']

    def perform_parallel_training(datasets, 
                                  models, 
                                  optimizers, 
                                  criterions, 
                                  epochs):
        """ 
        Parallelizes training across each distributed dataset (i.e. simulated worker)
        Parallelization here refers to the training of all distributed models per
        epoch.
        NOTE: Current approach does not have early stopping implemented
            
        Args:
            datasets   (dict(th.utils.data.DataLoader)): 
                                                       Distributed training datasets
            models     (list(nn.Module)): 
                                        Simulated local models (after distribution)
            optimizers (list(th.optim)): 
                                       Simulated local optimizers (after distribution)
            criterions (list(th.nn)):  
                                    Simulated local objective function (after distribution)
            epochs (int): 
                        No. of epochs to train each local model
        Returns:
            trained local models
        """
        for e in range(epochs):
            for worker in datasets:
#                 print("========================")
#                 print(worker)
                for batch_idx, batch in enumerate(datasets[worker]):
#                     print(batch_idx)
                    data = batch[0]
                    labels = batch[1]
                    '''
                    ========================
                    Each worker trains its own model individually.
                    ========================
                    '''
                    curr_model = models[worker]
                    curr_optimizer = optimizers[worker]
                    curr_criterion = criterions[worker]

                    # Zero gradients to prevent accumulation                    
                    curr_model.train()
                    curr_optimizer.zero_grad()

                    # Forward Propagation
                    predictions = curr_model(data.float())
#                     print(predictions.shape)
#                     print(labels.shape)

                    if model_hyperparams['is_condensed']:
                        loss = curr_criterion(predictions, labels.float())
                    else:
                        loss = curr_criterion(predictions, labels.long())

                    # Backward propagation
                    loss.backward()
                    curr_optimizer.step()

                    # Update models, optimisers & losses
                    models[worker] = curr_model
                    optimizers[worker] = curr_optimizer
                    criterions[worker] = curr_criterion

                    assert (models[worker] == curr_model and 
                            optimizers[worker] == curr_optimizer and 
                            criterions[worker] == curr_criterion)

        trained_models = {w: m.send(crypto_provider) for w,m in models.items()}

        return trained_models
    
    def calculate_global_params(global_model, models, datasets):
        """ Aggregates weights from trained locally trained models after a round.
        
        Args:
            global_model   (nn.Module): Global model to be trained federatedly
            models   (dict(nn.Module)): Simulated local models (after distribution)
            datasets (dict(th.utils.data.DataLoader)): Distributed training datasets
        Returns:
            Aggregated parameters (OrderedDict)
        """
        param_types = global_model.state_dict().keys()
        model_states = {w: m.state_dict() for w,m in models.items()}

        
        # Find size of all distributed datasets for calculating scaling factor
#         print("==================")
#         print(datasets)
#         obs_counts = {}
#         for worker in datasets:
#             for batch_idx, batch in enumerate(datasets[worker_id]):
#                 data = batch[0]
#                 labels = data[1]
#                 curr_count = data.shape[0]
# #                 print("+++++")
# #                 print(curr_count)
#                 if worker in obs_counts.keys():
#                     obs_counts[worker] += curr_count
#                 else:
#                     obs_counts[worker] = curr_count
        
        # Calculate scaling factors for each worker
        scale_coeffs = {w: 1/len(list(datasets.keys())) for w in list(datasets.keys())}

        # PyTorch models can only swap weights of the same structure
        # Hence, aggregate weights while maintaining original layering structure
        aggregated_params = OrderedDict()
        
        '''
        ======================
        Grab the param_states
        ======================
        '''
        params = {}
        
        for p_type in param_types:
            #param_states = [th.mul(ms[p_type], sc) 
            #                for ms,sc in zip(model_states, scale_coeffs)]
            param_states = [
                th.mul(
                    model_states[w][p_type],
                    scale_coeffs[w]
                ).get().get() for w in workers
            ]
            
            '''
            ======================
            Grab the param_states
            ======================
            '''   
            params.update({p_type : param_states})
            
            layer_shape = tuple(global_model.state_dict()[p_type].shape)
            
            '''
            ======================
            Modification made here to allow multiple layers.
            ======================
            '''  
            aggregated_params[p_type] = th.zeros(param_states[0].shape, dtype=th.float64)
            for param_state in param_states:
                aggregated_params[p_type] += param_state
            aggregated_params[p_type] = aggregated_params[p_type].view(*layer_shape)

        return aggregated_params, params, scale_coeffs

    # Generate a global model & send it to the TTP
    
    template_model = Model(model_structure)
    
    global_model = copy.deepcopy(template_model).send(crypto_provider)
    
    print("Global model parameters:\n", [p.location for p in list(global_model.parameters())],
          "\nID:\n", [p.id_at_location for p in list(global_model.parameters())],
          "\n Cloning effect on global model:\n", [p.clone() for p in list(global_model.parameters())])
    
    rounds = 0
    pbar = tqdm(total=model_hyperparams['rounds'], desc='Rounds', leave=True)
    
    '''
    * Dicts for model and client states
    
    '''
    global_states = {}
    client_states = {}
    global_model_state_dicts = {}

    client_template = copy.deepcopy(template_model)
    
    while rounds < model_hyperparams['rounds']:

        local_models = {w: copy.deepcopy(client_template).send(w) for w in workers}

        optimizers = {
            w: optimizer(
                params=model.parameters(), 
                lr=model_hyperparams['lr'], 
                weight_decay=model_hyperparams['decay']
            ) for w, model in local_models.items()
        }
        
        criterions = {w: criterion(reduction='mean') 
                      for w,m in local_models.items()}

        trained_models = perform_parallel_training(
            datasets, 
            local_models, 
            optimizers, 
            criterions, 
            model_hyperparams['epochs']
        )
        
        aggregated_params, params, scale_coeffs = calculate_global_params(
            global_model, 
            trained_models, 
            datasets
        )

        '''
        ============================
        * Save states to dictionary

        '''
        global_model_transfer_out = global_model.get()
        global_states.update({rounds : copy.deepcopy(global_model_transfer_out)})
        global_model_state_dicts.update({rounds : global_model_transfer_out.state_dict()})
            
        client_states.update({rounds + 1 : params})

        # Update weights with aggregated parameters 
        global_model_transfer_out.load_state_dict(aggregated_params)
#         model = copy.deepcopy(global_model_transfer_out)
        client_template = copy.deepcopy(global_model_transfer_out)
        global_model = global_model_transfer_out.send(crypto_provider)
        
        rounds += 1
        pbar.update(1)
        
    '''
    ============================
    * Save final global state

    '''
    global_model_transfer_out = global_model.get()
    global_states.update({rounds : copy.deepcopy(global_model_transfer_out)})
    global_model = global_model_transfer_out.send(crypto_provider)
    global_model_state_dicts.update({rounds : global_model_transfer_out.state_dict()})
    pbar.close()

    return global_model, global_states, client_states, scale_coeffs, global_model_state_dicts

# Default datasets

In [6]:
# Paths to sourcefiles simulating Horizontal learning
DATA_IID_1_PATH = Path("./unified_dataset_iid_1_1000.csv").resolve()
DATA_IID_2_PATH = Path("./unified_dataset_iid_2_1000.csv").resolve()
DATA_nIID_1_PATH = Path("./unified_dataset_noniid_1_1000.csv").resolve()
DATA_nIID_2_PATH = Path("./unified_dataset_noniid_2_1000.csv").resolve()
DATA_VALIDATION_PATH = Path("./unified_dataset_validation_1000.csv").resolve()
DATA_HEADERS = ['age', 'sex', 
                'cp', 'trestbps', 
                'chol', 'fbs', 
                'restecg', 'thalach', 
                'exang', 'oldpeak', 
                'slope', 'ca', 
                'thal', 'target']
DATA_SCHEMA = {'age': 'int32',
               'sex': 'category', 
               'cp': 'category', 
               'trestbps': 'int32', 
               'chol': 'int32', 
               'fbs': 'category', 
               'restecg': 'category', 
               'thalach': 'int32', 
               'exang': 'category', 
               'oldpeak': 'float64', 
               'slope': 'category', 
               'ca': 'category', 
               'thal': 'category', 
               'target': 'category'}
DATA_IDEAL_FEATURES = ['age', 'trestbps', 'chol', 'thalach', 'oldpeak', 
                       'sex_1.0', 'cp_2', 'cp_3', 'cp_4', 'fbs_1', 
                       'restecg_1', 'restecg_2', 'exang_1', 'slope_2', 'slope_3',
                       'ca_1', 'ca_2', 'ca_3', 'thal_6', 'thal_7']


def initialise_datasets(train_src_paths, test_src_path,
                        schema=DATA_SCHEMA, is_condensed=True):
    """ Loads in all training & testing datasets, and automates the process
        of splitting them into X & y pairs
        
    Args:
        src_paths (list(str)): Paths to training sources
        schema         (dict): Universal set of datatypes for source features
    Returns:
        training_datasets           (dict(tuple(th.Tensor)))
        combined_validation_dataset (dict(tuple(th.Tensor)))
        testing_dataset             (tuple(th.Tensor))
    """

    def initialise_dataset(data_path):
        """ Loads in data into a preprocessor to obtain symmetrical OHE samples
        
        Args:
            data_path (str): Path where dataset is stored
        Returns:
            X_train (th.Tensor)
            X_test  (th.Tensor)
            y_train (th.Tensor)
            y_test  (th.Tensor)
        """

        # Data-specific operation - Ensure that categorical classes are valid
         # (This operation is unique to dataset, subjected to changes)
        data = pd.read_csv(data_path).replace(
            [-9.0, '?'], np.nan
        ).replace(
            {'ca': 9, 'slope': 0, 'thal': [1,2,5]}, np.nan
        )

        preprocessor = Preprocessor(data, schema)
        preprocessor.interpolate()
        X_train, X_test, y_train, y_test = preprocessor.transform(
            condense=is_condensed
        )

        return (
            th.from_numpy(X_train), 
            th.from_numpy(X_test),
            th.from_numpy(y_train),
            th.from_numpy(y_test)
        )
    
    
    training_datasets = {}
    validation_datasets = {}   
    for s_idx in range(len(train_src_paths)):

        curr_path = train_src_paths[s_idx]
        X_train, X_test, y_train, y_test = initialise_dataset(curr_path)
        
        training_datasets[s_idx] = (X_train, y_train)
        validation_datasets[s_idx] = (X_test, y_test)
     
    OHE_testing_datasets = initialise_dataset(test_src_path)
    test_X_vals = np.concatenate(OHE_testing_datasets[:2])
    test_y_vals = np.concatenate(OHE_testing_datasets[2:])
    testing_dataset = (th.from_numpy(test_X_vals), th.from_numpy(test_y_vals))
        
    return training_datasets, validation_datasets, testing_dataset

In [7]:
(training_datasets, 
 validation_datasets, 
 testing_dataset
) = initialise_datasets([DATA_nIID_1_PATH, DATA_nIID_2_PATH], DATA_VALIDATION_PATH)

# Variant A: Synthetic, Even Amounts, Equal DGP, Gaussian Noise
### Contributions of clients using this dataset should be equal or close to equal.
This function produces a dataset which is evenly distributed among clients, with variable noise. This represents the base test case for CC functionality.

FL functions in this notebook convert individual CSVs for each client into a dictionary of tuples, one tuple for each client, where first entry is a torch tensor of data, and second entry is a torch tensor of labels. in set_up_fl_env, indexing data dictionaries is how datasets are disseminated to workers. 

In [46]:
def synth_A(datagen_config,
            is_binary,
            num_workers, 
             val_proportion, 
             test_proportion,
             garbage_proportion,
             garbage_severity):
    
    """ Takes in a synthetic dataset generated via Skl.datasets, 
        splits it into num_workers even segments, converts them into 
        torch tensors, formats into a dictionary, for use in FL 
        training. 
        
    Args:
        num_workers (int): The number of workers to split datasets among. 
        synth_data (tuple(numpy.ndarray, numpy.ndarray)): A synthetic classification dataset - First entry is data, second is labels. 
        val_proportion (float): Proportion of datapoints to be used for validation. Float between 0 and 1.
        test_proportion (float): Proportion of datapoints to be used for testing. Float between 0 and 1. 
    Returns:
        training_datasets (defaultdict{
                                    int:tuple(torch.Tensor, torch.Tensor)
                                    }) : A dictionary where data is organized as tuple of torch tensors and indexed by the worker
                                        it belongs to. Follows same ordering as synth_data.
                                        
        validation_datasets (defaultdict{
                                    int:tuple(torch.Tensor, torch.Tensor)
                                    }) : A dictionary where data is organized as tuple of torch tensors and indexed by the worker
                                        it belongs to. Follows same ordering as synth_data.
                                        
        testing_dataset (tuple(torch.Tensor, torch.Tensor)): A tuple containing test data.
    """
    
#     print(datagen_config)
    
    synth_data = skld.make_classification(**datagen_config)
    
    if is_binary:
        synth_data = (synth_data[0], np.reshape(synth_data[1], (-1, 1)))
    
    #============
    # Number of data points in synthetic data set
    synth_data_length = synth_data[0].shape[0]
    
    #============
    # Number of data points per worker
    segment_size = math.floor(synth_data_length / num_workers)
    
    #============
    # Initialize dictionaries for holding datasets.
    # defaultdict() used to simplify dict modification.
    training_datasets = defaultdict()
    validation_datasets = defaultdict()
    testing_dataset = None
    
    #============
    # Select the workers whose data will be corrupted. 
    garbage_indices = np.random.choice(num_workers, int(np.floor(num_workers * garbage_proportion)), replace=False)
    
    for worker_idx in range(num_workers):
        
        #============
        # Number of data points for test, val, train.
        test_size = math.floor(test_proportion * segment_size)
        val_size = math.floor(val_proportion * segment_size)
        train_size = math.floor((1 - val_proportion - test_proportion) * segment_size)
        
#         print(test_size, val_size, train_size)
        
        #============
        # Start and end index of worker's datasegment out of entire dataset. 
        start_idx = worker_idx * segment_size
        end_idx = (worker_idx + 1) * segment_size
        
#         print(start_idx, end_idx)
        
        #============
        # Within each segment, the indexes of test, val, train subsegments.
        test_idx = start_idx + test_size
        val_idx = test_idx + val_size
        train_idx = val_idx + train_size
        
#         print(test_idx, val_idx, train_idx)

        #============
        # Slice dataset by above segments. Convert to torch tensors. 
        validation_datasets[worker_idx] = (th.from_numpy(synth_data[0][test_idx:val_idx, :]), 
                                           th.from_numpy(synth_data[1][test_idx:val_idx]))
        
        training_datasets[worker_idx] = (th.from_numpy(synth_data[0][val_idx:train_idx, :]), 
                                         th.from_numpy(synth_data[1][val_idx:train_idx]))
        
        #============
        # Scatter random noise over data matrices of selected workers.
        if worker_idx in garbage_indices:
            
            val_shape = validation_datasets[worker_idx][0].shape
            train_shape = training_datasets[worker_idx][0].shape
            
            val_noise = np.random.rand(val_shape[0], val_shape[1]) * garbage_severity
            train_noise = np.random.rand(train_shape[0], train_shape[1]) * garbage_severity
            
            validation_datasets[worker_idx] = (validation_datasets[worker_idx][0] + th.from_numpy(val_noise), 
                                               validation_datasets[worker_idx][1])  
            training_datasets[worker_idx] = (training_datasets[worker_idx][0] + th.from_numpy(train_noise), 
                                             training_datasets[worker_idx][1])

        
        #============
        # Testing dataset is in tuple form since it is used by ttp.
        # Each segment contributes a portion of the total, hence the
        # concatenation. 
        if testing_dataset is None:
            testing_dataset = (synth_data[0][start_idx:test_idx, :], 
                                synth_data[1][start_idx:test_idx])
        else:
            testing_dataset = (np.concatenate((testing_dataset[0], 
                                                synth_data[0][start_idx:test_idx, :]), 
                                               axis = 0), 
                                np.concatenate((testing_dataset[1], 
                                                synth_data[1][start_idx:test_idx]), 
                                               axis = 0))
    
    #============
    # Convert testing dataset to torch tensors. 
    testing_dataset = (th.from_numpy(testing_dataset[0]), 
                        th.from_numpy(testing_dataset[1]))
        
    returned_info_dict = {'garbage_indices': garbage_indices, 'worker_proportions': []}
    return training_datasets, validation_datasets, testing_dataset, returned_info_dict
        

# Variant B: Synthetic, Uneven Amounts, Equal DGP, No Additional Noise. 
### Contributions of clients using this dataset should be based on observation numbers per client. 
This function produces a dataset which is unevenly distributed among clients, with variable noise, but from the same distribution. 

In [47]:
def synth_B(datagen_config,
            is_binary,
            num_workers, 
            val_proportion, 
            test_proportion,
            garbage_proportion,
            garbage_severity):
    
    """ Takes in a synthetic dataset generated via Skl.datasets, 
        splits it into num_workers even segments, converts them into 
        torch tensors, formats into a dictionary, for use in FL 
        training. 
        
    Args:
        num_workers (int): The number of workers to split datasets among. 
        synth_data (tuple(numpy.ndarray, numpy.ndarray)): A synthetic classification dataset - First entry is data, second is labels. 
        val_proportion (float): Proportion of datapoints to be used for validation. Float between 0 and 1.
        test_proportion (float): Proportion of datapoints to be used for testing. Float between 0 and 1. 
    Returns:
        training_datasets (defaultdict{
                                    int:tuple(torch.Tensor, torch.Tensor)
                                    }) : A dictionary where data is organized as tuple of torch tensors and indexed by the worker
                                        it belongs to. Follows same ordering as synth_data.
                                        
        validation_datasets (defaultdict{
                                    int:tuple(torch.Tensor, torch.Tensor)
                                    }) : A dictionary where data is organized as tuple of torch tensors and indexed by the worker
                                        it belongs to. Follows same ordering as synth_data.
                                        
        testing_dataset (tuple(torch.Tensor, torch.Tensor)): A tuple containing test data.
    """
    returned_info_dict = {}
    
    synth_data = skld.make_classification(**datagen_config)
    
    if is_binary:
        synth_data = (synth_data[0], np.reshape(synth_data[1], (-1, 1)))
    
    worker_proportions = [np.random.random() for i in range(num_workers)]
    worker_proportions /= np.sum(worker_proportions)
    
#     print(worker_proportions)
    
    #============
    # Number of data points in synthetic data set
    synth_data_length = synth_data[0].shape[0]
    
    #============
    # Initialize dictionaries for holding datasets.
    # defaultdict() used to simplify dict modification.
    training_datasets = defaultdict()
    validation_datasets = defaultdict()
    testing_dataset = None

    previous_end_idx = 0
    for worker_idx in range(num_workers):
        
        #============
        # Number of data points for test, val, train.
        test_size = math.floor((test_proportion * worker_proportions[worker_idx]) * synth_data_length)
        val_size = math.floor((val_proportion * worker_proportions[worker_idx]) * synth_data_length)
        train_size = math.floor(((1 - val_proportion - test_proportion) * worker_proportions[worker_idx]) * synth_data_length)
         
#         print(test_size, val_size, train_size)
        
        #============
        # Start and end index of worker's datasegment out of entire dataset. 
        
#         print(worker_proportions[worker_idx] * synth_data_length)
        start_idx = previous_end_idx
        end_idx = math.floor(previous_end_idx + (worker_proportions[worker_idx] * synth_data_length))
        previous_end_idx = end_idx
#         print(start_idx, end_idx)
        
        #============
        # Within each segment, the indexes of test, val, train subsegments.
        test_idx = start_idx + test_size
        val_idx = test_idx + val_size
        train_idx = val_idx + train_size
        
#         print(test_idx, val_idx, train_idx)

        #============
        # Slice dataset by above segments. Convert to torch tensors. 
        validation_datasets[worker_idx] = (th.from_numpy(synth_data[0][test_idx:val_idx, :]), 
                                           th.from_numpy(synth_data[1][test_idx:val_idx]))
        
        training_datasets[worker_idx] = (th.from_numpy(synth_data[0][val_idx:train_idx, :]), 
                                         th.from_numpy(synth_data[1][val_idx:train_idx]))
        
        #============
        # Testing dataset is in tuple form since it is used by ttp.
        # Each segment contributes a portion of the total, hence the
        # concatenation. 
        if testing_dataset is None:
            testing_dataset = (synth_data[0][start_idx:test_idx, :], 
                                synth_data[1][start_idx:test_idx])
        else:
            testing_dataset = (np.concatenate((testing_dataset[0], 
                                                synth_data[0][start_idx:test_idx, :]), 
                                               axis = 0), 
                                np.concatenate((testing_dataset[1], 
                                                synth_data[1][start_idx:test_idx]), 
                                               axis = 0))
    
    #============
    # Convert testing dataset to torch tensors. 
    testing_dataset = (th.from_numpy(testing_dataset[0]), 
                        th.from_numpy(testing_dataset[1]))
        
    returned_info_dict = {'garbage_indices': [], 'worker_proportions': worker_proportions}
    return training_datasets, validation_datasets, testing_dataset, returned_info_dict
    

In [291]:
proportions = [x]
x = np.random.random()
print("x: {x}".format(x = x))
y = np.random.normal(1, scale=0.4, size = None)
print("y: {y}".format(y = y))
proportions = [x, (x + y)]
print(proportions)
# print(sum(proportions))
# # print(min(proportions))
# proportions = [x / sum(proportions), ((x + y)) / sum(proportions)]
# print(proportions)

x: 0.9225398153900244
y: 0.4007348708975329
[0.9225398153900244, 1.3232746862875573]


In [292]:
z = np.random.normal(0.4, scale=0.001, size = None)

In [344]:
test = []
for i in range(4):
    test.append(np.random.normal(1, scale=0.4, size = None))
test = (np.array(test))
print(test)

[0.96234434 1.12841068 1.05760708 0.71440494]


In [366]:
worker_proportions = test
average_difference = 0
for i in worker_proportions:
    avg_diff_per_worker = 0
    for j in worker_proportions:
        if i != j:
            avg_diff_per_worker += abs(i - j)
#     print(avg_diff_per_worker)
            
    average_difference += (avg_diff_per_worker / (len(worker_proportions) - 1))
    
print(average_difference / len(worker_proportions))

0.13626521458157845


In [367]:
test = []
for i in range(30):
    test.append(np.random.random() * 0.4)
print(test)

[0.22470950122727237, 0.23975070923995856, 0.2569738538954861, 0.012919479225867737, 0.13895059447922115, 0.14234720883946675, 0.22506705432002155, 0.0021860759137620977, 0.33720510852075863, 0.3163645883628665, 0.21143379555246772, 0.11231988759584062, 0.22466781026175175, 0.030445576279362244, 0.38600324618341575, 0.39606567413674076, 0.16461733094911307, 0.34105543820780226, 0.0790866246326059, 0.3432027501822075, 0.33760505875516, 0.26186193378499373, 0.22565345469568282, 0.007054403377193453, 0.10118440256808321, 0.03092697388128296, 0.04534309203436724, 0.30210542116887973, 0.3323023630425529, 0.10466432373850819]


In [206]:
worker_proportions = [np.random.random() for i in range(4)]
worker_proportions /= np.sum(worker_proportions)
worker_proportions

array([0.36906296, 0.45056731, 0.16122364, 0.01914609])

In [375]:
proportions = [0.4, 0.1, 0.5]

In [377]:
worker_proportions = proportions
average_difference = 0
for i in worker_proportions:
    avg_diff_per_worker = 0
    for j in worker_proportions:
        if i != j:
            avg_diff_per_worker += abs(i - j)
#     print(avg_diff_per_worker)
    average_difference += (avg_diff_per_worker)
print(average_difference / len(worker_proportions))

0.5333333333333333


In [None]:
data_proportions = [[0.2, 0.2, 0.2, 0.2, 0.2],
                   [0.3, 0.3, 0.13, 0.13, 0.13], 
                   [0.4, 0.3, 0.2, 0.05, 0.05],
                   [0.7, 0.2, 0.03, 0.03, 0.03]]

DGPs = [[(0, 0.1), (0, 0.1), (0, 0.1), (0, 0.1), (0, 0.1)],
       [(0, 0.2), (0, 0.2), (0, 0.1), (0, 0.1), (0, 0.1)], 
       [(0, 0.3), (0, 0.2), (0, 0.1), (0, 0.1), (0, 0.1)],
       [(0, 0.1), (0, 0.1), (0, 0.1), (0, 0.1), (0, 0.1)]]

distribution_of_labels = [[(0.5, 0.5), (0.5, 0.5), (0.5, 0.5), (0.5, 0.5), (0.5, 0.5)],
                           [(0.7, 0.3), (0.6, 0.4), (0.6, 0.4), (0.5, 0.5), (0.5, 0.5)], 
                           [(0.85, 0.15), (0.8, 0.2), (0.7, 0.3), (0.6, 0.4), (0.5, 0.5)],
                           [(1, 0), (0, 1), (1, 0), (0, 1), (0, 1)]]

# Variant C: Synthetic, Even Amounts, Unequal DGPS, No Additional Noise, Test dataset from the mean DGP. 
### Contributions of clients using this dataset should be based on observation numbers per client. 
This function produces a dataset which is unevenly distributed among clients, with variable noise, but from the same distribution. 

# Variant D: Synthetic, Uneven Amounts, Unequal DGPS, No Additional Noise, Test dataset from the DGP whose client has the least observations. 
### Contributions of clients using this dataset should be based on observation numbers per client. 
This function produces a dataset which is unevenly distributed among clients, with variable noise, but from the same distribution. 

In [191]:
def synth_C(datagen_config,
            is_binary,
            num_workers, 
             val_proportion, 
             test_proportion,
             garbage_proportion,
             garbage_severity,
            average_quantity_difference = 0.4):
    
    """ Takes in a synthetic dataset generated via Skl.datasets, 
        splits it into num_workers even segments, converts them into 
        torch tensors, formats into a dictionary, for use in FL 
        training. 
        
    Args:
        num_workers (int): The number of workers to split datasets among. 
        synth_data (tuple(numpy.ndarray, numpy.ndarray)): A synthetic classification dataset - First entry is data, second is labels. 
        val_proportion (float): Proportion of datapoints to be used for validation. Float between 0 and 1.
        test_proportion (float): Proportion of datapoints to be used for testing. Float between 0 and 1. 
    Returns:
        training_datasets (defaultdict{
                                    int:tuple(torch.Tensor, torch.Tensor)
                                    }) : A dictionary where data is organized as tuple of torch tensors and indexed by the worker
                                        it belongs to. Follows same ordering as synth_data.
                                        
        validation_datasets (defaultdict{
                                    int:tuple(torch.Tensor, torch.Tensor)
                                    }) : A dictionary where data is organized as tuple of torch tensors and indexed by the worker
                                        it belongs to. Follows same ordering as synth_data.
                                        
        testing_dataset (tuple(torch.Tensor, torch.Tensor)): A tuple containing test data.
    """
    
#     print(datagen_config)
    
    synth_data = skld.make_classification(**datagen_config)
    
    if is_binary:
        synth_data = (synth_data[0], np.reshape(synth_data[1], (-1, 1)))
    
    #============
    # Number of data points in synthetic data set
    synth_data_length = synth_data[0].shape[0]
    
    #============
    # Number of data points per worker
    segment_size = math.floor(synth_data_length / num_workers)
    
    #============
    # Initialize dictionaries for holding datasets.
    # defaultdict() used to simplify dict modification.
    training_datasets = defaultdict()
    validation_datasets = defaultdict()
    testing_dataset = None
    
    #============
    # Select the workers whose data will be corrupted. 
    garbage_indices = np.random.choice(num_workers, int(np.floor(num_workers * garbage_proportion)), replace=False)
    
    for worker_idx in range(num_workers):
        
        #============
        # Number of data points for test, val, train.
        test_size = math.floor(test_proportion * segment_size)
        val_size = math.floor(val_proportion * segment_size)
        train_size = math.floor((1 - val_proportion - test_proportion) * segment_size)
        
#         print(test_size, val_size, train_size)
        
        #============
        # Start and end index of worker's datasegment out of entire dataset. 
        start_idx = worker_idx * segment_size
        end_idx = (worker_idx + 1) * segment_size
        
#         print(start_idx, end_idx)
        
        #============
        # Within each segment, the indexes of test, val, train subsegments.
        test_idx = start_idx + test_size
        val_idx = test_idx + val_size
        train_idx = val_idx + train_size
        
#         print(test_idx, val_idx, train_idx)

        #============
        # Slice dataset by above segments. Convert to torch tensors. 
        validation_datasets[worker_idx] = (th.from_numpy(synth_data[0][test_idx:val_idx, :]), 
                                           th.from_numpy(synth_data[1][test_idx:val_idx]))
        
        training_datasets[worker_idx] = (th.from_numpy(synth_data[0][val_idx:train_idx, :]), 
                                         th.from_numpy(synth_data[1][val_idx:train_idx]))
        
        #============
        # Scatter random noise over data matrices of selected workers.
        if worker_idx in garbage_indices:
            
            val_shape = validation_datasets[worker_idx][0].shape
            train_shape = training_datasets[worker_idx][0].shape
            
            val_noise = np.random.rand(val_shape[0], val_shape[1]) * garbage_severity
            train_noise = np.random.rand(train_shape[0], train_shape[1]) * garbage_severity
            
            validation_datasets[worker_idx] = (validation_datasets[worker_idx][0] + th.from_numpy(val_noise), 
                                               validation_datasets[worker_idx][1])  
            training_datasets[worker_idx] = (training_datasets[worker_idx][0] + th.from_numpy(train_noise), 
                                             training_datasets[worker_idx][1])

        
        #============
        # Testing dataset is in tuple form since it is used by ttp.
        # Each segment contributes a portion of the total, hence the
        # concatenation. 
        if testing_dataset is None:
            testing_dataset = (synth_data[0][start_idx:test_idx, :], 
                                synth_data[1][start_idx:test_idx])
        else:
            testing_dataset = (np.concatenate((testing_dataset[0], 
                                                synth_data[0][start_idx:test_idx, :]), 
                                               axis = 0), 
                                np.concatenate((testing_dataset[1], 
                                                synth_data[1][start_idx:test_idx]), 
                                               axis = 0))
    
    #============
    # Convert testing dataset to torch tensors. 
    testing_dataset = (th.from_numpy(testing_dataset[0]), 
                        th.from_numpy(testing_dataset[1]))
        
    returned_info_dict = {'garbage_indices': garbage_indices, 'worker_proportions': []}
    return training_datasets, validation_datasets, testing_dataset, returned_info_dict
        

# Initialize Synthetic Datasets
Produce synthetic datasets according to the settings here.

In [500]:

#============
# Settings for sklearn.datasets.make_classification function. 
binary_synth_datagen_config = {
    "n_samples": 3000, 
     "n_features": 20, 
     "n_informative": 20, 
     "n_redundant": 0, 
     "n_repeated": 0, 
     "n_classes": 2, 
     "n_clusters_per_class": 1, 
     "weights": None, 
     "flip_y": 0.01, 
     "class_sep": 0.1,
     "hypercube": True, 
     "shift": 0.0, 
     "scale": 1.0, 
     "shuffle": True, 
     "random_state": None
}

#============
# Binary dataset creation and formatting. 

# synth_data = skld.make_classification(**binary_synth_datagen_config)
# synth_data = (synth_data[0], np.reshape(synth_data[1], (-1, 1)))



multiclass_synth_datagen_config = {
    "n_samples": 3000, 
     "n_features": 20, 
     "n_informative": 20, 
     "n_redundant": 0, 
     "n_repeated": 0, 
     "n_classes": 4, 
     "n_clusters_per_class": 1, 
     "weights": None, 
     "flip_y": 0.01, 
     "class_sep": 0.1,
     "hypercube": False, 
     "shift": 0.0, 
     "scale": 1.0, 
     "shuffle": True, 
     "random_state": None
}

#============
# Multiclass dataset creation. 

# synth_data = skld.make_classification(**multiclass_synth_datagen_config)

#============
# Settings for dataset split function in above cell. 
synth_gen_config = {
    "datagen_config": binary_synth_datagen_config,
    "is_binary": True,
    "num_workers": 5,
    "val_proportion": 0.2,
    "test_proportion": 0.1, 
    "garbage_proportion": 0,
    "garbage_severity": 3
}

#============
# Perform dataset splitting.
# (training_datasets, validation_datasets, testing_dataset, returned_info_dict) = synth_A(**synth_gen_config)

(training_datasets, validation_datasets, testing_dataset, returned_info_dict) = synth_B(**synth_gen_config)

In [501]:
training_datasets[1][0].shape

torch.Size([280, 20])

In [502]:
training_datasets[0][0].shape

torch.Size([311, 20])

In [503]:
returned_info_dict['worker_proportions']

array([0.14847446, 0.13371493, 0.42542594, 0.01199297, 0.28039171])

In [504]:
training_datasets[0][1].shape

torch.Size([311, 1])

In [505]:
training_datasets[0][0].shape

torch.Size([311, 20])

# Calculate mean and standard deviation of dataset quantity. 

In [506]:
def measure_quantity(dataset_dict):
    x = []
    for client_idx in dataset_dict:
        x.append(dataset_dict[client_idx][0].shape[0])
    print(np.array(x).mean())
    print(np.array(x).std())
    print((np.array(x) - np.array(x).mean()) / (np.array(x).std() + 0.0000001))

In [507]:
measure_quantity(training_datasets)

419.4
296.46490517428873
[-0.36564193 -0.47020743  1.59749094 -1.33034296  0.56870138]


# Calculate average difference in quantity between clients. For each client, calculate the average difference in quantity between that client and all others

In [484]:
def average_quantity_per_client(dataset_dict):
    total = 0
    average_difference_per_client = {}
    for client_idx in dataset_dict:
        total_per_client = 0
        client_quantity = dataset_dict[client_idx][0].shape[0]
        for other_client_idx in dataset_dict:
            if client_idx != other_client_idx:
                other_client_quantity = dataset_dict[other_client_idx][0].shape[0]
                total_per_client += abs(client_quantity - other_client_quantity)
        total_per_client /= (len(dataset_dict.keys()) - 1)
        
        average_difference_per_client.update({client_idx: total_per_client})
        total += total_per_client
        
    total /= len(dataset_dict.keys())
    return total, average_difference_per_client

In [485]:
average_quantity_per_client(training_datasets)

(330.6, {0: 257.0, 1: 411.75, 2: 407.0, 3: 327.75, 4: 249.5})

In [200]:
print(training_datasets.keys())

dict_keys([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])


In [201]:
train_data = json.load(open('./train/data_niid_0_keep_5_train_6.json'))
test_data = json.load(open('./test/data_niid_0_keep_5_test_6.json'))
train_data.keys()

dict_keys(['users', 'num_samples', 'user_data'])

In [514]:
def count_labels_per_client(dataset_dict):
    client_counts = defaultdict(int)
    max_num_labels = 0
    for client_idx in dataset_dict:
        label_counts = defaultdict(int)
        labels = dataset_dict[client_idx][1].view(-1, )
        for label in labels.tolist():
            label_counts[label] += 1
            if len(labels.tolist()) > max_num_labels:
                max_num_labels = len(labels.unique())
        client_counts[client_idx] = label_counts
    return client_counts, max_num_labels

def average_class_discrepancy_per_client(dataset_dict):
    average_label_difference_per_client = {}
    client_counts, max_num_labels = average_difference_label_numbers(training_datasets)
    average_percentage_difference_relative_to_individual = 0
    for client_idx in client_counts:
        total_diff = 0
        total = 0
        for label in client_counts[client_idx]:
            label_count = client_counts[client_idx][label]
            total += label_count
            for other_label in client_counts[client_idx]:
                if label != other_label:
                    other_label_count = client_counts[client_idx][other_label]
                    total_diff += abs(label_count - other_label_count)
        average_label_difference_per_client.update({client_idx: total_diff/total})
        total_diff /= len(client_counts[client_idx].keys()) + 0.00001
        
#         print(round(total_diff))
#         print(round(total_diff) / (total + 0.00001))
        average_percentage_difference_relative_to_individual += round(total_diff) / (total + 0.00001)
    return average_percentage_difference_relative_to_individual / len(client_counts.keys()), average_label_difference_per_client

In [558]:
count_labels_per_client(training_datasets)

(defaultdict(int,
             {0: defaultdict(int, {0: 152, 1: 159}),
              1: defaultdict(int, {1: 137, 0: 143}),
              2: defaultdict(int, {0: 467, 1: 426}),
              3: defaultdict(int, {0: 12, 1: 13}),
              4: defaultdict(int, {1: 290, 0: 298})}),
 2)

In [532]:
def measure_label_distribution_per_client(training_datasets):
    client_counts, num_labels = count_labels_per_client(training_datasets)
    count_dict = defaultdict(list)
    for i in range(num_labels):
        for client_idx in client_counts:
            count_dict[i].append(client_counts[client_idx][i])
    mean_std_dict = copy.deepcopy(count_dict)
    for label in count_dict:
        mean = np.array(count_dict[label]).mean()
        std = np.array(count_dict[label]).std()
        for client_idx in client_counts:
            client_counts[client_idx][label] = (client_counts[client_idx][label] - mean) / std
#         for i in range(len(count_dict[label])):
#             mean_std_dict[i] = (mean_std_dict[i] - mean) / std
    return client_counts

In [533]:
measure_label_distribution_per_client(training_datasets)

defaultdict(int,
            {0: defaultdict(int,
                         {0: -0.4015320976299024, 1: -0.32580714537395156}),
             1: defaultdict(int,
                         {1: -0.4816279540310588, 0: -0.45944538094190757}),
             2: defaultdict(int,
                         {0: 1.6254328182902777, 1: 1.565290850600941}),
             3: defaultdict(int,
                         {0: -1.3024053935944269, 1: -1.3598906937347544}),
             4: defaultdict(int,
                         {1: 0.6020349425388235, 0: 0.5379500538759588})})

In [169]:
returned_info_dict['worker_proportions']

array([0.05129574, 0.22208825, 0.1289808 , 0.03305123, 0.15348252,
       0.09724967, 0.15352131, 0.01237889, 0.09860318, 0.04934841])

In [559]:
def count_samples_per_client(dataset_dict):
    x = []
    for client_idx in dataset_dict:
        x.append(dataset_dict[client_idx][0].shape[0])
    return x

In [571]:
def measure_mean_label_discrepancy_per_client(training_datasets):
    client_counts, num_labels = count_labels_per_client(training_datasets)
    differences = defaultdict(float)
    print(client_counts)
    discrepancies = []
    for client_idx in client_counts:
        total = 0
        for label in client_counts[client_idx]:
            for other_label in client_counts[client_idx]:
                if label != other_label:
                    total += np.abs(client_counts[client_idx][label] - client_counts[client_idx][other_label])
        
        discrepancies.append(total / 2)
#     discrepancies = (np.array(discrepancies) - np.min(np.array(discrepancies))) / np.max(np.array(discrepancies))
    totals = count_samples_per_client(training_datasets)
#     print(np.array(discrepancies) / np.array(totals))
    discrepancies = np.array(discrepancies) / np.array(totals)
    print(discrepancies)
#     print((np.array(discrepancies) - np.min(np.array(discrepancies))) / np.max(np.array(discrepancies)))
    print(np.array(discrepancies).mean())
    print(np.array(discrepancies).std())
    print((np.array(discrepancies) - np.array(discrepancies).mean()) / np.array(discrepancies).std())

In [572]:
measure_mean_label_discrepancy_per_client(training_datasets)

defaultdict(<class 'int'>, {0: defaultdict(<class 'int'>, {0: 152, 1: 159}), 1: defaultdict(<class 'int'>, {1: 137, 0: 143}), 2: defaultdict(<class 'int'>, {0: 467, 1: 426}), 3: defaultdict(<class 'int'>, {0: 12, 1: 13}), 4: defaultdict(<class 'int'>, {1: 290, 0: 298})})
[0.02250804 0.02142857 0.04591265 0.04       0.01360544]
0.02869094123320302
0.012190363260188545
[-0.50719593 -0.59574679  1.41273171  0.92770482 -1.23749381]


In [113]:
# train_data['user_data']

In [200]:
def find_num_classes(df):
    num = 0
    for user in df['users']:
        num_class = len(set(df['user_data'][user]['y']))
        if num_class > num:
            num = num_class
    return num

In [244]:
def format_LEAF_train_data(df):
    dataset_dict = defaultdict()
    num_classes = find_num_classes(df)
    
    for user in df['users']:
        if num_classes > 2:
            dataset_dict[int(user)] = (th.tensor(df['user_data'][user]['x']), 
                                  th.tensor(df['user_data'][user]['y']))
        else:
            dataset_dict[int(user)] = (th.tensor(df['user_data'][user]['x']), 
                                    th.tensor(np.reshape(df['user_data'][user]['y'], (-1, 1))))
    return dataset_dict

def format_LEAF_test_data(df):
    testing_dataset = None
    for user in df['users']:
        if testing_dataset is None:
            testing_dataset = (df['user_data'][user]['x'], 
                                df['user_data'][user]['y'])
        else:
            testing_dataset = (np.concatenate((testing_dataset[0], 
                                                df['user_data'][user]['x']), 
                                               axis = 0), 
                                np.concatenate((testing_dataset[1], 
                                                df['user_data'][user]['y']), 
                                               axis = 0))
    testing_dataset = (th.tensor(testing_dataset[0]), 
                        th.tensor(np.reshape(testing_dataset[1], (-1, 1))))
    return testing_dataset

In [245]:
training_datasets = format_LEAF_train_data(train_data)
validation_datasets = format_LEAF_train_data(test_data)
testing_dataset = format_LEAF_test_data(test_data)

In [246]:
testing_dataset[0].shape

torch.Size([268, 40])

In [247]:
testing_dataset[1].shape

torch.Size([268, 1])