# Distributed training with Accelerate
As models get bigger, **parallelism** has emerged as a strategy for **training larger models on limited hardware and speeding up training** by spreading the work across several devices.

Hugging Face created the Accelerate library to help users easily train a Transformers model on any type of distributed setup, whether it is **multiple GPUs on one machine** or **multiple GPUs across several machines**.

In this tutorial, we will learn how to customize our native PyTorch training loop to enable training in a distributed environment.

We start by importing and creating an Accelerator object. The Accelerator automatically detects your type of distributed setup and initializes all the necessary components for training. You don’t need to explicitly place your model on a device.

In [1]:
from accelerate import Accelerator

accelerator = Accelerator()

## Prepare to accelerate
The next step is to pass all the relevant training objects to the prepare method.

In [None]:
train_dataloader, eval_dataloader, model, optimizer = accelerator.prepare(
    train_dataloader, eval_dataloader, model, optimizer
)

Your training dataloader will be sharded across all GPUs/TPU cores available so that each one sees a different portion of the training dataset.
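A simplified illustration of sharding, assuming batches are dealt round-robin to the processes. This is not Accelerate's actual code: the real implementation also handles shuffling, uneven remainders and split_batches, but the idea that every process sees a disjoint part of the data is the same.

```python
def shard_batches(num_samples, batch_size, num_processes):
    """Simplified illustration (not Accelerate's code): deal consecutive batches
    round-robin to processes so each one sees a different slice of the data."""
    indices = list(range(num_samples))
    batches = [indices[i:i + batch_size] for i in range(0, num_samples, batch_size)]
    # Process p receives batches p, p + num_processes, p + 2 * num_processes, ...
    return [batches[p::num_processes] for p in range(num_processes)]


shards = shard_batches(num_samples=12, batch_size=2, num_processes=2)
for rank, shard in enumerate(shards):
    print(rank, shard)
# 0 [[0, 1], [4, 5], [8, 9]]
# 1 [[2, 3], [6, 7], [10, 11]]
```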

Also, the random states of all processes will be synchronized at the beginning of each iteration through your dataloader, to make sure the data is shuffled the same way (if you decided to use shuffle=True or any kind of random sampler).

> The actual batch size for your training will be the number of devices used multiplied by the batch size you set in your script: for instance training on 4 GPUs with a batch size of 16 set when creating the training dataloader will train at an actual batch size of 64.

Alternatively, you can use the option <code>split_batches=True</code> when creating and initializing your <code>Accelerator</code>, in which case the batch size will always stay the same, whether you run your script on 1, 2, 4, or 64 GPUs.
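The batch-size arithmetic from the two paragraphs above can be written out as a small helper. This is a plain-Python sketch with hypothetical function names, not part of Accelerate: for example, 1,000 training samples with a batch size of 16 on 2 GPUs gives 32 steps per epoch, consistent with the 96 steps over 3 epochs in the notebook run further below.

```python
import math


def effective_batch_size(dataloader_batch_size, num_devices, split_batches=False):
    """Samples consumed per optimizer step across all devices (hypothetical helper)."""
    if split_batches:
        # The DataLoader batch size is the total, split across devices
        return dataloader_batch_size
    # Otherwise every device draws its own batch of dataloader_batch_size samples
    return dataloader_batch_size * num_devices


def steps_per_epoch(num_samples, dataloader_batch_size, num_devices, split_batches=False):
    """Iterations each process runs per epoch, rounding the last partial batch up."""
    total = effective_batch_size(dataloader_batch_size, num_devices, split_batches)
    return math.ceil(num_samples / total)


print(effective_batch_size(16, 4))                      # 64
print(effective_batch_size(16, 4, split_batches=True))  # 16
print(steps_per_epoch(1000, 16, 2))                     # 32
```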

You may or may not want to send your validation dataloader to prepare(), depending on whether you want to run distributed evaluation or not.

## Backward
The last addition is to replace the typical loss.backward() in your training loop with Accelerate’s backward method:

In [None]:
for epoch in range(num_epochs):
    for batch in train_dataloader:
        outputs = model(**batch)
        loss = outputs.loss
        accelerator.backward(loss)  # replaces loss.backward()

        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)

We only need to add four additional lines of code to our training loop to enable distributed training!

The overall code is given below.

In [21]:
import torch
import evaluate
from tqdm import tqdm
from accelerate import Accelerator
from torch.utils.data import DataLoader
from datasets import load_dataset
from torch.optim import AdamW
from transformers import AutoModelForSequenceClassification, get_scheduler, AutoTokenizer


def training():

    # Initialize the Accelerator object for distributed training
    accelerator = Accelerator()
    
    # Load the pretrained model, tokenizer
    model = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", num_labels=5)
    tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
    
    # Define the AdamW optimizer
    optimizer = AdamW(model.parameters(), lr=3e-5)


    # Define a function to tokenize the text examples
    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

        
    # Load the yelp review dataset and apply the tokenizer
    dataset = load_dataset("yelp_review_full")
    tokenized_datasets = dataset.map(tokenize_function, batched=True)
    tokenized_datasets = tokenized_datasets.remove_columns(["text"])
    tokenized_datasets = tokenized_datasets.rename_column("label", "labels")
    tokenized_datasets.set_format("torch")

    small_train_dataset = tokenized_datasets["train"].shuffle(seed=42).select(range(1000))
    small_eval_dataset = tokenized_datasets["test"].shuffle(seed=42).select(range(1000))
    
    # Create data loaders for training and evaluation
    train_dataloader = DataLoader(small_train_dataset, shuffle=True, batch_size=16)
    eval_dataloader = DataLoader(small_eval_dataset, batch_size=16)
    
    # Prepare the model, optimizer, and data loaders for distributed training
    train_dataloader, eval_dataloader, model, optimizer = accelerator.prepare(
        train_dataloader, eval_dataloader, model, optimizer
    )
    
    # Set the number of epochs and the learning rate scheduler
    num_epochs = 3
    num_training_steps = num_epochs * len(train_dataloader)
    lr_scheduler = get_scheduler(
      "linear",
      optimizer=optimizer,
      num_warmup_steps=0,
      num_training_steps=num_training_steps
    )
    
    # Create a progress bar to track the training steps (shown only on the local main process)
    progress_bar = tqdm(range(num_training_steps), disable=not accelerator.is_local_main_process)

    # Load the accuracy metric
    metric = evaluate.load("accuracy")
    
    # Train the model for the specified number of epochs
    for epoch in range(num_epochs):
        # In training mode
        model.train()
        
        # Loop over the batches in the training data loader
        for batch in train_dataloader:
            # Forward pass: compute the model outputs and loss
            outputs = model(**batch)
            loss = outputs.loss
            
            # Backward pass: compute the gradients using Accelerator's backward method
            accelerator.backward(loss)
    
            # Update the model parameters and the learning rate
            optimizer.step()
            lr_scheduler.step()
            
            # Zero out the gradients for the next batch
            optimizer.zero_grad()
            
            # Update the progress bar
            progress_bar.update(1)
            
        # Switch to evaluation mode
        model.eval()

        # Perform evaluation at the end of every epoch
        for batch in eval_dataloader:
            # Forward pass without tracking gradients: compute the logits
            with torch.no_grad():
                outputs = model(**batch)
            # Get the predicted class for each example from the logits
            predictions = torch.argmax(outputs.logits, dim=-1)
            # Gather predictions and labels from all processes
            all_predictions, all_labels = accelerator.gather_for_metrics((predictions, batch['labels']))
            # Accumulate the batch in the evaluate metric
            metric.add_batch(predictions=all_predictions, references=all_labels)

        # Compute the metric value over all batches and show it in the progress bar
        progress_bar.set_postfix({'Accuracy': metric.compute()['accuracy']})

## Train
Once you’ve added the relevant lines of code, launch your training in a script or a notebook.

### Train with a script
If you are running your training from a script, run the following command to create and save a configuration file:
```
accelerate config
```
Answer the questions asked; this saves a default_config.yaml file in your cache folder, which defaults to <code>~/.cache/huggingface/accelerate</code>. You can also use the --config_file flag to specify where the file should be saved.

Once this is done, you can check that everything works on your setup by running:

```
accelerate test
```
This will launch a short script that will test the distributed environment. If it runs fine, you are ready for the next step!

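Then launch your training with the command below. Here distributed-training.py is assumed to contain the training code above, with a call to training() at the end of the file: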

In [20]:
!accelerate launch distributed-training.py

Some weights of the model checkpoint at bert-base-cased were not used when initializing BertForSequenceClassification: ['cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-cased and are newly initi

If you stored the config file in a non-default location, you can indicate it to the launcher like this:

In [None]:
!accelerate launch --config_file path_to_config.yaml distributed-training.py --args_for_the_script

To see the complete list of arguments you can pass to the launcher, run:
```
accelerate launch -h
```

### Train with a notebook
Accelerate can also run in a notebook. Wrap all the code responsible for training in a function, and pass it to notebook_launcher.

> The Accelerator object should only be defined inside the training function. This is because the initialization should be done inside the launcher only.

In [22]:
from accelerate import notebook_launcher

notebook_launcher(training, num_processes=2)

Launching training on 2 GPUs.


Some weights of the model checkpoint at bert-base-cased were not used when initializing BertForSequenceClassification: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-cased and are newly initi

  0%|          | 0/2 [00:00<?, ?it/s]

Found cached dataset yelp_review_full (/home/zonghang/.cache/huggingface/datasets/yelp_review_full/yelp_review_full/1.0.0/e8e18e19d7be9e75642fc66b198abadb116f73599ec89a69ba5dd8d1e57ba0bf)
Loading cached processed dataset at /home/zonghang/.cache/huggingface/datasets/yelp_review_full/yelp_review_full/1.0.0/e8e18e19d7be9e75642fc66b198abadb116f73599ec89a69ba5dd8d1e57ba0bf/cache-70c092b3ee550def.arrow

Map:   0%|          | 0/50000 [00:00<?, ? examples/s]

Loading cached shuffled indices for dataset at /home/zonghang/.cache/huggingface/datasets/yelp_review_full/yelp_review_full/1.0.0/e8e18e19d7be9e75642fc66b198abadb116f73599ec89a69ba5dd8d1e57ba0bf/cache-8892a45b941239e8.arrow
Loading cached shuffled indices for dataset at /home/zonghang/.cache/huggingface/datasets/yelp_review_full/yelp_review_full/1.0.0/e8e18e19d7be9e75642fc66b198abadb116f73599ec89a69ba5dd8d1e57ba0bf/cache-f9b53c5c2da8d38a.arrow
100%|███████████████████████████| 96/96 [00:49<00:00,  1.92it/s, Accuracy=0.585]


## Distributed evaluation
You can perform regular (non-distributed) evaluation in your training script if you leave your validation dataloader out of the prepare() method. In this case, you will need to put the input data on the accelerator.device manually.

To perform distributed evaluation, send along your validation dataloader to the prepare() method:

In [None]:
eval_dataloader = accelerator.prepare(eval_dataloader)

As each device only sees part of the evaluation data, you need to group the predictions from all devices together. The gather_for_metrics() method does this, and also drops the duplicated samples that may be added to even out the last batch.

In [None]:
model.eval()
for batch in eval_dataloader:
    # Forward pass without tracking gradients: compute the logits
    with torch.no_grad():
        outputs = model(**batch)
    # Get the predicted class for each example from the logits
    predictions = torch.argmax(outputs.logits, dim=-1)
    # Gather predictions and labels from all processes
    all_predictions, all_labels = accelerator.gather_for_metrics((predictions, batch['labels']))
    # Accumulate the batch in the evaluate metric
    metric.add_batch(predictions=all_predictions, references=all_labels)

# Compute the metric value over all batches
metric.compute()

> Similar to the training dataloader, passing your validation dataloader through prepare() may change it: if you run on X GPUs, it will have its length divided by X (since your actual batch size will be multiplied by X), unless you set <code>split_batches=True</code>.

## Other caveats
### Execute a statement only on one process
Some of your instructions only need to run for one process on a given server: for instance a data download or a log statement. To do this, wrap the statement in a test like this:

In [None]:
if accelerator.is_local_main_process:
    # Executed once per server
    pass  # replace with your statement, e.g. a data download or a log

Another example is progress bars: to avoid having multiple progress bars in your output, you can display one only on the local main process:

In [None]:
from tqdm.auto import tqdm

progress_bar = tqdm(range(num_training_steps), disable=not accelerator.is_local_main_process)

Here, local means per machine: if you are running your training on two servers with several GPUs each, the instruction will be executed once on each of those servers. If you need to execute something only once across all processes (and not once per machine), wrap it in a test like this:

In [None]:
if accelerator.is_main_process:
    # Executed once only
    pass  # replace with your statement

For print statements that should run only once per machine, you can simply replace the print function with <code>accelerator.print</code>.

### Defer execution
When you run your usual script, instructions are executed in order. Using Accelerate to deploy your script on several GPUs at the same time introduces a complication: while each process executes all instructions in order, some may be faster than others.

You might need to **wait for all processes to have reached a certain point before executing a given instruction**. For instance, you shouldn’t save a model before being sure every process is done with training. 

To do this, just write the following line in your code:

In [None]:
accelerator.wait_for_everyone()

This instruction will block all the processes that arrive first until all the other processes have reached that point (if you run your script on just one GPU or CPU, this won’t do anything).

### Saving/loading a model
Saving the model you trained might need a bit of adjustment: first you should **wait for all processes to reach that point** in the script as shown above, and then, you should **unwrap your model** before saving it.

This is because when going through the prepare() method, your model may have been placed inside a bigger model, which deals with the distributed training. This in turn means that saving your model state dictionary without taking any precaution will take that potential extra layer into account, and you will end up with weights you can’t load back in your base model.

This is why it’s recommended to unwrap your model first. Here is an example:

In [None]:
accelerator.wait_for_everyone()
unwrapped_model = accelerator.unwrap_model(model)
accelerator.save(unwrapped_model.state_dict(), filename)

If your script contains logic to load a checkpoint, we also recommend you load your weights in the unwrapped model (this is only useful if you use the load function after making your model go through prepare()). 

Here is an example:

In [None]:
unwrapped_model = accelerator.unwrap_model(model)
unwrapped_model.load_state_dict(torch.load(filename))