Skip to content

Commit

Permalink
Add application level hibernatable websockets autoresponses
Browse files Browse the repository at this point in the history
This commit add a new method for hibernatable web sockets that enables
a ping/pong application autoresponse, storing the last received pong
timestamp. There's also a method to access the last received timestamp.
  • Loading branch information
jqmmes committed May 11, 2023
1 parent 8919954 commit f4bdadf
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 1 deletion.
31 changes: 31 additions & 0 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,37 @@ kj::Array<jsg::Ref<api::WebSocket>> DurableObjectState::getWebSockets(
return kj::Array<jsg::Ref<api::WebSocket>>();
}

void DurableObjectState::setWebSocketAutoresponse(kj::String request, kj::String response) {
auto maxRequestResponseSize = 2048;

JSG_REQUIRE(request.size() <= maxRequestResponseSize, RangeError, kj::str(
"Request cannot be larger than ", maxRequestResponseSize, " bytes. ",
"A request of size ", request.size(), " was provided."));

JSG_REQUIRE(response.size() <= maxRequestResponseSize, RangeError, kj::str(
"Response cannot be larger than ", maxRequestResponseSize, " bytes. ",
"A response of size ", response.size(), " was provided."));

auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
if (a.getHibernationManager() == nullptr) {
a.setHibernationManager(kj::refcounted<HibernationManagerImpl>(
a.getLoopback(), KJ_REQUIRE_NONNULL(a.getHibernationEventType())));
// If there's no hibernation manager created yet, we should create one and
// set its auto response.
}
KJ_REQUIRE_NONNULL(a.getHibernationManager()).setWebSocketAutoResponse(
kj::mv(request),
kj::mv(response));
}

void DurableObjectState::unsetWebSocketAutoresponse() {
auto& a = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
KJ_IF_MAYBE(manager, a.getHibernationManager()) {
// If there's no hibernation manager created yet, there's nothing to do here.
manager->unsetWebSocketAutoResponse();
}
}

