diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 4a29ca2e9fe..b0c054c7b0a 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -888,12 +888,27 @@ interface WorkerdDebugPort { # purposes. # # This interface is subject to change. It is intended for use by miniflare. + # + # This is also the same interface that workerd exports on its cluster port, which other workerd + # instances in the same cluster may connect to. See ClusterConfig in workerd.capnp for how to + # configure clustering. The cluster port is only intended to be used by other instances in the + # cluster, and does not implement a regular two-party RPC protocol. `ClusterRegistry` implements + # the `VatNetwork` intended to be used for this. Nobody else should try to connect to it. + # + # TODO(clustering): Once channel tokens have been extended to support actors, consider changing + # the cluster interface with one that takes a channel token. This seems a bit cleaner and safer + # than letting the client specify props and such directly. + # TODO(clustering): Automatically use encryption on non-localhost networks. We have an X25519 + # keypair for each node, no need for certs! getEntrypoint @0 (service :Text, entrypoint :Text, props :Frankenvalue) -> (entrypoint :WorkerdBootstrap); # Get direct access to a stateless entrypoint. - getActor @1 (service :Text, entrypoint :Text, actorId :Text) -> (actor :WorkerdBootstrap); + getActor @1 (service :Text, entrypoint :Text, actorId :Text, actorName :Text) + -> (actor :WorkerdBootstrap); # Get an actor (Durable Object) stub. # The actorId should be a hex string for Durable Objects or a plain string for ephemeral actors. + # `actorName` may optionally be specified for Durable Objects, if `actorId` is derived from the + # name. } diff --git a/src/workerd/server/BUILD.bazel b/src/workerd/server/BUILD.bazel index ece98eee3c0..f11b2ad1a39 100644 --- a/src/workerd/server/BUILD.bazel +++ b/src/workerd/server/BUILD.bazel @@ -212,6 +212,7 @@ wd_cc_library( ":channel-token_capnp", "//src/workerd/io", "//src/workerd/util:entropy", + "@ssl", ], ) @@ -228,6 +229,9 @@ wd_cc_library( ":alarm-scheduler", ":channel-token", ":channel-token_capnp", + ":cluster-lock", + ":cluster-registry", + ":cluster_capnp", ":container-client", ":facet-tree-index", ":fallback-service", @@ -244,11 +248,45 @@ wd_cc_library( "//src/workerd/jsg", "//src/workerd/util:perfetto", "//src/workerd/util:websocket-error-handler", + "@capnp-cpp//src/capnp:capnp-rpc", "@capnp-cpp//src/kj/compat:kj-gzip", "@capnp-cpp//src/kj/compat:kj-tls", ], ) +wd_capnp_library(src = "cluster.capnp") + +wd_cc_library( + name = "cluster-registry", + srcs = ["cluster-registry.c++"], + hdrs = ["cluster-registry.h"], + visibility = ["//visibility:public"], + deps = [ + ":cluster_capnp", + "//src/workerd/util:ofd-lock", + "@capnp-cpp//src/capnp:capnp-rpc", + "@capnp-cpp//src/kj", + "@capnp-cpp//src/kj:kj-async", + "@ssl", + ], +) + +wd_cc_library( + name = "cluster-lock", + srcs = ["cluster-lock.c++"], + hdrs = ["cluster-lock.h"], + visibility = ["//visibility:public"], + deps = [ + ":cluster-registry", + ":cluster_capnp", + "//src/workerd/io:worker-interface_capnp", + "//src/workerd/util:ofd-lock", + "@capnp-cpp//src/capnp:capnp-rpc", + "@capnp-cpp//src/kj", + "@capnp-cpp//src/kj:kj-async", + ], +) + wd_capnp_library(src = "docker-api.capnp") wd_capnp_library(src = "log-schema.capnp") @@ -398,6 +436,28 @@ kj_test( ], ) +kj_test( + src = "cluster-registry-test.c++", + deps = [ + ":cluster-registry", + "@capnp-cpp//src/capnp:capnp-rpc", + "@capnp-cpp//src/kj", + "@capnp-cpp//src/kj:kj-async", + ], +) + +kj_test( + src = "cluster-lock-test.c++", + deps = [ + ":cluster-lock", + ":cluster-registry", + "//src/workerd/io:worker-interface_capnp", + "@capnp-cpp//src/capnp:capnp-rpc", + "@capnp-cpp//src/kj", + "@capnp-cpp//src/kj:kj-async", + ], +) + kj_test( src = "json-logger-test.c++", deps = [ diff --git a/src/workerd/server/channel-token.c++ b/src/workerd/server/channel-token.c++ index da16b81bd5b..ee9fc9f0c92 100644 --- a/src/workerd/server/channel-token.c++ +++ b/src/workerd/server/channel-token.c++ @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -21,9 +22,31 @@ namespace workerd::server { -ChannelTokenHandler::ChannelTokenHandler(Resolver& resolver): resolver(resolver) { - getEntropy(tokenKey); +ChannelTokenHandler::ChannelTokenHandler(Resolver& resolver, kj::Maybe clusterKey) + : resolver(resolver) { + KJ_IF_SOME(key, clusterKey) { + // All nodes in the cluster must derive the same key, based on `clusterKey`. We HMAC the + // `clusterKey` with a randomly-chosen salt to derive the channel token key. + + // clang-format off + static constexpr byte SALT[] = { + 0x9a, 0x24, 0x71, 0x39, 0x1b, 0x85, 0xce, 0x97, + 0x6c, 0xf9, 0x1c, 0xdf, 0x93, 0xc0, 0xc6, 0x36, + 0xb9, 0xd2, 0x09, 0xfe, 0x01, 0xde, 0xb1, 0x9a, + 0xda, 0xd3, 0x8e, 0x76, 0xbb, 0xae, 0xeb, 0x89, + }; + // clang-format on + + uint outLen = 0; + KJ_ASSERT(HMAC(EVP_sha256(), SALT, sizeof(SALT), key.asBytes().begin(), key.size(), tokenKey, + &outLen) != nullptr); + KJ_ASSERT(outLen == sizeof(tokenKey)); + } else { + // Single-instance, generate a new key for every run. + getEntropy(tokenKey); + } + // Construct key ID by hashing the key. SHA256_CTX ctx{}; KJ_ASSERT(SHA256_Init(&ctx)); KJ_ASSERT(SHA256_Update(&ctx, tokenKey, sizeof(tokenKey))); diff --git a/src/workerd/server/channel-token.h b/src/workerd/server/channel-token.h index ef075e3c87a..ceaaf9141ff 100644 --- a/src/workerd/server/channel-token.h +++ b/src/workerd/server/channel-token.h @@ -34,7 +34,7 @@ class ChannelTokenHandler { kj::StringPtr serviceName, kj::Maybe entrypoint, Frankenvalue props) = 0; }; - explicit ChannelTokenHandler(Resolver& resolver); + explicit ChannelTokenHandler(Resolver& resolver, kj::Maybe clusterKey = kj::none); // Helpers to implement `IoChannelFactory::{SubrequestChannel,ActorClassChannel}::getToken()`. kj::Array encodeSubrequestChannelToken(IoChannelFactory::ChannelTokenUsage usage, diff --git a/src/workerd/server/cluster-lock-test.c++ b/src/workerd/server/cluster-lock-test.c++ new file mode 100644 index 00000000000..980a76de145 --- /dev/null +++ b/src/workerd/server/cluster-lock-test.c++ @@ -0,0 +1,323 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "cluster-lock.h" +#include "cluster-registry.h" + +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace workerd { +namespace { + +#if __linux__ + +// Helper to create a temp directory on disk with a `registry/` and `locks/` subdir, mirroring +// the production layout (the cluster registry directory and the per-namespace locks directory +// are always separate). +class TempDir { + public: + TempDir() { + char tmpl[] = "/tmp/cluster-lock-test-XXXXXX"; + KJ_ASSERT(mkdtemp(tmpl) != nullptr); + pathStr = kj::str(tmpl); + disk = kj::newDiskFilesystem(); + root = disk->getRoot().openSubdir(kj::Path::parse(pathStr.slice(1)), // remove leading / + kj::WriteMode::MODIFY); + locksDir = root->openSubdir(kj::Path({"locks"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + } + + ~TempDir() noexcept(false) { + auto p = kj::Path::parse(pathStr.slice(1)); + disk->getRoot().remove(p); + } + + kj::Own registry() { + return root->openSubdir(kj::Path({"registry"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + } + + kj::Own locks() { + return locksDir->clone(); + } + + // Read the content of a lock file directly (for asserting that the file is in the expected state). + kj::Array readLockFile(kj::StringPtr name) { + auto file = KJ_ASSERT_NONNULL(locksDir->tryOpenFile(kj::Path({name}))); + return file->readAllBytes(); + } + + // Write raw bytes to a lock file (for setting up test scenarios). + void writeLockFile(kj::StringPtr name, kj::ArrayPtr data) { + auto file = locksDir->openFile(kj::Path({name}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + file->writeAll(data); + } + + private: + kj::String pathStr; + kj::Own disk; + kj::Own root; + kj::Own locksDir; +}; + +// Stub WorkerdDebugPort server used as a per-node bootstrap. We don't actually invoke methods on +// it from these tests; we just need a capability to hand to RpcSystem. +class StubDebugPort final: public rpc::WorkerdDebugPort::Server {}; + +KJ_TEST("ClusterLockManager: unowned (empty file) -> acquire succeeds") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + auto result = lockManager.acquireOrRoute("a1").wait(io.waitScope); + KJ_ASSERT(result.is()); + + // The lock file should now contain our key. + auto content = tmpDir.readLockFile("a1"); + KJ_ASSERT(content.size() == 32); + KJ_EXPECT(memcmp(content.begin(), reg.getPublicKey().bytes, 32) == 0); +} + +KJ_TEST("ClusterLockManager: owned by self -> returns self-bootstrap, no wire traffic") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + // First acquire to populate the lock file with our own key. + auto ownedResult = lockManager.acquireOrRoute("a2").wait(io.waitScope); + KJ_ASSERT(ownedResult.is()); + + // Now simulate someone else also calling acquireOrRoute on the same actor (i.e. a forwarding + // path). With the OwnedLock still held, the lock file is non-empty and our key is the owner. + // acquireOrRoute should return a bootstrap client (routing to ourselves). + auto routeResult = lockManager.acquireOrRoute("a2").wait(io.waitScope); + KJ_ASSERT(routeResult.is()); +} + +KJ_TEST("ClusterLockManager: stale owner (peer dead) -> claim path runs") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + // Manually create a lock file naming a non-existent peer as owner. Since `deadKey`'s registry + // file doesn't exist in tmpDir, isPeerDead(deadKey) returns true and the claim path should run. + X25519PublicKey deadKey; + memset(deadKey.bytes, 0xAB, sizeof(deadKey.bytes)); + + tmpDir.writeLockFile("a3", kj::arrayPtr(deadKey.bytes, 32)); + + // Confirm isPeerDead returns true for the dead key. + KJ_EXPECT(reg.isPeerDead(deadKey)); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + // acquireOrRoute should run the claim path and succeed. + auto result = lockManager.acquireOrRoute("a3").wait(io.waitScope); + KJ_ASSERT(result.is()); + + // The lock file should now contain *our* key. + auto content = tmpDir.readLockFile("a3"); + KJ_ASSERT(content.size() == 32); + KJ_EXPECT(memcmp(content.begin(), reg.getPublicKey().bytes, 32) == 0); +} + +KJ_TEST("ClusterLockManager: wrong-sized stale content + writer dead -> claim immediately") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + // Write a lock file of unexpected size (e.g. a crashed writer left a half-written file). + // Nobody holds the lock, so the claim path should succeed. + kj::byte garbage[10]; + memset(garbage, 0xFF, sizeof(garbage)); + tmpDir.writeLockFile("a4", kj::arrayPtr(garbage, sizeof(garbage))); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + auto result = lockManager.acquireOrRoute("a4").wait(io.waitScope); + KJ_ASSERT(result.is()); + + // The lock file should now contain our key. + auto content = tmpDir.readLockFile("a4"); + KJ_ASSERT(content.size() == 32); + KJ_EXPECT(memcmp(content.begin(), reg.getPublicKey().bytes, 32) == 0); +} + +KJ_TEST("ClusterLockManager: OwnedLock destructor truncates and releases") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + { + auto result = lockManager.acquireOrRoute("a5").wait(io.waitScope); + KJ_ASSERT(result.is()); + // Lock file should have our key. + auto content = tmpDir.readLockFile("a5"); + KJ_ASSERT(content.size() == 32); + } + + // After OwnedLock destructor, the file should be truncated to 0 bytes (and the OFD lock + // released). + auto content = tmpDir.readLockFile("a5"); + KJ_EXPECT(content.size() == 0); + + // A second acquire should now succeed via the claim path. + auto result2 = lockManager.acquireOrRoute("a5").wait(io.waitScope); + KJ_ASSERT(result2.is()); +} + +KJ_TEST("ClusterLockManager: owned by live peer -> returns bootstrap client") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + // Stand up two registries in the same dir (this creates registry entries for both, so neither + // is "dead"). + ClusterRegistry reg1( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc1 = capnp::makeRpcServer(reg1, capnp::Capability::Client(kj::heap())); + + // reg1 owns a ClusterLockManager; we'll set up a lock file naming reg2 as the owner. + // reg2 is alive (its registry file exists in tmpDir), so isPeerDead(reg2) returns false, + // and acquireOrRoute should return a bootstrap client without falling through to claim. + // + // Note: actor IDs aren't hex of 64 chars, so they won't collide with registry filenames. + tmpDir.writeLockFile("a7", kj::arrayPtr(reg2.getPublicKey().bytes, 32)); + + KJ_EXPECT(!reg1.isPeerDead(reg2.getPublicKey())); + + ClusterLockManager lockManager(tmpDir.locks(), reg1, rpc1, io.provider->getTimer()); + + auto result = lockManager.acquireOrRoute("a7").wait(io.waitScope); + KJ_ASSERT(result.is()); + + // The lock file should NOT have been modified (we did not claim). + auto content = tmpDir.readLockFile("a7"); + KJ_ASSERT(content.size() == 32); + KJ_EXPECT(memcmp(content.begin(), reg2.getPublicKey().bytes, 32) == 0); +} + +KJ_TEST("ClusterLockManager: writer alive -> retries until writer releases") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + // Open the lock file from outside the ClusterLockManager and take an exclusive lock. This + // simulates a writer that holds the lock but hasn't yet flushed its key. The file is left + // empty (matching the freshly-truncated state during the writer's claim path). + auto externalFile = + tmpDir.locks()->openFile(kj::Path({"a8"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + int externalFd = KJ_REQUIRE_NONNULL(externalFile->getFd()); + + auto externalLock = KJ_ASSERT_NONNULL(OfdLock::tryLock(externalFd, OfdLock::EXCLUSIVE)); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + // acquireOrRoute should see an empty file, attempt to lock, fail, then retry with backoff. + auto promise = lockManager.acquireOrRoute("a8"); + + bool completed = false; + auto trackedPromise = promise.then( + [&](kj::OneOf&& r) { + completed = true; + return kj::mv(r); + }); + + // Pump the event loop briefly; the implementation should be stuck in its backoff loop. + io.provider->getTimer().afterDelay(50 * kj::MILLISECONDS).wait(io.waitScope); + KJ_EXPECT(!completed, "acquireOrRoute should still be waiting for the external lock"); + + // Now the "external writer" writes its key (our own, for simplicity) and releases the lock. + // acquireOrRoute, on its next iteration, should see the valid key, recognize the owner as us, + // and return a bootstrap client. + externalFile->write(0, kj::arrayPtr(reg.getPublicKey().bytes, 32)); + { auto _ = kj::mv(externalLock); } + + auto result = trackedPromise.wait(io.waitScope); + KJ_EXPECT(completed); + KJ_ASSERT(result.is()); +} + +KJ_TEST("ClusterLockManager: concurrent acquisitions, exactly one wins") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg( + tmpDir.registry(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto rpc = capnp::makeRpcServer(reg, capnp::Capability::Client(kj::heap())); + + ClusterLockManager lockManager(tmpDir.locks(), reg, rpc, io.provider->getTimer()); + + // Launch several concurrent calls. + constexpr uint N = 5; + kj::Vector>> + promises; + for (uint i = 0; i < N; ++i) { + promises.add(lockManager.acquireOrRoute("a9")); + } + + auto results = kj::joinPromises(promises.releaseAsArray()).wait(io.waitScope); + + uint ownedCount = 0; + uint routedCount = 0; + for (auto& r: results) { + if (r.is()) { + ++ownedCount; + } else { + ++routedCount; + } + } + // Exactly one should have won. + KJ_EXPECT(ownedCount == 1); + KJ_EXPECT(routedCount == N - 1); +} + +#else // #if __linux__ + +KJ_TEST("dummy test: platform not supported") {} + +#endif // #if __linux__, #else + +} // namespace +} // namespace workerd diff --git a/src/workerd/server/cluster-lock.c++ b/src/workerd/server/cluster-lock.c++ new file mode 100644 index 00000000000..c3480e8a95d --- /dev/null +++ b/src/workerd/server/cluster-lock.c++ @@ -0,0 +1,162 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "cluster-lock.h" + +#include +#include +#include + +namespace workerd { + +namespace { + +// The lock file contains exactly one X25519 public key (the owner). No checksum is needed: a +// single write of `sizeof(X25519PublicKey::bytes)` bytes is atomic with respect to concurrent +// readers on Linux (and likewise on NFS, where it's a single RPC to the server). The writer's +// protocol is truncate(0) → write(full key) → sync, so a reader interleaving sees either size +// 0 (treat as unowned, run claim path) or the full key. + +constexpr kj::Duration INITIAL_BACKOFF = 10 * kj::MILLISECONDS; +constexpr kj::Duration MAX_BACKOFF = 1 * kj::SECONDS; + +// Build a `cluster::VatId` Reader for a given public key, backed by an internal `MessageBuilder`. +class VatIdHolder { + public: + explicit VatIdHolder(const X25519PublicKey& key): msg(scratch) { + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::ArrayPtr(key.bytes)); + } + + cluster::VatId::Reader getReader() { + return msg.getRoot().asReader(); + } + + private: + capnp::word scratch[8]{}; + capnp::MallocMessageBuilder msg; +}; + +// Compute the next backoff delay with ±25% jitter. `attempt` starts at 0. +kj::Duration computeBackoff(uint attempt) { + // Exponential: 10ms, 20ms, 40ms, ..., capped at 1s. + kj::Duration base = INITIAL_BACKOFF; + for (uint i = 0; i < attempt && base < MAX_BACKOFF; ++i) { + base = base * 2; + } + if (base > MAX_BACKOFF) base = MAX_BACKOFF; + + // Jitter: multiply by a random value in [0.75, 1.25]. Use a cheap pseudo-random source. + auto nanos = + (kj::systemPreciseMonotonicClock().now() - kj::origin()) / kj::NANOSECONDS; + uint32_t r = kj::hashCode(nanos); + double jitter = 0.75 + (r / static_cast(0xFFFFFFFFu)) * 0.5; + return base * static_cast(jitter * 1000) / 1000; +} + +} // namespace + +// ======================================================================================= +// ClusterLockManager::OwnedLock + +ClusterLockManager::OwnedLock::~OwnedLock() noexcept(false) { + if (file.get() != nullptr) { + // Truncate the file back to zero so the next claimant sees an empty file. We then drop the + // OFD lock (via OfdLock's destructor) which makes the file available for other nodes to claim. + // + // Best-effort: don't throw from the destructor. + KJ_TRY { + file->truncate(0); + } + KJ_CATCH(e) { + KJ_LOG(WARNING, "truncate(0) on cluster lock file failed during release", e); + } + } +} + +// ======================================================================================= +// ClusterLockManager + +ClusterLockManager::ClusterLockManager(kj::Own dir, + ClusterRegistry& registry, + capnp::RpcSystem& clusterRpc, + kj::Timer& timer) + : dir(kj::mv(dir)), + registry(registry), + clusterRpc(clusterRpc), + timer(timer) {} + +kj::Promise> +ClusterLockManager::acquireOrRoute(kj::StringPtr actorId) { + // Validate that the actor ID is a hex string. This is the only form of actor ID we expect on + // this path (durable actors named by a 64-char hex SHA-256 hash, or by `idFromName` which also + // produces hex). Enforcing hex incidentally guarantees the ID is safe to use as a filename. + for (char c: actorId) { + KJ_REQUIRE((c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F'), + "actor ID is not a hex string", actorId); + } + KJ_REQUIRE(actorId.size() > 0, "actor ID is empty"); + + kj::Path path({actorId}); + + for (uint attempt = 0;; ++attempt) { + if (attempt > 0) { + co_await timer.afterDelay(computeBackoff(attempt - 1)); + } + + // Open or create the lock file. + auto file = dir->openFile(path, kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + + // Read the owner key directly. read() returns: + // 0 → file is empty (freshly created, or previous owner cleanly + // released) + // sizeof(ownerKey.bytes) → full key present + // anything else → unexpected partial state (e.g. writer crashed mid-protocol) + // + // Since the key has a fixed width, we can assume that if we read the entire key, it is the + // complete and valid key. If the key is not done being written, we could get a short read + // (unlikely, since you'd expect a 32-byte read to always be atomic, but it's not guaranteed). + X25519PublicKey ownerKey; + auto n = file->read(0, ownerKey.bytes); + + if (n == sizeof(ownerKey.bytes)) { + if (!registry.isPeerDead(ownerKey)) { + // Trust the lock file. Build a VatId and ask the RpcSystem to bootstrap. + // The owner might be us — clusterRpc.bootstrap() returns the local bootstrap directly + // without going through ClusterRegistry::connect() in that case. + // + // If the owner is actually unreachable, the bootstrap call will fail later. Internally, + // ClusterRegistry::ConnectionImpl::unregister() runs the dead-peer cleanup probe; + // if the peer's registry file is unlocked, the file is unlinked. Whoever retries + // acquireOrRoute() will then see isPeerDead(ownerKey) return true and fall through to + // the claim path. + VatIdHolder holder(ownerKey); + co_return clusterRpc.bootstrap(holder.getReader()).castAs(); + } + + // Owner is confirmed dead. Fall through to the claim path. + } + // else: n == 0 (empty file) or n is some unexpected partial size. In either case, the claim + // path is the right next step: if a live writer is currently producing the file, they + // hold the exclusive lock and our tryLock will fail; if no one holds the lock, we'll + // succeed and overwrite whatever was there. + + // Claim path. Try to acquire an exclusive OFD lock. + int fd = KJ_REQUIRE_NONNULL(file->getFd(), "lock directory must be disk-backed"); + KJ_IF_SOME(lock, OfdLock::tryLock(fd, OfdLock::EXCLUSIVE)) { + // We got the exclusive lock. Clear any stale content and write our identity. + file->truncate(0); + + const auto& myKey = registry.getPublicKey(); + file->write(0, myKey.bytes); + file->sync(); + + co_return OwnedLock(kj::mv(file), kj::mv(lock)); + } + + // tryLock failed — someone else owns it (or is racing us to claim it). Retry. + } +} + +} // namespace workerd diff --git a/src/workerd/server/cluster-lock.h b/src/workerd/server/cluster-lock.h new file mode 100644 index 00000000000..d259968a48e --- /dev/null +++ b/src/workerd/server/cluster-lock.h @@ -0,0 +1,77 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace workerd { + +// Manages a directory of lock files for DO ownership. Each lock file is named after the actor ID +// and contains the owning node's public key (or is empty if unowned). +// +// The ownership protocol: +// 1. Open/create the lock file. +// 2. Read it. If non-empty and CRC32 valid, parse the owner key. +// - If !registry.isPeerDead(ownerKey): route via clusterRpc.bootstrap(vatIdFor(ownerKey)). +// - If registry.isPeerDead(ownerKey): fall through to the claim path. +// 3. Claim path: tryLock(EXCLUSIVE). If granted, ftruncate, write our key, fsync, +// return OwnedLock. If refused, retry with backoff. +class ClusterLockManager { + public: + ClusterLockManager(kj::Own dir, + ClusterRegistry& registry, + capnp::RpcSystem& clusterRpc, + kj::Timer& timer); + + // RAII ownership handle. Holding this object means this node owns the DO. + // Destruction truncates the lock file and releases the exclusive OFD lock, + // making the DO available for other nodes to claim. + class OwnedLock { + public: + ~OwnedLock() noexcept(false); + OwnedLock(OwnedLock&&) = default; + OwnedLock& operator=(OwnedLock&&) = default; + KJ_DISALLOW_COPY(OwnedLock); + + private: + OwnedLock(kj::Own file, OfdLock lock): file(kj::mv(file)), lock(kj::mv(lock)) {} + + kj::Own file; + OfdLock lock; + friend class ClusterLockManager; + }; + + // The core routing operation. Either acquires local ownership or returns a capability for + // reaching the current owner. + // + // Encapsulates the full protocol including retries with backoff, registry liveness + // cross-referencing, and capability resolution (with retry on stale owners). + // + // Returns: + // OwnedLock — this node has newly acquired ownership (caller should start the DO). + // rpc::WorkerdDebugPort::Client — the DO has an existing owner. The caller calls + // getActor() on this to reach the actor. The owner might be this node, in which case + // the RpcSystem returns the local self-bootstrap directly (no wire traffic). The + // caller does not need to distinguish local vs remote. + kj::Promise> acquireOrRoute( + kj::StringPtr actorId); + + private: + kj::Own dir; + ClusterRegistry& registry; + capnp::RpcSystem& clusterRpc; + kj::Timer& timer; +}; + +} // namespace workerd diff --git a/src/workerd/server/cluster-registry-test.c++ b/src/workerd/server/cluster-registry-test.c++ new file mode 100644 index 00000000000..e7b274922d4 --- /dev/null +++ b/src/workerd/server/cluster-registry-test.c++ @@ -0,0 +1,304 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "cluster-registry.h" + +#include + +#if !_WIN32 +#include +#endif + +#include +#include +#include +#include +#include +#include + +namespace workerd { +namespace { + +#if __linux__ + +// A trivial capability server that counts calls. +class CallCountServer final: public capnp::Capability::Server { + public: + int callCount = 0; + + DispatchCallResult dispatchCall(uint64_t interfaceId, + uint16_t methodId, + capnp::CallContext context) override { + ++callCount; + return {kj::READY_NOW, false, true}; + } +}; + +// Helper to create a temp directory on disk. +class TempDir { + public: + TempDir() { + char tmpl[] = "/tmp/cluster-registry-test-XXXXXX"; + KJ_ASSERT(mkdtemp(tmpl) != nullptr); + pathStr = kj::str(tmpl); + disk = kj::newDiskFilesystem(); + dir = disk->getRoot().openSubdir(kj::Path::parse(pathStr.slice(1)), // remove leading / + kj::WriteMode::MODIFY); + } + + ~TempDir() noexcept(false) { + auto p = kj::Path::parse(pathStr.slice(1)); + disk->getRoot().remove(p); + } + + kj::Own get() { + return dir->clone(); + } + + private: + kj::String pathStr; + kj::Own disk; + kj::Own dir; +}; + +// Helper to make a simple RPC call on a capability client. +void makeCall(capnp::Capability::Client& client, kj::WaitScope& ws) { + auto req = client.typelessRequest( + 0, 0, capnp::MessageSize{4, 0}, capnp::Capability::Client::CallHints()); + req.send().wait(ws); +} + +KJ_TEST("X25519PublicKey: hex roundtrip") { + X25519PublicKey key; + memset(key.bytes, 0xAB, sizeof(key.bytes)); + auto hex = key.toHex(); + KJ_EXPECT(hex.size() == 64); + auto decoded = X25519PublicKey::fromHex(hex); + KJ_EXPECT(decoded == key); +} + +KJ_TEST("X25519PublicKey: equality") { + X25519PublicKey a, b; + memset(a.bytes, 1, sizeof(a.bytes)); + memset(b.bytes, 1, sizeof(b.bytes)); + KJ_EXPECT(a == b); + + b.bytes[0] = 2; + KJ_EXPECT(!(a == b)); +} + +KJ_TEST("ClusterRegistry: self is never dead") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + KJ_EXPECT(!reg.isPeerDead(reg.getPublicKey())); +} + +KJ_TEST("ClusterRegistry: self-connect returns none") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg.getPublicKey().bytes, 32)); + + KJ_EXPECT(reg.connect(vatId.asReader()) == kj::none); +} + +KJ_TEST("ClusterRegistry: self-bootstrap returns local capability") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto server = kj::heap(); + auto& serverRef = *server; + auto bootstrap = capnp::Capability::Client(kj::mv(server)); + auto rpc = capnp::makeRpcServer(reg, kj::mv(bootstrap)); + + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg.getPublicKey().bytes, 32)); + + auto client = rpc.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(serverRef.callCount == 1); +} + +KJ_TEST("ClusterRegistry: peer discovery via directory scan") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg1(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + // In unix mode, socket files in the directory are registry entries. + KJ_EXPECT(!reg1.isPeerDead(reg2.getPublicKey())); + KJ_EXPECT(!reg2.isPeerDead(reg1.getPublicKey())); +} + +KJ_TEST("ClusterRegistry: isPeerDead caches live peers briefly") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg1(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto peerPath = kj::Path({reg2.getPublicKey().toHex()}); + KJ_EXPECT(!reg1.isPeerDead(reg2.getPublicKey())); + + tmpDir.get()->tryRemove(peerPath); + KJ_EXPECT(!reg1.isPeerDead(reg2.getPublicKey())); +} + +KJ_TEST("ClusterRegistry: isPeerDead caches dead peers permanently") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + X25519PublicKey peerKey; + memset(peerKey.bytes, 0xCA, sizeof(peerKey.bytes)); + auto peerPath = kj::Path({peerKey.toHex()}); + + KJ_EXPECT(reg.isPeerDead(peerKey)); + tmpDir.get()->openFile(peerPath, kj::WriteMode::CREATE)->writeAll("placeholder"); + KJ_EXPECT(reg.isPeerDead(peerKey)); +} + +KJ_TEST("ClusterRegistry: end-to-end RPC between two registries") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg1(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto maint1 = reg1.runMaintenance(); + auto maint2 = reg2.runMaintenance(); + + auto server1 = kj::heap(); + auto& server1Ref = *server1; + auto server2 = kj::heap(); + auto& server2Ref = *server2; + + auto rpc1 = capnp::makeRpcServer(reg1, capnp::Capability::Client(kj::mv(server1))); + auto rpc2 = capnp::makeRpcServer(reg2, capnp::Capability::Client(kj::mv(server2))); + + auto run1 = rpc1.run(); + auto run2 = rpc2.run(); + + // reg1 -> reg2 + { + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg2.getPublicKey().bytes, 32)); + + auto client = rpc1.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(server2Ref.callCount == 1); + } + + // reg2 -> reg1 + { + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg1.getPublicKey().bytes, 32)); + + auto client = rpc2.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(server1Ref.callCount == 1); + } +} + +KJ_TEST("ClusterRegistry: end-to-end RPC with binary IP registry entries") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg1( + tmpDir.get(), "127.0.0.0/8"_kj, io.provider->getNetwork(), io.provider->getTimer()); + ClusterRegistry reg2( + tmpDir.get(), "127.0.0.0/8"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + auto maint1 = reg1.runMaintenance(); + auto maint2 = reg2.runMaintenance(); + + auto server2 = kj::heap(); + auto& server2Ref = *server2; + + auto rpc1 = capnp::makeRpcServer(reg1, capnp::Capability::Client(kj::heap())); + auto rpc2 = capnp::makeRpcServer(reg2, capnp::Capability::Client(kj::mv(server2))); + + auto run1 = rpc1.run(); + auto run2 = rpc2.run(); + + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg2.getPublicKey().bytes, 32)); + + auto client = rpc1.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(server2Ref.callCount == 1); +} + +KJ_TEST("ClusterRegistry: idle timeout closes connections") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + auto shortTimeout = 100 * kj::MILLISECONDS; + + ClusterRegistry reg1( + tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer(), shortTimeout); + ClusterRegistry reg2( + tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer(), shortTimeout); + + auto server2 = kj::heap(); + auto& server2Ref = *server2; + + auto rpc1 = capnp::makeRpcServer(reg1, capnp::Capability::Client(kj::heap())); + auto rpc2 = capnp::makeRpcServer(reg2, capnp::Capability::Client(kj::mv(server2))); + + auto run1 = rpc1.run(); + auto run2 = rpc2.run(); + + capnp::MallocMessageBuilder msg; + auto vatId = msg.initRoot(); + vatId.setPublicKey(kj::arrayPtr(reg2.getPublicKey().bytes, 32)); + + { + auto client = rpc1.bootstrap(vatId); + makeCall(client, io.waitScope); + KJ_EXPECT(server2Ref.callCount == 1); + } + // Client dropped — connection should become idle. + + // Wait for the idle timeout to fire. + io.provider->getTimer().afterDelay(shortTimeout + 200 * kj::MILLISECONDS).wait(io.waitScope); + + // After idle close, a new bootstrap should still work (new connection). + auto client2 = rpc1.bootstrap(vatId); + makeCall(client2, io.waitScope); + KJ_EXPECT(server2Ref.callCount == 2); +} + +KJ_TEST("ClusterRegistry: verifyNfsLease() succeeds when lock is held") { + auto io = kj::setupAsyncIo(); + TempDir tmpDir; + + ClusterRegistry reg(tmpDir.get(), "unix"_kj, io.provider->getNetwork(), io.provider->getTimer()); + + reg.verifyNfsLease(); +} + +#else // #if __linux__ + +KJ_TEST("dummy test: platform not supported") {} + +#endif // #if __linux__, #else + +} // namespace +} // namespace workerd diff --git a/src/workerd/server/cluster-registry.c++ b/src/workerd/server/cluster-registry.c++ new file mode 100644 index 00000000000..a430d059e86 --- /dev/null +++ b/src/workerd/server/cluster-registry.c++ @@ -0,0 +1,825 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "cluster-registry.h" + +#if !_WIN32 +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +#include +#include +#include +#include +#include + +namespace workerd { + +#if __linux__ + +namespace { + +constexpr kj::Duration REGISTRY_MAINTENANCE_INTERVAL = 60 * kj::SECONDS; +constexpr kj::Duration DIRECTORY_SCAN_INTERVAL = 30 * kj::SECONDS; +constexpr kj::Duration NEW_REGISTRY_ENTRY_GRACE_PERIOD = 15 * kj::SECONDS; +constexpr kj::Duration FAILED_PROBE_THROTTLE = 5 * kj::SECONDS; +constexpr kj::Duration SUSPECT_PEER_AVOIDANCE_PERIOD = 15 * kj::SECONDS; +constexpr kj::Duration PEER_LIVENESS_CACHE_TTL = 15 * kj::SECONDS; + +} // namespace + +// ======================================================================================= +// X25519PublicKey + +kj::String X25519PublicKey::toHex() const { + return kj::encodeHex(kj::arrayPtr(bytes, sizeof(bytes))); +} + +X25519PublicKey X25519PublicKey::fromHex(kj::StringPtr hex) { + KJ_REQUIRE(hex.size() == 64, "X25519 public key hex must be 64 characters", hex.size()); + auto decoded = kj::decodeHex(hex.asArray()); + KJ_REQUIRE(!decoded.hadErrors, "invalid hex in X25519 public key"); + KJ_ASSERT(decoded.size() == 32); + X25519PublicKey result; + memcpy(result.bytes, decoded.begin(), 32); + return result; +} + +// ======================================================================================= +// ClusterRegistry::NativeAddress + +void ClusterRegistry::NativeAddress::setPort(uint port) { + KJ_REQUIRE(port <= static_cast(kj::maxValue), "invalid port", port); + + switch (storage.ss_family) { + case AF_INET: { + KJ_REQUIRE(length == sizeof(sockaddr_in), "invalid IPv4 sockaddr length", length); + auto& sin = *reinterpret_cast(&storage); + sin.sin_port = htons(port); + break; + } + case AF_INET6: { + KJ_REQUIRE(length == sizeof(sockaddr_in6), "invalid IPv6 sockaddr length", length); + auto& sin6 = *reinterpret_cast(&storage); + sin6.sin6_port = htons(port); + break; + } + default: + KJ_FAIL_REQUIRE("address does not have a TCP port", storage.ss_family); + } +} + +ClusterRegistry::NativeAddress ClusterRegistry::NativeAddress::fromSockaddr(const sockaddr* addr) { + KJ_REQUIRE(addr != nullptr, "missing sockaddr"); + + NativeAddress result; + switch (addr->sa_family) { + case AF_INET: + result.length = sizeof(sockaddr_in); + break; + case AF_INET6: + result.length = sizeof(sockaddr_in6); + break; + default: + KJ_FAIL_REQUIRE("sockaddr is not an IP address", addr->sa_family); + } + + memcpy(&result.storage, addr, result.length); + return result; +} + +ClusterRegistry::NativeAddress ClusterRegistry::NativeAddress::parse( + kj::ArrayPtr bytes) { + KJ_REQUIRE( + bytes.size() >= sizeof(sa_family_t), "peer registry address is too short", bytes.size()); + KJ_REQUIRE( + bytes.size() <= sizeof(sockaddr_storage), "peer registry address is too long", bytes.size()); + + NativeAddress result; + result.length = bytes.size(); + memcpy(&result.storage, bytes.begin(), bytes.size()); + + switch (result.storage.ss_family) { + case AF_INET: + KJ_REQUIRE( + result.length == sizeof(sockaddr_in), "invalid IPv4 sockaddr length", result.length); + break; + case AF_INET6: + KJ_REQUIRE( + result.length == sizeof(sockaddr_in6), "invalid IPv6 sockaddr length", result.length); + break; + default: + KJ_FAIL_REQUIRE("peer registry address is not an IP sockaddr", result.storage.ss_family); + } + + return result; +} + +// ======================================================================================= +// ClusterRegistry::ConnectionImpl + +class ClusterRegistry::ConnectionImpl final: public ClusterVatNetworkBase::Connection, + public kj::Refcounted { + public: + ConnectionImpl(ClusterRegistry& registry, + kj::Own stream, + kj::Maybe peerKey, + kj::Maybe> wasRefused) + : registry(registry), + stream(kj::mv(stream)), + msgStream(*this->stream, capnp::IncomingRpcMessage::getShortLivedCallback()), + peerKey(peerKey), + wasRefused(kj::mv(wasRefused)), + outbound(peerKey != kj::none) { + // We always need to pump immediately to send the handshake. + ensurePumping(); + } + + ~ConnectionImpl() noexcept(false) { + unregister(); + } + + cluster::VatId::Reader getPeerVatId() override { + // We can safely expected the RPC system won't call this method until there is actually some + // activity on the stream that causes it to be needed. If this is an inbound stream, there + // is necessary no activity until we receive some messages, and we must have received the + // peer key before that. + // + // This is convenient because otherwise we'd have to figure out what to do with this call + // if we haven't received the handshake yet, and therefore don't actually know who the peer + // is. + auto pk = KJ_ASSERT_NONNULL(peerKey, "getPeerVatId() called too early"); + + // Build `peerVatId` on first use. + KJ_IF_SOME(p, peerVatId) { + return p.getRoot().asReader(); + } else { + auto builder = peerVatId.emplace(peerVatIdScratch).getRoot(); + builder.setPublicKey(kj::arrayPtr(pk.bytes, sizeof(pk.bytes))); + return builder.asReader(); + } + } + + kj::Own newOutgoingMessage(uint firstSegmentWordSize) override; + kj::Promise>> receiveIncomingMessage() override; + kj::Promise shutdown() override; + void setIdle(bool newIdle) override; + + private: + class OutgoingMessageImpl; + class IncomingMessageImpl; + + ClusterRegistry& registry; + kj::Own stream; + capnp::BufferedMessageStream msgStream; + kj::Maybe peerKey; + kj::Maybe> wasRefused; + bool outbound; // is this an outbound or inbound connection? + + capnp::word peerVatIdScratch[8]{}; + kj::Maybe peerVatId; + + kj::Vector> writeQueue; + kj::Maybe> pumpTask; + kj::Maybe brokenReason; + + bool pumping = false; // is pump() running? + bool isShutdown = false; // has shutdown() been called? + + bool idle = false; // tracks last value passed to setIdle() + bool idleTimedOut = false; + + bool sentHandshake = false; + bool receivedHandshake = false; + + kj::Maybe> idleTimer; + kj::Maybe>>>> + interruptReceiveFulfiller; + + void ensurePumping(); + kj::Promise pump(); + + void requireOpen(); + void unregister(); + + friend class ClusterRegistry; +}; + +class ClusterRegistry::ConnectionImpl::OutgoingMessageImpl final: public capnp::OutgoingRpcMessage, + public kj::Refcounted { + public: + OutgoingMessageImpl(ConnectionImpl& conn, uint firstSegmentWordSize) + : conn(conn), + message(firstSegmentWordSize == 0 ? capnp::SUGGESTED_FIRST_SEGMENT_WORDS + : firstSegmentWordSize) {} + + capnp::AnyPointer::Builder getBody() override { + return message.getRoot(); + } + + void send() override { + conn.requireOpen(); + KJ_REQUIRE(!conn.idle, "bug in RpcSystem: trying to send a message while idle"); + KJ_REQUIRE(!conn.idleTimedOut, "send() after idle period timed out"); + + conn.writeQueue.add(kj::addRef(*this)); + conn.ensurePumping(); + } + + size_t sizeInWords() override { + return message.sizeInWords(); + } + + private: + ConnectionImpl& conn; + capnp::MallocMessageBuilder message; + + friend class ConnectionImpl; +}; + +class ClusterRegistry::ConnectionImpl::IncomingMessageImpl final: public capnp::IncomingRpcMessage { + public: + IncomingMessageImpl(kj::Own message): message(kj::mv(message)) {} + + capnp::AnyPointer::Reader getBody() override { + return message->getRoot(); + } + + size_t sizeInWords() override { + return message->sizeInWords(); + } + + private: + kj::Own message; +}; + +kj::Own ClusterRegistry::ConnectionImpl::newOutgoingMessage( + uint firstSegmentWordSize) { + KJ_REQUIRE(!idle, "bug in RpcSystem: trying to send a message while idle"); + return kj::refcounted(*this, firstSegmentWordSize); +} + +kj::Promise>> ClusterRegistry::ConnectionImpl:: + receiveIncomingMessage() { + KJ_IF_SOME(b, brokenReason) { + kj::throwFatalException(b.clone()); + } + if (idleTimedOut) { + co_return kj::none; + } + + if (!receivedHandshake) { + X25519PublicKey receivedKey; + auto readHandshake = stream->read(receivedKey.bytes); + if (outbound && registry.nfsMode()) { + co_await registry.timer.timeoutAfter(registry.connectTimeout, kj::mv(readHandshake)); + } else { + co_await readHandshake; + } + KJ_IF_SOME(expectedKey, peerKey) { + KJ_REQUIRE(receivedKey == expectedKey, "peer did not send expected handshake bytes"); + } else { + peerKey = receivedKey; + } + receivedHandshake = true; + if (outbound) { + KJ_IF_SOME(entry, registry.peers.find(KJ_ASSERT_NONNULL(peerKey))) { + entry.suspect = false; + entry.suspectSince = kj::none; + } + } + } + + auto paf = kj::newPromiseAndFulfiller>>(); + interruptReceiveFulfiller = kj::mv(paf.fulfiller); + + auto message = co_await msgStream.tryReadMessage().exclusiveJoin(kj::mv(paf.promise)); + + KJ_IF_SOME(m, message) { + co_return kj::heap(kj::mv(m)); + } else { + co_return kj::none; + } +} + +kj::Promise ClusterRegistry::ConnectionImpl::shutdown() { + requireOpen(); + isShutdown = true; + ensurePumping(); + return KJ_ASSERT_NONNULL(kj::mv(pumpTask)); +} + +void ClusterRegistry::ConnectionImpl::setIdle(bool newIdle) { + if (newIdle == idle) return; // Already in the desired state. + idle = newIdle; + + // Only outbound connections should potentially close themselves when idle. Inbound should + // be closed at the other end. + if (outbound) { + if (idle) { + // Wait for the idle timeout. + idleTimer = registry.timer.afterDelay(registry.idleTimeout) + .then([this]() { + // Wait until the event queue is empty, just in case we're concurrently receiving a + // message that causes us to become non-idle. (But that shouldn't happen since this is + // an outbound connection. Once idle, it should only be revived from our end. But just to + // be safe.) + return kj::evalLast([this]() { + // We officially timed out. Make sure all future calls to receiveIncomingMessage() return + // kj::none. + idleTimedOut = true; + + // Cancel the current receiveIncomingMessage() by fulfilling the promise that is + // joined with the `tryReadMessage()` promise. + KJ_IF_SOME(i, interruptReceiveFulfiller) { + i->fulfill(kj::none); + } + + // Make sure a new call to connect() can't return this again. + unregister(); + }); + }).eagerlyEvaluate(nullptr); + } else { + // Cancel the last idle wait. + idleTimer = kj::none; + } + } +} + +void ClusterRegistry::ConnectionImpl::ensurePumping() { + if (!pumping) { + pumping = true; + pumpTask = pump(); + } +} + +kj::Promise ClusterRegistry::ConnectionImpl::pump() { + KJ_TRY { + if (!sentHandshake) { + co_await stream->write(registry.publicKey.bytes); + sentHandshake = true; + } + + // Give a chance for messages to accumulate to be sent all at once. + co_await kj::yieldUntilQueueEmpty(); + + while (!writeQueue.empty()) { + // Take ownership of all messages currently pending. `writeQueue` is left empty, but may + // be repopulated concurrently. + auto ownMessages = kj::mv(writeQueue); + + // Write them all at once. + auto messageSegments = + KJ_MAP(msg, ownMessages) { return msg->message.getSegmentsForOutput(); }; + co_await msgStream.writeMessages(messageSegments); + } + + if (isShutdown) { + co_await msgStream.end(); + } + + pumping = false; + } + KJ_CATCH(exception) { + brokenReason = exception.clone(); + KJ_IF_SOME(i, interruptReceiveFulfiller) { + i->reject(exception.clone()); + } + kj::throwFatalException(kj::mv(exception)); + } +} + +void ClusterRegistry::ConnectionImpl::requireOpen() { + KJ_IF_SOME(e, brokenReason) { + kj::throwFatalException(e.clone()); + } + KJ_REQUIRE(!isShutdown, "bug in RpcSystem: can't send() after shutdown()"); +} + +void ClusterRegistry::ConnectionImpl::unregister() { + if (outbound) { + auto key = KJ_ASSERT_NONNULL(peerKey); // peerKey is always set for outbounds + + KJ_TRY { + KJ_IF_SOME(entry, registry.peers.findEntry(key)) { + KJ_IF_SOME(conn, entry.value.outboundConnection) { + if (&conn == this) { + entry.value.outboundConnection = kj::none; + } + } + + // If we failed to receive the peer handshake, mark the peer as suspect and run the + // dead-peer cleanup path. We may have failed to connect entirely, or we may have + // connected to a server that wasn't the peer we expected. + if (!receivedHandshake) { + bool wasRefusedUnix = false; + KJ_IF_SOME(w, wasRefused) { + wasRefusedUnix = *w; + } + registry.cleanupFailedPeer(entry, wasRefusedUnix); + } + } + } + KJ_CATCH(exception) { + KJ_LOG(WARNING, "failed to clean up cluster peer after connection failure", key.toHex(), + exception); + } + } +} + +// ======================================================================================= +// ClusterRegistry constructor + +ClusterRegistry::ClusterRegistry(kj::Own registryDirParam, + kj::StringPtr networkConfig, + kj::Network& kjNetwork, + kj::Timer& timer, + kj::Duration idleTimeout, + kj::Duration connectTimeout) + : registryDir(kj::mv(registryDirParam)), + dirFd(KJ_REQUIRE_NONNULL( + registryDir->getFd(), "cluster registry directory must be a disk-backed directory")), + timer(timer), + network(kjNetwork), + idleTimeout(idleTimeout), + connectTimeout(connectTimeout) { + // Generate X25519 keypair. + X25519_keypair(publicKey.bytes, privateKey); + publicKeyHex = publicKey.toHex(); + + if (networkConfig == "unix") { + // Unix mode: bind a Unix domain socket at /. + // Use /proc/self/fd// as the socket path. + auto socketPath = kj::str("/proc/self/fd/", dirFd, "/", publicKeyHex); + sockaddr_un addr{}; + KJ_REQUIRE( + socketPath.size() < sizeof(addr.sun_path), "Unix socket path is too long", socketPath); + addr.sun_family = AF_UNIX; + memcpy(addr.sun_path, socketPath.begin(), socketPath.size()); + listener = kjNetwork.getSockaddr(&addr, sizeof(addr))->listen(); + } else { + // CIDR/IP/NFS mode: find a matching local IP and bind an ephemeral port. + auto addr = findMatchingIp(networkConfig); + listener = kjNetwork.getSockaddr(addr.asSockaddr(), addr.length)->listen(); + auto port = listener->getPort(); + addr.setPort(port); + boundAddress = addr; + } +} + +ClusterRegistry::~ClusterRegistry() noexcept(false) { + registryDir->tryRemove(kj::Path({publicKeyHex})); +} + +// Search all IP addresses for all interfaces available in the network namespace to find one that +// matches the given CIDR. +ClusterRegistry::NativeAddress ClusterRegistry::findMatchingIp(kj::StringPtr cidr) { + kj::CidrRange range(cidr); + + struct ifaddrs* ifap = nullptr; + KJ_SYSCALL(getifaddrs(&ifap)); + KJ_DEFER(freeifaddrs(ifap)); + + for (auto* ifa = ifap; ifa != nullptr; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == nullptr) continue; + if (range.matches(ifa->ifa_addr)) { + auto result = NativeAddress::fromSockaddr(ifa->ifa_addr); + result.setPort(0); + return result; + } + } + + KJ_FAIL_REQUIRE("no local interface matches CIDR", cidr); +} + +// ======================================================================================= +// Registry maintenance + +kj::Promise ClusterRegistry::runMaintenance() { + KJ_IF_SOME(addr, boundAddress) { + // CIDR/IP mode: create the registry file, lock it, write our address, then periodically + // verify that our registry entry and NFS lock lease are still valid. + + auto registryFile = registryDir->openFile(kj::Path({publicKeyHex}), kj::WriteMode::CREATE); + + // Acquire an exclusive OFD lock. + auto lock = KJ_ASSERT_NONNULL( + OfdLock::tryLock(KJ_ASSERT_NONNULL(registryFile->getFd()), OfdLock::EXCLUSIVE), + "our registry entry is already locked?"); + + // Write our address. + registryFile->write(0, addr.asBytes()); + registryFile->sync(); + + return maintenanceLoopCanceler.wrap(maintenanceLoop(registration.emplace(Registration{ + .file = kj::mv(registryFile), + .lock = kj::mv(lock), + }))); + } else { + // Unix socket mode: the listener socket's lifetime IS the registry entry. Nothing to maintain. + return kj::NEVER_DONE; + } +} + +kj::Promise ClusterRegistry::maintenanceLoop(Registration& registration) { + for (;;) { + co_await timer.afterDelay(REGISTRY_MAINTENANCE_INTERVAL); + + // Verify nlink > 0 (not unlinked by another node). + KJ_REQUIRE(registration.file->stat().linkCount > 0, + "registry file was unlinked by another node; this instance should shut down"); + + // Verify the OFD lock is still held (NFSv4 lease check). + registration.lock.verifyHeld(); + } +} + +void ClusterRegistry::verifyNfsLease() { + KJ_IF_SOME(r, registration) { + KJ_TRY { + r.lock.verifyHeld(); + } + KJ_CATCH(exception) { + // Might as well cancel the maintenance loop immediately. + maintenanceLoopCanceler.cancel(exception); + kj::throwFatalException(kj::mv(exception)); + } + } +} + +// ======================================================================================= +// Peer discovery + +void ClusterRegistry::scanDirectory() { + // List the registry to discover new peers and add them to the `peers` map, so that they can + // be considered by `pickRandomPeer()`. Keep in mind that on NFS, directory listings tend to be + // cached, and therefore we can's assume the existence or absence of a file really tells us + // anything about whether the peer is currently alive or dead. The best we can do is populate the + // map entries so that we know that peers exist at all, and then we'll need to probe them for + // liveness as usual later on. + + auto names = registryDir->listNames(); + + for (auto& name: names) { + if (name.size() != 64) continue; + if (name == publicKeyHex) continue; + + auto decoded = kj::decodeHex(name.asArray()); + if (decoded.hadErrors || decoded.size() != 32) continue; + + X25519PublicKey key; + memcpy(key.bytes, decoded.begin(), 32); + + peers.findOrCreate(key, [&]() -> decltype(peers)::Entry { return {.key = key}; }); + } +} + +void ClusterRegistry::scanDirectoryIfStale(kj::TimePoint now) { + KJ_IF_SOME(lastScan, lastDirectoryScan) { + if (now - lastScan < DIRECTORY_SCAN_INTERVAL) return; + } + + scanDirectory(); + lastDirectoryScan = now; +} + +bool ClusterRegistry::isPeerDead(const X25519PublicKey& key) { + if (key == publicKey) return false; + + auto& entry = + peers.findOrCreateEntry(key, [&]() -> decltype(peers)::Entry { return {.key = key}; }); + + if (entry.value.knownDead) return true; + + auto now = timer.now(); + KJ_IF_SOME(lastLive, entry.value.lastConfirmedLiveTime) { + if (now - lastLive < PEER_LIVENESS_CACHE_TTL) return false; + } + + if (registryDir->exists(kj::Path({key.toHex()}))) { + entry.value.lastConfirmedLiveTime = now; + return false; + } + + entry.value.knownDead = true; + return true; +} + +kj::Maybe ClusterRegistry::pickRandomPeer() { + kj::Vector candidates; + auto now = timer.now(); + scanDirectoryIfStale(now); + + for (auto& peer: peers) { + if (peer.key == publicKey) continue; + if (peer.value.knownDead) continue; + if (peer.value.suspect) { + KJ_IF_SOME(since, peer.value.suspectSince) { + if (now - since < SUSPECT_PEER_AVOIDANCE_PERIOD) continue; + } + } + candidates.add(peer.key); + } + + if (candidates.empty()) return kj::none; + + // Simple pseudo-random selection using the monotonic clock as a cheap source. (Don't use + // timer.now() since it only updates when the event loop waits for I/O.) + auto nanos = + (kj::systemPreciseMonotonicClock().now() - kj::origin()) / kj::NANOSECONDS; + auto idx = kj::hashCode(nanos) % candidates.size(); + return candidates[idx]; +} + +void ClusterRegistry::cleanupFailedPeer(decltype(peers)::Entry& entry, bool wasRefusedUnix) { + auto now = timer.now(); + + entry.value.suspect = true; + entry.value.suspectSince = now; + + auto path = kj::Path({entry.key.toHex()}); + + if (!nfsMode()) { + if (wasRefusedUnix) { + // We failed with ECONNREFUSED, which means the listener is gone (process exited or closed + // the listen socket), in which case we can certainly clean up the socket from disk. + registryDir->tryRemove(path); + entry.value.knownDead = true; + } else { + // We failed for some other reason. Check if the file has disappeared from disk. + if (registryDir->exists(path)) { + // File still exists. We may have failed with EAGAIN (the peer's listen queue is full) or + // some other error where the peer is not necessarily gone. We've already marked it + // suspect, but we should not actually erase it. + } else { + // File is gone from disk. This may be exactly why the connection failed, but regardless, + // we should now mark the peer dead in our cache. + entry.value.knownDead = true; + } + } + + return; + } + + KJ_IF_SOME(lastProbe, entry.value.lastFailedProbeTime) { + if (now - lastProbe < FAILED_PROBE_THROTTLE) return; + } + entry.value.lastFailedProbeTime = now; + + KJ_IF_SOME(file, registryDir->tryOpenFile(path)) { + auto meta = file->stat(); + auto calendarNow = kj::systemCoarseCalendarClock().now(); + if (calendarNow - meta.lastModified < NEW_REGISTRY_ENTRY_GRACE_PERIOD) return; + + if (OfdLock::tryLock(KJ_ASSERT_NONNULL(file->getFd()), OfdLock::SHARED) != kj::none) { + registryDir->tryRemove(path); + entry.value.knownDead = true; + } + } else { + entry.value.knownDead = true; + } +} + +// ======================================================================================= +// Peer address lookup + +ClusterRegistry::NativeAddress ClusterRegistry::readPeerAddress(const X25519PublicKey& key) { + auto path = kj::Path({key.toHex()}); + auto file = + KJ_REQUIRE_NONNULL(registryDir->tryOpenFile(path), "peer not found in registry", key.toHex()); + auto content = file->readAllBytes(); + KJ_REQUIRE(content.size() > 0, "peer registry file is empty", key.toHex()); + return NativeAddress::parse(content); +} + +// ======================================================================================= +// VatNetwork: connect() + +kj::Maybe> ClusterRegistry::connect( + cluster::VatId::Reader peer) { + auto keyData = peer.getPublicKey(); + KJ_REQUIRE(keyData.size() == 32, "invalid VatId: publicKey must be 32 bytes"); + + X25519PublicKey peerKey; + memcpy(peerKey.bytes, keyData.begin(), 32); + + // Self-connect returns kj::none per VatNetwork contract. + if (peerKey == publicKey) { + return kj::none; + } + + // Check for cached outbound connection. + KJ_IF_SOME(entry, peers.find(peerKey)) { + KJ_IF_SOME(conn, entry.outboundConnection) { + return kj::addRef(conn); + } + } + + auto& entry = peers.findOrCreateEntry( + peerKey, [&]() -> decltype(peers)::Entry { return {.key = peerKey}; }); + + KJ_REQUIRE(!entry.value.knownDead, "peer is known to be dead", peerKey.toHex()); + + // Need to establish a new connection. Look up the peer's address. + kj::Own networkAddress; + if (nfsMode()) { + NativeAddress address; + try { + address = readPeerAddress(peerKey); + } catch (...) { + cleanupFailedPeer(entry, false); + throw; + } + networkAddress = network.getSockaddr(address.asSockaddr(), address.length); + } else { + auto path = kj::str("/proc/self/fd/", dirFd, "/", peerKey.toHex()); + sockaddr_un addr{}; + KJ_REQUIRE(path.size() < sizeof(addr.sun_path), "Unix socket path is too long", path); + addr.sun_family = AF_UNIX; + memcpy(addr.sun_path, path.begin(), path.size()); + networkAddress = network.getSockaddr(&addr, sizeof(addr)); + } + + auto connectPromise = networkAddress->connect(); + + kj::Maybe> wasRefused; + if (!nfsMode()) { + auto wrapper = kj::refcountedWrapper(false); + wasRefused = wrapper->addWrappedRef(); + connectPromise = connectPromise.catch_( + [wrapper = kj::mv(wrapper)]( + kj::Exception&& e) mutable -> kj::Promise> { + if (e.getType() == kj::Exception::Type::DISCONNECTED) { + // If connect() failed with DISCONNECTED on a unix socket, this likely means we got + // ECONNREFUSED. Record this. + wrapper->getWrapped() = true; + } + kj::throwFatalException(kj::mv(e)); + }); + } + + auto promisedStream = kj::newPromisedStream(connectPromise.attach(kj::mv(networkAddress))); + + auto conn = + kj::refcounted(*this, kj::mv(promisedStream), peerKey, kj::mv(wasRefused)); + + // Cache the outbound connection. + entry.value.outboundConnection = *conn; + + return kj::Own(kj::addRef(*conn)); +} + +// ======================================================================================= +// VatNetwork: accept() + +kj::Promise> ClusterRegistry::accept() { + auto stream = co_await listener->accept(); + co_return kj::refcounted(*this, kj::mv(stream), kj::none, kj::none); +} + +// ======================================================================================= + +#else // #if __linux__ + +ClusterRegistry::ClusterRegistry(kj::Own registryDir, + kj::StringPtr network, + kj::Network& kjNetwork, + kj::Timer& timer, + kj::Duration idleTimeout, + kj::Duration connectTimeout) { + KJ_UNIMPLEMENTED("cluster mode is only implemented on linux"); +} +ClusterRegistry::~ClusterRegistry() noexcept(false) {} + +kj::Promise ClusterRegistry::runMaintenance() { + KJ_UNREACHABLE; +} +void ClusterRegistry::verifyNfsLease() { + KJ_UNREACHABLE; +} +bool ClusterRegistry::isPeerDead(const X25519PublicKey& key) { + KJ_UNREACHABLE; +} +kj::Maybe ClusterRegistry::pickRandomPeer() { + KJ_UNREACHABLE; +} +kj::Maybe> ClusterRegistry::connect( + cluster::VatId::Reader peer) { + KJ_UNREACHABLE; +} +kj::Promise> ClusterRegistry::accept() { + KJ_UNREACHABLE; +} + +#endif // #if __linux__, #else + +} // namespace workerd diff --git a/src/workerd/server/cluster-registry.h b/src/workerd/server/cluster-registry.h new file mode 100644 index 00000000000..781e91c4504 --- /dev/null +++ b/src/workerd/server/cluster-registry.h @@ -0,0 +1,171 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include +#include + +#if __linux__ +#include +#endif + +#include +#include +#include +#include +#include +#include +#include + +namespace workerd { + +using kj::byte; +using kj::uint; + +struct X25519PublicKey { + kj::byte bytes[32]; + + kj::String toHex() const; + static X25519PublicKey fromHex(kj::StringPtr hex); + + bool operator==(const X25519PublicKey& other) const { + return memcmp(bytes, other.bytes, sizeof(bytes)) == 0; + } + kj::uint hashCode() const { + return kj::hashCode(kj::ArrayPtr(bytes)); + } +}; + +// Type alias for the VatNetwork base that ClusterRegistry implements. +using ClusterVatNetworkBase = capnp::VatNetwork; + +class ClusterRegistry final: public ClusterVatNetworkBase { + // Manages this node's cluster identity, registry presence, peer discovery, and the + // VatNetwork that the cluster's RpcSystem sits on top of. + public: + ClusterRegistry(kj::Own registryDir, + kj::StringPtr network, + kj::Network& kjNetwork, + kj::Timer& timer, + kj::Duration idleTimeout = 60 * kj::SECONDS, + kj::Duration connectTimeout = 1 * kj::SECONDS); + ~ClusterRegistry() noexcept(false); + + const X25519PublicKey& getPublicKey() { + return publicKey; + } + + // Run the registry maintenance loop. Returns a promise that must be held alive for + // the lifetime of the server. + kj::Promise runMaintenance(); + + // Verify that the NFS lease is still valid on the registration file, or throw an exception if + // not (no-op if not in NFS mode). + // + // More specifically: If you call this immediately after open()ing a file, and it returns + // successfully, then the newly-opened file is guaranteed to be on the same lease as the + // registration. (Technically, the lease could have been lost already, but if so, the + // newly-opened file will not be writable.) + void verifyNfsLease(); + + // Definitive proof-of-death query. Returns true only when the peer's registry entry is absent. + bool isPeerDead(const X25519PublicKey& key); + + // Pick a random peer's key from the cached peer list, biased away from recent failures. + kj::Maybe pickRandomPeer(); + + // Returns true if this registry uses IP sockets (not unix sockets) and is therefore in + // "NFS mode" where it tries to be NFS-friendly. + bool nfsMode() { + return boundAddress != kj::none; + } + + // implements VatNetwork ----------------------------------------------------- + kj::Maybe> connect(cluster::VatId::Reader peer) override; + kj::Promise> accept() override; + + private: +#if __linux__ + class ConnectionImpl; + + struct NativeAddress { + sockaddr_storage storage{}; + socklen_t length = 0; + + const sockaddr* asSockaddr() const { + return reinterpret_cast(&storage); + } + + kj::ArrayPtr asBytes() const { + return kj::arrayPtr(reinterpret_cast(&storage), length); + } + + void setPort(uint port); + + static NativeAddress fromSockaddr(const sockaddr* addr); + static NativeAddress parse(kj::ArrayPtr bytes); + }; + + kj::Own registryDir; + int dirFd; // raw fd of registryDir, for POSIX ops + kj::Timer& timer; + kj::Network& network; + kj::Duration idleTimeout; + kj::Duration connectTimeout; + + // Identity + kj::byte privateKey[32]; + X25519PublicKey publicKey; + kj::String publicKeyHex; + + // Address we wrote to the registry file (CIDR/IP/NFS mode). Null in Unix socket mode. + kj::Maybe boundAddress; + + struct Registration { + kj::Own file; + OfdLock lock; + }; + kj::Maybe registration; // only in NFS mode + + kj::Canceler maintenanceLoopCanceler; + + // Listener + kj::Own listener; + + // Peer cache + struct PeerInfo { + kj::Maybe outboundConnection; + kj::Maybe lastConfirmedLiveTime; + kj::Maybe lastFailedProbeTime; + bool suspect = false; + kj::Maybe suspectSince; + bool knownDead = false; + }; + kj::HashMap peers; + kj::Maybe lastDirectoryScan; + + void scanDirectory(); + void scanDirectoryIfStale(kj::TimePoint now); + kj::Promise maintenanceLoop(Registration& registration); + void cleanupFailedPeer(decltype(peers)::Entry& entry, bool wasRefusedUnix); + + NativeAddress findMatchingIp(kj::StringPtr cidr); + + // Read the address for a peer from its registry file. CIDR/IP mode only. + NativeAddress readPeerAddress(const X25519PublicKey& key); + +#else // #if __linux__ + // Just so inline methods compile + X25519PublicKey publicKey; + kj::Maybe boundAddress; + +#endif // #if __linux__, #else +}; + +} // namespace workerd diff --git a/src/workerd/server/cluster.capnp b/src/workerd/server/cluster.capnp new file mode 100644 index 00000000000..35532c83478 --- /dev/null +++ b/src/workerd/server/cluster.capnp @@ -0,0 +1,25 @@ +# Copyright (c) 2026 Cloudflare, Inc. +# Licensed under the Apache 2.0 license found in the LICENSE file or at: +# https://opensource.org/licenses/Apache-2.0 + +@0xbd0c09739255a698; + +using Cxx = import "/capnp/c++.capnp"; +$Cxx.namespace("workerd::cluster"); +$Cxx.allowCancellation; + +struct VatId { + publicKey @0 :Data; + # 32-byte raw X25519 public key. +} + +# The following are stubs for v1. They are not used because canIntroduceTo() returns +# false (the default), so the RPC system falls back to proxying for any cross-peer +# capability passing. Cap'n Proto itself has not yet fully implemented Level 3, so +# leaving these empty is fine. +struct ThirdPartyToContact {} +struct ThirdPartyToAwait {} +struct ThirdPartyCompletion {} + +# Level 4 (Join) is not supported. +struct JoinResult {} diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 95dafe1a09b..ac723793d86 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -5,6 +5,7 @@ #include "server.h" #include "alarm-scheduler.h" +#include "cluster-lock.h" #include "container-client.h" #include "pyodide.h" #include "workerd-api.h" @@ -46,6 +47,7 @@ #include #include #include +#include #include #include #include @@ -191,7 +193,6 @@ Server::Server(kj::Filesystem& fs, reportConfigError(kj::mv(reportConfigError)), loggingOptions(loggingOptions), memoryCacheProvider(kj::heap(timer)), - channelTokenHandler(*this), tasks(*this) {} struct Server::GlobalContext { @@ -1916,6 +1917,8 @@ class Server::WorkerService final: public Service, kj::Array> streamingTails; kj::Array> workerLoaders; kj::Maybe workerdDebugPortNetwork; + kj::Maybe clusterRegistry; + kj::Maybe&> clusterRpc; }; using LinkCallback = kj::Function; @@ -1972,9 +1975,11 @@ class Server::WorkerService final: public Service, } auto actorClass = kj::refcounted(*this, entry.key, Frankenvalue()); - auto ns = kj::heap(kj::mv(actorClass), entry.value, - kj::systemPreciseCalendarClock(), threadContext.getUnsafeTimer(), - threadContext.getByteStreamFactory(), channelTokenHandler, network, dockerPath, + auto ns = kj::heap(kj::mv(actorClass), + // Dynamic workers can't have actor namespaces, so `serviceName` must be available here. + KJ_ASSERT_NONNULL(serviceName), entry.key, entry.value, kj::systemPreciseCalendarClock(), + threadContext.getUnsafeTimer(), threadContext.getByteStreamFactory(), + threadContext.getHttpOverCapnpFactory(), channelTokenHandler, network, dockerPath, containerEgressInterceptorImage, waitUntilTasks); actorNamespaces.insert(entry.key, kj::mv(ns)); } @@ -2079,7 +2084,7 @@ class Server::WorkerService final: public Service, auto linked = callback(*this, errorReporter); for (auto& ns: actorNamespaces) { - ns.value->link(linked.actorStorage); + ns.value->link(linked.actorStorage, linked.clusterRegistry, linked.clusterRpc); } ioChannels = kj::mv(linked); @@ -2249,35 +2254,61 @@ class Server::WorkerService final: public Service, class ActorNamespace final { public: ActorNamespace(kj::Own actorClass, + kj::StringPtr serviceName, + kj::StringPtr className, const ActorConfig& config, const kj::Clock& clock, kj::Timer& timer, capnp::ByteStreamFactory& byteStreamFactory, + capnp::HttpOverCapnpFactory& httpOverCapnpFactory, ChannelTokenHandler& channelTokenHandler, kj::Network& dockerNetwork, kj::Maybe dockerPath, kj::Maybe containerEgressInterceptorImage, kj::TaskSet& waitUntilTasks) : actorClass(kj::mv(actorClass)), + serviceName(serviceName), + className(className), config(config), clock(clock), timer(timer), byteStreamFactory(byteStreamFactory), + httpOverCapnpFactory(httpOverCapnpFactory), channelTokenHandler(channelTokenHandler), dockerNetwork(dockerNetwork), dockerPath(dockerPath), containerEgressInterceptorImage(containerEgressInterceptorImage), waitUntilTasks(waitUntilTasks) {} - void link(kj::Maybe serviceActorStorage) { + class ClusterActorChannel; + + void link(kj::Maybe serviceActorStorage, + kj::Maybe clusterRegistry, + kj::Maybe&> clusterRpc) { KJ_IF_SOME(dir, serviceActorStorage) { KJ_IF_SOME(d, config.tryGet()) { - this->actorStorage.emplace(dir.openSubdir( - kj::Path({d.uniqueKey}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY)); + this->actorStorage.emplace(dir.openSubdir(kj::Path({d.uniqueKey}), + kj::WriteMode::CREATE | kj::WriteMode::MODIFY), + *this); + KJ_IF_SOME(registry, clusterRegistry) { + auto& rpc = KJ_ASSERT_NONNULL(clusterRpc); + this->clusterRegistry = registry; + auto lockDirectory = KJ_ASSERT_NONNULL(this->actorStorage) + .directory->openSubdir(kj::Path({"locks"}), + kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + this->lockManager.emplace(kj::mv(lockDirectory), registry, rpc, timer); + } } } KJ_IF_SOME(d, config.tryGet()) { + if (lockManager != kj::none) { + // TODO(clustering): Support alarms in cluster mode. Until then, we must skip creating + // an AlarmScheduler altogether otherwise multiple instances will fight over the alarm + // database. + return; + } + auto idFactory = kj::heap(d.uniqueKey); AlarmScheduler::GetActorFn getActor = [this, idFactory = kj::mv(idFactory)]( @@ -2309,6 +2340,8 @@ class Server::WorkerService final: public Service, return config; } + // Get the ActorChannel for the given actor ID, routing to the appropriate workerd instance + // if needed. kj::Own getActorChannel(Worker::Actor::Id id) { KJ_IF_SOME(doId, id.tryGet>()) { KJ_IF_SOME(name, doId->getName()) { @@ -2321,7 +2354,43 @@ class Server::WorkerService final: public Service, } } - return kj::refcounted(getActorContainer(kj::mv(id))); + KJ_IF_SOME(ld, lockManager) { + auto key = actorKey(id); + + // First check if the container exists locally. + KJ_IF_SOME(container, actors.find(key)) { + return kj::refcounted(container->addRef()); + } + + auto promise = ld.acquireOrRoute(key); + return kj::refcounted( + promise.then([this, id = kj::mv(id), key = kj::mv(key)]( + kj::OneOf + lockOrClient) mutable -> kj::Own { + KJ_SWITCH_ONEOF(lockOrClient) { + KJ_CASE_ONEOF(ownership, ClusterLockManager::OwnedLock) { + return kj::refcounted( + createActorContainer(kj::mv(key), kj::mv(id), kj::mv(ownership))); + } + KJ_CASE_ONEOF(debugPort, rpc::WorkerdDebugPort::Client) { + auto req = debugPort.getActorRequest(capnp::MessageSize{32, 0}); + req.setService(serviceName); + req.setEntrypoint(className); + req.setActorId(key); + KJ_IF_SOME(i, id.tryGet>()) { + KJ_IF_SOME(name, i->getName()) { + req.setActorName(name); + } + } + return kj::refcounted( + byteStreamFactory, httpOverCapnpFactory, req.send().getActor()); + } + } + KJ_UNREACHABLE; + })); + } else { + return kj::refcounted(getActorContainer(kj::mv(id))); + } } class ActorContainer; @@ -2354,7 +2423,8 @@ class Server::WorkerService final: public Service, ActorNamespace& ns, kj::Maybe parent, kj::OneOf> classAndIdParam, - kj::Timer& timer) + kj::Timer& timer, + kj::Maybe ownershipLock = kj::none) : key(kj::mv(key)), tracker(kj::refcounted(*this)), ns(ns), @@ -2362,7 +2432,8 @@ class Server::WorkerService final: public Service, .orDefault(*this)), parent(parent), timer(timer), - lastAccess(timer.now()) { + lastAccess(timer.now()), + ownershipLock(kj::mv(ownershipLock)) { KJ_SWITCH_ONEOF(classAndIdParam) { KJ_CASE_ONEOF(value, ClassAndId) { // `classAndId` is immediately available. @@ -2624,6 +2695,7 @@ class Server::WorkerService final: public Service, kj::Maybe> shutdownTask; kj::Maybe> onBrokenTask; kj::Maybe brokenReason; + kj::Maybe ownershipLock; // in cluster mode // Reference to the ContainerClient (if container is enabled for this actor) kj::Maybe> containerClient; @@ -2667,6 +2739,7 @@ class Server::WorkerService final: public Service, ns.actorStorage, "can't call getFacetId() when there's no backing storage"); auto indexFile = as.directory->openFile( kj::Path({kj::str(key, ".facets")}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + ns.verifyRegistration(); return *facetTreeIndex.emplace(kj::heap(kj::mv(indexFile))); } } @@ -2817,7 +2890,9 @@ class Server::WorkerService final: public Service, KJ_IF_SOME(as, ns.actorStorage) { kj::Own sqliteHooks; if (parent == kj::none) { - KJ_IF_SOME(a, ns.alarmScheduler) { + if (ns.lockManager != kj::none) { + sqliteHooks = kj::heap(); + } else KJ_IF_SOME(a, ns.alarmScheduler) { sqliteHooks = kj::heap(a, ActorKey{.actorId = key}); } else { // No alarm scheduler available, use default hooks instance. @@ -2835,9 +2910,15 @@ class Server::WorkerService final: public Service, // Before we do anything, make sure the database is in WAL mode. We also need to // do this after reset() is used, so register a callback for that. + if (ns.nfsMode()) { + db->run("PRAGMA locking_mode=EXCLUSIVE;"); + } db->run("PRAGMA journal_mode=WAL;"); db->afterReset([this, &dir = *as.directory, selfId](SqliteDatabase& db) { + if (ns.nfsMode()) { + db.run("PRAGMA locking_mode=EXCLUSIVE;"); + } db.run("PRAGMA journal_mode=WAL;"); // reset() is used when the app called deleteAll(), in which case we also want to @@ -2921,19 +3002,25 @@ class Server::WorkerService final: public Service, } }; - kj::Own getActorContainer(Worker::Actor::Id id) { - kj::String key; - + kj::String actorKey(Worker::Actor::Id& id) { KJ_SWITCH_ONEOF(id) { KJ_CASE_ONEOF(obj, kj::Own) { KJ_REQUIRE(config.is()); - key = obj->toString(); + return obj->toString(); } KJ_CASE_ONEOF(str, kj::String) { KJ_REQUIRE(config.is()); - key = kj::str(str); + return kj::str(str); } } + KJ_UNREACHABLE; + } + + // Get or create an ActorContainer. Use only when NOT in cluster mode. + kj::Own getActorContainer(Worker::Actor::Id id) { + KJ_REQUIRE(lockManager == kj::none); + + kj::String key = actorKey(id); return actors .findOrCreate(key, [&]() mutable { @@ -2945,6 +3032,18 @@ class Server::WorkerService final: public Service, })->addRef(); } + // Create an ActorContainer in cluster mode. + kj::Own createActorContainer( + kj::String key, Worker::Actor::Id id, ClusterLockManager::OwnedLock ownership) { + KJ_ASSERT(actors.find(key) == kj::none); + + auto container = kj::refcounted(kj::str(key), *this, kj::none, + ActorContainer::ClassAndId(kj::addRef(*actorClass), kj::mv(id)), timer, + kj::mv(ownership)); + actors.insert(container->getKey(), container->addRef()); + return container; + } + kj::Own getContainerClient( kj::StringPtr containerId, kj::StringPtr imageName) { KJ_IF_SOME(existingClient, containerClients.find(containerId)) { @@ -3037,6 +3136,8 @@ class Server::WorkerService final: public Service, private: kj::Own actorClass; + kj::StringPtr serviceName; + kj::StringPtr className; const ActorConfig& config; const kj::Clock& clock; @@ -3044,9 +3145,12 @@ class Server::WorkerService final: public Service, kj::Own directory; SqliteDatabase::Vfs vfs; - ActorStorage(kj::Own directoryParam) + ActorStorage(kj::Own directoryParam, ActorNamespace& ns) : directory(kj::mv(directoryParam)), - vfs(*directory) {} + vfs(*directory, + SqliteDatabase::VfsOptions{ + .afterOpen = kj::Function(KJ_BIND_METHOD(ns, verifyRegistration)), + }) {} }; // Note: The Vfs, actorStorage, and ownAlarmScheduler must not be torn down until all actors @@ -3054,6 +3158,9 @@ class Server::WorkerService final: public Service, kj::Maybe actorStorage; kj::Maybe> ownAlarmScheduler; + kj::Maybe clusterRegistry; + kj::Maybe lockManager; + // Tracks the canceler and cleanup promise for a Docker container's lifecycle cleanup. // Useful to await on async calls of a ContainerClient destructor when the new // one appears before they've been resolved. @@ -3089,6 +3196,7 @@ class Server::WorkerService final: public Service, kj::Maybe> cleanupTask; kj::Timer& timer; capnp::ByteStreamFactory& byteStreamFactory; + capnp::HttpOverCapnpFactory& httpOverCapnpFactory; ChannelTokenHandler& channelTokenHandler; kj::Network& dockerNetwork; kj::Maybe dockerPath; @@ -3132,6 +3240,23 @@ class Server::WorkerService final: public Service, } } + bool nfsMode() { + KJ_IF_SOME(r, clusterRegistry) { + return r.nfsMode(); + } else { + return false; + } + } + + // Callback called immediately after any actor storage file is opened to verify that the + // registry lock is still held, and therefore the new open must be on the same NFS lease as + // the registry lock. + void verifyRegistration() { + KJ_IF_SOME(r, clusterRegistry) { + r.verifyNfsLease(); + } + } + // Implements actor loopback, which is used by websocket hibernation to deliver events to the // actor from the websocket's read loop. class Loopback: public Worker::Actor::Loopback, public kj::Refcounted { @@ -3171,6 +3296,43 @@ class Server::WorkerService final: public Service, AlarmScheduler& alarmScheduler; ActorKey actor; }; + + class ClusterActorSqliteHooks final: public ActorSqlite::Hooks { + public: + kj::Promise scheduleRun( + kj::Maybe newAlarmTime, kj::Promise priorTask) override { + // TODO(clustering): Support alarms. + JSG_FAIL_REQUIRE(Error, "Durable Object alarms are not yet supported in cluster mode"); + } + }; + + // ActorChannel implementation wrapping a `WorkerdBootstrap` RPC stub. + class RpcActorChannel final: public IoChannelFactory::ActorChannel { + public: + RpcActorChannel(capnp::ByteStreamFactory& byteStreamFactory, + capnp::HttpOverCapnpFactory& httpOverCapnpFactory, + rpc::WorkerdBootstrap::Client client) + : byteStreamFactory(byteStreamFactory), + httpOverCapnpFactory(httpOverCapnpFactory), + client(kj::mv(client)) {} + + kj::Own startRequest( + IoChannelFactory::SubrequestMetadata metadata) override { + capnp::MessageSize sizeHint{4, 0}; + KJ_IF_SOME(cf, metadata.cfBlobJson) { + sizeHint.wordCount += cf.size() / sizeof(capnp::word); + } + + auto dispatcher = client.startEventRequest(sizeHint).sendForPipeline().getDispatcher(); + return kj::heap( + httpOverCapnpFactory, byteStreamFactory, kj::mv(dispatcher)); + } + + private: + capnp::ByteStreamFactory& byteStreamFactory; + capnp::HttpOverCapnpFactory& httpOverCapnpFactory; + rpc::WorkerdBootstrap::Client client; + }; }; private: @@ -3352,6 +3514,33 @@ class Server::WorkerService final: public Service, kj::Own actorContainer; }; + // TODO(clustering): Replace this with the promised channels coming in the async channel token + // change, once it has landed. + class PromisedActorChannel final: public IoChannelFactory::ActorChannel { + public: + PromisedActorChannel(kj::Promise> promise) + : promise(promise + .then([this](kj::Own inner) { + this->inner = kj::mv(inner); + }) + .fork()) {} + + kj::Own startRequest(IoChannelFactory::SubrequestMetadata metadata) override { + KJ_IF_SOME(i, inner) { + return i->startRequest(kj::mv(metadata)); + } else { + return newPromisedWorkerInterface(promise.addBranch().then( + [self = addRefToThis(), metadata = kj::mv(metadata)]() mutable { + return KJ_ASSERT_NONNULL(self->inner)->startRequest(kj::mv(metadata)); + })); + } + } + + private: + kj::Maybe> inner; + kj::ForkedPromise promise; + }; + // --------------------------------------------------------------------------- // implements kj::TaskSet::ErrorHandler @@ -4870,6 +5059,10 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr auto linkCallback = [this, def = kj::mv(def), totalActorChannels](WorkerService& workerService, Worker::ValidationErrorReporter& errorReporter) mutable { WorkerService::LinkedIoChannels result; + KJ_IF_SOME(registry, this->clusterRegistry) { + result.clusterRegistry = *registry; + result.clusterRpc = KJ_ASSERT_NONNULL(this->clusterRpc); + } auto entrypointNames = workerService.getEntrypointNames(); auto actorClassNames = workerService.getActorClassNames(); @@ -5020,13 +5213,13 @@ kj::Promise> Server::makeWorkerImpl(kj::StringPtr kj::Maybe serviceName; if (!def.isDynamic) serviceName = name; - auto result = - kj::refcounted(channelTokenHandler, serviceName, globalContext->threadContext, - monotonicClock, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint), - kj::mv(errorReporter.namedEntrypoints), kj::mv(errorReporter.actorClasses), - kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), - KJ_BIND_METHOD(*this, deleteAllActors), kj::mv(dockerPath), - kj::mv(containerEgressInterceptorImage), def.isDynamic, kj::mv(abortIsolateCallback)); + auto result = kj::refcounted(KJ_ASSERT_NONNULL(channelTokenHandler), serviceName, + globalContext->threadContext, monotonicClock, kj::mv(worker), + kj::mv(errorReporter.defaultEntrypoint), kj::mv(errorReporter.namedEntrypoints), + kj::mv(errorReporter.actorClasses), kj::mv(linkCallback), + KJ_BIND_METHOD(*this, abortAllActors), KJ_BIND_METHOD(*this, deleteAllActors), + kj::mv(dockerPath), kj::mv(containerEgressInterceptorImage), def.isDynamic, + kj::mv(abortIsolateCallback)); result->initActorNamespaces(def.localActorConfigs, network); co_return result; } @@ -5625,6 +5818,15 @@ class Server::DebugPortListener { co_return co_await server.listen(*listener); } + // Expose WorkerdDebugPortImpl for use by the cluster RPC system. + // + // TODO(cleanup): Maybe move WorkerDebugPortImpl up into Server. However, note the clustering + // code might also shift to a different interface. + static rpc::WorkerdDebugPort::Client makeBootstrap( + Server& owner, capnp::HttpOverCapnpFactory& httpOverCapnpFactory) { + return kj::heap(&owner, httpOverCapnpFactory); + } + private: Server& owner; kj::Own listener; @@ -5716,8 +5918,12 @@ class Server::DebugPortListener { auto decoded = kj::decodeHex(actorIdStr); KJ_REQUIRE(decoded.size() == SHA256_DIGEST_LENGTH, "Invalid Durable Object ID: expected 64 hex characters (32 bytes)", decoded.size()); + kj::Maybe name; + if (params.hasActorName()) { + name = params.getActorName().clone(); + } kj::Own id = - kj::heap(decoded.begin(), kj::none); + kj::heap(decoded.begin(), kj::mv(name)); actorId = kj::mv(id); } KJ_CASE_ONEOF(c, Ephemeral) { @@ -5762,6 +5968,12 @@ kj::Promise Server::handleDrain(kj::Promise drainWhen) { // doc comment, we instead add the promise to `tasks` to be safe. tasks.add(httpServer.httpServer.drain()); } + + // TODO(clustering): This is a harsh ending for actors on this machine. We should really evict + // each one at a time that is convenient, and wait some time for RPC requests to drain. + // Three-party handoff would help with straggler requests that we forward to the new owner. + clusterRpc = kj::none; + abortAllActors(kj::none); } kj::Promise Server::run( @@ -5782,6 +5994,8 @@ kj::Promise Server::run( loggingOptions.structuredLogging = StructuredLogging(config.getStructuredLogging()); } + setupChannelTokenHandler(config); + kj::HttpHeaderTable::Builder headerTableBuilder; globalContext = kj::heap(*this, v8System, headerTableBuilder); invalidConfigServiceSingleton = kj::refcounted(); @@ -5804,6 +6018,15 @@ kj::Promise Server::run( co_return co_await listenPromise.exclusiveJoin(kj::mv(fatalPromise)); } +void Server::setupChannelTokenHandler(config::Config::Reader config) { + ChannelTokenHandler::Resolver& tokenResolver = *this; + kj::Maybe channelTokenKey; + if (config.hasCluster()) { + channelTokenKey = config.getCluster().getKey(); + } + channelTokenHandler.emplace(tokenResolver, channelTokenKey); +} + // Configure and start the inspector socket, returning the port the socket started on. uint startInspector( kj::StringPtr inspectorAddress, Server::InspectorServiceIsolateRegistrar& registrar) { @@ -5939,7 +6162,22 @@ kj::Promise Server::startServices(jsg::V8System& v8System, } goto validDurableObjectStorage; case config::Worker::DurableObjectStorage::IN_MEMORY: + if (config.hasCluster() && hadDurable) { + reportConfigError(kj::str("Worker service \"", name, + "\" uses in-memory Durable Object storage, which is not supported in cluster " + "mode.")); + } + goto validDurableObjectStorage; case config::Worker::DurableObjectStorage::LOCAL_DISK: + if (config.hasCluster() && hadDurable && + workerConf.getDurableObjectStorage().getLocalDisk() != + config.getCluster().getSharedDirectory()) { + reportConfigError( + kj::str("Worker service \"", name, "\" uses Durable Object storage disk service \"", + workerConf.getDurableObjectStorage().getLocalDisk(), + "\", but cluster.sharedDirectory is \"", + config.getCluster().getSharedDirectory(), "\".")); + } goto validDurableObjectStorage; } reportConfigError(kj::str("Encountered unknown durableObjectStorage type in service \"", name, @@ -6002,6 +6240,48 @@ kj::Promise Server::startServices(jsg::V8System& v8System, return decltype(services)::Entry{kj::str("internet"_kj), kj::mv(service)}; }); + if (config.hasCluster()) { + if (!experimental) { + reportConfigError( + "Cluster mode is experimental and subject to change. You must run workerd with " + "`--experimental` to use it."_kj.clone()); + } +#ifndef __linux__ + reportConfigError("Cluster mode is currently implemented only on Linux."_kj.clone()); +#endif + + auto cluster = config.getCluster(); + KJ_IF_SOME(sharedDirService, services.find(cluster.getSharedDirectory())) { + auto diskSvc = dynamic_cast(sharedDirService.get()); + if (diskSvc == nullptr) { + reportConfigError(kj::str("cluster.sharedDirectory refers to service \"", + cluster.getSharedDirectory(), "\", but that service is not a disk directory.")); + } else KJ_IF_SOME(dir, diskSvc->getWritable()) { + auto registryDir = dir.openSubdir( + kj::Path({cluster.getRegistrySubdir()}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY); + auto registry = + kj::heap(kj::mv(registryDir), cluster.getNetwork(), network, timer); + tasks.add(registry->runMaintenance().exclusiveJoin(forkedDrainWhen.addBranch())); + auto& reg = *clusterRegistry.emplace(kj::mv(registry)); + clusterRpc.emplace(capnp::makeRpcServer( + reg, DebugPortListener::makeBootstrap(*this, globalContext->httpOverCapnpFactory))); + + // TODO(clustering): We really shouldn't start accepting connections until the server + // is totally up, but the RPC system actually starts accepting immediately on + // construction, even if you don't call run(). + KJ_IF_SOME(rpc, clusterRpc) { + tasks.add(rpc.run().exclusiveJoin(forkedDrainWhen.addBranch())); + } + } else { + reportConfigError(kj::str("cluster.sharedDirectory refers to disk service \"", + cluster.getSharedDirectory(), "\", but that service is defined read-only.")); + } + } else { + reportConfigError(kj::str("cluster.sharedDirectory refers to service \"", + cluster.getSharedDirectory(), "\", but no such service is defined.")); + } + } + // Third pass: Cross-link services. for (auto& service: services) { ConfigErrorReporter errorReporter(*this, service.key); @@ -6220,6 +6500,8 @@ kj::Promise Server::test(jsg::V8System& v8System, loggingOptions.structuredLogging = StructuredLogging(config.getStructuredLogging()); } + setupChannelTokenHandler(config); + kj::HttpHeaderTable::Builder headerTableBuilder; globalContext = kj::heap(*this, v8System, headerTableBuilder); invalidConfigServiceSingleton = kj::refcounted(); diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 4ddfd5447a0..d0111effe41 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -9,8 +9,10 @@ #include #include #include +#include #include +#include #include #include #include @@ -156,7 +158,10 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::Own memoryCacheProvider; - ChannelTokenHandler channelTokenHandler; + kj::Maybe channelTokenHandler; + + kj::Maybe> clusterRegistry; + kj::Maybe> clusterRpc; kj::HashMap>> socketOverrides; kj::HashMap directoryOverrides; @@ -339,6 +344,8 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl void unlinkWorkerLoaders(); + void setupChannelTokenHandler(config::Config::Reader config); + kj::Promise preloadPython( kj::StringPtr workerName, const WorkerDef& workerDef, ErrorReporter& errorReporter); diff --git a/src/workerd/server/tests/cluster/BUILD.bazel b/src/workerd/server/tests/cluster/BUILD.bazel new file mode 100644 index 00000000000..ae7ad386f7f --- /dev/null +++ b/src/workerd/server/tests/cluster/BUILD.bazel @@ -0,0 +1,45 @@ +load("@aspect_rules_js//js:defs.bzl", "js_test") + +js_test( + name = "cluster-test-unix", + size = "medium", + data = [ + ":cluster-test.wd-test", + ":cluster-worker.js", + "//src/workerd/server:workerd", + ], + entry_point = "cluster-test.mjs", + env = { + "WORKERD_BINARY": "$(rootpath //src/workerd/server:workerd)", + "WD_TEST_CONFIG": "$(rootpath :cluster-test.wd-test)", + }, + tags = ["js-test"], + target_compatible_with = select({ + "@platforms//os:linux": [], + "//conditions:default": ["@platforms//:incompatible"], + }), +) + +js_test( + name = "cluster-test-cidr", + # The CIDR variant exercises the dead-peer detection path, which has to + # wait out the new-entry grace period before unlinking the dead peer's + # registry file. That makes the takeover test ~15s on its own, so the + # combined run needs more than the default "medium" budget. + size = "large", + data = [ + ":cluster-test-cidr.wd-test", + ":cluster-worker.js", + "//src/workerd/server:workerd", + ], + entry_point = "cluster-test.mjs", + env = { + "WORKERD_BINARY": "$(rootpath //src/workerd/server:workerd)", + "WD_TEST_CONFIG": "$(rootpath :cluster-test-cidr.wd-test)", + }, + tags = ["js-test"], + target_compatible_with = select({ + "@platforms//os:linux": [], + "//conditions:default": ["@platforms//:incompatible"], + }), +) diff --git a/src/workerd/server/tests/cluster/cluster-test-cidr.wd-test b/src/workerd/server/tests/cluster/cluster-test-cidr.wd-test new file mode 100644 index 00000000000..3bebcbb6634 --- /dev/null +++ b/src/workerd/server/tests/cluster/cluster-test-cidr.wd-test @@ -0,0 +1,49 @@ +# Copyright (c) 2026 Cloudflare, Inc. +# Licensed under the Apache 2.0 license found in the LICENSE file or at: +# https://opensource.org/licenses/Apache-2.0 + +# CIDR/IP variant of cluster-test.wd-test. Uses 127.0.0.0/8 (the loopback) so +# the registry-file locking, TCP peer connections, new-entry grace period, and +# dead-peer lock probes are exercised independently of the unix-socket variant. + +using Workerd = import "/workerd/workerd.capnp"; + +const config :Workerd.Config = ( + services = [ + ( name = "shared", + disk = ( + writable = true, + allowDotfiles = true, + ) + ), + + ( name = "main", + worker = ( + modules = [ + ( name = "cluster-worker.js", esModule = embed "cluster-worker.js" ), + ], + compatibilityDate = "2024-09-01", + compatibilityFlags = ["nodejs_compat"], + bindings = [ + ( name = "COUNTER", durableObjectNamespace = "Counter" ), + ( name = "NODE_ID", fromEnvironment = "NODE_ID" ), + ], + durableObjectNamespaces = [ + ( className = "Counter", uniqueKey = "counter-test-namespace" ), + ], + durableObjectStorage = ( localDisk = "shared" ), + ) + ), + ], + + sockets = [ + ( name = "http", address = "*:0", http = (), service = "main" ), + ], + + cluster = ( + sharedDirectory = "shared", + registrySubdir = "workerd-registry", + network = "127.0.0.0/8", + key = "test-cluster-shared-secret", + ), +); diff --git a/src/workerd/server/tests/cluster/cluster-test.mjs b/src/workerd/server/tests/cluster/cluster-test.mjs new file mode 100644 index 00000000000..3203dc34812 --- /dev/null +++ b/src/workerd/server/tests/cluster/cluster-test.mjs @@ -0,0 +1,465 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 +// +// Integration test for the cluster mode described in scalable-durable-objects.md. +// +// The test spawns several workerd instances pointing at the same shared +// directory, then exercises: +// 1. Consistent DO state across instances (single-writer correctness). +// 2. Transparent forwarding when a request arrives at the "wrong" instance. +// 3. Takeover after a node is killed. +// 4. Alarms are rejected with a clear error in cluster mode. +// 5. No `metadata.sqlite` is ever created (no AlarmScheduler in cluster mode). +// +// Two variants are run: the unix-socket cluster network mode (default) and a +// localhost CIDR (`127.0.0.0/8`) IP-socket mode that exercises the registry +// file locking and TCP peer connections separately. The variant is selected +// by the WD_TEST_CONFIG environment variable supplied by the BUILD target. + +import { spawn } from 'node:child_process'; +import { mkdtemp, readdir, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { env } from 'node:process'; +import { test } from 'node:test'; +import assert from 'node:assert'; +import { setTimeout as sleep } from 'node:timers/promises'; + +assert( + env.WORKERD_BINARY !== undefined, + 'You must set the WORKERD_BINARY environment variable.' +); +assert( + env.WD_TEST_CONFIG !== undefined, + 'You must set the WD_TEST_CONFIG environment variable.' +); + +const CONTROL_FD = 3; + +// A minimal harness specialized for the cluster test. Unlike server-harness.mjs +// it: +// - allows overriding bindings via process environment (NODE_ID), +// - permits multiple instances to share a directory, +// - supports SIGKILL in addition to SIGTERM for the takeover test. +class ClusterNode { + #binary; + #config; + #sharedPath; + #nodeId; + #child = null; + #httpPort = null; + #closed = null; + #stderrBuffer = ''; + + constructor({ binary, config, sharedPath, nodeId }) { + this.#binary = binary; + this.#config = config; + this.#sharedPath = sharedPath; + this.#nodeId = nodeId; + } + + get nodeId() { + return this.#nodeId; + } + + get httpPort() { + assert(this.#httpPort !== null, 'node not started'); + return this.#httpPort; + } + + // Returns the contents of the node's stderr observed so far. Useful for + // verifying that internal error messages (e.g. the cluster alarm rejection) + // were emitted. + get stderr() { + return this.#stderrBuffer; + } + + async start() { + assert.strictEqual(this.#child, null); + + const args = [ + 'serve', + this.#config, + '--experimental', + '--verbose', + `--control-fd=${CONTROL_FD}`, + `--directory-path=shared=${this.#sharedPath}`, + `--socket-addr=http=127.0.0.1:0`, + ]; + + const child = spawn(this.#binary, args, { + stdio: ['ignore', 'inherit', 'pipe', 'pipe'], + env: { ...env, NODE_ID: this.#nodeId }, + }); + this.#child = child; + + child.stderr.on('data', (data) => { + const chunk = data.toString('utf8'); + this.#stderrBuffer += chunk; + // Mirror to our own stderr so test output remains useful. + process.stderr.write(`[${this.#nodeId}] ${chunk}`); + }); + + // Watch the control fd for the assigned http port. + const portPromise = new Promise((resolve, reject) => { + let buffer = ''; + const onData = (data) => { + buffer += data.toString('utf8'); + let nl; + while ((nl = buffer.indexOf('\n')) !== -1) { + const line = buffer.slice(0, nl).trim(); + buffer = buffer.slice(nl + 1); + if (!line) continue; + try { + const parsed = JSON.parse(line); + if (parsed.event === 'listen' && parsed.socket === 'http') { + child.stdio[CONTROL_FD].off('data', onData); + resolve(parsed.port); + return; + } + } catch (err) { + reject( + new Error( + `Failed to parse control message from node ${this.#nodeId}: ${line}` + ) + ); + return; + } + } + }; + child.stdio[CONTROL_FD].on('data', onData); + child.once('error', reject); + child.once('exit', (code, signal) => { + if (this.#httpPort === null) { + reject( + new Error( + `Node ${this.#nodeId} exited before listening (code=${code}, signal=${signal})` + ) + ); + } + }); + }); + + this.#closed = new Promise((resolve) => { + child.once('exit', (code, signal) => resolve({ code, signal })); + }); + + await new Promise((resolve, reject) => { + child.once('spawn', resolve).once('error', reject); + }); + + this.#httpPort = await portPromise; + } + + async stop({ signal = 'SIGTERM', timeoutMs = 10_000 } = {}) { + if (this.#child === null) return null; + const child = this.#child; + this.#child = null; + child.kill(signal); + + const killTimer = setTimeout(() => { + try { + child.kill('SIGKILL'); + } catch (_) { + // process may already be gone. + } + }, timeoutMs); + + const result = await this.#closed; + clearTimeout(killTimer); + this.#httpPort = null; + return result; + } +} + +async function fetchJson(port, path, init) { + const url = `http://127.0.0.1:${port}${path}`; + const res = await fetch(url, init); + const text = await res.text(); + let body; + try { + body = JSON.parse(text); + } catch (err) { + throw new Error( + `Non-JSON response from ${url} (status ${res.status}): ${text}` + ); + } + return { status: res.status, body }; +} + +async function withCluster(numNodes, fn) { + // Each test run gets its own shared directory, so the registry/lock-file + // state doesn't leak between tests. + const sharedPath = await mkdtemp(join(tmpdir(), 'workerd-cluster-')); + const nodes = []; + try { + for (let i = 0; i < numNodes; i++) { + const node = new ClusterNode({ + binary: env.WORKERD_BINARY, + config: env.WD_TEST_CONFIG, + sharedPath, + nodeId: `node${i}`, + }); + await node.start(); + nodes.push(node); + } + await fn({ nodes, sharedPath }); + } finally { + // Stop all nodes (ignore errors -- some may already be stopped by the + // test itself). + for (const node of nodes) { + try { + await node.stop({ timeoutMs: 5000 }); + } catch (_) { + // ignore + } + } + await rm(sharedPath, { recursive: true, force: true }); + } +} + +// --------------------------------------------------------------------------- +// Tests + +test('cluster: DO state is consistent across instances (single writer)', async () => { + await withCluster(3, async ({ nodes }) => { + // Send a series of /increment requests to a round-robin of nodes, all for + // the same DO name. Track the maximum returned count -- it must be a + // strictly-increasing sequence (1, 2, 3, ...) because only one instance is + // the writer at any given time and the DO state is persistent. + const totalRequests = 12; + const expected = []; + for (let i = 1; i <= totalRequests; i++) expected.push(i); + + const observed = []; + const ownerIds = new Set(); + for (let i = 0; i < totalRequests; i++) { + const node = nodes[i % nodes.length]; + const { status, body } = await fetchJson( + node.httpPort, + '/increment?name=consistency' + ); + assert.strictEqual( + status, + 200, + `request to ${node.nodeId} failed: ${JSON.stringify(body)}` + ); + observed.push(body.count); + ownerIds.add(body.nodeId); + } + observed.sort((a, b) => a - b); + assert.deepStrictEqual( + observed, + expected, + `each /increment must be a unique strictly-increasing count` + ); + + // The DO is owned by exactly one node at a time. Even though we directed + // requests at all 3 nodes, every observed response should report the same + // nodeId -- the actual owner -- because incoming requests are forwarded. + assert.strictEqual( + ownerIds.size, + 1, + `expected exactly one DO owner across ${totalRequests} requests, got: ${[...ownerIds].join(', ')}` + ); + }); +}); + +test('cluster: requests to non-owner are transparently forwarded', async () => { + await withCluster(2, async ({ nodes }) => { + // Prime the DO by sending an increment to node 0. That node should claim + // ownership and respond. Then send a /get to node 1. Node 1 should forward + // to node 0 (the owner) and return the same state, and node 0 should be + // identified as the responder. + const first = await fetchJson( + nodes[0].httpPort, + '/increment?name=forwarding' + ); + assert.strictEqual(first.status, 200); + assert.strictEqual(first.body.count, 1); + const owner = first.body.nodeId; + + const second = await fetchJson(nodes[1].httpPort, '/get?name=forwarding'); + assert.strictEqual(second.status, 200); + assert.strictEqual( + second.body.count, + 1, + `forwarded /get must see prior increment` + ); + assert.strictEqual( + second.body.nodeId, + owner, + `forwarded request must be served by the original owner (${owner}), not the forwarding node (${nodes[1].nodeId})` + ); + + // Same id must be reported for both calls (sanity check for idFromName + // determinism across instances). + assert.strictEqual(first.body.id, second.body.id); + }); +}); + +test('cluster: killing the owner allows another node to take over', async () => { + await withCluster(2, async ({ nodes }) => { + // Prime the DO on whichever node ends up owning it. + const initial = await fetchJson( + nodes[0].httpPort, + '/increment?name=takeover' + ); + assert.strictEqual(initial.status, 200); + assert.strictEqual(initial.body.count, 1); + const originalOwner = initial.body.nodeId; + const ownerIndex = nodes.findIndex((n) => n.nodeId === originalOwner); + assert.notStrictEqual(ownerIndex, -1); + const survivorIndex = ownerIndex === 0 ? 1 : 0; + const survivor = nodes[survivorIndex]; + + // Hard-kill the owner. SIGKILL leaves the registry file in place, so the + // survivor has to detect death via a failed RPC + dead-peer cleanup probe. + await nodes[ownerIndex].stop({ signal: 'SIGKILL', timeoutMs: 2000 }); + + // Hit the survivor repeatedly. The first request may fail or take time + // while the survivor's RPC attempt to the dead owner is in flight; once + // the dead-peer cleanup completes, subsequent requests succeed and the + // survivor takes ownership. The persisted count must be preserved. + let response = null; + let lastErr = null; + const deadline = Date.now() + 30_000; + while (Date.now() < deadline) { + try { + response = await fetchJson( + survivor.httpPort, + '/increment?name=takeover' + ); + if (response.status === 200) break; + } catch (err) { + lastErr = err; + } + await sleep(200); + } + assert( + response !== null && response.status === 200, + `survivor never succeeded; last error: ${lastErr}` + ); + assert.strictEqual( + response.body.nodeId, + survivor.nodeId, + `survivor (${survivor.nodeId}) must own the DO after takeover, got ${response.body.nodeId}` + ); + assert.strictEqual( + response.body.count, + 2, + `persisted state must be preserved across takeover` + ); + + // Further requests to the survivor continue to work and increment. + const followup = await fetchJson( + survivor.httpPort, + '/increment?name=takeover' + ); + assert.strictEqual(followup.status, 200); + assert.strictEqual(followup.body.count, 3); + assert.strictEqual(followup.body.nodeId, survivor.nodeId); + }); +}); + +test('cluster: alarms are rejected with a clear error', async () => { + await withCluster(1, async ({ nodes, sharedPath }) => { + // Fire the request. In cluster mode `setAlarm()` does not throw + // synchronously to the JS code -- the rejection happens during output-gate + // flush, after the worker's fetch handler has already returned. The + // observable behaviour is therefore a non-2xx HTTP status, while the + // canonical error message is logged to stderr where the test can detect + // it. (The spec mandates the message "Durable Object alarms are not yet + // supported in cluster mode".) + let response; + try { + response = await fetch( + `http://127.0.0.1:${nodes[0].httpPort}/set-alarm?name=alarm-test` + ); + } catch (err) { + // A network-level failure is acceptable too: it confirms that the alarm + // path did not silently succeed. + response = null; + } + if (response !== null) { + // Drain the body to allow logging to flush. + try { + await response.text(); + } catch (_) { + // ignore + } + assert.notStrictEqual( + Math.floor(response.status / 100), + 2, + `expected non-2xx for set-alarm in cluster mode, got ${response.status}` + ); + } + + // Wait briefly for stderr to flush the error message. + const deadline = Date.now() + 5000; + const expectedMessage = + 'Durable Object alarms are not yet supported in cluster mode'; + while ( + Date.now() < deadline && + !nodes[0].stderr.includes(expectedMessage) + ) { + await sleep(50); + } + assert( + nodes[0].stderr.includes(expectedMessage), + `expected stderr to contain the cluster alarm rejection message ` + + `("${expectedMessage}"); stderr so far:\n${nodes[0].stderr}` + ); + + // No per-namespace metadata.sqlite should exist anywhere under the + // shared directory -- in cluster mode the AlarmScheduler is never + // constructed. + const nsDir = join(sharedPath, 'counter-test-namespace'); + let entries = []; + try { + entries = await readdir(nsDir); + } catch (err) { + // Namespace dir might not exist if the DO was never created on disk + // because the request failed before storage was opened. That is a + // valid (and even stronger) signal that no metadata.sqlite was made. + } + assert( + !entries.some((name) => name.startsWith('metadata.sqlite')), + `cluster mode must not create metadata.sqlite; found: ${entries.join(', ')}` + ); + }); +}); + +test('cluster: registry directory is populated for each running instance', async () => { + await withCluster(2, async ({ nodes, sharedPath }) => { + // Give the instances a brief moment to settle. Both should have written + // their registry entries before they reported their listening sockets, + // but allow a small grace period for filesystem visibility. + const registryDir = join(sharedPath, 'workerd-registry'); + let entries = []; + const deadline = Date.now() + 5000; + while (Date.now() < deadline) { + try { + entries = await readdir(registryDir); + } catch (_) { + entries = []; + } + if (entries.length >= nodes.length) break; + await sleep(100); + } + assert.strictEqual( + entries.length, + nodes.length, + `expected ${nodes.length} registry entries, got ${entries.length}: [${entries.join(', ')}]` + ); + // Each entry should be a 64-char hex public key. + for (const name of entries) { + assert.match( + name, + /^[0-9a-f]{64}$/, + `registry entry name should be 64-char hex, got: ${name}` + ); + } + }); +}); diff --git a/src/workerd/server/tests/cluster/cluster-test.wd-test b/src/workerd/server/tests/cluster/cluster-test.wd-test new file mode 100644 index 00000000000..6c6d53b2d7e --- /dev/null +++ b/src/workerd/server/tests/cluster/cluster-test.wd-test @@ -0,0 +1,60 @@ +# Copyright (c) 2026 Cloudflare, Inc. +# Licensed under the Apache 2.0 license found in the LICENSE file or at: +# https://opensource.org/licenses/Apache-2.0 + +# Config for the cluster integration test. Designed to be reused across multiple +# workerd instances pointing at the same shared directory: +# - The `shared` disk service has no path; the test driver overrides it via +# `--directory-path shared=` so all instances see the same directory. +# - The `http` socket has address `*:0`; the test driver overrides this and +# reads the actual assigned port from the control fd. +# - The cluster `network` field defaults to "unix"; the test driver overrides +# it to "127.0.0.0/8" when testing the CIDR/IP variant by passing a different +# config (see cluster-test-cidr.wd-test). +# - The NODE_ID env binding lets each instance be identified per request, so +# the test can verify which workerd owns/serves a given DO. + +using Workerd = import "/workerd/workerd.capnp"; + +const config :Workerd.Config = ( + services = [ + # The shared filesystem used by the whole cluster. Path is supplied via + # `--directory-path shared=` on the command line so all instances + # share the same directory. + ( name = "shared", + disk = ( + writable = true, + allowDotfiles = true, + ) + ), + + ( name = "main", + worker = ( + modules = [ + ( name = "cluster-worker.js", esModule = embed "cluster-worker.js" ), + ], + compatibilityDate = "2024-09-01", + compatibilityFlags = ["nodejs_compat"], + bindings = [ + ( name = "COUNTER", durableObjectNamespace = "Counter" ), + ( name = "NODE_ID", fromEnvironment = "NODE_ID" ), + ], + durableObjectNamespaces = [ + ( className = "Counter", uniqueKey = "counter-test-namespace" ), + ], + durableObjectStorage = ( localDisk = "shared" ), + ) + ), + ], + + sockets = [ + ( name = "http", address = "*:0", http = (), service = "main" ), + ], + + cluster = ( + sharedDirectory = "shared", + registrySubdir = "workerd-registry", + network = "unix", + key = "test-cluster-shared-secret", + ), +); diff --git a/src/workerd/server/tests/cluster/cluster-worker.js b/src/workerd/server/tests/cluster/cluster-worker.js new file mode 100644 index 00000000000..dbd107fee3f --- /dev/null +++ b/src/workerd/server/tests/cluster/cluster-worker.js @@ -0,0 +1,86 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +// Test worker for the cluster integration test. Exposes a Counter Durable Object +// and a top-level fetch() that routes requests to the DO by name. +// +// The DO supports the following request paths: +// /increment Increments and returns the current counter value. +// /get Returns the current counter value without incrementing it. +// /set-alarm Tries to schedule a DO alarm. In cluster mode this should +// fail with a "not yet supported" error. +// /identity Returns information identifying this instance (NODE_ID env) +// along with the DO id, so the test can verify which instance +// served the request. + +export default { + async fetch(request, env) { + const url = new URL(request.url); + + // The DO name to act on is given via the `name` query parameter (defaults + // to "default"). All instances using the same name will route to the same + // DO id. + const name = url.searchParams.get('name') ?? 'default'; + + // Synthesize a stub URL for the DO. + const id = env.COUNTER.idFromName(name); + const stub = env.COUNTER.get(id); + + // Forward to the DO with the same pathname. + const forwardUrl = new URL(url.pathname, 'http://do/'); + return await stub.fetch(forwardUrl.toString(), { + method: request.method, + headers: request.headers, + }); + }, +}; + +export class Counter { + constructor(state, env) { + this.state = state; + this.env = env; + } + + async fetch(request) { + const url = new URL(request.url); + const nodeId = this.env.NODE_ID ?? ''; + const idHex = this.state.id.toString(); + + if (url.pathname === '/increment') { + let count = (await this.state.storage.get('count')) ?? 0; + count += 1; + await this.state.storage.put('count', count); + return Response.json({ + count, + nodeId, + id: idHex, + }); + } else if (url.pathname === '/get') { + const count = (await this.state.storage.get('count')) ?? 0; + return Response.json({ count, nodeId, id: idHex }); + } else if (url.pathname === '/set-alarm') { + // Try to schedule an alarm 60s in the future. In cluster mode this should + // throw a clear error. + try { + await this.state.storage.setAlarm(Date.now() + 60_000); + return Response.json({ ok: true, nodeId, id: idHex }); + } catch (err) { + return Response.json( + { ok: false, error: String(err), nodeId, id: idHex }, + { status: 500 } + ); + } + } else if (url.pathname === '/identity') { + return Response.json({ nodeId, id: idHex }); + } + + return new Response('Not found', { status: 404 }); + } + + async alarm() { + // Should never be called in cluster mode; if it ever is, surface that via + // a stored marker so the test can observe the failure mode. + await this.state.storage.put('alarm-fired', true); + } +} diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index d433362ff99..47cc215443f 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -94,6 +94,9 @@ struct Config { logging @6 : LoggingOptions; # Console and Stdio logging configuration options. + + cluster @7 :ClusterConfig; + # Set when the workerd instance should join a cluster. } struct LoggingOptions { @@ -111,6 +114,72 @@ struct LoggingOptions { # Set a custom prefix for process.stderr. Defaults to "stderr: ". } +struct ClusterConfig { + # workerd instances can work together in a cluster, in order to distribute work and Durable + # Objects among them. + # + # All instances must share a single filesystem where Durable Object data is stored. The + # filesystem can be NFSv4 or any filesystem that supports similar locking semantics with proper + # leases. It can also just be a local filesystem, when running multiple workerd instances on the + # same machine to utilize multiple cores. + # + # All instances in the cluster MUST have identical configuration. The idea is that all + # instances behave exactly the same and any instance could forward any request it receives to + # any other instance and it should produce the same result. For the purpose of rolling upgrades, + # instances may temporarily differ in ways that are backwards-compatible. + # + # Cluster mode is currently implemented only on Linux. + + sharedDirectory @0 :Text; + # Name of a `DiskDirectory` service which represents storage shared by all instances in the + # cluster. If instances are on different machines (not just different processes on the same + # machine), this should point to a network filesystem that implements proper locking and leasing, + # like NFSv4. + # + # All `durableObjectStorage.localDisk` configurations in this config file must point at this same + # directory. (Each Durable Object namespace will create a subdirectory.) + + registrySubdir @1 :Text = "workerd-registry"; + # Subdirectory of `sharedDirectory` to put the actual registry files into. (This can be customized + # in case the default name conflicts with one of the Durable Object namespace `uniqueKey`s, which + # determine the directory names under which they are stored.) + # + # The registry is used to discover what workerd instances are available. Each workerd instance + # registers itself in the registry such that other instances can find it and send traffic to it. + # + # Specifically, each instance will create a file in this directory named after the instance's + # public key, which is randomly-generated when the instance starts up. The file contains the + # instance's network address, so that other instances can connect to it. + + network @3 :Text; + # Each workerd instance in the cluster listens for connections from other instances over the + # network. The `network` field indicates which network to listen on. + # + # This can be either: + # * An IPv4 or IPv6 CIDR, which must match one of the machine's IP addresses. + # * The word "unix", to mean that the cluster is actually local to the machine and should + # communicate via Unix-domain sockets. + # + # In the CIDR case, workerd enumerates all IP addresses of all network interfaces available to + # it, selects the first one that matches the CIDR, and opens an ephemeral port on that address. + # Note that this means the port workerd uses is not predictable. That's OK because workerd will + # then write a file to the registry containing its own address. + # + # In the "unix" case, the registry will be populated not by files, but by Unix-domain sockets. + + key @2 :Text; + # Some secret value known only to members of the cluster. This is used to encrypt channel tokens, + # which are used when passing service stubs or Durable Object stubs over RPC. + # + # The secrecy of this key matters if you allow untrusted clients outside of the cluster to speak + # the low-level worker-interface.capnp protocol directly with workerd instances in the cluster. + # At present, this is only exposed via the `capnpConnectHost` and `workerdDebugPort` config + # settings. If you don't use either of those in production, then the secrecy of this key isn't + # very important. + # + # TODO(someday): Support some way to gradually roll out a rotated key. +} + # ======================================================================================== # Sockets @@ -701,8 +770,6 @@ struct Worker { # This mode is intended for local testing purposes. localDisk @12 :Text; - # ** EXPERIMENTAL; SUBJECT TO BACKWARDS-INCOMPATIBLE CHANGE ** - # # Durable Object data will be stored in a directory on local disk. This field is the name of # a service, which must be a DiskDirectory service. For each Durable Object class, a # subdirectory will be created using `uniqueKey` as the name. Within the directory, one or @@ -710,11 +777,14 @@ struct Worker { # a number of different extensions depending on the storage mode. (Currently, the main storage # is a file with the extension `.sqlite`, and in certain situations extra files with the # extensions `.sqlite-wal`, and `.sqlite-shm` may also be present.) + # + # Note that if workerd has been given a `ClusterConfig` at the top level, then it is expected + # this disk directory is shared by the whole cluster (e.g. via NFSv4), and workerd will + # automatically ensure that each Durable Object is scheduled on a single machine. When using + # NFSv4, all Durable Object directories MUST be on the SAME VOLUME as the registry directory + # defined in `ClusterConfig`, so that lease loss affects all files and locks simultaneously. } - # TODO(someday): Support distributing objects across a cluster. At present, objects are always - # local to one instance of the runtime. - moduleFallback @13 :Text; tails @14 :List(ServiceDesignator); @@ -873,6 +943,12 @@ struct DiskDirectory { # is no acceptable format for these, regardless of what the client says it accepts). # # `HEAD` requests are properly optimized to perform a stat() without actually opening the file. + # + # If workerd has been given a `ClusterConfig` at the top level, it is important that any + # `DiskDirectory` is set up in such a way that a request that uses it can be served correctly + # from any instance in the cluster. For read-only directories, this just means they all have to + # contain the same data. For writable directories, you may want to use a shared filesystem like + # NFSv4. path @0 :Text; # The filesystem path of the directory. If not specified, then it must be specified on the diff --git a/src/workerd/util/BUILD.bazel b/src/workerd/util/BUILD.bazel index 9f8ded578bc..1c485380200 100644 --- a/src/workerd/util/BUILD.bazel +++ b/src/workerd/util/BUILD.bazel @@ -453,3 +453,16 @@ kj_test( src = "state-machine-test.c++", deps = [":state-machine"], ) + +wd_cc_library( + name = "ofd-lock", + srcs = ["ofd-lock.c++"], + hdrs = ["ofd-lock.h"], + visibility = ["//visibility:public"], + deps = ["@capnp-cpp//src/kj"], +) + +kj_test( + src = "ofd-lock-test.c++", + deps = [":ofd-lock"], +) diff --git a/src/workerd/util/ofd-lock-test.c++ b/src/workerd/util/ofd-lock-test.c++ new file mode 100644 index 00000000000..aa827c444b2 --- /dev/null +++ b/src/workerd/util/ofd-lock-test.c++ @@ -0,0 +1,182 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "ofd-lock.h" + +#include +#include +#include + +#if !_WIN32 +#include +#include +#include +#endif + +namespace workerd { +namespace { + +#ifdef __linux__ + +kj::OwnFd openTempFile() { + char path[] = "/tmp/ofd-lock-test-XXXXXX"; + int fd = mkstemp(path); + KJ_SYSCALL(fd); + unlink(path); + return kj::OwnFd(fd); +} + +KJ_TEST("exclusive lock conflicts with exclusive lock") { + auto fd1 = openTempFile(); + + // Open a second fd to the same file via /proc/self/fd. + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + + // Second exclusive lock on a different fd to the same file should fail. + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE) == kj::none); +} + +KJ_TEST("shared lock conflicts with exclusive lock") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + + // Shared lock should also fail when exclusive is held. + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::SHARED) == kj::none); +} + +KJ_TEST("shared locks do not conflict with each other") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::SHARED)); + auto lock2 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd2, OfdLock::SHARED)); + // Both shared locks held simultaneously — no conflict. +} + +KJ_TEST("exclusive lock blocks while shared lock is held") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::SHARED)); + + // Exclusive lock should fail when shared lock is held by a different fd. + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE) == kj::none); +} + +KJ_TEST("releasing lock allows re-acquisition") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + { + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + // lock1 goes out of scope here, releasing the lock. + } + + // Now fd2 should be able to acquire an exclusive lock. + auto lock2 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE)); +} + +KJ_TEST("move constructor transfers lock") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + + // Move the lock. + auto lock2 = kj::mv(lock1); + + // The lock should still be held (fd2 can't get it). + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE) == kj::none); +} + +KJ_TEST("move constructor releases on destruction of target") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + { + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + auto lock2 = kj::mv(lock1); + // lock2 goes out of scope, releasing the lock. + } + + // fd2 should now succeed. + auto lock3 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE)); +} + +KJ_TEST("move assignment transfers lock") { + auto fd1 = openTempFile(); + + auto procPath = kj::str("/proc/self/fd/", fd1.get()); + int rawFd2; + KJ_SYSCALL(rawFd2 = open(procPath.cStr(), O_RDWR)); + auto fd2 = kj::OwnFd(rawFd2); + + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + + // Create a second lock on fd2... wait, we can't because fd1 already holds exclusive. + // Instead, test move assignment by creating a dummy that gets replaced. + auto fd3 = openTempFile(); + auto lock3 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd3, OfdLock::EXCLUSIVE)); + + lock3 = kj::mv(lock1); // Releases lock on fd3, takes lock from fd1. + + // fd2 still can't get the lock (lock on fd1 was transferred to lock3). + KJ_EXPECT(OfdLock::tryLock(fd2, OfdLock::EXCLUSIVE) == kj::none); +} + +KJ_TEST("same fd can re-lock (lock is per open file description)") { + auto fd1 = openTempFile(); + + // On the same fd, acquiring the same lock type again is a no-op re-lock, not a conflict. + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + auto lock2 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + // Both succeed because OFD locks are per open file description, and this is the same fd. +} + +KJ_TEST("verifyHeld succeeds when lock is held") { + auto fd1 = openTempFile(); + auto lock1 = KJ_ASSERT_NONNULL(OfdLock::tryLock(fd1, OfdLock::EXCLUSIVE)); + // Should not throw. + lock1.verifyHeld(); +} + +#else // #if __linux__ + +KJ_TEST("dummy test: platform not supported") {} + +#endif // #if __linux__, #else + +} // namespace +} // namespace workerd diff --git a/src/workerd/util/ofd-lock.c++ b/src/workerd/util/ofd-lock.c++ new file mode 100644 index 00000000000..96ad38aee9b --- /dev/null +++ b/src/workerd/util/ofd-lock.c++ @@ -0,0 +1,111 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "ofd-lock.h" + +#include + +#ifdef __linux__ +#include +#include +#endif + +namespace workerd { + +#ifdef __linux__ + +namespace { + +struct flock makeFlock(short type) { + struct flock fl = {}; + fl.l_type = type; + fl.l_whence = SEEK_SET; + fl.l_start = 0; + fl.l_len = 0; // Lock the entire file. + return fl; +} + +} // namespace + +kj::Maybe OfdLock::tryLock(int fd, Type type) { + auto fl = makeFlock(type == SHARED ? F_RDLCK : F_WRLCK); + if (fcntl(fd, F_OFD_SETLK, &fl) == -1) { + int err = errno; + if (err == EAGAIN || err == EACCES) { + // Lock is held by another open file description. + return kj::none; + } + KJ_FAIL_SYSCALL("fcntl(F_OFD_SETLK)", err); + } + return OfdLock(fd, type); +} + +void OfdLock::verifyHeld() { + KJ_REQUIRE(fd >= 0, "OfdLock has been moved away"); + // Re-lock with the same type. If the underlying NFSv4 lock stateid has been invalidated + // (lease lost), the Linux NFS client surfaces this as EIO. If the stateid is still valid, + // the kernel treats the re-lock as a no-op. + auto fl = makeFlock(type == SHARED ? F_RDLCK : F_WRLCK); + KJ_SYSCALL(fcntl(fd, F_OFD_SETLK, &fl), "NFS lease may have been lost"); +} + +OfdLock::OfdLock(int fd, Type type): fd(fd), type(type) {} + +OfdLock::OfdLock(OfdLock&& other): fd(other.fd), type(other.type) { + other.fd = -1; +} + +OfdLock& OfdLock::operator=(OfdLock&& other) { + if (this != &other) { + // Release existing lock, if any. + if (fd >= 0) { + auto fl = makeFlock(F_UNLCK); + fcntl(fd, F_OFD_SETLK, &fl); + } + fd = other.fd; + type = other.type; + other.fd = -1; + } + return *this; +} + +OfdLock::~OfdLock() { + if (fd >= 0) { + auto fl = makeFlock(F_UNLCK); + fcntl(fd, F_OFD_SETLK, &fl); + } +} + +#else // !__linux__ + +kj::Maybe OfdLock::tryLock(int fd, Type type) { + KJ_FAIL_REQUIRE("OFD locks are only supported on Linux. " + "Cluster mode is currently implemented only on Linux."); +} + +void OfdLock::verifyHeld() { + KJ_FAIL_REQUIRE("OFD locks are only supported on Linux. " + "Cluster mode is currently implemented only on Linux."); +} + +OfdLock::OfdLock(int fd, Type type): fd(fd), type(type) {} + +OfdLock::OfdLock(OfdLock&& other): fd(other.fd), type(other.type) { + other.fd = -1; +} + +OfdLock& OfdLock::operator=(OfdLock&& other) { + if (this != &other) { + fd = other.fd; + type = other.type; + other.fd = -1; + } + return *this; +} + +OfdLock::~OfdLock() {} + +#endif // __linux__ + +} // namespace workerd diff --git a/src/workerd/util/ofd-lock.h b/src/workerd/util/ofd-lock.h new file mode 100644 index 00000000000..aebe84b4c8f --- /dev/null +++ b/src/workerd/util/ofd-lock.h @@ -0,0 +1,48 @@ +// Copyright (c) 2026 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#pragma once + +#include +#include + +namespace workerd { + +class OfdLock { + // RAII holder for a lock on an open file description. Releasing this object releases the lock. + // + // Uses Linux OFD (Open File Description) locks via fcntl(F_OFD_SETLK). OFD locks are per-fd + // rather than per-process, which is essential since a single workerd process may manage many + // Durable Objects concurrently. + public: + enum Type { SHARED, EXCLUSIVE }; + + // Try to acquire a lock (non-blocking). Returns kj::none if the lock is held in a conflicting + // mode by another open file description. + // + // On non-Linux platforms, fails at runtime with KJ_FAIL_REQUIRE rather than a compile error, + // so non-Linux builds still compile when cluster mode is unused. + static kj::Maybe tryLock(int fd, Type type); + + // Verify that a lock that this open file description was previously granted is still held. + // Specifically, this is meant to check for NFSv4 lease loss. Implemented as a no-op + // F_OFD_SETLK of the same lock type on the same fd: if the underlying NFSv4 lock stateid + // has been invalidated (lease lost), the Linux NFS client surfaces this as EIO and this + // function throws; if the stateid is still valid, the kernel treats the re-lock as a + // no-op and we return cleanly. + void verifyHeld(); + + OfdLock(OfdLock&& other); + OfdLock& operator=(OfdLock&& other); + KJ_DISALLOW_COPY(OfdLock); + + ~OfdLock(); // releases via F_OFD_SETLK with F_UNLCK + + private: + int fd; // -1 after move + Type type; + explicit OfdLock(int fd, Type type); +}; + +} // namespace workerd diff --git a/src/workerd/util/sqlite.c++ b/src/workerd/util/sqlite.c++ index a65a7db84bb..35cee8a45b0 100644 --- a/src/workerd/util/sqlite.c++ +++ b/src/workerd/util/sqlite.c++ @@ -1982,7 +1982,7 @@ sqlite3_vfs SqliteDatabase::Vfs::makeWrappedNativeVfs() { file->pMethods = nullptr; // Set up currentVfsRoot. - auto& self = *reinterpret_cast(vfs->pAppData); + auto& self = *reinterpret_cast(vfs->pAppData); KJ_ASSERT(currentVfsRoot == AT_FDCWD); currentVfsRoot = self.rootFd; KJ_DEFER(currentVfsRoot = AT_FDCWD); @@ -1994,6 +1994,16 @@ sqlite3_vfs SqliteDatabase::Vfs::makeWrappedNativeVfs() { if (file->pMethods == nullptr) { wrapper->pMethods = nullptr; } else { + KJ_IF_SOME(afterOpen, self.options.afterOpen) { + try { + afterOpen(); + } catch (kj::Exception& e) { + reportVfsErrorCaught(kj::mv(e)); + file->pMethods->xClose(file); + wrapper->pMethods = nullptr; + return SQLITE_CANTOPEN; + } + } wrapper->pMethods = &WrappedNativeFileImpl::METHOD_TABLE; wrapper->vfs = &self; wrapper->rootFd = self.rootFd; @@ -2298,7 +2308,7 @@ sqlite3_vfs SqliteDatabase::Vfs::makeKjVfs() { .pAppData = this, #define WRAP_METHOD(errorCode, block) \ - auto& self KJ_UNUSED = *static_cast(vfs->pAppData); \ + auto& self KJ_UNUSED = *static_cast(vfs->pAppData); \ try block catch (kj::Exception& e) { \ KJ_LOG(ERROR, "SQLite VFS I/O error", e); \ return errorCode; \ @@ -2315,6 +2325,11 @@ sqlite3_vfs SqliteDatabase::Vfs::makeKjVfs() { auto path = kj::Path::parse(zName); auto kjFile = KJ_UNWRAP_OR(self.directory.tryOpenFile(path), { return SQLITE_CANTOPEN; }); + KJ_IF_SOME(afterOpen, self.options.afterOpen) { + afterOpen(); + } else { + // sqelch spurious "dangling else" warning from clang + } kj::Maybe> lock; if (flags & SQLITE_OPEN_MAIN_DB) { lock = self.lockManager.lock(path, *kjFile); @@ -2330,6 +2345,11 @@ sqlite3_vfs SqliteDatabase::Vfs::makeKjVfs() { KJ_ASSERT(flags & SQLITE_OPEN_DELETEONCLOSE); KJ_ASSERT(!(flags & SQLITE_OPEN_MAIN_DB), "main DB can't be a temporary file"); kjFile = self.directory.createTemporary(); + KJ_IF_SOME(afterOpen, self.options.afterOpen) { + afterOpen(); + } else { + // sqelch spurious "dangling else" warning from clang + } } else { kj::WriteMode mode; if (flags & SQLITE_OPEN_CREATE) { @@ -2345,6 +2365,11 @@ sqlite3_vfs SqliteDatabase::Vfs::makeKjVfs() { auto path = kj::Path::parse(zName); kjFile = KJ_UNWRAP_OR(self.directory.tryOpenFile(path, mode), { return SQLITE_CANTOPEN; }); + KJ_IF_SOME(afterOpen, self.options.afterOpen) { + afterOpen(); + } else { + // sqelch spurious "dangling else" warning from clang + } if (flags & SQLITE_OPEN_MAIN_DB) { lock = self.lockManager.lock(path, *kjFile); } diff --git a/src/workerd/util/sqlite.h b/src/workerd/util/sqlite.h index 64623d5dbd0..be8448cc43a 100644 --- a/src/workerd/util/sqlite.h +++ b/src/workerd/util/sqlite.h @@ -783,6 +783,11 @@ struct SqliteDatabase::VfsOptions { // will fall back to the native VFS implementation. In that case, the options you set here will // be ORed with the ones set by the underlying VFS. int deviceCharacteristics = 0x00001000; // = SQLITE_FCNTL_POWERSAFE_OVERWRITE + + // Called immediately after SQLite opens any file through this VFS. This is used by clustered + // Durable Object storage to verify that the original NFS lease is still valid and therefore + // covers the newly-opened file. + kj::Maybe> afterOpen; }; // Implements a SQLite VFS based on a KJ directory.