In [10]:
import numpy as np
import pandas as pd
import os
import random
from sklearn.model_selection import train_test_split
from sklearn.linear_model import SGDClassifier
import math
from data_partition import DistributedDataSet
from init_ipfs_nodes import create_ipfs_nodes
import ipfshttpclient
import piskg
import copy
import multiprocessing
from multiprocessing.pool import ThreadPool
from datetime import datetime

In [11]:
def generate_random_data(data_size, n_classes, seed):
    random.seed(seed)
    x_data = pd.DataFrame([[random.random() for _ in range(data_size)], [random.random() for _ in range(data_size)]]).T
    y_data = pd.DataFrame([(list(np.arange(n_classes))*int(data_size/n_classes))])
    data = pd.DataFrame(np.concatenate((x_data.values, y_data.values.T), axis=1))
    data.columns = ['x1', 'x2', 'y']
    return data

In [12]:
CLIENTS = 8 #don't set more than 9 because of IPFS initialization script
CLIENT_GRAPH = {
    0: [1, 4],
    1: [0, 5, 2],
    2: [3, 1],
    3: [2, 4, 6],
    4: [3, 7, 0],
    5: [1],
    6: [3],
    7: [4]
}

SEED = 0
ROUNDS = 5
BATCH_SIZE = 45

#supports only pandas dataframe for now. 
#input values should be labelled as x0, x1, etc. 
#output values should be labelled as y
DATASET = generate_random_data(2000, 5, SEED)

In [13]:
dd = DistributedDataSet(DATASET, SEED, BATCH_SIZE, CLIENTS)
df_di, test = dd.get_distributed_dataset(0.1)

In [14]:
test

Unnamed: 0,x1,x2,y
0,0.352789,0.313610,4.0
1,0.076456,0.896365,3.0
2,0.196392,0.698080,1.0
3,0.795012,0.971569,1.0
4,0.620538,0.330931,2.0
...,...,...,...
195,0.551267,0.635284,2.0
196,0.633089,0.633140,1.0
197,0.004141,0.118818,0.0
198,0.700406,0.869067,1.0


In [6]:
# create_ipfs_nodes(CLIENTS)

In [7]:
# class ClientNetwork:
#     def __init__(self, count, structure, ipfs, blkchain, train_data, test_data):
#         self.count = count
#         self.structure = structure
#         self.clients = ipfs
#         self.blkchain = blkchain
#         self.train_data = train_data
#         self.test_data = test_data
        
#         self.clients = []
        
#     def start_clients(self):
#         for i in range(self.count):
#             ag = Client(i, self.structure[i], sekf.train_data[i], self.ipfs, self.blkchain, self.test)
#             self.clients.append(ag)
            

In [43]:
def getTime(messages):
    simulated_communication_times = {message.sender: message.body['time'] for message in messages}
    slowest_client = max(simulated_communication_times, key=simulated_communication_times.get)
    simulated_time = simulated_communication_times[slowest_client]  # simulated time it would take for server to receive all values
    return simulated_time

In [8]:
class Message:
    def __init__(self, sender, receiver, body):
        self.sender = sender
        self.receiver = receiver
        self.body = body

In [151]:
class Client:
    def __init__(self, client_id, neighbour_ids, client_datasets, client_ipfs_node, client_blkchain_node, test_dataset):
        
        self.client_id = client_id
        self.neighbour_ids = neighbour_ids
        self.client_datasets = client_datasets
        self.client_ipfs_node = client_ipfs_node
        self.client_blkchain_node = client_blkchain_node
        self.test_dataset = test_dataset
        
        self.personal_weights = {}
        self.personal_intercepts = {}
        
        self.federated_weights = {}
        self.federated_intercepts = {}
        
        self.personal_accuracy = {}
        self.federated_accuracy = {}
        
        self.train_time = {}
        
        self.nb_weights = {}
        self.nb_intercepts = {}
    
    def client_train_caller(self, deets):
        
        client_instance, msg = deets
        
        if(client_instance.client_id == self.client_id):
            m = self.train_client(msg)
        else:  
            m = client_instance.train_client(msg)
        return m
    
    def get_nb_weights(self, round_it):
        m = multiprocessing.Manager()
        lock = m.Lock()
        with ThreadPool(len(self.neighbour_ids)) as calling_pool:
            args = []
            self.neighbour_ids.append(self.client_id)
            for cl in (self.neighbour_ids):
                body = {'round': round_it, 'lock': lock}
                msg = Message(self.client_id, cl, body)
                args.append((allCl.clients[cl], msg))
            self.neighbour_ids.remove(self.client_id)
            msgs = calling_pool.map(self.client_train_caller, args)
        
        time_start = datetime.now()
        train_time = getTime(msgs)
        
        self.nb_weights[round_it] = []
        self.nb_intercepts[round_it] = []
        
        for msg in msgs:
            self.nb_weights[round_it].append(msg.body['weights'])
            self.nb_intercepts[round_it].append(msg.body['intercepts'])
        
        time_stop = datetime.now()
        fedtime = time_stop - time_start
        totTime = fedtime+train_time