kj::Array<kj::byte> serializeV8Value(v8::Local<v8::Value> value, v8::Isolate* isolate) {
jsg::Serializer serializer(isolate, jsg::Serializer::Options {
.version = 15,
Expand Down
10 changes: 10 additions & 0 deletions src/workerd/api/actor-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,13 +380,23 @@ class DurableObjectState: public jsg::Object {
// If no tag is provided, an array of all accepted WebSockets is returned.
// Disconnected WebSockets are automatically removed from the list.

void setWebSocketAutoresponse(kj::String request, kj::String response);
// Sets an object-wide websocket auto response message for a specific
// request string. All websockets belonging to the same object must
// reply to request with response and store the request recived timestamp.

void unsetWebSocketAutoresponse();
// Unsets an object-wide websocket auto response.

JSG_RESOURCE_TYPE(DurableObjectState, CompatibilityFlags::Reader flags) {
JSG_METHOD(waitUntil);
JSG_READONLY_INSTANCE_PROPERTY(id, getId);
JSG_READONLY_INSTANCE_PROPERTY(storage, getStorage);
JSG_METHOD(blockConcurrencyWhile);
JSG_METHOD(acceptWebSocket);
JSG_METHOD(getWebSockets);
JSG_METHOD(setWebSocketAutoresponse);
JSG_METHOD(unsetWebSocketAutoresponse);

if (flags.getWorkerdExperimental()) {
// TODO(someday): This currently exists for testing purposes only but maybe it could be
Expand Down
9 changes: 9 additions & 0 deletions src/workerd/api/web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,15 @@ void WebSocket::serializeAttachment(jsg::Lock& js, v8::Local<v8::Value> attachme
serializedAttachment = kj::mv(released.data);
}

void WebSocket::setAutoResponseTimestamp(kj::Maybe<kj::Date> time) {
autoResponseTimestamp = time;
}


kj::Maybe<kj::Date> WebSocket::getAutoResponseTimestamp() {
return autoResponseTimestamp;
}

void WebSocket::dispatchOpen(jsg::Lock& js) {
dispatchEventImpl(js, jsg::alloc<Event>("open"));
}
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ class WebSocket: public EventTarget {
// Used to get/set the attachment for hibernation.
// If the object isn't serialized, it will not survive hibernation.

void setAutoResponseTimestamp(kj::Maybe<kj::Date> time);

int getReadyState();

bool isAccepted();
Expand All @@ -384,6 +386,7 @@ class WebSocket: public EventTarget {
kj::Maybe<kj::StringPtr> getUrl();
kj::Maybe<kj::StringPtr> getProtocol();
kj::Maybe<kj::StringPtr> getExtensions();
kj::Maybe<kj::Date> getAutoResponseTimestamp();

JSG_RESOURCE_TYPE(WebSocket, CompatibilityFlags::Reader flags) {
JSG_INHERIT(EventTarget);
Expand All @@ -407,11 +410,13 @@ class WebSocket: public EventTarget {
JSG_READONLY_PROTOTYPE_PROPERTY(url, getUrl);
JSG_READONLY_PROTOTYPE_PROPERTY(protocol, getProtocol);
JSG_READONLY_PROTOTYPE_PROPERTY(extensions, getExtensions);
JSG_READONLY_PROTOTYPE_PROPERTY(autoResponseTimestamp, getAutoResponseTimestamp);
} else {
JSG_READONLY_INSTANCE_PROPERTY(readyState, getReadyState);
JSG_READONLY_INSTANCE_PROPERTY(url, getUrl);
JSG_READONLY_INSTANCE_PROPERTY(protocol, getProtocol);
JSG_READONLY_INSTANCE_PROPERTY(extensions, getExtensions);
JSG_READONLY_INSTANCE_PROPERTY(autoResponseTimestamp, getAutoResponseTimestamp);
}

JSG_TS_DEFINE(type WebSocketEventMap = {
Expand All @@ -427,6 +432,7 @@ class WebSocket: public EventTarget {
kj::Maybe<kj::String> url;
kj::Maybe<kj::String> protocol = kj::String();
kj::Maybe<kj::String> extensions = kj::String();
kj::Maybe<kj::Date> autoResponseTimestamp;
kj::Maybe<kj::Array<byte>> serializedAttachment;
// All WebSockets have this property. It starts out null but can
// be assigned to any serializable value. The property will survive hibernation.
Expand Down
51 changes: 50 additions & 1 deletion src/workerd/io/hibernation-manager.c++
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ kj::Vector<jsg::Ref<api::WebSocket>> HibernationManagerImpl::getWebSockets(
return kj::mv(matches);
}

void HibernationManagerImpl::setWebSocketAutoResponse(kj::String request,
kj::String response) {
autoResponseRequest = kj::mv(request);
autoResponseResponse = kj::mv(response);
}

void HibernationManagerImpl::unsetWebSocketAutoResponse() {
autoResponseRequest = nullptr;
autoResponseResponse = nullptr;
}

void HibernationManagerImpl::setTimerChannel(TimerChannel& timerChannel) {
timer = timerChannel;
}

void HibernationManagerImpl::hibernateWebSockets(Worker::Lock& lock) {
jsg::Lock& js(lock);
v8::HandleScope handleScope(js.v8Isolate);
Expand Down Expand Up @@ -165,8 +180,42 @@ kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {
while (true) {
kj::WebSocket::Message message = co_await ws.receive();
// Note that errors are handled by the callee of `readLoop`, since we throw from `receive()`.
webSocketForEventHandler = hib;

auto skip = false;

KJ_IF_MAYBE (req, autoResponseRequest) {
KJ_SWITCH_ONEOF(message) {
KJ_CASE_ONEOF(text, kj::String) {
if (text == *req) {
// If the received message equals to the one set for auto response, we must
// short-circuit readLoop, store the current timestamp and respond with
// autoResponseResponse.
TimerChannel& timerChannel = KJ_REQUIRE_NONNULL(timer);
// We should have set the timerChannel previously in the hibernation manager.
// If we haven't, we aren't able to get the current time.
hib.autoResponseTimestamp = timerChannel.now();
KJ_IF_MAYBE(active, hib.activeOrPackage.tryGet<jsg::Ref<api::WebSocket>>()) {
// If the actor is not hibernated, we need to update autoResponseTimestamp on the
// active websocket.
(*active)->setAutoResponseTimestamp(hib.autoResponseTimestamp);
}
auto response = KJ_REQUIRE_NONNULL(kj::mv(autoResponseResponse));
// If we have an autoResponseRequest set, we must have autoResponseResponse also.
ws.send(response.asArray());
skip = true;
// If we've sent an auto response message, we should not unhibernate or deliver the
// received message to the actor
}
}
KJ_CASE_ONEOF_DEFAULT {}
}
}

if (skip) {
continue;
}

webSocketForEventHandler = hib;
// Build the event params depending on what type of message we got.
kj::Maybe<api::HibernatableSocketParams> maybeParams;
KJ_SWITCH_ONEOF(message) {
Expand Down
11 changes: 11 additions & 0 deletions src/workerd/io/hibernation-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
// Hibernates all the websockets held by the HibernationManager.
// This converts our activeOrPackage from an api::WebSocket to a HibernationPackage.

void setWebSocketAutoResponse(kj::String request, kj::String response) override;
void unsetWebSocketAutoResponse() override;
void setTimerChannel(TimerChannel& timerChannel) override;

friend jsg::Ref<api::WebSocket> api::HibernatableWebSocketEvent::getWebSocket(jsg::Lock& lock);

private:
Expand Down Expand Up @@ -128,6 +132,10 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
bool hasDispatchedClose = false;
// True once we have dispatched the close event.
// This prevents us from dispatching it if we have already done so.

kj::Maybe<kj::Date> autoResponseTimestamp;
// Stores the last received autoResponseRequest timestamp.

friend HibernationManagerImpl;
};

Expand Down Expand Up @@ -187,5 +195,8 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
};
DisconnectHandler onDisconnect;
kj::TaskSet readLoopTasks;
kj::Maybe<kj::String> autoResponseRequest;
kj::Maybe<kj::String> autoResponseResponse;
kj::Maybe<TimerChannel&> timer;
};
}; // namespace workerd
4 changes: 4 additions & 0 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -3090,7 +3090,11 @@ kj::Maybe<Worker::Actor::HibernationManager&> Worker::Actor::getHibernationManag

void Worker::Actor::setHibernationManager(kj::Own<HibernationManager> hib) {
KJ_REQUIRE(impl->hibernationManager == nullptr);
hib->setTimerChannel(impl->timerChannel);
// Not the cleanest way to provide hibernation manager with a timer channel reference, but
// where HibernationManager is constructed (actor-state), we don't have a timer channel ref.
impl->hibernationManager = kj::mv(hib);

}

kj::Maybe<uint16_t> Worker::Actor::getHibernationEventType() {
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,9 @@ class Worker::Actor final: public kj::Refcounted {
jsg::Lock& js,
kj::Maybe<kj::StringPtr> tag) = 0;
virtual void hibernateWebSockets(Worker::Lock& lock) = 0;
virtual void setWebSocketAutoResponse(kj::String request, kj::String response) = 0;
virtual void unsetWebSocketAutoResponse() = 0;
virtual void setTimerChannel(TimerChannel& timerChannel) = 0;
};

Actor(const Worker& worker, kj::Maybe<RequestTracker&> tracker, Id actorId,
Expand Down

0 comments on commit f4bdadf

Please sign in to comment.