# Federated Learning - Minimal Example

## Imports

In [1]:
import pandas as pd
import numpy as np
import os
import tqdm
from sklearn.model_selection import train_test_split, RepeatedKFold
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt
from itertools import product
from math import floor
import time

import tensorflow as tf
import tensorflow_federated as tff
from keras.models import Sequential
from keras.layers import Dense, InputLayer
from keras.callbacks import CSVLogger

# -> check tff
#print(tff.federated_computation(lambda: 'Hello World')()) 

2023-05-10 14:09:13.204554: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Ingest Data
---

In [2]:
# ingest data

df_locs = ['../output/data/insurance-clean.csv',
    "https://raw.githubusercontent.com/Olhaau/fl-official-statistics-addon/main/output/data/insurance-clean.csv"
]

def load_df(df_locs):
    """ Loads data from a path to a csv-file.
    
    :param df_locs: possible locations of a CSV file
    :type df_locs: str or list of str
    :output: Ingested Data.
    :rtype: pandas.DataFrame 
    """
    df = pd.DataFrame()

    if isinstance(df_locs, str): df_locs = [df_locs]
    
    for df_loc in df_locs:
        try:
            df = pd.read_csv(df_loc, index_col = 0)
            print("loaded data from {}".format(df_loc))
            if len(df) != 0: break
        except Exception as ex:
            print("{} in ".format(type(ex).__name__), df_loc)

    return df

df = load_df(df_locs)
df.head(3)

loaded data from ../output/data/insurance-clean.csv


Unnamed: 0,age,sex,bmi,children,smoker,region,charges,region0,region1,region2,region3
0,0.021739,0.0,0.321227,0.0,1.0,southwest,16884.924,0.0,0.0,0.0,1.0
1,0.0,1.0,0.47915,0.2,0.0,southeast,1725.5523,0.0,0.0,1.0,0.0
2,0.217391,1.0,0.458434,0.6,0.0,southeast,4449.462,0.0,0.0,1.0,0.0


In [3]:
# select features and target (first column)
features = ['age', 'sex', 'bmi', 'children', 'smoker'
            , 'region0', 'region1', 'region2', 'region3']
target = 'charges'

df.loc[:, [target]+features ].head(3)

Unnamed: 0,charges,age,sex,bmi,children,smoker,region0,region1,region2,region3
0,16884.924,0.021739,0.0,0.321227,0.0,1.0,0.0,0.0,0.0,1.0
1,1725.5523,0.0,1.0,0.47915,0.2,0.0,0.0,0.0,1.0,0.0
2,4449.462,0.217391,1.0,0.458434,0.6,0.0,0.0,0.0,1.0,0.0


## Create Data Shards (Federated)
---

In [140]:
# Prepare the data
# =====================

# 1. create client data
# =====================

#client_seperator = 'region'
#clients = [
#    df.loc[df[client_seperator] == x, df.columns != client_seperator][[target] + features] 
#    for x in df[client_seperator].unique()
#    ]

# or randomly
clients = [df[[target] + features].sample(frac = 1./4, ignore_index = True) for _ in range(4)]


# 2. evaluation splits
# ====================

clients_split = [train_test_split(data, test_size = 20, random_state = 42) for data in clients]
clients_train = [split[0] for split in clients_split]
clients_test  = [split[1] for split in clients_split]

# for cross validation use:
#cv = RepeatedKFold(n_splits = 5, n_repeats = 5, random_state = 42)
#client_cv_splits = [cv.split(data) for data in clients]
#[[[data.iloc[train,:], data.iloc[test,:]] for train, test in cv.split(data)] for data in clients][1][1]

# prep the data (incl. convert to tensor)
# =======================================

