
# Tuning 🤗 Transformers with Population Based Training
 

In this notebook we show how to fine-tune Huggingface transformers using Population Based Training. The corresponding blog post is [here](https://medium.com/@amog_97444/c4e32c6c989b?source=friends_link&sk=92c2ed36420cd9e26281fd51da7c19b6).

For our implementation of the fine-tuning, we used [Ray Tune](https://docs.ray.io/en/master/tune/index.html), an open source library for scalable hyperparameter tuning. It is built on top of the [Ray](https://ray.io/) framework, which makes it perfect for parallel hyperparameter tuning on multiple GPUs. Make sure to set your runtime to use GPUs when going through this notebook.

Since Colab provides us with limited memory and a single GPU, we use a much smaller transformer (tiny-distilroberta), run only 3 samples, and use a perturbation interval of 2 iterations in this notebook. The results in the blog post were obtained with a standard BERT model, 8 samples, and perturbation after every iteration, and were run on an AWS p3.16xlarge instance. The exact code used for the blog post is [here](https://docs.ray.io/en/master/tune/examples/pbt_transformers.html).

Let’s take a look at how we can implement parallel Population Based Training for our transformers using this library!

## Setup

The first step is to install our main libraries:

In [None]:
!pip install transformers==3.0.2
!pip install ray==0.8.7
!pip install ray[tune]

Depending on your current setup, there might be other libraries you have to install, like torch. Also, if you’re wondering how I made the beautiful plots in the blog post, it’s with a library called [Weights & Biases](https://www.wandb.com/). Using W&B is optional, but if you'd like, we’ll go through how to integrate it with our code so you can visualize your training runs. First, create an account with them, and then we can install it and log in:


In [None]:
!pip install wandb
import os
# Paste your own key here; never commit a real API key to a shared notebook.
os.environ["WANDB_API_KEY"] = "<your-wandb-api-key>"

Now we can get started with our code! The first step is to start up ray. If you’re running this on a cluster, make sure to specify an address to ray. For this notebook example, we don't have to worry about this. Also make sure to set log_to_driver to False, otherwise we get hit with a bunch of unnecessary tqdm training bars!

In [None]:
import ray

# If running on a cluster, use the line below instead.
# ray.init(address="auto", log_to_driver=False)

ray.shutdown()
# log_to_driver=False keeps the per-trial tqdm progress bars out of the output.
ray.init(log_to_driver=False, ignore_reinit_error=True)

Then, we can load and cache our transformer model, tokenizer, and the RTE dataset.


In [None]:
import os
from transformers import AutoTokenizer, AutoModelForSequenceClassification
# Util import
from ray.tune.examples.pbt_transformers import utils



# Set this to whatever you like
data_dir_name = "./data"
data_dir = os.path.abspath(os.path.join(os.getcwd(), data_dir_name))
if not os.path.exists(data_dir):
    os.mkdir(data_dir, 0o755)

# Change these as needed.
model_name = "sshleifer/tiny-distilroberta-base"
task_name = "rte"

task_data_dir = os.path.join(data_dir, task_name.upper())

# Download and cache tokenizer, model, and features
print("Downloading and caching Tokenizer")

# Triggers tokenizer download to cache
AutoTokenizer.from_pretrained(model_name)
print("Downloading and caching pre-trained model")

# Triggers model download to cache
AutoModelForSequenceClassification.from_pretrained(model_name)

# Download data.
utils.download_data(task_name, data_dir)

## Training

With everything now downloaded and cached, we can now set up our training function. Our training function defines the training execution for a single hyperparameter configuration. For now we pull these hyperparameters from a config argument, but we’ll see later how this is passed in.

First we get our datasets. We only use the first half of the dev dataset for validation, and leave the rest for testing:

In [None]:
from transformers import GlueDataTrainingArguments as DataTrainingArguments
from transformers import GlueDataset

def get_datasets(config):
    data_args = DataTrainingArguments(
        task_name=config["task_name"], data_dir=config["data_dir"])
    tokenizer = AutoTokenizer.from_pretrained(config["model_name"])
    train_dataset = GlueDataset(
        data_args,
        tokenizer=tokenizer,
        mode="train",
        cache_dir=config["data_dir"])
    eval_dataset = GlueDataset(
        data_args,
        tokenizer=tokenizer,
        mode="dev",
        cache_dir=config["data_dir"])
    # Only use the first half for validation
    eval_dataset = eval_dataset[:len(eval_dataset) // 2]
    return train_dataset, eval_dataset
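
The halving used above is plain sequence slicing. As a minimal sketch, here is the same split on a stand-in list; `split_dev_set` and `dev_examples` are hypothetical names for illustration and are not part of the notebook or of transformers:

```python
def split_dev_set(examples):
    """Split a sequence into (validation, test) halves by position."""
    midpoint = len(examples) // 2
    return examples[:midpoint], examples[midpoint:]


# Hypothetical stand-in for the dev set features.
dev_examples = list(range(7))
validation_half, test_half = split_dev_set(dev_examples)

print(validation_half)  # [0, 1, 2]
print(test_half)        # [3, 4, 5, 6]
```

With an odd number of examples the test half gets the extra one, because the midpoint is rounded down.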

### Checkpointing

We also need to add extra functionality for *checkpointing*. After every epoch of training, we need to save our training state. This is crucial for Population Based Training since it allows us to continue training from where we left off even when hyperparameters are perturbed.

The Huggingface Trainer provides functionality to save and load from a checkpoint, but we do have to make some modifications to integrate this with Ray Tune checkpointing and to checkpoint after every epoch. The first step is to subclass the Trainer from the transformers library. Ray Tune provides this [TuneTransformerTrainer](https://github.com/ray-project/ray/blob/master/python/ray/tune/examples/pbt_transformers/trainer.py) subclass, which we utilize. Take a look at the class: we see that it handles reporting evaluation metrics to Tune, checkpointing every time evaluate is called, and even a way to pass in custom W&B arguments.

In [None]:
import logging
import os
from typing import Dict, Optional, Tuple

from ray import tune

import transformers
from transformers.file_utils import is_torch_tpu_available
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR, is_wandb_available

import torch
from torch.utils.data import Dataset

if is_wandb_available():
  import wandb

class TuneTransformerTrainer(transformers.Trainer):
    def get_optimizers(self, num_training_steps):
        # Keep references so save_state can checkpoint the optimizer and scheduler.
        self.current_optimizer, self.current_scheduler = (
            super().get_optimizers(num_training_steps))
        return (self.current_optimizer, self.current_scheduler)

    def evaluate(self, eval_dataset=None):
        eval_dataloader = self.get_eval_dataloader(eval_dataset)
        output = self._prediction_loop(
            eval_dataloader, description="Evaluation")
        self._log(output.metrics)

        self.save_state()

        tune.report(**output.metrics)

        return output.metrics

    def save_state(self):
        with tune.checkpoint_dir(step=self.global_step) as checkpoint_dir:
            self.args.output_dir = checkpoint_dir
            # This is the directory name that Huggingface requires.
            output_dir = os.path.join(
                self.args.output_dir,
                f"{PREFIX_CHECKPOINT_DIR}-{self.global_step}")
            self.save_model(output_dir)
            if self.is_world_master():
                torch.save(self.current_optimizer.state_dict(),
                           os.path.join(output_dir, "optimizer.pt"))
                torch.save(self.current_scheduler.state_dict(),
                           os.path.join(output_dir, "scheduler.pt"))

The only addition we have to make is a function that recovers the checkpoint from Tune's checkpoint directory:

In [None]:
def recover_checkpoint(tune_checkpoint_dir, model_name=None):
    if tune_checkpoint_dir is None or len(tune_checkpoint_dir) == 0:
        return model_name
    # Get subdirectory used for Huggingface.
    subdirs = [
        os.path.join(tune_checkpoint_dir, name)
        for name in os.listdir(tune_checkpoint_dir)
        if os.path.isdir(os.path.join(tune_checkpoint_dir, name))
    ]
    # There should only be 1 subdir.
    assert len(subdirs) == 1, subdirs
    return subdirs[0]
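
To make the directory layout concrete, here is a small sketch that builds a temporary directory shaped like a Tune checkpoint and locates the single Huggingface subdirectory inside it. It assumes the Huggingface prefix is `checkpoint`, so the subdirectory is named `checkpoint-<step>`; `find_single_subdir` is a hypothetical helper written for this illustration, and the step number is made up:

```python
import os
import tempfile


def find_single_subdir(parent_dir):
    """Return the only subdirectory of parent_dir (hypothetical helper)."""
    subdirs = [
        os.path.join(parent_dir, name)
        for name in sorted(os.listdir(parent_dir))
        if os.path.isdir(os.path.join(parent_dir, name))
    ]
    if len(subdirs) != 1:
        raise ValueError(f"expected exactly one subdirectory, found {subdirs}")
    return subdirs[0]


with tempfile.TemporaryDirectory() as tune_checkpoint_dir:
    # Mimic the layout written by save_state: <tune dir>/checkpoint-<step>/
    os.makedirs(os.path.join(tune_checkpoint_dir, "checkpoint-156"))
    # Plain files next to the subdirectory are ignored by the lookup.
    open(os.path.join(tune_checkpoint_dir, "notes.txt"), "w").close()
    recovered_name = os.path.basename(find_single_subdir(tune_checkpoint_dir))

print(recovered_name)  # checkpoint-156
```

The sketch raises an explicit error instead of using assert, so the check still runs if Python is started with optimizations enabled.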

Finally, we put all of these together as well as create our training arguments, model, and Huggingface Trainer:

In [None]:
from transformers import AutoConfig, TrainingArguments, glue_tasks_num_labels
from ray.tune.integration.wandb import wandb_mixin

@wandb_mixin
def train_transformer(config, checkpoint_dir=None):
  train_dataset, eval_dataset = get_datasets(config)

  training_args = TrainingArguments(
        output_dir=tune.get_trial_dir(),
        learning_rate=config["learning_rate"],
        do_train=True,
        do_eval=True,
        evaluate_during_training=True,
        # Run eval after every epoch.
        eval_steps=(len(train_dataset) // config["per_gpu_train_batch_size"]) +
        1,
        # We explicitly set save to 0, and do checkpointing in evaluate instead
        save_steps=0,
        num_train_epochs=config["num_epochs"],
        max_steps=config["max_steps"],
        per_device_train_batch_size=config["per_gpu_train_batch_size"],
        per_device_eval_batch_size=config["per_gpu_val_batch_size"],
        warmup_steps=0,
        weight_decay=config["weight_decay"],
        logging_dir="./logs",
    )

  model_name_or_path = recover_checkpoint(checkpoint_dir, config["model_name"])
  # Read the task from the trial config rather than from a notebook global.
  task_name = config["task_name"]
  num_labels = glue_tasks_num_labels[task_name]

  # Use a separate name so the Tune config dict is not shadowed.
  model_config = AutoConfig.from_pretrained(
        model_name_or_path,
        num_labels=num_labels,
        finetuning_task=task_name,
    )
  model = AutoModelForSequenceClassification.from_pretrained(
        model_name_or_path,
        config=model_config,
    )

  # Use our modified TuneTransformerTrainer
  tune_trainer = TuneTransformerTrainer(
      model=model,
      args=training_args,
      train_dataset=train_dataset,
      eval_dataset=eval_dataset,
      compute_metrics=utils.build_compute_metrics_fn(task_name),
  )
  tune_trainer.train(model_name_or_path)
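
The eval_steps expression in the training arguments is meant to trigger one evaluation (and therefore one checkpoint) per epoch. Here is a quick sketch of that arithmetic, assuming a single GPU and no gradient accumulation. The dataset sizes are hypothetical and the function names are ours:

```python
import math


def eval_steps_from_notebook(num_examples, batch_size):
    """The expression used for eval_steps in the training arguments."""
    return num_examples // batch_size + 1


def batches_per_epoch(num_examples, batch_size):
    """Number of batches needed to see every example once."""
    return math.ceil(num_examples / batch_size)


# 2490 is a hypothetical training set size used only for illustration.
for batch_size in (16, 32, 64):
    print(batch_size,
          eval_steps_from_notebook(2490, batch_size),
          batches_per_epoch(2490, batch_size))
```

The two values agree whenever the dataset size is not an exact multiple of the batch size. When it is (for example 64 examples with a batch size of 32), the notebook's expression is one step larger than the true number of batches, so evaluation would slowly drift away from the epoch boundary.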

Our training function takes in 2 parameters: config, which contains all of our hyperparameters, and checkpoint_dir, which is a directory containing the previous state of our trial. As we'll see below, Tune passes both of these arguments into our training function.
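
To see why this pair of arguments is enough to resume a trial with new hyperparameters, here is a toy sketch in plain Python. It does not use Ray: `toy_trainable`, its `save_dir` argument, and the `state.json` file are all hypothetical stand-ins for the real training function and checkpoint format:

```python
import json
import os
import tempfile


def toy_trainable(config, checkpoint_dir=None, num_steps=2, save_dir=None):
    """Hypothetical stand-in for train_transformer; 'trains' a single number."""
    state = {"step": 0, "weight": 0.0}
    if checkpoint_dir is not None:
        # Resume from the saved state instead of starting from scratch.
        with open(os.path.join(checkpoint_dir, "state.json")) as f:
            state = json.load(f)
    for _ in range(num_steps):
        state["weight"] += config["learning_rate"]
        state["step"] += 1
    if save_dir is not None:
        with open(os.path.join(save_dir, "state.json"), "w") as f:
            json.dump(state, f)
    return state


with tempfile.TemporaryDirectory() as ckpt:
    first = toy_trainable({"learning_rate": 0.5}, save_dir=ckpt)
    # Same saved state, new hyperparameters: what a perturbed trial sees.
    resumed = toy_trainable({"learning_rate": 0.25}, checkpoint_dir=ckpt)

print(first)    # {'step': 2, 'weight': 1.0}
print(resumed)  # {'step': 4, 'weight': 1.5}
```

The second call keeps the step count and weight from the first call but applies the new learning rate, which is the behavior we want from a trial after PBT perturbs it.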


## Hyperparameter Tuning with Ray Tune

Now that we have our training function set up, we run our hyperparameter search with Ray Tune. We first create an initial hyperparameter configuration, which specifies the hyperparameters each trial will use initially. For some of our hyperparameters, we want to try different configurations, so we sample those from a distribution.

We also pass in our W&B arguments here.

In [None]:
config = {
        # These 3 configs below were defined earlier
        "model_name": model_name,
        "task_name": task_name,
        "data_dir": task_data_dir,
        "per_gpu_val_batch_size": 32,
        "per_gpu_train_batch_size": tune.choice([16, 32, 64]),
        "learning_rate": tune.uniform(1e-5, 5e-5),
        "weight_decay": tune.uniform(0.0, 0.3),
        "num_epochs": tune.choice([2, 3, 4, 5]),
        "max_steps": -1,  # We use num_epochs instead.
        "wandb": {
            "project": "pbt_transformers",
            "reinit": True,
            "allow_val_change": True
        }
    }
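
Each trial receives its own concrete values drawn from this search space. The sketch below imitates that sampling in plain Python so the idea is visible without Ray. The lambdas are stand-ins for tune.choice and tune.uniform rather than the Ray API, and `sample_config` is a hypothetical helper:

```python
import random


def sample_config(search_space, rng):
    """Resolve callables in a search space into concrete values."""
    return {
        key: value(rng) if callable(value) else value
        for key, value in search_space.items()
    }


# Plain-Python stand-ins for tune.choice and tune.uniform (not the Ray API).
search_space = {
    "per_gpu_val_batch_size": 32,
    "per_gpu_train_batch_size": lambda rng: rng.choice([16, 32, 64]),
    "learning_rate": lambda rng: rng.uniform(1e-5, 5e-5),
    "num_epochs": lambda rng: rng.choice([2, 3, 4, 5]),
}

rng = random.Random(0)
# One sampled configuration per trial, mirroring num_samples=3.
trial_configs = [sample_config(search_space, rng) for _ in range(3)]
for trial_config in trial_configs:
    print(trial_config)
```

Fixed entries such as the validation batch size are passed through unchanged, while the sampled entries differ from trial to trial.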

Now we can set up our Population Based Training scheduler:

In [None]:
from ray.tune.schedulers import PopulationBasedTraining

scheduler = PopulationBasedTraining(
        time_attr="training_iteration",
        metric="eval_acc",
        mode="max",
        perturbation_interval=2,
        hyperparam_mutations={
            "weight_decay": lambda: tune.uniform(0.0, 0.3).func(None),
            "learning_rate": lambda: tune.uniform(1e-5, 5e-5).func(None),
            "per_gpu_train_batch_size": [16, 32, 64],
        })

We also create a CLI reporter to view our results from the command line. We specify which hyperparameters and which metrics we want to see. The metrics are the values passed to the tune.report call we make in TuneTransformerTrainer.evaluate.

In [None]:
from ray.tune import CLIReporter

reporter = CLIReporter(
        parameter_columns={
            "weight_decay": "w_decay",
            "learning_rate": "lr",
            "per_gpu_train_batch_size": "train_bs/gpu",
            "num_epochs": "num_epochs"
        },
        metric_columns=[
            "eval_acc", "eval_loss", "epoch", "training_iteration"
        ])

Finally, we pass in our training function, config, PBT scheduler, and reporter to tune:

In [None]:
analysis = tune.run(
        train_transformer,
        resources_per_trial={
            "cpu": 1,
            "gpu": 1
        },
        config=config,
        num_samples=3,
        scheduler=scheduler,
        keep_checkpoints_num=3,
        checkpoint_score_attr="training_iteration",
        progress_reporter=reporter,
        local_dir="./ray_results/",
        name="tune_transformer_pbt")

Let’s dive deeper into what’s going on here. Initially, Tune creates 3 trials (from num_samples), or instantiations of our training function. Each trial has a hyperparameter configuration sampled from config. So we have 3 different executions of transformer fine-tuning, each with different hyperparameters, running in parallel as far as the available GPUs allow.

However, we also pass in a PBT scheduler, with time_attr set to training_iteration and perturbation_interval set to 2. So, after 2 training iterations, we see PBT come into effect. The bottom 25% of trials according to eval_acc exploit the top 25% of trials by copying over their model weights and hyperparameters. After copying, we do exploration on these trials by mutating the hyperparameters specified in hyperparam_mutations.

This is where checkpointing becomes crucial: this process effectively creates a new trial, so we need checkpointing to continue training where we left off, except with the new hyperparameters. The process repeats every 2 training iterations, and instead of randomly searching across our entire hyperparameter space, we can focus on the best performing trials and do a more fine-grained search in that smaller area.
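
The exploit and explore steps can be sketched in a few lines of plain Python. This is a simplified illustration under our own assumptions, not Ray's implementation: `pbt_step`, the trial dictionaries, the 0.8/1.2 perturbation factors, and the resampling probability are choices made for this sketch.

```python
import copy
import random


def pbt_step(trials, mutations, rng, quantile=0.25, resample_prob=0.25):
    """Run one exploit/explore round over a list of trial dicts, in place.

    Each trial is {"score": float, "config": dict, "weights": object}.
    """
    ranked = sorted(trials, key=lambda t: t["score"])
    # Simplification: always swap at least one trial.
    num_swapped = max(1, int(len(ranked) * quantile))
    bottom, top = ranked[:num_swapped], ranked[-num_swapped:]
    for loser in bottom:
        winner = rng.choice(top)
        # Exploit: copy weights and hyperparameters from a top trial.
        loser["weights"] = copy.deepcopy(winner["weights"])
        loser["config"] = dict(winner["config"])
        # Explore: resample or perturb each mutable hyperparameter.
        for name, spec in mutations.items():
            if isinstance(spec, list):
                loser["config"][name] = rng.choice(spec)
            elif rng.random() < resample_prob:
                loser["config"][name] = spec(rng)
            else:
                loser["config"][name] *= rng.choice([0.8, 1.2])
    return trials


rng = random.Random(0)
trials = [
    {"score": 0.55, "weights": [0.1],
     "config": {"learning_rate": 1e-5, "per_gpu_train_batch_size": 16}},
    {"score": 0.70, "weights": [0.7],
     "config": {"learning_rate": 3e-5, "per_gpu_train_batch_size": 32}},
    {"score": 0.62, "weights": [0.4],
     "config": {"learning_rate": 2e-5, "per_gpu_train_batch_size": 64}},
]
mutations = {
    "learning_rate": lambda rng: rng.uniform(1e-5, 5e-5),
    "per_gpu_train_batch_size": [16, 32, 64],
}
pbt_step(trials, mutations, rng)
print(trials[0])
```

After the step, the worst trial carries a copy of the best trial's weights and a mutated version of its hyperparameters, while the best trial itself is left untouched.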

## Testing the Best Model

Once our hyperparameter tuning experiment is complete, we can get the best performing model and try it out on our test set.

In [None]:
data_args = DataTrainingArguments(task_name=config["task_name"], data_dir=config["data_dir"])

tokenizer = AutoTokenizer.from_pretrained(config["model_name"])

best_config = analysis.get_best_config(metric="eval_acc", mode="max")
print(best_config)
best_checkpoint = recover_checkpoint(
    analysis.get_best_trial(metric="eval_acc",
                            mode="max").checkpoint.value)
print(best_checkpoint)
best_model = AutoModelForSequenceClassification.from_pretrained(
    best_checkpoint).to("cuda")

test_args = TrainingArguments(output_dir="./best_model_results", )
test_dataset = GlueDataset(
    data_args, tokenizer=tokenizer, mode="dev", cache_dir=data_dir)
test_dataset = test_dataset[len(test_dataset) // 2:]

test_trainer = transformers.Trainer(
    best_model,
    test_args,
    compute_metrics=utils.build_compute_metrics_fn(task_name))

metrics = test_trainer.evaluate(test_dataset)
print(metrics)
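
For reference, here is a sketch of how an accuracy figure like eval_acc can be computed from raw model outputs. It assumes the RTE metric is plain accuracy over argmax predictions, which is our understanding of the GLUE metric helper rather than something shown in this notebook. `accuracy_from_logits` and the sample values are hypothetical:

```python
def accuracy_from_logits(logits, labels):
    """Fraction of rows whose highest-scoring class matches the label."""
    predictions = [max(range(len(row)), key=row.__getitem__) for row in logits]
    correct = sum(int(p == y) for p, y in zip(predictions, labels))
    return correct / len(labels)


# Made-up two-class logits for four examples.
logits = [[2.0, -1.0], [0.1, 0.3], [-0.5, 0.2], [1.5, 1.4]]
labels = [0, 1, 0, 0]

print(accuracy_from_logits(logits, labels))  # 0.75
```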