From 87a45243f063b6d0b154092cfb68a5b903c48896 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Thu, 21 May 2026 14:22:16 -0500 Subject: [PATCH 1/9] Update config capnp and worker-interface.capnp for clustering. This defines how the new feature is configured and exposed on the network. --- src/workerd/io/worker-interface.capnp | 12 ++++ src/workerd/server/workerd.capnp | 86 +++++++++++++++++++++++++-- 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 4a29ca2e9fe..f071776bca5 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -888,6 +888,18 @@ interface WorkerdDebugPort { # purposes. # # This interface is subject to change. It is intended for use by miniflare. + # + # This is also the same interface that workerd exports on its cluster port, which other workerd + # instances in the same cluster may connect to. See ClusterConfig in workerd.capnp for how to + # configure clustering. The cluster port is only intended to be used by other instances in the + # cluster, and does not implement a regular two-party RPC protocol. `ClusterRegistry` implements + # the `VatNetwork` intended to be used for this. Nobody else should try to connect to it. + # + # TODO(clustering): Once channel tokens have been extended to support actors, consider replacing + # the cluster interface with one that takes a channel token. This seems a bit cleaner and safer + # than letting the client specify props and such directly. + # TODO(clustering): Automatically use encryption on non-localhost networks. We have an X25519 + # keypair for each node, no need for certs! 
getEntrypoint @0 (service :Text, entrypoint :Text, props :Frankenvalue) -> (entrypoint :WorkerdBootstrap); diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index d433362ff99..47cc215443f 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -94,6 +94,9 @@ struct Config { logging @6 : LoggingOptions; # Console and Stdio logging configuration options. + + cluster @7 :ClusterConfig; + # Set when the workerd instance should join a cluster. } struct LoggingOptions { @@ -111,6 +114,72 @@ struct LoggingOptions { # Set a custom prefix for process.stderr. Defaults to "stderr: ". } +struct ClusterConfig { + # workerd instances can work together in a cluster, in order to distribute work and Durable + # Objects among them. + # + # All instances must share a single filesystem where Durable Object data is stored. The + # filesystem can be NFSv4 or any filesystem that supports similar locking semantics with proper + # leases. It can also just be a local filesystem, when running multiple workerd instances on the + # same machine to utilize multiple cores. + # + # All instances in the cluster MUST have identical configuration. The idea is that all + # instances behave exactly the same and any instance could forward any request it receives to + # any other instance and it should produce the same result. For the purpose of rolling upgrades, + # instances may temporarily differ in ways that are backwards-compatible. + # + # Cluster mode is currently implemented only on Linux. + + sharedDirectory @0 :Text; + # Name of a `DiskDirectory` service which represents storage shared by all instances in the + # cluster. If instances are on different machines (not just different processes on the same + # machine), this should point to a network filesystem that implements proper locking and leasing, + # like NFSv4. + # + # All `durableObjectStorage.localDisk` configurations in this config file must point at this same + # directory. 
(Each Durable Object namespace will create a subdirectory.) + + registrySubdir @1 :Text = "workerd-registry"; + # Subdirectory of `sharedDirectory` to put the actual registry files into. (This can be customized + # in case the default name conflicts with one of the Durable Object namespace `uniqueKey`s, which + # determine the directory names under which they are stored.) + # + # The registry is used to discover what workerd instances are available. Each workerd instance + # registers itself in the registry such that other instances can find it and send traffic to it. + # + # Specifically, each instance will create a file in this directory named after the instance's + # public key, which is randomly-generated when the instance starts up. The file contains the + # instance's network address, so that other instances can connect to it. + + network @3 :Text; + # Each workerd instance in the cluster listens for connections from other instances over the + # network. The `network` field indicates which network to listen on. + # + # This can be either: + # * An IPv4 or IPv6 CIDR, which must match one of the machine's IP addresses. + # * The word "unix", to mean that the cluster is actually local to the machine and should + # communicate via Unix-domain sockets. + # + # In the CIDR case, workerd enumerates all IP addresses of all network interfaces available to + # it, selects the first one that matches the CIDR, and opens an ephemeral port on that address. + # Note that this means the port workerd uses is not predictable. That's OK because workerd will + # then write a file to the registry containing its own address. + # + # In the "unix" case, the registry will be populated not by files, but by Unix-domain sockets. + + key @2 :Text; + # Some secret value known only to members of the cluster. This is used to encrypt channel tokens, + # which are used when passing service stubs or Durable Object stubs over RPC. 
+ # + # The secrecy of this key matters if you allow untrusted clients outside of the cluster to speak + # the low-level worker-interface.capnp protocol directly with workerd instances in the cluster. + # At present, this is only exposed via the `capnpConnectHost` and `workerdDebugPort` config + # settings. If you don't use either of those in production, then the secrecy of this key isn't + # very important. + # + # TODO(someday): Support some way to gradually roll out a rotated key. +} + # ======================================================================================== # Sockets @@ -701,8 +770,6 @@ struct Worker { # This mode is intended for local testing purposes. localDisk @12 :Text; - # ** EXPERIMENTAL; SUBJECT TO BACKWARDS-INCOMPATIBLE CHANGE ** - # # Durable Object data will be stored in a directory on local disk. This field is the name of # a service, which must be a DiskDirectory service. For each Durable Object class, a # subdirectory will be created using `uniqueKey` as the name. Within the directory, one or @@ -710,11 +777,14 @@ struct Worker { # a number of different extensions depending on the storage mode. (Currently, the main storage # is a file with the extension `.sqlite`, and in certain situations extra files with the # extensions `.sqlite-wal`, and `.sqlite-shm` may also be present.) + # + # Note that if workerd has been given a `ClusterConfig` at the top level, then it is expected + # this disk directory is shared by the whole cluster (e.g. via NFSv4), and workerd will + # automatically ensure that each Durable Object is scheduled on a single machine. When using + # NFSv4, all Durable Object directories MUST be on the SAME VOLUME as the registry directory + # defined in `ClusterConfig`, so that lease loss affects all files and locks simultaneously. } - # TODO(someday): Support distributing objects across a cluster. At present, objects are always - # local to one instance of the runtime. 
- moduleFallback @13 :Text; tails @14 :List(ServiceDesignator); @@ -873,6 +943,12 @@ struct DiskDirectory { # is no acceptable format for these, regardless of what the client says it accepts). # # `HEAD` requests are properly optimized to perform a stat() without actually opening the file. + # + # If workerd has been given a `ClusterConfig` at the top level, it is important that any + # `DiskDirectory` is set up in such a way that a request that uses it can be served correctly + # from any instance in the cluster. For read-only directories, this just means they all have to + # contain the same data. For writable directories, you may want to use a shared filesystem like + # NFSv4. path @0 :Text; # The filesystem path of the directory. If not specified, then it must be specified on the From c4aaa469c95125b39834d3d29cc892fbf78940ad Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Thu, 21 May 2026 15:05:02 -0500 Subject: [PATCH 2/9] Add OFD file lock primitive RAII wrapper around Linux OFD locks (fcntl F_OFD_SETLK) for per-fd file locking needed by the cluster DO ownership protocol. OFD locks are per open-file-description rather than per-process, which is essential since a single workerd process manages many DOs concurrently. Includes tryLock() (non-blocking), verifyHeld() (for NFSv4 lease verification), and move semantics. Non-Linux platforms fail at runtime rather than compile time so builds still succeed when cluster mode is unused. This code (and commit message) was entirely written by Opus 4.6. 
--- src/workerd/util/BUILD.bazel | 13 +++ src/workerd/util/ofd-lock-test.c++ | 182 +++++++++++++++++++++++++++++ src/workerd/util/ofd-lock.c++ | 111 ++++++++++++++++++ src/workerd/util/ofd-lock.h | 48 ++++++++ 4 files changed, 354 insertions(+) create mode 100644 src/workerd/util/ofd-lock-test.c++ create mode 100644 src/workerd/util/ofd-lock.c++ create mode 100644 src/workerd/util/ofd-lock.h diff --git a/src/workerd/util/BUILD.bazel b/src/workerd/util/BUILD.bazel index 9f8ded578bc..1c485380200 100644 --- a/src/workerd/util/BUILD.bazel +++ b/src/workerd/util/BUILD.bazel @@ -453,3 +453,16 @@ kj_test( src = "state-machine-test.c++", deps = [":state-machine"], ) + +wd_cc_library( + name = "ofd-lock", + srcs = ["ofd-lock.c++"], + hdrs = ["ofd-lock.h"], + visibility = ["//visibility:public"], + deps = ["@capnp-cpp//src/kj"], +) + +kj_test( + src = "ofd-lock-test.c++", + deps = [":ofd-lock"], +) diff --git a/src/workerd/util/ofd-lock-test.c++ b/src/workerd/util/ofd-lock-test.c++ new file mode 100644 index 00000000000..aa827c444b2 --- /dev/null +++ b/src/workerd/util/ofd-lock-test.c++ @@ -0,0 +1,182 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "ofd-lock.h" + +#include +#include +#include + +#if !_WIN32 +#include +#include +#include +#endif + +namespace workerd { +namespace { + +#ifdef __linux__ + +kj::OwnFd openTempFile() { + char path[] = "/tmp/ofd-lock-test-XXXXXX"; + int fd = mkstemp(path); + KJ_SYSCALL(fd); + unlink(path); + return kj::OwnFd(fd); +} + +KJ_TEST("exclusive lock conflicts with exclusive lock") { + auto fd1 = openTempFile(); + + // Open a second fd to the same file via /proc/self/fd. 
+ auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + + // Second exclusive lock on a different fd to the same file should fail. + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE) == kj::none); +} + +KJ_TEST("shared lock conflicts with exclusive lock") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + + // Shared lock should also fail when exclusive is held. + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::SHARED) == kj::none); +} + +KJ_TEST("shared locks do not conflict with each other") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::SHARED)); + auto lock2 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd2, OfdLock::SHARED)); + // Both shared locks held simultaneously — no conflict. +} + +KJ_TEST("exclusive lock blocks while shared lock is held") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::SHARED)); + + // Exclusive lock should fail when shared lock is held by a different fd. 
+ KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE) == kj::none); +} + +KJ_TEST("releasing lock allows re-acquisition") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + { + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + // lock1 goes out of scope here, releasing the lock. + } + + // Now fd2 should be able to acquire an exclusive lock. + auto lock2 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE)); +} + +KJ_TEST("move constructor transfers lock") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + + // Move the lock. + auto lock2 = kj::mv(lock1); + + // The lock should still be held (fd2 can't get it). + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE) == kj::none); +} + +KJ_TEST("move constructor releases on destruction of target") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + { + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + auto lock2 = kj::mv(lock1); + // lock2 goes out of scope, releasing the lock. + } + + // fd2 should now succeed. + auto lock3 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE)); +} + +KJ_TEST("move assignment transfers lock") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + + // Create a second lock on fd2... 
wait, we can't because fd1 already holds exclusive. + // Instead, test move assignment by creating a dummy that gets replaced. + auto fd3 = openTempFile(); + auto lock3 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd3, OfdLock::EXCLUSIVE)); + + lock3 = kj::mv(lock1); // Releases lock on fd3, takes lock from fd1. + + // fd2 still can't get the lock (lock on fd1 was transferred to lock3). + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE) == kj::none); +} + +KJ_TEST("same fd can re-lock (lock is per open file description)") { + auto fd1 = openTempFile(); + + // On the same fd, acquiring the same lock type again is a no-op re-lock, not a conflict. + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + auto lock2 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + // Both succeed because OFD locks are per open file description, and this is the same fd. +} + +KJ_TEST("verifyHeld succeeds when lock is held") { + auto fd1 = openTempFile(); + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + // Should not throw. + lock1.verifyHeld(); +} + +#else // #if __linux__ + +KJ_TEST("dummy test: platform not supported") {} + +#endif // #if __linux__, #else + +} // namespace +} // namespace workerd diff --git a/src/workerd/util/ofd-lock.c++ b/src/workerd/util/ofd-lock.c++ new file mode 100644 index 00000000000..96ad38aee9b --- /dev/null +++ b/src/workerd/util/ofd-lock.c++ @@ -0,0 +1,111 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "ofd-lock.h" + +#include + +#ifdef __linux__ +#include +#include +#endif + +namespace workerd { + +#ifdef __linux__ + +namespace { + +struct flock makeFlock(short type) { + struct flock fl = {}; + fl.l_type = type; + fl.l_whence = SEEK_SET; + fl.l_start = 0; + fl.l_len = 0; // Lock the entire file. 
+ return fl; +} + +} // namespace + +kj::Maybe OfdLock::tryLock(int fd, Type type) { + auto fl = makeFlock(type == SHARED ? F_RDLCK : F_WRLCK); + if (fcntl(fd, F_OFD_SETLK, &fl) == -1) { + int err = errno; + if (err == EAGAIN || err == EACCES) { + // Lock is held by another open file description. + return kj::none; + } + KJ_FAIL_SYSCALL("fcntl(F_OFD_SETLK)", err); + } + return OfdLock(fd, type); +} + +void OfdLock::verifyHeld() { + KJ_REQUIRE(fd >= 0, "OfdLock has been moved away"); + // Re-lock with the same type. If the underlying NFSv4 lock stateid has been invalidated + // (lease lost), the Linux NFS client surfaces this as EIO. If the stateid is still valid, + // the kernel treats the re-lock as a no-op. + auto fl = makeFlock(type == SHARED ? F_RDLCK : F_WRLCK); + KJ_SYSCALL(fcntl(fd, F_OFD_SETLK, &fl), "NFS lease may have been lost"); +} + +OfdLock::OfdLock(int fd, Type type): fd(fd), type(type) {} + +OfdLock::OfdLock(OfdLock&& other): fd(other.fd), type(other.type) { + other.fd = -1; +} + +OfdLock& OfdLock::operator=(OfdLock&& other) { + if (this != &other) { + // Release existing lock, if any. + if (fd >= 0) { + auto fl = makeFlock(F_UNLCK); + fcntl(fd, F_OFD_SETLK, &fl); + } + fd = other.fd; + type = other.type; + other.fd = -1; + } + return *this; +} + +OfdLock::~OfdLock() { + if (fd >= 0) { + auto fl = makeFlock(F_UNLCK); + fcntl(fd, F_OFD_SETLK, &fl); + } +} + +#else // !__linux__ + +kj::Maybe OfdLock::tryLock(int fd, Type type) { + KJ_FAIL_REQUIRE("OFD locks are only supported on Linux. " + "Cluster mode is currently implemented only on Linux."); +} + +void OfdLock::verifyHeld() { + KJ_FAIL_REQUIRE("OFD locks are only supported on Linux. 
" + "Cluster mode is currently implemented only on Linux."); +} + +OfdLock::OfdLock(int fd, Type type): fd(fd), type(type) {} + +OfdLock::OfdLock(OfdLock&& other): fd(other.fd), type(other.type) { + other.fd = -1; +} + +OfdLock& OfdLock::operator=(OfdLock&& other) { + if (this != &other) { + fd = other.fd; + type = other.type; + other.fd = -1; + } + return *this; +} + +OfdLock::~OfdLock() {} + +#endif // __linux__ + +} // namespace workerd diff --git a/src/workerd/util/ofd-lock.h b/src/workerd/util/ofd-lock.h new file mode 100644 index 00000000000..aebe84b4c8f --- /dev/null +++ b/src/workerd/util/ofd-lock.h @@ -0,0 +1,48 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include +#include + +namespace workerd { + +class OfdLock { + // RAII holder for a lock on an open file description. Releasing this object releases the lock. + // + // Uses Linux OFD (Open File Description) locks via fcntl(F_OFD_SETLK). OFD locks are per-fd + // rather than per-process, which is essential since a single workerd process may manage many + // Durable Objects concurrently. + public: + enum Type { SHARED, EXCLUSIVE }; + + // Try to acquire a lock (non-blocking). Returns kj::none if the lock is held in a conflicting + // mode by another open file description. + // + // On non-Linux platforms, fails at runtime with KJ_FAIL_REQUIRE rather than a compile error, + // so non-Linux builds still compile when cluster mode is unused. + static kj::Maybe tryLock(int fd, Type type); + + // Verify that a lock that this open file description was previously granted is still held. + // Specifically, this is meant to check for NFSv4 lease loss. 
Implemented as a no-op + // F_OFD_SETLK of the same lock type on the same fd: if the underlying NFSv4 lock stateid + // has been invalidated (lease lost), the Linux NFS client surfaces this as EIO and this + // function throws; if the stateid is still valid, the kernel treats the re-lock as a + // no-op and we return cleanly. + void verifyHeld(); + + OfdLock(OfdLock&& other); + OfdLock& operator=(OfdLock&& other); + KJ_DISALLOW_COPY(OfdLock); + + ~OfdLock(); // releases via F_OFD_SETLK with F_UNLCK + + private: + int fd; // -1 after move + Type type; + explicit OfdLock(int fd, Type type); +}; + +} // namespace workerd From 8ed704ee7f99525d25cd0f1da9a0e924c6cd90c8 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Thu, 21 May 2026 15:56:34 -0500 Subject: [PATCH 3/9] Add cluster registry as a custom VatNetwork Implements ClusterRegistry, a capnp::VatNetwork for multi-node cluster RPC. Each node generates an X25519 keypair as its identity, binds a listener (Unix socket or ephemeral TCP port based on CIDR), and registers itself in a shared directory. Includes: - cluster.capnp: VatId and stub Level 3/4 types for VatNetwork params - X25519PublicKey: small wrapper with hex encoding and HashMap support - ConnectionImpl: per-peer Connection with MessageStream, idle timeout, and refcounted outbound caching - Handshake protocol: raw 32-byte public key exchange before RPC starts - Peer discovery: lazy directory scanning with mtime-based liveness - CIDR/IP mode: getifaddrs-based IP matching, OFD-locked registry file with periodic mtime heartbeat - Unix mode: socket-file-as-registry-entry, connect-based liveness The end-to-end RPC tests are present but currently failing; the non-network tests (keypair generation, self-bootstrap, peer discovery, self-connect) all pass. This code (and commit message) was originally written by Opus 4.6 but the code sucked and was heavily refactored by my human hand. 
--- src/workerd/server/BUILD.bazel | 27 + src/workerd/server/cluster-registry-test.c++ | 304 +++++++ src/workerd/server/cluster-registry.c++ | 825 +++++++++++++++++++ src/workerd/server/cluster-registry.h | 171 ++++ src/workerd/server/cluster.capnp | 25 + 5 files changed, 1352 insertions(+) create mode 100644 src/workerd/server/cluster-registry-test.c++ create mode 100644 src/workerd/server/cluster-registry.c++ create mode 100644 src/workerd/server/cluster-registry.h create mode 100644 src/workerd/server/cluster.capnp diff --git a/src/workerd/server/BUILD.bazel b/src/workerd/server/BUILD.bazel index ece98eee3c0..27979dccb68 100644 --- a/src/workerd/server/BUILD.bazel +++ b/src/workerd/server/BUILD.bazel @@ -249,6 +249,23 @@ wd_cc_library( ], ) +wd_capnp_library(src = "cluster.capnp") + +wd_cc_library( + name = "cluster-registry", + srcs = ["cluster-registry.c++"], + hdrs = ["cluster-registry.h"], + visibility = ["//visibility:public"], + deps = [ + ":cluster_capnp", + "//src/workerd/util:ofd-lock", + "@capnp-cpp//src/capnp:capnp-rpc", + "@capnp-cpp//src/kj", + "@capnp-cpp//src/kj:kj-async", + "@ssl", + ], +) + wd_capnp_library(src = "docker-api.capnp") wd_capnp_library(src = "log-schema.capnp") @@ -398,6 +415,16 @@ kj_test( ], ) +kj_test( + src = "cluster-registry-test.c++", + deps = [ + ":cluster-registry", + "@capnp-cpp//src/capnp:capnp-rpc", + "@capnp-cpp//src/kj", + "@capnp-cpp//src/kj:kj-async", + ], +) + kj_test( src = "json-logger-test.c++", deps = [ diff --git a/src/workerd/server/cluster-registry-test.c++ b/src/workerd/server/cluster-registry-test.c++ new file mode 100644 index 00000000000..e7b274922d4 --- /dev/null +++ b/src/workerd/server/cluster-registry-test.c++ @@ -0,0 +1,304 @@ +// Copyright (c) 2026 Cloudflare, Inc. 
+// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "cluster-registry.h" + +#include + +#if !_WIN32 +#include +#endif + +#include +#include +#include +#include +#include +#include + +namespace workerd { +namespace { + +#if __linux__ + +// A trivial capability server that counts calls. +class CallCountServer final: public capnp::Capability::Server { + public: + int callCount = 0; + + DispatchCallResult dispatchCall(uint64_t interfaceId, + uint16_t methodId, + capnp::CallContext context) override { + ++callCount; + return {kj::READY_NOW, false, true}; + } +}; + +// Helper to create a temp directory on disk. +class TempDir { + public: + TempDir() { + char tmpl[] = "/tmp/cluster-registry-test-XXXXXX"; + KJ_ASSERT(mkdtemp(tmpl) != nullptr); + pathStr = kj::str(tmpl); + disk = kj::newDiskFilesystem(); + dir = disk->getRoot().openSubdir(kj::Path::parse(pathStr.slice(1)), // remove leading / + kj::WriteMode::MODIFY); + } + + ~TempDir() noexcept(false) { + auto p = kj::Path::parse(pathStr.slice(1)); + disk->getRoot().remove(p); + } + + kj::Own get() { + return dir->clone(); + } + + private: + kj::String pathStr; + kj::Own disk; + kj::Own dir; +}; + +// Helper to make a simple RPC call on a capability client. 
+void makeCall(capnp::Capability::Client& client, kj::WaitScope& ws) { + auto req = client.typelessRequest( + 0, 0, capnp::MessageSize{4, 0}, capnp::Capability::Client::CallHints()); + req.send().wait(ws); +} + +KJ_TEST("X25519PublicKey: hex roundtrip") { + X25519PublicKey key; + memset(key.bytes, 0xAB, sizeof(key.bytes)); + auto hex = key.toHex(); + KJ_EXPECT(hex.size() == 64); + auto decoded = X25519PublicKey::fromHex(hex); + KJ_EXPECT(decoded == key); +} + +KJ_TEST("X25519PublicKey: equality") { + X25519PublicKey a, b; + memset(a.bytes, 1, sizeof(a.bytes)); + memset(b.bytes, 1, sizeof(b.bytes)); + KJ_EXPECT(a == b); + + b.bytes[0] = 2; + KJ_EXPECT(!(a == b)); +} + +KJ_TEST("ClusterRegistry: self is never dead") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + KJ_EXPECT(!reg.isPeerDead(reg.getPublicKey())); +} + +KJ_TEST("ClusterRegistry: self-connect returns none") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg.getPublicKey().bytes, 32)); + + KJ_EXPECT(reg.connect(vatId.asReader()) == kj::none); +} + +KJ_TEST("ClusterRegistry: self-bootstrap returns local capability") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto server = kj::heap(); + auto& serverRef = *server; + auto bootstrap = capnp::Capability::Client(kj::mv(server)); + auto rpc = capnp::makeRpcServer(reg, kj::mv(bootstrap)); + + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg.getPublicKey().bytes, 32)); + + auto client = rpc.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(serverRef.callCount == 1); +} + 
+KJ_TEST("ClusterRegistry: peer discovery via directory scan") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg1(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + // In unix mode, socket files in the directory are registry entries. + KJ_EXPECT(!reg1.isPeerDead(reg2.getPublicKey())); + KJ_EXPECT(!reg2.isPeerDead(reg1.getPublicKey())); +} + +KJ_TEST("ClusterRegistry: isPeerDead caches live peers briefly") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg1(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto peerPath = kj::Path({reg2.getPublicKey().toHex()}); + KJ_EXPECT(!reg1.isPeerDead(reg2.getPublicKey())); + + tmpDir.get()->tryRemove(peerPath); + KJ_EXPECT(!reg1.isPeerDead(reg2.getPublicKey())); +} + +KJ_TEST("ClusterRegistry: isPeerDead caches dead peers permanently") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + X25519PublicKey peerKey; + memset(peerKey.bytes, 0xCA, sizeof(peerKey.bytes)); + auto peerPath = kj::Path({peerKey.toHex()}); + + KJ_EXPECT(reg.isPeerDead(peerKey)); + tmpDir.get()->openFile(peerPath, kj::WriteMode::CREATE)->writeAll("placeholder"); + KJ_EXPECT(reg.isPeerDead(peerKey)); +} + +KJ_TEST("ClusterRegistry: end-to-end RPC between two registries") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg1(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto maint1 = reg1.runMaintenance(); + auto maint2 = reg2.runMaintenance(); + + auto server1 = kj::heap(); + auto& 
server1Ref = *server1; + auto server2 = kj::heap(); + auto& server2Ref = *server2; + + auto rpc1 = capnp::makeRpcServer(reg1, capnp::Capability::Client(kj::mv(server1))); + auto rpc2 = capnp::makeRpcServer(reg2, capnp::Capability::Client(kj::mv(server2))); + + auto run1 = rpc1.run(); + auto run2 = rpc2.run(); + + // reg1 -> reg2 + { + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg2.getPublicKey().bytes, 32)); + + auto client = rpc1.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(server2Ref.callCount == 1); + } + + // reg2 -> reg1 + { + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg1.getPublicKey().bytes, 32)); + + auto client = rpc2.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(server1Ref.callCount == 1); + } +} + +KJ_TEST("ClusterRegistry: end-to-end RPC with binary IP registry entries") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg1( + tmpDir.get(), "127.0.0.0/8"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2( + tmpDir.get(), "127.0.0.0/8"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto maint1 = reg1.runMaintenance(); + auto maint2 = reg2.runMaintenance(); + + auto server2 = kj::heap(); + auto& server2Ref = *server2; + + auto rpc1 = capnp::makeRpcServer(reg1, capnp::Capability::Client(kj::heap())); + auto rpc2 = capnp::makeRpcServer(reg2, capnp::Capability::Client(kj::mv(server2))); + + auto run1 = rpc1.run(); + auto run2 = rpc2.run(); + + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg2.getPublicKey().bytes, 32)); + + auto client = rpc1.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(server2Ref.callCount == 1); +} + +KJ_TEST("ClusterRegistry: idle timeout closes connections") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + auto shortTimeout = 100 * 
kj::MILLISECONDS; + + ClusterRegistry reg1( + tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer(), shortTimeout); + ClusterRegistry reg2( + tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer(), shortTimeout); + + auto server2 = kj::heap(); + auto& server2Ref = *server2; + + auto rpc1 = capnp::makeRpcServer(reg1, capnp::Capability::Client(kj::heap())); + auto rpc2 = capnp::makeRpcServer(reg2, capnp::Capability::Client(kj::mv(server2))); + + auto run1 = rpc1.run(); + auto run2 = rpc2.run(); + + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg2.getPublicKey().bytes, 32)); + + { + auto client = rpc1.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(server2Ref.callCount == 1); + } + // Client dropped — connection should become idle. + + // Wait for the idle timeout to fire. + io.provider->getTimer().afterDelay(shortTimeout + 200 * kj::MILLISECONDS).wait(io.waitScope); + + // After idle close, a new bootstrap should still work (new connection). + auto client2 = rpc1.bootstrap(vatId); + makeCall(client2, io.waitScope); + KJ_EXPECT(server2Ref.callCount == 2); +} + +KJ_TEST("ClusterRegistry: verifyNfsLease() succeeds when lock is held") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + reg.verifyNfsLease(); +} + +#else // #if __linux__ + +KJ_TEST("dummy test: platform not supported") {} + +#endif // #if __linux__, #else + +} // namespace +} // namespace workerd diff --git a/src/workerd/server/cluster-registry.c++ b/src/workerd/server/cluster-registry.c++ new file mode 100644 index 00000000000..a430d059e86 --- /dev/null +++ b/src/workerd/server/cluster-registry.c++ @@ -0,0 +1,825 @@ +// Copyright (c) 2026 Cloudflare, Inc. 
+// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "cluster-registry.h" + +#if !_WIN32 +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#include +#include +#include +#include +#include + +namespace workerd { + +#if __linux__ + +namespace { + +constexpr kj::Duration REGISTRY_MAINTENANCE_INTERVAL = 60 * kj::SECONDS; +constexpr kj::Duration DIRECTORY_SCAN_INTERVAL = 30 * kj::SECONDS; +constexpr kj::Duration NEW_REGISTRY_ENTRY_GRACE_PERIOD = 15 * kj::SECONDS; +constexpr kj::Duration FAILED_PROBE_THROTTLE = 5 * kj::SECONDS; +constexpr kj::Duration SUSPECT_PEER_AVOIDANCE_PERIOD = 15 * kj::SECONDS; +constexpr kj::Duration PEER_LIVENESS_CACHE_TTL = 15 * kj::SECONDS; + +} // namespace + +// ======================================================================================= +// X25519PublicKey + +kj::String X25519PublicKey::toHex() const { + return kj::encodeHex(kj::arrayPtr(bytes, sizeof(bytes))); +} + +X25519PublicKey X25519PublicKey::fromHex(kj::StringPtr hex) { + KJ_REQUIRE(hex.size() == 64, "X25519 public key hex must be 64 characters", hex.size()); + auto decoded = kj::decodeHex(hex.asArray()); + KJ_REQUIRE(!decoded.hadErrors, "invalid hex in X25519 public key"); + KJ_ASSERT(decoded.size() == 32); + X25519PublicKey result; + memcpy(result.bytes, decoded.begin(), 32); + return result; +} + +// ======================================================================================= +// ClusterRegistry::NativeAddress + +void ClusterRegistry::NativeAddress::setPort(uint port) { + KJ_REQUIRE(port <= static_cast(kj::maxValue), "invalid port", port); + + switch (storage.ss_family) { + case AF_INET: { + KJ_REQUIRE(length == sizeof(sockaddr_in), "invalid IPv4 sockaddr length", length); + auto& sin = *reinterpret_cast(&storage); + sin.sin_port = htons(port); + break; + } + case AF_INET6: { + KJ_REQUIRE(length == sizeof(sockaddr_in6), "invalid 
IPv6 sockaddr length", length); + auto& sin6 = *reinterpret_cast(&storage); + sin6.sin6_port = htons(port); + break; + } + default: + KJ_FAIL_REQUIRE("address does not have a TCP port", storage.ss_family); + } +} + +ClusterRegistry::NativeAddress ClusterRegistry::NativeAddress::fromSockaddr(const sockaddr* addr) { + KJ_REQUIRE(addr != nullptr, "missing sockaddr"); + + NativeAddress result; + switch (addr->sa_family) { + case AF_INET: + result.length = sizeof(sockaddr_in); + break; + case AF_INET6: + result.length = sizeof(sockaddr_in6); + break; + default: + KJ_FAIL_REQUIRE("sockaddr is not an IP address", addr->sa_family); + } + + memcpy(&result.storage, addr, result.length); + return result; +} + +ClusterRegistry::NativeAddress ClusterRegistry::NativeAddress::parse( + kj::ArrayPtr bytes) { + KJ_REQUIRE( + bytes.size() >= sizeof(sa_family_t), "peer registry address is too short", bytes.size()); + KJ_REQUIRE( + bytes.size() <= sizeof(sockaddr_storage), "peer registry address is too long", bytes.size()); + + NativeAddress result; + result.length = bytes.size(); + memcpy(&result.storage, bytes.begin(), bytes.size()); + + switch (result.storage.ss_family) { + case AF_INET: + KJ_REQUIRE( + result.length == sizeof(sockaddr_in), "invalid IPv4 sockaddr length", result.length); + break; + case AF_INET6: + KJ_REQUIRE( + result.length == sizeof(sockaddr_in6), "invalid IPv6 sockaddr length", result.length); + break; + default: + KJ_FAIL_REQUIRE("peer registry address is not an IP sockaddr", result.storage.ss_family); + } + + return result; +} + +// ======================================================================================= +// ClusterRegistry::ConnectionImpl + +class ClusterRegistry::ConnectionImpl final: public ClusterVatNetworkBase::Connection, + public kj::Refcounted { + public: + ConnectionImpl(ClusterRegistry& registry, + kj::Own stream, + kj::Maybe peerKey, + kj::Maybe> wasRefused) + : registry(registry), + stream(kj::mv(stream)), + msgStream(*this->stream, 
capnp::IncomingRpcMessage::getShortLivedCallback()), + peerKey(peerKey), + wasRefused(kj::mv(wasRefused)), + outbound(peerKey != kj::none) { + // We always need to pump immediately to send the handshake. + ensurePumping(); + } + + ~ConnectionImpl() noexcept(false) { + unregister(); + } + + cluster::VatId::Reader getPeerVatId() override { + // We can safely expected the RPC system won't call this method until there is actually some + // activity on the stream that causes it to be needed. If this is an inbound stream, there + // is necessary no activity until we receive some messages, and we must have received the + // peer key before that. + // + // This is convenient because otherwise we'd have to figure out what to do with this call + // if we haven't received the handshake yet, and therefore don't actually know who the peer + // is. + auto pk = KJ_ASSERT_NONNULL(peerKey, "getPeerVatId() called too early"); + + // Build `peerVatId` on first use. + KJ_IF_SOME(p, peerVatId) { + return p.getRoot().asReader(); + } else { + auto builder = peerVatId.emplace(peerVatIdScratch).getRoot(); + builder.setPublicKey(kj::arrayPtr(pk.bytes, sizeof(pk.bytes))); + return builder.asReader(); + } + } + + kj::Own newOutgoingMessage(uint firstSegmentWordSize) override; + kj::Promise>> receiveIncomingMessage() override; + kj::Promise shutdown() override; + void setIdle(bool newIdle) override; + + private: + class OutgoingMessageImpl; + class IncomingMessageImpl; + + ClusterRegistry& registry; + kj::Own stream; + capnp::BufferedMessageStream msgStream; + kj::Maybe peerKey; + kj::Maybe> wasRefused; + bool outbound; // is this an outbound or inbound connection? + + capnp::word peerVatIdScratch[8]{}; + kj::Maybe peerVatId; + + kj::Vector> writeQueue; + kj::Maybe> pumpTask; + kj::Maybe brokenReason; + + bool pumping = false; // is pump() running? + bool isShutdown = false; // has shutdown() been called? 
+ + bool idle = false; // tracks last value passed to setIdle() + bool idleTimedOut = false; + + bool sentHandshake = false; + bool receivedHandshake = false; + + kj::Maybe> idleTimer; + kj::Maybe>>>> + interruptReceiveFulfiller; + + void ensurePumping(); + kj::Promise pump(); + + void requireOpen(); + void unregister(); + + friend class ClusterRegistry; +}; + +class ClusterRegistry::ConnectionImpl::OutgoingMessageImpl final: public capnp::OutgoingRpcMessage, + public kj::Refcounted { + public: + OutgoingMessageImpl(ConnectionImpl& conn, uint firstSegmentWordSize) + : conn(conn), + message(firstSegmentWordSize == 0 ? capnp::SUGGESTED_FIRST_SEGMENT_WORDS + : firstSegmentWordSize) {} + + capnp::AnyPointer::Builder getBody() override { + return message.getRoot(); + } + + void send() override { + conn.requireOpen(); + KJ_REQUIRE(!conn.idle, "bug in RpcSystem: trying to send a message while idle"); + KJ_REQUIRE(!conn.idleTimedOut, "send() after idle period timed out"); + + conn.writeQueue.add(kj::addRef(*this)); + conn.ensurePumping(); + } + + size_t sizeInWords() override { + return message.sizeInWords(); + } + + private: + ConnectionImpl& conn; + capnp::MallocMessageBuilder message; + + friend class ConnectionImpl; +}; + +class ClusterRegistry::ConnectionImpl::IncomingMessageImpl final: public capnp::IncomingRpcMessage { + public: + IncomingMessageImpl(kj::Own message): message(kj::mv(message)) {} + + capnp::AnyPointer::Reader getBody() override { + return message->getRoot(); + } + + size_t sizeInWords() override { + return message->sizeInWords(); + } + + private: + kj::Own message; +}; + +kj::Own ClusterRegistry::ConnectionImpl::newOutgoingMessage( + uint firstSegmentWordSize) { + KJ_REQUIRE(!idle, "bug in RpcSystem: trying to send a message while idle"); + return kj::refcounted(*this, firstSegmentWordSize); +} + +kj::Promise>> ClusterRegistry::ConnectionImpl:: + receiveIncomingMessage() { + KJ_IF_SOME(b, brokenReason) { + kj::throwFatalException(b.clone()); + } + if 
(idleTimedOut) { + co_return kj::none; + } + + if (!receivedHandshake) { + X25519PublicKey receivedKey; + auto readHandshake = stream->read(receivedKey.bytes); + if (outbound && registry.nfsMode()) { + co_await registry.timer.timeoutAfter(registry.connectTimeout, kj::mv(readHandshake)); + } else { + co_await readHandshake; + } + KJ_IF_SOME(expectedKey, peerKey) { + KJ_REQUIRE(receivedKey == expectedKey, "peer did not send expected handshake bytes"); + } else { + peerKey = receivedKey; + } + receivedHandshake = true; + if (outbound) { + KJ_IF_SOME(entry, registry.peers.find(KJ_ASSERT_NONNULL(peerKey))) { + entry.suspect = false; + entry.suspectSince = kj::none; + } + } + } + + auto paf = kj::newPromiseAndFulfiller>>(); + interruptReceiveFulfiller = kj::mv(paf.fulfiller); + + auto message = co_await msgStream.tryReadMessage().exclusiveJoin(kj::mv(paf.promise)); + + KJ_IF_SOME(m, message) { + co_return kj::heap(kj::mv(m)); + } else { + co_return kj::none; + } +} + +kj::Promise ClusterRegistry::ConnectionImpl::shutdown() { + requireOpen(); + isShutdown = true; + ensurePumping(); + return KJ_ASSERT_NONNULL(kj::mv(pumpTask)); +} + +void ClusterRegistry::ConnectionImpl::setIdle(bool newIdle) { + if (newIdle == idle) return; // Already in the desired state. + idle = newIdle; + + // Only outbound connections should potentially close themselves when idle. Inbound should + // be closed at the other end. + if (outbound) { + if (idle) { + // Wait for the idle timeout. + idleTimer = registry.timer.afterDelay(registry.idleTimeout) + .then([this]() { + // Wait until the event queue is empty, just in case we're concurrently receiving a + // message that causes us to become non-idle. (But that shouldn't happen since this is + // an outbound connection. Once idle, it should only be revived from our end. But just to + // be safe.) + return kj::evalLast([this]() { + // We officially timed out. Make sure all future calls to receiveIncomingMessage() return + // kj::none. 
+ idleTimedOut = true; + + // Cancel the current receiveIncomingMessage() by fulfilling the promise that is + // joined with the `tryReadMessage()` promise. + KJ_IF_SOME(i, interruptReceiveFulfiller) { + i->fulfill(kj::none); + } + + // Make sure a new call to connect() can't return this again. + unregister(); + }); + }).eagerlyEvaluate(nullptr); + } else { + // Cancel the last idle wait. + idleTimer = kj::none; + } + } +} + +void ClusterRegistry::ConnectionImpl::ensurePumping() { + if (!pumping) { + pumping = true; + pumpTask = pump(); + } +} + +kj::Promise ClusterRegistry::ConnectionImpl::pump() { + KJ_TRY { + if (!sentHandshake) { + co_await stream->write(registry.publicKey.bytes); + sentHandshake = true; + } + + // Give a chance for messages to accumulate to be sent all at once. + co_await kj::yieldUntilQueueEmpty(); + + while (!writeQueue.empty()) { + // Take ownership of all messages currently pending. `writeQueue` is left empty, but may + // be repopulated concurrently. + auto ownMessages = kj::mv(writeQueue); + + // Write them all at once. 
+ auto messageSegments = + KJ_MAP(msg, ownMessages) { return msg->message.getSegmentsForOutput(); }; + co_await msgStream.writeMessages(messageSegments); + } + + if (isShutdown) { + co_await msgStream.end(); + } + + pumping = false; + } + KJ_CATCH(exception) { + brokenReason = exception.clone(); + KJ_IF_SOME(i, interruptReceiveFulfiller) { + i->reject(exception.clone()); + } + kj::throwFatalException(kj::mv(exception)); + } +} + +void ClusterRegistry::ConnectionImpl::requireOpen() { + KJ_IF_SOME(e, brokenReason) { + kj::throwFatalException(e.clone()); + } + KJ_REQUIRE(!isShutdown, "bug in RpcSystem: can't send() after shutdown()"); +} + +void ClusterRegistry::ConnectionImpl::unregister() { + if (outbound) { + auto key = KJ_ASSERT_NONNULL(peerKey); // peerKey is always set for outbounds + + KJ_TRY { + KJ_IF_SOME(entry, registry.peers.findEntry(key)) { + KJ_IF_SOME(conn, entry.value.outboundConnection) { + if (&conn == this) { + entry.value.outboundConnection = kj::none; + } + } + + // If we failed to receive the peer handshake, mark the peer as suspect and run the + // dead-peer cleanup path. We may have failed to connect entirely, or we may have + // connected to a server that wasn't the peer we expected. 
+ if (!receivedHandshake) { + bool wasRefusedUnix = false; + KJ_IF_SOME(w, wasRefused) { + wasRefusedUnix = *w; + } + registry.cleanupFailedPeer(entry, wasRefusedUnix); + } + } + } + KJ_CATCH(exception) { + KJ_LOG(WARNING, "failed to clean up cluster peer after connection failure", key.toHex(), + exception); + } + } +} + +// ======================================================================================= +// ClusterRegistry constructor + +ClusterRegistry::ClusterRegistry(kj::Own registryDirParam, + kj::StringPtr networkConfig, + kj::Network& kjNetwork, + kj::Timer& timer, + kj::Duration idleTimeout, + kj::Duration connectTimeout) + : registryDir(kj::mv(registryDirParam)), + dirFd(KJ_REQUIRE_NONNULL( + registryDir->getFd(), "cluster registry directory must be a disk-backed directory")), + timer(timer), + network(kjNetwork), + idleTimeout(idleTimeout), + connectTimeout(connectTimeout) { + // Generate X25519 keypair. + X25519_keypair(publicKey.bytes, privateKey); + publicKeyHex = publicKey.toHex(); + + if (networkConfig == "unix") { + // Unix mode: bind a Unix domain socket at /. + // Use /proc/self/fd// as the socket path. + auto socketPath = kj::str("/proc/self/fd/", dirFd, "/", publicKeyHex); + sockaddr_un addr{}; + KJ_REQUIRE( + socketPath.size() < sizeof(addr.sun_path), "Unix socket path is too long", socketPath); + addr.sun_family = AF_UNIX; + memcpy(addr.sun_path, socketPath.begin(), socketPath.size()); + listener = kjNetwork.getSockaddr(&addr, sizeof(addr))->listen(); + } else { + // CIDR/IP/NFS mode: find a matching local IP and bind an ephemeral port. 
+ auto addr = findMatchingIp(networkConfig); + listener = kjNetwork.getSockaddr(addr.asSockaddr(), addr.length)->listen(); + auto port = listener->getPort(); + addr.setPort(port); + boundAddress = addr; + } +} + +ClusterRegistry::~ClusterRegistry() noexcept(false) { + registryDir->tryRemove(kj::Path({publicKeyHex})); +} + +// Search all IP addresses for all interfaces available in the network namespace to find one that +// matches the given CIDR. +ClusterRegistry::NativeAddress ClusterRegistry::findMatchingIp(kj::StringPtr cidr) { + kj::CidrRange range(cidr); + + struct ifaddrs* ifap = nullptr; + KJ_SYSCALL(getifaddrs(&ifap)); + KJ_DEFER(freeifaddrs(ifap)); + + for (auto* ifa = ifap; ifa != nullptr; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == nullptr) continue; + if (range.matches(ifa->ifa_addr)) { + auto result = NativeAddress::fromSockaddr(ifa->ifa_addr); + result.setPort(0); + return result; + } + } + + KJ_FAIL_REQUIRE("no local interface matches CIDR", cidr); +} + +// ======================================================================================= +// Registry maintenance + +kj::Promise ClusterRegistry::runMaintenance() { + KJ_IF_SOME(addr, boundAddress) { + // CIDR/IP mode: create the registry file, lock it, write our address, then periodically + // verify that our registry entry and NFS lock lease are still valid. + + auto registryFile = registryDir->openFile(kj::Path({publicKeyHex}), kj::WriteMode::CREATE); + + // Acquire an exclusive OFD lock. + auto lock = KJ_ASSERT_NONNULL( + OfdLock::tryLock(KJ_ASSERT_NONNULL(registryFile->getFd()), OfdLock::EXCLUSIVE), + "our registry entry is already locked?"); + + // Write our address. + registryFile->write(0, addr.asBytes()); + registryFile->sync(); + + return maintenanceLoopCanceler.wrap(maintenanceLoop(registration.emplace(Registration{ + .file = kj::mv(registryFile), + .lock = kj::mv(lock), + }))); + } else { + // Unix socket mode: the listener socket's lifetime IS the registry entry. Nothing to maintain. 
+ return kj::NEVER_DONE; + } +} + +kj::Promise ClusterRegistry::maintenanceLoop(Registration& registration) { + for (;;) { + co_await timer.afterDelay(REGISTRY_MAINTENANCE_INTERVAL); + + // Verify nlink > 0 (not unlinked by another node). + KJ_REQUIRE(registration.file->stat().linkCount > 0, + "registry file was unlinked by another node; this instance should shut down"); + + // Verify the OFD lock is still held (NFSv4 lease check). + registration.lock.verifyHeld(); + } +} + +void ClusterRegistry::verifyNfsLease() { + KJ_IF_SOME(r, registration) { + KJ_TRY { + r.lock.verifyHeld(); + } + KJ_CATCH(exception) { + // Might as well cancel the maintenance loop immediately. + maintenanceLoopCanceler.cancel(exception); + kj::throwFatalException(kj::mv(exception)); + } + } +} + +// ======================================================================================= +// Peer discovery + +void ClusterRegistry::scanDirectory() { + // List the registry to discover new peers and add them to the `peers` map, so that they can + // be considered by `pickRandomPeer()`. Keep in mind that on NFS, directory listings tend to be + // cached, and therefore we can's assume the existence or absence of a file really tells us + // anything about whether the peer is currently alive or dead. The best we can do is populate the + // map entries so that we know that peers exist at all, and then we'll need to probe them for + // liveness as usual later on. 
+ + auto names = registryDir->listNames(); + + for (auto& name: names) { + if (name.size() != 64) continue; + if (name == publicKeyHex) continue; + + auto decoded = kj::decodeHex(name.asArray()); + if (decoded.hadErrors || decoded.size() != 32) continue; + + X25519PublicKey key; + memcpy(key.bytes, decoded.begin(), 32); + + peers.findOrCreate(key, [&]() -> decltype(peers)::Entry { return {.key = key}; }); + } +} + +void ClusterRegistry::scanDirectoryIfStale(kj::TimePoint now) { + KJ_IF_SOME(lastScan, lastDirectoryScan) { + if (now - lastScan < DIRECTORY_SCAN_INTERVAL) return; + } + + scanDirectory(); + lastDirectoryScan = now; +} + +bool ClusterRegistry::isPeerDead(const X25519PublicKey& key) { + if (key == publicKey) return false; + + auto& entry = + peers.findOrCreateEntry(key, [&]() -> decltype(peers)::Entry { return {.key = key}; }); + + if (entry.value.knownDead) return true; + + auto now = timer.now(); + KJ_IF_SOME(lastLive, entry.value.lastConfirmedLiveTime) { + if (now - lastLive < PEER_LIVENESS_CACHE_TTL) return false; + } + + if (registryDir->exists(kj::Path({key.toHex()}))) { + entry.value.lastConfirmedLiveTime = now; + return false; + } + + entry.value.knownDead = true; + return true; +} + +kj::Maybe ClusterRegistry::pickRandomPeer() { + kj::Vector candidates; + auto now = timer.now(); + scanDirectoryIfStale(now); + + for (auto& peer: peers) { + if (peer.key == publicKey) continue; + if (peer.value.knownDead) continue; + if (peer.value.suspect) { + KJ_IF_SOME(since, peer.value.suspectSince) { + if (now - since < SUSPECT_PEER_AVOIDANCE_PERIOD) continue; + } + } + candidates.add(peer.key); + } + + if (candidates.empty()) return kj::none; + + // Simple pseudo-random selection using the monotonic clock as a cheap source. (Don't use + // timer.now() since it only updates when the event loop waits for I/O.) 
+ auto nanos = + (kj::systemPreciseMonotonicClock().now() - kj::origin()) / kj::NANOSECONDS; + auto idx = kj::hashCode(nanos) % candidates.size(); + return candidates[idx]; +} + +void ClusterRegistry::cleanupFailedPeer(decltype(peers)::Entry& entry, bool wasRefusedUnix) { + auto now = timer.now(); + + entry.value.suspect = true; + entry.value.suspectSince = now; + + auto path = kj::Path({entry.key.toHex()}); + + if (!nfsMode()) { + if (wasRefusedUnix) { + // We failed with ECONNREFUSED, which means the listener is gone (process exited or closed + // the listen socket), in which case we can certainly clean up the socket from disk. + registryDir->tryRemove(path); + entry.value.knownDead = true; + } else { + // We failed for some other reason. Check if the file has disappeared from disk. + if (registryDir->exists(path)) { + // File still exists. We may have failed with EAGAIN (the peer's listen queue is full) or + // some other error where the peer is not necessarily gone. We've already marked it + // suspect, but we should not actually erase it. + } else { + // File is gone from disk. This may be exactly why the connection failed, but regardless, + // we should now mark the peer dead in our cache. 
+ entry.value.knownDead = true; + } + } + + return; + } + + KJ_IF_SOME(lastProbe, entry.value.lastFailedProbeTime) { + if (now - lastProbe < FAILED_PROBE_THROTTLE) return; + } + entry.value.lastFailedProbeTime = now; + + KJ_IF_SOME(file, registryDir->tryOpenFile(path)) { + auto meta = file->stat(); + auto calendarNow = kj::systemCoarseCalendarClock().now(); + if (calendarNow - meta.lastModified < NEW_REGISTRY_ENTRY_GRACE_PERIOD) return; + + if (OfdLock::tryLock(KJ_ASSERT_NONNULL(file->getFd()), OfdLock::SHARED) != kj::none) { + registryDir->tryRemove(path); + entry.value.knownDead = true; + } + } else { + entry.value.knownDead = true; + } +} + +// ======================================================================================= +// Peer address lookup + +ClusterRegistry::NativeAddress ClusterRegistry::readPeerAddress(const X25519PublicKey& key) { + auto path = kj::Path({key.toHex()}); + auto file = + KJ_REQUIRE_NONNULL(registryDir->tryOpenFile(path), "peer not found in registry", key.toHex()); + auto content = file->readAllBytes(); + KJ_REQUIRE(content.size() > 0, "peer registry file is empty", key.toHex()); + return NativeAddress::parse(content); +} + +// ======================================================================================= +// VatNetwork: connect() + +kj::Maybe> ClusterRegistry::connect( + cluster::VatId::Reader peer) { + auto keyData = peer.getPublicKey(); + KJ_REQUIRE(keyData.size() == 32, "invalid VatId: publicKey must be 32 bytes"); + + X25519PublicKey peerKey; + memcpy(peerKey.bytes, keyData.begin(), 32); + + // Self-connect returns kj::none per VatNetwork contract. + if (peerKey == publicKey) { + return kj::none; + } + + // Check for cached outbound connection. 
+ KJ_IF_SOME(entry, peers.find(peerKey)) { + KJ_IF_SOME(conn, entry.outboundConnection) { + return kj::addRef(conn); + } + } + + auto& entry = peers.findOrCreateEntry( + peerKey, [&]() -> decltype(peers)::Entry { return {.key = peerKey}; }); + + KJ_REQUIRE(!entry.value.knownDead, "peer is known to be dead", peerKey.toHex()); + + // Need to establish a new connection. Look up the peer's address. + kj::Own networkAddress; + if (nfsMode()) { + NativeAddress address; + try { + address = readPeerAddress(peerKey); + } catch (...) { + cleanupFailedPeer(entry, false); + throw; + } + networkAddress = network.getSockaddr(address.asSockaddr(), address.length); + } else { + auto path = kj::str("/proc/self/fd/", dirFd, "/", peerKey.toHex()); + sockaddr_un addr{}; + KJ_REQUIRE(path.size() < sizeof(addr.sun_path), "Unix socket path is too long", path); + addr.sun_family = AF_UNIX; + memcpy(addr.sun_path, path.begin(), path.size()); + networkAddress = network.getSockaddr(&addr, sizeof(addr)); + } + + auto connectPromise = networkAddress->connect(); + + kj::Maybe> wasRefused; + if (!nfsMode()) { + auto wrapper = kj::refcountedWrapper(false); + wasRefused = wrapper->addWrappedRef(); + connectPromise = connectPromise.catch_( + [wrapper = kj::mv(wrapper)]( + kj::Exception&& e) mutable -> kj::Promise> { + if (e.getType() == kj::Exception::Type::DISCONNECTED) { + // If connect() failed with DISCONNECTED on a unix socket, this likely means we got + // ECONNREFUSED. Record this. + wrapper->getWrapped() = true; + } + kj::throwFatalException(kj::mv(e)); + }); + } + + auto promisedStream = kj::newPromisedStream(connectPromise.attach(kj::mv(networkAddress))); + + auto conn = + kj::refcounted(*this, kj::mv(promisedStream), peerKey, kj::mv(wasRefused)); + + // Cache the outbound connection. 
+ entry.value.outboundConnection = *conn; + + return kj::Own(kj::addRef(*conn)); +} + +// ======================================================================================= +// VatNetwork: accept() + +kj::Promise> ClusterRegistry::accept() { + auto stream = co_await listener->accept(); + co_return kj::refcounted(*this, kj::mv(stream), kj::none, kj::none); +} + +// ======================================================================================= + +#else // #if __linux__ + +ClusterRegistry::ClusterRegistry(kj::Own registryDir, + kj::StringPtr network, + kj::Network& kjNetwork, + kj::Timer& timer, + kj::Duration idleTimeout, + kj::Duration connectTimeout) { + KJ_UNIMPLEMENTED("cluster mode is only implemented on linux"); +} +ClusterRegistry::~ClusterRegistry() noexcept(false) {} + +kj::Promise ClusterRegistry::runMaintenance() { + KJ_UNREACHABLE; +} +void ClusterRegistry::verifyNfsLease() { + KJ_UNREACHABLE; +} +bool ClusterRegistry::isPeerDead(const X25519PublicKey& key) { + KJ_UNREACHABLE; +} +kj::Maybe ClusterRegistry::pickRandomPeer() { + KJ_UNREACHABLE; +} +kj::Maybe> ClusterRegistry::connect( + cluster::VatId::Reader peer) { + KJ_UNREACHABLE; +} +kj::Promise> ClusterRegistry::accept() { + KJ_UNREACHABLE; +} + +#endif // #if __linux__, #else + +} // namespace workerd diff --git a/src/workerd/server/cluster-registry.h b/src/workerd/server/cluster-registry.h new file mode 100644 index 00000000000..781e91c4504 --- /dev/null +++ b/src/workerd/server/cluster-registry.h @@ -0,0 +1,171 @@ +// Copyright (c) 2026 Cloudflare, Inc. 
+// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include +#include + +#if __linux__ +#include +#endif + +#include +#include +#include +#include +#include +#include +#include + +namespace workerd { + +using kj::byte; +using kj::uint; + +struct X25519PublicKey { + kj::byte bytes[32]; + + kj::String toHex() const; + static X25519PublicKey fromHex(kj::StringPtr hex); + + bool operator==(const X25519PublicKey& other) const { + return memcmp(bytes, other.bytes, sizeof(bytes)) == 0; + } + kj::uint hashCode() const { + return kj::hashCode(kj::ArrayPtr(bytes)); + } +}; + +// Type alias for the VatNetwork base that ClusterRegistry implements. +using ClusterVatNetworkBase = capnp::VatNetwork; + +class ClusterRegistry final: public ClusterVatNetworkBase { + // Manages this node's cluster identity, registry presence, peer discovery, and the + // VatNetwork that the cluster's RpcSystem sits on top of. + public: + ClusterRegistry(kj::Own registryDir, + kj::StringPtr network, + kj::Network& kjNetwork, + kj::Timer& timer, + kj::Duration idleTimeout = 60 * kj::SECONDS, + kj::Duration connectTimeout = 1 * kj::SECONDS); + ~ClusterRegistry() noexcept(false); + + const X25519PublicKey& getPublicKey() { + return publicKey; + } + + // Run the registry maintenance loop. Returns a promise that must be held alive for + // the lifetime of the server. + kj::Promise runMaintenance(); + + // Verify that the NFS lease is still valid on the registration file, or throw an exception if + // not (no-op if not in NFS mode). + // + // More specifically: If you call this immediately after open()ing a file, and it returns + // successfully, then the newly-opened file is guaranteed to be on the same lease as the + // registration. (Technically, the lease could have been lost already, but if so, the + // newly-opened file will not be writable.) + void verifyNfsLease(); + + // Definitive proof-of-death query. 
Returns true only when the peer's registry entry is absent. + bool isPeerDead(const X25519PublicKey& key); + + // Pick a random peer's key from the cached peer list, biased away from recent failures. + kj::Maybe pickRandomPeer(); + + // Returns true if this registry uses IP sockets (not unix sockets) and is therefore in + // "NFS mode" where it tries to be NFS-friendly. + bool nfsMode() { + return boundAddress != kj::none; + } + + // implements VatNetwork ----------------------------------------------------- + kj::Maybe> connect(cluster::VatId::Reader peer) override; + kj::Promise> accept() override; + + private: +#if __linux__ + class ConnectionImpl; + + struct NativeAddress { + sockaddr_storage storage{}; + socklen_t length = 0; + + const sockaddr* asSockaddr() const { + return reinterpret_cast(&storage); + } + + kj::ArrayPtr asBytes() const { + return kj::arrayPtr(reinterpret_cast(&storage), length); + } + + void setPort(uint port); + + static NativeAddress fromSockaddr(const sockaddr* addr); + static NativeAddress parse(kj::ArrayPtr bytes); + }; + + kj::Own registryDir; + int dirFd; // raw fd of registryDir, for POSIX ops + kj::Timer& timer; + kj::Network& network; + kj::Duration idleTimeout; + kj::Duration connectTimeout; + + // Identity + kj::byte privateKey[32]; + X25519PublicKey publicKey; + kj::String publicKeyHex; + + // Address we wrote to the registry file (CIDR/IP/NFS mode). Null in Unix socket mode. 
+ kj::Maybe boundAddress; + + struct Registration { + kj::Own file; + OfdLock lock; + }; + kj::Maybe registration; // only in NFS mode + + kj::Canceler maintenanceLoopCanceler; + + // Listener + kj::Own listener; + + // Peer cache + struct PeerInfo { + kj::Maybe outboundConnection; + kj::Maybe lastConfirmedLiveTime; + kj::Maybe lastFailedProbeTime; + bool suspect = false; + kj::Maybe suspectSince; + bool knownDead = false; + }; + kj::HashMap peers; + kj::Maybe lastDirectoryScan; + + void scanDirectory(); + void scanDirectoryIfStale(kj::TimePoint now); + kj::Promise maintenanceLoop(Registration& registration); + void cleanupFailedPeer(decltype(peers)::Entry& entry, bool wasRefusedUnix); + + NativeAddress findMatchingIp(kj::StringPtr cidr); + + // Read the address for a peer from its registry file. CIDR/IP mode only. + NativeAddress readPeerAddress(const X25519PublicKey& key); + +#else // #if __linux__ + // Just so inline methods compile + X25519PublicKey publicKey; + kj::Maybe boundAddress; + +#endif // #if __linux__, #else +}; + +} // namespace workerd diff --git a/src/workerd/server/cluster.capnp b/src/workerd/server/cluster.capnp new file mode 100644 index 00000000000..35532c83478 --- /dev/null +++ b/src/workerd/server/cluster.capnp @@ -0,0 +1,25 @@ +# Copyright (c) 2026 Cloudflare, Inc. +# Licensed under the Apache 2.0 license found in the LICENSE file or at: +# https://opensource.org/licenses/Apache-2.0 + +@0xbd0c09739255a698; + +using Cxx = import "/capnp/c++.capnp"; +$Cxx.namespace("workerd::cluster"); +$Cxx.allowCancellation; + +struct VatId { + publicKey @0 :Data; + # 32-byte raw X25519 public key. +} + +# The following are stubs for v1. They are not used because canIntroduceTo() returns +# false (the default), so the RPC system falls back to proxying for any cross-peer +# capability passing. Cap'n Proto itself has not yet fully implemented Level 3, so +# leaving these empty is fine. 
+struct ThirdPartyToContact {} +struct ThirdPartyToAwait {} +struct ThirdPartyCompletion {} + +# Level 4 (Join) is not supported. +struct JoinResult {} From 93b749c94486a68c9fff15e3042764b02c1cf882 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Fri, 22 May 2026 16:40:36 -0500 Subject: [PATCH 4/9] Add cluster DO ownership lock file protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements ClusterLockManager, which manages the lock files that serialize ownership of Durable Objects across a workerd cluster. Each lock file is named after the actor ID and contains the owner's 32-byte X25519 public key (or is empty if unowned). acquireOrRoute() encapsulates the full protocol: - If the lock file is the expected 32-byte size, read the owner key and look it up in the ClusterRegistry. If the owner is live (or might be), return a WorkerdDebugPort capability via clusterRpc.bootstrap() — the RpcSystem short-circuits self-routing without going through the VatNetwork. - Otherwise (empty file or confirmed-dead owner), try to acquire an exclusive OFD lock. If granted, ftruncate, write our identity, fsync, and return an OwnedLock. If refused, retry with exponential backoff and jitter. No checksum is needed on the lock file payload: a single pwrite() of 32 bytes is atomic with respect to concurrent readers (both on local filesystems and over NFS, where it's a single RPC to the server). The writer's sequence (ftruncate → pwrite → fsync) means readers see either size 0 or the full 32-byte key, never a partial write. This is a deviation from the original spec, which called for a CRC32. OwnedLock is an RAII handle whose destructor truncates the file and releases the lock, returning the DO to the unowned pool. verifyStillOwned() is provided for the NFSv4 lease check that the SQLite VFS will call on every file open (Commit 4). This code (and commit message) was originally written by Opus 4.6 with a fair amount of manual cleanup. 
--- src/workerd/server/BUILD.bazel | 28 ++ src/workerd/server/cluster-lock-test.c++ | 323 +++++++++++++++++++++++ src/workerd/server/cluster-lock.c++ | 162 ++++++++++++ src/workerd/server/cluster-lock.h | 77 ++++++ 4 files changed, 590 insertions(+) create mode 100644 src/workerd/server/cluster-lock-test.c++ create mode 100644 src/workerd/server/cluster-lock.c++ create mode 100644 src/workerd/server/cluster-lock.h diff --git a/src/workerd/server/BUILD.bazel b/src/workerd/server/BUILD.bazel index 27979dccb68..67cf455d243 100644 --- a/src/workerd/server/BUILD.bazel +++ b/src/workerd/server/BUILD.bazel @@ -266,6 +266,22 @@ wd_cc_library( ], ) +wd_cc_library( + name = "cluster-lock", + srcs = ["cluster-lock.c++"], + hdrs = ["cluster-lock.h"], + visibility = ["//visibility:public"], + deps = [ + ":cluster-registry", + ":cluster_capnp", + "//src/workerd/io:worker-interface_capnp", + "//src/workerd/util:ofd-lock", + "@capnp-cpp//src/capnp:capnp-rpc", + "@capnp-cpp//src/kj", + "@capnp-cpp//src/kj:kj-async", + ], +) + wd_capnp_library(src = "docker-api.capnp") wd_capnp_library(src = "log-schema.capnp") @@ -425,6 +441,18 @@ kj_test( ], ) +kj_test( + src = "cluster-lock-test.c++", + deps = [ + ":cluster-lock", + ":cluster-registry", + "//src/workerd/io:worker-interface_capnp", + "@capnp-cpp//src/capnp:capnp-rpc", + "@capnp-cpp//src/kj", + "@capnp-cpp//src/kj:kj-async", + ], +) + kj_test( src = "json-logger-test.c++", deps = [ diff --git a/src/workerd/server/cluster-lock-test.c++ b/src/workerd/server/cluster-lock-test.c++ new file mode 100644 index 00000000000..980a76de145 --- /dev/null +++ b/src/workerd/server/cluster-lock-test.c++ @@ -0,0 +1,323 @@ +// Copyright (c) 2026 Cloudflare, Inc. 
+// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "cluster-lock.h" +#include "cluster-registry.h" + +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace workerd { +namespace { + +#if __linux__ + +// Helper to create a temp directory on disk with a `registry/` and `locks/` subdir, mirroring +// the production layout (the cluster registry directory and the per-namespace locks directory +// are always separate). +class TempDir { + public: + TempDir() { + char tmpl[] = "/tmp/cluster-lock-test-XXXXXX"; + KJ_ASSERT(mkdtemp(tmpl) != nullptr); + pathStr = kj::str(tmpl); + disk = kj::newDiskFilesystem(); + root = disk->getRoot().openSubdir(kj::Path::parse(pathStr.slice(1)), // remove leading / + kj::WriteMode::MODIFY); + locksDir = root->openSubdir(kj::Path({"locks"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + } + + ~TempDir() noexcept(false) { + auto p = kj::Path::parse(pathStr.slice(1)); + disk->getRoot().remove(p); + } + + kj::Own registry() { + return root->openSubdir(kj::Path({"registry"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + } + + kj::Own locks() { + return locksDir->clone(); + } + + // Read the content of a lock file directly (for asserting that the file is in the expected state). + kj::Array readLockFile(kj::StringPtr name) { + auto file = KJ_ASSERT_NONNULL(locksDir->tryOpenFile(kj::Path({name}))); + return file->readAllBytes(); + } + + // Write raw bytes to a lock file (for setting up test scenarios). + void writeLockFile(kj::StringPtr name, kj::ArrayPtr data) { + auto file = locksDir->openFile(kj::Path({name}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + file->writeAll(data); + } + + private: + kj::String pathStr; + kj::Own disk; + kj::Own root; + kj::Own locksDir; +}; + +// Stub WorkerdDebugPort server used as a per-node bootstrap. 
We don't actually invoke methods on +// it from these tests; we just need a capability to hand to RpcSystem. +class StubDebugPort final: public rpc::WorkerdDebugPort::Server {}; + +KJ_TEST("ClusterLockManager: unowned (empty file) -> acquire succeeds") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + auto result = lockManager.acquireOrRoute("a1").wait(io.waitScope); + KJ_ASSERT(result.is()); + + // The lock file should now contain our key. + auto content = tmpDir.readLockFile("a1"); + KJ_ASSERT(content.size() == 32); + KJ_EXPECT(memcmp(content.begin(), reg.getPublicKey().bytes, 32) == 0); +} + +KJ_TEST("ClusterLockManager: owned by self -> returns self-bootstrap, no wire traffic") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + // First acquire to populate the lock file with our own key. + auto ownedResult = lockManager.acquireOrRoute("a2").wait(io.waitScope); + KJ_ASSERT(ownedResult.is()); + + // Now simulate someone else also calling acquireOrRoute on the same actor (i.e. a forwarding + // path). With the OwnedLock still held, the lock file is non-empty and our key is the owner. + // acquireOrRoute should return a bootstrap client (routing to ourselves). 
+ auto routeResult = lockManager.acquireOrRoute("a2").wait(io.waitScope); + KJ_ASSERT(routeResult.is()); +} + +KJ_TEST("ClusterLockManager: stale owner (peer dead) -> claim path runs") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + // Manually create a lock file naming a non-existent peer as owner. Since `deadKey`'s registry + // file doesn't exist in tmpDir, isPeerDead(deadKey) returns true and the claim path should run. + X25519PublicKey deadKey; + memset(deadKey.bytes, 0xAB, sizeof(deadKey.bytes)); + + tmpDir.writeLockFile("a3", kj::arrayPtr(deadKey.bytes, 32)); + + // Confirm isPeerDead returns true for the dead key. + KJ_EXPECT(reg.isPeerDead(deadKey)); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + // acquireOrRoute should run the claim path and succeed. + auto result = lockManager.acquireOrRoute("a3").wait(io.waitScope); + KJ_ASSERT(result.is()); + + // The lock file should now contain *our* key. + auto content = tmpDir.readLockFile("a3"); + KJ_ASSERT(content.size() == 32); + KJ_EXPECT(memcmp(content.begin(), reg.getPublicKey().bytes, 32) == 0); +} + +KJ_TEST("ClusterLockManager: wrong-sized stale content + writer dead -> claim immediately") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + // Write a lock file of unexpected size (e.g. a crashed writer left a half-written file). + // Nobody holds the lock, so the claim path should succeed. 
+ kj::byte garbage[10]; + memset(garbage, 0xFF, sizeof(garbage)); + tmpDir.writeLockFile("a4", kj::arrayPtr(garbage, sizeof(garbage))); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + auto result = lockManager.acquireOrRoute("a4").wait(io.waitScope); + KJ_ASSERT(result.is()); + + // The lock file should now contain our key. + auto content = tmpDir.readLockFile("a4"); + KJ_ASSERT(content.size() == 32); + KJ_EXPECT(memcmp(content.begin(), reg.getPublicKey().bytes, 32) == 0); +} + +KJ_TEST("ClusterLockManager: OwnedLock destructor truncates and releases") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + { + auto result = lockManager.acquireOrRoute("a5").wait(io.waitScope); + KJ_ASSERT(result.is()); + // Lock file should have our key. + auto content = tmpDir.readLockFile("a5"); + KJ_ASSERT(content.size() == 32); + } + + // After OwnedLock destructor, the file should be truncated to 0 bytes (and the OFD lock + // released). + auto content = tmpDir.readLockFile("a5"); + KJ_EXPECT(content.size() == 0); + + // A second acquire should now succeed via the claim path. + auto result2 = lockManager.acquireOrRoute("a5").wait(io.waitScope); + KJ_ASSERT(result2.is()); +} + +KJ_TEST("ClusterLockManager: owned by live peer -> returns bootstrap client") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + // Stand up two registries in the same dir (this creates registry entries for both, so neither + // is "dead"). 
+ ClusterRegistry reg1( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc1 = capnp::makeRpcServer(reg1, capnp::Capability::Client(kj::heap())); + + // reg1 owns a ClusterLockManager; we'll set up a lock file naming reg2 as the owner. + // reg2 is alive (its registry file exists in tmpDir), so isPeerDead(reg2) returns false, + // and acquireOrRoute should return a bootstrap client without falling through to claim. + // + // Note: actor IDs aren't hex of 64 chars, so they won't collide with registry filenames. + tmpDir.writeLockFile("a7", kj::arrayPtr(reg2.getPublicKey().bytes, 32)); + + KJ_EXPECT(!reg1.isPeerDead(reg2.getPublicKey())); + + ClusterLockManager lockManager(tmpDir.locks(), reg1, rpc1, io.provider->getTimer()); + + auto result = lockManager.acquireOrRoute("a7").wait(io.waitScope); + KJ_ASSERT(result.is()); + + // The lock file should NOT have been modified (we did not claim). + auto content = tmpDir.readLockFile("a7"); + KJ_ASSERT(content.size() == 32); + KJ_EXPECT(memcmp(content.begin(), reg2.getPublicKey().bytes, 32) == 0); +} + +KJ_TEST("ClusterLockManager: writer alive -> retries until writer releases") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + // Open the lock file from outside the ClusterLockManager and take an exclusive lock. This + // simulates a writer that holds the lock but hasn't yet flushed its key. The file is left + // empty (matching the freshly-truncated state during the writer's claim path). 
+ auto externalFile = + tmpDir.locks()->openFile(kj::Path({"a8"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + int externalFd = KJ_REQUIRE_NONNULL(externalFile->getFd()); + + auto externalLock = KJ_ASSERT_NONNULL(OfdLock::tryLock(externalFd, OfdLock::EXCLUSIVE)); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + // acquireOrRoute should see an empty file, attempt to lock, fail, then retry with backoff. + auto promise = lockManager.acquireOrRoute("a8"); + + bool completed = false; + auto trackedPromise = promise.then( + [&](kj::OneOf&& r) { + completed = true; + return kj::mv(r); + }); + + // Pump the event loop briefly; the implementation should be stuck in its backoff loop. + io.provider->getTimer().afterDelay(50 * kj::MILLISECONDS).wait(io.waitScope); + KJ_EXPECT(!completed, "acquireOrRoute should still be waiting for the external lock"); + + // Now the "external writer" writes its key (our own, for simplicity) and releases the lock. + // acquireOrRoute, on its next iteration, should see the valid key, recognize the owner as us, + // and return a bootstrap client. + externalFile->write(0, kj::arrayPtr(reg.getPublicKey().bytes, 32)); + { auto _ = kj::mv(externalLock); } + + auto result = trackedPromise.wait(io.waitScope); + KJ_EXPECT(completed); + KJ_ASSERT(result.is()); +} + +KJ_TEST("ClusterLockManager: concurrent acquisitions, exactly one wins") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + // Launch several concurrent calls. 
+ constexpr uint N = 5; + kj::Vector>> + promises; + for (uint i = 0; i < N; ++i) { + promises.add(lockManager.acquireOrRoute("a9")); + } + + auto results = kj::joinPromises(promises.releaseAsArray()).wait(io.waitScope); + + uint ownedCount = 0; + uint routedCount = 0; + for (auto& r: results) { + if (r.is()) { + ++ownedCount; + } else { + ++routedCount; + } + } + // Exactly one should have won. + KJ_EXPECT(ownedCount == 1); + KJ_EXPECT(routedCount == N - 1); +} + +#else // #if __linux__ + +KJ_TEST("dummy test: platform not supported") {} + +#endif // #if __linux__, #else + +} // namespace +} // namespace workerd diff --git a/src/workerd/server/cluster-lock.c++ b/src/workerd/server/cluster-lock.c++ new file mode 100644 index 00000000000..c3480e8a95d --- /dev/null +++ b/src/workerd/server/cluster-lock.c++ @@ -0,0 +1,162 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "cluster-lock.h" + +#include +#include +#include + +namespace workerd { + +namespace { + +// The lock file contains exactly one X25519 public key (the owner). No checksum is needed: a +// single write of `sizeof(X25519PublicKey::bytes)` bytes is atomic with respect to concurrent +// readers on Linux (and likewise on NFS, where it's a single RPC to the server). The writer's +// protocol is truncate(0) → write(full key) → sync, so a reader interleaving sees either size +// 0 (treat as unowned, run claim path) or the full key. + +constexpr kj::Duration INITIAL_BACKOFF = 10 * kj::MILLISECONDS; +constexpr kj::Duration MAX_BACKOFF = 1 * kj::SECONDS; + +// Build a `cluster::VatId` Reader for a given public key, backed by an internal `MessageBuilder`. 
+class VatIdHolder { + public: + explicit VatIdHolder(const X25519PublicKey& key): msg(scratch) { + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::ArrayPtr(key.bytes)); + } + + cluster::VatId::Reader getReader() { + return msg.getRoot().asReader(); + } + + private: + capnp::word scratch[8]{}; + capnp::MallocMessageBuilder msg; +}; + +// Compute the next backoff delay with ±25% jitter. `attempt` starts at 0. +kj::Duration computeBackoff(uint attempt) { + // Exponential: 10ms, 20ms, 40ms, ..., capped at 1s. + kj::Duration base = INITIAL_BACKOFF; + for (uint i = 0; i < attempt && base < MAX_BACKOFF; ++i) { + base = base * 2; + } + if (base > MAX_BACKOFF) base = MAX_BACKOFF; + + // Jitter: multiply by a random value in [0.75, 1.25]. Use a cheap pseudo-random source. + auto nanos = + (kj::systemPreciseMonotonicClock().now() - kj::origin()) / kj::NANOSECONDS; + uint32_t r = kj::hashCode(nanos); + double jitter = 0.75 + (r / static_cast(0xFFFFFFFFu)) * 0.5; + return base * static_cast(jitter * 1000) / 1000; +} + +} // namespace + +// ======================================================================================= +// ClusterLockManager::OwnedLock + +ClusterLockManager::OwnedLock::~OwnedLock() noexcept(false) { + if (file.get() != nullptr) { + // Truncate the file back to zero so the next claimant sees an empty file. We then drop the + // OFD lock (via OfdLock's destructor) which makes the file available for other nodes to claim. + // + // Best-effort: don't throw from the destructor. 
+ KJ_TRY { + file->truncate(0); + } + KJ_CATCH(e) { + KJ_LOG(WARNING, "truncate(0) on cluster lock file failed during release", e); + } + } +} + +// ======================================================================================= +// ClusterLockManager + +ClusterLockManager::ClusterLockManager(kj::Own dir, + ClusterRegistry& registry, + capnp::RpcSystem& clusterRpc, + kj::Timer& timer) + : dir(kj::mv(dir)), + registry(registry), + clusterRpc(clusterRpc), + timer(timer) {} + +kj::Promise> +ClusterLockManager::acquireOrRoute(kj::StringPtr actorId) { + // Validate that the actor ID is a hex string. This is the only form of actor ID we expect on + // this path (durable actors named by a 64-char hex SHA-256 hash, or by `idFromName` which also + // produces hex). Enforcing hex incidentally guarantees the ID is safe to use as a filename. + for (char c: actorId) { + KJ_REQUIRE((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F'), + "actor ID is not a hex string", actorId); + } + KJ_REQUIRE(actorId.size() > 0, "actor ID is empty"); + + kj::Path path({actorId}); + + for (uint attempt = 0;; ++attempt) { + if (attempt > 0) { + co_await timer.afterDelay(computeBackoff(attempt - 1)); + } + + // Open or create the lock file. + auto file = dir->openFile(path, kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + + // Read the owner key directly. read() returns: + // 0 → file is empty (freshly created, or previous owner cleanly + // released) + // sizeof(ownerKey.bytes) → full key present + // anything else → unexpected partial state (e.g. writer crashed mid-protocol) + // + // Since the key has a fixed width, we can assume that if we read the entire key, it is the + // complete and valid key. If the key is not done being written, we could get a short read + // (unlikely, since you'd expect a 32-byte read to always be atomic, but it's not guaranteed). 
+ X25519PublicKey ownerKey; + auto n = file->read(0, ownerKey.bytes); + + if (n == sizeof(ownerKey.bytes)) { + if (!registry.isPeerDead(ownerKey)) { + // Trust the lock file. Build a VatId and ask the RpcSystem to bootstrap. + // The owner might be us — clusterRpc.bootstrap() returns the local bootstrap directly + // without going through ClusterRegistry::connect() in that case. + // + // If the owner is actually unreachable, the bootstrap call will fail later. Internally, + // ClusterRegistry::ConnectionImpl::unregister() runs the dead-peer cleanup probe; + // if the peer's registry file is unlocked, the file is unlinked. Whoever retries + // acquireOrRoute() will then see isPeerDead(ownerKey) return true and fall through to + // the claim path. + VatIdHolder holder(ownerKey); + co_return clusterRpc.bootstrap(holder.getReader()).castAs(); + } + + // Owner is confirmed dead. Fall through to the claim path. + } + // else: n == 0 (empty file) or n is some unexpected partial size. In either case, the claim + // path is the right next step: if a live writer is currently producing the file, they + // hold the exclusive lock and our tryLock will fail; if no one holds the lock, we'll + // succeed and overwrite whatever was there. + + // Claim path. Try to acquire an exclusive OFD lock. + int fd = KJ_REQUIRE_NONNULL(file->getFd(), "lock directory must be disk-backed"); + KJ_IF_SOME(lock, OfdLock::tryLock(fd, OfdLock::EXCLUSIVE)) { + // We got the exclusive lock. Clear any stale content and write our identity. + file->truncate(0); + + const auto& myKey = registry.getPublicKey(); + file->write(0, myKey.bytes); + file->sync(); + + co_return OwnedLock(kj::mv(file), kj::mv(lock)); + } + + // tryLock failed — someone else owns it (or is racing us to claim it). Retry. 
+ } +} + +} // namespace workerd diff --git a/src/workerd/server/cluster-lock.h b/src/workerd/server/cluster-lock.h new file mode 100644 index 00000000000..d259968a48e --- /dev/null +++ b/src/workerd/server/cluster-lock.h @@ -0,0 +1,77 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace workerd { + +// Manages a directory of lock files for DO ownership. Each lock file is named after the actor ID +// and contains the owning node's public key (or is empty if unowned). +// +// The ownership protocol: +// 1. Open/create the lock file. +// 2. Read it. If it holds a full 32-byte key (no checksum is used), that is the owner key. +// - If !registry.isPeerDead(ownerKey): route via clusterRpc.bootstrap(vatIdFor(ownerKey)). +// - If registry.isPeerDead(ownerKey): fall through to the claim path. +// 3. Claim path: tryLock(EXCLUSIVE). If granted, ftruncate, write our key, fsync, +// return OwnedLock. If refused, retry with backoff. +class ClusterLockManager { + public: + ClusterLockManager(kj::Own dir, + ClusterRegistry& registry, + capnp::RpcSystem& clusterRpc, + kj::Timer& timer); + + // RAII ownership handle. Holding this object means this node owns the DO. + // Destruction truncates the lock file and releases the exclusive OFD lock, + // making the DO available for other nodes to claim. + class OwnedLock { + public: + ~OwnedLock() noexcept(false); + OwnedLock(OwnedLock&&) = default; + OwnedLock& operator=(OwnedLock&&) = default; + KJ_DISALLOW_COPY(OwnedLock); + + private: + OwnedLock(kj::Own file, OfdLock lock): file(kj::mv(file)), lock(kj::mv(lock)) {} + + kj::Own file; + OfdLock lock; + friend class ClusterLockManager; + }; + + // The core routing operation. Either acquires local ownership or returns a capability for + // reaching the current owner.
+ // + // Encapsulates the full protocol including retries with backoff, registry liveness + // cross-referencing, and capability resolution (with retry on stale owners). + // + // Returns: + // OwnedLock — this node has newly acquired ownership (caller should start the DO). + // rpc::WorkerdDebugPort::Client — the DO has an existing owner. The caller calls + // getActor() on this to reach the actor. The owner might be this node, in which case + // the RpcSystem returns the local self-bootstrap directly (no wire traffic). The + // caller does not need to distinguish local vs remote. + kj::Promise> acquireOrRoute( + kj::StringPtr actorId); + + private: + kj::Own dir; + ClusterRegistry& registry; + capnp::RpcSystem& clusterRpc; + kj::Timer& timer; +}; + +} // namespace workerd From a63a0e2ecce5c775d003830e6de7ed2d7ccb4264 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sat, 23 May 2026 15:25:05 -0500 Subject: [PATCH 5/9] Add a post-open callback to SQLite. This callback is invoked immediately after `open()` opens a file. In NFS mode we'll use this hook to verify the new open was opened on the same NFS lease as the registry lock. --- src/workerd/util/sqlite.c++ | 29 +++++++++++++++++++++++++++-- src/workerd/util/sqlite.h | 5 +++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/workerd/util/sqlite.c++ b/src/workerd/util/sqlite.c++ index a65a7db84bb..35cee8a45b0 100644 --- a/src/workerd/util/sqlite.c++ +++ b/src/workerd/util/sqlite.c++ @@ -1982,7 +1982,7 @@ sqlite3_vfs SqliteDatabase::Vfs::makeWrappedNativeVfs() { file->pMethods = nullptr; // Set up currentVfsRoot. 
- auto& self = *reinterpret_cast(vfs->pAppData); + auto& self = *reinterpret_cast(vfs->pAppData); KJ_ASSERT(currentVfsRoot == AT_FDCWD); currentVfsRoot = self.rootFd; KJ_DEFER(currentVfsRoot = AT_FDCWD); @@ -1994,6 +1994,16 @@ sqlite3_vfs SqliteDatabase::Vfs::makeWrappedNativeVfs() { if (file->pMethods == nullptr) { wrapper->pMethods = nullptr; } else { + KJ_IF_SOME(afterOpen, self.options.afterOpen) { + try { + afterOpen(); + } catch (kj::Exception& e) { + reportVfsErrorCaught(kj::mv(e)); + file->pMethods->xClose(file); + wrapper->pMethods = nullptr; + return SQLITE_CANTOPEN; + } + } wrapper->pMethods = &WrappedNativeFileImpl::METHOD_TABLE; wrapper->vfs = &self; wrapper->rootFd = self.rootFd; @@ -2298,7 +2308,7 @@ sqlite3_vfs SqliteDatabase::Vfs::makeKjVfs() { .pAppData = this, #define WRAP_METHOD(errorCode, block) \ - auto& self KJ_UNUSED = *static_cast(vfs->pAppData); \ + auto& self KJ_UNUSED = *static_cast(vfs->pAppData); \ try block catch (kj::Exception& e) { \ KJ_LOG(ERROR, "SQLite VFS I/O error", e); \ return errorCode; \ @@ -2315,6 +2325,11 @@ sqlite3_vfs SqliteDatabase::Vfs::makeKjVfs() { auto path = kj::Path::parse(zName); auto kjFile = KJ_UNWRAP_OR(self.directory.tryOpenFile(path), { return SQLITE_CANTOPEN; }); + KJ_IF_SOME(afterOpen, self.options.afterOpen) { + afterOpen(); + } else { + // squelch spurious "dangling else" warning from clang + } kj::Maybe> lock; if (flags & SQLITE_OPEN_MAIN_DB) { lock = self.lockManager.lock(path, *kjFile); @@ -2330,6 +2345,11 @@ sqlite3_vfs SqliteDatabase::Vfs::makeKjVfs() { KJ_ASSERT(flags & SQLITE_OPEN_DELETEONCLOSE); KJ_ASSERT(!(flags & SQLITE_OPEN_MAIN_DB), "main DB can't be a temporary file"); kjFile = self.directory.createTemporary(); + KJ_IF_SOME(afterOpen, self.options.afterOpen) { + afterOpen(); + } else { + // squelch spurious "dangling else" warning from clang + } } else { kj::WriteMode mode; if (flags & SQLITE_OPEN_CREATE) { @@ -2345,6 +2365,11 @@ sqlite3_vfs SqliteDatabase::Vfs::makeKjVfs() { auto path =
kj::Path::parse(zName); kjFile = KJ_UNWRAP_OR(self.directory.tryOpenFile(path, mode), { return SQLITE_CANTOPEN; }); + KJ_IF_SOME(afterOpen, self.options.afterOpen) { + afterOpen(); + } else { + // squelch spurious "dangling else" warning from clang + } if (flags & SQLITE_OPEN_MAIN_DB) { lock = self.lockManager.lock(path, *kjFile); } diff --git a/src/workerd/util/sqlite.h b/src/workerd/util/sqlite.h index 64623d5dbd0..be8448cc43a 100644 --- a/src/workerd/util/sqlite.h +++ b/src/workerd/util/sqlite.h @@ -783,6 +783,11 @@ struct SqliteDatabase::VfsOptions { // will fall back to the native VFS implementation. In that case, the options you set here will // be ORed with the ones set by the underlying VFS. int deviceCharacteristics = 0x00001000; // = SQLITE_FCNTL_POWERSAFE_OVERWRITE + + // Called immediately after SQLite opens any file through this VFS. This is used by clustered + // Durable Object storage to verify that the original NFS lease is still valid and therefore + // covers the newly-opened file. + kj::Maybe> afterOpen; }; // Implements a SQLite VFS based on a KJ directory. From e361e1a494874d1d1cc3d1df13f51e56f5ca35a5 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sat, 23 May 2026 15:27:22 -0500 Subject: [PATCH 6/9] Extend ChannelTokenHandler to use shared keys in cluster mode. In cluster mode, all members of the cluster need to use the same key for channel tokens, so they can exchange them.
--- src/workerd/server/channel-token.c++ | 27 +++++++++++++++++++++++++-- src/workerd/server/channel-token.h | 2 +- src/workerd/server/server.c++ | 28 ++++++++++++++++++++-------- src/workerd/server/server.h | 4 +++- 4 files changed, 49 insertions(+), 12 deletions(-) diff --git a/src/workerd/server/channel-token.c++ b/src/workerd/server/channel-token.c++ index da16b81bd5b..ee9fc9f0c92 100644 --- a/src/workerd/server/channel-token.c++ +++ b/src/workerd/server/channel-token.c++ @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -21,9 +22,31 @@ namespace workerd::server { -ChannelTokenHandler::ChannelTokenHandler(Resolver& resolver): resolver(resolver) { - getEntropy(tokenKey); +ChannelTokenHandler::ChannelTokenHandler(Resolver& resolver, kj::Maybe clusterKey) + : resolver(resolver) { + KJ_IF_SOME(key, clusterKey) { + // All nodes in the cluster must derive the same key, based on `clusterKey`. We HMAC the + // `clusterKey` with a randomly-chosen salt to derive the channel token key. + + // clang-format off + static constexpr byte SALT[] = { + 0x9a, 0x24, 0x71, 0x39, 0x1b, 0x85, 0xce, 0x97, + 0x6c, 0xf9, 0x1c, 0xdf, 0x93, 0xc0, 0xc6, 0x36, + 0xb9, 0xd2, 0x09, 0xfe, 0x01, 0xde, 0xb1, 0x9a, + 0xda, 0xd3, 0x8e, 0x76, 0xbb, 0xae, 0xeb, 0x89, + }; + // clang-format on + + uint outLen = 0; + KJ_ASSERT(HMAC(EVP_sha256(), SALT, sizeof(SALT), key.asBytes().begin(), key.size(), tokenKey, + &outLen) != nullptr); + KJ_ASSERT(outLen == sizeof(tokenKey)); + } else { + // Single-instance, generate a new key for every run. + getEntropy(tokenKey); + } + // Construct key ID by hashing the key. 
SHA256_CTX ctx{}; KJ_ASSERT(SHA256_Init(&ctx)); KJ_ASSERT(SHA256_Update(&ctx, tokenKey, sizeof(tokenKey))); diff --git a/src/workerd/server/channel-token.h b/src/workerd/server/channel-token.h index ef075e3c87a..ceaaf9141ff 100644 --- a/src/workerd/server/channel-token.h +++ b/src/workerd/server/channel-token.h @@ -34,7 +34,7 @@ class ChannelTokenHandler { kj::StringPtr serviceName, kj::Maybe entrypoint, Frankenvalue props) = 0; }; - explicit ChannelTokenHandler(Resolver& resolver); + explicit ChannelTokenHandler(Resolver& resolver, kj::Maybe clusterKey = kj::none); // Helpers to implement `IoChannelFactory::{SubrequestChannel,ActorClassChannel}::getToken()`. kj::Array encodeSubrequestChannelToken(IoChannelFactory::ChannelTokenUsage usage, diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 95dafe1a09b..7c4d719577b 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -191,7 +191,6 @@ Server::Server(kj::Filesystem& fs, reportConfigError(kj::mv(reportConfigError)), loggingOptions(loggingOptions), memoryCacheProvider(kj::heap(timer)), - channelTokenHandler(*this), tasks(*this) {} struct Server::GlobalContext { @@ -5020,13 +5019,13 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr kj::Maybe serviceName; if (!def.isDynamic) serviceName = name; - auto result = - kj::refcounted(channelTokenHandler, serviceName, globalContext->threadContext, - monotonicClock, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint), - kj::mv(errorReporter.namedEntrypoints), kj::mv(errorReporter.actorClasses), - kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), - KJ_BIND_METHOD(*this, deleteAllActors), kj::mv(dockerPath), - kj::mv(containerEgressInterceptorImage), def.isDynamic, kj::mv(abortIsolateCallback)); + auto result = kj::refcounted(KJ_ASSERT_NONNULL(channelTokenHandler), serviceName, + globalContext->threadContext, monotonicClock, kj::mv(worker), + kj::mv(errorReporter.defaultEntrypoint), 
kj::mv(errorReporter.namedEntrypoints), + kj::mv(errorReporter.actorClasses), kj::mv(linkCallback), + KJ_BIND_METHOD(*this, abortAllActors), KJ_BIND_METHOD(*this, deleteAllActors), + kj::mv(dockerPath), kj::mv(containerEgressInterceptorImage), def.isDynamic, + kj::mv(abortIsolateCallback)); result->initActorNamespaces(def.localActorConfigs, network); co_return result; } @@ -5782,6 +5781,8 @@ kj::Promise Server::run( loggingOptions.structuredLogging = StructuredLogging(config.getStructuredLogging()); } + setupChannelTokenHandler(config); + kj::HttpHeaderTable::Builder headerTableBuilder; globalContext = kj::heap(*this, v8System, headerTableBuilder); invalidConfigServiceSingleton = kj::refcounted(); @@ -5804,6 +5805,15 @@ kj::Promise Server::run( co_return co_await listenPromise.exclusiveJoin(kj::mv(fatalPromise)); } +void Server::setupChannelTokenHandler(config::Config::Reader config) { + ChannelTokenHandler::Resolver& tokenResolver = *this; + kj::Maybe channelTokenKey; + if (config.hasCluster()) { + channelTokenKey = config.getCluster().getKey(); + } + channelTokenHandler.emplace(tokenResolver, channelTokenKey); +} + // Configure and start the inspector socket, returning the port the socket started on. 
uint startInspector( kj::StringPtr inspectorAddress, Server::InspectorServiceIsolateRegistrar& registrar) { @@ -6220,6 +6230,8 @@ kj::Promise Server::test(jsg::V8System& v8System, loggingOptions.structuredLogging = StructuredLogging(config.getStructuredLogging()); } + setupChannelTokenHandler(config); + kj::HttpHeaderTable::Builder headerTableBuilder; globalContext = kj::heap(*this, v8System, headerTableBuilder); invalidConfigServiceSingleton = kj::refcounted(); diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 4ddfd5447a0..6e336936c81 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -156,7 +156,7 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::Own memoryCacheProvider; - ChannelTokenHandler channelTokenHandler; + kj::Maybe channelTokenHandler; kj::HashMap>> socketOverrides; kj::HashMap directoryOverrides; @@ -339,6 +339,8 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl void unlinkWorkerLoaders(); + void setupChannelTokenHandler(config::Config::Reader config); + kj::Promise preloadPython( kj::StringPtr workerName, const WorkerDef& workerDef, ErrorReporter& errorReporter); From d48022df59b1f61328dedeb89f4fa74a7c8eafea Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sat, 23 May 2026 15:32:47 -0500 Subject: [PATCH 7/9] Extend WorkerdDebugPort to include actor name with ID. If the actor ID was derived from a name, it's important to pass the name along with the ID so that `ctx.id.name` works correctly. 
--- src/workerd/io/worker-interface.capnp | 5 ++++- src/workerd/server/server.c++ | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index f071776bca5..b0c054c7b0a 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -905,7 +905,10 @@ interface WorkerdDebugPort { -> (entrypoint :WorkerdBootstrap); # Get direct access to a stateless entrypoint. - getActor @1 (service :Text, entrypoint :Text, actorId :Text) -> (actor :WorkerdBootstrap); + getActor @1 (service :Text, entrypoint :Text, actorId :Text, actorName :Text) + -> (actor :WorkerdBootstrap); # Get an actor (Durable Object) stub. # The actorId should be a hex string for Durable Objects or a plain string for ephemeral actors. + # `actorName` may optionally be specified for Durable Objects, if `actorId` is derived from the + # name. } diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 7c4d719577b..e4b4e59f0ae 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -5715,8 +5715,12 @@ class Server::DebugPortListener { auto decoded = kj::decodeHex(actorIdStr); KJ_REQUIRE(decoded.size() == SHA256_DIGEST_LENGTH, "Invalid Durable Object ID: expected 64 hex characters (32 bytes)", decoded.size()); + kj::Maybe name; + if (params.hasActorName()) { + name = params.getActorName().clone(); + } kj::Own id = - kj::heap(decoded.begin(), kj::none); + kj::heap(decoded.begin(), kj::mv(name)); actorId = kj::mv(id); } KJ_CASE_ONEOF(c, Ephemeral) { From 33b237258eba6139acf9b1300a319c791a8a07ab Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sat, 23 May 2026 16:58:22 -0500 Subject: [PATCH 8/9] Make self-hosted Durable Objects scalable across a cluster. (This is the commit that wires everything up, but this commit message describes the overall series.) 
workerd has always been intended to be something you can run in production, in order to self-host Workers outside of Cloudflare. For many use cases, it does in fact work. But there has been one missing piece: Durable Objects. workerd has always supported running Durable Objects for the purpose of local testing, but only within the scope of a single instance running on a single thread. Any Durable Objects created would run within that same workerd instance. But in production, you probably want to utilize more than one thread on one machine. For stateless Workers, this could be done just fine: just run multiple instances of workerd and load-balance across them. But doing this completely broke the model of Durable Objects, where each object is supposed to have a single instance globally, not one per workerd instance! This change fixes the problem by introducing a new "cluster" mode. In cluster mode, you can run several workerd instances that are aware of each other and able to route requests to each other. All instances in a cluster run the same config. Mostly these workerd instances behave like they would normally. However, when a request is made to a Durable Object, they coordinate to make sure only one workerd instance owns the DO, and the others route to it. The design assumes a shared filesystem for underlying DO storage. All instances must be on the same filesystem. If all instances are on the same machine (useful for utilizing multiple cores), then this can be any local filesystem. Otherwise, it must be NFSv4, or some network filesystem that has exactly the same lock/lease semantics as NFSv4. The shared filesystem is also used for service discovery and locking. This is unconventional, but has a major advantage over something like etcd or Consul: If a node loses its NFS lease, it simultaneously loses its locks *and* loses the ability to write to any open files.
This provides "fencing": there's no way a node could continue writing after other nodes believe that it is dead. If locking were provided by a separate service, then it becomes extremely difficult to ensure that a node can't accidentally write after losing its lock due to a timeout. A complete design doc can be found here: https://gist.github.com/kentonv/cd8001237dc1181058193de0e7509972 The "Implementation Plan" in the design doc was largely written by AI (with several rounds of revision guided by me); the sections before that were largely written by me (with some AI review and minor changes). GPT 5.5, Opus 4.6, and Opus 4.7 were all employed. A blog post will come later. This particular commit was initially written by GPT 5.5 but heavily refactored by hand. --- src/workerd/server/BUILD.bazel | 5 + src/workerd/server/server.c++ | 302 +++++++++++++++++++++++++++++++-- src/workerd/server/server.h | 5 + 3 files changed, 294 insertions(+), 18 deletions(-) diff --git a/src/workerd/server/BUILD.bazel b/src/workerd/server/BUILD.bazel index 67cf455d243..f11b2ad1a39 100644 --- a/src/workerd/server/BUILD.bazel +++ b/src/workerd/server/BUILD.bazel @@ -212,6 +212,7 @@ wd_cc_library( ":channel-token_capnp", "//src/workerd/io", "//src/workerd/util:entropy", + "@ssl", ], ) @@ -228,6 +229,9 @@ wd_cc_library( ":alarm-scheduler", ":channel-token", ":channel-token_capnp", + ":cluster-lock", + ":cluster-registry", + ":cluster_capnp", ":container-client", ":facet-tree-index", ":fallback-service", @@ -244,6 +248,7 @@ wd_cc_library( "//src/workerd/jsg", "//src/workerd/util:perfetto", "//src/workerd/util:websocket-error-handler", + "@capnp-cpp//src/capnp:capnp-rpc", "@capnp-cpp//src/kj/compat:kj-gzip", "@capnp-cpp//src/kj/compat:kj-tls", ], diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index e4b4e59f0ae..ac723793d86 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -5,6 +5,7 @@ #include "server.h" #include "alarm-scheduler.h" 
+#include "cluster-lock.h" #include "container-client.h" #include "pyodide.h" #include "workerd-api.h" @@ -46,6 +47,7 @@ #include #include #include +#include #include #include #include @@ -1915,6 +1917,8 @@ class Server::WorkerService final: public Service, kj::Array> streamingTails; kj::Array> workerLoaders; kj::Maybe workerdDebugPortNetwork; + kj::Maybe clusterRegistry; + kj::Maybe&> clusterRpc; }; using LinkCallback = kj::Function; @@ -1971,9 +1975,11 @@ class Server::WorkerService final: public Service, } auto actorClass = kj::refcounted(*this, entry.key, Frankenvalue()); - auto ns = kj::heap(kj::mv(actorClass), entry.value, - kj::systemPreciseCalendarClock(), threadContext.getUnsafeTimer(), - threadContext.getByteStreamFactory(), channelTokenHandler, network, dockerPath, + auto ns = kj::heap(kj::mv(actorClass), + // Dynamic workers can't have actor namespaces, so `serviceName` must be available here. + KJ_ASSERT_NONNULL(serviceName), entry.key, entry.value, kj::systemPreciseCalendarClock(), + threadContext.getUnsafeTimer(), threadContext.getByteStreamFactory(), + threadContext.getHttpOverCapnpFactory(), channelTokenHandler, network, dockerPath, containerEgressInterceptorImage, waitUntilTasks); actorNamespaces.insert(entry.key, kj::mv(ns)); } @@ -2078,7 +2084,7 @@ class Server::WorkerService final: public Service, auto linked = callback(*this, errorReporter); for (auto& ns: actorNamespaces) { - ns.value->link(linked.actorStorage); + ns.value->link(linked.actorStorage, linked.clusterRegistry, linked.clusterRpc); } ioChannels = kj::mv(linked); @@ -2248,35 +2254,61 @@ class Server::WorkerService final: public Service, class ActorNamespace final { public: ActorNamespace(kj::Own actorClass, + kj::StringPtr serviceName, + kj::StringPtr className, const ActorConfig& config, const kj::Clock& clock, kj::Timer& timer, capnp::ByteStreamFactory& byteStreamFactory, + capnp::HttpOverCapnpFactory& httpOverCapnpFactory, ChannelTokenHandler& channelTokenHandler, kj::Network& 
dockerNetwork, kj::Maybe dockerPath, kj::Maybe containerEgressInterceptorImage, kj::TaskSet& waitUntilTasks) : actorClass(kj::mv(actorClass)), + serviceName(serviceName), + className(className), config(config), clock(clock), timer(timer), byteStreamFactory(byteStreamFactory), + httpOverCapnpFactory(httpOverCapnpFactory), channelTokenHandler(channelTokenHandler), dockerNetwork(dockerNetwork), dockerPath(dockerPath), containerEgressInterceptorImage(containerEgressInterceptorImage), waitUntilTasks(waitUntilTasks) {} - void link(kj::Maybe serviceActorStorage) { + class ClusterActorChannel; + + void link(kj::Maybe serviceActorStorage, + kj::Maybe clusterRegistry, + kj::Maybe&> clusterRpc) { KJ_IF_SOME(dir, serviceActorStorage) { KJ_IF_SOME(d, config.tryGet()) { - this->actorStorage.emplace(dir.openSubdir( - kj::Path({d.uniqueKey}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY)); + this->actorStorage.emplace(dir.openSubdir(kj::Path({d.uniqueKey}), + kj::WriteMode::CREATE | kj::WriteMode::MODIFY), + *this); + KJ_IF_SOME(registry, clusterRegistry) { + auto& rpc = KJ_ASSERT_NONNULL(clusterRpc); + this->clusterRegistry = registry; + auto lockDirectory = KJ_ASSERT_NONNULL(this->actorStorage) + .directory->openSubdir(kj::Path({"locks"}), + kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + this->lockManager.emplace(kj::mv(lockDirectory), registry, rpc, timer); + } } } KJ_IF_SOME(d, config.tryGet()) { + if (lockManager != kj::none) { + // TODO(clustering): Support alarms in cluster mode. Until then, we must skip creating + // an AlarmScheduler altogether otherwise multiple instances will fight over the alarm + // database. + return; + } + auto idFactory = kj::heap(d.uniqueKey); AlarmScheduler::GetActorFn getActor = [this, idFactory = kj::mv(idFactory)]( @@ -2308,6 +2340,8 @@ class Server::WorkerService final: public Service, return config; } + // Get the ActorChannel for the given actor ID, routing to the appropriate workerd instance + // if needed. 
kj::Own getActorChannel(Worker::Actor::Id id) { KJ_IF_SOME(doId, id.tryGet>()) { KJ_IF_SOME(name, doId->getName()) { @@ -2320,7 +2354,43 @@ class Server::WorkerService final: public Service, } } - return kj::refcounted(getActorContainer(kj::mv(id))); + KJ_IF_SOME(ld, lockManager) { + auto key = actorKey(id); + + // First check if the container exists locally. + KJ_IF_SOME(container, actors.find(key)) { + return kj::refcounted(container->addRef()); + } + + auto promise = ld.acquireOrRoute(key); + return kj::refcounted( + promise.then([this, id = kj::mv(id), key = kj::mv(key)]( + kj::OneOf + lockOrClient) mutable -> kj::Own { + KJ_SWITCH_ONEOF(lockOrClient) { + KJ_CASE_ONEOF(ownership, ClusterLockManager::OwnedLock) { + return kj::refcounted( + createActorContainer(kj::mv(key), kj::mv(id), kj::mv(ownership))); + } + KJ_CASE_ONEOF(debugPort, rpc::WorkerdDebugPort::Client) { + auto req = debugPort.getActorRequest(capnp::MessageSize{32, 0}); + req.setService(serviceName); + req.setEntrypoint(className); + req.setActorId(key); + KJ_IF_SOME(i, id.tryGet>()) { + KJ_IF_SOME(name, i->getName()) { + req.setActorName(name); + } + } + return kj::refcounted( + byteStreamFactory, httpOverCapnpFactory, req.send().getActor()); + } + } + KJ_UNREACHABLE; + })); + } else { + return kj::refcounted(getActorContainer(kj::mv(id))); + } } class ActorContainer; @@ -2353,7 +2423,8 @@ class Server::WorkerService final: public Service, ActorNamespace& ns, kj::Maybe parent, kj::OneOf> classAndIdParam, - kj::Timer& timer) + kj::Timer& timer, + kj::Maybe ownershipLock = kj::none) : key(kj::mv(key)), tracker(kj::refcounted(*this)), ns(ns), @@ -2361,7 +2432,8 @@ class Server::WorkerService final: public Service, .orDefault(*this)), parent(parent), timer(timer), - lastAccess(timer.now()) { + lastAccess(timer.now()), + ownershipLock(kj::mv(ownershipLock)) { KJ_SWITCH_ONEOF(classAndIdParam) { KJ_CASE_ONEOF(value, ClassAndId) { // `classAndId` is immediately available. 
@@ -2623,6 +2695,7 @@ class Server::WorkerService final: public Service, kj::Maybe> shutdownTask; kj::Maybe> onBrokenTask; kj::Maybe brokenReason; + kj::Maybe ownershipLock; // in cluster mode // Reference to the ContainerClient (if container is enabled for this actor) kj::Maybe> containerClient; @@ -2666,6 +2739,7 @@ class Server::WorkerService final: public Service, ns.actorStorage, "can't call getFacetId() when there's no backing storage"); auto indexFile = as.directory->openFile( kj::Path({kj::str(key, ".facets")}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + ns.verifyRegistration(); return *facetTreeIndex.emplace(kj::heap(kj::mv(indexFile))); } } @@ -2816,7 +2890,9 @@ class Server::WorkerService final: public Service, KJ_IF_SOME(as, ns.actorStorage) { kj::Own sqliteHooks; if (parent == kj::none) { - KJ_IF_SOME(a, ns.alarmScheduler) { + if (ns.lockManager != kj::none) { + sqliteHooks = kj::heap(); + } else KJ_IF_SOME(a, ns.alarmScheduler) { sqliteHooks = kj::heap(a, ActorKey{.actorId = key}); } else { // No alarm scheduler available, use default hooks instance. @@ -2834,9 +2910,15 @@ class Server::WorkerService final: public Service, // Before we do anything, make sure the database is in WAL mode. We also need to // do this after reset() is used, so register a callback for that. 
+ if (ns.nfsMode()) { + db->run("PRAGMA locking_mode=EXCLUSIVE;"); + } db->run("PRAGMA journal_mode=WAL;"); db->afterReset([this, &dir = *as.directory, selfId](SqliteDatabase& db) { + if (ns.nfsMode()) { + db.run("PRAGMA locking_mode=EXCLUSIVE;"); + } db.run("PRAGMA journal_mode=WAL;"); // reset() is used when the app called deleteAll(), in which case we also want to @@ -2920,19 +3002,25 @@ class Server::WorkerService final: public Service, } }; - kj::Own getActorContainer(Worker::Actor::Id id) { - kj::String key; - + kj::String actorKey(Worker::Actor::Id& id) { KJ_SWITCH_ONEOF(id) { KJ_CASE_ONEOF(obj, kj::Own) { KJ_REQUIRE(config.is()); - key = obj->toString(); + return obj->toString(); } KJ_CASE_ONEOF(str, kj::String) { KJ_REQUIRE(config.is()); - key = kj::str(str); + return kj::str(str); } } + KJ_UNREACHABLE; + } + + // Get or create an ActorContainer. Use only when NOT in cluster mode. + kj::Own getActorContainer(Worker::Actor::Id id) { + KJ_REQUIRE(lockManager == kj::none); + + kj::String key = actorKey(id); return actors .findOrCreate(key, [&]() mutable { @@ -2944,6 +3032,18 @@ class Server::WorkerService final: public Service, })->addRef(); } + // Create an ActorContainer in cluster mode. 
+ kj::Own createActorContainer( + kj::String key, Worker::Actor::Id id, ClusterLockManager::OwnedLock ownership) { + KJ_ASSERT(actors.find(key) == kj::none); + + auto container = kj::refcounted(kj::str(key), *this, kj::none, + ActorContainer::ClassAndId(kj::addRef(*actorClass), kj::mv(id)), timer, + kj::mv(ownership)); + actors.insert(container->getKey(), container->addRef()); + return container; + } + kj::Own getContainerClient( kj::StringPtr containerId, kj::StringPtr imageName) { KJ_IF_SOME(existingClient, containerClients.find(containerId)) { @@ -3036,6 +3136,8 @@ class Server::WorkerService final: public Service, private: kj::Own actorClass; + kj::StringPtr serviceName; + kj::StringPtr className; const ActorConfig& config; const kj::Clock& clock; @@ -3043,9 +3145,12 @@ class Server::WorkerService final: public Service, kj::Own directory; SqliteDatabase::Vfs vfs; - ActorStorage(kj::Own directoryParam) + ActorStorage(kj::Own directoryParam, ActorNamespace& ns) : directory(kj::mv(directoryParam)), - vfs(*directory) {} + vfs(*directory, + SqliteDatabase::VfsOptions{ + .afterOpen = kj::Function(KJ_BIND_METHOD(ns, verifyRegistration)), + }) {} }; // Note: The Vfs, actorStorage, and ownAlarmScheduler must not be torn down until all actors @@ -3053,6 +3158,9 @@ class Server::WorkerService final: public Service, kj::Maybe actorStorage; kj::Maybe> ownAlarmScheduler; + kj::Maybe clusterRegistry; + kj::Maybe lockManager; + // Tracks the canceler and cleanup promise for a Docker container's lifecycle cleanup. // Useful to await on async calls of a ContainerClient destructor when the new // one appears before they've been resolved. 
@@ -3088,6 +3196,7 @@ class Server::WorkerService final: public Service, kj::Maybe> cleanupTask; kj::Timer& timer; capnp::ByteStreamFactory& byteStreamFactory; + capnp::HttpOverCapnpFactory& httpOverCapnpFactory; ChannelTokenHandler& channelTokenHandler; kj::Network& dockerNetwork; kj::Maybe dockerPath; @@ -3131,6 +3240,23 @@ class Server::WorkerService final: public Service, } } + bool nfsMode() { + KJ_IF_SOME(r, clusterRegistry) { + return r.nfsMode(); + } else { + return false; + } + } + + // Callback called immediately after any actor storage file is opened to verify that the + // registry lock is still held, and therefore the new open must be on the same NFS lease as + // the registry lock. + void verifyRegistration() { + KJ_IF_SOME(r, clusterRegistry) { + r.verifyNfsLease(); + } + } + // Implements actor loopback, which is used by websocket hibernation to deliver events to the // actor from the websocket's read loop. class Loopback: public Worker::Actor::Loopback, public kj::Refcounted { @@ -3170,6 +3296,43 @@ class Server::WorkerService final: public Service, AlarmScheduler& alarmScheduler; ActorKey actor; }; + + class ClusterActorSqliteHooks final: public ActorSqlite::Hooks { + public: + kj::Promise scheduleRun( + kj::Maybe newAlarmTime, kj::Promise priorTask) override { + // TODO(clustering): Support alarms. + JSG_FAIL_REQUIRE(Error, "Durable Object alarms are not yet supported in cluster mode"); + } + }; + + // ActorChannel implementation wrapping a `WorkerdBootstrap` RPC stub. 
+ class RpcActorChannel final: public IoChannelFactory::ActorChannel { + public: + RpcActorChannel(capnp::ByteStreamFactory& byteStreamFactory, + capnp::HttpOverCapnpFactory& httpOverCapnpFactory, + rpc::WorkerdBootstrap::Client client) + : byteStreamFactory(byteStreamFactory), + httpOverCapnpFactory(httpOverCapnpFactory), + client(kj::mv(client)) {} + + kj::Own startRequest( + IoChannelFactory::SubrequestMetadata metadata) override { + capnp::MessageSize sizeHint{4, 0}; + KJ_IF_SOME(cf, metadata.cfBlobJson) { + sizeHint.wordCount += cf.size() / sizeof(capnp::word); + } + + auto dispatcher = client.startEventRequest(sizeHint).sendForPipeline().getDispatcher(); + return kj::heap( + httpOverCapnpFactory, byteStreamFactory, kj::mv(dispatcher)); + } + + private: + capnp::ByteStreamFactory& byteStreamFactory; + capnp::HttpOverCapnpFactory& httpOverCapnpFactory; + rpc::WorkerdBootstrap::Client client; + }; }; private: @@ -3351,6 +3514,33 @@ class Server::WorkerService final: public Service, kj::Own actorContainer; }; + // TODO(clustering): Replace this with the promised channels coming in the async channel token + // change, once it has landed. 
+ class PromisedActorChannel final: public IoChannelFactory::ActorChannel { + public: + PromisedActorChannel(kj::Promise> promise) + : promise(promise + .then([this](kj::Own inner) { + this->inner = kj::mv(inner); + }) + .fork()) {} + + kj::Own startRequest(IoChannelFactory::SubrequestMetadata metadata) override { + KJ_IF_SOME(i, inner) { + return i->startRequest(kj::mv(metadata)); + } else { + return newPromisedWorkerInterface(promise.addBranch().then( + [self = addRefToThis(), metadata = kj::mv(metadata)]() mutable { + return KJ_ASSERT_NONNULL(self->inner)->startRequest(kj::mv(metadata)); + })); + } + } + + private: + kj::Maybe> inner; + kj::ForkedPromise promise; + }; + // --------------------------------------------------------------------------- // implements kj::TaskSet::ErrorHandler @@ -4869,6 +5059,10 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr auto linkCallback = [this, def = kj::mv(def), totalActorChannels](WorkerService& workerService, Worker::ValidationErrorReporter& errorReporter) mutable { WorkerService::LinkedIoChannels result; + KJ_IF_SOME(registry, this->clusterRegistry) { + result.clusterRegistry = *registry; + result.clusterRpc = KJ_ASSERT_NONNULL(this->clusterRpc); + } auto entrypointNames = workerService.getEntrypointNames(); auto actorClassNames = workerService.getActorClassNames(); @@ -5624,6 +5818,15 @@ class Server::DebugPortListener { co_return co_await server.listen(*listener); } + // Expose WorkerdDebugPortImpl for use by the cluster RPC system. + // + // TODO(cleanup): Maybe move WorkerDebugPortImpl up into Server. However, note the clustering + // code might also shift to a different interface. 
+ static rpc::WorkerdDebugPort::Client makeBootstrap( + Server& owner, capnp::HttpOverCapnpFactory& httpOverCapnpFactory) { + return kj::heap(&owner, httpOverCapnpFactory); + } + private: Server& owner; kj::Own listener; @@ -5765,6 +5968,12 @@ kj::Promise Server::handleDrain(kj::Promise drainWhen) { // doc comment, we instead add the promise to `tasks` to be safe. tasks.add(httpServer.httpServer.drain()); } + + // TODO(clustering): This is a harsh ending for actors on this machine. We should really evict + // each one at a time that is convenient, and wait some time for RPC requests to drain. + // Three-party handoff would help with straggler requests that we forward to the new owner. + clusterRpc = kj::none; + abortAllActors(kj::none); } kj::Promise Server::run( @@ -5953,7 +6162,22 @@ kj::Promise Server::startServices(jsg::V8System& v8System, } goto validDurableObjectStorage; case config::Worker::DurableObjectStorage::IN_MEMORY: + if (config.hasCluster() && hadDurable) { + reportConfigError(kj::str("Worker service \"", name, + "\" uses in-memory Durable Object storage, which is not supported in cluster " + "mode.")); + } + goto validDurableObjectStorage; case config::Worker::DurableObjectStorage::LOCAL_DISK: + if (config.hasCluster() && hadDurable && + workerConf.getDurableObjectStorage().getLocalDisk() != + config.getCluster().getSharedDirectory()) { + reportConfigError( + kj::str("Worker service \"", name, "\" uses Durable Object storage disk service \"", + workerConf.getDurableObjectStorage().getLocalDisk(), + "\", but cluster.sharedDirectory is \"", + config.getCluster().getSharedDirectory(), "\".")); + } goto validDurableObjectStorage; } reportConfigError(kj::str("Encountered unknown durableObjectStorage type in service \"", name, @@ -6016,6 +6240,48 @@ kj::Promise Server::startServices(jsg::V8System& v8System, return decltype(services)::Entry{kj::str("internet"_kj), kj::mv(service)}; }); + if (config.hasCluster()) { + if (!experimental) { + 
reportConfigError( + "Cluster mode is experimental and subject to change. You must run workerd with " + "`--experimental` to use it."_kj.clone()); + } +#ifndef __linux__ + reportConfigError("Cluster mode is currently implemented only on Linux."_kj.clone()); +#endif + + auto cluster = config.getCluster(); + KJ_IF_SOME(sharedDirService, services.find(cluster.getSharedDirectory())) { + auto diskSvc = dynamic_cast(sharedDirService.get()); + if (diskSvc == nullptr) { + reportConfigError(kj::str("cluster.sharedDirectory refers to service \"", + cluster.getSharedDirectory(), "\", but that service is not a disk directory.")); + } else KJ_IF_SOME(dir, diskSvc->getWritable()) { + auto registryDir = dir.openSubdir( + kj::Path({cluster.getRegistrySubdir()}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + auto registry = + kj::heap(kj::mv(registryDir), cluster.getNetwork(), network, timer); + tasks.add(registry->runMaintenance().exclusiveJoin(forkedDrainWhen.addBranch())); + auto& reg = *clusterRegistry.emplace(kj::mv(registry)); + clusterRpc.emplace(capnp::makeRpcServer( + reg, DebugPortListener::makeBootstrap(*this, globalContext->httpOverCapnpFactory))); + + // TODO(clustering): We really shouldn't start accepting connections until the server + // is totally up, but the RPC system actually starts accepting immediately on + // construction, even if you don't call run(). + KJ_IF_SOME(rpc, clusterRpc) { + tasks.add(rpc.run().exclusiveJoin(forkedDrainWhen.addBranch())); + } + } else { + reportConfigError(kj::str("cluster.sharedDirectory refers to disk service \"", + cluster.getSharedDirectory(), "\", but that service is defined read-only.")); + } + } else { + reportConfigError(kj::str("cluster.sharedDirectory refers to service \"", + cluster.getSharedDirectory(), "\", but no such service is defined.")); + } + } + // Third pass: Cross-link services. 
for (auto& service: services) { ConfigErrorReporter errorReporter(*this, service.key); diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 6e336936c81..d0111effe41 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -9,8 +9,10 @@ #include #include #include +#include #include +#include #include #include #include @@ -158,6 +160,9 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::Maybe channelTokenHandler; + kj::Maybe> clusterRegistry; + kj::Maybe> clusterRpc; + kj::HashMap>> socketOverrides; kj::HashMap directoryOverrides; From f7847b45d12561b65cdd3b8c3cea1ecf3b2133f2 Mon Sep 17 00:00:00 2001 From: Kenton Varda Date: Sat, 23 May 2026 18:10:41 -0500 Subject: [PATCH 9/9] Add integration tests for clustering mode. This test suite was written entirely by Claude Opus 4.7. --- src/workerd/server/tests/cluster/BUILD.bazel | 45 ++ .../tests/cluster/cluster-test-cidr.wd-test | 49 ++ .../server/tests/cluster/cluster-test.mjs | 465 ++++++++++++++++++ .../server/tests/cluster/cluster-test.wd-test | 60 +++ .../server/tests/cluster/cluster-worker.js | 86 ++++ 5 files changed, 705 insertions(+) create mode 100644 src/workerd/server/tests/cluster/BUILD.bazel create mode 100644 src/workerd/server/tests/cluster/cluster-test-cidr.wd-test create mode 100644 src/workerd/server/tests/cluster/cluster-test.mjs create mode 100644 src/workerd/server/tests/cluster/cluster-test.wd-test create mode 100644 src/workerd/server/tests/cluster/cluster-worker.js diff --git a/src/workerd/server/tests/cluster/BUILD.bazel b/src/workerd/server/tests/cluster/BUILD.bazel new file mode 100644 index 00000000000..ae7ad386f7f --- /dev/null +++ b/src/workerd/server/tests/cluster/BUILD.bazel @@ -0,0 +1,45 @@ +load("@aspect_rules_js//js:defs.bzl", "js_test") + +js_test( + name = "cluster-test-unix", + size = "medium", + data = [ + ":cluster-test.wd-test", + ":cluster-worker.js", + "//src/workerd/server:workerd", + ], + 
entry_point = "cluster-test.mjs", + env = { + "WORKERD_BINARY": "$(rootpath //src/workerd/server:workerd)", + "WD_TEST_CONFIG": "$(rootpath :cluster-test.wd-test)", + }, + tags = ["js-test"], + target_compatible_with = select({ + "@platforms//os:linux": [], + "//conditions:default": ["@platforms//:incompatible"], + }), +) + +js_test( + name = "cluster-test-cidr", + # The CIDR variant exercises the dead-peer detection path, which has to + # wait out the new-entry grace period before unlinking the dead peer's + # registry file. That makes the takeover test ~15s on its own, so the + # combined run needs more than the default "medium" budget. + size = "large", + data = [ + ":cluster-test-cidr.wd-test", + ":cluster-worker.js", + "//src/workerd/server:workerd", + ], + entry_point = "cluster-test.mjs", + env = { + "WORKERD_BINARY": "$(rootpath //src/workerd/server:workerd)", + "WD_TEST_CONFIG": "$(rootpath :cluster-test-cidr.wd-test)", + }, + tags = ["js-test"], + target_compatible_with = select({ + "@platforms//os:linux": [], + "//conditions:default": ["@platforms//:incompatible"], + }), +) diff --git a/src/workerd/server/tests/cluster/cluster-test-cidr.wd-test b/src/workerd/server/tests/cluster/cluster-test-cidr.wd-test new file mode 100644 index 00000000000..3bebcbb6634 --- /dev/null +++ b/src/workerd/server/tests/cluster/cluster-test-cidr.wd-test @@ -0,0 +1,49 @@ +# Copyright (c) 2026 Cloudflare, Inc. +# Licensed under the Apache 2.0 license found in the LICENSE file or at: +# https://opensource.org/licenses/Apache-2.0 + +# CIDR/IP variant of cluster-test.wd-test. Uses 127.0.0.0/8 (the loopback) so +# the registry-file locking, TCP peer connections, new-entry grace period, and +# dead-peer lock probes are exercised independently of the unix-socket variant. 
+ +using Workerd = import "/workerd/workerd.capnp"; + +const config :Workerd.Config = ( + services = [ + ( name = "shared", + disk = ( + writable = true, + allowDotfiles = true, + ) + ), + + ( name = "main", + worker = ( + modules = [ + ( name = "cluster-worker.js", esModule = embed "cluster-worker.js" ), + ], + compatibilityDate = "2024-09-01", + compatibilityFlags = ["nodejs_compat"], + bindings = [ + ( name = "COUNTER", durableObjectNamespace = "Counter" ), + ( name = "NODE_ID", fromEnvironment = "NODE_ID" ), + ], + durableObjectNamespaces = [ + ( className = "Counter", uniqueKey = "counter-test-namespace" ), + ], + durableObjectStorage = ( localDisk = "shared" ), + ) + ), + ], + + sockets = [ + ( name = "http", address = "*:0", http = (), service = "main" ), + ], + + cluster = ( + sharedDirectory = "shared", + registrySubdir = "workerd-registry", + network = "127.0.0.0/8", + key = "test-cluster-shared-secret", + ), +); diff --git a/src/workerd/server/tests/cluster/cluster-test.mjs b/src/workerd/server/tests/cluster/cluster-test.mjs new file mode 100644 index 00000000000..3203dc34812 --- /dev/null +++ b/src/workerd/server/tests/cluster/cluster-test.mjs @@ -0,0 +1,465 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 +// +// Integration test for the cluster mode described in scalable-durable-objects.md. +// +// The test spawns several workerd instances pointing at the same shared +// directory, then exercises: +// 1. Consistent DO state across instances (single-writer correctness). +// 2. Transparent forwarding when a request arrives at the "wrong" instance. +// 3. Takeover after a node is killed. +// 4. Alarms are rejected with a clear error in cluster mode. +// 5. No `metadata.sqlite` is ever created (no AlarmScheduler in cluster mode). 
+//
+// Two variants are run: the unix-socket cluster network mode (default) and a
+// localhost CIDR (`127.0.0.0/8`) IP-socket mode that exercises the registry
+// file locking and TCP peer connections separately. The variant is selected
+// by the WD_TEST_CONFIG environment variable supplied by the BUILD target.
+
+import { spawn } from 'node:child_process';
+import { mkdtemp, readdir, rm } from 'node:fs/promises';
+import { tmpdir } from 'node:os';
+import { join } from 'node:path';
+import { env } from 'node:process';
+import { test } from 'node:test';
+import assert from 'node:assert';
+import { setTimeout as sleep } from 'node:timers/promises';
+
+assert(
+  env.WORKERD_BINARY !== undefined,
+  'You must set the WORKERD_BINARY environment variable.'
+);
+assert(
+  env.WD_TEST_CONFIG !== undefined,
+  'You must set the WD_TEST_CONFIG environment variable.'
+);
+
+const CONTROL_FD = 3; // stdio index of the pipe handed to workerd via --control-fd
+
+// A minimal harness specialized for the cluster test. Unlike server-harness.mjs
+// it:
+// - allows overriding bindings via process environment (NODE_ID),
+// - permits multiple instances to share a directory,
+// - supports SIGKILL in addition to SIGTERM for the takeover test.
+class ClusterNode {
+  #binary;
+  #config;
+  #sharedPath;
+  #nodeId;
+  #child = null;
+  #httpPort = null;
+  #closed = null;
+  #stderrBuffer = '';
+
+  constructor({ binary, config, sharedPath, nodeId }) {
+    this.#binary = binary;
+    this.#config = config;
+    this.#sharedPath = sharedPath;
+    this.#nodeId = nodeId;
+  }
+
+  get nodeId() {
+    return this.#nodeId;
+  }
+
+  get httpPort() {
+    assert(this.#httpPort !== null, 'node not started');
+    return this.#httpPort;
+  }
+
+  // Returns the contents of the node's stderr observed so far. Useful for
+  // verifying that internal error messages (e.g. the cluster alarm rejection)
+  // were emitted.
+  get stderr() {
+    return this.#stderrBuffer;
+  }
+  // Spawns workerd for this node and resolves once it reports its `http` listen port.
+  async start() {
+    assert.strictEqual(this.#child, null);
+
+    const args = [
+      'serve',
+      this.#config,
+      '--experimental',
+      '--verbose',
+      `--control-fd=${CONTROL_FD}`,
+      `--directory-path=shared=${this.#sharedPath}`,
+      `--socket-addr=http=127.0.0.1:0`,
+    ];
+
+    const child = spawn(this.#binary, args, {
+      stdio: ['ignore', 'inherit', 'pipe', 'pipe'],
+      env: { ...env, NODE_ID: this.#nodeId },
+    });
+    this.#child = child;
+
+    child.stderr.on('data', (data) => {
+      const chunk = data.toString('utf8');
+      this.#stderrBuffer += chunk;
+      // Mirror to our own stderr so test output remains useful.
+      process.stderr.write(`[${this.#nodeId}] ${chunk}`);
+    });
+
+    // Watch the control fd for the newline-delimited JSON `listen` event of the http socket.
+    const portPromise = new Promise((resolve, reject) => {
+      let buffer = '';
+      const onData = (data) => {
+        buffer += data.toString('utf8');
+        let nl;
+        while ((nl = buffer.indexOf('\n')) !== -1) {
+          const line = buffer.slice(0, nl).trim();
+          buffer = buffer.slice(nl + 1);
+          if (!line) continue;
+          try {
+            const parsed = JSON.parse(line);
+            if (parsed.event === 'listen' && parsed.socket === 'http') {
+              child.stdio[CONTROL_FD].off('data', onData);
+              resolve(parsed.port);
+              return;
+            }
+          } catch (err) {
+            reject(
+              new Error(
+                `Failed to parse control message from node ${this.#nodeId}: ${line}`
+              )
+            );
+            return;
+          }
+        }
+      };
+      child.stdio[CONTROL_FD].on('data', onData);
+      child.once('error', reject);
+      child.once('exit', (code, signal) => {
+        if (this.#httpPort === null) {
+          reject(
+            new Error(
+              `Node ${this.#nodeId} exited before listening (code=${code}, signal=${signal})`
+            )
+          );
+        }
+      });
+    });
+
+    this.#closed = new Promise((resolve) => {
+      child.once('exit', (code, signal) => resolve({ code, signal }));
+    });
+
+    const spawned = new Promise((resolve, reject) => {
+      child.once('spawn', resolve).once('error', reject);
+    });
+    // Await both together: a spawn failure rejects both, and neither may go unhandled.
+    this.#httpPort = (await Promise.all([spawned, portPromise]))[1];
+  }
+  // Signals the child, escalating to SIGKILL after timeoutMs; resolves to { code, signal } or null.
+  async stop({ signal = 'SIGTERM', timeoutMs = 10_000 } = {}) {
+    const child = this.#child;
+    // No pid means spawn itself failed, so 'exit' may never fire to settle #closed.
+    if (child === null || child.pid === undefined) return null;
+    this.#child = null;
+    child.kill(signal);
+    const killTimer = setTimeout(() => {
+      try {
+        child.kill('SIGKILL');
+      } catch (_) {
+        // process may already be gone.
+      }
+    }, timeoutMs);
+
+    const result = await this.#closed;
+    clearTimeout(killTimer);
+    this.#httpPort = null;
+    return result;
+  }
+}
+// Fetches http://127.0.0.1:<port><path> and returns { status, body }; throws if the body is not JSON.
+async function fetchJson(port, path, init) {
+  const url = `http://127.0.0.1:${port}${path}`;
+  const res = await fetch(url, init);
+  const text = await res.text();
+  let body;
+  try {
+    body = JSON.parse(text);
+  } catch (err) {
+    throw new Error(
+      `Non-JSON response from ${url} (status ${res.status}): ${text}`
+    );
+  }
+  return { status: res.status, body };
+}
+// Runs fn({ nodes, sharedPath }) against numNodes started instances, then always tears them down.
+async function withCluster(numNodes, fn) {
+  // Each test run gets its own shared directory, so the registry/lock-file
+  // state doesn't leak between tests.
+  const sharedPath = await mkdtemp(join(tmpdir(), 'workerd-cluster-'));
+  const nodes = [];
+  try {
+    for (let i = 0; i < numNodes; i++) {
+      const node = new ClusterNode({
+        binary: env.WORKERD_BINARY,
+        config: env.WD_TEST_CONFIG,
+        sharedPath,
+        nodeId: `node${i}`,
+      });
+      nodes.push(node); // register first: a node whose start() fails must still be stopped
+      await node.start();
+    }
+    await fn({ nodes, sharedPath });
+  } finally {
+    // Stop all nodes (ignore errors -- some may already be stopped by the
+    // test itself, and the last one may never have finished starting).
+    for (const node of nodes) {
+      try {
+        await node.stop({ timeoutMs: 5000 });
+      } catch (_) {
+        // ignore
+      }
+    }
+    await rm(sharedPath, { recursive: true, force: true });
+  }
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+
+test('cluster: DO state is consistent across instances (single writer)', async () => {
+  await withCluster(3, async ({ nodes }) => {
+    // Send a series of /increment requests to a round-robin of nodes, all for
+    // the same DO name. Record each returned count -- together they must form
+    // the strictly-increasing sequence (1, 2, 3, ...) because only one instance
+    // is the writer at any given time and the DO state is persistent.
+    const totalRequests = 12;
+    const expected = [];
+    for (let i = 1; i <= totalRequests; i++) expected.push(i);
+
+    const observed = [];
+    const ownerIds = new Set();
+    for (let i = 0; i < totalRequests; i++) {
+      const node = nodes[i % nodes.length];
+      const { status, body } = await fetchJson(
+        node.httpPort,
+        '/increment?name=consistency'
+      );
+      assert.strictEqual(
+        status,
+        200,
+        `request to ${node.nodeId} failed: ${JSON.stringify(body)}`
+      );
+      observed.push(body.count);
+      ownerIds.add(body.nodeId);
+    }
+    // Requests were awaited one at a time, so compare in arrival order (no sorting).
+    assert.deepStrictEqual(
+      observed,
+      expected,
+      `each /increment must be a unique strictly-increasing count`
+    );
+
+    // The DO is owned by exactly one node at a time. Even though we directed
+    // requests at all 3 nodes, every observed response should report the same
+    // nodeId -- the actual owner -- because incoming requests are forwarded.
+    assert.strictEqual(
+      ownerIds.size,
+      1,
+      `expected exactly one DO owner across ${totalRequests} requests, got: ${[...ownerIds].join(', ')}`
+    );
+  });
+});
+
+test('cluster: requests to non-owner are transparently forwarded', async () => {
+  await withCluster(2, async ({ nodes }) => {
+    // Prime the DO by sending an increment to node 0. That node should claim
+    // ownership and respond. Then send a /get to node 1. Node 1 should forward
+    // to node 0 (the owner) and return the same state, and node 0 should be
+    // identified as the responder.
+    const first = await fetchJson(
+      nodes[0].httpPort,
+      '/increment?name=forwarding'
+    );
+    assert.strictEqual(first.status, 200);
+    assert.strictEqual(first.body.count, 1);
+    const owner = first.body.nodeId;
+
+    const second = await fetchJson(nodes[1].httpPort, '/get?name=forwarding');
+    assert.strictEqual(second.status, 200);
+    assert.strictEqual(
+      second.body.count,
+      1,
+      `forwarded /get must see prior increment`
+    );
+    assert.strictEqual(
+      second.body.nodeId,
+      owner,
+      `forwarded request must be served by the original owner (${owner}), not the forwarding node (${nodes[1].nodeId})`
+    );
+
+    // Same id must be reported for both calls (sanity check for idFromName
+    // determinism across instances).
+    assert.strictEqual(first.body.id, second.body.id);
+  });
+});
+
+test('cluster: killing the owner allows another node to take over', async () => {
+  await withCluster(2, async ({ nodes }) => {
+    // Prime the DO on whichever node ends up owning it.
+    const initial = await fetchJson(
+      nodes[0].httpPort,
+      '/increment?name=takeover'
+    );
+    assert.strictEqual(initial.status, 200);
+    assert.strictEqual(initial.body.count, 1);
+    const originalOwner = initial.body.nodeId;
+    const ownerIndex = nodes.findIndex((n) => n.nodeId === originalOwner);
+    assert.notStrictEqual(ownerIndex, -1);
+    const survivorIndex = ownerIndex === 0 ? 1 : 0;
+    const survivor = nodes[survivorIndex];
+
+    // Hard-kill the owner. SIGKILL leaves the registry file in place, so the
+    // survivor has to detect death via a failed RPC + dead-peer cleanup probe.
+    await nodes[ownerIndex].stop({ signal: 'SIGKILL', timeoutMs: 2000 });
+
+    // Hit the survivor repeatedly. The first request may fail or take time
+    // while the survivor's RPC attempt to the dead owner is in flight; once
+    // the dead-peer cleanup completes, subsequent requests succeed and the
+    // survivor takes ownership. The persisted count must be preserved.
+    let response = null;
+    let lastErr = null;
+    const deadline = Date.now() + 30_000;
+    while (Date.now() < deadline) {
+      try {
+        response = await fetchJson(
+          survivor.httpPort,
+          '/increment?name=takeover'
+        );
+        if (response.status === 200) break;
+      } catch (err) {
+        lastErr = err;
+      }
+      await sleep(200);
+    }
+    assert(
+      response !== null && response.status === 200,
+      `survivor never succeeded; last error: ${lastErr}`
+    );
+    assert.strictEqual(
+      response.body.nodeId,
+      survivor.nodeId,
+      `survivor (${survivor.nodeId}) must own the DO after takeover, got ${response.body.nodeId}`
+    );
+    assert.strictEqual(
+      response.body.count,
+      2,
+      `persisted state must be preserved across takeover`
+    );
+
+    // Further requests to the survivor continue to work and increment.
+    const followup = await fetchJson(
+      survivor.httpPort,
+      '/increment?name=takeover'
+    );
+    assert.strictEqual(followup.status, 200);
+    assert.strictEqual(followup.body.count, 3);
+    assert.strictEqual(followup.body.nodeId, survivor.nodeId);
+  });
+});
+
+test('cluster: alarms are rejected with a clear error', async () => {
+  await withCluster(1, async ({ nodes, sharedPath }) => {
+    // Fire the request. In cluster mode `setAlarm()` does not throw
+    // synchronously to the JS code -- the rejection happens during output-gate
+    // flush, after the worker's fetch handler has already returned. The
+    // observable behaviour is therefore a non-2xx HTTP status, while the
+    // canonical error message is logged to stderr where the test can detect
+    // it. (The spec mandates the message "Durable Object alarms are not yet
+    // supported in cluster mode".)
+    let response;
+    try {
+      response = await fetch(
+        `http://127.0.0.1:${nodes[0].httpPort}/set-alarm?name=alarm-test`
+      );
+    } catch (err) {
+      // A network-level failure is acceptable too: it confirms that the alarm
+      // path did not silently succeed.
+      response = null;
+    }
+    if (response !== null) {
+      // Drain the body to allow logging to flush.
+      try {
+        await response.text();
+      } catch (_) {
+        // ignore
+      }
+      assert.notStrictEqual(
+        Math.floor(response.status / 100),
+        2,
+        `expected non-2xx for set-alarm in cluster mode, got ${response.status}`
+      );
+    }
+
+    // Wait briefly for stderr to flush the error message.
+    const deadline = Date.now() + 5000;
+    const expectedMessage =
+      'Durable Object alarms are not yet supported in cluster mode';
+    while (
+      Date.now() < deadline &&
+      !nodes[0].stderr.includes(expectedMessage)
+    ) {
+      await sleep(50);
+    }
+    assert(
+      nodes[0].stderr.includes(expectedMessage),
+      `expected stderr to contain the cluster alarm rejection message ` +
+        `("${expectedMessage}"); stderr so far:\n${nodes[0].stderr}`
+    );
+
+    // No per-namespace metadata.sqlite should exist anywhere under the
+    // shared directory -- in cluster mode the AlarmScheduler is never
+    // constructed.
+    const nsDir = join(sharedPath, 'counter-test-namespace');
+    let entries = [];
+    try {
+      entries = await readdir(nsDir);
+    } catch (err) {
+      // Namespace dir might not exist if the DO was never created on disk
+      // because the request failed before storage was opened. That is a
+      // valid (and even stronger) signal that no metadata.sqlite was made.
+    }
+    assert(
+      !entries.some((name) => name.startsWith('metadata.sqlite')),
+      `cluster mode must not create metadata.sqlite; found: ${entries.join(', ')}`
+    );
+  });
+});
+
+test('cluster: registry directory is populated for each running instance', async () => {
+  await withCluster(2, async ({ nodes, sharedPath }) => {
+    // Give the instances a brief moment to settle. Both should have written
+    // their registry entries before they reported their listening sockets,
+    // but allow a small grace period for filesystem visibility.
+    const registryDir = join(sharedPath, 'workerd-registry');
+    let entries = [];
+    const deadline = Date.now() + 5000;
+    while (Date.now() < deadline) {
+      try {
+        entries = await readdir(registryDir);
+      } catch (_) {
+        entries = [];
+      }
+      if (entries.length >= nodes.length) break;
+      await sleep(100);
+    }
+    assert.strictEqual(
+      entries.length,
+      nodes.length,
+      `expected ${nodes.length} registry entries, got ${entries.length}: [${entries.join(', ')}]`
+    );
+    // Each entry should be a 64-char hex public key.
+    for (const name of entries) {
+      assert.match(
+        name,
+        /^[0-9a-f]{64}$/,
+        `registry entry name should be 64-char hex, got: ${name}`
+      );
+    }
+  });
+});
diff --git a/src/workerd/server/tests/cluster/cluster-test.wd-test b/src/workerd/server/tests/cluster/cluster-test.wd-test
new file mode 100644
index 00000000000..6c6d53b2d7e
--- /dev/null
+++ b/src/workerd/server/tests/cluster/cluster-test.wd-test
@@ -0,0 +1,60 @@
+# Copyright (c) 2026 Cloudflare, Inc.
+# Licensed under the Apache 2.0 license found in the LICENSE file or at:
+# https://opensource.org/licenses/Apache-2.0
+
+# Config for the cluster integration test. Designed to be reused across multiple
+# workerd instances pointing at the same shared directory:
+# - The `shared` disk service has no path; the test driver overrides it via
+#   `--directory-path shared=<path>` so all instances see the same directory.
+# - The `http` socket has address `*:0`; the test driver overrides this and
+#   reads the actual assigned port from the control fd.
+# - The cluster `network` field defaults to "unix"; the test driver overrides
+#   it to "127.0.0.0/8" when testing the CIDR/IP variant by passing a different
+#   config (see cluster-test-cidr.wd-test).
+# - The NODE_ID env binding lets each instance be identified per request, so
+#   the test can verify which workerd owns/serves a given DO.
+
+using Workerd = import "/workerd/workerd.capnp";
+
+const config :Workerd.Config = (
+  services = [
+    # The shared filesystem used by the whole cluster. Path is supplied via
+    # `--directory-path shared=<path>` on the command line so all instances
+    # share the same directory.
+    ( name = "shared",
+      disk = (
+        writable = true,
+        allowDotfiles = true,
+      )
+    ),
+
+    ( name = "main",
+      worker = (
+        modules = [
+          ( name = "cluster-worker.js", esModule = embed "cluster-worker.js" ),
+        ],
+        compatibilityDate = "2024-09-01",
+        compatibilityFlags = ["nodejs_compat"],
+        bindings = [
+          ( name = "COUNTER", durableObjectNamespace = "Counter" ),
+          ( name = "NODE_ID", fromEnvironment = "NODE_ID" ),
+        ],
+        durableObjectNamespaces = [
+          ( className = "Counter", uniqueKey = "counter-test-namespace" ),  # the test driver reads <shared>/counter-test-namespace
+        ],
+        durableObjectStorage = ( localDisk = "shared" ),
+      )
+    ),
+  ],
+
+  sockets = [
+    ( name = "http", address = "*:0", http = (), service = "main" ),
+  ],
+
+  cluster = (
+    sharedDirectory = "shared",
+    registrySubdir = "workerd-registry",  # the test driver lists <shared>/workerd-registry
+    network = "unix",
+    key = "test-cluster-shared-secret",
+  ),
+);
diff --git a/src/workerd/server/tests/cluster/cluster-worker.js b/src/workerd/server/tests/cluster/cluster-worker.js
new file mode 100644
index 00000000000..dbd107fee3f
--- /dev/null
+++ b/src/workerd/server/tests/cluster/cluster-worker.js
@@ -0,0 +1,86 @@
+// Copyright (c) 2026 Cloudflare, Inc.
+// Licensed under the Apache 2.0 license found in the LICENSE file or at:
+// https://opensource.org/licenses/Apache-2.0
+
+// Test worker for the cluster integration test. Exposes a Counter Durable Object
+// and a top-level fetch() that routes requests to the DO by name.
+//
+// The DO supports the following request paths:
+//   /increment   Increments and returns the current counter value.
+//   /get         Returns the current counter value without incrementing it.
+//   /set-alarm   Tries to schedule a DO alarm. In cluster mode this should
+//                fail with a "not yet supported" error.
+//   /identity    Returns information identifying this instance (NODE_ID env)
+//                along with the DO id, so the test can verify which instance
+//                served the request.
+
+export default {
+  async fetch(request, env) {
+    const url = new URL(request.url);
+
+    // The DO name to act on is given via the `name` query parameter (defaults
+    // to "default"). All instances using the same name will route to the same
+    // DO id.
+    const name = url.searchParams.get('name') ?? 'default';
+
+    // Derive the DO id from the name and obtain a stub for it.
+    const id = env.COUNTER.idFromName(name);
+    const stub = env.COUNTER.get(id);
+
+    // Forward only the pathname, method and headers (no query string or body).
+    const forwardUrl = new URL(url.pathname, 'http://do/');
+    return await stub.fetch(forwardUrl.toString(), {
+      method: request.method,
+      headers: request.headers,
+    });
+  },
+};
+
+export class Counter {
+  constructor(state, env) {
+    this.state = state;
+    this.env = env;
+  }
+
+  async fetch(request) {
+    const url = new URL(request.url);
+    const nodeId = this.env.NODE_ID ?? '';
+    const idHex = this.state.id.toString();
+
+    if (url.pathname === '/increment') {
+      let count = (await this.state.storage.get('count')) ?? 0;
+      count += 1;
+      await this.state.storage.put('count', count);
+      return Response.json({
+        count,
+        nodeId,
+        id: idHex,
+      });
+    } else if (url.pathname === '/get') {
+      const count = (await this.state.storage.get('count')) ?? 0;
+      return Response.json({ count, nodeId, id: idHex });
+    } else if (url.pathname === '/set-alarm') {
+      // Try to schedule an alarm 60s in the future. Cluster mode is expected to
+      // reject it; a failure caught here is reported as HTTP 500 with the error.
+      try {
+        await this.state.storage.setAlarm(Date.now() + 60_000);
+        return Response.json({ ok: true, nodeId, id: idHex });
+      } catch (err) {
+        return Response.json(
+          { ok: false, error: String(err), nodeId, id: idHex },
+          { status: 500 }
+        );
+      }
+    } else if (url.pathname === '/identity') {
+      return Response.json({ nodeId, id: idHex });
+    }
+
+    return new Response('Not found', { status: 404 });
+  }
+
+  async alarm() {
+    // Should never be called in cluster mode; if it ever is, record a marker in
+    // storage. NOTE(review): no route in this worker reads 'alarm-fired' back.
+    await this.state.storage.put('alarm-fired', true);
+  }
+}