In [1]:
# Move the working directory two levels up so that the relative paths used later
# in this notebook (e.g. 'saved_dfs/...') resolve — presumably this is the
# project root; confirm when running from another location.
%cd ../..

C:\Users\bram_\home\msc


# General overview of running federated learning with Tensorflow Federated

## Run a single round of Federated Averaging:
```python
    state, metrics = iterative_process.next(state, federated_train_data)
```

## Initialize a _state_ before first round of Federated Averaging:
```python
    state = iterative_process.initialize()
```

## The _iterative_process_ is an object obtained by the method _tff.learning.build_federated_averaging_process_:
```python
    iterative_process = tff.learning.build_federated_averaging_process(
        model_fn,
        client_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=0.02),
        server_optimizer_fn=lambda: tf.keras.optimizers.SGD(learning_rate=1.0)
    )

```

## The _model_fn_ is a function that creates a new Keras model and converts it to a _tff.learning.Model_ object:
```python
    def model_fn():
        keras_model = create_keras_model()
        return tff.learning.from_keras_model(
            keras_model,
            input_spec=preprocessed_example_dataset.element_spec,
            loss=tf.keras.losses.SparseCategoricalCrossentropy(),
            metrics=[tf.keras.metrics.SparseCategoricalAccuracy()]
        )
```

## The Keras model is created by an auxiliary function _create_keras_model_, which must build a new model every time it is called:
```python
    def create_keras_model():
        return tf.keras.models.Sequential([
            ...
        ])
```

