Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions include/silk/fibers/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ static constexpr uint64_t FIBER_PARAMETERS_SIZE = 64;
*/
using FiberMain = int(void * parameters) noexcept;

/**
* Destructor for the fiber's parameter buffer. Called after the entry point returns.
* Null for trivially destructible parameter types.
*/
using ParametersDtor = void(void * parameters) noexcept;

/**
* Cooperative fiber scheduler with per-CPU threads, async IO, and work-stealing.
*/
Expand Down Expand Up @@ -57,7 +63,8 @@ class FiberScheduler
[[nodiscard]] static int run(int (*fiberMain)(T *) noexcept, T && parameters, FiberFuture * future) noexcept
{
static_assert(sizeof(T) <= FIBER_PARAMETERS_SIZE);
Fiber * fiber = allocateFiber(reinterpret_cast<FiberMain *>(fiberMain), future);
Fiber * fiber = allocateFiber(
reinterpret_cast<FiberMain *>(fiberMain), std::is_trivially_destructible_v<T> ? nullptr : destroyParameters<T>, future);
if (fiber)
{
std::construct_at(static_cast<T *>(getFiberParameters(fiber)), std::forward<T>(parameters));
Expand Down Expand Up @@ -355,9 +362,15 @@ class FiberScheduler
// Helpers.
//

template <typename T>
static void destroyParameters(void * p) noexcept
{
std::destroy_at(static_cast<T *>(p));
}

static void buildStealCandidates() noexcept;
static void * getFiberParameters(Fiber * fiber) noexcept;
static Fiber * allocateFiber(FiberMain * fiberMain, FiberFuture * future) noexcept;
static Fiber * allocateFiber(FiberMain * fiberMain, ParametersDtor * parametersDtor, FiberFuture * future) noexcept;
static void freeFiber(Fiber * fiber) noexcept;
static void enqueueReady(Fiber * fiber) noexcept;
static void yieldSuspendCallback(Fiber * fiber, void * context) noexcept;
Expand Down
19 changes: 14 additions & 5 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Fiber
Fiber(bool isProxyFiber = false) noexcept;
~Fiber() noexcept;

bool initialize(FiberMain * fiberMain, FiberFuture * waitingFuture) noexcept;
bool initialize(FiberMain * fiberMain, ParametersDtor * parametersDtor, FiberFuture * waitingFuture) noexcept;
void deinitialize() noexcept;

void switchToFiberContext() noexcept;
Expand Down Expand Up @@ -193,8 +193,11 @@ class Fiber
// dummy is stored here so it can be reused.
QueueBase::QueueNode queueNode;

// Entry point and parameters buffer.
// Entry point, parameters buffer, and optional parameters destructor.
// parametersDtor is set by run<T> for non-trivially-destructible T and
// called by fiberContextMain immediately after fiberMain returns.
FiberMain * fiberMain = nullptr;
ParametersDtor * parametersDtor = nullptr;
uint8_t parameters[FIBER_PARAMETERS_SIZE];

#if defined(__SANITIZE_ADDRESS__)
Expand Down Expand Up @@ -250,7 +253,7 @@ Fiber::~Fiber() noexcept
}
}

bool Fiber::initialize(FiberMain * fiberMain_, FiberFuture * waitingFuture_) noexcept
bool Fiber::initialize(FiberMain * fiberMain_, ParametersDtor * parametersDtor_, FiberFuture * waitingFuture_) noexcept
{
state.store(FiberState::SUSPENDED, std::memory_order_relaxed);

Expand Down Expand Up @@ -289,6 +292,7 @@ bool Fiber::initialize(FiberMain * fiberMain_, FiberFuture * waitingFuture_) noe
#endif

fiberMain = fiberMain_;
parametersDtor = parametersDtor_;
fiberContext = boost::context::detail::make_fcontext(
static_cast<uint8_t *>(stack) + PAGE_SIZE + FIBER_STACK_SIZE, FIBER_STACK_SIZE, fiberContextMain);

Expand All @@ -299,6 +303,11 @@ void Fiber::deinitialize() noexcept
{
ASSERT(!suspendedEntry.is_linked());

if (parametersDtor)
{
parametersDtor(parameters);
}

#if defined(__SANITIZE_THREAD__)
TSAN_FIBER_DESTROY(tsanFiber);
tsanFiber = nullptr;
Expand Down Expand Up @@ -1034,12 +1043,12 @@ void * FiberScheduler::getFiberParameters(Fiber * fiber) noexcept
return fiber->parameters;
}

Fiber * FiberScheduler::allocateFiber(FiberMain * fiberMain, FiberFuture * future) noexcept
Fiber * FiberScheduler::allocateFiber(FiberMain * fiberMain, ParametersDtor * parametersDtor, FiberFuture * future) noexcept
{
Fiber * fiber = scheduler->fiberPool.allocate();
if (fiber)
{
if (fiber->initialize(fiberMain, future))
if (fiber->initialize(fiberMain, parametersDtor, future))
{
Perf::getSimpleCounter(simpleCounters[FIBER_STARTED]).increment();
return fiber;
Expand Down