In [1]:
# tag::ds_create[]
from ray.data import from_pandas
from sklearn.datasets import load_breast_cancer


data_raw = load_breast_cancer(as_frame=True)  # <1>

features_df = data_raw["data"]
predict_ds = from_pandas(features_df)  # <2>

# Build the labeled frame as a new DataFrame instead of adding a column to
# `features_df` in place: that frame was just used for `predict_ds`, and
# in-place column assignment on it can also trigger pandas'
# SettingWithCopyWarning. `dataset_df` still ends up holding features + target.
dataset_df = features_df.assign(target=data_raw["target"])
dataset = from_pandas(dataset_df)
# end::ds_create[]

2022-10-05 02:43:54,507	INFO worker.py:1518 -- Started a local Ray instance.


In [None]:
# tag::preprocessor_create[]
# TODO: `Repartitioner` does not exist in Ray (the commented import is a
#  stand-in). Either pick a real preprocessor that needs less explanation than
#  repartitioning (which the text does not cover), or drop callout <3>.
from ray.data.preprocessors import Chain, StandardScaler
# from dummy import Repartitioner

# Chain composes preprocessors; for now it wraps a single scaler applied to
# two of the breast-cancer feature columns.
preprocessor = Chain(  # <1>
    StandardScaler(columns=["worst radius", "worst area"]),  # <2>
    # Repartitioner(num_partitions=2)  # <3>
)
# end::preprocessor_create[]

In [None]:
# tag::train_fit_basic[]
from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig

# Distributed XGBoost: two CPU-only workers, binary classification of the
# "target" column, with `preprocessor` attached to the trainer.
trainer = XGBoostTrainer(  # <1>
    scaling_config=ScalingConfig(  # <2>
        num_workers=2,
        resources_per_worker={"CPU": 2, "GPU": 0},
    ),
    label_column="target",  # <3>
    params={  # <4>
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": dataset},
    preprocessor=preprocessor,
)

result = trainer.fit()  # <5>
print(result)
# end::train_fit_basic[]

In [None]:
# tag::training_func_setup[]
import torch
import torch.nn as nn


# Toy problem dimensions shared by the model and the synthetic data.
num_samples, input_size = 20, 10
layer_size, output_size = 15, 5


class NeuralNetwork(nn.Module):
    """Two-layer MLP: Linear(input_size, layer_size) -> ReLU -> Linear(layer_size, output_size).

    The layer sizes are read from the module-level constants ``input_size``,
    ``layer_size`` and ``output_size`` when the model is constructed.
    """

    def __init__(self):
        # Zero-argument super() is the Python 3 idiom (same MRO lookup).
        super().__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)

    def forward(self, input_data):
        """Map a batch whose last dimension is ``input_size`` to ``output_size`` features."""
        return self.layer2(self.relu(self.layer1(input_data)))


# Synthetic regression data: random features and random targets. Named
# `inputs` (not `input`) so the built-in `input()` is not shadowed for the
# rest of the notebook.
inputs = torch.randn(num_samples, input_size)  # <1>
labels = torch.randn(num_samples, output_size)


def train_loop(model, loss_fn, optimizer):
    """Run one full-batch optimization step of ``model`` on the global toy data.

    Args:
        model: module mapping ``inputs`` to predictions comparable with ``labels``.
        loss_fn: callable ``loss_fn(output, labels)`` returning a scalar tensor
            (``backward()`` is called on it without arguments).
        optimizer: optimizer over ``model``'s parameters.
    """
    output = model(inputs)
    loss = loss_fn(output, labels)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


def train_func():  # <2>
    """Train a freshly initialized NeuralNetwork for three epochs with SGD (lr=0.1) and MSE loss."""
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    num_epochs = 3
    for _ in range(num_epochs):
        train_loop(model, loss_fn, optimizer)
# end::training_func_setup[]

In [None]:
# tag::dist_func_setup[]
from ray.train.torch import prepare_model


def train_func_distributed():
    """Same three-epoch SGD loop as ``train_func``, but the model goes through
    ``ray.train.torch.prepare_model`` before the optimizer is built."""
    num_epochs = 3
    model = prepare_model(NeuralNetwork())  # <1>
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    for _ in range(num_epochs):
        train_loop(model, loss_fn, optimizer)

