In [263]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn 
from torch.utils.data import random_split, DataLoader, TensorDataset 
import flwr as fl
from flwr.common import Metrics
from collections import OrderedDict
from typing import List, Tuple
import sys

In [264]:
# import platform, cpuinfo, GPUtil, psutil
# print(f"OS: {platform.uname().system} {platform.uname().release}")
# print(f"CPU: {cpuinfo.get_cpu_info()['brand_raw']}")
# print(f"GPU: {GPUtil.getGPUs()[0].name}")
# print(f"Memory: {psutil.virtual_memory().total / (1024 ** 3):.2f} GB")

In [265]:
print("Python version:", sys.version)
print("Version info:", sys.version_info)

Python version: 3.11.4 (tags/v3.11.4:d2340ef, Jun  7 2023, 05:45:37) [MSC v.1934 64 bit (AMD64)]
Version info: sys.version_info(major=3, minor=11, micro=4, releaselevel='final', serial=0)


In [266]:
DEVICE = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}")

Training on cpu using PyTorch 2.0.1+cpu and Flower 1.5.0


In [267]:
NUM_CLIENTS = 3
EPOCHS = 10
ROUNDS = 5

BATCH_SIZE = 100
IN_FEATURES = 3
HIDDEN_LAYERS = 128
OUT_FEATURES = 2

In [268]:
def load_datasets():
    df = pd.read_csv('./datasets/label_data.csv')
    df = df.rename(columns={'label': 'target'})
    
    feature = df.iloc[:, :-1]
    target = df.loc[:, 'target']
    feature = torch.Tensor(feature.to_numpy())
    target = torch.tensor(target.to_numpy())
    tensor_data = TensorDataset(feature, target)
    
    number_rows = len(feature)
    test_split = int(number_rows * 0.2)
    train_split = number_rows - test_split
    train_set, test_set = random_split(tensor_data, [train_split, test_split])  

    # split_ratio = 0.8
    # split_index = int(len(df) * split_ratio)
    # 
    # train_set = tensor_data.iloc[:split_index, :]
    # test_set = tensor_data.iloc[split_index:, :]
    
    part_size = len(train_set) // NUM_CLIENTS
    length = [part_size] * NUM_CLIENTS  # lengths for each client
    
    # Split the test set evenly into thirds, removing the remainders
    # random_choose = np.random.choice(train_set.index, (len(train_set) % NUM_CLIENTS), replace=False)
    # train_set = train_set.drop(random_choose)
    
    datasets = random_split(train_set, length, generator=torch.Generator().manual_seed(42))
    
    train_loader = []
    val_loader = []
    
    for data in datasets:
        val_length = len(data) // 10
        train_length = len(data) - val_length
        length = [train_length, val_length]
        train_data, val_data = random_split(data, length, generator=torch.Generator().manual_seed(42))
        
        train_loader.append(DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True))
        val_loader.append(DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=True))
    
    test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=True)
    return train_loader, val_loader, test_loader
    
train_loader, val_loader, test_loader = load_datasets()

In [269]:
class Network(nn.Module):
    def __init__(self, IN_FEATURES, HIDDEN_LAYERS, OUT_FEATURES):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(IN_FEATURES, HIDDEN_LAYERS), 
            nn.ReLU(),
            nn.Linear(HIDDEN_LAYERS, HIDDEN_LAYERS), 
            nn.ReLU(),
            nn.Linear(HIDDEN_LAYERS, OUT_FEATURES), 
            nn.Softmax(dim=1)
        )
        
    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits

In [270]:
def train(model, train_loader, epochs):
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    model.train()
    
    for epoch in range(1, epochs + 1): 
        correct, total, epoch_loss = 0, 0, 0.0
        
        for feature, target in train_loader:
            feature, target = feature.to(DEVICE), target.to(DEVICE)
            optimizer.zero_grad()
            output = model(feature)
            train_loss = criterion(output, target)
            train_loss.backward()
            optimizer.step()
            
            epoch_loss += train_loss
            total += target.size(0)
            correct += (torch.max(output.data, 1)[1] == target).sum().item()
            
        epoch_loss /= len(train_loader.dataset)
        epoch_accuracy = correct / total
        
        print(f"Epoch {epoch}/{EPOCHS}: train loss: {epoch_loss:.8f}, accuracy: {epoch_accuracy:.8f}.")

In [271]:
def test(model, test_loader):
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    model.eval()
 
    with torch.no_grad():
        for feature, target in test_loader:
            feature, target = feature.to(DEVICE), target.to(DEVICE)
            output = model(feature)
            loss += criterion(output, target).item()
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
            
        loss /= len(test_loader.dataset)
        accuracy = correct / total
        return loss, accuracy

In [272]:
# PyTorch model testing
# train_loader = train_loader[1]
# val_loader = val_loader[1]
# model = Network(IN_FEATURES, HIDDEN_LAYERS, OUT_FEATURES).to(DEVICE)
# 
# train(model, train_loader, EPOCHS)
# 
# loss, accuracy = test(model, val_loader)
# print(f"Final test set performance: \n\tloss: {loss:.8f}, accuracy: {accuracy:.8f}")

In [273]:
def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)

