Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move implementation of queue event handler into workerd #543

Merged
merged 1 commit into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "html-rewriter.h"
#include "trace.h"
#include "scheduled.h"
#include "queue.h"
#include "hibernatable-web-socket.h"
#include "blob.h"
#include "sockets.h"
Expand Down Expand Up @@ -92,6 +93,7 @@ class WorkerGlobalScope: public EventTarget {
JSG_TS_DEFINE(type WorkerGlobalScopeEventMap = {
fetch: FetchEvent;
scheduled: ScheduledEvent;
queue: QueueEvent;
unhandledrejection: PromiseRejectionEvent;
rejectionhandled: PromiseRejectionEvent;
});
Expand Down
260 changes: 260 additions & 0 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,264 @@ kj::Promise<void> WorkerQueue::sendBatch(
}).attach(kj::mv(client));
};

QueueMessage::QueueMessage(
v8::Isolate* isolate, rpc::QueueMessage::Reader message, IoPtr<QueueEventResult> result)
: id(kj::str(message.getId())),
timestamp(message.getTimestampNs() * kj::NANOSECONDS + kj::UNIX_EPOCH),
body(isolate, jsg::Deserializer(isolate, message.getData()).readValue()),
result(result) {}
// Note that we must make deep copies of all data here since the incoming Reader may be
// deallocated while JS's GC wrappers still exist.

QueueMessage::QueueMessage(
v8::Isolate* isolate, IncomingQueueMessage message, IoPtr<QueueEventResult> result)
: id(kj::mv(message.id)),
timestamp(message.timestamp),
body(isolate, jsg::Deserializer(isolate, kj::mv(message.body)).readValue()),
result(result) {}

jsg::Value QueueMessage::getBody(jsg::Lock& js) {
return body.addRef(js);
}

void QueueMessage::retry() {
if (result->retryAll) {
return;
}

if (result->ackAll) {
auto msg = kj::str(
"Received a call to retry() on message ", id, " after ackAll() was already called. "
"Calling retry() on a message after calling ackAll() has no effect.");
IoContext::current().logWarning(msg);
return;
}

if (result->explicitAcks.contains(id)) {
auto msg = kj::str(
"Received a call to retry() on message ", id, " after ack() was already called. "
"Calling retry() on a message after calling ack() has no effect.");
IoContext::current().logWarning(msg);
jasnell marked this conversation as resolved.
Show resolved Hide resolved
return;
}
result->explicitRetries.findOrCreate(id, [this]() { return kj::heapString(id); } );
}

void QueueMessage::ack() {
if (result->ackAll) {
return;
}

if (result->retryAll) {
auto msg = kj::str(
"Received a call to ack() on message ", id, " after retryAll() was already called. "
"Calling ack() on a message after calling retryAll() has no effect.");
IoContext::current().logWarning(msg);
return;
}

if (result->explicitRetries.contains(id)) {
auto msg = kj::str(
"Received a call to ack() on message ", id, " after retry() was already called. "
"Calling ack() on a message after calling retry() has no effect.");
IoContext::current().logWarning(msg);
return;
}
result->explicitAcks.findOrCreate(id, [this]() { return kj::heapString(id); } );
}

QueueEvent::QueueEvent(v8::Isolate* isolate, rpc::EventDispatcher::QueueParams::Reader params, IoPtr<QueueEventResult> result)
: ExtendableEvent("queue"), queueName(kj::heapString(params.getQueueName())), result(result) {
// Note that we must make deep copies of all data here since the incoming Reader may be
// deallocated while JS's GC wrappers still exist.
auto incoming = params.getMessages();
auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(incoming.size());
for (auto i: kj::indices(incoming)) {
messagesBuilder.add(jsg::alloc<QueueMessage>(isolate, incoming[i], result));
}
messages = messagesBuilder.finish();
}

QueueEvent::QueueEvent(v8::Isolate* isolate, Params params, IoPtr<QueueEventResult> result)
: ExtendableEvent("queue"), queueName(kj::mv(params.queueName)), result(result) {
auto messagesBuilder = kj::heapArrayBuilder<jsg::Ref<QueueMessage>>(params.messages.size());
for (auto i: kj::indices(params.messages)) {
messagesBuilder.add(jsg::alloc<QueueMessage>(isolate, kj::mv(params.messages[i]), result));
}
messages = messagesBuilder.finish();
}

void QueueEvent::retryAll() {
if (result->ackAll) {
IoContext::current().logWarning(
"Received a call to retryAll() after ackAll() was already called. "
"Calling retryAll() after calling ackAll() has no effect.");
return;
}
result->retryAll = true;
}

void QueueEvent::ackAll() {
if (result->retryAll) {
IoContext::current().logWarning(
"Received a call to ackAll() after retryAll() was already called. "
"Calling ackAll() after calling retryAll() has no effect.");
return;
}
result->ackAll = true;
}

jsg::Ref<QueueEvent> startQueueEvent(
EventTarget& globalEventTarget,
kj::OneOf<rpc::EventDispatcher::QueueParams::Reader, QueueEvent::Params> params,
IoPtr<QueueEventResult> result,
Worker::Lock& lock, kj::Maybe<ExportedHandler&> exportedHandler,
const jsg::TypeHandler<QueueExportedHandler>& handlerHandler) {
auto isolate = lock.getIsolate();
jsg::Ref<QueueEvent> event(nullptr);
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
event = jsg::alloc<QueueEvent>(isolate, p, result);
}
KJ_CASE_ONEOF(p, QueueEvent::Params) {
event = jsg::alloc<QueueEvent>(isolate, kj::mv(p), result);
}
}