## Create the _federated_train_data_ object
The `federated_train_data` object is a list of `tf.data.Dataset` objects, one for each participating client. How to create a `tf.data.Dataset` is described in the [tf.data guide](https://www.tensorflow.org/guide/data), as pointed out in [this](https://www.py4u.net/discuss/166632) blog post. Another useful answer can be found [here](https://stackoverflow.com/a/55462901/9356123) on Stack Overflow.

# Implementation

In [2]:
# from IPython.display import clear_output

# tensorflow_federated_nightly also brings in tf_nightly, which
# can cause a duplicate tensorboard install, leading to errors.
# !pip uninstall --yes tensorboard tb-nightly

# !pip install --upgrade tensorflow-federated-nightly
# !pip install --upgrade nest-asyncio
# !pip install --upgrade tb-nightly

import nest_asyncio
nest_asyncio.apply()

In [3]:
# TensorFlow and tf.keras
import tensorflow as tf
import tensorflow_federated as tff

# Helper libraries
import numpy as np
import pandas as pd
import collections

from sklearn.model_selection import train_test_split

from utils import cidds_001 as utils

print(tf.__version__)

2.2.0


## Load data

In [4]:
def _shuffle_rows(frame):
    """Return `frame` with its rows in a seeded random order and a fresh 0..n-1 index."""
    return frame.sample(frac=1, random_state=13).reset_index(drop=True)


# Week 1: raw frame read from the feather file, plus its shuffled counterpart.
week1 = pd.read_feather('saved_dfs/cidds-001/traffic/OpenStack/CIDDS-001-internal-week1-cleaned.feather')
week1_shuffled = _shuffle_rows(week1)

# Week 2: same treatment, same seed.
week2 = pd.read_feather('saved_dfs/cidds-001/traffic/OpenStack/CIDDS-001-internal-week2-cleaned.feather')
week2_shuffled = _shuffle_rows(week2)

## Balance datasets

In [5]:
# Balance both weeks with the shared CIDDS helper; week 1 is still processed
# before week 2.
week1_balanced, week2_balanced = [
    utils.get_balanced_cidds(shuffled_week)
    for shuffled_week in (week1_shuffled, week2_shuffled)
]

## Inspect week1

In [6]:
# Number of rows per attack type in the balanced week-1 frame, shown as a one-column table.
week1_balanced.groupby('attack_type').size().to_frame(name='count')

Unnamed: 0_level_0,count
attack_type,Unnamed: 1_level_1
---,1626
bruteForce,1626
dos,1626
pingScan,1626
portScan,1626


## Inspect week2

In [7]:
# Number of rows per attack type in the balanced week-2 frame, shown as a one-column table.
week2_balanced.groupby('attack_type').size().to_frame(name='count')

Unnamed: 0_level_0,count
attack_type,Unnamed: 1_level_1
---,2731
bruteForce,2731
dos,2731
pingScan,2731
portScan,2731


## Preprocess datasets
* drop unused columns
* split datasets into features and labels, and one-hot encode the labels

In [8]:
# Fixed label order shared by every client. One-hot encoding each week on its own
# would otherwise derive the columns from whichever classes happen to be present,
# so a week missing a class would produce fewer / shifted label columns than the
# model's 5 outputs. The order below matches the sorted order `pd.get_dummies`
# produces for the classes present in both weeks.
ATTACK_TYPES = ['---', 'bruteForce', 'dos', 'pingScan', 'portScan']


def _split_features_labels(balanced_df):
    """Split a balanced CIDDS frame into features and aligned one-hot labels.

    Args:
        balanced_df: DataFrame containing an 'attack_type' column plus the
            columns listed in `utils.columns_to_drop`.

    Returns:
        A `(features, labels)` tuple: `features` is `balanced_df` without the
        unused columns and without 'attack_type'; `labels` is the one-hot
        encoding of 'attack_type' with exactly the columns in `ATTACK_TYPES`,
        in that order (all-zero column for a class absent from the frame).

    Raises:
        ValueError: if 'attack_type' contains a value not in `ATTACK_TYPES`.
    """
    unknown_labels = set(balanced_df['attack_type'].unique()) - set(ATTACK_TYPES)
    if unknown_labels:
        raise ValueError('Unexpected attack_type values: {}'.format(sorted(unknown_labels)))

    features = balanced_df.drop(columns=utils.columns_to_drop + ['attack_type'])
    # reindex pins both the set and the order of the label columns.
    labels = pd.get_dummies(balanced_df['attack_type']).reindex(
        columns=ATTACK_TYPES, fill_value=0)
    return features, labels


week1_x, week1_y = _split_features_labels(week1_balanced)
week2_x, week2_y = _split_features_labels(week2_balanced)

## Normalize datasets

In [9]:
# Apply the project's z-score normalisation helper to the columns listed in
# `utils.columns_to_normalize`, once per week. The return value is discarded,
# so the helper presumably updates the passed frame in place — TODO confirm
# against utils.cidds_001.
# NOTE(review): this runs before the train/test split in the next cell and
# separately for each week. If the helper derives mean/std from the frame it is
# given, test rows contribute to the statistics and the two weeks end up on
# different scales — confirm this is intended.
_ = utils.z_score_normalization(week1_x, utils.columns_to_normalize)
_ = utils.z_score_normalization(week2_x, utils.columns_to_normalize)

## Split week1 and week2 into training and testing datasets

In [10]:
# Hold out 20% of every week as a test set; the fixed seed keeps the split repeatable.
SPLIT_OPTIONS = {'test_size': 0.2, 'random_state': 13}

(x_train_week1, x_test_week1,
 y_train_week1, y_test_week1) = train_test_split(week1_x, week1_y, **SPLIT_OPTIONS)

(x_train_week2, x_test_week2,
 y_train_week2, y_test_week2) = train_test_split(week2_x, week2_y, **SPLIT_OPTIONS)

## Convert everything to numpy arrays

In [11]:
# Convert the feature frames to numpy arrays under the same names.
# `np.asarray` is used instead of `.to_numpy()` because these names are rebound
# in place: on a DataFrame it yields the frame's values, and on an array it
# returns the array unchanged, so re-running this cell no longer fails with
# "'numpy.ndarray' object has no attribute 'to_numpy'".
x_train_week1 = np.asarray(x_train_week1)
x_test_week1  = np.asarray(x_test_week1)
x_train_week2 = np.asarray(x_train_week2)
x_test_week2  = np.asarray(x_test_week2)

In [12]:
# Convert the one-hot label frames to numpy arrays under the same names.
# `np.asarray` (rather than `.to_numpy()`) keeps this cell idempotent: applied to
# an array it returns the array unchanged, so the cell can be re-run safely.
y_train_week1 = np.asarray(y_train_week1)
y_test_week1  = np.asarray(y_test_week1)
y_train_week2 = np.asarray(y_train_week2)
y_test_week2  = np.asarray(y_test_week2)

## Create the `federated_train_data`

In [13]:
# Settings for the per-client input pipeline built by `preprocess`.
NUM_EPOCHS = 5          # how many times each client's data is repeated
BATCH_SIZE = 20         # examples per batch
SHUFFLE_BUFFER = 100    # buffer size passed to `shuffle`
PREFETCH_BUFFER = 10    # number of batches passed to `prefetch`


def preprocess(dataset):
    """Turn a dataset of `(x, y)` pairs into repeated, shuffled, named batches.

    The pipeline is applied in this order: repeat -> shuffle -> batch -> map
    -> prefetch. After batching, every `(x, y)` element is wrapped in an
    `OrderedDict` with the keys 'x' and 'y' (in that order).

    Args:
        dataset: object exposing `repeat`, `shuffle`, `batch`, `map` and
            `prefetch` (a `tf.data.Dataset` in this notebook).

    Returns:
        The object returned by the final `prefetch` call.
    """
    def _to_named_batch(x, y):
        # Key order matters: 'x' first, then 'y'.
        return collections.OrderedDict([('x', x), ('y', y)])

    repeated = dataset.repeat(NUM_EPOCHS)
    shuffled = repeated.shuffle(SHUFFLE_BUFFER)
    batched = shuffled.batch(BATCH_SIZE)
    named = batched.map(_to_named_batch)
    return named.prefetch(PREFETCH_BUFFER)

In [14]:
def make_client_data():
    """Build the simulated two-client `ClientData` object.

    'client_1' is backed by the week-1 training arrays and 'client_2' by the
    week-2 training arrays; each is wrapped with
    `tf.data.Dataset.from_tensor_slices` over an `(x, y)` tuple.

    Returns:
        The object returned by `tff.simulation.ClientData.from_clients_and_fn`
        for the client ids ['client_1', 'client_2'].
    """
    datasets_by_client = {
        client_id: tf.data.Dataset.from_tensor_slices((features, labels))
        for client_id, features, labels in (
            ('client_1', x_train_week1, y_train_week1),
            ('client_2', x_train_week2, y_train_week2),
        )
    }

    def _dataset_for(key):
        # Look up the pre-built dataset of the requested client.
        return datasets_by_client[key]

    return tff.simulation.ClientData.from_clients_and_fn(
        client_ids=['client_1', 'client_2'],
        create_tf_dataset_for_client_fn=_dataset_for
    )

In [15]:
def make_federated_data(client_data, client_ids):
    """Create one preprocessed dataset per requested client.

    Args:
        client_data: object exposing `create_tf_dataset_for_client(client_id)`.
        client_ids: iterable of client ids to include, in the desired order.

    Returns:
        A list with `preprocess(...)` applied to each client's dataset, in the
        same order as `client_ids`.
    """
    federated = []
    for client_id in client_ids:
        client_dataset = client_data.create_tf_dataset_for_client(client_id)
        federated.append(preprocess(client_dataset))
    return federated

In [16]:
# Build the simulated clients, then preprocess every client's dataset for training.
client_data = make_client_data()
client_ids = client_data.client_ids
federated_train_data = make_federated_data(
    client_data=client_data,
    client_ids=client_ids,
)

## Obtain the element_spec of the input that the federated model will receive

In [17]:
# Take the first client's preprocessed dataset as the reference example and read
# its `element_spec`; `model_fn` passes this spec to TFF as the model's input_spec.
preprocessed_example_data = federated_train_data[0]
tff_input_element_spec = preprocessed_example_data.element_spec

# Disabled debugging aid: pull one batch from the example dataset and convert
# every tensor in it to a numpy array for inspection.
# sample_batch = tf.nest.map_structure(
#     lambda x: x.numpy(),
#     next(iter(preprocessed_example_data))
# )
# sample_batch

## Create functions to create the TFF model

In [18]:
def create_keras_model(num_features=16, num_classes=5, hidden_units=100, dropout_rate=0.2):
    """Build a fresh, uncompiled feed-forward classifier.

    The architecture is: input -> Dense(relu) -> Dropout -> Dense(relu)
    -> Dropout -> Dense(softmax). Calling the function without arguments
    yields exactly the network that was previously hard-coded
    (16 inputs, two hidden layers of 100 units, dropout 0.2, 5 classes).

    Args:
        num_features: width of the input feature vector.
        num_classes: number of output classes (units of the softmax layer).
        hidden_units: number of units in each of the two hidden layers.
        dropout_rate: dropout rate applied after each hidden layer.

    Returns:
        A new `tf.keras.Sequential` model; every call creates new layers.
    """
    return tf.keras.Sequential([
        tf.keras.layers.Input(shape=(num_features,)),
        tf.keras.layers.Dense(hidden_units, activation='relu'),
        tf.keras.layers.Dropout(rate=dropout_rate),
        tf.keras.layers.Dense(hidden_units, activation='relu'),
        tf.keras.layers.Dropout(rate=dropout_rate),
        tf.keras.layers.Dense(num_classes, activation='softmax')
    ])

In [19]:
def model_fn():
    """Create a new Keras model and wrap it for TensorFlow Federated.

    Everything (network, loss, metrics) is constructed inside this function on
    every call, and the wrapped model uses `tff_input_element_spec` as its
    input specification.

    Returns:
        The object returned by `tff.learning.from_keras_model`, configured with
        categorical cross-entropy loss and the categorical accuracy and
        categorical cross-entropy metrics.
    """
    network = create_keras_model()
    loss_fn = tf.keras.losses.CategoricalCrossentropy()
    metric_list = [
        tf.keras.metrics.CategoricalAccuracy(),
        tf.keras.metrics.CategoricalCrossentropy(),
    ]
    return tff.learning.from_keras_model(
        network,
        input_spec=tff_input_element_spec,
        loss=loss_fn,
        metrics=metric_list
    )

## Train the model on federated data

In [20]:
def _client_sgd():
    """Create the optimizer used on each client: SGD with learning rate 0.02."""
    return tf.keras.optimizers.SGD(learning_rate=0.02)


def _server_sgd():
    """Create the optimizer used on the server: SGD with learning rate 1.0."""
    return tf.keras.optimizers.SGD(learning_rate=1.0)


# Federated Averaging process built from `model_fn` and the two optimizer factories.
iterative_process = tff.learning.build_federated_averaging_process(
    model_fn=model_fn,
    client_optimizer_fn=_client_sgd,
    server_optimizer_fn=_server_sgd
)

Instructions for updating:
If using Keras pass *_constraint arguments to layers.


In [21]:
# Show the TFF type signature of `initialize` as text to inspect the structure
# of the state it produces.
str(iterative_process.initialize.type_signature)

'( -> <model=<trainable=<float32[16,100],float32[100],float32[100,100],float32[100],float32[100,5],float32[5]>,non_trainable=<>>,optimizer_state=<int64>,delta_aggregate_state=<value_sum_process=<>,weight_sum_process=<>>,model_broadcast_state=<>>@SERVER)'

In [22]:
# Create the initial state; the training loop passes it to the first call of
# `iterative_process.next`.
state = iterative_process.initialize()

In [23]:
# Number of Federated Averaging rounds to run.
NUM_ROUNDS = 20

for round_num in range(NUM_ROUNDS):
    # Each round consumes the current state and returns the updated state plus metrics.
    state, metrics = iterative_process.next(state, federated_train_data)
    round_label = round_num + 1  # rounds are reported 1-based
    print('round {:2d}, metrics={}'.format(round_label, metrics))

round  1, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', ()), ('weight_sum_process', ())])), ('train', OrderedDict([('categorical_accuracy', 0.8156759), ('categorical_crossentropy', 0.5535047), ('loss', 0.553455)]))])
round  2, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', ()), ('weight_sum_process', ())])), ('train', OrderedDict([('categorical_accuracy', 0.87873536), ('categorical_crossentropy', 0.34057474), ('loss', 0.3405747)]))])
round  3, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', ()), ('weight_sum_process', ())])), ('train', OrderedDict([('categorical_accuracy', 0.8936195), ('categorical_crossentropy', 0.30198026), ('loss', 0.3019388)]))])
round  4, metrics=OrderedDict([('broadcast', ()), ('aggregation', OrderedDict([('value_sum_process', ()), ('weight_sum_process', ())])), ('train', OrderedDict([('categorical_accuracy', 0.9014919), ('categorica

# Test the global model

## Concatenate test sets from week1 and week2 to obtain a bigger test set

In [24]:
# Stack the week-1 and week-2 test rows (week 1 first) into one evaluation set.
test_x = np.concatenate((x_test_week1, x_test_week2), axis=0)
test_y = np.concatenate((y_test_week1, y_test_week2), axis=0)

## Assign the federated trained weights to a model that can be used

In [25]:
# Fresh Keras model with the same architecture as the federated one.
model = create_keras_model()

# Compile with the same loss and metrics that were used during federated training.
evaluation_loss = tf.keras.losses.CategoricalCrossentropy()
evaluation_metrics = [
    tf.keras.metrics.CategoricalAccuracy(),
    tf.keras.metrics.CategoricalCrossentropy(),
]
model.compile(loss=evaluation_loss, metrics=evaluation_metrics)

# Copy the weights held in the federated server state into the Keras model.
state.model.assign_weights_to(model)

## Predict the test set and create a confusion matrix

In [26]:
# Run the model on the combined test features; the next cell reduces the result
# to class indices with argmax over axis 1.
pred_y = model.predict(test_x)

In [27]:
# Collapse the one-hot targets and the per-class model outputs to class indices.
y_lbl = test_y.argmax(axis=1)
pred_y_lbl = pred_y.argmax(axis=1)

In [28]:
# 5-class confusion matrix comparing the true class indices (`labels`) with the
# predicted class indices (`predictions`).
tf.math.confusion_matrix(labels=y_lbl, predictions=pred_y_lbl, num_classes=5)

<tf.Tensor: shape=(5, 5), dtype=int32, numpy=
array([[863,  23,   5,   2,   2],
       [105, 667,   0,   6,  45],
       [  1,   0, 905,   0,   0],
       [ 38,  27,   0, 810,   3],
       [ 11,  75,   0,  35, 734]])>