Skip to content

Commit

Permalink
Async: Refactoring AsyncTask to embed ThreadPoolTask
Browse files Browse the repository at this point in the history
This makes the API more pleasant as there's no need to track a separate ThreadPoolTask for each AsyncTask.
Also the api was using references for building the object and it was making it hard to create arrays / vectors of such objects.
  • Loading branch information
Pagghiu committed Apr 13, 2024
1 parent 588ef54 commit 79fdec0
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 118 deletions.
76 changes: 49 additions & 27 deletions Libraries/Async/Async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,15 @@ void SC::AsyncRequest::markAsFree()
flags = 0;
}

SC::Result SC::AsyncRequest::queueSubmission(AsyncEventLoop& loop, AsyncTask* task)
SC::Result SC::AsyncRequest::queueSubmission(AsyncEventLoop& loop)
{
return loop.privateSelf.queueSubmission(*this, task);
return loop.privateSelf.queueSubmission(*this, nullptr);
}

SC::Result SC::AsyncRequest::queueSubmission(AsyncEventLoop& loop, ThreadPool& threadPool, AsyncTask& task)
{
task.threadPool = &threadPool;
return loop.privateSelf.queueSubmission(*this, &task);
}

void SC::AsyncRequest::updateTime(AsyncEventLoop& loop) { loop.privateSelf.updateTime(); }
Expand All @@ -101,14 +107,14 @@ SC::Result SC::AsyncLoopTimeout::start(AsyncEventLoop& loop, Time::Milliseconds
updateTime(loop);
expirationTime = loop.getLoopTime().offsetBy(expiration);
timeout = expiration;
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

SC::Result SC::AsyncLoopWakeUp::start(AsyncEventLoop& loop, EventObject* eo)
{
eventObject = eo;
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

Expand All @@ -117,7 +123,7 @@ SC::Result SC::AsyncLoopWakeUp::wakeUp() { return getEventLoop()->wakeUpFromExte
SC::Result SC::AsyncProcessExit::start(AsyncEventLoop& loop, ProcessDescriptor::Handle process)
{
handle = process;
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

Expand All @@ -126,7 +132,7 @@ SC::Result SC::AsyncSocketAccept::start(AsyncEventLoop& loop, const SocketDescri
SC_TRY(validateAsync());
SC_TRY(socketDescriptor.get(handle, SC::Result::Error("Invalid handle")));
SC_TRY(socketDescriptor.getAddressFamily(addressFamily));
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

Expand All @@ -136,7 +142,7 @@ SC::Result SC::AsyncSocketConnect::start(AsyncEventLoop& loop, const SocketDescr
SC_TRY(validateAsync());
SC_TRY(socketDescriptor.get(handle, SC::Result::Error("Invalid handle")));
ipAddress = socketIpAddress;
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

Expand All @@ -147,7 +153,7 @@ SC::Result SC::AsyncSocketSend::start(AsyncEventLoop& loop, const SocketDescript
SC_TRY(validateAsync());
SC_TRY(socketDescriptor.get(handle, SC::Result::Error("Invalid handle")));
buffer = dataToSend;
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

Expand All @@ -157,51 +163,67 @@ SC::Result SC::AsyncSocketReceive::start(AsyncEventLoop& loop, const SocketDescr
SC_TRY(validateAsync());
SC_TRY(socketDescriptor.get(handle, SC::Result::Error("Invalid handle")));
buffer = receiveData;
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

SC::Result SC::AsyncSocketClose::start(AsyncEventLoop& loop, const SocketDescriptor& socketDescriptor)
{
SC_TRY(validateAsync());
SC_TRY(socketDescriptor.get(handle, SC::Result::Error("Invalid handle")));
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

SC::Result SC::AsyncFileRead::start(AsyncEventLoop& loop, FileDescriptor::Handle fd, Span<char> rb, Task* task)
SC::Result SC::AsyncFileRead::start(AsyncEventLoop& loop)
{
SC_TRY_MSG(rb.sizeInBytes() > 0, "AsyncEventLoop::startFileRead - Zero sized read buffer");
SC_TRY_MSG(buffer.sizeInBytes() > 0, "AsyncFileRead::start - Zero sized read buffer");
SC_TRY_MSG(fileDescriptor != FileDescriptor::Invalid, "AsyncFileRead::start - Invalid file descriptor");
SC_TRY(validateAsync());
fileDescriptor = fd;
buffer = rb;
SC_TRY(queueSubmission(loop, task));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

SC::Result SC::AsyncFileWrite::start(AsyncEventLoop& loop, FileDescriptor::Handle fd, Span<const char> wb, Task* task)
SC::Result SC::AsyncFileRead::start(AsyncEventLoop& loop, ThreadPool& threadPool, Task& task)
{
SC_TRY_MSG(wb.sizeInBytes() > 0, "AsyncEventLoop::startFileWrite - Zero sized write buffer");
SC_TRY_MSG(buffer.sizeInBytes() > 0, "AsyncFileRead::start - Zero sized read buffer");
SC_TRY_MSG(fileDescriptor != FileDescriptor::Invalid, "AsyncFileRead::start - Invalid file descriptor");
SC_TRY(validateAsync());
fileDescriptor = fd;
buffer = wb;
SC_TRY(queueSubmission(loop, task));
SC_TRY(queueSubmission(loop, threadPool, task));
return SC::Result(true);
}

SC::Result SC::AsyncFileWrite::start(AsyncEventLoop& loop)
{
SC_TRY_MSG(buffer.sizeInBytes() > 0, "AsyncFileWrite::start - Zero sized write buffer");
SC_TRY_MSG(fileDescriptor != FileDescriptor::Invalid, "AsyncFileWrite::start - Invalid file descriptor");
SC_TRY(validateAsync());
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

SC::Result SC::AsyncFileWrite::start(AsyncEventLoop& loop, ThreadPool& threadPool, Task& task)
{
SC_TRY_MSG(buffer.sizeInBytes() > 0, "AsyncFileWrite::start - Zero sized write buffer");
SC_TRY_MSG(fileDescriptor != FileDescriptor::Invalid, "AsyncFileWrite::start - Invalid file descriptor");
SC_TRY(validateAsync());
SC_TRY(queueSubmission(loop, threadPool, task));
return SC::Result(true);
}

SC::Result SC::AsyncFileClose::start(AsyncEventLoop& loop, FileDescriptor::Handle fd)
{
SC_TRY(validateAsync());
fileDescriptor = fd;
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

SC::Result SC::AsyncFilePoll::start(AsyncEventLoop& loop, FileDescriptor::Handle fd)
{
SC_TRY(validateAsync());
fileDescriptor = fd;
SC_TRY(queueSubmission(loop, nullptr));
SC_TRY(queueSubmission(loop));
return SC::Result(true);
}

Expand Down Expand Up @@ -313,7 +335,7 @@ SC::Result SC::AsyncEventLoop::Private::queueSubmission(AsyncRequest& async, Asy
async.eventLoop = eventLoop;
async.state = AsyncRequest::State::Setup;

// Only set the async tasks for operations and backends where it makes sense to do so
// Only set the async tasks for operations and backends that are not io_uring
if (task and eventLoop->internalSelf.makesSenseToRunInThreadPool(async))
{
async.asyncTask = task;
Expand Down Expand Up @@ -379,7 +401,7 @@ SC::Result SC::AsyncEventLoop::Private::waitForThreadPoolTasks(IntrusiveDoubleLi
{
if (it->asyncTask != nullptr)
{
if (not it->asyncTask->threadPool.waitForTask(it->asyncTask->task))
if (not it->asyncTask->threadPool->waitForTask(it->asyncTask->task))
{
res = Result::Error("Threadpool was already stopped");
}
Expand Down Expand Up @@ -631,7 +653,7 @@ struct SC::AsyncEventLoop::Private::ActivateAsyncPhase
{
AsyncTask* asyncTask = async.asyncTask;
asyncTask->task.function = [&async] { executeThreadPoolOperation(async); };
return asyncTask->threadPool.queueTask(asyncTask->task);
return asyncTask->threadPool->queueTask(asyncTask->task);
}

if (async.flags & Private::Flag_ManualCompletion)
Expand Down Expand Up @@ -664,7 +686,7 @@ struct SC::AsyncEventLoop::Private::CancelAsyncPhase
{
// Waiting here is not ideal but we need it to be able to reliably know that
// the task can be reused soon after cancelling an async that uses it.
SC_TRY(async.asyncTask->threadPool.waitForTask(async.asyncTask->task));
SC_TRY(async.asyncTask->threadPool->waitForTask(async.asyncTask->task));

// Prevent this async from going in the CompleteAsyncPhase and mark task as free
async.eventLoop->privateSelf.manualThreadPoolCompletions.remove(async);
Expand Down Expand Up @@ -721,7 +743,7 @@ struct SC::AsyncEventLoop::Private::CompleteAsyncPhase
result.returnCode = asyncTask->returnCode;
result.completionData = move(static_cast<AsyncCompletion&>(asyncTask->completionData));
// The task is already finished but we need waitForTask to make it available for next runs.
SC_TRY(asyncTask->threadPool.waitForTask(asyncTask->task));
SC_TRY(asyncTask->threadPool->waitForTask(asyncTask->task));
asyncTask->freeTask();
}
else
Expand Down

0 comments on commit 79fdec0

Please sign in to comment.