KJ_IF_MAYBE(h, exportedHandler) {
auto queueHandler = KJ_ASSERT_NONNULL(handlerHandler.tryUnwrap(
lock, h->self.getHandle(lock.getIsolate())));
KJ_IF_MAYBE(f, queueHandler.queue) {
auto promise = (*f)(lock, jsg::alloc<QueueController>(event.addRef()),
h->env.addRef(isolate), h->getCtx(isolate));
event->waitUntil(kj::mv(promise));
} else {
lock.logWarningOnce(
"Received a QueueEvent but we lack a handler for QueueEvents. "
"Did you remember to export a queue() function?");
JSG_FAIL_REQUIRE(Error, "Handler does not export a queue() function.");
}
} else {
if (globalEventTarget.getHandlerCount("queue") == 0) {
lock.logWarningOnce(
"Received a QueueEvent but we lack an event listener for queue events. "
"Did you remember to call addEventListener(\"queue\", ...)?");
JSG_FAIL_REQUIRE(Error, "No event listener registered for queue messages.");
}
globalEventTarget.dispatchEventImpl(lock, event.addRef());
}

return event.addRef();
}

kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
kj::Own<IoContext_IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName) {
incomingRequest->delivered();
auto& context = incomingRequest->getContext();

kj::String queueName;
uint32_t batchSize;
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
queueName = kj::heapString(p.getQueueName());
batchSize = p.getMessages().size();
}
KJ_CASE_ONEOF(p, QueueEvent::Params) {
queueName = kj::heapString(p.queueName);
batchSize = p.messages.size();
}
}

KJ_IF_MAYBE(t, incomingRequest->getWorkerTracer()) {
t->setEventInfo(context.now(), Trace::QueueEventInfo(kj::mv(queueName), batchSize));
}

// Create a custom refcounted type for holding the queueEvent so that we can pass it to the
// waitUntil'ed callback safely without worrying about whether this coroutine gets canceled.
struct QueueEventHolder : public kj::Refcounted {
jsg::Ref<QueueEvent> event = nullptr;
};
auto queueEventHolder = kj::refcounted<QueueEventHolder>();

// It's a little ugly, but the usage of waitUntil (and finishScheduled) down below are here so
// that users can write queue handlers in the old addEventListener("queue", ...) syntax (where we
// can't just wait on their addEventListener handler to resolve because it can't be async).
context.addWaitUntil(context.run(
[this, entrypointName=entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder),
&metrics = incomingRequest->getMetrics()]
(Worker::Lock& lock) mutable {
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto& typeHandler = lock.getWorker().getIsolate().getApiIsolate().getQueueTypeHandler(lock);
queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params), context.addObject(result), lock,
lock.getExportedHandler(entrypointName, context.getActor()), typeHandler);
}));

// TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers
// than for scheduled workers, but it's not at all clear yet to me what it should be, so just
// reuse the scheduled worker logic and timeout for now.
auto completed = co_await incomingRequest->finishScheduled();

co_return WorkerInterface::CustomEvent::Result {
.outcome = completed ? context.waitUntilStatus() : EventOutcome::EXCEEDED_CPU,
};
}

kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::sendRpc(
capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) {
auto req = dispatcher.castAs<rpc::EventDispatcher>().queueRequest();
KJ_SWITCH_ONEOF(params) {
KJ_CASE_ONEOF(p, rpc::EventDispatcher::QueueParams::Reader) {
req.setQueueName(p.getQueueName());
req.setMessages(p.getMessages());
}
KJ_CASE_ONEOF(p, QueueEvent::Params) {
req.setQueueName(p.queueName);
auto messages = req.initMessages(p.messages.size());
for (auto i: kj::indices(p.messages)) {
messages[i].setId(p.messages[i].id);
messages[i].setTimestampNs((p.messages[i].timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS);
messages[i].setData(p.messages[i].body);
}
}
}

return req.send().then([this](auto resp) {
auto respResult = resp.getResult();
this->result.retryAll = respResult.getRetryAll();
this->result.ackAll = respResult.getAckAll();
this->result.explicitRetries.clear();
for (const auto& msgId : respResult.getExplicitRetries()) {
this->result.explicitRetries.insert(kj::heapString(msgId));
}
this->result.explicitAcks.clear();
for (const auto& msgId : respResult.getExplicitAcks()) {
this->result.explicitAcks.insert(kj::heapString(msgId));
}
return WorkerInterface::CustomEvent::Result {
.outcome = respResult.getOutcome(),
};
});
}

kj::Array<kj::String> QueueCustomEventImpl::getExplicitRetries() const {
auto retryArray = kj::heapArrayBuilder<kj::String>(result.explicitRetries.size());
for (const auto& msgId : result.explicitRetries) {
retryArray.add(kj::heapString(msgId));
}
return retryArray.finish();
}

kj::Array<kj::String> QueueCustomEventImpl::getExplicitAcks() const {
auto ackArray = kj::heapArrayBuilder<kj::String>(result.explicitAcks.size());
for (const auto& msgId : result.explicitAcks) {
ackArray.add(kj::heapString(msgId));
}
return ackArray.finish();
}

} // namespace workerd::api
Loading