Skip to content

Commit

Permalink
Merge pull request #7939 from leofang/accelerate_pinned_mem
Browse files Browse the repository at this point in the history
Accelerate H2D copies when the source is on pinned memory
  • Loading branch information
kmaehashi committed Nov 26, 2023
2 parents b1e3f19 + 2a2605a commit ff9b39b
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 30 deletions.
3 changes: 2 additions & 1 deletion cupy/_core/core.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ cpdef Module compile_with_cache(str source, tuple options=*, arch=*,

# TODO(niboshi): Move to _routines_creation.pyx
cpdef _ndarray_base array(
obj, dtype=*, bint copy=*, order=*, bint subok=*, Py_ssize_t ndmin=*)
obj, dtype=*, bint copy=*, order=*, bint subok=*, Py_ssize_t ndmin=*,
bint blocking=*)
cpdef _ndarray_base _convert_object_with_cuda_array_interface(a)

cdef _ndarray_base _ndarray_init(subtype, const shape_t& shape, dtype, obj)
Expand Down
68 changes: 45 additions & 23 deletions cupy/_core/core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2371,7 +2371,8 @@ _round_ufunc = create_ufunc(
# -----------------------------------------------------------------------------

cpdef _ndarray_base array(obj, dtype=None, bint copy=True, order='K',
bint subok=False, Py_ssize_t ndmin=0):
bint subok=False, Py_ssize_t ndmin=0,
bint blocking=False):
# TODO(beam2d): Support subok options
if subok:
raise NotImplementedError
Expand All @@ -2384,6 +2385,7 @@ cpdef _ndarray_base array(obj, dtype=None, bint copy=True, order='K',
if hasattr(obj, '__cuda_array_interface__'):
return _array_from_cuda_array_interface(
obj, dtype, copy, order, subok, ndmin)

if hasattr(obj, '__cupy_get_ndarray__'):
return _array_from_cupy_ndarray(
obj.__cupy_get_ndarray__(), dtype, copy, order, ndmin)
Expand All @@ -2392,9 +2394,10 @@ cpdef _ndarray_base array(obj, dtype=None, bint copy=True, order='K',
_array_info_from_nested_sequence(obj))
if concat_shape is not None:
return _array_from_nested_sequence(
obj, dtype, order, ndmin, concat_shape, concat_type, concat_dtype)
obj, dtype, order, ndmin, concat_shape, concat_type, concat_dtype,
blocking)

return _array_default(obj, dtype, order, ndmin)
return _array_default(obj, dtype, order, ndmin, blocking)


cdef _ndarray_base _array_from_cupy_ndarray(
Expand Down Expand Up @@ -2431,7 +2434,7 @@ cdef _ndarray_base _array_from_cuda_array_interface(

cdef _ndarray_base _array_from_nested_sequence(
obj, dtype, order, Py_ssize_t ndmin, concat_shape, concat_type,
concat_dtype):
concat_dtype, bint blocking):
cdef Py_ssize_t ndim

# resulting array is C order unless 'F' is explicitly specified
Expand All @@ -2450,17 +2453,18 @@ cdef _ndarray_base _array_from_nested_sequence(

if concat_type is numpy.ndarray:
return _array_from_nested_numpy_sequence(
obj, concat_dtype, dtype, concat_shape, order, ndmin)
obj, concat_dtype, dtype, concat_shape, order, ndmin,
blocking)
elif concat_type is ndarray: # TODO(takagi) Consider subclases
return _array_from_nested_cupy_sequence(
obj, dtype, concat_shape, order)
obj, dtype, concat_shape, order, blocking)
else:
assert False


