Skip to content

Commit 4ab30f5

Browse files
committed
Async: Add AsyncLoopWork to execute callbacks in background threads on the EventLoop
1 parent 79fdec0 commit 4ab30f5

File tree

8 files changed

+135
-10
lines changed

8 files changed

+135
-10
lines changed

Documentation/Libraries/Async.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ This is the list of supported async operations:
2424
| [AsyncFileClose](@ref SC::AsyncFileClose) | @copybrief SC::AsyncFileClose |
2525
| [AsyncLoopTimeout](@ref SC::AsyncLoopTimeout) | @copybrief SC::AsyncLoopTimeout |
2626
| [AsyncLoopWakeUp](@ref SC::AsyncLoopWakeUp) | @copybrief SC::AsyncLoopWakeUp |
27-
| [AsyncProcessExit](@ref SC::AsyncProcessExit) | @copybrief SC::AsyncProcessExit |
27+
| [AsyncLoopWork](@ref SC::AsyncLoopWork) | @copybrief SC::AsyncLoopWork |
28+
| [AsyncProcessExit](@ref SC::AsyncProcessExit) | @copybrief SC::AsyncProcessExit |
2829
| [AsyncFilePoll](@ref SC::AsyncFilePoll) | @copybrief SC::AsyncFilePoll |
2930

3031
# Status
@@ -53,6 +54,9 @@ Event loop can be run in different ways to allow integrated it in multiple ways
5354
## AsyncLoopWakeUp
5455
@copydoc SC::AsyncLoopWakeUp
5556

57+
## AsyncLoopWork
58+
@copydoc SC::AsyncLoopWork
59+
5660
## AsyncProcessExit
5761
@copydoc SC::AsyncProcessExit
5862

