4 changes: 2 additions & 2 deletions CMakeLists.txt
@@ -1173,17 +1173,17 @@ if(BUILD_TEST)
list(APPEND MULTIDEVICE_TEST_SRCS
${NVFUSER_ROOT}/tests/cpp/multidevice.cpp
${NVFUSER_ROOT}/tests/cpp/multidevice_transformer.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_host_ir_overlap.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_communications.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_communicator.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_host_ir.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_host_ir_overlap.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_ipc.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_lower_communication.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_matmul.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_pipeline.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_sharding.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_stream_parallel_type.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_transformer.cpp
${NVFUSER_ROOT}/tests/cpp/test_multidevice_ipc.cpp
)
add_test_without_main(test_multidevice "${MULTIDEVICE_TEST_SRCS}" "")
list(APPEND TEST_BINARIES test_multidevice)
20 changes: 10 additions & 10 deletions csrc/host_ir/evaluator.cpp
@@ -482,24 +482,24 @@ void HostIrEvaluator::handle(MatmulOp* matmul) {
TensorView* b = matmul->inB();
TensorView* out = matmul->out();

if (expr_evaluator_.isKnown(out)) {
auto t_a = getKnownConcreteValue(a).as<at::Tensor>();
auto t_b = getKnownConcreteValue(b).as<at::Tensor>();
auto t_out = getKnownConcreteValue(out).as<at::Tensor>();
at::matmul_out(t_out, t_a, t_b);
} else {
if (!matmul->outputIsPreallocated()) {
unhandled(matmul);
return;
}

auto t_a = getKnownConcreteValue(a).as<at::Tensor>();
auto t_b = getKnownConcreteValue(b).as<at::Tensor>();
auto t_out = getKnownConcreteValue(out).as<at::Tensor>();
at::matmul_out(t_out, t_a, t_b);
}

void HostIrEvaluator::handle(LinearOp* linear) {
auto* in = linear->inA()->as<TensorView>();
auto* weight = linear->inB()->as<TensorView>();
auto* out = linear->out()->as<TensorView>();

if (!expr_evaluator_.isKnown(out)) {
unhandled(linear);
return;
if (!linear->outputIsPreallocated()) {
return unhandled(linear);
}

auto in_tensor = getKnownConcreteValue(in).as<at::Tensor>();
@@ -753,7 +753,7 @@ void HostIrEvaluator::handle(ShardByStream* shard) {
IterDomain* stream_id = *i;

auto in_tensor = getKnownConcreteValue(shard->in()).as<at::Tensor>();
int64_t stream_index =
auto stream_index =
expr_evaluator_.evaluate(shard->stream_index()).as<int64_t>();
at::Tensor out_tensor =
in_tensor
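The MatmulOp/LinearOp handlers above now dispatch on `outputIsPreallocated()` and write into a caller-provided buffer rather than checking `expr_evaluator_.isKnown(out)`. As a reference for that contract, here is a minimal ATen-only sketch (not nvFuser code; `matmulIntoPreallocated` is a made-up helper) of writing a matmul result into a preallocated output:

```cpp
// Minimal ATen-only sketch (not the real HostIrEvaluator) of the
// preallocated-output contract used by handle(MatmulOp): the caller owns the
// output buffer and the handler writes into it in place via the *_out variant,
// allocating nothing itself.
#include <ATen/ATen.h>

void matmulIntoPreallocated(
    at::Tensor& out,
    const at::Tensor& a,
    const at::Tensor& b) {
  at::matmul_out(out, a, b);  // writes a @ b into `out`
}

int main() {
  at::Tensor a = at::randn({2, 3});
  at::Tensor b = at::randn({3, 4});
  at::Tensor out = at::empty({2, 4});  // preallocated by the caller
  matmulIntoPreallocated(out, a, b);
  return 0;
}
```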
109 changes: 78 additions & 31 deletions csrc/host_ir/lowering.cpp
@@ -20,18 +20,22 @@ namespace nvfuser {
namespace {

struct LoopInfo {
hir::ForLoop* loop;
hir::ForLoop* loop = nullptr;

// The Scope that owns `loop`. It is one level outside `loop`'s body scope.
Scope* parent_scope;
Scope* parent_scope = nullptr;

// The iterator that points to `loop`. This way, we can insert instructions,
// e.g. Allocate, right before the loop.
Scope::Iterator parent_insertion_point;
};

std::ostream& operator<<(std::ostream& os, const LoopInfo& loop_info) {
os << loop_info.loop->toInlineString();
if (loop_info.loop == nullptr) {
os << "<null>";
} else {
os << loop_info.loop->toInlineString();
}
return os;
}

@@ -114,7 +118,10 @@ const std::vector<IterDomain*>& findReferenceLoopDomain(
// Expr.
Expr* cloneWithNewOperands(
Expr* e,
const std::unordered_map<Val*, Val*>& replacement_map) {
const std::unordered_map<Val*, Val*>& replacement_map,
bool output_is_preallocated) {
NVF_ERROR(!e->outputIsPreallocated());

auto maybe_replace = [&](Val*& x) -> bool {
Val* new_x = getOrDefault(replacement_map, x);
if (new_x == nullptr) {
@@ -132,11 +139,16 @@ Expr* cloneWithNewOperands(
std::vector<Val*> new_outs = e->outputs();
replaced += std::ranges::count_if(new_outs, maybe_replace);

if (replaced == 0) {
if (replaced == 0 && !output_is_preallocated) {
return e;
}

return e->newObjectFunc()(e->container(), new_ins, new_outs, e->attributes());
Expr* new_e =
e->newObjectFunc()(e->container(), new_ins, new_outs, e->attributes());
if (output_is_preallocated) {
new_e = new_e->withOutputPreallocated();
}
return new_e;
}

void lowerSegment(
@@ -146,6 +158,14 @@ void lowerSegment(
hir::HostIrContainer& hic,
LoopNest& loop_nest,
IrCloner& ir_cloner) {
Scope& innermost_scope = loop_nest.innermostScope();
// FIXME: cleanup. innermost can return an empty LoopInfo when the nest is
// empty.
LoopInfo innermost;
if (!loop_nest.empty()) {
innermost = loop_nest.innermost();
}

switch (group.schedulerType()) {
case SchedulerType::Communication: {
auto device_id = Communicator::getInstance().deviceId();
@@ -157,24 +177,50 @@ void lowerSegment(
// without cloning the value again.
Expr* e = ir_cloner.clone(group.exprs().front());

for (auto* c : convertSingleOpToCommunication(e, device_id)) {
// FIXME: should this be associated with the scope?
std::unordered_map<Val*, Val*> replacement_map;
for (Expr* c : convertSingleOpToCommunication(e, device_id)) {
NVF_ERROR(
c->isA<Communication>(),
"Exprs in a Communication group should be Communication: ",
c);
// Allocate the recv buffers of communications
auto* communication = c->as<Communication>();
TensorView* tv = communication->out();
if (tv->getDeviceMesh().has(device_id)) {
auto* allocate =
IrBuilder::create<kir::Allocate>(tv, MemoryType::Global);
// TODO: allocation may have to go to the top level. See how
// SchedulerType::ExprEval handles allocations.
loop_nest.innermostScope().push_back(allocate);
TensorView* in = communication->in();
TensorView* out = communication->out();
if (getShardedIterDomain(in, ParallelType::Stream, DomainType::kLoop) !=
nullptr &&
getShardedIterDomain(
in, ParallelType::Stream, DomainType::kAllocation) == nullptr) {
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, innermost.loop->index()));
if (inserted) {
innermost_scope.push_back(i->second->definition());
}
}
loop_nest.innermostScope().push_back(communication);
auto wait = IrBuilder::create<hir::Wait>(communication);
loop_nest.innermostScope().push_back(wait);

// Allocate the recv buffers of communications
auto* allocate =
IrBuilder::create<kir::Allocate>(out, MemoryType::Global);
if (getShardedIterDomain(
out, ParallelType::Stream, DomainType::kLoop) != nullptr &&
getShardedIterDomain(
out, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
innermost.parent_scope->insert(
innermost.parent_insertion_point, allocate);
auto [i, inserted] = replacement_map.try_emplace(
out, hir::shardByStream(out, innermost.loop->index()));
NVF_ERROR(inserted);
innermost_scope.push_back(i->second->definition());
} else {
innermost_scope.push_back(allocate);
}

Expr* new_c = cloneWithNewOperands(c, replacement_map, true);
innermost_scope.push_back(new_c);

auto* wait = IrBuilder::create<hir::Wait>(new_c);
innermost_scope.push_back(wait);
}
break;
}
@@ -206,14 +252,11 @@ void lowerSegment(
// TensorViews.
if (loop_nest.empty()) {
for (Expr* e : exprs) {
loop_nest.innermostScope().push_back(e);
innermost_scope.push_back(e);
}
break;
}

auto [for_loop, parent_scope, parent_insertion_point] =
loop_nest.innermost();

std::unordered_map<Val*, Val*> replacement_map;
for (Expr* e : exprs) {
for (auto* in : ir_utils::filterByType<TensorView>(e->inputs())) {
@@ -223,34 +266,38 @@ void lowerSegment(
in, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
auto [i, inserted] = replacement_map.try_emplace(
in, hir::shardByStream(in, for_loop->index()));
in, hir::shardByStream(in, innermost.loop->index()));
if (inserted) {
for_loop->body().push_back(i->second->definition());
innermost_scope.push_back(i->second->definition());
}
}
}

bool output_is_preallocated = false;
for (auto* out : ir_utils::filterByType<TensorView>(e->outputs())) {
if (getShardedIterDomain(
out, ParallelType::Stream, DomainType::kAllocation) ==
nullptr) {
auto* allocate =
IrBuilder::create<kir::Allocate>(out, MemoryType::Global);
parent_scope->insert(parent_insertion_point, allocate);
output_is_preallocated = true;
innermost.parent_scope->insert(
innermost.parent_insertion_point, allocate);
// The loop is stream-parallelized but the allocation is not, so `out`
// should be allocated outside the loop.
//
// try_emplace inserts the entry for `out` only if one doesn't already
// exist; NVF_ERROR(inserted) below checks that this is always the case.
auto [i, inserted] = replacement_map.try_emplace(
out, hir::shardByStream(out, for_loop->index()));
out, hir::shardByStream(out, innermost.loop->index()));
NVF_ERROR(inserted);
for_loop->body().push_back(i->second->definition());
innermost_scope.push_back(i->second->definition());
}
}

Expr* new_e = cloneWithNewOperands(e, replacement_map);
for_loop->body().push_back(new_e);
Expr* new_e =
cloneWithNewOperands(e, replacement_map, output_is_preallocated);
innermost_scope.push_back(new_e);
}
break;
}
@@ -275,7 +322,7 @@ void lowerSegment(
auto* tv = out->as<TensorView>();
auto* allocate =
IrBuilder::create<kir::Allocate>(tv, MemoryType::Global);
loop_nest.innermostScope().push_back(allocate);
innermost_scope.push_back(allocate);
}

// Add the LaunchKernel instruction.
@@ -291,7 +338,7 @@ void lowerSegment(
ins,
outs,
cache_id);
loop_nest.innermostScope().push_back(launch_kernel);
innermost_scope.push_back(launch_kernel);
}
} // switch
} // lowerSegment
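Both the Communication and ExprEval branches above keep a per-segment `replacement_map` and use `try_emplace` so each stream-sharded TensorView gets exactly one replacement, whose definition is pushed into the innermost scope only on first insertion. The sketch below illustrates just that map bookkeeping, with plain strings standing in for TensorViews; it is a self-contained toy, not the IR-building code:

```cpp
// Standalone sketch of the replacement-map bookkeeping in lowerSegment():
// try_emplace creates at most one entry per key, and only the first insertion
// pushes the (stubbed) sharded definition; later uses reuse the same entry.
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  std::unordered_map<std::string, std::string> replacement_map;
  std::vector<std::string> uses = {"tv0", "tv1", "tv0"};  // tv0 consumed twice

  for (const std::string& tv : uses) {
    auto [it, inserted] = replacement_map.try_emplace(tv, tv + "_sharded");
    if (inserted) {
      // In the real pass this is where shardByStream(tv)->definition() is
      // pushed into the innermost scope -- once per TensorView.
      std::cout << "define " << it->second << '\n';
    }
    std::cout << "use    " << it->second << '\n';
  }
  return 0;
}
```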
1 change: 1 addition & 0 deletions csrc/host_ir/pass/stream_parallel_type.cpp
@@ -475,6 +475,7 @@ std::list<Expr*> processForLoopBodies(
ir_utils::filterByType<TensorView>(body_expr->outputs())) {
processTensor(body_expr, output, tensor_index);
}
body_expr = body_expr->withOutputPreallocated();
new_loop_body.push_back(body_expr);
}
}
9 changes: 8 additions & 1 deletion csrc/ir/base_nodes.cpp
@@ -253,6 +253,7 @@ std::optional<DataType> Val::getDataType() const {
// after inputs and outputs are registered with the Expr
Expr::Expr(IrBuilderPasskey passkey) : Statement(passkey) {}

// FIXME: Should this constructor copy the output_is_preallocated_ flag?
Expr::Expr(const Expr* src, IrCloner* ir_cloner)
: Statement(src, ir_cloner),
attributes_(ir_cloner->clone(src->attributes_)),
@@ -270,12 +271,13 @@ Expr::Expr(
outputs_(std::move(outputs)) {}

Expr* Expr::shallowCopy() const {
auto result =
Expr* result =
newObjectFunc()(ir_container_, inputs(), outputs(), attributes());
if (container()->isA<kir::Kernel>()) {
result->predicate_ = predicate_;
result->write_predicate_ = write_predicate_;
}
result->output_is_preallocated_ = output_is_preallocated_;
return result;
}

@@ -383,6 +385,11 @@ Expr* Expr::withWritePredicate(kir::Predicate* predicate) {
return result;
}

Expr* Expr::withOutputPreallocated() {
output_is_preallocated_ = true;
return this;
}

std::vector<PolymorphicValue> Expr::evaluate(
const ExpressionEvaluator& ee,
const std::vector<PolymorphicValue>& inputs) const {
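Unlike `withPredicate()`/`withWritePredicate()`, which operate on a `shallowCopy()`, the new `withOutputPreallocated()` sets the flag on the receiver and returns `this`, and `shallowCopy()` now carries the flag along. A tiny standalone mock (`ExprMock` is hypothetical, not nvFuser's `Expr`) of that in-place builder style:

```cpp
// Toy mock (not nvFuser's Expr) of the in-place flag setter added above:
// withOutputPreallocated() mutates and returns the same object, so existing
// pointers to the expression observe the flag without any cloning.
#include <iostream>

struct ExprMock {
  bool output_is_preallocated = false;

  ExprMock* withOutputPreallocated() {
    output_is_preallocated = true;
    return this;  // same object, unlike the copying with*Predicate() helpers
  }
};

int main() {
  ExprMock e;
  ExprMock* same = e.withOutputPreallocated();
  std::cout << std::boolalpha << (same == &e) << ' '  // true
            << e.output_is_preallocated << '\n';      // true
  return 0;
}
```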
8 changes: 8 additions & 0 deletions csrc/ir/base_nodes.h
@@ -599,6 +599,12 @@ class NVF_API Expr : public Statement {
// TODO: Protect based on being in kernel container
Expr* withWritePredicate(kir::Predicate* write_predicate);

bool outputIsPreallocated() const {
return output_is_preallocated_;
}

Expr* withOutputPreallocated();

// Get the name of an expression
virtual const char* getOpString() const = 0;

@@ -660,6 +666,8 @@ class NVF_API Expr : public Statement {

// Only used for reduction-related expressions
kir::Predicate* write_predicate_ = nullptr;

bool output_is_preallocated_ = false;
};

template <typename T>
6 changes: 3 additions & 3 deletions csrc/multidevice/communicator.h
@@ -62,12 +62,12 @@ class NVF_API Communicator {
}

// returns the number of processes in the communicator
auto size() const {
int64_t size() const {
return size_;
}

// returns the local number of processes in the communicator (within the node)
auto local_size() const {
int64_t local_size() const {
return local_size_;
}

@@ -89,7 +89,7 @@
const std::string& prefix = "");

// returns the device associated with the current process
auto device() const {
at::Device device() const {
return at::Device("cuda:" + std::to_string(local_rank_));
}

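The accessor change above swaps `auto` return types for explicit ones (`int64_t`, `at::Device`), so the public signatures no longer track whatever type the private members happen to have. A small standalone illustration of the difference, using a hypothetical `CommunicatorMock` rather than the real class:

```cpp
// With `auto size() const { return size_; }`, the public return type is
// whatever the member deduces to; spelling out int64_t pins the API.
// Hypothetical class, not the real Communicator.
#include <cstdint>
#include <type_traits>

class CommunicatorMock {
 public:
  int64_t size() const {  // was: auto size() const
    return size_;
  }

 private:
  int64_t size_ = 1;
};

int main() {
  CommunicatorMock c;
  static_assert(std::is_same_v<decltype(c.size()), int64_t>);
  return 0;
}
```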
4 changes: 2 additions & 2 deletions csrc/runtime/fusion_kernel_runtime.cpp
@@ -7,6 +7,8 @@
// clang-format on
#include <runtime/fusion_kernel_runtime.h>

#include <c10/cuda/CUDAGuard.h>

#include <fusion.h>
#include <fusion_profiler.h>
#include <fusion_segmenter.h>
@@ -25,8 +27,6 @@
#include <serde/fusion_cache_generated.h>
#include <type.h>

#include <c10/cuda/CUDAGuard.h>

namespace nvfuser {

namespace {
3 changes: 2 additions & 1 deletion tests/cpp/test_host_ir_evaluator.cpp
@@ -159,7 +159,8 @@ TEST_F(HostIrEvaluatorTest, MatmulInLoop) {

// By default, MatmulOp is computed by ExpressionEvaluator so it appears in
// host IR.
auto* mm = IrBuilder::create<MatmulOp>(loop_out, in, loop_w);
auto* mm = IrBuilder::create<MatmulOp>(loop_out, in, loop_w)
->withOutputPreallocated();
for_loop->body().push_back(mm);

hic->pushBackTopLevelExprs(allocate_out);