pp comm overlap use tensor fusion helper #55540

Merged · 2 commits · Jul 19, 2023

This PR deduplicates the tensor fusion utilities used by pipeline-parallel communication overlap: the local copies of flatten_dense_tensors and assign_group_by_size in pp_utils/utils.py are removed in favor of the shared implementations in fleet/utils/tensor_fusion_helper.py, which gain fuse_param and warp_buffer switches so one helper can serve both call sites.
@@ -36,7 +36,11 @@
 else:
     from .pp_utils import p2p_communication as p2p

-from .pp_utils.utils import HOOK_ACTION, FusedCommBuffer, assign_group_by_size
+from paddle.distributed.fleet.utils.tensor_fusion_helper import (
+    assign_group_by_size,
+)
+
+from .pp_utils.utils import HOOK_ACTION, FusedCommBuffer

 __all__ = []
python/paddle/distributed/fleet/meta_parallel/pp_utils/utils.py (7 additions & 65 deletions)
@@ -12,27 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from collections import OrderedDict
-
-import numpy as np
-
 import paddle
 from paddle import _legacy_C_ops
-from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_storage import (
-    GradStorage,
+from paddle.distributed.fleet.utils.tensor_fusion_helper import (
+    flatten_dense_tensors,
 )
-from paddle.fluid import core
 from paddle.framework import base as imperative_base

-alignment = {
-    "gpu": 256,
-}
-align = {
-    paddle.float16.value: 2,
-    paddle.bfloat16.value: 2,
-    paddle.float32.value: 4,
-}
-
 __all__ = []
@@ -131,35 +118,6 @@ def _all_gather(tensor, group=None, use_calc_stream=True):
     )


-def flatten_dense_tensors(parameters, use_main_grad=False):
-    _buffer_size = 0
-    _param2align = {}
-    dtype = paddle.float32 if use_main_grad else parameters[0].dtype
-
-    for param in parameters:
-        assert param.trainable, "param must be trainable..."
-        size = np.prod(param.shape) * align[dtype]
-        remaining = size % alignment["gpu"]
-        ali = 0 if remaining == 0 else alignment["gpu"] - remaining
-        align_ = ali // align[dtype]
-        _buffer_size += np.prod(param.shape) + align_
-        _param2align[param.name] = align_
-
-    # process gradient
-    grad_storage = GradStorage(
-        size=_buffer_size,
-        dtype=dtype,
-        device="gpu",
-        destination="0",
-        parm2align=_param2align,
-    )
-
-    for param in parameters:
-        grad_storage.add_grad(param, _param2align[param.name])
-
-    return grad_storage.buffer
-
-
 class FusedCommBuffer:
     def __init__(self, id, params, comm_group, acc_steps=1, act=None, dst=-1):
         self._id = id
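For reference, the padding arithmetic in the helper removed above (and kept in tensor_fusion_helper.py) rounds every parameter up to a 256-byte boundary before it is packed into the fused buffer. A minimal standalone sketch of that computation, using a hypothetical float16 parameter of shape [1000] (not a tensor from the PR):

    import numpy as np

    alignment_gpu = 256  # required alignment in bytes (alignment["gpu"])
    elem_bytes = 2       # bytes per float16 element (align[paddle.float16.value])

    numel = int(np.prod([1000]))        # 1000 elements
    size = numel * elem_bytes           # 2000 bytes
    remaining = size % alignment_gpu    # 2000 % 256 = 208
    pad_bytes = 0 if remaining == 0 else alignment_gpu - remaining  # 48 bytes
    pad_elems = pad_bytes // elem_bytes  # 24 trailing padding elements

    # The parameter occupies 1000 + 24 = 1024 buffer slots (2048 bytes),
    # so the next parameter in the fused buffer starts 256-byte aligned.
    assert (numel + pad_elems) * elem_bytes % alignment_gpu == 0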
@@ -188,8 +146,11 @@ def __init__(self, id, params, comm_group, acc_steps=1, act=None, dst=-1):
         self._init_step_dict()

         self.grad_storage = flatten_dense_tensors(
-            self._params, self.use_main_grad
-        )
+            self._params,
+            use_main_grad=self.use_main_grad,
+            fuse_param=False,
+            warp_buffer=False,
+        ).buffer

         self._record_addr()
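Because fuse_param=False here, the shared helper returns a single GradStorage rather than the (param_storage, grad_storage) pair (see the tensor_fusion_helper.py hunks below), so .buffer is the same flat gradient tensor the deleted local copy used to return. A hedged sketch of what this wires up, assuming a GPU build and a hypothetical two-parameter layer (not code from the PR):

    import paddle
    from paddle.distributed.fleet.utils.tensor_fusion_helper import (
        flatten_dense_tensors,
    )

    # Hypothetical parameters; any trainable dense parameters behave the same.
    linear = paddle.nn.Linear(16, 16)
    params = [linear.weight, linear.bias]

    grad_storage = flatten_dense_tensors(
        params, use_main_grad=False, fuse_param=False, warp_buffer=False
    )
    # One contiguous tensor backs every gradient in `params`, so FusedCommBuffer
    # can issue a single communication op over it instead of one per parameter.
    fused_grads = grad_storage.buffer
    print(fused_grads.shape)  # e.g. [320]: 256 weight slots + bias padded to 64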

@@ -272,22 +233,3 @@ def scale_and_split_grads(self):
             self.grad_storage.scale_(scale_factor)

         self._reset_params_checked_in()
-
-
-def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
-    group_idx = 0
-    memory_counter = 0
-    var_groups = OrderedDict()
-    dtype = parameters[0].dtype
-
-    for var in parameters:
-        bytes = np.prod(var.shape) * core.size_of_dtype(var.dtype)
-        if memory_counter < group_size and dtype == var.dtype:
-            memory_counter += bytes
-        else:
-            memory_counter = bytes
-            dtype = var.dtype
-            group_idx += 1
-        var_groups.setdefault(group_idx, []).append(var)
-
-    return var_groups
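The removed grouper above walks parameters in order and opens a new bucket whenever the accumulated bytes reach group_size or the dtype changes. A pure-Python rerun of that logic for illustration, with hypothetical (nbytes, dtype) pairs standing in for real parameters (the replacement in tensor_fusion_helper.py delegates to core.eager_assign_group_by_size and may bucket differently):

    from collections import OrderedDict

    def assign_group_by_size_sketch(params, group_size=128 * 1024 * 1024):
        # params: list of (nbytes, dtype) pairs; returns {group_idx: [indices]}.
        group_idx, memory_counter = 0, 0
        dtype = params[0][1]
        var_groups = OrderedDict()
        for i, (nbytes, var_dtype) in enumerate(params):
            if memory_counter < group_size and dtype == var_dtype:
                memory_counter += nbytes
            else:
                memory_counter = nbytes
                dtype = var_dtype
                group_idx += 1
            var_groups.setdefault(group_idx, []).append(i)
        return var_groups

    MB = 1024 * 1024
    # Three 100 MB fp32 tensors: the third opens a new group (200 MB >= 128 MB).
    print(assign_group_by_size_sketch([(100 * MB, "fp32")] * 3))
    # OrderedDict([(0, [0, 1]), (1, [2])])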
python/paddle/distributed/fleet/utils/tensor_fusion_helper.py (27 additions & 16 deletions)
@@ -30,8 +30,7 @@
 }


-def assign_group_by_size(parameters, group_size=256 * 1024 * 1024):
-    # TODO(Yuang Liu): make pp_utils/utils use this tensor fusion helper
+def assign_group_by_size(parameters, group_size=128 * 1024 * 1024):
     is_sparse_gradient = [False] * len(parameters)

     group_indices = core.eager_assign_group_by_size(
@@ -45,7 +44,9 @@ def assign_group_by_size(parameters, group_size=256 * 1024 * 1024):
     return var_groups


-def flatten_dense_tensors(parameters, use_main_grad):
+def flatten_dense_tensors(
+    parameters, use_main_grad=False, fuse_param=True, warp_buffer=False
+):
     from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_storage import (
         GradStorage,
         ParamStorage,
@@ -64,9 +65,11 @@ def flatten_dense_tensors(parameters, use_main_grad):
         _buffer_size += np.prod(param.shape) + align_
         _param2align[param.name] = align_

-    param_storage = ParamStorage(size=_buffer_size, dtype=dtype, device="gpu")
-
-    param_storage.add_rank_params(parameters, _param2align)
+    if fuse_param:
+        param_storage = ParamStorage(
+            size=_buffer_size, dtype=dtype, device="gpu"
+        )
+        param_storage.add_rank_params(parameters, _param2align)

     # process gradient
     grad_dtype = paddle.float32 if use_main_grad else dtype
@@ -81,27 +84,35 @@ def flatten_dense_tensors(parameters, use_main_grad):
     for param in parameters:
         grad_storage.add_grad(param, _param2align[param.name])

-    param_storage.warp_buffer()
-    grad_storage.warp_buffer()
+    if warp_buffer:
+        if fuse_param:
+            param_storage.warp_buffer()
+        grad_storage.warp_buffer()

-    if not use_main_grad:
-        # param_storage --> grad_storage
-        param_storage.buffer._copy_gradient_from(grad_storage.buffer)
-    else:
-        param_storage.buffer.main_grad = grad_storage.buffer
-    param_storage.buffer.stop_gradient = False
-    return param_storage, grad_storage
+    if fuse_param:
+        if not use_main_grad:
+            # param_storage --> grad_storage
+            param_storage.buffer._copy_gradient_from(grad_storage.buffer)
+        else:
+            param_storage.buffer.main_grad = grad_storage.buffer
+        param_storage.buffer.stop_gradient = False
+        return param_storage, grad_storage
+    return grad_storage


 def obtain_storage(parameters, use_main_grad, clip, dist):
     if len(parameters) < 1:
         return []

-    var_groups = assign_group_by_size(parameters)
+    var_groups = assign_group_by_size(parameters, group_size=256 * 1024 * 1024)
     storage = []
     for group_idx, parameters in var_groups.items():
         param_storage, grad_storage = flatten_dense_tensors(
-            parameters, use_main_grad
+            parameters,
+            use_main_grad=use_main_grad,
+            fuse_param=True,
+            warp_buffer=True,
        )
         param_storage.buffer.need_clip = clip
         param_storage.buffer.is_distributed = dist
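Taken together, one helper now serves both call sites. A hedged usage sketch of the fused mode used by obtain_storage, assuming a list params of trainable parameters on a GPU device (identifiers follow the diff; this is not code from the PR):

    from paddle.distributed.fleet.utils.tensor_fusion_helper import (
        assign_group_by_size,
        flatten_dense_tensors,
    )

    # obtain_storage's mode: fuse parameters as well as gradients, and warp both
    # buffers so the original parameters alias slices of the fused storage.
    param_storage, grad_storage = flatten_dense_tensors(
        params, use_main_grad=False, fuse_param=True, warp_buffer=True
    )

    # The helper's default group size dropped from 256 MB to 128 MB to match the
    # removed pp_utils copy; obtain_storage keeps its old bucketing by passing
    # the former default explicitly.
    var_groups = assign_group_by_size(params, group_size=256 * 1024 * 1024)

Keeping each default aligned with the deleted copy it replaces means neither call site changes behavior; only the duplicated implementation goes away.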