cdef _ndarray_base _array_from_nested_numpy_sequence(
arrays, src_dtype, dst_dtype, const shape_t& shape, order,
Py_ssize_t ndmin):
Py_ssize_t ndmin, bint blocking):
a_dtype = get_dtype(dst_dtype) # convert to numpy.dtype
if a_dtype.char not in '?bhilqBHILQefdFD':
raise ValueError('Unsupported dtype %s' % a_dtype)
Expand All @@ -2487,20 +2491,24 @@ cdef _ndarray_base _array_from_nested_numpy_sequence(
a_dtype,
src_cpu)
a = ndarray(shape, dtype=a_dtype, order=order)
a.data.copy_from_host_async(mem.ptr, nbytes)
a.data.copy_from_host_async(mem.ptr, nbytes, stream)
pinned_memory._add_to_watch_list(stream.record(), mem)
else:
# fallback to numpy array and send it to GPU
# Note: a_cpu.ndim is always >= 1
a_cpu = numpy.array(arrays, dtype=a_dtype, copy=False, order=order,
ndmin=ndmin)
a = ndarray(shape, dtype=a_dtype, order=order)
a.data.copy_from_host(a_cpu.ctypes.data, nbytes)
a.data.copy_from_host_async(a_cpu.ctypes.data, nbytes, stream)

if blocking:
stream.synchronize()

return a


cdef _ndarray_base _array_from_nested_cupy_sequence(obj, dtype, shape, order):
cdef _ndarray_base _array_from_nested_cupy_sequence(
obj, dtype, shape, order, bint blocking):
lst = _flatten_list(obj)

# convert each scalar (0-dim) ndarray to 1-dim
Expand All @@ -2509,10 +2517,16 @@ cdef _ndarray_base _array_from_nested_cupy_sequence(obj, dtype, shape, order):
a = _manipulation.concatenate_method(lst, 0)
a = a.reshape(shape)
a = a.astype(dtype, order=order, copy=False)

if blocking:
stream = stream_module.get_current_stream()
stream.synchronize()

return a


cdef _ndarray_base _array_default(obj, dtype, order, Py_ssize_t ndmin):
cdef _ndarray_base _array_default(
obj, dtype, order, Py_ssize_t ndmin, bint blocking):
if order is not None and len(order) >= 1 and order[0] in 'KAka':
if isinstance(obj, numpy.ndarray) and obj.flags.fnc:
order = 'F'
Expand All @@ -2531,20 +2545,28 @@ cdef _ndarray_base _array_default(obj, dtype, order, Py_ssize_t ndmin):
return a
cdef Py_ssize_t nbytes = a.nbytes

cdef pinned_memory.PinnedMemoryPointer mem
stream = stream_module.get_current_stream()
# Note: even if obj is already backed by pinned memory, we still need to
# allocate an extra buffer and copy from it to avoid potential data race,
# see the discussion here:
# https://github.com/cupy/cupy/pull/5155#discussion_r621808782
cdef pinned_memory.PinnedMemoryPointer mem = (
_alloc_async_transfer_buffer(nbytes))
if mem is not None:
src_cpu = numpy.frombuffer(mem, a_dtype, a_cpu.size)
src_cpu[:] = a_cpu.ravel(order)
a.data.copy_from_host_async(mem.ptr, nbytes)
pinned_memory._add_to_watch_list(stream.record(), mem)

cdef intptr_t ptr_h = <intptr_t>(a_cpu.ctypes.data)
if pinned_memory.is_memory_pinned(ptr_h):
a.data.copy_from_host_async(ptr_h, nbytes, stream)
else:
a.data.copy_from_host(a_cpu.ctypes.data, nbytes)
# The input numpy array does not live on pinned memory, so we allocate
# an extra buffer and copy from it to avoid potential data race, see
# the discussion here:
# https://github.com/cupy/cupy/pull/5155#discussion_r621808782
mem = _alloc_async_transfer_buffer(nbytes)
if mem is not None:
src_cpu = numpy.frombuffer(mem, a_dtype, a_cpu.size)
src_cpu[:] = a_cpu.ravel(order)
a.data.copy_from_host_async(mem.ptr, nbytes, stream)
pinned_memory._add_to_watch_list(stream.record(), mem)
else:
a.data.copy_from_host_async(ptr_h, nbytes, stream)

