Skip to content

Commit

Permalink
Move implementation of queue event handler into workerd
Browse files Browse the repository at this point in the history
This required a few more changes than the binding did, so merits at
least a slightly closer review. All internal tests pass, at least.
  • Loading branch information
a-robinson committed Apr 17, 2023
1 parent 3679619 commit 71eea19
Show file tree
Hide file tree
Showing 8 changed files with 505 additions and 49 deletions.
1 change: 1 addition & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "html-rewriter.h"
#include "trace.h"
#include "scheduled.h"
#include "queue.h"
#include "hibernatable-web-socket.h"
#include "blob.h"
#include "sockets.h"
Expand Down
260 changes: 260 additions & 0 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,264 @@ kj::Promise<void> WorkerQueue::sendBatch(
}).attach(kj::mv(client));
};

QueueMessage::QueueMessage(
v8::Isolate* isolate, rpc::QueueMessage::Reader message, IoPtr<QueueEventResult> result)
: id(kj::str(message.getId())),
timestamp(message.getTimestampNs() * kj::NANOSECONDS + kj::UNIX_EPOCH),
body(isolate, jsg::Deserializer(isolate, message.getData()).readValue()),
result(result) {}
// Note that we must make deep copies of all data here since the incoming Reader may be
// deallocated while JS's GC wrappers still exist.

QueueMessage::QueueMessage(
v8::Isolate* isolate, IncomingQueueMessage message, IoPtr<QueueEventResult> result)
: id(kj::mv(message.id)),
timestamp(message.timestamp),
body(isolate, jsg::Deserializer(isolate, kj::mv(message.body)).readValue()),
result(result) {}

jsg::Value QueueMessage::getBody(jsg::Lock& js) {
return body.addRef(js);
}

void QueueMessage::retry() {
if (result->retryAll) {
return;
}

if (result->ackAll) {
auto msg = kj::str(
"Received a call to retry() on message ", id, " after ackAll() was already called. "
"Calling retry() on a message after calling ackAll() has no effect.");
IoContext::current().logWarning(msg);
return;
}

if (result->explicitAcks.contains(id)) {
auto msg = kj::str(
"Received a call to retry() on message ", id, " after ack() was already called. "
"Calling retry() on a message after calling ack() has no effect.");
IoContext::current().logWarning(msg);
return;
}
result->explicitRetries.findOrCreate(id, [this]() { return kj::heapString(id); } );
}

void QueueMessage::ack() {
if (result->ackAll) {
return;
}

if (result->retryAll) {
auto msg = kj::str(
"Received a call to ack() on message ", id, " after retryAll() was already called. "
"Calling ack() on a message after calling retryAll() has no effect.");
IoContext::current().logWarning(msg);
return;
}

if (result->explicitRetries.contains(id)) {
auto msg = kj::str(
"Received a call to ack() on message ", id, " after retry() was already called. "
"Calling ack() on a message after calling retry() has no effect.");
IoContext::current().logWarning(msg);
return;
}
result->explicitAcks.findOrCreate(id, [this]() { return kj::heapString(id); } );
}

QueueEvent::QueueEvent(v8::Isolate* isolate, rpc::EventDispatcher::QueueParams::Reader params, IoPtr<QueueEventResult> result)
: ExtendableEvent("queue"), queueName(kj::heapString(params.getQueueName())), result(result) {
// Note that we must make deep copies of all data here since the incoming Reader may be
// deallocated while JS's GC wrappers still exist.
auto incoming = params.getMessages();
auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(incoming.size());
for (auto i: kj::indices(incoming)) {
messagesBuilder.add(jsg::alloc<QueueMessage>(isolate, incoming[i], result));
}
messages = messagesBuilder.finish();
}

QueueEvent::QueueEvent(v8::Isolate* isolate, Params params, IoPtr<QueueEventResult> result)
: ExtendableEvent("queue"), queueName(kj::mv(params.queueName)), result(result) {
auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(params.messages.size());
for (auto i: kj::indices(params.messages)) {
messagesBuilder.add(jsg::alloc<QueueMessage>(isolate, kj::mv(params.messages[i]), result));
}
messages = messagesBuilder.finish();
}

void QueueEvent::retryAll() {
if (result->ackAll) {
IoContext::current().logWarning(
"Received a call to retryAll() after ackAll() was already called. "
"Calling retryAll() after calling ackAll() has no effect.");
return;
}
result->retryAll = true;
}

void QueueEvent::ackAll() {
if (result->retryAll) {
IoContext::current().logWarning(
"Received a call to ackAll() after retryAll() was already called. "
"Calling ackAll() after calling retryAll() has no effect.");
return;
}
result->ackAll = true;
}

jsg::Ref<QueueEvent> startQueueEvent(
EventTarget& globalEventTarget,
kj::OneOf<rpc::EventDispatcher::QueueParams::Reader, QueueEvent::Params> params,
IoPtr<QueueEventResult> result,
Worker::Lock& lock, kj::Maybe<ExportedHandler&> exportedHandler,
const jsg::TypeHandler<QueueExportedHandler>& handlerHandler) {
auto isolate = lock.getIsolate();
jsg::Ref<QueueEvent> event(nullptr);
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
event = jsg::alloc<QueueEvent>(isolate, p, result);
}
KJ_CASE_ONEOF(p, QueueEvent::Params) {
event = jsg::alloc<QueueEvent>(isolate, kj::mv(p), result);
}
}

