# Ray Training Pipeline

This notebook is based upon one of the [Ray tutorials](https://docs.ray.io/en/latest/train/examples/transformers/huggingface_text_classification.html) on the official documentation pages.

We will take a dataset, train a Hugging Face transformers model on the data,
and then save the model for future inference downstream.

You can run this notebook in Google Colab (recommended) or locally using the devcontainer setup provided in the repo folder.

## Pointers


*   If running locally and you don't have a GPU, set `use_gpu = False` in the configuration cell below.
* In general, check what memory you have available, especially if running a Docker container (OOM errors are common!).
* If on Colab or elsewhere with free compute, have fun but remember you are still only running on a single node cluster.







In [None]:
# from google.colab import files
# uploaded = files.upload()

TypeError: 'NoneType' object is not subscriptable

In [None]:
# !pip install -r requirements-colab.txt

In [None]:
!pip install -U "datasets==3.6.0" "evaluate==0.4.3" "transformers>=4.19.0" \
"torch>=1.10.0" "mlflow==3.1.0" \
"ray[train,tune,default]==2.47.0" "jupyterlab==4.4.3" \
"jupyter-client<8" "notebook" "jupyter-kernel-gateway<2.6" "torchvision>=0.11.0,<0.17.0"

Collecting datasets==3.6.0
  Downloading datasets-3.6.0-py3-none-any.whl.metadata (19 kB)
Collecting evaluate==0.4.3
  Downloading evaluate-0.4.3-py3-none-any.whl.metadata (9.2 kB)
Collecting torch>=1.10.0
  Downloading torch-2.7.1-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (29 kB)
Collecting mlflow==3.1.0
  Downloading mlflow-3.1.0-py3-none-any.whl.metadata (29 kB)
Collecting ray==2.47.0 (from ray[default,train,tune]==2.47.0)
  Downloading ray-2.47.0-cp311-cp311-manylinux2014_x86_64.whl.metadata (20 kB)
Collecting jupyterlab==4.4.3
  Downloading jupyterlab-4.4.3-py3-none-any.whl.metadata (16 kB)
Collecting jupyter-client<8
  Downloading jupyter_client-7.4.9-py3-none-any.whl.metadata (8.5 kB)
Collecting notebook
  Downloading notebook-7.4.4-py3-none-any.whl.metadata (10 kB)
Collecting torchvision<0.17.0,>=0.11.0
  Downloading torchvision-0.16.2-cp311-cp311-manylinux1_x86_64.whl.metadata (6.6 kB)
Collecting fsspec<=2025.3.0,>=2023.1.0 (from fsspec[http]<=2025.3.0,>=2023.1.0->dataset

In [None]:
from pprint import pprint
import logging
import ray

# Start a local Ray runtime with deliberately small resource limits.
# ignore_reinit_error=True keeps this cell re-runnable: without it, executing the
# cell a second time on a live kernel raises because Ray is already initialised.
ray.init(
    _memory=3 * 1024**3,  # 3 GB total usable memory
    object_store_memory=512 * 1024**2,  # 512 MB for object store
    num_cpus=2,
    logging_level=logging.INFO,
    ignore_reinit_error=True,
)

In [None]:
# Pretty-print the resource dictionary Ray reports for the running cluster.
pprint(ray.cluster_resources())

In [None]:
# Hardware settings; both values are passed to ScalingConfig further down.
use_gpu = False  # True trains on GPUs; False runs on CPUs
num_workers = 1  # number of training workers (set to the number of GPUs or CPUs you want to use)

In [None]:

# Names of the GLUE tasks this notebook knows about.
GLUE_TASKS = [
    "cola", "mnli", "mnli-mm", "mrpc", "qnli",
    "qqp", "rte", "sst2", "stsb", "wnli",
]

In [None]:
# Experiment settings read by the cells below.
task = "cola"  # GLUE task to fine-tune on
model_checkpoint = "distilbert-base-uncased"  # checkpoint name passed to from_pretrained()
batch_size = 16  # per-device batch size, used for both training and evaluation

## Load Dataset

In [None]:
import os

# `google.colab` only exists inside Colab, but this notebook is also meant to run
# locally, so a missing module must not abort the run.
try:
    from google.colab import userdata
except ImportError:
    userdata = None

# Hand the token over through the HF_TOKEN environment variable instead of leaving
# `userdata.get(...)` as the cell's last expression, which would render the secret
# into the saved notebook output.
if userdata is not None and not os.environ.get("HF_TOKEN"):
    try:
        os.environ["HF_TOKEN"] = userdata.get("HF_TOKEN")
    except (userdata.SecretNotFoundError, userdata.NotebookAccessError):
        # Missing secret or access not granted to this notebook: carry on
        # unauthenticated rather than stopping the notebook here.
        print("HF_TOKEN Colab secret is not available; continuing without it.")

In [None]:
from datasets import load_dataset

# Resolve the GLUE config to download: "mnli-mm" maps to "mnli", every other
# task name maps to itself.
actual_task = {"mnli-mm": "mnli"}.get(task, task)
datasets = load_dataset("glue", actual_task)

## Process Dataset

In [None]:
from transformers import AutoTokenizer

# Load the tokenizer belonging to model_checkpoint, requesting the fast implementation.
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, use_fast=True)

In [None]:
# Column names holding the text for each task, as (first column, second column).
# The second entry is None for tasks with a single text column.
_single_sentence = ("sentence", None)
_sentence_pair = ("sentence1", "sentence2")
_premise_hypothesis = ("premise", "hypothesis")

task_to_keys = {
    "cola": _single_sentence,
    "mnli": _premise_hypothesis,
    "mnli-mm": _premise_hypothesis,
    "mrpc": _sentence_pair,
    "qnli": ("question", "sentence"),
    "qqp": ("question1", "question2"),
    "rte": _sentence_pair,
    "sst2": _single_sentence,
    "stsb": _sentence_pair,
    "wnli": _sentence_pair,
}

In [None]:
import ray.data

# Convert every split of the Hugging Face DatasetDict into a Ray Dataset.
# Iterating over the splits that actually exist, rather than hard-coding
# "train"/"validation"/"test", avoids a KeyError for the MNLI tasks, whose
# validation splits are named "validation_matched"/"validation_mismatched".
ray_datasets = {
    split_name: ray.data.from_huggingface(split)
    for split_name, split in datasets.items()
}
ray_datasets

In [None]:
import numpy as np
import torch


# Device that collated batches are placed on. It follows the notebook's `use_gpu`
# switch: checking only torch.cuda.is_available() here would select "cuda" on any
# CUDA-capable host even when training was configured to run on CPUs.
device = torch.device("cuda" if use_gpu and torch.cuda.is_available() else "cpu")


def collate_fn(examples: dict[str, np.ndarray]):
    """Tokenize a batch of raw examples and attach its labels.

    Args:
        examples: Batch keyed by column name. Must contain the text column(s)
            listed for the current ``task`` in ``task_to_keys`` plus ``"label"``.

    Returns:
        The tokenizer output (padded to the longest sequence in the batch, as
        PyTorch tensors) with an extra ``"labels"`` entry, all moved to ``device``.
    """
    sentence1_key, sentence2_key = task_to_keys[task]

    # Tasks with a single text column have sentence2_key set to None.
    texts = [list(examples[sentence1_key])]
    if sentence2_key is not None:
        texts.append(list(examples[sentence2_key]))

    outputs = tokenizer(
        *texts,
        truncation=True,
        padding="longest",
        return_tensors="pt",
    )

    # "stsb" is trained as regression (a single output, scored with Pearson), so
    # its labels must stay floating point; every other task uses class indices.
    label_dtype = torch.float if task == "stsb" else torch.long
    outputs["labels"] = torch.tensor(examples["label"], dtype=label_dtype)

    # Move all tensors to the selected device (CPU, or GPU when enabled and available)
    for key, value in outputs.items():
        outputs[key] = value.to(device)

    return outputs

## Fine tuning

In [None]:
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer

In [None]:
import torch
import numpy as np

from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
import evaluate


import ray.train
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback

# Size of the model's output layer: three classes for the MNLI variants, a single
# output for STS-B, and two classes for every other task.
if task.startswith("mnli"):
    num_labels = 3
elif task == "stsb":
    num_labels = 1
else:
    num_labels = 2

# Name of the headline metric for the selected task.
metric_name = {"stsb": "pearson", "cola": "matthews_correlation"}.get(task, "accuracy")

# Keep only the part of the checkpoint identifier after the last "/".
model_name = model_checkpoint.rsplit("/", 1)[-1]

# Name of the validation split that belongs to the selected task.
validation_key = {
    "mnli-mm": "validation_mismatched",
    "mnli": "validation_matched",
}.get(task, "validation")

# Run name, also used as the TrainingArguments output directory.
name = f"{model_name}-finetuned-{task}"

# Steps per epoch = training rows divided by the global batch size
# (per-device batch size times the number of training workers), rounded down.
max_steps_per_epoch = ray_datasets["train"].count() // (batch_size * num_workers)


def train_func(config):
    """Training loop that Ray Train runs on each worker.

    Loads the GLUE metric, tokenizer and model, wraps this worker's "train" and
    "eval" dataset shards as batch iterables, and fine-tunes the model with a
    Hugging Face ``Trainer`` that has ``RayTrainReportCallback`` attached.

    Args:
        config: Optional dict of hyperparameters. Recognised keys are
            ``learning_rate`` (default 2e-5), ``epochs`` (default 2) and
            ``weight_decay`` (default 0.01). ``None`` is treated as empty.
    """
    import inspect

    config = config or {}
    epochs = config.get("epochs", 2)

    print(f"Is CUDA available: {torch.cuda.is_available()}")

    metric = evaluate.load("glue", actual_task)
    tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, use_fast=True)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_checkpoint, num_labels=num_labels
    )

    train_ds = ray.train.get_dataset_shard("train")
    eval_ds = ray.train.get_dataset_shard("eval")

    train_ds_iterable = train_ds.iter_torch_batches(
        batch_size=batch_size, collate_fn=collate_fn
    )
    eval_ds_iterable = eval_ds.iter_torch_batches(
        batch_size=batch_size, collate_fn=collate_fn
    )

    print("max_steps_per_epoch: ", max_steps_per_epoch)

    # Recent transformers releases renamed `evaluation_strategy` to `eval_strategy`
    # and no longer accept the old keyword, while older releases only know the old
    # one. Use whichever name the installed version exposes.
    strategy_kwarg = (
        "eval_strategy"
        if "eval_strategy" in inspect.signature(TrainingArguments.__init__).parameters
        else "evaluation_strategy"
    )

    args = TrainingArguments(
        name,
        save_strategy="epoch",
        logging_strategy="epoch",
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        learning_rate=config.get("learning_rate", 2e-5),
        num_train_epochs=epochs,
        weight_decay=config.get("weight_decay", 0.01),
        push_to_hub=False,
        # The datasets are plain batch iterables, so the total number of steps
        # is passed explicitly.
        max_steps=max_steps_per_epoch * epochs,
        disable_tqdm=True,  # declutter the output a little
        no_cuda=not use_gpu,  # you need to explicitly set no_cuda if you want CPUs
        report_to="none",
        **{strategy_kwarg: "epoch"},
    )

    def compute_metrics(eval_pred):
        """Turn raw model outputs into predictions and score them with the GLUE metric."""
        predictions, labels = eval_pred
        if task != "stsb":
            # Classification: choose the highest-scoring class.
            predictions = np.argmax(predictions, axis=1)
        else:
            # Regression: the single output column is the prediction itself.
            predictions = predictions[:, 0]
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model,
        args,
        train_dataset=train_ds_iterable,
        eval_dataset=eval_ds_iterable,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
    )

    trainer.add_callback(RayTrainReportCallback())

    trainer = prepare_trainer(trainer)

    print("Starting training")
    trainer.train()

In [None]:
from ray.train.torch import TorchTrainer
from ray.train import RunConfig, ScalingConfig, CheckpointConfig

# Configure the Ray Train job: train_func runs on `num_workers` workers, each fed
# from the "train" and "eval" Ray datasets registered here.
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    datasets={
        "train": ray_datasets["train"],
        # validation_key is the task-specific validation split name computed
        # earlier ("validation" for most tasks), so the eval split is no longer
        # hard-coded.
        "eval": ray_datasets[validation_key],
    },
    run_config=RunConfig(
        checkpoint_config=CheckpointConfig(
            num_to_keep=1,  # retain a single checkpoint...
            checkpoint_score_attribute="eval_loss",  # ...ranked by evaluation loss
            checkpoint_score_order="min",  # lower is better
        ),
    ),
)

In [None]:
# Run the configured training job and keep what fit() returns for later inspection.
result = trainer.fit()