diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c807c93e85..dac99d38599 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -261,6 +261,7 @@ list(APPEND NVFUSER_SRCS ${NVFUSER_SRCS_DIR}/fusion_guard.cpp ${NVFUSER_SRCS_DIR}/fusion_segmenter.cpp ${NVFUSER_SRCS_DIR}/global_allocator.cpp + ${NVFUSER_SRCS_DIR}/graph/task_graph.cpp ${NVFUSER_SRCS_DIR}/grouped_reduction.cpp ${NVFUSER_SRCS_DIR}/host_ir/container.cpp ${NVFUSER_SRCS_DIR}/host_ir/evaluator.cpp @@ -992,6 +993,7 @@ list(APPEND JIT_TEST_SRCS ${NVFUSER_ROOT}/tests/cpp/test_statement_guard.cpp ${NVFUSER_ROOT}/tests/cpp/test_stream.cpp ${NVFUSER_ROOT}/tests/cpp/test_swizzle.cpp + ${NVFUSER_ROOT}/tests/cpp/test_task_graph.cpp ${NVFUSER_ROOT}/tests/cpp/test_tensor_factories.cpp ${NVFUSER_ROOT}/tests/cpp/test_tmem.cpp ${NVFUSER_ROOT}/tests/cpp/test_transpose.cpp diff --git a/csrc/fusion_segmenter.cpp b/csrc/fusion_segmenter.cpp index ce6c29c80c3..4e0b6b68722 100644 --- a/csrc/fusion_segmenter.cpp +++ b/csrc/fusion_segmenter.cpp @@ -14,7 +14,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -28,6 +30,7 @@ #include #include #include +#include #include #include @@ -2001,8 +2004,159 @@ bool SegmentCandidateFinder::hasSegmentHints(Fusion* fusion) { } namespace { + +class SegmentedGroupTaskGraphConverter { + public: + static TaskGraph convert( + const std::vector& groups, + SchedulerRuntimeInfo* runtime_info) { + SegmentedGroupTaskGraphConverter conv(runtime_info); + for (SegmentedGroup* group : groups) { + conv.processGroup(group); + } + return TaskGraph(conv.all_tasks_, conv.all_data_); + } + + private: + SegmentedGroupTaskGraphConverter(SchedulerRuntimeInfo* runtime_info) + : runtime_info_(runtime_info) {} + + void processGroup(SegmentedGroup* group) { + // When there are aliased inputs, they will appear as _outputs_ of the + // SegmentedGroup. 
To avoid actually adding those as outputs, we record them + // here first + std::unordered_set aliased_input_tvs; + for (Val* v : group->outputs()) { + if (auto* aliased_input_tv = dynamic_cast( + v->fusion()->getOutputAlias(v).aliased_io)) { + aliased_input_tvs.insert(aliased_input_tv); + } + } + + std::vector inputs; + // These are fusion inputs, so they are not edges between segments + for (Val* v : group->inputs()) { + if (auto* tv = dynamic_cast(v)) { + // Ignore scalar inputs + TaskGraph::DataId data_id = maybeRegisterTv(tv); + TaskGraph::Data& data = all_data_.at(data_id); + data.can_free = !tv->isFusionInput(); + inputs.push_back(data_id); + } + } + std::vector outputs; + for (Val* v : group->outputs()) { + if (auto* tv = dynamic_cast(v)) { + if (aliased_input_tvs.count(tv) || tv->isFusionInput()) { + // These are counted as outputs but are actually _inputs_ to this + // group + // Note that we skip setting alias links in the graph when the input + // is simply forwarded to the outputs unchanged. + // See AliasTest.TrivialInputForwarding for an example of this + continue; + } + TaskGraph::DataId data_id = maybeRegisterTv(tv); + TaskGraph::Data& data = all_data_.at((size_t)data_id); + if (auto* aliased_input_tv = dynamic_cast( + tv->fusion()->getOutputAlias(tv).aliased_io)) { + data.aliases_input = maybeRegisterTv(aliased_input_tv); + } + data.can_free = !tv->isFusionOutput(); + outputs.push_back(data_id); + } + } + + // TODO: inspect compiled segment executors to determine temp gmem needed + TaskGraph::Size temp_space = 0; + + all_tasks_.emplace_back(inputs, outputs, temp_space); + } + + int64_t getNumAllocatedElements(TensorView* tv) { + if (tv->isCpuScalar()) { + // Since CPU scalars do not result in any GPU allocation we count them as + // empty. 
+ return 0; + } + int64_t numel = 1; + // Use ExpressionEvaluator for computed tensors assuming they are + // contiguous + for (IterDomain* id : tv->getMaybeAllocationDomain()) { + if (id->isBroadcast() || id->isReduction() || id->isDeviceDim()) { + continue; + } + PolymorphicValue pv = std::monostate{}; + if (runtime_info_ != nullptr) { + pv = runtime_info_->expressionEvaluator().evaluate(id->extent()); + } + // If we can't determine the size of this dimension, just assume + // it's 2. This way we will give precedence to tensors with + // allocation domains that have more concrete IDs. + int64_t dim_size = pv.is() ? pv.as() : 2; + numel *= dim_size; + } + return numel; + } + + TaskGraph::DataId maybeRegisterTv(TensorView* tv) { + auto it = tv2dataid_.find(tv); + if (it != tv2dataid_.end()) { + // tv is already registered + return it->second; + } + + // Register this TV + auto new_id = static_cast(std::ssize(all_data_)); + tv2dataid_[tv] = new_id; + + // If the TV is of type Index, we don't know if it will be 8 bytes or 4 + // bytes until we are given input + DataType dtype = tv->dtype(); + if (dtype == DataType::Index) { + // If we don't have runtime info, assume it is 64-bit + dtype = runtime_info_ != nullptr ? 
runtime_info_->getIndexType() + : DataType::Int; + } + TaskGraph::Size size = + getNumAllocatedElements(tv) * dataTypeSizeByte(dtype); + + all_data_.emplace_back( + /*definition=*/std::nullopt, + /*uses=*/std::vector{}, + /*aliases_input=*/-1, + size, + /*can_free=*/true); + return new_id; + } + + private: + SchedulerRuntimeInfo* runtime_info_; + std::vector all_data_; + std::unordered_map tv2dataid_; + std::vector all_tasks_; +}; + +std::vector optimalTopoSort( + const std::vector& groups, + SchedulerRuntimeInfo* runtime_info) { + FUSER_PERF_SCOPE("optimalTopoSort"); + + TaskGraph graph = + SegmentedGroupTaskGraphConverter::convert(groups, runtime_info); + + TaskGraph::SortResult result = graph.findOptimalOrder(/*validate=*/false); + + std::vector order; + order.reserve(groups.size()); + for (const TaskGraph::Step& step : result.steps) { + order.push_back(groups.at((size_t)step.task)); + } + return order; +} + std::vector toposort( const std::vector& groups) { + FUSER_PERF_SCOPE("toposort"); std::deque to_visit; std::unordered_map num_producer_edges; for (SegmentedGroup* group : groups) { @@ -5383,7 +5537,10 @@ void SegmentedFusion::annotateFP16IntermediateTensors() { } } -RuntimeWorkSpace prepareRuntimeOrder(const SegmentedFusion& segmented_fusion) { +RuntimeWorkSpace prepareRuntimeOrder( + const SegmentedFusion& segmented_fusion, + SchedulerRuntimeInfo* runtime_info) { + FUSER_PERF_SCOPE("prepareRuntimeOrder"); RuntimeWorkSpace runtime_workspace; // setup the order tensor dimensions are bound @@ -5398,7 +5555,8 @@ RuntimeWorkSpace prepareRuntimeOrder(const SegmentedFusion& segmented_fusion) { } } - runtime_workspace.group_run_order = toposort(segmented_fusion.groups()); + runtime_workspace.group_run_order = + optimalTopoSort(segmented_fusion.groups(), runtime_info); return runtime_workspace; } diff --git a/csrc/fusion_segmenter.h b/csrc/fusion_segmenter.h index 360f6180344..1a271e37712 100644 --- a/csrc/fusion_segmenter.h +++ b/csrc/fusion_segmenter.h @@ -477,7 
+477,9 @@ struct RuntimeWorkSpace { // Perform a topological sort of different groups composiong the Segmented // Fusion -RuntimeWorkSpace prepareRuntimeOrder(const SegmentedFusion& segmented_fusion); +RuntimeWorkSpace prepareRuntimeOrder( + const SegmentedFusion& segmented_fusion, + SchedulerRuntimeInfo* runtime_info = nullptr); //! This is a base class for segmenter analysis //! provides the minimal implementation on header so that diff --git a/csrc/graph/task_graph.cpp b/csrc/graph/task_graph.cpp new file mode 100644 index 00000000000..9ece0deffb3 --- /dev/null +++ b/csrc/graph/task_graph.cpp @@ -0,0 +1,711 @@ +// clang-format off +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + */ +// clang-format on +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "options.h" + +namespace nvfuser { + +TaskGraph::TaskGraph( + const std::vector& tasks, + const std::vector& data) + : tasks_(tasks), data_(data) { + NVF_ERROR( + tasks.size() <= std::numeric_limits::max(), + "There are too many tasks to represent with TaskGraph::TaskId"); + NVF_ERROR( + data.size() <= std::numeric_limits::max(), + "There are too many data objects to represent with TaskGraph::DataId"); + + // Fill in Data uses and definitions + for (TaskId task_id : arange(numTasks())) { + const Task& task = tasks_.at((size_t)task_id); + for (DataId output_id : task.outputs) { + data_.at((size_t)output_id).definition = task_id; + } + for (DataId input_id : task.inputs) { + Data& input = data_.at(input_id); + if (std::find(input.uses.begin(), input.uses.end(), task_id) == + input.uses.end()) { + input.uses.push_back(task_id); + } + } + } + + validateGraph(); + + // Initialize the counts of future uses of data and unmet dependencies of + // tasks. These are the out-degrees of Data and in-degrees of Tasks, + // respectively. 
+ num_dependencies_.reserve(tasks_.size()); + for (const Task& task : tasks_) { + // Only count task inputs that are not already available (i.e. they have a + // definition) + num_dependencies_.push_back((DataId)std::count_if( + task.inputs.begin(), task.inputs.end(), [&](DataId data_id) { + return getData(data_id).definition.has_value(); + })); + // Validate input + for (DataId input_id : task.inputs) { + NVF_ERROR(input_id >= 0 && (size_t)input_id < data_.size()); + } + for (DataId output_id : task.outputs) { + NVF_ERROR(output_id >= 0 && (size_t)output_id < data_.size()); + } + } + num_uses_.reserve(data_.size()); + for (const Data& data : data_) { + num_uses_.push_back((TaskId)data.uses.size()); + if (!data.definition.has_value()) { + initial_allocation_ += (Size)data.size; + } + // Validate input + if (data.definition.has_value()) { + DataId d = data.definition.value(); + NVF_ERROR(d >= 0 && (size_t)d < tasks_.size()); + } + if (data.aliases_input != -1) { + DataId a = data.aliases_input; + NVF_ERROR(a >= 0 && (size_t)a < data_.size()); + } + for (TaskId use : data.uses) { + NVF_ERROR(use >= 0 && (size_t)use < tasks_.size()); + } + } +} + +void TaskGraph::validateGraph() const { + for (TaskId task_id : arange(numTasks())) { + const Task& task = tasks_.at((size_t)task_id); + for (DataId output_id : task.outputs) { + const Data& output = getData(output_id); + NVF_ERROR( + output.definition.has_value() && + output.definition.value() == task_id); + } + for (DataId input_id : task.inputs) { + const Data& input = getData(input_id); + NVF_ERROR( + std::find(input.uses.begin(), input.uses.end(), task_id) != + input.uses.end()); + } + } + + for (const auto& [data_id, data] : enumerate(data_)) { + if (data.definition.has_value()) { + const Task& def = getTask(data.definition.value()); + NVF_ERROR( + std::find(def.outputs.begin(), def.outputs.end(), data_id) != + def.outputs.end()); + } + for (const TaskId use_id : data.uses) { + const Task& use = getTask(use_id); + 
NVF_ERROR( + std::find(use.inputs.begin(), use.inputs.end(), data_id) != + use.inputs.end()); + } + } +} + +std::string TaskGraph::toMermaid() const { + std::stringstream ss; + + ss << "flowchart TD\n"; + + bool print_data_size = false; + if (numData() > 0) { + Size sz = -1; + for (const Data& data : data_) { + if (data.size == 0) { + continue; + } + if (sz == -1) { + sz = data.size; + continue; + } + if (data.size != 0 && data.size != sz) { + print_data_size = true; + break; + } + } + } + + std::vector is_aliased_input(numData(), false); + + // Declare nodes with shapes and labels + for (const auto& [data_id, data] : enumerate(data_)) { + if (data.aliases_input != -1) { + is_aliased_input.at(data.aliases_input) = true; + } + ss << " d" << data_id << "([\"d" << data_id; + if (print_data_size || data.size == 0) { + // Print data size if there are different sized data elements. Always + // print [0] for empty data (these will be shown in gray) + ss << " [" << data.size << "]"; + } + ss << "\"]);\n"; + } + for (const auto& [task_id, task] : enumerate(tasks_)) { + if (task.temp_space != 0) { + ss << " t" << task_id << "[\"t" << task_id << " [" << task.temp_space + << "]\"];\n"; + } + } + + for (const auto& [task_id, task] : enumerate(tasks_)) { + for (const DataId& input_id : task.inputs) { + ss << " d" << input_id << " --> t" << task_id << "\n"; + } + for (const DataId& output_id : task.outputs) { + ss << " t" << task_id << " --> d" << output_id << "\n"; + } + } + + ss << "\n"; + ss << " classDef task fill:orange,stroke:darkorange;\n"; + ss << " classDef data fill:lightblue,stroke:blue;\n"; + ss << " classDef dataInput fill:lightgreen,stroke:green;\n"; + ss << " classDef dataOutput fill:pink,stroke:red;\n"; + ss << " classDef dataEmpty fill:#EEE,stroke:#DDD,color:#999;\n"; + ss << " classDef aliasedInput fill:yellow,stroke:yellow;\n"; + ss << " classDef aliasEdge stroke-dasharray:3,stroke:blue;\n"; + + ss << "\n"; + for (const TaskId task_id : arange(numTasks())) { + 
ss << " class t" << task_id << " task;\n"; + } + ss << "\n"; + for (const auto& [data_id, data] : enumerate(data_)) { + // Create edges for aliases + if (data.aliases_input != -1) { + ss << " d" << data_id << " alias" << data_id << "@--> d" + << data.aliases_input << ";\n"; + ss << " class alias" << data_id << " aliasEdge;\n"; + } + + std::string class_name = "data"; + if (!data.definition.has_value()) { + if (is_aliased_input.at(data_id)) { + class_name = "aliasedInput"; + } else { + class_name = "dataInput"; + } + } else if (!data.can_free) { + class_name = "dataOutput"; + } else if (data.size == 0) { + class_name = "dataEmpty"; + } + ss << " class d" << data_id << " " << class_name << ";\n"; + } + + return ss.str(); +} + +void TaskGraph::validateSteps(const std::vector& steps) const { + // First find any Data in the graph that has no definition. This must be + // preallocated before running the program, so we initialize allocated and + // high_water_mark to the sum of their sizes. + TaskGraph::Size allocated = getInitialAllocation(); + TaskGraph::Size high_water_mark = allocated; + + std::vector future_uses = num_uses_; + std::vector outstanding_dependencies = num_dependencies_; + + // Now we are ready to process steps + for (const Step& step : steps) { + NVF_ERROR( + outstanding_dependencies.at((size_t)step.task) == 0, + "Invalid ordering found: task id ", + step.task, + " is executed before all its dependencies are available"); + + const Task& task = getTask(step.task); + + // Allocate outputs + for (const DataId output_id : task.outputs) { + const Data& output = getData(output_id); + if (output.aliases_input != -1) { + // Check that the aliased input has no further uses + // Note that we will decrement this use count later in this function + NVF_ERROR( + future_uses.at((size_t)output.aliases_input) == 1, + "Tried to execute segment that would overwrite input alias before " + "some of its uses"); + } else { + // Don't allocate outputs if they are reusing input 
memory + allocated += output.size; + } + } + + // Add temporary space + allocated += task.temp_space; + + // This is the most space we will use, so update high water mark here + high_water_mark = std::max(high_water_mark, allocated); + + NVF_ERROR( + step.high_water_mark == high_water_mark, + "Mismatch in high water mark during validation"); + + // reduce use count for inputs and free them if possible + for (const DataId input_id : task.inputs) { + if (--future_uses.at((size_t)input_id) == 0) { + // There are no more uses for this Data, so free it if we're allowed to + const Data& data = getData(input_id); + if (data.can_free) { + allocated -= data.size; + } + } + } + + for (const DataId output_id : task.outputs) { + const Data& data = getData(output_id); + for (const TaskId use_id : data.uses) { + --outstanding_dependencies.at((size_t)use_id); + } + } + + // step.allocated indicates how much space is allocated _upon completion_ of + // this step + NVF_ERROR( + step.allocated == allocated, "Mismatch in allocated during validation"); + } +} + +TaskGraph TaskGraph::convertAliasesToDependencies() const { + // Begin with a copy of the tasks and data + std::vector tasks{tasks_}; + std::vector data{data_}; + + // This is used to ensure we don't have multiple aliases of the same input + std::unordered_set aliased_inputs; + + // If we modify data while traversing it, then we run the risk + + for (TaskId task_id : arange((TaskId)tasks.size())) { + Task& task = tasks.at(task_id); + for (DataId output_id : task.outputs) { + Data& output = data.at((size_t)output_id); + if (output.aliases_input != -1) { + DataId alias_id = output.aliases_input; + // Reset the aliases_input flag before modifying the data vector + output.aliases_input = -1; + Data& alias = data.at((size_t)alias_id); + NVF_ERROR_EQ( + output.size, + alias.size, + "Expected alias to have same size as alias"); + // Reset to unaliased and set size to zero + output.size = 0; + + NVF_ERROR( + 
!aliased_inputs.contains(alias_id), + "Found multiple outputs aliasing the same input"); + aliased_inputs.insert(alias_id); + + // For each use of the aliased input, add a new output to it and make + // that output a new input to the current task + for (TaskId use_id : alias.uses) { + if (use_id == task_id) { + continue; + } + Task& use = tasks.at((size_t)use_id); + + auto dummy_data_id = (DataId)data.size(); + data.emplace_back( + /*definition=*/std::optional{use_id}, + /*uses=*/std::vector{task_id}, + /*aliases_input=*/-1, + /*size=*/0, + /*can_free=*/true); + + use.outputs.push_back(dummy_data_id); + task.inputs.push_back(dummy_data_id); + } + } + } + } + + return {tasks, data}; +} + +namespace { + +//! [Backtracking algorithm to find optimal topological ordering] +//! +//! Note that if the input graph has aliases, we first convert to a graph +//! without aliases but with more data links. This allows us to ignore aliasing +//! when sorting the tasks, while maintaining the same memory usage analysis. +//! +//! If validate==true, then we will validate the steps vector after every +//! backtracking step. +//! +//! c.f. https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm +class TaskSorter { + public: + TaskSorter( + const TaskGraph& graph, + bool validate, + int64_t max_time_us, + bool print_debug) + : orig_graph_(graph), + graph_(graph.convertAliasesToDependencies()), + debug_(print_debug), + validate_(validate), + max_time_us_(max_time_us) { + if (debug_) { + const bool has_aliasing = std::ranges::any_of( + arange(orig_graph_.numData()), [&](TaskGraph::DataId data_id) { + return orig_graph_.getData(data_id).aliases_input != -1; + }); + if (has_aliasing) { + debug() << "Aliasing detected in task graph. 
Original graph:\n"; + debug() << orig_graph_.toString() << "\n\n"; + if (hasDebugDumpArgument(DebugDumpOption::TaskGraph, "mermaid")) { + debug() << "Original graph (mermaid):\n" + << orig_graph_.toMermaid() << std::endl; + } + debug() << "Modified graph without aliasing:\n"; + debug() << graph_.toString() << "\n\n"; + if (hasDebugDumpArgument(DebugDumpOption::TaskGraph, "mermaid")) { + debug() << "Modified graph (mermaid):\n" + << graph_.toMermaid() << std::endl; + } + } else { + debug() << graph_.toString() << "\n\n"; + if (hasDebugDumpArgument(DebugDumpOption::TaskGraph, "mermaid")) { + debug() << "Mermaid graph:\n" << graph_.toMermaid() << std::endl; + } + } + } + sort(); + } + + const TaskGraph::SortResult& result() const { + return result_; + } + + private: + inline void validate() const { + if (validate_) { + orig_graph_.validateSteps(steps_); + } + } + + //! This pushes a step indicating that we should execute the given task next. + void advance(TaskGraph::TaskId task_id) { + TaskGraph::Size allocated = 0; + TaskGraph::Size high_water_mark = 0; + if (steps_.empty()) { + // (Re-)Initialize allocated and high_water_mark to starting values + allocated = graph_.getInitialAllocation(); + high_water_mark = allocated; + } else { + allocated = steps_.back().allocated; + high_water_mark = steps_.back().high_water_mark; + } + + NVF_ERROR( + ready_tasks_.erase(task_id) == 1, + "Attempted to advance to task that was not marked ready"); + + // Compute the new allocated amount and high water mark for this step + const TaskGraph::Task& task = graph_.getTask(task_id); + + for (const TaskGraph::DataId output_id : task.outputs) { + const TaskGraph::Data& output = graph_.getData(output_id); + // Allocate outputs + allocated += output.size; + + // Update outstanding_dependencies_ and ready_tasks_ for each use + for (const TaskGraph::TaskId use_id : output.uses) { + --outstanding_dependencies_.at((size_t)use_id); + if (taskIsReady(use_id)) { + ready_tasks_.insert(use_id); + } + 
} + } + + // Add temp space + allocated += task.temp_space; + + // Update high water mark + high_water_mark = std::max(high_water_mark, allocated); + + // Decrement future_uses_ and deallocate dead inputs + for (const TaskGraph::DataId input_id : task.inputs) { + const TaskGraph::Data& input = graph_.getData(input_id); + if (--future_uses_.at((size_t)input_id) == 0) { + if (input.can_free) { + allocated -= input.size; + } + } + } + + steps_.emplace_back(task_id, allocated, high_water_mark); + } + + //! Backtrack a single step. This returns the TaskId of the step that was + //! popped. + TaskGraph::TaskId backtrack() { + validate(); + TaskGraph::TaskId last_task_id = steps_.back().task; + const TaskGraph::Task& last_task = graph_.getTask(last_task_id); + steps_.pop_back(); + + ready_tasks_.insert(last_task_id); + + // Update outstanding_dependencies to reflect that the outputs of last_task + // are no longer available + for (const TaskGraph::DataId& output_id : last_task.outputs) { + const TaskGraph::Data& output = graph_.getData(output_id); + for (const TaskGraph::TaskId use_id : output.uses) { + if (outstanding_dependencies_.at((size_t)use_id)++ == 0) { + // This task _was_ ready but now it is not + ready_tasks_.erase((size_t)use_id); + } + } + } + + // Update future_uses to reflect that the inputs to last_task will be needed to + // compute last_task later + for (const TaskGraph::DataId& input_id : last_task.inputs) { + future_uses_.at((size_t)input_id)++; + } + + return last_task_id; + } + + //! A task is ready if it has no outstanding_dependencies and it is the last + //! use for all of its aliased inputs. Note that since we convert to a graph + //! with no aliases in the constructor of this class, it is safe to assume + //! that there are no alias conflicts. 
+ bool taskIsReady(TaskGraph::TaskId task_id) const { + return outstanding_dependencies_.at((size_t)task_id) == 0; + } + + void sort() { + // Set up outstanding_dependencies_, future_uses_, and ready_tasks_ + future_uses_.resize(graph_.numData(), 0); + for (const TaskGraph::DataId data_id : arange(graph_.numData())) { + const TaskGraph::Data& data = graph_.getData(data_id); + future_uses_.at((size_t)data_id) = data.uses.size(); + } + + outstanding_dependencies_.resize(graph_.numTasks(), 0); + for (const TaskGraph::TaskId task_id : arange(graph_.numTasks())) { + const TaskGraph::Task& task = graph_.getTask(task_id); + TaskGraph::DataId inputs_to_compute = 0; + for (const TaskGraph::DataId data_id : task.inputs) { + const TaskGraph::Data& data = graph_.getData(data_id); + if (data.definition.has_value()) { + // Skip counting input data since these are available before we start + inputs_to_compute++; + } + } + outstanding_dependencies_.at((size_t)task_id) = inputs_to_compute; + if (taskIsReady(task_id)) { + ready_tasks_.insert(task_id); + } + } + + // Initialize best steps found so far + std::vector best_steps; + + // This is the main optimization loop + TaskGraph::TaskId backtracked_task_id = -1; + + using Clock = std::chrono::high_resolution_clock; + Clock::time_point start = Clock::now(); + + result_.iterations = 0; + while (true) { + result_.iterations++; + if (result_.iterations % 64 == 0) { + Clock::time_point end = Clock::now(); + if (std::chrono::duration_cast(end - start) + .count() > max_time_us_) { + break; + } + } + + NVF_ERROR( + !ready_tasks_.empty() || std::ssize(steps_) == graph_.numTasks(), + "Ran out of ready tasks before completing ordering"); + + const auto it = std::lower_bound( + ready_tasks_.begin(), ready_tasks_.end(), backtracked_task_id + 1); + TaskGraph::TaskId next_task_id = it == ready_tasks_.end() ? 
-1 : *it; + + // Reset backtracked_task_id + backtracked_task_id = -1; + + if (next_task_id == -1) { + // There are no ready tasks with ID above the backtracked_task_id. This + // means it is time to backtrack + + if (steps_.empty()) { + // If there is nowhere to backtrack it means we are done with the + // search + result_.exhaustive = true; + break; + } + backtracked_task_id = backtrack(); + continue; + } + + advance(next_task_id); + + // If our high water mark is above best_usage, terminate early and + // backtrack + TaskGraph::Size hwm = steps_.back().high_water_mark; + TaskGraph::Size best_hwm = best_steps.empty() + ? std::numeric_limits::max() + : best_steps.back().high_water_mark; + + if (hwm > best_hwm) { + backtracked_task_id = backtrack(); + continue; + } + + // Our usage is at or below best. Have we completed an ordering? If + // so, update best_steps + if (std::ssize(steps_) == graph_.numTasks() && hwm < best_hwm) { + best_steps = steps_; + } + } // while + + // Record our best found steps + result_.steps = best_steps; + + if (isDebugDumpEnabled(DebugDumpOption::TaskGraph)) { + Clock::time_point stop = Clock::now(); + debug() << "Found these steps in " + << (std::chrono::duration_cast( + stop - start) + .count()) + << " ms:\n"; + for (const TaskGraph::Step& step : result_.steps) { + debug() << " " << step << "\n"; + } + debug() << "The search contained " << result_.iterations + << " iterations and was "; + if (!result_.exhaustive) { + debug() << "NOT "; + } + debug() << "exhaustive" << std::endl; + } + + // Validate final result + NVF_ERROR(result_.steps.size() == (size_t)graph_.numTasks()); + validate(); + } + + private: + const TaskGraph& orig_graph_; + const TaskGraph graph_; + const bool debug_; + const bool validate_; + const int64_t max_time_us_; + + TaskGraph::SortResult result_; + std::vector steps_; + + //! There is one entry here for each task and indicating how many + //! dependencies are currently unmet. 
When this reaches zero the task becomes + //! ready. + std::vector outstanding_dependencies_; + + //! There is one entry here for each Data and indicating how many uses there + //! are remaining. When it reaches zero, the Data can be freed if allowed. + std::vector future_uses_; + + //! This holds all candidates for the next step, sorted by ID + std::set ready_tasks_; +}; + +} // namespace + +std::string TaskGraph::Task::toString() const { + std::stringstream ss; + ss << "Task{"; + ss << "input ids={" << inputs << "}"; + ss << ", output ids={" << outputs << "}"; + ss << ", temp space=" << temp_space; + ss << "}"; + return ss.str(); +} + +std::string TaskGraph::Data::toString() const { + std::stringstream ss; + ss << "Data{"; + ss << "definition=" + << (definition.has_value() ? std::to_string(definition.value()) : "none"); + ss << ", uses={" << uses << "}"; + ss << ", size=" << size; + ss << ", aliases_input=" << aliases_input; + ss << ", can_free=" << (can_free ? "yes" : "no"); + ss << "}"; + return ss.str(); +} + +std::string TaskGraph::Step::toString() const { + std::stringstream ss; + ss << "Step{"; + ss << "task id=" << task; + ss << ", allocated=" << allocated; + ss << ", high water mark=" << high_water_mark; + ss << "}"; + return ss.str(); +} + +std::string TaskGraph::SortResult::toString() const { + std::stringstream ss; + ss << "SortResult{"; + ss << "steps={" << steps << "}"; + ss << ", iterations=" << iterations; + ss << ", exhaustive=" << (exhaustive ? 
"yes" : "no"); + ss << "}"; + return ss.str(); +} + +std::string TaskGraph::toString() const { + std::stringstream ss; + ss << "TaskGraph{\n"; + ss << " data:\n"; + for (DataId i : arange(numData())) { + ss << " " << i << " = " << getData(i) << "\n"; + } + ss << " tasks:\n"; + for (TaskId j : arange(numTasks())) { + ss << " " << j << " = " << getTask(j) << "\n"; + } + ss << "}"; + return ss.str(); +} + +TaskGraph::SortResult TaskGraph::findOptimalOrder(bool validate) const { + TaskSorter sorter( + *this, + validate, + /*max_time_us=*/100000, + isDebugDumpEnabled(DebugDumpOption::TaskGraph)); + return sorter.result(); +} + +} // namespace nvfuser diff --git a/csrc/graph/task_graph.h b/csrc/graph/task_graph.h new file mode 100644 index 00000000000..790bb738526 --- /dev/null +++ b/csrc/graph/task_graph.h @@ -0,0 +1,195 @@ +// clang-format off +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. + * SPDX-License-Identifier: BSD-3-Clause + */ +// clang-format on +#pragma once + +#include +#include +#include +#include +#include + +namespace nvfuser { + +//! A task graph is a stripped-down representation of a data flow graph. It was +//! originally intended to model runtime order optimization during segmentation, +//! but might have applications in other contexts. +//! +//! TensorViews are represented as Data and each contains a size and might be +//! aliased to another Data, modeling input/output aliasing in a Fusion. A +//! segment from a segmented fusion is represented here as a Task. Every task +//! has inputs and outputs and also might require some temporary space to do its +//! computation. For example when doing grid reductions we require a gmem buffer +//! that is freed after the segment is computed. +//! +//! We model execution using the Step struct. A vector of Steps is simply a +//! runtime ordering of Tasks, but with some extra state that helps us track +//! memory allocation across the execution. 
Specifically, our model usually only +//! allocates Data upon its first use and immediately deallocates it after its +//! last use. The only exception is if the Data is marked can_free=false, which +//! would be the case for unsegmented Fusion inputs or outputs whose lifetimes +//! must extend past the execution of the entire graph. +class TaskGraph { + public: + using TaskId = int16_t; + using DataId = int16_t; + using Size = int64_t; + + //! A Task consumes some input Data and produces some output Data. To do so, + //! it might use some intermediate space. + struct Task { + std::vector inputs; + std::vector outputs; + //! This amount of temporary space is required only while executing the Task + //! and is immediately freed afterward + Size temp_space = 0; + + std::string toString() const; + }; + + //! A Data object represents a TensorView with a given size. + struct Data { + std::optional definition; + std::vector uses; + + //! If set to something other than -1, this means we do not allocate a new + //! output when executing this Data's definition, instead we re-use the + //! space from the specified input. Note that this implies an ordering + //! constraint which we will check, since the definition must be the last + //! use of the aliased input. + DataId aliases_input = -1; + + Size size = 1; + + //! This indicates whether we are able to free this data after its last use. + //! For a segmented fusion, unsegmented fusion inputs and outputs cannot be + //! freed (with the exception of an aliased input), while any intermediate + //! tensors should be freed as soon as possible. + bool can_free = true; + + std::string toString() const; + }; + + //! Note that the Tasks provided here must have accurate inputs, outputs, and + //! temporary space. The Data must have accurate aliases_input, size, and + //! can_free fields. However, uses and definitions can be empty in which case + //! they will be filled in automatically. Any pre-existing definitions or uses + //! 
will be checked for consistency. + TaskGraph(const std::vector& tasks, const std::vector& data); + + //! This represents the execution of a single Task in a given ordering. It + //! tracks some cumulative state representing the amount of space required up + //! to this point. + struct Step { + TaskId task; + + //! This is the sum of all Data that is active _after_ execution of this + //! task and after any inputs with no more uses are freed. + Size allocated; + + //! This is the maximum active space used until this step is completed. + Size high_water_mark; + + std::string toString() const; + }; + + TaskId numTasks() const { + return (TaskId)tasks_.size(); + } + + const Task& getTask(TaskId id) const { + return tasks_.at((size_t)id); + } + + TaskId numData() const { + return (DataId)data_.size(); + } + + const Data& getData(DataId id) const { + return data_.at((size_t)id); + } + + Size getInitialAllocation() const { + return initial_allocation_; + } + + void validateGraph() const; + + //! Given a list of steps, recompute the active space and high water mark. + //! This is useful for validating that our backtracking algorithm does not + //! corrupt this data. Raises an exception if corruption is detected. + void validateSteps(const std::vector& steps) const; + + struct SortResult { + std::vector steps; + + //! Number of iterations computed + int64_t iterations = 0; + + //! Whether the search was exhaustive. If not, then it was likely cut off + //! early because of a time limit. + bool exhaustive = false; + + std::string toString() const; + }; + + //! This converts a graph that has aliases into one that has no aliases but + //! has the same task order dependencies and the same memory requirements for + //! any execution order. This is done by adding new Data nodes that have zero + //! size in order to enforce the constraint that the last use of an aliased + //! input must be the one that overwrites that input. + //! + //! 
This conversion is mainly used to simplify algorithms so that they can + //! guarantee the aliasing condition without needing to explicitly handle + //! aliasing. + TaskGraph convertAliasesToDependencies() const; + + //! This does an exhaustive search of all possible orderings using a modified + //! Kahn's algorithm to efficiently traverse the set of possible topological + //! orderings. + SortResult findOptimalOrder(bool validate = true) const; + + std::string toString() const; + + //! Generates a string in the mermaid language for rendering online + std::string toMermaid() const; + + private: + const std::vector tasks_; + std::vector data_; + + //! How much data is allocated by data that has no definition, i.e. input data + Size initial_allocation_ = 0; + + std::vector num_uses_; + std::vector num_dependencies_; +}; + +inline std::ostream& operator<<(std::ostream& os, const TaskGraph::Task& task) { + os << task.toString(); + return os; +} +inline std::ostream& operator<<(std::ostream& os, const TaskGraph::Data& data) { + os << data.toString(); + return os; +} +inline std::ostream& operator<<(std::ostream& os, const TaskGraph& graph) { + os << graph.toString(); + return os; +} +inline std::ostream& operator<<(std::ostream& os, const TaskGraph::Step& step) { + os << step.toString(); + return os; +} +inline std::ostream& operator<<( + std::ostream& os, + const TaskGraph::SortResult& result) { + os << result.toString(); + return os; +} + +} // namespace nvfuser diff --git a/csrc/host_ir/lower.cpp b/csrc/host_ir/lower.cpp index 041e51018da..0debbffcac6 100644 --- a/csrc/host_ir/lower.cpp +++ b/csrc/host_ir/lower.cpp @@ -116,6 +116,8 @@ std::unique_ptr HostIrLower::lower( std::move(fusion), KernelArgumentHolder(), options, true); // Infer a topologically ordered traversal of the segmented fusion to // determine the order for launching the kernels/comms + // TODO: pass runtime info to prepareRuntimeOrder to optimize runtime order + // for memory use RuntimeWorkSpace 
workspace = prepareRuntimeOrder(*staged_fusion); // Create the HostIrContainer representing the host program. Each segment of // the segmented fusion will be translated to a HostIR diff --git a/csrc/host_ir/lowering.cpp b/csrc/host_ir/lowering.cpp index 9f4acd80d63..9e032e760f8 100644 --- a/csrc/host_ir/lowering.cpp +++ b/csrc/host_ir/lowering.cpp @@ -240,7 +240,8 @@ void lowerSegment( std::unique_ptr lowerSegmentedFusionToHostIr( const SegmentedFusion& segmented_fusion, const std::vector& launch_params_per_segment, - std::vector>& executors) { + std::vector>& executors, + SchedulerRuntimeInfo& runtime_info) { auto hic = std::make_unique(); IrCloner ir_cloner(hic.get()); @@ -262,7 +263,7 @@ std::unique_ptr lowerSegmentedFusionToHostIr( } for (SegmentedGroup* group : - prepareRuntimeOrder(segmented_fusion).group_run_order) { + prepareRuntimeOrder(segmented_fusion, &runtime_info).group_run_order) { lowerSegment( *group, segmented_fusion.completeFusion()->getOutputAliases(), diff --git a/csrc/host_ir/lowering.h b/csrc/host_ir/lowering.h index 91df58f20ab..5c68833f7e7 100644 --- a/csrc/host_ir/lowering.h +++ b/csrc/host_ir/lowering.h @@ -21,6 +21,7 @@ std::unique_ptr lowerSegmentedFusionToHostIr( // TODO(#4927): Launch parameters should be passed in at runtime, not // compile time. They can change according to input sizes. 
const std::vector& launch_params_per_segment, - std::vector>& executors); + std::vector>& executors, + SchedulerRuntimeInfo& runtime_info); } // namespace nvfuser diff --git a/csrc/options.cpp b/csrc/options.cpp index 6e424d5e316..46d66699e38 100644 --- a/csrc/options.cpp +++ b/csrc/options.cpp @@ -147,6 +147,7 @@ std::unordered_map> Options< {"segmented_fusion", DebugDumpOption::FusionSegments}, {"segmenter_logging", DebugDumpOption::FusionSegmenterLog}, {"scheduler_params", DebugDumpOption::SchedulerDebug}, + {"task_graph", DebugDumpOption::TaskGraph}, {"dynamic_shared_memory", DebugDumpOption::DynamicSharedMemory}, {"scheduler_verbose", DebugDumpOption::SchedulerVerbose}, {"sync_map", DebugDumpOption::SyncMap}, diff --git a/csrc/options.h b/csrc/options.h index 4ba316242ad..5baba986355 100644 --- a/csrc/options.h +++ b/csrc/options.h @@ -76,6 +76,7 @@ enum class DebugDumpOption { PythonFrontendDebug, //! Python Frontend debug information. TransformPropagator, //! When running TransformPropagator, print propagation //! path and replay result + TaskGraph, //! Print information about TaskGraph used in segmentation ordering Cubin, //! Dump compiled CUBIN Sass, //! Dump disassembled SASS SassToFile, //!< Dump disassembled SASS to File diff --git a/csrc/runtime/fusion_kernel_runtime.cpp b/csrc/runtime/fusion_kernel_runtime.cpp index 962b540d675..273362252ea 100644 --- a/csrc/runtime/fusion_kernel_runtime.cpp +++ b/csrc/runtime/fusion_kernel_runtime.cpp @@ -128,7 +128,7 @@ FusionKernelRuntime::FusionKernelRuntime( // Pre-compute the executor order so that the run time path // would go directly to kernel launch. 
- runtime_workspace_ = prepareRuntimeOrder(*segmented_fusion_); + runtime_workspace_ = prepareRuntimeOrder(*segmented_fusion_, &runtime_info); executors_.resize(segmented_fusion_->groups().size()); @@ -467,8 +467,13 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) { for (const auto& heuristic_params : schedulers()) { launch_params_per_segment.push_back(heuristic_params->lparams); } + SchedulerRuntimeInfo runtime_info( + segmented_fusion_->completeFusion(), args); std::unique_ptr hic = lowerSegmentedFusionToHostIr( - *segmented_fusion_, launch_params_per_segment, executors_); + *segmented_fusion_, + launch_params_per_segment, + executors_, + runtime_info); if (isOptionEnabled(EnableOption::HostIrJit)) { hij_ = std::make_unique(std::move(hic)); } else { diff --git a/python/nvfuser/pytorch_utils.py b/python/nvfuser/pytorch_utils.py index afe7cc0c4a5..c5bebf5ffb8 100644 --- a/python/nvfuser/pytorch_utils.py +++ b/python/nvfuser/pytorch_utils.py @@ -6,6 +6,7 @@ from ._C import DataType import ctypes +from dataclasses import dataclass import functools import gc from typing import Type, Union, Tuple @@ -188,3 +189,33 @@ def retried_func(*args, **kwargs): return output return retried_func + + +@dataclass +class TorchMemorySnapshot: + bytes_allocated_current: int = None + bytes_allocated_max: int = None + + def __post_init__(self): + self.bytes_allocated_current = torch.cuda.memory_allocated() + self.bytes_allocated_max = torch.cuda.max_memory_allocated() + + +@dataclass +class RecordTorchMemory: + before: TorchMemorySnapshot | None = None + after: TorchMemorySnapshot | None = None + + def __enter__(self): + gc.collect(0) + torch.cuda.empty_cache() + torch.cuda.reset_peak_memory_stats() + self.before = TorchMemorySnapshot() + return self + + def __exit__(self, type, value, traceback): + self.after = TorchMemorySnapshot() + return True + + def __repr__(self): + return f"RecordTorchMemory:\n before={self.before}\n after={self.after}\n" diff --git 
a/tests/cpp/test_task_graph.cpp b/tests/cpp/test_task_graph.cpp new file mode 100644 index 00000000000..2a6772d7899 --- /dev/null +++ b/tests/cpp/test_task_graph.cpp @@ -0,0 +1,356 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2023-present NVIDIA CORPORATION & + * AFFILIATES. All rights reserved. SPDX-License-Identifier: BSD-3-Clause + */ +// clang-format on +#include + +#include +#include + +#include +#include +#include + +namespace nvfuser { + +using Tasks = std::vector; +using TaskGraphTest = NVFuserTest; + +std::vector inferData(const Tasks& tasks) { + // Find number of data items so we can resize + TaskGraph::DataId max_data_id = 0; + for (const TaskGraph::Task& task : tasks) { + for (TaskGraph::DataId input_id : task.inputs) { + max_data_id = std::max(max_data_id, input_id); + } + for (TaskGraph::DataId output_id : task.outputs) { + max_data_id = std::max(max_data_id, output_id); + } + } + std::vector all_data((size_t)max_data_id + 1); + + for (const auto& [task_id, task] : enumerate(tasks)) { + for (TaskGraph::DataId input_id : task.inputs) { + all_data.at(input_id).uses.push_back(task_id); + } + for (TaskGraph::DataId output_id : task.outputs) { + all_data.at(output_id).definition = task_id; + } + } + + // Detect inputs and outputs and ensure they are not freed + // + // Note that any Data without a definition is treated as an input. + // Additionally, only Data that has no uses is considered an output. In the + // general case we could have outputs that are used in other Tasks, but these + // will need to be handled manually, since the intention of this tool is to + // enable common graph patterns to be built quickly. 
+ for (TaskGraph::Data& data : all_data) { + data.size = 1; + data.can_free = data.definition.has_value() && !data.uses.empty(); + } + + return all_data; +} + +std::vector getTasks(const TaskGraph::SortResult& result) { + const std::vector& steps = result.steps; + std::vector tasks; + tasks.reserve(steps.size()); + for (const TaskGraph::Step& step : steps) { + tasks.push_back(step.task); + } + return tasks; +} + +TEST_F(TaskGraphTest, Basic) { + /* + * 0 1 + * |\ / + * | 2 + * |/ + * 3 + */ + Tasks tasks{{{0, 1}, {2}}, {{0, 2}, {3}}}; + std::vector data = inferData(tasks); + auto graph = TaskGraph(tasks, data); + + const TaskGraph::SortResult result = graph.findOptimalOrder(); + + ASSERT_EQ(result.steps.size(), tasks.size()); + std::vector expected{0, 1}; + EXPECT_EQ(getTasks(result), expected); + EXPECT_EQ(result.steps.back().high_water_mark, 4); +} + +TEST_F(TaskGraphTest, SharedIntermediate) { + /* + * 0 + * /|\ + * | 1 | + * |/ \| + * 2 3 + */ + Tasks tasks{ + {{0}, {1}}, + {{0, 1}, {2}}, + {{0, 1}, {3}}, + }; + std::vector data = inferData(tasks); + auto graph = TaskGraph(tasks, data); + + const TaskGraph::SortResult result = graph.findOptimalOrder(); + + ASSERT_EQ(result.steps.size(), tasks.size()); + // Either 0 1 2 or 0 2 1 are acceptable orders + EXPECT_EQ(result.steps.back().high_water_mark, 4); +} + +TEST_F(TaskGraphTest, SharedIntermediateWithAlias) { + /* + * 0 + * /|\ + * | 1 | + * |/ \| + * 2 3 + */ + Tasks tasks{ + {{0}, {1}}, // Task 0 + {{0, 1}, {2}}, // Task 1 + {{0, 1}, {3}}, // Task 2 + }; + std::vector data = inferData(tasks); + + { + data.at(2).aliases_input = -1; + data.at(3).aliases_input = 0; + auto graph = TaskGraph(tasks, data); + const TaskGraph::SortResult result = graph.findOptimalOrder(); + + ASSERT_EQ(result.steps.size(), tasks.size()); + // Due to the alias 0 1 2 is the only acceptable order + std::vector expected{0, 1, 2}; + EXPECT_EQ(getTasks(result), expected); + EXPECT_EQ(result.steps.back().high_water_mark, 3); + } + + { // 
When 2 aliases the input instead, we should switch the order + data.at(2).aliases_input = 0; + data.at(3).aliases_input = -1; + auto graph = TaskGraph(tasks, data); + const TaskGraph::SortResult result = graph.findOptimalOrder(); + + ASSERT_EQ(result.steps.size(), tasks.size()); + // Now 0 2 1 is the only acceptable order + std::vector expected{0, 2, 1}; + EXPECT_EQ(getTasks(result), expected); + EXPECT_EQ(result.steps.back().high_water_mark, 3); + } +} + +// This example includes two segments, each of which aliases the other +TEST_F(TaskGraphTest, ImpossibleAlias) { + /* + * 0 1 + * |\ /| + * | X | + * |/ \| + * 2 3 + * + * Two tasks, each takes the same two inputs + */ + Tasks tasks{{{0, 1}, {2}}, {{0, 1}, {3}}}; + std::vector data = inferData(tasks); + // Each of the segment outputs aliases a different input + data[2].aliases_input = 0; + data[3].aliases_input = 1; + // This graph can't be ordered without breaking the aliasing constraint + auto graph = TaskGraph(tasks, data); + + EXPECT_THAT( + [&graph]() { graph.findOptimalOrder(); }, + ::testing::ThrowsMessage(::testing::HasSubstr( + "Ran out of ready tasks before completing ordering"))); +} + +TEST_F(TaskGraphTest, SelfEdge) { + Tasks tasks{{{0}, {0}}}; + std::vector data = inferData(tasks); + // This graph can't be ordered because it contains an edge from a Data node + // back to itself. A task can't be both producer and consumer to a Data. 
+ auto graph = TaskGraph(tasks, data); + + EXPECT_THAT( + [&graph]() { graph.findOptimalOrder(); }, + ::testing::ThrowsMessage(::testing::HasSubstr( + "Ran out of ready tasks before completing ordering"))); +} + +TEST_F(TaskGraphTest, TwoCycle) { + Tasks tasks{{{0}, {1}}, {{1}, {0}}}; + std::vector data = inferData(tasks); + // This graph can't be ordered because it contains a cycle + auto graph = TaskGraph(tasks, data); + + EXPECT_THAT( + [&graph]() { graph.findOptimalOrder(); }, + ::testing::ThrowsMessage(::testing::HasSubstr( + "Ran out of ready tasks before completing ordering"))); +} + +TEST_F(TaskGraphTest, ThreeCycle) { + Tasks tasks{{{0}, {1}}, {{1}, {2}}, {{2}, {0}}}; + std::vector data = inferData(tasks); + // This graph can't be ordered because it contains a cycle + auto graph = TaskGraph(tasks, data); + + EXPECT_THAT( + [&graph]() { graph.findOptimalOrder(); }, + ::testing::ThrowsMessage(::testing::HasSubstr( + "Ran out of ready tasks before completing ordering"))); +} + +TEST_F(TaskGraphTest, FreeableIntermediate) { + /* + * 0 + * /|\ + * 1 2 3 + * | + * 4 + */ + Tasks tasks{ + {{0}, {1}}, // Task 0 + {{0}, {2}}, // Task 1 + {{0}, {3}}, // Task 2 + {{3}, {4}}, // Task 3 + }; + std::vector data = inferData(tasks); + auto graph = TaskGraph(tasks, data); + + const TaskGraph::SortResult result = graph.findOptimalOrder(); + + // Expect that we evaluate the branch with intermediate before the others, + // since that intermediate 3 can take the space we'll need later for output 1 + // or 2 + ASSERT_EQ(result.steps.size(), tasks.size()); + EXPECT_NE(getTasks(result).back(), 3); + EXPECT_EQ(result.steps.back().high_water_mark, 4); +} + +// This is a parallel chains graph, the optimal schedule should cut this into an +// out-tree and an in-tree with the cut placed at the local minimum of the +// hill-valley representation of each chain. +// See Kayaaslan et al. 
2018 +// https://doi.org/10.1016/j.tcs.2017.09.037 +TEST_F(TaskGraphTest, DifferentSizes) { + /* + * 0 + * / \ + * 1 4 + * | | + * 2 5 + * | | + * 3 6 + * | | + * | 7 + * \ / + * 8 + */ + Tasks tasks{ + {{0}, {1}}, // Task 0 + {{1}, {2}}, // Task 1 + {{2}, {3}}, // Task 2 + {{0}, {4}}, // Task 3 + {{4}, {5}}, // Task 4 + {{5}, {6}}, // Task 5 + {{6}, {7}}, // Task 6 + {{3, 7}, {8}} // Task 7 + }; + std::vector data = inferData(tasks); + data[0].size = 1; + + data[1].size = 15; + data[2].size = 7; // hill-valley = 8 + data[3].size = 11; + + data[4].size = 10; + data[5].size = 11; + data[6].size = 7; // hill-valley = 4 + data[7].size = 8; + + data[8].size = 1; + auto graph = TaskGraph(tasks, data); + + const TaskGraph::SortResult result = graph.findOptimalOrder(); + + // The local minima are at data 2 and 6, so we should compute up to each first + // then compute the end parts afterward. + + ASSERT_EQ(result.steps.size(), tasks.size()); + std::vector expected{0, 1, 3, 4, 5, 2, 6, 7}; + EXPECT_EQ(getTasks(result), expected); + // Note that the suboptimal straightforward ordering in this case is {0, 1, + // 2, 3, 4, 5, 6, 7} which has a high_water_mark of 33 + EXPECT_EQ(result.steps.back().high_water_mark, 29); +} + +// This is the example from Figure 1 of Kayaaslan et al. 2018 +// It includes temporary space needed for each task. +// This is a candidate for the Liu algorithm instead of brute force search. 
+// https://doi.org/10.1016/j.tcs.2017.09.037 +TEST_F(TaskGraphTest, InTree) { + /* + * 0 3 + * | | + * 1 4 7 + * | | | + * 2 5 8 + * \| | + * 6 9 + * \| + * 10 + */ + Tasks tasks{ + {{0}, {1}}, // Task 0 + {{1}, {2}}, // Task 1 + {{3}, {4}}, // Task 2 + {{4}, {5}}, // Task 3 + {{2, 5}, {6}}, // Task 4 + {{7}, {8}}, // Task 5 + {{8}, {9}}, // Task 6 + {{6, 9}, {10}}, // Task 7 + }; + std::vector data = inferData(tasks); + data[0].size = 1; // input + data[1].size = 4; + data[2].size = 1; + data[3].size = 1; // input + data[4].size = 2; + data[5].size = 2; + data[6].size = 2; + data[7].size = 1; // input + data[8].size = 1; + data[9].size = 5; + data[10].size = 1; + tasks[0].temp_space = 4; // A + tasks[1].temp_space = 3; // B + tasks[2].temp_space = 1; // C + tasks[3].temp_space = 2; // D + tasks[4].temp_space = 2; // E + tasks[5].temp_space = 8; // F + tasks[6].temp_space = 2; // G + tasks[7].temp_space = 1; // H + + auto graph = TaskGraph(tasks, data); + + const TaskGraph::SortResult result = graph.findOptimalOrder(); + + ASSERT_EQ(result.steps.size(), tasks.size()); + // By Kayaaslan et al. 
2018, Sn 3.1, + // one optimal order is F A B C D E G H which has cost 34 + // There are others with the same cost such as F C D A B E G H + EXPECT_EQ(result.steps.back().high_water_mark, 34); +} + +} // namespace nvfuser diff --git a/tests/python/direct/test_repro.py b/tests/python/direct/test_repro.py index 036e84f7bb5..29dd1b0facd 100644 --- a/tests/python/direct/test_repro.py +++ b/tests/python/direct/test_repro.py @@ -6,6 +6,7 @@ import torch import pytest from nvfuser_direct import FusionDefinition, DataType +from nvfuser.pytorch_utils import RecordTorchMemory from python.direct_utils import skip_if_global_memory_below_gb # Use smaller range for torch.testing.make_tensor for nvfuser_direct.validate @@ -4741,3 +4742,60 @@ def nvfuser_fusion(fd: FusionDefinition) -> None: ] fd.validate(inputs) + + # https://github.com/NVIDIA/Fuser/issues/3290 + def test_execution_order(self): + N_PARALLEL_PATHS = 10 + + with FusionDefinition() as fd: + T0s = [ + fd.define_tensor( + shape=[256, 256], + contiguity=[True, True], + dtype=DataType.Float, + is_cpu=False, + stride_order=[1, 0], + ) + for _ in range(N_PARALLEL_PATHS) + ] + a = fd.define_tensor( + shape=[256, 256], + contiguity=[True, True], + dtype=DataType.Float, + is_cpu=False, + stride_order=[1, 0], + ) + for T0 in T0s: + T1 = fd.ops.relu(T0) + T2 = fd.ops.matmul(T1, T1) + T3 = fd.ops.relu(T2) + a = fd.ops.matmul(T3, a) + fd.add_output(a) + + t0s = [ + torch.randn(256, 256, device="cuda") for _ in range(N_PARALLEL_PATHS) + ] # 0.25 MiB * N_PARALLEL_PATHS + a = torch.randn(256, 256, device="cuda") # 0.25 MiB + + # Warm up + fd.execute([*t0s, a]) + + with RecordTorchMemory() as nvf_mem: + fd.execute([*t0s, a]) + + def eager_func(t0s, a): + for t0 in t0s: + t1 = torch.nn.functional.relu(t0) + del t0 + t2 = torch.matmul(t1, t1) + del t1 + t3 = torch.nn.functional.relu(t2) + del t2 + a = torch.matmul(t3, a) + del t3 + return a + + with RecordTorchMemory() as eager_mem: + eager_func(t0s, a) + + assert nvf_mem == eager_mem