# The Exacerbation of Permutation Invariance and Overfitting in Federated Learning of Regression Tasks

In this notebook, we demonstrate that when performing federated learning of regression tasks, the issues of permutation
invariance and client-level overfitting are significantly more prominent. We observe that the fact that regression uses
real outputs leads significant variance in client's optimal weights since there are infinitely many weighted sums that
can lead to the correct output. In federated learning, clients hold non-i.i.d. datasets to eachother and may perform
many steps of model updates before synchronising with the rest of the system, this results in client models that may
individually fit well to a subset of the data but are so significantly different in weights to eachother that the aggregated
model does not fit well to any of the data. We call this effect client-level overfitting.

To start our demonstration we import the required external libraries, and establish constants.

In [1]:
from typing import NamedTuple, Tuple, List, Callable
from collections import namedtuple
from functools import partial
from numpy.typing import NDArray, ArrayLike
import numpy as np
import tensorflow as tf
import sklearn.datasets as skd
import sklearn.metrics as skm
from sklearn.model_selection import train_test_split
import scipy.optimize as sp_opt
from tqdm.auto import trange


seed = round(np.pi**13 + np.e * 21)
tf.random.set_seed(seed)
rng = np.random.default_rng(seed)
batch_size = 200
num_epochs = 25
num_clients = 10
num_rounds = num_epochs * num_clients

def evaluate(Y_test: NDArray, preds: ArrayLike):
    print(f"R2 score: {skm.r2_score(Y_test, preds)}")
    print(f"MAE: {skm.mean_absolute_error(Y_test, preds)}")

2023-09-07 15:27:54.188501: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-09-07 15:27:54.215139: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-09-07 15:27:54.215559: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


We will use the California housing dataset, which is a regression task where information about a house and its area is used to predict its
price in units of hundreds of thousands of dollars. The dataset is automatically attained from https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html.
This dataset allows us to present the simplest format of this demonstration.

In [2]:
X, Y = skd.fetch_california_housing(return_X_y=True)

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=seed)

## Centralised Machine Learning

First, we will show that an equivalent centralised machine learning setting can effectively learn the task.

For our model, we use a simple multilayer perceptron. We can see that it attains good performance when trained locally on the entire training dataset with the Adam optimiser.

In [4]:
def create_model(input_shape: Tuple[int]):
    inputs = tf.keras.layers.Input(input_shape)
    x = inputs
    x = tf.keras.layers.Dense(100, activation="sigmoid")(x)
    x = tf.keras.layers.Dense(1, activation="relu")(x)  # All labels are positive
    model = tf.keras.Model(inputs=inputs, outputs=x)
    model.compile(optimizer=tf.keras.optimizers.Adam(0.001), loss="mean_absolute_error")
    return model

model = create_model(X_train[0].shape)
model.fit(X_train, Y_train, batch_size=batch_size, epochs=num_epochs, verbose=0)
evaluate(Y_test, model.predict(X_test, verbose=0))

R2 score: 0.544383624093583
MAE: 0.545953413051085


## Federated Learning

Now, we create a similar setting, but within federated learning. There is one small difference in our setting, data is distributed in a non-i.i.d. manner
across clients.

With that said, we first define a structure to hold each client's data, and a function to distribute the data to each client. The distribution function
is made non-i.i.d. by assigning each client and equidistant mean label and giving them all samples that have labels that are nearest to that mean.

In [10]:
class ClientData(NamedTuple):
    X: NDArray
    Y: NDArray


def non_iid_client_data(X_train: NDArray, Y_train: NDArray):
    hist, bins = np.histogram(Y_train, 9)
    bin_idx = np.digitize(Y_train, bins) - 1
    all_client_data = []
    for c in range(num_clients):
        idx = bin_idx == c
        all_client_data.append(ClientData(X_train[idx].copy(), Y_train[idx].copy()))
    return all_client_data


def iid_client_data(X_train: NDArray, Y_train: NDArray):
    data_per_client = len(Y_test) // num_clients
    all_client_data = []
    for c in range(num_clients):
        cidx = rng.choice(len(Y_train), data_per_client, replace=False)
        all_client_data.append(ClientData(X_train[cidx].copy(), Y_train[cidx].copy()))
        X_train, Y_train = np.delete(X_train, cidx, axis=0), np.delete(Y_train, cidx, axis=0)
    return all_client_data


