diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6eadda84c..20fd988ba 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -98,10 +98,10 @@ jobs: . .venv/bin/activate bash tools/verify_packaging.sh - # ---------- Python unit tests (no hardware) ---------- - ut-py: + # ---------- Unit tests (no hardware, Python + C++) ---------- + ut: runs-on: ${{ matrix.os }} - timeout-minutes: 10 + timeout-minutes: 15 strategy: matrix: os: [ubuntu-latest, macos-latest] @@ -115,33 +115,6 @@ jobs: with: python-version: ${{ matrix.python-version }} - - name: Cache pip packages - uses: actions/cache@v3 - with: - path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ hashFiles('**/*.py') }} - restore-keys: | - ${{ runner.os }}-pip- - - - name: Install dependencies - run: | - pip install torch --index-url https://download.pytorch.org/whl/cpu - pip install '.[test]' - - - name: Run unit tests - run: pytest tests -m "not requires_hardware" -v - - # ---------- C++ unit tests (no hardware) ---------- - ut-cpp: - runs-on: ${{ matrix.os }} - timeout-minutes: 10 - strategy: - matrix: - os: [ubuntu-latest, macos-latest] - steps: - - name: Checkout repository - uses: actions/checkout@v4 - - name: Install GoogleTest (Linux) if: runner.os == 'Linux' run: | @@ -157,6 +130,22 @@ jobs: run: | brew install googletest + - name: Cache pip packages + uses: actions/cache@v3 + with: + path: ~/.cache/pip + key: ${{ runner.os }}-pip-${{ hashFiles('**/*.py') }} + restore-keys: | + ${{ runner.os }}-pip- + + - name: Install dependencies + run: | + pip install torch --index-url https://download.pytorch.org/whl/cpu + pip install '.[test]' + + - name: Run Python unit tests + run: pytest tests -m "not requires_hardware" -v + - name: Build and run C++ unit tests run: | cmake -B tests/ut/cpp/build -S tests/ut/cpp @@ -276,8 +265,10 @@ jobs: fi exit $rc - # ---------- Python unit tests (a2a3 hardware) ---------- - ut-py-a2a3: + # ---------- Unit tests (a2a3 hardware, Python + C++) ---------- + ut-a2a3: + needs: detect-changes + if: needs.detect-changes.outputs.a2a3_changed == 'true' runs-on: [self-hosted, a2a3] timeout-minutes: 30 @@ -285,20 +276,46 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - - name: Build nanobind extension + - name: Set up environment run: | python3 -m venv --system-site-packages .venv source .venv/bin/activate pip install --upgrade pip pip install '.[test]' - - name: Run hardware unit tests (a2a3) + - name: Run Python hardware unit tests + run: | + set +e + source .venv/bin/activate + source ${ASCEND_HOME_PATH}/bin/setenv.bash + set -e + python -m pytest tests -m requires_hardware --platform a2a3 -v + + - name: Build and run C++ hardware unit tests run: | + set +e source .venv/bin/activate - source ${ASCEND_HOME_PATH}/bin/setenv.bash && python -m pytest tests -m requires_hardware --platform a2a3 -v + source ${ASCEND_HOME_PATH}/bin/setenv.bash + set -e + python -c "from simpler_setup.runtime_builder import RuntimeBuilder; RuntimeBuilder('a2a3').get_binaries('tensormap_and_ringbuffer', build=True)" + cmake -B tests/ut/cpp/build -S tests/ut/cpp -DSIMPLER_ENABLE_HARDWARE_TESTS=ON + cmake --build tests/ut/cpp/build + python3 -c " + import json, os + s, e = os.environ['DEVICE_RANGE'].split('-') + npus = [{'id': str(i), 'slots': 1} for i in range(int(s), int(e)+1)] + json.dump({'version': {'major': 1, 'minor': 0}, 'local': [{'npus': npus}]}, + open('tests/ut/cpp/build/resources.json', 'w')) + " + ctest --test-dir tests/ut/cpp/build \ + -L "^requires_hardware(_a2a3)?$" \ + --resource-spec-file $PWD/tests/ut/cpp/build/resources.json \ + -j$(nproc) --output-on-failure # ---------- Scene tests (a2a3 hardware) ---------- st-onboard-a2a3: + needs: detect-changes + if: needs.detect-changes.outputs.a2a3_changed == 'true' runs-on: [self-hosted, a2a3] timeout-minutes: 60 @@ -306,7 +323,7 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - - name: Build nanobind extension + - name: Set up environment run: | python3 -m venv --system-site-packages .venv source .venv/bin/activate @@ -329,57 +346,83 @@ jobs: exit $rc - # ---------- Detect A5 changes (runs on GitHub server, not A5 machine) ---------- + # ---------- Detect platform-specific changes (runs on GitHub server) ---------- detect-changes: runs-on: ubuntu-latest outputs: + a2a3_changed: ${{ steps.check.outputs.a2a3_changed }} a5_changed: ${{ steps.check.outputs.a5_changed }} steps: - name: Checkout repository uses: actions/checkout@v4 with: fetch-depth: 0 - - name: Check A5 file changes + - name: Check file changes id: check run: | FILES=$(git diff --name-only ${{ github.event.pull_request.base.sha }}...${{ github.event.pull_request.head.sha }}) - # Skip A5 only when ALL changed files are confined to a2a3-only or non-code paths. - # Shared code (src/common/, python/, examples/scripts/, build files) affects A5. - A2A3_ONLY='^(src/a2a3/|examples/a2a3/|tests/(st|device_tests)/a2a3/)' NON_CODE='^(docs/|\.docs/|\.claude/|KNOWN_ISSUES\.md$|\.gitignore$|README\.md$|\.pre-commit-config\.yaml$)' - # Filter out a2a3-only and non-code files; if anything remains, it may affect A5 - REMAINING=$(echo "$FILES" | grep -vE "$A2A3_ONLY" | grep -vE "$NON_CODE" || true) + # a2a3: skip only when ALL changed files are a5-only or non-code + A5_ONLY='^(src/a5/|examples/a5/|tests/(st|device_tests)/a5/)' + A2A3_REMAINING=$(echo "$FILES" | grep -vE "$A5_ONLY" | grep -vE "$NON_CODE" || true) + + if [ -n "$A2A3_REMAINING" ]; then + echo "a2a3_changed=true" >> "$GITHUB_OUTPUT" + echo "Files affecting a2a3:" + echo "$A2A3_REMAINING" + else + echo "a2a3_changed=false" >> "$GITHUB_OUTPUT" + echo "All changes are a5-only or non-code; skipping a2a3" + fi - if [ -n "$REMAINING" ]; then + # a5: skip only when ALL changed files are a2a3-only or non-code + A2A3_ONLY='^(src/a2a3/|examples/a2a3/|tests/(st|device_tests)/a2a3/)' + A5_REMAINING=$(echo "$FILES" | grep -vE "$A2A3_ONLY" | grep -vE "$NON_CODE" || true) + + if [ -n "$A5_REMAINING" ]; then echo "a5_changed=true" >> "$GITHUB_OUTPUT" - echo "Files affecting A5:" - echo "$REMAINING" + echo "Files affecting a5:" + echo "$A5_REMAINING" else echo "a5_changed=false" >> "$GITHUB_OUTPUT" - echo "All changes are a2a3-only or non-code; skipping A5" + echo "All changes are a2a3-only or non-code; skipping a5" fi - # TODO: Uncomment when a5 hardware runner is available. - # Add the "a5" label to the runner, matching [self-hosted, a5] below. - # - # ut-py-a5: - # needs: detect-changes - # runs-on: [self-hosted, a5] - # timeout-minutes: 30 - # - # steps: - # - name: Checkout repository - # uses: actions/checkout@v4 - # - # - name: Build nanobind extension - # run: pip install . - # - # - name: Run hardware unit tests (a5) - # run: | - # export PATH="$HOME/.local/bin:$PATH" - # source ${ASCEND_HOME_PATH}/bin/setenv.bash && pytest tests -m requires_hardware --platform a5 -v - # + + # ---------- Unit tests (a5 hardware, Python + C++) ---------- + ut-a5: + needs: detect-changes + if: needs.detect-changes.outputs.a5_changed == 'true' + runs-on: [self-hosted, a5] + timeout-minutes: 30 + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up environment + run: | + set +e + source ${ASCEND_HOME_PATH}/bin/setenv.bash + set -e + pip install '.[test]' + + - name: Run Python hardware unit tests (a5) + run: | + set +e + source ${ASCEND_HOME_PATH}/bin/setenv.bash + set -e + python -m pytest tests -m requires_hardware --platform a5 -v + + - name: Build and run C++ hardware unit tests (a5) + run: | + set +e + source ${ASCEND_HOME_PATH}/bin/setenv.bash + set -e + cmake -B tests/ut/cpp/build -S tests/ut/cpp -DSIMPLER_ENABLE_HARDWARE_TESTS=ON + cmake --build tests/ut/cpp/build + ctest --test-dir tests/ut/cpp/build -L "^requires_hardware(_a5)?$" --output-on-failure st-onboard-a5: needs: detect-changes @@ -391,14 +434,19 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - - name: Build nanobind extension + - name: Set up environment run: | + set +e source ${ASCEND_HOME_PATH}/bin/setenv.bash + set -e + pip install --upgrade pip pip install '.[test]' - name: Run pytest scene tests (a5) run: | + set +e source ${ASCEND_HOME_PATH}/bin/setenv.bash + set -e DEVICE_LIST=$(python -c "s,e='${DEVICE_RANGE}'.split('-'); print(','.join(str(i) for i in range(int(s),int(e)+1)))") PYTEST="python -m pytest examples tests/st --platform a5 --device ${DEVICE_RANGE} -v --clone-protocol https" task-submit --timeout 1800 --max-time 1800 --device "$DEVICE_LIST" --run "set +e; $PYTEST --pto-session-timeout 1200; rc=\$?; if [ \$rc -eq 124 ]; then echo 'pytest timed out; retrying with pinned PTO-ISA commit'; $PYTEST --pto-session-timeout 1200 --pto-isa-commit d96c8784 --clone-protocol https; rc=\$?; fi; exit \$rc" diff --git a/docs/ci.md b/docs/ci.md index 1e66f0561..4b715c22b 100644 --- a/docs/ci.md +++ b/docs/ci.md @@ -6,10 +6,11 @@ The CI pipeline maps test categories (st, ut-py, ut-cpp) × hardware tiers to Gi Design principles: -1. **Separate jobs per test category** — st, ut-py, and ut-cpp run as independent jobs for parallelism and clear dashboard visibility. +1. **Merge by runner, not by language** — Python and C++ unit tests share setup cost and run as steps within a single job per runner tier (`ut`, `ut-a2a3`, `ut-a5`). 2. **Runner matches hardware tier** — no-hardware tests run on `ubuntu-latest`; platform-specific tests run on self-hosted runners with the matching label (`a2a3`, `a5`). 3. **`--platform` is the only filter** — pytest uses `--platform` + the `requires_hardware` marker; ctest uses label `-LE` exclusion. No `-m st`, no `-m "not requires_hardware"`. 4. **sim = no hardware** — `a2a3sim`/`a5sim` jobs run on github-hosted runners alongside unit tests. +5. **Skip irrelevant platforms** — `detect-changes` gates hardware jobs so pure a5 PRs skip a2a3 runners and vice versa. ## Full Job Matrix @@ -17,40 +18,34 @@ The complete test-type × hardware-tier matrix. Empty cells have no tests yet; o | Category | github-hosted (no hardware) | a2a3 runner | a5 runner | | -------- | --------------------------- | ----------- | --------- | -| **ut-py** | `ut-py` | `ut-py-a2a3` | `ut-py-a5` | -| **ut-cpp** | `ut-cpp` | `ut-cpp-a2a3` | `ut-cpp-a5` | -| **st** | `st-sim-a2a3`, `st-sim-a5` | `st-a2a3` | `st-a5` | +| **ut** (py + cpp) | `ut` | `ut-a2a3` | `ut-a5` | +| **st** | `st-sim-a2a3`, `st-sim-a5` | `st-onboard-a2a3` | `st-onboard-a5` | ## GitHub Actions Jobs -Currently active jobs (a5 jobs commented out — no runner yet): - ```text PullRequest - ├── ut-py (ubuntu-latest) - ├── ut-cpp (ubuntu-latest) - ├── st-sim-a2a3 (ubuntu + macOS) - ├── st-sim-a5 (ubuntu + macOS) - ├── ut-py-a2a3 (a2a3 self-hosted) - ├── ut-cpp-a2a3 (a2a3 self-hosted) - ├── st-a2a3 (a2a3 self-hosted) - ├── ut-py-a5 (a5 self-hosted, commented out) - ├── ut-cpp-a5 (a5 self-hosted, commented out) - └── st-a5 (a5 self-hosted, commented out) + ├── pre-commit (ubuntu-latest) + ├── packaging-matrix (ubuntu + macOS) + ├── ut (ubuntu + macOS) — Python + C++ UT, no hardware + ├── st-sim-a2a3 (ubuntu + macOS) + ├── st-sim-a5 (ubuntu + macOS) + ├── detect-changes (ubuntu-latest) — gates a2a3 + a5 hw jobs + ├── ut-a2a3 (a2a3 self-hosted) — Python + C++ UT, a2a3 hardware + ├── st-onboard-a2a3 (a2a3 self-hosted) + ├── ut-a5 (a5 self-hosted) — Python + C++ UT, a5 hardware + └── st-onboard-a5 (a5 self-hosted) ``` | Job | Runner | What it runs | | --- | ------ | ------------ | -| `ut-py` | `ubuntu-latest` | `pytest tests/ut` | -| `ut-cpp` | `ubuntu-latest` | `ctest --test-dir tests/ut/cpp/build -LE requires_hardware` | +| `ut` | `ubuntu-latest`, `macos-latest` | `pytest tests/ut` + `ctest -LE requires_hardware` | | `st-sim-a2a3` | `ubuntu-latest`, `macos-latest` | `pytest examples tests/st --platform a2a3sim` | | `st-sim-a5` | `ubuntu-latest`, `macos-latest` | `pytest examples tests/st --platform a5sim` | -| `ut-py-a2a3` | a2a3 self-hosted | `pytest tests/ut --platform a2a3` | -| `ut-cpp-a2a3` | a2a3 self-hosted | `ctest --test-dir tests/ut/cpp/build -L "^requires_hardware(_a2a3)?$"` | -| `st-a2a3` | a2a3 self-hosted | `pytest examples tests/st --platform a2a3 --device ...` | -| `ut-py-a5` | a5 self-hosted | `pytest tests/ut --platform a5` | -| `ut-cpp-a5` | a5 self-hosted | `ctest --test-dir tests/ut/cpp/build -L "^requires_hardware(_a5)?$"` | -| `st-a5` | a5 self-hosted | `pytest examples tests/st --platform a5 --device ...` | +| `ut-a2a3` | a2a3 self-hosted | `pytest tests/ut --platform a2a3` + `ctest -L "^requires_hardware(_a2a3)?$" --resource-spec-file ...` | +| `st-onboard-a2a3` | a2a3 self-hosted | `pytest examples tests/st --platform a2a3 --device ...` | +| `ut-a5` | a5 self-hosted | `pytest tests/ut --platform a5` + `ctest -L "^requires_hardware(_a5)?$"` | +| `st-onboard-a5` | a5 self-hosted | `pytest examples tests/st --platform a5 --device ...` | ### Parallel ST runs on hardware @@ -101,8 +96,9 @@ not need `--max-parallel` manually. ### Scheduling constraints - Sim scene tests and no-hardware unit tests run on github-hosted runners (no hardware). -- `a2a3` tests (st + ut-py + ut-cpp) only run on the `a2a3` self-hosted machine. -- `a5` tests (st + ut-py + ut-cpp) only run on the `a5` self-hosted machine. +- `detect-changes` gates all hardware jobs: pure a5 PRs skip a2a3 runners and vice versa. +- a2a3 tests (st + ut) only run on the `a2a3` self-hosted machine when a2a3-relevant files change. +- a5 tests (st + ut) only run on the `a5` self-hosted machine when a5-relevant files change. ## Hardware Classification @@ -110,9 +106,9 @@ Three hardware tiers, applied to all test categories. See [testing.md](testing.m | Tier | CI Runner | Job examples | | ---- | --------- | ------------ | -| No hardware | `ubuntu-latest` | `ut-py`, `ut-cpp`, `st-sim-*` | -| Platform-specific (a2a3) | `[self-hosted, a2a3]` | `ut-py-a2a3`, `ut-cpp-a2a3`, `st-a2a3` | -| Platform-specific (a5) | `[self-hosted, a5]` | `ut-py-a5`, `ut-cpp-a5`, `st-a5` | +| No hardware | `ubuntu-latest` | `ut`, `st-sim-*` | +| Platform-specific (a2a3) | `[self-hosted, a2a3]` | `ut-a2a3`, `st-onboard-a2a3` | +| Platform-specific (a5) | `[self-hosted, a5]` | `ut-a5`, `st-onboard-a5` | ## Test Sources diff --git a/docs/testing.md b/docs/testing.md index 739e347b8..53edf17fd 100644 --- a/docs/testing.md +++ b/docs/testing.md @@ -451,6 +451,42 @@ add_test(NAME test_my_component COMMAND test_my_component) # set_tests_properties(test_my_component PROPERTIES LABELS "requires_hardware_a2a3") ``` +#### C++ hardware tests needing NPU devices + +Tests that need specific NPU devices use CTest's [resource allocation](https://cmake.org/cmake/help/latest/prop_test/RESOURCE_GROUPS.html). Declare `RESOURCE_GROUPS` alongside the hardware label: + +```cmake +set_tests_properties(test_hccl_comm PROPERTIES + LABELS "requires_hardware_a2a3" + RESOURCE_GROUPS "2,npus:1" # 2 groups × 1 NPU slot each = 2 distinct devices +) +``` + +The CI generates a resource spec file from `${DEVICE_RANGE}` and passes it to ctest: + +```bash +# Generate resource spec (CI does this automatically) +python3 -c " +import json +npus = [{'id': str(i), 'slots': 1} for i in range(8)] +json.dump({'version': {'major': 1, 'minor': 0}, 'local': [{'npus': npus}]}, + open('resources.json', 'w')) +" + +# Run with resource allocation — CTest assigns devices, no oversubscription +ctest --test-dir tests/ut/cpp/build \ + -L "^requires_hardware(_a2a3)?$" \ + --resource-spec-file resources.json \ + -j$(nproc) --output-on-failure +``` + +CTest passes allocated device ids via environment variables: + +- `CTEST_RESOURCE_GROUP_COUNT` — number of groups +- `CTEST_RESOURCE_GROUP__NPUS` — `"id:,slots:1"` per group + +Tests read these to determine which devices to use. See `test_hccl_comm.cpp::read_ctest_devices()` for the parsing pattern. + ### New Scene Test Create a `test_*.py` file using the `@scene_test` decorator: diff --git a/src/a2a3/platform/include/common/comm_context.h b/src/a2a3/platform/include/common/comm_context.h new file mode 100644 index 000000000..bae98f014 --- /dev/null +++ b/src/a2a3/platform/include/common/comm_context.h @@ -0,0 +1,69 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * CommContext — device-side distributed communication context. + * + * This struct is the ABI contract between host (comm_hccl.cpp / comm_sim.cpp) + * and device kernels. PTO communication instructions (TREDUCE, TGET, TPUT) + * access remote data through the GVA addresses in windowsIn[]/windowsOut[] + * via MTE2 DMA. + * + * On HCCL MESH topology the struct layout matches what HCCL returns directly. + * On RING topology the host builds it by extracting remote RDMA addresses + * from HcclOpResParam's remoteRes array. + * On simulation the host fills it with malloc'd pointers. + */ + +#pragma once + +#include +#include + +static constexpr uint32_t COMM_MAX_RANK_NUM = 64; + +struct CommContext { + uint64_t workSpace; + uint64_t workSpaceSize; + + uint32_t rankId; + uint32_t rankNum; + uint64_t winSize; + uint64_t windowsIn[COMM_MAX_RANK_NUM]; + uint64_t windowsOut[COMM_MAX_RANK_NUM]; +}; + +// The struct itself lives in this repo, so on the surface these asserts look +// like they only check that we do not contradict ourselves. Their real value +// is that this layout is consumed by *two* out-of-band parties that never see +// this header at the same time: +// +// 1. HCCL MESH topology: comm_hccl.cpp reinterpret_cast's HCCL's returned +// device-context pointer as CommContext*. The cast is only sound +// as long as our layout happens to match HCCL's internal MESH struct +// (verified by hand against CANN 9.x). Any accidental insert/reorder +// breaks that implicit match and device DMA reads silently garble. +// +// 2. Device kernels (AICore / AICPU) compiled with CCEC may apply slightly +// different alignment rules than host gcc. A host-side sizeof/offset +// lock is a necessary-but-not-sufficient guard; device side should add +// its own mirror asserts when it starts consuming this struct. +// +// Treat the numbers below as a tripwire: changing them is a deliberate act +// that forces the editor to re-verify both assumptions above, not a routine +// "oh I just added a field" edit. +static_assert(sizeof(CommContext) == 1056, "CommContext size shifted"); +static_assert(offsetof(CommContext, workSpace) == 0, "CommContext layout drift"); +static_assert(offsetof(CommContext, workSpaceSize) == 8, "CommContext layout drift"); +static_assert(offsetof(CommContext, rankId) == 16, "CommContext layout drift"); +static_assert(offsetof(CommContext, rankNum) == 20, "CommContext layout drift"); +static_assert(offsetof(CommContext, winSize) == 24, "CommContext layout drift"); +static_assert(offsetof(CommContext, windowsIn) == 32, "CommContext layout drift"); +static_assert(offsetof(CommContext, windowsOut) == 544, "CommContext layout drift"); diff --git a/src/a2a3/platform/include/host/comm.h b/src/a2a3/platform/include/host/comm.h new file mode 100644 index 000000000..eb4602202 --- /dev/null +++ b/src/a2a3/platform/include/host/comm.h @@ -0,0 +1,121 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * Backend-neutral distributed communication C API. + * + * Provides five primitives for multi-rank communication: init, allocate + * shared windows, query local window base, barrier, and destroy. + * + * Implementations: + * onboard/host/comm_hccl.cpp — HCCL backend (links CANN hccl/hccl_fwk) + * sim/host/comm_sim.cpp — malloc-based simulation + * + * All functions are compiled into libhost_runtime.so. The linker selects + * the implementation at build time (onboard vs sim), with no runtime + * dispatch or virtual functions. + */ + +#pragma once + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct CommHandle_ *CommHandle; + +/** + * Initialize a communicator for the given rank. + * + * The caller is responsible for ACL/device lifecycle before this call: + * - aclInit() must have been performed at least once in the process + * (DeviceRunner::ensure_acl_ready() is the canonical owner). + * - aclrtSetDevice() must be in effect on the current thread. + * - `stream` is a pre-created aclrtStream owned by the caller; this + * module does not create or destroy streams. + * + * On the HCCL backend this performs the RootInfo exchange (rank 0 writes + * the file, others wait) and HcclCommInitRootInfo. + * + * @param rank This process's rank (0-based). + * @param nranks Total number of ranks. + * @param stream Caller-owned aclrtStream (passed as void*) used for + * HCCL operations like HcclBarrier and + * HcclAllocComResourceByTiling. Sim backend ignores it. + * @param rootinfo_path Filesystem path used to exchange root info between + * ranks (rank 0 writes, others read). + * @return Opaque handle, or NULL on failure. + */ +CommHandle comm_init(int rank, int nranks, void *stream, const char *rootinfo_path); + +/** + * Allocate RDMA / shared-memory windows and populate the device context. + * + * On HCCL this calls HcclAllocComResourceByTiling and extracts per-rank + * window addresses (MESH or RING topology). On sim it mallocs a shared + * region and partitions it. + * + * @param h Handle from comm_init(). + * @param win_size Window size hint (bytes per rank). The backend + * may allocate more; actual size is stored in the + * returned device context. + * @param device_ctx_out Receives a device pointer to a CommContext + * struct that can be passed to device kernels. + * @return 0 on success, non-zero on failure. + */ +int comm_alloc_windows(CommHandle h, size_t win_size, uint64_t *device_ctx_out); + +/** + * Get the base address of this rank's local window. + * + * Window buffers allocated via comm_alloc_windows() are contiguous per + * rank. This returns the start of the local rank's region. + * + * @param h Handle from comm_init(). + * @param base_out Receives the device-pointer base address. + * @return 0 on success, non-zero on failure. + */ +int comm_get_local_window_base(CommHandle h, uint64_t *base_out); + +/** + * Get the actual per-rank window size allocated by the backend. + * + * @param h Handle from comm_init(). + * @param size_out Receives the actual per-rank window size in bytes. + * @return 0 on success, non-zero on failure. + */ +int comm_get_window_size(CommHandle h, size_t *size_out); + +/** + * Synchronize all ranks. + * + * Blocks until every rank in the communicator has called comm_barrier(). + * + * @param h Handle from comm_init(). + * @return 0 on success, non-zero on failure. + */ +int comm_barrier(CommHandle h); + +/** + * Destroy the communicator and release all resources. + * + * After this call the handle is invalid. + * + * @param h Handle from comm_init(). + * @return 0 on success, non-zero on failure. + */ +int comm_destroy(CommHandle h); + +#ifdef __cplusplus +} +#endif diff --git a/src/a2a3/platform/onboard/host/CMakeLists.txt b/src/a2a3/platform/onboard/host/CMakeLists.txt index 2bef4d9ba..f259cfa22 100644 --- a/src/a2a3/platform/onboard/host/CMakeLists.txt +++ b/src/a2a3/platform/onboard/host/CMakeLists.txt @@ -39,6 +39,7 @@ list(APPEND HOST_RUNTIME_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/unified_log_host.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/performance_collector.cpp" "${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/tensor_dump_collector.cpp" + "${CMAKE_CURRENT_SOURCE_DIR}/comm_hccl.cpp" ) if(DEFINED CUSTOM_SOURCE_DIRS) foreach(SRC_DIR ${CUSTOM_SOURCE_DIRS}) @@ -91,6 +92,17 @@ target_link_directories(host_runtime ${ASCEND_HOME_PATH}/runtime/lib64 ) +# CANN 9.x exposes the working non-V2 HCCL entry points through libhcomm. +# Link it explicitly so comm_hccl.cpp can follow the same initialization path +# as the pto-isa communication tests. +find_library(HCOMM_LIB NAMES hcomm PATHS ${ASCEND_HOME_PATH}/lib64 NO_DEFAULT_PATH) +if(HCOMM_LIB) + set(HCCL_LINK_TARGETS hcomm) + message(STATUS "Using HCCL library: hcomm") +else() + message(FATAL_ERROR "libhcomm not found under ${ASCEND_HOME_PATH}/lib64") +endif() + # Link against CANN runtime libraries # ascend_hal is dynamically loaded at runtime via dlopen in device_runner # when performance profiling is enabled @@ -98,6 +110,7 @@ target_link_libraries(host_runtime PRIVATE runtime ascendcl + ${HCCL_LINK_TARGETS} dl ) diff --git a/src/a2a3/platform/onboard/host/comm_hccl.cpp b/src/a2a3/platform/onboard/host/comm_hccl.cpp new file mode 100644 index 000000000..56019b3ef --- /dev/null +++ b/src/a2a3/platform/onboard/host/comm_hccl.cpp @@ -0,0 +1,717 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ +/** + * HCCL backend for the comm_* distributed communication API. + * + * Implements the five functions declared in host/comm.h using Ascend + * HCCL (bundled with CANN). Handles both MESH and RING topologies + * when extracting per-rank RDMA window addresses. + */ + +#include "host/comm.h" +#include "common/comm_context.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "acl/acl.h" +#include "hccl/hccl_comm.h" +#include "hccl/hccl_types.h" + +using CommTopo = uint32_t; + +// Internal HCCL helpers are exported by libhcomm on CANN 9.x. The public +// HCCL APIs below intentionally use the standard, non-V2 entry points to match +// the working pto-isa initialization sequence. +extern "C" HcclResult HcclAllocComResourceByTiling(HcclComm comm, void *stream, void *mc2Tiling, void **commContext); +extern "C" HcclResult HcomGetCommHandleByGroup(const char *group, HcclComm *commHandle); +extern "C" HcclResult HcomGetL0TopoTypeEx(const char *group, CommTopo *topoType, uint32_t isSetDevice); + +static inline HcclResult hccl_get_root_info(HcclRootInfo *ri) { return HcclGetRootInfo(ri); } +static inline HcclResult hccl_comm_init_root_info(uint32_t n, const HcclRootInfo *ri, uint32_t r, HcclComm *c) { + return HcclCommInitRootInfo(n, ri, r, c); +} +static inline HcclResult hccl_get_comm_name(HcclComm c, char *name) { return HcclGetCommName(c, name); } +static inline HcclResult hccl_barrier(HcclComm c, aclrtStream s) { return HcclBarrier(c, s); } +static inline HcclResult hccl_comm_destroy(HcclComm c) { return HcclCommDestroy(c); } +static inline HcclResult hccl_alloc_com_resource(HcclComm c, void *s, void *t, void **ctx) { + return HcclAllocComResourceByTiling(c, s, t, ctx); +} +static inline HcclResult hccl_get_comm_handle_by_group(const char *g, HcclComm *c) { + return HcomGetCommHandleByGroup(g, c); +} +static inline HcclResult hccl_get_l0_topo_type_ex(const char *g, CommTopo *t, uint32_t f) { + return HcomGetL0TopoTypeEx(g, t, f); +} + +static constexpr uint32_t COMM_IS_NOT_SET_DEVICE = 0; +static constexpr uint32_t COMM_TOPO_MESH = 0b1u; + +// ============================================================================ +// HCCL tiling structures (required by HcclAllocComResourceByTiling) +// ============================================================================ + +namespace { + +static constexpr uint32_t MAX_CC_TILING_NUM = 8U; +static constexpr uint32_t GROUP_NAME_SIZE = 128U; +static constexpr uint32_t ALG_CONFIG_SIZE = 128U; + +struct Mc2InitTilingInner { + uint32_t version; + uint32_t mc2HcommCnt; + uint32_t offset[MAX_CC_TILING_NUM]; + uint8_t debugMode; + uint8_t preparePosition; + uint16_t queueNum; + uint16_t commBlockNum; + uint8_t devType; + char reserved[17]; +}; + +struct Mc2cCTilingInner { + uint8_t skipLocalRankCopy; + uint8_t skipBufferWindowCopy; + uint8_t stepSize; + uint8_t version; + char reserved[9]; + uint8_t commEngine; + uint8_t srcDataType; + uint8_t dstDataType; + char groupName[GROUP_NAME_SIZE]; + char algConfig[ALG_CONFIG_SIZE]; + uint32_t opType; + uint32_t reduceType; +}; + +struct Mc2CommConfigV2 { + Mc2InitTilingInner init; + Mc2cCTilingInner inner; +}; + +// HCCL compat structs for RING topology parsing +struct HcclSignalInfo { + uint64_t resId; + uint64_t addr; + uint32_t devId; + uint32_t tsId; + uint32_t rankId; + uint32_t flag; +}; + +struct HcclStreamInfo { + int32_t streamIds; + uint32_t sqIds; + uint32_t cqIds; + uint32_t logicCqids; +}; + +struct ListCommon { + uint64_t nextHost; + uint64_t preHost; + uint64_t nextDevice; + uint64_t preDevice; +}; + +static constexpr uint32_t COMPAT_LOCAL_NOTIFY_MAX_NUM = 64; +static constexpr uint32_t COMPAT_LOCAL_STREAM_MAX_NUM = 19; +static constexpr uint32_t COMPAT_AICPU_OP_NOTIFY_MAX_NUM = 2; + +struct LocalResInfoV2 { + uint32_t streamNum; + uint32_t signalNum; + HcclSignalInfo localSignals[COMPAT_LOCAL_NOTIFY_MAX_NUM]; + HcclStreamInfo streamInfo[COMPAT_LOCAL_STREAM_MAX_NUM]; + HcclStreamInfo mainStreamInfo; + HcclSignalInfo aicpuOpNotify[COMPAT_AICPU_OP_NOTIFY_MAX_NUM]; + ListCommon nextTagRes; +}; + +struct AlgoTopoInfo { + uint32_t userRank; + uint32_t userRankSize; + int32_t deviceLogicId; + bool isSingleMeshAggregation; + uint32_t deviceNumPerAggregation; + uint32_t superPodNum; + uint32_t devicePhyId; + uint32_t topoType; + uint32_t deviceType; + uint32_t serverNum; + uint32_t meshAggregationRankSize; + uint32_t multiModuleDiffDeviceNumMode; + uint32_t multiSuperPodDiffServerNumMode; + uint32_t realUserRank; + bool isDiffDeviceModule; + bool isDiffDeviceType; + uint32_t gcdDeviceNumPerAggregation; + uint32_t moduleNum; + uint32_t isUsedRdmaRankPairNum; + uint64_t isUsedRdmaRankPair; + uint32_t pairLinkCounterNum; + uint64_t pairLinkCounter; + uint32_t nicNum; + uint64_t nicList; + uint64_t complanRankLength; + uint64_t complanRank; + uint64_t bridgeRankNum; + uint64_t bridgeRank; + uint64_t serverAndsuperPodRankLength; + uint64_t serverAndsuperPodRank; +}; + +struct HcclOpConfig { + uint8_t deterministic; + uint8_t retryEnable; + uint8_t highPerfEnable; + uint8_t padding[5]; + uint8_t linkTimeOut[8]; + uint64_t notifyWaitTime; + uint32_t retryHoldTime; + uint32_t retryIntervalTime; + bool interXLinkDisable; + uint32_t floatOverflowMode; + uint32_t multiQpThreshold; +}; + +struct RemoteResPtr { + uint64_t nextHostPtr; + uint64_t nextDevicePtr; +}; + +struct HcclMC2WorkSpace { + uint64_t workspace; + uint64_t workspaceSize; +}; + +struct HcclRankRelationResV2 { + uint32_t remoteUsrRankId; + uint32_t remoteWorldRank; + uint64_t windowsIn; + uint64_t windowsOut; + uint64_t windowsExp; + ListCommon nextTagRes; +}; + +struct HcclOpResParamHead { + uint32_t localUsrRankId; + uint32_t rankSize; + uint64_t winSize; + uint64_t localWindowsIn; + uint64_t localWindowsOut; + char hcomId[128]; + uint64_t winExpSize; + uint64_t localWindowsExp; +}; + +struct HcclOpResParam { + HcclMC2WorkSpace mc2WorkSpace; + uint32_t localUsrRankId; + uint32_t rankSize; + uint64_t winSize; + uint64_t localWindowsIn; + uint64_t localWindowsOut; + char hcomId[128]; + uint64_t winExpSize; + uint64_t localWindowsExp; + uint32_t rWinStart; + uint32_t rWinOffset; + uint64_t version; + LocalResInfoV2 localRes; + AlgoTopoInfo topoInfo; + HcclOpConfig config; + uint64_t hostStateInfo; + uint64_t aicpuStateInfo; + uint64_t lockAddr; + uint32_t rsv[16]; + uint32_t notifysize; + uint32_t remoteResNum; + RemoteResPtr remoteRes[1]; +}; + +// Layout contract with CANN 9.x libhcomm. These asserts convert a silent +// "field offset shifted -> we read garbage from aclrtMemcpy" failure mode +// into a compile error. If CANN upgrades change any of these, re-verify +// the struct against the new libhcomm source before bumping the numbers. +static_assert(sizeof(HcclRankRelationResV2) == 64, "HcclRankRelationResV2 size drift"); +static_assert(offsetof(HcclRankRelationResV2, windowsIn) == 8, "HcclRankRelationResV2 layout drift"); +static_assert(sizeof(LocalResInfoV2) == 2472, "LocalResInfoV2 size drift"); +static_assert(sizeof(HcclOpResParam) == 3000, "HcclOpResParam size drift"); +static_assert(offsetof(HcclOpResParam, localUsrRankId) == 16, "HcclOpResParam layout drift"); +static_assert(offsetof(HcclOpResParam, rankSize) == 20, "HcclOpResParam layout drift"); +static_assert(offsetof(HcclOpResParam, winSize) == 24, "HcclOpResParam layout drift"); +static_assert(offsetof(HcclOpResParam, localWindowsIn) == 32, "HcclOpResParam layout drift"); +static_assert(offsetof(HcclOpResParam, remoteRes) == 2984, "HcclOpResParam layout drift"); + +// Magic numbers required by HcclAllocComResourceByTiling. These are CANN +// internal enum values with no public header; names + comments record intent. +// Changing any of them changes the semantics of the MC2 resource request. +static constexpr uint32_t kMc2TilingVersion = 100U; // Mc2InitTilingInner::version +static constexpr uint32_t kMc2CommBlockNum = 48U; // Hardware comm block count (A2/A3 topology) +static constexpr uint8_t kMc2DevType = 4U; // devType = Ascend 910B family +static constexpr uint8_t kMc2InnerVersion = 1U; // Mc2cCTilingInner::version +static constexpr uint32_t kMc2OpTypeBatchWrite = 18U; // opType = BatchWrite (MC2 SDMA path) +static constexpr uint8_t kMc2CommEngineSdma = 3U; // commEngine = SDMA +static constexpr const char *kMc2AlgConfig = "BatchWrite=level0:fullmesh"; + +} // anonymous namespace + +// ============================================================================ +// Internal state +// ============================================================================ + +struct CommHandle_ { + int rank; + int nranks; + std::string rootinfo_path; + uint64_t run_token = 0; + + // Caller-owned: supplied to comm_init, never created or destroyed here. + aclrtStream stream = nullptr; + HcclComm hccl_comm = nullptr; + + CommContext host_ctx{}; + CommContext *device_ctx = nullptr; + bool owns_device_ctx = false; +}; + +// ============================================================================ +// Helpers +// ============================================================================ + +namespace { + +static constexpr uint64_t ROOTINFO_MAGIC = 0x50544f5f4843434cULL; // "PTO_HCCL" + +struct RootInfoFileHeader { + uint64_t magic = ROOTINFO_MAGIC; + uint64_t run_token = 0; + uint32_t payload_size = HCCL_ROOT_INFO_BYTES; + uint32_t reserved = 0; +}; + +static std::string handshake_dir(const std::string &rootinfo_path) { + auto last_slash = rootinfo_path.rfind('/'); + if (last_slash == std::string::npos) return "."; + return rootinfo_path.substr(0, last_slash); +} + +static std::string handshake_prefix(const std::string &rootinfo_path) { + auto last_slash = rootinfo_path.rfind('/'); + return last_slash == std::string::npos ? rootinfo_path : rootinfo_path.substr(last_slash + 1); +} + +static std::string run_token_hex(uint64_t run_token) { + std::ostringstream oss; + oss << std::hex << run_token; + return oss.str(); +} + +static uint64_t make_run_token(int rank) { + // steady_clock is monotonic and unaffected by NTP or wall-clock jumps; + // we only need within-host uniqueness for the handshake file naming. + auto now = std::chrono::time_point_cast(std::chrono::steady_clock::now()) + .time_since_epoch() + .count(); + uint64_t token = static_cast(now); + token ^= static_cast(getpid()) << 16; + token ^= static_cast(rank & 0xFFFF); + return token; +} + +static std::string +barrier_marker_path(const std::string &rootinfo_path, uint64_t run_token, const std::string &tag, int rank) { + return handshake_dir(rootinfo_path) + "/barrier_" + handshake_prefix(rootinfo_path) + "_" + tag + "_" + + run_token_hex(run_token) + "_" + std::to_string(rank) + ".ready"; +} + +static void cleanup_handshake_files(const std::string &rootinfo_path) { + std::error_code ec; + std::filesystem::remove(rootinfo_path, ec); + + const std::string prefix = "barrier_" + handshake_prefix(rootinfo_path) + "_"; + const std::string dir = handshake_dir(rootinfo_path); + for (const auto &entry : std::filesystem::directory_iterator(dir, ec)) { + if (ec) break; + if (!entry.is_regular_file(ec)) continue; + const std::string name = entry.path().filename().string(); + if (name.rfind(prefix, 0) != 0) continue; + if (name.size() < 6 || name.substr(name.size() - 6) != ".ready") continue; + std::filesystem::remove(entry.path(), ec); + ec.clear(); + } +} + +static bool +wait_for_rootinfo(const std::string &path, HcclRootInfo *root_info, uint64_t *run_token, int timeout_sec = 120) { + for (int i = 0; i < timeout_sec * 10; ++i) { + std::ifstream f(path, std::ios::binary); + if (f.good()) { + RootInfoFileHeader header{}; + f.read(reinterpret_cast(&header), sizeof(header)); + if (!f.good()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + if (header.magic != ROOTINFO_MAGIC || header.payload_size != HCCL_ROOT_INFO_BYTES) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + f.read(root_info->internal, HCCL_ROOT_INFO_BYTES); + if (!f.good()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + *run_token = header.run_token; + return true; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + return false; +} + +static bool file_barrier( + const std::string &rootinfo_path, int rank, int nranks, const std::string &tag, uint64_t run_token, + int timeout_sec = 120 +) { + std::string my_marker = barrier_marker_path(rootinfo_path, run_token, tag, rank); + { std::ofstream(my_marker) << "1"; } + + const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(timeout_sec); + for (int r = 0; r < nranks; ++r) { + std::string marker = barrier_marker_path(rootinfo_path, run_token, tag, r); + while (true) { + std::ifstream f(marker); + if (f.good()) break; + if (std::chrono::steady_clock::now() >= deadline) { + fprintf( + stderr, "[comm rank %d] file_barrier('%s') timed out after %ds waiting for rank %d\n", rank, + tag.c_str(), timeout_sec, r + ); + return false; + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + return true; +} + +} // namespace + +// ============================================================================ +// API implementation +// ============================================================================ + +extern "C" CommHandle comm_init(int rank, int nranks, void *stream, const char *rootinfo_path) try { + if (stream == nullptr) { + fprintf(stderr, "[comm rank %d] comm_init: caller-supplied stream is null\n", rank); + return nullptr; + } + if (rootinfo_path == nullptr || *rootinfo_path == '\0') { + fprintf(stderr, "[comm rank %d] comm_init: rootinfo_path is null or empty\n", rank); + return nullptr; + } + if (nranks <= 0 || rank < 0 || rank >= nranks) { + fprintf(stderr, "[comm rank %d] comm_init: invalid rank/nranks (rank=%d, nranks=%d)\n", rank, rank, nranks); + return nullptr; + } + if (static_cast(nranks) > COMM_MAX_RANK_NUM) { + fprintf( + stderr, "[comm rank %d] comm_init: nranks=%d exceeds COMM_MAX_RANK_NUM=%u\n", rank, nranks, + COMM_MAX_RANK_NUM + ); + return nullptr; + } + + auto *h = new (std::nothrow) CommHandle_{}; + if (!h) return nullptr; + + h->rank = rank; + h->nranks = nranks; + h->rootinfo_path = rootinfo_path; + h->stream = static_cast(stream); + + // NOTE: aclInit / aclrtSetDevice / stream creation are intentionally NOT + // performed here — the caller (DeviceRunner::ensure_acl_ready + a stream + // it owns) is responsible for them. This keeps ACL lifecycle ownership + // in one place (DeviceRunner) and matches HCCL's API shape, which already + // takes a caller-supplied stream. + + // RootInfo exchange + HcclRootInfo rootInfo{}; + if (rank == 0) { + cleanup_handshake_files(h->rootinfo_path); + h->run_token = make_run_token(rank); + HcclResult hret = hccl_get_root_info(&rootInfo); + if (hret != HCCL_SUCCESS) { + fprintf(stderr, "[comm rank 0] HcclGetRootInfo failed: %d\n", (int)hret); + delete h; + return nullptr; + } + RootInfoFileHeader header{}; + header.run_token = h->run_token; + std::string tmp_path = h->rootinfo_path + ".tmp." + std::to_string(getpid()); + std::ofstream fout(tmp_path, std::ios::binary | std::ios::trunc); + fout.write(reinterpret_cast(&header), sizeof(header)); + fout.write(rootInfo.internal, HCCL_ROOT_INFO_BYTES); + fout.close(); + if (!fout.good() || std::rename(tmp_path.c_str(), h->rootinfo_path.c_str()) != 0) { + std::remove(tmp_path.c_str()); + delete h; + return nullptr; + } + } else { + if (!wait_for_rootinfo(h->rootinfo_path, &rootInfo, &h->run_token)) { + fprintf(stderr, "[comm rank %d] Timeout waiting for rootinfo\n", rank); + delete h; + return nullptr; + } + } + + if (!file_barrier(h->rootinfo_path, h->rank, h->nranks, "rootinfo_ready", h->run_token)) { + delete h; + return nullptr; + } + + // Init communicator + HcclResult hret = + hccl_comm_init_root_info(static_cast(nranks), &rootInfo, static_cast(rank), &h->hccl_comm); + if (hret != HCCL_SUCCESS) { + fprintf(stderr, "[comm rank %d] HcclCommInitRootInfo failed: %d\n", rank, (int)hret); + delete h; + return nullptr; + } + + return h; +} catch (const std::exception &e) { + fprintf(stderr, "[comm rank %d] comm_init: exception: %s\n", rank, e.what()); + return nullptr; +} catch (...) { + fprintf(stderr, "[comm rank %d] comm_init: unknown exception\n", rank); + return nullptr; +} + +extern "C" int comm_alloc_windows(CommHandle h, size_t /*win_size*/, uint64_t *device_ctx_out) try { + if (!h || !device_ctx_out) return -1; + + char group[128] = {}; + HcclResult hret = hccl_get_comm_name(h->hccl_comm, group); + if (hret != HCCL_SUCCESS) return -1; + + CommTopo topoType = 0; + hret = hccl_get_l0_topo_type_ex(group, &topoType, COMM_IS_NOT_SET_DEVICE); + if (hret != HCCL_SUCCESS) return -1; + + HcclComm commHandle = nullptr; + hret = hccl_get_comm_handle_by_group(group, &commHandle); + if (hret != HCCL_SUCCESS) return -1; + + // File barrier so all ranks have completed HcclCommInitRootInfo + if (!file_barrier(h->rootinfo_path, h->rank, h->nranks, "hccl_init", h->run_token)) { + return -1; + } + + // Tiling configuration for HcclAllocComResourceByTiling. See + // kMc2* constants above for the meaning of each magic value. + Mc2CommConfigV2 tiling{}; + memset(&tiling, 0, sizeof(tiling)); + tiling.init.version = kMc2TilingVersion; + tiling.init.mc2HcommCnt = 1U; + tiling.init.commBlockNum = kMc2CommBlockNum; + tiling.init.devType = kMc2DevType; + tiling.init.offset[0] = + static_cast(reinterpret_cast(&tiling.inner) - reinterpret_cast(&tiling.init)); + tiling.inner.opType = kMc2OpTypeBatchWrite; + tiling.inner.commEngine = kMc2CommEngineSdma; + tiling.inner.version = kMc2InnerVersion; + strncpy(tiling.inner.groupName, group, GROUP_NAME_SIZE - 1); + strncpy(tiling.inner.algConfig, kMc2AlgConfig, ALG_CONFIG_SIZE - 1); + + void *ctxPtr = nullptr; + hret = hccl_alloc_com_resource(commHandle, h->stream, &tiling, &ctxPtr); + if (hret != HCCL_SUCCESS || ctxPtr == nullptr) return -1; + + // Extract CommContext (topology-dependent) + aclError aRet; + if (topoType == COMM_TOPO_MESH) { + h->device_ctx = reinterpret_cast(ctxPtr); + aRet = aclrtMemcpy( + &h->host_ctx, sizeof(h->host_ctx), h->device_ctx, sizeof(h->host_ctx), ACL_MEMCPY_DEVICE_TO_HOST + ); + if (aRet != ACL_SUCCESS) return -1; + if (h->host_ctx.rankNum == 0 || h->host_ctx.rankNum > COMM_MAX_RANK_NUM) { + fprintf( + stderr, "[comm rank %d] MESH CommContext.rankNum=%u out of range [1, %u]\n", h->rank, + h->host_ctx.rankNum, COMM_MAX_RANK_NUM + ); + return -1; + } + } else { + // RING topology: parse HcclOpResParam structure on device + auto *rawCtx = reinterpret_cast(ctxPtr); + + HcclOpResParamHead head{}; + const size_t headOff = offsetof(HcclOpResParam, localUsrRankId); + aRet = aclrtMemcpy(&head, sizeof(head), rawCtx + headOff, sizeof(head), ACL_MEMCPY_DEVICE_TO_HOST); + if (aRet != ACL_SUCCESS) return -1; + + // rankSize comes from device memory; cap against our static windowsIn + // buffer (COMM_MAX_RANK_NUM) before using it to index or size. + if (head.rankSize == 0 || head.rankSize > COMM_MAX_RANK_NUM) { + fprintf( + stderr, "[comm rank %d] HcclOpResParam.rankSize=%u out of range [1, %u]\n", h->rank, head.rankSize, + COMM_MAX_RANK_NUM + ); + return -1; + } + + const size_t remoteResOff = offsetof(HcclOpResParam, remoteRes); + const size_t remoteResBytes = head.rankSize * sizeof(RemoteResPtr); + std::vector remoteResArr(head.rankSize); + aRet = aclrtMemcpy( + remoteResArr.data(), remoteResBytes, rawCtx + remoteResOff, remoteResBytes, ACL_MEMCPY_DEVICE_TO_HOST + ); + if (aRet != ACL_SUCCESS) return -1; + + memset(&h->host_ctx, 0, sizeof(h->host_ctx)); + + uint64_t wsFields[2] = {0, 0}; + aRet = aclrtMemcpy(wsFields, sizeof(wsFields), rawCtx, sizeof(wsFields), ACL_MEMCPY_DEVICE_TO_HOST); + if (aRet != ACL_SUCCESS) return -1; + h->host_ctx.workSpace = wsFields[0]; + h->host_ctx.workSpaceSize = wsFields[1]; + h->host_ctx.rankId = head.localUsrRankId; + h->host_ctx.rankNum = head.rankSize; + h->host_ctx.winSize = head.winSize; + + for (uint32_t i = 0; i < head.rankSize; ++i) { + if (i == head.localUsrRankId) { + h->host_ctx.windowsIn[i] = head.localWindowsIn; + h->host_ctx.windowsOut[i] = head.localWindowsOut; + continue; + } + uint64_t devPtr = remoteResArr[i].nextDevicePtr; + if (devPtr == 0) return -1; + + HcclRankRelationResV2 remoteInfo{}; + aRet = aclrtMemcpy( + &remoteInfo, sizeof(remoteInfo), reinterpret_cast(devPtr), sizeof(remoteInfo), + ACL_MEMCPY_DEVICE_TO_HOST + ); + if (aRet != ACL_SUCCESS) return -1; + h->host_ctx.windowsIn[i] = remoteInfo.windowsIn; + h->host_ctx.windowsOut[i] = remoteInfo.windowsOut; + } + + void *newDevMem = nullptr; + aRet = aclrtMalloc(&newDevMem, sizeof(CommContext), ACL_MEM_MALLOC_HUGE_FIRST); + if (aRet != ACL_SUCCESS) return -1; + + aRet = + aclrtMemcpy(newDevMem, sizeof(CommContext), &h->host_ctx, sizeof(CommContext), ACL_MEMCPY_HOST_TO_DEVICE); + if (aRet != ACL_SUCCESS) { + aclrtFree(newDevMem); + return -1; + } + h->device_ctx = reinterpret_cast(newDevMem); + h->owns_device_ctx = true; + } + + *device_ctx_out = reinterpret_cast(h->device_ctx); + return 0; +} catch (const std::exception &e) { + fprintf(stderr, "[comm] comm_alloc_windows: exception: %s\n", e.what()); + return -1; +} catch (...) { + fprintf(stderr, "[comm] comm_alloc_windows: unknown exception\n"); + return -1; +} + +extern "C" int comm_get_local_window_base(CommHandle h, uint64_t *base_out) { + if (!h || !base_out) return -1; + *base_out = h->host_ctx.windowsIn[h->rank]; + return 0; +} + +extern "C" int comm_get_window_size(CommHandle h, size_t *size_out) { + if (!h || !size_out) return -1; + *size_out = static_cast(h->host_ctx.winSize); + return 0; +} + +extern "C" int comm_barrier(CommHandle h) { + if (!h) return -1; + // HcclBarrier is synchronous — it blocks until all ranks arrive. + // Do NOT call aclrtSynchronizeStream after it: HcclBarrier internally + // switches the thread's ACL context, which invalidates the caller-owned + // stream for context-checked ACL calls (error 507018). + HcclResult hret = hccl_barrier(h->hccl_comm, h->stream); + if (hret != HCCL_SUCCESS) { + fprintf(stderr, "[comm rank %d] HcclBarrier failed: %d\n", h->rank, static_cast(hret)); + return static_cast(hret); + } + return 0; +} + +extern "C" int comm_destroy(CommHandle h) try { + if (!h) return -1; + + // Final barrier is best-effort: if a peer already crashed we still need to + // release the local resources we own, so timeout just logs and proceeds. + int rc = 0; + if (!file_barrier(h->rootinfo_path, h->rank, h->nranks, "destroy", h->run_token)) { + fprintf( + stderr, "[comm rank %d] comm_destroy: final barrier timed out; releasing local state anyway\n", h->rank + ); + rc = -1; + } + + if (h->owns_device_ctx && h->device_ctx) { + aclrtFree(h->device_ctx); + } + if (h->hccl_comm) { + HcclResult hret = hccl_comm_destroy(h->hccl_comm); + if (hret != HCCL_SUCCESS) { + fprintf(stderr, "[comm rank %d] HcclCommDestroy failed: %d\n", h->rank, static_cast(hret)); + if (rc == 0) rc = -1; + } + } + + // NOTE: we do NOT destroy h->stream — it is caller-owned. + // We also do NOT call aclrtResetDevice / aclFinalize here. Device/ACL + // lifecycle belongs to DeviceRunner, whose finalize() releases all + // device memory before resetting the device and running aclFinalize. + + if (h->rank == 0) { + cleanup_handshake_files(h->rootinfo_path); + } + + delete h; + return rc; +} catch (const std::exception &e) { + fprintf(stderr, "[comm] comm_destroy: exception: %s\n", e.what()); + if (h) delete h; + return -1; +} catch (...) { + fprintf(stderr, "[comm] comm_destroy: unknown exception\n"); + if (h) delete h; + return -1; +} diff --git a/src/a2a3/platform/onboard/host/device_runner.cpp b/src/a2a3/platform/onboard/host/device_runner.cpp index 8f08c4403..8996d5e1f 100644 --- a/src/a2a3/platform/onboard/host/device_runner.cpp +++ b/src/a2a3/platform/onboard/host/device_runner.cpp @@ -23,6 +23,7 @@ #include #include #include +#include "acl/acl.h" // Include HAL constants from CANN (header only, library loaded dynamically) #include "ascend_hal.h" @@ -270,6 +271,34 @@ int DeviceRunner::attach_current_thread(int device_id) { return 0; } +int DeviceRunner::ensure_acl_ready(int device_id) { + if (device_id < 0) { + LOG_ERROR("ensure_acl_ready: invalid device_id %d", device_id); + return -1; + } + + // aclInit is process-wide; CANN returns 100002 if it has already been + // initialized (possibly by another owner), which we treat as success. + constexpr int kAclRepeatInit = 100002; + aclError aRet = aclInit(nullptr); + if (aRet != ACL_SUCCESS && static_cast(aRet) != kAclRepeatInit) { + LOG_ERROR("aclInit failed: %d", static_cast(aRet)); + return static_cast(aRet); + } + + // ACL device binding is per-thread; every caller must still hit it. + aRet = aclrtSetDevice(device_id); + if (aRet != ACL_SUCCESS) { + LOG_ERROR("aclrtSetDevice(%d) failed: %d", device_id, static_cast(aRet)); + return static_cast(aRet); + } + + // Record that we are responsible for aclFinalize at teardown. + acl_ready_ = true; + if (device_id_ < 0) device_id_ = device_id; + return 0; +} + int DeviceRunner::prepare_run_context(int device_id) { int rc = attach_current_thread(device_id); if (rc != 0) { @@ -723,10 +752,21 @@ int DeviceRunner::finalize() { // Free all remaining allocations (including handshake buffer and binGmAddr) mem_alloc_.finalize(); - rc = rtDeviceReset(device_id_); - if (rc != 0) { - LOG_ERROR("rtDeviceReset(%d) failed during finalize: %d", device_id_, rc); - return rc; + // Reset device and finalize ACL AFTER all device memory is freed. + // Gated on acl_ready_ so rt-only runtimes that never called + // ensure_acl_ready() do not try to aclFinalize an un-init'd ACL state. + if (acl_ready_ && device_id_ >= 0) { + int reset_rc = aclrtResetDevice(device_id_); + if (reset_rc != 0) { + LOG_ERROR("aclrtResetDevice(%d) failed during finalize: %d", device_id_, reset_rc); + rc = reset_rc; + } + int finalize_rc = aclFinalize(); + if (finalize_rc != 0) { + LOG_ERROR("aclFinalize failed during finalize: %d", finalize_rc); + if (rc == 0) rc = finalize_rc; + } + acl_ready_ = false; } device_id_ = -1; @@ -735,7 +775,7 @@ int DeviceRunner::finalize() { aicore_kernel_binary_.clear(); LOG_INFO("DeviceRunner finalized"); - return 0; + return rc; } int DeviceRunner::launch_aicpu_kernel(rtStream_t stream, KernelArgs *k_args, const char *kernel_name, int aicpu_num) { diff --git a/src/a2a3/platform/onboard/host/device_runner.h b/src/a2a3/platform/onboard/host/device_runner.h index 4244224fc..1760964b5 100644 --- a/src/a2a3/platform/onboard/host/device_runner.h +++ b/src/a2a3/platform/onboard/host/device_runner.h @@ -363,6 +363,22 @@ class DeviceRunner { */ int attach_current_thread(int device_id); + /** + * Make the ACL context ready on the current thread. + * + * Calls aclInit() once per process (subsequent calls are idempotent and + * tolerate the ACL_ERROR_REPEAT_INITIALIZE sentinel) and aclrtSetDevice() + * on the current thread. This is the entry point for consumers that need + * to call acl* / Hccl* APIs (for example the comm_hccl backend) but + * intentionally do not want those modules to own ACL lifecycle themselves. + * + * Symmetric with finalize(): aclrtResetDevice + aclFinalize run there. + * + * @param device_id Device ID to bind on the current thread. + * @return 0 on success, error code on failure. + */ + int ensure_acl_ready(int device_id); + /** * Ensure the current thread has fresh run-scoped streams. * @@ -405,6 +421,11 @@ class DeviceRunner { bool binaries_loaded_{false}; // true after AICPU SO loaded std::map func_id_to_addr_; // func_id -> function_bin_addr (device GM) + // ACL lifecycle (process-wide). aclInit must run exactly once; ensure_acl_ready + // gates it behind this flag. finalize() drives aclFinalize only if we observed + // acl_ready_, so runtimes that never ask for ACL (e.g. pure rt-layer) stay unaffected. + bool acl_ready_{false}; + // Performance profiling PerformanceCollector perf_collector_; diff --git a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp index e2949edd4..ff961fa65 100644 --- a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp @@ -119,6 +119,15 @@ int set_device(DeviceContextHandle ctx, int device_id) { return 0; } +int ensure_acl_ready_ctx(DeviceContextHandle ctx, int device_id) { + if (ctx == NULL) return -1; + try { + return static_cast(ctx)->ensure_acl_ready(device_id); + } catch (...) { + return -1; + } +} + void *device_malloc_ctx(DeviceContextHandle ctx, size_t size) { if (ctx == NULL) return NULL; try { diff --git a/tests/ut/cpp/CMakeLists.txt b/tests/ut/cpp/CMakeLists.txt index f7db14f59..d87099ed3 100644 --- a/tests/ut/cpp/CMakeLists.txt +++ b/tests/ut/cpp/CMakeLists.txt @@ -18,8 +18,26 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) +# Gate for tests that may need CANN headers/libs at build time or that only +# make sense on Ascend hardware runners. Default OFF so no-hw runners +# (ubuntu-latest / macos-latest, no CANN installed) can build the tree +# without touching anything CANN-adjacent. CI's hw jobs +# (ut-a2a3 / ut-a5) pass -DSIMPLER_ENABLE_HARDWARE_TESTS=ON. +# +# Rule of thumb: any new test that either +# (a) links against CANN libs (libascendcl, libhcomm, libruntime), or +# (b) needs to dlopen an onboard libhost_runtime.so to be meaningful, +# goes under `if(SIMPLER_ENABLE_HARDWARE_TESTS)` below. +option(SIMPLER_ENABLE_HARDWARE_TESTS + "Build tests that require Ascend hardware toolchain to build or run" + OFF) + # --------------------------------------------------------------------------- -# GoogleTest +# GoogleTest — fast path uses the system install (apt libgtest-dev / brew +# googletest) on GH-hosted runners. Self-hosted (a2a3/a5) runners don't +# have GoogleTest preinstalled, so we fall back to FetchContent from +# upstream. Either way the rest of the file sees GTEST_LIB / GTEST_MAIN_LIB +# / GTEST_INCLUDE_DIRS with a consistent meaning. # --------------------------------------------------------------------------- if(APPLE) set(GTEST_SEARCH_PATHS /opt/homebrew/lib /usr/local/lib) @@ -29,8 +47,32 @@ else() set(GTEST_INCLUDE_DIRS /usr/local/include) endif() -find_library(GTEST_LIB gtest PATHS ${GTEST_SEARCH_PATHS} REQUIRED) -find_library(GTEST_MAIN_LIB gtest_main PATHS ${GTEST_SEARCH_PATHS} REQUIRED) +find_library(GTEST_LIB gtest PATHS ${GTEST_SEARCH_PATHS}) +find_library(GTEST_MAIN_LIB gtest_main PATHS ${GTEST_SEARCH_PATHS}) + +if(NOT GTEST_LIB OR NOT GTEST_MAIN_LIB) + message(STATUS "System GoogleTest not found — fetching via FetchContent") + # Match the ABI the tests themselves compile with + # (-D_GLIBCXX_USE_CXX11_ABI=0 — see add_hierarchical_test / add_comm_api_test). + # Without this the FetchContent-built libgtest.a ends up with the default + # cxx11 ABI and link fails against our tests. + add_compile_options(-D_GLIBCXX_USE_CXX11_ABI=0) + + include(FetchContent) + FetchContent_Declare( + googletest + URL https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz + URL_HASH SHA256=8ad598c73ad796e0d8280b082cebd82a630d73e73cd3c70057938a6501bba5d7 + ) + set(INSTALL_GTEST OFF CACHE BOOL "" FORCE) + FetchContent_MakeAvailable(googletest) + # FetchContent exposes the gtest / gtest_main targets directly; the + # per-test functions below use these names uniformly with the system + # path (where they resolve to the absolute .so/.a path via find_library). + set(GTEST_LIB gtest) + set(GTEST_MAIN_LIB gtest_main) + set(GTEST_INCLUDE_DIRS "") # include dirs are carried by the gtest target +endif() # --------------------------------------------------------------------------- # Distributed runtime sources under test @@ -134,3 +176,64 @@ endfunction() add_task_interface_test(test_child_memory test_child_memory.cpp) add_a2a3_pto2_test(test_a2a3_pto2_fatal test_a2a3_pto2_fatal.cpp) add_a5_pto2_test(test_a5_pto2_fatal test_a5_pto2_fatal.cpp) + +# Hardware-gated tests. Block is only entered when the project is configured +# with -DSIMPLER_ENABLE_HARDWARE_TESTS=ON. CI's no-hw `ut` job does not pass +# this flag, so nothing here is compiled there — this is the structural +# invariant that lets future hw tests freely link against CANN libraries +# without breaking the no-hw build. +if(SIMPLER_ENABLE_HARDWARE_TESTS) + # Hardware-only UT for the HCCL backend of the comm_* C API. + # + # libhost_runtime.so is loaded via dlopen at runtime — it is the + # subject under test and mirrors how ChipWorker selects a runtime + # backend in production. libascendcl.so, by contrast, is generic + # CANN infrastructure: we link it directly so the test code uses + # types and values normally instead of going through + # dlsym boilerplate. + # + # ctest label still gates execution via + # -L "^requires_hardware(_a2a3)?$" + # so we can both (a) only build on hw runners, and (b) still filter + # by runtime role when multiple hw tests coexist. + set(PROJECT_ROOT "${CMAKE_SOURCE_DIR}/../../..") + set(HOST_RUNTIME_LIB + "${PROJECT_ROOT}/build/lib/a2a3/onboard/tensormap_and_ringbuffer/libhost_runtime.so" + ) + + if(NOT DEFINED ENV{ASCEND_HOME_PATH}) + message(FATAL_ERROR + "SIMPLER_ENABLE_HARDWARE_TESTS=ON requires ASCEND_HOME_PATH to be set " + "(usually via `source $ASCEND_HOME_PATH/bin/setenv.bash`)") + endif() + set(ASCEND_HOME_PATH "$ENV{ASCEND_HOME_PATH}") + find_library(ASCENDCL_LIB ascendcl + PATHS ${ASCEND_HOME_PATH}/lib64 NO_DEFAULT_PATH REQUIRED) + + function(add_comm_api_test name src) + add_executable(${name} ${src}) + target_include_directories(${name} PRIVATE + ${GTEST_INCLUDE_DIRS} + ${PROJECT_ROOT}/src/a2a3/platform/include + ${ASCEND_HOME_PATH}/include + ) + target_compile_options(${name} PRIVATE -D_GLIBCXX_USE_CXX11_ABI=0) + target_compile_definitions(${name} PRIVATE + PTO_HOST_RUNTIME_LIB_PATH="${HOST_RUNTIME_LIB}" + ) + target_link_libraries(${name} PRIVATE + ${GTEST_MAIN_LIB} + ${GTEST_LIB} + ${ASCENDCL_LIB} + pthread + dl + ) + add_test(NAME ${name} COMMAND ${name}) + set_tests_properties(${name} PROPERTIES + LABELS "requires_hardware_a2a3" + RESOURCE_GROUPS "2,npus:1" + ) + endfunction() + + add_comm_api_test(test_hccl_comm test_hccl_comm.cpp) +endif() diff --git a/tests/ut/cpp/test_hccl_comm.cpp b/tests/ut/cpp/test_hccl_comm.cpp new file mode 100644 index 000000000..1e597e3f9 --- /dev/null +++ b/tests/ut/cpp/test_hccl_comm.cpp @@ -0,0 +1,339 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ + +/* + * Hardware UT guarding the CANN/HCCL-private ABI coupling in comm_hccl.cpp. + * + * The call chain (dlopen → create_device_context → ensure_acl_ready_ctx → + * aclrtCreateStream → comm_init → comm_alloc_windows → ...) is not the + * interesting part — the interesting part is *what's inside* CommContext + * after comm_alloc_windows returns. That struct comes from one of: + * + * - MESH topology: `reinterpret_cast(HCCL's return ptr)` — + * our layout is *assumed* to match HCCL's internal MESH context. + * - RING topology: our parser reads HcclOpResParam / HcclRankRelationResV2 + * field-by-field using offsetof against reverse-engineered struct defs. + * + * Both paths silently break if CANN (or libhcomm.so specifically) shifts + * any of those internal layouts. The happy-path "six functions return 0" + * check cannot catch that, because the calls succeed with garbage fields. + * + * This test copies the populated CommContext back to host and asserts: + * rankId == (this process's rank) + * rankNum == (nranks we passed to comm_init) + * winSize == comm_get_window_size(h) (cross-API consistency) + * windowsIn[rank] == comm_get_local_window_base(h) + * windowsIn[0..nranks-1] all non-zero (every peer GVA populated) + * + * A CANN upgrade that moves any of these fields lands here as EXIT_CTX_FIELDS + * rather than silently producing wrong device DMA addresses. + * + * Hardware classification: requires_hardware_a2a3 (ctest label) + CMake + * gate SIMPLER_ENABLE_HARDWARE_TESTS. Device allocation is driven by + * CTest RESOURCE_GROUPS + --resource-spec-file. + * + * Linking strategy: libhost_runtime.so is dlopen'd — it is the subject + * under test and mirrors how ChipWorker loads a runtime backend in + * production. libascendcl.so is linked directly at compile time because + * it is generic CANN infra; going through dlsym for acl* here buys nothing + * and only hides types behind void pointers. + * + * PTO_HOST_RUNTIME_LIB_PATH is baked in at configure time by + * tests/ut/cpp/CMakeLists.txt. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "acl/acl.h" +#include "common/comm_context.h" +#include "host/comm.h" + +namespace { + +// Function pointers from libhost_runtime.so (comm_* C API + DeviceRunner wiring) +struct HostRuntimeApi { + void *(*create_device_context)(); + void (*destroy_device_context)(void *); + int (*ensure_acl_ready_ctx)(void *, int); + + CommHandle (*comm_init)(int, int, void *, const char *); + int (*comm_alloc_windows)(CommHandle, size_t, uint64_t *); + int (*comm_get_local_window_base)(CommHandle, uint64_t *); + int (*comm_get_window_size)(CommHandle, size_t *); + int (*comm_barrier)(CommHandle); + int (*comm_destroy)(CommHandle); +}; + +template +F resolve(void *handle, const char *name) { + dlerror(); + void *sym = dlsym(handle, name); + if (dlerror() != nullptr) return nullptr; + return reinterpret_cast(sym); +} + +bool load_host_runtime_api(void *handle, HostRuntimeApi &api) { + api.create_device_context = resolve(handle, "create_device_context"); + api.destroy_device_context = resolve(handle, "destroy_device_context"); + api.ensure_acl_ready_ctx = resolve(handle, "ensure_acl_ready_ctx"); + api.comm_init = resolve(handle, "comm_init"); + api.comm_alloc_windows = resolve(handle, "comm_alloc_windows"); + api.comm_get_local_window_base = + resolve(handle, "comm_get_local_window_base"); + api.comm_get_window_size = resolve(handle, "comm_get_window_size"); + api.comm_barrier = resolve(handle, "comm_barrier"); + api.comm_destroy = resolve(handle, "comm_destroy"); + return api.create_device_context && api.destroy_device_context && api.ensure_acl_ready_ctx && api.comm_init && + api.comm_alloc_windows && api.comm_get_local_window_base && api.comm_get_window_size && api.comm_barrier && + api.comm_destroy; +} + +// Exit codes for the rank child. Each stage gets a distinct code so the +// parent's waitpid surface pinpoints exactly which step broke. +constexpr int EXIT_DLERR = 10; +constexpr int EXIT_DEV_CTX = 15; +constexpr int EXIT_ACL_READY = 18; +constexpr int EXIT_STREAM = 19; +constexpr int EXIT_INIT = 20; +constexpr int EXIT_ALLOC = 30; +constexpr int EXIT_WINDOW_BASE = 40; +constexpr int EXIT_WINDOW_SIZE = 50; +// EXIT_CTX_{MEMCPY,FIELDS} gate the real purpose of this test: verifying that +// the CommContext returned by HCCL (MESH reinterpret_cast) or built by our +// RING parser actually contains the fields we expect at the offsets we +// expect. Failure here means our reverse-engineered CANN ABI disagrees with +// the live HCCL build — the CANN-coupling fragility this test is here for. +constexpr int EXIT_CTX_MEMCPY = 55; +constexpr int EXIT_CTX_FIELDS = 56; +constexpr int EXIT_BARRIER = 60; +constexpr int EXIT_DESTROY = 70; + +int run_rank(int rank, int nranks, int device_id, const char *rootinfo_path) { + // libhost_runtime.so is the subject under test — dlopen mirrors + // ChipWorker. libascendcl is linked in, so acl* is available directly. + void *host_handle = dlopen(PTO_HOST_RUNTIME_LIB_PATH, RTLD_NOW | RTLD_LOCAL); + if (host_handle == nullptr) { + fprintf(stderr, "[rank %d] dlopen host lib failed: %s\n", rank, dlerror()); + return EXIT_DLERR; + } + + HostRuntimeApi api{}; + if (!load_host_runtime_api(host_handle, api)) { + fprintf(stderr, "[rank %d] required symbols missing from libhost_runtime.so\n", rank); + dlclose(host_handle); + return EXIT_DLERR; + } + + // Caller step 1: stand up DeviceRunner to own ACL lifecycle. + void *dev_ctx = api.create_device_context(); + if (dev_ctx == nullptr) { + fprintf(stderr, "[rank %d] create_device_context returned null\n", rank); + dlclose(host_handle); + return EXIT_DEV_CTX; + } + + // Caller step 2: aclInit + aclrtSetDevice via DeviceRunner. + if (api.ensure_acl_ready_ctx(dev_ctx, device_id) != 0) { + fprintf(stderr, "[rank %d] ensure_acl_ready_ctx(%d) failed\n", rank, device_id); + api.destroy_device_context(dev_ctx); + dlclose(host_handle); + return EXIT_ACL_READY; + } + + // Caller step 3: caller creates its own stream; comm never touches it. + aclrtStream stream = nullptr; + aclError aRet = aclrtCreateStream(&stream); + if (aRet != ACL_SUCCESS || stream == nullptr) { + fprintf(stderr, "[rank %d] aclrtCreateStream failed: %d\n", rank, static_cast(aRet)); + api.destroy_device_context(dev_ctx); + dlclose(host_handle); + return EXIT_STREAM; + } + + // Caller step 4: drive comm_* against the injected stream. + int stage = 0; + int exit_code = 0; + CommHandle h = api.comm_init(rank, nranks, stream, rootinfo_path); + if (h == nullptr) { + stage = EXIT_INIT; + } else { + uint64_t device_ctx_ptr = 0; + if (api.comm_alloc_windows(h, 4096, &device_ctx_ptr) != 0 || device_ctx_ptr == 0) { + stage = EXIT_ALLOC; + } else { + uint64_t local_base = 0; + if (api.comm_get_local_window_base(h, &local_base) != 0 || local_base == 0) { + stage = EXIT_WINDOW_BASE; + } else { + size_t win_size = 0; + if (api.comm_get_window_size(h, &win_size) != 0 || win_size < 4096) { + stage = EXIT_WINDOW_SIZE; + } else { + // ABI guard: pull the CommContext that HCCL's MESH path + // reinterpret_casts back to us (or that our RING parser + // built field-by-field) and check every consumed field + // against the inputs we fed to comm_init / alloc_windows. + // + // This is the concrete check that our CANN-private ABI + // assumptions (field offsets, struct layout, windowsIn + // indexing by rank) still match the live HCCL build. A + // CANN upgrade that silently moves HcclOpResParam or the + // MESH context layout lands here. + CommContext host_ctx{}; + aclError mc_rc = aclrtMemcpy( + &host_ctx, sizeof(host_ctx), reinterpret_cast(device_ctx_ptr), sizeof(host_ctx), + ACL_MEMCPY_DEVICE_TO_HOST + ); + if (mc_rc != ACL_SUCCESS) { + fprintf( + stderr, "[rank %d] aclrtMemcpy(device_ctx) failed: %d\n", rank, static_cast(mc_rc) + ); + stage = EXIT_CTX_MEMCPY; + } else if (host_ctx.rankId != static_cast(rank) || + host_ctx.rankNum != static_cast(nranks) || + host_ctx.winSize != static_cast(win_size) || + host_ctx.windowsIn[rank] != local_base) { + fprintf( + stderr, + "[rank %d] CommContext field mismatch — CANN ABI drift?\n" + " got: rankId=%u rankNum=%u winSize=%lu windowsIn[%d]=0x%lx\n" + " expected: rankId=%d rankNum=%d winSize=%zu windowsIn[%d]=0x%lx\n", + rank, host_ctx.rankId, host_ctx.rankNum, static_cast(host_ctx.winSize), rank, + static_cast(host_ctx.windowsIn[rank]), rank, nranks, win_size, rank, + static_cast(local_base) + ); + stage = EXIT_CTX_FIELDS; + } else { + // Every peer's window GVA must be non-zero. A zero + // entry means RING parsing read the wrong offset or + // MESH didn't populate a peer slot. + for (int i = 0; i < nranks; ++i) { + if (host_ctx.windowsIn[i] == 0) { + fprintf( + stderr, "[rank %d] CommContext.windowsIn[%d] == 0 (peer GVA missing)\n", rank, i + ); + stage = EXIT_CTX_FIELDS; + break; + } + } + } + + if (stage == 0 && api.comm_barrier(h) != 0) { + stage = EXIT_BARRIER; + } + } + } + } + if (api.comm_destroy(h) != 0 && stage == 0) { + stage = EXIT_DESTROY; + } + } + exit_code = stage; + + // Caller step 5: cleanup in reverse order. destroy_device_context + // eventually drives DeviceRunner::finalize which calls aclrtResetDevice + // and aclFinalize, so we do not call them ourselves. + aclrtDestroyStream(stream); + api.destroy_device_context(dev_ctx); + dlclose(host_handle); + return exit_code; +} + +/// Read device ids allocated by CTest resource allocation. +/// +/// CTest sets CTEST_RESOURCE_GROUP_COUNT and per-group env vars when +/// --resource-spec-file is provided. With RESOURCE_GROUPS "npus:2", +/// there is one group (group 0) containing two NPU allocations: +/// CTEST_RESOURCE_GROUP_0_NPUS = "id:4,slots:1;id:5,slots:1" +/// +/// Returns the extracted device ids. +std::vector read_ctest_devices() { + std::vector ids; + const char *count_str = std::getenv("CTEST_RESOURCE_GROUP_COUNT"); + if (count_str == nullptr) return ids; + + int group_count = std::atoi(count_str); + for (int g = 0; g < group_count; ++g) { + std::string var = "CTEST_RESOURCE_GROUP_" + std::to_string(g) + "_NPUS"; + const char *val = std::getenv(var.c_str()); + if (val == nullptr) continue; + + // Parse "id:,slots:;id:,slots:;..." + std::string s(val); + size_t pos = 0; + while ((pos = s.find("id:", pos)) != std::string::npos) { + pos += 3; + ids.push_back(std::atoi(s.c_str() + pos)); + } + } + return ids; +} + +} // namespace + +class HcclCommTest : public ::testing::Test { +protected: + void SetUp() override { + // Path is baked in by tests/ut/cpp/CMakeLists.txt; the only way this + // can be wrong at test time is if someone ran ctest without first + // building the onboard runtime. + if (!std::filesystem::exists(PTO_HOST_RUNTIME_LIB_PATH)) { + GTEST_SKIP() << "libhost_runtime.so not built: " << PTO_HOST_RUNTIME_LIB_PATH + << "\n(build the a2a3 onboard tensormap_and_ringbuffer runtime first)"; + } + } +}; + +TEST_F(HcclCommTest, TwoRankInitAllocBarrierDestroy) { + constexpr int kNranks = 2; + auto devices = read_ctest_devices(); + ASSERT_GE(devices.size(), static_cast(kNranks)) + << "need " << kNranks << " NPU devices; run with --resource-spec-file"; + + const std::string rootinfo_path = "/tmp/pto_hccl_ut_rootinfo_" + std::to_string(getpid()) + ".bin"; + + std::vector pids; + pids.reserve(kNranks); + for (int rank = 0; rank < kNranks; ++rank) { + pid_t pid = fork(); + ASSERT_GE(pid, 0) << "fork failed: " << strerror(errno); + if (pid == 0) { + std::_Exit(run_rank(rank, kNranks, devices[rank], rootinfo_path.c_str())); + } + pids.push_back(pid); + } + + for (int rank = 0; rank < kNranks; ++rank) { + int status = 0; + pid_t waited = waitpid(pids[rank], &status, 0); + ASSERT_EQ(waited, pids[rank]); + ASSERT_TRUE(WIFEXITED(status)) << "rank " << rank << " did not exit normally (status=" << status << ")"; + EXPECT_EQ(WEXITSTATUS(status), 0) + << "rank " << rank << " failed at stage with exit code " << WEXITSTATUS(status) + << " (10=dlopen, 15=dev_ctx, 18=acl_ready, 19=stream, 20=init, 30=alloc, " + << "40=base, 50=size, 55=ctx_memcpy, 56=ctx_fields, 60=barrier, 70=destroy)"; + } + + std::error_code ec; + std::filesystem::remove(rootinfo_path, ec); +}