Skip to content

Symmetric memory pytorch backends#6023

Open
saivishal1999 wants to merge 18 commits intomainfrom
symmetric-memory-pytorch-backends
Open

Symmetric memory pytorch backends#6023
saivishal1999 wants to merge 18 commits intomainfrom
symmetric-memory-pytorch-backends

Conversation

@saivishal1999
Copy link
Copy Markdown
Collaborator

No description provided.

@github-actions
Copy link
Copy Markdown

github-actions bot commented Mar 2, 2026

Review updated until commit 6996d05

Description

  • Add PyTorch symmetric memory backends (NCCL, NVSHMEM, CUDA) as alternatives to native VMM

  • Implement getSymmetricMemoryBackend() to select backend via NVFUSER_ENABLE=symmetric_memory_backend option

  • Integrate PyTorch's c10d::symmetric_memory for allocation, rendezvous, and remote tensor access

  • Add Communicator methods to expose Store and Backend for PyTorch symmetric memory integration

Changes walkthrough

Relevant files
Enhancement
6 files
ipc_utils.h
Add SymmetricMemoryBackend enum and getter                             
+13/-0   
ipc_utils.cpp
Implement getSymmetricMemoryBackend option parsing             
+18/-0   
symmetric_tensor.h
Add PyTorch symmetric memory handle member                             
+15/-6   
symmetric_tensor.cpp
Implement PyTorch backend allocation and remote access     
+162/-1 
communicator.h
Declare getStore and getWorldBackendIntrusivePtr                 
+13/-0   
communicator.cpp
Implement getStore and getWorldBackendIntrusivePtr             
+16/-0   
Configuration changes
2 files
options.h
Add SymmetricMemoryBackend to EnableOption enum                   
+2/-0     
options.cpp
Register symmetric_memory_backend enable option                   
+1/-0     
Tests
1 files
test_multidevice_symmetric_tensor.cpp
Add tests for symmetric memory backend selection                 
+108/-0 
Miscellaneous
1 files
fbuild.sh
Add build script for development                                                 
+24/-0   

PR Reviewer Guide

Here are some key observations to aid the review process:

🧪 PR contains tests
⚡ Recommended focus areas for review
Silent fallback to Native backend

When an invalid argument is passed to symmetric_memory_backend option (e.g., "pytorch_invalid"),
getSymmetricMemoryBackend() silently falls back to Native instead of reporting an error.
This could mask user configuration mistakes. Consider adding validation to warn or error
on unknown backend arguments.

SymmetricMemoryBackend getSymmetricMemoryBackend() {
  if (isOptionEnabled(EnableOption::SymmetricMemoryBackend)) {
    if (hasEnableOptionArgument(
            EnableOption::SymmetricMemoryBackend, "pytorch_nccl")) {
      return SymmetricMemoryBackend::PyTorchNccl;
    }
    if (hasEnableOptionArgument(
            EnableOption::SymmetricMemoryBackend, "pytorch_nvshmem")) {
      return SymmetricMemoryBackend::PyTorchNvshmem;
    }
    if (hasEnableOptionArgument(
            EnableOption::SymmetricMemoryBackend, "pytorch_cuda")) {
      return SymmetricMemoryBackend::PyTorchCuda;
    }
  }
  return SymmetricMemoryBackend::Native;
}
PyTorch backend tests commented out

The test PyTorchBackend_RemoteAccessCorrectness (lines 125-163) is commented out. Since this
PR introduces PyTorch symmetric memory backends, having at least one active test for the
non-native paths would be valuable to ensure correctness. Consider enabling or adding an
alternative test for the PyTorch backend path.

// TEST_F(SymmetricTensorTest, PyTorchBackend_RemoteAccessCorrectness) {
//   if (communicator_->size() == 1) {
//     GTEST_SKIP() << "Skipping test for single device";
//   }
//   SymmetricMemoryBackend backend = getSymmetricMemoryBackend();
//   if (backend == SymmetricMemoryBackend::Native) {
//     GTEST_SKIP()
//         << "PyTorch backend not selected; set NVFUSER_ENABLE=symmetric_memory_backend(pytorch_nccl) to run";
//   }

//   const int64_t rank = communicator_->deviceId();
//   const int64_t world_size = communicator_->size();

//   at::Tensor local_tensor = SymmetricTensor::allocate(
//       {256, 512}, at::ScalarType::Float, communicator_->device());
//   SymmetricTensor sym_tensor(local_tensor);

//   EXPECT_TRUE(local_tensor.is_cuda());
//   EXPECT_EQ(local_tensor.numel(), 256 * 512);

//   float local_value = static_cast<float>(rank + 200);
//   local_tensor.fill_(local_value);

//   sym_tensor.setupRemoteHandles();

//   for (int64_t peer_rank = 0; peer_rank < world_size; ++peer_rank) {
//     void* peer_ptr = sym_tensor.remoteTensor(peer_rank).data_ptr();
//     EXPECT_NE(peer_ptr, nullptr);

//     float peer_value;
//     NVFUSER_CUDA_RT_SAFE_CALL(cudaMemcpy(
//         &peer_value, peer_ptr, sizeof(float), cudaMemcpyDeviceToHost));

//     float expected_value = static_cast<float>(peer_rank + 200);
//     EXPECT_FLOAT_EQ(peer_value, expected_value)
//         << "Rank " << rank << " reading from rank " << peer_rank
//         << " (PyTorch backend)";
//   }
// }
Unnecessary build script added

A new file fbuild.sh was added which appears to be a local development/build script with
hardcoded paths (e.g., /opt/hpcx/ucc). This should likely be removed from the PR as it's
not part of the feature implementation and contains machine-specific configuration.

#!/bin/bash

export CC=clang-20
export CXX=clang++-20
export LDFLAGS="-fuse-ld=mold"

export NVFUSER_BUILD_ENABLE_PCH

export UCC_HOME="/opt/hpcx/ucc"
export UCC_DIR="/opt/hpcx/ucc/lib/cmake/ucc"
export UCX_HOME="/opt/hpcx/ucx"
export UCX_DIR="/opt/hpcx/ucx/lib/cmake/ucx"