#         print(totTime)
        self.FedAvg(round_it)
        return msgs
    
    def FedAvg(self, round_it):
        self.federated_weights[round_it] = np.average(self.nb_weights[round_it], axis=0)
        self.federated_intercepts[round_it] = np.average(self.nb_intercepts[round_it], axis=0)
    
    def train_client(self, message):
        round_it = message.body['round']
        start_time = datetime.now()
        for epoch in range(len(self.client_datasets)):
            X = self.client_datasets[epoch][['x1','x2']]
            y = self.client_datasets[epoch]['y']
            weights, intercepts = self.compute_epoch(X, y, round_it, epoch)
            if(epoch==0):
                self.personal_weights[round_it] = []
                self.personal_intercepts[round_it] = []
                
            self.personal_weights[round_it].append(weights)
            self.personal_intercepts[round_it].append(intercepts)
        stop_time = datetime.now()
        comp_time = stop_time - start_time
        self.train_time[round_it] = comp_time
        
        body = {
            'weights': self.personal_weights[round_it][epoch],
            'intercepts': self.personal_intercepts[round_it][epoch],
            'time': self.train_time[round_it]
        }
        
        m = Message(self.client_id, message.sender, body)
        
        return m
    
    def compute_epoch(self, X, y, round_it, epoch):            
            
        lr = SGDClassifier(alpha=0.0001, loss="log", random_state=0)

        if epoch >= 1:
            weights = copy.deepcopy(self.personal_weights[round_it][epoch - 1])
            intercepts = copy.deepcopy(self.personal_intercepts[round_it][epoch - 1])
        else:
            if(round_it>=1):
#                 if(sender==receiver):
#                     weights = copy.deepcopy(self.federated_weights[round_it - 1])
#                     intercepts = copy.deepcopy(self.federated_intercepts[round_it - 1])
#                 else:
                    weights = None
                    intercepts = None

            else:
                weights = None
                intercepts = None
        
        lr.fit(X, y, coef_init=weights, intercept_init=intercepts)
        local_weights = lr.coef_
        local_intercepts = lr.intercept_
        
        if(epoch==(len(self.client_datasets) - 1)):
            acc = lr.score(self.test_dataset[['x1', 'x2']], self.test_dataset['y'])
            self.personal_accuracy[round_it] = acc
            
        return local_weights, local_intercepts
        
#     def federated_averaging(self):
        

In [152]:
class AllClients:
    def __init__(self, clients):
        self.clients = clients

In [153]:
client_li = []
for i in range(CLIENTS):
    ag = Client(i, CLIENT_GRAPH[i], df_di[i], '.ipfs_fl'+str(i), '1234', test)
    client_li.append(ag)

allCl = AllClients(client_li)

In [154]:
for i in range(ROUNDS):
    allCl.clients[0].get_nb_weights(i)

In [146]:
allCl.clients[0].federated_intercepts

{0: array([-2.9494123 , -2.2328988 , -1.08268978, -2.6132876 , -6.12077549]),
 1: array([-2.9494123 , -2.2328988 , -1.08268978, -2.6132876 , -6.12077549]),
 2: array([-2.9494123 , -2.2328988 , -1.08268978, -2.6132876 , -6.12077549]),
 3: array([-2.9494123 , -2.2328988 , -1.08268978, -2.6132876 , -6.12077549]),
 4: array([-2.9494123 , -2.2328988 , -1.08268978, -2.6132876 , -6.12077549])}

In [118]:
allCl.clients[0].federated_weights

{0: array([[-0.00656982, -2.56678521],
        [ 1.48872879,  0.956123  ],
        [-4.35790851,  0.59425873],
        [-3.75900294, -0.31640623],
        [ 2.58218439,  0.89388052]])}

In [68]:
allCl.clients[0].nb_intercepts

{0: [array([-1.44089513, -3.60416632,  3.40383282, -6.52277488,  0.35369421]),
  array([ -4.61638252,  -0.93427944,  -4.9498795 ,   3.39185783,
         -19.35442808]),
  array([-2.79095926, -2.16025066, -1.70202267, -4.70894576,  0.63840739])]}

In [10]:
def client_computation_caller(inp):
    client_instance, body = inp
    return_message = client_instance.train_client(body['round'])
    return return_message

In [11]:
for ro in range(ROUNDS):
    m = multiprocessing.Manager()
    lock = m.Lock()
    with ThreadPool(CLIENTS) as calling_pool:
        args = []
        for cl in range(CLIENTS):
            body = {'round': ro, 'lock': lock}
            args.append((allCl.clients[cl], body))
        calling_pool.map(client_computation_caller, args)

In [46]:
allCl.clients[1].personal_intercepts[0]

[array([-1.19024283, -8.22172283, -9.42661473, -3.97311779, -4.33640538]),
 array([-10.33395532,   3.24845949,   5.356186  ,  -9.3852317 ,
         -2.30635674]),
 array([-10.55145539, -12.46190679,   0.64291909,  -1.13745339,
         -0.18818607]),
 array([  0.52149334, -16.64293375,  -6.27774062,  -0.99584345,
         -0.70731457]),
 array([-1.44089513, -3.60416632,  3.40383282, -6.52277488,  0.35369421])]