def mixed_iid_client_data(X_train: NDArray, Y_train: NDArray, proportion_iid: float = 0.5):
    iid_idx = rng.choice(len(Y_train), round(len(Y_train) * proportion_iid), replace=False)
    all_iid_data = iid_client_data(X_train[iid_idx].copy(), Y_train[iid_idx].copy())
    all_non_iid_data = non_iid_client_data(np.delete(X_train.copy(), iid_idx, axis=0), np.delete(Y_train.copy(), iid_idx, axis=0))
    all_client_data = [
        ClientData(np.concatenate((iidd.X, niidd.X)), np.concatenate((iidd.Y, niidd.Y)))
        for iidd, niidd in zip(all_iid_data, all_non_iid_data)
    ]
    return all_client_data

We next define our federated optimisers: federated averaging and FedAdam.

In [6]:
def fedavg(
    client_parameters: List[List[NDArray]],
    client_samples: List[int],
    *_args
):
    agg_parameters = []
    num_layers = len(client_parameters[0])
    for i in range(num_layers):
        agg_parameters.append(
            np.average([cp[i] for cp in client_parameters], weights=client_samples, axis=0)
        )
    return agg_parameters


class FedAdam:
    def __init__(
        self,
        learning_rate: float = 0.01,
        beta_1: float = 0.9,
        beta_2: float = 0.999,
        epsilon: float = 1e-07
    ):
        self.mu = None
        self.nu = None
        self.learning_rate = learning_rate
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.epsilon = epsilon

    def __call__(
        self,
        client_parameters: List[List[NDArray]],
        client_samples: List[int],
        global_parameters: List[NDArray]
    ):
        if self.mu is None:
            self.mu = [np.zeros_like(p) for p in global_parameters]
            self.nu = [np.zeros_like(p) for p in global_parameters]

        client_grads = [
            [cp - gp for cp, gp in zip(cparams, global_parameters)] for cparams in client_parameters
        ]
        agg_grads = fedavg(client_grads, client_samples)
        for i in range(len(global_parameters)):
            self.mu[i] = self.beta_1 * self.mu[i] + (1 - self.beta_1) * agg_grads[i]
            self.nu[i] = self.beta_2 * self.nu[i] + (1 - self.beta_2) * agg_grads[i]**2
            mu_hat = self.mu[i] / (1 - self.beta_1)
            nu_hat = self.nu[i] / (1 - self.beta_2)
            layer_update = mu_hat / (np.sqrt(nu_hat) + self.epsilon)
            global_parameters[i] += self.learning_rate * layer_update
        return global_parameters

And finally, we define a function to perform our federated learning experiment by creating the clients, having them train for each round, aggregate the client models
using the specified optimizer at the end of each round. When learning is completed, we print the $R^2$ score and mean absolute error of the learnt global model evaluated
on the testing dataset.

In [7]:
def federated_learning(
    create_model_fun: Callable[[Tuple[int]], tf.keras.Model],
    optimiser: Callable[[List[List[NDArray]], List[int], List[NDArray]], List[NDArray]],
    all_client_data: List[ClientData]
):
    global_model = create_model_fun(X_train[0].shape)
    client_models = [create_model_fun(X_train[0].shape) for _ in range(num_clients)]
    num_client_samples = [len(cd.Y) for cd in all_client_data]

    for r in (pbar := trange(num_rounds)):
        losses = []
        for client_model, client_data in zip(client_models, all_client_data):
            client_model.set_weights(global_model.get_weights())
            history = client_model.fit(client_data.X, client_data.Y, batch_size=batch_size, verbose=0)
            losses.append(history.history['loss'][0])
        global_model.set_weights(
            optimiser([cm.get_weights() for cm in client_models], num_client_samples, global_model.get_weights())
        )
        pbar.set_postfix_str(f"Loss: {np.average(losses, weights=num_client_samples):.3f}")

    evaluate(Y_test, global_model.predict(X_test, verbose=0))

We are now set up for the experiments, first we demonstrate with the same model as the centralised training and federated averaging as
the federated optimiser.

In [11]:
federated_learning(create_model, fedavg, non_iid_client_data(X_train, Y_train))

  0%|          | 0/250 [00:00<?, ?it/s]

R2 score: -0.13630436970418502
MAE: 0.9026521411046777


We see that this form of training is completely ineffective, so we try a different federated optimiser, FedAdam.

In [12]:
federated_learning(create_model, FedAdam(), non_iid_client_data(X_train, Y_train))

  0%|          | 0/250 [00:00<?, ?it/s]

