In [None]:
from __future__ import absolute_import, division, print_function
import nest_asyncio

nest_asyncio.apply()

import time
import numpy as np
import tensorflow as tf
import tensorflow_federated as tff
import pandas as pd
import matplotlib.pyplot as plt

from user import User
from average import Average
from tensorflow import keras
from sklearn.model_selection import train_test_split

import collections
import warnings

from six.moves import range
import six

SEED = 0

# import os
# os.environ['PYTHONHASHSEED']=str(SEED)
# np.random.seed(SEED)
# import random
# random.seed(SEED)
# tf.set_random_seed(SEED)
# could need to force keras to not use parallelism, see documentation

%load_ext autoreload
%autoreload 2
%matplotlib inline

#@test {"skip": true}

# NOTE: If you are running a Jupyter notebook, and installing a locally built
# pip package, you may need to edit the following to point to the '.whl' file
# on your local filesystem.

# NOTE: The high-performance executor components used in this tutorial are not
# yet included in the released pip package; you may need to compile from source.

# NOTE: Jupyter requires a patch to asyncio.


warnings.simplefilter('ignore')

tf.compat.v1.enable_v2_behavior()

np.random.seed(0)

# NOTE: If the statement below fails, it means that you are
# using an older version of TFF without the high-performance
# executor stack. Call `tff.framework.set_default_executor()`
# instead to use the default reference runtime.
if six.PY3:
    tff.framework.set_default_executor(tff.framework.create_local_executor())

tff.federated_computation(lambda: 'Hello, World!')()

In [None]:
def read_file(file):
    """
    return 2d df after imputing with 0s"""

    # read data
    df = pd.read_csv(file)

    # replace the question marks with NaN and then change data type to float 32
    df.replace(["?"],np.nan, inplace = True)
    df = df.astype(np.float32)

    # imputation
    df.fillna(0,inplace=True) # fill nulls with 0
    return df

def shuffle_df(df, seed = None):
    """Shuffle dataframe and reset the index"""
    df = df.take(np.random.RandomState(seed=SEED).permutation(df.shape[0]))
    df.reset_index(drop = True, inplace = True)
    
    return df

def acquire_user_data(df, for_user = None, seed = None):
    """
    split the dataframe into train, validation and test splits based on the same seed
    Empty dataframes if no data present
    """
    # split into train, validation and test data using sklearn and return dfs for each
    if for_user!=None:
        df = df[df["User"] == for_user]
    if df.shape[0] == 0:
        # if no data for the user, then return 9 empty dfs as per the api
        # print(f"Dataframe for user {user} is of shape {df.shape}, no data. Skipping...")
        df = pd.DataFrame()
        return df, df
    target = df["Class"]

    # drop the class and user identifier columns from data frame
    df   = df.drop(df.columns[[0,1]], axis=1)
    return df, target





def init_users(df, averaging_methods, averaging_metric = "accuracy", seed = None):
    """
    Requires the DF to contain a "User" column giving numeric identity to a user
    0 to unique_user_count-1
    
    Averaging method is a list of methods out of which a random one is selected
    
    initialise users based on dataframe given and assign random averaging method
    to them based on the list passed in.
    returns a dictionary of users(key: user object) and a global user object
    """    
    print("Initialising User instances...")
    users = dict()
    num_users = df["User"].nunique()

    for user_id in range(num_users):
        
        user_df, target = acquire_user_data(df = df, for_user=user_id, seed = seed)
        
        if user_df.shape[0]==0:
            print(f"User {user_id} has no data, no instance created...")
            continue
        
        dataset = tf.data.Dataset.from_tensor_slices((user_df.values, target.values))
        
        users[user_id] = dataset        

    print(f"{len(users.keys())} User datasets created!")
    return users

Trying to create federated ClientData dataset

In [None]:
SEED = 0
NUM_EPOCHS = 10
BATCH_SIZE = 20
SHUFFLE_BUFFER = 500
averaging_methods = [Average.all,Average.std_dev,Average.weighted_avg]


df = read_file("../dataset/allUsers.lcl.csv")
df = shuffle_df(df, SEED)
client_id_colname = 'Class' # the column that represents client IDNUM_CLIENTS = len(users)
users= init_users(df = df, 
                        averaging_methods = averaging_methods, 
                        seed = SEED)
client_ids = np.array(users.keys())
client_ids = df["User"].unique()

