add adaptivelsgd in meta_optimizer #27289

Merged: 5 commits, Sep 16, 2020
7 changes: 7 additions & 0 deletions paddle/fluid/framework/distributed_strategy.proto
@@ -41,6 +41,11 @@ message LocalSGDConfig {
optional int32 begin_step = 2 [ default = 1 ];
}

message AdaptiveLocalSGDConfig {
optional int32 init_k_steps = 1 [ default = 1 ];
optional int32 begin_step = 2 [ default = 1 ];
}

message GradientMergeConfig {
optional int32 k_steps = 1 [ default = 1 ];
optional bool avg = 2 [ default = true ];
@@ -121,6 +126,7 @@ message DistributedStrategy {
optional bool cudnn_exhaustive_search = 21 [ default = true ];
optional int32 conv_workspace_size_limit = 22 [ default = 4000 ];
optional bool cudnn_batchnorm_spatial_persistent = 23 [ default = true ];
optional bool adaptive_localsgd = 24 [ default = false ];

optional RecomputeConfig recompute_configs = 101;
optional AMPConfig amp_configs = 102;
@@ -131,6 +137,7 @@ message DistributedStrategy {
optional AsyncConfig a_sync_configs = 107;
optional LarsConfig lars_configs = 108;
optional LambConfig lamb_configs = 109;
optional AdaptiveLocalSGDConfig adaptive_localsgd_configs = 110;
optional BuildStrategy build_strategy = 201;
optional ExecutionStrategy execution_strategy = 202;
}
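
The new field adaptive_localsgd (id 24) toggles the strategy and AdaptiveLocalSGDConfig carries its two tuning knobs. As a minimal sketch of how these options are meant to be exercised from the Python side, mirroring the docstring examples added further down in this PR:

import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.adaptive_localsgd = True      # sets the new bool field (id 24)
strategy.adaptive_localsgd_configs = {
    "init_k_steps": 1,                 # AdaptiveLocalSGDConfig.init_k_steps
    "begin_step": 30,                  # AdaptiveLocalSGDConfig.begin_step
}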
57 changes: 57 additions & 0 deletions python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -728,6 +728,63 @@ def localsgd_configs(self, configs):
"localsgd_configs")
assign_configs_value(self.strategy.localsgd_configs, configs)

@property
def adaptive_localsgd(self):
"""
Indicates whether to use Adaptive Local SGD training. Default Value: False
For more details, please refer to `Adaptive Communication Strategies to Achieve
the Best Error-Runtime Trade-off in Local-Update SGD <https://arxiv.org/pdf/1810.08313.pdf>`_.


Examples:
.. code-block:: python

import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.adaptive_localsgd = True # by default this is false

"""
return self.strategy.adaptive_localsgd

@adaptive_localsgd.setter
@is_strict_auto
def adaptive_localsgd(self, flag):
if isinstance(flag, bool):
self.strategy.adaptive_localsgd = flag
else:
print("WARNING: adaptive_localsgd should have value of bool type")

@property
def adaptive_localsgd_configs(self):
"""
Set AdaptiveLocalSGD training configurations. AdaptiveLocalSGD has configurable
settings that can be set through a dict.

**Notes**:
init_k_steps(int) The initial number of local steps to run between synchronizations;
the adaptive localsgd method will then adjust this value automatically.
Default 1.
begin_step(int) The step at which training with adaptive localsgd begins. Default 1.

Examples:
.. code-block:: python

import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.adaptive_localsgd = True
strategy.adaptive_localsgd_configs = {"init_k_steps": 1,
"begin_step": 30}
"""

return get_msg_dict(self.strategy.adaptive_localsgd_configs)

@adaptive_localsgd_configs.setter
@is_strict_auto
def adaptive_localsgd_configs(self, configs):
check_configs_key(self.strategy.adaptive_localsgd_configs, configs,
"adaptive_localsgd_configs")
assign_configs_value(self.strategy.adaptive_localsgd_configs, configs)

@property
def dgc(self):
"""
@@ -18,6 +18,7 @@
from .parameter_server_optimizer import ParameterServerOptimizer
from .pipeline_optimizer import PipelineOptimizer
from .localsgd_optimizer import LocalSGDOptimizer
from .localsgd_optimizer import AdaptiveLocalSGDOptimizer
from .lars_optimizer import LarsOptimizer
from .parameter_server_graph_optimizer import ParameterServerGraphOptimizer
from .dgc_optimizer import DGCOptimizer
@@ -24,7 +24,7 @@ def __init__(self, optimizer):
self.meta_optimizers_white_list = [
"LarsOptimizer", "LambOptimizer", "RecomputeOptimizer",
"LocalSGDOptimizer", "GradientMergeOptimizer",
"GraphExecutionOptimizer"
"GraphExecutionOptimizer", "AdaptiveLocalSGDOptimizer"
Collaborator review comment: Put this entry on a new line and end it with a trailing comma, to avoid merge conflicts in the future.
]
self.meta_optimizers_black_list = ["DGCOptimizer"]
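
For context, a meta optimizer's meta_optimizers_white_list roughly names the other meta optimizers it can be combined with, while meta_optimizers_black_list names the ones that must not be applied together with it; the changes below make LocalSGDOptimizer and the new AdaptiveLocalSGDOptimizer exclude each other so only one of the two is ever active. A hypothetical sketch (resolve_conflicts is illustrative only, not Paddle API) of the kind of mutual-exclusion filtering these lists support:

# Hypothetical helper, not Paddle API: drop any meta optimizer that another
# applicable meta optimizer lists in its black list.
def resolve_conflicts(applicable):
    kept = []
    for opt in applicable:
        name = type(opt).__name__
        blocked = any(name in other.meta_optimizers_black_list
                      for other in applicable if other is not opt)
        if not blocked:
            kept.append(opt)
    return kept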

253 changes: 252 additions & 1 deletion python/paddle/distributed/fleet/meta_optimizers/localsgd_optimizer.py
@@ -25,7 +25,9 @@ def __init__(self, optimizer):
super(LocalSGDOptimizer, self).__init__(optimizer)
self.inner_opt = optimizer
self.meta_optimizers_white_list = []
self.meta_optimizers_black_list = ["GraphExecutionOptimizer"]
self.meta_optimizers_black_list = [
"GraphExecutionOptimizer", "AdaptiveLocalSGDOptimizer"
Collaborator review comment: Same here, put it on a new line and end it with a trailing comma.
]
self.snapshot_key = '@SNAPSHOT'

def _can_apply(self):
@@ -186,3 +188,252 @@ def begin_localsgd():

layers.cond(step > begin_step, begin_localsgd, communicate)
return minimized


class AdaptiveLocalSGDOptimizer(MetaOptimizerBase):
def __init__(self, optimizer):
super(AdaptiveLocalSGDOptimizer, self).__init__(optimizer)
self.inner_opt = optimizer
self.meta_optimizers_white_list = []
self.meta_optimizers_black_list = [
"GraphExecutionOptimizer", "LocalSGDOptimizer"
]
self.snapshot_key = '@SNAPSHOT'

def _can_apply(self):
if not self.role_maker._is_collective:
return False

if not self.user_defined_strategy.adaptive_localsgd:
return False

if self.role_maker.worker_num() <= 1:
return False

return isinstance(self.inner_opt, paddle.optimizer.momentum.Momentum) \
or isinstance(self.inner_opt, paddle.fluid.optimizer.Momentum) \
or isinstance(self.inner_opt, paddle.optimizer.sgd.SGD) \
or isinstance(self.inner_opt, paddle.fluid.optimizer.SGD)

def _disable_strategy(self, dist_strategy):
dist_strategy.adaptive_localsgd = False
dist_strategy.adaptive_localsgd_configs = {}

def _enable_strategy(self, dist_strategy, context):
dist_strategy.adaptive_localsgd = True
dist_strategy.adaptive_localsgd_configs = {
"init_k_steps": 1,
"begin_step": 1
}

def snapshot_name(self, param_name):
return param_name + self.snapshot_key

def create_snapshot_vars(self, program):
block = program.global_block()

non_dist_params = []
for param in block.iter_parameters():
if not param.is_distributed:
non_dist_params.append(param)

p2s = []
for param in non_dist_params:
snapshot = block.create_var(
name=self.snapshot_name(param.name),
shape=param.shape,
persistable=True,
stop_gradient=True,
dtype=param.dtype)
p2s.append([param, snapshot])
return p2s

def init_snapshot_vars(self, startup_program, param2snapshot):
with program_guard(startup_program):
for param, snapshot in param2snapshot:
layers.assign(param, snapshot)

def _generate_avg_loss(self, program_block, loss, avg_loss):
program_block.append_op(
type='c_allreduce_sum',
inputs={'X': [loss]},
outputs={'Out': [avg_loss]},
attrs={
'ring_id': 0,
OP_ROLE_KEY: OpRole.Optimize,
'use_calc_stream': True
})
program_block.append_op(
type='c_sync_calc_stream',
inputs={'X': [avg_loss]},
outputs={'Out': [avg_loss]},
attrs={OP_ROLE_KEY: OpRole.Optimize})

program_block.append_op(
type='scale',
inputs={'X': [avg_loss]},
outputs={'Out': [avg_loss]},
attrs={
'scale': 1.0 / self.role_maker.worker_num(),
OP_ROLE_KEY: OpRole.Optimize
})

def minimize_impl(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
minimized = self.inner_opt.minimize(
loss, startup_program=startup_program)

init_k_steps = self.user_defined_strategy.adaptive_localsgd_configs[
'init_k_steps']
begin_step_value = self.user_defined_strategy.adaptive_localsgd_configs[
'begin_step']

if startup_program is None:
startup_program = default_startup_program()
main_block = loss.block

self.nrings = 2
collective_helper = CollectiveHelper(self.role_maker, self.nrings)
collective_helper.update_startup_program(startup_program)
p2s = self.create_snapshot_vars(startup_program)
self.init_snapshot_vars(startup_program, p2s)

p2s = self.create_snapshot_vars(main_block.program)
with program_guard(main_block.program, startup_program):
step = layers.autoincreased_step_counter(begin=1)

k_steps = layers.create_global_var(
name="k_steps",
shape=[1],
value=int(init_k_steps),
dtype='int64',
persistable=True)

begin_step = layers.create_global_var(
name="begin_step",
shape=[1],
value=int(begin_step_value),
dtype='int64',
persistable=True)

last_step = layers.create_global_var(
name="last_step",
shape=[1],
value=int(0),
dtype='int64',
persistable=True)

avg_loss = layers.create_global_var(
name="avg_loss",
shape=[1],
value=float(0),
dtype=loss.dtype,
persistable=True)

lr_0 = layers.create_global_var(
name="lr_0",
shape=[1],
value=float(0),
dtype='float32',
persistable=True)

loss_0 = layers.create_global_var(
name="loss_0",
shape=[1],
value=float(0),
dtype='float32',
persistable=True)

global_lr = self.inner_opt._global_learning_rate()

def initialize():
self._generate_avg_loss(main_block, loss, avg_loss)
layers.assign(avg_loss, loss_0)
layers.assign(global_lr, lr_0)

layers.cond(step == 1, initialize)

def communicate():
sub_block = default_main_program().current_block()
ring_id = -1
for param, snapshot in p2s:
sub_block.append_op(
type='elementwise_sub',
inputs={'X': [snapshot],
'Y': [param]},
outputs={'Out': [param]},
attrs={OP_ROLE_KEY: OpRole.Optimize})
sub_block.append_op(
type='c_sync_calc_stream',
inputs={'X': param},
outputs={'Out': param},
attrs={OP_ROLE_KEY: OpRole.Optimize})
ring_id = (ring_id + 1) % self.nrings
sub_block.append_op(
type='c_allreduce_sum',
inputs={'X': [param]},
outputs={'Out': [param]},
attrs={
'ring_id': ring_id,
OP_ROLE_KEY: OpRole.Optimize
})

for ring_id in range(self.nrings):
sub_block.append_op(
type='c_sync_comm_stream',
inputs={'X': param},
outputs={'Out': param},
attrs={
'ring_id': ring_id,
OP_ROLE_KEY: OpRole.Optimize
})

for param, snapshot in p2s:
sub_block.append_op(
type='scale',
inputs={'X': [param]},
outputs={'Out': [param]},
attrs={
'scale': 1.0 / self.role_maker.worker_num(),
OP_ROLE_KEY: OpRole.Optimize
})
sub_block.append_op(
type='elementwise_sub',
inputs={'X': [snapshot],
'Y': [param]},
outputs={'Out': [param]},
attrs={OP_ROLE_KEY: OpRole.Optimize})
sub_block.append_op(
type='assign',
inputs={'X': [param]},
outputs={'Out': [snapshot]},
attrs={OP_ROLE_KEY: OpRole.Optimize})
layers.assign(step, last_step)

def communicate_avg_loss():
communicate()
self._generate_avg_loss(main_block, loss, avg_loss)
next_local_steps = layers.cast(
layers.ceil(
layers.sqrt(lr_0 * avg_loss / (global_lr * loss_0) *
float(init_k_steps))),
dtype='int64')
max_local_steps = layers.fill_constant(
shape=[1], dtype='int64', value=16)
min_local_steps = layers.fill_constant(
shape=[1], dtype='int64', value=1)
next_local_steps = layers.elementwise_min(next_local_steps,
max_local_steps)
next_local_steps = layers.elementwise_max(next_local_steps,
min_local_steps)
layers.assign(next_local_steps, k_steps)

def begin_localsgd():
layers.cond(step - last_step == k_steps, communicate_avg_loss)

layers.cond(step > begin_step, begin_localsgd, communicate)

return minimized
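
The heart of communicate_avg_loss above is the adaptive communication rule from the paper referenced in the adaptive_localsgd docstring: after each synchronization the next number of local steps is recomputed as ceil(sqrt(lr_0 * avg_loss / (global_lr * loss_0) * init_k_steps)) and clamped to [1, 16]. A plain-Python restatement of that rule, given here only for clarity (it is not part of the PR):

import math

def next_local_steps(lr_0, loss_0, lr_t, loss_t, init_k_steps,
                     min_steps=1, max_steps=16):
    # k_next = ceil(sqrt((lr_0 * loss_t) / (lr_t * loss_0) * init_k_steps)),
    # clamped to [min_steps, max_steps]; mirrors the fluid layers built
    # inside communicate_avg_loss.
    k = math.ceil(math.sqrt(lr_0 * loss_t / (lr_t * loss_0) * init_k_steps))
    return max(min_steps, min(k, max_steps))

Between synchronizations each worker runs k_steps purely local updates; once step - last_step == k_steps, communicate() averages the parameters across workers, refreshes the parameter snapshots, and assigns the current step to last_step so the counter restarts. Before begin_step is reached, parameters are simply averaged every step.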
@@ -86,6 +86,13 @@ def test_localsgd_configs(self):
self.assertEqual(strategy.localsgd_configs["k_steps"], 4)
self.assertEqual(strategy.localsgd_configs["begin_step"], 120)

def test_adaptive_localsgd_configs(self):
strategy = paddle.distributed.fleet.DistributedStrategy()
configs = {"init_k_steps": 1, "begin_step": 120}
strategy.adaptive_localsgd_configs = configs
self.assertEqual(strategy.adaptive_localsgd_configs["init_k_steps"], 1)
self.assertEqual(strategy.adaptive_localsgd_configs["begin_step"], 120)

def test_dgc(self):
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.dgc = True