# export TORCH_CUDA_ARCH_LIST="9.0"

export NVFUSER_BUILD_WITH_UCC=1
export NVFUSER_BUILD_INSTALL_DIR=$BUILD_DIRECTORY/nvfuser
export NVFUSER_BUILD_DIR=$BUILD_DIRECTORY

# Enable debug mode, leave empty for non-debug compilation
export NVFUSER_BUILD_BUILD_TYPE=Debug
export RUN_CMAKE=""

pip install -v -e ./python --no-build-isolation

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps bot commented Mar 2, 2026

Greptile Summary

This PR integrates PyTorch's symmetric memory layer (torch.distributed._symmetric_memory) as an optional backend for SymmetricTensor, selectable via NVFUSER_ENABLE=symmetric_memory_backend(pytorch_nccl|pytorch_nvshmem|pytorch_cuda). The native CUDA VMM+IPC path is preserved as the default. Alongside the new backend, the PR fixes several pre-existing issues in the native path (unsigned loop variable UB, uninitialized structs, size_tint64_t sign-cast warnings, mc_ptr_ rename).

Key changes:

  • SymmetricMemoryBackend enum and getSymmetricMemoryBackend() added to ipc_utils.
  • ensurePyTorchSymmMemBackend() in symmetric_tensor.cpp handles one-time set_backend and NVSHMEM \"0\" alias registration.
  • Communicator::getBackendForTeam now wraps the NCCL backend in a c10d::ProcessGroup so PyTorch's rendezvous can resolve it by name — but does not guard against the current rank being absent from the team, producing a ProcessGroup with rank == team.size() (out of range) and a nullptr backend (see comment on communicator.cpp).
  • ContiguousView test is correctly skipped for the PyTorch backend; existing multicast and remote-handle tests are unchanged.

Confidence Score: 2/5

Not safe to merge — the ProcessGroup creation in getBackendForTeam does not guard against the current rank being absent from the team, leading to a ProcessGroup registered with an out-of-range rank and nullptr backend.

The logic bug in getBackendForTeam is a P0 issue: whenever getBackendForTeam is called for a sub-team that does not include the current rank (a common case), a malformed ProcessGroup is registered and can be resolved by symmetric-memory rendezvous, causing hangs or crashes. The rest of the changes are sound (native-path cleanups are correct, macro guards are consistent, enum additions are clean), but this one issue blocks safe merging.

csrc/multidevice/communicator.cpp (lines 427–445): ProcessGroup creation block missing team-membership guard.

Important Files Changed

Filename Overview
csrc/multidevice/communicator.cpp Adds ProcessGroup wrapper creation in getBackendForTeam for symmetric memory NCCL rendezvous; contains a logic bug where the wrapper is created without checking if the local rank belongs to the team, leading to an invalid rank index and nullptr backend.
csrc/multidevice/communicator.h Adds registerProcessGroup() method and process_groups_ map; correctly guarded by NVFUSER_DISTRIBUTED && USE_DISTRIBUTED macros.
csrc/multidevice/symmetric_tensor.cpp Adds PyTorch symmetric memory backend support (NCCL/NVSHMEM/CUDA transports); multiple improvements to native path (zero-init, static_cast fixes); NVF_ERROR(false,...) pattern used in unreachable path at end of ensurePyTorchSymmMemBackend is dead code.
csrc/multidevice/symmetric_tensor.h Adds torch_symm_handle_ member for PyTorch backend, renames mc_ptr_ to multicast_ptr_; cleanly guarded.
csrc/multidevice/ipc_utils.h Adds SymmetricMemoryBackend enum and getSymmetricMemoryBackend() declaration; minor improvement of adding explicit underlying type to MulticastProtocol enum.
csrc/multidevice/ipc_utils.cpp Adds getSymmetricMemoryBackend() implementation; various struct initialization cleanups (zero-init, nullptr).
csrc/options.cpp Registers the new SymmetricMemoryBackend option in the available options map.
csrc/options.h Adds SymmetricMemoryBackend to the EnableOption enum; clean change.
tests/cpp/test_multidevice_symmetric_tensor.cpp Adds GTEST_SKIP guard for ContiguousView test when PyTorch backend is active; no longer contains the debug print statement.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A["SymmetricTensor::allocate / setupRemoteHandles"] --> B{"getSymmetricMemoryBackend()"}
    B -->|Native| C["Native VMM path\ncuMemCreate + IPC FD exchange"]
    B -->|"PyTorchNccl / PyTorchNvshmem / PyTorchCuda"| D["ensurePyTorchSymmMemBackend()"]
    D --> E["call_once: set_backend(name)"]
    D --> F["comm.getBackendForTeam(all_ranks, kNccl)"]
    F --> G{"Current rank in team?"}
    G -->|Yes| H["Create ProcessGroup wrapper\nsetBackend / setGroupName\nregisterProcessGroup(team_key)"]
    G -->|"No - BUG"| I["team_rank = team.size() - invalid\nProcessGroup with bad rank registered"]
    D --> J{"PyTorchNvshmem?"}
    J -->|Yes| K["call_once pg0_once\nresolve or register alias 0"]
    J -->|No| L["return group_name"]
    K --> L
    L --> M["allocate: empty_strided_p2p"]
    L --> N["setupRemoteHandles:\nbarrier + rendezvous\ntorch_symm_handle_ set"]
Loading

Reviews (14): Last reviewed commit: "Address pending review cmnts" | Re-trigger Greptile

Copy link
Copy Markdown
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 files reviewed, 4 comments

Edit Code Review Agent Settings | Greptile