def prep_fed_train(X_train, y_train,
    #NUM_EPOCHS = 50,
    #BATCH_SIZE = 128,
    #SHUFFLE_BUFFER = 20,
    #PREFETCH_BUFFER = 5,
    #SEED = 42
    ):
    """
    See https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification#preprocessing_the_input_data
    """

    return tf.data.Dataset.from_tensor_slices((
        tf.convert_to_tensor(X_train), 
        tf.convert_to_tensor(y_train)
    ))#.repeat(NUM_EPOCHS
    #).shuffle(SHUFFLE_BUFFER, seed = SEED
    #).batch(BATCH_SIZE
    #).prefetch(PREFETCH_BUFFER)
def prep_fed_test(X_test, y_test):
    return tf.data.Dataset.from_tensor_slices((
        tf.convert_to_tensor(np.expand_dims(X_test, axis=0)), 
        tf.convert_to_tensor(np.expand_dims(y_test, axis=0))
        )) 

train_data_fed = [prep_fed_train(df[features], df[target]) for df in clients_train]
test_data_fed  = [prep_fed_test (df[features], df[target]) for df in clients_test]

# show random_client_ds
if False: # only correct, without #.repeat(NUM_EPOCHS).shuffle(SHUFFLE_BUFFER, seed=1).batch(BATCH_SIZE).prefetch(PREFETCH_BUFFER)
    i = 0 
    for client_dfs in random_client_ds:
        print('================================ client {} =================================='.format(i))
        client_train = list(client_dfs[0].as_numpy_iterator())
        client_test  = list(client_dfs[1])
        print('Train (obs = {})'.format(len(client_train[0])))

        print(pd.DataFrame(client_train[:3], columns = ["X", "y"]))
        print('Test (obs = {})'.format(len(client_test[0])))
        #print('obs: {}')
        print(pd.DataFrame(client_test[:3], columns = ["X", "y"]))
        # print([x[:3] for x in client_test])
        i += 1

## Build Federated Model
---

In [270]:
def keras_generator():
    return create_keras_model(nfeatures = 9, compile = False)

keras_generator

<function __main__.keras_generator()>

In [302]:
# build a TFF model from Keras
# ============================

def model_fn(
    keras_creator,
    loss = tf.keras.losses.MeanAbsoluteError(),
    metrics0 = [tf.keras.metrics.MeanAbsoluteError()]#,
    ):
    """ Wrap a Keras model as Tensorflow Federated model. 
    
    cf. https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification#creating_a_model_with_keras
    """
    def _model():
        # We _must_ create a new model here, and _not_ capture it from an external
        # scope. TFF will call this within different graph contexts.
        
        #keras_model = create_keras_model(
        #    nfeatures = nfeatures, compile = False#, **kwargs
        #    )
        
        keras_model = keras_generator()

        return tff.learning.models.from_keras_model(
            keras_model,
            input_spec = (
                tf.TensorSpec((None, keras_model.input.shape[1]
                ), dtype = tf.float64),
                tf.TensorSpec((None,),           dtype = tf.float64)
            ), loss = loss#, metrics = metrics0#[tf.keras.metrics.MeanAbsoluteError()]
        )

    return _model

## Train
---

In [304]:
def keras_blueprint():
    return create_keras_model(nfeatures = 9, compile = False, units =[40,40,40])

print(keras_blueprint().summary())

model_fed = model_fn(keras_creator = keras_blueprint)

client_optimizer = lambda: tf.optimizers.Adam(learning_rate = .05)
server_optimizer = lambda: tf.optimizers.Adam(learning_rate = .05)

# Create iterative learning process which will perform the federated learning
process_fed = tff.learning.algorithms.build_weighted_fed_avg(
    model_fed,
    client_optimizer_fn = client_optimizer,
    server_optimizer_fn = server_optimizer)

# show the learning process
print(process_fed.initialize.type_signature.formatted_representation())

Model: "sequential_26"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_104 (Dense)           (None, 40)                400       
                                                                 
 dense_105 (Dense)           (None, 40)                1640      
                                                                 
 dense_106 (Dense)           (None, 40)                1640      
                                                                 
 dense_107 (Dense)           (None, 1)                 41        
                                                                 
