Commit
Merge pull request #23 from Bluefog-Lib/forward-hook-bluefog
Forward hook bluefog
bichengying committed May 20, 2020
2 parents 0a9f998 + 251403c commit 905cf47
Showing 6 changed files with 106 additions and 132 deletions.
2 changes: 1 addition & 1 deletion README.rst
@@ -66,7 +66,7 @@ then run it through ``bfrun``. That is it!
bf.init()
optimizer = optim.SGD(model.parameters(), lr=lr * bf.size())
optimizer = bf.DistributedBluefogOptimizer(
optimizer, named_parameters=model.named_parameters()
optimizer, named_parameters=model
)
...
218 changes: 96 additions & 122 deletions bluefog/torch/optimizers.py
@@ -20,6 +20,16 @@
import torch
import bluefog.torch as bf


def named_leaf_module(module, parent_name=None):
"""Yield an iterator over all leaf modules."""
if len(list(module.named_children())) == 0:
yield (parent_name, module)
for name, ch_module in module.named_children():
full_name = (parent_name + '.' + name if parent_name else name)
yield from named_leaf_module(ch_module, full_name)
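
The helper recurses through ``named_children()`` and yields only modules that have no children of their own. A minimal sketch of what it produces (the small ``Sequential`` model below is just for illustration, not part of this commit):

    import torch

    net = torch.nn.Sequential(
        torch.nn.Linear(4, 8),
        torch.nn.ReLU(),
        torch.nn.Linear(8, 2),
    )

    # Only the leaves are yielded, together with their (dotted) names:
    # ('0', Linear(4, 8)), ('1', ReLU()), ('2', Linear(8, 2))
    for name, layer in named_leaf_module(net):
        print(name, layer)
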


# TODO(ybc) Use interface to refactor the code.
#pylint: disable=unused-argument
class _DistributedOptimizer(torch.optim.Optimizer):
@@ -106,7 +116,7 @@ def hook(*ignore):
def _allreduce_grad_async(self, p):
name = self._parameter_names.get(p)
if self._use_timeline:
bf.timeline_end_activity(name)
bf.timeline_end_activity("allreduce." + name)
handle = bf.allreduce_async(
p.grad, average=True, name=name
)
@@ -119,16 +129,16 @@ def turn_on_timeline(self, model):
def _timeline_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_start_activity(
name, activity_name="GRADIENT COMPT.")
"allreduce." + name, activity_name="GRADIENT COMPT.")
backward_hook_handle = model.register_backward_hook(_timeline_hook)

def _timeline_forward_pre_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_start_activity(name, activity_name="FORWARD")
bf.timeline_start_activity("allreduce." + name, activity_name="FORWARD")

def _timeline_forward_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_end_activity(name)
bf.timeline_end_activity("allreduce." + name)

pre_forward_hook_handle = model.register_forward_pre_hook(
_timeline_forward_pre_hook)
@@ -144,7 +154,7 @@ def turn_off_timeline(self):
hooks.remove()
self._timeline_hook_handles.clear()
self._use_timeline = False

def synchronize(self):
missing_p = self._requires_update - set(self._handles.keys())
for p in missing_p:
@@ -227,17 +237,10 @@ class _DistributedNeighborAllreduceOptimizer(torch.optim.Optimizer):
w_{i+1, k} = Neighbor_Average( w_{i, k} - lr * local_grad(w_{i, k}) )
"""

def __init__(self, params, named_parameters):
def __init__(self, params, model):
super(self.__class__, self).__init__(params)

if named_parameters is not None:
named_parameters = list(named_parameters)
else:
named_parameters = [
("allreduce.noname.%s" % i, v)
for param_group in self.param_groups
for i, v in enumerate(param_group["params"])
]
named_parameters = list(model.named_parameters())

# make sure that named_parameters are tuples
if any([not isinstance(p, tuple) for p in named_parameters]):
@@ -267,6 +270,7 @@ def __init__(self, params, named_parameters):
"%s" % ", ".join(str(id) for id in unnamed_param_ids)
)

self._model = model
self._parameter_names = {v: k for k, v in sorted(named_parameters)}
self._handles = {}
self._requires_update = set()
@@ -278,59 +282,60 @@ def __init__(self, params, named_parameters):
self._register_hooks()

def _register_hooks(self):
for param_group in self.param_groups:
for p in param_group["params"]:
for parent_name, layer in named_leaf_module(self._model):
layer.register_forward_hook(self._make_hook(parent_name))

def _make_hook(self, parent_name):
def hook(model, *unused):
for name, p in model.named_parameters():
if self._use_timeline:
# End forward computation timeline
bf.timeline_end_activity(parent_name+'.'+name)
if p.requires_grad:
p.register_hook(self._make_hook(p))

def _make_hook(self, p):
def hook(*ignore):
assert not p.grad.requires_grad
handle = self._neighbor_allreduce_data_async(p)
self._handles[p] = handle

self._requires_update.add(p)
handle = self._neighbor_allreduce_data_async(p)
self._handles[p] = handle
return hook
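
Stripped of the optimizer bookkeeping, the pattern introduced here is: register a forward hook on every leaf module so that an asynchronous communication op on that layer's weights is launched as soon as the layer's forward pass completes, rather than waiting for the backward pass. A hedged sketch of that pattern (``comm_async`` stands in for e.g. ``bf.neighbor_allreduce_async``; the helper names are made up for the example):

    import torch

    def make_forward_comm_hook(handles, comm_async):
        # register_forward_hook passes (module, input, output); only the module is needed here
        def hook(layer, _input, _output):
            for _, p in layer.named_parameters():
                if p.requires_grad:
                    handles[p] = comm_async(p.data)  # start communication early, wait later in step()
        return hook

    def register_forward_comm_hooks(model, comm_async):
        handles = {}
        for _, layer in named_leaf_module(model):
            layer.register_forward_hook(make_forward_comm_hook(handles, comm_async))
        return handles
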

def _neighbor_allreduce_data_async(self, p):
name = self._parameter_names.get(p)
if self._use_timeline:
bf.timeline_end_activity(name)
# End forward computation timeline
bf.timeline_end_activity("neighbor.allreduce." + name)
handle = bf.neighbor_allreduce_async(p.data, name=name)
return handle

def turn_on_timeline(self, model):
assert isinstance(
model, torch.nn.Module), "You have to provide nn.model to turn on timeline"

def turn_on_timeline(self):
def _timeline_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_start_activity(
name, activity_name="GRADIENT COMPT.")
backward_hook_handle = model.register_backward_hook(_timeline_hook)
"neighbor.allreduce." + name, activity_name="GRADIENT COMPT.")
backward_hook_handle = self._model.register_backward_hook(
_timeline_hook)

def _timeline_forward_pre_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_start_activity(name, activity_name="FORWARD")
def _make_backward_end_timeline_hook(name):
def hook(*ignore):
if self._use_timeline:
bf.timeline_end_activity("neighbor.allreduce." + name)
return hook

def _timeline_forward_hook(model, *unused):
for param_group in self.param_groups:
for p in param_group["params"]:
if p.requires_grad:
name = self._parameter_names.get(p)
p.register_hook(_make_backward_end_timeline_hook(name))

def _timeline_forward_pre_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_end_activity(name)
bf.timeline_start_activity(
"neighbor.allreduce." + name, activity_name="FORWARD")

pre_forward_hook_handle = model.register_forward_pre_hook(
pre_forward_hook_handle = self._model.register_forward_pre_hook(
_timeline_forward_pre_hook)
forward_hook_handle = model.register_forward_hook(
_timeline_forward_hook)
self._timeline_hook_handles.extend([backward_hook_handle,
pre_forward_hook_handle,
forward_hook_handle])
pre_forward_hook_handle])
self._use_timeline = True

def turn_off_timeline(self):
for hook in self._timeline_hook_handles:
hook.remove()
self._timeline_hook_handles.clear()
self._use_timeline = False
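
A hedged sketch of how these timeline switches might be used on the wrapped optimizer (``train_loader``, ``loss_fn``, ``model``, and ``target`` are placeholders, and any global Bluefog timeline configuration is assumed to happen elsewhere):

    optimizer = bf.DistributedNeighborAllreduceOptimizer(optimizer, model)
    optimizer.turn_on_timeline()       # registers the forward/backward timeline hooks
    for data, target in train_loader:
        optimizer.zero_grad()
        loss = loss_fn(model(data), target)
        loss.backward()
        optimizer.step()
    optimizer.turn_off_timeline()      # removes the hooks again
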

def synchronize(self):
missing_p = self._requires_update - set(self._handles.keys())
for p in missing_p:
@@ -369,31 +374,16 @@ def step(self, closure=None):

class _DistributedBluefogOptimizer(torch.optim.Optimizer):

def __init__(self, params, named_parameters):
def __init__(self, params, model):
super(self.__class__, self).__init__(params)

if named_parameters is not None:
named_parameters = list(named_parameters)
else:
named_parameters = [
("win.put.noname.%s" % i, v)
for param_group in self.param_groups
for i, v in enumerate(param_group["params"])
]

# make sure that named_parameters are tuples
if any([not isinstance(p, tuple) for p in named_parameters]):
raise ValueError(
"named_parameters should be a sequence of "
"tuples (name, parameter), usually produced by "
"model.named_parameters()."
)
named_parameters = list(model.named_parameters())

dups = _DistributedOptimizer.find_duplicates(
[k for k, _ in named_parameters])
if dups:
raise ValueError(
"Parameter names in named_parameters must be unique. "
"Parameter names in model.named_parameters must be unique. "
"Found duplicates: %s" % ", ".join(dups)
)

@@ -409,9 +399,9 @@ def __init__(self, params, named_parameters):
"%s" % ", ".join(str(id) for id in unnamed_param_ids)
)

self._model = model
self._parameter_names = {v: k for k, v in sorted(named_parameters)}
self._handles = {}
self._requires_update = set()
self._handles = {} # store parameter -> handle
self._synchronized = False
self._should_synchronize = True
self._use_timeline = False
@@ -421,10 +411,20 @@ def __init__(self, params, named_parameters):
self._register_hooks()

def _register_hooks(self):
for param_group in self.param_groups:
for p in param_group["params"]:
for parent_name, layer in named_leaf_module(self._model):
layer.register_forward_hook(self._make_hook(parent_name))

def _make_hook(self, parent_name):
def hook(model, *unused):
for name, p in model.named_parameters():
if self._use_timeline:
# End forward computation timeline
bf.timeline_end_activity(parent_name+'.'+name)
if p.requires_grad:
p.register_hook(self._make_hook(p))
handle = bf.win_put_async(
tensor=p.data, name=parent_name+'.'+name)
self._handles[p] = handle
return hook

def _register_window(self):
for param_group in self.param_groups:
@@ -437,32 +437,7 @@ def _register_window(self):
raise ValueError(
"Cannot allocate MPI window for the parameter {}".format(name))

def _make_hook(self, p):
def hook(*ignore):
assert not p.grad.requires_grad
name = self._parameter_names.get(p)
if self._use_timeline:
bf.timeline_end_activity(name)
handle = bf.win_put_async(tensor=p.data, name=name)
self._handles[p] = handle
return hook

def _win_put_async(self, p):
name = self._parameter_names.get(p)
handle = bf.win_put_async(tensor=p.data, name=name)
return handle

def synchronize(self):
missing_p = self._requires_update - set(self._handles.keys())
for p in missing_p:
handle = self._win_put_async(p)
self._handles[p] = handle

for p, handle in self._handles.items():
if handle is None:
handle = self._win_put_async(p)
self._handles[p] = handle

# Here synchronize just makes sure the win_put ops have finished
# within the current iteration.
with torch.no_grad():
@@ -475,31 +450,34 @@ def synchronize(self):
self._handles.clear()
self._synchronized = True

def turn_on_timeline(self, model):
assert isinstance(
model, torch.nn.Module), "You have to provide nn.model to turn on timeline"

def turn_on_timeline(self):
def _timeline_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_start_activity(
name, activity_name="GRADIENT COMPT.")
backward_hook_handle = model.register_backward_hook(_timeline_hook)
backward_hook_handle = self._model.register_backward_hook(
_timeline_hook)

def _make_backward_end_timeline_hook(name):
def hook(*ignore):
if self._use_timeline:
bf.timeline_end_activity(name)
return hook

for param_group in self.param_groups:
for p in param_group["params"]:
if p.requires_grad:
name = self._parameter_names.get(p)
p.register_hook(_make_backward_end_timeline_hook(name))

def _timeline_forward_pre_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_start_activity(name, activity_name="FORWARD")

def _timeline_forward_hook(model, *unused):
for name, _ in model.named_parameters():
bf.timeline_end_activity(name)

pre_forward_hook_handle = model.register_forward_pre_hook(
pre_forward_hook_handle = self._model.register_forward_pre_hook(
_timeline_forward_pre_hook)
forward_hook_handle = model.register_forward_hook(
_timeline_forward_hook)
self._timeline_hook_handles.extend([backward_hook_handle,
pre_forward_hook_handle,
forward_hook_handle])
pre_forward_hook_handle])
self._use_timeline = True

def turn_off_timeline(self):
@@ -526,23 +504,20 @@ def step(self, closure=None):



def DistributedBluefogOptimizer(optimizer, named_parameters=None):
"""A distributed optimizer that wraps another torch.optim.Optimizer through
mpi_win_put ops.
def DistributedBluefogOptimizer(optimizer, model):
"""A distributed optimizer that wraps another torch.optim.Optimizer with
pull-style model averaging through bf.win_put ops.
Arguments:
optimizer: Optimizer to use for computing gradients and applying updates.
named_parameters: A mapping between parameter names and values. Used for naming of
window operations. Typically just ``model.named_parameters()``
model: The model to be trained. (Only a single model is currently supported.)
Example:
>>> import bluefog.torch as bf
>>> ...
>>> bf.init()
>>> optimizer = optim.SGD(model.parameters(), lr=lr * bf.size())
>>> optimizer = bf.DistributedBluefogOptimizer(
... optimizer, named_parameters=model.named_parameters()
... )
>>> optimizer = bf.DistributedBluefogOptimizer(optimizer, model)
"""
# We dynamically create a new class that inherits from the optimizer that was passed in.
# The goal is to override the `step()` method.
@@ -551,18 +526,17 @@ def DistributedBluefogOptimizer(optimizer, named_parameters=None):
(optimizer.__class__,),
dict(_DistributedBluefogOptimizer.__dict__),
)
return cls(optimizer.param_groups, named_parameters)
return cls(optimizer.param_groups, model)
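
A simplified, self-contained illustration of this dynamic-subclass trick (the names below are made up for the example; the real wrapper also threads through the model and the communication hooks):

    import torch

    def _wrapped_step(self, closure=None):
        # a real implementation would first wait for outstanding communication
        return super(self.__class__, self).step(closure)

    base = torch.optim.SGD([torch.nn.Parameter(torch.zeros(3))], lr=0.1)
    WrappedSGD = type(base.__class__.__name__, (base.__class__,), {"step": _wrapped_step})
    dist_opt = WrappedSGD(base.param_groups, lr=0.1)  # behaves like SGD, but with the overridden step()
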


def DistributedNeighborAllreduceOptimizer(optimizer, named_parameters=None):
def DistributedNeighborAllreduceOptimizer(optimizer, model):
"""
A distributed optimizer that wraps another torch.optim.Optimizer through
neighbor_allreduce ops.
Arguments:
optimizer: Optimizer to use for computing gradients and applying updates.
named_parameters: A mapping between parameter names and values. Used for naming of
allreduce operations. Typically just ``model.named_parameters()``
model: The model to be trained. (Only a single model is currently supported.)
"""
# We dynamically create a new class that inherits from the optimizer that was passed in.
# The goal is to override the `step()` method with neighbor_allreduce implementation.
@@ -571,7 +545,7 @@ def DistributedNeighborAllreduceOptimizer(optimizer, named_parameters=None):
(optimizer.__class__,),
dict(_DistributedNeighborAllreduceOptimizer.__dict__),
)
return cls(optimizer.param_groups, named_parameters)
return cls(optimizer.param_groups, model)
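
For completeness, a usage sketch mirroring the DistributedBluefogOptimizer example above (``model`` and ``lr`` are placeholders, and this assumes the function is exposed under the ``bluefog.torch`` namespace in the same way):

    import torch.optim as optim
    import bluefog.torch as bf

    bf.init()
    optimizer = optim.SGD(model.parameters(), lr=lr * bf.size())
    optimizer = bf.DistributedNeighborAllreduceOptimizer(optimizer, model)
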


def DistributedAllreduceOptimizer(optimizer, named_parameters=None):
