diff --git a/cuda_core/cuda/core/experimental/_device.pyx b/cuda_core/cuda/core/experimental/_device.pyx
index 1db2adbf8d..bcba09f985 100644
--- a/cuda_core/cuda/core/experimental/_device.pyx
+++ b/cuda_core/cuda/core/experimental/_device.pyx
@@ -28,7 +28,6 @@ from cuda.core.experimental._utils.cuda_utils import (
 
 from cuda.core.experimental._stream cimport default_stream
 
-
 # TODO: I prefer to type these as "cdef object" and avoid accessing them from within Python,
 # but it seems it is very convenient to expose them for testing purposes...
 _tls = threading.local()
@@ -1274,7 +1273,7 @@ class Device:
         """
         self._check_context_initialized()
         ctx = self._get_current_context()
-        return Event._init(self._id, ctx, options, True)
+        return Event._init(self._id, ctx, options)
 
     def allocate(self, size, stream: Optional[Stream] = None) -> Buffer:
         """Allocate device memory from a specified stream.
diff --git a/cuda_core/cuda/core/experimental/_event.pxd b/cuda_core/cuda/core/experimental/_event.pxd
index 1f586f18df..0972063af3 100644
--- a/cuda_core/cuda/core/experimental/_event.pxd
+++ b/cuda_core/cuda/core/experimental/_event.pxd
@@ -11,8 +11,6 @@ cdef class Event:
     cydriver.CUevent _handle
     bint _timing_disabled
     bint _busy_waited
-    bint _ipc_enabled
-    object _ipc_descriptor
     int _device_id
     object _ctx_handle
 
diff --git a/cuda_core/cuda/core/experimental/_event.pyx b/cuda_core/cuda/core/experimental/_event.pyx
index dd6ef0b06e..962556597a 100644
--- a/cuda_core/cuda/core/experimental/_event.pyx
+++ b/cuda_core/cuda/core/experimental/_event.pyx
@@ -4,9 +4,7 @@
 
 from __future__ import annotations
 
-cimport cpython
 from libc.stdint cimport uintptr_t
-from libc.string cimport memcpy
 
 from cuda.bindings cimport cydriver
 
@@ -16,7 +14,6 @@ from cuda.core.experimental._utils.cuda_utils cimport (
 )
 
 from dataclasses import dataclass
-import multiprocessing
 from typing import TYPE_CHECKING, Optional
 
 from cuda.core.experimental._context import Context
@@ -43,7 +40,7 @@ cdef class EventOptions:
         has actually been completed. Otherwise, the CPU thread will busy-wait
         until the event has been completed. (Default to False)
-    ipc_enabled : bool, optional
+    support_ipc : bool, optional
         Event will be suitable for interprocess use. Note that enable_timing
         must be False. (Default to False)
     """
@@ -51,7 +48,7 @@
 
     enable_timing: Optional[bool] = False
     busy_waited_sync: Optional[bool] = False
-    ipc_enabled: Optional[bool] = False
+    support_ipc: Optional[bool] = False
 
 
 cdef class Event:
@@ -89,35 +86,24 @@ cdef class Event:
         raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).")
 
     @classmethod
-    def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False):
+    def _init(cls, device_id: int, ctx_handle: Context, options=None):
         cdef Event self = Event.__new__(cls)
         cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options")
         cdef unsigned int flags = 0x0
         self._timing_disabled = False
         self._busy_waited = False
-        self._ipc_enabled = False
-        self._ipc_descriptor = None
         if not opts.enable_timing:
             flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING
             self._timing_disabled = True
         if opts.busy_waited_sync:
             flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC
             self._busy_waited = True
-        if opts.ipc_enabled:
-            if is_free:
-                raise TypeError(
-                    "IPC-enabled events must be bound; use Stream.record for creation."
-                )
-            flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
-            self._ipc_enabled = True
-            if not self._timing_disabled:
-                raise TypeError("IPC-enabled events cannot use timing.")
+        if opts.support_ipc:
+            raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103")
         with nogil:
             HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags))
         self._device_id = device_id
         self._ctx_handle = ctx_handle
-        if opts.ipc_enabled:
-            self.get_ipc_descriptor()
         return self
 
     cpdef close(self):
@@ -165,40 +151,6 @@
             raise CUDAError(err)
         raise RuntimeError(explanation)
 
-    def get_ipc_descriptor(self) -> IPCEventDescriptor:
-        """Export an event allocated for sharing between processes."""
-        if self._ipc_descriptor is not None:
-            return self._ipc_descriptor
-        if not self.is_ipc_enabled:
-            raise RuntimeError("Event is not IPC-enabled")
-        cdef cydriver.CUipcEventHandle data
-        with nogil:
-            HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, (self._handle)))
-        cdef bytes data_b = cpython.PyBytes_FromStringAndSize(<char*>(data.reserved), sizeof(data.reserved))
-        self._ipc_descriptor = IPCEventDescriptor._init(data_b, self._busy_waited)
-        return self._ipc_descriptor
-
-    @classmethod
-    def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
-        """Import an event that was exported from another process."""
-        cdef cydriver.CUipcEventHandle data
-        memcpy(data.reserved, <const char*>(ipc_descriptor._reserved), sizeof(data.reserved))
-        cdef Event self = Event.__new__(cls)
-        with nogil:
-            HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data))
-        self._timing_disabled = True
-        self._busy_waited = ipc_descriptor._busy_waited
-        self._ipc_enabled = True
-        self._ipc_descriptor = ipc_descriptor
-        self._device_id = -1  # ??
-        self._ctx_handle = None  # ??
-        return self
-
-    @property
-    def is_ipc_enabled(self) -> bool:
-        """Return True if the event can be shared across process boundaries, otherwise False."""
-        return self._ipc_enabled
-
     @property
     def is_timing_disabled(self) -> bool:
         """Return True if the event does not record timing data, otherwise False."""
@@ -209,6 +161,11 @@
         """Return True if the event synchronization would keep the CPU busy-waiting, otherwise False."""
         return self._busy_waited
 
+    @property
+    def is_ipc_supported(self) -> bool:
+        """Return True if this event can be used as an interprocess event, otherwise False."""
+        raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103")
+
     def sync(self):
         """Synchronize until the event completes.
@@ -255,43 +212,12 @@
 
         context is set current after a event is created.
         """
-        if self._device_id >= 0:
-            from ._device import Device  # avoid circular import
-            return Device(self._device_id)
+
+        from cuda.core.experimental._device import Device  # avoid circular import
+
+        return Device(self._device_id)
 
     @property
     def context(self) -> Context:
         """Return the :obj:`~_context.Context` associated with this event."""
-        if self._ctx_handle is not None and self._device_id >= 0:
-            return Context._from_ctx(self._ctx_handle, self._device_id)
-
-
-cdef class IPCEventDescriptor:
-    """Serializable object describing an event that can be shared between processes."""
-
-    cdef:
-        bytes _reserved
-        bint _busy_waited
-
-    def __init__(self, *arg, **kwargs):
-        raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")
-
-    @classmethod
-    def _init(cls, reserved: bytes, busy_waited: bint):
-        cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls)
-        self._reserved = reserved
-        self._busy_waited = busy_waited
-        return self
-
-    def __eq__(self, IPCEventDescriptor rhs):
-        # No need to check self._busy_waited.
-        return self._reserved == rhs._reserved
-
-    def __reduce__(self):
-        return self._init, (self._reserved, self._busy_waited)
-
-
-def _reduce_event(event):
-    return event.from_ipc_descriptor, (event.get_ipc_descriptor(),)
-
-
-multiprocessing.reduction.register(Event, _reduce_event)
+        return Context._from_ctx(self._ctx_handle, self._device_id)
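Net effect of the _event.pyx changes for callers: `EventOptions.ipc_enabled` is renamed to `support_ipc`, and requesting it now raises `NotImplementedError` instead of configuring `CU_EVENT_INTERPROCESS`. A minimal caller-side sketch (illustrative only, not part of the patch; assumes a CUDA-capable device):

    from cuda.core.experimental import Device, EventOptions

    device = Device()
    device.set_current()
    stream = device.create_stream()

    # Timing-capable events still work as before.
    event = stream.record(options=EventOptions(enable_timing=True))
    event.sync()

    # IPC events are no longer constructible; the renamed option raises.
    try:
        stream.record(options=EventOptions(support_ipc=True))
    except NotImplementedError as exc:
        print(exc)  # WIP: https://github.com/NVIDIA/cuda-python/issues/103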
diff --git a/cuda_core/cuda/core/experimental/_memory.pyx b/cuda_core/cuda/core/experimental/_memory.pyx
index 5037f59eb0..024ffa2aef 100644
--- a/cuda_core/cuda/core/experimental/_memory.pyx
+++ b/cuda_core/cuda/core/experimental/_memory.pyx
@@ -226,11 +226,11 @@ cdef class Buffer(_cyBuffer, MemoryResourceAttributes):
         if stream is None:
             # Note: match this behavior to DeviceMemoryResource.allocate()
             stream = default_stream()
-        cdef cydriver.CUmemPoolPtrExportData data
-        memcpy(data.reserved, <const char*>(ipc_buffer._reserved), sizeof(data.reserved))
+        cdef cydriver.CUmemPoolPtrExportData share_data
+        memcpy(share_data.reserved, <const char*>(ipc_buffer._reserved), sizeof(share_data.reserved))
         cdef cydriver.CUdeviceptr ptr
         with nogil:
-            HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &data))
+            HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &share_data))
         return Buffer._init(ptr, ipc_buffer.size, mr, stream)
 
     def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer:
@@ -511,7 +511,7 @@ cdef class DeviceMemoryResourceOptions:
         (Default to 0)
     """
 
     ipc_enabled : cython.bint = False
-    max_size : cython.size_t = 0
+    max_size : cython.int = 0
 
 
 # TODO: cythonize this?
diff --git a/cuda_core/cuda/core/experimental/_stream.pyx b/cuda_core/cuda/core/experimental/_stream.pyx
index 82406c5598..e2a418ac27 100644
--- a/cuda_core/cuda/core/experimental/_stream.pyx
+++ b/cuda_core/cuda/core/experimental/_stream.pyx
@@ -260,13 +260,7 @@ cdef class Stream:
         # and CU_EVENT_RECORD_EXTERNAL, can be set in EventOptions.
         if event is None:
             self._get_device_and_context()
-            event = Event._init((self._device_id), (self._ctx_handle), options, False)
-        elif event.is_ipc_enabled:
-            raise TypeError(
-                "IPC-enabled events should not be re-recorded, instead create a "
-                "new event by supplying options."
-            )
-
+            event = Event._init((self._device_id), (self._ctx_handle), options)
         cdef cydriver.CUevent e = (<Event>event)._handle
         with nogil:
             HANDLE_RETURN(cydriver.cuEventRecord(e, self._handle))
diff --git a/cuda_core/tests/helpers/__init__.py b/cuda_core/tests/helpers.py
similarity index 96%
rename from cuda_core/tests/helpers/__init__.py
rename to cuda_core/tests/helpers.py
index e87122e649..10af3dcc22 100644
--- a/cuda_core/tests/helpers/__init__.py
+++ b/cuda_core/tests/helpers.py
@@ -22,7 +22,7 @@
     import cuda_python_test_helpers
 except ImportError:
     # Import shared platform helpers for tests across repos
-    sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[3] / "cuda_python_test_helpers"))
+    sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "cuda_python_test_helpers"))
     import cuda_python_test_helpers
diff --git a/cuda_core/tests/helpers/buffers.py b/cuda_core/tests/helpers/buffers.py
deleted file mode 100644
index 52220956d2..0000000000
--- a/cuda_core/tests/helpers/buffers.py
+++ /dev/null
@@ -1,122 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-import ctypes
-import sys
-
-from cuda.core.experimental import Buffer, MemoryResource
-from cuda.core.experimental._utils.cuda_utils import driver, handle_return
-
-if sys.platform.startswith("win"):
-    libc = ctypes.CDLL("msvcrt.dll")
-else:
-    libc = ctypes.CDLL("libc.so.6")
-
-
-__all__ = ["DummyUnifiedMemoryResource", "PatternGen", "make_scratch_buffer", "compare_equal_buffers"]
-
-
-class DummyUnifiedMemoryResource(MemoryResource):
-    def __init__(self, device):
-        self.device = device
-
-    def allocate(self, size, stream=None) -> Buffer:
-        ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value))
-        return Buffer.from_handle(ptr=ptr, size=size, mr=self)
-
-    def deallocate(self, ptr, size, stream=None):
-        handle_return(driver.cuMemFree(ptr))
-
-    @property
-    def is_device_accessible(self) -> bool:
-        return True
-
-    @property
-    def is_host_accessible(self) -> bool:
-        return True
-
-    @property
-    def device_id(self) -> int:
-        return self.device
-
-
-class PatternGen:
-    """
-    Provides methods to fill a target buffer with known test patterns and
-    verify the expected values.
-
-    If a stream is provided, operations are synchronized with respect to that
-    stream. Otherwise, they are synchronized over the device.
-
-    The test pattern is either a fixed value or a cyclic pattern generated from
-    an 8-bit seed. Only one of `value` or `seed` should be supplied.
-
-    Distinct test patterns are stored in private buffers called pattern
-    buffers. Calls to `fill_buffer` copy from a pattern buffer to the target
-    buffer. Calls to `verify_buffer` copy from the target buffer to a scratch
-    buffer and then perform a comparison.
-    """
-
-    def __init__(self, device, size, stream=None):
-        self.device = device
-        self.size = size
-        self.stream = stream if stream is not None else device.create_stream()
-        self.sync_target = stream if stream is not None else device
-        self.pattern_buffers = {}
-
-    def fill_buffer(self, buffer, seed=None, value=None):
-        """Fill a device buffer with a sequential test pattern using unified memory."""
-        assert buffer.size == self.size
-        pattern_buffer = self._get_pattern_buffer(seed, value)
-        buffer.copy_from(pattern_buffer, stream=self.stream)
-
-    def verify_buffer(self, buffer, seed=None, value=None):
-        """Verify the buffer contents against a sequential pattern."""
-        assert buffer.size == self.size
-        scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size)
-        ptr_test = self._ptr(scratch_buffer)
-        pattern_buffer = self._get_pattern_buffer(seed, value)
-        ptr_expected = self._ptr(pattern_buffer)
-        scratch_buffer.copy_from(buffer, stream=self.stream)
-        self.sync_target.sync()
-        assert libc.memcmp(ptr_test, ptr_expected, self.size) == 0
-
-    @staticmethod
-    def _ptr(buffer):
-        """Get a pointer to the specified buffer."""
-        return ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_ubyte))
-
-    def _get_pattern_buffer(self, seed, value):
-        """Get a buffer holding the specified test pattern."""
-        assert seed is None or value is None
-        if value is None:
-            seed = (0 if seed is None else seed) & 0xFF
-        key = seed, value
-        pattern_buffer = self.pattern_buffers.get(key, None)
-        if pattern_buffer is None:
-            if value is not None:
-                pattern_buffer = make_scratch_buffer(self.device, value, self.size)
-            else:
-                pattern_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size)
-                ptr = self._ptr(pattern_buffer)
-                for i in range(self.size):
-                    ptr[i] = (seed + i) & 0xFF
-            self.pattern_buffers[key] = pattern_buffer
-        return pattern_buffer
-
-
-def make_scratch_buffer(device, value, nbytes):
-    """Create a unified memory buffer with the specified value."""
-    buffer = DummyUnifiedMemoryResource(device).allocate(nbytes)
-    ptr = ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_byte))
-    ctypes.memset(ptr, value & 0xFF, nbytes)
-    return buffer
-
-
-def compare_equal_buffers(buffer1, buffer2):
-    """Compare the contents of two host-accessible buffers for bitwise equality."""
-    if buffer1.size != buffer2.size:
-        return False
-    ptr1 = ctypes.cast(int(buffer1.handle), ctypes.POINTER(ctypes.c_byte))
-    ptr2 = ctypes.cast(int(buffer2.handle), ctypes.POINTER(ctypes.c_byte))
-    return libc.memcmp(ptr1, ptr2, buffer1.size) == 0
diff --git a/cuda_core/tests/helpers/latch.py b/cuda_core/tests/helpers/latch.py
deleted file mode 100644
index e1166973d8..0000000000
--- a/cuda_core/tests/helpers/latch.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-import ctypes
-
-import pytest
-from cuda.core.experimental import (
-    LaunchConfig,
-    LegacyPinnedMemoryResource,
-    Program,
-    ProgramOptions,
-    launch,
-)
-
-import helpers
-
-
-class LatchKernel:
-    """
-    Manages a kernel that blocks stream progress until released.
-    """
-
-    def __init__(self, device):
-        if helpers.CUDA_INCLUDE_PATH is None:
-            pytest.skip("need CUDA header")
-        code = """
-        #include <cuda/atomic>
-
-        extern "C"
-        __global__ void latch(int* val) {
-            cuda::atomic_ref<int, cuda::thread_scope_system> signal{*val};
-            while (true) {
-                if (signal.load(cuda::memory_order_relaxed)) {
-                    break;
-                }
-            }
-        }
-        """
-        program_options = ProgramOptions(
-            std="c++17",
-            arch=f"sm_{''.join(f'{i}' for i in device.compute_capability)}",
-            include_path=helpers.CCCL_INCLUDE_PATHS,
-        )
-        prog = Program(code, code_type="c++", options=program_options)
-        mod = prog.compile(target_type="cubin")
-        self.kernel = mod.get_kernel("latch")
-
-        mr = LegacyPinnedMemoryResource()
-        self.buffer = mr.allocate(4)
-        self.busy_wait_flag[0] = 0
-
-    def launch(self, stream):
-        """Launch the latch kernel, blocking stream progress via busy waiting."""
-        config = LaunchConfig(grid=1, block=1)
-        launch(stream, config, self.kernel, int(self.buffer.handle))
-
-    def release(self):
-        """Release the latch, allowing stream progress."""
-        self.busy_wait_flag[0] = 1
-
-    @property
-    def busy_wait_flag(self):
-        return ctypes.cast(int(self.buffer.handle), ctypes.POINTER(ctypes.c_int32))
diff --git a/cuda_core/tests/helpers/logging.py b/cuda_core/tests/helpers/logging.py
deleted file mode 100644
index d5be94a280..0000000000
--- a/cuda_core/tests/helpers/logging.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-import time
-
-
-class TimestampedLogger:
-    """
-    A logger that prefixes each output with a timestamp, containing the elapsed
-    time since the logger was created.
-
-    Example:
-
-        import multiprocess as mp
-        import time
-
-        def main():
-            log = TimestampedLogger(prefix="parent: ")
-            log("begin")
-            process = mp.Process(target=child_main, args=(log,))
-            process.start()
-            process.join()
-            log("done")
-
-        def child_main(log):
-            log.prefix = " child: "
-            log("begin")
-            time.sleep(1)
-            log("done")
-
-        if __name__ == "__main__":
-            main()
-
-    Possible output:
-
-        [     0.003 ms] parent: begin
-        [   819.464 ms]  child: begin
-        [  1819.666 ms]  child: done
-        [  1882.954 ms] parent: done
-    """
-
-    def __init__(self, prefix=None, start_time=None, enabled=True):
-        self.prefix = "" if prefix is None else prefix
-        self.start_time = start_time if start_time is not None else time.time_ns()
-        self.enabled = enabled
-
-    def __call__(self, msg):
-        if self.enabled:
-            now = (time.time_ns() - self.start_time) * 1e-6
-            print(f"[{now:>10.3f} ms] {self.prefix}{msg}")
diff --git a/cuda_core/tests/memory_ipc/test_errors.py b/cuda_core/tests/memory_ipc/test_errors.py
index 3e8265b39c..d1a235603d 100644
--- a/cuda_core/tests/memory_ipc/test_errors.py
+++ b/cuda_core/tests/memory_ipc/test_errors.py
@@ -8,6 +8,8 @@
 from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions
 from cuda.core.experimental._utils.cuda_utils import CUDAError
 
+from cuda_python_test_helpers import supports_ipc_mempool
+
 CHILD_TIMEOUT_SEC = 20
 NBYTES = 64
 POOL_SIZE = 2097152
@@ -18,6 +20,10 @@ class ChildErrorHarness:
     PARENT_ACTION, CHILD_ACTION, and ASSERT (see below for examples)."""
 
     def test_main(self, ipc_device, ipc_memory_resource):
+        if not supports_ipc_mempool(ipc_device):
+            import pytest
+
+            pytest.skip("Driver rejects IPC-enabled mempool creation on this platform")
         """Parent process that checks child errors."""
         # Attach fixtures to this object for convenience. These can be accessed
        # from PARENT_ACTION.
diff --git a/cuda_core/tests/memory_ipc/test_event_ipc.py b/cuda_core/tests/memory_ipc/test_event_ipc.py
deleted file mode 100644
index fc1acb329d..0000000000
--- a/cuda_core/tests/memory_ipc/test_event_ipc.py
+++ /dev/null
@@ -1,172 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-import multiprocessing as mp
-
-import pytest
-from cuda.core.experimental import Device, EventOptions
-from helpers.buffers import compare_equal_buffers, make_scratch_buffer
-from helpers.latch import LatchKernel
-from helpers.logging import TimestampedLogger
-
-ENABLE_LOGGING = False  # Set True for test debugging and development
-CHILD_TIMEOUT_SEC = 20
-NBYTES = 64
-
-
-class TestEventIpc:
-    """Check the basic usage of IPC-enabled events with a latch kernel."""
-
-    def test_main(self, ipc_device, ipc_memory_resource):
-        log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING)
-        device = ipc_device
-        mr = ipc_memory_resource
-        stream1 = device.create_stream()
-        latch = LatchKernel(device)
-
-        # Start the child process.
-        q_out, q_in = [mp.Queue() for _ in range(2)]
-        process = mp.Process(target=self.child_main, args=(log, q_out, q_in))
-        process.start()
-
-        # Prepare scratch buffers.
-        target = make_scratch_buffer(device, 0, NBYTES)
-        ones = make_scratch_buffer(device, 1, NBYTES)
-        twos = make_scratch_buffer(device, 2, NBYTES)
-
-        # Allocate the buffer and send it to the child.
-        buffer = mr.allocate(NBYTES, stream=stream1)
-        log("sending buffer")
-        q_out.put(buffer)
-
-        # Stream 1:
-        log("enqueuing latch kernel on stream1")
-        latch.launch(stream1)
-        log("enqueuing copy on stream1")
-        buffer.copy_from(ones, stream=stream1)
-
-        ipc_event_options = EventOptions(ipc_enabled=True)
-        e = stream1.record(options=ipc_event_options)
-        log(f"recorded event ({hex(e.handle)})")
-        q_out.put(e)
-        log("sent event")
-
-        # Wait on the child.
-        log("waiting for child")
-        none = q_in.get(timeout=CHILD_TIMEOUT_SEC)
-        assert none is None
-
-        log("releasing stream1")
-        latch.release()
-        process.join()
-        assert process.exitcode == 0
-        log("done")
-
-        # Finish up.
-        target.copy_from(buffer, stream=stream1)
-        stream1.sync()
-        assert compare_equal_buffers(target, twos)
-
-    def child_main(self, log, q_in, q_out):
-        log.prefix = " child: "
-        log("ready")
-        device = Device()
-        device.set_current()
-        stream2 = device.create_stream()
-        twos = make_scratch_buffer(device, 2, NBYTES)
-        buffer = q_in.get(timeout=CHILD_TIMEOUT_SEC)
-        log("got buffer")
-        e = q_in.get(timeout=CHILD_TIMEOUT_SEC)
-        log(f"got event ({hex(e.handle)})")
-        stream2.wait(e)
-        log("enqueuing copy on stream2")
-        buffer.copy_from(twos, stream=stream2)
-        log("signaling parent")
-        q_out.put(None)
-        log("waiting")
-        stream2.sync()
-        log("done")
-
-
-def test_event_is_monadic(ipc_device):
-    """Check that IPC-enabled events are always bound and cannot be reset."""
-    device = ipc_device
-    with pytest.raises(TypeError, match=r"^IPC-enabled events must be bound; use Stream.record for creation\.$"):
-        device.create_event({"ipc_enabled": True})
-
-    stream = device.create_stream()
-    e = stream.record(options={"ipc_enabled": True})
-    with pytest.raises(
-        TypeError,
-        match=r"^IPC-enabled events should not be re-recorded, instead create a new event by supplying options\.$",
-    ):
-        stream.record(e)
-
-
-@pytest.mark.parametrize(
-    "options", [{"ipc_enabled": True, "enable_timing": True}, EventOptions(ipc_enabled=True, enable_timing=True)]
-)
-def test_event_timing_disabled(ipc_device, options):
-    """Check that IPC-enabled events cannot be created with timing enabled."""
-    device = ipc_device
-    stream = device.create_stream()
-    with pytest.raises(TypeError, match=r"^IPC-enabled events cannot use timing\.$"):
-        stream.record(options=options)
-
-
-class TestIpcEventProperties:
-    """
-    Check that event properties are properly set after transfer to a child
-    process.
-    """
-
-    @pytest.mark.parametrize("busy_waited_sync", [True, False])
-    @pytest.mark.parametrize("use_options_cls", [True, False])
-    @pytest.mark.parametrize("use_option_kw", [True, False])
-    def test_main(self, ipc_device, busy_waited_sync, use_options_cls, use_option_kw):
-        device = ipc_device
-        stream = device.create_stream()
-
-        # Start the child process.
-        q_out, q_in = [mp.Queue() for _ in range(2)]
-        process = mp.Process(target=self.child_main, args=(q_out, q_in))
-        process.start()
-
-        # Create an event and send it.
-        options = (
-            EventOptions(ipc_enabled=True, busy_waited_sync=busy_waited_sync)
-            if use_options_cls
-            else {"ipc_enabled": True, "busy_waited_sync": busy_waited_sync}
-        )
-        e = stream.record(options=options) if use_option_kw else stream.record(None, options)
-        q_out.put(e)
-
-        # Check its properties.
-        props = q_in.get(timeout=CHILD_TIMEOUT_SEC)
-        assert props[0] == e.get_ipc_descriptor()
-        assert props[1] == e.is_ipc_enabled
-        assert props[2] == e.is_timing_disabled
-        assert props[3] == e.is_sync_busy_waited
-        assert props[4] is None
-        assert props[5] is None
-
-        process.join()
-        assert process.exitcode == 0
-
-    def child_main(self, q_in, q_out):
-        device = Device()
-        device.set_current()
-
-        # Get the event.
-        e = q_in.get(timeout=CHILD_TIMEOUT_SEC)
-
-        # Send its properties.
-        props = (
-            e.get_ipc_descriptor(),
-            e.is_ipc_enabled,
-            e.is_timing_disabled,
-            e.is_sync_busy_waited,
-            e.device,
-            e.context,
-        )
-        q_out.put(props)
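The deleted test above relied on the parent/child handshake used throughout this suite: two queues, a bounded `get` timeout, and an exit-code check so a hung child fails the test instead of deadlocking it. A minimal CUDA-free sketch of that pattern:

    import multiprocessing as mp

    CHILD_TIMEOUT_SEC = 20

    def child_main(q_in, q_out):
        item = q_in.get(timeout=CHILD_TIMEOUT_SEC)
        q_out.put(item * 2)  # do the work, then signal the parent

    if __name__ == "__main__":
        q_out, q_in = [mp.Queue() for _ in range(2)]
        process = mp.Process(target=child_main, args=(q_out, q_in))
        process.start()
        q_out.put(21)
        assert q_in.get(timeout=CHILD_TIMEOUT_SEC) == 42
        process.join()
        assert process.exitcode == 0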
diff --git a/cuda_core/tests/memory_ipc/test_memory_ipc.py b/cuda_core/tests/memory_ipc/test_memory_ipc.py
index 23a3e91b7f..c980a7ad84 100644
--- a/cuda_core/tests/memory_ipc/test_memory_ipc.py
+++ b/cuda_core/tests/memory_ipc/test_memory_ipc.py
@@ -4,7 +4,9 @@
 import multiprocessing as mp
 
 from cuda.core.experimental import Buffer, DeviceMemoryResource
-from helpers.buffers import PatternGen
+from utility import IPCBufferTestHelper
+
+from cuda_python_test_helpers import supports_ipc_mempool
 
 CHILD_TIMEOUT_SEC = 20
 NBYTES = 64
@@ -14,11 +16,14 @@ class TestIpcMempool:
     def test_main(self, ipc_device, ipc_memory_resource):
+        if not supports_ipc_mempool(ipc_device):
+            import pytest
+
+            pytest.skip("Driver rejects IPC-enabled mempool creation on this platform")
         """Test IPC with memory pools."""
         # Set up the IPC-enabled memory pool and share it.
         device = ipc_device
         mr = ipc_memory_resource
-        pgen = PatternGen(device, NBYTES)
 
         # Start the child process.
         queue = mp.Queue()
@@ -27,7 +32,8 @@
 
         # Allocate and fill memory.
         buffer = mr.allocate(NBYTES)
-        pgen.fill_buffer(buffer, seed=False)
+        helper = IPCBufferTestHelper(device, buffer)
+        helper.fill_buffer(flipped=False)
 
         # Export the buffer via IPC.
         queue.put(buffer)
@@ -37,20 +43,24 @@
         assert process.exitcode == 0
 
         # Verify that the buffer was modified.
-        pgen.verify_buffer(buffer, seed=True)
+        helper.verify_buffer(flipped=True)
         buffer.close()
 
     def child_main(self, device, mr, queue):
         device.set_current()
         buffer = queue.get(timeout=CHILD_TIMEOUT_SEC)
-        pgen = PatternGen(device, NBYTES)
-        pgen.verify_buffer(buffer, seed=False)
-        pgen.fill_buffer(buffer, seed=True)
+        helper = IPCBufferTestHelper(device, buffer)
+        helper.verify_buffer(flipped=False)
+        helper.fill_buffer(flipped=True)
         buffer.close()
 
 
 class TestIPCMempoolMultiple:
     def test_main(self, ipc_device, ipc_memory_resource):
+        if not supports_ipc_mempool(ipc_device):
+            import pytest
+
+            pytest.skip("Driver rejects IPC-enabled mempool creation on this platform")
         """Test IPC with memory pools using multiple processes."""
         # Construct an IPC-enabled memory resource and share it with two children.
         device = ipc_device
@@ -78,29 +88,31 @@
         assert p2.exitcode == 0
 
         # Verify that the buffers were modified.
-        pgen = PatternGen(device, NBYTES)
-        pgen.verify_buffer(buffer1, seed=1)
-        pgen.verify_buffer(buffer2, seed=2)
+        IPCBufferTestHelper(device, buffer1).verify_buffer(flipped=False)
+        IPCBufferTestHelper(device, buffer2).verify_buffer(flipped=True)
         buffer1.close()
         buffer2.close()
 
-    def child_main(self, device, mr, seed, queue):
+    def child_main(self, device, mr, idx, queue):
         # Note: passing the mr registers it so that buffers can be passed
         # directly.
         device.set_current()
         buffer1 = queue.get(timeout=CHILD_TIMEOUT_SEC)
         buffer2 = queue.get(timeout=CHILD_TIMEOUT_SEC)
-        pgen = PatternGen(device, NBYTES)
-        if seed == 1:
-            pgen.fill_buffer(buffer1, seed=1)
-        elif seed == 2:
-            pgen.fill_buffer(buffer2, seed=2)
+        if idx == 1:
+            IPCBufferTestHelper(device, buffer1).fill_buffer(flipped=False)
+        elif idx == 2:
+            IPCBufferTestHelper(device, buffer2).fill_buffer(flipped=True)
         buffer1.close()
         buffer2.close()
 
 
 class TestIPCSharedAllocationHandleAndBufferDescriptors:
     def test_main(self, ipc_device, ipc_memory_resource):
+        if not supports_ipc_mempool(ipc_device):
+            import pytest
+
+            pytest.skip("Driver rejects IPC-enabled mempool creation on this platform")
         """
         Demonstrate that a memory pool allocation handle can be reused for IPC
         with multiple processes. Uses buffer descriptors.
@@ -112,8 +124,8 @@
 
         # Start children.
         q1, q2 = (mp.Queue() for _ in range(2))
-        p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, False, q1))
-        p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, True, q2))
+        p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, 1, q1))
+        p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, 2, q2))
         p1.start()
         p2.start()
@@ -130,13 +142,12 @@
         assert p2.exitcode == 0
 
         # Verify results.
-        pgen = PatternGen(device, NBYTES)
-        pgen.verify_buffer(buffer1, seed=False)
-        pgen.verify_buffer(buffer2, seed=True)
+        IPCBufferTestHelper(device, buffer1).verify_buffer(starting_from=1)
+        IPCBufferTestHelper(device, buffer2).verify_buffer(starting_from=2)
         buffer1.close()
         buffer2.close()
 
-    def child_main(self, device, alloc_handle, seed, queue):
+    def child_main(self, device, alloc_handle, idx, queue):
         """Fills a shared memory buffer."""
         # In this case, the device needs to be set up (passing the mr does it
         # implicitly in other tests).
@@ -144,8 +155,7 @@
         device.set_current()
         mr = DeviceMemoryResource.from_allocation_handle(device, alloc_handle)
         buffer_descriptor = queue.get(timeout=CHILD_TIMEOUT_SEC)
         buffer = Buffer.from_ipc_descriptor(mr, buffer_descriptor)
-        pgen = PatternGen(device, NBYTES)
-        pgen.fill_buffer(buffer, seed=seed)
+        IPCBufferTestHelper(device, buffer).fill_buffer(starting_from=idx)
         buffer.close()
 
@@ -161,8 +171,8 @@
 
         # Start children.
         q1, q2 = (mp.Queue() for _ in range(2))
-        p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, False, q1))
-        p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, True, q2))
+        p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, 1, q1))
+        p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, 2, q2))
         p1.start()
         p2.start()
@@ -179,13 +189,12 @@
         assert p2.exitcode == 0
 
         # Verify results.
-        pgen = PatternGen(device, NBYTES)
-        pgen.verify_buffer(buffer1, seed=False)
-        pgen.verify_buffer(buffer2, seed=True)
+        IPCBufferTestHelper(device, buffer1).verify_buffer(starting_from=1)
+        IPCBufferTestHelper(device, buffer2).verify_buffer(starting_from=2)
         buffer1.close()
         buffer2.close()
 
-    def child_main(self, device, alloc_handle, seed, queue):
+    def child_main(self, device, alloc_handle, idx, queue):
         """Fills a shared memory buffer."""
         device.set_current()
 
@@ -194,6 +203,5 @@
 
         # Now get buffers.
         buffer = queue.get(timeout=CHILD_TIMEOUT_SEC)
-        pgen = PatternGen(device, NBYTES)
-        pgen.fill_buffer(buffer, seed=seed)
+        IPCBufferTestHelper(device, buffer).fill_buffer(starting_from=idx)
         buffer.close()
diff --git a/cuda_core/tests/memory_ipc/test_send_buffers.py b/cuda_core/tests/memory_ipc/test_send_buffers.py
index 75a4651925..19685a94f7 100644
--- a/cuda_core/tests/memory_ipc/test_send_buffers.py
+++ b/cuda_core/tests/memory_ipc/test_send_buffers.py
@@ -6,7 +6,9 @@
 import pytest
 from cuda.core.experimental import DeviceMemoryResource, DeviceMemoryResourceOptions
-from helpers.buffers import PatternGen
+from utility import IPCBufferTestHelper
+
+from cuda_python_test_helpers import supports_ipc_mempool
 
 CHILD_TIMEOUT_SEC = 20
 NBYTES = 64
@@ -18,6 +20,8 @@
 @pytest.mark.parametrize("nmrs", (1, NMRS))
 def test_ipc_send_buffers(ipc_device, nmrs):
     """Test passing buffers sourced from multiple memory resources."""
+    if not supports_ipc_mempool(ipc_device):
+        pytest.skip("Driver rejects IPC-enabled mempool creation on this platform")
     # Set up several IPC-enabled memory pools.
     device = ipc_device
     options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
@@ -25,12 +29,18 @@ def test_ipc_send_buffers(ipc_device, nmrs):
 
     # Allocate and fill memory.
     buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))]
-    pgen = PatternGen(device, NBYTES)
     for buffer in buffers:
-        pgen.fill_buffer(buffer, seed=False)
+        helper = IPCBufferTestHelper(device, buffer)
+        helper.fill_buffer(flipped=False)
 
     # Start the child process.
-    process = mp.Process(target=child_main, args=(device, buffers))
+    process = mp.Process(
+        target=child_main,
+        args=(
+            device,
+            buffers,
+        ),
+    )
     process.start()
 
     # Wait for the child process.
@@ -38,16 +48,16 @@ def test_ipc_send_buffers(ipc_device, nmrs):
     assert process.exitcode == 0
 
     # Verify that the buffers were modified.
-    pgen = PatternGen(device, NBYTES)
     for buffer in buffers:
-        pgen.verify_buffer(buffer, seed=True)
+        helper = IPCBufferTestHelper(device, buffer)
+        helper.verify_buffer(flipped=True)
         buffer.close()
 
 
 def child_main(device, buffers):
     device.set_current()
-    pgen = PatternGen(device, NBYTES)
     for buffer in buffers:
-        pgen.verify_buffer(buffer, seed=False)
-        pgen.fill_buffer(buffer, seed=True)
+        helper = IPCBufferTestHelper(device, buffer)
+        helper.verify_buffer(flipped=False)
+        helper.fill_buffer(flipped=True)
         buffer.close()
diff --git a/cuda_core/tests/memory_ipc/test_serialize.py b/cuda_core/tests/memory_ipc/test_serialize.py
index ceac50e502..215538ae68 100644
--- a/cuda_core/tests/memory_ipc/test_serialize.py
+++ b/cuda_core/tests/memory_ipc/test_serialize.py
@@ -6,7 +6,7 @@
 import os
 
 from cuda.core.experimental import Buffer, Device, DeviceMemoryResource
-from helpers.buffers import PatternGen
+from utility import IPCBufferTestHelper
 
 CHILD_TIMEOUT_SEC = 20
 NBYTES = 64
@@ -46,9 +46,8 @@ def test_main(self, ipc_device, ipc_memory_resource):
         assert process.exitcode == 0
 
         # Confirm buffers were modified.
-        pgen = PatternGen(device, NBYTES)
-        pgen.verify_buffer(buffer1, seed=True)
-        pgen.verify_buffer(buffer2, seed=True)
+        IPCBufferTestHelper(device, buffer1).verify_buffer(flipped=True)
+        IPCBufferTestHelper(device, buffer2).verify_buffer(flipped=True)
         buffer1.close()
         buffer2.close()
 
@@ -68,9 +67,8 @@ def child_main(self, conn):
         buffer2 = Buffer.from_ipc_descriptor(mr, buffer_desc)  # by descriptor
 
         # Modify the buffers.
-        pgen = PatternGen(device, NBYTES)
-        pgen.fill_buffer(buffer1, seed=True)
-        pgen.fill_buffer(buffer2, seed=True)
+        IPCBufferTestHelper(device, buffer1).fill_buffer(flipped=True)
+        IPCBufferTestHelper(device, buffer2).fill_buffer(flipped=True)
         buffer1.close()
         buffer2.close()
 
@@ -102,8 +100,7 @@ def test_main(self, ipc_device, ipc_memory_resource):
         assert process.exitcode == 0
 
         # Confirm buffer was modified.
-        pgen = PatternGen(device, NBYTES)
-        pgen.verify_buffer(buffer, seed=True)
+        IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True)
         buffer.close()
 
     def child_main(self, pipe, _):
@@ -117,8 +114,7 @@ def child_main(self, pipe, _):
         # Buffer.
         buffer = pipe[0].get(timeout=CHILD_TIMEOUT_SEC)
         assert buffer.memory_resource.handle == mr.handle
-        pgen = PatternGen(device, NBYTES)
-        pgen.fill_buffer(buffer, seed=True)
+        IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True)
         buffer.close()
 
 
@@ -138,8 +134,8 @@ def test_object_passing(ipc_device, ipc_memory_resource):
 
     buffer = mr.allocate(NBYTES)
     buffer_desc = buffer.get_ipc_descriptor()
-    pgen = PatternGen(device, NBYTES)
-    pgen.fill_buffer(buffer, seed=False)
+    helper = IPCBufferTestHelper(device, buffer)
+    helper.fill_buffer(flipped=False)
 
     # Start the child process.
     process = mp.Process(target=child_main, args=(alloc_handle, mr, buffer_desc, buffer))
@@ -147,7 +143,7 @@ def test_object_passing(ipc_device, ipc_memory_resource):
     process.join(timeout=CHILD_TIMEOUT_SEC)
     assert process.exitcode == 0
 
-    pgen.verify_buffer(buffer, seed=True)
+    helper.verify_buffer(flipped=True)
     buffer.close()
 
 
@@ -155,37 +151,40 @@ def child_main(alloc_handle, mr1, buffer_desc, buffer1):
     device = Device()
     device.set_current()
     mr2 = DeviceMemoryResource.from_allocation_handle(device, alloc_handle)
-    pgen = PatternGen(device, NBYTES)
 
     # OK to build the buffer from either mr and the descriptor.
     # All buffer* objects point to the same memory.
     buffer2 = Buffer.from_ipc_descriptor(mr1, buffer_desc)
     buffer3 = Buffer.from_ipc_descriptor(mr2, buffer_desc)
-    pgen.verify_buffer(buffer1, seed=False)
-    pgen.verify_buffer(buffer2, seed=False)
-    pgen.verify_buffer(buffer3, seed=False)
+    helper1 = IPCBufferTestHelper(device, buffer1)
+    helper2 = IPCBufferTestHelper(device, buffer2)
+    helper3 = IPCBufferTestHelper(device, buffer3)
+
+    helper1.verify_buffer(flipped=False)
+    helper2.verify_buffer(flipped=False)
+    helper3.verify_buffer(flipped=False)
 
     # Modify 1.
-    pgen.fill_buffer(buffer1, seed=True)
+    helper1.fill_buffer(flipped=True)
 
-    pgen.verify_buffer(buffer1, seed=True)
-    pgen.verify_buffer(buffer2, seed=True)
-    pgen.verify_buffer(buffer3, seed=True)
+    helper1.verify_buffer(flipped=True)
+    helper2.verify_buffer(flipped=True)
+    helper3.verify_buffer(flipped=True)
 
     # Modify 2.
-    pgen.fill_buffer(buffer2, seed=False)
+    helper2.fill_buffer(flipped=False)
 
-    pgen.verify_buffer(buffer1, seed=False)
-    pgen.verify_buffer(buffer2, seed=False)
-    pgen.verify_buffer(buffer3, seed=False)
+    helper1.verify_buffer(flipped=False)
+    helper2.verify_buffer(flipped=False)
+    helper3.verify_buffer(flipped=False)
 
     # Modify 3.
-    pgen.fill_buffer(buffer3, seed=True)
+    helper3.fill_buffer(flipped=True)
 
-    pgen.verify_buffer(buffer1, seed=True)
-    pgen.verify_buffer(buffer2, seed=True)
-    pgen.verify_buffer(buffer3, seed=True)
+    helper1.verify_buffer(flipped=True)
+    helper2.verify_buffer(flipped=True)
+    helper3.verify_buffer(flipped=True)
 
     # Close any one buffer.
     buffer1.close()
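For orientation, the buffer-sharing flow these tests exercise, condensed into one standalone sketch (assumes an IPC-capable platform; `get_ipc_descriptor` and `Buffer.from_ipc_descriptor` are the APIs shown above, and passing the memory resource through the `Process` args registers it in the child, as the test comments note):

    import multiprocessing as mp

    from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions

    NBYTES = 64
    POOL_SIZE = 2097152

    def child_main(mr, buffer_desc):
        device = Device()
        device.set_current()
        buffer = Buffer.from_ipc_descriptor(mr, buffer_desc)  # same memory as the parent's
        buffer.close()

    if __name__ == "__main__":
        device = Device()
        device.set_current()
        options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
        mr = DeviceMemoryResource(device, options=options)
        buffer = mr.allocate(NBYTES)
        process = mp.Process(target=child_main, args=(mr, buffer.get_ipc_descriptor()))
        process.start()
        process.join()
        assert process.exitcode == 0
        buffer.close()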
diff --git a/cuda_core/tests/memory_ipc/test_workerpool.py b/cuda_core/tests/memory_ipc/test_workerpool.py
index 3f3f46cd27..6372b5668d 100644
--- a/cuda_core/tests/memory_ipc/test_workerpool.py
+++ b/cuda_core/tests/memory_ipc/test_workerpool.py
@@ -7,7 +7,9 @@
 import pytest
 from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions
-from helpers.buffers import PatternGen
+from utility import IPCBufferTestHelper
+
+from cuda_python_test_helpers import supports_ipc_mempool
 
 CHILD_TIMEOUT_SEC = 20
 NBYTES = 64
@@ -28,6 +30,8 @@ class TestIpcWorkerPool:
 
     @pytest.mark.parametrize("nmrs", (1, NMRS))
     def test_main(self, ipc_device, nmrs):
+        if not supports_ipc_mempool(ipc_device):
+            pytest.skip("Driver rejects IPC-enabled mempool creation on this platform")
         device = ipc_device
         options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
         mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)]
@@ -36,16 +40,14 @@
         with mp.Pool(NWORKERS) as pool:
             pool.map(self.process_buffer, buffers)
 
-        pgen = PatternGen(device, NBYTES)
         for buffer in buffers:
-            pgen.verify_buffer(buffer, seed=True)
+            IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True)
             buffer.close()
 
     def process_buffer(self, buffer):
         device = Device(buffer.memory_resource.device_id)
         device.set_current()
-        pgen = PatternGen(device, NBYTES)
-        pgen.fill_buffer(buffer, seed=True)
+        IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True)
         buffer.close()
 
 
@@ -64,6 +66,8 @@ def init_worker(mrs):
 
     @pytest.mark.parametrize("nmrs", (1, NMRS))
     def test_main(self, ipc_device, nmrs):
+        if not supports_ipc_mempool(ipc_device):
+            pytest.skip("Driver rejects IPC-enabled mempool creation on this platform")
         device = ipc_device
         options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
         mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)]
@@ -75,9 +79,8 @@
             [(mrs.index(buffer.memory_resource), buffer.get_ipc_descriptor()) for buffer in buffers],
         )
 
-        pgen = PatternGen(device, NBYTES)
         for buffer in buffers:
-            pgen.verify_buffer(buffer, seed=True)
+            IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True)
             buffer.close()
 
     def process_buffer(self, mr_idx, buffer_desc):
@@ -85,8 +88,7 @@
         device = Device(mr.device_id)
         device.set_current()
         buffer = Buffer.from_ipc_descriptor(mr, buffer_desc)
-        pgen = PatternGen(device, NBYTES)
-        pgen.fill_buffer(buffer, seed=True)
+        IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True)
         buffer.close()
 
 
@@ -108,6 +110,8 @@ def init_worker(mrs):
 
     @pytest.mark.parametrize("nmrs", (1, NMRS))
     def test_main(self, ipc_device, nmrs):
+        if not supports_ipc_mempool(ipc_device):
+            pytest.skip("Driver rejects IPC-enabled mempool creation on this platform")
         device = ipc_device
         options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
         mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)]
@@ -116,14 +120,12 @@
         with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool:
             pool.starmap(self.process_buffer, [(device, pickle.dumps(buffer)) for buffer in buffers])
 
-        pgen = PatternGen(device, NBYTES)
         for buffer in buffers:
-            pgen.verify_buffer(buffer, seed=True)
+            IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True)
             buffer.close()
 
     def process_buffer(self, device, buffer_s):
         device.set_current()
         buffer = pickle.loads(buffer_s)  # noqa: S301
-        pgen = PatternGen(device, NBYTES)
-        pgen.fill_buffer(buffer, seed=True)
+        IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True)
         buffer.close()
diff --git a/cuda_core/tests/memory_ipc/utility.py b/cuda_core/tests/memory_ipc/utility.py
new file mode 100644
index 0000000000..7ce7752b6d
--- /dev/null
+++ b/cuda_core/tests/memory_ipc/utility.py
@@ -0,0 +1,65 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import ctypes
+
+from cuda.core.experimental import Buffer, MemoryResource
+from cuda.core.experimental._utils.cuda_utils import driver, handle_return
+
+
+class DummyUnifiedMemoryResource(MemoryResource):
+    def __init__(self, device):
+        self.device = device
+
+    def allocate(self, size, stream=None) -> Buffer:
+        ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value))
+        return Buffer.from_handle(ptr=ptr, size=size, mr=self)
+
+    def deallocate(self, ptr, size, stream=None):
+        handle_return(driver.cuMemFree(ptr))
+
+    @property
+    def is_device_accessible(self) -> bool:
+        return True
+
+    @property
+    def is_host_accessible(self) -> bool:
+        return True
+
+    @property
+    def device_id(self) -> int:
+        return self.device
+
+
+class IPCBufferTestHelper:
+    """A helper for manipulating memory buffers in IPC tests.
+
+    Provides methods to fill a buffer with one of two test patterns and verify
+    the expected values.
+    """
+
+    def __init__(self, device, buffer):
+        self.device = device
+        self.buffer = buffer
+        self.scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.buffer.size)
+        self.stream = device.create_stream()
+
+    def fill_buffer(self, flipped=False, starting_from=0):
+        """Fill a device buffer with test pattern using unified memory."""
+        ptr = ctypes.cast(int(self.scratch_buffer.handle), ctypes.POINTER(ctypes.c_byte))
+        op = (lambda i: 255 - i) if flipped else (lambda i: i)
+        for i in range(self.buffer.size):
+            ptr[i] = ctypes.c_byte(op(starting_from + i))
+        self.buffer.copy_from(self.scratch_buffer, stream=self.stream)
+        self.device.sync()
+
+    def verify_buffer(self, flipped=False, starting_from=0):
+        """Verify the buffer contents."""
+        self.scratch_buffer.copy_from(self.buffer, stream=self.stream)
+        self.device.sync()
+        ptr = ctypes.cast(int(self.scratch_buffer.handle), ctypes.POINTER(ctypes.c_byte))
+        op = (lambda i: 255 - i) if flipped else (lambda i: i)
+        for i in range(self.buffer.size):
+            assert ctypes.c_byte(ptr[i]).value == ctypes.c_byte(op(starting_from + i)).value, (
+                f"Buffer contains incorrect data at index {i}"
+            )
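Typical use of the new helper (a sketch; `DummyUnifiedMemoryResource` comes from the same `utility` module, and any device-bound buffer works):

    from cuda.core.experimental import Device

    from utility import DummyUnifiedMemoryResource, IPCBufferTestHelper

    device = Device()
    device.set_current()
    buffer = DummyUnifiedMemoryResource(device).allocate(64)

    helper = IPCBufferTestHelper(device, buffer)
    helper.fill_buffer(flipped=False)    # writes bytes 0, 1, 2, ...
    helper.verify_buffer(flipped=False)  # raises AssertionError on mismatch
    helper.fill_buffer(flipped=True)     # writes bytes 255, 254, 253, ...
    helper.verify_buffer(flipped=True)
    buffer.close()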
diff --git a/cuda_core/tests/test_event.py b/cuda_core/tests/test_event.py
index 36b696aa75..746121946d 100644
--- a/cuda_core/tests/test_event.py
+++ b/cuda_core/tests/test_event.py
@@ -5,14 +5,21 @@
 import time
 
 import cuda.core.experimental
+import helpers
+import numpy as np
 import pytest
 from cuda.core.experimental import (
     Device,
     Event,
     EventOptions,
+    LaunchConfig,
+    LegacyPinnedMemoryResource,
+    Program,
+    ProgramOptions,
+    launch,
 )
-from helpers.latch import LatchKernel
+from conftest import skipif_need_cuda_headers
 
 from cuda_python_test_helpers import IS_WSL
 
@@ -114,24 +121,60 @@ def test_error_timing_recorded():
         event3 - event2
 
 
+@skipif_need_cuda_headers  # libcu++
+@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+")
 def test_error_timing_incomplete():
     device = Device()
     device.set_current()
-    latch = LatchKernel(device)
+
+    # This kernel is designed to busy loop until a signal is received
+    code = """
+#include <cuda/atomic>
+
+extern "C"
+__global__ void wait(int* val) {
+    cuda::atomic_ref<int, cuda::thread_scope_system> signal{*val};
+    while (true) {
+        if (signal.load(cuda::memory_order_relaxed)) {
+            break;
+        }
+    }
+}
+"""
+
+    arch = "".join(f"{i}" for i in device.compute_capability)
+    program_options = ProgramOptions(
+        std="c++17",
+        arch=f"sm_{arch}",
+        include_path=helpers.CCCL_INCLUDE_PATHS,
+    )
+    prog = Program(code, code_type="c++", options=program_options)
+    mod = prog.compile(target_type="cubin")
+    ker = mod.get_kernel("wait")
+
+    mr = LegacyPinnedMemoryResource()
+    b = mr.allocate(4)
+    arr = np.from_dlpack(b).view(np.int32)
+    arr[0] = 0
+
+    config = LaunchConfig(grid=1, block=1)
+    ker_args = (arr.ctypes.data,)
+
     enabled = EventOptions(enable_timing=True)
     stream = device.create_stream()
 
     event1 = stream.record(options=enabled)
-    latch.launch(stream)
+    launch(stream, config, ker, *ker_args)
     event3 = stream.record(options=enabled)
 
-    # event3 will never complete because the latch has not been released
+    # event3 will never complete because the stream is waiting on wait() to complete
     with pytest.raises(RuntimeError, match="^One or both events have not completed."):
         event3 - event1
 
-    latch.release()
+    arr[0] = 1
     event3.sync()
     event3 - event1  # this should work
+    b.close()
 
 
 def test_event_device(init_cuda):
diff --git a/cuda_core/tests/test_helpers.py b/cuda_core/tests/test_helpers.py
deleted file mode 100644
index 25d0c26a54..0000000000
--- a/cuda_core/tests/test_helpers.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-#
-# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE
-
-import time
-
-import pytest
-from cuda.core.experimental import Device
-from helpers.buffers import PatternGen, compare_equal_buffers, make_scratch_buffer
-from helpers.latch import LatchKernel
-from helpers.logging import TimestampedLogger
-
-ENABLE_LOGGING = False  # Set True for test debugging and development
-NBYTES = 64
-
-
-def test_latchkernel():
-    """Test LatchKernel."""
-    log = TimestampedLogger()
-    log("begin")
-    device = Device()
-    device.set_current()
-    stream = device.create_stream()
-    target = make_scratch_buffer(device, 0, NBYTES)
-    zeros = make_scratch_buffer(device, 0, NBYTES)
-    ones = make_scratch_buffer(device, 1, NBYTES)
-    latch = LatchKernel(device)
-    log("launching latch kernel")
-    latch.launch(stream)
-    log("launching copy (0->1) kernel")
-    target.copy_from(ones, stream=stream)
-    log("going to sleep")
-    time.sleep(1)
-    log("checking target == 0")
-    assert compare_equal_buffers(target, zeros)
-    log("releasing latch and syncing")
-    latch.release()
-    stream.sync()
-    log("checking target == 1")
-    assert compare_equal_buffers(target, ones)
-    log("done")
-
-
-def test_patterngen_seeds():
-    """Test PatternGen with seed argument."""
-    device = Device()
-    device.set_current()
-    buffer = make_scratch_buffer(device, 0, NBYTES)
-
-    # All seeds are pairwise different.
-    pgen = PatternGen(device, NBYTES)
-    for i in range(256):
-        pgen.fill_buffer(buffer, seed=i)
-        pgen.verify_buffer(buffer, seed=i)
-        for j in range(i + 1, 256):
-            with pytest.raises(AssertionError):
-                pgen.verify_buffer(buffer, seed=j)
-
-
-def test_patterngen_values():
-    """Test PatternGen with value argument, also compare_equal_buffers."""
-    device = Device()
-    device.set_current()
-    ones = make_scratch_buffer(device, 1, NBYTES)
-    twos = make_scratch_buffer(device, 2, NBYTES)
-    assert compare_equal_buffers(ones, ones)
-    assert not compare_equal_buffers(ones, twos)
-    pgen = PatternGen(device, NBYTES)
-    pgen.verify_buffer(ones, value=1)
-    pgen.verify_buffer(twos, value=2)
diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py
index 0052136c4c..904997f116 100644
--- a/cuda_core/tests/test_memory.py
+++ b/cuda_core/tests/test_memory.py
@@ -27,7 +27,6 @@
 from cuda.core.experimental._memory import DLDeviceType, IPCBufferDescriptor
 from cuda.core.experimental._utils.cuda_utils import handle_return
 from cuda.core.experimental.utils import StridedMemoryView
-from helpers.buffers import DummyUnifiedMemoryResource
 
 from cuda_python_test_helpers import IS_WSL, supports_ipc_mempool
 
@@ -96,6 +95,30 @@ def device_id(self) -> int:
         raise RuntimeError("the pinned memory resource is not bound to any GPU")
 
 
+class DummyUnifiedMemoryResource(MemoryResource):
+    def __init__(self, device):
+        self.device = device
+
+    def allocate(self, size, stream=None) -> Buffer:
+        ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value))
+        return Buffer.from_handle(ptr=ptr, size=size, mr=self)
+
+    def deallocate(self, ptr, size, stream=None):
+        handle_return(driver.cuMemFree(ptr))
+
+    @property
+    def is_device_accessible(self) -> bool:
+        return True
+
+    @property
+    def is_host_accessible(self) -> bool:
+        return True
+
+    @property
+    def device_id(self) -> int:
+        return 0
+
+
 class DummyPinnedMemoryResource(MemoryResource):
     def __init__(self, device):
         self.device = device
diff --git a/cuda_core/tests/test_stream.py b/cuda_core/tests/test_stream.py
index da4c66d548..505cbbf22a 100644
--- a/cuda_core/tests/test_stream.py
+++ b/cuda_core/tests/test_stream.py
@@ -49,6 +49,12 @@ def test_stream_record(init_cuda):
     assert isinstance(event, Event)
 
 
+def test_stream_record_invalid_event(init_cuda):
+    stream = Device().create_stream(options=StreamOptions())
+    with pytest.raises(TypeError):
+        stream.record(event="invalid_event")
+
+
 def test_stream_wait_event(init_cuda):
     s1 = Device().create_stream()
     s2 = Device().create_stream()
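As a closing reference, the timing workflow that `test_error_timing_incomplete` guards: subtracting two timing-enabled events yields the elapsed time once both have completed. A sketch (illustrative only; assumes work is enqueued between the two records):

    from cuda.core.experimental import Device, EventOptions

    device = Device()
    device.set_current()
    stream = device.create_stream()

    enabled = EventOptions(enable_timing=True)
    start = stream.record(options=enabled)
    # ... enqueue kernels or copies on `stream` here ...
    stop = stream.record(options=enabled)
    stop.sync()
    print(stop - start)  # elapsed time between the two events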