# end::dist_func_setup[]

In [None]:
# tag::train_start[]
from ray.train import Trainer


trainer = Trainer(backend="torch", num_workers=4, use_gpu=False)  # <1>
trainer.start()
try:
    results = trainer.run(train_func_distributed)
finally:
    # Shut the workers down even if training raised, so a failed run still
    # cleans up after itself.
    trainer.shutdown()
# end::train_start[]

In [None]:
# tag::xgb_start[]
# use_gpu=True only reserves a GPU per worker; XGBoost must also be told to
# train on it, which is what the "gpu_hist" tree method does. Without it the
# reserved GPUs would sit idle while training runs on the CPU.
scaling_config = ScalingConfig(num_workers=10, use_gpu=True)

trainer = XGBoostTrainer(
    scaling_config=scaling_config,
    datasets={"train": dataset},
    label_column="target",
    params={"tree_method": "gpu_hist"},
    # ...
)
# end::xgb_start[]

In [None]:
# tag::get_datasets[]
import torch
import torch.nn as nn

import ray
import ray.train as train
from ray.train import Trainer


def get_datasets(a=5, b=10, size=1000, split=0.8):
    """Build train/validation pipelines for the linear target ``y = a * x + b``.

    ``x`` takes the ``size`` values ``0, 1/size, ..., (size - 1)/size``. The rows
    are shuffled once; the first ``int(count * split)`` rows become the training
    set and the remainder the validation set. Both are turned into pipelines
    with ``.repeat()``, and the training pipeline is additionally reshuffled per
    window.

    Returns:
        dict with keys ``"train"`` and ``"validation"``.
    """
    xs = (i / size for i in range(size))
    full_ds = ray.data.from_items([{"x": x, "y": a * x + b} for x in xs])

    cutoff = int(full_ds.count() * split)
    train_ds, validation_ds = full_ds.random_shuffle().split_at_indices([cutoff])

    return {
        "train": train_ds.repeat().random_shuffle_each_window(),
        "validation": validation_ds.repeat(),
    }
# end::get_datasets[]

In [None]:
# tag::train_func_with_data[]
def train_func(config):
    """Per-worker training loop that consumes this worker's dataset shards.

    Args:
        config: dict with a required ``"batch_size"`` key and an optional
            ``"epochs"`` key (default 3). ``"hidden_size"`` and ``"lr"`` are
            accepted but not used yet.

    Returns:
        List of per-epoch results. It stays empty until the elided training
        step appends to it.
    """
    batch_size = config["batch_size"]
    # hidden_size = config["hidden_size"]
    # lr = config.get("lr", 1e-2)
    epochs = config.get("epochs", 3)

    # Shards of the "train"/"validation" entries passed to Trainer.run(dataset=...);
    # iter_epochs() yields one piece of each pipeline per epoch.
    train_dataset_iterator = train.get_dataset_shard("train").iter_epochs()
    validation_dataset_iterator = train.get_dataset_shard("validation").iter_epochs()

    # Both splits are converted with identical column, dtype and batch settings.
    to_torch_kwargs = dict(
        label_column="y",
        feature_columns=["x"],
        label_column_dtype=torch.float,
        feature_column_dtypes=torch.float,
        batch_size=batch_size,
    )

    # Defined locally so the return never falls back to a same-named global.
    results = []
    for _ in range(epochs):
        train_dataset = next(train_dataset_iterator)
        validation_dataset = next(validation_dataset_iterator)
        train_torch_dataset = train_dataset.to_torch(**to_torch_kwargs)
        validation_torch_dataset = validation_dataset.to_torch(**to_torch_kwargs)
        # ... training (append this epoch's metrics to `results`)

    return results
# end::train_func_with_data[]

In [None]:
# tag::put_things_together[]
from ray.tune.logger import JsonLoggerCallback, TBXLoggerCallback

datasets = get_datasets()
trainer = Trainer(backend="torch", num_workers=4, use_gpu=False)
config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": 3}
trainer.start()
try:
    results = trainer.run(
        train_func,
        config=config,
        dataset=datasets,
        # NOTE(review): passing the ray.tune.logger callbacks imported above
        #  fails with "AttributeError: 'JsonLoggerCallback' object has no
        #  attribute 'start_training'". `start_training` is a hook this
        #  Trainer calls on its callbacks, which Tune logger callbacks do not
        #  implement; presumably the Ray Train callbacks (ray.train.callbacks)
        #  are needed here instead. TODO: confirm, and decide whether logging
        #  adds anything to this example.
        # callbacks=[JsonLoggerCallback(), TBXLoggerCallback()],
    )