In [274]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, model, train_loader, val_loader):
        self.model = model
        self.train_loader = train_loader
        self.val_loader = val_loader

    def get_parameters(self, config):
        return get_parameters(self.model)

    def fit(self, parameters, config):
        set_parameters(self.model, parameters)
        train(self.model, self.train_loader, epochs=EPOCHS)
        return get_parameters(self.model), len(self.train_loader), {}

    def evaluate(self, parameters, config):
        set_parameters(self.model, parameters)
        loss, accuracy = test(self.model, self.val_loader)
        return float(loss), len(self.val_loader), {'accuracy: ': float(accuracy)}

In [275]:
def client_fn(cid: str) -> FlowerClient:
    print('Client:', cid)
    model = Network(IN_FEATURES, HIDDEN_LAYERS, OUT_FEATURES).to(DEVICE)
    train_loader, val_loader, test_loader = load_datasets()
    train_loader = train_loader[int(cid)]
    val_loader = val_loader[int(cid)]
    return FlowerClient(model, train_loader, val_loader)

In [276]:
def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    accuracies = [num_examples * m['accuracy'] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]
    return {'accuracy': sum(accuracies) / sum(examples)}

In [277]:
strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    min_fit_clients=NUM_CLIENTS,
    min_evaluate_clients=NUM_CLIENTS,
    min_available_clients=NUM_CLIENTS,
    evaluate_metrics_aggregation_fn=weighted_average,
)

history = fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=ROUNDS),
    strategy=strategy,
)

INFO flwr 2023-10-19 18:21:23,786 | app.py:175 | Starting Flower simulation, config: ServerConfig(num_rounds=5, round_timeout=None)
2023-10-19 18:21:29,098	INFO worker.py:1621 -- Started a local Ray instance.
INFO flwr 2023-10-19 18:21:33,373 | app.py:210 | Flower VCE: Ray initialized with resources: {'node:__internal_head__': 1.0, 'accelerator_type:G': 1.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 5450156851.0, 'memory': 10900313703.0, 'CPU': 20.0, 'GPU': 1.0}
INFO flwr 2023-10-19 18:21:33,373 | app.py:218 | No `client_resources` specified. Using minimal resources for clients.
INFO flwr 2023-10-19 18:21:33,374 | app.py:224 | Flower VCE: Resources for each Virtual Client: {'num_cpus': 1, 'num_gpus': 0.0}
INFO flwr 2023-10-19 18:21:33,385 | app.py:270 | Flower VCE: Creating VirtualClientEngineActorPool with 20 actors
INFO flwr 2023-10-19 18:21:33,386 | server.py:89 | Initializing global parameters
INFO flwr 2023-10-19 18:21:33,386 | server.py:276 | Requesting initial parameters fro

[2m[36m(DefaultActor pid=26960)[0m Client: 2
[2m[36m(DefaultActor pid=26960)[0m Client: 0
[2m[36m(DefaultActor pid=26960)[0m Epoch 1/10: train loss: 0.00454311, accuracy: 0.85996963.
[2m[36m(DefaultActor pid=26932)[0m Client: 2[32m [repeated 2x across cluster][0m
[2m[36m(DefaultActor pid=26932)[0m Epoch 3/10: train loss: 0.00451123, accuracy: 0.86223800.[32m [repeated 7x across cluster][0m
[2m[36m(DefaultActor pid=26664)[0m Epoch 5/10: train loss: 0.00452391, accuracy: 0.86100129.[32m [repeated 7x across cluster][0m
[2m[36m(DefaultActor pid=26960)[0m Epoch 8/10: train loss: 0.00452431, accuracy: 0.86094362.[32m [repeated 7x across cluster][0m


DEBUG flwr 2023-10-19 18:22:10,761 | server.py:236 | fit_round 1 received 3 results and 0 failures
DEBUG flwr 2023-10-19 18:22:10,763 | server.py:173 | evaluate_round 1: strategy sampled 3 clients (out of 3)


[2m[36m(DefaultActor pid=26932)[0m Epoch 10/10: train loss: 0.00451137, accuracy: 0.86223800.[32m [repeated 7x across cluster][0m
[2m[36m(DefaultActor pid=26664)[0m Client: 0


DEBUG flwr 2023-10-19 18:22:11,175 | server.py:187 | evaluate_round 1 received 3 results and 0 failures
ERROR flwr 2023-10-19 18:22:11,176 | app.py:294 | 'accuracy'
ERROR flwr 2023-10-19 18:22:11,176 | app.py:295 | Your simulation crashed :(. This could be because of several reasons.The most common are: 
	 > Your system couldn't fit a single VirtualClient: try lowering `client_resources`.
	 > All the actors in your pool crashed. This could be because: 
		 - You clients hit an out-of-memory (OOM) error and actors couldn't recover from it. Try launching your simulation with more generous `client_resources` setting (i.e. it seems {'num_cpus': 1, 'num_gpus': 0.0} is not enough for your workload). Use fewer concurrent actors. 
		 - You were running a multi-node simulation and all worker nodes disconnected. The head node might still be alive but cannot accommodate any actor with resources: {'num_cpus': 1, 'num_gpus': 0.0}.


In [278]:
history

