Skip to content

Commit

Permalink
Add application level hibernatable websockets autoresponses
Browse files Browse the repository at this point in the history
This commit add a new method for hibernatable web sockets that enables
a ping/pong application autoresponse, storing the last received pong
timestamp. There's also a method to access the last received timestamp.
  • Loading branch information
jqmmes committed Jun 2, 2023
1 parent d3788af commit 9f8b8eb
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 2 deletions.
47 changes: 47 additions & 0 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,53 @@ kj::Array<jsg::Ref<api::WebSocket>> DurableObjectState::getWebSockets(
return kj::Array<jsg::Ref<api::WebSocket>>();
}

void DurableObjectState::setWebSocketAutoResponse(
jsg::Optional<jsg::Ref<WebSocketRequestResponsePair>> maybeReqResp) {
auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());

if (maybeReqResp == nullptr) {
// If there's no request/response pair, we unset any current set auto response configuration.
KJ_IF_MAYBE(manager, a.getHibernationManager()) {
// If there's no hibernation manager created yet, there's nothing to do here.
manager->unsetWebSocketAutoResponse();
}
return;
}

auto reqResp = KJ_REQUIRE_NONNULL(kj::mv(maybeReqResp));
auto maxRequestOrResponseSize = 2048;

JSG_REQUIRE(reqResp->getRequest().size() <= maxRequestOrResponseSize, RangeError, kj::str(
"Request cannot be larger than ", maxRequestOrResponseSize, " bytes. ",
"A request of size ", reqResp->getRequest().size(), " was provided."));

JSG_REQUIRE(reqResp->getResponse().size() <= maxRequestOrResponseSize, RangeError, kj::str(
"Response cannot be larger than ", maxRequestOrResponseSize, " bytes. ",
"A response of size ", reqResp->getResponse().size(), " was provided."));

if (a.getHibernationManager() == nullptr) {
a.setHibernationManager(kj::refcounted<HibernationManagerImpl>(
a.getLoopback(), KJ_REQUIRE_NONNULL(a.getHibernationEventType())));
// If there's no hibernation manager created yet, we should create one and
// set its auto response.
}
KJ_REQUIRE_NONNULL(a.getHibernationManager()).setWebSocketAutoResponse(kj::mv(reqResp));
}

kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> DurableObjectState::getWebSocketAutoResponse() {
auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
KJ_IF_MAYBE(manager, a.getHibernationManager()) {
// If there's no hibernation manager created yet, there's nothing to do here.
auto r = manager->getWebSocketAutoResponse();
return r;
}
return nullptr;
}

kj::Maybe<kj::Date> DurableObjectState::getWebSocketAutoResponseTimestamp(jsg::Ref<WebSocket> ws) {
return ws->getAutoResponseTimestamp();
}

