In [None]:
# default_exp data
%load_ext lab_black
# (use nb_black instead of lab_black when running in the classic Jupyter Notebook)
%load_ext autoreload
# automatically reload imported Python modules when their source files change
%autoreload 2

In [None]:
# hide
from nbdev.showdoc import *

# CustomClientData

> Create a custom TFF ClientData from a pandas DataFrame

***input***: none / raw data

***output***: function for creating custom tff ClientData

***description:***

In this notebook we download customer path data and transform it into the TensorFlow Federated [ClientData](https://www.tensorflow.org/federated/tutorials/working_with_client_data) format.
You can edit the code to load, clean, and transform your own client path data.

## Import relevant modules

Uncomment the following cell to run with the newest (nightly) version of TFF.

In [None]:
# tensorflow_federated_nightly also brings in tf_nightly, which
# can cause a duplicate tensorboard install, leading to errors.
#!pip uninstall --yes tensorboard tb-nightly

#!pip install --quiet --upgrade tensorflow-federated-nightly
#!pip install --quiet --upgrade nest-asyncio
#!pip install --quiet --upgrade tb-nightly  # or tensorboard, but not both

In [None]:
# export
import nest_asyncio

# Allow nested asyncio event loops; presumably required because TFF runs its
# own event loop inside the one Jupyter already has running.
nest_asyncio.apply()
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_federated as tff

In [None]:
from pyarrow import feather
from scipy.special import softmax
from seaborn import heatmap

## Define notebook parameters

In [None]:
# This cell is tagged with 'parameters' for papermill use
# (papermill injects overrides for these names, so keep them as plain assignments).

# --- reproducibility ---
seed = 0

# --- tf.data pipeline settings ---
SHUFFLE_BUFFER = 100  # shuffle buffer size
NUM_EPOCHS = 1  # passes over each client's data
BATCH_SIZE = 32  # examples per batch

# --- simulation settings ---
# NOTE(review): neither name is referenced by the cells shown here; they look
# like leftovers from a simulated-data version of this notebook.
max_path_length = 100  # limit simulation length
n_customers = 10000  # number of customers (paths, assuming only one path per customer)

Derive anything that follows directly from the parameters (here: seeding the random number generators):

In [None]:
# Seed TensorFlow's global seed and NumPy's legacy global RNG from the single
# `seed` parameter so that re-running the notebook is reproducible.
tf.random.set_seed(seed)
np.random.seed(seed)

## Load the data (or alternatively load your own data and convert it into a compatible format)

Try for example: https://cseweb.ucsd.edu/~jmcauley/datasets.html

or https://archive.ics.uci.edu/ml/datasets/Entree+Chicago+Recommendation+Data

In [None]:
from pathlib import Path
import requests, zipfile, io

# Download and extract the customer-journey dataset into data/raw_data,
# unless a previous run already extracted it.
p = Path(".") / "data" / "raw_data"
if not (p / "customer-journey-unil-ch-datasets").exists():  # check if already loaded
    r = requests.get(
        "http://customer-journey.me/wp-content/uploads/2018/02/customer-journey-unil-ch-datasets.zip",
        timeout=60,  # seconds; don't hang the notebook on a stalled connection
    )
    # Fail loudly on HTTP errors (404, 5xx, ...) instead of handing an error
    # page to ZipFile, which would only raise an unhelpful BadZipFile.
    r.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        z.extractall(p)

In [None]:
# CSV file of the extracted dataset used in this notebook
# (the configuration6 / excluding-solution variant, file 1.csv).
filepath = p.joinpath(
    "customer-journey-unil-ch-datasets",
    "csv",
    "configuration6",
    "excluding-solution",
    "1.csv",
)

In [None]:
# Raw event log: one row per (trace_id, activity) event plus customer attributes.
df = pd.read_csv(filepath_or_buffer=filepath)
df

Unnamed: 0,trace_id,activities,owner,employed,age,income
0,0,activity_09,no,yes,20-39yo,high
1,0,activity_04,no,yes,20-39yo,high
2,0,activity_02,no,yes,20-39yo,high
3,1,activity_09,no,yes,60-79yo,high
4,1,activity_04,no,yes,60-79yo,high
...,...,...,...,...,...,...
4295,999,activity_10,no,yes,40-59yo,middle
4296,999,activity_06,no,yes,40-59yo,middle
4297,999,activity_02,no,yes,40-59yo,middle
4298,999,activity_03,no,yes,40-59yo,middle


In [None]:
df["activities"].unique()  # distinct activity labels

array(['activity_09', 'activity_04', 'activity_02', 'activity_10',
       'activity_06', 'activity_03', 'activity_07', 'activity_08',
       'activity_01', 'activity_05'], dtype=object)

In [None]:
df["age"].unique()  # distinct age buckets

array(['20-39yo', '60-79yo', '0-19yo', '40-59yo', '80yo+'], dtype=object)

In [None]:
df["income"].unique()  # distinct income levels

array(['high', 'middle', 'low'], dtype=object)

In [None]:
# Encode the raw string columns as compact numeric / categorical dtypes.
df.trace_id = df.trace_id.astype("int")
df.activities = df.activities.astype("category")
# Binary yes/no flags -> 1/0. Compare each value to "yes" explicitly: a bare
# `1 if "yes" else 0` tests the always-truthy literal and maps every row to 1.
df.owner = (df.owner == "yes").astype("uint8")
df.employed = (df.employed == "yes").astype("uint8")
# Ordinal encodings that keep the natural order of the buckets.
df.age = df.age.map(
    {"0-19yo": 0, "20-39yo": 1, "40-59yo": 2, "60-79yo": 3, "80yo+": 4}
).astype("uint8")
df.income = df.income.map({"low": 0, "middle": 1, "high": 2}).astype("uint8")

In [None]:
# Turn the row position into an explicit "action_index" column; it is used
# below to derive the order of events within each customer's path.
df = df.reset_index().rename(columns={"index": "action_index"})
df

Unnamed: 0,action_index,trace_id,activities,owner,employed,age,income
0,0,0,activity_09,1,1,1,2
1,1,0,activity_04,1,1,1,2
2,2,0,activity_02,1,1,1,2
3,3,1,activity_09,1,1,3,2
4,4,1,activity_04,1,1,3,2
...,...,...,...,...,...,...,...
4295,4295,999,activity_10,1,1,2,1
4296,4296,999,activity_06,1,1,2,1
4297,4297,999,activity_02,1,1,2,1
4298,4298,999,activity_03,1,1,2,1


In [None]:
# We need an extra category denoting that the client's activity has stopped.
# Zero-pad the number so it follows the existing "activity_NN" naming
# (e.g. "activity_09"), whatever the number of observed activities.
activity_ended = f"activity_{df.activities.nunique() + 1:02d}"
# Guard so re-running this cell does not make add_categories raise on a
# category that is already present.
if activity_ended not in df.activities.cat.categories:
    df.activities = df.activities.cat.add_categories([activity_ended])

For each customer we derive:

- the order of their events
- the previous and next event of each action

In [None]:
df.keys()  # column labels of the event log

Index(['action_index', 'trace_id', 'activities', 'owner', 'employed', 'age',
       'income'],
      dtype='object')

In [None]:
# Build (previous activity -> next activity) pairs per client.
# The column order below also fixes the order of the feature vector built later.
prev_next_columns = [
    "client_id",
    "action_index",
    "prev_activity",
    "owner",
    "employed",
    "age",
    "income",
    "next_activity",
]

client_frames = []
for client_id, client_data in df.groupby("trace_id"):
    # rename() returns a new frame, so assigning into buf_df below does not
    # mutate the groupby slice (which would trigger SettingWithCopyWarning).
    buf_df = client_data.rename(
        {"trace_id": "client_id", "activities": "prev_activity"}, axis=1
    )
    # position of each event within the client's path, starting at 0
    buf_df["action_index"] = (
        client_data.action_index - client_data.action_index.min()
    ).astype(int)
    # next activity; the last event of every path is followed by `activity_ended`
    buf_df["next_activity"] = client_data.activities.shift(
        periods=-1, fill_value=activity_ended
    ).astype("category")
    client_frames.append(buf_df)

# Concatenate once (growing a frame inside the loop is quadratic). All pieces
# share the same categories, so the categorical dtypes survive the concat.
if client_frames:
    prev_next_df = pd.concat(client_frames, axis=0, ignore_index=True)[
        prev_next_columns
    ]
else:
    prev_next_df = pd.DataFrame(columns=prev_next_columns)

In [None]:
# Inspect the previous/next activity pairs per client.
prev_next_df

Unnamed: 0,client_id,action_index,prev_activity,owner,employed,age,income,next_activity
0,0,0,activity_09,1,1,1,2,activity_04
1,0,1,activity_04,1,1,1,2,activity_02
2,0,2,activity_02,1,1,1,2,activity_11
3,1,0,activity_09,1,1,3,2,activity_04
4,1,1,activity_04,1,1,3,2,activity_02
...,...,...,...,...,...,...,...,...
4295,999,0,activity_10,1,1,2,1,activity_06
4296,999,1,activity_06,1,1,2,1,activity_02
4297,999,2,activity_02,1,1,2,1,activity_03
4298,999,3,activity_03,1,1,2,1,activity_07


In [None]:
# One-hot encode the previous activity (the state feature). get_dummies on a
# categorical column emits one prev_* column per category, including unseen ones.
onehot = pd.get_dummies(prev_next_df["prev_activity"], prefix="prev")

# Replace prev_activity by its one-hot columns (appended at the end), turn the
# next-activity label into integer codes (a single integer label, since TFF
# does not support multi-output models for the moment), and cast everything to int.
prev_next_df = (
    prev_next_df.drop(columns="prev_activity")
    .join(onehot)
    .assign(next_activity=lambda frame: frame["next_activity"].cat.codes)
    .astype(int)
)

In [None]:
# Check that every column is an integer dtype after the cast above.
prev_next_df.dtypes

client_id           int64
action_index        int64
owner               int64
employed            int64
age                 int64
income              int64
next_activity       int64
prev_activity_01    int64
prev_activity_02    int64
prev_activity_03    int64
prev_activity_04    int64
prev_activity_05    int64
prev_activity_06    int64
prev_activity_07    int64
prev_activity_08    int64
prev_activity_09    int64
prev_activity_10    int64
prev_activity_11    int64
dtype: object

In [None]:
# Inspect the fully numeric frame (one-hot previous activity, integer label).
prev_next_df

Unnamed: 0,client_id,action_index,owner,employed,age,income,next_activity,prev_activity_01,prev_activity_02,prev_activity_03,prev_activity_04,prev_activity_05,prev_activity_06,prev_activity_07,prev_activity_08,prev_activity_09,prev_activity_10,prev_activity_11
0,0,0,1,1,1,2,3,0,0,0,0,0,0,0,0,1,0,0
1,0,1,1,1,1,2,1,0,0,0,1,0,0,0,0,0,0,0
2,0,2,1,1,1,2,10,0,1,0,0,0,0,0,0,0,0,0
3,1,0,1,1,3,2,3,0,0,0,0,0,0,0,0,1,0,0
4,1,1,1,1,3,2,1,0,0,0,1,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4295,999,0,1,1,2,1,5,0,0,0,0,0,0,0,0,0,1,0
4296,999,1,1,1,2,1,1,0,0,0,0,0,1,0,0,0,0,0
4297,999,2,1,1,2,1,2,0,1,0,0,0,0,0,0,0,0,0
4298,999,3,1,1,2,1,6,0,0,1,0,0,0,0,0,0,0,0


In [None]:
# TFF needs the data in an untidy nested format: x is a per-row feature vector
# and y a scalar label. Every column except the client id and the label goes into x.
feature_columns = prev_next_df.columns.drop(["client_id", "next_activity"])
cxy_df = pd.DataFrame(
    {
        "client_id": prev_next_df["client_id"],
        "x": prev_next_df[feature_columns].apply(lambda row: row.to_numpy(), axis=1),
        "y": prev_next_df["next_activity"],
    }
)

cxy_df

Unnamed: 0,client_id,x,y
0,0,"[0, 1, 1, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]",3
1,0,"[1, 1, 1, 1, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]",1
2,0,"[2, 1, 1, 1, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]",10
3,1,"[0, 1, 1, 3, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]",3
4,1,"[1, 1, 1, 3, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]",1
...,...,...,...
4295,999,"[0, 1, 1, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]",5
4296,999,"[1, 1, 1, 2, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]",1
4297,999,"[2, 1, 1, 2, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]",2
4298,999,"[3, 1, 1, 2, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]",6


## Save data for further use

In [None]:
# Persist the nested (client_id, x, y) frame. Create the target directory first:
# on a fresh checkout data/preprocessed_data may not exist, and the write
# would then fail.
preprocessed_dir = Path(".") / "data" / "preprocessed_data"
preprocessed_dir.mkdir(parents=True, exist_ok=True)
feather.write_feather(cxy_df, str(preprocessed_dir / "data.f"))

## Create function to convert df into tff ClientData

Following this discussion: https://stackoverflow.com/questions/58965488/how-to-create-federated-dataset-from-a-csv-file

In [None]:
# export

import collections


def create_tff_client_data_from_df(
    df,
    client_id_col="client_id",
    sample_size=1.0,
    shuffle_buffer=100,
    batch_size=32,
    num_epochs=20,
    prefetch_buffer=100,
    shuffle_seed=42,
):
    """
    Turn a pandas DataFrame into a TFF ClientData object.

    Args:
        df: DataFrame with one row per example. Besides `client_id_col` it must
            hold an "x" column (features) and a "y" column (labels); these are
            the keys kept by the batch formatter.
        client_id_col: name of the column holding the (integer) client ids.
        sample_size: fraction of the distinct clients to include, in (0, 1].
            Clients are drawn without replacement from NumPy's global RNG, so
            seed `np.random` beforehand for a reproducible selection.
        shuffle_buffer: buffer size passed to `tf.data.Dataset.shuffle`.
        batch_size: number of examples per batch.
        num_epochs: how many times each client's data is repeated.
        prefetch_buffer: buffer size passed to `tf.data.Dataset.prefetch`.
        shuffle_seed: seed passed to `tf.data.Dataset.shuffle`.

    Returns:
        A `tff.simulation.datasets.FilePerUserClientData` over the sampled
        clients whose datasets yield `OrderedDict(x=..., y=...)` batches.

    Raises:
        ValueError: if `sample_size` is not in (0, 1].
    """
    if not 0 < sample_size <= 1:
        raise ValueError(f"sample_size must be in (0, 1], got {sample_size!r}")

    def batch_format_fn(element):
        """Format a batch into an OrderedDict where x denotes features and y labels."""
        return collections.OrderedDict(x=element["x"], y=element["y"])

    def create_tf_dataset_for_client_fn(client_id):
        """Return the batched tf.data.Dataset for one client.

        `client_id` arrives as the string value stored in the mapping built
        below, hence the int() conversion before filtering.
        """
        client_data = df[df[client_id_col] == int(client_id)]
        dataset = tf.data.Dataset.from_tensor_slices(client_data.to_dict("list"))
        return (
            dataset.repeat(num_epochs)
            .shuffle(shuffle_buffer, seed=shuffle_seed)
            .batch(batch_size)
            .map(batch_format_fn)
            .prefetch(prefetch_buffer)
        )

    # Sample a proportion of the distinct clients. replace=False matters:
    # sampling with replacement yields duplicate ids that collapse in the
    # mapping below, silently shrinking the population (only ~63% distinct
    # clients for sample_size=1.0).
    client_ids = np.random.choice(
        df[client_id_col].unique(),
        size=int(sample_size * df[client_id_col].nunique()),
        replace=False,
    ).tolist()

    # This is a slight misuse of FilePerUserClientData: the "file" for each
    # client is just its stringified id, which create_tf_dataset_for_client_fn
    # converts back. There seems to be no better-suited constructor for
    # in-memory data for the moment (please suggest one if there is).
    client_ids_to_files = {client_id: str(client_id) for client_id in client_ids}

    return tff.simulation.datasets.FilePerUserClientData(
        client_ids_to_files=client_ids_to_files,
        dataset_fn=create_tf_dataset_for_client_fn,
    )

Smoke test (at the very least, this should not crash):

In [None]:
# Build the ClientData with the notebook parameters defined in the parameters
# cell; relying on the function defaults (e.g. num_epochs=20) would silently
# ignore them.
tff_data = create_tff_client_data_from_df(
    cxy_df,
    shuffle_buffer=SHUFFLE_BUFFER,
    batch_size=BATCH_SIZE,
    num_epochs=NUM_EPOCHS,
    shuffle_seed=seed,
)

In [None]:
# Build the dataset of the first client to check its element spec.
first_client_id = tff_data.client_ids[0]
tff_data.create_tf_dataset_for_client(first_client_id)

<PrefetchDataset shapes: OrderedDict([(x, (None, 16)), (y, (None,))]), types: OrderedDict([(x, tf.int64), (y, tf.int32)])>

In [None]:
# Hold out a fixed number of the sampled clients as test clients;
# the remaining clients form the training set.
num_test_clients = 500
split_fn = tff.simulation.datasets.ClientData.train_test_client_split
train_data, test_data = split_fn(tff_data, num_test_clients)