From 71eea19655a7b2a289303a65ec77bd5d095ba67f Mon Sep 17 00:00:00 2001 From: Alex Robinson Date: Fri, 14 Apr 2023 16:57:20 -0500 Subject: [PATCH] Move implementation of queue event handler into workerd This required a few more changes than the binding did, so merits at least a slightly closer review. All internal tests pass, at least. --- src/workerd/api/global-scope.h | 1 + src/workerd/api/queue.c++ | 260 ++++++++++++++++++++++++++ src/workerd/api/queue.h | 214 ++++++++++++++++++++- src/workerd/io/worker-interface.capnp | 22 +++ src/workerd/io/worker.h | 3 + src/workerd/server/workerd-api.c++ | 5 + src/workerd/server/workerd-api.h | 2 + types/defines/queues.d.ts | 47 ----- 8 files changed, 505 insertions(+), 49 deletions(-) delete mode 100644 types/defines/queues.d.ts diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index 7d9e5d90985..aee8870eb6a 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -18,6 +18,7 @@ #include "html-rewriter.h" #include "trace.h" #include "scheduled.h" +#include "queue.h" #include "hibernatable-web-socket.h" #include "blob.h" #include "sockets.h" diff --git a/src/workerd/api/queue.c++ b/src/workerd/api/queue.c++ index 7737ac3d84d..456faf3f543 100644 --- a/src/workerd/api/queue.c++ +++ b/src/workerd/api/queue.c++ @@ -133,4 +133,264 @@ kj::Promise WorkerQueue::sendBatch( }).attach(kj::mv(client)); }; +QueueMessage::QueueMessage( + v8::Isolate* isolate, rpc::QueueMessage::Reader message, IoPtr result) + : id(kj::str(message.getId())), + timestamp(message.getTimestampNs() * kj::NANOSECONDS + kj::UNIX_EPOCH), + body(isolate, jsg::Deserializer(isolate, message.getData()).readValue()), + result(result) {} +// Note that we must make deep copies of all data here since the incoming Reader may be +// deallocated while JS's GC wrappers still exist. + +QueueMessage::QueueMessage( + v8::Isolate* isolate, IncomingQueueMessage message, IoPtr result) + : id(kj::mv(message.id)), + timestamp(message.timestamp), + body(isolate, jsg::Deserializer(isolate, kj::mv(message.body)).readValue()), + result(result) {} + +jsg::Value QueueMessage::getBody(jsg::Lock& js) { + return body.addRef(js); +} + +void QueueMessage::retry() { + if (result->retryAll) { + return; + } + + if (result->ackAll) { + auto msg = kj::str( + "Received a call to retry() on message ", id, " after ackAll() was already called. " + "Calling retry() on a message after calling ackAll() has no effect."); + IoContext::current().logWarning(msg); + return; + } + + if (result->explicitAcks.contains(id)) { + auto msg = kj::str( + "Received a call to retry() on message ", id, " after ack() was already called. " + "Calling retry() on a message after calling ack() has no effect."); + IoContext::current().logWarning(msg); + return; + } + result->explicitRetries.findOrCreate(id, [this]() { return kj::heapString(id); } ); +} + +void QueueMessage::ack() { + if (result->ackAll) { + return; + } + + if (result->retryAll) { + auto msg = kj::str( + "Received a call to ack() on message ", id, " after retryAll() was already called. " + "Calling ack() on a message after calling retryAll() has no effect."); + IoContext::current().logWarning(msg); + return; + } + + if (result->explicitRetries.contains(id)) { + auto msg = kj::str( + "Received a call to ack() on message ", id, " after retry() was already called. " + "Calling ack() on a message after calling retry() has no effect."); + IoContext::current().logWarning(msg); + return; + } + result->explicitAcks.findOrCreate(id, [this]() { return kj::heapString(id); } ); +} + +QueueEvent::QueueEvent(v8::Isolate* isolate, rpc::EventDispatcher::QueueParams::Reader params, IoPtr result) + : ExtendableEvent("queue"), queueName(kj::heapString(params.getQueueName())), result(result) { + // Note that we must make deep copies of all data here since the incoming Reader may be + // deallocated while JS's GC wrappers still exist. + auto incoming = params.getMessages(); + auto messagesBuilder = kj::heapArrayBuilder>(incoming.size()); + for (auto i: kj::indices(incoming)) { + messagesBuilder.add(jsg::alloc(isolate, incoming[i], result)); + } + messages = messagesBuilder.finish(); +} + +QueueEvent::QueueEvent(v8::Isolate* isolate, Params params, IoPtr result) + : ExtendableEvent("queue"), queueName(kj::mv(params.queueName)), result(result) { + auto messagesBuilder = kj::heapArrayBuilder>(params.messages.size()); + for (auto i: kj::indices(params.messages)) { + messagesBuilder.add(jsg::alloc(isolate, kj::mv(params.messages[i]), result)); + } + messages = messagesBuilder.finish(); +} + +void QueueEvent::retryAll() { + if (result->ackAll) { + IoContext::current().logWarning( + "Received a call to retryAll() after ackAll() was already called. " + "Calling retryAll() after calling ackAll() has no effect."); + return; + } + result->retryAll = true; +} + +void QueueEvent::ackAll() { + if (result->retryAll) { + IoContext::current().logWarning( + "Received a call to ackAll() after retryAll() was already called. " + "Calling ackAll() after calling retryAll() has no effect."); + return; + } + result->ackAll = true; +} + +jsg::Ref startQueueEvent( + EventTarget& globalEventTarget, + kj::OneOf params, + IoPtr result, + Worker::Lock& lock, kj::Maybe exportedHandler, + const jsg::TypeHandler& handlerHandler) { + auto isolate = lock.getIsolate(); + jsg::Ref event(nullptr); + KJ_SWITCH_ONEOF(params) { + KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) { + event = jsg::alloc(isolate, p, result); + } + KJ_CASE_ONEOF(p, QueueEvent::Params) { + event = jsg::alloc(isolate, kj::mv(p), result); + } + } + + KJ_IF_MAYBE(h, exportedHandler) { + auto queueHandler = KJ_ASSERT_NONNULL(handlerHandler.tryUnwrap( + lock, h->self.getHandle(lock.getIsolate()))); + KJ_IF_MAYBE(f, queueHandler.queue) { + auto promise = (*f)(lock, jsg::alloc(event.addRef()), + h->env.addRef(isolate), h->getCtx(isolate)); + event->waitUntil(kj::mv(promise)); + } else { + lock.logWarningOnce( + "Received a QueueEvent but we lack a handler for QueueEvents. " + "Did you remember to export a queue() function?"); + JSG_FAIL_REQUIRE(Error, "Handler does not export a queue() function."); + } + } else { + if (globalEventTarget.getHandlerCount("queue") == 0) { + lock.logWarningOnce( + "Received a QueueEvent but we lack an event listener for queue events. " + "Did you remember to call addEventListener(\"queue\", ...)?"); + JSG_FAIL_REQUIRE(Error, "No event listener registered for queue messages."); + } + globalEventTarget.dispatchEventImpl(lock, event.addRef()); + } + + return event.addRef(); +} + +kj::Promise QueueCustomEventImpl::run( + kj::Own incomingRequest, + kj::Maybe entrypointName) { + incomingRequest->delivered(); + auto& context = incomingRequest->getContext(); + + kj::String queueName; + uint32_t batchSize; + KJ_SWITCH_ONEOF(params) { + KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) { + queueName = kj::heapString(p.getQueueName()); + batchSize = p.getMessages().size(); + } + KJ_CASE_ONEOF(p, QueueEvent::Params) { + queueName = kj::heapString(p.queueName); + batchSize = p.messages.size(); + } + } + + KJ_IF_MAYBE(t, incomingRequest->getWorkerTracer()) { + t->setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize)); + } + + // Create a custom refcounted type for holding the queueEvent so that we can pass it to the + // waitUntil'ed callback safely without worrying about whether this coroutine gets canceled. + struct QueueEventHolder : public kj::Refcounted { + jsg::Ref event = nullptr; + }; + auto queueEventHolder = kj::refcounted(); + + // It's a little ugly, but the usage of waitUntil (and finishScheduled) down below are here so + // that users can write queue handlers in the old addEventListener("queue", ...) syntax (where we + // can't just wait on their addEventListener handler to resolve because it can't be async). + context.addWaitUntil(context.run( + [this, entrypointName=entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder), + &metrics = incomingRequest->getMetrics()] + (Worker::Lock& lock) mutable { + jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); + + auto& typeHandler = lock.getWorker().getIsolate().getApiIsolate().getQueueTypeHandler(lock); + queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params), context.addObject(result), lock, + lock.getExportedHandler(entrypointName, context.getActor()), typeHandler); + })); + + // TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers + // than for scheduled workers, but it's not at all clear yet to me what it should be, so just + // reuse the scheduled worker logic and timeout for now. + auto completed = co_await incomingRequest->finishScheduled(); + + co_return WorkerInterface::CustomEvent::Result { + .outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU, + }; +} + +kj::Promise QueueCustomEventImpl::sendRpc( + capnp::HttpOverCapnpFactory& httpOverCapnpFactory, + capnp::ByteStreamFactory& byteStreamFactory, + kj::TaskSet& waitUntilTasks, + rpc::EventDispatcher::Client dispatcher) { + auto req = dispatcher.castAs().queueRequest(); + KJ_SWITCH_ONEOF(params) { + KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) { + req.setQueueName(p.getQueueName()); + req.setMessages(p.getMessages()); + } + KJ_CASE_ONEOF(p, QueueEvent::Params) { + req.setQueueName(p.queueName); + auto messages = req.initMessages(p.messages.size()); + for (auto i: kj::indices(p.messages)) { + messages[i].setId(p.messages[i].id); + messages[i].setTimestampNs((p.messages[i].timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); + messages[i].setData(p.messages[i].body); + } + } + } + + return req.send().then([this](auto resp) { + auto respResult = resp.getResult(); + this->result.retryAll = respResult.getRetryAll(); + this->result.ackAll = respResult.getAckAll(); + this->result.explicitRetries.clear(); + for (const auto& msgId : respResult.getExplicitRetries()) { + this->result.explicitRetries.insert(kj::heapString(msgId)); + } + this->result.explicitAcks.clear(); + for (const auto& msgId : respResult.getExplicitAcks()) { + this->result.explicitAcks.insert(kj::heapString(msgId)); + } + return WorkerInterface::CustomEvent::Result { + .outcome = respResult.getOutcome(), + }; + }); +} + +kj::Array QueueCustomEventImpl::getExplicitRetries() const { + auto retryArray = kj::heapArrayBuilder(result.explicitRetries.size()); + for (const auto& msgId : result.explicitRetries) { + retryArray.add(kj::heapString(msgId)); + } + return retryArray.finish(); +} + +kj::Array QueueCustomEventImpl::getExplicitAcks() const { + auto ackArray = kj::heapArrayBuilder(result.explicitAcks.size()); + for (const auto& msgId : result.explicitAcks) { + ackArray.add(kj::heapString(msgId)); + } + return ackArray.finish(); +} + } // namespace workerd::api diff --git a/src/workerd/api/queue.h b/src/workerd/api/queue.h index 571fba4c1d1..e6eb16898ac 100644 --- a/src/workerd/api/queue.h +++ b/src/workerd/api/queue.h @@ -1,13 +1,21 @@ #pragma once +#include + +#include #include +#include +#include +#include #include namespace workerd::api { using kj::uint; +class ExecutionContext; + // Binding types class WorkerQueue: public jsg::Object { @@ -63,9 +71,211 @@ class WorkerQueue: public jsg::Object { uint subrequestChannel; }; +// Event handler types + +// Types for other workers passing messages into and responses out of a queue handler. + +struct IncomingQueueMessage { + kj::String id; + kj::Date timestamp; + kj::Array body; + JSG_STRUCT(id, timestamp, body); +}; + +struct QueueResponse { + uint16_t outcome; + bool retryAll; + bool ackAll; + kj::Array explicitRetries; + kj::Array explicitAcks; + JSG_STRUCT(outcome, retryAll, ackAll, explicitRetries, explicitAcks); +}; + +// Internal-only representation used to accumulate the results of a queue event. + +struct QueueEventResult { + bool retryAll = false; + bool ackAll = false; + kj::HashSet explicitRetries; + kj::HashSet explicitAcks; +}; + +class QueueMessage final: public jsg::Object { +public: + QueueMessage(v8::Isolate* isolate, rpc::QueueMessage::Reader message, IoPtr result); + QueueMessage(v8::Isolate* isolate, IncomingQueueMessage message, IoPtr result); + + kj::StringPtr getId() { return id; } + kj::Date getTimestamp() { return timestamp; } + jsg::Value getBody(jsg::Lock& js); + + void retry(); + void ack(); + + // TODO(soon): Add metadata support. + + JSG_RESOURCE_TYPE(QueueMessage) { + JSG_READONLY_INSTANCE_PROPERTY(id, getId); + JSG_READONLY_INSTANCE_PROPERTY(timestamp, getTimestamp); + JSG_READONLY_INSTANCE_PROPERTY(body, getBody); + JSG_METHOD(retry); + JSG_METHOD(ack); + + JSG_TS_OVERRIDE(Message { + readonly body: Body; + }); + } + +private: + kj::String id; + kj::Date timestamp; + jsg::Value body; + IoPtr result; + + void visitForGc(jsg::GcVisitor& visitor) { + visitor.visit(body); + } +}; + +class QueueEvent final: public ExtendableEvent { +public: + // TODO(cleanup): Should we get around the need for this alternative param type by just having the + // service worker caller provide us with capnp-serialized params? + struct Params { + kj::String queueName; + kj::Array messages; + }; + + explicit QueueEvent(v8::Isolate* isolate, rpc::EventDispatcher::QueueParams::Reader params, IoPtr result); + explicit QueueEvent(v8::Isolate* isolate, Params params, IoPtr result); + + static jsg::Ref constructor(kj::String type) = delete; + + kj::ArrayPtr> getMessages() { return messages; } + kj::StringPtr getQueueName() { return queueName; } + + void retryAll(); + void ackAll(); + + JSG_RESOURCE_TYPE(QueueEvent) { + JSG_INHERIT(ExtendableEvent); + + JSG_LAZY_READONLY_INSTANCE_PROPERTY(messages, getMessages); + JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName); + + JSG_METHOD(retryAll); + JSG_METHOD(ackAll); + + JSG_TS_OVERRIDE( { + readonly messages: readonly Message[]; + }); + } + +private: + // TODO(perf): Should we store these in a v8 array directly rather than this intermediate kj + // array to avoid one intermediate copy? + kj::Array> messages; + kj::String queueName; + IoPtr result; + + void visitForGc(jsg::GcVisitor& visitor) { + for (auto i: kj::indices(messages)) { + visitor.visit(messages[i]); + } + } +}; + +class QueueController final: public jsg::Object { + // Type used when calling a module-exported queue event handler. +public: + QueueController(jsg::Ref event) + : event(kj::mv(event)) {} + + kj::ArrayPtr> getMessages() { return event->getMessages(); } + kj::StringPtr getQueueName() { return event->getQueueName(); } + void retryAll() { event->retryAll(); } + void ackAll() { event->ackAll(); } + + JSG_RESOURCE_TYPE(QueueController) { + JSG_READONLY_INSTANCE_PROPERTY(messages, getMessages); + JSG_READONLY_INSTANCE_PROPERTY(queue, getQueueName); + + JSG_METHOD(retryAll); + JSG_METHOD(ackAll); + + JSG_TS_ROOT(); + JSG_TS_OVERRIDE(MessageBatch { + readonly messages: readonly Message[]; + }); + } + +private: + jsg::Ref event; + + void visitForGc(jsg::GcVisitor& visitor) { + visitor.visit(event); + } +}; + +struct QueueExportedHandler { + // Extension of ExportedHandler covering queue handlers. + + typedef kj::Promise QueueHandler( + jsg::Ref controller, jsg::Value env, jsg::Optional> ctx); + jsg::LenientOptional> queue; + + JSG_STRUCT(queue); +}; + +jsg::Ref startQueueEvent( + EventTarget& globalEventTarget, + kj::OneOf params, + IoPtr result, + Worker::Lock& lock, kj::Maybe exportedHandler, + const jsg::TypeHandler& handlerHandler); +// Start a queue event (called from C++, not JS). Similar to startScheduled(), the caller must +// wait for waitUntil()s to produce the final QueueResult. + +class QueueCustomEventImpl final: public WorkerInterface::CustomEvent, public kj::Refcounted { +public: + QueueCustomEventImpl( + kj::OneOf params) + : params(kj::mv(params)) {} + + kj::Promise run( + kj::Own incomingRequest, + kj::Maybe entrypointName) override; + + kj::Promise sendRpc( + capnp::HttpOverCapnpFactory& httpOverCapnpFactory, + capnp::ByteStreamFactory& byteStreamFactory, + kj::TaskSet& waitUntilTasks, + rpc::EventDispatcher::Client dispatcher) override; + + static const uint16_t EVENT_TYPE = 5; + uint16_t getType() override { + return EVENT_TYPE; + } + + bool getRetryAll() const { return result.retryAll; } + bool getAckAll() const { return result.ackAll; } + kj::Array getExplicitRetries() const; + kj::Array getExplicitAcks() const; + +private: + kj::OneOf params; + QueueEventResult result; +}; + #define EW_QUEUE_ISOLATE_TYPES \ api::WorkerQueue, \ api::WorkerQueue::SendOptions, \ - api::WorkerQueue::MessageSendRequest + api::WorkerQueue::MessageSendRequest, \ + api::IncomingQueueMessage, \ + api::QueueResponse, \ + api::QueueMessage, \ + api::QueueEvent, \ + api::QueueController, \ + api::QueueExportedHandler -} // namespace edgeworker::api +} // namespace workerd::api diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index d439ec3140c..0e7bc72c663 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -110,6 +110,22 @@ struct AlarmRun @0xfa8ea4e97e23b03d { retryCountsAgainstLimit @2 :Bool = true; } +struct QueueMessage @0x944adb18c0352295 { + id @0 :Text; + timestampNs @1 :Int64; + data @2 :Data; +} + +struct QueueResponse @0x90e98932c0bfc0de { + outcome @0 :EventOutcome; + retryAll @1 :Bool; + ackAll @2 :Bool; + explicitRetries @3 :List(Text); + # List of Message IDs that were explicitly marked for retry + explicitAcks @4 :List(Text); + # List of Message IDs that were explicitly marked as acknowledged +} + interface EventDispatcher @0xf20697475ec1752d { # Interface used to deliver events to a Worker's global event handlers. @@ -138,6 +154,12 @@ interface EventDispatcher @0xf20697475ec1752d { # It would be cleaner to handle that inside the implementation so we could mark the entire # interface (and file) with allowCancellation. + queue @8 (messages :List(QueueMessage), queueName :Text) -> (result :QueueResponse) + $Cxx.allowCancellation; + # Delivers a batch of queue messages to a worker's queue event handler. Returns information about + # the success of the batch, including which messages should be considered acknowledged and which + # should be retried. + obsolete5 @5(); obsolete6 @6(); obsolete7 @7(); diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 6ea5470c636..e79b4c0d526 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -34,6 +34,7 @@ namespace api { class ServiceWorkerGlobalScope; struct ExportedHandler; struct CryptoAlgorithm; + struct QueueExportedHandler; } class IoContext; @@ -490,6 +491,8 @@ class Worker::ApiIsolate { }; virtual const jsg::TypeHandler& getErrorInterfaceTypeHandler(jsg::Lock& lock) const = 0; + virtual const jsg::TypeHandler& getQueueTypeHandler( + jsg::Lock& lock) const = 0; virtual kj::Maybe getCryptoAlgorithm(kj::StringPtr name) const { // Look up crypto algorithms by case-insensitive name. This can be used to extend the set of diff --git a/src/workerd/server/workerd-api.c++ b/src/workerd/server/workerd-api.c++ index aadb5bc3b30..0d39663a3b8 100644 --- a/src/workerd/server/workerd-api.c++ +++ b/src/workerd/server/workerd-api.c++ @@ -162,6 +162,11 @@ const jsg::TypeHandler& return kj::downcast(lock).getTypeHandler(); } +const jsg::TypeHandler& + WorkerdApiIsolate::getQueueTypeHandler(jsg::Lock& lock) const { + return kj::downcast(lock).getTypeHandler(); +} + struct NoopCompilationObserver final : public jsg::CompilationObserver { kj::Own onEsmCompilationStart(v8::Isolate* isolate, kj::StringPtr name, jsg::ModuleInfoCompileOption option) const override { diff --git a/src/workerd/server/workerd-api.h b/src/workerd/server/workerd-api.h index cffc46bf32d..92edac79013 100644 --- a/src/workerd/server/workerd-api.h +++ b/src/workerd/server/workerd-api.h @@ -25,6 +25,8 @@ class WorkerdApiIsolate final: public Worker::ApiIsolate { jsg::Lock& lock, v8::Local moduleNamespace) const override; const jsg::TypeHandler& getErrorInterfaceTypeHandler(jsg::Lock& lock) const override; + const jsg::TypeHandler& getQueueTypeHandler( + jsg::Lock& lock) const override; static Worker::Script::Source extractSource(kj::StringPtr name, config::Worker::Reader conf, diff --git a/types/defines/queues.d.ts b/types/defines/queues.d.ts deleted file mode 100644 index de4804ab8e6..00000000000 --- a/types/defines/queues.d.ts +++ /dev/null @@ -1,47 +0,0 @@ -/** - * A message that is sent to a consumer Worker. - */ -interface Message { - /** - * A unique, system-generated ID for the message. - */ - readonly id: string; - /** - * A timestamp when the message was sent. - */ - readonly timestamp: Date; - /** - * The body of the message. - */ - readonly body: Body; - /** - * Marks message to be retried. - */ - retry(): void; - /** - * Marks message acknowledged. - */ - ack(): void; -} - -/** - * A batch of messages that are sent to a consumer Worker. - */ -interface MessageBatch { - /** - * The name of the Queue that belongs to this batch. - */ - readonly queue: string; - /** - * An array of messages in the batch. Ordering of messages is not guaranteed. - */ - readonly messages: readonly Message[]; - /** - * Marks every message to be retried in the next batch. - */ - retryAll(): void; - /** - * Marks every message acknowledged in the batch. - */ - ackAll(): void; -}