# Narrow end-to-end story (but where are the 'ends' these days?)

Here we'll take a look at
* loading and reshaping data
* training a model
* serving

... using a combination of tools.

__The goal is to share the *flavor* of the APIs and systems, not to solve specific problems.__

>We won't cover (though we don't mean to downplay their importance) upstream activities like data acquisition, discovery, and catalog integration; parallel work like experiment tracking, recording dataset provenance and features, and archiving artifacts; or key downstream activities like monitoring models in production, drift or bias detection, and rollout/rollback of new model versions.

## Loading data

For many use cases, the initial access of the data might be via Spark (or, e.g., Trino) in order to locate tables in __Nessie__ (https://projectnessie.org/) or a __Hive Metastore__ and to assemble/extract with (potentially complex) SQL.

In this example, we'll assume we already know the locations of our data and we'll use Dask to access it.

*By design, we are not going to create a Dask distributed cluster -- we'll use Dask to define some tasks but Ray to run them. If this is confusing, we'll have you covered in a couple of minutes.*

In [None]:
import dask.dataframe as ddf

df = ddf.read_csv('data/diamonds.csv', dtype={'table':'float64'})

df

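We can extend the Dask dataframe graph with some common data prep operations: `categorize()` converts the string columns to categoricals with known categories, and `get_dummies` then one-hot encodes them.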

In [None]:
df2 = df.categorize()

df2

In [None]:
df3 = ddf.get_dummies(df2)

df3
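To make the two prep steps concrete, here is a minimal sketch of the same idea in plain pandas on a tiny made-up frame. The column names mirror the diamonds data, but the values are hypothetical, and this is only an illustration of the encoding, not what Dask does internally across partitions.

```python
import pandas as pd

# Tiny hypothetical frame; the values are invented for illustration.
frame = pd.DataFrame({
    "carat": [0.23, 0.31, 0.90],
    "cut": ["Ideal", "Premium", "Ideal"],
})

# Step 1: make the set of categories explicit
# (the role `categorize()` plays for a Dask dataframe).
frame["cut"] = frame["cut"].astype("category")

# Step 2: expand each category into its own indicator column.
encoded = pd.get_dummies(frame)

print(list(encoded.columns))
```

The numeric column passes through unchanged, while `cut` becomes one indicator column per category.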

Ray can schedule (compute) the operations from a Dask task graph. In fact, Ray Data can integrate with lots of other data sources: https://docs.ray.io/en/latest/data/dataset.html#supported-input-formats
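If the split between "defining tasks" and "running tasks" still feels abstract, the following toy sketch may help. It uses the dict form that Dask documents for task graphs (a key maps to a literal or to a tuple of a callable plus arguments) and a deliberately naive executor. `toy_get` is a hypothetical helper written for this illustration; real schedulers, whether Dask's own or Ray's, add parallelism, caching, and fault handling on top of the same idea.

```python
from operator import add, mul

# A task graph as plain data: each key maps either to a literal value or to
# a tuple of (callable, *arguments), where arguments may name other keys.
graph = {
    "x": 2,
    "y": 3,
    "sum": (add, "x", "y"),
    "result": (mul, "sum", 10),
}

def toy_get(graph, key):
    """Evaluate `key` by recursively computing its dependencies (toy scheduler)."""
    value = graph[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, *args = value
        # Arguments that name other keys are computed first; literals pass through.
        resolved = [
            toy_get(graph, a) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        return func(*resolved)
    return value

print(toy_get(graph, "result"))  # (2 + 3) * 10 = 50
```

Because the graph is just data, whoever holds it can decide how and where to execute it, which is what lets one library build the graph and another run it.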

In [None]:
import ray

ray.init(num_cpus=4)

In [None]:
ds = ray.data.from_dask(df3)

ds

We can do *some* data manipulation with Ray Data datasets.

Today, Ray Data is envisioned as a tool for "last-mile preprocessing", along with tasks that are specific to parallelism (e.g., repartition) or that require special handling in the parallel case (e.g., train/test split).

In [None]:
# drop the index column left over from the CSV export; pass the column names as a list
ds1 = ds.drop_columns(['Unnamed: 0']).repartition(2)

ds1

In [None]:
ds1.take(1)

In [None]:
train_dataset, valid_dataset = ds1.train_test_split(test_size=0.2)
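As a rough picture of what a fractional split means, here is a single-machine sketch in plain Python. `split_rows` is a hypothetical helper, not Ray's implementation; in the distributed case the rows live in separate blocks, so the library also has to work out where the cut falls across those blocks, which is the "special handling" mentioned above.

```python
import random

def split_rows(rows, test_size, shuffle=False, seed=None):
    """Split rows into (train, test); `test_size` is the fraction held out."""
    rows = list(rows)
    if shuffle:
        # A seeded generator keeps the shuffle reproducible.
        random.Random(seed).shuffle(rows)
    n_test = int(len(rows) * test_size)
    cut = len(rows) - n_test
    return rows[:cut], rows[cut:]

train, test = split_rows(range(10), test_size=0.2)
print(len(train), len(test))
```

Without shuffling, the held-out rows are simply the tail of the data, so shuffling is worth considering whenever the row order carries meaning.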

We can use the `Trainer` pattern (https://docs.ray.io/en/latest/train/train.html#intro-to-ray-train) -- here with XGBoost, but similarly for deep learning.

In [None]:
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

scale = ScalingConfig(num_workers=2, use_gpu=False)

trainer = XGBoostTrainer(scaling_config=scale,
    label_column="price",
    num_boost_round=20,
    # "rmse" and "mae" are regression metrics; "error" is meant for binary classification
    params={ "objective": "reg:squarederror", "eval_metric": ["rmse", "mae"], },
    datasets={"train": train_dataset, "valid": valid_dataset},
)

result = trainer.fit()
print(result.metrics)
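For readers less familiar with the `rmse` metric reported during training, this small sketch computes it by hand. The prices and predictions are made-up numbers, and `rmse` here is a hypothetical helper rather than XGBoost's implementation.

```python
import math

def rmse(y_true, y_pred):
    """Root mean squared error: square root of the average squared difference."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical prices and predictions; the errors are 10, 20 and 30.
prices = [300.0, 500.0, 900.0]
preds = [310.0, 480.0, 930.0]

print(rmse(prices, preds))
```

The result is in the same units as the label (here, price), which makes it easy to judge whether the error is acceptable.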

If we had more time and wanted more accuracy, this would be a great point to try out __Ray Tune__ and get the best hyperparams we can: https://docs.ray.io/en/latest/tune/index.html

Instead, we'll move toward serving this model via a low-latency request-response prediction service with __Ray Serve__.

Before creating our service, let's make sure the trained model can produce a prediction locally.

In [None]:
from ray.train.xgboost import XGBoostPredictor

predictor = XGBoostPredictor.from_checkpoint(result.checkpoint)

In [None]:
# remove the label so the rows look like the inputs the service will receive
smoke_test = valid_dataset.drop_columns(['price'])

smoke_test.to_pandas()[:1]

In [None]:
predictor.predict(smoke_test.to_pandas()[:1])

Ok, now we'll create a service with Ray Serve to deploy our model.

We'll serialize our last model checkpoint -- in production we could do something like this or use a model db or other mechanism to find the version we want to deploy.

In [None]:
import cloudpickle

checkpoint_serialized = cloudpickle.dumps(result.checkpoint)
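The serialization step is an ordinary object-to-bytes round trip. The sketch below shows the same pattern with the standard library's `pickle`, which exposes the same `dumps`/`loads` pair as `cloudpickle`. The dictionary is a made-up stand-in for a checkpoint, used only so the example runs on its own.

```python
import pickle

# Stand-in for a checkpoint: any picklable object works for this illustration.
fake_checkpoint = {"model": "xgboost", "num_boost_round": 20}

payload = pickle.dumps(fake_checkpoint)   # object -> bytes
restored = pickle.loads(payload)          # bytes -> an equal copy of the object

print(type(payload).__name__, restored == fake_checkpoint)
```

Because the payload is plain bytes, it can be passed as a constructor argument, written to a file, or stored in whatever system tracks model versions.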

At first, it might not be obvious why (or even whether) we want a system as complex as Ray for serving models.

In this demo case, we could probably solve the problem other ways. But when we have multiple services, ensembling of models, conditional flow, autoscaling and heterogeneous hardware ... we'll be glad to have a tool designed for just such challenges.

https://docs.ray.io/en/latest/serve/scaling-and-resource-allocation.html#autoscaling

In [None]:
import pandas as pd
from starlette.requests import Request
from typing import Dict
from ray import serve

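# Two replicas of the deployment share incoming requests at the "/" route.
@serve.deployment(route_prefix="/", num_replicas=2)
class DiamondPricerDeployment:
    def __init__(self, checkpoint:bytes):
        # Rebuild the predictor once per replica from the serialized checkpoint.
        self._model = XGBoostPredictor.from_checkpoint(cloudpickle.loads(checkpoint))

    async def __call__(self, request: Request) -> Dict:
        # The request body is a JSON string produced by DataFrame.to_json() on the client.
        data = await request.json()
        return { "result" : self._model.predict(pd.read_json(data)).predictions[0] }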

serve.run(DiamondPricerDeployment.bind(checkpoint=checkpoint_serialized))

Ok... let's make some predictions!

In [None]:
sample_row = smoke_test.to_pandas()[:1].copy(deep=True)
sample_row["carat"] = 0.8
sample_row

In [None]:
import requests

print(requests.post("http://localhost:8000/", json = sample_row.to_json()).json())
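One detail of the request above is easy to miss: the row is JSON-encoded twice, once by `to_json()` and once more when the HTTP client encodes the `json=` argument. The sketch below reproduces that round trip without any server, using `json.dumps`/`json.loads` as stand-ins for the client and server sides. The column names and values are hypothetical, and wrapping the text in `io.StringIO` is a choice made here to suit newer pandas versions.

```python
import io
import json

import pandas as pd

# Hypothetical one-row frame standing in for `sample_row`.
row = pd.DataFrame({"carat": [0.8], "depth": [61.5]})

# Client side: DataFrame -> JSON text, then the HTTP client encodes that text again.
inner = row.to_json()
body = json.dumps(inner)

# Server side: decoding the body yields the inner JSON text,
decoded = json.loads(body)
# which pandas parses back into a one-row DataFrame.
restored = pd.read_json(io.StringIO(decoded))

print(restored)
```

This is why the deployment can hand the decoded body directly to `pd.read_json`: after the outer layer is removed, what remains is exactly the text that `to_json()` produced.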