Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cuda_core/cuda/core/experimental/_memory/_buffer.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cdef class Buffer:
uintptr_t _ptr
size_t _size
MemoryResource _memory_resource
object _ipc_data
object _ptr_obj
Stream _alloc_stream

Expand Down
20 changes: 15 additions & 5 deletions cuda_core/cuda/core/experimental/_memory/_buffer.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ from __future__ import annotations
from libc.stdint cimport uintptr_t

from cuda.core.experimental._memory._device_memory_resource cimport DeviceMemoryResource
from cuda.core.experimental._memory._ipc cimport IPCBufferDescriptor
from cuda.core.experimental._memory._ipc cimport IPCBufferDescriptor, IPCDataForBuffer
from cuda.core.experimental._memory cimport _ipc
from cuda.core.experimental._stream cimport Stream_accept, Stream
from cuda.core.experimental._utils.cuda_utils cimport (
Expand Down Expand Up @@ -45,6 +45,7 @@ cdef class Buffer:
self._ptr = 0
self._size = 0
self._memory_resource = None
self._ipc_data = None
self._ptr_obj = None
self._alloc_stream = None

Expand All @@ -55,13 +56,14 @@ cdef class Buffer:
@classmethod
def _init(
cls, ptr: DevicePointerT, size_t size, mr: MemoryResource | None = None,
stream: Stream | None = None
stream: Stream | None = None, ipc_descriptor: IPCBufferDescriptor | None = None
):
cdef Buffer self = Buffer.__new__(cls)
self._ptr = <uintptr_t>(int(ptr))
self._ptr_obj = ptr
self._size = size
self._memory_resource = mr
self._ipc_data = IPCDataForBuffer(ipc_descriptor, True) if ipc_descriptor is not None else None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: This call would be more readable with the keyword form, is_mapped=True.

I'd do that systematically for all IPCDataForBuffer and IPCDataForMR calls (I think there are only four of them).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I actually had exactly that initially, but noticed that Cython then passes the arguments as true keywords (in a dict). I had hoped that it would perform some magic, since the call target is a cdef function. Within this project, it's hard to know how to trade readability off against performance.

self._alloc_stream = <Stream>(stream) if stream is not None else None
return self

Expand Down Expand Up @@ -92,15 +94,17 @@ cdef class Buffer:

@classmethod
def from_ipc_descriptor(
cls, mr: DeviceMemoryResource, ipc_buffer: IPCBufferDescriptor,
cls, mr: DeviceMemoryResource, ipc_descriptor: IPCBufferDescriptor,
stream: Stream = None
) -> Buffer:
"""Import a buffer that was exported from another process."""
return _ipc.Buffer_from_ipc_descriptor(cls, mr, ipc_buffer, stream)
return _ipc.Buffer_from_ipc_descriptor(cls, mr, ipc_descriptor, stream)

def get_ipc_descriptor(self) -> IPCBufferDescriptor:
"""Export a buffer allocated for sharing between processes."""
return _ipc.Buffer_get_ipc_descriptor(self)
if self._ipc_data is None:
self._ipc_data = IPCDataForBuffer(_ipc.Buffer_get_ipc_descriptor(self), False)
return self._ipc_data.ipc_descriptor

def close(self, stream: Stream | GraphBuilder | None = None):
"""Deallocate this buffer asynchronously on the given stream.
Expand Down Expand Up @@ -257,6 +261,12 @@ cdef class Buffer:
return self._memory_resource.is_host_accessible
raise NotImplementedError("WIP: Currently this property only supports buffers with associated MemoryResource")

@property
def is_mapped(self) -> bool:
"""Return True if this buffer is mapped into the process via IPC."""
# _ipc_data stays None until the buffer is created from an IPC descriptor
# or has its descriptor exported; getattr's default of False covers that case.
return getattr(self._ipc_data, "is_mapped", False)


@property
def memory_resource(self) -> MemoryResource:
"""Return the memory resource associated with this buffer."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@

from cuda.bindings cimport cydriver
from cuda.core.experimental._memory._buffer cimport MemoryResource
from cuda.core.experimental._memory._ipc cimport IPCData
from cuda.core.experimental._memory._ipc cimport IPCDataForMR


cdef class DeviceMemoryResource(MemoryResource):
cdef:
int _dev_id
cydriver.CUmemoryPool _handle
bint _mempool_owned
IPCData _ipc_data
IPCDataForMR _ipc_data
object _attributes
object _peer_accessible_by
object __weakref__
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ from libc.string cimport memset
from cuda.bindings cimport cydriver
from cuda.core.experimental._memory._buffer cimport Buffer, MemoryResource
from cuda.core.experimental._memory cimport _ipc
from cuda.core.experimental._memory._ipc cimport IPCAllocationHandle, IPCData
from cuda.core.experimental._memory._ipc cimport IPCAllocationHandle, IPCDataForMR
from cuda.core.experimental._stream cimport default_stream, Stream_accept, Stream
from cuda.core.experimental._utils.cuda_utils cimport (
check_or_create_options,
Expand Down Expand Up @@ -313,8 +313,6 @@ cdef class DeviceMemoryResource(MemoryResource):
"""
if not self.is_ipc_enabled:
raise RuntimeError("Memory resource is not IPC-enabled")
if self.is_mapped:
raise RuntimeError("Imported memory resource cannot be exported")
return self._ipc_data._alloc_handle

def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer:
Expand Down Expand Up @@ -540,7 +538,7 @@ cdef void DMR_init_create(

if opts.ipc_enabled:
alloc_handle = _ipc.DMR_export_mempool(self)
self._ipc_data = IPCData(alloc_handle, mapped=False)
self._ipc_data = IPCDataForMR(alloc_handle, False)


# Raise an exception if the given stream is capturing.
Expand Down
8 changes: 7 additions & 1 deletion cuda_core/cuda/core/experimental/_memory/_ipc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE
cdef is_supported()


cdef class IPCData:
cdef class IPCDataForBuffer:
cdef:
IPCBufferDescriptor _ipc_descriptor
bint _is_mapped


cdef class IPCDataForMR:
cdef:
IPCAllocationHandle _alloc_handle
bint _is_mapped
Expand Down
42 changes: 25 additions & 17 deletions cuda_core/cuda/core/experimental/_memory/_ipc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ from libc.stdint cimport uintptr_t
from libc.string cimport memcpy

from cuda.bindings cimport cydriver
from cuda.core.experimental._memory._buffer cimport Buffer
from cuda.core.experimental._stream cimport default_stream
from cuda.core.experimental._utils.cuda_utils cimport (
HANDLE_RETURN,
)
from cuda.core.experimental._utils.cuda_utils cimport HANDLE_RETURN

import multiprocessing
import os
Expand All @@ -32,15 +31,27 @@ cdef cydriver.CUmemAllocationHandleType IPC_HANDLE_TYPE =
cdef is_supported():
return IPC_HANDLE_TYPE != cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE

cdef class IPCData:
"""Data members related to sharing memory pools via IPC."""
def __cinit__(self):
self._alloc_handle = None
self._is_mapped = False

def __init__(self, IPCAllocationHandle alloc_handle, bint mapped):
cdef class IPCDataForBuffer:
"""Data members related to sharing memory buffers via IPC.

Pairs a buffer's IPC descriptor with a flag recording whether the buffer
was created from a descriptor received via IPC (``is_mapped`` is True) or
is a local buffer whose descriptor was generated for export (False).
"""
# Both fields are assigned in __cinit__, so constructing an instance always
# requires the descriptor and the flag, passed positionally by the callers here.
def __cinit__(self, IPCBufferDescriptor ipc_descriptor, bint is_mapped):
self._ipc_descriptor = ipc_descriptor
self._is_mapped = is_mapped

# Python-visible accessor for the stored IPCBufferDescriptor.
@property
def ipc_descriptor(self):
return self._ipc_descriptor

# Python-visible accessor for the mapped flag (a C bint, surfaced as a bool).
@property
def is_mapped(self):
return self._is_mapped


cdef class IPCDataForMR:
"""Data members related to sharing memory resources via IPC."""
def __cinit__(self, IPCAllocationHandle alloc_handle, bint is_mapped):
self._alloc_handle = alloc_handle
self._is_mapped = mapped
self._is_mapped = is_mapped

@property
def alloc_handle(self):
Expand Down Expand Up @@ -155,7 +166,7 @@ cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self):
return IPCBufferDescriptor._init(data_b, self.size)

cdef Buffer Buffer_from_ipc_descriptor(
cls, DeviceMemoryResource mr, IPCBufferDescriptor ipc_buffer, stream
cls, DeviceMemoryResource mr, IPCBufferDescriptor ipc_descriptor, stream
):
"""Import a buffer that was exported from another process."""
if not mr.is_ipc_enabled:
Expand All @@ -166,13 +177,13 @@ cdef Buffer Buffer_from_ipc_descriptor(
cdef cydriver.CUmemPoolPtrExportData data
memcpy(
data.reserved,
<const void*><const char*>(ipc_buffer._payload),
<const void*><const char*>(ipc_descriptor._payload),
sizeof(data.reserved)
)
cdef cydriver.CUdeviceptr ptr
with nogil:
HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._handle, &data))
return Buffer._init(<uintptr_t>ptr, ipc_buffer.size, mr, stream)
return Buffer._init(<uintptr_t>ptr, ipc_descriptor.size, mr, stream, ipc_descriptor)


# DeviceMemoryResource IPC Implementation
Expand Down Expand Up @@ -200,7 +211,7 @@ cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handl
from .._device import Device
self._dev_id = Device(device_id).device_id
self._mempool_owned = True
self._ipc_data = IPCData(alloc_handle, mapped=True)
self._ipc_data = IPCDataForMR(alloc_handle, True)

# Map the mempool into this process.
cdef int handle = int(alloc_handle)
Expand All @@ -214,9 +225,6 @@ cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handl
registered = self.register(uuid)
assert registered is self

# Always close the file handle.
alloc_handle.close()

return self


Expand Down
17 changes: 1 addition & 16 deletions cuda_core/tests/memory_ipc/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,6 @@ def ASSERT(self, exc_type, exc_msg):
assert "CUDA_ERROR_INVALID_VALUE" in exc_msg


class TestExportImportedMR(ChildErrorHarness):
"""Error when exporting a memory resource that was imported."""

def PARENT_ACTION(self, queue):
queue.put(self.mr)

def CHILD_ACTION(self, queue):
mr = queue.get(timeout=CHILD_TIMEOUT_SEC)
mr.get_allocation_handle()

def ASSERT(self, exc_type, exc_msg):
assert exc_type is RuntimeError
assert exc_msg == "Imported memory resource cannot be exported"


class TestImportBuffer(ChildErrorHarness):
"""Error when using a buffer as a buffer descriptor."""

Expand All @@ -117,7 +102,7 @@ def CHILD_ACTION(self, queue):

def ASSERT(self, exc_type, exc_msg):
assert exc_type is TypeError
assert exc_msg.startswith("Argument 'ipc_buffer' has incorrect type")
assert exc_msg.startswith("Argument 'ipc_descriptor' has incorrect type")


class TestDanglingBuffer(ChildErrorHarness):
Expand Down
4 changes: 4 additions & 0 deletions cuda_core/tests/memory_ipc/test_memory_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def test_main(self, ipc_device, ipc_memory_resource):
# Set up the IPC-enabled memory pool and share it.
device = ipc_device
mr = ipc_memory_resource
assert not mr.is_mapped
pgen = PatternGen(device, NBYTES)

# Start the child process.
Expand All @@ -27,6 +28,7 @@ def test_main(self, ipc_device, ipc_memory_resource):

# Allocate and fill memory.
buffer = mr.allocate(NBYTES)
assert not buffer.is_mapped
pgen.fill_buffer(buffer, seed=False)

# Export the buffer via IPC.
Expand All @@ -42,7 +44,9 @@ def test_main(self, ipc_device, ipc_memory_resource):

def child_main(self, device, mr, queue):
device.set_current()
assert mr.is_mapped
buffer = queue.get(timeout=CHILD_TIMEOUT_SEC)
assert buffer.is_mapped
pgen = PatternGen(device, NBYTES)
pgen.verify_buffer(buffer, seed=False)
pgen.fill_buffer(buffer, seed=True)
Expand Down
Loading
Loading