Comment on lines +46 to +72
void ensurePyTorchSymmMemBackend(SymmetricMemoryBackend backend) {
static std::once_flag once;
std::call_once(once, [backend]() {
const char* name = nullptr;
switch (backend) {
case SymmetricMemoryBackend::PyTorchNccl:
name = "NCCL";
break;
case SymmetricMemoryBackend::PyTorchNvshmem:
name = "NVSHMEM";
break;
case SymmetricMemoryBackend::PyTorchCuda:
name = "CUDA";
break;
default:
NVF_ERROR(false, "Unexpected PyTorch symmetric memory backend");
}
c10d::symmetric_memory::set_backend(name);
Communicator& comm = Communicator::getInstance();
NVF_CHECK(comm.is_available(), "Communicator not available for symmetric memory");
c10d::symmetric_memory::set_group_info(
kPyTorchSymmMemGroupName,
static_cast<int>(comm.deviceId()),
static_cast<int>(comm.size()),
comm.getStore());
});
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NCCL backend initialization is incomplete — register_process_group is never called

ensurePyTorchSymmMemBackend calls set_group_info but never calls c10d::register_process_group. According to the comment added to communicator.h for getWorldBackendIntrusivePtr:

Returns the world backend as an intrusive_ptr so it can be registered with c10d::register_process_group (e.g. for PyTorch symmetric memory NCCL rendezvous, which resolves the group by name).

getWorldBackendIntrusivePtr was clearly introduced to supply the backend for this registration, yet the call to c10d::register_process_group is absent from ensurePyTorchSymmMemBackend. PyTorch's NCCL symmetric-memory rendezvous resolves the process group by name at the point it is called; without a prior register_process_group(kPyTorchSymmMemGroupName, ...), the NCCL backend path will fail to locate the group and throw at rendezvous time.

The missing call should be something like:

// After set_group_info, for NCCL backend:
c10d::register_process_group(
    kPyTorchSymmMemGroupName,
    comm.getWorldBackendIntrusivePtr(CommunicatorBackend::kNccl));

The fact that getWorldBackendIntrusivePtr was added in this exact PR but is never invoked strongly suggests this step was accidentally left out.

Comment on lines +150 to +152
std::vector<int64_t> strides(sizes.size());
strides.back() = 1;
for (int64_t i = (int64_t)strides.size() - 2; i >= 0; --i) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undefined behavior when sizes is empty (0-dim tensor)

std::vector<int64_t> strides(sizes.size());
strides.back() = 1;   // UB if sizes is empty

std::vector::back() on an empty vector is undefined behaviour. The same guard-free pattern also exists in the native path further down in the same function (~line 225). While allocating a 0-dimensional symmetric tensor is unusual, the PyTorch path that was just added adds a new callsite where callers may pass {} as sizes. A simple check is sufficient:

NVF_CHECK(!sizes.empty(), "Cannot allocate a 0-dim symmetric tensor");

or initialise strides defensively (matching the standard row-major convention for 0-dim tensors, which is an empty strides vector) and skip the loop entirely when sizes is empty.

@nsarka
Copy link
Copy Markdown
Member

nsarka commented Mar 3, 2026

Sorry! I accidentally hit the button to merge main into the branch. Hopefully it's ok.

Comment on lines +46 to +72
void ensurePyTorchSymmMemBackend(SymmetricMemoryBackend backend) {
static std::once_flag once;
std::call_once(once, [backend]() {
const char* name = nullptr;
switch (backend) {
case SymmetricMemoryBackend::PyTorchNccl:
name = "NCCL";
break;
case SymmetricMemoryBackend::PyTorchNvshmem:
name = "NVSHMEM";
break;
case SymmetricMemoryBackend::PyTorchCuda:
name = "CUDA";
break;
default:
NVF_ERROR(false, "Unexpected PyTorch symmetric memory backend");
}
c10d::symmetric_memory::set_backend(name);
Communicator& comm = Communicator::getInstance();
NVF_CHECK(comm.is_available(), "Communicator not available for symmetric memory");
c10d::symmetric_memory::set_group_info(
kPyTorchSymmMemGroupName,
static_cast<int>(comm.deviceId()),
static_cast<int>(comm.size()),
comm.getStore());
});
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::call_once exception-safety leaves set_backend in a permanently broken state on retry

std::call_once resets its once_flag if the callable exits via an exception, allowing a subsequent call to retry. However, the callable here calls set_backend(name) before set_group_info(...). If set_backend succeeds but set_group_info subsequently throws (e.g., because the store is unavailable), once_flag is reset and the next allocate() call will attempt set_backend(name) a second time. PyTorch's symmetric memory layer is likely to throw on that second set_backend call (backend already configured), making it impossible to recover without restarting the process.

A straightforward mitigation is to separate the two calls into distinct phases or to wrap set_backend in its own protection:

// Separate once-flags for each idempotent step, or catch and suppress
// the "already set" error from set_backend on retry:
try {
  c10d::symmetric_memory::set_backend(name);
} catch (const std::exception& e) {
  // If the backend is already set to the correct name, treat as success.
  // Re-throw otherwise.
}
c10d::symmetric_memory::set_group_info(
    kPyTorchSymmMemGroupName, ...);

Alternatively, split the once_flag so set_backend has its own dedicated guard that truly runs at most once, while set_group_info can retry on failure.

Copy link
Copy Markdown
Collaborator

@samnordmann samnordmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! Some minor comments
Please add test, fix linter, and run the CI with !test command (comment directly on the PR)

- name: Run lintrunner

return store_.get();
}

#ifdef NVFUSER_DISTRIBUTED
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need guard here?


#ifdef NVFUSER_DISTRIBUTED
#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed

Comment on lines +129 to +137
// Returns the store as an intrusive_ptr for use with PyTorch symmetric
// memory (c10d::symmetric_memory::set_group_info).
c10::intrusive_ptr<c10d::Store> getStore() const;

// Returns the world backend as an intrusive_ptr so it can be registered with
// c10d::register_process_group (e.g. for PyTorch symmetric memory NCCL
// rendezvous, which resolves the group by name).
c10::intrusive_ptr<c10d::Backend> getWorldBackendIntrusivePtr(
std::optional<CommunicatorBackend> backend = std::nullopt);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather, change the signature of the existing getter method to return intrusive_ptr instead of raw pointer

Comment on lines +461 to +468
std::string Communicator::getSymmMemGroupKey(
std::optional<CommunicatorBackend> backend) {
std::vector<RankType> all_ranks(size_);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
CommunicatorBackend b = backend.value_or(default_backend_);
(void)getBackendForTeam(all_ranks, b, "symm_mem_");
return getTeamKey(all_ranks, b);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getSymmMemGroupKey returns key without "symm_mem_" prefix — mismatch with registered process group

getBackendForTeam(all_ranks, b, "symm_mem_") registers the process group under the key "symm_mem_" + getTeamKey(all_ranks, b) (see the register_process_group call in that function). However, getSymmMemGroupKey then returns just getTeamKey(all_ranks, b) — without the "symm_mem_" prefix.

The returned key is subsequently used in ensurePyTorchSymmMemBackend as the group_name passed to both set_group_info and rendezvous. Newer NCCL builds resolve the process group by name at rendezvous time; they will look for a process group registered as "nccl0,1,..." but only "symm_mem_nccl0,1,..." exists, causing rendezvous to fail.

The current workaround that registers under "0" papers over this for older NCCL, but the mismatch will surface as soon as the TODO comment is resolved and older-NCCL special-casing is removed.

The return statement should return the full team_key including the prefix:

Suggested change
std::string Communicator::getSymmMemGroupKey(
std::optional<CommunicatorBackend> backend) {
std::vector<RankType> all_ranks(size_);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
CommunicatorBackend b = backend.value_or(default_backend_);
(void)getBackendForTeam(all_ranks, b, "symm_mem_");
return getTeamKey(all_ranks, b);
}
std::string Communicator::getSymmMemGroupKey(
std::optional<CommunicatorBackend> backend) {
std::vector<RankType> all_ranks(size_);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
CommunicatorBackend b = backend.value_or(default_backend_);
const std::string prefix = "symm_mem_";
(void)getBackendForTeam(all_ranks, b, prefix);
return prefix + getTeamKey(all_ranks, b);
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems relevant, wdyt ?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted and removed "symm_mem_" prefix from the group name everywhere in my latest patch. Would you say it is important to have a prefix?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the prefix everywhere does fix the mismatch issue, so your code will work correctly now.

However, having the prefix is better practice. Here's why:

Without the prefix, a symmetric memory process group and a regular process group with the same ranks will have identical keys. For example, if you create a regular NCCL group with ranks [0,1,2,3] and then create a symmetric memory group with the same ranks, they'll collide on key "nccl0,1,2,3".

The prefix provides namespace separation — it ensures symmetric memory groups ("symm_mem_nccl0,1,2,3") can coexist with regular groups ("nccl0,1,2,3"). This matters if your code path might create both types of groups for the same rank configuration.

If you're certain that symmetric memory groups will never coexist with regular groups of the same configuration, removing the prefix is fine. But if there's any chance of that happening, keep the prefix for safety.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted and removed "symm_mem_" prefix from the group name everywhere in my latest patch. Would you say it is important to have a prefix?

that's up to you. Also, cf #6023 (comment)

Comment on lines +142 to +144
c10::intrusive_ptr<c10d::Store> getStore() const {
return c10::intrusive_ptr<c10d::Store>(store_);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getStore() uses non-idiomatic intrusive_ptr construction

c10::intrusive_ptr<c10d::Store>(store_) passes the raw TCPStore* obtained from store_ (via the implicit operator T* of intrusive_ptr) to a new intrusive_ptr<Store>. This calls the unsafe intrusive_ptr<T>(T*, bool) constructor that takes an already-retained raw pointer — but store_ is managed and this path risks a ref-count imbalance.

The idiomatic way is to let the intrusive_ptr copy-conversion handle it:

Suggested change
c10::intrusive_ptr<c10d::Store> getStore() const {
return c10::intrusive_ptr<c10d::Store>(store_);
}
c10::intrusive_ptr<c10d::Store> getStore() const {
return store_;
}

Comment on lines +41 to +43
case SymmetricMemoryBackend::PyTorchCuda:
name = "CUDA";
break;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 set_backend is never called for the PyTorchCuda backend

For PyTorchNccl and PyTorchNvshmem, c10d::symmetric_memory::set_backend(name) is called inside the call_once lambda. For PyTorchCuda, name is assigned "CUDA" but set_backend is never invoked. If PyTorch's symmetric-memory layer requires an explicit set_backend call before allocating with a CUDA transport, every empty_strided_p2p call on the CUDA path will either use whatever backend was previously configured (potentially NCCL or NVSHMEM) or fail silently at rendezvous time.

If PyTorchCuda truly requires no set_backend call (e.g., because "CUDA" is the implicit default), please add a comment explaining this so future maintainers don't perceive it as an oversight. Otherwise, add the missing call:

case SymmetricMemoryBackend::PyTorchCuda:
  name = "CUDA";
  c10d::symmetric_memory::set_backend(name);
  break;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relevant

Comment on lines +53 to +56
if (backend != SymmetricMemoryBackend::Native) {
NVF_CHECK(
comm.isBackendAvailable(CommunicatorBackend::kNccl),
"NCCL backend is required for symmetric_memory_backend(nccl)");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 NCCL availability check incorrectly required for all PyTorch backends

isBackendAvailable(CommunicatorBackend::kNccl) is checked unconditionally for every non-Native backend — including PyTorchNvshmem and PyTorchCuda. If those backends don't actually require an NCCL process group (e.g., NVSHMEM uses its own transport), this check will spuriously reject them on systems where NCCL is unavailable.

Additionally, the error message hardcodes "(nccl)" even when the active backend is NVSHMEM or CUDA, which will confuse users:

"NCCL backend is required for symmetric_memory_backend(nccl)"
// fired even when NVFUSER_ENABLE=symmetric_memory_backend(pytorch_nvshmem)

Consider guarding the NCCL check only for PyTorchNccl, and adjusting the error message dynamically:

if (backend == SymmetricMemoryBackend::PyTorchNccl) {
  NVF_CHECK(
      comm.isBackendAvailable(CommunicatorBackend::kNccl),
      "NCCL backend is required for symmetric_memory_backend(pytorch_nccl)");
}

Comment on lines +29 to +47
static std::once_flag once;
std::call_once(once, [backend]() {
const char* name = nullptr;
switch (backend) {
case SymmetricMemoryBackend::PyTorchNccl:
name = "NCCL";
c10d::symmetric_memory::set_backend(name);
break;
case SymmetricMemoryBackend::PyTorchNvshmem:
name = "NVSHMEM";
c10d::symmetric_memory::set_backend(name);
break;
case SymmetricMemoryBackend::PyTorchCuda:
name = "CUDA";
break;
default:
NVF_ERROR(false, "Unexpected PyTorch symmetric memory backend");
}
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Static once_flag binds to whichever backend is passed first — silently ignores later backends

once is a static std::once_flag, so set_backend(name) is called exactly once for the lifetime of the process. If the flag fires on the first call (e.g., PyTorchCuda), a later call with PyTorchNccl won't call set_backend("NCCL") at all — the wrong (or absent) backend will silently remain active.

In practice a single process shouldn't mix backends, but the current structure provides no error if it does. The typical guard is to also capture the name into a static and assert consistency on subsequent calls:

static std::string configured_name;
std::call_once(once, [backend, &configured_name]() {
  // ... set backend and populate configured_name
});
NVF_CHECK(
    configured_name == expected_name,
    "symmetric memory backend already configured as '", configured_name,
    "', cannot reconfigure to '", expected_name, "'");

Or, at minimum, document that mixing backends within a process is undefined behaviour.

Copy link
Copy Markdown
Collaborator

@samnordmann samnordmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall!
Please cleanup, fix the CI and all the minor issues

#include <c10/util/intrusive_ptr.h>

#if defined(NVFUSER_DISTRIBUTED) && \
__has_include(<torch/csrc/distributed/c10d/GroupRegistry.hpp>) && \
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the rationale behind defining NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP? In what scenario can the header be missing?

#ifdef NVFUSER_DISTRIBUTED
#include <torch/csrc/distributed/c10d/Backend.hpp>
#if NVFUSER_CAN_REGISTER_C10D_PROCESS_GROUP
#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this header should always be present, no?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this header when I added the process_groups_ variable, so the same guard is used. It wasn't needed before my changes

if (backend != SymmetricMemoryBackend::Native) {
NVF_CHECK(
comm.isBackendAvailable(CommunicatorBackend::kNccl),
"NCCL backend is required for symmetric_memory_backend(nccl)");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"NCCL backend is required for symmetric_memory_backend(nccl)");
"NCCL backend is required for non-native symmetric memory backend: , backend");

@samnordmann
Copy link
Copy Markdown
Collaborator

Also, please write a PR description

Comment on lines +63 to +71
static std::once_flag pg0_once;
std::call_once(pg0_once, [&]() {
try {
(void)c10d::resolve_process_group("0");
} catch (const c10::Error&) {
auto pg = c10d::resolve_process_group(group_name);
c10d::register_process_group("0", pg);
}
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 "0" alias registered but never unregistered on cleanup

c10d::register_process_group("0", pg) is called inside a static std::once_flag lambda that lives in ensurePyTorchSymmMemBackend. The "0" key is never added to process_groups_ in Communicator, so Communicator::cleanup() will not unregister it:

for (const auto& entry : process_groups_) {
    c10d::unregister_process_group(entry.first); // only unregisters team_key, never "0"
}

In test environments that tear down and re-create a Communicator, the stale "0" registration persists across test cases. On the next call to ensurePyTorchSymmMemBackend, pg0_once is permanently fired, so c10d::resolve_process_group("0") succeeds with the old, destroyed process group — and symm-mem rendezvous will silently use it.

The fix is to track the "0" alias and unregister it during cleanup(), or unconditionally overwrite the "0" registration rather than checking first.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about this ?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taken care of in next commit, process_groups_ variable keeps track if "0" was registered by fuser's symmem

@samnordmann
Copy link
Copy Markdown
Collaborator

!test

@saivishal1999 saivishal1999 requested a review from wujingyue March 25, 2026 16:47
Comment on lines +370 to +373
for (const auto& entry : process_groups_) {
c10d::unregister_process_group(entry.first);
}
process_groups_.clear();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 process_groups_ cleanup guard mismatch — compile error when NVFUSER_DISTRIBUTED is set without USE_DISTRIBUTED

process_groups_ is declared in communicator.h under #if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED), but the cleanup loop here lives under the broader #if defined(NVFUSER_DISTRIBUTED) (without the USE_DISTRIBUTED guard). When a build defines NVFUSER_DISTRIBUTED but not USE_DISTRIBUTED, process_groups_ does not exist as a member, yet this code tries to iterate over it — a hard compile error.

c10d::unregister_process_group (from GroupRegistry.hpp) is already included under #ifdef NVFUSER_DISTRIBUTED, so fixing just the guard on these lines is sufficient:

Suggested change
for (const auto& entry : process_groups_) {
c10d::unregister_process_group(entry.first);
}
process_groups_.clear();
#if defined(USE_DISTRIBUTED)
for (const auto& entry : process_groups_) {
c10d::unregister_process_group(entry.first);
}
process_groups_.clear();
#endif

(The surrounding #if defined(NVFUSER_DISTRIBUTED) / #endif already provides the outer distributed guard.)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is relevant -- but the real fix is to replace USE_DISTRIBUTED by NVFUSER_DISTRIBUTED wherever it appears (we don't use USE_DISTRIBUTED in the codebase)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. You're right—USE_DISTRIBUTED should be replaced with NVFUSER_DISTRIBUTED everywhere. The codebase currently has process_groups_ guarded by both macros, but the cleanup code only checks one, which causes the mismatch.

USE_DISTRIBUTED appears in 4 files:

  1. csrc/multidevice/communicator.h (line 14, line 162) — the header where process_groups_ is declared
  2. csrc/multidevice/communicator.cpp (line 414) — where process_groups_ is populated
  3. csrc/multidevice/symmetric_tensor.cpp (multiple locations) — where symmetric memory uses USE_DISTRIBUTED
  4. csrc/multidevice/symmetric_tensor.h (line 88) — where torch_symm_handle_ is declared

All of these #if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED) guards should become #if defined(NVFUSER_DISTRIBUTED).

The Python files (python/utils.py, python/tools/gen_nvfuser_version.py) and CMakeLists.txt also reference USE_DISTRIBUTED for build configuration, so those would need updating too depending on your build system changes.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a build "build_image_ARM_igpu_20_TNVF" which was failing - it was setting NVFUSER_DISTRIBUTED to ON but USE_DISTRIBUTED to False during cmake configuration. So I had to explicitly add USE_DISTRIBUTED checks. I'll add the missing guard here.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I didnt know
cc @wujingyue to make sure you are aware of this

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for tagging me.

the real fix is to replace USE_DISTRIBUTED by NVFUSER_DISTRIBUTED wherever it appears (we don't use USE_DISTRIBUTED in the codebase)

That sounds right.

I'm not sure why NVFUSER_DISTRIBUTED was created in the first place -- it was there before I joined the project. We could use USE_DISTRIBUTED throughout the codebase. I guess NVFUSER_DISTRIBUTED gives an extra control of turning nvFuser distributed off even when pytorch distributed is on. But I'm not sure that use case is relevant.

it was setting NVFUSER_DISTRIBUTED to ON but USE_DISTRIBUTED to False

Are you sure about this given

cmake_dependent_option(NVFUSER_DISTRIBUTED "" ON "USE_DISTRIBUTED" OFF)
?

Also note

// nvFuser is sometimes built on a pytorch without c10d. When that
// happens, c10d isn't linked, NVFUSER_DISTRIBUTED is undefined and the
// multi-GPU component of nvFuser is expected to be disabled.
//
// Instead of adding `#ifdef NVFUSER_DISTRIBUTED` in too many places, this file
// provides a buildable mock implementation of c10d to keep nvFuser code less
// divergent. This implementation won't run because tests and user code are
// guarded by Communicator::is_available.
. I think we can avoid the #if by defining a mock.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wujingyue I saw that USE_DISTRIBUTED was false and NVFUSER_DISTRIBUTE was on in logs here https://gitlab-master.nvidia.com/dl/pytorch/fuser-gh-mirror/-/jobs/287606349/raw.

@wujingyue wujingyue requested a review from Priya2698 March 25, 2026 17:16
Comment on lines +63 to +76
static std::once_flag pg0_once;
std::call_once(pg0_once, [&]() {
try {
(void)c10d::resolve_process_group("0");
} catch (const std::exception&) {
// resolve_process_group throws c10d Error
// (derives from std::exception)
auto pg = c10d::resolve_process_group(group_name);
c10d::register_process_group("0", pg);
}
});

comm.barrier(CommunicatorBackend::kNccl);
return group_name;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Barrier fires on every call, not just during initial setup

comm.barrier(CommunicatorBackend::kNccl) is placed outside the pg0_once lambda, so it executes on every invocation of ensurePyTorchSymmMemBackend — including every subsequent call to allocate() and setupRemoteHandles() after initialization has already completed. This is both a performance concern (unnecessary NCCL barrier per allocation/rendezvous) and a correctness risk: if any caller ever invokes allocate() or setupRemoteHandles() asymmetrically across ranks (different call counts), these extra barriers will hang.

The barrier is only needed once — after pg0_once — to ensure all ranks have completed the group alias registration before any rank proceeds to use it. Moving it inside the pg0_once lambda would fix both concerns:

static std::once_flag pg0_once;
std::call_once(pg0_once, [&]() {
  try {
    (void)c10d::resolve_process_group("0");
  } catch (const std::exception&) {
    auto pg = c10d::resolve_process_group(group_name);
    c10d::register_process_group("0", pg);
  }
  comm.barrier(CommunicatorBackend::kNccl); // moved inside once-flag
});

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems relevant too, wdyt?
Why do we need a barrier here? Please add a short comment in the code to explain.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The barrier ensures all ranks have completed the process group "0" alias registration before proceeding. Without it, there's a race: one rank could try to resolve "0" before another rank has finished registering it.

As mentioned in the earlier comment, the barrier should be moved inside the pg0_once lambda—it only needs to run once after registration, not on every call to this function:

Suggested change
static std::once_flag pg0_once;
std::call_once(pg0_once, [&]() {
try {
(void)c10d::resolve_process_group("0");
} catch (const std::exception&) {
// resolve_process_group throws c10d Error
// (derives from std::exception)
auto pg = c10d::resolve_process_group(group_name);
c10d::register_process_group("0", pg);
}
});
comm.barrier(CommunicatorBackend::kNccl);
return group_name;
static std::once_flag pg0_once;
std::call_once(pg0_once, [&]() {
try {
(void)c10d::resolve_process_group("0");
} catch (const std::exception&) {
// resolve_process_group throws c10d Error
// (derives from std::exception)
auto pg = c10d::resolve_process_group(group_name);
c10d::register_process_group("0", pg);
}
// Barrier ensures all ranks have registered the "0" alias before any
// rank proceeds to use it. Without this, races can occur where one rank
// tries to resolve "0" before another rank has finished registering it.
comm.barrier(CommunicatorBackend::kNccl);
});

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samnordmann moved the barrier call inside setupremotehandles function.

bool is_contiguous_view_setup_ = false;
at::Tensor contiguous_view_;
#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED)
c10::intrusive_ptr<c10d::symmetric_memory::SymmetricMemory>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a bare minimum example of using c10d::symmetric_memory::SymmetricMemory and at::Tensor without any nvFuser? I ask this because I feel this class has lots of fields that are irrelevant for c10d SymmetricMemory, but I could be terribly wrong.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are some tests using c10d::symmetric_memory. https://github.com/pytorch/pytorch/blob/main/test/distributed/test_symmetric_memory.py
Does this help?
Yes, except for one or two fields like local_tensor_ and mc_ptr_ no other fields are used by c10d::symmetric_memory.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this help?

Yes -- thanks! I understood the Python API. When I say c10d, I was mostly trying to figure out the C++ side of the things. Looks like it's roughly one-to-one correspondence.

no other fields are used by c10d::symmetric_memory.

Given that, would you consider a separate data structure encapsulating just what you need? Inheritance comes with coupling: https://media.pragprog.com/titles/tpp20/inheritance-tax.pdf

In addition, I'm wondering whether the multicast pointer needs to be packed in SymmetricTensor. Can you remind me how you plan to use it so I can also think about it? I haven't seen this PR tries to but I could be missing something.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't exactly understand the reason to encapsulate them into a separate data structure. Just to be sure, local_tensor_ and mc_ptr are used in both fuser native and c10d::symmem paths; only c10d methods use these fields as arguments and aren't typecasted anywhere.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mainly to decouple the c10d::symmem path from (many) fields that are used only in native. Am I missing anything?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SymmetricTensor class should now have a single SymmemData field instead of separate local_tensor_, mc_ptr member variables

Sorry, I'm sure I wasn't clear.

I meant to not use SymmetricTensor at all for the c10d::symmem path. SymmetricTensor came with too many data fields that are not necessary for c10d::symmmem.

In addition, I'm not sure whether the multicast pointer should be packed together with the local tensor. What does a multicast pointer even mean? Is local_tensor.data() supposed to be the virtual address that we bind to a multicast handle?

(Anyhow, I'm trying to think how to build it from first principles. I'm aware of the existing SymmetricTensor class and that it's used by the "native" path. However, I'm trying to avoid carrying over too much legacy to the c10d path.)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the methods like allocate, setupRemoteHandles, remoteTensor are all methods of SymmetricTensor class. We thought of adding the torch backends without changing the basic interface(above methods). So very minimal change was done to SymmetricTensor class to accommodate torch backends - just the torch_symm_handle was added. Regarding the other design choices, @samnordmann can you please clarify

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether the multicast pointer should be packed together with the local tensor.

I think I figured this out from the documentation. symm_mem.rendezvous returns a handle containing remote buffers (one per GPU) and optionally a multicasting buffer. Writing to the local tensor doesn't change any remote data; writing to a remote buffer only changes the local tensor on that particular GPU; writing to the multicasting buffer changes the local tensors on all GPUs. That's why SymmetricTensor contains local_tensor_, remote_buffers_ and mc_ptr_.

Am I understanding this correctly?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like c10d::symmetric_memory::SymmetricMemory (the type of torch_symm_handle_) already holds remote buffers and the multicast pointer. So for the c10d path we don't need to store another version of remote_buffers_ and mc_ptr_ separately.

So, if we make a new class NewSymmetricTensor (need a better name...) for c10d, that class will only need to contain the local tensor and a c10d::symmetric_memory::SymmetricMemory. Is that right?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's correct - if we make a new class NewSymmetricTensor or c10d, that class will only need to contain the local tensor and a c10d::symmetric_memory::SymmetricMemory.

@saivishal1999
Copy link
Copy Markdown
Collaborator Author

!test

3 similar comments
@saivishal1999
Copy link
Copy Markdown
Collaborator Author

!test

@saivishal1999
Copy link
Copy Markdown
Collaborator Author

!test

@samnordmann
Copy link
Copy Markdown
Collaborator

!test

bool is_contiguous_view_setup_ = false;
at::Tensor contiguous_view_;
#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED)
c10::intrusive_ptr<c10d::symmetric_memory::SymmetricMemory>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this help?

Yes -- thanks! I understood the Python API. When I say c10d, I was mostly trying to figure out the C++ side of the things. Looks like it's roughly one-to-one correspondence.

no other fields are used by c10d::symmetric_memory.

Given that, would you consider a separate data structure encapsulating just what you need? Inheritance comes with coupling: https://media.pragprog.com/titles/tpp20/inheritance-tax.pdf

In addition, I'm wondering whether the multicast pointer needs to be packed in SymmetricTensor. Can you remind me how you plan to use it so I can also think about it? I haven't seen this PR tries to but I could be missing something.

Copy link
Copy Markdown
Collaborator

@wujingyue wujingyue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, some of my comments went pending and didn't land. Should be fixed now.

Comment on lines +370 to +373
for (const auto& entry : process_groups_) {
c10d::unregister_process_group(entry.first);
}
process_groups_.clear();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is relevant -- but the real fix is to replace USE_DISTRIBUTED by NVFUSER_DISTRIBUTED wherever it appears (we don't use USE_DISTRIBUTED in the codebase)

Comment on lines +461 to +468
std::string Communicator::getSymmMemGroupKey(
std::optional<CommunicatorBackend> backend) {
std::vector<RankType> all_ranks(size_);
std::iota(all_ranks.begin(), all_ranks.end(), 0);
CommunicatorBackend b = backend.value_or(default_backend_);
(void)getBackendForTeam(all_ranks, b, "symm_mem_");
return getTeamKey(all_ranks, b);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Comment on lines +415 to +435
#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED)
std::optional<c10d::ProcessGroup::BackendType> pg_backend =
(b == CommunicatorBackend::kNccl)
? std::optional<c10d::ProcessGroup::BackendType>(
c10d::ProcessGroup::BackendType::NCCL)
: std::nullopt;
if (backends_[team_key] != nullptr && pg_backend.has_value()) {
auto rank_it = std::ranges::find(team.begin(), team.end(), deviceId());
RankType team_rank = std::distance(team.begin(), rank_it);

auto pg = c10::make_intrusive<c10d::ProcessGroup>(
c10::make_intrusive<c10d::PrefixStore>(team_key, store_),
team_rank,
static_cast<int>(team.size()));
pg->setBackend(c10::DeviceType::CUDA, *pg_backend, backends_[team_key]);
pg->setDefaultBackend(*pg_backend);
pg->setGroupName(team_key);

c10d::register_process_group(team_key, pg);
process_groups_[team_key] = std::move(pg);
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain why we need this change? I am not sure to understand the logic and motivation. It seems like an old artifact -- process_groups_ doesn't seem to be read anywhere. Please clarify

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to keep track of process groups registered by fuser's symmem so that they can be unregistered during cleanup and also to keep track if the group is already registered or not. in the next commit you'll see that i'll use this variable's keys(to read) and during cleanup

std::unordered_map<std::string, c10::intrusive_ptr<c10d::Backend>> backends_;
// c10d process-group wrappers registered for symmetric-memory rendezvous.
#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED)
std::unordered_map<std::string, c10::intrusive_ptr<c10d::ProcessGroup>>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please make sure c10d_mock.h is up to date to avoid compilation issue in the non-distributed mode

Also, can you explain (and add a comment in the code) why we need ProcessGroup here?

Comment on lines +63 to +71
static std::once_flag pg0_once;
std::call_once(pg0_once, [&]() {
try {
(void)c10d::resolve_process_group("0");
} catch (const c10::Error&) {
auto pg = c10d::resolve_process_group(group_name);
c10d::register_process_group("0", pg);
}
});
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about this ?

Comment on lines +63 to +76
static std::once_flag pg0_once;
std::call_once(pg0_once, [&]() {
try {
(void)c10d::resolve_process_group("0");
} catch (const std::exception&) {
// resolve_process_group throws c10d Error
// (derives from std::exception)
auto pg = c10d::resolve_process_group(group_name);
c10d::register_process_group("0", pg);
}
});

comm.barrier(CommunicatorBackend::kNccl);
return group_name;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems relevant too, wdyt?
Why do we need a barrier here? Please add a short comment in the code to explain.

Comment on lines +427 to +445
#if defined(NVFUSER_DISTRIBUTED) && defined(USE_DISTRIBUTED)
if (process_groups_.find(team_key) == process_groups_.end()) {
if (b == CommunicatorBackend::kNccl) {
auto rank_it = std::ranges::find(team.begin(), team.end(), deviceId());
RankType team_rank = std::distance(team.begin(), rank_it);

auto pg = c10::make_intrusive<c10d::ProcessGroup>(
c10::make_intrusive<c10d::PrefixStore>(team_key, store_),
team_rank,
static_cast<int>(team.size()));
pg->setBackend(
c10::DeviceType::CUDA,
c10d::ProcessGroup::BackendType::NCCL,
backends_[team_key]);
pg->setDefaultBackend(c10d::ProcessGroup::BackendType::NCCL);
pg->setGroupName(team_key);

registerProcessGroup(team_key, pg);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 ProcessGroup created with out-of-bounds rank when current process is not in the team

When deviceId() is not a member of team, the earlier backends_ block (line ~408-425) already returns nullptr for backends_[team_key]. But the new ProcessGroup creation block immediately below does not guard against this case:

auto rank_it = std::ranges::find(team.begin(), team.end(), deviceId());
RankType team_rank = std::distance(team.begin(), rank_it);
// rank_it == team.end() → team_rank == team.size() (out of range)

auto pg = c10::make_intrusive<c10d::ProcessGroup>(
    ...,
    team_rank,                  // ← out-of-range rank
    static_cast<int>(team.size()));
pg->setBackend(
    c10::DeviceType::CUDA,
    c10d::ProcessGroup::BackendType::NCCL,
    backends_[team_key]);       // ← nullptr backend

A ProcessGroup with rank == size and a nullptr NCCL backend gets registered under team_key. If symmetric-memory rendezvous subsequently resolves this key, it will operate against an invalid process group, likely causing a hang or crash.

The fix is to mirror the same guard already used in the backends_ block:

if (process_groups_.find(team_key) == process_groups_.end()) {
  if (b == CommunicatorBackend::kNccl) {
    auto rank_it = std::ranges::find(team.begin(), team.end(), deviceId());
    if (rank_it == team.end()) {
      // This rank is not part of the team; skip ProcessGroup creation.
      return backends_.at(team_key).get();
    }
    RankType team_rank = std::distance(team.begin(), rank_it);
    // ... rest of pg creation ...
  }
}

Comment on lines +54 to +91
// Always return a valid group name
if (backend != SymmetricMemoryBackend::Native) {
NVF_CHECK(
comm.isBackendAvailable(CommunicatorBackend::kNccl),
"NCCL backend is required for non-native symmetric memory backend");

std::vector<RankType> all_ranks(comm.size());
std::iota(all_ranks.begin(), all_ranks.end(), 0);
(void)comm.getBackendForTeam(all_ranks, CommunicatorBackend::kNccl);
std::string group_name = std::accumulate(
std::begin(all_ranks),
std::end(all_ranks),
std::string("nccl"),
[](const std::string& a, const RankType& b) {
return a.empty() ? std::to_string(b) : a + ',' + std::to_string(b);
});
if (backend == SymmetricMemoryBackend::PyTorchNvshmem) {
static std::once_flag pg0_once;
std::call_once(pg0_once, [&]() {
try {
(void)c10d::resolve_process_group("0");
} catch (const std::exception&) {
// resolve_process_group throws c10d Error
// (derives from std::exception)
auto pg = c10d::resolve_process_group(group_name);
comm.registerProcessGroup("0", pg);
}
});
}

return group_name;
}

NVF_ERROR(
false,
"No c10d backend available for symmetric memory rendezvous. "
"Expected NCCL or UCC process group.");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Dead-code NVF_ERROR at end of ensurePyTorchSymmMemBackend and redundant inner guard

Every caller of ensurePyTorchSymmMemBackend already verifies backend != SymmetricMemoryBackend::Native before calling, so the inner if (backend != SymmetricMemoryBackend::Native) guard at line 55 is always true and the NVF_ERROR at the end is unreachable dead code. While not a runtime crash on its own, this structure is misleading: it implies to a future reader that Native is a valid input, and any refactoring that adds a new call path without the outer guard will silently skip the NVF_ERROR and fall through to undefined behavior.

Consider either:

  • Asserting the precondition at the top of the function: NVF_CHECK(backend != SymmetricMemoryBackend::Native, "..."), then removing the inner guard; or
  • Adding an internal-only overload that takes a non-Native value type to make the invariant compile-time enforced.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants