Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 230 additions & 0 deletions src/workerd/io/actor-sqlite-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ static constexpr kj::Date twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date threeMs = 3 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date fourMs = 4 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date fiveMs = 5 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date sixMs = 6 * kj::MILLISECONDS + kj::UNIX_EPOCH;
static constexpr kj::Date tenMs = 10 * kj::MILLISECONDS + kj::UNIX_EPOCH;
// Used as the "current time" parameter for armAlarmHandler in tests.
// Set to epoch (before all test alarm times) so existing tests aren't affected by
// the overdue alarm check.
Expand Down Expand Up @@ -73,6 +75,9 @@ struct ActorSqliteTest final {

kj::Promise<void> scheduleRun(
kj::Maybe<kj::Date> newAlarmTime, kj::Promise<void> priorTask) override {
KJ_IF_SOME(h, parent.scheduleRunWithPriorHandler) {
return h(newAlarmTime, kj::mv(priorTask));
}
KJ_IF_SOME(h, parent.scheduleRunHandler) {
return h(newAlarmTime);
}
Expand All @@ -87,6 +92,8 @@ struct ActorSqliteTest final {
ActorSqliteTest& parent;
};
kj::Maybe<kj::Function<kj::Promise<void>(kj::Maybe<kj::Date>)>> scheduleRunHandler;
kj::Maybe<kj::Function<kj::Promise<void>(kj::Maybe<kj::Date>, kj::Promise<void>)>>
scheduleRunWithPriorHandler;
ActorSqliteTestHooks hooks = ActorSqliteTestHooks(*this);

ActorSqlite actor;
Expand Down Expand Up @@ -1138,6 +1145,229 @@ KJ_TEST("rejected move-later alarm scheduling request does not break gate") {
test.pollAndExpectCalls({"commit"})[0]->fulfill();
}

KJ_TEST("rapid move-later alarm changes coalesce into bounded scheduleRun calls") {
// When many commits each move the alarm time later while a scheduleRun is already in-flight,
// the scheduleLaterAlarm mechanism should coalesce them into at most one pending request,
// rather than chaining N promises (one per commit).
ActorSqliteTest test;

// Initialize alarm state to 1ms.
test.setAlarm(oneMs);
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

// Move alarm to 2ms. The db commit completes, triggering a post-commit scheduleRun(2ms)
// since the alarm moved later.
test.setAlarm(twoMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
// The first move-later scheduleRun starts.
auto fulfiller2Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]);

// While 2ms scheduleRun is in-flight, move alarm to 3ms, 4ms, 5ms in rapid succession.
// Each commit completes immediately but the scheduleRun for 2ms is still pending.
// Only the final value (5ms) should be scheduled after the 2ms scheduleRun completes.
test.setAlarm(threeMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
KJ_ASSERT(expectSync(test.getAlarm()) == threeMs);

test.setAlarm(fourMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
KJ_ASSERT(expectSync(test.getAlarm()) == fourMs);

test.setAlarm(fiveMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({}); // No new scheduleRun -- coalesced into pending.
KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs);

// Now fulfill the 2ms scheduleRun. The coalesced pending time (5ms) should be scheduled next.
fulfiller2Ms->fulfill();
auto fulfiller5Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(5ms)"})[0]);
// Importantly, there is exactly one scheduleRun(5ms), not three separate calls for 3ms, 4ms, 5ms.

fulfiller5Ms->fulfill();
test.pollAndExpectCalls({});

KJ_ASSERT(expectSync(test.getAlarm()) == fiveMs);
}

KJ_TEST("armAlarmHandler with coalesced pending alarms schedules reschedule exactly once") {
// Verifies two properties:
// 1. No duplicate scheduleRun(6ms): armAlarmHandler clears pendingLaterAlarmTime so the
// FORK_A completion handler does not re-issue it.
// 2. Future commits (10ms) that arrive after armAlarmHandler fires are correctly handled:
// they queue in pendingLaterAlarmTime, get picked up by FORK_A's completion handler,
// and chain off FORK_B (armAlarmHandler's fork) so the order is 3ms -> 6ms -> 10ms.
ActorSqliteTest test;

// Initialize alarm to 1ms and fully commit it so lastConfirmedAlarmDbState = 1ms.
test.setAlarm(oneMs);
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

// Move alarm to 3ms -- scheduleRun(3ms) goes in-flight via scheduleLaterAlarm.
// alarmLaterIsInFlight=true, alarmLaterInFlight=FORK_A.
test.setAlarm(threeMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
auto fulfiller3Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(3ms)"})[0]);

// While 3ms scheduleRun is in-flight, rapidly move to 4ms then 6ms.
// Both coalesce into pendingLaterAlarmTime=6ms; no new scheduleRun issued.
test.setAlarm(fourMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});

test.setAlarm(sixMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == sixMs);

// The 1ms alarm fires. armAlarmHandler sees scheduledTime=1ms, localAlarmState=6ms.
// willFireEarlier(1ms, 6ms) => reschedule-later path:
// requestScheduledAlarm(6ms, FORK_A.addBranch()) called synchronously -> FORK_B
// pendingLaterAlarmTime cleared to kj::none
// alarmLaterInFlight = FORK_B
// alarmLaterIsInFlight unchanged (still true, owned by FORK_A lifecycle)
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
KJ_ASSERT(armResult.is<ActorSqlite::CancelAlarmHandler>());
auto& cancelResult = armResult.get<ActorSqlite::CancelAlarmHandler>();

// scheduleRun(6ms) issued exactly once -- synchronously inside armAlarmHandler.
auto fulfiller6Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(6ms)"})[0]);

// Commit for 10ms arrives while scheduleRun(3ms) is still in-flight.
// alarmLaterIsInFlight=true (FORK_A lifecycle still active) so 10ms is correctly
// queued: pendingLaterAlarmTime=Some(10ms). FORK_B is referenced by alarmLaterInFlight.
test.setAlarm(tenMs);
test.pollAndExpectCalls({"commit"})[0]->fulfill();
test.pollAndExpectCalls({}); // No scheduleRun yet -- coalesced into pending.

// Fulfill scheduleRun(3ms). FORK_A resolves. FORK_A completion handler fires:
// alarmLaterIsInFlight=false
// pendingLaterAlarmTime=Some(10ms) -> scheduleLaterAlarm(10ms)
// requestScheduledAlarm(10ms, FORK_B.addBranch()) -> FORK_C chains off FORK_B
// scheduleRun(10ms) issued synchronously
// Importantly: scheduleRun(6ms) is NOT issued again here -- pendingLaterAlarmTime
// held 10ms (not 6ms), because armAlarmHandler had already cleared the 6ms.
fulfiller3Ms->fulfill();
auto fulfiller10Ms = kj::mv(test.pollAndExpectCalls({"scheduleRun(10ms)"})[0]);

// Fulfill scheduleRun(6ms). FORK_B resolves cleanly -- it has no completion handler,
// so overwriting alarmLaterInFlight with FORK_C is safe: FORK_C captured a branch of
// FORK_B as priorTask before the field was overwritten, keeping FORK_B alive. FORK_B
// resolving propagates into FORK_C's priorTask silently. No new scheduleRun here.
fulfiller6Ms->fulfill();
KJ_ASSERT(cancelResult.waitBeforeCancel.poll(test.ws));
test.pollAndExpectCalls({});

// Fulfill scheduleRun(10ms). FORK_C resolves, its completion handler fires with no
// pending times. Done.
fulfiller10Ms->fulfill();
test.pollAndExpectCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == tenMs);
}

KJ_TEST("coalesced move-later followed by move-earlier does not race") {
// Regression test for a race condition where a coalesced pendingLaterAlarmTime could
// be drained concurrently with a move-earlier scheduleRun. The fix is that
// startPrecommitAlarmScheduling() clears pendingLaterAlarmTime when setting up a
// move-earlier, so the completion handler finds nothing to drain.
//
// Scenario: alarm at 1ms -> move to 5ms (later, in-flight) -> move to 10ms (coalesced)
// -> move to 2ms (earlier). Without the fix, after the 5ms RPC completes, both
// scheduleRun(10ms) and scheduleRun(2ms) would fire concurrently. With the fix,
// only scheduleRun(2ms) fires because the coalesced 10ms was cleared.
ActorSqliteTest test;

uint activeRpcs = 0;
uint maxConcurrentRpcs = 0;

// Custom handler that respects priorTask ordering like the real alarm manager.
// The real alarm manager awaits priorTask before sending its RPC; we replicate
// that here and track concurrent calls.
test.scheduleRunWithPriorHandler = [&](kj::Maybe<kj::Date> newAlarmTime,
kj::Promise<void> priorTask) -> kj::Promise<void> {
return priorTask.then([&, newAlarmTime]() mutable -> kj::Promise<void> {
activeRpcs++;
maxConcurrentRpcs = kj::max(maxConcurrentRpcs, activeRpcs);
auto desc = newAlarmTime.map([](auto& t) {
return kj::str("scheduleRun(", t, ")");
}).orDefault(kj::str("scheduleRun(none)"));
auto [promise, fulfiller] = kj::newPromiseAndFulfiller<void>();
test.calls.add(ActorSqliteTest::Call{kj::mv(desc), kj::mv(fulfiller)});
return promise.then([&]() { activeRpcs--; });
});
};

// Poll event loop until at least `count` calls accumulate. With the
// priorTask-respecting handler, calls take extra event loop turns to appear.
auto drainCalls = [&](std::initializer_list<kj::StringPtr> expected, kj::StringPtr msg = ""_kj) {
size_t need = expected.size();
for (int i = 0; i < 100; i++) {
test.ws.poll();
if (need == 0 && i >= 10) break;
if (need > 0 && test.calls.size() >= need) break;
}
auto callDescs = KJ_MAP(c, test.calls) { return kj::str(c.desc); };
KJ_ASSERT(callDescs == kj::heapArray(expected), msg);
auto fulfillers = KJ_MAP(c, test.calls) { return kj::mv(c.fulfiller); };
test.calls.clear();
return kj::mv(fulfillers);
};

// 1. Initialize alarm state to 1ms.
test.setAlarm(oneMs);
drainCalls({"scheduleRun(1ms)"})[0]->fulfill();
drainCalls({"commit"})[0]->fulfill();
drainCalls({});
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);

// 2. Move alarm to 5ms (later). The db commit completes, then scheduleRun(5ms)
// fires post-commit via scheduleLaterAlarm. Hold the fulfiller to keep it in-flight.
test.setAlarm(fiveMs);
drainCalls({"commit"})[0]->fulfill();
auto fulfiller5Ms = kj::mv(drainCalls({"scheduleRun(5ms)"})[0]);

// 3. While scheduleRun(5ms) is in-flight, move alarm to 10ms (later).
// Since alarmLaterIsInFlight is true, 10ms is coalesced into pendingLaterAlarmTime.
test.setAlarm(tenMs);
drainCalls({"commit"})[0]->fulfill();
drainCalls({}); // No scheduleRun -- coalesced into pending.

// 4. Move alarm earlier to 2ms. startPrecommitAlarmScheduling() clears
// pendingLaterAlarmTime and calls requestScheduledAlarm(2ms, FORK_5.addBranch()).
// The priorTask-respecting handler blocks until FORK_5 resolves.
test.setAlarm(twoMs);
drainCalls({}); // scheduleRun(2ms) blocked on priorTask.

// 5. Fulfill scheduleRun(5ms). FORK_5 resolves:
// - Completion handler: pendingLaterAlarmTime was cleared -> no drain, no-op.
// - Move-earlier priorTask resolves -> scheduleRun(2ms) fires.
fulfiller5Ms->fulfill();
auto fulfiller2Ms = kj::mv(drainCalls(
{"scheduleRun(2ms)"}, "expected only scheduleRun(2ms), no concurrent scheduleRun(10ms)")[0]);

// Verify no concurrent RPCs occurred.
KJ_ASSERT(maxConcurrentRpcs <= 1,
"scheduleRun RPCs were sent concurrently -- "
"the coalesced move-later raced with the move-earlier");

// 6. Complete the move-earlier and its commit.
fulfiller2Ms->fulfill();
drainCalls({"commit"})[0]->fulfill();

// Let any remaining completion handlers settle.
for (int i = 0; i < 20; i++) test.ws.poll();

KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
}

KJ_TEST("an exception thrown during merged commits does not hang") {
ActorSqliteTest test({.monitorOutputGate = false});

Expand Down
Loading
Loading