# Ray End-to-End NLP Example

**GOAL:** This example shows how to use Ray to build an end-to-end NLP pipeline. Specifically, it covers:

- How to use RaySGD to scale training with the HuggingFace Transformers library.
- How to serve the trained model with Ray Serve.

First we install some dependencies:

In [None]:
# !pip install uvicorn
# !pip install blist

Next we import the libraries needed for the example:

In [1]:
import os
import time
import math
import random
import argparse
import json
from filelock import FileLock
import numpy as np

import ray
from ray import serve
from ray.util.sgd.torch import TrainingOperator
from ray.util.sgd import TorchTrainer

import requests

import torch
import torch.distributed as dist
from torch.utils.data import (DataLoader, RandomSampler, 
                              SequentialSampler, TensorDataset)
from torch.utils.tensorboard import SummaryWriter

from transformers import (
    AdamW,
    GPT2LMHeadModel, 
    GPT2Tokenizer,
    CONFIG_MAPPING,
    MODEL_WITH_LM_HEAD_MAPPING,
    AutoConfig,
    AutoModelWithLMHead,
    AutoTokenizer,
    DataCollatorForLanguageModeling,
    HfArgumentParser,
    LineByLineTextDataset,
    PreTrainedTokenizer,
    TextDataset,
    Trainer,
    TrainingArguments,
    get_linear_schedule_with_warmup,
)

try:
    from apex import amp
except ImportError:
    amp = None

We also initialize Ray so that RaySGD and Ray Serve can use it later.

In [2]:
ray.init(address="auto")

{'node_ip_address': '172.31.30.37',
 'raylet_ip_address': '172.31.30.37',
 'redis_address': '172.31.30.37:6379',
 'object_store_address': '/tmp/ray/session_2020-06-05_22-20-41_529898_32512/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-06-05_22-20-41_529898_32512/sockets/raylet',
 'webui_url': 'localhost:8265',
 'session_dir': '/tmp/ray/session_2020-06-05_22-20-41_529898_32512'}

## Dataset

Download the dataset. Here we use the wikitext-2 dataset for demonstration; any plain-text dataset would work for this example.

In [3]:
!wget https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-2-v1.zip
!unzip wikitext-2-v1.zip

--2020-06-05 22:32:11--  https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-2-v1.zip
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.40.150
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.40.150|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4475746 (4.3M) [application/zip]
Saving to: ‘wikitext-2-v1.zip.1’


2020-06-05 22:32:12 (6.45 MB/s) - ‘wikitext-2-v1.zip.1’ saved [4475746/4475746]

Archive:  wikitext-2-v1.zip
   creating: wikitext-2/
  inflating: wikitext-2/wiki.test.tokens  
  inflating: wikitext-2/wiki.valid.tokens  
  inflating: wikitext-2/wiki.train.tokens  


## Parallel Training with Ray SGD

In this section, we show how to use RaySGD to scale up training with the HuggingFace Transformers library.

First we define the arguments for training:

In [4]:
# Training arguments (from hugging face)
training_arguments = TrainingArguments(
    output_dir = "/home/ubuntu/ray-e2e-nlp-example/output_dir/",
    learning_rate = 2e-5,
    num_train_epochs = 3,
    fp16 = True,
    do_train = True,
    do_eval = True
)
args = argparse.Namespace(**vars(training_arguments))
# args = training_arguments

# Model arguments
args.model_name_or_path = "gpt2"
args.model_type = "gpt2"
args.config_name = None
args.tokenizer_name = None
args.cache_dir = None

# Data processing arguments
args.train_data_file = "/home/ubuntu/ray-e2e-nlp-example/wikitext-2/wiki.train.tokens"
args.eval_data_file = "/home/ubuntu/ray-e2e-nlp-example/wikitext-2/wiki.test.tokens"
args.line_by_line = False
args.block_size = 128
args.overwrite_cache = False
args.tensorboard_dir = "/home/ubuntu/ray_results/ray-e2e-nlp-example/"

# Ray arguments
args.num_workers = 4
args.address = "auto"

use_gpu = torch.cuda.is_available() and not args.no_cuda
args.device = torch.device("cuda" if use_gpu else "cpu")

args

Namespace(adam_epsilon=1e-08, address='auto', block_size=128, cache_dir=None, config_name=None, dataloader_drop_last=False, device=device(type='cuda'), do_eval=True, do_predict=False, do_train=True, eval_data_file='/home/ubuntu/ray-e2e-nlp-example/wikitext-2/wiki.test.tokens', evaluate_during_training=False, fp16=True, fp16_opt_level='O1', gradient_accumulation_steps=1, learning_rate=2e-05, line_by_line=False, local_rank=-1, logging_dir='runs/Jun05_22-32-13_ip-172-31-30-37', logging_first_step=False, logging_steps=500, max_grad_norm=1.0, max_steps=-1, model_name_or_path='gpt2', model_type='gpt2', no_cuda=False, num_train_epochs=3, num_workers=4, output_dir='/home/ubuntu/ray-e2e-nlp-example/output_dir/', overwrite_cache=False, overwrite_output_dir=False, per_device_eval_batch_size=8, per_device_train_batch_size=8, per_gpu_eval_batch_size=None, per_gpu_train_batch_size=None, save_steps=500, save_total_limit=None, seed=42, tensorboard_dir='/home/ubuntu/ray_results/ray-e2e-nlp-example/', t

Here we set the random seeds for reproducibility:

In [5]:
def set_seed(args):
    random.seed(args.seed)
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    torch.cuda.manual_seed_all(args.seed)

set_seed(args)

### Data creator

Next we define the data creator for the trainer. The data creator builds the data loader used for training. Note that we do not need to wrap the data loader with a distributed sampler ourselves, since the RaySGD trainer does that automatically.

In [6]:
def data_creator(config):
    args = config["args"]
    tokenizer = AutoTokenizer.from_pretrained(
        args.tokenizer_name
        if args.tokenizer_name else args.model_name_or_path,
        cache_dir=args.cache_dir if args.cache_dir else None,
    )
    
    if args.block_size <= 0:
        args.block_size = tokenizer.max_len
        # Our input block size will be the max possible for the model
    else:
        args.block_size = min(args.block_size, tokenizer.max_len)
    train_dataset = TextDataset(
        tokenizer=tokenizer, file_path=args.train_data_file, 
        block_size=args.block_size, overwrite_cache=args.overwrite_cache
    )
    train_sampler = RandomSampler(train_dataset) if not dist.is_initialized() else None
    train_loader = DataLoader(
        train_dataset,
        sampler=train_sampler,
        batch_size=args.per_device_train_batch_size
    )
    return train_loader
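
To make the automatic wrapping more concrete, here is a minimal pure-Python sketch of how a distributed sampler typically splits a dataset across workers: the index list is padded so it divides evenly, then each worker takes every `num_workers`-th index starting at its rank. The helper `shard_indices` is hypothetical and leaves out shuffling; it illustrates the idea and is not RaySGD's actual code.

```python
import math


def shard_indices(dataset_len, num_workers, rank):
    """Return the sample indices one worker would see in an epoch (no shuffling)."""
    per_worker = math.ceil(dataset_len / num_workers)
    total = per_worker * num_workers
    indices = list(range(dataset_len))
    # Pad by repeating the first indices so every worker gets the same count.
    indices += indices[: total - len(indices)]
    # Stride through the padded list, starting at this worker's rank.
    return indices[rank:total:num_workers]


# Toy sizes chosen for illustration: 10 samples split across 4 workers.
shards = [shard_indices(10, 4, rank) for rank in range(4)]
for rank, shard in enumerate(shards):
    print(f"worker {rank}: {shard}")
```

Each worker ends up with the same number of samples, which keeps the workers in step during synchronous training; the cost is that a few samples are seen twice when the dataset size is not a multiple of the worker count.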

### Model creator

The model creator creates a model for each training worker. Here we initialize the model from a pretrained GPT-2 checkpoint.

In [7]:
def model_creator(config):
    with FileLock(os.path.expanduser("~/.download.lock")):
        args = config["args"]
        tokenizer = AutoTokenizer.from_pretrained(
            args.tokenizer_name
            if args.tokenizer_name else args.model_name_or_path,
            cache_dir=args.cache_dir if args.cache_dir else None,
        )
        model_config = AutoConfig.from_pretrained(
            args.config_name if args.config_name else args.model_name_or_path,
            cache_dir=args.cache_dir if args.cache_dir else None,
        )
        model = AutoModelWithLMHead.from_pretrained(
            args.model_name_or_path,
            from_tf=bool(".ckpt" in args.model_name_or_path),
            config=model_config,
            cache_dir=args.cache_dir if args.cache_dir else None,
        )
        model.resize_token_embeddings(len(tokenizer))
    return model

### Optimizer creator

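We use the AdamW optimizer for training. In the following code, we split the parameters into two groups for better training accuracy: one with weight decay, and one without weight decay for parameters whose names match the `no_decay` patterns (biases and LayerNorm weights).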

In [8]:
def optimizer_creator(model, config):
    args = config["args"]
    no_decay = ["bias", "LayerNorm.weight"]
    optimizer_grouped_parameters = [
        {
            "params": [
                p for n, p in model.named_parameters()
                if not any(nd in n for nd in no_decay)
            ],
            "weight_decay": args.weight_decay,
        },
        {
            "params": [
                p for n, p in model.named_parameters()
                if any(nd in n for nd in no_decay)
            ],
            "weight_decay": 0.0
        },
    ]

    return AdamW(
        optimizer_grouped_parameters,
        lr=args.learning_rate,
        eps=args.adam_epsilon)
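
The grouping above relies on substring matching against parameter names. The following sketch isolates that rule using a hypothetical helper, `split_by_weight_decay`, and a few made-up parameter names, so the effect of the `no_decay` patterns can be checked without loading a model.

```python
def split_by_weight_decay(param_names, no_decay=("bias", "LayerNorm.weight")):
    """Partition parameter names into (decay, skip) lists by substring match."""
    decay, skip = [], []
    for name in param_names:
        # A name is exempt from weight decay if any pattern occurs inside it.
        if any(pattern in name for pattern in no_decay):
            skip.append(name)
        else:
            decay.append(name)
    return decay, skip


# Illustrative names only; real names depend on the model architecture.
names = [
    "encoder.layer.0.attention.dense.weight",
    "encoder.layer.0.attention.dense.bias",
    "encoder.layer.0.LayerNorm.weight",
    "encoder.layer.0.LayerNorm.bias",
]
decay, skip = split_by_weight_decay(names)
print("with weight decay:   ", decay)
print("without weight decay:", skip)
```

Because the match is purely textual, it is worth printing the names from `model.named_parameters()` for the model in use and confirming that the patterns hit the layers you expect.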

### Training operator

Next we define the training operator. The training operator defines a custom training loop that includes gradient accumulation (i.e., applying a gradient update only after a set number of forward and backward passes). The training operator here also sets up the linear warmup learning rate scheduler for the AdamW optimizer.

In [9]:
def announce_training(args, dataset_len, t_total):
    # Train!
    print("***** Running training *****")
    # Use .get() so this does not raise KeyError when the variable is unset (e.g. on CPU).
    print("CUDA_VISIBLE_DEVICES", os.environ.get("CUDA_VISIBLE_DEVICES"))
    print("  Num examples = %d" % dataset_len)
    print("  Num Epochs = %d" % args.num_train_epochs)
    print("  Instantaneous batch size per GPU = %d" %
          args.per_device_train_batch_size)
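    # Parenthesize the product: "%" binds before "*", which would otherwise
    # repeat the formatted string instead of multiplying the numbers.
    print(
        "  Total train batch size (w. parallel, distributed & accum) = %d" %
        (args.per_device_train_batch_size * args.gradient_accumulation_steps *
         args.num_workers)
    )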
    print("  Gradient Accumulation steps = %d" %
          args.gradient_accumulation_steps)
    print("  Total optimization steps = %d" % t_total)


class TransformerOperator(TrainingOperator):
    def setup(self, config):
        self.args = args = config["args"]
        self.tokenizer = AutoTokenizer.from_pretrained(
            args.tokenizer_name
            if args.tokenizer_name else args.model_name_or_path,
            cache_dir=args.cache_dir if args.cache_dir else None,
        )

        self.train_data_len = len(self.train_loader)
        self._warmup_scheduler = get_linear_schedule_with_warmup(
            self.optimizer,
            num_warmup_steps=args.warmup_steps,
            num_training_steps=self.calculate_t_total())
        self._global_step = 0

        announce_training(args, self.train_data_len, self.calculate_t_total())

    def train_batch(self, batch, batch_info=None):
        args = self.args
        model = self.model
        optimizer = self.optimizer
        step = batch_info["batch_idx"]

        model.train()
        batch = batch.to(self.device)
        outputs = model(input_ids=batch, labels=batch)

        # model outputs are always tuple in transformers (see doc)
        loss = outputs[0]

        if args.gradient_accumulation_steps > 1:
            loss = loss / args.gradient_accumulation_steps

        if args.fp16:
            with amp.scale_loss(loss, optimizer) as scaled_loss:
                scaled_loss.backward()
        else:
            loss.backward()

        batch_loss = loss.item()

        # last step in epoch but step is always smaller
        # than gradient_accumulation_steps
        ending = (self.train_data_len <= args.gradient_accumulation_steps
                  and (step + 1) == self.train_data_len)
        if (step + 1) % args.gradient_accumulation_steps == 0 or ending:
            if args.fp16:
                torch.nn.utils.clip_grad_norm_(
                    amp.master_params(optimizer), args.max_grad_norm)
            else:
                torch.nn.utils.clip_grad_norm_(model.parameters(),
                                               args.max_grad_norm)

            self.optimizer.step()
            self._warmup_scheduler.step()  # Update learning rate schedule
            model.zero_grad()
            self._global_step += 1

        learning_rate_scalar = self._warmup_scheduler.get_lr()[0]
        return {"learning_rate": learning_rate_scalar, "loss": batch_loss}

    def calculate_t_total(self):
        args = self.args
        grad_accum_steps = args.gradient_accumulation_steps
        train_data_len = len(self.train_loader)
        if args.max_steps > 0:
            t_total = args.max_steps
            args.num_train_epochs = args.max_steps // (
                train_data_len // grad_accum_steps) + 1
        else:
            t_total = (
                train_data_len // grad_accum_steps * args.num_train_epochs)
        return t_total
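
The two scheduling ideas in the operator can be checked on small numbers. The sketch below uses hypothetical helpers (they are not part of `transformers` or RaySGD): one lists the batch indices after which an optimizer step happens under gradient accumulation, and the other computes the learning-rate multiplier of a linear schedule with warmup, assuming it ramps from 0 to 1 during warmup and then decays linearly to 0. The end-of-epoch special case handled by `ending` in the operator is left out for brevity.

```python
def optimizer_step_batches(num_batches, grad_accum_steps):
    """Batch indices after which the accumulated gradients are applied."""
    return [i for i in range(num_batches) if (i + 1) % grad_accum_steps == 0]


def linear_warmup_factor(step, num_warmup_steps, num_training_steps):
    """Learning-rate multiplier: ramps 0 -> 1 in warmup, then decays linearly to 0."""
    if step < num_warmup_steps:
        return step / max(1, num_warmup_steps)
    remaining = num_training_steps - step
    return max(0.0, remaining / max(1, num_training_steps - num_warmup_steps))


# Toy values chosen for illustration.
num_batches, grad_accum_steps, num_epochs = 8, 4, 3

# Same formula as calculate_t_total when max_steps is not set.
t_total = num_batches // grad_accum_steps * num_epochs
steps = optimizer_step_batches(num_batches, grad_accum_steps)
factors = [linear_warmup_factor(s, 2, t_total) for s in range(t_total + 1)]

print("optimizer steps per run:", t_total)
print("step after batches:", steps)
print("lr multipliers:", factors)
```

With 8 batches per epoch and an accumulation factor of 4, each epoch performs only 2 optimizer steps, so the scheduler must be sized by optimizer steps rather than by batches.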

### RaySGD Torch Trainer

Finally we define a RaySGD Torch trainer to perform distributed training.

In [10]:
trainer = TorchTrainer(
    model_creator=model_creator,
    data_creator=data_creator,
    optimizer_creator=optimizer_creator,
    training_operator_cls=TransformerOperator,
    use_fp16=args.fp16,
    apex_args={"opt_level": args.fp16_opt_level},
    num_workers=args.num_workers,
    use_gpu=use_gpu,
    use_tqdm=False,
    config={"args": args}
)

Selected optimization level O1:  Insert automatic casts around Pytorch functions and Tensor methods.

Defaults for this optimization level are:
enabled                : True
opt_level              : O1
cast_model_type        : None
patch_torch_functions  : True
keep_batchnorm_fp32    : None
master_weights         : None
loss_scale             : dynamic
Processing user overrides (additional kwargs that are not None)...
After processing overrides, optimization options are:
enabled                : True
opt_level              : O1
cast_model_type        : None
patch_torch_functions  : True
keep_batchnorm_fp32    : None
master_weights         : None
loss_scale             : dynamic
[2m[36m(pid=33029)[0m Selected optimization level O1:  Insert automatic casts around Pytorch functions and Tensor methods.
[2m[36m(pid=33029)[0m 
[2m[36m(pid=33029)[0m Defaults for this optimization level are:
[2m[36m(pid=33029)[0m enabled                : True
[2m[36m(pid=33029)[0m opt_level    

### Evaluation

Here we define the `evaluate` function, which computes the mean loss of the trained model on the evaluation dataset.

In [11]:
def evaluate(args, model, tokenizer):
    # Compute the mean language-modeling loss over the evaluation file.
    eval_dataset = TextDataset(
        tokenizer=tokenizer, file_path=args.eval_data_file, 
        block_size=args.block_size, overwrite_cache=args.overwrite_cache
    )

    args.eval_batch_size = args.per_device_eval_batch_size

    eval_sampler = SequentialSampler(eval_dataset)
    eval_dataloader = DataLoader(
        eval_dataset,
        sampler=eval_sampler,
        batch_size=args.eval_batch_size)

    eval_loss = 0.0
    nb_eval_steps = 0
    for batch in eval_dataloader:
        model.eval()
        batch = batch.to(args.device)

        with torch.no_grad():
            outputs = model(input_ids=batch, labels=batch)
            tmp_eval_loss = outputs[0]
            eval_loss += tmp_eval_loss.mean().item()
        nb_eval_steps += 1

    eval_loss = eval_loss / nb_eval_steps
    return {"loss": eval_loss}
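
Language models are often compared by perplexity rather than raw loss. Assuming the returned loss is a mean per-token cross-entropy in nats, perplexity is its exponential. The helper below is a hypothetical addition, and the loss value is made up for illustration; it is not a result from this run.

```python
import math


def perplexity(mean_loss):
    """Convert a mean per-token cross-entropy loss (in nats) to perplexity."""
    return math.exp(mean_loss)


# Made-up loss value, standing in for the dict returned by evaluate().
eval_stats = {"loss": 3.0}
eval_stats["perplexity"] = perplexity(eval_stats["loss"])
print(eval_stats)
```

Note that `evaluate` averages per-batch means, which matches the true per-token mean only when all batches have the same size; the final batch may be smaller, so the result is a close approximation.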

### Training Loop

We define the training loop here. After each epoch, we evaluate the model on the evaluation set. We also log the results to TensorBoard, so the training curves can be viewed by clicking the TensorBoard button on the Anyscale dashboard.

In [12]:
tokenizer = trainer.get_local_operator().tokenizer
local_model = trainer.get_model()

epochs_trained = 0
train_iterator = range(
    epochs_trained,
    int(args.num_train_epochs)
)
tensorboard_writer = SummaryWriter(log_dir=args.tensorboard_dir, flush_secs=30)

if args.do_train:
    for _ in train_iterator:
        train_stats = trainer.train()
        eval_stats = evaluate(args, local_model, tokenizer)
        print("Training stats:", train_stats)
        print("Validation stats:", eval_stats)
        tensorboard_writer.add_scalar('Loss/train', train_stats['loss'], train_stats["epoch"])
        tensorboard_writer.add_scalar('Loss/eval', eval_stats['loss'], train_stats["epoch"])

Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 32768.0
(pid=33032) Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 32768.0
(pid=33029) Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 32768.0
(pid=33058) Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 32768.0
Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 16384.0
(pid=33029) Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 16384.0
(pid=33032) Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 16384.0
(pid=33058) Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 16384.0
Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 8192.0
(pid=33032) Gradient overflow.  Skipping step, loss scaler 0 reducing loss scale to 8192.0
(pid=33029) Gr

When training finishes, we save the model to disk and shut down the trainer to release the GPUs for serving.

In [13]:
def save_model(args, model, tokenizer):
    if not os.path.exists(args.output_dir):
        os.makedirs(args.output_dir)
    print("Saving model checkpoint to %s" % args.output_dir)
    model.save_pretrained(args.output_dir)
    tokenizer.save_pretrained(args.output_dir)
    torch.save(args, os.path.join(args.output_dir, "training_args.bin"))

save_model(args, local_model, tokenizer)
trainer.shutdown()

Saving model checkpoint to /home/ubuntu/ray-e2e-nlp-example/output_dir/


## Serving

Here we demonstrate how to use Ray Serve to serve the model we just trained.

First we start Ray Serve and define a serving backend, which is a Ray actor that processes incoming requests. Here we assume each request is the prefix of an English sentence, and we use our model to predict the next word of the input segment.

In [14]:
serve.init()

[2m[36m(pid=33037)[0m 2020-06-05 22:57:38,205	INFO master.py:183 -- Starting metric exporter with name 'SERVE_METRIC_SINK_ACTOR'
[2m[36m(pid=33037)[0m 2020-06-05 22:57:38,222	INFO master.py:129 -- Starting router with name 'SERVE_ROUTER_ACTOR'
[2m[36m(pid=33037)[0m 2020-06-05 22:57:38,233	INFO master.py:152 -- Starting HTTP proxy with name 'SERVE_PROXY_ACTOR' on node 'node:172.31.30.37'
[2m[36m(pid=33046)[0m INFO:     Started server process [33046]
[2m[36m(pid=33046)[0m INFO:     Waiting for application startup.
[2m[36m(pid=33046)[0m INFO:     Application startup complete.


In [15]:
class NextWord:
    def __init__(self, args):
        self.args = args
        self.model = AutoModelWithLMHead.from_pretrained(args.output_dir)
        self.tokenizer = AutoTokenizer.from_pretrained(args.output_dir)
        self.model.to(args.device)

    def __call__(self, flask_request):
        input_sentence = flask_request.data.decode("utf-8")
        generated = self.tokenizer.encode(input_sentence)
        # Use the args stored on the actor rather than the notebook-level global.
        context = torch.tensor([generated]).to(self.args.device)
        past = None

        output, past = self.model(context, past=past)
        token = torch.argmax(output[..., -1, :])

        generated += [token.tolist()]
        context = token.unsqueeze(0)

        sequence = self.tokenizer.decode(generated)

        return sequence

# If "nextword" was already defined in a previous run, delete it before creating a new one.
# serve.delete_endpoint("nextword")
serve.create_backend("nextword", NextWord, args, ray_actor_options={"num_gpus": 1})
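
The backend picks the next word greedily: it takes the scores for the last position and keeps the highest-scoring token. The sketch below shows that step in plain Python with a toy vocabulary and invented scores; `greedy_next_token` is a hypothetical helper standing in for the `torch.argmax` call above.

```python
def greedy_next_token(logits):
    """Return the index of the largest score, i.e. the greedy choice."""
    return max(range(len(logits)), key=lambda i: logits[i])


# Toy vocabulary and scores, invented for illustration.
vocab = ["part", "bridge", "city", "road"]
last_position_logits = [2.5, 0.1, 1.7, -0.3]

generated = ["The", "Manhattan", "bridge", "is", "a", "major"]
generated.append(vocab[greedy_next_token(last_position_logits)])
print(" ".join(generated))
```

Greedy selection is deterministic, so the same prefix always yields the same next word; sampling-based strategies trade that determinism for variety.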

Now we create a serving endpoint at `/nextword`.

In [16]:
# Similarly, if the endpoint name was already defined in a previous run, delete it before creating a new one.
# serve.delete_endpoint("nextword")
serve.create_endpoint("nextword", "/nextword", methods=["GET", "POST"])

[2m[36m(pid=33037)[0m 2020-06-05 22:57:53,092	INFO master.py:536 -- Registering route /nextword to endpoint nextword with methods ['GET', 'POST'].


Then we connect the endpoint to the backend by routing all of its traffic there:

In [17]:
serve.set_traffic("nextword", {"nextword": 1.0})
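
The dictionary passed to `set_traffic` maps backend names to traffic weights. As a conceptual sketch only (this is not Ray Serve's router, and it assumes the weights must sum to 1), weighted routing can be pictured as sampling a backend with probability equal to its weight. `pick_backend` and the second backend name `nextword_v2` are hypothetical.

```python
import random


def pick_backend(traffic, rng=random):
    """Choose a backend name with probability equal to its traffic weight."""
    if abs(sum(traffic.values()) - 1.0) > 1e-9:
        raise ValueError("traffic weights must sum to 1")
    names = list(traffic)
    weights = [traffic[name] for name in names]
    return rng.choices(names, weights=weights, k=1)[0]


# With a single backend at weight 1.0, every request goes to it.
single = pick_backend({"nextword": 1.0})

# "nextword_v2" is a hypothetical second backend used to show a 50/50 split.
split = {"nextword": 0.5, "nextword_v2": 0.5}
picks = [pick_backend(split, random.Random(seed)) for seed in range(20)]
print(single, picks[:5])
```

A split like the second one is how a new model version could be rolled out gradually alongside the current one.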

Now we can send the request to the server and receive the results:

In [18]:
r = requests.post("http://127.0.0.1:8000/nextword", data="The Manhattan bridge is a major")
r.text

'The Manhattan bridge is a major part'