# Introduction to Batch Training on Ray AIR/Data

### Learning objectives
In this tutorial, you will learn about:
 * [Ray Dataset](#dataset)
 * [AIR Trainer](#trainer)
 * [Ray Tune](#tune)

Batch training and tuning are common tasks in simple machine learning use-cases such as time series forecasting. They require fitting of simple models on multiple data batches corresponding to locations, products, etc. This notebook showcases how to conduct batch training using [Ray Dataset](https://docs.ray.io/en/latest/data/dataset.html), [Ray AIR Trainers](https://docs.ray.io/en/master/ray-air/trainer.html#air-trainers), and [Ray Tune](https://docs.ray.io/en/master/ray-air/tuner.html).

For the data, we will use the [NYC Taxi dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page). This popular tabular dataset contains historical taxi pickups by timestamp and location in NYC. To demonstrate batch training and tuning, we will simplify the task to a linear regression problem (predicting `trip_duration`) and use Scikit-learn.

To demonstrate how data and training can be batch-parallelized, we will train a separate model for each pickup location. This means we can use the `pickup_location_id` column to group the dataset into data batches, and then fit a separate model on each batch.
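
As a rough illustration of the idea, independent of Ray, the sketch below groups a handful of made-up rows by `pickup_location_id` and fits one least-squares line per group. The rows, the `fit_line` helper, and the single-feature model are assumptions for illustration only; the notebook itself uses Ray Datasets and Scikit-learn.

```python
from collections import defaultdict

# Hypothetical toy rows: (pickup_location_id, trip_distance, trip_duration)
rows = [
    (7, 1.0, 300.0), (7, 2.0, 500.0), (7, 3.0, 700.0),
    (17, 1.0, 240.0), (17, 2.0, 480.0), (17, 4.0, 960.0),
]

def fit_line(xs, ys):
    """Ordinary least squares for y = slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x

# Group the rows into one batch per pickup location.
batches = defaultdict(list)
for location_id, distance, duration in rows:
    batches[location_id].append((distance, duration))

# Fit an independent model on each batch.
models = {}
for location_id, batch in batches.items():
    xs, ys = zip(*batch)
    models[location_id] = fit_line(xs, ys)

for location_id, (slope, intercept) in sorted(models.items()):
    print(f"location {location_id}: slope={slope:.2f}, intercept={intercept:.2f}")
```

Because each batch is fit independently, the per-location loop is the part that can be distributed across workers.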

Let’s start by importing a few required libraries, including open-source [Ray](https://github.com/ray-project/ray) itself!

In [1]:
import sys, os
import pandas as pd
# import numpy as np
import random
import pyarrow as pa
import pyarrow.dataset as pads
import ray
from ray.data import Dataset

# import utility functions
import local_utils.dataprep

num_available_cpus = os.cpu_count()
print(f'Number of CPUs in this system: {num_available_cpus}')

Number of CPUs in this system: 8


# Data <a class="anchor" id="dataset"></a>

Next, read some data using Ray Dataset.   This will initialize a Ray cluster.  Then we can use the [Ray Dataset](https://docs.ray.io/en/latest/data/getting-started.html#datasets-getting-started) APIs to quickly inspect the data.

In [2]:
# To speed things up, we’ll only use a small subset of the full dataset: the last two monthly files (May and June 2019 in the output below).
# You can choose to use the full dataset for 2018-2019 by setting the SMOKE_TEST variable to False.

SMOKE_TEST = True


In [3]:
# List the Parquet files available in the S3 bucket.

dataset = pads.dataset(
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/",
    partitioning=["year", "month"],
)
starting_idx = -2 if SMOKE_TEST else 0

data_files = [f"s3://{file}" for file in dataset.files][starting_idx:]
print(f"Obtained {len(data_files)} files!")
data_files

Obtained 2 files!


['s3://air-example-data/ursa-labs-taxi-data/by_year/2019/05/data.parquet/359c21b3e28f40328e68cf66f7ba40e2_000000.parquet',
 's3://air-example-data/ursa-labs-taxi-data/by_year/2019/06/data.parquet/ab5b9d2b8cc94be19346e260b543ec35_000000.parquet']

In [4]:
# Read a Parquet file into a Ray Dataset.

# TODO get rid of this sampling later: for now, keep only the first file.
data_files = data_files[0]

ds = ray.data.read_parquet(data_files)
print(type(ds))

2022-10-09 12:49:50,903	INFO worker.py:1223 -- Using address localhost:9031 set in the environment variable RAY_ADDRESS
2022-10-09 12:49:54,199	INFO worker.py:1333 -- Connecting to existing Ray cluster at address: 172.31.113.136:9031...
2022-10-09 12:49:54,208	INFO worker.py:1509 -- Connected to Ray cluster. View the dashboard at https://console.anyscale-staging.com/api/v2/sessions/ses_H1JcaTJ4HC9zz1U32BnfSwYa/services?redirect_to=dashboard
2022-10-09 12:49:54,397	INFO packaging.py:342 -- Pushing file package 'gcs://_ray_pkg_bf59b22914c31152b1df18d55f17ac18.zip' (63.00MiB) to Ray cluster...
2022-10-09 12:49:55,425	INFO packaging.py:351 -- Successfully pushed file package 'gcs://_ray_pkg_bf59b22914c31152b1df18d55f17ac18.zip'.


<class 'ray.data.dataset.Dataset'>


In [5]:
# Parquet stores the number of rows per file in the Parquet metadata, 
# so we can get the number of rows in ds without triggering a full data read!
print(f"Number rows: {ds.count()}")

# Parquet pulls size-in-bytes from its metadata (not triggering a data read)
# This could be significantly different than actual in-memory size!
print(f"Size bytes (from parquet metadata): {ds.size_bytes()}")
# Trigger full reading of the dataset and inspect the size in bytes.
print(f"Size bytes (from full data read): {ds.fully_executed().size_bytes()}")

# Fetch the schema from the underlying Parquet metadata.
print("\nSchema data types:")
schema = ds.schema()
for name, dtype in zip(schema.names, schema.types):
    print(f"{name}: {dtype}")

# Take a peek at a sample row
print("\nLook at a sample row:")
ds.take(1)

# Number rows: 7565261

Number rows: 7565261
Size bytes (from parquet metadata): 1002980150


Read progress: 100%|██████████| 1/1 [00:03<00:00,  3.13s/it]


Size bytes (from full data read): 625081194

Schema data types:
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
rate_code_id: string
store_and_fwd_flag: string
pickup_location_id: int32
dropoff_location_id: int32
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
improvement_surcharge: float
total_amount: float
congestion_surcharge: float

Look at a sample row:


[ArrowRow({'vendor_id': '1',
           'pickup_at': datetime.datetime(2019, 5, 1, 0, 14, 50),
           'dropoff_at': datetime.datetime(2019, 5, 1, 0, 16, 48),
           'passenger_count': 1,
           'trip_distance': 0.0,
           'rate_code_id': '1',
           'store_and_fwd_flag': 'N',
           'pickup_location_id': 145,
           'dropoff_location_id': 145,
           'payment_type': '2',
           'fare_amount': 3.0,
           'extra': 0.5,
           'mta_tax': 0.5,
           'tip_amount': 0.0,
           'tolls_amount': 0.0,
           'improvement_surcharge': 0.30000001192092896,
           'total_amount': 4.300000190734863,
           'congestion_surcharge': 0.0})]

Normally there is some data exploration to determine the cleaning steps.  Let's just assume we know the data cleaning steps are:
- Drop rows with non-positive trip distances, zero fares, zero passengers, or trip durations under 1 minute
- Drop 2 unknown zones ['264', '265']
- Calculate trip duration in seconds and add it as a new column
- Groupby, aggregate sum taxi rides, hourly per pickup location


In [6]:
# A Pandas DataFrame UDF for transforming the underlying blocks of a Dataset in parallel.
def transform_batch_scratch(the_df: pd.DataFrame) -> pd.DataFrame:
    df = the_df.copy()
    df["trip_duration"] = (df["dropoff_at"] - df["pickup_at"]).dt.seconds
    df = df[df["trip_distance"] > 0]
    df = df[df["fare_amount"] > 0]
    df = df[df["passenger_count"] > 0]
    df = df[df["trip_duration"] >= 60]
    return df

# batch_format="pandas" tells Datasets to provide the transformer with blocks
# represented as Pandas DataFrames.
print(ds.count())
ds = ds.map_batches(transform_batch_scratch, batch_format="pandas")

# verify row count
ds_rows = ds.count()
print(f"Final number rows: {ds_rows}")

# 236,848 rows were dropped with that cleaning


7565261


Map_Batches: 100%|██████████| 1/1 [00:24<00:00, 24.52s/it]


Final number rows: 7328413
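
One caveat about the UDF above: `.dt.seconds` returns only the seconds component of each timedelta (0 to 86,399) and ignores whole days, so a negative or multi-day duration is not reported as such. The standard-library `datetime.timedelta` has the same `seconds` attribute and a `total_seconds()` method, which makes the difference easy to see without pandas. The timestamps below are made up for illustration.

```python
from datetime import datetime

# Hypothetical record whose dropoff precedes its pickup by 2 minutes (bad data).
pickup = datetime(2019, 5, 1, 0, 10, 0)
dropoff = datetime(2019, 5, 1, 0, 8, 0)
duration = dropoff - pickup

# The seconds component wraps around: the timedelta is stored as
# days=-1 plus a positive number of seconds.
seconds_component = duration.seconds
# total_seconds() keeps the sign and includes whole days.
total = duration.total_seconds()

print(seconds_component, total)

# A ">= 60" filter keeps the row in the first case and drops it in the second.
print(seconds_component >= 60, total >= 60)
```

In pandas the corresponding accessor is `.dt.total_seconds()`. Switching to it may change the row counts reported in this notebook, so treat it as something to evaluate rather than a drop-in replacement.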


In [None]:
# # Q. Is there an easier way to get count distinct?

# # Num distinct pickup location_ids
# groupby_agg = ds.groupby("pickup_location_id").mean("trip_distance").take()
# num_location_id = len(groupby_agg)
# print(f"Count distinct pickup location ids: {num_location_id}")

In [8]:
# For this demo, sample pickup location ids from the range 1 ... 20
sample_locations = list(range(1, 21))

if SMOKE_TEST:
    sample_locations = random.sample(sample_locations, 3)
    
print(sample_locations)

[7, 18, 17]


<b>Filter on Read - Projection and Filter Pushdown</b>

Note that Ray Datasets' Parquet reader supports projection (column selection) and row filter pushdown, where we can push the above column selection and the row-based filter to the Parquet read. If we specify column selection at Parquet read time, the unselected columns won't even be read from disk!

The row-based filter is specified via [Arrow's dataset field expressions](https://arrow.apache.org/docs/6.0/python/generated/pyarrow.dataset.Expression.html#pyarrow.dataset.Expression). 

<b>Best practice is to filter as much as you can directly in the Ray Dataset read_parquet() statement.</b>
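
To make the two terms concrete, here is a toy sketch in plain Python. It is not how Parquet or Arrow work internally: the `columnar_file` dictionary, the `read_with_pushdown` helper, and its `filter_columns` argument are all hypothetical. It only shows that with pushdown the reader touches just the columns it needs and returns just the rows that pass the filter.

```python
# A toy "columnar file": each column is stored (and can be read) separately.
columnar_file = {
    "pickup_location_id": [7, 145, 17, 7],
    "passenger_count": [1, 0, 2, 1],
    "trip_distance": [0.86, 1.2, 0.0, 2.5],
    "fare_amount": [4.5, 6.0, 3.0, 9.5],
    "tip_amount": [0.0, 1.0, 0.0, 2.0],
}

def read_with_pushdown(file, columns, row_filter, filter_columns):
    """Read only the needed columns and keep only rows passing row_filter."""
    # Projection: the union of output columns and columns the filter inspects.
    columns_read = sorted(set(columns) | set(filter_columns))
    n_rows = len(next(iter(file.values())))
    rows = []
    for i in range(n_rows):
        candidate = {name: file[name][i] for name in columns_read}
        # Filter: rows that fail the predicate never reach the caller.
        if row_filter(candidate):
            rows.append({name: candidate[name] for name in columns})
    return rows, columns_read

def keep_row(row):
    return (
        row["passenger_count"] > 0
        and row["trip_distance"] > 0
        and row["pickup_location_id"] in (7, 17)
    )

rows, columns_read = read_with_pushdown(
    columnar_file,
    columns=["pickup_location_id", "trip_distance"],
    row_filter=keep_row,
    filter_columns=["passenger_count", "trip_distance", "pickup_location_id"],
)
print(rows)
print(columns_read)
```

In this toy run, `fare_amount` and `tip_amount` are never touched, and only the two matching rows are returned.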

TODO: repartition data to 2 CPU


In [9]:
def pushdown_read_data(files_list: list,
                       sample_ids: list) -> Dataset:
    filter_expr = (
        (pads.field("passenger_count") > 0)
        & (pads.field("trip_distance") > 0)
        & (pads.field("fare_amount") > 0)
        & (pads.field("pickup_location_id").isin(sample_ids))
    )

    the_dataset = ray.data.read_parquet(
        files_list,
        columns=[
            'pickup_at', 'dropoff_at', 'pickup_location_id',
            'passenger_count', 'trip_distance', 'fare_amount'], 
        filter=filter_expr,
    )

    # Force full execution of the file read(s).
    the_dataset = the_dataset.fully_executed()
    return the_dataset

In [10]:
# Test the pushdown_read_data function
pushdown_ds = pushdown_read_data(data_files, sample_locations)

print(f"Number rows: {pushdown_ds.count()}")
# Display some metadata about the dataset.
print("\nMetadata: ")
print(pushdown_ds)
# Fetch the schema from the underlying Parquet metadata.
print("\nSchema:")
print(pushdown_ds.schema())
# Take a peek at a single row
print("\nLook at a sample row:")
pushdown_ds.take(1)


Read progress: 100%|██████████| 1/1 [00:01<00:00,  1.71s/it]


Number rows: 11495

Metadata: 
Dataset(num_blocks=1, num_rows=11495, schema={pickup_at: timestamp[us], dropoff_at: timestamp[us], pickup_location_id: int32, passenger_count: int8, trip_distance: float, fare_amount: float})

Schema:
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
pickup_location_id: int32
passenger_count: int8
trip_distance: float
fare_amount: float
-- schema metadata --
pandas: '{"index_columns": [{"kind": "range", "name": null, "start": 0, "' + 2548

Look at a sample row:


[ArrowRow({'pickup_at': datetime.datetime(2019, 5, 1, 0, 7, 43),
           'dropoff_at': datetime.datetime(2019, 5, 1, 0, 10, 56),
           'pickup_location_id': 7,
           'passenger_count': 1,
           'trip_distance': 0.8600000143051147,
           'fare_amount': 4.5})]

In [11]:
# check sampling
# Q. Some sort of mapping is happening for location_ids ?
df = pushdown_ds.to_pandas()
print(df[["pickup_location_id", "trip_distance"]].groupby("pickup_location_id").count())

df = ds.to_pandas(limit=ds.count())
# How many ids in all the data?
print("\nCount distinct location_ids in original data")
print(df[["pickup_location_id", "trip_distance"]].groupby("pickup_location_id").count().shape[0])
print(df[["pickup_location_id", "trip_distance"]].groupby("pickup_location_id").count())

                    trip_distance
pickup_location_id               
7                           10143
17                           1199
18                            153

Count distinct location_ids in original data
261
                    trip_distance
pickup_location_id               
1                              61
2                              17
3                             135
4                           12215
5                              14
...                           ...
261                         41438
262                         90327
263                        134458
264                         56325
265                          1660

[261 rows x 1 columns]


<b>Custom data transform functions</b>

Ray Datasets allows you to specify custom data transform functions using familiar syntax, such as Pandas. These <b>custom functions, or UDFs,</b> can be called using `ds.map_batches(my_UDF, batch_format="pandas")`. Use the `batch_format` parameter to specify the format in which your UDF expects to receive each batch.

TODO: Reference link for syntax supported in Datasets UDFs <br>
TODO: Mention chaining UDFs using [BatchMapper](https://docs.ray.io/en/latest/ray-air/check-ingest.html)

In [12]:
# A Pandas DataFrame UDF for transforming the underlying blocks of a Dataset in parallel.
def transform_batch(the_df: pd.DataFrame) -> pd.DataFrame:
    df = the_df.copy()    
    df["trip_duration"] = (df["dropoff_at"] - df["pickup_at"]).dt.seconds    
    df = df[df["trip_duration"] >= 60]    
    df.drop(["dropoff_at", "pickup_at"], axis=1, inplace=True)
    df['pickup_location_id'] = df['pickup_location_id'].fillna(-1)
    return df

In [17]:
# Test the transform UDF function
print(pushdown_ds.count())

# batch_format="pandas" tells Datasets to provide the transformer with blocks
# represented as Pandas DataFrames.
pushdown_ds = pushdown_ds.map_batches(transform_batch, batch_format="pandas")

# verify row count
pushdown_rows = pushdown_ds.count()
print(f"Final number rows: {pushdown_rows}")

# Looks good. Replace ds with pushdown
ds = pushdown_ds


Final number rows: 11398


<b>Random shuffle</b>

Randomly shuffling data is an important part of training many machine learning models: it decorrelates samples, which helps reduce overfitting and improve generalization. For many models, shuffling between epochs can also noticeably improve the accuracy gained per step or epoch. Datasets provides a distributed random shuffle that lets you get the benefits of per-epoch shuffling without sacrificing training throughput, even at large data scales and when doing distributed data-parallel training across multiple GPUs or nodes.

In [18]:
# do a full global random shuffle to decorrelate the data
ds = ds.random_shuffle()

Shuffle Map: 100%|██████████| 1/1 [00:00<00:00, 91.34it/s]
Shuffle Reduce: 100%|██████████| 1/1 [00:00<00:00, 113.94it/s]


<b>Split data into train/valid/test </b> 

We are ready to split the data into train/valid/test. For now, we will randomly split the data 80/20 into train and validation sets, and create the test set by dropping the target column from the validation set.


In [19]:
target = "trip_duration"

# Split data into train and validation.
train_ds, valid_ds = ds.train_test_split(test_size=0.2)

# Create a test dataset by dropping the target column.
test_ds = valid_ds.drop_columns(cols=[target])

assert train_ds.count() + valid_ds.count() == ds.count()
print(f"Number rows train, test: {train_ds.count()}, {test_ds.count()}")

Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 105.89it/s]


Number rows train, test: 9118, 2280
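
For intuition, the shuffle-then-split step can be sketched in plain Python. This is not how Ray implements `random_shuffle()` or `train_test_split()`; the `shuffle_and_split` helper and its truncating split rule are assumptions, chosen because they reproduce the 9118/2280 row counts shown above for 11,398 rows.

```python
import random

def shuffle_and_split(rows, test_size=0.2, seed=None):
    """Shuffle a copy of rows, then cut it into (train, test) lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    split_index = int(len(shuffled) * (1 - test_size))
    return shuffled[:split_index], shuffled[split_index:]

# Stand-in for the 11,398 cleaned rows; only the row ids matter here.
rows = range(11398)
train, test = shuffle_and_split(rows, test_size=0.2, seed=42)

print(len(train), len(test))

# Every row lands in exactly one of the two splits.
assert sorted(train + test) == list(rows)
```

Shuffling before the cut matters because the source rows are ordered by pickup time, so an unshuffled split would put the most recent trips only in the held-out set.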


In [20]:
# delete data to free up memory in our Ray cluster
del ds
del train_ds
del valid_ds
del test_ds

<b>Tidying up</b>

To make our code more modular and easier to read, let's put all those data processing steps into a single function called `prepare_data()`.  See [scripts](../scripts/dataprep.py) for the full code.


In [11]:
# Q. Seeing errors calling included functions this way?

# # Single function call for preparing data using Ray Dataset
# train_ds, valid_ds, test_ds = local_utils.dataprep.prepare_data(data_files, target)


In [21]:
from typing import Tuple
def prepare_data(files_list: list, 
                 target: str, 
                 sample_locations: list) -> Tuple[Dataset, Dataset, Dataset]:
    # Pushdown - filter on read data from parquet
    the_dataset = pushdown_read_data(files_list, sample_locations)
    
    # Perform transformation using pandas UDF `transform_batch`
    the_dataset = the_dataset.map_batches(
            transform_batch, 
            batch_format="pandas")   
        
    # Perform a global shuffle
    the_dataset = the_dataset.random_shuffle()
    
    # Split data into train/valid
    train_dataset, valid_dataset = \
        the_dataset.train_test_split(test_size=0.2)
    
    # Create test data from valid by dropping the target column
    test_dataset = valid_dataset.drop_columns([target])
    
    # Return train, valid, test
    return train_dataset, valid_dataset, test_dataset

In [22]:
# Test the prepare_data function
target = "trip_duration"
train_ds, valid_ds, test_ds = prepare_data(data_files, target, sample_locations)

print(f"Number rows train, test: {train_ds.count()}, {test_ds.count()}")

Read progress: 100%|██████████| 1/1 [00:01<00:00,  1.50s/it]
Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 32.97it/s]
Shuffle Map: 100%|██████████| 1/1 [00:00<00:00, 152.11it/s]
Shuffle Reduce: 100%|██████████| 1/1 [00:00<00:00, 147.95it/s]
Map_Batches: 100%|██████████| 1/1 [00:00<00:00, 79.24it/s]


Number rows train, test: 9118, 2280


In [23]:
test_ds.take(1)

[PandasRow({'pickup_location_id': 7,
            'passenger_count': 1,
            'trip_distance': 1.159999966621399,
            'fare_amount': 7.5})]

# AIR Trainer <a class="anchor" id="trainer"></a>

Ray AI Runtime (AIR) is a scalable and unified toolkit for ML applications.  AIR builds on Ray’s best-in-class libraries for Preprocessing, Training, Tuning, Scoring, Serving, and Reinforcement Learning to bring together an ecosystem of integrations.

<b>Scaling</b>

By default, Dataset tasks use all available cluster CPU resources for execution. This can sometimes conflict with Trainer resource requests. For example, if Trainers allocate all CPU resources in the cluster, then no Datasets tasks can run.

A good rule of thumb: if you know you need CPUs for work other than training (such as Datasets tasks), reserve a couple of CPUs for that work.

TODO:  Better explanation!  This doesn't quite make sense yet.  Trainer output looks like it used 2 cpu, even though 5 were specified, why?

In [24]:
print(f'Number of CPUs in this system: {num_available_cpus}')

# decide how many processors to use for training
# always reserve 1 cpu for the Ray head node
num_datasets_cpus = 2
num_training_cpus = num_available_cpus - num_datasets_cpus - 1
print(f"Dataset CPUs: {num_datasets_cpus}, Trainer CPUs: {num_training_cpus}")

# assign resources for AIR Trainer
trainer_resources = {"CPU": num_training_cpus}

Number of CPUs in this system: 8
Dataset CPUs: 2, Trainer CPUs: 5
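
The arithmetic above can be wrapped in a small helper that also guards against machines with too few cores. The `split_cpus` name, its default values, and the choice to raise an error are assumptions for illustration; none of this is part of Ray's API.

```python
def split_cpus(total_cpus, dataset_cpus=2, reserved_cpus=1):
    """Return the number of CPUs left for training after reservations."""
    training_cpus = total_cpus - dataset_cpus - reserved_cpus
    if training_cpus < 1:
        raise ValueError(
            f"{total_cpus} CPUs is not enough to reserve {dataset_cpus} for "
            f"Datasets and {reserved_cpus} for other work and still train."
        )
    return training_cpus

# On the 8-CPU machine used in this notebook:
trainer_resources = {"CPU": split_cpus(8)}
print(trainer_resources)
```

Failing loudly on a small machine is a design choice here; clamping to 1 CPU would also be reasonable if you prefer the notebook to keep running.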


<b>Training preprocessor</b>

After data preparation, often there are special transformations required per algorithm.  For example:
- One-hot encoding for categorical variables
- Ordinal encoding for categorical variables
- Standard scaler for numeric variables

You can pass these preprocessors to a trainer. Ray Train will take care of applying the preprocessor to the dataset in a distributed fashion.


In [26]:
# Create a preprocessor to scale some columns.
from ray.data.preprocessors import Chain, OrdinalEncoder, StandardScaler

preprocessor = StandardScaler(columns=["passenger_count", "fare_amount"])
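
Standard scaling replaces each value with its z-score, `(x - mean) / std`, using statistics computed on the training data. The sketch below shows the arithmetic on made-up fare amounts with the standard library only. Using the population standard deviation is an assumption here, so check the Ray documentation for the exact convention `StandardScaler` follows.

```python
from statistics import fmean, pstdev

# Hypothetical fare amounts for a few training rows.
fares = [4.5, 7.5, 3.0, 12.0, 8.0]

mean = fmean(fares)
std = pstdev(fares)  # population standard deviation (assumed convention)

# Replace each value with its z-score.
scaled = [(x - mean) / std for x in fares]
print([round(z, 3) for z in scaled])

# After scaling, the column has mean close to 0 and standard deviation close to 1.
print(abs(fmean(scaled)) < 1e-9, abs(pstdev(scaled) - 1.0) < 1e-9)
```

The same mean and standard deviation learned from the training rows should be reused when scaling validation and test rows, which is what passing the preprocessor to the trainer is meant to take care of.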

<b>AIR Trainer</b>

Ray Train has several Trainer classes that make it possible to do distributed training. Trainers are wrapper classes around third-party training frameworks such as Scikit-learn, XGBoost, LightGBM, Hugging Face, TensorFlow, Horovod, PyTorch, RLlib, and more. AIR Trainers provide integration with core Ray actors (for distribution), Tune, and Dataset.

Q. Below why do I see 2 outputs for Train?  I expected 5 outputs because of 5 parallel CPUs for Training? <br>
Q. Why do you have to explicitly do ray.init() before train but you don't have to for Datasets?

Tip: Below, you might want to turn on scrolling for the output. In the cell below, right-click, select Enable Scrolling for Outputs.

In [30]:
# Not sure why, but might have to run this in terminal!
# python3 -m pip install scikit-learn

/usr/bin/python3: No module named pip


In [31]:
import sklearn
from sklearn.linear_model import LinearRegression
from ray.train.sklearn import SklearnTrainer, SklearnPredictor
from ray.air.config import ScalingConfig
from ray.air.result import Result

trainer = SklearnTrainer(
    # SKlearn specific params
    estimator=LinearRegression(),
    # Scaling params
    scaling_config=ScalingConfig(trainer_resources=trainer_resources),
    label_column=target,
    cv=5,
    # Ray Datasets to use for train/valid
    datasets={"train": train_ds, "valid": valid_ds},
    # Ray Datasets preprocessor before training
    preprocessor=preprocessor,
)
result = trainer.fit()

| Trial name | status | loc | iter | total time (s) | fit_time |
|---|---|---|---|---|---|
| SklearnTrainer_77252_00000 | TERMINATED | 172.31.113.136:11001 | 1 | 2.72673 | 1.16454 |


Result for SklearnTrainer_77252_00000:
  cv:
    fit_time: [0.0037877559661865234, 0.0030989646911621094, 0.002981901168823242, 0.0030295848846435547,
      0.003016948699951172]
    fit_time_mean: 0.0031830310821533204
    fit_time_std: 0.0003047430651245634
    score_time: [0.0014061927795410156, 0.0013928413391113281, 0.001375436782836914,
      0.001344442367553711, 0.0013527870178222656]
    score_time_mean: 0.0013743400573730468
    score_time_std: 2.3308803112227476e-05
    test_score: [0.011420282770100765, 0.1048737937677513, 0.0862853510099334, 0.030362331879895788,
      0.014760074659894928]
    test_score_mean: 0.04954036681751524
    test_score_std: 0.03858131297863796
  date: 2022-10-09_13-03-44
  done: false
  experiment_id: c3dd0da438c44e9f9adf0cdd32babc11
  fit_time: 1.1645383834838867
  hostname: ip-172-31-113-136
  iterations_since_restore: 1
  node_ip: 172.31.113.136
  pid: 11001
  should_checkpoint: true
  time_since_restore: 2.726732015609741
  time_this_iter_s: 

2022-10-09 13:03:45,009	INFO tune.py:758 -- Total run time: 6.11 seconds (4.96 seconds for the tuning loop).
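
The `cv=5` argument is why `test_score` above has five entries: the training rows are cut into five folds, and each fold takes one turn as the held-out set while the model is fit on the other four. Below is a plain-Python sketch of contiguous, unshuffled fold boundaries. The `kfold_bounds` helper is hypothetical, and spreading the remainder over the first folds is an assumption about the fold sizes rather than a copy of scikit-learn's code.

```python
def kfold_bounds(n_rows, n_folds=5):
    """Return (start, stop) index pairs for contiguous, unshuffled folds."""
    base, extra = divmod(n_rows, n_folds)
    bounds = []
    start = 0
    for fold in range(n_folds):
        # The first `extra` folds take one additional row each.
        stop = start + base + (1 if fold < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds

# 9118 training rows, as in the split above.
n_train = 9118
for fold, (start, stop) in enumerate(kfold_bounds(n_train)):
    held_out = stop - start
    print(f"fold {fold}: hold out rows [{start}, {stop}), fit on {n_train - held_out} rows")
```

Each `test_score` entry is the score of the model fit on four folds and evaluated on the fifth, so the mean and standard deviation summarize how stable the fit is across folds.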


In [None]:
# def train_sklearn(num_cpus: int, use_gpu: bool = False) -> Result:
#     if use_gpu and not cuMLRandomForestClassifier:
#         raise RuntimeError("cuML must be installed for GPU enabled sklearn estimators.")

#     train_dataset, valid_dataset, _ = prepare_data()

#     # Scale some random columns
#     columns_to_scale = ["mean radius", "mean texture"]
#     preprocessor = Chain(
#         OrdinalEncoder(["categorical_column"]), StandardScaler(columns=columns_to_scale)
#     )

#     if use_gpu:
#         trainer_resources = {"CPU": 1, "GPU": 1}
#         estimator = cuMLRandomForestClassifier()
#     else:
#         trainer_resources = {"CPU": num_cpus}
#         estimator = RandomForestClassifier()

#     trainer = SklearnTrainer(
#         estimator=estimator,
#         label_column="target",
#         datasets={"train": train_dataset, "valid": valid_dataset},
#         preprocessor=preprocessor,
#         cv=5,
#         scaling_config=ScalingConfig(trainer_resources=trainer_resources),
#     )
#     result = trainer.fit()
#     print(result.metrics)

#     return result
