In [1]:
%load_ext autoreload
%autoreload 2

import asyncio, copy, os, pickle, socket, sys, time
from functools import partial
from multiprocessing import Pool, Process
from pathlib import Path
from tqdm import tqdm

import torch
from torch import optim
from torch.utils.tensorboard import SummaryWriter

sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), "../")))
from libs import agg, data, fl, log, nn, plot, poison, resnet, sim, wandb
from libs import plot as plot_lib  # module alias; the setup cell rebinds `plot` to a logger object
from libs.distributed import *
from cfgs.fedargs import *

  if epoch is not -1 or str(epoch) == str(_epoch):


In [2]:
# Run identifiers shared by TensorBoard, the plot logger and wandb.
# To run this notebook as a different client, override its name first, e.g.:
#   fedargs.name = "client-1"
project = 'fl-kafka-client'
name = 'fedavg-cnn-mnist-na-' + fedargs.name

# Log level: info | debug | warning | error | critical.
# Optionally also save logs to a file by passing the run name, e.g. log.init("debug", name).
log.init("info")

fedargs.tb = SummaryWriter(str(Path('../out/runs') / project / name), comment="fl")

# `plot` is rebound from the module to the run's plot logger, which `process` calls as
# `plot.alog(...)`. Initialising through the `plot_lib` module alias keeps this cell
# re-runnable: with `plot.init(...)` a second run would call `.init` on the logger object
# returned by the first run instead of on the module.
plot = plot_lib.init(name, project)
wb = wandb.init(name, project)

