In [None]:
#|hide
#|skip
# The `[ -e /content ]` test gates the install, so pip only runs where `/content` exists
# (per the inline note below, that is Colab).
! [ -e /content ] && pip install -Uqq fastai  # upgrade fastai on colab

In [None]:
#|default_exp accelerate

In [None]:
#|export
#nbdev_comment from __future__ import annotations
from fastai.basics import *
from fastai.callback.progress import ProgressCallback
from fastai.distributed import DistributedDL, rank0_first, setup_distrib, teardown_distrib
from fastai.optimizer import OptimWrapper
from accelerate import Accelerator

In [None]:
#|hide
from nbdev.showdoc import *

# Accelerate and fastai

> Callbacks and helper functions for distributed or DeepSpeed training with Accelerate

Similarly to `fastai.distributed`, `fastai.accelerate` is an integration with the [Accelerate](https://github.com/huggingface/accelerate) framework, which aims to make it easy to use the same code on multiple CPUs, GPUs and TPUs (though TPUs are currently unsupported in fastai). To use distributed training, there are only two required steps:

1. Add `with learn.accelerate_ctx():` before your `learn.fit` call
2. Run your training script with `accelerate launch scriptname.py ...args...`

An initial configuration can be performed by running `accelerate config`; make sure **mixed_precision** is always set to **no**, as fastai's `Learner.to_fp16` should be used instead.

See below for details on the full API and underlying helper functions, if needed -- however, note that you will not need anything except the above unless you need to change how the distributed training is implemented.

Finally, at the bottom are directions for how to use `Accelerate`'s `notebook_launcher` to launch distributed processes straight from a Jupyter Notebook!

## Accelerate

### AcceleratedTrainer -

In [None]:
#export
_hidden_params = ["mixed_precision", "fp16", "log_with", "logging_dir", "step_scheduler_with_optimizer"]

In [None]:
#|export
class AcceleratedTrainer(Callback):
    "Wrap `model` in `DistributedDataParallel` and `dls` in `DistributedDL` to be used in the Accelerate framework"
    order = 11
    @delegates(Accelerator, but=_hidden_params)
    def __init__(self,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        **kwargs
    ):
        store_attr()
        # Every remaining keyword argument is forwarded untouched to `Accelerator`
        self.accelerator = Accelerator(**kwargs)

    def before_fit(self):
        "Pass the model through `Accelerator.prepare` and swap each loader for a `DistributedDL`"
        model = nn.SyncBatchNorm.convert_sync_batchnorm(self.model) if self.sync_bn else self.model
        self.learn.model = self.accelerator.prepare(model)
        # Remember the original loaders (and optimizer) so `after_fit` can restore the loaders
        self.old_dls, self.old_opt = list(self.dls), self.opt
        self.learn.dls.loaders = [self._wrap_dl(dl) for dl in self.dls]
        # Logging is silenced whenever `rank_distrib()` is truthy (non-zero rank)
        if rank_distrib(): self.learn.logger=noop

    def _wrap_dl(self, dl):
        "Return `dl` unchanged if it is already a `DistributedDL`, otherwise wrap it in one"
        return dl if isinstance(dl,DistributedDL) else DistributedDL(dl)

    def before_backward(self):
        "Run the backward pass through the `Accelerator`, then cancel the default backward step"
        # Apply Accelerator backward which handles DeepSpeed, otherwise will call loss_grad.backward()
        self.accelerator.backward(self.learn.loss_grad)
        raise CancelBackwardException()

    def before_train(self):    self.learn.dl = self._wrap_dl(self.learn.dl)
    def before_validate(self): self.learn.dl = self._wrap_dl(self.learn.dl)

    def after_fit(self):
        "Put back the unwrapped model and the original loaders"
        model = self.learn.model
        # NOTE(review): `.module` is assumed to exist only when `prepare` wrapped the model
        # (e.g. in DDP); when it returned the model as-is, keep the model instead of raising
        # `AttributeError` and leaving the loaders unrestored.
        self.learn.model = getattr(model, 'module', model)
        self.learn.dls.loaders = self.old_dls

In [None]:
#|export
@patch
@delegates(Accelerator, but=_hidden_params)
def to_accelerate(self: Learner,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        **kwargs
    ):
    "Attach an `AcceleratedTrainer` (which builds its own `Accelerator` from `kwargs`) and return the learner"
    trainer_cb = AcceleratedTrainer(sync_bn, **kwargs)
    self.add_cb(trainer_cb)
    # The progress callback is dropped whenever `rank_distrib()` is truthy (non-zero rank)
    if rank_distrib():
        self.remove_cb(ProgressCallback)
    return self

In [None]:
#|export
@patch
def detach_accelerate(self: Learner):
    "Remove `AcceleratedTrainer` from a learner"
    # `to_accelerate` attaches the callback regardless of the process count, so it is always
    # removed here; the former `num_distrib() <= 1` early return left it attached when
    # training ran with a single process.
    self.remove_cb(AcceleratedTrainer)
    # Mirror of `to_accelerate`: re-add the progress callback where it was removed (non-zero rank)
    if rank_distrib() and not hasattr(self, 'progress'): self.add_cb(ProgressCallback())
    return self

### `accelerate_ctx` context manager

In [None]:
#|export
@patch
@contextmanager
@delegates(Accelerator, but=_hidden_params)
def accelerate_ctx(self: Learner,
        sync_bn=True, # Whether to replace all batch norm with `nn.SyncBatchNorm`
        in_notebook=False, # Whether we are launching from a notebook or not
        **kwargs
    ):
    "Context manager that attaches Accelerate training to a learner on entry and detaches it on exit."
    # Set only when this call had to set up the process group itself, so only then is it torn down
    owns_process_group = False
    try:
        if in_notebook:
            device_rank = rank_distrib()
            if not torch.distributed.is_initialized():
                setup_distrib(device_rank)
                owns_process_group = torch.distributed.is_initialized()
            if not rank_distrib():
                print("Training Learner...")
        if num_distrib():
            self.to_accelerate(sync_bn, **kwargs)
        yield self
    finally:
        # Runs whether or not the body of the `with` block raised
        self.detach_accelerate()
        if owns_process_group:
            teardown_distrib()

`accelerate_ctx` prepares a learner to train in distributed data parallel mode with Accelerate. 

Typical usage:

```
with learn.accelerate_ctx(**kwargs): learn.fit(.....)
```

It attaches an `AcceleratedTrainer` callback and `DistributedDL` dataloader to the learner, then executes `learn.fit(.....)`.  Upon exiting the context, it removes the `AcceleratedTrainer` and `DistributedDL`, and destroys any locally created distributed process group.  The process is still attached to the GPU though.

## Notebook Launcher

Accelerate provides a [notebook_launcher](https://huggingface.co/docs/accelerate/launcher) functionality to let you keep using your Jupyter Notebook as you would, but train in a distributed setup!

To utilize this functionality, migrate your training into a function, and pass this to `notebook_launcher`, such as:

```python
---
from fastai.vision.all import *
from fastai.accelerate import *

set_seed(99, True)
path = untar_data(URLs.PETS)/'images'
dls = ImageDataLoaders.from_name_func(
    path, get_image_files(path), valid_pct=0.2,
    label_func=lambda x: x[0].isupper(), item_tfms=Resize(224))
    
learn = vision_learner(dls, resnet34, metrics=error_rate).to_fp16()

def train():
    with learn.accelerate_ctx(in_notebook=True):
        learn.fine_tune(1)
---
from accelerate import notebook_launcher
notebook_launcher(train, num_processes=2)
---
```

> Note: You must specify the number of GPUs to use with `num_processes`. 

## Export -

In [None]:
#|hide
# Export step: the recorded cell output lists one "Converted <notebook>" line per notebook.
from nbdev.export import notebook2script
notebook2script()

Converted 00_torch_core.ipynb.
Converted 01_layers.ipynb.
Converted 01a_losses.ipynb.
Converted 02_data.load.ipynb.
Converted 03_data.core.ipynb.
Converted 04_data.external.ipynb.
Converted 05_data.transforms.ipynb.
Converted 06_data.block.ipynb.
Converted 07_vision.core.ipynb.
Converted 08_vision.data.ipynb.
Converted 09_vision.augment.ipynb.
Converted 09b_vision.utils.ipynb.
Converted 09c_vision.widgets.ipynb.
Converted 10_tutorial.pets.ipynb.
Converted 10b_tutorial.albumentations.ipynb.
Converted 11_vision.models.xresnet.ipynb.
Converted 12_optimizer.ipynb.
Converted 13_callback.core.ipynb.
Converted 13a_learner.ipynb.
Converted 13b_metrics.ipynb.
Converted 14_callback.schedule.ipynb.
Converted 14a_callback.data.ipynb.
Converted 15_callback.hook.ipynb.
Converted 15a_vision.models.unet.ipynb.
Converted 16_callback.progress.ipynb.
Converted 17_callback.tracker.ipynb.
Converted 18_callback.fp16.ipynb.
Converted 18a_callback.training.ipynb.
Converted 18b_callback.preds.ipynb.
Converted 