Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cuda_core/cuda/core/experimental/_device.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ from cuda.core.experimental._utils.cuda_utils import (
from cuda.core.experimental._stream cimport default_stream



# TODO: I prefer to type these as "cdef object" and avoid accessing them from within Python,
# but it seems it is very convenient to expose them for testing purposes...
_tls = threading.local()
Expand Down Expand Up @@ -1274,7 +1273,7 @@ class Device:
"""
self._check_context_initialized()
ctx = self._get_current_context()
return Event._init(self._id, ctx, options, True)
return Event._init(self._id, ctx, options)

def allocate(self, size, stream: Optional[Stream] = None) -> Buffer:
"""Allocate device memory from a specified stream.
Expand Down
2 changes: 0 additions & 2 deletions cuda_core/cuda/core/experimental/_event.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ cdef class Event:
cydriver.CUevent _handle
bint _timing_disabled
bint _busy_waited
bint _ipc_enabled
object _ipc_descriptor
int _device_id
object _ctx_handle

Expand Down
104 changes: 15 additions & 89 deletions cuda_core/cuda/core/experimental/_event.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

from __future__ import annotations

cimport cpython
from libc.stdint cimport uintptr_t
from libc.string cimport memcpy

from cuda.bindings cimport cydriver

Expand All @@ -16,7 +14,6 @@ from cuda.core.experimental._utils.cuda_utils cimport (
)

from dataclasses import dataclass
import multiprocessing
from typing import TYPE_CHECKING, Optional

from cuda.core.experimental._context import Context
Expand All @@ -43,15 +40,15 @@ cdef class EventOptions:
has actually been completed.
Otherwise, the CPU thread will busy-wait until the event has
been completed. (Default to False)
ipc_enabled : bool, optional
support_ipc : bool, optional
Event will be suitable for interprocess use.
Note that enable_timing must be False. (Default to False)

"""

enable_timing: Optional[bool] = False
busy_waited_sync: Optional[bool] = False
ipc_enabled: Optional[bool] = False
support_ipc: Optional[bool] = False


cdef class Event:
Expand Down Expand Up @@ -89,35 +86,24 @@ cdef class Event:
raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).")

@classmethod
def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False):
def _init(cls, device_id: int, ctx_handle: Context, options=None):
cdef Event self = Event.__new__(cls)
cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options")
cdef unsigned int flags = 0x0
self._timing_disabled = False
self._busy_waited = False
self._ipc_enabled = False
self._ipc_descriptor = None
if not opts.enable_timing:
flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING
self._timing_disabled = True
if opts.busy_waited_sync:
flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC
self._busy_waited = True
if opts.ipc_enabled:
if is_free:
raise TypeError(
"IPC-enabled events must be bound; use Stream.record for creation."
)
flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS
self._ipc_enabled = True
if not self._timing_disabled:
raise TypeError("IPC-enabled events cannot use timing.")
if opts.support_ipc:
raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103")
with nogil:
HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags))
self._device_id = device_id
self._ctx_handle = ctx_handle
if opts.ipc_enabled:
self.get_ipc_descriptor()
return self

cpdef close(self):
Expand Down Expand Up @@ -165,40 +151,6 @@ cdef class Event:
raise CUDAError(err)
raise RuntimeError(explanation)

def get_ipc_descriptor(self) -> IPCEventDescriptor:
"""Export an event allocated for sharing between processes."""
if self._ipc_descriptor is not None:
return self._ipc_descriptor
if not self.is_ipc_enabled:
raise RuntimeError("Event is not IPC-enabled")
cdef cydriver.CUipcEventHandle data
with nogil:
HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, <cydriver.CUevent>(self._handle)))
cdef bytes data_b = cpython.PyBytes_FromStringAndSize(<char*>(data.reserved), sizeof(data.reserved))
self._ipc_descriptor = IPCEventDescriptor._init(data_b, self._busy_waited)
return self._ipc_descriptor

@classmethod
def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event:
"""Import an event that was exported from another process."""
cdef cydriver.CUipcEventHandle data
memcpy(data.reserved, <const void*><const char*>(ipc_descriptor._reserved), sizeof(data.reserved))
cdef Event self = Event.__new__(cls)
with nogil:
HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data))
self._timing_disabled = True
self._busy_waited = ipc_descriptor._busy_waited
self._ipc_enabled = True
self._ipc_descriptor = ipc_descriptor
self._device_id = -1 # ??
self._ctx_handle = None # ??
return self

@property
def is_ipc_enabled(self) -> bool:
"""Return True if the event can be shared across process boundaries, otherwise False."""
return self._ipc_enabled

@property
def is_timing_disabled(self) -> bool:
"""Return True if the event does not record timing data, otherwise False."""
Expand All @@ -209,6 +161,11 @@ cdef class Event:
"""Return True if the event synchronization would keep the CPU busy-waiting, otherwise False."""
return self._busy_waited

@property
def is_ipc_supported(self) -> bool:
"""Return True if this event can be used as an interprocess event, otherwise False."""
raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103")

def sync(self):
"""Synchronize until the event completes.

Expand Down Expand Up @@ -255,43 +212,12 @@ cdef class Event:
context is set current after an event is created.

"""
if self._device_id >= 0:
from ._device import Device # avoid circular import
return Device(self._device_id)

from cuda.core.experimental._device import Device # avoid circular import

return Device(self._device_id)

@property
def context(self) -> Context:
"""Return the :obj:`~_context.Context` associated with this event."""
if self._ctx_handle is not None and self._device_id >= 0:
return Context._from_ctx(self._ctx_handle, self._device_id)


cdef class IPCEventDescriptor:
"""Serializable object describing an event that can be shared between processes."""

cdef:
bytes _reserved
bint _busy_waited

def __init__(self, *arg, **kwargs):
raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.")

@classmethod
def _init(cls, reserved: bytes, busy_waited: bint):
cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls)
self._reserved = reserved
self._busy_waited = busy_waited
return self

def __eq__(self, IPCEventDescriptor rhs):
# No need to check self._busy_waited.
return self._reserved == rhs._reserved

def __reduce__(self):
return self._init, (self._reserved, self._busy_waited)


def _reduce_event(event):
return event.from_ipc_descriptor, (event.get_ipc_descriptor(),)

multiprocessing.reduction.register(Event, _reduce_event)
return Context._from_ctx(self._ctx_handle, self._device_id)
8 changes: 4 additions & 4 deletions cuda_core/cuda/core/experimental/_memory.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ cdef class Buffer(_cyBuffer, MemoryResourceAttributes):
if stream is None:
# Note: match this behavior to DeviceMemoryResource.allocate()
stream = default_stream()
cdef cydriver.CUmemPoolPtrExportData data
memcpy(data.reserved, <const void*><const char*>(ipc_buffer._reserved), sizeof(data.reserved))
cdef cydriver.CUmemPoolPtrExportData share_data
memcpy(share_data.reserved, <const void*><const char*>(ipc_buffer._reserved), sizeof(share_data.reserved))
cdef cydriver.CUdeviceptr ptr
with nogil:
HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &data))
HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &share_data))
return Buffer._init(<intptr_t>ptr, ipc_buffer.size, mr, stream)

def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer:
Expand Down Expand Up @@ -511,7 +511,7 @@ cdef class DeviceMemoryResourceOptions:
(Default to 0)
"""
ipc_enabled : cython.bint = False
max_size : cython.size_t = 0
max_size : cython.int = 0


# TODO: cythonize this?
Expand Down
8 changes: 1 addition & 7 deletions cuda_core/cuda/core/experimental/_stream.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,7 @@ cdef class Stream:
# and CU_EVENT_RECORD_EXTERNAL, can be set in EventOptions.
if event is None:
self._get_device_and_context()
event = Event._init(<int>(self._device_id), <uintptr_t>(self._ctx_handle), options, False)
elif event.is_ipc_enabled:
raise TypeError(
"IPC-enabled events should not be re-recorded, instead create a "
"new event by supplying options."
)

event = Event._init(<int>(self._device_id), <uintptr_t>(self._ctx_handle), options)
cdef cydriver.CUevent e = (<cyEvent?>(event))._handle
with nogil:
HANDLE_RETURN(cydriver.cuEventRecord(e, self._handle))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import cuda_python_test_helpers
except ImportError:
# Import shared platform helpers for tests across repos
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[3] / "cuda_python_test_helpers"))
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "cuda_python_test_helpers"))
import cuda_python_test_helpers


Expand Down
122 changes: 0 additions & 122 deletions cuda_core/tests/helpers/buffers.py

This file was deleted.

Loading
Loading