Revert "Inductor support for aten::all_reduce (pytorch#93111)"
This reverts commit a8cbf70.
pruthvistony committed May 2, 2023
1 parent fff065c commit 3b7b0b4
Showing 6 changed files with 13 additions and 404 deletions.
2 changes: 1 addition & 1 deletion .ci/pytorch/test.sh
@@ -249,7 +249,7 @@ test_dynamo_shard() {
test_inductor_distributed() {
  # this runs on both single-gpu and multi-gpu instances. It should be smart about skipping tests that aren't supported
  # if the required number of GPUs isn't available
  PYTORCH_TEST_WITH_INDUCTOR=0 python test/run_test.py --include distributed/test_dynamo_distributed distributed/test_traceable_collectives --verbose
  PYTORCH_TEST_WITH_INDUCTOR=0 python test/run_test.py --include distributed/test_dynamo_distributed --verbose
  assert_git_not_dirty
}

1 change: 0 additions & 1 deletion .github/labeler.yml
@@ -16,7 +16,6 @@
- torch/_subclasses/fake_utils.py
- torch/_subclasses/meta_utils.py
- test/distributed/test_dynamo_distributed.py
- test/distributed/test_traceable_collectives.py
- functorch/_src/partitioners.py
- functorch/_src/aot_autograd.py

236 changes: 0 additions & 236 deletions test/distributed/test_traceable_collectives.py

This file was deleted.

108 changes: 2 additions & 106 deletions torch/_inductor/ir.py
@@ -78,6 +78,8 @@
Tensors backed by views add one more indirection to the IR.
TensorBox -> View -> StorageBox -> Buffer
In these cases, the underlying StorageBox/Buffer will be shared with the pre-view TensorBox.
For metadata mutation (e.g. as_strided_) we swing the TensorBox pointer.
"""


@@ -4200,109 +4202,3 @@ def debug_str(self, name="block"):
"",
code.strip().replace("def forward(", f"def {name}("),
)


class Wait(ExternKernel):
    """
    Wait should not be used by itself. It should always be constructed in tandem
    with a collective op that produces a work to wait on.
    """

    def __init__(
        self,
        layout,
        inputs,
        constant_args=(),
    ):
        super().__init__(None, layout, inputs, constant_args)
        self.name = V.graph.register_buffer(self)

    def should_allocate(self):
        return False

    def codegen(self, wrapper):
        (input_collective,) = [t.codegen_reference() for t in self.inputs]
        work = f"{input_collective}_work"  # hacky way to name work objs..
        wrapper.writeline(f"{work}.wait()")

        # wait op still needs to produce a 'buffer' that represents the tensor output.
        # this is a symbolic gesture, and it gets handled by WrapperCodegen.
        # codegen outputs a '# reuse' line that assigns the input buffer here ('input_collective')
        # to a new name (`self.get_name()`) and `del`s the old name.
        wrapper.writeline(f"{self.get_name()} = {input_collective}")

    @classmethod
    def create(cls, collective_op: "TensorBox"):
        return Wait(
            layout=collective_op.get_layout(),
            inputs=[collective_op],
        )

    def get_alias_names(self):
        # Signal to codegen that our output buffer isn't safe to reuse
        return [self.inputs[0].codegen_reference()]


class AllReduce(ExternKernel):
    def __init__(
        self,
        layout,
        inputs,
        constant_args=(),
    ):
        super().__init__(None, layout, inputs, constant_args)
        self.name = V.graph.register_buffer(self)

    def should_allocate(self):
        return True

    @classmethod
    def create(
        cls, x: "TensorBox", reduce_op: str, tag: str, ranks: List[int], group_size: int
    ):
        x = cls.realize_input(x)

        # is there a difference between literally using x.data.layout below, vs
        # creating a new one that has the same properties?
        new_layout = FlexibleLayout(x.get_device(), x.get_dtype(), x.get_size())

        # AllReduce returns a 'work' object. But Inductor's scheduler doesn't need to know
        # about that, and we just pretend for scheduling purposes that the work obj is a 1-elem tensor.
        # Nobody should consume the output of AllReduce except 'Wait', which we control here.
        return AllReduce(
            layout=new_layout,
            inputs=[x],
            constant_args=[reduce_op, tag, ranks, group_size],
        )

    def codegen(self, wrapper):
        wrapper.add_import_once("import torch.distributed as dist")
        wrapper.add_import_once(
            "from torch.distributed._functional_collectives import _str_to_reduce_op"
        )
        wrapper.add_import_once(
            "from torch.distributed.distributed_c10d import _find_or_create_pg_by_ranks_and_tag"
        )

        # extract references to our args in string form for codegen output
        (input_name,) = [t.codegen_reference() for t in self.inputs]
        output_name = self.get_name()
        reduce_op, tag, ranks, group_size = self.constant_args

        # TODO: avoid more than one ref of the same pg (even though they are cached inside the api)
        wrapper.writeline(
            f"{output_name}_pg = _find_or_create_pg_by_ranks_and_tag('{tag}', {ranks}, {group_size})"
        )

        # We must copy our input buffer sometimes, but the scheduler will help us find opportunities
        # to reuse the input buffer. (This requires no other users of the input buffer.)
        if not wrapper.did_reuse(self, self.inputs[0]):
            wrapper.writeline(f"{output_name}.copy_({input_name})")

        # At this point, output_name points to a buffer that is either
        # (1) the input buffer, which we're allowed to inplace modify
        # (2) a freshly allocated buffer, which we've copied the input into above
        wrapper.writeline(
            f"{output_name}_work = dist.all_reduce({output_name}, async_op=True,"
            f" group={output_name}_pg, op=_str_to_reduce_op('{str(reduce_op)}'))"
        )
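
To see how these two removed kernels compose, here is a hedged sketch: the lowering function and buffer names (lower_all_reduce, buf0, buf1) are hypothetical, and only AllReduce.create and Wait.create come from the code above.

# Hypothetical pairing sketch (lower_all_reduce and the buf* names are made up
# for illustration; only AllReduce.create / Wait.create come from the code above).
def lower_all_reduce(x, reduce_op, tag, ranks, group_size):
    # Every AllReduce is immediately followed by a Wait, since nothing else is
    # allowed to consume the collective's output.
    collective = AllReduce.create(x, reduce_op, tag, ranks, group_size)
    return Wait.create(collective)

# Roughly the wrapper code the two codegen methods above would emit for one
# all_reduce (again, buffer names are illustrative only):
#
#   buf1_pg = _find_or_create_pg_by_ranks_and_tag('', [0, 1], 2)
#   buf1.copy_(buf0)  # only when buf0 could not be reused in place
#   buf1_work = dist.all_reduce(buf1, async_op=True, group=buf1_pg,
#                               op=_str_to_reduce_op('sum'))
#   ...
#   buf1_work.wait()
#   buf2 = buf1  # Wait aliases the collective's output buffer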
