# Tabular data training and serving with Keras and Ray AIR

Source: https://docs.ray.io/en/latest/ray-air/examples/tfx_tabular_train_to_serve.html

## Setup Ray

In [1]:
from pprint import pprint
import ray

# Make this cell safe to re-run: shut down any Ray instance this process is
# already connected to before starting a fresh one.
if ray.is_initialized():
    ray.shutdown()

ray.init()

2023-09-06 09:23:24,126	INFO worker.py:1612 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265


0,1
Python version:,3.9.18
Ray version:,2.6.3
Dashboard:,http://127.0.0.1:8265


In [2]:
# Show the resources (CPUs, GPUs, memory, ...) that Ray reports for the cluster.
pprint(ray.cluster_resources())

{'CPU': 20.0,
 'GPU': 1.0,
 'accelerator_type:G': 1.0,
 'memory': 7054123008.0,
 'node:192.168.33.188': 1.0,
 'node:__internal_head__': 1.0,
 'object_store_memory': 3527061504.0}


## Getting the data

In [3]:
import pandas as pd

# Name of the tensor column that holds the model input.
INPUT = "input"
# Name of the boolean target column derived in `get_data`.
LABEL = "is_big_tip"

# Default location of the taxi fare sample data.
DATA_URL = (
    "https://raw.githubusercontent.com/tensorflow/tfx/master/"
    "tfx/examples/chicago_taxi_pipeline/data/simple/data.csv"
)

# Columns that are discarded for the sake of simplicity.
_DROPPED_COLUMNS = [
    "tips",
    "fare",
    "dropoff_latitude",
    "dropoff_longitude",
    "pickup_latitude",
    "pickup_longitude",
    "pickup_census_tract",
]


def get_data(source: str = DATA_URL) -> pd.DataFrame:
    """Fetch the taxi fare data to work on.

    Args:
        source: Anything ``pandas.read_csv`` accepts (URL or local path).
            Defaults to the public sample CSV, so ``get_data()`` behaves as
            before; pass a local path to work offline.

    Returns:
        The data with an added boolean ``LABEL`` column that is ``True`` when
        the tip exceeds 20% of the fare, and without the columns listed in
        ``_DROPPED_COLUMNS``.
    """
    raw = pd.read_csv(source)
    # `assign` appends the label column without modifying `raw` in place.
    labelled = raw.assign(**{LABEL: raw["tips"] / raw["fare"] > 0.2})
    return labelled.drop(columns=_DROPPED_COLUMNS)

In [4]:
# Load the data once and preview the first rows to check the remaining columns.
data = get_data()
data.head()