train_client_ids = np.random.choice(client_ids, client_ids.size//2, replace=False).tolist()
print(train_client_ids)
# client_ids.sample(frac=0.5).tolist()
test_client_ids = [x for x in client_ids if x not in train_client_ids]

def create_tf_dataset_for_client_fn(client_id):
    # a function which takes a client_id and returns a
    # tf.data.Dataset for that client
    client_data = df[df[client_id_colname] == client_id]
    dataset = tf.data.Dataset.from_tensor_slices(client_data.to_dict('list'))
#     dataset = dataset.shuffle(SHUFFLE_BUFFER).batch(1).repeat(NUM_EPOCHS)
    dataset = dataset.repeat(NUM_EPOCHS).shuffle(
          SHUFFLE_BUFFER).batch(BATCH_SIZE)
    return dataset

train_data = tff.simulation.ClientData.from_clients_and_fn(
        client_ids=train_client_ids,
        create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
    )
test_data = tff.simulation.ClientData.from_clients_and_fn(
        client_ids=test_client_ids,
        create_tf_dataset_for_client_fn=create_tf_dataset_for_client_fn
    )

example_dataset = train_data.create_tf_dataset_for_client(
        train_data.client_ids[0]
    )
print(type(example_dataset))
example_element = iter(example_dataset).next()
print(example_element)
# <class 'tensorflow.python.data.ops.dataset_ops.RepeatDataset'>
# {'age': <tf.Tensor: shape=(1,), dtype=int32, numpy=array([37], dtype=int32)>, 'workclass': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Local-gov'], dtype=object)>, .

This is before I started making HDF5 files and datasets from them
Structure of the file:
- examples
    - userID
        - features


- https://github.com/tensorflow/federated/blob/master/tensorflow_federated/python/simulation/hdf5_client_data_test.py
- https://github.com/tensorflow/federated/blob/v0.11.0/tensorflow_federated/python/simulation/hdf5_client_data.py
- http://docs.h5py.org/en/stable/high/group.html#Group.create_dataset
- https://stackoverflow.com/questions/55434004/create-a-custom-federated-data-set-in-tensorflow-federated
- https://stackoverflow.com/questions/58965488/how-to-create-federated-dataset-from-a-csv-file


### EMNIST example from the website
- https://www.tensorflow.org/federated/tutorials/federated_learning_for_image_classification

In [None]:
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()
example_dataset = emnist_train.create_tf_dataset_for_client(
    emnist_train.client_ids[0])

example_element = iter(example_dataset).next()

print(example_element['label'].numpy())

In [None]:

NUM_CLIENTS = 10
NUM_EPOCHS = 10
BATCH_SIZE = 20
SHUFFLE_BUFFER = 500

def preprocess(dataset):

    def element_fn(element):
        return collections.OrderedDict([
            ('x', tf.reshape(element['pixels'], [-1])),
            ('y', tf.reshape(element['label'], [1])),
        ])

    return dataset.repeat(NUM_EPOCHS).map(element_fn).shuffle(
      SHUFFLE_BUFFER).batch(BATCH_SIZE)
type(emnist_train)

preprocessed_example_dataset = preprocess(example_dataset)

sample_batch = tf.nest.map_structure(
    lambda x: x.numpy(), iter(preprocessed_example_dataset).next())

sample_batch

def make_federated_data(client_data, client_ids):
    return [preprocess(client_data.create_tf_dataset_for_client(x))
          for x in client_ids]

sample_clients = emnist_train.client_ids[0:NUM_CLIENTS]

federated_train_data = make_federated_data(emnist_train, sample_clients)

len(federated_train_data), federated_train_data[0]

def create_compiled_keras_model():
    model = tf.keras.models.Sequential([
      tf.keras.layers.Dense(
          10, activation=tf.nn.softmax, kernel_initializer='zeros', input_shape=(784,))])

    model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.02),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])
    return model

def model_fn():
    keras_model = create_compiled_keras_model()
    return tff.learning.from_compiled_keras_model(keras_model, sample_batch)


iterative_process = tff.learning.build_federated_averaging_process(model_fn)
str(iterative_process.initialize.type_signature)
state = iterative_process.initialize()
state, metrics = iterative_process.next(state, federated_train_data)
print('round  1, metrics={}'.format(metrics))
NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    print('round {:2d}, metrics={}'.format(round_num, metrics))