KJ_IF_MAYBE(h, exportedHandler) {
auto queueHandler = KJ_ASSERT_NONNULL(handlerHandler.tryUnwrap(
lock, h->self.getHandle(lock.getIsolate())));
KJ_IF_MAYBE(f, queueHandler.queue) {
auto promise = (*f)(lock, jsg::alloc<QueueController>(event.addRef()),
h->env.addRef(isolate), h->getCtx(isolate));
event->waitUntil(kj::mv(promise));
} else {
lock.logWarningOnce(
"Received a QueueEvent but we lack a handler for QueueEvents. "
"Did you remember to export a queue() function?");
JSG_FAIL_REQUIRE(Error, "Handler does not export a queue() function.");
}
} else {
if (globalEventTarget.getHandlerCount("queue") == 0) {
lock.logWarningOnce(
"Received a QueueEvent but we lack an event listener for queue events. "
"Did you remember to call addEventListener(\"queue\", ...)?");
JSG_FAIL_REQUIRE(Error, "No event listener registered for queue messages.");
}
globalEventTarget.dispatchEventImpl(lock, event.addRef());
}

return event.addRef();
}

kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
kj::Own<IoContext_IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName) {
incomingRequest->delivered();
auto& context = incomingRequest->getContext();

kj::String queueName;
uint32_t batchSize;
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
queueName = kj::heapString(p.getQueueName());
batchSize = p.getMessages().size();
}
KJ_CASE_ONEOF(p, QueueEvent::Params) {
queueName = kj::heapString(p.queueName);
batchSize = p.messages.size();
}
}

KJ_IF_MAYBE(t, incomingRequest->getWorkerTracer()) {
t->setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize));
}

// Create a custom refcounted type for holding the queueEvent so that we can pass it to the
// waitUntil'ed callback safely without worrying about whether this coroutine gets canceled.
struct QueueEventHolder : public kj::Refcounted {
jsg::Ref<QueueEvent> event = nullptr;
};
auto queueEventHolder = kj::refcounted<QueueEventHolder>();

// It's a little ugly, but the usage of waitUntil (and finishScheduled) down below are here so
// that users can write queue handlers in the old addEventListener("queue", ...) syntax (where we
// can't just wait on their addEventListener handler to resolve because it can't be async).
context.addWaitUntil(context.run(
[this, entrypointName=entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder),
&metrics = incomingRequest->getMetrics()]
(Worker::Lock& lock) mutable {
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto& typeHandler = lock.getWorker().getIsolate().getApiIsolate().getQueueTypeHandler(lock);
queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params), context.addObject(result), lock,
lock.getExportedHandler(entrypointName, context.getActor()), typeHandler);
}));

// TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers
// than for scheduled workers, but it's not at all clear yet to me what it should be, so just
// reuse the scheduled worker logic and timeout for now.
auto completed = co_await incomingRequest->finishScheduled();

co_return WorkerInterface::CustomEvent::Result {
.outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU,
};
}

kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::sendRpc(
capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) {
auto req = dispatcher.castAs<rpc::EventDispatcher>().queueRequest();
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
req.setQueueName(p.getQueueName());
req.setMessages(p.getMessages());
}
KJ_CASE_ONEOF(p, QueueEvent::Params) {
req.setQueueName(p.queueName);
auto messages = req.initMessages(p.messages.size());
for (auto i: kj::indices(p.messages)) {
messages[i].setId(p.messages[i].id);
messages[i].setTimestampNs((p.messages[i].timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS);
messages[i].setData(p.messages[i].body);
}
}
}

return req.send().then([this](auto resp) {
auto respResult = resp.getResult();
this->result.retryAll = respResult.getRetryAll();
this->result.ackAll = respResult.getAckAll();
this->result.explicitRetries.clear();
for (const auto& msgId : respResult.getExplicitRetries()) {
this->result.explicitRetries.insert(kj::heapString(msgId));
}
this->result.explicitAcks.clear();
for (const auto& msgId : respResult.getExplicitAcks()) {
this->result.explicitAcks.insert(kj::heapString(msgId));
}
return WorkerInterface::CustomEvent::Result {
.outcome = respResult.getOutcome(),
};
});
}

kj::Array<kj::String> QueueCustomEventImpl::getExplicitRetries() const {
auto retryArray = kj::heapArrayBuilder<kj::String>(result.explicitRetries.size());
for (const auto& msgId : result.explicitRetries) {
retryArray.add(kj::heapString(msgId));
}
return retryArray.finish();
}

kj::Array<kj::String> QueueCustomEventImpl::getExplicitAcks() const {
auto ackArray = kj::heapArrayBuilder<kj::String>(result.explicitAcks.size());
for (const auto& msgId : result.explicitAcks) {
ackArray.add(kj::heapString(msgId));
}
return ackArray.finish();
}

} // namespace workerd::api
Loading

0 comments on commit 71eea19

Please sign in to comment.