Skip to content

Commit 0526b5c

Browse files
committed
Async: Split runOnce into submitRequests, blockingPoll and dispatchCompletions
This allows easy integration with third party event loops, including GUI event loops. It allows polling in a secondary thread and submitting or running completions on the third party event loop (or GUI) thread.
1 parent 138d1d8 commit 0526b5c

File tree

7 files changed

+174
-39
lines changed

7 files changed

+174
-39
lines changed

Documentation/Libraries/Async.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,24 @@ This is usable but needs some more testing and a few more features.
4040

4141
### Run modes
4242

43-
Event loop can be run in different ways to allow integrated it in multiple ways in applications that may already have an existing event loop (example GUI applications).
43+
The event loop can be run in different ways, allowing it to be integrated into applications in multiple ways.
4444

4545
| Run mode | Description |
4646
|:------------------------------|:------------------------------------------|
4747
| SC::AsyncEventLoop::run | @copydoc SC::AsyncEventLoop::run |
4848
| SC::AsyncEventLoop::runOnce | @copydoc SC::AsyncEventLoop::runOnce |
4949
| SC::AsyncEventLoop::runNoWait | @copydoc SC::AsyncEventLoop::runNoWait |
5050

51+
52+
Alternatively, the user can explicitly call three methods to submit requests, poll for events and dispatch completions.
53+
This is very useful to integrate the event loop into applications with other event loops (for example GUI applications).
54+
55+
| Run mode | Description |
56+
|:------------------------------------------|:--------------------------------------------------|
57+
| SC::AsyncEventLoop::submitRequests | @copydoc SC::AsyncEventLoop::submitRequests |
58+
| SC::AsyncEventLoop::blockingPoll | @copydoc SC::AsyncEventLoop::blockingPoll |
59+
| SC::AsyncEventLoop::dispatchCompletions | @copydoc SC::AsyncEventLoop::dispatchCompletions |
60+
5161
## AsyncLoopTimeout
5262
@copydoc SC::AsyncLoopTimeout
5363

@@ -105,7 +115,6 @@ SC::ArenaMap from the [Containers](@ref library_containers) can be used to preal
105115
# Roadmap
106116

107117
🟩 Usable Features:
108-
- Implement option to do blocking poll check without dispatching callbacks (needed for efficient gui event loop integration)
109118
- More comprehensive test suite, testing all cancellations
110119
- FS operations (open stat read write unlink copyfile mkdir chmod etc.)
111120
- UDP Send/Receive

Libraries/Async/Async.cpp

Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,21 @@ SC::Result SC::AsyncEventLoop::run()
276276
return SC::Result(true);
277277
}
278278

279+
SC::Result SC::AsyncEventLoop::submitRequests(AsyncKernelEvents& kernelEvents)
280+
{
281+
return internal.submitRequests(kernelEvents);
282+
}
283+
284+
SC::Result SC::AsyncEventLoop::blockingPoll(AsyncKernelEvents& kernelEvents)
285+
{
286+
return internal.blockingPoll(Internal::SyncMode::ForcedForwardProgress, kernelEvents);
287+
}
288+
289+
SC::Result SC::AsyncEventLoop::dispatchCompletions(AsyncKernelEvents& kernelEvents)
290+
{
291+
return internal.dispatchCompletions(Internal::SyncMode::ForcedForwardProgress, kernelEvents);
292+
}
293+
279294
template <>
280295
void SC::AsyncEventLoop::InternalOpaque::construct(Handle& buffer)
281296
{
@@ -369,14 +384,14 @@ SC::AsyncLoopTimeout* SC::AsyncEventLoop::Internal::findEarliestLoopTimeout() co
369384
return earliestTime;
370385
}
371386