Libraries/Async/Async.cpp

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const char* SC::AsyncRequest::TypeToString(Type type)
3636
{
3737
case Type::LoopTimeout: return "LoopTimeout";
3838
case Type::LoopWakeUp: return "LoopWakeUp";
39+
case Type::LoopWork: return "LoopWork";
3940
case Type::ProcessExit: return "ProcessExit";
4041
case Type::SocketAccept: return "SocketAccept";
4142
case Type::SocketConnect: return "SocketConnect";
@@ -120,6 +121,12 @@ SC::Result SC::AsyncLoopWakeUp::start(AsyncEventLoop& loop, EventObject* eo)
120121

121122
SC::Result SC::AsyncLoopWakeUp::wakeUp() { return getEventLoop()->wakeUpFromExternalThread(*this); }
122123

124+
SC::Result SC::AsyncLoopWork::start(AsyncEventLoop& loop, ThreadPool& threadPool)
125+
{
126+
SC_TRY_MSG(work.isValid(), "AsyncLoopWork::start - Invalid work callback");
127+
return queueSubmission(loop, threadPool, task);
128+
}
129+
123130
SC::Result SC::AsyncProcessExit::start(AsyncEventLoop& loop, ProcessDescriptor::Handle process)
124131
{
125132
handle = process;
@@ -189,8 +196,14 @@ SC::Result SC::AsyncFileRead::start(AsyncEventLoop& loop, ThreadPool& threadPool
189196
SC_TRY_MSG(buffer.sizeInBytes() > 0, "AsyncFileRead::start - Zero sized read buffer");
190197
SC_TRY_MSG(fileDescriptor != FileDescriptor::Invalid, "AsyncFileRead::start - Invalid file descriptor");
191198
SC_TRY(validateAsync());
192-
SC_TRY(queueSubmission(loop, threadPool, task));
193-
return SC::Result(true);
199+
if (loop.internalSelf.makesSenseToRunInThreadPool(*this))
200+
{
201+
return queueSubmission(loop, threadPool, task);
202+
}
203+
else
204+
{
205+
return queueSubmission(loop);
206+
}
194207
}
195208

196209
SC::Result SC::AsyncFileWrite::start(AsyncEventLoop& loop)
@@ -207,8 +220,14 @@ SC::Result SC::AsyncFileWrite::start(AsyncEventLoop& loop, ThreadPool& threadPoo
207220
SC_TRY_MSG(buffer.sizeInBytes() > 0, "AsyncFileWrite::start - Zero sized write buffer");
208221
SC_TRY_MSG(fileDescriptor != FileDescriptor::Invalid, "AsyncFileWrite::start - Invalid file descriptor");
209222
SC_TRY(validateAsync());
210-
SC_TRY(queueSubmission(loop, threadPool, task));
211-
return SC::Result(true);
223+
if (loop.internalSelf.makesSenseToRunInThreadPool(*this))
224+
{
225+
return queueSubmission(loop, threadPool, task);
226+
}
227+
else
228+
{
229+
return queueSubmission(loop);
230+
}
212231
}
213232

214233
SC::Result SC::AsyncFileClose::start(AsyncEventLoop& loop, FileDescriptor::Handle fd)
@@ -336,7 +355,7 @@ SC::Result SC::AsyncEventLoop::Private::queueSubmission(AsyncRequest& async, Asy
336355
async.state = AsyncRequest::State::Setup;
337356

338357
// Only set the async tasks for operations and backends that are not io_uring
339-
if (task and eventLoop->internalSelf.makesSenseToRunInThreadPool(async))
358+
if (task)
340359
{
341360
async.asyncTask = task;
342361
task->async = &async;
@@ -934,6 +953,7 @@ void SC::AsyncEventLoop::Private::removeActiveHandle(AsyncRequest& async)
934953
{
935954
case AsyncRequest::Type::LoopTimeout: activeLoopTimeouts.remove(*static_cast<AsyncLoopTimeout*>(&async)); break;
936955
case AsyncRequest::Type::LoopWakeUp: activeLoopWakeUps.remove(*static_cast<AsyncLoopWakeUp*>(&async)); break;
956+
case AsyncRequest::Type::LoopWork: activeLoopWork.remove(*static_cast<AsyncLoopWork*>(&async)); break;
937957
case AsyncRequest::Type::ProcessExit: activeProcessExits.remove(*static_cast<AsyncProcessExit*>(&async)); break;
938958
case AsyncRequest::Type::SocketAccept: activeSocketAccepts.remove(*static_cast<AsyncSocketAccept*>(&async)); break;
939959
case AsyncRequest::Type::SocketConnect: activeSocketConnects.remove(*static_cast<AsyncSocketConnect*>(&async)); break;
@@ -970,6 +990,7 @@ void SC::AsyncEventLoop::Private::addActiveHandle(AsyncRequest& async)
970990
{
971991
case AsyncRequest::Type::LoopTimeout: activeLoopTimeouts.queueBack(*static_cast<AsyncLoopTimeout*>(&async)); break;
972992
case AsyncRequest::Type::LoopWakeUp: activeLoopWakeUps.queueBack(*static_cast<AsyncLoopWakeUp*>(&async)); break;
993+
case AsyncRequest::Type::LoopWork: activeLoopWork.queueBack(*static_cast<AsyncLoopWork*>(&async)); break;
973994
case AsyncRequest::Type::ProcessExit: activeProcessExits.queueBack(*static_cast<AsyncProcessExit*>(&async)); break;
974995
case AsyncRequest::Type::SocketAccept: activeSocketAccepts.queueBack(*static_cast<AsyncSocketAccept*>(&async)); break;
975996
case AsyncRequest::Type::SocketConnect: activeSocketConnects.queueBack(*static_cast<AsyncSocketConnect*>(&async)); break;
@@ -997,6 +1018,7 @@ SC::Result SC::AsyncEventLoop::Private::applyOnAsync(AsyncRequest& async, Lambda
9971018
{
9981019
case AsyncRequest::Type::LoopTimeout: SC_TRY(lambda(*static_cast<AsyncLoopTimeout*>(&async))); break;
9991020
case AsyncRequest::Type::LoopWakeUp: SC_TRY(lambda(*static_cast<AsyncLoopWakeUp*>(&async))); break;
1021+
case AsyncRequest::Type::LoopWork: SC_TRY(lambda(*static_cast<AsyncLoopWork*>(&async))); break;
10001022
case AsyncRequest::Type::ProcessExit: SC_TRY(lambda(*static_cast<AsyncProcessExit*>(&async))); break;
10011023
case AsyncRequest::Type::SocketAccept: SC_TRY(lambda(*static_cast<AsyncSocketAccept*>(&async))); break;
10021024
case AsyncRequest::Type::SocketConnect: SC_TRY(lambda(*static_cast<AsyncSocketConnect*>(&async))); break;

Libraries/Async/Async.h

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ struct SC::AsyncRequest
137137
{
138138
LoopTimeout, ///< Request is an AsyncLoopTimeout object
139139
LoopWakeUp, ///< Request is an AsyncLoopWakeUp object
140+
LoopWork, ///< Request is an AsyncLoopWork object
140141
ProcessExit, ///< Request is an AsyncProcessExit object
141142
SocketAccept, ///< Request is an AsyncSocketAccept object
142143
SocketConnect, ///< Request is an AsyncSocketConnect object
@@ -349,6 +350,33 @@ struct AsyncLoopWakeUp : public AsyncRequest
349350
Atomic<bool> pending = false;
350351
};
351352

353+
/// @brief Executes work in a thread pool and then invokes a callback on the event loop thread. @n
354+
/// AsyncLoopWork::work is invoked on one of the thread supplied by the ThreadPool passed during AsyncLoopWork::start.
355+
/// AsyncLoopWork::callback will be called as a completion, on the event loop thread AFTER work callback is finished.
356+
///
357+
/// \snippet Libraries/Async/Tests/AsyncTest.cpp AsyncLoopWorkSnippet1
358+
struct AsyncLoopWork : public AsyncRequest
359+
{
360+
AsyncLoopWork() : AsyncRequest(Type::LoopWork) {}
361+
362+
/// @brief Completion data for AsyncLoopWakeUp
363+
using CompletionData = AsyncCompletionData;
364+
365+
/// @brief Callback result for AsyncLoopWakeUp
366+
using Result = AsyncResultOf<AsyncLoopWork, CompletionData>;
367+
368+
/// @brief Schedule work to be executed on a background thread, notifying the event loop when it's finished.
369+
/// @param eventLoop The AsyncEventLoop where to schedule this work on
370+
/// @param threadPool The ThreadPool that will supply the background thread
371+
[[nodiscard]] SC::Result start(AsyncEventLoop& eventLoop, ThreadPool& threadPool);
372+
373+
Function<SC::Result()> work; /// Called to execute the work in a background threadpool thread
374+
Function<void(Result&)> callback; /// Called after work is done, on the thread calling EventLoop::run()
375+
376+
private:
377+
AsyncTaskOf<AsyncLoopWork> task;
378+
};
379+
352380
/// @brief Starts monitoring a process, notifying about its termination.
353381
/// @ref library_process library can be used to start a process and obtain the native process handle.
354382
///
@@ -390,7 +418,7 @@ struct AsyncProcessExit : public AsyncRequest
390418
detail::WinOverlappedOpaque overlapped;
391419
detail::WinWaitHandle waitHandle;
392420
#elif SC_PLATFORM_LINUX
393-
FileDescriptor pidFd;
421+
FileDescriptor pidFd;
394422
#endif
395423
};
396424

@@ -897,9 +925,9 @@ struct SC::AsyncEventLoop
897925

898926
struct PrivateDefinition
899927
{
900-
static constexpr int Windows = 320;
901-
static constexpr int Apple = 344;
902-
static constexpr int Default = 328;
928+
static constexpr int Windows = 336;
929+
static constexpr int Apple = 360;
930+
static constexpr int Default = 344;
903931

904932
static constexpr size_t Alignment = 8;
905933

@@ -950,6 +978,8 @@ struct SC::AsyncEventLoop
950978
InternalOpaque internal;
951979
Internal& internalSelf;
952980
friend struct AsyncRequest;
981+
friend struct AsyncFileWrite;
982+
friend struct AsyncFileRead;
953983
};
954984

955985
//! @}

Libraries/Async/Internal/AsyncLinux.inl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,11 @@ struct SC::AsyncEventLoop::KernelQueueIoURing
342342
//-------------------------------------------------------------------------------------------------------
343343
// Nothing to do :)
344344

345+
//-------------------------------------------------------------------------------------------------------
346+
// WORK
347+
//-------------------------------------------------------------------------------------------------------
348+
static Result executeOperation(AsyncLoopWork& loopWork, AsyncLoopWork::CompletionData&) { return loopWork.work(); }
349+
345350
//-------------------------------------------------------------------------------------------------------
346351
// Socket ACCEPT
347352
//-------------------------------------------------------------------------------------------------------

Libraries/Async/Internal/AsyncPosix.inl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,11 @@ struct SC::AsyncEventLoop::KernelQueuePosix
474474
//-------------------------------------------------------------------------------------------------------
475475
// Nothing to do :)
476476

477+
//-------------------------------------------------------------------------------------------------------
478+
// WORK
479+
//-------------------------------------------------------------------------------------------------------
480+
static Result executeOperation(AsyncLoopWork& loopWork, AsyncLoopWork::CompletionData&) { return loopWork.work(); }
481+
477482
//-------------------------------------------------------------------------------------------------------
478483
// Socket ACCEPT
479484
//-------------------------------------------------------------------------------------------------------

Libraries/Async/Internal/AsyncPrivate.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ struct SC::AsyncEventLoop::Private
2020
// Active phase
2121
IntrusiveDoubleLinkedList<AsyncLoopTimeout> activeLoopTimeouts;
2222
IntrusiveDoubleLinkedList<AsyncLoopWakeUp> activeLoopWakeUps;
23+
IntrusiveDoubleLinkedList<AsyncLoopWork> activeLoopWork;
2324
IntrusiveDoubleLinkedList<AsyncProcessExit> activeProcessExits;
2425
IntrusiveDoubleLinkedList<AsyncSocketAccept> activeSocketAccepts;
2526
IntrusiveDoubleLinkedList<AsyncSocketConnect> activeSocketConnects;

Libraries/Async/Internal/AsyncWindows.inl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,12 @@ struct SC::AsyncEventLoop::KernelQueue
239239
//-------------------------------------------------------------------------------------------------------
240240
[[nodiscard]] static bool setupAsync(AsyncLoopWakeUp&) { return true; }
241241

242+
//-------------------------------------------------------------------------------------------------------
243+
// WORK
244+
//-------------------------------------------------------------------------------------------------------
245+
static bool setupAsync(AsyncLoopWork&) { return true; }
246+
static Result executeOperation(AsyncLoopWork& loopWork, AsyncLoopWork::CompletionData&) { return loopWork.work(); }
247+
242248
//-------------------------------------------------------------------------------------------------------
243249
// Socket ACCEPT
244250
//-------------------------------------------------------------------------------------------------------

Libraries/Async/Tests/AsyncTest.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ struct SC::AsyncTest : public SC::TestCase
2727
}
2828
for (int i = 0; i < numTestsToRun; ++i)
2929
{
30+
if (test_section("loop work"))
31+
{
32+
loopWork();
33+
}
3034
loopTimeout();
3135
loopWakeUpFromExternalThread();
3236
loopWakeUp();
@@ -50,6 +54,8 @@ struct SC::AsyncTest : public SC::TestCase
5054
}
5155
}
5256

57+
void loopWork();
58+
5359
void loopFreeSubmittingOnClose()
5460
{
5561
// This test checks that on close asyncs being submitted are being removed for submission queue and set as Free.
@@ -819,6 +825,52 @@ struct SC::AsyncTest : public SC::TestCase
819825
}
820826
};
821827

828+
void SC::AsyncTest::loopWork()
829+
{
830+
//! [AsyncLoopWorkSnippet1]
831+
// This test creates a thread pool with 4 thread and 16 AsyncLoopWork.
832+
// All the 16 AsyncLoopWork are scheduled to do some work on a background thread.
833+
// After work is done, their respective after-work callback is invoked on the event loop thread.
834+
835+
static constexpr int NUM_THREADS = 4;
836+
static constexpr int NUM_WORKS = NUM_THREADS * NUM_THREADS;
837+
838+
ThreadPool threadPool;
839+
SC_TEST_EXPECT(threadPool.create(NUM_THREADS));
840+
841+
AsyncEventLoop eventLoop;
842+
SC_TEST_EXPECT(eventLoop.create());
843+
844+
AsyncLoopWork works[NUM_WORKS];
845+
846+
int numAfterWorkCallbackCalls = 0;
847+
Atomic<int> numWorkCallbackCalls = 0;
848+
849+
for (int idx = 0; idx < NUM_WORKS; ++idx)
850+
{
851+
works[idx].work = [&]
852+
{
853+
// This work callback is called on some random threadPool thread
854+
Thread::Sleep(50); // Execute some work on the thread
855+
numWorkCallbackCalls.fetch_add(1); // Atomically increment this counter
856+
return Result(true);
857+
};
858+
works[idx].callback = [&](AsyncLoopWork::Result&)
859+
{
860+
// This after-work callback is invoked on the event loop thread.
861+
// More precisely this runs on the thread calling eventLoop.run().
862+
numAfterWorkCallbackCalls++; // No need for atomics here, callback is run inside loop thread
863+
};
864+
SC_TEST_EXPECT(works[idx].start(eventLoop, threadPool));
865+
}
866+
SC_TEST_EXPECT(eventLoop.run());
867+
868+
// Check that callbacks have been actually called
869+
SC_TEST_EXPECT(numWorkCallbackCalls.load() == NUM_WORKS);
870+
SC_TEST_EXPECT(numAfterWorkCallbackCalls == NUM_WORKS);
871+
//! [AsyncLoopWorkSnippet1]
872+
}
873+
822874
namespace SC
823875
{
824876
void runAsyncTest(SC::TestReport& report) { AsyncTest test(report); }

0 commit comments

Comments
 (0)