In [1]:
%load_ext autoreload
%autoreload 2

import copy, os, socket, sys, time
from pathlib import Path
from tqdm import tqdm

import torch
from torch import optim

sys.path.insert(0, os.path.abspath(os.path.join(os.getcwd(), "../")))
from libs import data, fl, nn, wandb
from libs.distributed import *

In [2]:
class FedArgs:
    """Configuration for this federated-learning client notebook.

    Every setting is a plain instance attribute, so later cells read (and in
    one case override) it through the shared ``fedargs`` object.
    """

    def __init__(self):
        # Client identity and training schedule.
        self.name = "client-2"
        self.num_clients = 50
        self.epochs = 51
        self.local_rounds = 1

        # Batch sizes and optimizer hyper-parameters; they are handed to the
        # data loader and to ``train_func`` by later cells.
        self.client_batch_size = 32
        self.test_batch_size = 64
        self.learning_rate = 0.001
        self.weight_decay = None

        # Runtime: GPU is used only if this flag is set AND CUDA is available.
        self.cuda = False
        self.seed = 1

        # Messaging endpoints and the topic on which models are exchanged.
        self.topic = "VJH_020_2"
        self.broker_ip = '172.16.26.40:9092'
        self.schema_ip = 'http://172.16.26.40:8081'
        self.wait_to_consume = 10  # units not visible here (presumably seconds) — TODO confirm

        # Dataset name, model instance and the train/eval callables.
        self.dataset = "lemon"
        self.model = nn.LemonNet()
        self.train_func = fl.train_model
        self.eval_func = fl.evaluate


fedargs = FedArgs()

In [3]:
# Start the experiment-tracking run for this client; ``wb`` is used by
# process() to log per-epoch metrics.
# NOTE(review): the run-name prefix 'VJH_020_1-' differs from fedargs.topic
# ("VJH_020_2") — confirm which experiment id is intended.
name = 'VJH_020_1-' + fedargs.name
project = 'fl-kafka-client'
wb = wandb.init(name, project)

# Override the configured client count for this notebook
# (presumably because it drives a single client — confirm).
fedargs.num_clients = 1

