In [None]:
#https://docs.ray.io/en/latest/ray-core/examples/batch_prediction.html

# Task-based batch prediction

In [1]:
import pandas as pd
import numpy as np

def load_model(payload_size: int = 100_000_000):
    """Build a dummy model that carries a large payload.

    The returned callable marks each row of a batch with whether its
    ``passenger_count`` is even. A NumPy array of zeros is attached to the
    callable as its ``payload`` attribute, so the payload is part of the
    model object itself rather than something created while predicting.

    Args:
        payload_size: Number of elements in the dummy payload array. The
            default keeps the original size of 100,000,000 elements.

    Returns:
        A callable taking a ``pd.DataFrame`` with a ``passenger_count``
        column and returning a ``pd.DataFrame`` with one boolean ``score``
        column.
    """
    # A dummy model.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame({"score": batch["passenger_count"] % 2 == 0})

    # Dummy payload so copying the model will actually copy some data
    # across nodes. It is attached once, at load time: assigning it inside
    # `model` would re-allocate the array on every call and leave the model
    # without a payload until its first call.
    model.payload = np.zeros(payload_size)

    return model

In [2]:
import pyarrow.parquet as pq
import ray

@ray.remote
def make_prediction(model, shard_path):
    """Run the model over one parquet shard and return the row count of the result.

    Args:
        model: Callable applied to the shard's pandas DataFrame.
        shard_path: Path of the parquet file to read.

    Returns:
        ``len()`` of whatever the model returned for this shard.
    """
    shard_table = pq.read_table(shard_path)
    batch = shard_table.to_pandas()
    predictions = model(batch)

    # This is where the predictions would be written out.
    # NOTE: unless the driver has to process the result further (beyond
    # simply writing it to a storage system), writing it out from the
    # remote task is recommended, because it avoids congesting or
    # overloading the driver.
    # ...

    # In this example only the size of the result is returned.
    return len(predictions)

In [3]:
# One parquet shard per remote task, 12 in total.
input_files = [
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
    f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
    for i in range(12)
]

# Store the model in the local object store exactly once with ray.put(),
# then hand only the resulting reference to the remote tasks.
model = load_model()
model_ref = ray.put(model)

# Launch one prediction task per shard, passing the model reference and the
# shard file.
# NOTE: passing the model itself, as in make_prediction.remote(model, file),
# would be highly inefficient: to ship the model to the remote node it would
# be ray.put() again for every task, potentially overwhelming the local
# object store and causing an out-of-disk error.
result_refs = [make_prediction.remote(model_ref, file) for file in input_files]

# Block until every task has finished.
results = ray.get(result_refs)

# Report the prediction output size of each shard.
for output_size in results:
    print("Prediction output size:", output_size)

2022-12-12 18:43:55,194	INFO worker.py:1230 -- Using address localhost:9031 set in the environment variable RAY_ADDRESS
2022-12-12 18:43:55,665	INFO worker.py:1352 -- Connecting to existing Ray cluster at address: 10.0.63.8:9031...
2022-12-12 18:43:55,683	INFO worker.py:1529 -- Connected to Ray cluster. View the dashboard at https://console.anyscale.com/api/v2/sessions/ses_buwxbm99nq8dryqg6p8sbytw/services?redirect_to=dashboard
2022-12-12 18:43:55,692	INFO packaging.py:373 -- Pushing file package 'gcs://_ray_pkg_5bb1f25bbe9ee2fb06464440d8021e4c.zip' (0.11MiB) to Ray cluster...
2022-12-12 18:43:55,695	INFO packaging.py:386 -- Successfully pushed file package 'gcs://_ray_pkg_5bb1f25bbe9ee2fb06464440d8021e4c.zip'.


Prediction output size: 141062
Prediction output size: 133932
Prediction output size: 144014
Prediction output size: 143087
Prediction output size: 148108
Prediction output size: 141981
Prediction output size: 136394
Prediction output size: 136999
Prediction output size: 139985
Prediction output size: 156198
Prediction output size: 142893
Prediction output size: 145976
(scheduler +12s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
(scheduler +12s) Adding 1 node(s) of type worker-node-type-0.
(scheduler +2m22s) Resized to 24 CPUs.


# Actor-based batch prediction

In [5]:
import pandas as pd
import pyarrow.parquet as pq
import ray

@ray.remote
class BatchPredictor:
    """Actor that holds a model and applies it to parquet shards."""

    def __init__(self, model):
        """Keep the model on the actor so each predict() call can reuse it."""
        self.model = model

    def predict(self, shard_path):
        """Run the stored model over one parquet shard.

        Args:
            shard_path: Path of the parquet file to read.

        Returns:
            ``len()`` of whatever the model returned for this shard.
        """
        shard_table = pq.read_table(shard_path)
        batch = shard_table.to_pandas()
        predictions = self.model(batch)

        # This is where the predictions would be written out.
        # NOTE: unless the driver has to process the result further (beyond
        # simply writing it to a storage system), writing it out from the
        # remote task is recommended, because it avoids congesting or
        # overloading the driver.
        # ...

        # In this example only the size of the result is returned.
        return len(predictions)

In [6]:
from ray.util.actor_pool import ActorPool

# Put the model in the object store once; every actor is constructed from
# the same reference.
model = load_model()
model_ref = ray.put(model)

# A fixed-size pool of predictor actors.
num_actors = 4
actors = [BatchPredictor.remote(model_ref) for _ in range(num_actors)]
pool = ActorPool(actors)

# The 12 parquet shards to score.
input_files = [
    "s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
    f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
    for i in range(12)
]

# Submit every shard to the pool first...
for file in input_files:
    pool.submit(lambda actor, shard: actor.predict.remote(shard), file)

# ...then print results until the pool has none left.
while pool.has_next():
    output_size = pool.get_next()
    print("Prediction output size:", output_size)

Prediction output size: 141062
Prediction output size: 133932
Prediction output size: 144014
Prediction output size: 143087
Prediction output size: 148108
Prediction output size: 141981
Prediction output size: 136394
Prediction output size: 136999
Prediction output size: 139985
Prediction output size: 156198
Prediction output size: 142893
Prediction output size: 145976
(scheduler +7m25s) Removing 1 nodes of type worker-node-type-0 (idle).
(scheduler +7m36s) Resized to 8 CPUs.


# Batch prediction with GPUs

In [7]:
import torch

@ray.remote(num_gpus=1)
def make_torch_prediction(model: torch.nn.Module, shard_path):
    """Skeleton of a GPU prediction task for a single parquet shard.

    The model is moved to the CUDA device and the shard is loaded as a NumPy
    array, but the inference loop itself is only sketched in comments, so
    ``results`` stays empty and the function returns 0.

    Args:
        model: Torch module to run on the GPU.
        shard_path: Path of the parquet file to read.

    Returns:
        Number of entries collected in ``results``.
    """
    # Place the model on the GPU.
    cuda_device = torch.device("cuda")
    model.to(cuda_device)

    shard_frame = pq.read_table(shard_path).to_pandas()
    inputs = shard_frame.to_numpy()

    results = []
    # for each tensor in inputs:
    #   results.append(model(tensor))
    #
    # Write the results out from inside the task rather than returning them
    # to the driver node (unless you have to), to avoid congesting or
    # overloading the driver node.
    # ...

    # Only simple, lightweight meta information is returned here.
    return len(results)

  from .autonotebook import tqdm as notebook_tqdm
