Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/workers/l2/vector_add/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def build_chip_callable(platform: str) -> ChipCallable:
)


def _run(worker: Worker, chip_cid: int) -> None:
def _run(worker: Worker, chip_cid: int):
"""Allocate device memory, copy inputs, execute, copy outputs back, verify."""
# --- 1. Prepare host arrays ---
torch.manual_seed(42)
Expand Down Expand Up @@ -155,7 +155,8 @@ def _run(worker: Worker, chip_cid: int) -> None:
# --- 4. Run. CallConfig() defaults are fine for this kernel. ---
config = CallConfig()
print("[vector_add] running on device...")
worker.run(chip_cid, args, config)
timing = worker.run(chip_cid, args, config)
print(f"[vector_add] {timing}")

# --- 5. D2H copy back + verify ---
worker.copy_from(host_out.data_ptr(), dev_out, NBYTES)
Expand All @@ -169,6 +170,7 @@ def _run(worker: Worker, chip_cid: int) -> None:
print(f"[vector_add] max |host_out - expected| = {max_diff:.3e}")
assert torch.allclose(host_out, expected, rtol=1e-5, atol=1e-5)
print("[vector_add] golden check PASSED")
return timing


def run(platform: str, device_id: int) -> int:
Expand Down
115 changes: 115 additions & 0 deletions examples/workers/l2/vector_add/test_run_timing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright (c) PyPTO Contributors.
# This program is free software, you can redistribute it and/or modify it under the terms and conditions of
# CANN Open Software License Agreement Version 2.0 (the "License").
# Please refer to the License for details. You may not use this file except in compliance with the License.
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
# See LICENSE in the root of the software repository for the full text of the License.
# -----------------------------------------------------------------------------------------------------------
"""Hardware ST asserting Worker.run returns a usable RunTiming.

Reuses the vector_add example's kernel + ChipCallable build so this test
doesn't drag in its own kernel sources. The contract being verified:

* `worker.run(...)` returns a `RunTiming` instance (not None).
* `host_wall_us` is strictly positive — there's no way a real dispatch
took zero steady-clock time, so this would catch a regression where
the C ABI stopped writing the timing back to the out-param.
* `device_wall_us` is strictly positive on the default build, even
without `--enable-l2-swimlane`. The orch_summary write was decoupled
from the swimlane shared-region path so device timing is always
available when PTO2_PROFILING is compiled in (default).
* `host_wall_us > device_wall_us` — host wall must wrap the device
dispatch, so the host clock should always read longer.
"""

import pytest
from _task_interface import RunTiming # pyright: ignore[reportMissingImports]
from simpler.task_interface import CallConfig, ChipStorageTaskArgs, ContinuousTensor, DataType
from simpler.worker import Worker

from .main import N_COLS, N_ELEMS, N_ROWS, NBYTES, build_chip_callable


def _drive_one_run(platform: str, device_id: int, *, enable_l2_swimlane: bool = False):
import torch # noqa: PLC0415

worker = Worker(
level=2,
platform=platform,
runtime="tensormap_and_ringbuffer",
device_id=device_id,
)
chip_callable = build_chip_callable(platform)
chip_cid = worker.register(chip_callable)
worker.init()
try:
# Use deterministic inputs so the run never accidentally hits a
# degenerate kernel path that fails to publish a perf record.
host_a = torch.full((N_ROWS, N_COLS), 1.0, dtype=torch.float32)
host_b = torch.full((N_ROWS, N_COLS), 2.0, dtype=torch.float32)

dev_a = worker.malloc(NBYTES)
dev_b = worker.malloc(NBYTES)
dev_out = worker.malloc(NBYTES)
worker.copy_to(dev_a, host_a.data_ptr(), NBYTES)
worker.copy_to(dev_b, host_b.data_ptr(), NBYTES)

args = ChipStorageTaskArgs()
args.add_tensor(ContinuousTensor.make(dev_a, (N_ROWS, N_COLS), DataType.FLOAT32))
args.add_tensor(ContinuousTensor.make(dev_b, (N_ROWS, N_COLS), DataType.FLOAT32))
args.add_tensor(ContinuousTensor.make(dev_out, (N_ROWS, N_COLS), DataType.FLOAT32))

config = CallConfig()
config.enable_l2_swimlane = enable_l2_swimlane

timing = worker.run(chip_cid, args, config)

# Verify the output is sane (so we know the kernel actually ran and
# the timing isn't from an early-error path).
host_out = torch.zeros(N_ROWS, N_COLS, dtype=torch.float32)
worker.copy_from(host_out.data_ptr(), dev_out, NBYTES)
worker.free(dev_a)
worker.free(dev_b)
worker.free(dev_out)
assert torch.allclose(host_out, host_a + host_b, rtol=1e-5, atol=1e-5), (
f"vector_add kernel output diverged; max |a+b - out| = "
f"{float(torch.max(torch.abs(host_out - (host_a + host_b)))):.3e} "
f"(N_ELEMS={N_ELEMS})"
)
return timing
finally:
worker.close()