Unnamed: 0,pickup_community_area,trip_start_month,trip_start_hour,trip_start_day,trip_start_timestamp,trip_miles,dropoff_census_tract,payment_type,company,trip_seconds,dropoff_community_area,is_big_tip
0,,5,19,6,1400269500,0.0,,Credit Card,Chicago Elite Cab Corp. (Chicago Carriag,0.0,,False
1,,3,19,5,1362683700,0.0,,Unknown,Chicago Elite Cab Corp.,300.0,,False
2,60.0,10,2,3,1380593700,12.6,,Cash,Taxi Affiliation Services,1380.0,,False
3,10.0,10,1,2,1382319000,0.0,,Cash,Taxi Affiliation Services,180.0,,False
4,14.0,5,7,5,1369897200,0.0,,Cash,Dispatch Taxi Affiliation,1080.0,,False


### Splitting the data into a training set and a test set

In [7]:
import numpy as np
from sklearn.model_selection import train_test_split
from typing import Tuple

def split_data(
    data: pd.DataFrame,
    test_size: float = 0.2,
    random_state: int = 1113,
) -> Tuple[ray.data.Dataset, pd.DataFrame, np.ndarray]:
    """Split the data in a stratified way.

    Args:
        data: Frame containing the feature columns and the ``LABEL`` column.
        test_size: Share of the rows held out for testing; forwarded to
            ``train_test_split``.
        random_state: Seed forwarded to ``train_test_split`` so the split is
            reproducible.

    Returns:
        A tuple containing the train dataset (features and label, as a Ray
        Dataset), the test data (a pandas frame without the label column)
        and the test labels (a NumPy array).
    """
    # Stratify on the label so both parts keep the same label proportions.
    train_part, test_part = train_test_split(
        data,
        test_size=test_size,
        stratify=data[LABEL],
        random_state=random_state,
    )

    train_dataset = ray.data.from_pandas(train_part)
    test_labels = test_part[LABEL].to_numpy()
    test_features = test_part.drop(columns=[LABEL])
    return train_dataset, test_features, test_labels

# Training rows become a Ray Dataset; the test rows stay in pandas/NumPy form.
train_ds, test_df, test_label = split_data(data)

In [8]:
# Sanity check: number of training rows, and shapes of the test features and labels.
train_ds.count(), test_df.shape, test_label.shape

(12001, (3001, 11), (3001,))

## Preprocessing

In [11]:
from ray.data.preprocessors import (
    BatchMapper,
    Concatenator,
    Chain,
    OneHotEncoder,
    SimpleImputer,
)

def get_preprocessor() -> "Chain":
    """Construct a chain of preprocessors.

    The chain applies, in order:

    1. imputers that fill missing values (most frequent value for the
       columns that are one-hot encoded later, mean for the numeric ones),
    2. a one-hot encoder,
    3. a batch mapper that replaces ``trip_start_timestamp`` with a
       ``trip_start_year`` column,
    4. a concatenator that merges every column except ``LABEL`` into a
       single tensor column named ``INPUT``.

    Returns:
        The ``Chain``; nothing is fitted inside this function.
    """
    imputers = [
        SimpleImputer(["dropoff_census_tract"], strategy="most_frequent"),
        SimpleImputer(
            ["pickup_community_area", "dropoff_community_area"],
            strategy="most_frequent",
        ),
        SimpleImputer(["payment_type"], strategy="most_frequent"),
        SimpleImputer(["company"], strategy="most_frequent"),
        SimpleImputer(
            ["trip_start_timestamp", "trip_miles", "trip_seconds"], strategy="mean"
        ),
    ]

    ohe = OneHotEncoder(
        columns=[
            "trip_start_hour",
            "trip_start_day",
            "trip_start_month",
            "dropoff_census_tract",
            "pickup_community_area",
            "dropoff_community_area",
            "payment_type",
            "company",
        ],
        # Limit how many one-hot features are created for these columns.
        max_categories={
            "dropoff_census_tract": 25,
            "pickup_community_area": 20,
            "dropoff_community_area": 20,
            "payment_type": 2,
            "company": 7,
        },
    )

    def batch_mapper_fn(df: pd.DataFrame) -> pd.DataFrame:
        """Replace the timestamp (in seconds) with the year it falls in."""
        trip_start = pd.to_datetime(df["trip_start_timestamp"], unit="s")
        # Build a new frame rather than writing into the batch that was passed in.
        return df.assign(trip_start_year=trip_start.dt.year).drop(
            columns=["trip_start_timestamp"]
        )

    return Chain(
        *imputers,
        ohe,
        BatchMapper(batch_mapper_fn, batch_format="pandas"),
        # Concatenate all columns, except LABEL, into a single tensor column named INPUT.
        Concatenator(output_column_name=INPUT, exclude=[LABEL]),
    )

In [18]:
# `INPUT_SIZE` corresponds to the dimension of the tensor column
# created during preprocessing.
# It is used to specify the input shape of the Keras model.
INPUT_SIZE = 120
# The global training batch size. Based on `NUM_WORKERS`, each worker
# will get its own share of this batch size. For example, with
# `BATCH_SIZE = 32` and `NUM_WORKERS = 2`, each worker will work on
# 16 samples per batch.
BATCH_SIZE = 32
# Number of epochs. Adjust it based on how quickly you want the run to finish.
EPOCH = 1
# Number of training workers.
# Adjust this based on the resources you have!
NUM_WORKERS = 10

## Training

In [19]:
import tensorflow as tf

def build_model():
    """Create the uncompiled Keras model used for training and serving.

    Returns:
        A ``Sequential`` model that takes ``INPUT_SIZE`` features, passes
        them through a 50-unit ReLU layer and ends in a single sigmoid unit.
    """
    layer_stack = [
        tf.keras.Input(shape=(INPUT_SIZE,)),
        tf.keras.layers.Dense(50, activation="relu"),
        tf.keras.layers.Dense(1, activation="sigmoid"),
    ]
    return tf.keras.models.Sequential(layer_stack)

In [20]:
from ray.air import session, Checkpoint
from ray.train.tensorflow import TensorflowCheckpoint

def train_loop_per_worker():
    """Training function handed to the trainer as ``train_loop_per_worker``.

    Builds and compiles the Keras model inside a
    ``MultiWorkerMirroredStrategy`` scope, then fits it on the "train"
    dataset shard for ``EPOCH`` rounds. After every round it reports that
    round's training metrics together with a checkpoint of the model.
    """
    dataset_shard = session.get_dataset_shard("train")

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model()
        model.compile(
            loss="binary_crossentropy",
            optimizer="adam",
            metrics=["accuracy"],
        )

    for _ in range(EPOCH):
        tf_dataset = dataset_shard.to_tf(
            feature_columns=INPUT,
            label_columns=LABEL,
            batch_size=BATCH_SIZE,
            drop_last=True,
        )

        # Each `fit` call makes a single pass, so `history.history` holds one
        # value per metric (e.g. loss and accuracy).
        history = model.fit(tf_dataset, verbose=0)
        epoch_metrics = {
            metric_name: values[-1]
            for metric_name, values in history.history.items()
        }
        # Report the metrics instead of an empty dict, and attach a checkpoint
        # of the model in a form that Ray Serve can load.
        session.report(
            epoch_metrics,
            checkpoint=TensorflowCheckpoint.from_model(model),
        )

In [21]:
from ray.train.tensorflow import TensorflowTrainer
from ray.air.config import ScalingConfig

# Run `train_loop_per_worker` on `NUM_WORKERS` workers. The preprocessor chain
# is handed to the trainer instead of being applied to `train_ds` by hand.
# NOTE(review): the Ray log printed for this cell says GPUs are detected but
# GPU training is not enabled; set `use_gpu=True` in the scaling config to
# enable it.
trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=NUM_WORKERS),
    datasets={"train": train_ds},
    preprocessor=get_preprocessor(),
)
result = trainer.fit()

