(dolly_v2_deepspeed_instruction_finetune)=

# Dolly-V2-3B Instruction Fine-Tuning with Ray AIR and DeepSpeed

In this example, we show how to use Ray AIR to instruction fine-tune the Dolly V2 3B model with the DeepSpeed framework. If the dependencies are not yet installed in your environment, run the installation cells below.

This work builds upon [existing efforts](https://github.com/ray-project/ray/blob/master/doc/source/ray-air/examples/gptj_deepspeed_fine_tuning.ipynb) by incorporating an instruction fine-tuning component.

In [4]:
!conda install -c conda-forge mpi4py -y
!conda install gcc gxx_linux-64 -y


In [1]:
! pip install "ray==2.5.1" "accelerate==0.16.0" "datasets==2.12.0" "transformers==4.26.0"  "torch==1.13.0" "deepspeed==0.9.2"

Collecting ray==2.5.1
  Downloading ray-2.5.1-cp310-cp310-manylinux2014_x86_64.whl (56.2 MB)
Collecting accelerate==0.16.0
  Downloading accelerate-0.16.0-py3-none-any.whl (199 kB)
Collecting datasets==2.12.0
  Downloading datasets-2.12.0-py3-none-any.whl (474 kB)
Collecting transformers==4.26.0
  Downloading transformers-4.26.0-py3-none-any.whl (6.3 MB)
Collecting torch==1.13.0
  Downloading torch-1.13.0-cp310-cp310-manylinux1_x86_64.whl (890.1 MB)

In [6]:
!pip install evaluate

Collecting evaluate
  Downloading evaluate-0.4.0-py3-none-any.whl (81 kB)
Installing collected packages: evaluate
Successfully installed evaluate-0.4.0

In [1]:
import os

import evaluate
import numpy as np
import pandas as pd
import ray
import ray.data
import torch
import transformers
from datasets import load_dataset
from ray.air.config import ScalingConfig
from ray.train.huggingface import TransformersTrainer
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    Trainer,
    TrainingArguments,
)
from transformers.utils.logging import enable_progress_bar

## Set up Ray <a name="setup"></a>

First, we configure the training job to use 1 worker, which is assigned 1 GPU and 12 CPUs.

In [2]:
model_name = "databricks/dolly-v2-3b"
use_gpu = True
num_workers = 1
cpus_per_worker = 12

In [3]:
#fq_ray_ip = #<replace-this-with-your-ray-server-ip-address>

In [4]:
pip_env = {
    "pip": [
        "datasets==2.12.0",
        "evaluate==0.4.0",
        "accelerate==0.16.0",  # https://github.com/OpenGVLab/InternImage/issues/111
        "transformers==4.26.0",
        "torch==1.13.0",
        "deepspeed==0.9.2",
        "ipython==8.14.0",
    ]
}

In [5]:
conda_env = {
    "conda": {
        "dependencies": ["mpi4py", "pip", pip_env]
    }  # pip install mpi4py won't work, use conda install instead
}

In [6]:
# ray.init(
#     f"ray://{fq_ray_ip}:10001",  # Note: the port and ip-address depends on your ray server setup.
#     runtime_env=conda_env,
# )

## Loading the dataset <a name="load"></a>

We will fine-tune the model on the [`alpaca-cleaned` dataset](https://datasets-server.huggingface.co/splits?dataset=yahma%2Falpaca-cleaned), which contains 51,760 instruction-following examples, each with an `instruction`, an optional `input`, and an `output`. The aim is to make the Databricks model better at generating answers that follow the given instruction.

We use the `generate_prompt` function to format each example into a single prompt for instruction fine-tuning.

In [7]:
current_dataset = load_dataset("yahma/alpaca-cleaned")
current_dataset

Found cached dataset json (/root/.cache/huggingface/datasets/yahma___json/yahma--alpaca-cleaned-5d24553f76c14acc/0.0.0/e347ab1c932092252e717ff3f949105a4dd28b27e842dd53157d2f72e276c2e4)


  0%|          | 0/1 [00:00<?, ?it/s]

DatasetDict({
    train: Dataset({
        features: ['instruction', 'output', 'input'],
        num_rows: 51760
    })
})

In [8]:
def generate_prompt(data_point):
    # ref: https://github.com/tloen/alpaca-lora
    # Use the longer template only when the record has a non-empty "input" field.
    if data_point["input"]:
        return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{data_point["instruction"]}

### Input:
{data_point["input"]}

### Response:
{data_point["output"]}"""
    else:
        return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.

### Instruction:
{data_point["instruction"]}

### Response:
{data_point["output"]}"""
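To make the two templates concrete, the following self-contained sketch rebuilds the same prompt format and applies it to two made-up records. The helper name `build_prompt` and the sample records are illustrative assumptions, not part of the dataset or of this notebook. The sketch branches on the `input` field, as the upstream alpaca-lora template does.

```python
def build_prompt(record: dict) -> str:
    """Hypothetical re-implementation of the Alpaca-style template (illustration only)."""
    if record.get("input"):
        header = (
            "Below is an instruction that describes a task, paired with an input "
            "that provides further context. Write a response that appropriately "
            "completes the request."
        )
        body = (
            f"### Instruction:\n{record['instruction']}\n\n"
            f"### Input:\n{record['input']}\n\n"
        )
    else:
        header = (
            "Below is an instruction that describes a task. Write a response "
            "that appropriately completes the request."
        )
        body = f"### Instruction:\n{record['instruction']}\n\n"
    return f"{header}\n\n{body}### Response:\n{record['output']}"


# Made-up records: one with extra context in "input", one without.
with_input = build_prompt(
    {"instruction": "Translate to French.", "input": "Good morning", "output": "Bonjour"}
)
without_input = build_prompt(
    {"instruction": "Name a primary color.", "input": "", "output": "Red"}
)

print(with_input)
print("-" * 40)
print(without_input)
```

Only the first record produces an `### Input:` section; the second falls back to the shorter template.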

In [9]:
tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side="left")
tokenizer.pad_token_id = 0
CUTOFF_LEN = 128

current_dataset = current_dataset.shuffle().map(
    lambda data_point: tokenizer(
        generate_prompt(data_point),
        truncation=True,
        max_length=CUTOFF_LEN,
        padding="max_length",
    ),
)

Map:   0%|          | 0/51760 [00:00<?, ? examples/s]
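The tokenizer call above combines `truncation=True`, `max_length=CUTOFF_LEN`, `padding="max_length"`, and `padding_side="left"`, so every example ends up with exactly `CUTOFF_LEN` token IDs. The sketch below is a simplified pure-Python stand-in for that behavior, not the tokenizer's actual implementation. The function name `pad_or_truncate` and the token IDs are made up, and it assumes the default right-side truncation.

```python
def pad_or_truncate(token_ids, max_length, pad_id=0, padding_side="left"):
    """Simplified stand-in for truncation=True combined with padding="max_length"."""
    ids = list(token_ids)[:max_length]  # drop tokens beyond max_length (right truncation)
    n_pad = max_length - len(ids)       # number of pad tokens needed to reach max_length
    mask = [1] * len(ids)               # 1 marks real tokens, 0 marks padding
    if padding_side == "left":
        return [pad_id] * n_pad + ids, [0] * n_pad + mask
    return ids + [pad_id] * n_pad, mask + [0] * n_pad


# A short sequence is left-padded; a long one is cut to max_length.
short_ids, short_mask = pad_or_truncate([11, 12, 13], max_length=5)
long_ids, long_mask = pad_or_truncate(range(100, 110), max_length=5)

print(short_ids, short_mask)  # [0, 0, 11, 12, 13] [0, 0, 1, 1, 1]
print(long_ids, long_mask)    # [100, 101, 102, 103, 104] [1, 1, 1, 1, 1]
```

With `CUTOFF_LEN = 128`, prompts longer than 128 tokens lose their tail, which may include part of the response, so a larger cutoff may be worth considering if memory allows.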

In [10]:
ray_datasets = ray.data.from_huggingface(current_dataset["train"])
ray_datasets

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


2023-09-21 19:37:15,232	INFO worker.py:1636 -- Started a local Ray instance.





Learn more here: https://docs.ray.io/en/master/data/faq.html#migrating-to-strict-mode


MaterializedDataset(
   num_blocks=1,
   num_rows=51760,
   schema={
      instruction: string,
      output: string,
      input: string,
      input_ids: list<item: int32>,
      attention_mask: list<item: int8>
   }
)
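The tokenizer warnings in the output above suggest setting `TOKENIZERS_PARALLELISM` explicitly. A minimal sketch, assuming you are willing to disable tokenizer-level parallelism: set the variable in the driver process before the tokenizer is first used. The `worker_env` dictionary is a hypothetical name showing how the same variable could be forwarded to remote workers through the `env_vars` field of a Ray runtime environment.

```python
import os

# Set this near the top of the notebook, before any tokenizer is used
# and before Ray forks worker processes.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Hypothetical helper dict: could be merged into the runtime_env passed to ray.init().
worker_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "false"}}
```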

## Instruction fine-tuning the model with Ray AIR

In [11]:
def trainer_init_per_worker(train_dataset, eval_dataset=None, **config):
    batch_size = config.get("batch_size", 1)
    epochs = config.get("epochs", 1)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.00002)
    weight_decay = config.get("weight_decay", 0.01)

    deepspeed = {
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 8,
        },
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True,
            },
            "offload_param": {
                "device": "cpu",
                "pin_memory": True,
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size": "auto",
            "stage3_prefetch_bucket_size": "auto",
            "stage3_param_persistence_threshold": "auto",
            "gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True,
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
    }

    print(f"batch_size: {batch_size}")
    print("Preparing training arguments")
    training_args = TrainingArguments(
        output_dir="deepspeed-dolly",
        per_device_train_batch_size=batch_size,
        logging_steps=1,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        num_train_epochs=epochs,
        push_to_hub=False,
        disable_tqdm=False,
        fp16=True,
        gradient_accumulation_steps=16,
        deepspeed=deepspeed,
    )

    tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side="left")
    tokenizer.pad_token_id = 0

    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,
    )

    enable_progress_bar()

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        tokenizer=tokenizer,
        data_collator=transformers.DataCollatorForLanguageModeling(
            tokenizer, mlm=False
        ),
    )
    return trainer
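The configuration above selects ZeRO stage 3 and offloads both the optimizer state and the parameters to CPU. A back-of-the-envelope estimate suggests why. A common accounting for mixed-precision Adam is about 16 bytes per parameter: 2 for fp16 weights, 2 for fp16 gradients, and 12 for the fp32 master weights, momentum, and variance. The sketch below applies that rule of thumb to the 2.78B parameter count that DeepSpeed reports in this run's training log. It ignores activations, buffers, and fragmentation, so treat the numbers as rough lower bounds rather than measurements.

```python
NUM_PARAMS = 2.78e9  # parameter count reported in the DeepSpeed log of this run

# Rule-of-thumb bytes per parameter for mixed-precision Adam training (an assumption,
# not a value read from DeepSpeed).
BYTES_PER_PARAM = {
    "fp16 weights": 2,
    "fp16 gradients": 2,
    "fp32 optimizer state (master weights, momentum, variance)": 12,
}


def model_state_gb(num_params, bytes_per_param):
    """Return the estimated memory per component in GB (1 GB = 1e9 bytes)."""
    return {name: num_params * b / 1e9 for name, b in bytes_per_param.items()}


estimate = model_state_gb(NUM_PARAMS, BYTES_PER_PARAM)
for name, gb in estimate.items():
    print(f"{name}: {gb:.1f} GB")
print(f"total model state: {sum(estimate.values()):.1f} GB")
```

Under these assumptions the model state alone comes to roughly 44 GB, most of it optimizer state, which is consistent with the choice to offload to CPU memory when training on a single GPU.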

In [12]:
trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    trainer_init_config={
        "batch_size": 16,  # batch_size per device
        "epochs": 1,
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker={
            "GPU": 1,
            "CPU": cpus_per_worker,
        },  # NOTE: Hugging Face Transformers supports only 1 GPU per worker.
    ),
    run_config=ray.air.RunConfig(
        sync_config=ray.tune.syncer.SyncConfig(
            sync_on_checkpoint=False  # Note: one can also set up a storage path to persist the model checkpoint to a cloud bucket
        )
    ),
    datasets={
        "train": ray_datasets,
    },
)
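The per-device batch size passed here interacts with `gradient_accumulation_steps=16` set in `TrainingArguments`. The arithmetic below is a rough sketch of the resulting effective batch size and the approximate number of optimizer updates in one epoch. The variable names are illustrative, and the exact step count depends on how the Hugging Face `Trainer` handles the final partial batch.

```python
per_device_batch_size = 16        # "batch_size" in trainer_init_config
gradient_accumulation_steps = 16  # set in TrainingArguments
num_workers = 1                   # one GPU worker in ScalingConfig
num_rows = 51_760                 # rows in the train split

# Samples that contribute to each optimizer update.
effective_batch_size = per_device_batch_size * gradient_accumulation_steps * num_workers

# Approximate number of full optimizer updates in one pass over the data.
approx_updates_per_epoch = num_rows // effective_batch_size

print(effective_batch_size)      # 256
print(approx_updates_per_epoch)  # 202
```

If memory is tight, lowering the per-device batch size while raising the accumulation steps keeps the effective batch size unchanged.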

Finally, we call the `ray.train.huggingface.TransformersTrainer.fit` method to start training with Ray AIR.

In [13]:
results = trainer.fit()

2023-09-21 19:37:50,755	INFO tensorboardx.py:178 -- pip install "ray[tune]" to see TensorBoard files.


Current time: 2023-09-21 19:39:47
Running for: 00:01:56.94
Memory: 20.8/503.6 GiB

| Trial name | # failures | error file |
|---|---|---|
| TransformersTrainer_59b78_00000 | 1 | /root/ray_results/TransformersTrainer_2023-09-21_19-37-50/TransformersTrainer_59b78_00000_0_2023-09-21_19-37-50/error.txt |

| Trial name | status | loc |
|---|---|---|
| TransformersTrainer_59b78_00000 | ERROR | 172.17.0.4:29764 |


(TransformersTrainer pid=29764) Learn more here: https://docs.ray.io/en/master/data/faq.html#migrating-to-strict-mode
(TransformersTrainer pid=29764) 2023-09-21 19:37:57,475	INFO backend_executor.py:137 -- Starting distributed worker processes: ['29899 (172.17.0.4)']
(RayTrainWorker pid=29899) 2023-09-21 19:37:58,175	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=1]


(pid=29899) - RandomizeBlockOrder 1:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=29899) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

(RayTrainWorker pid=29899) 2023-09-21 19:38:01,361	INFO streaming_executor.py:91 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[RandomizeBlockOrder]
(RayTrainWorker pid=29899) 2023-09-21 19:38:01,361	INFO streaming_executor.py:92 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(RayTrainWorker pid=29899) 2023-09-21 19:38:01,361	INFO streaming_executor.py:94 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(RayTrainWorker pid=29899) 2023-09-21 19:38:01,372	INFO streaming_executor.py:149 -- Shutting down <StreamingExecutor(Thread-3, stopped daemon 139699959088896)>.


(RayTrainWorker pid=29899) batch_size: 16
(RayTrainWorker pid=29899) Preparing training arguments


Downloading (…)lve/main/config.json: 100%|██████████| 819/819 [00:00<00:00, 2.48MB/s]
Downloading pytorch_model.bin:   0%|          | 0.00/5.68G [00:00<?, ?B/s]


(RayTrainWorker pid=29899) [2023-09-21 19:39:16,162] [INFO] [partition_parameters.py:454:__exit__] finished initializing model with 2.78B parameters
(RayTrainWorker pid=29899) [2023-09-21 19:39:20,301] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed info: version=0.9.2, git-hash=unknown, git-branch=unknown
(RayTrainWorker pid=29899) [2023-09-21 19:39:20,314] [INFO] [logging.py:96:log_dist] [Rank 0] DeepSpeed Flops Profiler Enabled: False


(RayTrainWorker pid=29899) Using cuda_amp half precision backend


(RayTrainWorker pid=29899) [1/3] /usr/local/cuda/bin/nvcc  -DTORCH_EXTENSION_NAME=cpu_adam -DTORCH_API_INCLUDE_EXTENSION_H -DPYBIND11_COMPILER_TYPE=\"_gcc\" -DPYBIND11_STDLIB=\"_libstdcpp\" -DPYBIND11_BUILD_ABI=\"_cxxabi1011\" -I/opt/conda/lib/python3.10/site-packages/deepspeed/ops/csrc/includes -I/usr/local/cuda/include -isystem /opt/conda/lib/python3.10/site-packages/torch/include -isystem /opt/conda/lib/python3.10/site-packages/torch/include/torch/csrc/api/include -isystem /opt/conda/lib/python3.10/site-packages/torch/include/TH -isystem /opt/conda/lib/python3.10/site-packages/torch/include/THC -isystem /usr/local/cuda/include -isystem /opt/conda/include/python3.10 -D_GLIBCXX_USE_CXX11_ABI=0 -D__CUDA_NO_HALF_OPERATORS__ -D__CUDA_NO_HALF_CONVERSIONS__ -D__CUDA_NO_BFLOAT16_CONVERSIONS__ -D__CUDA_NO_HALF2_OPERATORS__ --expt-relaxed-constexpr -gencode=arch=compute_86,code=compute_86 -gencode=arch=compute_86,code=sm_86 --compiler-options '-fPIC' -O3 --use_fast_math -std=c++1

(RayTrainWorker pid=29899) Using /root/.cache/torch_extensions/py310_cu117 as PyTorch extensions root...
(RayTrainWorker pid=29899) Creating extension directory /root/.cache/torch_extensions/py310_cu117/cpu_adam...
(RayTrainWorker pid=29899) Detected CUDA files, patching ldflags
(RayTrainWorker pid=29899) Emitting ninja build file /root/.cache/torch_extensions/py310_cu117/cpu_adam/build.ninja...
(RayTrainWorker pid=29899) Building extension module cpu_adam...
(RayTrainWorker pid=29899) Allowing ninja to set a default number of workers... (overridable by setting the environment variable MAX_JOBS=N)
2023-09-21 19:39:47,693	ERROR tune_controller.py:873 -- Trial task failed for trial TransformersTrainer_59b78_00000
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/opt/c

(RayTrainWorker pid=29899) [2/3] c++ -MMD -MF cpu_adam.o.d -DTORCH_EXTENSION_NAME=cpu_adam -DTORCH_API_INCLUDE_EXTENSION_H -DPYBIND11_COMPILER_TYPE=\"_gcc\" -DPYBIND11_STDLIB=\"_libstdcpp\" -DPYBIND11_BUILD_ABI=\"_cxxabi1011\" -I/opt/conda/lib/python3.10/site-packages/deepspeed/ops/csrc/includes -I/usr/local/cuda/include -isystem /opt/conda/lib/python3.10/site-packages/torch/include -isystem /opt/conda/lib/python3.10/site-packages/torch/include/torch/csrc/api/include -isystem /opt/conda/lib/python3.10/site-packages/torch/include/TH -isystem /opt/conda/lib/python3.10/site-packages/torch/include/THC -isystem /usr/local/cuda/include -isystem /opt/conda/include/python3.10 -D_GLIBCXX_USE_CXX11_ABI=0 -fPIC -std=c++14 -O3 -std=c++14 -g -Wno-reorder -L/usr/local/cuda/lib64 -lcudart -lcublas -g -march=native -fopenmp -D__AVX256__ -D__ENABLE_CUDA__ -c /opt/conda/lib/python3.10/site-packages/deepspeed/ops/csrc/adam/cpu_adam.cpp -o cpu_adam.o

| Trial name | date | hostname | node_ip | pid | timestamp | trial_id |
|---|---|---|---|---|---|---|
| TransformersTrainer_59b78_00000 | 2023-09-21_19-37-56 | 56a026cbdb4c | 172.17.0.4 | 29764 | 1695325076 | 59b78_00000 |


2023-09-21 19:39:47,705	ERROR tune.py:1107 -- Trials did not complete: [TransformersTrainer_59b78_00000]
2023-09-21 19:39:47,706	INFO tune.py:1111 -- Total run time: 116.95 seconds (116.94 seconds for the tuning loop).
- /root/ray_results/TransformersTrainer_2023-09-21_19-37-50/TransformersTrainer_59b78_00000_0_2023-09-21_19-37-50


TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: `trainer = TransformersTrainer.restore("/root/ray_results/TransformersTrainer_2023-09-21_19-37-50")`.
To start a new run that will retry on training failures, set `air.RunConfig(failure_config=air.FailureConfig(max_failures))` in the Trainer's `run_config` with `max_failures > 0`, or `max_failures = -1` for unlimited retries.