Total params: 3,721
Trainable params: 3,721
Non-trainable params: 0
_________________________________________________________________
None
( -> <
  global_model_weights=<
    trainable=<
      float32[9,40],
      float32[40],
      float32[40,40],
      float32[40],
      float32[40,20],
      float32[20],
      fl

In [285]:
def train_fed(process, train_data,
    NUM_ROUNDS = 50,
    NUM_EPOCHS = 50,
    BATCH_SIZE = 128,
    SHUFFLE_BUFFER = 20,
    PREFETCH_BUFFER = 5,
    SEED = 2134,
    verbose = True
    ):
    
    # prep the data
    train_data = [
        data.
            repeat(NUM_EPOCHS).
            shuffle(SHUFFLE_BUFFER, seed = SEED).
            batch(BATCH_SIZE).
            prefetch(PREFETCH_BUFFER)

        for data in train_data]
    
    # initialize the process
    state = process.initialize()
    hist, states = [], []

    for round in range(NUM_ROUNDS):
        
        if SEED != None: tf.keras.utils.set_random_seed(SEED)
        result  = process.next(state, train_data)
        
        state   = result.state
        metrics = dict(result.metrics['client_work']['train'].items())
        states.append(state)
        hist.append(metrics)

        if verbose == True: print('round {:2d} / {}, metrics = {}'.format(round + 1, NUM_ROUNDS, metrics))


    return {'process': process, 'history': hist, 'states': states}




In [306]:
result = train_fed(
    process = process_fed, 
    train_data = train_data_fed, 
    NUM_ROUNDS = 150, 
    NUM_EPOCHS = 50
    )

2023-05-10 17:26:41.420777: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-05-10 17:26:41.420843: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 1
2023-05-10 17:26:41.421026: I tensorflow/core/grappler/clusters/single_machine.cc:358] Starting new session
2023-05-10 17:26:41.421477: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-05-10 17:26:41.421544: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-05-10 17:26:41.421587: I tensor

round  1 / 150, metrics = {'loss': 5280.538, 'num_examples': 62800, 'num_batches': 492}
round  2 / 150, metrics = {'loss': 5014.588, 'num_examples': 62800, 'num_batches': 492}
round  3 / 150, metrics = {'loss': 4863.902, 'num_examples': 62800, 'num_batches': 492}
round  4 / 150, metrics = {'loss': 4753.081, 'num_examples': 62800, 'num_batches': 492}
round  5 / 150, metrics = {'loss': 4607.9, 'num_examples': 62800, 'num_batches': 492}
round  6 / 150, metrics = {'loss': 4533.719, 'num_examples': 62800, 'num_batches': 492}
round  7 / 150, metrics = {'loss': 4438.5522, 'num_examples': 62800, 'num_batches': 492}
round  8 / 150, metrics = {'loss': 4317.974, 'num_examples': 62800, 'num_batches': 492}
round  9 / 150, metrics = {'loss': 4214.6895, 'num_examples': 62800, 'num_batches': 492}
round 10 / 150, metrics = {'loss': 4115.7007, 'num_examples': 62800, 'num_batches': 492}
round 11 / 150, metrics = {'loss': 4008.9834, 'num_examples': 62800, 'num_batches': 492}
round 12 / 150, metrics = {'lo

## Test

In [None]:
model_weights = iterative_process.get_model_weights(result['states'][-1])

In [None]:
# Model evaluation
evaluation = tff.learning.build_federated_evaluation(model_fed)
# print(evaluation.type_signature.formatted_representation())
perf = evaluation(model_weights, test_data_fed)

  evaluation = tff.learning.build_federated_evaluation(model_fed)
2023-05-10 17:20:17.877339: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-05-10 17:20:17.877407: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 1
2023-05-10 17:20:17.877573: I tensorflow/core/grappler/clusters/single_machine.cc:358] Starting new session
2023-05-10 17:20:17.878279: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have been built without NUMA support.
2023-05-10 17:20:17.878380: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:982] could not open file to read NUMA node: /sys/bus/pci/devices/0000:01:00.0/numa_node
Your kernel may have bee

