diff --git a/cuda_core/cuda/core/experimental/_memory/_buffer.pxd b/cuda_core/cuda/core/experimental/_memory/_buffer.pxd index 12da84b2bd..00f8274be5 100644 --- a/cuda_core/cuda/core/experimental/_memory/_buffer.pxd +++ b/cuda_core/cuda/core/experimental/_memory/_buffer.pxd @@ -12,6 +12,7 @@ cdef class Buffer: uintptr_t _ptr size_t _size MemoryResource _memory_resource + object _ipc_data object _ptr_obj Stream _alloc_stream diff --git a/cuda_core/cuda/core/experimental/_memory/_buffer.pyx b/cuda_core/cuda/core/experimental/_memory/_buffer.pyx index 1ad79538ac..eefad08e5d 100644 --- a/cuda_core/cuda/core/experimental/_memory/_buffer.pyx +++ b/cuda_core/cuda/core/experimental/_memory/_buffer.pyx @@ -7,7 +7,7 @@ from __future__ import annotations from libc.stdint cimport uintptr_t from cuda.core.experimental._memory._device_memory_resource cimport DeviceMemoryResource -from cuda.core.experimental._memory._ipc cimport IPCBufferDescriptor +from cuda.core.experimental._memory._ipc cimport IPCBufferDescriptor, IPCDataForBuffer from cuda.core.experimental._memory cimport _ipc from cuda.core.experimental._stream cimport Stream_accept, Stream from cuda.core.experimental._utils.cuda_utils cimport ( @@ -45,6 +45,7 @@ cdef class Buffer: self._ptr = 0 self._size = 0 self._memory_resource = None + self._ipc_data = None self._ptr_obj = None self._alloc_stream = None @@ -55,13 +56,14 @@ cdef class Buffer: @classmethod def _init( cls, ptr: DevicePointerT, size_t size, mr: MemoryResource | None = None, - stream: Stream | None = None + stream: Stream | None = None, ipc_descriptor: IPCBufferDescriptor | None = None ): cdef Buffer self = Buffer.__new__(cls) self._ptr = (int(ptr)) self._ptr_obj = ptr self._size = size self._memory_resource = mr + self._ipc_data = IPCDataForBuffer(ipc_descriptor, True) if ipc_descriptor is not None else None self._alloc_stream = (stream) if stream is not None else None return self @@ -92,15 +94,17 @@ cdef class Buffer: @classmethod def from_ipc_descriptor( - cls, mr: DeviceMemoryResource, ipc_buffer: IPCBufferDescriptor, + cls, mr: DeviceMemoryResource, ipc_descriptor: IPCBufferDescriptor, stream: Stream = None ) -> Buffer: """Import a buffer that was exported from another process.""" - return _ipc.Buffer_from_ipc_descriptor(cls, mr, ipc_buffer, stream) + return _ipc.Buffer_from_ipc_descriptor(cls, mr, ipc_descriptor, stream) def get_ipc_descriptor(self) -> IPCBufferDescriptor: """Export a buffer allocated for sharing between processes.""" - return _ipc.Buffer_get_ipc_descriptor(self) + if self._ipc_data is None: + self._ipc_data = IPCDataForBuffer(_ipc.Buffer_get_ipc_descriptor(self), False) + return self._ipc_data.ipc_descriptor def close(self, stream: Stream | GraphBuilder | None = None): """Deallocate this buffer asynchronously on the given stream. @@ -257,6 +261,12 @@ cdef class Buffer: return self._memory_resource.is_host_accessible raise NotImplementedError("WIP: Currently this property only supports buffers with associated MemoryResource") + @property + def is_mapped(self) -> bool: + """Return True if this buffer is mapped into the process via IPC.""" + return getattr(self._ipc_data, "is_mapped", False) + + @property def memory_resource(self) -> MemoryResource: """Return the memory resource associated with this buffer.""" diff --git a/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pxd b/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pxd index 49867ce35f..823a270b27 100644 --- a/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pxd +++ b/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pxd @@ -4,7 +4,7 @@ from cuda.bindings cimport cydriver from cuda.core.experimental._memory._buffer cimport MemoryResource -from cuda.core.experimental._memory._ipc cimport IPCData +from cuda.core.experimental._memory._ipc cimport IPCDataForMR cdef class DeviceMemoryResource(MemoryResource): @@ -12,7 +12,7 @@ cdef class DeviceMemoryResource(MemoryResource): int _dev_id cydriver.CUmemoryPool _handle bint _mempool_owned - IPCData _ipc_data + IPCDataForMR _ipc_data object _attributes object _peer_accessible_by object __weakref__ diff --git a/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx b/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx index b07111682c..ac18079a62 100644 --- a/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx +++ b/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx @@ -12,7 +12,7 @@ from libc.string cimport memset from cuda.bindings cimport cydriver from cuda.core.experimental._memory._buffer cimport Buffer, MemoryResource from cuda.core.experimental._memory cimport _ipc -from cuda.core.experimental._memory._ipc cimport IPCAllocationHandle, IPCData +from cuda.core.experimental._memory._ipc cimport IPCAllocationHandle, IPCDataForMR from cuda.core.experimental._stream cimport default_stream, Stream_accept, Stream from cuda.core.experimental._utils.cuda_utils cimport ( check_or_create_options, @@ -313,8 +313,6 @@ cdef class DeviceMemoryResource(MemoryResource): """ if not self.is_ipc_enabled: raise RuntimeError("Memory resource is not IPC-enabled") - if self.is_mapped: - raise RuntimeError("Imported memory resource cannot be exported") return self._ipc_data._alloc_handle def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer: @@ -540,7 +538,7 @@ cdef void DMR_init_create( if opts.ipc_enabled: alloc_handle = _ipc.DMR_export_mempool(self) - self._ipc_data = IPCData(alloc_handle, mapped=False) + self._ipc_data = IPCDataForMR(alloc_handle, False) # Raise an exception if the given stream is capturing. diff --git a/cuda_core/cuda/core/experimental/_memory/_ipc.pxd b/cuda_core/cuda/core/experimental/_memory/_ipc.pxd index 2b9c80290d..60d96a3b33 100644 --- a/cuda_core/cuda/core/experimental/_memory/_ipc.pxd +++ b/cuda_core/cuda/core/experimental/_memory/_ipc.pxd @@ -24,7 +24,13 @@ cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE cdef is_supported() -cdef class IPCData: +cdef class IPCDataForBuffer: + cdef: + IPCBufferDescriptor _ipc_descriptor + bint _is_mapped + + +cdef class IPCDataForMR: cdef: IPCAllocationHandle _alloc_handle bint _is_mapped diff --git a/cuda_core/cuda/core/experimental/_memory/_ipc.pyx b/cuda_core/cuda/core/experimental/_memory/_ipc.pyx index d9384bf2b8..22be23d9ea 100644 --- a/cuda_core/cuda/core/experimental/_memory/_ipc.pyx +++ b/cuda_core/cuda/core/experimental/_memory/_ipc.pyx @@ -7,10 +7,9 @@ from libc.stdint cimport uintptr_t from libc.string cimport memcpy from cuda.bindings cimport cydriver +from cuda.core.experimental._memory._buffer cimport Buffer from cuda.core.experimental._stream cimport default_stream -from cuda.core.experimental._utils.cuda_utils cimport ( - HANDLE_RETURN, -) +from cuda.core.experimental._utils.cuda_utils cimport HANDLE_RETURN import multiprocessing import os @@ -32,15 +31,27 @@ cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE = cdef is_supported(): return IPC_HANDLE_TYPE != cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE -cdef class IPCData: - """Data members related to sharing memory pools via IPC.""" - def __cinit__(self): - self._alloc_handle = None - self._is_mapped = False - def __init__(self, IPCAllocationHandle alloc_handle, bint mapped): +cdef class IPCDataForBuffer: + """Data members related to sharing memory buffers via IPC.""" + def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped): + self._ipc_descriptor = ipc_descriptor + self._is_mapped = is_mapped + + @property + def ipc_descriptor(self): + return self._ipc_descriptor + + @property + def is_mapped(self): + return self._is_mapped + + +cdef class IPCDataForMR: + """Data members related to sharing memory resources via IPC.""" + def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped): self._alloc_handle = alloc_handle - self._is_mapped = mapped + self._is_mapped = is_mapped @property def alloc_handle(self): @@ -155,7 +166,7 @@ cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self): return IPCBufferDescriptor._init(data_b, self.size) cdef Buffer Buffer_from_ipc_descriptor( - cls, DeviceMemoryResource mr, IPCBufferDescriptor ipc_buffer, stream + cls, DeviceMemoryResource mr, IPCBufferDescriptor ipc_descriptor, stream ): """Import a buffer that was exported from another process.""" if not mr.is_ipc_enabled: @@ -166,13 +177,13 @@ cdef Buffer Buffer_from_ipc_descriptor( cdef cydriver.CUmemPoolPtrExportData data memcpy( data.reserved, - (ipc_buffer._payload), + (ipc_descriptor._payload), sizeof(data.reserved) ) cdef cydriver.CUdeviceptr ptr with nogil: HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._handle, &data)) - return Buffer._init(ptr, ipc_buffer.size, mr, stream) + return Buffer._init(ptr, ipc_descriptor.size, mr, stream, ipc_descriptor) # DeviceMemoryResource IPC Implementation @@ -200,7 +211,7 @@ cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handl from .._device import Device self._dev_id = Device(device_id).device_id self._mempool_owned = True - self._ipc_data = IPCData(alloc_handle, mapped=True) + self._ipc_data = IPCDataForMR(alloc_handle, True) # Map the mempool into this process. cdef int handle = int(alloc_handle) @@ -214,9 +225,6 @@ cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handl registered = self.register(uuid) assert registered is self - # Always close the file handle. - alloc_handle.close() - return self diff --git a/cuda_core/tests/memory_ipc/test_errors.py b/cuda_core/tests/memory_ipc/test_errors.py index 3e8265b39c..d6280ae0ec 100644 --- a/cuda_core/tests/memory_ipc/test_errors.py +++ b/cuda_core/tests/memory_ipc/test_errors.py @@ -87,21 +87,6 @@ def ASSERT(self, exc_type, exc_msg): assert "CUDA_ERROR_INVALID_VALUE" in exc_msg -class TestExportImportedMR(ChildErrorHarness): - """Error when exporting a memory resource that was imported.""" - - def PARENT_ACTION(self, queue): - queue.put(self.mr) - - def CHILD_ACTION(self, queue): - mr = queue.get(timeout=CHILD_TIMEOUT_SEC) - mr.get_allocation_handle() - - def ASSERT(self, exc_type, exc_msg): - assert exc_type is RuntimeError - assert exc_msg == "Imported memory resource cannot be exported" - - class TestImportBuffer(ChildErrorHarness): """Error when using a buffer as a buffer descriptor.""" @@ -117,7 +102,7 @@ def CHILD_ACTION(self, queue): def ASSERT(self, exc_type, exc_msg): assert exc_type is TypeError - assert exc_msg.startswith("Argument 'ipc_buffer' has incorrect type") + assert exc_msg.startswith("Argument 'ipc_descriptor' has incorrect type") class TestDanglingBuffer(ChildErrorHarness): diff --git a/cuda_core/tests/memory_ipc/test_memory_ipc.py b/cuda_core/tests/memory_ipc/test_memory_ipc.py index 23a3e91b7f..54d8056865 100644 --- a/cuda_core/tests/memory_ipc/test_memory_ipc.py +++ b/cuda_core/tests/memory_ipc/test_memory_ipc.py @@ -18,6 +18,7 @@ def test_main(self, ipc_device, ipc_memory_resource): # Set up the IPC-enabled memory pool and share it. device = ipc_device mr = ipc_memory_resource + assert not mr.is_mapped pgen = PatternGen(device, NBYTES) # Start the child process. @@ -27,6 +28,7 @@ def test_main(self, ipc_device, ipc_memory_resource): # Allocate and fill memory. buffer = mr.allocate(NBYTES) + assert not buffer.is_mapped pgen.fill_buffer(buffer, seed=False) # Export the buffer via IPC. @@ -42,7 +44,9 @@ def test_main(self, ipc_device, ipc_memory_resource): def child_main(self, device, mr, queue): device.set_current() + assert mr.is_mapped buffer = queue.get(timeout=CHILD_TIMEOUT_SEC) + assert buffer.is_mapped pgen = PatternGen(device, NBYTES) pgen.verify_buffer(buffer, seed=False) pgen.fill_buffer(buffer, seed=True) diff --git a/cuda_core/tests/memory_ipc/test_send_buffers.py b/cuda_core/tests/memory_ipc/test_send_buffers.py index a26b4422fe..3493828c7e 100644 --- a/cuda_core/tests/memory_ipc/test_send_buffers.py +++ b/cuda_core/tests/memory_ipc/test_send_buffers.py @@ -5,7 +5,7 @@ from itertools import cycle import pytest -from cuda.core.experimental import DeviceMemoryResource, DeviceMemoryResourceOptions +from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions from helpers.buffers import PatternGen CHILD_TIMEOUT_SEC = 20 @@ -15,39 +15,113 @@ POOL_SIZE = 2097152 -@pytest.mark.parametrize("nmrs", (1, NMRS)) -def test_ipc_send_buffers(ipc_device, nmrs): - """Test passing buffers sourced from multiple memory resources.""" - # Set up several IPC-enabled memory pools. - device = ipc_device - options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) - mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] - - # Allocate and fill memory. - buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))] - pgen = PatternGen(device, NBYTES) - for buffer in buffers: - pgen.fill_buffer(buffer, seed=False) - - # Start the child process. - process = mp.Process(target=child_main, args=(device, buffers)) - process.start() - - # Wait for the child process. - process.join(timeout=CHILD_TIMEOUT_SEC) - assert process.exitcode == 0 - - # Verify that the buffers were modified. - pgen = PatternGen(device, NBYTES) - for buffer in buffers: - pgen.verify_buffer(buffer, seed=True) +class TestIpcSendBuffers: + @pytest.mark.parametrize("nmrs", (1, NMRS)) + def test_main(self, ipc_device, nmrs): + """Test passing buffers sourced from multiple memory resources.""" + # Set up several IPC-enabled memory pools. + device = ipc_device + options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) + mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] + + # Allocate and fill memory. + buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))] + pgen = PatternGen(device, NBYTES) + for buffer in buffers: + pgen.fill_buffer(buffer, seed=False) + + # Start the child process. + process = mp.Process(target=self.child_main, args=(device, buffers)) + process.start() + + # Wait for the child process. + process.join(timeout=CHILD_TIMEOUT_SEC) + assert process.exitcode == 0 + + # Verify that the buffers were modified. + pgen = PatternGen(device, NBYTES) + for buffer in buffers: + pgen.verify_buffer(buffer, seed=True) + buffer.close() + + def child_main(self, device, buffers): + device.set_current() + pgen = PatternGen(device, NBYTES) + for buffer in buffers: + pgen.verify_buffer(buffer, seed=False) + pgen.fill_buffer(buffer, seed=True) + buffer.close() + + +class TestIpcReexport: + """ + Test re-export of an IPC-enabled memory allocation. + + Work is done by three processes as follows: + + - Process A allocates a buffer and shares it with process B. + - Process B shares it with process C. + - Process C receives the buffer, fills it, and signals completion. + + This test checks that a buffer allocated in A can be exported to B and then + re-exported from B to C. + """ + + def test_main(self, ipc_device, ipc_memory_resource): + # Set up the device. + device = ipc_device + device.set_current() + + # Allocate, fill a buffer. + mr = ipc_memory_resource + pgen = PatternGen(device, NBYTES) + buffer = mr.allocate(NBYTES) + pgen.fill_buffer(buffer, seed=0) + + # Set up communication. + q_bc = mp.Queue() + event_b, event_c = [mp.Event() for _ in range(2)] + + # Spawn B and C. + proc_b = mp.Process(target=self.process_b_main, args=(buffer, q_bc, event_b)) + proc_c = mp.Process(target=self.process_c_main, args=(q_bc, event_c)) + proc_b.start() + proc_c.start() + + # Wait for C to signal completion then clean up. + event_c.wait(timeout=CHILD_TIMEOUT_SEC) + event_b.set() # b can finish now + proc_b.join(timeout=CHILD_TIMEOUT_SEC) + proc_c.join(timeout=CHILD_TIMEOUT_SEC) + assert proc_b.exitcode == 0 + assert proc_c.exitcode == 0 + + # Verify that C’s operations are visible. + pgen.verify_buffer(buffer, seed=1) buffer.close() + def process_b_main(self, buffer, q_bc, event_b): + # Process B: receive buffer from A then forward it to C. + device = Device() + device.set_current() -def child_main(device, buffers): - device.set_current() - pgen = PatternGen(device, NBYTES) - for buffer in buffers: - pgen.verify_buffer(buffer, seed=False) - pgen.fill_buffer(buffer, seed=True) + # Forward the buffer to C. + q_bc.put(buffer) buffer.close() + + # Wait for C to receive before exiting. + event_b.wait(timeout=CHILD_TIMEOUT_SEC) + + def process_c_main(self, q_bc, event_c): + # Process C: receive buffer from B then fill it. + device = Device() + device.set_current() + + # Get the buffer and fill it. + buffer = q_bc.get(timeout=CHILD_TIMEOUT_SEC) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=1) + buffer.close() + + # Signal A that the work is complete. + event_c.set() diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index 796c12ea7d..1bbc06bd93 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -145,6 +145,7 @@ def buffer_initialization(dummy_mr: MemoryResource): assert buffer.memory_resource == dummy_mr assert buffer.is_device_accessible == dummy_mr.is_device_accessible assert buffer.is_host_accessible == dummy_mr.is_host_accessible + assert not buffer.is_mapped buffer.close()