diff --git a/cuda_core/cuda/core/experimental/_device.pyx b/cuda_core/cuda/core/experimental/_device.pyx index bcba09f985..1db2adbf8d 100644 --- a/cuda_core/cuda/core/experimental/_device.pyx +++ b/cuda_core/cuda/core/experimental/_device.pyx @@ -28,6 +28,7 @@ from cuda.core.experimental._utils.cuda_utils import ( from cuda.core.experimental._stream cimport default_stream + # TODO: I prefer to type these as "cdef object" and avoid accessing them from within Python, # but it seems it is very convenient to expose them for testing purposes... _tls = threading.local() @@ -1273,7 +1274,7 @@ class Device: """ self._check_context_initialized() ctx = self._get_current_context() - return Event._init(self._id, ctx, options) + return Event._init(self._id, ctx, options, True) def allocate(self, size, stream: Optional[Stream] = None) -> Buffer: """Allocate device memory from a specified stream. diff --git a/cuda_core/cuda/core/experimental/_event.pxd b/cuda_core/cuda/core/experimental/_event.pxd index 0972063af3..1f586f18df 100644 --- a/cuda_core/cuda/core/experimental/_event.pxd +++ b/cuda_core/cuda/core/experimental/_event.pxd @@ -11,6 +11,8 @@ cdef class Event: cydriver.CUevent _handle bint _timing_disabled bint _busy_waited + bint _ipc_enabled + object _ipc_descriptor int _device_id object _ctx_handle diff --git a/cuda_core/cuda/core/experimental/_event.pyx b/cuda_core/cuda/core/experimental/_event.pyx index 962556597a..dd6ef0b06e 100644 --- a/cuda_core/cuda/core/experimental/_event.pyx +++ b/cuda_core/cuda/core/experimental/_event.pyx @@ -4,7 +4,9 @@ from __future__ import annotations +cimport cpython from libc.stdint cimport uintptr_t +from libc.string cimport memcpy from cuda.bindings cimport cydriver @@ -14,6 +16,7 @@ from cuda.core.experimental._utils.cuda_utils cimport ( ) from dataclasses import dataclass +import multiprocessing from typing import TYPE_CHECKING, Optional from cuda.core.experimental._context import Context @@ -40,7 +43,7 @@ cdef class EventOptions: has actually been completed. Otherwise, the CPU thread will busy-wait until the event has been completed. (Default to False) - support_ipc : bool, optional + ipc_enabled : bool, optional Event will be suitable for interprocess use. Note that enable_timing must be False. (Default to False) @@ -48,7 +51,7 @@ cdef class EventOptions: enable_timing: Optional[bool] = False busy_waited_sync: Optional[bool] = False - support_ipc: Optional[bool] = False + ipc_enabled: Optional[bool] = False cdef class Event: @@ -86,24 +89,35 @@ cdef class Event: raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).") @classmethod - def _init(cls, device_id: int, ctx_handle: Context, options=None): + def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False): cdef Event self = Event.__new__(cls) cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options") cdef unsigned int flags = 0x0 self._timing_disabled = False self._busy_waited = False + self._ipc_enabled = False + self._ipc_descriptor = None if not opts.enable_timing: flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING self._timing_disabled = True if opts.busy_waited_sync: flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC self._busy_waited = True - if opts.support_ipc: - raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103") + if opts.ipc_enabled: + if is_free: + raise TypeError( + "IPC-enabled events must be bound; use Stream.record for creation." 
+                )
+            flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
+            self._ipc_enabled = True
+            if not self._timing_disabled:
+                raise TypeError("IPC-enabled events cannot use timing.")
         with nogil:
             HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags))
         self._device_id = device_id
         self._ctx_handle = ctx_handle
+        if opts.ipc_enabled:
+            self.get_ipc_descriptor()
         return self
 
     cpdef close(self):
@@ -151,6 +165,40 @@ cdef class Event:
                 raise CUDAError(err)
             raise RuntimeError(explanation)
 
+    def get_ipc_descriptor(self) -> IPCEventDescriptor:
+        """Export a descriptor for sharing this event between processes."""
+        if self._ipc_descriptor is not None:
+            return self._ipc_descriptor
+        if not self.is_ipc_enabled:
+            raise RuntimeError("Event is not IPC-enabled")
+        cdef cydriver.CUipcEventHandle data
+        with nogil:
+            HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, <cydriver.CUevent>(self._handle)))
+        cdef bytes data_b = cpython.PyBytes_FromStringAndSize(<char*>(data.reserved), sizeof(data.reserved))
+        self._ipc_descriptor = IPCEventDescriptor._init(data_b, self._busy_waited)
+        return self._ipc_descriptor
+
+    @classmethod
+    def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
+        """Import an event that was exported from another process."""
+        cdef cydriver.CUipcEventHandle data
+        memcpy(data.reserved, <const char*>(ipc_descriptor._reserved), sizeof(data.reserved))
+        cdef Event self = Event.__new__(cls)
+        with nogil:
+            HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data))
+        self._timing_disabled = True
+        self._busy_waited = ipc_descriptor._busy_waited
+        self._ipc_enabled = True
+        self._ipc_descriptor = ipc_descriptor
+        self._device_id = -1  # unknown; not recoverable from an imported handle
+        self._ctx_handle = None  # unknown; not recoverable from an imported handle
+        return self
+
+    @property
+    def is_ipc_enabled(self) -> bool:
+        """Return True if the event can be shared across process boundaries, otherwise False."""
+        return self._ipc_enabled
+
     @property
     def is_timing_disabled(self) -> bool:
         """Return True if the event does not record timing data, otherwise False."""
@@ -161,11 +209,6 @@ cdef class Event:
         """Return True if the event synchronization would keep the CPU busy-waiting, otherwise False."""
         return self._busy_waited
 
-    @property
-    def is_ipc_supported(self) -> bool:
-        """Return True if this event can be used as an interprocess event, otherwise False."""
-        raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103")
-
     def sync(self):
         """Synchronize until the event completes.
 
@@ -212,12 +255,43 @@ cdef class Event:
         context is set current after a event is created.
 
         """
-
-        from cuda.core.experimental._device import Device  # avoid circular import
-
-        return Device(self._device_id)
+        if self._device_id >= 0:
+            from ._device import Device  # avoid circular import
+            return Device(self._device_id)
 
     @property
     def context(self) -> Context:
         """Return the :obj:`~_context.Context` associated with this event."""
-        return Context._from_ctx(self._ctx_handle, self._device_id)
+        if self._ctx_handle is not None and self._device_id >= 0:
+            return Context._from_ctx(self._ctx_handle, self._device_id)
+
+
+cdef class IPCEventDescriptor:
+    """Serializable object describing an event that can be shared between processes."""
+
+    cdef:
+        bytes _reserved
+        bint _busy_waited
+
+    def __init__(self, *args, **kwargs):
+        raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")
+
+    @classmethod
+    def _init(cls, reserved: bytes, busy_waited: bint):
+        cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls)
+        self._reserved = reserved
+        self._busy_waited = busy_waited
+        return self
+
+    def __eq__(self, IPCEventDescriptor rhs):
+        # No need to check self._busy_waited.
+        return self._reserved == rhs._reserved
+
+    def __reduce__(self):
+        return self._init, (self._reserved, self._busy_waited)
+
+
+def _reduce_event(event):
+    return event.from_ipc_descriptor, (event.get_ipc_descriptor(),)
+
+multiprocessing.reduction.register(Event, _reduce_event)
diff --git a/cuda_core/cuda/core/experimental/_memory.pyx b/cuda_core/cuda/core/experimental/_memory.pyx
index 024ffa2aef..5037f59eb0 100644
--- a/cuda_core/cuda/core/experimental/_memory.pyx
+++ b/cuda_core/cuda/core/experimental/_memory.pyx
@@ -226,11 +226,11 @@ cdef class Buffer(_cyBuffer, MemoryResourceAttributes):
         if stream is None:
             # Note: match this behavior to DeviceMemoryResource.allocate()
             stream = default_stream()
-        cdef cydriver.CUmemPoolPtrExportData share_data
-        memcpy(share_data.reserved, <const char*>(ipc_buffer._reserved), sizeof(share_data.reserved))
+        cdef cydriver.CUmemPoolPtrExportData data
+        memcpy(data.reserved, <const char*>(ipc_buffer._reserved), sizeof(data.reserved))
         cdef cydriver.CUdeviceptr ptr
         with nogil:
-            HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &share_data))
+            HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &data))
         return Buffer._init(ptr, ipc_buffer.size, mr, stream)
 
     def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer:
@@ -511,7 +511,7 @@ cdef class DeviceMemoryResourceOptions:
         (Default to 0)
     """
     ipc_enabled : cython.bint = False
-    max_size : cython.int = 0
+    max_size : cython.size_t = 0
 
 
 # TODO: cythonize this?
diff --git a/cuda_core/cuda/core/experimental/_stream.pyx b/cuda_core/cuda/core/experimental/_stream.pyx
index e2a418ac27..82406c5598 100644
--- a/cuda_core/cuda/core/experimental/_stream.pyx
+++ b/cuda_core/cuda/core/experimental/_stream.pyx
@@ -260,7 +260,13 @@ cdef class Stream:
         # and CU_EVENT_RECORD_EXTERNAL, can be set in EventOptions.
         if event is None:
            self._get_device_and_context()
-            event = Event._init(<int>(self._device_id), <object>(self._ctx_handle), options)
+            event = Event._init(<int>(self._device_id), <object>(self._ctx_handle), options, False)
+        elif event.is_ipc_enabled:
+            raise TypeError(
+                "IPC-enabled events should not be re-recorded, instead create a "
+                "new event by supplying options."
+            )
+
+        cdef cydriver.CUevent e = (<Event>event)._handle
         with nogil:
             HANDLE_RETURN(cydriver.cuEventRecord(e, self._handle))
diff --git a/cuda_core/tests/helpers.py b/cuda_core/tests/helpers/__init__.py
similarity index 96%
rename from cuda_core/tests/helpers.py
rename to cuda_core/tests/helpers/__init__.py
index 10af3dcc22..e87122e649 100644
--- a/cuda_core/tests/helpers.py
+++ b/cuda_core/tests/helpers/__init__.py
@@ -22,7 +22,7 @@
     import cuda_python_test_helpers
 except ImportError:
     # Import shared platform helpers for tests across repos
-    sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "cuda_python_test_helpers"))
+    sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[3] / "cuda_python_test_helpers"))
     import cuda_python_test_helpers
diff --git a/cuda_core/tests/helpers/buffers.py b/cuda_core/tests/helpers/buffers.py
new file mode 100644
index 0000000000..52220956d2
--- /dev/null
+++ b/cuda_core/tests/helpers/buffers.py
@@ -0,0 +1,122 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import ctypes
+import sys
+
+from cuda.core.experimental import Buffer, MemoryResource
+from cuda.core.experimental._utils.cuda_utils import driver, handle_return
+
+if sys.platform.startswith("win"):
+    libc = ctypes.CDLL("msvcrt.dll")
+else:
+    libc = ctypes.CDLL("libc.so.6")
+
+
+__all__ = ["DummyUnifiedMemoryResource", "PatternGen", "make_scratch_buffer", "compare_equal_buffers"]
+
+
+class DummyUnifiedMemoryResource(MemoryResource):
+    def __init__(self, device):
+        self.device = device
+
+    def allocate(self, size, stream=None) -> Buffer:
+        ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value))
+        return Buffer.from_handle(ptr=ptr, size=size, mr=self)
+
+    def deallocate(self, ptr, size, stream=None):
+        handle_return(driver.cuMemFree(ptr))
+
+    @property
+    def is_device_accessible(self) -> bool:
+        return True
+
+    @property
+    def is_host_accessible(self) -> bool:
+        return True
+
+    @property
+    def device_id(self) -> int:
+        return self.device
+
+
+class PatternGen:
+    """
+    Provides methods to fill a target buffer with known test patterns and
+    verify the expected values.
+
+    If a stream is provided, operations are synchronized with respect to that
+    stream. Otherwise, they are synchronized over the device.
+
+    The test pattern is either a fixed value or a cyclic pattern generated from
+    an 8-bit seed. Only one of `value` or `seed` should be supplied.
+
+    Distinct test patterns are stored in private buffers called pattern
+    buffers. Calls to `fill_buffer` copy from a pattern buffer to the target
+    buffer. Calls to `verify_buffer` copy from the target buffer to a scratch
+    buffer and then perform a comparison.
+ """ + + def __init__(self, device, size, stream=None): + self.device = device + self.size = size + self.stream = stream if stream is not None else device.create_stream() + self.sync_target = stream if stream is not None else device + self.pattern_buffers = {} + + def fill_buffer(self, buffer, seed=None, value=None): + """Fill a device buffer with a sequential test pattern using unified memory.""" + assert buffer.size == self.size + pattern_buffer = self._get_pattern_buffer(seed, value) + buffer.copy_from(pattern_buffer, stream=self.stream) + + def verify_buffer(self, buffer, seed=None, value=None): + """Verify the buffer contents against a sequential pattern.""" + assert buffer.size == self.size + scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size) + ptr_test = self._ptr(scratch_buffer) + pattern_buffer = self._get_pattern_buffer(seed, value) + ptr_expected = self._ptr(pattern_buffer) + scratch_buffer.copy_from(buffer, stream=self.stream) + self.sync_target.sync() + assert libc.memcmp(ptr_test, ptr_expected, self.size) == 0 + + @staticmethod + def _ptr(buffer): + """Get a pointer to the specified buffer.""" + return ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_ubyte)) + + def _get_pattern_buffer(self, seed, value): + """Get a buffer holding the specified test pattern.""" + assert seed is None or value is None + if value is None: + seed = (0 if seed is None else seed) & 0xFF + key = seed, value + pattern_buffer = self.pattern_buffers.get(key, None) + if pattern_buffer is None: + if value is not None: + pattern_buffer = make_scratch_buffer(self.device, value, self.size) + else: + pattern_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size) + ptr = self._ptr(pattern_buffer) + for i in range(self.size): + ptr[i] = (seed + i) & 0xFF + self.pattern_buffers[key] = pattern_buffer + return pattern_buffer + + +def make_scratch_buffer(device, value, nbytes): + """Create a unified memory buffer with the specified value.""" + buffer = DummyUnifiedMemoryResource(device).allocate(nbytes) + ptr = ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_byte)) + ctypes.memset(ptr, value & 0xFF, nbytes) + return buffer + + +def compare_equal_buffers(buffer1, buffer2): + """Compare the contents of two host-accessible buffers for bitwise equality.""" + if buffer1.size != buffer2.size: + return False + ptr1 = ctypes.cast(int(buffer1.handle), ctypes.POINTER(ctypes.c_byte)) + ptr2 = ctypes.cast(int(buffer2.handle), ctypes.POINTER(ctypes.c_byte)) + return libc.memcmp(ptr1, ptr2, buffer1.size) == 0 diff --git a/cuda_core/tests/helpers/latch.py b/cuda_core/tests/helpers/latch.py new file mode 100644 index 0000000000..e1166973d8 --- /dev/null +++ b/cuda_core/tests/helpers/latch.py @@ -0,0 +1,63 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import ctypes + +import pytest +from cuda.core.experimental import ( + LaunchConfig, + LegacyPinnedMemoryResource, + Program, + ProgramOptions, + launch, +) + +import helpers + + +class LatchKernel: + """ + Manages a kernel that blocks stream progress until released. 
+ """ + + def __init__(self, device): + if helpers.CUDA_INCLUDE_PATH is None: + pytest.skip("need CUDA header") + code = """ + #include + + extern "C" + __global__ void latch(int* val) { + cuda::atomic_ref signal{*val}; + while (true) { + if (signal.load(cuda::memory_order_relaxed)) { + break; + } + } + } + """ + program_options = ProgramOptions( + std="c++17", + arch=f"sm_{''.join(f'{i}' for i in device.compute_capability)}", + include_path=helpers.CCCL_INCLUDE_PATHS, + ) + prog = Program(code, code_type="c++", options=program_options) + mod = prog.compile(target_type="cubin") + self.kernel = mod.get_kernel("latch") + + mr = LegacyPinnedMemoryResource() + self.buffer = mr.allocate(4) + self.busy_wait_flag[0] = 0 + + def launch(self, stream): + """Launch the latch kernel, blocking stream progress via busy waiting.""" + config = LaunchConfig(grid=1, block=1) + launch(stream, config, self.kernel, int(self.buffer.handle)) + + def release(self): + """Release the latch, allowing stream progress.""" + self.busy_wait_flag[0] = 1 + + @property + def busy_wait_flag(self): + return ctypes.cast(int(self.buffer.handle), ctypes.POINTER(ctypes.c_int32)) diff --git a/cuda_core/tests/helpers/logging.py b/cuda_core/tests/helpers/logging.py new file mode 100644 index 0000000000..d5be94a280 --- /dev/null +++ b/cuda_core/tests/helpers/logging.py @@ -0,0 +1,50 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import time + + +class TimestampedLogger: + """ + A logger that prefixes each output with a timestamp, containing the elapsed + time since the logger was created. + + Example: + + import multiprocess as mp + import time + + def main(): + log = TimestampedLogger(prefix="parent: ") + log("begin") + process = mp.Process(target=child_main, args=(log,)) + process.start() + process.join() + log("done") + + def child_main(log): + log.prefix = " child: " + log("begin") + time.sleep(1) + log("done") + + if __name__ == "__main__": + main() + + Possible output: + + [ 0.003 ms] parent: begin + [ 819.464 ms] child: begin + [ 1819.666 ms] child: done + [ 1882.954 ms] parent: done + """ + + def __init__(self, prefix=None, start_time=None, enabled=True): + self.prefix = "" if prefix is None else prefix + self.start_time = start_time if start_time is not None else time.time_ns() + self.enabled = enabled + + def __call__(self, msg): + if self.enabled: + now = (time.time_ns() - self.start_time) * 1e-6 + print(f"[{now:>10.3f} ms] {self.prefix}{msg}") diff --git a/cuda_core/tests/memory_ipc/test_errors.py b/cuda_core/tests/memory_ipc/test_errors.py index d1a235603d..3e8265b39c 100644 --- a/cuda_core/tests/memory_ipc/test_errors.py +++ b/cuda_core/tests/memory_ipc/test_errors.py @@ -8,8 +8,6 @@ from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions from cuda.core.experimental._utils.cuda_utils import CUDAError -from cuda_python_test_helpers import supports_ipc_mempool - CHILD_TIMEOUT_SEC = 20 NBYTES = 64 POOL_SIZE = 2097152 @@ -20,10 +18,6 @@ class ChildErrorHarness: PARENT_ACTION, CHILD_ACTION, and ASSERT (see below for examples).""" def test_main(self, ipc_device, ipc_memory_resource): - if not supports_ipc_mempool(ipc_device): - import pytest - - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") """Parent process that checks child errors.""" # Attach fixtures to this object for convenience. These can be accessed # from PARENT_ACTION. 
diff --git a/cuda_core/tests/memory_ipc/test_event_ipc.py b/cuda_core/tests/memory_ipc/test_event_ipc.py new file mode 100644 index 0000000000..fc1acb329d --- /dev/null +++ b/cuda_core/tests/memory_ipc/test_event_ipc.py @@ -0,0 +1,172 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import multiprocessing as mp + +import pytest +from cuda.core.experimental import Device, EventOptions +from helpers.buffers import compare_equal_buffers, make_scratch_buffer +from helpers.latch import LatchKernel +from helpers.logging import TimestampedLogger + +ENABLE_LOGGING = False # Set True for test debugging and development +CHILD_TIMEOUT_SEC = 20 +NBYTES = 64 + + +class TestEventIpc: + """Check the basic usage of IPC-enabled events with a latch kernel.""" + + def test_main(self, ipc_device, ipc_memory_resource): + log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING) + device = ipc_device + mr = ipc_memory_resource + stream1 = device.create_stream() + latch = LatchKernel(device) + + # Start the child process. + q_out, q_in = [mp.Queue() for _ in range(2)] + process = mp.Process(target=self.child_main, args=(log, q_out, q_in)) + process.start() + + # Prepare scratch buffers. + target = make_scratch_buffer(device, 0, NBYTES) + ones = make_scratch_buffer(device, 1, NBYTES) + twos = make_scratch_buffer(device, 2, NBYTES) + + # Allocate the buffer and send it to the child. + buffer = mr.allocate(NBYTES, stream=stream1) + log("sending buffer") + q_out.put(buffer) + + # Stream 1: + log("enqueuing latch kernel on stream1") + latch.launch(stream1) + log("enqueuing copy on stream1") + buffer.copy_from(ones, stream=stream1) + + ipc_event_options = EventOptions(ipc_enabled=True) + e = stream1.record(options=ipc_event_options) + log(f"recorded event ({hex(e.handle)})") + q_out.put(e) + log("sent event") + + # Wait on the child. + log("waiting for child") + none = q_in.get(timeout=CHILD_TIMEOUT_SEC) + assert none is None + + log("releasing stream1") + latch.release() + process.join() + assert process.exitcode == 0 + log("done") + + # Finish up. 
+ target.copy_from(buffer, stream=stream1) + stream1.sync() + assert compare_equal_buffers(target, twos) + + def child_main(self, log, q_in, q_out): + log.prefix = " child: " + log("ready") + device = Device() + device.set_current() + stream2 = device.create_stream() + twos = make_scratch_buffer(device, 2, NBYTES) + buffer = q_in.get(timeout=CHILD_TIMEOUT_SEC) + log("got buffer") + e = q_in.get(timeout=CHILD_TIMEOUT_SEC) + log(f"got event ({hex(e.handle)})") + stream2.wait(e) + log("enqueuing copy on stream2") + buffer.copy_from(twos, stream=stream2) + log("signaling parent") + q_out.put(None) + log("waiting") + stream2.sync() + log("done") + + +def test_event_is_monadic(ipc_device): + """Check that IPC-enabled events are always bound and cannot be reset.""" + device = ipc_device + with pytest.raises(TypeError, match=r"^IPC-enabled events must be bound; use Stream.record for creation\.$"): + device.create_event({"ipc_enabled": True}) + + stream = device.create_stream() + e = stream.record(options={"ipc_enabled": True}) + with pytest.raises( + TypeError, + match=r"^IPC-enabled events should not be re-recorded, instead create a new event by supplying options\.$", + ): + stream.record(e) + + +@pytest.mark.parametrize( + "options", [{"ipc_enabled": True, "enable_timing": True}, EventOptions(ipc_enabled=True, enable_timing=True)] +) +def test_event_timing_disabled(ipc_device, options): + """Check that IPC-enabled events cannot be created with timing enabled.""" + device = ipc_device + stream = device.create_stream() + with pytest.raises(TypeError, match=r"^IPC-enabled events cannot use timing\.$"): + stream.record(options=options) + + +class TestIpcEventProperties: + """ + Check that event properties are properly set after transfer to a child + process. + """ + + @pytest.mark.parametrize("busy_waited_sync", [True, False]) + @pytest.mark.parametrize("use_options_cls", [True, False]) + @pytest.mark.parametrize("use_option_kw", [True, False]) + def test_main(self, ipc_device, busy_waited_sync, use_options_cls, use_option_kw): + device = ipc_device + stream = device.create_stream() + + # Start the child process. + q_out, q_in = [mp.Queue() for _ in range(2)] + process = mp.Process(target=self.child_main, args=(q_out, q_in)) + process.start() + + # Create an event and send it. + options = ( + EventOptions(ipc_enabled=True, busy_waited_sync=busy_waited_sync) + if use_options_cls + else {"ipc_enabled": True, "busy_waited_sync": busy_waited_sync} + ) + e = stream.record(options=options) if use_option_kw else stream.record(None, options) + q_out.put(e) + + # Check its properties. + props = q_in.get(timeout=CHILD_TIMEOUT_SEC) + assert props[0] == e.get_ipc_descriptor() + assert props[1] == e.is_ipc_enabled + assert props[2] == e.is_timing_disabled + assert props[3] == e.is_sync_busy_waited + assert props[4] is None + assert props[5] is None + + process.join() + assert process.exitcode == 0 + + def child_main(self, q_in, q_out): + device = Device() + device.set_current() + + # Get the event. + e = q_in.get(timeout=CHILD_TIMEOUT_SEC) + + # Send its properties. 
+ props = ( + e.get_ipc_descriptor(), + e.is_ipc_enabled, + e.is_timing_disabled, + e.is_sync_busy_waited, + e.device, + e.context, + ) + q_out.put(props) diff --git a/cuda_core/tests/memory_ipc/test_memory_ipc.py b/cuda_core/tests/memory_ipc/test_memory_ipc.py index c980a7ad84..23a3e91b7f 100644 --- a/cuda_core/tests/memory_ipc/test_memory_ipc.py +++ b/cuda_core/tests/memory_ipc/test_memory_ipc.py @@ -4,9 +4,7 @@ import multiprocessing as mp from cuda.core.experimental import Buffer, DeviceMemoryResource -from utility import IPCBufferTestHelper - -from cuda_python_test_helpers import supports_ipc_mempool +from helpers.buffers import PatternGen CHILD_TIMEOUT_SEC = 20 NBYTES = 64 @@ -16,14 +14,11 @@ class TestIpcMempool: def test_main(self, ipc_device, ipc_memory_resource): - if not supports_ipc_mempool(ipc_device): - import pytest - - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") """Test IPC with memory pools.""" # Set up the IPC-enabled memory pool and share it. device = ipc_device mr = ipc_memory_resource + pgen = PatternGen(device, NBYTES) # Start the child process. queue = mp.Queue() @@ -32,8 +27,7 @@ def test_main(self, ipc_device, ipc_memory_resource): # Allocate and fill memory. buffer = mr.allocate(NBYTES) - helper = IPCBufferTestHelper(device, buffer) - helper.fill_buffer(flipped=False) + pgen.fill_buffer(buffer, seed=False) # Export the buffer via IPC. queue.put(buffer) @@ -43,24 +37,20 @@ def test_main(self, ipc_device, ipc_memory_resource): assert process.exitcode == 0 # Verify that the buffer was modified. - helper.verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def child_main(self, device, mr, queue): device.set_current() buffer = queue.get(timeout=CHILD_TIMEOUT_SEC) - helper = IPCBufferTestHelper(device, buffer) - helper.verify_buffer(flipped=False) - helper.fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer, seed=False) + pgen.fill_buffer(buffer, seed=True) buffer.close() class TestIPCMempoolMultiple: def test_main(self, ipc_device, ipc_memory_resource): - if not supports_ipc_mempool(ipc_device): - import pytest - - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") """Test IPC with memory pools using multiple processes.""" # Construct an IPC-enabled memory resource and share it with two children. device = ipc_device @@ -88,31 +78,29 @@ def test_main(self, ipc_device, ipc_memory_resource): assert p2.exitcode == 0 # Verify that the buffers were modified. - IPCBufferTestHelper(device, buffer1).verify_buffer(flipped=False) - IPCBufferTestHelper(device, buffer2).verify_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer1, seed=1) + pgen.verify_buffer(buffer2, seed=2) buffer1.close() buffer2.close() - def child_main(self, device, mr, idx, queue): + def child_main(self, device, mr, seed, queue): # Note: passing the mr registers it so that buffers can be passed # directly. 
device.set_current() buffer1 = queue.get(timeout=CHILD_TIMEOUT_SEC) buffer2 = queue.get(timeout=CHILD_TIMEOUT_SEC) - if idx == 1: - IPCBufferTestHelper(device, buffer1).fill_buffer(flipped=False) - elif idx == 2: - IPCBufferTestHelper(device, buffer2).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + if seed == 1: + pgen.fill_buffer(buffer1, seed=1) + elif seed == 2: + pgen.fill_buffer(buffer2, seed=2) buffer1.close() buffer2.close() class TestIPCSharedAllocationHandleAndBufferDescriptors: def test_main(self, ipc_device, ipc_memory_resource): - if not supports_ipc_mempool(ipc_device): - import pytest - - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") """ Demonstrate that a memory pool allocation handle can be reused for IPC with multiple processes. Uses buffer descriptors. @@ -124,8 +112,8 @@ def test_main(self, ipc_device, ipc_memory_resource): # Start children. q1, q2 = (mp.Queue() for _ in range(2)) - p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, 1, q1)) - p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, 2, q2)) + p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, False, q1)) + p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, True, q2)) p1.start() p2.start() @@ -142,12 +130,13 @@ def test_main(self, ipc_device, ipc_memory_resource): assert p2.exitcode == 0 # Verify results. - IPCBufferTestHelper(device, buffer1).verify_buffer(starting_from=1) - IPCBufferTestHelper(device, buffer2).verify_buffer(starting_from=2) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer1, seed=False) + pgen.verify_buffer(buffer2, seed=True) buffer1.close() buffer2.close() - def child_main(self, device, alloc_handle, idx, queue): + def child_main(self, device, alloc_handle, seed, queue): """Fills a shared memory buffer.""" # In this case, the device needs to be set up (passing the mr does it # implicitly in other tests). @@ -155,7 +144,8 @@ def child_main(self, device, alloc_handle, idx, queue): mr = DeviceMemoryResource.from_allocation_handle(device, alloc_handle) buffer_descriptor = queue.get(timeout=CHILD_TIMEOUT_SEC) buffer = Buffer.from_ipc_descriptor(mr, buffer_descriptor) - IPCBufferTestHelper(device, buffer).fill_buffer(starting_from=idx) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=seed) buffer.close() @@ -171,8 +161,8 @@ def test_main(self, ipc_device, ipc_memory_resource): # Start children. q1, q2 = (mp.Queue() for _ in range(2)) - p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, 1, q1)) - p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, 2, q2)) + p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, False, q1)) + p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, True, q2)) p1.start() p2.start() @@ -189,12 +179,13 @@ def test_main(self, ipc_device, ipc_memory_resource): assert p2.exitcode == 0 # Verify results. - IPCBufferTestHelper(device, buffer1).verify_buffer(starting_from=1) - IPCBufferTestHelper(device, buffer2).verify_buffer(starting_from=2) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer1, seed=False) + pgen.verify_buffer(buffer2, seed=True) buffer1.close() buffer2.close() - def child_main(self, device, alloc_handle, idx, queue): + def child_main(self, device, alloc_handle, seed, queue): """Fills a shared memory buffer.""" device.set_current() @@ -203,5 +194,6 @@ def child_main(self, device, alloc_handle, idx, queue): # Now get buffers. 
buffer = queue.get(timeout=CHILD_TIMEOUT_SEC) - IPCBufferTestHelper(device, buffer).fill_buffer(starting_from=idx) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=seed) buffer.close() diff --git a/cuda_core/tests/memory_ipc/test_send_buffers.py b/cuda_core/tests/memory_ipc/test_send_buffers.py index 19685a94f7..75a4651925 100644 --- a/cuda_core/tests/memory_ipc/test_send_buffers.py +++ b/cuda_core/tests/memory_ipc/test_send_buffers.py @@ -6,9 +6,7 @@ import pytest from cuda.core.experimental import DeviceMemoryResource, DeviceMemoryResourceOptions -from utility import IPCBufferTestHelper - -from cuda_python_test_helpers import supports_ipc_mempool +from helpers.buffers import PatternGen CHILD_TIMEOUT_SEC = 20 NBYTES = 64 @@ -20,8 +18,6 @@ @pytest.mark.parametrize("nmrs", (1, NMRS)) def test_ipc_send_buffers(ipc_device, nmrs): """Test passing buffers sourced from multiple memory resources.""" - if not supports_ipc_mempool(ipc_device): - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") # Set up several IPC-enabled memory pools. device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) @@ -29,18 +25,12 @@ def test_ipc_send_buffers(ipc_device, nmrs): # Allocate and fill memory. buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))] + pgen = PatternGen(device, NBYTES) for buffer in buffers: - helper = IPCBufferTestHelper(device, buffer) - helper.fill_buffer(flipped=False) + pgen.fill_buffer(buffer, seed=False) # Start the child process. - process = mp.Process( - target=child_main, - args=( - device, - buffers, - ), - ) + process = mp.Process(target=child_main, args=(device, buffers)) process.start() # Wait for the child process. @@ -48,16 +38,16 @@ def test_ipc_send_buffers(ipc_device, nmrs): assert process.exitcode == 0 # Verify that the buffers were modified. + pgen = PatternGen(device, NBYTES) for buffer in buffers: - helper = IPCBufferTestHelper(device, buffer) - helper.verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def child_main(device, buffers): device.set_current() + pgen = PatternGen(device, NBYTES) for buffer in buffers: - helper = IPCBufferTestHelper(device, buffer) - helper.verify_buffer(flipped=False) - helper.fill_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=False) + pgen.fill_buffer(buffer, seed=True) buffer.close() diff --git a/cuda_core/tests/memory_ipc/test_serialize.py b/cuda_core/tests/memory_ipc/test_serialize.py index 215538ae68..ceac50e502 100644 --- a/cuda_core/tests/memory_ipc/test_serialize.py +++ b/cuda_core/tests/memory_ipc/test_serialize.py @@ -6,7 +6,7 @@ import os from cuda.core.experimental import Buffer, Device, DeviceMemoryResource -from utility import IPCBufferTestHelper +from helpers.buffers import PatternGen CHILD_TIMEOUT_SEC = 20 NBYTES = 64 @@ -46,8 +46,9 @@ def test_main(self, ipc_device, ipc_memory_resource): assert process.exitcode == 0 # Confirm buffers were modified. - IPCBufferTestHelper(device, buffer1).verify_buffer(flipped=True) - IPCBufferTestHelper(device, buffer2).verify_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer1, seed=True) + pgen.verify_buffer(buffer2, seed=True) buffer1.close() buffer2.close() @@ -67,8 +68,9 @@ def child_main(self, conn): buffer2 = Buffer.from_ipc_descriptor(mr, buffer_desc) # by descriptor # Modify the buffers. 
- IPCBufferTestHelper(device, buffer1).fill_buffer(flipped=True) - IPCBufferTestHelper(device, buffer2).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer1, seed=True) + pgen.fill_buffer(buffer2, seed=True) buffer1.close() buffer2.close() @@ -100,7 +102,8 @@ def test_main(self, ipc_device, ipc_memory_resource): assert process.exitcode == 0 # Confirm buffer was modified. - IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer, seed=True) buffer.close() def child_main(self, pipe, _): @@ -114,7 +117,8 @@ def child_main(self, pipe, _): # Buffer. buffer = pipe[0].get(timeout=CHILD_TIMEOUT_SEC) assert buffer.memory_resource.handle == mr.handle - IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=True) buffer.close() @@ -134,8 +138,8 @@ def test_object_passing(ipc_device, ipc_memory_resource): buffer = mr.allocate(NBYTES) buffer_desc = buffer.get_ipc_descriptor() - helper = IPCBufferTestHelper(device, buffer) - helper.fill_buffer(flipped=False) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=False) # Start the child process. process = mp.Process(target=child_main, args=(alloc_handle, mr, buffer_desc, buffer)) @@ -143,7 +147,7 @@ def test_object_passing(ipc_device, ipc_memory_resource): process.join(timeout=CHILD_TIMEOUT_SEC) assert process.exitcode == 0 - helper.verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() @@ -151,40 +155,37 @@ def child_main(alloc_handle, mr1, buffer_desc, buffer1): device = Device() device.set_current() mr2 = DeviceMemoryResource.from_allocation_handle(device, alloc_handle) + pgen = PatternGen(device, NBYTES) # OK to build the buffer from either mr and the descriptor. # All buffer* objects point to the same memory. buffer2 = Buffer.from_ipc_descriptor(mr1, buffer_desc) buffer3 = Buffer.from_ipc_descriptor(mr2, buffer_desc) - helper1 = IPCBufferTestHelper(device, buffer1) - helper2 = IPCBufferTestHelper(device, buffer2) - helper3 = IPCBufferTestHelper(device, buffer3) - - helper1.verify_buffer(flipped=False) - helper2.verify_buffer(flipped=False) - helper3.verify_buffer(flipped=False) + pgen.verify_buffer(buffer1, seed=False) + pgen.verify_buffer(buffer2, seed=False) + pgen.verify_buffer(buffer3, seed=False) # Modify 1. - helper1.fill_buffer(flipped=True) + pgen.fill_buffer(buffer1, seed=True) - helper1.verify_buffer(flipped=True) - helper2.verify_buffer(flipped=True) - helper3.verify_buffer(flipped=True) + pgen.verify_buffer(buffer1, seed=True) + pgen.verify_buffer(buffer2, seed=True) + pgen.verify_buffer(buffer3, seed=True) # Modify 2. - helper2.fill_buffer(flipped=False) + pgen.fill_buffer(buffer2, seed=False) - helper1.verify_buffer(flipped=False) - helper2.verify_buffer(flipped=False) - helper3.verify_buffer(flipped=False) + pgen.verify_buffer(buffer1, seed=False) + pgen.verify_buffer(buffer2, seed=False) + pgen.verify_buffer(buffer3, seed=False) # Modify 3. - helper3.fill_buffer(flipped=True) + pgen.fill_buffer(buffer3, seed=True) - helper1.verify_buffer(flipped=True) - helper2.verify_buffer(flipped=True) - helper3.verify_buffer(flipped=True) + pgen.verify_buffer(buffer1, seed=True) + pgen.verify_buffer(buffer2, seed=True) + pgen.verify_buffer(buffer3, seed=True) # Close any one buffer. 
buffer1.close() diff --git a/cuda_core/tests/memory_ipc/test_workerpool.py b/cuda_core/tests/memory_ipc/test_workerpool.py index 6372b5668d..3f3f46cd27 100644 --- a/cuda_core/tests/memory_ipc/test_workerpool.py +++ b/cuda_core/tests/memory_ipc/test_workerpool.py @@ -7,9 +7,7 @@ import pytest from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions -from utility import IPCBufferTestHelper - -from cuda_python_test_helpers import supports_ipc_mempool +from helpers.buffers import PatternGen CHILD_TIMEOUT_SEC = 20 NBYTES = 64 @@ -30,8 +28,6 @@ class TestIpcWorkerPool: @pytest.mark.parametrize("nmrs", (1, NMRS)) def test_main(self, ipc_device, nmrs): - if not supports_ipc_mempool(ipc_device): - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] @@ -40,14 +36,16 @@ def test_main(self, ipc_device, nmrs): with mp.Pool(NWORKERS) as pool: pool.map(self.process_buffer, buffers) + pgen = PatternGen(device, NBYTES) for buffer in buffers: - IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def process_buffer(self, buffer): device = Device(buffer.memory_resource.device_id) device.set_current() - IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=True) buffer.close() @@ -66,8 +64,6 @@ def init_worker(mrs): @pytest.mark.parametrize("nmrs", (1, NMRS)) def test_main(self, ipc_device, nmrs): - if not supports_ipc_mempool(ipc_device): - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] @@ -79,8 +75,9 @@ def test_main(self, ipc_device, nmrs): [(mrs.index(buffer.memory_resource), buffer.get_ipc_descriptor()) for buffer in buffers], ) + pgen = PatternGen(device, NBYTES) for buffer in buffers: - IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def process_buffer(self, mr_idx, buffer_desc): @@ -88,7 +85,8 @@ def process_buffer(self, mr_idx, buffer_desc): device = Device(mr.device_id) device.set_current() buffer = Buffer.from_ipc_descriptor(mr, buffer_desc) - IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=True) buffer.close() @@ -110,8 +108,6 @@ def init_worker(mrs): @pytest.mark.parametrize("nmrs", (1, NMRS)) def test_main(self, ipc_device, nmrs): - if not supports_ipc_mempool(ipc_device): - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] @@ -120,12 +116,14 @@ def test_main(self, ipc_device, nmrs): with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool: pool.starmap(self.process_buffer, [(device, pickle.dumps(buffer)) for buffer in buffers]) + pgen = PatternGen(device, NBYTES) for buffer in buffers: - IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def process_buffer(self, device, buffer_s): 
device.set_current() buffer = pickle.loads(buffer_s) # noqa: S301 - IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=True) buffer.close() diff --git a/cuda_core/tests/memory_ipc/utility.py b/cuda_core/tests/memory_ipc/utility.py deleted file mode 100644 index 7ce7752b6d..0000000000 --- a/cuda_core/tests/memory_ipc/utility.py +++ /dev/null @@ -1,65 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -import ctypes - -from cuda.core.experimental import Buffer, MemoryResource -from cuda.core.experimental._utils.cuda_utils import driver, handle_return - - -class DummyUnifiedMemoryResource(MemoryResource): - def __init__(self, device): - self.device = device - - def allocate(self, size, stream=None) -> Buffer: - ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value)) - return Buffer.from_handle(ptr=ptr, size=size, mr=self) - - def deallocate(self, ptr, size, stream=None): - handle_return(driver.cuMemFree(ptr)) - - @property - def is_device_accessible(self) -> bool: - return True - - @property - def is_host_accessible(self) -> bool: - return True - - @property - def device_id(self) -> int: - return self.device - - -class IPCBufferTestHelper: - """A helper for manipulating memory buffers in IPC tests. - - Provides methods to fill a buffer with one of two test patterns and verify - the expected values. - """ - - def __init__(self, device, buffer): - self.device = device - self.buffer = buffer - self.scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.buffer.size) - self.stream = device.create_stream() - - def fill_buffer(self, flipped=False, starting_from=0): - """Fill a device buffer with test pattern using unified memory.""" - ptr = ctypes.cast(int(self.scratch_buffer.handle), ctypes.POINTER(ctypes.c_byte)) - op = (lambda i: 255 - i) if flipped else (lambda i: i) - for i in range(self.buffer.size): - ptr[i] = ctypes.c_byte(op(starting_from + i)) - self.buffer.copy_from(self.scratch_buffer, stream=self.stream) - self.device.sync() - - def verify_buffer(self, flipped=False, starting_from=0): - """Verify the buffer contents.""" - self.scratch_buffer.copy_from(self.buffer, stream=self.stream) - self.device.sync() - ptr = ctypes.cast(int(self.scratch_buffer.handle), ctypes.POINTER(ctypes.c_byte)) - op = (lambda i: 255 - i) if flipped else (lambda i: i) - for i in range(self.buffer.size): - assert ctypes.c_byte(ptr[i]).value == ctypes.c_byte(op(starting_from + i)).value, ( - f"Buffer contains incorrect data at index {i}" - ) diff --git a/cuda_core/tests/test_event.py b/cuda_core/tests/test_event.py index 746121946d..36b696aa75 100644 --- a/cuda_core/tests/test_event.py +++ b/cuda_core/tests/test_event.py @@ -5,21 +5,14 @@ import time import cuda.core.experimental -import helpers -import numpy as np import pytest from cuda.core.experimental import ( Device, Event, EventOptions, - LaunchConfig, - LegacyPinnedMemoryResource, - Program, - ProgramOptions, - launch, ) +from helpers.latch import LatchKernel -from conftest import skipif_need_cuda_headers from cuda_python_test_helpers import IS_WSL @@ -121,60 +114,24 @@ def test_error_timing_recorded(): event3 - event2 -@skipif_need_cuda_headers # libcu++ -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") def test_error_timing_incomplete(): device = Device() 
     device.set_current()
-
-    # This kernel is designed to busy loop until a signal is received
-    code = """
-#include <cuda/atomic>
-
-extern "C"
-__global__ void wait(int* val) {
-    cuda::atomic_ref<int, cuda::thread_scope_system> signal{*val};
-    while (true) {
-        if (signal.load(cuda::memory_order_relaxed)) {
-            break;
-        }
-    }
-}
-"""
-
-    arch = "".join(f"{i}" for i in device.compute_capability)
-    program_options = ProgramOptions(
-        std="c++17",
-        arch=f"sm_{arch}",
-        include_path=helpers.CCCL_INCLUDE_PATHS,
-    )
-    prog = Program(code, code_type="c++", options=program_options)
-    mod = prog.compile(target_type="cubin")
-    ker = mod.get_kernel("wait")
-
-    mr = LegacyPinnedMemoryResource()
-    b = mr.allocate(4)
-    arr = np.from_dlpack(b).view(np.int32)
-    arr[0] = 0
-
-    config = LaunchConfig(grid=1, block=1)
-    ker_args = (arr.ctypes.data,)
-
+    latch = LatchKernel(device)
     enabled = EventOptions(enable_timing=True)
     stream = device.create_stream()
     event1 = stream.record(options=enabled)
-    launch(stream, config, ker, *ker_args)
+    latch.launch(stream)
     event3 = stream.record(options=enabled)
-    # event3 will never complete because the stream is waiting on wait() to complete
+    # event3 will never complete because the latch has not been released
     with pytest.raises(RuntimeError, match="^One or both events have not completed."):
         event3 - event1
-    arr[0] = 1
+    latch.release()
     event3.sync()
     event3 - event1  # this should work
-    b.close()
 
 
 def test_event_device(init_cuda):
diff --git a/cuda_core/tests/test_helpers.py b/cuda_core/tests/test_helpers.py
new file mode 100644
index 0000000000..25d0c26a54
--- /dev/null
+++ b/cuda_core/tests/test_helpers.py
@@ -0,0 +1,70 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE
+
+import time
+
+import pytest
+from cuda.core.experimental import Device
+from helpers.buffers import PatternGen, compare_equal_buffers, make_scratch_buffer
+from helpers.latch import LatchKernel
+from helpers.logging import TimestampedLogger
+
+ENABLE_LOGGING = False  # Set True for test debugging and development
+NBYTES = 64
+
+
+def test_latchkernel():
+    """Test LatchKernel."""
+    log = TimestampedLogger(enabled=ENABLE_LOGGING)
+    log("begin")
+    device = Device()
+    device.set_current()
+    stream = device.create_stream()
+    target = make_scratch_buffer(device, 0, NBYTES)
+    zeros = make_scratch_buffer(device, 0, NBYTES)
+    ones = make_scratch_buffer(device, 1, NBYTES)
+    latch = LatchKernel(device)
+    log("launching latch kernel")
+    latch.launch(stream)
+    log("launching copy (0->1) kernel")
+    target.copy_from(ones, stream=stream)
+    log("going to sleep")
+    time.sleep(1)
+    log("checking target == 0")
+    assert compare_equal_buffers(target, zeros)
+    log("releasing latch and syncing")
+    latch.release()
+    stream.sync()
+    log("checking target == 1")
+    assert compare_equal_buffers(target, ones)
+    log("done")
+
+
+def test_patterngen_seeds():
+    """Test PatternGen with seed argument."""
+    device = Device()
+    device.set_current()
+    buffer = make_scratch_buffer(device, 0, NBYTES)
+
+    # All seeds are pairwise different.
+ pgen = PatternGen(device, NBYTES) + for i in range(256): + pgen.fill_buffer(buffer, seed=i) + pgen.verify_buffer(buffer, seed=i) + for j in range(i + 1, 256): + with pytest.raises(AssertionError): + pgen.verify_buffer(buffer, seed=j) + + +def test_patterngen_values(): + """Test PatternGen with value argument, also compare_equal_buffers.""" + device = Device() + device.set_current() + ones = make_scratch_buffer(device, 1, NBYTES) + twos = make_scratch_buffer(device, 2, NBYTES) + assert compare_equal_buffers(ones, ones) + assert not compare_equal_buffers(ones, twos) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(ones, value=1) + pgen.verify_buffer(twos, value=2) diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index 904997f116..0052136c4c 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -27,6 +27,7 @@ from cuda.core.experimental._memory import DLDeviceType, IPCBufferDescriptor from cuda.core.experimental._utils.cuda_utils import handle_return from cuda.core.experimental.utils import StridedMemoryView +from helpers.buffers import DummyUnifiedMemoryResource from cuda_python_test_helpers import IS_WSL, supports_ipc_mempool @@ -95,30 +96,6 @@ def device_id(self) -> int: raise RuntimeError("the pinned memory resource is not bound to any GPU") -class DummyUnifiedMemoryResource(MemoryResource): - def __init__(self, device): - self.device = device - - def allocate(self, size, stream=None) -> Buffer: - ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value)) - return Buffer.from_handle(ptr=ptr, size=size, mr=self) - - def deallocate(self, ptr, size, stream=None): - handle_return(driver.cuMemFree(ptr)) - - @property - def is_device_accessible(self) -> bool: - return True - - @property - def is_host_accessible(self) -> bool: - return True - - @property - def device_id(self) -> int: - return 0 - - class DummyPinnedMemoryResource(MemoryResource): def __init__(self, device): self.device = device diff --git a/cuda_core/tests/test_stream.py b/cuda_core/tests/test_stream.py index 505cbbf22a..da4c66d548 100644 --- a/cuda_core/tests/test_stream.py +++ b/cuda_core/tests/test_stream.py @@ -49,12 +49,6 @@ def test_stream_record(init_cuda): assert isinstance(event, Event) -def test_stream_record_invalid_event(init_cuda): - stream = Device().create_stream(options=StreamOptions()) - with pytest.raises(TypeError): - stream.record(event="invalid_event") - - def test_stream_wait_event(init_cuda): s1 = Device().create_stream() s2 = Device().create_stream()
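A closing note on the serialization path added in `_event.pyx`: `multiprocessing.reduction.register` only affects multiprocessing's `ForkingPickler`, which backs `mp.Queue` and `mp.Pipe`; a plain `pickle.dumps(event)` does not go through this reducer. A sketch of the mechanics, assuming `event` is an IPC-enabled `Event` as in the tests above (the `send`/`receive` helpers are hypothetical):

```python
# What crossing a Queue amounts to for an IPC-enabled Event.
import pickle
from multiprocessing.reduction import ForkingPickler


def send(event):
    # Sender side: ForkingPickler consults the table populated by
    # multiprocessing.reduction.register(Event, _reduce_event), so only
    # the (descriptor, busy_waited) payload is serialized.
    return bytes(ForkingPickler.dumps(event))


def receive(payload):
    # Receiver side: plain pickle.loads suffices here, because the payload
    # already encodes the call Event.from_ipc_descriptor(descriptor).
    return pickle.loads(payload)
```

The descriptor is meant to be opened in a different process than the one that exported it; the resulting event has timing disabled and reports no device or context, matching `TestIpcEventProperties` above.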