0,1
Current time:,2023-09-06 09:49:51
Running for:,00:00:14.74
Memory:,26.1/30.9 GiB

Trial name,status,loc,iter,total time (s)
TensorflowTrainer_044cb_00000,TERMINATED,192.168.33.188:29228,1,10.966


2023-09-06 09:49:36,785	INFO data_parallel_trainer.py:404 -- GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[36m(pid=29228)[0m 2023-09-06 09:49:37.753894: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[2m[36m(pid=29228)[0m 2023-09-06 09:49:37.779600: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
[2m[36m(pid=29228)[0m To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
[2m[36m(TrainTrainable pid=29228)[0m GPUs are detected in your Ray cluster

(pid=29228) - RandomizeBlockOrder 1:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=29228) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(TensorflowTrainer pid=29228)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(SimpleImputer._transform_pandas)->MapBatches(get_pd_value_counts)] -> AllToAllOperator[RandomizeBlockOrder]
[2m[36m(TensorflowTrainer pid=29228)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(TensorflowTrainer pid=29228)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=29228) - RandomizeBlockOrder 1:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=29228) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(TensorflowTrainer pid=29228)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(get_pd_value_counts)]
[2m[36m(TensorflowTrainer pid=29228)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(TensorflowTrainer pid=29228)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
[2m[36m(TensorflowTrainer pid=29228)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(get_pd_value_counts)]
[2m[36m(TensorflowTrainer pid=29228)[0m Execution config: ExecutionOption

(pid=29228) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(TensorflowTrainer pid=29228)[0m Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
[2m[36m(TensorflowTrainer pid=29228)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)] -> AllToAllOperator[Aggregate]
[2m[36m(TensorflowTrainer pid=29228)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(TensorflowTrainer pid=29228)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=29228) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=29228) - Aggregate 1:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=29228) Shuffle Map 2:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=29228) Shuffle Reduce 3:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=29228) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(TensorflowTrainer pid=29228)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(get_pd_value_counts)]
[2m[36m(TensorflowTrainer pid=29228)[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
[2m[36m(TensorflowTrainer pid=29228)[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


(pid=29228) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(TensorflowTrainer pid=29228)[0m GPUs are detected in your Ray cluster, but GPU training is not enabled for this trainer. To enable GPU training, make sure to set `use_gpu` to True in your scaling config.
[2m[36m(TensorflowTrainer pid=29228)[0m Starting distributed worker processes: ['29334 (192.168.33.188)', '29335 (192.168.33.188)', '29336 (192.168.33.188)', '29337 (192.168.33.188)', '29338 (192.168.33.188)', '29339 (192.168.33.188)', '29340 (192.168.33.188)', '29341 (192.168.33.188)', '29342 (192.168.33.188)', '29343 (192.168.33.188)']
[2m[36m(TensorflowTrainer pid=29228)[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(OneHotEncoder._transform_pandas)->MapBatches(BatchMapper._transform_pandas)->MapBatches(Concatenator._t

(pid=29228) - RandomizeBlockOrder 1:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=29228) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(OneHotEncoder._transform_pandas)->MapBatches(BatchMapper._transform_pandas)->MapBatches(Concatenator._transform_pandas) pid=29272)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pandas)->MapBatches(OneHotEncoder._transform_pandas)->MapBatches(BatchMapper._transform_pandas)->MapBatches(Concatenator._transform_pandas) pid=29272)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(MapBatches(SimpleImputer._transform_pandas)->MapBatches(SimpleImputer._transform_pan

## Moving on to Serve

In [22]:
from ray import serve
from ray.air.checkpoint import Checkpoint
from ray.train.tensorflow import TensorflowPredictor
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import pandas_read_json

def serve_model(checkpoint: Checkpoint, model_definition, name="Model") -> str:
    """Deploy a checkpoint behind a Ray Serve HTTP endpoint.

    Args:
        checkpoint: Checkpoint passed to the ``TensorflowPredictor``.
        model_definition: Forwarded to the predictor as ``model_definition``
            (the notebook passes ``build_model``).
        name: Name given to the deployment.

    Returns:
        serve URL.
    """
    deployment = PredictorDeployment.options(name=name)
    application = deployment.bind(
        TensorflowPredictor,
        checkpoint,
        model_definition=model_definition,
        http_adapter=pandas_read_json,
    )
    serve.run(application)
    # NOTE(review): fixed address, presumably Serve's default HTTP host and
    # port; confirm if Serve is configured differently.
    return "http://localhost:8000/"

In [23]:
import ray
# Generally speaking, training and serving are done in totally different Ray clusters.
# To simulate that, shut down the Ray cluster used for training before serving.
ray.shutdown()

# Deploy the checkpoint produced by training; `build_model` is passed as the
# model definition.
endpoint_uri = serve_model(result.checkpoint, build_model)

2023-09-06 09:52:08,992	INFO worker.py:1612 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
[2m[36m(HTTPProxyActor pid=33765)[0m INFO:     Started server process [33765]
[2m[36m(ServeController pid=33742)[0m INFO 2023-09-06 09:52:11,055 controller 33742 deployment_state.py:1308 - Deploying new version of deployment default_Model.
[2m[36m(ServeController pid=33742)[0m INFO 2023-09-06 09:52:11,158 controller 33742 deployment_state.py:1571 - Adding 1 replica to deployment default_Model.
[2m[36m(ServeReplica:default_Model pid=33801)[0m 2023-09-06 09:52:11.933455: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
[2m[36m(ServeReplica:default_Model pid=33801)[0m 2023-09-06 09:52:11.965000: I tensorflow/core/platform/cpu_fea

In [24]:
import json
import requests
import pandas as pd
import numpy as np

NUM_SERVE_REQUESTS = 10

def send_requests(df: pd.DataFrame, label: np.ndarray, timeout: float = 30.0):
    """Send the first rows of ``df`` to the serve endpoint and print the results.

    One request is sent per row, to ``endpoint_uri``. At most
    ``NUM_SERVE_REQUESTS`` requests are made; fewer if ``df`` or ``label``
    has fewer entries.

    Args:
        df: Feature rows to send.
        label: Expected labels, aligned by position with the rows of ``df``.
        timeout: Seconds to wait for each response before giving up.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    # Never index past the end of the inputs.
    num_requests = min(NUM_SERVE_REQUESTS, len(df), len(label))
    for i in range(num_requests):
        # The double brackets select a one-row DataFrame rather than a Series.
        one_row = df.iloc[[i]].to_dict()
        response = requests.post(
            endpoint_uri,
            # Serialized by hand: `json.dumps` accepts NaN values by default,
            # and the feature rows may contain missing values.
            data=json.dumps(one_row),
            headers={"Content-Type": "application/json"},
            timeout=timeout,
        )
        # Fail with the HTTP error instead of a confusing parse/lookup error.
        response.raise_for_status()
        serve_result = response.json()
        print(
            f"request{i} prediction: {serve_result[0]['predictions']} "
            f"- label: {label[i]!s}"
        )

In [25]:
# Query the deployed model with held-out rows and print each prediction next to its label.
send_requests(test_df, test_label)

request0 prediction: [0.027962926775217056] - label: False
request1 prediction: [0.06391758471727371] - label: False
request2 prediction: [0.11197128146886826] - label: False
request3 prediction: [0.24359434843063354] - label: False
request4 prediction: [0.07752858847379684] - label: False
request5 prediction: [0.001054437831044197] - label: False
request6 prediction: [0.07377337664365768] - label: False


[2m[36m(ServeReplica:default_Model pid=33801)[0m   return pd.read_json(raw_json, **raw_request.query_params)
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeRepli

request7 prediction: [0.10239043831825256] - label: False
request8 prediction: [0.040483150631189346] - label: False
request9 prediction: [0.1115097627043724] - label: False


[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36

[2m[36m(ServeReplica:default_Model pid=33801)[0m Missing columns specified in 'exclude': {'is_big_tip'}
[2m[36m(ServeReplica:default_Model pid=33801)[0m   return bound(*args, **kwds)
[2m[36m(ServeReplica:default_Model pid=33801)[0m INFO 2023-09-06 09:52:40,933 default_Model default_Model#CjKzIu sMshFSTmtM / default replica.py:723 - __CALL__ OK 26.7ms
[2m[36m(ServeReplica:default_Model pid=33801)[0m   return pd.read_json(raw_json, **raw_request.query_params)
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default_Model pid=33801)[0m   df[f"{column}_{column_value}"] = (df[column] == column_value).astype(
[2m[36m(ServeReplica:default