In [1]:
from N1_ML import *
from U1_IPFS import *
from tensorflow.python.keras.models import clone_model, load_model
from os import remove as rm
import pickle
from math import log as loge

In [2]:
model_dir = "./models/"

In [3]:
def combine_models(m1, m2):
    m = deepcopy(m1)
    for i in range(len(m1)):
        m[i] += m2[i]
    return m

def saveToFile(content, location):
    f = open(location, 'w')
    f.write(content)
    f.close()

def readFromFile(location):
    f = open(location, 'r')
    g = f.read()
    f.close()
    return g

def multiply_scalar(model, wei):
    for i in range(len(model)):
        model[i] *= wei
    return model

In [4]:
class Client:
    def __init__(self, id_):
        self.id_ = id_
        self.hash = hash(self.id_)
        self.round = 0 # <- useful for spacing out tests
        self.roundLimit = 1
        self.train_inp = None
        self.train_oup = None
        self.test_inp = None
        self.test_oup = None
        self.latest_quality = None
        self.contribution = None
        self.ipfs = IPFS(id_=self.id_)
        self.model = CNN()

    def getVolume(self):
        return len(self.train_inp)

    def setRoundLimit(self, limit):
        self.roundLimit = limit

    def train(self, loc, epoch_):
        suf='temp.h5'
        qual_name = 'qual'
        ts = time()
        self.latest_quality = self.model.trainW(str(self.id_), getSession(), epochs_=epoch_).history['accuracy'][-1]
        te = time()
        print("Cli# {0}: {1:.2f} sec, Q: {2:.4f}".format(self.id_, te - ts, self.latest_quality), end = '; ')
        # save model to file
        self.contribution = self.latest_quality * self.getVolume()
        saveToFile(str(self.contribution),loc+qual_name)
        self.model.model.save(loc+suf, save_format='h5')
        # save to IPFS
        mod_hash = self.ipfs.sendToIPFS(loc+suf)
        qual_hash = self.ipfs.sendToIPFS(loc+qual_name)
        rm(loc+suf)
        rm(loc+qual_name)
        return mod_hash, qual_hash

    def evaluate(self):
        self.model.model.evaluate(self.test_inp, self.test_oup, verbose=2)      

    def extractModel(self):
        return self.model
    
    def setData(self, train_inp_, train_oup_, test_inp_, test_oup_):
        self.train_inp = train_inp_
        self.train_oup = train_oup_
        self.test_inp = test_inp_
        self.test_oup = test_oup_
        self.model.setData(train_inp_, train_oup_)

    def replaceWeights(self, weights):
        self.model.model.set_weights(weights)
        #self.model.model.build((None,self,36,1))
        self.model.model.build((None,self,28,28,1))
        self.model.model.compile(optimizer='adam',loss='mse', metrics=['accuracy'])

In [5]:
class Server:
    def __init__(self, clients, epochs_, rounds_):
        self.id_ = 'S'
        self.rounds = rounds_
        self.clients = clients
        self.model = CNN()
        self.model.setup()
        self.ipfs = IPFS(self.id_)
        self.cli_models = []
        self.cli_contrib = []
        self.epochs_ = epochs_
        self.test_inp = None
        self.test_oup = None
    
    def setTestCase(self, tinp, tout):
        self.test_inp = tinp
        self.test_oup = tout
        
    def predict(self):
        self.model.model.evaluate(self.test_inp, self.test_oup, verbose=2)

    def askClients(self):
        for c in self.clients:
            mod_hash, con_hash = c.train(model_dir, self.epochs_)
            self.cli_models.append(mod_hash)
            self.cli_contrib.append(con_hash)

    def sendGlobalModel(self):
        suf = 'temp.h5'
        # save model to h5 as TEMP
        self.model.model.save(model_dir+suf, save_format='h5')
        # upload to IPFS
        glob_hash = self.ipfs.sendToIPFS(model_dir+suf)
        rm(model_dir+suf)
        # spread global model
        for c in self.clients:
            # client fetches the model to temp
            c.ipfs.getFromIPFS(glob_hash, model_dir)
            # load model
            model_weights = load_model(model_dir+glob_hash).get_weights()
            # replace model with the new model
            c.replaceWeights(model_weights)
            rm(model_dir+glob_hash)

    def iterate(self):
        for i in range(self.rounds):
            print("Iteration " + str(i + 1) + " of " + str(self.rounds), end=' | ')
            tsg = time()
            self.sendGlobalModel() # loop start
            tsh = time()
            self.askClients()
            tsi = time()
            self.aggregate()
            tsj = time()
            self.model.trainW()
            print(" | Glob: {0:.2f}s, Train: {1:.2f}s, Aggr: {2:.2f}s.".format(tsh-tsg, tsi-tsh, tsj-tsi))
            self.model.setData(self.test_inp, self.test_oup)
            self.model.train('Glob', getSession(), verbose=2)
        self.predict()
        self.cli_models = []
        self.cli_weights = []
        return self

    def aggregate(self):
        # fetch models from hashes
        total_contribution = 0
        cli_weights = []
        cli_contribs = []
        cli_proportion = []

        for i in range(len(self.cli_models)):
            hash_ = self.cli_models[i]
            cont_ = self.cli_contrib[i]
            # import from IPFS hash to temps
            self.ipfs.getFromIPFS(hash_, model_dir)
            md = load_model(model_dir+hash_).get_weights()
            cli_weights.append(md)
            rm(model_dir+hash_)

            temp_save = "./temp/"
            self.ipfs.getFromIPFS(cont_, temp_save)
            ccb = float(readFromFile(temp_save + cont_))
            total_contribution += ccb
            cli_contribs.append(ccb)
            rm(temp_save + cont_)
            
        # get proportions
        for w in cli_contribs:
            cli_proportion.append(w/total_contribution)
        
        # averaging
        average = None
        for i in range(len(cli_weights)):
            weighted = multiply_scalar(cli_weights[i],cli_proportion[i])
            if average == None:
                average = weighted
            else:
                average = combine_models(average, weighted)

        # repacking in global model
        self.model.model.set_weights(average)
        #self.model.model.build((None,36,1))
        self.model.model.build((None,28,28,1))
        self.model.model.compile(optimizer='adam',loss='mse', metrics=['accuracy'])

