In [8]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

DEVICE = torch.device("cpu")  # Change to "cuda" for GPU support
NUM_CLIENTS = 10
BATCH_SIZE = 32

class CustomDataset(Dataset):
    def __init__(self, dataframe):
        self.data = dataframe
        self.feature_columns = [col for col in dataframe.columns if col != 'targetTput' and col != 'measTimeStampRf']
        self.target_column = 'targetTput'

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        features = torch.tensor(row[self.feature_columns].values.astype(np.float32), dtype=torch.float32)
        target = torch.tensor(row[self.target_column].astype(np.float32), dtype=torch.float32)
        return features, target
def load_csv_dataset(filepath):
    df = pd.read_csv(filepath)

    # Identify categorical columns
    categorical_columns = df.select_dtypes(include=['object']).columns.tolist()
    # Remove 'measTimeStampRf' from categorical columns as it's not relevant for one-hot encoding
    if 'measTimeStampRf' in categorical_columns:
        categorical_columns.remove('measTimeStampRf')

    # One-hot encode categorical columns
    ohe = OneHotEncoder(sparse_output=False, drop='first')  # Updated parameter name
    ohe_features = ohe.fit_transform(df[categorical_columns])
    ohe_feature_names = ohe.get_feature_names_out(categorical_columns)

    # Create a DataFrame with one-hot encoded features
    ohe_df = pd.DataFrame(ohe_features, columns=ohe_feature_names)

    # Drop original categorical columns and 'measTimeStampRf'
    df.drop(columns=categorical_columns + ['measTimeStampRf'], inplace=True)
    df = pd.concat([df.reset_index(drop=True), ohe_df.reset_index(drop=True)], axis=1)

    # Split dataset
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
    train_df, val_df = train_test_split(train_df, test_size=0.2, random_state=42)

    # Split train set into NUM_CLIENTS partitions
    train_partitions = np.array_split(train_df, NUM_CLIENTS)

    trainloaders = [DataLoader(CustomDataset(partition), batch_size=BATCH_SIZE, shuffle=True) for partition in train_partitions]
    testloader = DataLoader(CustomDataset(test_df), batch_size=BATCH_SIZE, shuffle=False)

    return trainloaders, testloader, len(train_df.columns) - 1  # Subtract 1 for the target column

class Net(nn.Module):
    def __init__(self, input_size: int) -> None:
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

def train(net, trainloaders, epochs: int, verbose=False):
    """Train the network on the training set."""
    criterion = nn.MSELoss()
    optimizer = optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for trainloader in trainloaders:  # Iterate over each client's data
            for features, targets in trainloader:
                features, targets = features.to(DEVICE), targets.to(DEVICE)
                optimizer.zero_grad()
                outputs = net(features)
                loss = criterion(outputs, targets.view(-1, 1))  # Ensure targets have the correct shape
                loss.backward()
                optimizer.step()
                running_loss += loss.item()
        average_loss = running_loss / (len(trainloaders) * len(trainloader.dataset))
        if verbose:
            print(f"Epoch {epoch+1}: train loss {average_loss:.4f}")

def test(net, testloader):
    """Evaluate the network on the test set."""
    criterion = nn.MSELoss()
    total_loss = 0.0
    net.eval()
    with torch.no_grad():
        for features, targets in testloader:
            features, targets = features.to(DEVICE), targets.to(DEVICE)
            outputs = net(features)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            correct += (predicted == labels).sum().item()
    accuracy = correct / len(testloader.dataset)
    return loss, accuracy
    

In [9]:
from collections import OrderedDict
from typing import Dict, Tuple
import flwr as fl
import torch
from flwr.common import NDArrays, Scalar