372-
void SC::AsyncEventLoop::Internal::invokeExpiredTimers(AsyncLoopTimeout& timeout)
387+
void SC::AsyncEventLoop::Internal::invokeExpiredTimers(Time::HighResolutionCounter currentTime)
373388
{
374389
AsyncLoopTimeout* async;
375390
for (async = activeLoopTimeouts.front; //
376391
async != nullptr; //
377392
async = static_cast<AsyncLoopTimeout*>(async->next))
378393
{
379-
if (timeout.expirationTime.isLaterThanOrEqualTo(async->expirationTime))
394+
if (currentTime.isLaterThanOrEqualTo(async->expirationTime))
380395
{
381396
removeActiveHandle(*async);
382397
AsyncLoopTimeout::Result result(*async, Result(true));
@@ -533,7 +548,20 @@ SC::Result SC::AsyncEventLoop::Internal::completeAndEventuallyReactivate(KernelE
533548

534549
SC::Result SC::AsyncEventLoop::Internal::runStep(SyncMode syncMode)
535550
{
536-
KernelEvents kernelEvents(loop->internal.kernelQueue.get());
551+
alignas(uint64_t) uint8_t buffer[8 * 1024]; // 8 Kb of kernel events
552+
AsyncKernelEvents kernelEvents;
553+
kernelEvents.eventsMemory = buffer;
554+
SC_TRY(submitRequests(kernelEvents));
555+
SC_TRY(blockingPoll(syncMode, kernelEvents));
556+
return dispatchCompletions(syncMode, kernelEvents);
557+
}
558+
559+
SC::Result SC::AsyncEventLoop::Internal::submitRequests(AsyncKernelEvents& asyncKernelEvents)
560+
{
561+
KernelEvents kernelEvents(loop->internal.kernelQueue.get(), asyncKernelEvents);
562+
asyncKernelEvents.numberOfEvents = 0;
563+
// TODO: Check if it's possible to avoid zeroing kernel events memory
564+
memset(asyncKernelEvents.eventsMemory.data(), 0, asyncKernelEvents.eventsMemory.sizeInBytes());
537565
SC_LOG_MESSAGE("---------------\n");
538566

539567
while (AsyncRequest* async = submissions.dequeueFront())
@@ -545,6 +573,12 @@ SC::Result SC::AsyncEventLoop::Internal::runStep(SyncMode syncMode)
545573
}
546574
}
547575

576+
return SC::Result(true);
577+
}
578+
579+
SC::Result SC::AsyncEventLoop::Internal::blockingPoll(SyncMode syncMode, AsyncKernelEvents& asyncKernelEvents)
580+
{
581+
KernelEvents kernelEvents(loop->internal.kernelQueue.get(), asyncKernelEvents);
548582
if (getTotalNumberOfActiveHandle() <= 0 and numberOfManualCompletions == 0)
549583
{
550584
// happens when we do cancelAsync on the last active async for example
@@ -558,11 +592,27 @@ SC::Result SC::AsyncEventLoop::Internal::runStep(SyncMode syncMode)
558592
SC_TRY(kernelEvents.syncWithKernel(*loop, syncMode));
559593
SC_LOG_MESSAGE("Active Requests After Poll = {}\n", getTotalNumberOfActiveHandle());
560594
}
595+
return SC::Result(true);
596+
}
561597

562-
if (expiredTimer)
598+
SC::Result SC::AsyncEventLoop::Internal::dispatchCompletions(SyncMode syncMode, AsyncKernelEvents& asyncKernelEvents)
599+
{
600+
KernelEvents kernelEvents(loop->internal.kernelQueue.get(), asyncKernelEvents);
601+
switch (syncMode)
563602
{
564-
invokeExpiredTimers(*expiredTimer);
565-
expiredTimer = nullptr;
603+
case SyncMode::NoWait: {
604+
updateTime();
605+
invokeExpiredTimers(loopTime);
606+
}
607+
break;
608+
case SyncMode::ForcedForwardProgress: {
609+
if (expiredTimer)
610+
{
611+
invokeExpiredTimers(expiredTimer->expirationTime);
612+
expiredTimer = nullptr;
613+
}
614+
}
615+
break;
566616
}
567617
runStepExecuteCompletions(kernelEvents);
568618
runStepExecuteManualCompletions(kernelEvents);

Libraries/Async/Async.h

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ struct ThreadPool;
4545
struct ThreadPoolTask;
4646

4747
struct EventObject;
48+
struct AsyncKernelEvents;
4849
struct AsyncEventLoop;
4950

5051
struct AsyncRequest;
@@ -848,6 +849,18 @@ struct AsyncFilePoll : public AsyncRequest
848849

849850
} // namespace SC
850851

852+
/// @brief Allows user to supply a block of memory that will store kernel I/O events retrieved from
853+
/// AsyncEventLoop::blockingPoll. Such events can then be passed later to AsyncEventLoop::dispatchCompletions.
854+
/// @see AsyncEventLoop::blockingPoll
855+
struct SC::AsyncKernelEvents
856+
{
857+
Span<uint8_t> eventsMemory; ///< User supplied block of memory used to store kernel I/O events
858+
859+
private:
860+
int numberOfEvents = 0;
861+
friend struct AsyncEventLoop;
862+
};
863+
851864
/// @brief Asynchronous I/O (files, sockets, timers, processes, fs events, threads wake-up) (see @ref library_async)
852865
/// AsyncEventLoop pushes all AsyncRequest derived classes to I/O queues in the OS.
853866
/// Basic lifetime for an event loop is:
@@ -876,25 +889,63 @@ struct SC::AsyncEventLoop
876889
/// Closes the event loop kernel object
877890
[[nodiscard]] Result close();
878891

879-
/// Blocks until there are no more active queued requests.
892+
/// Blocks until there are no more active queued requests, dispatching all completions.
880893
/// It's useful for applications where the eventLoop is the only (or the main) loop.
881894
/// One example could be a console based app doing socket IO or a web server.
882-
/// Waiting on requests blocks the current thread with 0% CPU utilization.
895+
/// Waiting on kernel events blocks the current thread with 0% CPU utilization.
896+
/// @see AsyncEventLoop::blockingPoll to integrate the loop with a GUI event loop
883897
[[nodiscard]] Result run();
884898

885-
/// Blocks until at least one request proceeds, ensuring forward progress.
886-
/// It's useful for applications where the eventLoop events needs to be interleaved with other work.
887-
/// For example one possible way of integrating with a UI event loop could be to schedule a recurrent timeout
888-
/// timer every 1/60 seconds where calling GUI event loop updates every 60 seconds, blocking for I/O for
889-
/// the remaining time. Waiting on requests blocks the current thread with 0% CPU utilization.
899+
/// Blocks until at least one request proceeds, ensuring forward progress, dispatching all completions.
900+
/// It's useful for applications that need to run some idle work after every IO event.
901+
/// Waiting on requests blocks the current thread with 0% CPU utilization.
902+
///
903+
/// This function is a shortcut invoking async event loop building blocks:
904+
/// - AsyncEventLoop::submitRequests
905+
/// - AsyncEventLoop::blockingPoll
906+
/// - AsyncEventLoop::dispatchCompletions
907+
/// @see AsyncEventLoop::blockingPoll for a description on how to integrate AsyncEventLoop with another event loop
890908
[[nodiscard]] Result runOnce();
891909

892-
/// Process active requests if they exist or returns immediately without blocking.
910+
/// Process active requests if any, dispatching their completions, or returns immediately without blocking.
893911
/// It's useful for game-like applications where the event loop runs every frame and one would like to check
894912
/// and dispatch its I/O callbacks in-between frames.
895913
/// This call allows poll-checking I/O without blocking.
914+
/// @see AsyncEventLoop::blockingPoll to integrate the loop with a GUI event loop
896915
[[nodiscard]] Result runNoWait();
897916

917+
/// Submits all queued async requests.
918+
/// An AsyncRequest becomes queued after user calls its specific AsyncRequest::start method.
919+
///
920+
/// @see AsyncEventLoop::blockingPoll for a description on how to integrate AsyncEventLoop with another event loop
921+
[[nodiscard]] Result submitRequests(AsyncKernelEvents& kernelEvents);
922+
923+
/// Blocks until at least one event happens, ensuring forward progress, without executing completions.
924+
/// It's one of the three building blocks of AsyncEventLoop::runOnce allowing co-operation of AsyncEventLoop
925+
/// within another event loop (for example a GUI event loop or another IO event loop).
926+
///
927+
/// One possible example of such integration with a GUI event loop could be:
928+
///
929+
/// - Call AsyncEventLoop::submitRequests on the GUI thread to queue some requests
930+
/// - Call AsyncEventLoop::blockingPoll on a secondary thread, storing AsyncKernelEvents
931+
/// - Wake up the GUI event loop from the secondary thread after AsyncEventLoop::blockingPoll returns
932+
/// - Call AsyncEventLoop::dispatchCompletions on the GUI event loop to dispatch callbacks on GUI thread
933+
/// - Repeat all steps
934+
///
935+
/// Waiting on requests blocks the current thread with 0% CPU utilization.
936+
/// @param kernelEvents Mandatory parameter to store kernel IO events WITHOUT running their completions.
937+
/// The user is then expected to run completions by passing it to AsyncEventLoop::dispatchCompletions.
938+
/// @see AsyncEventLoop::submitRequests sends async requests to kernel before calling blockingPoll
939+
/// @see AsyncEventLoop::dispatchCompletions invokes callbacks associated with kernel events after blockingPoll
940+
[[nodiscard]] Result blockingPoll(AsyncKernelEvents& kernelEvents);
941+
942+
/// Invokes completions for the AsyncKernelEvents collected by a call to AsyncEventLoop::blockingPoll.
943+
/// This is typically done when user wants to poll for events on a thread (calling AsyncEventLoop::blockingPoll)
944+
/// and dispatch the callbacks on another thread (calling AsyncEventLoop::dispatchCompletions).
945+
/// The typical example would be integrating AsyncEventLoop with a GUI event loop.
946+
/// @see AsyncEventLoop::blockingPoll for a description on how to integrate AsyncEventLoop with another event loop
947+
[[nodiscard]] Result dispatchCompletions(AsyncKernelEvents& kernelEvents);
948+
898949
/// Wake up the event loop from a thread different than the one where run() is called (and potentially blocked).
899950
/// The parameter is an AsyncLoopWakeUp that must have been previously started (with AsyncLoopWakeUp::start).
900951
[[nodiscard]] Result wakeUpFromExternalThread(AsyncLoopWakeUp& wakeUp);

Libraries/Async/Internal/AsyncInternal.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ struct SC::AsyncEventLoop::Internal
9393
// Timers
9494
[[nodiscard]] AsyncLoopTimeout* findEarliestLoopTimeout() const;
9595

96-
void invokeExpiredTimers(AsyncLoopTimeout& timeout);
96+
void invokeExpiredTimers(Time::HighResolutionCounter currentTime);
9797
void updateTime();
9898

9999
[[nodiscard]] Result cancelAsync(AsyncRequest& async);
@@ -131,6 +131,10 @@ struct SC::AsyncEventLoop::Internal
131131

132132
[[nodiscard]] Result runStep(SyncMode syncMode);
133133

134+
[[nodiscard]] Result submitRequests(AsyncKernelEvents& kernelEvents);
135+
[[nodiscard]] Result blockingPoll(SyncMode syncMode, AsyncKernelEvents& kernelEvents);
136+
[[nodiscard]] Result dispatchCompletions(SyncMode syncMode, AsyncKernelEvents& kernelEvents);
137+
134138
void runStepExecuteCompletions(KernelEvents& kernelEvents);
135139
void runStepExecuteManualCompletions(KernelEvents& kernelEvents);
136140
void runStepExecuteManualThreadPoolCompletions(KernelEvents& kernelEvents);

Libraries/Async/Internal/AsyncLinux.inl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ struct SC::AsyncEventLoop::Internal::KernelEvents
3636
bool isEpoll = true;
3737
AlignedStorage<16400> storage;
3838

39-
KernelEvents(KernelQueue& kernelQueue);
39+
KernelEvents(KernelQueue& kernelQueue, AsyncKernelEvents& asyncKernelEvents);
4040
~KernelEvents();
4141

4242
KernelEventsIoURing& getUring();
@@ -171,15 +171,20 @@ struct SC::AsyncEventLoop::Internal::KernelQueueIoURing
171171
struct SC::AsyncEventLoop::Internal::KernelEventsIoURing
172172
{
173173
private:
174-
static constexpr int totalNumEvents = 256;
175-
176-
int newEvents = 0;
177-
io_uring_cqe events[totalNumEvents];
174+
io_uring_cqe* events;
178175

179176
KernelEvents& parentKernelEvents;
180177

178+
int& newEvents;
179+
const int totalNumEvents;
180+
181181
public:
182-
KernelEventsIoURing(KernelEvents& kq) : parentKernelEvents(kq) {}
182+
KernelEventsIoURing(KernelEvents& kq, AsyncKernelEvents& kernelEvents)
183+
: parentKernelEvents(kq), newEvents(kernelEvents.numberOfEvents),
184+
totalNumEvents(static_cast<int>(kernelEvents.eventsMemory.sizeInBytes() / sizeof(events[0])))
185+
{
186+
events = reinterpret_cast<decltype(events)>(kernelEvents.eventsMemory.data());
187+
}
183188

184189
[[nodiscard]] AsyncRequest* getAsyncRequest(uint32_t idx)
185190
{
@@ -634,11 +639,11 @@ SC::Result SC::AsyncEventLoop::Internal::KernelQueue::wakeUpFromExternalThread()
634639
// AsyncEventLoop::Internal::KernelEvents
635640
//----------------------------------------------------------------------------------------
636641

637-
SC::AsyncEventLoop::Internal::KernelEvents::KernelEvents(KernelQueue& kernelQueue)
642+
SC::AsyncEventLoop::Internal::KernelEvents::KernelEvents(KernelQueue& kernelQueue, AsyncKernelEvents& asyncKernelEvents)
638643
{
639644
isEpoll = kernelQueue.isEpoll;
640-
isEpoll ? placementNew(storage.reinterpret_as<KernelEventsPosix>(), *this)
641-
: placementNew(storage.reinterpret_as<KernelEventsIoURing>(), *this);
645+
isEpoll ? placementNew(storage.reinterpret_as<KernelEventsPosix>(), *this, asyncKernelEvents)
646+
: placementNew(storage.reinterpret_as<KernelEventsIoURing>(), *this, asyncKernelEvents);
642647
}
643648

644649
SC::AsyncEventLoop::Internal::KernelEvents::~KernelEvents()

Libraries/Async/Internal/AsyncPosix.inl

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -281,22 +281,30 @@ struct SC::AsyncEventLoop::Internal::KernelQueuePosix
281281
struct SC::AsyncEventLoop::Internal::KernelEventsPosix
282282
{
283283
private:
284-
static constexpr int totalNumEvents = 1024;
285-
286284
#if SC_ASYNC_USE_EPOLL
287-
epoll_event events[totalNumEvents];
285+
epoll_event* events;
288286
#else
289-
struct kevent events[totalNumEvents];
287+
struct kevent* events;
290288
#endif
291-
int newEvents = 0;
292-
293289
KernelEvents& parentKernelEvents;
294290

291+
int& newEvents;
292+
const int totalNumEvents;
293+
295294
public:
296-
KernelEventsPosix(KernelEvents& kq) : parentKernelEvents(kq) { memset(events, 0, sizeof(events)); }
297295
#if SC_PLATFORM_APPLE
298-
KernelEventsPosix(KernelQueuePosix&) : parentKernelEvents(*this) { memset(events, 0, sizeof(events)); }
296+
KernelEventsPosix(KernelQueue&, AsyncKernelEvents& kernelEvents)
297+
: parentKernelEvents(*this),
298+
#else
299+
KernelEventsPosix(KernelEvents& ke, AsyncKernelEvents& kernelEvents)
300+
: parentKernelEvents(ke),
299301
#endif
302+
newEvents(kernelEvents.numberOfEvents),
303+
totalNumEvents(static_cast<int>(kernelEvents.eventsMemory.sizeInBytes() / sizeof(events[0])))
304+
{
305+
events = reinterpret_cast<decltype(events)>(kernelEvents.eventsMemory.data());
306+
}
307+
300308
uint32_t getNumEvents() const { return static_cast<uint32_t>(newEvents); }
301309

302310
[[nodiscard]] AsyncRequest* getAsyncRequest(uint32_t idx) const

Libraries/Async/Internal/AsyncWindows.inl

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,18 @@ struct SC::AsyncEventLoop::Internal::KernelQueue
184184

185185
struct SC::AsyncEventLoop::Internal::KernelEvents
186186
{
187-
static constexpr int totalNumEvents = 128;
188187

189-
OVERLAPPED_ENTRY events[totalNumEvents];
190-
ULONG newEvents = 0;
188+
OVERLAPPED_ENTRY* events;
191189

192-
KernelEvents(KernelQueue&) { ::memset(events, 0, totalNumEvents * sizeof(events[0])); }
190+
int& newEvents;
191+
const int totalNumEvents = 0;
192+
193+
KernelEvents(KernelQueue&, AsyncKernelEvents& kernelEvents)
194+
: newEvents(kernelEvents.numberOfEvents),
195+
totalNumEvents(static_cast<int>(kernelEvents.eventsMemory.sizeInBytes() / sizeof(events[0])))
196+
{
197+
events = reinterpret_cast<decltype(events)>(kernelEvents.eventsMemory.data());
198+
}
193199

194200
uint32_t getNumEvents() const { return static_cast<uint32_t>(newEvents); }
195201

@@ -230,7 +236,9 @@ struct SC::AsyncEventLoop::Internal::KernelEvents
230236
}
231237
const DWORD ms =
232238
nextTimer or syncMode == Internal::SyncMode::NoWait ? static_cast<ULONG>(timeout.ms) : INFINITE;
233-
const BOOL res = ::GetQueuedCompletionStatusEx(loopFd, events, totalNumEvents, &newEvents, ms, FALSE);
239+
ULONG ulongEvents = static_cast<ULONG>(newEvents);
240+
const BOOL res = ::GetQueuedCompletionStatusEx(loopFd, events, totalNumEvents, &ulongEvents, ms, FALSE);
241+
newEvents = static_cast<int>(ulongEvents);
234242
if (res == FALSE)
235243
{
236244
if (::GetLastError() == WAIT_TIMEOUT)

0 commit comments

Comments
 (0)