# GPT-J-6B Fine-Tuning with Ray Train and DeepSpeed on Amazon EKS

This example showcases how to use Ray Train for GPT-J fine-tuning. GPT-J is a GPT-2-like causal language model trained on the Pile dataset. This particular model has 6 billion parameters. For more information, see GPT-J.

This example uses the Ray Train 🤗 Transformers integration and a pre-trained model from the Hugging Face Hub. Note that this example is adaptable to other similar models.

In [None]:
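# Install the core components.
# Ray Client (ray://) requires the local Ray and Python versions to match the cluster's,
# so pin "ray[air]" to the cluster's Ray version if `-U` installs a newer release.
! pip install -U "ray[air]" "boto3"
! pip install -U protobuf==3.19.6 xgboost==1.3.3 xgboost-ray==0.1.15 pandas==1.5.3
! pip install "datasets" "evaluate" "accelerate==0.20.3" "transformers>=4.26.0" "torch>=1.12.0" "deepspeed==0.8.3"
# Note: this upgrade overrides the pandas==1.5.3 pin above.
! pip install pandas --upgrade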

## Import required libraries

In [None]:
import os

import evaluate
import numpy as np
import pandas as pd
import ray
import ray.data
import torch
from datasets import load_dataset
from ray.air import session
from ray.data.preprocessors import BatchMapper
from transformers import (
    AutoTokenizer,
    GPTJForCausalLM,
    Trainer,
    TrainingArguments,
    default_data_collator,
)
from transformers.utils.logging import disable_progress_bar, enable_progress_bar

## Global variables for the training script

In [None]:
# Global variables
model_name = "EleutherAI/gpt-j-6B"
bucket = "fm-ops-datasets"
use_gpu = True
num_workers = 16  # number of Ray Train workers, one GPU each
cpus_per_worker = 8
# Maximum sequence length (in tokens) after tokenization: longer lines are truncated
# and shorter ones are padded to this length
block_size = 512
# Because this example runs on multiple nodes, checkpoints and other outputs must be
# persisted to external storage (here, S3) so they remain accessible after training completes.
storage_path = f"s3://{bucket}/checkpoints/"

## Connecting to Ray Cluster deployed in Amazon EKS

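The address below uses the Kubernetes cluster-internal DNS name of the KubeRay head service (resolved by CoreDNS), so this notebook must run inside the same EKS cluster.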

In [None]:
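# Connect to the Ray cluster through Ray Client (ray:// on port 10001).
# Shut down any existing connection first so the cell can be re-run.
ray.shutdown()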
ray.init(
    address="ray://ray-cluster-train-kuberay-head-svc.ray-cluster-train.svc.cluster.local:10001",
    runtime_env={
        "pip": [
            "datasets",
            "evaluate",
            # Latest combination of accelerate==0.19.0 and transformers==4.29.0
            # seems to have issues with DeepSpeed process group initialization,
            # and will result in a batch_size validation problem.
            # TODO(jungong) : get rid of the pins once the issue is fixed.
            "accelerate==0.20.3",
            "transformers==4.26.0",
            "torch>=1.12.0",
        ]
    }
)


## Importing the dataset

This example fine-tunes the model on the tiny_shakespeare dataset, which consists of about 40,000 lines from a variety of Shakespeare's plays. The goal is to make GPT-J better at generating text in the style of Shakespeare.

In [None]:
# Load the dataset from the Hugging Face Hub
print("Loading tiny_shakespeare dataset")
current_dataset = load_dataset("tiny_shakespeare")

# Convert the train split to a pandas DataFrame
df = pd.DataFrame(current_dataset["train"])

# Inspect the data (each split holds its full text as a single large string)
print(df.head(10))

## Preprocessing the dataset

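Each split of the dataset is a single, very large string, so it has to be split into individual lines before tokenization. The code below defines two transformation functions, split_text and tokenize, and wraps them in BatchMapper preprocessors, which apply them to batches of data through Ray Data's map_batches() API.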

In [None]:
# Convert the Hugging Face DatasetDict into Ray Datasets for distributed preprocessing
ray_datasets = ray.data.from_huggingface(current_dataset)
ray_datasets

In [None]:
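def split_text(batch: pd.DataFrame) -> pd.DataFrame:
    # Join the batch into one string, split it into lines, and drop empty lines
    # and speaker-name lines such as "ROMEO:"
    flat_text = "".join(batch["text"])
    lines = [
        line.strip()
        for line in flat_text.split("\n")
        if line.strip() and not line.strip().endswith(":")
    ]
    return pd.DataFrame(lines, columns=["text"])

def tokenize(batch: pd.DataFrame) -> dict:
    # The tokenizer is loaded on every call, i.e. once per batch, inside the
    # Ray worker that processes it. GPT-J has no pad token, so reuse EOS.
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
    tokenizer.pad_token = tokenizer.eos_token
    ret = tokenizer(
        list(batch["text"]),
        truncation=True,
        max_length=block_size,
        padding="max_length",
        return_tensors="np",
    )
    # For causal language modeling the labels are the input IDs themselves;
    # the model shifts them internally to predict the next token.
    ret["labels"] = ret["input_ids"].copy()
    return dict(ret)

# Wrap both functions as Ray Data preprocessors that operate on pandas batches
splitter = BatchMapper(split_text, batch_format="pandas")
tokenizer = BatchMapper(tokenize, batch_format="pandas")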
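To see what the splitter does before running it on the cluster, the sketch below applies a standalone copy of the split_text logic to a small, hand-written sample in the tiny_shakespeare format. The sample string is an illustrative assumption; only pandas is needed, and no Ray cluster is involved.

```python
import pandas as pd


def split_text(batch: pd.DataFrame) -> pd.DataFrame:
    # Same logic as the preprocessing function: join, split into lines,
    # and drop empty lines and speaker-name lines ending in ":"
    flat_text = "".join(batch["text"])
    lines = [
        line.strip()
        for line in flat_text.split("\n")
        if line.strip() and not line.strip().endswith(":")
    ]
    return pd.DataFrame(lines, columns=["text"])


# Hypothetical sample mimicking the dataset's single large string
sample = pd.DataFrame(
    {"text": ["First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n"]}
)
result = split_text(sample)
print(result["text"].tolist())
# ['Before we proceed any further, hear me speak.', 'Speak, speak.']
```

Speaker names and blank lines are dropped, so every remaining row is a line of dialogue that the tokenizer then pads or truncates to block_size tokens.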

## Fine-tuning the model with Ray Train

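Configure Ray Train's TransformersTrainer to perform distributed fine-tuning of the model. Specify a trainer_init_per_worker function that builds and returns a Hugging Face Trainer; Ray runs this function on every worker and sets up the PyTorch Distributed backend for it. Training is data parallel: each worker operates on a different shard of the data, and gradients are synchronized across workers at every step. Because the DeepSpeed configuration below uses ZeRO stage 3, the model parameters, gradients, and optimizer states are partitioned across workers (and offloaded to CPU memory) instead of each worker holding a full copy of the model, which is what makes it possible to fine-tune a 6-billion-parameter model on these GPUs.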

In [None]:
def trainer_init_per_worker(train_dataset, eval_dataset=None, **config):
    # Use the actual number of CPUs assigned by Ray
    os.environ["OMP_NUM_THREADS"] = str(
        session.get_trial_resources().bundles[-1].get("CPU", 1)
    )
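    # Enable TF32 matmuls for better performance (takes effect only on Ampere or
    # newer GPUs such as the A10G in g5 instances, not on the T4 in g4dn)
    torch.backends.cuda.matmul.allow_tf32 = True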

    batch_size = config.get("batch_size", 4)
    epochs = config.get("epochs", 2)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.00002)
    weight_decay = config.get("weight_decay", 0.01)

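    # DeepSpeed ZeRO stage 3: partition parameters, gradients, and optimizer
    # states across workers and offload them to CPU memory. The "auto" values
    # are filled in from the Hugging Face TrainingArguments.
    deepspeed = {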
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 8,
        },
        "bf16": {"enabled": "auto"},
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True,
            },
            "offload_param": {
                "device": "cpu",
                "pin_memory": True,
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size": "auto",
            "stage3_prefetch_bucket_size": "auto",
            "stage3_param_persistence_threshold": "auto",
            "gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True,
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
    }

    print("Preparing training arguments")
    training_args = TrainingArguments(
        "output",
        per_device_train_batch_size=batch_size,
        logging_steps=1,
        save_strategy="no",
        per_device_eval_batch_size=batch_size,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        label_names=["input_ids", "attention_mask"],
        num_train_epochs=epochs,
        push_to_hub=False,
        disable_tqdm=True,  # declutter the output a little
        fp16=True,
        gradient_checkpointing=True,
        deepspeed=deepspeed,
    )
    disable_progress_bar()

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    print("Loading model")

    model = GPTJForCausalLM.from_pretrained(model_name, use_cache=False)
    model.resize_token_embeddings(len(tokenizer))

    print("Model loaded")

    enable_progress_bar()

    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        data_collator=default_data_collator,
    )
    return trainer
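A back-of-the-envelope calculation shows why the DeepSpeed config above uses ZeRO stage 3 with CPU offload. The sketch assumes roughly 6 billion parameters and the commonly cited 16 bytes per parameter for mixed-precision Adam training (2 bytes for fp16 weights, 2 for fp16 gradients, and 12 for the fp32 master weights plus the two Adam moment buffers). It ignores activations and temporary buffers, so real usage is higher.

```python
# Approximate memory for model states in mixed-precision Adam training.
# Assumptions: ~6e9 parameters; activations and temporary buffers are ignored.
NUM_PARAMS = 6e9
BYTES_PER_PARAM = 2 + 2 + 12  # fp16 weights + fp16 grads + fp32 weights, Adam m and v


def model_state_gb(num_params: float, num_workers: int = 1) -> float:
    """Model-state memory per worker in GB (1 GB = 1e9 bytes), split evenly as in ZeRO-3."""
    return num_params * BYTES_PER_PARAM / num_workers / 1e9


for workers in (1, 4, 16):
    print(f"{workers:>2} workers: {model_state_gb(NUM_PARAMS, workers):.1f} GB per worker")
#  1 workers: 96.0 GB per worker
#  4 workers: 24.0 GB per worker
# 16 workers: 6.0 GB per worker
```

Even when divided evenly across 4 workers, about 24 GB of model states would fill a 24 GB A10G GPU (g5) before any activations are stored, and 16 workers would still need about 6 GB on each 16 GB T4 (g4dn). Offloading parameters and optimizer states to CPU memory, as the config does, relieves that pressure.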

## Training Speed

The preprocessed training set has roughly 21,600 examples (about 1348 batches at the per-device batch size of 16). Each g4dn.4xlarge and g5.4xlarge node has a single GPU and runs one worker, so the effective batch size is 16 × the number of nodes, and the number of steps per epoch is the number of examples divided by the effective batch size, rounded up.

- With 16 g4dn.4xlarge nodes, the effective batch size was 256, which gives 85 steps per epoch. One epoch took ~2440 seconds (including initialization time).

- With 32 g4dn.4xlarge nodes, the effective batch size was 512, which gives 43 steps per epoch. One epoch took ~1280 seconds (including initialization time).

- With 4 g5.4xlarge nodes, the effective batch size was 64 (16 per device × 4 nodes), which gives about 338 steps per epoch. The total running time was 3 hr 53 min 11 s (13,978 seconds), including initialization time.
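The step counts above can be reproduced with a one-line formula. The sketch assumes a training-set size of about 21,600 examples (inferred from the reported step counts, not measured), no gradient accumulation, and one GPU per worker; it ignores any uneven sharding of data across workers.

```python
import math

NUM_EXAMPLES = 21_600  # assumption: approximate size of the preprocessed training set
PER_DEVICE_BATCH_SIZE = 16


def steps_per_epoch(num_examples: int, per_device_batch: int, num_workers: int,
                    grad_accum: int = 1) -> int:
    """Optimizer steps per epoch for data-parallel training with one GPU per worker."""
    effective_batch = per_device_batch * num_workers * grad_accum
    return math.ceil(num_examples / effective_batch)


for workers in (16, 32, 4):
    print(workers, steps_per_epoch(NUM_EXAMPLES, PER_DEVICE_BATCH_SIZE, workers))
# 16 85
# 32 43
# 4 338
```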

### Detailed Results

Training finished at 2023-09-05 16:18:36. Total running time: 3hr 53min 11s (or 13978 seconds).

```
╭────────────────────────────────────╮
│ Training result                    │
├────────────────────────────────────┤
│ time_this_iter_s           272.831 │
│ time_total_s                 13978 │
│ training_iteration             338 │
│ epoch                            1 │
│ learning_rate                    0 │
│ loss                        0.0665 │
│ step                           338 │
│ train_loss                 0.13638 │
│ train_runtime              13424.2 │
│ train_samples_per_second     0.403 │
│ train_steps_per_second       0.025 │
╰────────────────────────────────────╯
```

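With 4 g5.4xlarge nodes, one epoch finished in 13,978 seconds of total running time, of which 13,424 seconds (train_runtime) were spent inside the Hugging Face training loop; the difference is mostly setup such as starting the workers and loading the model. The 338 steps reported match the expected step count: roughly 21,600 examples divided by the effective batch size of 64 gives about 338 steps per epoch, so no gradient accumulation is needed to explain the number.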


## Instantiate TransformersTrainer

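After defining the training function, instantiate the TransformersTrainer. Besides trainer_init_per_worker, pass a scaling_config that controls the number of workers and the resources each one uses, the datasets to use for training and evaluation (the Ray Datasets created earlier), and a preprocessor that chains the splitter and tokenizer so that Ray applies them to the datasets before training. The run_config points checkpoints and other outputs at the S3 storage_path.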

In [None]:
from ray.train.huggingface import TransformersTrainer
from ray.air import RunConfig, ScalingConfig
from ray.data.preprocessors import Chain

trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    trainer_init_config={
        "batch_size": 16,  # per device
        "epochs": 1,
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker={"GPU": 1, "CPU": cpus_per_worker},
    ),
    # Raw Ray Datasets; the preprocessor chain splits lines and then tokenizes them
    datasets={"train": ray_datasets["train"], "evaluation": ray_datasets["validation"]},
    preprocessor=Chain(splitter, tokenizer),
    run_config=RunConfig(storage_path=storage_path),
)

Finally, call the fit() method to start training with Ray Train. It returns a Result object; save it to a variable to access the reported metrics and checkpoint.

In [None]:
# Starts training on the Ray cluster through Ray Client and blocks until it finishes.
# This notebook runs training through the Job Submission API below instead, so this cell is optional.
results = trainer.fit()

## Close connection to Ray cluster

In [None]:
ray.shutdown()

## Submitting the training job with Ray's Job Submission API

The cell below uses Ray's Job Submission API to run the training script remotely on the Ray cluster. It creates a JobSubmissionClient connected to the head node of the Ray cluster, then defines a shell command (ray_training) that removes any existing fm-ops-eks folder, clones the fm-ops-eks Git repository, makes the Python script train_gptj.py executable, and runs it. Finally, it submits this command as a job to the Ray cluster with the client's submit_job method.

The Job Submission API is used here instead of calling trainer.fit() from the notebook because, in this setup, the training logs are not shown in the notebook interface. A submitted job runs its driver on the cluster itself, so it does not depend on the notebook's Ray Client connection, and its status and logs can be inspected through the client (for example, get_job_status and get_job_logs) or in the Ray dashboard. This makes debugging and monitoring training progress easier.

In [None]:
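# Submit the training script as a Ray job
from ray.job_submission import JobSubmissionClient

# Connect to the job server on the Ray head node (dashboard port 8265)
ray_client = JobSubmissionClient("http://ray-cluster-train-kuberay-head-svc.ray-cluster-train.svc.cluster.local:8265")
# Shell command run on the cluster: re-clone the repository, then run the training script
ray_training = (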
    "rm -rf fm-ops-eks && git clone https://github.com/lusoal/fm-ops-eks || true;"
    "chmod +x fm-ops-eks/scripts/train_gptj.py && python fm-ops-eks/scripts/train_gptj.py"
)
submission_id = ray_client.submit_job(entrypoint=ray_training)
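Because the job runs on the cluster, the notebook has to poll for its progress. The helper below is a sketch modeled on the polling pattern in Ray's job submission documentation: it calls the client's get_job_status until the job reaches a terminal state and then prints the output of get_job_logs. The name wait_for_job and its timeout defaults are hypothetical. Statuses are compared as strings, which works because Ray's JobStatus values are string enums such as "SUCCEEDED".

```python
import time

# Terminal job states reported by Ray's job server
TERMINAL_STATES = ("SUCCEEDED", "FAILED", "STOPPED")


def wait_for_job(client, submission_id: str, timeout_s: float = 3600.0,
                 poll_s: float = 10.0) -> str:
    """Poll a JobSubmissionClient until the job finishes, then print its logs.

    Hypothetical helper; returns the final job status.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = client.get_job_status(submission_id)
        if status in TERMINAL_STATES:
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {submission_id} still {status} after {timeout_s} s")
        time.sleep(poll_s)
    print(client.get_job_logs(submission_id))
    return status
```

For example, calling `wait_for_job(ray_client, submission_id)` right after submitting blocks until training ends and then prints the full job log; for long runs, the log can be large.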


In [None]:
# Stop and delete the job used for testing
ray_client.stop_job(submission_id)
ray_client.delete_job(submission_id)

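## Checking the GPU and CUDA environment

The remaining cells are diagnostic checks for the notebook environment: whether PyTorch can see a GPU, which CUDA toolkit and driver are installed, and which Python version is in use.

In [None]: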
import torch

In [None]:
torch.cuda.is_available()

In [None]:
! nvcc --version

In [None]:
! nvidia-smi

In [None]:
import os
conda_env_path = os.environ.get('CONDA_PREFIX')
print(f'Conda environment path: {conda_env_path}')

In [None]:
! conda list | grep -i cuda

In [None]:
! ls /usr/local/

In [None]:
# Scans the whole filesystem, so this can be slow; permission errors are discarded
! find / -iname '*cuda*' 2>/dev/null

In [None]:
! python --version

In [None]:
import torch
print("CUDA Available:", torch.cuda.is_available())
print("CUDA Devices count:",torch.cuda.device_count())
print("CUDA Version: ", torch.version.cuda)