kj::Array<kj::byte> serializeV8Value(v8::Local<v8::Value> value, v8::Isolate* isolate) {
jsg::Serializer serializer(isolate, jsg::Serializer::Options {
.version = 15,
Expand Down
41 changes: 40 additions & 1 deletion src/workerd/api/actor-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,28 @@ class ActorState: public jsg::Object {
kj::Maybe<jsg::Ref<DurableObjectStorage>> persistent;
};

class WebSocketRequestResponsePair: public jsg::Object {
public:
WebSocketRequestResponsePair(kj::String request, kj::String response)
: request(kj::mv(request)), response(kj::mv(response)) {};

static jsg::Ref<WebSocketRequestResponsePair> constructor(kj::String request, kj::String response) {
return jsg::alloc<WebSocketRequestResponsePair>(kj::mv(request),kj::mv(response));
};

kj::StringPtr getRequest() { return request.asPtr(); }
kj::StringPtr getResponse() { return response.asPtr(); }

JSG_RESOURCE_TYPE(WebSocketRequestResponsePair) {
JSG_READONLY_PROTOTYPE_PROPERTY(request, getRequest);
JSG_READONLY_PROTOTYPE_PROPERTY(response, getResponse);
}

private:
kj::String request;
kj::String response;
};

class DurableObjectState: public jsg::Object {
// The type passed as the first parameter to durable object class's constructor.

Expand Down Expand Up @@ -386,13 +408,29 @@ class DurableObjectState: public jsg::Object {
// If no tag is provided, an array of all accepted WebSockets is returned.
// Disconnected WebSockets are automatically removed from the list.

void setWebSocketAutoResponse(jsg::Optional<jsg::Ref<api::WebSocketRequestResponsePair>> maybeReqResp);
// Sets an object-wide websocket auto response message for a specific
// request string. All websockets belonging to the same object must
// reply to the request with the matching response, then store the timestamp at which
// the request was received.
// If maybeReqResp is not set, we consider it as unset and remove any set request response pair.

kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> getWebSocketAutoResponse();
// Gets the currently set object-wide websocket auto response.

kj::Maybe<kj::Date> getWebSocketAutoResponseTimestamp(jsg::Ref<WebSocket> ws);
// Get the last auto response timestamp or null

JSG_RESOURCE_TYPE(DurableObjectState, CompatibilityFlags::Reader flags) {
JSG_METHOD(waitUntil);
JSG_READONLY_INSTANCE_PROPERTY(id, getId);
JSG_READONLY_INSTANCE_PROPERTY(storage, getStorage);
JSG_METHOD(blockConcurrencyWhile);
JSG_METHOD(acceptWebSocket);
JSG_METHOD(getWebSockets);
JSG_METHOD(setWebSocketAutoResponse);
JSG_METHOD(getWebSocketAutoResponse);
JSG_METHOD(getWebSocketAutoResponseTimestamp);

if (flags.getWorkerdExperimental()) {
// TODO(someday): This currently exists for testing purposes only but maybe it could be
Expand Down Expand Up @@ -428,6 +466,7 @@ class DurableObjectState: public jsg::Object {
api::DurableObjectStorageOperations::GetOptions, \
api::DurableObjectStorageOperations::GetAlarmOptions, \
api::DurableObjectStorageOperations::PutOptions, \
api::DurableObjectStorageOperations::SetAlarmOptions
api::DurableObjectStorageOperations::SetAlarmOptions, \
api::WebSocketRequestResponsePair

} // namespace workerd::api
1 change: 1 addition & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
JSG_NESTED_TYPE(Response);
JSG_NESTED_TYPE(WebSocket);
JSG_NESTED_TYPE(WebSocketPair);
JSG_NESTED_TYPE(WebSocketRequestResponsePair);

JSG_NESTED_TYPE(AbortController);
JSG_NESTED_TYPE(AbortSignal);
Expand Down
9 changes: 9 additions & 0 deletions src/workerd/api/web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,15 @@ void WebSocket::serializeAttachment(jsg::Lock& js, v8::Local<v8::Value> attachme
serializedAttachment = kj::mv(released.data);
}

void WebSocket::setAutoResponseTimestamp(kj::Maybe<kj::Date> time) {
autoResponseTimestamp = time;
}


kj::Maybe<kj::Date> WebSocket::getAutoResponseTimestamp() {
return autoResponseTimestamp;
}

void WebSocket::dispatchOpen(jsg::Lock& js) {
dispatchEventImpl(js, jsg::alloc<Event>("open"));
}
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ class WebSocket: public EventTarget {
// Used to get/set the attachment for hibernation.
// If the object isn't serialized, it will not survive hibernation.

void setAutoResponseTimestamp(kj::Maybe<kj::Date> time);
kj::Maybe<kj::Date> getAutoResponseTimestamp();
// Used to get/store the last auto request/response timestamp for this WebSocket.
// These methods are c++ only and are not exposed to our js interface.

int getReadyState();

bool isAccepted();
Expand Down Expand Up @@ -436,6 +441,7 @@ class WebSocket: public EventTarget {
kj::Maybe<kj::String> url;
kj::Maybe<kj::String> protocol = kj::String();
kj::Maybe<kj::String> extensions = kj::String();
kj::Maybe<kj::Date> autoResponseTimestamp;
kj::Maybe<kj::Array<byte>> serializedAttachment;
// All WebSockets have this property. It starts out null but can
// be assigned to any serializable value. The property will survive hibernation.
Expand Down
57 changes: 57 additions & 0 deletions src/workerd/io/hibernation-manager.c++
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,27 @@ kj::Vector<jsg::Ref<api::WebSocket>> HibernationManagerImpl::getWebSockets(
return kj::mv(matches);
}

void HibernationManagerImpl::setWebSocketAutoResponse(
jsg::Ref<api::WebSocketRequestResponsePair> reqResp) {
autoResponsePair = kj::mv(reqResp);
}

void HibernationManagerImpl::unsetWebSocketAutoResponse() {
autoResponsePair = nullptr;
}

kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> HibernationManagerImpl::getWebSocketAutoResponse() {
KJ_IF_MAYBE(ar, autoResponsePair) {
return ar->addRef();
} else {
return nullptr;
}
}

void HibernationManagerImpl::setTimerChannel(TimerChannel& timerChannel) {
timer = timerChannel;
}

void HibernationManagerImpl::hibernateWebSockets(Worker::Lock& lock) {
jsg::Lock& js(lock);
v8::HandleScope handleScope(js.v8Isolate);
Expand Down Expand Up @@ -164,6 +185,42 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {
while (true) {
kj::WebSocket::Message message = co_await ws.receive();
// Note that errors are handled by the callee of `readLoop`, since we throw from `receive()`.

auto skip = false;

KJ_IF_MAYBE (reqResp, autoResponsePair) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(text, kj::String) {
if (text == (*reqResp)->getRequest()) {
// If the received message matches the one set for auto-response, we must
// short-circuit readLoop, store the current timestamp and and automatically respond
// with the expected response.
TimerChannel& timerChannel = KJ_REQUIRE_NONNULL(timer);
// We should have set the timerChannel previously in the hibernation manager.
// If we haven't, we aren't able to get the current time.
hib.autoResponseTimestamp = timerChannel.now();
// We'll store the current timestamp in the HibernatableWebSocket to assure it gets
// stored even if the WebSocket is currently hibernating. In that scenario, the timestamp
// value will be loaded into the WebSocket during unhibernation.
KJ_IF_MAYBE(active, hib.activeOrPackage.tryGet<jsg::Ref<api::WebSocket>>()) {
// If the actor is not hibernated/If the WebSocket is active, we need to update
// autoResponseTimestamp on the active websocket.
(*active)->setAutoResponseTimestamp(hib.autoResponseTimestamp);
}
ws.send((*reqResp)->getResponse().asArray());
skip = true;
// If we've sent an auto response message, we should not unhibernate or deliver the
// received message to the actor
}
}
KJ_CASE_ONEOF_DEFAULT {}
}
}

if (skip) {
continue;
}

auto websocketId = randomUUID(nullptr);
webSocketsForEventHandler.insert(kj::str(websocketId), &hib);

Expand Down
16 changes: 15 additions & 1 deletion src/workerd/io/hibernation-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
// Hibernates all the websockets held by the HibernationManager.
// This converts our activeOrPackage from an api::WebSocket to a HibernationPackage.

void setWebSocketAutoResponse(jsg::Ref<api::WebSocketRequestResponsePair> reqResp) override;
void unsetWebSocketAutoResponse() override;
kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> getWebSocketAutoResponse() override;
void setTimerChannel(TimerChannel& timerChannel) override;

friend class api::HibernatableWebSocketEvent;

private:
Expand Down Expand Up @@ -100,7 +105,10 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
// to the api::WebSocket.
KJ_IF_MAYBE(package, activeOrPackage.tryGet<api::WebSocket::HibernationPackage>()) {
activeOrPackage.init<jsg::Ref<api::WebSocket>>(
api::WebSocket::hibernatableFromNative(js, *KJ_REQUIRE_NONNULL(ws), kj::mv(*package)));
api::WebSocket::hibernatableFromNative(js, *KJ_REQUIRE_NONNULL(ws), kj::mv(*package))
)->setAutoResponseTimestamp(autoResponseTimestamp);
// Now that we unhibernated the WebSocket, we can set the last received autoResponse timestamp
// that was stored in the corresponding HibernatableWebSocket.
}
return activeOrPackage.get<jsg::Ref<api::WebSocket>>().addRef();
}
Expand Down Expand Up @@ -132,6 +140,10 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
bool hasDispatchedClose = false;
// True once we have dispatched the close event.
// This prevents us from dispatching it if we have already done so.

kj::Maybe<kj::Date> autoResponseTimestamp;
// Stores the last received autoResponseRequest timestamp.

friend HibernationManagerImpl;
};

Expand Down Expand Up @@ -196,5 +208,7 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
};
DisconnectHandler onDisconnect;
kj::TaskSet readLoopTasks;
kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> autoResponsePair;
kj::Maybe<TimerChannel&> timer;
};
}; // namespace workerd
3 changes: 3 additions & 0 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3103,6 +3103,9 @@ kj::Maybe<Worker::Actor::HibernationManager&> Worker::Actor::getHibernationManag

void Worker::Actor::setHibernationManager(kj::Own<HibernationManager> hib) {
KJ_REQUIRE(impl->hibernationManager == nullptr);
hib->setTimerChannel(impl->timerChannel);
// Not the cleanest way to provide hibernation manager with a timer channel reference, but
// where HibernationManager is constructed (actor-state), we don't have a timer channel ref.
impl->hibernationManager = kj::mv(hib);
}

Expand Down
5 changes: 5 additions & 0 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace api {
struct CryptoAlgorithm;
struct QueueExportedHandler;
class WebSocket;
class WebSocketRequestResponsePair;
}

class IoContext;
Expand Down Expand Up @@ -684,6 +685,10 @@ class Worker::Actor final: public kj::Refcounted {
jsg::Lock& js,
kj::Maybe<kj::StringPtr> tag) = 0;
virtual void hibernateWebSockets(Worker::Lock& lock) = 0;
virtual void setWebSocketAutoResponse(jsg::Ref<api::WebSocketRequestResponsePair> reqResp) = 0;
virtual void unsetWebSocketAutoResponse() = 0;
virtual kj::Maybe<jsg::Ref<api::WebSocketRequestResponsePair>> getWebSocketAutoResponse() = 0;
virtual void setTimerChannel(TimerChannel& timerChannel) = 0;
};

Actor(const Worker& worker, kj::Maybe<RequestTracker&> tracker, Id actorId,
Expand Down

0 comments on commit 9f8b8eb

Please sign in to comment.