In [6]:
count = 7
test_ratio = 0.25

try:
    train_inps = pickle.load(open("./temp/trinps", "rb"))
    train_oups = pickle.load(open("./temp/troups", "rb"))
    test_inps = pickle.load(open("./temp/teinps", "rb"))
    test_oups = pickle.load(open("./temp/teoups", "rb"))
    test_inp = pickle.load(open("./temp/teinp", "rb"))
    test_oup = pickle.load(open("./temp/teoup", "rb"))
except:
    train_inp, test_inp, train_oup, test_oup = split(fetch(), test_ratio)
    train_inps = buildPortions(count, train_inp)
    train_oups = buildPortions(count, train_oup)
    test_inps = buildPortions(count, test_inp)
    test_oups = buildPortions(count, test_oup)
    pickle.dump(train_inps, open("./temp/trinps", "wb"))
    pickle.dump(train_oups, open("./temp/troups", "wb"))
    pickle.dump(test_inps, open("./temp/teinps", "wb"))
    pickle.dump(test_oups, open("./temp/teoups", "wb"))
    pickle.dump(test_inp, open("./temp/teinp", "wb"))
    pickle.dump(test_oup, open("./temp/teoup", "wb"))
    

In [9]:
def buildFL(train_inps_, train_oups_, test_inps_, test_oups_, test_inp_, test_oup_, epochs_, rounds_):
    clients = []
    for i in range(len(train_inps_)):
        c = Client(i)
        c.model.setup()
        c.setData(train_inps_[i], train_oups_[i], test_inps_[i], test_oups_[i])
        c.setRoundLimit(rounds_)
        clients.append(c)
    server = Server(clients, epochs_, rounds_)
    server.setTestCase(test_inp_, test_oup_)
    return server

epochs = 1
rounds = 2
server = buildFL(train_inps, train_oups, test_inps, test_oups, test_inp, test_oup, epochs, rounds)


In [10]:
server.iterate()

Iteration 1 of 2 | Cli# 0: 3.44 sec, Q: 0.7074; Cli# 1: 2.31 sec, Q: 0.6362; Cli# 2: 1.30 sec, Q: 0.5576; Cli# 3: 1.06 sec, Q: 0.4040; Cli# 4: 0.56 sec, Q: 0.2795; Cli# 5: 0.49 sec, Q: 0.2034; Cli# 6: 0.36 sec, Q: 0.2324;  | Glob: 16.89s, Train: 38.44s, Aggr: 29.18s.
Iteration 2 of 2 | Cli# 0: 5.41 sec, Q: 0.7793; Cli# 1: 2.84 sec, Q: 0.7709; Cli# 2: 1.47 sec, Q: 0.7604; Cli# 3: 0.89 sec, Q: 0.7508; Cli# 4: 0.65 sec, Q: 0.7871; Cli# 5: 0.44 sec, Q: 0.7676; Cli# 6: 0.38 sec, Q: 0.7893;  | Glob: 16.96s, Train: 41.02s, Aggr: 58.14s.
547/547 - 2s - loss: 0.0380 - accuracy: 0.7757


<__main__.Server at 0x215ecc98d48>