[34m[1mwandb[0m: Currently logged in as: [33mkasyah[0m (use `wandb login --relogin` to force relogin)
[34m[1mwandb[0m: wandb version 0.12.6 is available!  To upgrade, please run:
[34m[1mwandb[0m:  $ pip install wandb --upgrade


In [4]:
# Seed torch's global RNG from the configuration.
torch.manual_seed(fedargs.seed)

# CUDA is used only when it is both requested in the config and available.
use_cuda = fedargs.cuda and torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")

# Presumably DataLoader keyword arguments; not consumed by any cell visible here.
if use_cuda:
    kwargs = {"num_workers": 1, "pin_memory": True}
else:
    kwargs = {}

In [5]:
# This client is identified as "<hostname>: <client name>".
host = socket.gethostname()
clients = [": ".join((host, fedargs.name))]

# Messaging helper configured with the broker/schema endpoints and the
# consume wait setting from the config.
dt = Distributed(
    clients,
    fedargs.broker_ip,
    fedargs.schema_ip,
    fedargs.wait_to_consume,
)

In [6]:
# Global model: an independent copy of the configured model instance.
global_model = copy.deepcopy(fedargs.model)

# Train/test loaders for the configured dataset and batch sizes.
train_loader, test_loader = data.load_dataset(
    fedargs.dataset,
    fedargs.client_batch_size,
    fedargs.test_batch_size,
)

# Everything the training loop needs for this single client; the client's
# model starts as its own copy of the global model.
client_details = {
    "name": clients[0],
    "train_loader": train_loader,
    "test_loader": test_loader,
    "model": copy.deepcopy(global_model),
    "model_update": None,
}

In [7]:
def process(client, epoch, dt, model, train_loader, test_loader, fedargs, device):
    """Run one round for ``client`` and return the locally trained model.

    Steps, in order:
      1. Consume the models published on ``fedargs.topic`` for ``epoch`` and
         report how many came from other clients.
      2. Train ``model`` with ``fedargs.train_func``.
      3. Publish the trained model tagged with ``epoch + 1``.
      4. Evaluate with ``fedargs.eval_func``, print the metrics and log them
         to the notebook-level ``wb`` run.

    Note: the consumed peer updates are only counted and printed. Aggregation
    (``fl.federated_avg`` over the received updates) is currently disabled, so
    the returned model comes from local training alone.

    Args:
        client: Identifier of this client; also the key used in the log record.
        epoch: Round number used when consuming; ``epoch + 1`` is used when
            publishing and logging.
        dt: Object providing ``consume_model`` and ``produce_model``.
        model: Model to train.
        train_loader: Training data handed to ``fedargs.train_func``.
        test_loader: Evaluation data handed to ``fedargs.eval_func``.
        fedargs: Configuration (topic, hyper-parameters, train/eval callables).
        device: Device handed to the train and eval callables.

    Returns:
        The model returned by ``fedargs.train_func``.
    """
    # Consume models; this client's own entry is not a peer update.
    peer_updates = dt.consume_model(client, fedargs.topic, model, epoch)
    if client in peer_updates:
        peer_updates.pop(client)
    senders = list(peer_updates.keys())
    print("Epoch: {}, Processing Client {}, Received {} Updates From {}".format(
        epoch, client, len(peer_updates), senders))

    # Aggregation step (disabled):
    #   if len(peer_updates) != 0:
    #       model = fl.federated_avg(peer_updates)

    # Train locally; only the trained model from the returned triple is used.
    _, model, _ = fedargs.train_func(
        model,
        train_loader,
        fedargs.learning_rate,
        fedargs.weight_decay,
        fedargs.local_rounds,
        device,
    )

    # Publish the trained model under the next round number.
    next_epoch = epoch + 1
    dt.produce_model(client, fedargs.topic, model, next_epoch)

    # Evaluate, print and log.
    metrics = fedargs.eval_func(model, test_loader, device)
    accuracy = metrics["accuracy"]
    test_loss = metrics["test_loss"]
    print("Epoch: {}, Accuracy: {}, Test Loss: {}".format(next_epoch, accuracy, test_loss))
    wb.log({client: {"epoch": next_epoch, "time": time.time(), "acc": accuracy, "loss": test_loss}})

    return model

In [None]:
# Federated training: one process() call per epoch. The model returned by each
# call is stored back on the client record and fed into the next epoch.
for epoch in tqdm(range(fedargs.epochs)):
    print("Federated Training Epoch {} of {}".format(epoch, fedargs.epochs))

    client_details['model'] = process(
        client=client_details['name'],
        epoch=epoch,
        dt=dt,
        model=client_details['model'],
        train_loader=client_details['train_loader'],
        test_loader=client_details['test_loader'],
        fedargs=fedargs,
        device=device,
    )

  0%|          | 0/51 [00:00<?, ?it/s]

Federated Training Epoch 0 of 51
Epoch: 0, Processing Client bladecluster.iitp.org: client-2, Received 0 Updates From []
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 351


  2%|▏         | 1/51 [00:27<22:48, 27.36s/it]

Epoch: 1, Accuracy: 14.000000000000002, Test Loss: 2.1681354522705076
Federated Training Epoch 1 of 51
Epoch: 1, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 354


  4%|▍         | 2/51 [00:54<22:19, 27.34s/it]

Epoch: 2, Accuracy: 32.0, Test Loss: 1.7055078125
Federated Training Epoch 2 of 51
Epoch: 2, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 357


  6%|▌         | 3/51 [01:21<21:31, 26.91s/it]

Epoch: 3, Accuracy: 39.0, Test Loss: 1.5250565719604492
Federated Training Epoch 3 of 51
Epoch: 3, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 360


  8%|▊         | 4/51 [01:47<21:00, 26.83s/it]

Epoch: 4, Accuracy: 45.0, Test Loss: 1.5097036361694336
Federated Training Epoch 4 of 51
Epoch: 4, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 363


 10%|▉         | 5/51 [02:14<20:40, 26.96s/it]

Epoch: 5, Accuracy: 50.0, Test Loss: 1.3056281280517579
Federated Training Epoch 5 of 51
Epoch: 5, Processing Client bladecluster.iitp.org: client-2, Received 1 Updates From ['bladecluster.iitp.org: client-3']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 366


 12%|█▏        | 6/51 [02:41<20:07, 26.83s/it]

Epoch: 6, Accuracy: 56.00000000000001, Test Loss: 1.1381529235839845
Federated Training Epoch 6 of 51
Epoch: 6, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-1', 'bladecluster.iitp.org: client-3']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 369


 14%|█▎        | 7/51 [03:08<19:45, 26.93s/it]

Epoch: 7, Accuracy: 63.0, Test Loss: 1.1402099227905274
Federated Training Epoch 7 of 51
Epoch: 7, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 372


 16%|█▌        | 8/51 [03:37<19:40, 27.46s/it]

Epoch: 8, Accuracy: 63.0, Test Loss: 0.9598347854614258
Federated Training Epoch 8 of 51
Epoch: 8, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 375


 18%|█▊        | 9/51 [04:05<19:26, 27.78s/it]

Epoch: 9, Accuracy: 72.0, Test Loss: 0.8100106239318847
Federated Training Epoch 9 of 51
Epoch: 9, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 378


 20%|█▉        | 10/51 [04:32<18:50, 27.58s/it]

Epoch: 10, Accuracy: 66.0, Test Loss: 0.9250766944885254
Federated Training Epoch 10 of 51
Epoch: 10, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 381


 22%|██▏       | 11/51 [04:59<18:12, 27.32s/it]

Epoch: 11, Accuracy: 71.0, Test Loss: 0.9097770690917969
Federated Training Epoch 11 of 51
Epoch: 11, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 384


 24%|██▎       | 12/51 [05:26<17:42, 27.25s/it]

Epoch: 12, Accuracy: 71.0, Test Loss: 0.9853722953796387
Federated Training Epoch 12 of 51
Epoch: 12, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 387


 25%|██▌       | 13/51 [05:53<17:05, 26.99s/it]

Epoch: 13, Accuracy: 73.0, Test Loss: 0.7738109588623047
Federated Training Epoch 13 of 51
Epoch: 13, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 390


 27%|██▋       | 14/51 [06:19<16:33, 26.86s/it]

Epoch: 14, Accuracy: 79.0, Test Loss: 0.7727632141113281
Federated Training Epoch 14 of 51
Epoch: 14, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 393


 29%|██▉       | 15/51 [06:45<16:00, 26.68s/it]

Epoch: 15, Accuracy: 66.0, Test Loss: 1.2050891876220704
Federated Training Epoch 15 of 51
Epoch: 15, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 396


 31%|███▏      | 16/51 [07:12<15:28, 26.52s/it]

Epoch: 16, Accuracy: 77.0, Test Loss: 0.9332795143127441
Federated Training Epoch 16 of 51
Epoch: 16, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 399


 33%|███▎      | 17/51 [07:39<15:06, 26.66s/it]

Epoch: 17, Accuracy: 78.0, Test Loss: 0.8762683868408203
Federated Training Epoch 17 of 51
Epoch: 17, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 402


 35%|███▌      | 18/51 [08:05<14:38, 26.63s/it]

Epoch: 18, Accuracy: 74.0, Test Loss: 0.9804212188720703
Federated Training Epoch 18 of 51
Epoch: 18, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 405


 37%|███▋      | 19/51 [08:32<14:12, 26.65s/it]

Epoch: 19, Accuracy: 77.0, Test Loss: 0.9245452880859375
Federated Training Epoch 19 of 51
Epoch: 19, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 408


 39%|███▉      | 20/51 [08:59<13:47, 26.71s/it]

Epoch: 20, Accuracy: 67.0, Test Loss: 1.1922924041748046
Federated Training Epoch 20 of 51
Epoch: 20, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 411


 41%|████      | 21/51 [09:26<13:23, 26.79s/it]

Epoch: 21, Accuracy: 82.0, Test Loss: 0.9252519226074218
Federated Training Epoch 21 of 51
Epoch: 21, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 414


 43%|████▎     | 22/51 [09:52<12:54, 26.72s/it]

Epoch: 22, Accuracy: 75.0, Test Loss: 1.1841387176513671
Federated Training Epoch 22 of 51
Epoch: 22, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 417


 45%|████▌     | 23/51 [10:20<12:33, 26.93s/it]

Epoch: 23, Accuracy: 76.0, Test Loss: 0.8036065292358399
Federated Training Epoch 23 of 51
Epoch: 23, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 420


 47%|████▋     | 24/51 [10:47<12:07, 26.94s/it]

Epoch: 24, Accuracy: 71.0, Test Loss: 1.358422088623047
Federated Training Epoch 24 of 51
Epoch: 24, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 423


 49%|████▉     | 25/51 [11:13<11:39, 26.92s/it]

Epoch: 25, Accuracy: 79.0, Test Loss: 0.8725919914245606
Federated Training Epoch 25 of 51
Epoch: 25, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 426


 51%|█████     | 26/51 [11:40<11:12, 26.91s/it]

Epoch: 26, Accuracy: 79.0, Test Loss: 0.8656203842163086
Federated Training Epoch 26 of 51
Epoch: 26, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 429


 53%|█████▎    | 27/51 [12:08<10:51, 27.15s/it]

Epoch: 27, Accuracy: 75.0, Test Loss: 1.1556451797485352
Federated Training Epoch 27 of 51
Epoch: 27, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 432


 55%|█████▍    | 28/51 [12:36<10:26, 27.26s/it]

Epoch: 28, Accuracy: 75.0, Test Loss: 1.0585461044311524
Federated Training Epoch 28 of 51
Epoch: 28, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 435


 57%|█████▋    | 29/51 [13:03<10:01, 27.33s/it]

Epoch: 29, Accuracy: 75.0, Test Loss: 1.4398622512817383
Federated Training Epoch 29 of 51
Epoch: 29, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 438


 59%|█████▉    | 30/51 [13:30<09:28, 27.09s/it]

Epoch: 30, Accuracy: 73.0, Test Loss: 1.3506889724731446
Federated Training Epoch 30 of 51
Epoch: 30, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...
User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 441


 61%|██████    | 31/51 [13:56<08:57, 26.86s/it]

Epoch: 31, Accuracy: 74.0, Test Loss: 1.6404050445556642
Federated Training Epoch 31 of 51
Epoch: 31, Processing Client bladecluster.iitp.org: client-2, Received 2 Updates From ['bladecluster.iitp.org: client-3', 'bladecluster.iitp.org: client-1']
Producing user records to topic VJH_020_2. ^C to exit.
Flushing records...


[34m[1mwandb[0m: Network error resolved after 0:02:23.344946, resuming normal operation.


User record b'bladecluster.iitp.org: client-2' successfully produced to VJH_020_2 [0] at offset 443


 63%|██████▎   | 32/51 [14:23<08:30, 26.88s/it]

Epoch: 32, Accuracy: 75.0, Test Loss: 1.4351117706298828
Federated Training Epoch 32 of 51
Epoch: 32, Processing Client bladecluster.iitp.org: client-2, Received 1 Updates From ['bladecluster.iitp.org: client-1']
