
Expected to have finished reduction in the prior iteration before starting a new one. #24

Closed
rahular opened this issue Apr 6, 2021 · 11 comments

@rahular

rahular commented Apr 6, 2021

I have modified the nlp_example script to fine-tune an EncoderDecoder model on translation data like this:

accelerator = Accelerator(device_placement=False, fp16=args.fp16, cpu=args.cpu)
def _tokenize(batch):
    if accelerator.distributed_type == DistributedType.TPU:
        src = tokenizer(batch[0], padding="max_length", max_length=128, return_tensors="pt")
        tgt = tokenizer(batch[1], padding="max_length", max_length=128, return_tensors="pt")
    else:
        src = tokenizer(list(batch[0]), padding="longest", return_tensors="pt")
        tgt = tokenizer(list(batch[1]), padding="longest", return_tensors="pt")
    return src, tgt
...
for step, batch in train_bar:
    src, tgt = _tokenize(batch)
    src["input_ids"] = src["input_ids"].to(accelerator.device)
    tgt["input_ids"] = tgt["input_ids"].to(accelerator.device)
    outputs = model(input_ids=src["input_ids"], decoder_input_ids=tgt["input_ids"], labels=tgt["input_ids"])
    loss = outputs.loss
    loss = loss / gradient_accumulation_steps
    accelerator.backward(loss)
    if step % gradient_accumulation_steps == 0:
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()

    if step % eval_steps == 0:
        model.eval()
        for step, batch in enumerate(dev_dataloader):
            src, tgt = _tokenize(batch)
            src["input_ids"] = src["input_ids"].to(accelerator.device)
            tgt["input_ids"] = tgt["input_ids"].to(accelerator.device)
            with torch.no_grad():
                predictions = model.generate(
                    src["input_ids"],
                    decoder_start_token_id=tokenizer.convert_tokens_to_ids("[CLS]"),
                    num_beams=4,
                    repetition_penalty=1.0,
                    do_sample=False,
                    forced_bos_token_id=None,
                )
            pred_str = tokenizer.batch_decode(predictions, skip_special_tokens=True)
            ref_str = tokenizer.batch_decode(tgt["input_ids"], skip_special_tokens=True)
            metric.add_batch(
                predictions=accelerator.gather(pred_str), references=accelerator.gather([[r] for r in ref_str]),
            )
        eval_metric = metric.compute()
...

I am getting the following error during training

  File "trainer.py", line 104, in training_function
    outputs = model(input_ids=src["input_ids"], decoder_input_ids=tgt["input_ids"], labels=tgt["input_ids"])
  File "/home/wjv316/anaconda3/envs/indic/lib/python3.7/site-packages/torch/nn/modules/module.py", line 727, in _call_impl
    result = self.forward(*input, **kwargs)
  File "/home/wjv316/anaconda3/envs/indic/lib/python3.7/site-packages/torch/nn/parallel/distributed.py", line 606, in forward
    if self.reducer._rebuild_buckets():
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by (1) passing the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`; (2) making sure all `forward` function outputs participate in calculating loss. If you already have done the above two steps, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return value of `forward` of your module when reporting this issue (e.g. list, dict, iterable).

and the following during generation

  File "trainer.py", line 120, in training_function
    predictions = model.generate(
  File "/home/wjv316/anaconda3/envs/indic/lib/python3.7/site-packages/torch/nn/modules/module.py", line 779, in __getattr__
    type(self).__name__, name))
torch.nn.modules.module.ModuleAttributeError: 'DistributedDataParallel' object has no attribute 'generate'

Both work fine if I change the configuration to use only one GPU via accelerate config.

@sgugger
Collaborator

sgugger commented Apr 6, 2021

Hi there! For your first problem, you have to set find_unused_parameters=True when creating the distributed model (as PyTorch tells you in the error message). This can be done (with a source install, since the feature was added recently) by creating a DistributedDataParallelKwargs with that flag and passing it to your Accelerator:

from accelerate import DistributedDataParallelKwargs

ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=True)
accelerator = Accelerator(kwargs_handlers=[ddp_kwargs])

This will still let your script run on a single GPU or CPU (the handler is simply ignored there), and in distributed training it should fix your first issue. If the error only appears when gradient_accumulation_steps > 1, you should set

ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=args.gradient_accumulation_steps > 1)
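
For reference, a minimal sketch of how this could fit into the Accelerator construction from the snippet above (reusing the original arguments; args.gradient_accumulation_steps is assumed to exist, as in the line above):

from accelerate import Accelerator, DistributedDataParallelKwargs

# Only ask DDP to track unused parameters when gradient accumulation is used,
# since the extra check has some overhead
ddp_kwargs = DistributedDataParallelKwargs(
    find_unused_parameters=args.gradient_accumulation_steps > 1
)
accelerator = Accelerator(
    device_placement=False,
    fp16=args.fp16,
    cpu=args.cpu,
    kwargs_handlers=[ddp_kwargs],
)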

For the second issue, the model is not the same once you have passed it to accelerator.prepare: it has been set up for distributed training and wrapped in a container (DistributedDataParallel) that no longer has a generate method. You can get your initial model back with

accelerator.unwrap_model(model)

so replace your generate line with:

predictions = accelerator.unwrap_model(model).generate(
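
In the evaluation loop from the original snippet, the change would look roughly like this (generation arguments kept exactly as in the original post; only the model access changes):

with torch.no_grad():
    # Call generate on the underlying model, not on the DistributedDataParallel wrapper
    predictions = accelerator.unwrap_model(model).generate(
        src["input_ids"],
        decoder_start_token_id=tokenizer.convert_tokens_to_ids("[CLS]"),
        num_beams=4,
        repetition_penalty=1.0,
        do_sample=False,
        forced_bos_token_id=None,
    )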

@rahular
Author

rahular commented Apr 6, 2021

@sgugger that worked, thanks!

@rahular closed this as completed on Apr 6, 2021
@rahular
Author

rahular commented Apr 7, 2021

@sgugger on a related note, I am using sacrebleu as my metric and doing the following:

metric = load_metric("sacrebleu")
...
for step, (src, tgt) in enumerate(dev_dataloader):
    ...
    predictions = accelerator.gather(predictions)
    references = accelerator.gather(tgt["input_ids"])
    pred_str = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    ref_str = tokenizer.batch_decode(references, skip_special_tokens=True)
    metric.add_batch(predictions=pred_str, references=[[r] for r in ref_str])
...
eval_metric = metric.compute()
accelerator.print(f"BLEU: {eval_metric['score']:.4f}")

Is this the correct way to do it? Or should I specifically tell the metric object that it is being used in a distributed environment, as detailed in the docs?

@sgugger
Collaborator

sgugger commented Apr 7, 2021

This is the proper way indeed!

You don't have to use the metric's distributed setup, since accelerator.gather will gather your predictions and labels instead. This script will also work in a distributed environment with several machines (multi-node distributed training), whereas the distributed setup for metrics only works on one machine (since it saves things to disk).

@rahular
Author

rahular commented Apr 7, 2021

Cool! Lastly, I am saving the optimizer, etc. with accelerator.save, but is there an accelerate equivalent to model.save_pretrained(savedir)?

@sgugger
Collaborator

sgugger commented Apr 7, 2021

Yes, for the model you should first make sure every process has finished training with

accelerator.wait_for_everyone()

(it should work without, but let's be cautious). Then you should unwrap the model to get the save_pretrained method back (same as with generate before):

unwrapped_model = accelerator.unwrap_model(model)

Then pass accelerator.save to save_pretrained as the save_function so that it is used for the saving:

unwrapped_model.save_pretrained(your_save_dir, save_function=accelerator.save)
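
Putting those three pieces together, a minimal sketch of the saving step (your_save_dir is a placeholder, as above):

# Make sure all processes have finished training before saving
accelerator.wait_for_everyone()
# Unwrap the DistributedDataParallel container to recover save_pretrained
unwrapped_model = accelerator.unwrap_model(model)
# Route the actual write through accelerator.save, as suggested above
unwrapped_model.save_pretrained(your_save_dir, save_function=accelerator.save)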

Note that there are more examples using accelerate in the official Transformers examples if you want to see more use cases of the library :-)

@rahular
Author

rahular commented Apr 7, 2021

That's great, thanks for all the awesome work you do!

@JulesGM
Contributor

JulesGM commented Apr 13, 2023

(hi Rahul :)) @sgugger, is there a way to pass find_unused_parameters to accelerate just through the config file?

@sgugger
Collaborator

sgugger commented Apr 13, 2023

No, only with the kwargs_handlers argument.

@sunyuhan19981208

@sgugger This answer really helped.

@jaydeepborkar

@sgugger thank you so much! worked like magic!
