
add stream callback #373

Merged · 3 commits · Aug 31, 2017
1 change: 1 addition & 0 deletions .travis.yml
@@ -711,6 +711,7 @@ script:
- if [ "${ALPAKA_CI_ANALYSIS}" == "OFF" ] ;then ./travis/compileExec.sh "test/unit/mem/buf/" ./memBuf ;fi
- if [ "${ALPAKA_CI_ANALYSIS}" == "OFF" ] ;then ./travis/compileExec.sh "test/unit/mem/view/" ./memView ;fi
- if [ "${ALPAKA_CI_ANALYSIS}" == "OFF" ] ;then ./travis/compileExec.sh "test/unit/meta/" ./meta ;fi
- if [ "${ALPAKA_CI_ANALYSIS}" == "OFF" ] ;then ./travis/compileExec.sh "test/unit/stream/" ./stream ;fi
- if [ "${ALPAKA_CI_ANALYSIS}" == "OFF" ] ;then ./travis/compileExec.sh "test/unit/time/" ./time ;fi
- if [ "${ALPAKA_CI_ANALYSIS}" == "OFF" ] ;then ./travis/compileExec.sh "test/unit/vec/" ./vec ;fi

2 changes: 1 addition & 1 deletion doc/markdown/user/implementation/mapping/CUDA.md
@@ -106,7 +106,7 @@ The following tables list the functions available in the [CUDA Runtime API](http

|CUDA|alpaka|
|---|---|
|cudaStreamAddCallback|-|
|cudaStreamAddCallback|alpaka::stream::enqueue(stream, \[\](){do_something();})|
|cudaStreamAttachMemAsync|-|
|cudaStreamCreate|<ul><li>stream = alpaka::stream::StreamCudaRtAsync(device);</li><li>stream = alpaka::stream::StreamCudaRtSync(device);</li></ul>|
|cudaStreamCreateWithFlags|see cudaStreamCreate (cudaStreamNonBlocking hard coded)|
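A minimal usage sketch of the mapping added in the table above. The device handle, the callback body, and the final alpaka::wait::wait call are assumptions for illustration (the wait behaviour corresponds to the thread wait trait documented further down in this diff); error handling is omitted.

#include <alpaka/alpaka.hpp>

#include <iostream>

void streamCallbackExample(alpaka::dev::DevCudaRt const & device)
{
    // Create an asynchronous CUDA RT stream on the given device
    // (see the cudaStreamCreate row in the table above).
    auto stream = alpaka::stream::StreamCudaRtAsync(device);

    // Counterpart of cudaStreamAddCallback: enqueue a host-side callable that
    // runs once all previously enqueued work in the stream has completed.
    alpaka::stream::enqueue(stream,](){ std::cout << "stream reached the callback" << std::endl; });

    // Block the calling thread until the stream has processed all tasks,
    // including the callback.
    alpaka::wait::wait(stream);
}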
19 changes: 0 additions & 19 deletions include/alpaka/stream/StreamCpuAsync.hpp
@@ -235,25 +235,6 @@ namespace alpaka
//
//-----------------------------------------------------------------------------
ALPAKA_FN_HOST static auto enqueue(
#if !(BOOST_COMP_CLANG_CUDA && BOOST_ARCH_CUDA_DEVICE)
stream::StreamCpuAsync & stream,
TTask & task)
#else
stream::StreamCpuAsync &,
TTask &)
#endif
-> void
{
// Workaround: Clang can not support this when natively compiling device code. See ConcurrentExecPool.hpp.
#if !(BOOST_COMP_CLANG_CUDA && BOOST_ARCH_CUDA_DEVICE)
stream.m_spAsyncStreamCpu->m_workerThread.enqueueTask(
task);
#endif
}
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
ALPAKA_FN_HOST static auto enqueue(
#if !(BOOST_COMP_CLANG_CUDA && BOOST_ARCH_CUDA_DEVICE)
stream::StreamCpuAsync & stream,
TTask const & task)
11 changes: 0 additions & 11 deletions include/alpaka/stream/StreamCpuSync.hpp
@@ -206,17 +206,6 @@ namespace alpaka
stream::StreamCpuSync,
TTask>
{
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
ALPAKA_FN_HOST static auto enqueue(
stream::StreamCpuSync & stream,
TTask & task)
-> void
{
boost::ignore_unused(stream);
task();
}
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
86 changes: 79 additions & 7 deletions include/alpaka/stream/StreamCudaRtAsync.hpp
@@ -59,7 +59,7 @@ namespace alpaka
namespace detail
{
//#############################################################################
//! The CUDA RT stream implementation.
//! The CUDA RT async stream implementation.
//#############################################################################
class StreamCudaRtAsyncImpl final
{
@@ -132,7 +132,7 @@ namespace alpaka
}

//#############################################################################
//! The CUDA RT stream.
//! The CUDA RT async stream.
//#############################################################################
class StreamCudaRtAsync final
{
@@ -191,7 +191,7 @@ namespace alpaka
namespace traits
{
//#############################################################################
//! The CUDA RT stream device type trait specialization.
//! The CUDA RT async stream device type trait specialization.
//#############################################################################
template<>
struct DevType<
@@ -200,7 +200,7 @@ namespace alpaka
using type = dev::DevCudaRt;
};
//#############################################################################
//! The CUDA RT stream device get trait specialization.
//! The CUDA RT async stream device get trait specialization.
//#############################################################################
template<>
struct GetDev<
@@ -223,7 +223,7 @@ namespace alpaka
namespace traits
{
//#############################################################################
//! The CUDA RT stream event type trait specialization.
//! The CUDA RT async stream event type trait specialization.
//#############################################################################
template<>
struct EventType<
@@ -238,7 +238,79 @@ namespace alpaka
namespace traits
{
//#############################################################################
//! The CUDA RT stream test trait specialization.
//! The CUDA RT async stream enqueue trait specialization.
//#############################################################################
template<
typename TTask>
struct Enqueue<
stream::StreamCudaRtAsync,
TTask>
{
//#############################################################################
//!
//#############################################################################
struct CallbackSynchronizationData
{
std::mutex m_mutex;
std::condition_variable m_event;
bool notified = false;
};

//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
static void CUDART_CB cudaRtCallback(cudaStream_t /*stream*/, cudaError_t /*status*/, void *arg)
{
auto& callbackSynchronizationData = *reinterpret_cast<CallbackSynchronizationData*>(arg);

{
std::unique_lock<std::mutex> lock(callbackSynchronizationData.m_mutex);
callbackSynchronizationData.notified = true;
}

callbackSynchronizationData.m_event.notify_one();
}

//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
ALPAKA_FN_HOST static auto enqueue(
stream::StreamCudaRtAsync & stream,
TTask const & task)
-> void
{
auto pCallbackSynchronizationData = std::make_shared<CallbackSynchronizationData>();

ALPAKA_CUDA_RT_CHECK(cudaStreamAddCallback(
stream.m_spStreamCudaRtAsyncImpl->m_CudaStream,
cudaRtCallback,
pCallbackSynchronizationData.get(),
0u));

std::thread t(
Member commented:

Creating a thread per callback can be very expensive. For example, PIConGPU spawns over 2000 kernels/memcpys per second and can have over 100 tasks waiting in streams. That would mean spawning 2k threads per second and keeping over 100 threads active.

Is it possible to use one thread for all callbacks (waiting in the background), add callbacks to a list, and then always execute them from that callback thread?

We can also move this to a later pull request if it is not easy to do right now.

(A sketch of such a callback worker thread follows after this file's diff.)

Member Author commented:

Currently, I would like to merge this as is and create a follow-up issue for the optimization. The most flexible solution would be a thread pool with a queue of ready callbacks: with only one thread the latency would be highest, but it would equal your single-thread solution; with multiple threads, the latency/resource trade-off can be adapted per use case.

Member Author commented:

Can we merge this and create the follow-up ticket? My upcoming event unit tests are based on some of these stream test helpers.

Member commented:

Sorry, I was busy. Yes, I will merge it.

[pCallbackSynchronizationData, task](){

// If the callback has not yet been called, we wait for it.
std::unique_lock<std::mutex> lock(pCallbackSynchronizationData->m_mutex);
if(!pCallbackSynchronizationData->notified)
{
pCallbackSynchronizationData->m_event.wait(
lock,
[pCallbackSynchronizationData](){
return pCallbackSynchronizationData->notified;
}
);
}

task();
}
);

t.detach();
}
};
//#############################################################################
//! The CUDA RT async stream test trait specialization.
//#############################################################################
template<>
struct Empty<
@@ -269,7 +341,7 @@ namespace alpaka
namespace traits
{
//#############################################################################
//! The CUDA RT stream thread wait trait specialization.
//! The CUDA RT async stream thread wait trait specialization.
//!
//! Blocks execution of the calling thread until the stream has finished processing all previously requested tasks (kernels, data copies, ...)
//#############################################################################
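Relating to the review discussion embedded above (thread-per-callback overhead): a minimal sketch of the suggested alternative, a single background worker thread that executes queued callbacks instead of spawning one detached thread per enqueued task. The class CallbackWorker and its interface are hypothetical and not part of this pull request.

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class CallbackWorker
{
public:
    CallbackWorker() : m_worker([this](){ run(); }) {}
    ~CallbackWorker()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cond.notify_one();
        m_worker.join();
    }
    // Called e.g. from the CUDA callback once the stream has reached the task.
    void push(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tasks.push(std::move(task));
        }
        m_cond.notify_one();
    }
private:
    void run()
    {
        for(;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cond.wait(lock, [this](){ return m_stop || !m_tasks.empty(); });
                if(m_stop && m_tasks.empty()) { return; }
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            // Execute the callback outside the lock so new tasks can be pushed.
            task();
        }
    }
    std::mutex m_mutex;
    std::condition_variable m_cond;
    std::queue<std::function<void()>> m_tasks;
    bool m_stop = false;
    std::thread m_worker;   // started last, after all other members are initialized
};

In the Enqueue trait, the CUDA callback would then push the user task into such a worker instead of constructing and detaching a new std::thread per task, keeping the number of host threads constant.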
79 changes: 72 additions & 7 deletions include/alpaka/stream/StreamCudaRtSync.hpp
@@ -59,7 +59,7 @@ namespace alpaka
namespace detail
{
//#############################################################################
//! The CUDA RT stream implementation.
//! The CUDA RT sync stream implementation.
//#############################################################################
class StreamCudaRtSyncImpl final
{
@@ -132,7 +132,7 @@ namespace alpaka
}

//#############################################################################
//! The CUDA RT stream.
//! The CUDA RT sync stream.
//#############################################################################
class StreamCudaRtSync final
{
@@ -191,7 +191,7 @@ namespace alpaka
namespace traits
{
//#############################################################################
//! The CUDA RT stream device type trait specialization.
//! The CUDA RT sync stream device type trait specialization.
//#############################################################################
template<>
struct DevType<
@@ -200,7 +200,7 @@ namespace alpaka
using type = dev::DevCudaRt;
};
//#############################################################################
//! The CUDA RT stream device get trait specialization.
//! The CUDA RT sync stream device get trait specialization.
//#############################################################################
template<>
struct GetDev<
@@ -223,7 +223,7 @@ namespace alpaka
namespace traits
{
//#############################################################################
//! The CUDA RT stream event type trait specialization.
//! The CUDA RT sync stream event type trait specialization.
//#############################################################################
template<>
struct EventType<
@@ -238,7 +238,72 @@ namespace alpaka
namespace traits
{
//#############################################################################
//! The CUDA RT stream test trait specialization.
//! The CUDA RT sync stream enqueue trait specialization.
//#############################################################################
template<
typename TTask>
struct Enqueue<
stream::StreamCudaRtSync,
TTask>
{
//#############################################################################
//!
//#############################################################################
struct CallbackSynchronizationData
{
std::mutex m_mutex;
std::condition_variable m_event;
bool notified = false;
};

//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
static void CUDART_CB cudaRtCallback(cudaStream_t /*stream*/, cudaError_t /*status*/, void *arg)
{
auto& callbackSynchronizationData = *reinterpret_cast<CallbackSynchronizationData*>(arg);

{
std::unique_lock<std::mutex> lock(callbackSynchronizationData.m_mutex);
callbackSynchronizationData.notified = true;
}

callbackSynchronizationData.m_event.notify_one();
}

//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
ALPAKA_FN_HOST static auto enqueue(
stream::StreamCudaRtSync & stream,
TTask const & task)
-> void
{
auto pCallbackSynchronizationData = std::make_shared<CallbackSynchronizationData>();

ALPAKA_CUDA_RT_CHECK(cudaStreamAddCallback(
stream.m_spStreamCudaRtSyncImpl->m_CudaStream,
cudaRtCallback,
pCallbackSynchronizationData.get(),
0u));

// If the callback has not yet been called, we wait for it.
std::unique_lock<std::mutex> lock(pCallbackSynchronizationData->m_mutex);
if(!pCallbackSynchronizationData->notified)
{
pCallbackSynchronizationData->m_event.wait(
lock,
[pCallbackSynchronizationData](){
return pCallbackSynchronizationData->notified;
}
);
}

task();
}
};
//#############################################################################
//! The CUDA RT sync stream test trait specialization.
//#############################################################################
template<>
struct Empty<
@@ -269,7 +334,7 @@ namespace alpaka
namespace traits
{
//#############################################################################
//! The CUDA RT stream thread wait trait specialization.
//! The CUDA RT sync stream thread wait trait specialization.
//!
//! Blocks execution of the calling thread until the stream has finished processing all previously requested tasks (kernels, data copies, ...)
//#############################################################################
5 changes: 3 additions & 2 deletions include/alpaka/stream/Traits.hpp
@@ -61,8 +61,9 @@ namespace alpaka
//-----------------------------------------------------------------------------
//! Queues the given task in the given stream.
//!
//! If it has previously been queued, then this call will overwrite any existing state of the event.
//! Any subsequent calls which examine the status of event will only examine the completion of this most recent call to enqueue.
//! Special Handling for events:
//! If the event has previously been queued, then this call will overwrite any existing state of the event.
//! Any subsequent calls which examine the status of event will only examine the completion of this most recent call to enqueue.
//-----------------------------------------------------------------------------
template<
typename TStream,
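A short sketch of the event semantics documented in the comment above. The generic parameters and the alpaka::wait::wait call are assumptions based on the usual alpaka wait traits and are not taken verbatim from this diff.

template<
    typename TStream,
    typename TEvent>
void recordEventTwice(
    TStream & stream,
    TEvent & event)
{
    // First enqueue: the event will complete once the stream reaches it.
    alpaka::stream::enqueue(stream, event);

    // Second enqueue: overwrites the pending state of the event, so any
    // subsequent test or wait refers only to this most recent enqueue.
    alpaka::stream::enqueue(stream, event);

    // Blocks until the most recently enqueued occurrence has completed.
    alpaka::wait::wait(event);
}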