Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -908,9 +908,9 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::run(
// It'd be nicer if we could fall through to the code below for the non-compat-flag logic in
// this case, but we don't even know if the worker uses service worker syntax until after
// runProm resolves, so we just copy the bare essentials here.
auto result = co_await incomingRequest->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU;
auto scheduledResult = co_await incomingRequest->finishScheduled();
bool completed = scheduledResult == EventOutcome::OK;
outcome = completed ? context.waitUntilStatus() : scheduledResult;
} else {
// We're responsible for calling drain() on the incomingRequest to ensure that waitUntil tasks
// can continue to run in the backgound for a while even after we return a result to the
Expand All @@ -932,8 +932,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::run(

// We reuse the finishScheduled() method for convenience, since queues use the same wall clock
// timeout as scheduled workers.
auto result = co_await incomingRequest->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
auto scheduledResult = co_await incomingRequest->finishScheduled();
bool completed = scheduledResult == EventOutcome::OK;

// Log some debug info if the request timed out or was aborted, to aid in debugging situations
// where consumer workers appear to get stuck and repeatedly take 15 minutes.
Expand Down Expand Up @@ -962,9 +962,11 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::run(
auto& ioContext = incomingRequest->getContext();
auto scriptId = ioContext.getWorker().getScript().getId();
auto tasks = ioContext.getWaitUntilTasks().trace();
if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
// TODO
if (scheduledResult == EventOutcome::EXCEEDED_CPU) {
// if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
Comment on lines +965 to +967
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] Leftover debug artifacts — the bare // TODO on line 965 and the commented-out old code on line 967 look like they shouldn't be merged. If there's an open question here, make the TODO actionable; otherwise please clean these up.

Suggested change
// TODO
if (scheduledResult == EventOutcome::EXCEEDED_CPU) {
// if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) {
if (scheduledResult == EventOutcome::EXCEEDED_CPU) {

KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks);
} else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) {
} else if (scheduledResult == EventOutcome::EXCEPTION) {
// Attempt to grab the error message to understand the reason for the abort.
// Include a timeout just in case for some unexpected reason the onAbort promise hasn't
// already rejected.
Expand All @@ -980,7 +982,7 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEvent::run(
}

co_return WorkerInterface::CustomEvent::Result{
.outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU,
.outcome = completed ? context.waitUntilStatus() : scheduledResult,
};
}
}
Expand Down
48 changes: 42 additions & 6 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -564,8 +564,35 @@ kj::Promise<void> IoContext::IncomingRequest::drain() {
.exclusiveJoin(context->onAbort().catch_([](kj::Exception&&) {}));
}

kj::Promise<IoContext_IncomingRequest::FinishScheduledResult> IoContext::IncomingRequest::
finishScheduled() {
// TODO
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] Bare // TODO without explanation — if this is a reminder to assign real IDs, please make the TODO actionable (e.g. // TODO(felix): Assign unique DetailTypeId values; these are placeholders).

constexpr kj::Exception::DetailTypeId SCRIPT_KILLED_DETAIL_ID = 0x0ull;
constexpr kj::Exception::DetailTypeId INACTIVE_WEBSOCKETS_DETAIL_ID = 0x0ull;
constexpr kj::Exception::DetailTypeId SCRIPT_NOT_FOUND_DETAIL_ID = 0x0ull;
Comment on lines +568 to +570
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] These three DetailTypeId constants are all 0x0ull, meaning they're indistinguishable at runtime. getDetail() uses the ID as a key, so SCRIPT_KILLED_DETAIL_ID, INACTIVE_WEBSOCKETS_DETAIL_ID, and SCRIPT_NOT_FOUND_DETAIL_ID will all match the same detail entry. The first branch that checks any of these will match (or none will), making the subsequent branches dead code.

The existing IDs in the codebase all use unique random 64-bit values (e.g. CPU_LIMIT_DETAIL_ID = 0xfdcb787ba4240576ull). These need real, distinct IDs — or if the detail IDs don't exist yet on the exception-producing side, this dispatch logic can't work and should be restructured.

I understand the // TODO comment suggests this is WIP, but as-is this would silently misclassify outcomes if merged.