In [None]:
perf

OrderedDict([('eval',
              OrderedDict([('loss', 2038.998),
                           ('num_examples', 80),
                           ('num_batches', 4)]))])

In [None]:
model = keras_blueprint()
model_weights.assign_weights_to(model)
model.compile(
    loss=tf.losses.mae,
    # loss=tf.losses.mean_squared_error,
    optimizer=tf.optimizers.Adam(),
    metrics=["mae", 'mean_squared_error']
)

X_test = pd.concat([data[features] for data in clients_test])
y_test = pd.concat([data[target]   for data in clients_test])

# The evaluation results, for technical reasons the metrics_names is called afterwards. However, its order fits to the results
print(model.evaluate(X_test, y_test, verbose = 0))
print(model.metrics_names)


ValueError: Cannot assign value to variable ' dense_110/kernel:0': Shape mismatch.The variable shape (40, 40), and the assigned value shape (40, 20) are incompatible.

## Repeat

In [None]:
# appendix
def build_model(
    nfeatures = 9,
    units = [40, 40, 20], 
    activations = ['relu'] * 3, 
    compile = True,
    loss = 'mean_squared_error',
    optimizer = tf.optimizers.legacy.Adam(learning_rate = .05),
    metrics = ["mae", 'mean_squared_error', r2_score], 
    run_eagerly = True
    ):
  
  """Construct a fully connected neural network and compile it.
  
  Parameters
  ------------
  nfeatures: int, optional
    Number of input features. Default is 9.
  units: list of int, optional
    List of number of units of the hidden dense layers. The length of ``units`` defines the number of hidden layers. Default are 3 layers with 40, 40 an 20 units, respectively.
  activations: list of str, optional
    List of activation functions used in the hidden layers.
  loss: str, optional
    Used loss function for compiling.
  optimizer: keras.optimizers, optional
    Used optimizer for compiling.
  metrics: list of str or sklearn.metrics
    List of metrics for compiling.
  run_eagerly: bool
    Parameter for compiling

  Return
  ------------
    model: keras.engine.sequential.Sequential
      Keras sequential fully connected neural network. Already compiled.
  """
  
  # construct model
  model = Sequential()
  model.add(InputLayer(input_shape = [nfeatures]))
  for ind in range(len(units)):
    model.add(Dense(
      units = units[ind], 
      activation = activations[ind]
      ))
  model.add(Dense(1))
  
  # compile model
  if compile:
    model.compile(
      loss = loss,
      optimizer = optimizer,
      metrics = metrics,
      run_eagerly = run_eagerly
    )

  return model


def model_fn(nfeatures = 5):
    model = build_model(nfeatures = nfeatures, compile = False)
    return tff.learning.models.from_keras_model(
        model,
        input_spec = (
            tf.TensorSpec((None, nfeatures), dtype = tf.float64),
            tf.TensorSpec((None,), dtype = tf.float64)),
        loss = tf.keras.losses.MeanAbsoluteError(),
        #loss = tf.keras.losses.MeanSquaredError(),
        metrics = [tf.keras.metrics.MeanAbsoluteError()
        #metrics = [tf.keras.losses.MeanSquaredError()
        #, tfa.metrics.RSquare()
        ]
    )

def model_fed(nfeatures = 5):
    def _model():
        return model_fn(nfeatures = nfeatures)

    iterative_process = tff.learning.algorithms.build_weighted_fed_avg(
        _model,
        client_optimizer_fn=lambda: tf.optimizers.Adam(learning_rate = .05),
        server_optimizer_fn=lambda: tf.optimizers.Adam(learning_rate = .05),)
    return iterative_process

iterative_process = model_fed(9)