if blocking:
stream.synchronize()

return a

Expand Down
25 changes: 19 additions & 6 deletions cupy/_creation/from_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from cupy._core import fusion


def array(obj, dtype=None, copy=True, order='K', subok=False, ndmin=0):
def array(obj, dtype=None, copy=True, order='K', subok=False, ndmin=0, *,
blocking=False):
"""Creates an array on the current device.
This function currently does not support the ``subok`` option.
Expand All @@ -28,6 +29,12 @@ def array(obj, dtype=None, copy=True, order='K', subok=False, ndmin=0):
array (default).
ndmin (int): Minimum number of dimensions. Ones are inserted to the
head of the shape if needed.
blocking (bool): Default is ``False``, meaning if a H2D copy is needed
it would run asynchronously on the current stream, and users are
responsible for ensuring the stream order. For example, writing to
the source ``obj`` without proper ordering while copying would
result in a race condition. If set to ``True``, the copy is
synchronous (with respect to the host).
Returns:
cupy.ndarray: An array on the current device.
Expand All @@ -43,10 +50,10 @@ def array(obj, dtype=None, copy=True, order='K', subok=False, ndmin=0):
.. seealso:: :func:`numpy.array`
"""
return _core.array(obj, dtype, copy, order, subok, ndmin)
return _core.array(obj, dtype, copy, order, subok, ndmin, blocking)


def asarray(a, dtype=None, order=None):
def asarray(a, dtype=None, order=None, *, blocking=False):
"""Converts an object to array.
This is equivalent to ``array(a, dtype, copy=False, order=order)``.
Expand All @@ -59,6 +66,12 @@ def asarray(a, dtype=None, order=None):
memory representation. Defaults to ``'K'``. ``order`` is ignored
for objects that are not :class:`cupy.ndarray`, but have the
``__cuda_array_interface__`` attribute.
blocking (bool): Default is ``False``, meaning if a H2D copy is needed
it would run asynchronously on the current stream, and users are
responsible for ensuring the stream order. For example, writing to
the source ``a`` without proper ordering while copying would
result in a race condition. If set to ``True``, the copy is
synchronous (with respect to the host).
Returns:
cupy.ndarray: An array on the current device. If ``a`` is already on
Expand All @@ -72,10 +85,10 @@ def asarray(a, dtype=None, order=None):
.. seealso:: :func:`numpy.asarray`
"""
return _core.array(a, dtype, False, order)
return _core.array(a, dtype, False, order, blocking=blocking)


def asanyarray(a, dtype=None, order=None):
def asanyarray(a, dtype=None, order=None, *, blocking=False):
"""Converts an object to array.
This is currently equivalent to :func:`cupy.asarray`, since there is no
Expand All @@ -86,7 +99,7 @@ def asanyarray(a, dtype=None, order=None):
.. seealso:: :func:`cupy.asarray`, :func:`numpy.asanyarray`
"""
return _core.array(a, dtype, False, order)
return _core.array(a, dtype, False, order, blocking=blocking)


def ascontiguousarray(a, dtype=None):
Expand Down
3 changes: 3 additions & 0 deletions cupy/cuda/pinned_memory.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ cdef class PinnedMemoryPool:
cpdef free(self, intptr_t ptr, size_t size)
cpdef free_all_blocks(self)
cpdef n_free_blocks(self)


cpdef bint is_memory_pinned(intptr_t data) except*
5 changes: 5 additions & 0 deletions cupy/cuda/pinned_memory.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,8 @@ cdef class PinnedMemoryPool:
finally:
rlock.unlock_fastrlock(self._lock)
return n


cpdef bint is_memory_pinned(intptr_t data) except*:
cdef runtime.PointerAttributes attrs = runtime.pointerGetAttributes(data)
return (attrs.type == runtime.memoryTypeHost)

0 comments on commit ff9b39b

Please sign in to comment.