283 changes: 130 additions & 153 deletions cuda_core/cuda/core/experimental/_memoryview.pyx
@@ -88,155 +88,20 @@ cdef class StridedMemoryView:
cdef DLTensor *dl_tensor

# Memoized properties
cdef:
tuple _shape
tuple _strides
# a `None` value for _strides has a defined meaning in DLPack and
# the CUDA Array Interface: C-order, contiguous
#
# this flag helps prevent unnecessary recomputation of _strides
bint _strides_init
object _dtype

def __init__(
self,
*,
ptr: intptr_t,
device_id: int,
is_device_accessible: bint,
readonly: bint,
metadata: object,
exporting_obj: object,
dl_tensor: intptr_t = 0,
) -> None:
self.ptr = ptr
self.device_id = device_id
self.is_device_accessible = is_device_accessible
self.readonly = readonly
self.metadata = metadata
self.exporting_obj = exporting_obj
self.dl_tensor = <DLTensor*>dl_tensor
self._shape = None
self._strides = None
self._strides_init = False
self._dtype = None

@classmethod
def from_dlpack(cls, obj: object, stream_ptr: int | None=None) -> StridedMemoryView:
cdef int dldevice, device_id
cdef bint is_device_accessible, is_readonly
is_device_accessible = False
dldevice, device_id = obj.__dlpack_device__()
if dldevice == _kDLCPU:
assert device_id == 0
device_id = -1
if stream_ptr is None:
raise BufferError("stream=None is ambiguous with view()")
elif stream_ptr == -1:
stream_ptr = None
elif dldevice == _kDLCUDA:
assert device_id >= 0
is_device_accessible = True
# no need to check other stream values, it's a pass-through
if stream_ptr is None:
raise BufferError("stream=None is ambiguous with view()")
elif dldevice in (_kDLCUDAHost, _kDLCUDAManaged):
is_device_accessible = True
# just do a pass-through without any checks, as pinned/managed memory can be
# accessed on both host and device
else:
raise BufferError("device not supported")

cdef object capsule
try:
capsule = obj.__dlpack__(
stream=int(stream_ptr) if stream_ptr else None,
max_version=(DLPACK_MAJOR_VERSION, DLPACK_MINOR_VERSION))
except TypeError:
capsule = obj.__dlpack__(
stream=int(stream_ptr) if stream_ptr else None)

cdef void* data = NULL
cdef DLTensor* dl_tensor
cdef DLManagedTensorVersioned* dlm_tensor_ver
cdef DLManagedTensor* dlm_tensor
cdef const char *used_name
if cpython.PyCapsule_IsValid(
capsule, DLPACK_VERSIONED_TENSOR_UNUSED_NAME):
data = cpython.PyCapsule_GetPointer(
capsule, DLPACK_VERSIONED_TENSOR_UNUSED_NAME)
dlm_tensor_ver = <DLManagedTensorVersioned*>data
dl_tensor = &dlm_tensor_ver.dl_tensor
is_readonly = (dlm_tensor_ver.flags & DLPACK_FLAG_BITMASK_READ_ONLY) != 0
used_name = DLPACK_VERSIONED_TENSOR_USED_NAME
cdef tuple _shape
cdef tuple _strides
cdef bint _strides_init # Has the strides tuple been init'ed?
cdef object _dtype

def __init__(self, obj=None, stream_ptr=None):
if obj is not None:
# populate self's attributes
if check_has_dlpack(obj):
view_as_dlpack(obj, stream_ptr, self)
else:
view_as_cai(obj, stream_ptr, self)
else:
assert cpython.PyCapsule_IsValid(
capsule, DLPACK_TENSOR_UNUSED_NAME)
data = cpython.PyCapsule_GetPointer(
capsule, DLPACK_TENSOR_UNUSED_NAME)
dlm_tensor = <DLManagedTensor*>data
dl_tensor = &dlm_tensor.dl_tensor
is_readonly = False
used_name = DLPACK_TENSOR_USED_NAME

cpython.PyCapsule_SetName(capsule, used_name)

return cls(
ptr=<intptr_t>dl_tensor.data,
device_id=int(device_id),
is_device_accessible=is_device_accessible,
readonly=is_readonly,
metadata=capsule,
exporting_obj=obj,
dl_tensor=<intptr_t>dl_tensor,
)

@classmethod
def from_cuda_array_interface(cls, obj: object, stream_ptr: int | None=None) -> StridedMemoryView:
cdef dict cai_data = obj.__cuda_array_interface__
if cai_data["version"] < 3:
raise BufferError("only CUDA Array Interface v3 or above is supported")
if cai_data.get("mask") is not None:
raise BufferError("mask is not supported")
if stream_ptr is None:
raise BufferError("stream=None is ambiguous with view()")

cdef intptr_t producer_s, consumer_s
stream_ptr = int(stream_ptr)
if stream_ptr != -1:
stream = cai_data.get("stream")
if stream is not None:
producer_s = <intptr_t>(stream)
consumer_s = <intptr_t>(stream_ptr)
assert producer_s > 0
# establish stream order
if producer_s != consumer_s:
e = handle_return(driver.cuEventCreate(
driver.CUevent_flags.CU_EVENT_DISABLE_TIMING))
handle_return(driver.cuEventRecord(e, producer_s))
handle_return(driver.cuStreamWaitEvent(consumer_s, e, 0))
handle_return(driver.cuEventDestroy(e))

cdef intptr_t ptr = int(cai_data["data"][0])
return cls(
ptr=ptr,
device_id=handle_return(
driver.cuPointerGetAttribute(
driver.CUpointer_attribute.CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL,
ptr
)
),
is_device_accessible=True,
readonly=cai_data["data"][1],
metadata=cai_data,
exporting_obj=obj,
)

@classmethod
def from_any_interface(cls, obj: object, stream_ptr: int | None = None) -> StridedMemoryView:
if check_has_dlpack(obj):
return cls.from_dlpack(obj, stream_ptr)
return cls.from_cuda_array_interface(obj, stream_ptr)
pass

def __dealloc__(self):
if self.dl_tensor == NULL:
@@ -256,7 +121,7 @@
dlm_tensor.deleter(dlm_tensor)

@property
def shape(self) -> tuple[int, ...]:
def shape(self) -> tuple[int]:
if self._shape is None:
if self.exporting_obj is not None:
if self.dl_tensor != NULL:
Expand All @@ -271,7 +136,7 @@ cdef class StridedMemoryView:
return self._shape

@property
def strides(self) -> Optional[tuple[int, ...]]:
def strides(self) -> Optional[tuple[int]]:
cdef int itemsize
if self._strides_init is False:
if self.exporting_obj is not None:
@@ -341,7 +206,8 @@ cdef bint check_has_dlpack(obj) except*:


cdef class _StridedMemoryViewProxy:
cdef readonly:

cdef:
object obj
bint has_dlpack

@@ -351,11 +217,82 @@ cdef class _StridedMemoryViewProxy:

cpdef StridedMemoryView view(self, stream_ptr=None):
if self.has_dlpack:
return StridedMemoryView.from_dlpack(self.obj, stream_ptr)
return view_as_dlpack(self.obj, stream_ptr)
else:
return StridedMemoryView.from_cuda_array_interface(self.obj, stream_ptr)
return view_as_cai(self.obj, stream_ptr)


cdef StridedMemoryView view_as_dlpack(obj, stream_ptr, view=None):
cdef int dldevice, device_id
cdef bint is_device_accessible, is_readonly
is_device_accessible = False
dldevice, device_id = obj.__dlpack_device__()
if dldevice == _kDLCPU:
assert device_id == 0
device_id = -1
if stream_ptr is None:
raise BufferError("stream=None is ambiguous with view()")
elif stream_ptr == -1:
stream_ptr = None
elif dldevice == _kDLCUDA:
assert device_id >= 0
is_device_accessible = True
# no need to check other stream values, it's a pass-through
if stream_ptr is None:
raise BufferError("stream=None is ambiguous with view()")
elif dldevice in (_kDLCUDAHost, _kDLCUDAManaged):
is_device_accessible = True
# just do a pass-through without any checks, as pinned/managed memory can be
# accessed on both host and device
else:
raise BufferError("device not supported")

cdef object capsule
try:
capsule = obj.__dlpack__(
stream=int(stream_ptr) if stream_ptr else None,
max_version=(DLPACK_MAJOR_VERSION, DLPACK_MINOR_VERSION))
except TypeError:
capsule = obj.__dlpack__(
stream=int(stream_ptr) if stream_ptr else None)

cdef void* data = NULL
cdef DLTensor* dl_tensor
cdef DLManagedTensorVersioned* dlm_tensor_ver
cdef DLManagedTensor* dlm_tensor
cdef const char *used_name
if cpython.PyCapsule_IsValid(
capsule, DLPACK_VERSIONED_TENSOR_UNUSED_NAME):
data = cpython.PyCapsule_GetPointer(
capsule, DLPACK_VERSIONED_TENSOR_UNUSED_NAME)
dlm_tensor_ver = <DLManagedTensorVersioned*>data
dl_tensor = &dlm_tensor_ver.dl_tensor
is_readonly = bool((dlm_tensor_ver.flags & DLPACK_FLAG_BITMASK_READ_ONLY) != 0)
used_name = DLPACK_VERSIONED_TENSOR_USED_NAME
elif cpython.PyCapsule_IsValid(
capsule, DLPACK_TENSOR_UNUSED_NAME):
data = cpython.PyCapsule_GetPointer(
capsule, DLPACK_TENSOR_UNUSED_NAME)
dlm_tensor = <DLManagedTensor*>data
dl_tensor = &dlm_tensor.dl_tensor
is_readonly = False
used_name = DLPACK_TENSOR_USED_NAME
else:
assert False

cpython.PyCapsule_SetName(capsule, used_name)

cdef StridedMemoryView buf = StridedMemoryView() if view is None else view
buf.dl_tensor = dl_tensor
buf.metadata = capsule
buf.ptr = <intptr_t>(dl_tensor.data)
buf.device_id = device_id
buf.is_device_accessible = is_device_accessible
buf.readonly = is_readonly
buf.exporting_obj = obj

return buf


cdef object dtype_dlpack_to_numpy(DLDataType* dtype):
cdef int bits = dtype.bits
@@ -417,6 +354,46 @@ cdef object dtype_dlpack_to_numpy(DLDataType* dtype):
return numpy.dtype(np_dtype)


# Also generate for Python so we can test this code path
cpdef StridedMemoryView view_as_cai(obj, stream_ptr, view=None):
cdef dict cai_data = obj.__cuda_array_interface__
if cai_data["version"] < 3:
raise BufferError("only CUDA Array Interface v3 or above is supported")
if cai_data.get("mask") is not None:
raise BufferError("mask is not supported")
if stream_ptr is None:
raise BufferError("stream=None is ambiguous with view()")

cdef StridedMemoryView buf = StridedMemoryView() if view is None else view
buf.exporting_obj = obj
buf.metadata = cai_data
buf.dl_tensor = NULL
buf.ptr, buf.readonly = cai_data["data"]
buf.is_device_accessible = True
buf.device_id = handle_return(
driver.cuPointerGetAttribute(
driver.CUpointer_attribute.CU_POINTER_ATTRIBUTE_DEVICE_ORDINAL,
buf.ptr))

cdef intptr_t producer_s, consumer_s
stream_ptr = int(stream_ptr)
if stream_ptr != -1:
stream = cai_data.get("stream")
if stream is not None:
producer_s = <intptr_t>(stream)
consumer_s = <intptr_t>(stream_ptr)
assert producer_s > 0
# establish stream order
if producer_s != consumer_s:
e = handle_return(driver.cuEventCreate(
driver.CUevent_flags.CU_EVENT_DISABLE_TIMING))
handle_return(driver.cuEventRecord(e, producer_s))
handle_return(driver.cuStreamWaitEvent(consumer_s, e, 0))
handle_return(driver.cuEventDestroy(e))

return buf


def args_viewable_as_strided_memory(tuple arg_indices):
"""
Decorator to create proxy objects to :obj:`StridedMemoryView` for the
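For orientation, here is a minimal usage sketch of the constructor-style API that this diff restores (replacing the `from_dlpack` / `from_cuda_array_interface` / `from_any_interface` classmethods removed above). It assumes only NumPy as a DLPack producer; `stream_ptr=-1` means the consumer does not care about stream ordering, matching its use in the tests below.

# Minimal sketch of the restored constructor-style API (assumes NumPy only).
import numpy as np

from cuda.core.experimental.utils import StridedMemoryView

# Fortran order gives non-trivial strides, as in test_strided_memory_view_refcnt.
arr = np.zeros((64, 4), dtype=np.uint8, order="F")
view = StridedMemoryView(arr, stream_ptr=-1)

assert view.shape == (64, 4)
assert view.device_id == -1              # -1 denotes CPU memory in this module
assert view.is_device_accessible is False
assert view.readonly is False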
4 changes: 2 additions & 2 deletions cuda_core/tests/test_memory.py
@@ -609,15 +609,15 @@ def test_strided_memory_view_leak():
arr = np.zeros(1048576, dtype=np.uint8)
before = sys.getrefcount(arr)
for idx in range(10):
StridedMemoryView.from_any_interface(arr, stream_ptr=-1)
StridedMemoryView(arr, stream_ptr=-1)
after = sys.getrefcount(arr)
assert before == after


def test_strided_memory_view_refcnt():
# Use Fortran ordering so strides is used
a = np.zeros((64, 4), dtype=np.uint8, order="F")
av = StridedMemoryView.from_any_interface(a, stream_ptr=-1)
av = StridedMemoryView(a, stream_ptr=-1)
# segfaults if refcnt is wrong
assert av.shape[0] == 64
assert sys.getrefcount(av.shape) >= 2
7 changes: 4 additions & 3 deletions cuda_core/tests/test_utils.py
Expand Up @@ -14,6 +14,7 @@
import numpy as np
import pytest
from cuda.core.experimental import Device
from cuda.core.experimental._memoryview import view_as_cai
from cuda.core.experimental.utils import StridedMemoryView, args_viewable_as_strided_memory


@@ -77,7 +78,7 @@ def my_func(arr):

def test_strided_memory_view_cpu(self, in_arr):
# stream_ptr=-1 means "the consumer does not care"
view = StridedMemoryView.from_any_interface(in_arr, stream_ptr=-1)
view = StridedMemoryView(in_arr, stream_ptr=-1)
self._check_view(view, in_arr)

def _check_view(self, view, in_arr):
@@ -146,7 +147,7 @@ def test_strided_memory_view_cpu(self, in_arr, use_stream):
# This is the consumer stream
s = dev.create_stream() if use_stream else None

view = StridedMemoryView.from_any_interface(in_arr, stream_ptr=s.handle if s else -1)
view = StridedMemoryView(in_arr, stream_ptr=s.handle if s else -1)
self._check_view(view, in_arr, dev)

def _check_view(self, view, in_arr, dev):
@@ -178,7 +179,7 @@ def test_cuda_array_interface_gpu(self, in_arr, use_stream):
# The usual path in `StridedMemoryView` prefers the DLPack interface
# over __cuda_array_interface__, so we call `view_as_cai` directly
# here so we can test the CAI code path.
view = StridedMemoryView.from_cuda_array_interface(in_arr, stream_ptr=s.handle if s else -1)
view = view_as_cai(in_arr, stream_ptr=s.handle if s else -1)
self._check_view(view, in_arr, dev)

def _check_view(self, view, in_arr, dev):
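As a companion to the CAI test above, a hedged sketch of calling `view_as_cai` directly on device memory; CuPy is assumed here only as an example `__cuda_array_interface__` producer and is not part of this change.

# Sketch only: CuPy is an assumed CAI producer, not part of this PR.
import cupy as cp

from cuda.core.experimental import Device
from cuda.core.experimental._memoryview import view_as_cai

dev = Device(0)
dev.set_current()
s = dev.create_stream()  # consumer stream

gpu_arr = cp.arange(16, dtype=cp.float32)
# Passing the consumer stream's handle lets view_as_cai insert the event-based
# wait that orders the producer stream (from the CAI metadata) before it.
view = view_as_cai(gpu_arr, stream_ptr=s.handle)

assert view.is_device_accessible
assert view.device_id == dev.device_id
assert view.shape == (16,)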