# GPT-J-6B Fine-Tuning with Ray AIR and DeepSpeed

In this example, we will showcase how to use Ray AIR for **GPT-J fine-tuning**.

This notebook can be interactively stepped through to:
1. [Set up Ray](#setup)
2. [Load the dataset](#load)
3. [Preprocess the dataset with Ray AIR](#preprocess)
4. [Run the training with Ray AIR](#train)
5. [Generate text from prompt with Ray AIR](#predict)

The notebook also includes a **One-Line** option that submits the whole notebook as a training job. Doing so runs the notebook as a script named `gptj_deepspeed_fine_tuning.py`. Feel free to update that script with your own data and run it to start fine-tuning on that data!

6. [Submit code as an Anyscale Job](#submit-as-an-anyscale-job)

Uncomment and run the following line in order to install all the necessary dependencies (this notebook is being tested with `transformers==4.26.0`):

In [None]:
import numpy as np
import pandas as pd
import os
import evaluate

## Set up Ray <a name="setup"></a>

First, let's set some global variables. We will use 16 workers, each being assigned 1 GPU and 8 CPUs.

In [None]:
model_name = "EleutherAI/gpt-j-6b"
use_gpu = True
num_workers = 16
cpus_per_worker = 8

We will use `ray.init()` to initialize a local cluster. By default, this cluster consists only of the machine you are running this notebook on. You can also run this notebook on an Anyscale cluster.

We define a {ref}`runtime environment <runtime-environments>` to ensure that the Ray workers have access to all the necessary packages. You can omit the `runtime_env` argument if you have all of the packages already installed on each node in your cluster.
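
As a rough sketch of what such a runtime environment could look like: the package list below is an assumption inferred from the imports used in this notebook (only the `transformers` pin comes from the text above), so adjust it to your cluster. The helper name `init_ray_with_runtime_env` is hypothetical; it would be called in place of the bare `ray.init()` in the next cell, not in addition to it.

```python
# Assumed package list, inferred from the imports in this notebook.
# Only the transformers pin is stated in the text; the rest are unpinned guesses.
runtime_env = {
    "pip": [
        "datasets",
        "evaluate",
        "transformers==4.26.0",
        "torch",
        "deepspeed",
    ]
}


def init_ray_with_runtime_env():
    """Hypothetical helper: start Ray so that workers install the packages above."""
    import ray  # imported lazily so the dict can be inspected without Ray installed

    return ray.init(runtime_env=runtime_env)
```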

In [None]:
import ray

ray.init()

## Loading the dataset <a name="load"></a>

We will be fine-tuning the model on the [`tiny_shakespeare` dataset](https://huggingface.co/datasets/tiny_shakespeare), which consists of 40,000 lines from a variety of Shakespeare's plays. The aim is to make the GPT-J model better at generating text in the style of Shakespeare.

In [None]:
from datasets import load_dataset

print("Loading tiny_shakespeare dataset")
current_dataset = load_dataset("tiny_shakespeare")
current_dataset

We will use [Ray Data](data) for distributed preprocessing and data ingestion. We can easily convert the dataset obtained from Hugging Face Hub to Ray Data by using {meth}`ray.data.from_huggingface`.

In [None]:
import ray.data

ray_datasets = ray.data.from_huggingface(current_dataset)
ray_datasets

Because the dataset is represented by a single large string, we will need to do some preprocessing. For that, we will define two Ray AIR Preprocessors, each wrapping a function that is applied to batches of data.

The `split_text` function will take the single string and split it into separate lines, removing empty lines and character names ending with ':' (e.g. 'ROMEO:'). The `tokenize` function will take the lines and tokenize them using the 🤗 Tokenizer associated with the model, ensuring each entry has the same length (`block_size`) by padding and truncating. This is necessary for training.
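
To make the splitting rule concrete, here is a minimal sketch that applies the same filter to a short made-up excerpt, using plain Python rather than a pandas batch. The `sample` string is illustrative only.

```python
# A short made-up excerpt in the same layout as the dataset
sample = (
    "First Citizen:\n"
    "Before we proceed any further, hear me speak.\n"
    "\n"
    "All:\n"
    "Speak, speak.\n"
)

# Keep non-empty lines that do not end with ':' (which drops speaker names)
lines = [
    line.strip()
    for line in sample.split("\n")
    if line.strip() and not line.strip().endswith(":")
]

print(lines)
# ['Before we proceed any further, hear me speak.', 'Speak, speak.']
```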

```{note}
This preprocessing can be done in other ways. A common pattern is to tokenize first, and then split the obtained tokens into equally-sized blocks.
```
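
The alternative pattern mentioned in the note (tokenize first, then cut the token stream into equally sized blocks) is not used in this notebook, but a minimal sketch of the chunking step might look like the following. The helper name `chunk_tokens` is hypothetical, and the integer list stands in for real tokenizer output.

```python
def chunk_tokens(token_ids, block_size):
    """Split a flat list of token ids into full blocks, dropping the remainder."""
    usable = (len(token_ids) // block_size) * block_size
    return [token_ids[i : i + block_size] for i in range(0, usable, block_size)]


token_ids = list(range(10))  # stand-in for the ids a tokenizer would return
blocks = chunk_tokens(token_ids, block_size=4)

print(blocks)
# [[0, 1, 2, 3], [4, 5, 6, 7]]
```

Dropping the incomplete final block avoids padding altogether, at the cost of discarding a few tokens at the end of the text.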

We will use the `splitter` and `tokenizer` Preprocessors below.

In [None]:
block_size = 512

In [None]:
from transformers import AutoTokenizer

from ray.data.preprocessors import BatchMapper


def split_text(batch: pd.DataFrame) -> pd.DataFrame:
    # Join the batch into one string, then split it into individual lines.
    text = list(batch["text"])
    flat_text = "".join(text)
    lines = [
        x.strip()
        for x in flat_text.split("\n")
        # Drop empty lines and character names such as "ROMEO:".
        if x.strip() and not x.strip().endswith(":")
    ]
    return pd.DataFrame(lines, columns=["text"])


def tokenize(batch: pd.DataFrame) -> dict:
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
    tokenizer.pad_token = tokenizer.eos_token
    ret = tokenizer(
        list(batch["text"]),
        truncation=True,
        max_length=block_size,
        padding="max_length",
        return_tensors="np",
    )
    ret["labels"] = ret["input_ids"].copy()
    return dict(ret)


splitter = BatchMapper(split_text, batch_format="pandas")
tokenizer = BatchMapper(tokenize, batch_format="pandas")

### Fine-tuning the model with Ray AIR <a name="train"></a>

We can now configure Ray AIR's `ray.train.huggingface.TransformersTrainer` to perform distributed fine-tuning of the model. To do that, we specify a `trainer_init_per_worker` function, which creates a 🤗 Transformers `Trainer` that Ray distributes using Distributed Data Parallelism (with the PyTorch Distributed backend internally). This means that each worker has its own copy of the model but operates on different data. At the end of each step, all the workers sync gradients.
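
The gradient synchronization step can be illustrated with a toy example. The sketch below is not how PyTorch implements it; it uses a one-parameter model `y = w * x` in plain Python to show that averaging the gradients from equally sized shards gives the same result as computing the gradient on the full batch.

```python
def grad(w, xs, ys):
    """Gradient of the mean squared error for the model y = w * x."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
w = 0.5  # every worker holds an identical copy of the weight

# Two workers, each seeing an equally sized shard of the data
shards = [(xs[:2], ys[:2]), (xs[2:], ys[2:])]
local_grads = [grad(w, shard_x, shard_y) for shard_x, shard_y in shards]

# Gradient sync: average the local gradients across workers
synced_grad = sum(local_grads) / len(local_grads)
full_batch_grad = grad(w, xs, ys)

print(local_grads, synced_grad, full_batch_grad)
```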

Because GPT-J is a relatively large model, it may not be possible to fit it on smaller GPU types (<=16 GB of GPU memory, or GRAM). To deal with that issue, we can use [DeepSpeed](https://github.com/microsoft/DeepSpeed), a library that optimizes the training process and allows us to (among other things) offload and partition optimizer and parameter states, reducing GRAM usage. Furthermore, DeepSpeed ZeRO Stage 3 allows us to load large models without running out of memory.

The integration between 🤗 Transformers and Ray AIR allows you to easily configure and use DDP and DeepSpeed. All you need to do is specify the DeepSpeed configuration in the [`TrainingArguments`](https://huggingface.co/docs/transformers/en/main_classes/trainer#transformers.TrainingArguments) object.

```{tip}
There are many DeepSpeed settings that allow you to trade off speed for memory usage. The settings used below are tailored to the cluster setup used (16 g4dn.4xlarge nodes) and a per device batch size of 16. Some things to keep in mind:
- If your GPUs support bfloat16, use that instead of float16 mixed precision to get better performance and prevent overflows. Replace `fp16=True` with `bf16=True` in `TrainingArguments`.
- If you are running out of GRAM, try reducing the batch size (defined in the cell below the next one) or setting `"overlap_comm": False` in the DeepSpeed config.
- If you are running out of RAM, add more nodes to your cluster, use nodes with more RAM, set `"pin_memory": False` in the DeepSpeed config, reduce the batch size, and remove `"offload_param"` from the DeepSpeed config.

For more information on DeepSpeed configuration, refer to [Hugging Face documentation](https://huggingface.co/docs/transformers/main_classes/deepspeed) and [DeepSpeed documentation](https://www.deepspeed.ai/docs/config-json/).

Additionally, if you prefer a lower-level API, the logic below can be expressed as an [Accelerate training loop](https://github.com/huggingface/accelerate/blob/main/examples/by_feature/deepspeed_with_config_support.py) distributed by a Ray AIR {class}`~ray.train.torch.torch_trainer.TorchTrainer`.
```
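
The memory-related suggestions in the tip above can be expressed as small edits to the config dictionary. The sketch below works on a trimmed copy of the `zero_optimization` section from the training cell later in this notebook; the helper names `for_low_gpu_memory` and `for_low_host_memory` are hypothetical and are not part of DeepSpeed or Ray.

```python
import copy

# Trimmed copy of the ZeRO section used in the training cell of this notebook
base_config = {
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
        "offload_param": {"device": "cpu", "pin_memory": True},
        "overlap_comm": True,
    }
}


def for_low_gpu_memory(config):
    """Return a copy with communication overlap disabled to save GPU memory."""
    cfg = copy.deepcopy(config)
    cfg["zero_optimization"]["overlap_comm"] = False
    return cfg


def for_low_host_memory(config):
    """Return a copy without parameter offload and without pinned host memory."""
    cfg = copy.deepcopy(config)
    zero = cfg["zero_optimization"]
    zero.pop("offload_param", None)
    zero["offload_optimizer"]["pin_memory"] = False
    return cfg


gpu_cfg = for_low_gpu_memory(base_config)
ram_cfg = for_low_host_memory(base_config)
```

Working on deep copies keeps the original dictionary untouched, so the variants can be compared side by side before one is passed to `TrainingArguments`.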

#### Training speed

As we are using data parallelism, each worker operates on its own shard of the data. The batch size set in `TrainingArguments` is the **per device batch size** (per worker batch size). By changing the number of workers, we can change the **effective batch size** and thus the time needed for training to complete. The effective batch size is then calculated as `per device batch size * number of workers * number of gradient accumulation steps`. As we add more workers, the effective batch size rises and thus we need less time to complete a full epoch. While the speedup is not exactly linear due to extra communication overheads, in many cases it can be close to linear.

The preprocessed dataset has 1348 examples. We have set per device batch size to 16.

* With 16 g4dn.4xlarge nodes, the effective batch size was 256, which equals 85 steps per epoch. One epoch took **~2440 seconds** (including initialization time).

* With 32 g4dn.4xlarge nodes, the effective batch size was 512, which equals 43 steps per epoch. One epoch took **~1280 seconds** (including initialization time).
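
The effective batch size formula can be checked with a few lines of arithmetic. One assumption needs flagging: taken literally, 1348 examples would be consumed in a handful of steps at these batch sizes, so the sketch reads 1348 as the number of per-device batches of 16 (21,568 examples). That reading reproduces the 85 and 43 steps reported above, but it is our assumption rather than a documented figure. The helper name `steps_per_epoch` is hypothetical, and the step count is approximated as a simple ceiling division.

```python
import math


def steps_per_epoch(num_examples, per_device_batch_size, num_workers, grad_accum_steps=1):
    """Return (effective batch size, approximate optimizer steps per epoch)."""
    effective = per_device_batch_size * num_workers * grad_accum_steps
    return effective, math.ceil(num_examples / effective)


# Assumption: 1348 per-device batches of 16 examples each
num_examples = 1348 * 16

print(steps_per_epoch(num_examples, per_device_batch_size=16, num_workers=16))
# (256, 85)
print(steps_per_epoch(num_examples, per_device_batch_size=16, num_workers=32))
# (512, 43)
```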

In [None]:
from transformers import Trainer, TrainingArguments
from transformers import (
    GPTJForCausalLM,
    AutoTokenizer,
    default_data_collator,
)
from transformers.utils.logging import disable_progress_bar, enable_progress_bar
import torch

from ray.air import session


def trainer_init_per_worker(train_dataset, eval_dataset=None, **config):
    # Use the actual number of CPUs assigned by Ray
    os.environ["OMP_NUM_THREADS"] = str(
        session.get_trial_resources().bundles[-1].get("CPU", 1)
    )
    # Enable tf32 for better performance
    torch.backends.cuda.matmul.allow_tf32 = True

    batch_size = config.get("batch_size", 4)
    epochs = config.get("epochs", 2)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.00002)
    weight_decay = config.get("weight_decay", 0.01)

    deepspeed = {
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 8,
        },
        "bf16": {"enabled": "auto"},
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True,
            },
            "offload_param": {
                "device": "cpu",
                "pin_memory": True,
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size": "auto",
            "stage3_prefetch_bucket_size": "auto",
            "stage3_param_persistence_threshold": "auto",
            "gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True,
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
    }

    print("Preparing training arguments")
    training_args = TrainingArguments(
        "output",
        per_device_train_batch_size=batch_size,
        logging_steps=1,
        save_strategy="no",
        per_device_eval_batch_size=batch_size,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        label_names=["input_ids", "attention_mask"],
        num_train_epochs=epochs,
        push_to_hub=False,
        disable_tqdm=True,  # declutter the output a little
        fp16=True,
        gradient_checkpointing=True,
        deepspeed=deepspeed,
    )
    disable_progress_bar()

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    print("Loading model")

    model = GPTJForCausalLM.from_pretrained(model_name, use_cache=False)
    model.resize_token_embeddings(len(tokenizer))

    print("Model loaded")

    enable_progress_bar()

    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        data_collator=default_data_collator,
    )
    return trainer

With our `trainer_init_per_worker` complete, we can now instantiate the {class}`~ray.train.huggingface.TransformersTrainer`. Aside from the function, we set the `scaling_config`, which controls the number of workers and the resources used, and the `datasets` we will use for training and evaluation.

We pass the preprocessors we have defined earlier as an argument, wrapped in a {class}`~ray.data.preprocessors.chain.Chain`. The preprocessor will be included with the returned {class}`~ray.air.checkpoint.Checkpoint`, meaning it will also be applied during inference.

```{note}
If you want to upload checkpoints to cloud storage (e.g. S3), set {class}`air.RunConfig(storage_path) <ray.air.RunConfig>`. See {ref}`train-run-config` for an example. Using cloud storage is highly recommended, especially for production.
```

In [None]:
from ray.train.huggingface import TransformersTrainer
from ray.air.config import ScalingConfig
from ray.data.preprocessors import Chain


trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    trainer_init_config={
        "batch_size": 16,  # per device
        "epochs": 1,
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker={"GPU": 1, "CPU": cpus_per_worker},
    ),
    datasets={"train": ray_datasets["train"], "evaluation": ray_datasets["validation"]},
    preprocessor=Chain(splitter, tokenizer),
)

Finally, we call the {meth}`~ray.train.huggingface.TransformersTrainer.fit` method to start training with Ray AIR. We will save the {class}`~ray.air.Result` object to a variable so we can access metrics and checkpoints.

In [12]:
results = trainer.fit()

You can use the returned {class}`~ray.air.Result` object to access metrics and the Ray AIR {class}`~ray.air.checkpoint.Checkpoint` associated with the last iteration.

In [None]:
checkpoint = results.checkpoint
checkpoint
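
Besides the checkpoint, the `Result` object exposes a `metrics` dictionary with the values reported in the last iteration. The sketch below shows one way to pull out the numeric entries. The helper name `numeric_metrics` is hypothetical, and the stand-in object with made-up values is used only so the snippet runs without a completed training job; with a real run you would pass `results` instead.

```python
from types import SimpleNamespace


def numeric_metrics(result):
    """Return only the int/float entries of result.metrics."""
    metrics = result.metrics or {}
    return {
        key: value
        for key, value in metrics.items()
        if isinstance(value, (int, float)) and not isinstance(value, bool)
    }


# Stand-in for a real Result object; the keys and values are made up
fake_result = SimpleNamespace(
    metrics={"loss": 2.5, "epoch": 1.0, "trial_id": "abc", "done": True},
    checkpoint=None,
)

print(numeric_metrics(fake_result))
# {'loss': 2.5, 'epoch': 1.0}
```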

### Generate text from prompt <a name="predict"></a>

We can use the {class}`~ray.train.huggingface.huggingface_predictor.TransformersPredictor` to generate predictions from our fine-tuned model.

```{tip}
For large scale batch inference, consider configuring cloud checkpointing and then pass the cloud-backed {class}`~ray.air.checkpoint.Checkpoint` to {class}`~ray.train.batch_predictor.BatchPredictor`. More information [here](air-predictors).
```

Because the {class}`~ray.train.huggingface.huggingface_predictor.TransformersPredictor` uses a 🤗 Transformers [`pipeline`](https://huggingface.co/docs/transformers/en/main_classes/pipelines) under the hood, we disable the tokenizer AIR Preprocessor we used for training and let the `pipeline` tokenize the data itself.

In [None]:
checkpoint.set_preprocessor(None)

We also set `device_map="auto"` so that the model is automatically placed on the right device and set the `task` to `"text-generation"`. The `predict` method passes the arguments to a 🤗 Transformers `pipeline` call.

In [None]:
from ray.train.huggingface import TransformersPredictor
import pandas as pd

prompts = pd.DataFrame(["Romeo and Juliet", "Romeo", "Juliet"], columns=["text"])

# Predict on the head node.
predictor = TransformersPredictor.from_checkpoint(
    checkpoint=checkpoint,
    task="text-generation",
    torch_dtype=torch.float16 if use_gpu else None,
    device_map="auto",
    use_gpu=use_gpu,
)
prediction = predictor.predict(
    prompts,
    do_sample=True,
    temperature=0.9,
    min_length=32,
    max_length=128,
)

In [None]:
prediction

### Submit as an Anyscale Job <a name="submit-as-an-anyscale-job"></a>

Anyscale Jobs execute Ray applications, such as training or fine-tuning, and provide the following out of the box:

1. Automated failure handling
2. Automated email alerting
3. Recording and persistence of outputs such as logs

You may wish to update the script or configurations before submitting a production job, but there is no requirement to do so. See the Anyscale documentation to read more about compute configurations and Jobs.

In [None]:
!anyscale job submit -- python gptj_deepspeed_fine_tuning.py