Add cudaLaunchHostFunc #4338

Merged
merged 4 commits into from
Dec 3, 2020
33 changes: 29 additions & 4 deletions cupy/cuda/stream.pyx
@@ -214,16 +214,41 @@ class BaseStream(object):
                 object), and returns nothing.
             arg (object): Argument to the callback.

-        """
-        if runtime._is_hip_environment and self.ptr == 0:
-            raise RuntimeError('HIP does not allow adding callbacks to the '
-                               'default (null) stream')
+        .. note::
+            Whenever possible, use the :meth:`launch_host_func` method
+            instead of this one, as it may be deprecated and removed from
+            CUDA at some point.
+
+        """
         def f(stream, status, dummy):
             callback(self, status, arg)

         runtime.streamAddCallback(self.ptr, f, 0)

+    def launch_host_func(self, callback, arg):
+        """Launch a callback on host when all queued work is done.
+
+        Args:
+            callback (function): Callback function. It must take only one
+                argument (user data object), and returns nothing.
Comment on lines +232 to +233
Member

Possibly a naive question, but add_callback supports/requires 3 arguments: stream, error status and user data. Shouldn't it be the same here? I think for the test you proposed in #4322 (comment) it would be very useful to have the stream so that we can verify that it's indeed happening on the stream we're expecting.

Member Author (@leofang, Nov 27, 2020)

Ah, I am following the requirement from cudaLaunchHostFunc that the host function should only have 1 argument instead of 3.

For the test we would like to do there, we could either use add_callback + a 3-arg callback, or use this new function launch_host_func + a 1-arg callback, with the stream added as part of the arg:

    data = []
    data.append(stream)
    stream.launch_host_func(callback, data)

and check stream.ptr in the callback function.
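
The second option can be tried out without a GPU. The snippet below is only a sketch under stated assumptions: `FakeStream` is a hypothetical stand-in, not a CuPy class, that queues one-argument host functions and runs them on `synchronize()`. It mimics the calling convention of `launch_host_func` (callback plus a single user-data argument) so that the "stream travels inside `arg`" idea can be seen end to end.

```python
class FakeStream:
    """Hypothetical stand-in for a CuPy stream; NOT part of CuPy."""

    def __init__(self, ptr):
        self.ptr = ptr
        self._queue = []

    def launch_host_func(self, callback, arg):
        # Same convention as cudaLaunchHostFunc: the callback receives
        # exactly one argument, the user data object.
        self._queue.append((callback, arg))

    def synchronize(self):
        # Run the queued host functions in submission order.
        queue, self._queue = self._queue, []
        for callback, arg in queue:
            callback(arg)


seen_ptrs = []


def callback(data):
    # No stream argument is passed to a one-argument host function,
    # so the stream is recovered from the user data instead.
    stream = data[0]
    seen_ptrs.append(stream.ptr)


stream = FakeStream(ptr=1234)
data = [stream]
stream.launch_host_func(callback, data)
stream.synchronize()
```

With a real stream object, the last four lines would presumably stay the same; only the `FakeStream` construction is specific to this sketch.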

Member

Got it. I missed that this is how cudaLaunchHostFunc is supposed to behave, since I wasn't familiar with that function. Thanks for clarifying!

Member Author

No worries, Peter! I didn't know this function before either. I became aware of it only because your PTDS PR made me wonder if it's possible to test it programmatically other than eyeballing nvvp/nsys, and I started browsing the Runtime API doc 😄

+            arg (object): Argument to the callback.
+
+        .. note::
+            Whenever possible, this method is recommended over
+            :meth:`add_callback`, which may be deprecated and removed from
+            CUDA at some point.
+
+        .. seealso:: `cudaLaunchHostFunc()`_
+
+        .. _cudaLaunchHostFunc():
+            https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__EXECUTION.html#group__CUDART__EXECUTION_1g05841eaa5f90f27124241baafb3e856f
+
+        """
+        def f(dummy):
+            callback(arg)
+
+        runtime.launchHostFunc(self.ptr, f, 0)

     def record(self, event=None):
         """Records an event on the stream.

1 change: 1 addition & 0 deletions cupy_backends/cuda/api/runtime.pxd
@@ -800,6 +800,7 @@ cpdef streamDestroy(intptr_t stream)
 cpdef streamSynchronize(intptr_t stream)
 cpdef streamAddCallback(intptr_t stream, callback, intptr_t arg,
                         unsigned int flags=*)
+cpdef launchHostFunc(intptr_t stream, callback, intptr_t arg)
 cpdef streamQuery(intptr_t stream)
 cpdef streamWaitEvent(intptr_t stream, intptr_t event, unsigned int flags=*)
 cpdef intptr_t eventCreate() except? 0
29 changes: 29 additions & 0 deletions cupy_backends/cuda/api/runtime.pyx
@@ -61,6 +61,9 @@ cdef extern from *:
         driver.Stream stream, Error status, void* userData)
     ctypedef StreamCallbackDef* StreamCallback 'cudaStreamCallback_t'

+    ctypedef void HostFnDef(void* userData)
+    ctypedef HostFnDef* HostFn 'cudaHostFn_t'
+

 cdef extern from '../cupy_cuda_runtime.h' nogil:

@@ -170,6 +173,7 @@ cdef extern from '../cupy_cuda_runtime.h' nogil:
     int cudaStreamSynchronize(driver.Stream stream)
     int cudaStreamAddCallback(driver.Stream stream, StreamCallback callback,
                               void* userData, unsigned int flags)
+    int cudaLaunchHostFunc(driver.Stream stream, HostFn fn, void* userData)
     int cudaStreamQuery(driver.Stream stream)
     int cudaStreamWaitEvent(driver.Stream stream, driver.Event event,
                             unsigned int flags)
@@ -798,8 +802,18 @@ cdef _streamCallbackFunc(driver.Stream hStream, int status,
     cpython.Py_DECREF(obj)


+cdef _HostFnFunc(void* func_arg) with gil:
+    obj = <object>func_arg
+    func, arg = obj
+    func(arg)
+    cpython.Py_DECREF(obj)
+
+
 cpdef streamAddCallback(intptr_t stream, callback, intptr_t arg,
                         unsigned int flags=0):
+    if _is_hip_environment and stream == 0:
+        raise RuntimeError('HIP does not allow adding callbacks to the '
+                           'default (null) stream')
     func_arg = (callback, arg)
     cpython.Py_INCREF(func_arg)
     with nogil:
@@ -809,6 +823,21 @@ cpdef streamAddCallback(intptr_t stream, callback, intptr_t arg,
     check_status(status)


+cpdef launchHostFunc(intptr_t stream, callback, intptr_t arg):
+    if _is_hip_environment:
+        raise RuntimeError('This feature is not supported on HIP')
+    if CUDA_VERSION < 10000:
+        raise RuntimeError('This feature is only supported on CUDA 10.0+')
Comment on lines +827 to +830
Member Author

@takagi By inspecting the generated C file, I noticed that Cython (at least 0.29.21, which I'm using) has a nice property: when an if condition can be determined at compile time, Cython is smart enough to see that the rest of the code is dead and eliminates it! (So for RTD and HIP, this function actually stops right after raising.) As a result, it's fine even if we don't add any stubs (for example, I forgot to add stubs for CUDA 9.2 😂). Can we rely on this behavior?

Member

Sounds great! Could you check whether it also works with Cython 0.28.0? CuPy currently requires Cython 0.28.0 or later to build from source.

Member Author

Hi @takagi, I just checked: Cython 0.28.0 also eliminates the dead code! This is the warning emitted during cythonizing (in both versions):

    [50/59] Cythonizing cupy_backends/cuda/api/runtime.pyx
    warning: cupy_backends/cuda/api/runtime.pyx:832:16: Unreachable code

Though I think 0.28 is too outdated and should be avoided as much as possible (#4148).

Member

Can I ask you to post a new issue so other core members can find that?

Member Author

No problem, @takagi, see #4393.

Member

Thanks!


+    func_arg = (callback, arg)
+    cpython.Py_INCREF(func_arg)
+    with nogil:
+        status = cudaLaunchHostFunc(
+            <driver.Stream>stream, <HostFn>_HostFnFunc,
+            <void*>func_arg)
+    check_status(status)
+
+
 cpdef streamQuery(intptr_t stream):
     return cudaStreamQuery(<driver.Stream>stream)

1 change: 1 addition & 0 deletions cupy_backends/cuda/hip/cupy_common.h
@@ -69,6 +69,7 @@ typedef hipDeviceProp_t cudaDeviceProp;


 typedef hipStreamCallback_t cudaStreamCallback_t;
+typedef void (*cudaHostFn_t)(void* userData);
 typedef hipPointerAttribute_t cudaPointerAttributes;

 typedef hipChannelFormatKind cudaChannelFormatKind;
4 changes: 4 additions & 0 deletions cupy_backends/cuda/hip/cupy_runtime.h
@@ -256,6 +256,10 @@ cudaError_t cudaStreamAddCallback(cudaStream_t stream,
     return hipStreamAddCallback(stream, callback, userData, flags);
 }

+cudaError_t cudaLaunchHostFunc(cudaStream_t stream, cudaHostFn_t fn, void* userData) {
+    return hipErrorUnknown;
+}
+
 cudaError_t cudaStreamQuery(cudaStream_t stream) {
     return hipStreamQuery(stream);
 }
4 changes: 4 additions & 0 deletions cupy_backends/cuda/stub/cupy_cuda_runtime.h
@@ -237,6 +237,10 @@ cudaError_t cudaStreamAddCallback(...) {
     return cudaSuccess;
 }

+cudaError_t cudaLaunchHostFunc(...) {
+    return cudaSuccess;
+}
+
 cudaError_t cudaStreamQuery(...) {
     return cudaSuccess;
 }
1 change: 1 addition & 0 deletions docs/source/reference/cuda.rst
@@ -172,6 +172,7 @@ to use these functions.
    cupy.cuda.runtime.streamAddCallback
    cupy.cuda.runtime.streamQuery
    cupy.cuda.runtime.streamWaitEvent
+   cupy.cuda.runtime.launchHostFunc
    cupy.cuda.runtime.eventCreate
    cupy.cuda.runtime.eventCreateWithFlags
    cupy.cuda.runtime.eventDestroy
44 changes: 39 additions & 5 deletions tests/cupy_tests/cuda_tests/test_stream.py
@@ -59,6 +59,25 @@ def test_get_and_add_callback(self):
         stream.synchronize()
         assert out == list(range(N))

+    @attr.gpu
+    @unittest.skipIf(cuda.runtime.is_hip, 'HIP does not support this')
+    @unittest.skipIf(cuda.driver.get_build_version() < 10000,
+                     'Only CUDA 10.0+ supports this')
+    def test_launch_host_func(self):
+        N = 100
+        cupy_arrays = [testing.shaped_random((2, 3)) for _ in range(N)]
+
+        stream = cuda.Stream.null
+
+        out = []
+        for i in range(N):
+            numpy_array = cupy_arrays[i].get(stream=stream)
+            stream.launch_host_func(
+                lambda t: out.append(t[0]), (i, numpy_array))
+
+        stream.synchronize()
Member

Should this test here be performed on an explicit stream too?

Member Author

This is in fact done in the other test added below. It's just that the stream pointer is wrapped by ExternalStream there, but it should achieve what you had in mind.

Member

Ah right, I should have checked more than what GH showed to see how the stream was created, thanks for pointing that out and sorry for the false alarm.

Member Author

Thanks for checking! It's always good to have extra pairs of eyes 🙏

+        assert out == list(range(N))
+
     @attr.gpu
     def test_with_statement(self):
         stream1 = cuda.Stream()
@@ -93,11 +112,7 @@ def test_get_and_add_callback(self):
         N = 100
         cupy_arrays = [testing.shaped_random((2, 3)) for _ in range(N)]

-        if not cuda.runtime.is_hip:
-            stream = cuda.Stream.null
-        else:
-            # adding callbacks to the null stream in HIP would segfault...
-            stream = cuda.Stream()
+        stream = self.stream
Comment on lines -96 to +115
Member Author (@leofang, Nov 25, 2020)

This fixes the mistake I made in #3835 🤦🏻‍♂️ self.stream was totally ignored...


         out = []
         for i in range(N):
@@ -108,3 +123,22 @@ def test_get_and_add_callback(self):

         stream.synchronize()
         assert out == list(range(N))
+
+    @attr.gpu
+    @unittest.skipIf(cuda.runtime.is_hip, 'HIP does not support this')
+    @unittest.skipIf(cuda.driver.get_build_version() < 10000,
+                     'Only CUDA 10.0+ supports this')
+    def test_launch_host_func(self):
+        N = 100
+        cupy_arrays = [testing.shaped_random((2, 3)) for _ in range(N)]
+
+        stream = self.stream
+
+        out = []
+        for i in range(N):
+            numpy_array = cupy_arrays[i].get(stream=stream)
+            stream.launch_host_func(
+                lambda t: out.append(t[0]), (i, numpy_array))
+
+        stream.synchronize()
+        assert out == list(range(N))