class FlowerClient(fl.client.NumPyClient):
    def __init__(self, trainloader, valloader,input_size) -> None:
        super().__init__()
        self.input_size= input_size
        self.trainloader = trainloader
        self.valloader = valloader
        self.model = Net(input_size).to(DEVICE)

    def set_parameters(self, parameters):
        """With the model parameters received from the server,
        overwrite the uninitialise model in this class with them."""

        params_dict = zip(self.model.state_dict().keys(), parameters)
        state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
        # now replace the parameters
        self.model.load_state_dict(state_dict, strict=True)

    def get_parameters(self, config: Dict[str, Scalar]):
        """Extract all model parameters and convert them to a list of
        NumPy arrays. The server doesn't work with PyTorch/TF/etc."""
        return [val.cpu().numpy() for _, val in self.model.state_dict().items()]

    def fit(self, parameters, config):
        """This method train the model using the parameters sent by the
        server on the dataset of this client. At then end, the parameters
        of the locally trained model are communicated back to the server"""

        # copy parameters sent by the server into client's local model
        self.set_parameters(parameters)

        # Define the optimizer -------------------------------------------------------------- Essentially the same as in the centralised example above
        optim = torch.optim.SGD(self.model.parameters(), lr=0.01, momentum=0.9)

        # do local training  -------------------------------------------------------------- Essentially the same as in the centralised example above (but now using the client's data instead of the whole dataset)
        train(self.model, self.trainloader, optim, epochs=1)

        # return the model parameters to the server as well as extra info (number of training examples in this case)
        return self.get_parameters({}), len(self.trainloader), {}

    def evaluate(self, parameters: NDArrays, config: Dict[str, Scalar]):
        """Evaluate the model sent by the server on this client's
        local validation set. Then return performance metrics."""

        self.set_parameters(parameters)
        loss = test(
            self.model, self.valloader
        )  # <-------------------------- calls the `test` function, just what we did in the centralised setting (but this time using the client's local validation set)
        # send statistics back to the server
        return float(loss), len(self.valloader), {"accuracy": accuracy}
    

In [10]:
def get_evaluate_fn(testloader):
    """This is a function that returns a function. The returned
    function (i.e. `evaluate_fn`) will be executed by the strategy
    at the end of each round to evaluate the stat of the global
    model."""

    def evaluate_fn(server_round: int, parameters, config):
        """This function is executed by the strategy it will instantiate
        a model and replace its parameters with those from the global model.
        The, the model will be evaluate on the test set (recall this is the
        whole MNIST test set)."""

        model = Net(input_size).to(DEVICE)

        # set parameters to the model
        params_dict = zip(model.state_dict().keys(), parameters)
        state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
        model.load_state_dict(state_dict, strict=True)

        # call test
        loss, accuracy = test(
            model, testloader
        )  # <-------------------------- calls the `test` function, just what we did in the centralised setting
        return loss, {"accuracy": accuracy}

    return evaluate_fn


# now we can define the strategy
strategy = fl.server.strategy.FedAvg(
    fraction_fit=0.1,  # let's sample 10% of the client each round to do local training
    fraction_evaluate=0.1,  # after each round, let's sample 20% of the clients to asses how well the global model is doing
    min_available_clients=100,  # total number of clients available in the experiment
    evaluate_fn=get_evaluate_fn(testloader),
)  # a callback to a function that the strategy can execute to evaluate the state of the global model on a centralised dataset

In [11]:
def generate_client_fn(trainloaders, valloaders,input_size):
    def client_fn(cid: str):
        """Returns a FlowerClient containing the cid-th data partition"""

        return FlowerClient(
            trainloader=trainloaders[int(cid)], valloader=valloaders[int(cid)], input_size=input_size
        ).to_client()

    return client_fn
trainloaders, testloader, input_size = load_csv_dataset('src_ue.csv')

client_fn_callback = generate_client_fn(trainloaders, testloader,input_size)

  return bound(*args, **kwds)


In [12]:
history = fl.simulation.start_simulation(
    client_fn=client_fn_callback,  # a callback to construct a client
    num_clients=2,  # total number of clients in the experiment
    config=fl.server.ServerConfig(num_rounds=2),  # let's run for 10 rounds
    strategy=strategy,  # the strategy that will orchestrate the whole FL pipeline
)

[92mINFO [0m:      Starting Flower simulation, config: num_rounds=2, no round_timeout
2024-07-29 00:44:45,408	INFO worker.py:1781 -- Started a local Ray instance.
[92mINFO [0m:      Flower VCE: Ray initialized with resources: {'object_store_memory': 3679212748.0, 'node:10.2.0.5': 1.0, 'memory': 7358425499.0, 'node:__internal_head__': 1.0, 'CPU': 2.0}
[92mINFO [0m:      Optimize your simulation with Flower VCE: https://flower.ai/docs/framework/how-to-run-simulations.html
[92mINFO [0m:      No `client_resources` specified. Using minimal resources for clients.
[92mINFO [0m:      Flower VCE: Resources for each Virtual Client: {'num_cpus': 1, 'num_gpus': 0.0}
[92mINFO [0m:      Flower VCE: Creating VirtualClientEngineActorPool with 2 actors
[92mINFO [0m:      [INIT]
[92mINFO [0m:      Requesting initial parameters from one random client
[91mERROR [0m:     Traceback (most recent call last):
  File "/home/azureuser/anaconda3/lib/python3.12/site-packages/flwr/simulation/ray_t

RuntimeError: Simulation crashed.