[34m[1mwandb[0m: Currently logged in as: [33mkasyah[0m (use `wandb login --relogin` to force relogin)
[34m[1mwandb[0m: wandb version 0.12.6 is available!  To upgrade, please run:
[34m[1mwandb[0m:  $ pip install wandb --upgrade


In [3]:
# Kafka topic every client publishes its updates to and consumes peers' updates from,
# plus the number of clients this notebook drives (a single entry in `clients`).
fedargs.topic, fedargs.num_clients = 'pyflx', 1

In [4]:
# Use CUDA only when the config enables it *and* a device is actually available.
use_cuda = fedargs.cuda and torch.cuda.is_available()

# Seed torch's RNG from the config so runs are repeatable.
torch.manual_seed(fedargs.seed)

device = torch.device("cuda" if use_cuda else "cpu")

# Extra DataLoader options, applied only when running on CUDA.
kwargs = dict(num_workers=1, pin_memory=True) if use_cuda else dict()

In [5]:
host = socket.gethostname()
# This notebook drives a single client, labelled "<hostname>: <client name>".
clients = [": ".join((host, fedargs.name))]

In [6]:
# Distributed topology: the object `process` uses to consume (dt.consume_model) and
# publish (dt.produce_model) model updates for these clients.
# NOTE(review): `Distributed` presumably comes from the `libs.distributed` star import — confirm.
dt = Distributed(clients)

In [7]:
# Initialize the global model as a deep copy of the configured model, so fedargs.model
# itself is never modified. (Per-client model copies are made later, in client_details.)
global_model = copy.deepcopy(fedargs.model)
# Load the full train/test datasets; per-client partitioning happens in the next cells.
train_data, test_data = data.load_dataset(fedargs.dataset)

In [8]:
# Partition the training set across `clients`; the split strategy lives in data.split_data.
# NOTE(review): presumably the result is keyed by client name — confirm in libs.data.
clients_data = data.split_data(train_data, clients)

In [9]:
# Per-client train/test loaders built from each client's shard. The 0.2 is passed
# positionally to data.load_client_data — presumably the held-out fraction; TODO confirm.
client_train_loaders, client_test_loaders = data.load_client_data(
    clients_data, fedargs.client_batch_size, 0.2, **kwargs)

# Loader over the full test split. Note: the training loop below evaluates each client on
# its own 'test_loader' from client_details, not on this one.
test_loader = torch.utils.data.DataLoader(
    test_data,
    batch_size=fedargs.test_batch_size,
    shuffle=True,
    **kwargs
)

# Per-client state used by the training loop: its loaders, its own deep copy of the
# global model, and a 'model_update' entry initialised to None.
client_details = dict(
    (client, {
        "train_loader": client_train_loaders[client],
        "test_loader": client_test_loaders[client],
        "model": copy.deepcopy(global_model),
        "model_update": None,
    })
    for client in clients
)

In [10]:
def process(client, epoch, dt, model, train_loader, test_loader, fedargs, device):
    """Run one federated round for a single client.

    Consumes the peers' model updates for ``epoch`` from ``fedargs.topic`` and averages
    them into ``model`` (if any arrived). Then trains locally and publishes this client's
    update tagged with ``epoch + 1``. Finally evaluates the model and logs test metrics to
    TensorBoard, the plot logger and wandb.

    Args:
        client: Identifier of this client. It is used to drop the client's own update
            from the consumed set and as the key for metrics and logs.
        epoch: Index of the previous round, whose updates are consumed.
        dt: Object exposing ``consume_model`` / ``produce_model`` (the distributed topology).
        model: The client's current model.
        train_loader: Loader forwarded to ``fedargs.train_func``.
        test_loader: Loader forwarded to ``fedargs.eval_func``.
        fedargs: Run configuration. It supplies ``topic``, ``learning_rate``,
            ``weight_decay``, ``local_rounds``, ``train_func``, ``eval_func``, ``epochs``
            and the TensorBoard writer ``tb``.
        device: Device forwarded to the train/eval functions.

    Returns:
        The locally trained model.
    """
    log.info("Epoch: {}, Processing Client {}".format(epoch, client))

    # Consume and average. `epoch` is the previous round, whose updates we want.
    # A missing (None) result is treated as "no updates" instead of crashing on `len`.
    client_model_updates = dt.consume_model(client, fedargs.topic, model, epoch) or {}

    # Drop this client's own update, if present, so only peer updates are averaged.
    client_model_updates.pop(client, None)

    log.info("Epoch: {}, Client {} received {} model update(s) from {}".format(
        epoch, client, len(client_model_updates), list(client_model_updates.keys())))

    if client_model_updates:
        model = fl.federated_avg(client_model_updates, model)

    # Train locally on this client's data.
    model_update, model, loss = fedargs.train_func(model, train_loader,
                                                   fedargs.learning_rate,
                                                   fedargs.weight_decay,
                                                   fedargs.local_rounds, device)

    # Publish this client's update, tagged with the round it completes.
    next_epoch = epoch + 1
    dt.produce_model(client, fedargs.topic, model_update, next_epoch)

    log.jsondebug(loss, "Epoch {} : Federated Training loss, Client {}".format(next_epoch, client))
    log.modeldebug(model, "Epoch {}: Client {} Update".format(next_epoch, client))

    # Test, plot and log. A single timestamp keeps the plot and wandb records aligned.
    test_output = fedargs.eval_func(model, test_loader, device)
    accuracy, test_loss = test_output["accuracy"], test_output["test_loss"]
    now = time.time()
    fedargs.tb.add_scalar("Accuracy/" + client, accuracy, next_epoch)
    fedargs.tb.add_scalar("Test Loss/" + client, test_loss, next_epoch)
    plot.alog(client, {next_epoch: {"time": now, "acc": accuracy, "loss": test_loss}})
    wb.log({client: {"epoch": next_epoch, "time": now, "acc": accuracy, "loss": test_loss}})
    log.jsoninfo(test_output, "Test Output after Epoch {} of {} for Client {}".format(
        next_epoch, fedargs.epochs, client))

    return model

In [11]:
# NOTE(review): `time` is already imported in the first cell. This re-import is kept
# defensively in case a star import there rebinds `time`; TODO drop it once confirmed.
import time

# Federated training: in every round each local client consumes its peers' updates,
# trains, publishes its own update and logs test metrics (all inside `process`).
# perf_counter is monotonic, so the measured duration is not skewed by clock changes.
start_time = time.perf_counter()

for epoch in tqdm(range(fedargs.epochs)):
    log.info("Federated Training Epoch {} of {}".format(epoch, fedargs.epochs))

    for client in clients:
        details = client_details[client]
        details['model'] = process(client, epoch, dt, details['model'],
                                   details['train_loader'],
                                   details['test_loader'],
                                   fedargs, device)

print("Total training time: {:.1f}s".format(time.perf_counter() - start_time))

  0%|          | 0/51 [00:00<?, ?it/s]2021-10-29 10:50:42,025 - <ipython-input-11-387a89c474b9>::<module>(l:6) : Federated Training Epoch 0 of 51 [MainProcess : MainThread (INFO)]
2021-10-29 10:50:42,050 - <ipython-input-10-a04f38dd6afa>::process(l:2) : Epoch: 0, Processing Client bladecluster.iitp.org: client-x [MainProcess : MainThread (INFO)]
2021-10-29 10:51:02,123 - <ipython-input-10-a04f38dd6afa>::process(l:11) : Epoch: 0, Client bladecluster.iitp.org: client-x received 50 model update(s) from ['bladecluster.iitp.org(2)', 'bladecluster.iitp.org(4)', 'bladecluster.iitp.org(3)', 'bladecluster.iitp.org(1)', 'bladecluster.iitp.org(5)', 'bladecluster.iitp.org(6)', 'bladecluster.iitp.org(23)', 'bladecluster.iitp.org(7)', 'bladecluster.iitp.org(27)', 'bladecluster.iitp.org(14)', 'bladecluster.iitp.org(32)', 'bladecluster.iitp.org(17)', 'bladecluster.iitp.org(24)', 'bladecluster.iitp.org(18)', 'bladecluster.iitp.org(30)', 'bladecluster.iitp.org(11)', 'bladecluster.iitp.org(19)', 'bladecl

2021-10-29 10:54:46,954 - <ipython-input-10-a04f38dd6afa>::process(l:37) : Test Outut after Epoch 4 of 51 for Client bladecluster.iitp.org: client-x {
    "accuracy": 92.025,
    "correct": 11043,
    "test_loss": 0.2732634908060233
} [MainProcess : MainThread (INFO)]
  8%|▊         | 4/51 [04:05<49:57, 63.79s/it]2021-10-29 10:54:46,970 - <ipython-input-11-387a89c474b9>::<module>(l:6) : Federated Training Epoch 4 of 51 [MainProcess : MainThread (INFO)]
2021-10-29 10:54:46,989 - <ipython-input-10-a04f38dd6afa>::process(l:2) : Epoch: 4, Processing Client bladecluster.iitp.org: client-x [MainProcess : MainThread (INFO)]
2021-10-29 10:55:07,106 - <ipython-input-10-a04f38dd6afa>::process(l:11) : Epoch: 4, Client bladecluster.iitp.org: client-x received 1 model update(s) from ['bladecluster.iitp.org: client-1'] [MainProcess : MainThread (INFO)]
2021-10-29 10:55:49,636 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::produce(l:56) : Producing user records to topic pyflx. ^C to

2021-10-29 11:01:28,941 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::produce(l:66) : Flushing records... [MainProcess : MainThread (INFO)]
2021-10-29 11:01:29,845 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::delivery_report(l:50) : User record b'bladecluster.iitp.org: client-x' successfully produced to pyflx [0] at offset 923 [MainProcess : MainThread (INFO)]
2021-10-29 11:01:33,642 - <ipython-input-10-a04f38dd6afa>::process(l:37) : Test Outut after Epoch 10 of 51 for Client bladecluster.iitp.org: client-x {
    "accuracy": 95.84166666666667,
    "correct": 11501,
    "test_loss": 0.1394776295721531
} [MainProcess : MainThread (INFO)]
 20%|█▉        | 10/51 [10:51<46:07, 67.50s/it]2021-10-29 11:01:33,657 - <ipython-input-11-387a89c474b9>::<module>(l:6) : Federated Training Epoch 10 of 51 [MainProcess : MainThread (INFO)]
2021-10-29 11:01:33,675 - <ipython-input-10-a04f38dd6afa>::process(l:2) : Epoch: 10, Processing Client bladecluster.iitp.org: c

2021-10-29 11:07:27,352 - <ipython-input-10-a04f38dd6afa>::process(l:11) : Epoch: 15, Client bladecluster.iitp.org: client-x received 1 model update(s) from ['bladecluster.iitp.org: client-1'] [MainProcess : MainThread (INFO)]
2021-10-29 11:08:07,214 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::produce(l:56) : Producing user records to topic pyflx. ^C to exit. [MainProcess : MainThread (INFO)]
2021-10-29 11:08:07,409 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::produce(l:66) : Flushing records... [MainProcess : MainThread (INFO)]
2021-10-29 11:08:08,284 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::delivery_report(l:50) : User record b'bladecluster.iitp.org: client-x' successfully produced to pyflx [0] at offset 935 [MainProcess : MainThread (INFO)]
2021-10-29 11:08:11,998 - <ipython-input-10-a04f38dd6afa>::process(l:37) : Test Outut after Epoch 16 of 51 for Client bladecluster.iitp.org: client-x {
    "accuracy": 95.7333333333

 41%|████      | 21/51 [22:52<32:11, 64.37s/it]2021-10-29 11:13:34,308 - <ipython-input-11-387a89c474b9>::<module>(l:6) : Federated Training Epoch 21 of 51 [MainProcess : MainThread (INFO)]
2021-10-29 11:13:34,327 - <ipython-input-10-a04f38dd6afa>::process(l:2) : Epoch: 21, Processing Client bladecluster.iitp.org: client-x [MainProcess : MainThread (INFO)]
2021-10-29 11:13:54,461 - <ipython-input-10-a04f38dd6afa>::process(l:11) : Epoch: 21, Client bladecluster.iitp.org: client-x received 1 model update(s) from ['bladecluster.iitp.org: client-1'] [MainProcess : MainThread (INFO)]
2021-10-29 11:14:30,873 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::produce(l:56) : Producing user records to topic pyflx. ^C to exit. [MainProcess : MainThread (INFO)]
2021-10-29 11:14:31,096 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::produce(l:66) : Flushing records... [MainProcess : MainThread (INFO)]
2021-10-29 11:14:32,616 - /home/harsh_1921cs01/hub/AgroFed/fl/lib

2021-10-29 11:19:39,583 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::delivery_report(l:50) : User record b'bladecluster.iitp.org: client-x' successfully produced to pyflx [0] at offset 957 [MainProcess : MainThread (INFO)]
2021-10-29 11:19:43,128 - <ipython-input-10-a04f38dd6afa>::process(l:37) : Test Outut after Epoch 27 of 51 for Client bladecluster.iitp.org: client-x {
    "accuracy": 11.408333333333333,
    "correct": 1369,
    "test_loss": 2.3025506114959717
} [MainProcess : MainThread (INFO)]
 53%|█████▎    | 27/51 [29:01<24:35, 61.48s/it]2021-10-29 11:19:43,141 - <ipython-input-11-387a89c474b9>::<module>(l:6) : Federated Training Epoch 27 of 51 [MainProcess : MainThread (INFO)]
2021-10-29 11:19:43,160 - <ipython-input-10-a04f38dd6afa>::process(l:2) : Epoch: 27, Processing Client bladecluster.iitp.org: client-x [MainProcess : MainThread (INFO)]
2021-10-29 11:20:03,210 - <ipython-input-10-a04f38dd6afa>::process(l:11) : Epoch: 27, Client bladecluster.iitp.org: c

2021-10-29 11:25:44,796 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::produce(l:56) : Producing user records to topic pyflx. ^C to exit. [MainProcess : MainThread (INFO)]
2021-10-29 11:25:45,025 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::produce(l:66) : Flushing records... [MainProcess : MainThread (INFO)]
2021-10-29 11:25:45,856 - /home/harsh_1921cs01/hub/AgroFed/fl/libs/protobuf_producer.py::delivery_report(l:50) : User record b'bladecluster.iitp.org: client-x' successfully produced to pyflx [0] at offset 969 [MainProcess : MainThread (INFO)]
2021-10-29 11:25:49,591 - <ipython-input-10-a04f38dd6afa>::process(l:37) : Test Outut after Epoch 33 of 51 for Client bladecluster.iitp.org: client-x {
    "accuracy": 9.641666666666666,
    "correct": 1157,
    "test_loss": 2.302580988566081
} [MainProcess : MainThread (INFO)]
 65%|██████▍   | 33/51 [35:07<18:14, 60.82s/it]2021-10-29 11:25:49,606 - <ipython-input-11-387a89c474b9>::<module>(l:6) : Federat

Traceback (most recent call last):
  File "/home/harsh_1921cs01/anaconda3/envs/syft/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3437, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-11-387a89c474b9>", line 9, in <module>
    client_details[client]['model'] = process(client, epoch, dt, client_details[client]['model'],
  File "<ipython-input-10-a04f38dd6afa>", line 19, in process
    model_update, model, loss = fedargs.train_func(model, train_loader,
  File "/home/harsh_1921cs01/hub/AgroFed/fl/libs/fl.py", line 182, in train_model
    model, loss = client_update(_model, train_loader, lr, wd, r, device)
  File "/home/harsh_1921cs01/hub/AgroFed/fl/libs/fl.py", line 72, in client_update
    output = model(data)
  File "/home/harsh_1921cs01/anaconda3/envs/syft/lib/python3.9/site-packages/torch/nn/modules/module.py", line 889, in _call_impl
    result = self.forward(*input, **kwargs)
  File "/home/harsh_1921cs01/hub/AgroFed/fl/l

TypeError: object of type 'NoneType' has no len()

<h1> End </h1>