# Multilayer Perceptron
**Comparing Federated Machine Learning to Centralized Machine Learning**

## Imports & Configs

In [1]:
from collections import OrderedDict
from typing import List, Tuple
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import multiprocessing
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets.utils.logging import disable_progress_bar
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

import flwr as fl
from flwr.client import Client, ClientApp, NumPyClient
from flwr.common import Metrics, Context
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.server.strategy import FedAvg
from flwr.simulation import run_simulation
from flwr_datasets import FederatedDataset

In [2]:
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
    n_gpu = float(torch.cuda.device_count())
    device_name = torch.cuda.get_device_name(DEVICE)
elif torch.backends.mps.is_available():
    DEVICE = torch.device("mps")
    device_name = "Apple Silicon"
    n_gpu = 0.0
else:
    DEVICE = torch.device("cpu")
    device_name = "CPU"
    n_gpu = 0.0
    
print(f"Training on {device_name}")
disable_progress_bar()
n_cores = multiprocessing.cpu_count()
print(f"Number of GPUs: {n_gpu}")
print(f"Number of CPU Cores: {n_cores}")
print(f"Flower {fl.__version__} / PyTorch {torch.__version__}")

Training on NVIDIA GeForce RTX 4090
Number of GPUs: 1.0
Number of CPU Cores: 24
Flower 1.14.0 / PyTorch 2.5.1+cu124


In [3]:
NUM_CLIENTS = 200
BATCH_SIZE = 32
NUM_EPOCHS = 8
NUM_ROUNDS = 5

torch.manual_seed(0)

<torch._C.Generator at 0x7f5aadbdfbf0>

## Loading Data

In [4]:
train_df = pd.read_csv('./Data/adult_train.csv')
test_df = pd.read_csv('./Data/adult_train.csv')
concated_df = pd.concat([train_df, test_df], ignore_index=True)

label_column_name = 'income'
x_train = train_df.drop(columns=[label_column_name]).values
x_test = test_df.drop(columns=[label_column_name]).values
y_train = train_df[label_column_name].values
y_test = test_df[label_column_name].values

In [5]:
class CustomDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)
    
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

In [6]:
def partition_dataset(features, labels, num_clients):
    dataset = CustomDataset(features, labels)
    dataset_size = len(dataset)
    partition_size = dataset_size // num_clients
    
    lengths = [partition_size] * num_clients
    lengths[-1] += dataset_size % num_clients
    partitions = random_split(dataset, lengths)
    return partitions

In [7]:
def load_federated_datasets(x_train, x_test, y_train, y_test, num_clients, batch_size):
    train_partitions = partition_dataset(x_train, y_train, num_clients)
    test_partitions = partition_dataset(x_test, y_test, num_clients)
    
    federated_trainloaders = []
    federated_testloaders = []

    for train_partition, test_partition in zip(train_partitions, test_partitions):
        trainloader = DataLoader(train_partition, batch_size=batch_size, shuffle=True)
        testloader = DataLoader(test_partition, batch_size=batch_size, shuffle=False)
        federated_trainloaders.append(trainloader)
        federated_testloaders.append(testloader)

    return federated_trainloaders, federated_testloaders

## Model

In [8]:
class Binary_MLP(nn.Module):
    def __init__(self):
        super(Binary_MLP, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(97, 64),            
            nn.ReLU(),
            nn.Dropout(0.3),              
            nn.BatchNorm1d(64),            
            
            nn.Linear(64, 32),             
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.BatchNorm1d(32),
            
            nn.Linear(32, 16),             
            nn.ReLU(),
            nn.Dropout(0.2),
            
            nn.Linear(16, 1)               
        )
        
    def forward(self, x):
        return self.model(x)

In [9]:
def evaluate_model(model, testloader, device):
    model.eval()
    all_preds = []
    all_labels = []
    total_loss = 0.0
    criterion = nn.BCEWithLogitsLoss()  # Assuming binary classification with logits output

    with torch.no_grad():
        for inputs, labels in testloader:
            inputs, labels = inputs.to(device), labels.to(device).float()  # Ensure labels are floats for BCELoss
            outputs = model(inputs)

            # Calculate loss for the batch
            batch_loss = criterion(outputs, labels.unsqueeze(1))  # Labels reshaped for compatibility
            total_loss += batch_loss.item()

            # Convert logits to predictions
            preds = torch.sigmoid(outputs).round()  # Threshold at 0.5 for binary classification
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, zero_division=1)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)

    # Average loss over all batches
    avg_loss = total_loss / len(testloader)

    return avg_loss, accuracy, precision, recall, f1

## Centralized Training

In [10]:
train_centralized = CustomDataset(x_train, y_train)
test_centralized = CustomDataset(x_test, y_test)
train_centralized_loader = DataLoader(train_centralized, batch_size=32, shuffle=True)
test_centralized_loader = DataLoader(test_centralized, batch_size=32, shuffle=True)

In [11]:
def centralized_training(model, loader, criterion, optimizer, num_epochs=NUM_EPOCHS):
    model.train()
    total_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for epoch in range(num_epochs):
        for inputs, labels in loader:
            inputs = inputs.to(DEVICE).float()
            labels = labels.to(DEVICE).float().unsqueeze(1)  # Ensure correct shape

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Accumulate loss
            total_loss += loss.item()

            # Calculate predictions and update accuracy metrics
            preds = torch.sigmoid(outputs).round()  # Threshold at 0.5 for binary classification
            correct_predictions += (preds == labels).sum().item()
            total_samples += labels.size(0)

    # Calculate overall metrics
    avg_loss = total_loss / (num_epochs * len(loader))  # Average loss over all batches
    overall_accuracy = correct_predictions / total_samples
    return overall_accuracy

