
Merge pull request #71 from Bluefog-Lib/atc_multi_step
ATC multi-step case
bichengying committed Jan 24, 2021
2 parents c25967b + 902623f commit 43d03da
Showing 3 changed files with 97 additions and 65 deletions.
40 changes: 30 additions & 10 deletions bluefog/torch/optimizers.py
@@ -42,10 +42,10 @@ class CommunicationType(Enum):
)
_warning_message_backward_pass_per_step = (
"Unexpected behavior:\n"
" After backward_passes_per_step times of backward computation `loss.backward()` are called,\n"
" After num_steps_per_communication times of backward computation `loss.backward()` are called,\n"
" an optimizer step() function must be called.\n"
" It does not matter how many step() functions are called in between.\n"
" Please adjust backward_passes_per_step to accumulate gradients locally.\n"
" Please adjust num_steps_per_communication to accumulate gradients locally.\n"
" More information can be found in the FAQ page.\n"
)

@@ -523,6 +523,8 @@ def __init__(self, params, model, communication_type, backward_passes_per_step=1

self._reduce_delay = {v: self._backward_passes_per_step
for _, v in sorted(named_parameters)}
self._step_delay = {v: self._backward_passes_per_step
for _, v in sorted(named_parameters)}
if os.getenv('BLUEFOG_TIMELINE'):
self.turn_on_timeline()
if bf.size() > 1:
@@ -568,7 +570,13 @@ def hook(grad):
"However, you can register you own step to this class. The signature "
"should be parameter-wise tep func:\n"
" func(parameter, gradient, parameter_group) -> None\n")
self._step_func(p, grad.data, param_group)
if self._step_delay[p] <= 0:
if not self._error_encountered:
warnings.warn(_warning_message_num_step_per_communication)
self._error_encountered = True
self._step_delay[p] -= 1
if self._step_delay[p] == 0:
self._step_func(p, grad.data, param_group)

if self._reduce_delay[p] <= 0:
if not self._error_encountered:
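The change above follows a simple per-parameter delay-counter pattern: each parameter counts down one backward pass at a time and fires its parameter-wise step exactly when its counter reaches zero. Below is a minimal self-contained sketch of that pattern; the helper names make_delays, on_backward, fire, and warn are illustrative only, not Bluefog's API.

def make_delays(params, num_steps_per_communication):
    # One countdown per parameter, reset after each communication round.
    return {p: num_steps_per_communication for p in params}

def on_backward(delays, p, fire, warn):
    # Called from the gradient hook of parameter `p`.
    if delays[p] <= 0:
        # More backward passes than expected before step(): warn once.
        warn()
    delays[p] -= 1
    if delays[p] == 0:
        fire(p)  # e.g. apply the parameter-wise step or launch the reduction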
@@ -802,10 +810,22 @@ def step(self, closure=None):
loss = None
if closure is not None:
loss = closure()

if all(v != 0 for k, v in self._step_delay.items()):
# This corresponds to case 2: multiple local updates.
super(self.__class__, self).step()
elif all(v == 0 for k, v in self._step_delay.items()):
self._step_delay = self._step_delay.fromkeys(
self._step_delay, self._backward_passes_per_step)
else:
raise ValueError(
"We do not support partial step update in ATC yet.")

self.synchronize()
# TODO(ybc) Figure out a better and more robust way to do sync in ATC.
# Note, here self._synchronized just turns from true to false immediately.
self._synchronized = False

return loss
else:
# Optimizer.step() might be triggered when user calls broadcast_optimizer_state()
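The gating added to step() can be summarized by the sketch below; it is a simplification over an assumed dict of per-parameter counters, not the actual Bluefog code.

def atc_step_gate(step_delays, num_steps_per_communication):
    # Decide what step() should do in the ATC optimizer (illustrative only).
    if all(v != 0 for v in step_delays.values()):
        # Case 2: a purely local update; no communication is triggered yet.
        return "local_update"
    if all(v == 0 for v in step_delays.values()):
        # Every parameter has seen its expected backward passes: reset the
        # counters and fall through to synchronize() for the communication.
        for p in step_delays:
            step_delays[p] = num_steps_per_communication
        return "synchronize"
    raise ValueError("Partial step update is not supported in ATC yet.")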
Expand Down Expand Up @@ -1330,7 +1350,7 @@ def DistributedHierarchicalNeighborAllreduceOptimizer(optimizer, model,


def DistributedGradientAllreduceOptimizer(optimizer, model,
backward_passes_per_step=1):
num_steps_per_communication=1):
"""
A distributed optimizer that wraps another torch.optim.Optimizer through allreduce ops.
The communication happens during backward propagation, the same as in Horovod.
@@ -1376,12 +1396,12 @@ def DistributedGradientAllreduceOptimizer(optimizer, model,
(optimizer.__class__,),
dict(_DistributedOptimizer.__dict__),
)
return cls(optimizer.param_groups, model, backward_passes_per_step)
return cls(optimizer.param_groups, model, num_steps_per_communication)
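With the renamed argument, a typical gradient-accumulation usage looks like the hedged sketch below; bf.init(), the toy model, and the random data are illustrative assumptions and are not part of this diff.

import torch
import bluefog.torch as bf

bf.init()
J = 4  # backward passes per communication round
model = torch.nn.Linear(10, 1)
base_optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
optimizer = bf.DistributedGradientAllreduceOptimizer(
    base_optimizer, model, num_steps_per_communication=J)

optimizer.zero_grad()
for _ in range(J):
    data, target = torch.randn(8, 10), torch.randn(8, 1)
    loss = torch.nn.functional.mse_loss(model(data), target)
    loss.backward()   # gradients accumulate locally; communication is tied to backward
optimizer.step()      # required after the J-th backward pass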


def DistributedAdaptThenCombineOptimizer(optimizer, model,
communication_type=CommunicationType.neighbor_allreduce,
backward_passes_per_step=1):
num_steps_per_communication=1):
"""
A distributed optimizer that wraps another torch.optim.Optimizer.
The communication is applied to the parameters when backward propagation is triggered and
@@ -1410,7 +1430,7 @@ def DistributedAdaptThenCombineOptimizer(optimizer, model,
communication_type: An enum type that determines whether to use neighbor_allreduce, allreduce,
hierarchical_neighbor_allreduce, or an empty function as the communication behavior.
An empty function just means no communication.
backward_passes_per_step: Number of expected backward function calls before each
num_steps_per_communication: Number of expected backward function calls before each
communication. This allows local model parameter updates
per num_steps_per_communication before reducing them over
distributed computation resources.
@@ -1422,7 +1442,7 @@ def DistributedAdaptThenCombineOptimizer(optimizer, model,
>>> opt = bf.DistributedAdaptThenCombineOptimizer(optimizer, model,
>>> communication_type=CommunicationType.neighbor_allreduce,
>>> backward_passes_per_step=J)
>>> num_steps_per_communication=J)
>>> opt.zero_grad()
>>> for j in range(J):
>>> output = model(data_batch_i)
@@ -1434,7 +1454,7 @@ def DistributedAdaptThenCombineOptimizer(optimizer, model,
>>> opt = bf.DistributedAdaptThenCombineOptimizer(optimizer, model,
>>> communication_type=CommunicationType.neighbor_allreduce,
>>> backward_passes_per_step=J)
>>> num_steps_per_communication=J)
>>> for j in range(J):
>>> output = model(data_batch_i)
>>> loss = ...
@@ -1447,7 +1467,7 @@ def DistributedAdaptThenCombineOptimizer(optimizer, model,
(optimizer.__class__,),
dict(_DistributedAdaptThenCombineOptimizer.__dict__),
)
return cls(optimizer.param_groups, model, communication_type, backward_passes_per_step)
return cls(optimizer.param_groups, model, communication_type, num_steps_per_communication)
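For the multiple-local-updates case, a complete hedged usage sketch is given below; bf.init(), the toy model, the random data, and the assumption that CommunicationType is exported from bluefog.torch are illustrative only.

import torch
import bluefog.torch as bf

bf.init()
J = 2  # local updates per communication round
model = torch.nn.Linear(10, 1)
base_optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
optimizer = bf.DistributedAdaptThenCombineOptimizer(
    base_optimizer, model,
    communication_type=bf.CommunicationType.neighbor_allreduce,
    num_steps_per_communication=J)

for _ in range(J):
    optimizer.zero_grad()
    data, target = torch.randn(8, 10), torch.randn(8, 1)
    loss = torch.nn.functional.mse_loss(model(data), target)
    loss.backward()   # backward passes are counted per parameter
    optimizer.step()  # local update on passes 1..J-1; communication after the J-th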


def DistributedAdaptWithCombineOptimizer(optimizer, model,
16 changes: 6 additions & 10 deletions docs/faq.rst
@@ -6,7 +6,7 @@ FAQ
.. contents:: Question Lists
:local:

Why does the warning related to ``num_steps_per_communication`` or ``backward_passes_per_step`` pop up in Bluefog optimizer?
Why does the warning related to ``num_steps_per_communication`` pop up in Bluefog optimizer?
--------------------------------------------------------------------------------------------

During your usage of a Bluefog distributed optimizer, you may encounter the following
@@ -21,13 +21,14 @@ two types of warnings:
It does not matter how many step() functions are called in between.
Please adjust num_steps_per_communication to update model parameters locally.
More information can be found in the FAQ page.
.. code-block:: python
Warning (unexpected behavior):
After backward_passes_per_step times of backward computation `loss.backward()` are called,
After num_steps_per_communication times of backward computation `loss.backward()` are called,
an optimizer step() function must be called.
It does not matter how many step() functions are called in between.
Please adjust backward_passes_per_step to accumulate gradients locally.
Please adjust num_steps_per_communication to accumulate gradients locally.
More information can be found in the FAQ page.
.. note::
@@ -37,10 +38,6 @@ two types of warnings:
behavior during forward computation, while ``DistributedGradientAllreduceOptimizer`` and
``DistributedAdaptThenCombineOptimizer`` trigger during backward computation.
This is also reflected by the optimizer argument naming.
All other optimizers uses ``num_steps_per_communication``, while these two optimizers uses
``backward_passes_per_step``.
The following discussion only focuses on forward computation case, but it works for backward
scenario as well.

To understand the meaning of the above two warnings,
consider the following admissible code snippet for local gradient aggregation case:
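(That snippet is collapsed in this diff view. A minimal stand-in with ``num_steps_per_communication = 3`` and hypothetical ``opt``, ``model``, ``loss_fn``, and data names might read:)

.. code-block:: python

    # Three forward/backward passes (F F F), then one step (S).
    opt.zero_grad()
    for mini_batch, target in three_mini_batches:
        loss = loss_fn(model(mini_batch), target)
        loss.backward()
    opt.step()  # called after the 3rd pass, as the optimizer expects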
@@ -112,10 +109,9 @@ mini batch here, due to the left over mini batch in the first batch. No warning
this process, since after every 3 **F**, an **S** follows.
This kind of behavior may not be desired, and users should be careful in this situation.

These are two common usages for ``num_steps_per_communication`` or ``backward_passes_per_step`` for
These are two common usages of ``num_steps_per_communication`` for
the Bluefog distributed optimizers. But other usages are also allowed, as long as after
``num_steps_per_communication`` forward computation or ``backward_passes_per_step``
backward propogation, the step function is executed.
``num_steps_per_communication`` forward computations or backward propagations, the step function is executed.
With that in mind, some other admissible calling procedures are **FFsFS**, **FsFFS**, etc.
Some inadmissible calling procedures are **FFFFS**, **FFsFFS**.

