diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index af72138b439..d691ba4529f 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -908,9 +908,9 @@ kj::Promise QueueCustomEvent::run( // It'd be nicer if we could fall through to the code below for the non-compat-flag logic in // this case, but we don't even know if the worker uses service worker syntax until after // runProm resolves, so we just copy the bare essentials here. - auto result = co_await incomingRequest->finishScheduled(); - bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED; - outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU; + auto scheduledResult = co_await incomingRequest->finishScheduled(); + bool completed = scheduledResult == EventOutcome::OK; + outcome = completed ? context.waitUntilStatus() : scheduledResult; } else { // We're responsible for calling drain() on the incomingRequest to ensure that waitUntil tasks // can continue to run in the backgound for a while even after we return a result to the @@ -932,8 +932,8 @@ kj::Promise QueueCustomEvent::run( // We reuse the finishScheduled() method for convenience, since queues use the same wall clock // timeout as scheduled workers. - auto result = co_await incomingRequest->finishScheduled(); - bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED; + auto scheduledResult = co_await incomingRequest->finishScheduled(); + bool completed = scheduledResult == EventOutcome::OK; // Log some debug info if the request timed out or was aborted, to aid in debugging situations // where consumer workers appear to get stuck and repeatedly take 15 minutes. @@ -962,9 +962,11 @@ kj::Promise QueueCustomEvent::run( auto& ioContext = incomingRequest->getContext(); auto scriptId = ioContext.getWorker().getScript().getId(); auto tasks = ioContext.getWaitUntilTasks().trace(); - if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) { + // TODO + if (scheduledResult == EventOutcome::EXCEEDED_CPU) { + // if (result == IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT) { KJ_LOG(WARNING, "NOSENTRY queue event hit timeout", scriptId, status, tasks); - } else if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) { + } else if (scheduledResult == EventOutcome::EXCEPTION) { // Attempt to grab the error message to understand the reason for the abort. // Include a timeout just in case for some unexpected reason the onAbort promise hasn't // already rejected. @@ -980,7 +982,7 @@ kj::Promise QueueCustomEvent::run( } co_return WorkerInterface::CustomEvent::Result{ - .outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU, + .outcome = completed ? context.waitUntilStatus() : scheduledResult, }; } } diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 11849c9a63a..132c3293abc 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -564,8 +564,35 @@ kj::Promise IoContext::IncomingRequest::drain() { .exclusiveJoin(context->onAbort().catch_([](kj::Exception&&) {})); } -kj::Promise IoContext::IncomingRequest:: - finishScheduled() { +// TODO +constexpr kj::Exception::DetailTypeId SCRIPT_KILLED_DETAIL_ID = 0x0ull; +constexpr kj::Exception::DetailTypeId INACTIVE_WEBSOCKETS_DETAIL_ID = 0x0ull; +constexpr kj::Exception::DetailTypeId SCRIPT_NOT_FOUND_DETAIL_ID = 0x0ull; + +// TODO: Try to generalize this. Are all outcomes needed here? +EventOutcome outcomeFromException(kj::Exception& e) { + if (e.getType() == kj::Exception::Type::OVERLOADED) { + if (e.getDetail(SCRIPT_KILLED_DETAIL_ID) != kj::none) { + return EventOutcome::KILL_SWITCH; + } else if (e.getDetail(MEMORY_LIMIT_DETAIL_ID) != kj::none) { + return EventOutcome::EXCEEDED_MEMORY; + } else if (e.getDetail(CPU_LIMIT_DETAIL_ID) != kj::none) { + return EventOutcome::EXCEEDED_CPU; + } else if (e.getDetail(INACTIVE_WEBSOCKETS_DETAIL_ID) != kj::none) { + return EventOutcome::LOAD_SHED; + } else { + // TODO(later): We have many overloaded exceptions that can't be described as killSwitch, + // exceededCpu or exceededMemory. Should there be a new outcome type for them? + return EventOutcome::EXCEPTION; + } + } else if (e.getDetail(SCRIPT_NOT_FOUND_DETAIL_ID) != kj::none) { + return EventOutcome::SCRIPT_NOT_FOUND; + } else { + return EventOutcome::EXCEPTION; + } +} + +kj::Promise IoContext::IncomingRequest::finishScheduled() { // TODO(someday): In principle we should be able to support delivering the "scheduled" event type // to an actor, and this may be important if we open up the whole of WorkerInterface to be // callable from any stub. However, the logic around async tasks would have to be different. We @@ -580,13 +607,22 @@ kj::Promise IoContext::Incomin context->incomingRequests.front().waitedForWaitUntil = true; auto timeoutPromise = context->limitEnforcer->limitScheduled().then( - [] { return IoContext_IncomingRequest::FinishScheduledResult::TIMEOUT; }); + // TODO: I think there is a material difference between the scheduled limit being hit and + // actually hitting the CPU limit, we emit a NOSENTRY log for this case for queue events. How + // do we handle this best? + [] { return EventOutcome::EXCEEDED_CPU; }); return context->waitUntilTasks.onEmpty() - .then([]() { return IoContext_IncomingRequest::FinishScheduledResult::COMPLETED; }) + .then([]() { return EventOutcome::OK; }) .exclusiveJoin(kj::mv(timeoutPromise)) .exclusiveJoin(context->onAbort().then([] { - return IoContext_IncomingRequest::FinishScheduledResult::ABORTED; - }, [](kj::Exception&&) { return IoContext_IncomingRequest::FinishScheduledResult::ABORTED; })); + // abortFulfiller should only ever be rejected instead of being fulfilled, but return an + // exception outcome if it does happen + return EventOutcome::EXCEPTION; + }, [](kj::Exception&& e) { + // TODO: If we end up returning exception here, that's still a case that's hard to make sense of + return outcomeFromException(e); + // TODO: Just call reportFailure() here? + })); } class IoContext::PendingEvent: public kj::Refcounted { diff --git a/src/workerd/io/io-context.h b/src/workerd/io/io-context.h index 39553e33fc5..bccf737d308 100644 --- a/src/workerd/io/io-context.h +++ b/src/workerd/io/io-context.h @@ -110,8 +110,7 @@ class IoContext_IncomingRequest final { // This method is also used by some custom event handlers (see WorkerInterface::CustomEvent) that // need similar behavior, as well as the test handler. TODO(cleanup): Rename to something more // generic? - enum class FinishScheduledResult { COMPLETED, ABORTED, TIMEOUT }; - kj::Promise finishScheduled(); + kj::Promise finishScheduled(); // Access the event loop's current time point. This will remain constant between ticks. This is // used to implement IoContext::now(), which should be preferred so that time can be adjusted diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 3d46aae580c..13ca25615f7 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -718,10 +718,10 @@ kj::Promise WorkerEntrypoint::runScheduled( kj::Own request) -> kj::Promise { TRACE_EVENT("workerd", "WorkerEntrypoint::runScheduled() waitForFinished()"); - auto result = co_await request->finishScheduled(); - bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED; + auto scheduledResult = co_await request->finishScheduled(); + bool completed = scheduledResult == EventOutcome::OK; co_return WorkerInterface::ScheduledResult{.retry = context.shouldRetryScheduled(), - .outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU}; + .outcome = completed ? context.waitUntilStatus() : scheduledResult}; }; auto promise = waitForFinished(context, kj::mv(incomingRequest)); @@ -893,9 +893,9 @@ kj::Promise WorkerEntrypoint::test() { static auto constexpr waitForFinished = [](IoContext& context, kj::Own request) -> kj::Promise { TRACE_EVENT("workerd", "WorkerEntrypoint::test() waitForFinished()"); - auto result = co_await request->finishScheduled(); + auto scheduledResult = co_await request->finishScheduled(); - if (result == IoContext_IncomingRequest::FinishScheduledResult::ABORTED) { + if (scheduledResult == EventOutcome::EXCEPTION) { // If the test handler throws an exception (without aborting - just a regular exception), // then `outcome` ends up being EventOutcome::EXCEPTION, which causes us to return false. // But in that case we are separately relying on the exception being logged as an uncaught @@ -913,8 +913,8 @@ kj::Promise WorkerEntrypoint::test() { // (enough so that we can get logs/spans from them in wd-tests), so this is not needed in // practice. - bool completed = result == IoContext_IncomingRequest::FinishScheduledResult::COMPLETED; - auto outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU; + bool completed = scheduledResult == EventOutcome::OK; + auto outcome = completed ? context.waitUntilStatus() : scheduledResult; co_return outcome == EventOutcome::OK; };