@pytest.mark.platforms(["a2a3sim", "a2a3"])
@pytest.mark.runtime("tensormap_and_ringbuffer")
@pytest.mark.device_count(1)
def test_worker_run_returns_run_timing(st_platform, st_device_ids):
timing = _drive_one_run(st_platform, int(st_device_ids[0]))

assert isinstance(timing, RunTiming), (
f"Worker.run must return a RunTiming (got {type(timing).__name__}); "
f"a None return means the ChipWorker Python wrapper dropped the C++ return value."
)
# Host wall is measured with steady_clock around the dispatch; even on
# sim it covers thread + IPC overhead so the only way to see 0 is a
# bug in the out-param plumbing.
assert timing.host_wall_us > 0.0, f"host_wall_us must be > 0, got {timing.host_wall_us}"
assert timing.host_wall_ns > 0, f"host_wall_ns must be > 0, got {timing.host_wall_ns}"
# device_wall must also be > 0 without --enable-l2-swimlane after the
# Phase B decoupling: orch_summary is written unconditionally when
# PTO2_PROFILING is on (default build). Hitting 0 here means either:
# - the AICPU's l2_perf_aicpu_write_orch_summary path regressed back
# under an is_l2_swimlane_enabled() gate, or
# - the host stopped reading the phase header after the run.
assert timing.device_wall_us > 0.0, (
f"device_wall_us must be > 0 on the default PTO2_PROFILING build, got {timing.device_wall_us}; "
f"regression in the orch_summary capture path."
)
assert timing.device_wall_ns > 0
# Host wall must wrap the device dispatch — anything else points at
# a steady-clock / cycle-conversion bug.
assert timing.host_wall_us >= timing.device_wall_us, (
f"host_wall_us ({timing.host_wall_us}) must wrap device_wall_us ({timing.device_wall_us})"
)
63 changes: 58 additions & 5 deletions python/bindings/task_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,57 @@ NB_MODULE(_task_interface, m) {
// one, change the other.
m.attr("DEFAULT_LOG_THRESHOLD") = 20; // V5 = Python INFO

// --- RunTiming ---
// Returned by ChipWorker.run_prepared* / Worker.run. Cycles → ns conversion
// happens on the platform side (frequency known there); units exposed to
// Python are µs as floats to match historical benchmark_rounds.sh output.
nb::class_<RunTiming>(m, "RunTiming")
.def(nb::init<>())
.def(
"__init__",
[](RunTiming *self, uint64_t host_wall_ns, uint64_t device_wall_ns) {
new (self) RunTiming{host_wall_ns, device_wall_ns};
},
nb::arg("host_wall_ns"), nb::arg("device_wall_ns") = 0,
"Construct with explicit ns values (used by the Python Worker.run "
"wrapper to surface host-side timing for L3+ DAGs)."
)
.def_prop_ro(
"host_wall_us",
[](const RunTiming &t) {
return t.host_wall_ns / 1000.0;
},
"Host steady-clock wall around the dispatch, in microseconds."
)
.def_prop_ro(
"device_wall_us",
[](const RunTiming &t) {
return t.device_wall_ns / 1000.0;
},
"On-NPU wall (orch end - orch start), in microseconds. Populated whenever the "
"runtime was built with PTO2_PROFILING (the default); independent of "
"enable_l2_swimlane after the orch_summary capture was decoupled from the "
"swimlane shared region. Zero only on a PTO2_PROFILING-off build."
)
.def_prop_ro(
"host_wall_ns",
[](const RunTiming &t) {
return t.host_wall_ns;
}
)
.def_prop_ro(
"device_wall_ns",
[](const RunTiming &t) {
return t.device_wall_ns;
}
)
.def("__repr__", [](const RunTiming &t) {
std::ostringstream os;
os << "RunTiming(host_wall_us=" << t.host_wall_ns / 1000.0
<< ", device_wall_us=" << t.device_wall_ns / 1000.0 << ")";
return os.str();
});

// --- ChipWorker ---
nb::class_<ChipWorker>(m, "_ChipWorker")
.def(nb::init<>())
Expand Down Expand Up @@ -685,19 +736,21 @@ NB_MODULE(_task_interface, m) {
.def(
"run",
[](ChipWorker &self, int32_t callable_id, ChipStorageTaskArgs &args, const CallConfig &config) {
self.run(callable_id, &args, config);
return self.run(callable_id, &args, config);
},
nb::arg("callable_id"), nb::arg("args"), nb::arg("config"),
"Launch a callable_id previously staged via prepare_callable."
"Launch a callable_id previously staged via prepare_callable. "
"Returns RunTiming with host/device wall."
)
.def(
"run",
[](ChipWorker &self, int32_t callable_id, TaskArgs &args, const CallConfig &config) {
TaskArgsView view = make_view(args);
self.run(callable_id, view, config);
return self.run(callable_id, view, config);
},
nb::arg("callable_id"), nb::arg("args"), nb::arg("config"),
"Launch a callable_id from a TaskArgs (used for in-process callers)."
"Launch a callable_id from a TaskArgs (used for in-process callers). "
"Returns RunTiming."
)
.def(
"run_prepared_from_blob",
Expand All @@ -710,7 +763,7 @@ NB_MODULE(_task_interface, m) {
// loops never re-implement the tensor/scalar layout in Python
// (where it has historically dropped fields like child_memory).
TaskArgsView view = read_blob(reinterpret_cast<const uint8_t *>(args_blob_ptr), blob_capacity);
self.run(callable_id, view, config);
return self.run(callable_id, view, config);
},
nb::arg("callable_id"), nb::arg("args_blob_ptr"), nb::arg("blob_capacity"), nb::arg("config"),
"Launch a callable_id from a raw mailbox-blob pointer + capacity "
Expand Down
4 changes: 3 additions & 1 deletion python/simpler/task_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,12 +521,14 @@ def run(self, callable_id, args, config=None, **kwargs):
args: ChipStorageTaskArgs for this invocation.
config: Optional CallConfig. If None, a default is created.
**kwargs: Overrides applied to config (e.g. block_dim=24).

Returns a :class:`RunTiming` with host + device wall.
"""
if config is None:
config = CallConfig()
for k, v in kwargs.items():
setattr(config, k, v)
self._impl.run(int(callable_id), args, config)
return self._impl.run(int(callable_id), args, config)

def unregister_callable(self, callable_id):
"""Drop prepared state for ``callable_id`` and release its orch SO share."""
Expand Down
62 changes: 38 additions & 24 deletions python/simpler/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def my_l4_orch(orch, args, config):
MAX_REGISTERED_CALLABLE_IDS,
ChipBootstrapChannel,
ChipBootstrapMailboxState,
RunTiming,
_mailbox_load_i32,
_mailbox_store_i32,
read_args_from_blob,
Expand Down Expand Up @@ -1444,7 +1445,7 @@ def copy_from(self, dst: int, src: int, size: int, worker_id: int = 0) -> None:
# run — uniform entry point
# ------------------------------------------------------------------

def run(self, callable, args=None, config=None) -> None:
def run(self, callable, args=None, config=None) -> RunTiming:
"""Execute one task (L2) or one DAG (L3+) synchronously.

Dispatch:
Expand All @@ -1455,35 +1456,48 @@ def run(self, callable, args=None, config=None) -> None:

``args`` : TaskArgs (optional)
``config``: CallConfig (optional, default-constructed if None)

Returns a :class:`RunTiming` with ``host_wall_us`` (Python wall-clock
around the dispatch) and ``device_wall_us`` (on-NPU orchestrator wall,
populated whenever the runtime was built with ``PTO2_PROFILING`` —
the default build has it on). For L3+ DAGs, ``host_wall_us`` covers
the whole orch fn and ``device_wall_us`` is unset (0) — per-task
device timings are not aggregated here.
"""
assert self._initialized, "Worker not initialized; call init() first"
cfg = config if config is not None else CallConfig()

if self.level == 2:
assert self._chip_worker is not None
self._chip_worker.run(int(callable), args, cfg)
else:
self._start_hierarchical()
assert self._orch is not None
assert self._worker is not None
# Drop any error stashed by a previous run() so this call starts
# clean. drain() rethrows on the way out; every successful run()
# leaves the error slot empty, but an unrelated caller may have
# poked it.
self._orch._clear_error()
self._orch._scope_begin()
try:
callable(self._orch, args, cfg)
finally:
# Always release scope refs and drain so ring slots aren't
# stranded when the orch fn raises mid-DAG. drain() also
# rethrows the first dispatch failure for this run — that
# is how child-task exceptions surface to the caller of
# Worker.run(). scope_end deliberately does NOT throw: if
# it did, released refs would be incomplete and drain
# would hang on in-flight tasks.
self._orch._scope_end()
self._orch._drain()
return self._chip_worker.run(int(callable), args, cfg)

self._start_hierarchical()
assert self._orch is not None
assert self._worker is not None
# Drop any error stashed by a previous run() so this call starts
# clean. drain() rethrows on the way out; every successful run()
# leaves the error slot empty, but an unrelated caller may have
# poked it.
self._orch._clear_error()
self._orch._scope_begin()
t_start = time.perf_counter_ns()
try:
callable(self._orch, args, cfg)
finally:
# Always release scope refs and drain so ring slots aren't
# stranded when the orch fn raises mid-DAG. drain() also
# rethrows the first dispatch failure for this run — that
# is how child-task exceptions surface to the caller of
# Worker.run(). scope_end deliberately does NOT throw: if
# it did, released refs would be incomplete and drain
# would hang on in-flight tasks.
self._orch._scope_end()
self._orch._drain()
# device_wall stays 0 for L3+: aggregating per-task device cycles
# across a DAG isn't implemented here (would need accumulation in the
# ring scheduler). Callers wanting per-task device wall should issue
# individual run calls.
return RunTiming(time.perf_counter_ns() - t_start, 0)

def prepare_callable(self, callable_id: int, callable) -> None:
"""L2 only: pre-stage a callable under ``callable_id`` (see
Expand Down
Loading
Loading