// TODO: Try to generalize this. Are all outcomes needed here?
EventOutcome outcomeFromException(kj::Exception& e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[LOW] This function doesn't modify the exception — consider taking const kj::Exception& instead of kj::Exception&.

Suggested change
EventOutcome outcomeFromException(kj::Exception& e) {
EventOutcome outcomeFromException(const kj::Exception& e) {

if (e.getType() == kj::Exception::Type::OVERLOADED) {
if (e.getDetail(SCRIPT_KILLED_DETAIL_ID) != kj::none) {
return EventOutcome::KILL_SWITCH;
} else if (e.getDetail(MEMORY_LIMIT_DETAIL_ID) != kj::none) {
return EventOutcome::EXCEEDED_MEMORY;
} else if (e.getDetail(CPU_LIMIT_DETAIL_ID) != kj::none) {
return EventOutcome::EXCEEDED_CPU;
} else if (e.getDetail(INACTIVE_WEBSOCKETS_DETAIL_ID) != kj::none) {
return EventOutcome::LOAD_SHED;
} else {
// TODO(later): We have many overloaded exceptions that can't be described as killSwitch,
// exceededCpu or exceededMemory. Should there be a new outcome type for them?
return EventOutcome::EXCEPTION;
}
} else if (e.getDetail(SCRIPT_NOT_FOUND_DETAIL_ID) != kj::none) {
return EventOutcome::SCRIPT_NOT_FOUND;
} else {
return EventOutcome::EXCEPTION;
}
}

kj::Promise<EventOutcome> IoContext::IncomingRequest::finishScheduled() {
// TODO(someday): In principle we should be able to support delivering the "scheduled" event type
// to an actor, and this may be important if we open up the whole of WorkerInterface to be
// callable from any stub. However, the logic around async tasks would have to be different. We
Expand All @@ -580,13 +607,22 @@ kj::Promise<IoContext_IncomingRequest::FinishScheduledResult> IoContext::Incomin
context->incomingRequests.front().waitedForWaitUntil = true;

auto timeoutPromise = context->limitEnforcer->limitScheduled().then(
[] { return IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT; });
// TODO: I think there is a material difference between the scheduled limit being hit and
// actually hitting the CPU limit, we emit a NOSENTRY log for this case for queue events. How
// do we handle this best?
[] { return EventOutcome::EXCEEDED_CPU; });
return context->waitUntilTasks.onEmpty()
.then([]() { return IoContext_IncomingRequest::FinishScheduledResult::COMPLETED; })
.then([]() { return EventOutcome::OK; })
.exclusiveJoin(kj::mv(timeoutPromise))
.exclusiveJoin(context->onAbort().then([] {
return IoContext_IncomingRequest::FinishScheduledResult::ABORTED;
}, [](kj::Exception&&) { return IoContext_IncomingRequest::FinishScheduledResult::ABORTED; }));
// abortFulfiller should only ever be rejected instead of being fulfilled, but return an
// exception outcome if it does happen
return EventOutcome::EXCEPTION;
}, [](kj::Exception&& e) {
// TODO: If we end up returning exception here, that's still a case that's hard to make sense of
return outcomeFromException(e);
// TODO: Just call reportFailure() here?
}));
}

class IoContext::PendingEvent: public kj::Refcounted {
Expand Down
3 changes: 1 addition & 2 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ class IoContext_IncomingRequest final {
// This method is also used by some custom event handlers (see WorkerInterface::CustomEvent) that
// need similar behavior, as well as the test handler. TODO(cleanup): Rename to something more
// generic?
enum class FinishScheduledResult { COMPLETED, ABORTED, TIMEOUT };
kj::Promise<FinishScheduledResult> finishScheduled();
kj::Promise<EventOutcome> finishScheduled();

// Access the event loop's current time point. This will remain constant between ticks. This is
// used to implement IoContext::now(), which should be preferred so that time can be adjusted
Expand Down
14 changes: 7 additions & 7 deletions src/workerd/io/worker-entrypoint.c++
Original file line number Diff line number Diff line change
Expand Up @@ -718,10 +718,10 @@ kj::Promise<WorkerInterface::ScheduledResult> WorkerEntrypoint::runScheduled(
kj::Own<IoContext::IncomingRequest> request)
-> kj::Promise<WorkerInterface::ScheduledResult> {
TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled() waitForFinished()");
auto result = co_await request->finishScheduled();
bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
auto scheduledResult = co_await request->finishScheduled();
bool completed = scheduledResult == EventOutcome::OK;
co_return WorkerInterface::ScheduledResult{.retry = context.shouldRetryScheduled(),
.outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU};
.outcome = completed ? context.waitUntilStatus() : scheduledResult};
};

auto promise = waitForFinished(context, kj::mv(incomingRequest));
Expand Down Expand Up @@ -893,9 +893,9 @@ kj::Promise<bool> WorkerEntrypoint::test() {
static auto constexpr waitForFinished =
[](IoContext& context, kj::Own<IoContext::IncomingRequest> request) -> kj::Promise<bool> {
TRACE_EVENT("workerd", "WorkerEntrypoint::test() waitForFinished()");
auto result = co_await request->finishScheduled();
auto scheduledResult = co_await request->finishScheduled();

if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) {
if (scheduledResult == EventOutcome::EXCEPTION) {
// If the test handler throws an exception (without aborting - just a regular exception),
// then `outcome` ends up being EventOutcome::EXCEPTION, which causes us to return false.
// But in that case we are separately relying on the exception being logged as an uncaught
Expand All @@ -913,8 +913,8 @@ kj::Promise<bool> WorkerEntrypoint::test() {
// (enough so that we can get logs/spans from them in wd-tests), so this is not needed in
// practice.

bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED;
auto outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU;
bool completed = scheduledResult == EventOutcome::OK;
auto outcome = completed ? context.waitUntilStatus() : scheduledResult;
co_return outcome == EventOutcome::OK;
};

Expand Down
Loading