R2 score: -0.06884925502126049
MAE: 0.8845697449340192


It remains arguable that this effect could simple be a side effect of the client-side Adam optimiser being used in training, and the updates
it produces which have differing denominators causing incorrect aggregation. However, with the following we construct a closer replication of
the centralised setting by applying the SGD optimiser at the client-side and FedAdam at the server. Still, we see that client-level
overfitting has a significant negative impact on the resulting global model performance.

In [9]:
def create_sgd_model(input_shape: Tuple[int]):
    inputs = tf.keras.layers.Input(input_shape)
    x = inputs
    x = tf.keras.layers.Dense(100, activation="sigmoid")(x)
    x = tf.keras.layers.Dense(1, activation="relu")(x)  # All labels are positive
    model = tf.keras.Model(inputs=inputs, outputs=x)
    model.compile(optimizer=tf.keras.optimizers.SGD(0.1), loss="mean_absolute_error
    ")
    return model

federated_learning(create_sgd_model, FedAdam(), non_iid_client_data(X_train, Y_train))

100%|████████████████████████████████████████████████| 250/250 [01:32<00:00,  2.70it/s, Loss: 0.634]

R2 score: -0.1577248615841711
MAE: 0.9079689140678654





### With i.i.d. Data

With the following we show that this effect is not present when the data is i.i.d. with respect to the clients.

In [13]:
federated_learning(create_model, fedavg, iid_client_data(X_train, Y_train))

  0%|          | 0/250 [00:00<?, ?it/s]

R2 score: 0.5511912343208749
MAE: 0.5600718261213312


In [16]:
federated_learning(create_model, fedavg, mixed_iid_client_data(X_train, Y_train, 0.5))

  0%|          | 0/250 [00:00<?, ?it/s]

R2 score: -0.01119342703684234
MAE: 0.8657854685469931


# With Ridge Regression

In [45]:
class RidgeModel:
    def __init__(self, input_shape, alpha=1.0):
        self.alpha = alpha
        self.parameters = np.zeros(input_shape)

    def set_weights(self, weights):
        self.parameters = weights[0]
    
    def get_weights(self):
        return [self.parameters]

    def fit(self, X, Y, epochs=4, **_kwargs):
        # The default number of epochs makes up for the fact that there are no longer minibatches
        info = sp_opt.minimize(
            partial(RidgeModel.func, X, Y, self.alpha),
            x0=self.parameters,
            method="L-BFGS-B",
            tol=1e-6,
            bounds=[(0, np.inf)] * X.shape[1],
            jac=True,
            options={"maxiter": epochs}
        )
        self.parameters = info['x']
        return namedtuple("History", ["history"])(history={"loss": [info['fun'] / X.shape[1]]})

    def predict(self, X, **_kwargs):
        return X.dot(self.parameters)

    def func(X, Y, alpha, w):
        residual = X.dot(w) - Y
        f = 0.5 * residual.dot(residual) + 0.5 * alpha * w.dot(w)
        grad = X.T @ residual + alpha * w
        return f, grad

num_rounds = num_epochs * num_clients

## Centralised training

In [20]:
model = RidgeModel(X_train[0].shape)
model.fit(X_train, Y_train, epochs=num_epochs)
evaluate(Y_test, model.predict(X_test))

R2 score: 0.4475166555450888
MAE: 0.6637108430770596


In [47]:
federated_learning(RidgeModel, fedavg, iid_client_data(X_train, Y_train))

  0%|          | 0/250 [00:00<?, ?it/s]

R2 score: 0.4705381888755382
MAE: 0.6504825909689994


In [50]:
federated_learning(RidgeModel, fedavg, non_iid_client_data(X_train, Y_train))

  0%|          | 0/250 [00:00<?, ?it/s]

R2 score: -0.11029128020171264
MAE: 0.9635788628739557


In [49]:
federated_learning(RidgeModel, FedAdam(), non_iid_client_data(X_train, Y_train))

  0%|          | 0/250 [00:00<?, ?it/s]

R2 score: -0.3044178628197636
MAE: 1.0437436436506904


In [51]:
federated_learning(RidgeModel, fedavg, mixed_iid_client_data(X_train, Y_train, 0.5))

  0%|          | 0/250 [00:00<?, ?it/s]

R2 score: 0.21636184315453033
MAE: 0.8100472070090917


TODO: Plot trade-off between iid-proportion and performance