Removing getArgsNumAfterSegmentRuns and its associated test checks may weaken our ability to verify that intermediate tensors are deallocated promptly. Ensure that checking peak memory directly is sufficient to catch memory leaks and excessive memory usage.
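For illustration, a minimal sketch of what such a direct peak-memory check can look like, built on the resetPeakMemoryStats and maxMemoryAllocated helpers added in this PR (the helper name and threshold parameter are hypothetical):

```cpp
#include <gtest/gtest.h>

// Hypothetical helper: run `fn` and assert that peak allocation on device 0
// never exceeds the expected footprint of the tensors that should be alive.
// A leaked intermediate tensor would push the peak above the threshold.
template <typename Fn>
void expectPeakMemoryAtMost(Fn fn, int64_t expected_peak_bytes) {
  resetPeakMemoryStats(0);
  fn();
  EXPECT_LE(maxMemoryAllocated(0), expected_peak_bytes)
      << "An intermediate tensor may have outlived its segment.";
}
```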
ArgumentManager args_manager(
    args, runtime_workspace_, segmented_fusion_->inputs());
// group should share cache id.
auto group_cache_id = args.getCacheId();
const int64_t num_groups = (int64_t)runtime_workspace_.group_run_order.size();
if (isProfilerEnabled()) {
FusionProfiler::startCompile();
}
// host ir
std::unique_ptr<hir::HostIrContainer> hic;
if (isOptionEnabled(EnableOption::HostIrLowering)) {
hic = std::make_unique<hir::HostIrContainer>(
num_groups); // Some indices will be empty
}
std::atomic<bool> detect_exception_in_thread_pool{false};
std::string thread_pool_error_message;
std::mutex thread_pool_error_message_mutex;
for (int64_t run_order_id = 0; run_order_id < num_groups; ++run_order_id) {
auto group_to_run = runtime_workspace_.group_run_order.at(run_order_id);
if (isDebugDumpEnabled(DebugDumpOption::PythonDefinitionSegments)) {
debug() << "Python definition for segmented group "
<< group_to_run->groupId() << ":" << std::endl;
python_frontend::FusionDefinition fd(/*id=*/std::nullopt);
python_frontend::translate(group_to_run->getFusion(), &fd);
fd.print(debug());
}
// TODO: index mode should be updated per segmented kernel
// Prepare input vector
auto group_runtime_inputs =
args_manager.translateValsToArgs(group_to_run->inputs());
group_runtime_inputs.setDeviceIndex(args.getDeviceIndex());
if (group_cache_id.has_value()) {
group_runtime_inputs.setCacheId(group_cache_id.value());
}
if (num_groups == 1 || isOptionDisabled(DisableOption::ParallelCompile)) {
FUSER_PERF_SCOPE("FusionKernelRuntime::compileFusionParallel");
c10::cuda::CUDAGuard dg(args.getDeviceIndex());
c10::Device device(c10::DeviceType::CUDA, args.getDeviceIndex());
compileKernel(group_runtime_inputs, group_to_run, hic.get());
} else {
hir::HostIrContainer* hic_ptr = hic.get();
// launch compileKernel thread here
getThreadPool()->run([this,
args,
group_runtime_inputs,
group_to_run,
&detect_exception_in_thread_pool,
&thread_pool_error_message,
&thread_pool_error_message_mutex,
hic_ptr]() {
FUSER_PERF_SCOPE("FusionKernelRuntime::compileFusionParallel");
try {
c10::cuda::CUDAGuard dg(args.getDeviceIndex());
c10::Device device(c10::DeviceType::CUDA, args.getDeviceIndex());
compileKernel(group_runtime_inputs, group_to_run, hic_ptr);
} catch (const std::exception& e) {
// Set flag inside lambda so we can throw an exception after thread
// pool completes its work.
detect_exception_in_thread_pool.store(true);
const std::lock_guard<std::mutex> lock(
thread_pool_error_message_mutex);
std::stringstream ss;
ss << thread_pool_error_message << "\nError from segmentation group "
<< group_to_run->groupId() << ": " << e.what() << "\n";
thread_pool_error_message = ss.str();
}
});
}
auto fusion_to_run = segmented_fusion_->makeFusion(group_to_run).second;
auto group_runtime_outputs =
inferOutputSizes(fusion_to_run.get(), group_runtime_inputs);
// map output args to tensor map
args_manager.updateWithSegmentOutputs(
group_to_run->outputs(), group_runtime_outputs, run_order_id);
}
// add all expressions and compiled kernels to the host ir container
if (hic != nullptr) {
IrCloner ir_cloner(hic.get());
FusionGuard::setCurFusion(hic.get());
for (int64_t run_order_id = 0; run_order_id < num_groups; ++run_order_id) {
auto group_to_run = runtime_workspace_.group_run_order.at(run_order_id);
if (hic->hasKernelExecutor(run_order_id)) {
auto in_clone = ir_cloner.clone(group_to_run->inputs());
auto out_clone = ir_cloner.clone(group_to_run->outputs());
auto heuristic_params = schedulers().at(run_order_id).get();
auto launch_kernel = IrBuilder::create<hir::LaunchKernel>(
run_order_id,
heuristic_params->lparams,
heuristic_params->cparams,
std::vector<Val*>{in_clone},
std::vector<Val*>{out_clone});
hic->pushBackTopLevelExprs(launch_kernel);
} else {
// push back segment's exprs into the container as top level expressions
for (auto* expr : group_to_run->exprs()) {
auto cloned_expr = ir_cloner.clone(expr);
hic->pushBackTopLevelExprs(cloned_expr);
}
}
}
for (const Val* in : segmented_fusion_->inputs()) {
hic->addInput(ir_cloner.clone(in));
}
for (const Val* out : segmented_fusion_->outputs()) {
hic->addOutput(ir_cloner.clone(out));
}
}
if (num_groups != 1 && !isOptionDisabled(DisableOption::ParallelCompile)) {
// Wait until all segments finish compiling
getThreadPool()->waitWorkComplete();
NVF_ERROR(
!detect_exception_in_thread_pool.load(),
"Detected exception while compiling fusion segments in parallel. ",
"Error messages from all threads are printed below.\n",
thread_pool_error_message,
"\nUse NVFUSER_DISABLE=parallel_compile to simplify error message.");
}
if (hic != nullptr) {
hie_ = std::make_unique<hir::HostIrEvaluator>(
hir::HostIrEvaluator(std::move(hic)));
}
if (isProfilerEnabled()) {
FusionProfiler::stopCompile();
}
}
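The error handling above follows a common pattern for surfacing exceptions from a thread pool: each worker records failures into an atomic flag and a mutex-guarded message buffer, and the main thread checks them only after the pool drains. A self-contained sketch of that pattern, using plain std::thread in place of nvFuser's getThreadPool():

```cpp
#include <atomic>
#include <iostream>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

int main() {
  std::atomic<bool> detect_exception{false};
  std::string error_message;
  std::mutex error_message_mutex;

  std::vector<std::thread> workers;
  for (int group_id = 0; group_id < 4; ++group_id) {
    workers.emplace_back([&, group_id] {
      try {
        // Stand-in for compileKernel(); one group fails for demonstration.
        if (group_id == 2) {
          throw std::runtime_error("compilation failed");
        }
      } catch (const std::exception& e) {
        // Record the failure instead of letting it escape the thread;
        // the main thread aggregates and reports after all workers join.
        detect_exception.store(true);
        const std::lock_guard<std::mutex> lock(error_message_mutex);
        std::stringstream ss;
        ss << error_message << "\nError from segmentation group " << group_id
           << ": " << e.what();
        error_message = ss.str();
      }
    });
  }
  for (auto& w : workers) {
    w.join();
  }
  if (detect_exception.load()) {
    std::cerr << "Detected exception while compiling in parallel:"
              << error_message << std::endl;
    return 1;
  }
  return 0;
}
```

Deferring the throw until after all workers finish keeps a failing segment from tearing down the process while sibling compilations are still in flight, and lets every error be reported at once.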
void FusionKernelRuntime::disableKernelLaunch() {
NVF_CHECK(
isCompiled(),
"Tried to set parameters of executors before they were initialized.");
for (auto& executor : executors_) {
if (auto ke = dynamic_cast<KernelExecutor*>(executor.get())) {
ke->setExecuteKernelFlag(false);
}
}
}
SegmentedFusion* FusionKernelRuntime::fusionSegments() const {
return segmented_fusion_.get();
}
HeuristicParamsList* FusionKernelRuntime::schedulerHeuristics() const {
return heuristics_.get();
}
const ExecutorLog& FusionKernelRuntime::getMostRecentExecutorLog() const {
NVF_ERROR(profiling_, "Executor log is only produced in profiling mode");
return most_recent_executor_log_;
}
std::optional<std::unique_ptr<HeuristicParamsList>> FusionKernelRuntime::
getMaybeHeuristicsFor(
const KernelArgumentHolder& args,
std::optional<PrimDataType> forced_index_type) {
FUSER_PERF_SCOPE("FusionKernelRuntime::getMaybeHeuristicsFor");
// The runtime group run order is different from the segmented_fusion group
// order. Instead of using HeuristicParamsList::emplaceBack, we initialize
// HeuristicParamsList with the desired number of groups.
const int64_t num_groups = (int64_t)runtime_workspace_.group_run_order.size();
std::unique_ptr<HeuristicParamsList> heuristics =
std::make_unique<HeuristicParamsList>(num_groups);
// We make a mutable copy of args so that we can use it in an
// ArgumentManager
ArgumentManager args_manager(
args, runtime_workspace_, segmented_fusion_->inputs());
// Follow group run order
for (int64_t group_id : c10::irange(num_groups)) {
auto group_to_run = runtime_workspace_.group_run_order.at(group_id);
// Create fusion for this segmented group
Fusion* fusion_to_run = group_to_run->getFusion();
NVF_ERROR(fusion_to_run != nullptr);
FusionGuard fg(fusion_to_run);
// Get input arguments for SchedulerRuntimeInfo
KernelArgumentHolder group_runtime_inputs =
args_manager.translateValsToArgs(group_to_run->inputs());
group_runtime_inputs.setDeviceIndex(args.getDeviceIndex());
// Create PrecomputedValues for fusion segment
std::unique_ptr<PrecomputedValues> evaluator_precomputed_values;
{
FUSER_PERF_SCOPE(
"FusionKernelRuntime::getMaybeHeuristicsFor::PrecomputedValues");
evaluator_precomputed_values =
std::make_unique<PrecomputedValues>(fusion_to_run);
evaluator_precomputed_values->bindInputs(group_runtime_inputs);
// TODO Remove binding the original fusion inputs when creating
// heuristics for fusion segment.
evaluator_precomputed_values->bindValues(
group_to_run->getCompleteFusionInputs(), args);
evaluator_precomputed_values->evaluate();
}
// Get all tensorviews for segmented fusion
std::vector<TensorView*> all_tvs_for_fusion_to_run =
fusion_to_run->allTvs();
SchedulerRuntimeInfo fusion_to_run_info(
fusion_to_run,
group_runtime_inputs,
evaluator_precomputed_values.get(),
all_tvs_for_fusion_to_run,
forced_index_type);
if (heuristics_ == nullptr) {
// Add new scheduler entry for this segmented group
heuristics->at(group_to_run->groupId()) =
segmented_fusion_->makeInitialHeuristicParams(
group_to_run, fusion_to_run_info);
} else {
// Try to get scheduler entry
// NOTE: we are able to skip compile time checks here since the fusion
// has already been segmented. During segmentation, each segment must
// pass both canScheduleCompileTime and canScheduleRuntime for the
// scheduler that accepts the segment. Since we might have different
// runtime info than was used during segmentation, we cannot skip
// canScheduleRuntime, but it is safe to skip canScheduleCompileTime. We
// skip it here to avoid performing expensive fusion traversals on the
// dynamic shape path.
auto maybe_heuristic_params =
group_to_run->getMaybeHeuristicParams(fusion_to_run_info);
// If unavailable, then return std::nullopt
if (!maybe_heuristic_params.has_value()) {
return std::nullopt;
}
// Check if this scheduler entry matches the previous entry for this
// segmented group. If no match, then return std::nullopt
auto heuristic_params = std::move(maybe_heuristic_params.value());
if (!heuristic_params->sameAs(
heuristics_->at(group_to_run->groupId()).get())) {
return std::nullopt;
}
// Add new scheduler entry for this segmented group
heuristics->at(group_to_run->groupId()) = std::move(heuristic_params);
}
// Generate metadata for the fusion's outputs
auto group_runtime_outputs = inferOutputSizes(
fusion_to_run,
group_runtime_inputs,
evaluator_precomputed_values.get());
args_manager.updateWithSegmentOutputs(
group_to_run->outputs(), group_runtime_outputs, group_id);
}
return heuristics;
}
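Stripped of nvFuser specifics, getMaybeHeuristicsFor is a cache-revalidation loop: recompute parameters for each group under the new runtime info, and bail out with std::nullopt the moment any group fails to schedule or diverges from the cached entry. A sketch under those assumptions (Params and its sameAs are stand-ins for HeuristicParams):

```cpp
#include <memory>
#include <optional>
#include <vector>

// Stand-in for HeuristicParams; sameAs is the equality check used to decide
// whether a cached runtime can be reused.
struct Params {
  int launch_config = 0;
  bool sameAs(const Params* other) const {
    return launch_config == other->launch_config;
  }
};

// Recompute per-group parameters for new runtime info. Return std::nullopt if
// any group can no longer be scheduled the same way, telling the caller to
// compile a fresh runtime instead of reusing this one.
std::optional<std::vector<std::unique_ptr<Params>>> maybeRevalidate(
    const std::vector<std::unique_ptr<Params>>& cached,
    const std::vector<int>& new_launch_configs) {
  std::vector<std::unique_ptr<Params>> fresh;
  for (size_t i = 0; i < cached.size(); ++i) {
    auto params = std::make_unique<Params>();
    params->launch_config = new_launch_configs.at(i);
    if (!params->sameAs(cached.at(i).get())) {
      return std::nullopt;
    }
    fresh.push_back(std::move(params));
  }
  return fresh;
}
```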
void FusionKernelRuntime::updateHeuristicsLaunchParams(
HeuristicParamsList* update_heuristics) {
auto scheduler_list_length = heuristics_->heuristicsList().size();
NVF_ERROR(
update_heuristics->heuristicsList().size() == scheduler_list_length);
for (const auto i : c10::irange(scheduler_list_length)) {
auto& heuristic_params = heuristics_->heuristicsList()[i];
heuristic_params->lparams = update_heuristics->heuristicsList()[i]->lparams;
}
}
const std::vector<std::unique_ptr<ExecutorAbstract>>& FusionKernelRuntime::
executors() const {
return executors_;
}
std::unordered_map<Val*, PolymorphicValue> FusionKernelRuntime::
runSegmentsWithInputs(KernelArgumentHolder& args) {
FUSER_PERF_SCOPE("FusionKernelRuntime::runSegmentsWithInputs");
NVF_ERROR(
args.size() == segmented_fusion_->inputs().size(),
"Inputs were not set up correctly, received ",
args.size(),
" inputs but expected ",
segmented_fusion_->inputs().size());
ArgumentManager args_manager(
args, runtime_workspace_, segmented_fusion_->inputs());
// group should share cache id.
auto group_cache_id = args.getCacheId();
const int64_t num_groups = (int64_t)runtime_workspace_.group_run_order.size();
kernel_time_ms_ = 0;
for (auto run_order_id : c10::irange(num_groups)) {
// TODO: index mode should be updated per segmented kernel
// Prepare input vector
auto group_to_run = runtime_workspace_.group_run_order.at(run_order_id);
KernelArgumentHolder group_runtime_inputs =
args_manager.translateValsToArgs(group_to_run->inputs());
group_runtime_inputs.setDeviceIndex(args.getDeviceIndex());
if (group_cache_id.has_value()) {
group_runtime_inputs.setCacheId(group_cache_id.value());
}
// TODO: currently we are still outputting PyTorch tensors, instead of
// something abstract. This is quite unsatisfying.
// Run graph segment
KernelArgumentHolder group_runtime_outputs =
runKernelWithInput(group_runtime_inputs, group_to_run);
args_manager.updateWithSegmentOutputs(
group_to_run->outputs(), group_runtime_outputs, run_order_id);
}
if (isProfilerEnabled()) {
int64_t input_bytes = 0;
for (auto* inp : fusionSegments()->inputs()) {
if (inp->isA<TensorView>()) {
The new test ClearGmemBetweenSegments should be thoroughly reviewed to ensure it covers all scenarios that the old test FusionClearGmemBetweenSegments_CUDA covered. Verify that the new test accurately checks for memory deallocation and peak memory usage.
TEST_F(RuntimeTest, ClearGmemBetweenSegments) {
at::cuda::clearCublasWorkspaces();
releaseZeroedMemory();
ASSERT_EQ(memoryAllocated(0), 0) << "Previous tests leaked memory.";
auto fusion = std::make_unique<Fusion>();
FusionGuard fg(fusion.get());
std::vector<int64_t> input_shape{32, 64, 8, 128};
auto tv0 = TensorViewBuilder()
.ndims(input_shape.size())
.dtype(DataType::Double)
.build();
fusion->addInput(tv0);
auto tv1 = add(tv0, IrBuilder::create<Val>(1.0)); // Group 0
auto tv2 = sum(tv1, {0}); // Group 0
auto tv3 = sum(tv2, {-1}); // Group 1
auto output = sum(tv3, {0}); // Group 2
fusion->addOutput(output);
resetPeakMemoryStats(0);
ASSERT_EQ(maxMemoryAllocated(0), 0) << "No tensors are allocated so far.";
auto options = at::TensorOptions().dtype(at::kDouble).device(at::kCUDA, 0);
at::Tensor at_x = at::randn(input_shape, options);
FusionExecutorCache executor_cache(std::move(fusion));
auto outputs = executor_cache.runFusionWithInputs({at_x});
const int64_t max_memory_allocated = maxMemoryAllocated(0);
auto runtime = executor_cache.getMostRecentKernelRuntime();
EXPECT_EQ(runtime->fusionSegments()->groups().size(), 3)
<< "segmentation didn't happen as expected";
testValidate(executor_cache.fusion(), outputs, {at_x}, __LINE__, __FILE__);
if (c10::utils::check_env("PYTORCH_NO_CUDA_MEMORY_CACHING") == true) {
GTEST_SKIP() << "Skipped because PYTORCH_NO_CUDA_MEMORY_CACHING is on. "
                "This usually happens when running with compute-sanitizer. "
                "maxMemoryAllocated can only collect peak memory "
                "from a caching allocator.";
}
EXPECT_EQ(
max_memory_allocated,
(32 * 64 * 8 * 128 + 64 * 8 * 128 + 64 * 8) * sizeof(double))
<< "tv0 (32 * 64 * 8 * 128) outlived the execution, so it contributes ""to the peak memory. tv1 was never allocated because it's internal ""to group 0. tv2 (64 * 8 * 128) and tv3 (64 * 8) were both alive ""when executing group 1.";
}
} // namespace nvfuser
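For reference, the expected peak in the assertion above works out to 2,097,152 + 65,536 + 512 = 2,163,200 doubles, i.e. 2,163,200 × 8 = 17,305,600 bytes (about 16.5 MiB), dominated by the fusion input tv0.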
The new functions resetPeakMemoryStats, maxMemoryAllocated, and memoryAllocated should be reviewed for correctness and efficiency. Ensure they accurately reflect the memory usage and peak memory allocation on the specified device.
void resetPeakMemoryStats(const c10::DeviceIndex device) {
c10::cuda::CUDACachingAllocator::CUDAAllocator* allocator =
c10::cuda::CUDACachingAllocator::get();
NVF_CHECK(allocator != nullptr);
allocator->resetPeakStats(device);
}
namespace {
// Stats like allocated_bytes come as a size-3 array (cf.
// https://github.com/pytorch/pytorch/blob/feb503c1df78afd46962ed04e446d6e88ac0522d/c10/core/Allocator.h#L365-L370).
// The 0-th element is an aggregation of both the small pool and the large.
constexpr auto kAggregateStatsIndex = static_cast<uint64_t>(
#if NVF_TORCH_VERSION_NO_LESS(2, 7, 0)
c10::CachingAllocator::StatType::AGGREGATE
#else
c10::CachingDeviceAllocator::StatType::AGGREGATE
#endif
);
} // namespace

int64_t maxMemoryAllocated(const c10::DeviceIndex device) {
c10::cuda::CUDACachingAllocator::CUDAAllocator* allocator =
c10::cuda::CUDACachingAllocator::get();
NVF_CHECK(allocator != nullptr);
c10::CachingDeviceAllocator::DeviceStats device_stats =
allocator->getDeviceStats(device);
return device_stats.allocated_bytes.at(kAggregateStatsIndex).peak;
}
int64_t memoryAllocated(const c10::DeviceIndex device) {
c10::cuda::CUDACachingAllocator::CUDAAllocator* allocator =
c10::cuda::CUDACachingAllocator::get();
NVF_CHECK(allocator != nullptr);
c10::CachingDeviceAllocator::DeviceStats device_stats =
allocator->getDeviceStats(device);
return device_stats.allocated_bytes.at(kAggregateStatsIndex).current;
}
} // namespace nvfuser
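As a rough sanity check of the three helpers, a hypothetical test (assuming device 0 and a caching allocator): current allocation should drop back once a tensor dies, while the peak retains the high-water mark.

```cpp
#include <ATen/ATen.h>
#include <gtest/gtest.h>

TEST(MemoryStatsSanity, PeakTracksHighWaterMark) {
  resetPeakMemoryStats(0);
  const int64_t before = memoryAllocated(0);
  {
    auto options = at::TensorOptions().dtype(at::kFloat).device(at::kCUDA, 0);
    at::Tensor t = at::zeros({1 << 20}, options); // ~4 MiB of floats
    EXPECT_GE(memoryAllocated(0), before + (1 << 20) * 4);
  }
  // The tensor is freed, so current allocation falls back...
  EXPECT_EQ(memoryAllocated(0), before);
  // ...but the peak still reflects the transient allocation.
  EXPECT_GE(maxMemoryAllocated(0), before + (1 << 20) * 4);
}
```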
We can also do this in TearDown of all tests so we can verify none of them leak GPU memory. I'm happy to pursue that in a separate PR because the blast radius is larger.
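A minimal sketch of that TearDown idea, reusing the same cleanup calls as the test above (the fixture name is hypothetical; the actual hook-up is left to the follow-up PR):

```cpp
#include <gtest/gtest.h>

// Hypothetical fixture: fail any test that leaks GPU memory on device 0.
class LeakCheckedTest : public ::testing::Test {
 protected:
  void TearDown() override {
    at::cuda::clearCublasWorkspaces();
    releaseZeroedMemory();
    EXPECT_EQ(memoryAllocated(0), 0) << "This test leaked GPU memory.";
  }
};
```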
Instead, check peak memory directly. The utilities will be used for verifying deallocation in host IR.
https://testing.googleblog.com/2013/08/testing-on-toilet-test-behavior-not.html