【paddle.fleet】add auto parallel L1 implementations #27090

Merged
127 changes: 127 additions & 0 deletions python/paddle/distributed/fleet/base/distributed_strategy.py
@@ -15,10 +15,25 @@
import paddle
from paddle.distributed.fleet.proto import distributed_strategy_pb2
from paddle.fluid.framework import Variable, set_flags, core
from paddle.fluid.wrapped_decorator import wrap_decorator
import google.protobuf.text_format

__all__ = ["DistributedStrategy"]

non_auto_func_called = True


def __non_auto_func_called__(func):
def __impl__(*args, **kwargs):
global non_auto_func_called
non_auto_func_called = False
return func(*args, **kwargs)

return __impl__


is_strict_auto = wrap_decorator(__non_auto_func_called__)
Collaborator:
This keyword is a little hard to understand. Could we name it 'reset_auto_flag', since that is what it actually does?

Member Author:
This is just a check of whether a user is using a strictly auto configuration. Here is a strictly auto configuration:

strategy = DistributedStrategy()
strategy.auto = True

Here is a case that is not a strictly auto configuration:

strategy = DistributedStrategy()
strategy.amp = True
strategy.auto = True
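
A minimal sketch, based on the _is_strict_auto helper added later in this diff, of how the two cases are told apart. The flag is module-level, so the check is only meaningful before any decorated setter has run in the process:

import paddle.distributed.fleet as fleet

# Strictly auto: only `auto` is set, so the module-level flag stays True.
strict = fleet.DistributedStrategy()
strict.auto = True
print(strict._is_strict_auto())   # expected: True

# Not strictly auto: amp's setter is wrapped by is_strict_auto, which resets
# the module-level flag, so the check fails even though auto is also True.
mixed = fleet.DistributedStrategy()
mixed.amp = True
mixed.auto = True
print(mixed._is_strict_auto())    # expected: False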



def get_msg_dict(msg):
res_dict = {}
@@ -164,6 +179,7 @@ def execution_strategy(self):
return execution_strategy

@execution_strategy.setter
@is_strict_auto
def execution_strategy(self, strategy):
fields = self.strategy.execution_strategy.DESCRIPTOR.fields
for f in fields:
@@ -203,6 +219,7 @@ def build_strategy(self):
return build_strategy

@build_strategy.setter
@is_strict_auto
def build_strategy(self, strategy):
fields = self.strategy.build_strategy.DESCRIPTOR.fields
for f in fields:
@@ -237,6 +254,7 @@ def a_sync(self):
return self.strategy.a_sync

@a_sync.setter
@is_strict_auto
def a_sync(self, flag):
if isinstance(flag, bool):
self.strategy.a_sync = flag
@@ -287,6 +305,7 @@ def a_sync_configs(self):
return get_msg_dict(self.strategy.a_sync_configs)

@a_sync_configs.setter
@is_strict_auto
def a_sync_configs(self, configs):
check_configs_key(self.strategy.a_sync_configs, configs,
"a_sync_configs")
@@ -309,6 +328,7 @@ def amp(self):
return self.strategy.amp

@amp.setter
@is_strict_auto
def amp(self, flag):
if isinstance(flag, bool):
self.strategy.amp = flag
@@ -351,6 +371,7 @@ def amp_configs(self):
return get_msg_dict(self.strategy.amp_configs)

@amp_configs.setter
@is_strict_auto
def amp_configs(self, configs):
check_configs_key(self.strategy.amp_configs, configs, "amp_configs")
assign_configs_value(self.strategy.amp_configs, configs)
@@ -388,6 +409,7 @@ def sync_nccl_allreduce(self):
return self.strategy.sync_nccl_allreduce

@sync_nccl_allreduce.setter
@is_strict_auto
def sync_nccl_allreduce(self, flag):
if isinstance(flag, bool):
self.strategy.sync_nccl_allreduce = flag
@@ -411,6 +433,7 @@ def use_hierarchical_allreduce(self):
return self.strategy.use_hierarchical_allreduce

@use_hierarchical_allreduce.setter
@is_strict_auto
def use_hierarchical_allreduce(self, flag):
if isinstance(flag, bool):
self.strategy.use_hierarchical_allreduce = flag
@@ -435,6 +458,7 @@ def hierarchical_allreduce_inter_nranks(self):
return self.strategy.hierarchical_allreduce_inter_nranks

@hierarchical_allreduce_inter_nranks.setter
@is_strict_auto
def hierarchical_allreduce_inter_nranks(self, value):
if isinstance(value, int):
self.strategy.hierarchical_allreduce_inter_nranks = value
@@ -461,6 +485,7 @@ def sync_batch_norm(self):
return self.strategy.sync_batch_norm

@sync_batch_norm.setter
@is_strict_auto
def sync_batch_norm(self, flag):
if isinstance(flag, bool):
self.strategy.sync_batch_norm = flag
@@ -483,6 +508,7 @@ def fuse_all_reduce_ops(self):
return self.strategy.fuse_all_reduce_ops

@fuse_all_reduce_ops.setter
@is_strict_auto
def fuse_all_reduce_ops(self, flag):
if isinstance(flag, bool):
self.strategy.fuse_all_reduce_ops = flag
@@ -506,6 +532,7 @@ def fuse_grad_size_in_MB(self):
return self.strategy.fuse_grad_size_in_MB

@fuse_grad_size_in_MB.setter
@is_strict_auto
def fuse_grad_size_in_MB(self, value):
if isinstance(value, int):
self.strategy.fuse_grad_size_in_MB = value
@@ -517,6 +544,7 @@ def _fuse_grad_size_in_TFLOPS(self):
return self.strategy.fuse_grad_size_in_TFLOPS

@_fuse_grad_size_in_TFLOPS.setter
@is_strict_auto
def _fuse_grad_size_in_TFLOPS(self, value):
if isinstance(value, float):
self.strategy.fuse_grad_size_in_TFLOPS = value
@@ -543,13 +571,15 @@ def nccl_comm_num(self):
return self.strategy.nccl_comm_num

@nccl_comm_num.setter
@is_strict_auto
def nccl_comm_num(self, value):
if isinstance(value, int):
self.strategy.nccl_comm_num = value
else:
print("WARNING: nccl_comm_num should have value of int type")

@recompute.setter
@is_strict_auto
def recompute(self, flag):
if isinstance(flag, bool):
self.strategy.recompute = flag
@@ -574,6 +604,7 @@ def recompute_configs(self):
return get_msg_dict(self.strategy.recompute_configs)

@recompute_configs.setter
@is_strict_auto
def recompute_configs(self, configs):
check_configs_key(self.strategy.recompute_configs, configs,
"checkpoint_configs")
@@ -598,6 +629,7 @@ def pipeline(self):
return self.strategy.pipeline

@pipeline.setter
@is_strict_auto
def pipeline(self, flag):
if isinstance(flag, bool):
self.strategy.pipeline = flag
@@ -634,6 +666,7 @@ def pipeline_configs(self):
return get_msg_dict(self.strategy.pipeline_configs)

@pipeline_configs.setter
@is_strict_auto
def pipeline_configs(self, configs):
check_configs_key(self.strategy.pipeline_configs, configs,
"pipeline_configs")
@@ -658,6 +691,7 @@ def localsgd(self):
return self.strategy.localsgd

@localsgd.setter
@is_strict_auto
def localsgd(self, flag):
if isinstance(flag, bool):
self.strategy.localsgd = flag
@@ -690,6 +724,7 @@ def localsgd_configs(self):
return get_msg_dict(self.strategy.localsgd_configs)

@localsgd_configs.setter
@is_strict_auto
def localsgd_configs(self, configs):
check_configs_key(self.strategy.localsgd_configs, configs,
"localsgd_configs")
@@ -714,6 +749,7 @@ def dgc(self):
return self.strategy.dgc

@dgc.setter
@is_strict_auto
def dgc(self, flag):
if isinstance(flag, bool):
self.strategy.dgc = flag
@@ -749,6 +785,7 @@ def dgc_configs(self):
return get_msg_dict(self.strategy.dgc_configs)

@dgc_configs.setter
@is_strict_auto
def dgc_configs(self, configs):
check_configs_key(self.strategy.dgc_configs, configs, "dgc_configs")
assign_configs_value(self.strategy.dgc_configs, configs)
@@ -776,6 +813,7 @@ def gradient_merge(self):
return self.strategy.gradient_merge

@gradient_merge.setter
@is_strict_auto
def gradient_merge(self, flag):
if isinstance(flag, bool):
self.strategy.gradient_merge = flag
@@ -803,6 +841,7 @@ def gradient_merge_configs(self):
return get_msg_dict(self.strategy.gradient_merge_configs)

@gradient_merge_configs.setter
@is_strict_auto
def gradient_merge_configs(self, configs):
check_configs_key(self.strategy.gradient_merge_configs, configs,
"gradient_configs")
@@ -827,6 +866,7 @@ def lars(self):
return self.strategy.lars

@lars.setter
@is_strict_auto
def lars(self, flag):
if isinstance(flag, bool):
self.strategy.lars = flag
@@ -862,6 +902,7 @@ def lars_configs(self):
return get_msg_dict(self.strategy.lars_configs)

@lars_configs.setter
@is_strict_auto
def lars_configs(self, configs):
check_configs_key(self.strategy.lars_configs, configs, "lars_configs")
assign_configs_value(self.strategy.lars_configs, configs)
@@ -887,6 +928,7 @@ def lamb(self):
return self.strategy.lamb

@lamb.setter
@is_strict_auto
def lamb(self, flag):
if isinstance(flag, bool):
self.strategy.lamb = flag
@@ -917,15 +959,21 @@ def lamb_configs(self):
return get_msg_dict(self.strategy.lamb_configs)

@lamb_configs.setter
@is_strict_auto
def lamb_configs(self, configs):
check_configs_key(self.strategy.lamb_configs, configs, "lamb_configs")
assign_configs_value(self.strategy.lamb_configs, configs)

@property
def elastic(self):
"""
Indicates whether the current distributed training job runs on a cluster with elastic resources.
Currently, this configuration has no effect.
"""
return self.strategy.elastic

@elastic.setter
@is_strict_auto
def elastic(self, flag):
if isinstance(flag, bool):
self.strategy.elastic = flag
@@ -934,6 +982,25 @@ def elastic(self, flag):

@property
def auto(self):
"""
Indicates whether to use auto-parallel configuration.
This feature is currently experimental. Auto-parallelism can be used
only when the user does not set any strategy config other than auto.
For details, please refer to the code example below.
Default Value: False

Examples:
.. code-block:: python

import paddle
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.auto = True

optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
"""
return self.strategy.auto

@auto.setter
@@ -945,9 +1012,27 @@ def auto(self, flag):

@property
def cudnn_exhaustive_search(self):
"""
Indicates whether to use exhaustive search to choose convolution algorithms.
Exhaustive search tries all available cuDNN algorithms and selects the fastest one.
This method is time-consuming; the chosen algorithm is cached for the given layer specifications.
Once the layer specifications (such as batch size or feature map size) change, the search runs again.
Default Value: True

Examples:
.. code-block:: python

import paddle
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.cudnn_exhaustive_search = False

optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
"""
return self.strategy.cudnn_exhaustive_search

@cudnn_exhaustive_search.setter
@is_strict_auto
def cudnn_exhaustive_search(self, flag):
if isinstance(flag, bool):
self.strategy.cudnn_exhaustive_search = flag
@@ -958,9 +1043,28 @@ def cudnn_exhaustive_search(self, flag):

@property
def conv_workspace_size_limit(self):
"""
The workspace size limit, in MB, for choosing cuDNN convolution algorithms.
cuDNN's internal routines select the fastest algorithm that fits within this memory limit.
Usually, a larger workspace allows faster algorithms to be chosen,
but it significantly increases memory usage, so users need to trade off memory against speed.
Default Value: 4000

Examples:
.. code-block:: python

import paddle
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.conv_workspace_size_limit = 1024

optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)

"""
return self.strategy.conv_workspace_size_limit

@conv_workspace_size_limit.setter
@is_strict_auto
def conv_workspace_size_limit(self, value):
if isinstance(value, int):
self.strategy.conv_workspace_size_limit = value
Expand All @@ -971,9 +1075,26 @@ def conv_workspace_size_limit(self, value):

@property
def cudnn_batchnorm_spatial_persistent(self):
"""
Indicates whether to use the CUDNN_BATCHNORM_SPATIAL_PERSISTENT mode in batch normalization.
This only takes effect when cuDNN is used.
Default Value: True

Examples:
.. code-block:: python

import paddle
import paddle.distributed.fleet as fleet
strategy = fleet.DistributedStrategy()
strategy.cudnn_batchnorm_spatial_persistent = True

optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)

"""
return self.strategy.cudnn_batchnorm_spatial_persistent

@cudnn_batchnorm_spatial_persistent.setter
@is_strict_auto
def cudnn_batchnorm_spatial_persistent(self, flag):
if isinstance(flag, bool):
self.strategy.cudnn_batchnorm_spatial_persistent = flag
@@ -1005,6 +1126,12 @@ def _enable_env(self):
if core.globals().is_public(key):
core.globals()[key] = values[i]

def _is_strict_auto(self):
global non_auto_func_called
if self.strategy.auto and non_auto_func_called:
return True
return False

def __repr__(self):
fields = self.strategy.DESCRIPTOR.fields
for f in fields: