diff --git a/src/workerd/api/actor-state.c++ b/src/workerd/api/actor-state.c++ index ce8ad52fe19..74259797b62 100644 --- a/src/workerd/api/actor-state.c++ +++ b/src/workerd/api/actor-state.c++ @@ -830,6 +830,53 @@ kj::Array> DurableObjectState::getWebSockets( return kj::Array>(); } +void DurableObjectState::setWebSocketAutoResponse( + jsg::Optional> maybeReqResp) { + auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor()); + + if (maybeReqResp == nullptr) { + // If there's no request/response pair, we unset any current set auto response configuration. + KJ_IF_MAYBE(manager, a.getHibernationManager()) { + // If there's no hibernation manager created yet, there's nothing to do here. + manager->unsetWebSocketAutoResponse(); + } + return; + } + + auto reqResp = KJ_REQUIRE_NONNULL(kj::mv(maybeReqResp)); + auto maxRequestOrResponseSize = 2048; + + JSG_REQUIRE(reqResp->getRequest().size() <= maxRequestOrResponseSize, RangeError, kj::str( + "Request cannot be larger than ", maxRequestOrResponseSize, " bytes. ", + "A request of size ", reqResp->getRequest().size(), " was provided.")); + + JSG_REQUIRE(reqResp->getResponse().size() <= maxRequestOrResponseSize, RangeError, kj::str( + "Response cannot be larger than ", maxRequestOrResponseSize, " bytes. ", + "A response of size ", reqResp->getResponse().size(), " was provided.")); + + if (a.getHibernationManager() == nullptr) { + a.setHibernationManager(kj::refcounted( + a.getLoopback(), KJ_REQUIRE_NONNULL(a.getHibernationEventType()))); + // If there's no hibernation manager created yet, we should create one and + // set its auto response. + } + KJ_REQUIRE_NONNULL(a.getHibernationManager()).setWebSocketAutoResponse(kj::mv(reqResp)); +} + +kj::Maybe> DurableObjectState::getWebSocketAutoResponse() { + auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor()); + KJ_IF_MAYBE(manager, a.getHibernationManager()) { + // If there's no hibernation manager created yet, there's nothing to do here. + auto r = manager->getWebSocketAutoResponse(); + return r; + } + return nullptr; +} + +kj::Maybe DurableObjectState::getWebSocketAutoResponseTimestamp(jsg::Ref ws) { + return ws->getAutoResponseTimestamp(); +} + kj::Array serializeV8Value(v8::Local value, v8::Isolate* isolate) { jsg::Serializer serializer(isolate, jsg::Serializer::Options { .version = 15, diff --git a/src/workerd/api/actor-state.h b/src/workerd/api/actor-state.h index 783ebe5b3d4..dbf1c4083a2 100644 --- a/src/workerd/api/actor-state.h +++ b/src/workerd/api/actor-state.h @@ -340,6 +340,28 @@ class ActorState: public jsg::Object { kj::Maybe> persistent; }; +class WebSocketRequestResponsePair: public jsg::Object { +public: + WebSocketRequestResponsePair(kj::String request, kj::String response) + : request(kj::mv(request)), response(kj::mv(response)) {}; + + static jsg::Ref constructor(kj::String request, kj::String response) { + return jsg::alloc(kj::mv(request),kj::mv(response)); + }; + + kj::StringPtr getRequest() { return request.asPtr(); } + kj::StringPtr getResponse() { return response.asPtr(); } + + JSG_RESOURCE_TYPE(WebSocketRequestResponsePair) { + JSG_READONLY_PROTOTYPE_PROPERTY(request, getRequest); + JSG_READONLY_PROTOTYPE_PROPERTY(response, getResponse); + } + +private: + kj::String request; + kj::String response; +}; + class DurableObjectState: public jsg::Object { // The type passed as the first parameter to durable object class's constructor. @@ -386,6 +408,19 @@ class DurableObjectState: public jsg::Object { // If no tag is provided, an array of all accepted WebSockets is returned. // Disconnected WebSockets are automatically removed from the list. + void setWebSocketAutoResponse(jsg::Optional> maybeReqResp); + // Sets an object-wide websocket auto response message for a specific + // request string. All websockets belonging to the same object must + // reply to the request with the matching response, then store the timestamp at which + // the request was received. + // If maybeReqResp is not set, we consider it as unset and remove any set request response pair. + + kj::Maybe> getWebSocketAutoResponse(); + // Gets the currently set object-wide websocket auto response. + + kj::Maybe getWebSocketAutoResponseTimestamp(jsg::Ref ws); + // Get the last auto response timestamp or null + JSG_RESOURCE_TYPE(DurableObjectState, CompatibilityFlags::Reader flags) { JSG_METHOD(waitUntil); JSG_READONLY_INSTANCE_PROPERTY(id, getId); @@ -393,6 +428,9 @@ class DurableObjectState: public jsg::Object { JSG_METHOD(blockConcurrencyWhile); JSG_METHOD(acceptWebSocket); JSG_METHOD(getWebSockets); + JSG_METHOD(setWebSocketAutoResponse); + JSG_METHOD(getWebSocketAutoResponse); + JSG_METHOD(getWebSocketAutoResponseTimestamp); if (flags.getWorkerdExperimental()) { // TODO(someday): This currently exists for testing purposes only but maybe it could be @@ -428,6 +466,7 @@ class DurableObjectState: public jsg::Object { api::DurableObjectStorageOperations::GetOptions, \ api::DurableObjectStorageOperations::GetAlarmOptions, \ api::DurableObjectStorageOperations::PutOptions, \ - api::DurableObjectStorageOperations::SetAlarmOptions + api::DurableObjectStorageOperations::SetAlarmOptions, \ + api::WebSocketRequestResponsePair } // namespace workerd::api diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index 7a97e4e9f73..ea92969f3f5 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -456,6 +456,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { JSG_NESTED_TYPE(Response); JSG_NESTED_TYPE(WebSocket); JSG_NESTED_TYPE(WebSocketPair); + JSG_NESTED_TYPE(WebSocketRequestResponsePair); JSG_NESTED_TYPE(AbortController); JSG_NESTED_TYPE(AbortSignal); diff --git a/src/workerd/api/web-socket.c++ b/src/workerd/api/web-socket.c++ index bb54e485541..93b5a793281 100644 --- a/src/workerd/api/web-socket.c++ +++ b/src/workerd/api/web-socket.c++ @@ -664,6 +664,15 @@ void WebSocket::serializeAttachment(jsg::Lock& js, v8::Local attachme serializedAttachment = kj::mv(released.data); } +void WebSocket::setAutoResponseTimestamp(kj::Maybe time) { + autoResponseTimestamp = time; +} + + +kj::Maybe WebSocket::getAutoResponseTimestamp() { + return autoResponseTimestamp; +} + void WebSocket::dispatchOpen(jsg::Lock& js) { dispatchEventImpl(js, jsg::alloc("open")); } diff --git a/src/workerd/api/web-socket.h b/src/workerd/api/web-socket.h index b957776feb5..7d8e18ecd82 100644 --- a/src/workerd/api/web-socket.h +++ b/src/workerd/api/web-socket.h @@ -380,6 +380,11 @@ class WebSocket: public EventTarget { // Used to get/set the attachment for hibernation. // If the object isn't serialized, it will not survive hibernation. + void setAutoResponseTimestamp(kj::Maybe time); + kj::Maybe getAutoResponseTimestamp(); + // Used to get/store the last auto request/response timestamp for this WebSocket. + // These methods are c++ only and are not exposed to our js interface. + int getReadyState(); bool isAccepted(); @@ -436,6 +441,7 @@ class WebSocket: public EventTarget { kj::Maybe url; kj::Maybe protocol = kj::String(); kj::Maybe extensions = kj::String(); + kj::Maybe autoResponseTimestamp; kj::Maybe> serializedAttachment; // All WebSockets have this property. It starts out null but can // be assigned to any serializable value. The property will survive hibernation. diff --git a/src/workerd/io/hibernation-manager.c++ b/src/workerd/io/hibernation-manager.c++ index 9a718520caa..6d1b723a1c5 100644 --- a/src/workerd/io/hibernation-manager.c++ +++ b/src/workerd/io/hibernation-manager.c++ @@ -96,6 +96,27 @@ kj::Vector> HibernationManagerImpl::getWebSockets( return kj::mv(matches); } +void HibernationManagerImpl::setWebSocketAutoResponse( + jsg::Ref reqResp) { + autoResponsePair = kj::mv(reqResp); +} + +void HibernationManagerImpl::unsetWebSocketAutoResponse() { + autoResponsePair = nullptr; +} + +kj::Maybe> HibernationManagerImpl::getWebSocketAutoResponse() { + KJ_IF_MAYBE(ar, autoResponsePair) { + return ar->addRef(); + } else { + return nullptr; + } +} + +void HibernationManagerImpl::setTimerChannel(TimerChannel& timerChannel) { + timer = timerChannel; +} + void HibernationManagerImpl::hibernateWebSockets(Worker::Lock& lock) { jsg::Lock& js(lock); v8::HandleScope handleScope(js.v8Isolate); @@ -164,6 +185,42 @@ kj::Promise HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) { while (true) { kj::WebSocket::Message message = co_await ws.receive(); // Note that errors are handled by the callee of `readLoop`, since we throw from `receive()`. + + auto skip = false; + + KJ_IF_MAYBE (reqResp, autoResponsePair) { + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(text, kj::String) { + if (text == (*reqResp)->getRequest()) { + // If the received message matches the one set for auto-response, we must + // short-circuit readLoop, store the current timestamp and and automatically respond + // with the expected response. + TimerChannel& timerChannel = KJ_REQUIRE_NONNULL(timer); + // We should have set the timerChannel previously in the hibernation manager. + // If we haven't, we aren't able to get the current time. + hib.autoResponseTimestamp = timerChannel.now(); + // We'll store the current timestamp in the HibernatableWebSocket to assure it gets + // stored even if the WebSocket is currently hibernating. In that scenario, the timestamp + // value will be loaded into the WebSocket during unhibernation. + KJ_IF_MAYBE(active, hib.activeOrPackage.tryGet>()) { + // If the actor is not hibernated/If the WebSocket is active, we need to update + // autoResponseTimestamp on the active websocket. + (*active)->setAutoResponseTimestamp(hib.autoResponseTimestamp); + } + ws.send((*reqResp)->getResponse().asArray()); + skip = true; + // If we've sent an auto response message, we should not unhibernate or deliver the + // received message to the actor + } + } + KJ_CASE_ONEOF_DEFAULT {} + } + } + + if (skip) { + continue; + } + auto websocketId = randomUUID(nullptr); webSocketsForEventHandler.insert(kj::str(websocketId), &hib); diff --git a/src/workerd/io/hibernation-manager.h b/src/workerd/io/hibernation-manager.h index e4cf093eca4..74778f8f1b8 100644 --- a/src/workerd/io/hibernation-manager.h +++ b/src/workerd/io/hibernation-manager.h @@ -39,6 +39,11 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager { // Hibernates all the websockets held by the HibernationManager. // This converts our activeOrPackage from an api::WebSocket to a HibernationPackage. + void setWebSocketAutoResponse(jsg::Ref reqResp) override; + void unsetWebSocketAutoResponse() override; + kj::Maybe> getWebSocketAutoResponse() override; + void setTimerChannel(TimerChannel& timerChannel) override; + friend class api::HibernatableWebSocketEvent; private: @@ -101,6 +106,9 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager { KJ_IF_MAYBE(package, activeOrPackage.tryGet()) { activeOrPackage.init>( api::WebSocket::hibernatableFromNative(js, *KJ_REQUIRE_NONNULL(ws), kj::mv(*package))); + activeOrPackage.get>()->setAutoResponseTimestamp(autoResponseTimestamp); + // Now that unhibernated the WebSocket, we can set the last received autoResponse timestamp + // that was stored in the corresponding HibernatableWebSocket. } return activeOrPackage.get>().addRef(); } @@ -132,6 +140,10 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager { bool hasDispatchedClose = false; // True once we have dispatched the close event. // This prevents us from dispatching it if we have already done so. + + kj::Maybe autoResponseTimestamp; + // Stores the last received autoResponseRequest timestamp. + friend HibernationManagerImpl; }; @@ -196,5 +208,7 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager { }; DisconnectHandler onDisconnect; kj::TaskSet readLoopTasks; + kj::Maybe> autoResponsePair; + kj::Maybe timer; }; }; // namespace workerd diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index fe25628f9c0..da8b18c6d2a 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -3103,6 +3103,9 @@ kj::Maybe Worker::Actor::getHibernationManag void Worker::Actor::setHibernationManager(kj::Own hib) { KJ_REQUIRE(impl->hibernationManager == nullptr); + hib->setTimerChannel(impl->timerChannel); + // Not the cleanest way to provide hibernation manager with a timer channel reference, but + // where HibernationManager is constructed (actor-state), we don't have a timer channel ref. impl->hibernationManager = kj::mv(hib); } diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 008d48cadae..744cc883e30 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -38,6 +38,7 @@ namespace api { struct CryptoAlgorithm; struct QueueExportedHandler; class WebSocket; + class WebSocketRequestResponsePair; } class IoContext; @@ -684,6 +685,10 @@ class Worker::Actor final: public kj::Refcounted { jsg::Lock& js, kj::Maybe tag) = 0; virtual void hibernateWebSockets(Worker::Lock& lock) = 0; + virtual void setWebSocketAutoResponse(jsg::Ref reqResp) = 0; + virtual void unsetWebSocketAutoResponse() = 0; + virtual kj::Maybe> getWebSocketAutoResponse() = 0; + virtual void setTimerChannel(TimerChannel& timerChannel) = 0; }; Actor(const Worker& worker, kj::Maybe tracker, Id actorId,