Merge pull request cupy#4592 from leofang/mempool_impl
CUDA 11.2: Add `MemoryAsyncPool` to support `malloc_async`
mergify[bot] authored and chainer-ci committed Apr 6, 2021
1 parent 92a75be commit 5ae2e95
Showing 11 changed files with 395 additions and 69 deletions.
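
In practice, the new pool can be enabled with a couple of lines (a minimal sketch, assuming a CUDA 11.2+ toolkit and a driver/GPU that support cudaMallocAsync; on unsupported setups an error is raised, per the docstring added below):

import cupy
from cupy.cuda.memory import MemoryAsyncPool

# Route subsequent CuPy device allocations through the new
# stream-ordered (cudaMallocAsync) pool added by this PR.
cupy.cuda.set_allocator(MemoryAsyncPool().malloc)

x = cupy.arange(10)  # backed by memory from the async pool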
1 change: 1 addition & 0 deletions cupy/cuda/__init__.py
@@ -107,6 +107,7 @@ def is_available():
from cupy.cuda.memory import MemoryAsync # NOQA
from cupy.cuda.memory import MemoryPointer # NOQA
from cupy.cuda.memory import MemoryPool # NOQA
from cupy.cuda.memory import MemoryAsyncPool # NOQA
from cupy.cuda.memory import PythonFunctionAllocator # NOQA
from cupy.cuda.memory import set_allocator # NOQA
from cupy.cuda.memory import get_allocator # NOQA
239 changes: 195 additions & 44 deletions cupy/cuda/memory.pyx
@@ -1,4 +1,5 @@
# distutils: language = c++
cimport cpython # NOQA
cimport cython # NOQA

import atexit
@@ -10,7 +11,7 @@ import threading
import warnings
import weakref

from cupy_backends.cuda.api import runtime
from cupy_backends.cuda.api.runtime import CUDARuntimeError
from cupy._core import syncdetect

from fastrlock cimport rlock
@@ -49,9 +50,9 @@ class OutOfMemoryError(MemoryError):
self._limit = limit

if limit == 0:
msg = (
'Out of memory allocating {:,} bytes '
'(allocated so far: {:,} bytes).'.format(size, total))
msg = 'Out of memory allocating {:,} bytes'.format(size)
if total != -1:
msg += ' (allocated so far: {:,} bytes).'.format(total)
else:
msg = (
'Out of memory allocating {:,} bytes '
@@ -106,32 +107,26 @@ cdef class Memory(BaseMemory):
runtime.free(self.ptr)


cdef inline void async_alloc_check() except*:
cdef int dev_id
cdef list support = [runtime.deviceGetAttribute(
runtime.cudaDevAttrMemoryPoolsSupported, dev_id)
for dev_id in range(runtime.getDeviceCount())]
_thread_local.device_support_async_alloc = support


cdef inline void is_async_alloc_supported(int device_id) except*:
if CUDA_VERSION < 11020:
raise RuntimeError("memory_async is supported since CUDA 11.2")
if runtime._is_hip_environment:
raise RuntimeError('HIP does not support memory_async')
global is_async_alloc_support_checked
if not is_async_alloc_support_checked:
async_alloc_check()
is_async_alloc_support_checked = True
is_supported = _thread_local.device_support_async_alloc[device_id]
cdef int dev_id
cdef list support
try:
is_supported = _thread_local.device_support_async_alloc[device_id]
except AttributeError:
support = [runtime.deviceGetAttribute(
runtime.cudaDevAttrMemoryPoolsSupported, dev_id)
for dev_id in range(runtime.getDeviceCount())]
_thread_local.device_support_async_alloc = support
is_supported = support[device_id]
if not is_supported:
raise RuntimeError('Device {} does not support '
'malloc_async'.format(device_id))


cdef bint is_async_alloc_support_checked = False
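
For reference, the reworked support check above can be reproduced from Python roughly as follows (a sketch; it relies on cupy.cuda.runtime exposing getDeviceCount, deviceGetAttribute, and the cudaDevAttrMemoryPoolsSupported attribute, which exists only when built against CUDA 11.2+):

from cupy.cuda import runtime

def devices_supporting_malloc_async():
    # A nonzero cudaDevAttrMemoryPoolsSupported value means the device
    # can use cudaMallocAsync / cudaFreeAsync.
    return [dev_id for dev_id in range(runtime.getDeviceCount())
            if runtime.deviceGetAttribute(
                runtime.cudaDevAttrMemoryPoolsSupported, dev_id)]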


@cython.no_gc
cdef class MemoryAsync(BaseMemory):
"""Asynchronous memory allocation on a CUDA device.
@@ -147,8 +142,6 @@ cdef class MemoryAsync(BaseMemory):
readonly intptr_t stream

def __init__(self, size_t size, intptr_t stream):
# TODO(leofang): perhaps we should align the memory ourselves?
# size = _round_size(size)
self.size = size
self.device_id = device.get_device_id()
# The stream is allowed to be destroyed before the memory is freed, so
@@ -174,13 +167,13 @@ cdef class MemoryAsync(BaseMemory):
if self.ptr:
try:
runtime.freeAsync(self.ptr, curr_stream)
except runtime.CUDARuntimeError as e:
except CUDARuntimeError as e:
if e.status not in ok_errors:
raise
try:
curr_stream = stream_module.get_current_stream_ptr()
runtime.freeAsync(self.ptr, curr_stream)
except runtime.CUDARuntimeError as e:
except CUDARuntimeError as e:
if e.status not in ok_errors:
raise
runtime.free(self.ptr)
@@ -594,7 +587,7 @@ cdef class MemoryPointer:
try:
runtime.deviceEnablePeerAccess(peer)
# peer access could already be set by external libraries at this point
except runtime.CUDARuntimeError as e:
except CUDARuntimeError as e:
if e.status != runtime.errorPeerAccessAlreadyEnabled:
raise
finally:
@@ -813,7 +806,7 @@ DEF ALLOCATION_UNIT_SIZE = 512
_allocation_unit_size = ALLOCATION_UNIT_SIZE


cpdef size_t _round_size(size_t size):
cpdef inline size_t _round_size(size_t size):
"""Rounds up the memory size to fit memory alignment of cudaMalloc."""
# avoid 0 div checking
size = (size + ALLOCATION_UNIT_SIZE - 1) // ALLOCATION_UNIT_SIZE
@@ -944,6 +937,21 @@ cdef class _Arena:
return False


# cpdef because unit-tested
# a module-level function can be inlined
cpdef inline dict _parse_limit_string(limit=None):
if limit is None:
limit = os.environ.get('CUPY_GPU_MEMORY_LIMIT')
size = None
fraction = None
if limit is not None:
if limit.endswith('%'):
fraction = float(limit[:-1]) / 100.0
else:
size = int(limit)
return {'size': size, 'fraction': fraction}
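
The helper's behavior is unchanged by the move to module level; for reference:

_parse_limit_string('1073741824')  # -> {'size': 1073741824, 'fraction': None}
_parse_limit_string('50%')         # -> {'size': None, 'fraction': 0.5}
_parse_limit_string()              # reads CUPY_GPU_MEMORY_LIMIT; both None if unset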


@cython.final
cdef class SingleDeviceMemoryPool:
"""Memory pool implementation for single device.
@@ -996,7 +1004,7 @@ cdef class SingleDeviceMemoryPool:
self._in_use_lock = rlock.create_fastrlock()
self._total_bytes_lock = rlock.create_fastrlock()

self.set_limit(**(self._parse_limit_string()))
self.set_limit(**(_parse_limit_string()))

cdef _Arena _arena(self, intptr_t stream_ptr):
"""Returns appropriate arena of a given stream.
@@ -1213,19 +1221,6 @@ cdef class SingleDeviceMemoryPool:
with LockAndNoGc(self._total_bytes_lock):
return self._total_bytes_limit

# cpdef because unit-tested
cpdef dict _parse_limit_string(self, limit=None):
if limit is None:
limit = os.environ.get('CUPY_GPU_MEMORY_LIMIT')
size = None
fraction = None
if limit is not None:
if limit.endswith('%'):
fraction = float(limit[:-1]) / 100.0
else:
size = int(limit)
return {'size': size, 'fraction': fraction}

cdef _compact_index(self, intptr_t stream_ptr, bint free):
# need self._free_lock
cdef _Arena arena
@@ -1306,20 +1301,20 @@ cdef class SingleDeviceMemoryPool:
oom_error = False
try:
mem = self._alloc(size).mem
except runtime.CUDARuntimeError as e:
except CUDARuntimeError as e:
if e.status != runtime.errorMemoryAllocation:
raise
self.free_all_blocks()
try:
mem = self._alloc(size).mem
except runtime.CUDARuntimeError as e:
except CUDARuntimeError as e:
if e.status != runtime.errorMemoryAllocation:
raise
gc.collect()
self.free_all_blocks()
try:
mem = self._alloc(size).mem
except runtime.CUDARuntimeError as e:
except CUDARuntimeError as e:
if e.status != runtime.errorMemoryAllocation:
raise
oom_error = True
@@ -1334,7 +1329,7 @@ cdef class SingleDeviceMemoryPool:
return mem


cdef class MemoryPool(object):
cdef class MemoryPool:

"""Memory pool for all GPU devices on the host.
@@ -1491,6 +1486,162 @@ cdef class MemoryPool(object):
return mp.get_limit()


cdef class MemoryAsyncPool:
"""(Experimental) CUDA memory pool for all GPU devices on the host.
A memory pool preserves any allocations even if they are freed by the user.
One instance of this class can be used for multiple devices. This class
uses CUDA's Stream Ordered Memory Allocator (supported on CUDA 11.2+).
The simplest way to use this pool as CuPy's default allocator is the
following code::
set_allocator(MemoryAsyncPool().malloc)
Using this feature requires CUDA >= 11.2 with a supported GPU and platform.
If it is not supported, an error will be raised.
The current CuPy stream is used to allocate/free the memory.
Args:
pool_handles (str or int): A flag to indicate which mempool to use.
`'default'` is for the device's default mempool, `'current'` is for
the current mempool (which could be the default one), and an `int`
that represents ``cudaMemPool_t`` created from elsewhere for an
external mempool. A list consisting of these flags can also be
accepted, in which case the list length must equal to the total
number of visible devices so that the mempools for each device can
be set independently.
.. warning::
This feature is currently experimental and subject to change.
.. note::
:class:`MemoryAsyncPool` currently cannot work with memory hooks.
.. seealso:: `Stream Ordered Memory Allocator`_
.. _Stream Ordered Memory Allocator:
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__MEMORY__POOLS.html
"""
# This is analogous to SingleDeviceMemoryPool + MemoryPool, but for
# CUDA's async allocator. The main purpose is to provide a memory pool
# interface for multiple devices, but given that CUDA's mempool is
# implemented at the driver level, the same pool could be shared by many
# applications in the same process, so we can't collect meaningful
# statistics like used bytes for this pool...

cdef:
# A list of cudaMemPool_t to each device's mempool
readonly list _pools

def __init__(self, pool_handles='default'):
cdef int dev_id
if (cpython.PySequence_Check(pool_handles)
and not isinstance(pool_handles, str)):
# allow different kinds of handles on each device
self._pools = [self.set_pool(pool_handles[dev_id], dev_id)
for dev_id in range(runtime.getDeviceCount())]
else:
# use the same argument for all devices
self._pools = [self.set_pool(pool_handles, dev_id)
for dev_id in range(runtime.getDeviceCount())]

cdef intptr_t set_pool(self, handle, int dev_id) except? 0:
cdef intptr_t pool
if handle == 'default':
# Use the device's default pool
pool = runtime.deviceGetDefaultMemPool(dev_id)
elif handle == 'current':
# Use the device's current pool
pool = runtime.deviceGetMemPool(dev_id)
elif handle == 'create':
# TODO(leofang): Support cudaMemPoolCreate
raise NotImplementedError('cudaMemPoolCreate is not yet supported')
elif isinstance(handle, int):
# Use an existing pool (likely from other applications?)
pool = <intptr_t>(handle)
else:
raise ValueError("handle must be "
"'default' (for the device's default pool), "
"'current' (for the device's current pool), "
"or int (a pointer to cudaMemPool_t)")
runtime.deviceSetMemPool(dev_id, pool)
return pool
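
Per the constructor docstring, the handle argument accepts several forms (a sketch; the list form assumes exactly two visible devices):

pool = MemoryAsyncPool()                        # each device's default mempool
pool = MemoryAsyncPool('current')               # each device's current mempool
pool = MemoryAsyncPool(['default', 'current'])  # per-device choice on a 2-GPU host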

cpdef MemoryPointer malloc(self, size_t size):
"""Allocate memory from the current device's pool on the current
stream.
This method can be used as a CuPy memory allocator. The simplest way to
use a memory pool as the default allocator is the following code::
set_allocator(MemoryAsyncPool().malloc)
Args:
size (int): Size of the memory buffer to allocate in bytes.
Returns:
~cupy.cuda.MemoryPointer: Pointer to the allocated buffer.
"""
_util.experimental('cupy.cuda.MemoryAsyncPool.malloc')
cdef size_t rounded_size = _round_size(size)
mem = None
oom_error = False
try:
mem = malloc_async(rounded_size)
except CUDARuntimeError as e:
if e.status != runtime.errorMemoryAllocation:
raise
stream = stream_module.get_current_stream()
stream.synchronize()
try:
mem = malloc_async(rounded_size)
except CUDARuntimeError as e:
if e.status != runtime.errorMemoryAllocation:
raise
stream.synchronize()
try:
mem = malloc_async(rounded_size)
except CUDARuntimeError as e:
if e.status != runtime.errorMemoryAllocation:
raise
oom_error = True
finally:
if mem is None:
assert oom_error
# Set total to -1 as we currently do not keep track of the
# usage of the async mempool
raise OutOfMemoryError(size, -1, 0)
return mem
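
A minimal usage sketch of the allocation path above (the 1,024-byte size and the array shape are illustrative only):

import cupy
from cupy.cuda.memory import MemoryAsyncPool

pool = MemoryAsyncPool()
memptr = pool.malloc(1024)  # a MemoryPointer allocated on the current stream
arr = cupy.ndarray((256,), dtype=cupy.float32, memptr=memptr)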

cpdef free_all_blocks(self, stream=None):
# We don't have access to the mempool internals, but if any memory has
# been freed asynchronously, a synchronization will make sure it
# becomes visible (to both cudaMalloc and cudaMallocAsync). See
# https://github.com/cupy/cupy/issues/3777#issuecomment-758890450
runtime.deviceSynchronize()

cpdef size_t n_free_blocks(self):
raise NotImplementedError

cpdef size_t used_bytes(self):
raise NotImplementedError

cpdef size_t free_bytes(self):
raise NotImplementedError

cpdef size_t total_bytes(self):
raise NotImplementedError

cpdef set_limit(self, size=None, fraction=None):
# TODO(leofang): Support cudaMemPoolTrimTo?
raise NotImplementedError

cpdef size_t get_limit(self):
raise NotImplementedError


ctypedef void*(*malloc_func_type)(void*, size_t, int)
ctypedef void(*free_func_type)(void*, void*, int)

6 changes: 6 additions & 0 deletions cupy_backends/cuda/api/runtime.pxd
@@ -102,6 +102,8 @@ cdef extern from *:
ctypedef struct cudaUUID 'cudaUUID_t':
char bytes[16]

ctypedef void* MemPool 'cudaMemPool_t'

IF CUDA_VERSION >= 11000:
# We can't use IF in the middle of structs declaration
# to add or ignore fields in compile time so we have to
@@ -928,6 +930,10 @@ cpdef memPrefetchAsync(intptr_t devPtr, size_t count, int dstDevice,
intptr_t stream)
cpdef memAdvise(intptr_t devPtr, size_t count, int advice, int device)
cpdef PointerAttributes pointerGetAttributes(intptr_t ptr)
cpdef intptr_t deviceGetDefaultMemPool(int) except? 0
cpdef intptr_t deviceGetMemPool(int) except? 0
cpdef deviceSetMemPool(int, intptr_t)
cpdef memPoolTrimTo(intptr_t, size_t)
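
These new wrappers map to cudaDeviceGetDefaultMemPool, cudaDeviceGetMemPool, cudaDeviceSetMemPool, and cudaMemPoolTrimTo; a rough usage sketch from Python (CUDA 11.2+ only):

from cupy.cuda import runtime

dev = runtime.getDevice()
default_pool = runtime.deviceGetDefaultMemPool(dev)
runtime.deviceSetMemPool(dev, default_pool)  # make the default pool current
runtime.memPoolTrimTo(default_pool, 0)       # release unused memory back to the OS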


###############################################################################