In [12]:
model = Binary_MLP().to(DEVICE)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

centralized_training(model, train_centralized_loader, criterion, optimizer)

0.8455475876048033

In [13]:
c_loss, c_accuracy, c_precision, c_recall, c_f1 = evaluate_model(model, test_centralized_loader, DEVICE)

print(f"Centralized Model - Average Loss: {c_loss:.4f}, Accuracy: {c_accuracy:.4f}, Precision: {c_precision:.4f}, Recall: {c_recall:.4f}, F1: {c_f1:.4f}")

Centralized Model - Average Loss: 0.3060, Accuracy: 0.8608, Precision: 0.7657, Recall: 0.6080, F1: 0.6778


In [14]:
for param_tensor in model.state_dict():
    print(param_tensor, "\t", model.state_dict()[param_tensor].size())

model.0.weight 	 torch.Size([64, 97])
model.0.bias 	 torch.Size([64])
model.3.weight 	 torch.Size([64])
model.3.bias 	 torch.Size([64])
model.3.running_mean 	 torch.Size([64])
model.3.running_var 	 torch.Size([64])
model.3.num_batches_tracked 	 torch.Size([])
model.4.weight 	 torch.Size([32, 64])
model.4.bias 	 torch.Size([32])
model.7.weight 	 torch.Size([32])
model.7.bias 	 torch.Size([32])
model.7.running_mean 	 torch.Size([32])
model.7.running_var 	 torch.Size([32])
model.7.num_batches_tracked 	 torch.Size([])
model.8.weight 	 torch.Size([16, 32])
model.8.bias 	 torch.Size([16])
model.11.weight 	 torch.Size([1, 16])
model.11.bias 	 torch.Size([1])


## Federated Training

In [15]:
backend_config = {
    "client_resources": {
        "num_cpus": n_cores,
        "num_gpus": n_gpu
    }
}

In [16]:
def set_parameters(model, parameters):
    params_dict = zip(model.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.from_numpy(v) for k, v in params_dict})
    model.load_state_dict(state_dict, strict=True)

def get_parameters(model) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in model.state_dict().items()]

In [17]:
class FlowerClient(NumPyClient):
    def __init__(self, model, trainloader, testloader):
        self.model = model
        self.trainloader = trainloader
        self.testloader = testloader

    def get_parameters(self, config):
        return get_parameters(self.model)

    def fit(self, parameters, config):
        set_parameters(self.model, parameters)
        accuracy = centralized_training(self.model, self.trainloader, criterion, optimizer)
        return get_parameters(self.model), len(self.trainloader), {"accuracy": float(accuracy)}

    def evaluate(self, parameters, config):
        set_parameters(self.model, parameters)
        loss, accuracy, precision, recall, f1 = evaluate_model(self.model, self.testloader, DEVICE)
        return float(loss), len(self.testloader), {"accuracy": float(accuracy)}

In [18]:
def client_fn(context: Context) -> Client:
    model = Binary_MLP().to(DEVICE)
    partition_id = context.node_config["partition-id"]

    trainloaders, testloaders = load_federated_datasets(
        x_train,
        x_test,
        y_train,
        y_test,
        NUM_CLIENTS,
        BATCH_SIZE
    )

    trainloader = trainloaders[partition_id]
    testloader = testloaders[partition_id]

    return FlowerClient(model, trainloader, testloader).to_client()

In [19]:
def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
    examples = [num_examples for num_examples, _ in metrics]

    return {"accuracy": sum(accuracies) / sum(examples)}

In [20]:
def server_fn(context: Context) -> ServerAppComponents:
    strategy = FedAvg(
        fraction_fit=1.0,  # Sample 100% of available clients for training
        fraction_evaluate=1.0,  # Sample 100% of available clients for evaluation
        min_fit_clients=10,  # Never sample less than 10 clients for training
        min_evaluate_clients=5,  # Never sample less than 5 clients for evaluation
        min_available_clients=10, # Wait until all 10 clients are available
        evaluate_metrics_aggregation_fn=weighted_average,
        fit_metrics_aggregation_fn=weighted_average
    )
    config = ServerConfig(num_rounds=NUM_ROUNDS)
    return ServerAppComponents(strategy=strategy, config=config)


In [21]:
client = ClientApp(client_fn=client_fn)
server = ServerApp(server_fn=server_fn)

In [22]:
run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_CLIENTS,
    backend_config=backend_config,
)

[92mINFO [0m:      Starting Flower ServerApp, config: num_rounds=5, no round_timeout
[92mINFO [0m:      
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Requesting initial parameters from one random client
[92mINFO [0m:      Received initial parameters from one random client
[92mINFO [0m:      Starting evaluation of initial global parameters
[92mINFO [0m:      Evaluation returned no results (`None`)
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 1]
[92mINFO [0m:      configure_fit: strategy sampled 200 clients (out of 200)
[92mINFO [0m:      aggregate_fit: received 200 results and 0 failures
[92mINFO [0m:      configure_evaluate: strategy sampled 200 clients (out of 200)
[92mINFO [0m:      aggregate_evaluate: received 200 results and 0 failures
[92mINFO [0m:      
[92mINFO [0m:      [ROUND 2]
[92mINFO [0m:      configure_fit: strategy sampled 200 clients (out of 200)
[92mINFO [0m:      aggregate_fit: received 200 results and 0 failures
[92mINFO [0m:     

## Comparison