finally:
    # Release the workers even if training raised.
    trainer.shutdown()
print(results)

# end::put_things_together[]

In [None]:
# tag::ray_tune_integration[]
from ray import tune
from ray.air.config import FailureConfig, RunConfig

# NOTE(review): both "versions" are the same object, so the preprocessor grid
#  search below trains identical configurations twice.
# TODO: define two genuinely different preprocessors to compare.
prep_v1 = preprocessor
prep_v2 = preprocessor

param_space = {
    "scaling_config": ScalingConfig(
        num_workers=tune.grid_search([2, 4]),
        resources_per_worker={
            "CPU": 2,
            "GPU": 0,
        },
    ),
    "preprocessor": tune.grid_search([prep_v1, prep_v2]),
    # TODO: make this work
    # "datasets": {
    #     "train_dataset": tune.grid_search([dataset_v1, dataset_v2]),
    # },
    "params": {
        "objective": "binary:logistic",
        "tree_method": "approx",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
}

# TODO: Ray has no `StopperCallback` (nor a `fail_after_finished` option), so
#  no extra callbacks are attached for now. Replace with a real Tune callback
#  or drop this part of the example.
callbacks = None

tuner = tune.Tuner(
    XGBoostTrainer(
        label_column="target",
        # `params` is a required argument; the values actually used for each
        # trial come from param_space["params"].
        params={},
        datasets={"train": dataset},
    ),
    param_space=param_space,
    # Tuner takes the experiment name and callbacks through a RunConfig object
    # (not as keyword arguments, and not as a plain dict).
    run_config=RunConfig(
        name="tuner_resume",
        callbacks=callbacks,
        # Retry a failed trial once; stand-in for the former
        # {"max_actor_restarts": 1} setting.
        failure_config=FailureConfig(max_failures=1),
    ),
)
results = tuner.fit()
# ResultGrid has no `.results` attribute; show one row per trial instead.
print(results.get_dataframe())
# end::ray_tune_integration[]



In [None]:
# tag::trainer_callbacks[]
from ray.air.callbacks.mlflow import MLflowLoggerCallback
from ray.air.config import RunConfig
from ray.train.torch import TorchTrainer
from ray.tune.logger import TBXLoggerCallback

# The `trainer` left over from earlier cells is the legacy ray.train.Trainer,
# which has no `fit`, and AIR trainers' `fit()` takes no arguments. So build a
# TorchTrainer here and attach the logging callbacks through its RunConfig.
# NOTE(review): train_func_distributed reports no metrics, so the loggers have
#  nothing to record until it reports some (e.g. via ray.air.session.report).
trainer = TorchTrainer(
    train_loop_per_worker=train_func_distributed,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=False),
    run_config=RunConfig(
        callbacks=[
            MLflowLoggerCallback(experiment_name="train_experiment"),
            TBXLoggerCallback(),
        ],
    ),
)
result = trainer.fit()
# end::trainer_callbacks[]

In [None]:
# tag::export_model[]
from ray.train.batch_predictor import BatchPredictor
from ray.train.xgboost import XGBoostPredictor

# `trainer` and `result` have been reassigned by earlier cells, and AIR
# trainers' `fit()` takes no arguments. So train the XGBoost model explicitly
# here rather than relying on whatever `trainer` currently holds.
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        num_workers=2,
        resources_per_worker={"CPU": 2, "GPU": 0},
    ),
    label_column="target",
    params={
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": dataset},
    preprocessor=preprocessor,
)
result = trainer.fit()
print(result)

this_checkpoint = result.checkpoint
# Checkpoints have no `load_model()`; load the checkpoint into a batch
# predictor and score the unlabeled `predict_ds` with it.
batch_predictor = BatchPredictor.from_checkpoint(this_checkpoint, XGBoostPredictor)
predicted = batch_predictor.predict(predict_ds)
print(predicted.to_pandas())
# end::export_model[]