diff --git a/samples/tcp-ingress/README.md b/samples/tcp-ingress/README.md deleted file mode 100644 index fe332ffe8e1..00000000000 --- a/samples/tcp-ingress/README.md +++ /dev/null @@ -1,10 +0,0 @@ -# REPL Server - -This sample contains a simple TCP server based on the connect() handler. When a connection gets -established on port 8081, it simply pipes the input stream to the output. - -## How to use -``` -./bazel-bin/src/workerd/server/workerd serve samples/tcp-ingress/config.capnp --experimental -echo "Hello World!" | nc localhost 8081 -``` diff --git a/samples/tcp-ingress/config.capnp b/samples/tcp-ingress/config.capnp deleted file mode 100644 index ab33c6ca405..00000000000 --- a/samples/tcp-ingress/config.capnp +++ /dev/null @@ -1,20 +0,0 @@ -using Workerd = import "/workerd/workerd.capnp"; - -const tcpIngressExample :Workerd.Config = ( - services = [ - (name = "main", worker = .worker), - ], - - sockets = [ - ( name = "http", address = "*:8080", http = (), service = "main" ), - ( name = "tcp", address = "*:8081", tcp = (), service = "main" ) - ] -); - -const worker :Workerd.Worker = ( - modules = [ - (name = "worker", esModule = embed "worker.js") - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - compatibilityDate = "2026-03-01", -); diff --git a/samples/tcp-ingress/worker.js b/samples/tcp-ingress/worker.js deleted file mode 100644 index 32aafb0b78d..00000000000 --- a/samples/tcp-ingress/worker.js +++ /dev/null @@ -1,11 +0,0 @@ - -export default { - async fetch(req) { - return new Response("ok"); - }, - - async connect(socket) { - // pipe the input stream to the output - await socket.readable.pipeTo(socket.writable); - } -}; diff --git a/src/workerd/api/actor.h b/src/workerd/api/actor.h index 7b2976e749e..39a6862268e 100644 --- a/src/workerd/api/actor.h +++ b/src/workerd/api/actor.h @@ -108,7 +108,6 @@ class DurableObject final: public Fetcher { JSG_TS_DEFINE(interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?(ws: WebSocket, message: string | ArrayBuffer): void | Promise; webSocketClose?(ws: WebSocket, code: number, reason: string, wasClean: boolean): void | Promise; @@ -116,7 +115,7 @@ class DurableObject final: public Fetcher { }); JSG_TS_OVERRIDE( type DurableObjectStub = - Fetcher + Fetcher & { readonly id: DurableObjectId; readonly name?: string; diff --git a/src/workerd/api/global-scope.c++ b/src/workerd/api/global-scope.c++ index ba4cb0c9f8e..28909d5609d 100644 --- a/src/workerd/api/global-scope.c++ +++ b/src/workerd/api/global-scope.c++ @@ -15,7 +15,6 @@ #endif #include #include -#include #include #include #include @@ -87,7 +86,6 @@ jsg::LenientOptional mapAddRef(jsg::Lock& js, jsg::LenientOptional& functi ExportedHandler ExportedHandler::clone(jsg::Lock& js) { return ExportedHandler{ .fetch{mapAddRef(js, fetch)}, - .connect{mapAddRef(js, connect)}, .tail{mapAddRef(js, tail)}, .trace{mapAddRef(js, trace)}, .tailStream{mapAddRef(js, tailStream)}, @@ -120,49 +118,6 @@ void ServiceWorkerGlobalScope::clear() { unhandledRejections.clear(); } -kj::Promise ServiceWorkerGlobalScope::connect(kj::String host, - const kj::HttpHeaders& headers, - kj::AsyncIoStream& connection, - kj::HttpService::ConnectResponse& response, - Worker::Lock& lock, - kj::Maybe exportedHandler) { - ExportedHandler& eh = JSG_REQUIRE_NONNULL(exportedHandler, Error, - "Connect ingress is not currently supported with Service Workers syntax."); - KJ_REQUIRE(FeatureFlags::get(lock).getWorkerdExperimental(), - "connect handling requires the experimental flag."); - - KJ_IF_SOME(handler, eh.connect) { - // Has a connect handler! - response.accept(200, "OK", headers); - - // Using neuterable stream to manage lifetime of stream promises - auto ownConnection = newNeuterableIoStream(connection); - - auto& ioContext = IoContext::current(); - jsg::Lock& js = lock; - - // TLS support is not implemented so far. Note that setupSocket() expects the domain parameter - // to be set to the expected host name using startTLS, so that it can be provided to the TLS - // callback, so we'd need to change that or figure out a way to get the host domain. - auto nullTlsStarter = kj::heap(); - // We set isDefaultFetchPort to false here – sockets.c++ sets it for ports 443 and 8080 to - // provide a more descriptive error message for HTTP, but this is not relevant on the TCP server - // side. - jsg::Ref jsSocket = setupSocket(js, kj::mv(ownConnection), kj::none, kj::none, - kj::mv(nullTlsStarter), SecureTransportKind::OFF, kj::none, false, kj::none); - // handleProxyStatus() is required to indicate that the socket was opened properly. Since the - // connection is already open at this point, exception handling is not required. - jsSocket->handleProxyStatus(js, kj::Promise>(kj::none)); - - kj::Maybe span = ioContext.makeTraceSpan("connect_handler"_kjc); - auto promise = handler(js, kj::mv(jsSocket), eh.env.addRef(js), eh.getCtx()); - return ioContext.awaitJs(js, kj::mv(promise)).attach(kj::mv(span)); - } - lock.logWarningOnce("Received a connect event but we lack a handler. " - "Did you remember to export a connect() function?"); - JSG_FAIL_REQUIRE(Error, "Handler does not export a connect() function."); -} - kj::Promise> ServiceWorkerGlobalScope::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, diff --git a/src/workerd/api/global-scope.h b/src/workerd/api/global-scope.h index ffaf66fac9e..a25fb487818 100644 --- a/src/workerd/api/global-scope.h +++ b/src/workerd/api/global-scope.h @@ -357,10 +357,6 @@ struct ExportedHandler { jsg::Optional> ctx); jsg::LenientOptional> fetch; - using ConnectHandler = jsg::Promise( - jsg::Ref socket, jsg::Value env, jsg::Optional> ctx); - jsg::LenientOptional> connect; - using TailHandler = kj::Promise(kj::Array> events, jsg::Value env, jsg::Optional> ctx); @@ -400,7 +396,6 @@ struct ExportedHandler { jsg::SelfRef self; JSG_STRUCT(fetch, - connect, tail, trace, tailStream, @@ -418,7 +413,6 @@ struct ExportedHandler { JSG_STRUCT_TS_DEFINE( type ExportedHandlerFetchHandler = (request: Request>, env: Env, ctx: ExecutionContext) => Response | Promise; - type ExportedHandlerConnectHandler = (socket: Socket, env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTailHandler = (events: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTraceHandler = (traces: TraceItem[], env: Env, ctx: ExecutionContext) => void | Promise; type ExportedHandlerTailStreamHandler = (event : TailStream.TailEvent, env: Env, ctx: ExecutionContext) => TailStream.TailEventHandlerType | Promise; @@ -429,7 +423,6 @@ struct ExportedHandler { JSG_STRUCT_TS_OVERRIDE( { email?: EmailExportedHandler; fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -529,14 +522,6 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope { // TODO(cleanup): Factor out the shared code used between old-style event listeners vs. module // exports and move that code somewhere more appropriate. - // Received TCP/socket ingress (called from C++, not JS). - kj::Promise connect(kj::String host, - const kj::HttpHeaders& headers, - kj::AsyncIoStream& connection, - kj::HttpService::ConnectResponse& response, - Worker::Lock& lock, - kj::Maybe exportedHandler); - // Received sendTraces (called from C++, not JS). void sendTraces(kj::ArrayPtr> traces, Worker::Lock& lock, diff --git a/src/workerd/api/sockets.c++ b/src/workerd/api/sockets.c++ index ebfcd6b4ccb..874af6dece8 100644 --- a/src/workerd/api/sockets.c++ +++ b/src/workerd/api/sockets.c++ @@ -84,11 +84,11 @@ class StreamWorkerInterface; jsg::Ref setupSocket(jsg::Lock& js, kj::Own connection, - kj::Maybe remoteAddress, + kj::String remoteAddress, jsg::Optional options, kj::Own tlsStarter, SecureTransportKind secureTransport, - kj::Maybe domain, + kj::String domain, bool isDefaultFetchPort, kj::Maybe> maybeOpenedPrPair) { auto& ioContext = IoContext::current(); @@ -323,10 +323,10 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp secureTransport != SecureTransportKind::ON, TypeError, "Cannot startTls on a TLS socket."); JSG_REQUIRE(connectionData != kj::none, TypeError, "The connection was closed before startTls could be started."); + JSG_REQUIRE(domain != nullptr, TypeError, "startTls can only be called once."); auto invalidOptKindMsg = "The `secureTransport` socket option must be set to 'starttls' for startTls to be used."; JSG_REQUIRE(secureTransport == SecureTransportKind::STARTTLS, TypeError, invalidOptKindMsg); - JSG_REQUIRE(domain != kj::none, TypeError, "startTls can only be called once."); // The current socket's writable buffers need to be flushed. The socket's WritableStream is backed // by an AsyncIoStream which doesn't implement any buffering, so we don't need to worry about @@ -346,10 +346,10 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp // flush to complete. While it is unlikely to be GC'd while we are waiting because // the user code *likely* is holding a active reference to it at this point, we // don't want to take any chances. This prevents a possible UAF. - JSG_VISITABLE_LAMBDA((self = JSG_THIS, domain = kj::heapString(KJ_ASSERT_NONNULL(domain)), - tlsOptions = kj::mv(tlsOptions), - openedResolver = openedPrPair.resolver.addRef(js), - remoteAddress = mapCopyString(remoteAddress)), + JSG_VISITABLE_LAMBDA( + (self = JSG_THIS, domain = kj::heapString(domain), tlsOptions = kj::mv(tlsOptions), + openedResolver = openedPrPair.resolver.addRef(js), + remoteAddress = kj::str(remoteAddress)), (self, openedResolver), (jsg::Lock & js) mutable { auto& context = IoContext::current(); @@ -381,7 +381,7 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp // Fork the starter promise because we need to create two separate things waiting // on it below. The first is resolving the openedResolver with a JS promise that - // wraps one branch, the second is the kj::Promise that we use to resolve the + // wraps one branch, the secnod is the kj::Promise that we use to resolve the // secureStream for the promised stream. This keeps us from having to bounce in and // out of the JS isolate lock. auto forkedPromise = KJ_ASSERT_NONNULL(*tlsStarter)(acceptedHostname).fork(); @@ -410,9 +410,9 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp // The existing tlsStarter gets consumed and we won't need it again. Pass in an empty tlsStarter // to `setupSocket`. auto newTlsStarter = kj::heap(); - return setupSocket(js, kj::newPromisedStream(kj::mv(secureStreamPromise)), - mapCopyString(remoteAddress), kj::mv(options), kj::mv(newTlsStarter), SecureTransportKind::ON, - kj::mv(domain), isDefaultFetchPort, kj::mv(openedPrPair)); + return setupSocket(js, kj::newPromisedStream(kj::mv(secureStreamPromise)), kj::str(remoteAddress), + kj::mv(options), kj::mv(newTlsStarter), SecureTransportKind::ON, kj::mv(domain), + isDefaultFetchPort, kj::mv(openedPrPair)); } void Socket::handleProxyStatus( @@ -436,7 +436,7 @@ void Socket::handleProxyStatus( if (isDefaultFetchPort) { msg = kj::str(msg, ". It looks like you might be trying to connect to a HTTP-based service", " — consider using fetch instead"); - } else if (remoteAddress.orDefault(kj::String()).contains(".hyperdrive.local"_kj)) { + } else if (remoteAddress.contains(".hyperdrive.local"_kj)) { // No attempts to connect to Hyperdrive should end up here, since they go through the other // version of handleProxyStatus. If they end up here somehow, log about it to get some // context that can aid in debugging. @@ -450,7 +450,7 @@ void Socket::handleProxyStatus( // because there's no useful value we can provide. openedResolver.resolve(js, SocketInfo{ - .remoteAddress = mapCopyString(remoteAddress), + .remoteAddress = kj::str(remoteAddress), .localAddress = kj::none, }); } @@ -478,7 +478,7 @@ void Socket::handleProxyStatus(jsg::Lock& js, kj::Promise>> connectionStream, - kj::Maybe remoteAddress, + kj::String remoteAddress, jsg::Ref readableParam, jsg::Ref writable, jsg::PromiseResolverPair closedPrPair, @@ -68,7 +68,7 @@ class Socket: public jsg::Object { jsg::Optional options, kj::Own tlsStarter, SecureTransportKind secureTransport, - kj::Maybe domain, + kj::String domain, bool isDefaultFetchPort, jsg::PromiseResolverPair openedPrPair) : connectionData(context.addObject(kj::heap( @@ -200,12 +200,12 @@ class Socket: public jsg::Object { // Memoized copy that is returned by the `closed` attribute. jsg::MemoizedIdentity> closedPromise; jsg::Optional options; - kj::Maybe remoteAddress; + kj::String remoteAddress; // Set to true when the socket is upgraded to a secure one. bool upgraded = false; SecureTransportKind secureTransport; // The domain/ip this socket is connected to. Used for startTls. - kj::Maybe domain; + kj::String domain; // Whether the port this socket connected to is 80/443. Used for nicer errors. bool isDefaultFetchPort; // This fulfiller is used to resolve the `openedPromise` below. @@ -245,11 +245,11 @@ class Socket: public jsg::Object { jsg::Ref setupSocket(jsg::Lock& js, kj::Own connection, - kj::Maybe remoteAddress, + kj::String remoteAddress, jsg::Optional options, kj::Own tlsStarter, SecureTransportKind secureTransport, - kj::Maybe domain, + kj::String domain, bool isDefaultFetchPort, kj::Maybe> maybeOpenedPrPair); diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 853f12049cd..b1be96654d5 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -31,18 +31,6 @@ wd_test( data = ["delete-all-deletes-alarm-test.js"], ) -wd_test( - src = "connect-handler-test.wd-test", - args = ["--experimental"], - data = [ - "connect-handler-test.js", - "connect-handler-test-proxy.js", - ], - # Test uses TCP sockets for ports 8081-8083 and may fail when running concurrently with other - # tests that do so. - tags = ["exclusive"], -) - wd_test( src = "actor-alarms-test.wd-test", args = ["--experimental"], @@ -66,10 +54,7 @@ wd_test( "tail-worker-test-invalid.js", "tail-worker-test-jsrpc.js", "websocket-hibernation.js", - "connect-handler-test.js", - "connect-handler-test-proxy.js", ], - tags = ["exclusive"], ) # Test to validate timing semantics for JSRPC streaming responses. @@ -380,7 +365,6 @@ wd_test( src = "js-rpc-test.wd-test", args = ["--experimental"], data = ["js-rpc-test.js"], - tags = ["exclusive"], ) wd_test( @@ -631,7 +615,6 @@ wd_test( "--no-verbose", ], data = ["js-rpc-test.js"], - tags = ["exclusive"], ) wd_test( diff --git a/src/workerd/api/tests/connect-handler-test-proxy.js b/src/workerd/api/tests/connect-handler-test-proxy.js deleted file mode 100644 index af23f67b995..00000000000 --- a/src/workerd/api/tests/connect-handler-test-proxy.js +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright (c) 2026 Cloudflare, Inc. -// Licensed under the Apache 2.0 license found in the LICENSE file or at: -// https://opensource.org/licenses/Apache-2.0 -import { connect } from 'cloudflare:sockets'; -import { WorkerEntrypoint } from 'cloudflare:workers'; - -export class ConnectProxy extends WorkerEntrypoint { - async connect(socket) { - // proxy for ConnectEndpoint instance on port 8083. - let upstream = connect('localhost:8083'); - await Promise.all([ - socket.readable.pipeTo(upstream.writable), - upstream.readable.pipeTo(socket.writable), - ]); - } -} - -export class ConnectEndpoint extends WorkerEntrypoint { - async connect(socket) { - const enc = new TextEncoder(); - let writer = socket.writable.getWriter(); - await writer.write(enc.encode('hello-from-endpoint')); - await writer.close(); - } -} diff --git a/src/workerd/api/tests/connect-handler-test.js b/src/workerd/api/tests/connect-handler-test.js deleted file mode 100644 index f21a2930144..00000000000 --- a/src/workerd/api/tests/connect-handler-test.js +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2026 Cloudflare, Inc. -// Licensed under the Apache 2.0 license found in the LICENSE file or at: -// https://opensource.org/licenses/Apache-2.0 -import { connect } from 'cloudflare:sockets'; -import { strictEqual } from 'assert'; - -export const connectHandler = { - async test() { - // Check that the connect handler can send a message through a socket - const socket = connect('localhost:8081'); - await socket.opened; - const dec = new TextDecoder(); - let result = ''; - for await (const chunk of socket.readable) { - result += dec.decode(chunk, { stream: true }); - } - result += dec.decode(); - strictEqual(result, 'hello'); - await socket.closed; - }, -}; - -export const connectHandlerProxy = { - async test() { - // Check that we can get a message proxied through a connect handler. This call connects us with - // an instance of Server, which serves as a proxy for an instance of OtherServer, as defined in - // connect-handler-test-proxy.js. - const socket = connect('localhost:8082'); - await socket.opened; - const dec = new TextDecoder(); - let result = ''; - for await (const chunk of socket.readable) { - result += dec.decode(chunk, { stream: true }); - } - result += dec.decode(); - strictEqual(result, 'hello-from-endpoint'); - await socket.closed; - }, -}; - -export default { - async connect(socket) { - const enc = new TextEncoder(); - let writer = socket.writable.getWriter(); - await writer.write(enc.encode('hello')); - await writer.close(); - }, -}; diff --git a/src/workerd/api/tests/connect-handler-test.wd-test b/src/workerd/api/tests/connect-handler-test.wd-test deleted file mode 100644 index 98b517a861a..00000000000 --- a/src/workerd/api/tests/connect-handler-test.wd-test +++ /dev/null @@ -1,36 +0,0 @@ -using Workerd = import "/workerd/workerd.capnp"; - -const unitTests :Workerd.Config = ( - services = [ - ( name = "connect-handler-test", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "connect-handler-test-proxy", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test-proxy.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "connect-handler-test-endpoint", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test-proxy.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "internet", network = ( allow = ["private"] ) ) - ], - sockets = [ - (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test"), - (name = "tcp", address = "*:8082", tcp = (), service = (name = "connect-handler-test-proxy", entrypoint = "ConnectProxy")), - (name = "tcp", address = "*:8083", tcp = (), service = (name = "connect-handler-test-endpoint", entrypoint = "ConnectEndpoint")) - ] -); diff --git a/src/workerd/api/tests/js-rpc-socket-test.wd-test b/src/workerd/api/tests/js-rpc-socket-test.wd-test index 4c871606ed7..a13544d8cf6 100644 --- a/src/workerd/api/tests/js-rpc-socket-test.wd-test +++ b/src/workerd/api/tests/js-rpc-socket-test.wd-test @@ -72,7 +72,6 @@ const unitTests :Workerd.Config = ( http = (capnpConnectHost = "cappy") ) ), - ( name = "internet", network = ( allow = ["private"] ) ), ], sockets = [ ( name = "MyService-loop", @@ -100,8 +99,6 @@ const unitTests :Workerd.Config = ( service = (name = "js-rpc-test", entrypoint = "GreeterFactory"), http = (capnpConnectHost = "cappy") ), - # For testing connect() handler - (name = "tcp", address = "*:8081", tcp = (), service = "js-rpc-test") ], v8Flags = [ "--expose-gc" ], ); diff --git a/src/workerd/api/tests/js-rpc-test.js b/src/workerd/api/tests/js-rpc-test.js index 4558cd517fa..a58506b249a 100644 --- a/src/workerd/api/tests/js-rpc-test.js +++ b/src/workerd/api/tests/js-rpc-test.js @@ -197,13 +197,6 @@ export class MyService extends WorkerEntrypoint { return new Response('method = ' + req.method + ', url = ' + req.url); } - async connect(socket) { - const enc = new TextEncoder(); - let writer = socket.writable.getWriter(); - await writer.write(enc.encode('hello')); - await writer.close(); - } - // Define a property to test behavior of property accessors. get nonFunctionProperty() { return { foo: 123 }; @@ -641,20 +634,6 @@ export let extendingEntrypointClasses = { assert.equal(svc instanceof WorkerEntrypoint, true); }, }; -export let connectBinding = { - async test(controller, env, ctx) { - let socket = await env.MyService.connect('localhost:8081'); - await socket.opened; - const dec = new TextDecoder(); - let result = ''; - for await (const chunk of socket.readable) { - result += dec.decode(chunk, { stream: true }); - } - result += dec.decode(); - assert.strictEqual(result, 'hello'); - await socket.closed; - }, -}; export let namedServiceBinding = { async test(controller, env, ctx) { diff --git a/src/workerd/api/tests/tail-worker-test-receiver.js b/src/workerd/api/tests/tail-worker-test-receiver.js index 7cbf5082685..a71d5dfdeca 100644 --- a/src/workerd/api/tests/tail-worker-test-receiver.js +++ b/src/workerd/api/tests/tail-worker-test-receiver.js @@ -32,7 +32,7 @@ export const test = { // The shared tail worker we configured only produces onset and outcome events, so every trace is identical here. // Number of traces based on how often main tail worker is invoked from previous tests - let numTraces = 32; + let numTraces = 29; let basicTrace = '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"trace","traces":[]}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}'; assert.deepStrictEqual( diff --git a/src/workerd/api/tests/tail-worker-test.js b/src/workerd/api/tests/tail-worker-test.js index 434d8357db0..a99f08bb12b 100644 --- a/src/workerd/api/tests/tail-worker-test.js +++ b/src/workerd/api/tests/tail-worker-test.js @@ -132,10 +132,6 @@ export const test = { // Test for transient objects - getCounter returns an object with methods // All transient calls happen in a single trace event, with only the entrypoint method reported '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"MyService","scriptTags":[],"info":{"type":"jsrpc"}}{"type":"attributes","info":[{"name":"jsrpc.method","value":"getCounter"}]}{"type":"log","level":"log","message":["bar"]}{"type":"log","level":"log","message":["getCounter called"]}{"type":"return"}{"type":"log","level":"log","message":["increment called on transient"]}{"type":"log","level":"log","message":["getValue called on transient"]}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', - // tests/connect-handler-test.js: connect events - '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"connectHandler","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', - '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","scriptTags":[],"info":{"type":"connect"}}{"type":"return"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', - '{"type":"onset","executionModel":"stateless","spanId":"0000000000000000","entrypoint":"connectHandlerProxy","scriptTags":[],"info":{"type":"custom"}}{"type":"spanOpen","name":"connect","spanId":"0000000000000001"}{"type":"spanClose","outcome":"ok"}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}', ]; assert.deepStrictEqual(response, expected); diff --git a/src/workerd/api/tests/tail-worker-test.wd-test b/src/workerd/api/tests/tail-worker-test.wd-test index 75aed0f3abe..f58e8f8d999 100644 --- a/src/workerd/api/tests/tail-worker-test.wd-test +++ b/src/workerd/api/tests/tail-worker-test.wd-test @@ -32,32 +32,6 @@ const unitTests :Workerd.Config = ( (name = "alarms", worker = .alarmsWorker), (name = "hiber", worker = .hiberWorker), (name = "js-rpc-test", worker = .jsRpcWorker), - ( name = "connect-handler-test", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - streamingTails = ["log"], - ) - ), - ( name = "connect-handler-test-proxy", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test-proxy.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "connect-handler-test-endpoint", - worker = ( - modules = [ - (name = "worker", esModule = embed "connect-handler-test-proxy.js"), - ], - compatibilityFlags = ["nodejs_compat_v2", "experimental"], - ) - ), - ( name = "internet", network = ( allow = ["private"] ) ), (name = "TEST_TMPDIR", disk = (writable = true)), # Dummy buffered tail worker (gets traces from alarms worker and produces trace for main tracer) (name = "buffered", worker = .logBuffered, ), @@ -76,11 +50,6 @@ const unitTests :Workerd.Config = ( ), ) ], - sockets = [ - (name = "tcp", address = "*:8081", tcp = (), service = "connect-handler-test"), - (name = "tcp", address = "*:8082", tcp = (), service = (name = "connect-handler-test-proxy", entrypoint = "ConnectProxy")), - (name = "tcp", address = "*:8083", tcp = (), service = (name = "connect-handler-test-endpoint", entrypoint = "ConnectEndpoint")) - ] ); const alarmsWorker :Workerd.Worker = ( diff --git a/src/workerd/api/trace.c++ b/src/workerd/api/trace.c++ index 0885754f7d9..53cbd8f14fe 100644 --- a/src/workerd/api/trace.c++ +++ b/src/workerd/api/trace.c++ @@ -164,9 +164,6 @@ kj::Maybe getTraceEvent(jsg::Lock& js, const Trace& trace) KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { return kj::Maybe(js.alloc(trace, scheduled)); } - KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { - return kj::Maybe(jsg::alloc(js, trace, connect)); - } KJ_CASE_ONEOF(alarm, tracing::AlarmEventInfo) { return kj::Maybe(js.alloc(trace, alarm)); } @@ -253,9 +250,6 @@ kj::Maybe TraceItem::getEvent(jsg::Lock& js) { KJ_CASE_ONEOF(info, jsg::Ref) { return info.addRef(); } - KJ_CASE_ONEOF(info, jsg::Ref) { - return info.addRef(); - } } KJ_UNREACHABLE; }); @@ -755,9 +749,6 @@ void TraceItem::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { KJ_CASE_ONEOF(info, jsg::Ref) { tracker.trackField("eventInfo", info); } - KJ_CASE_ONEOF(info, jsg::Ref) { - tracker.trackField("eventInfo", info); - } } } for (const auto& log: logs) { @@ -816,7 +807,4 @@ void TraceItem::HibernatableWebSocketEventInfo::visitForMemoryInfo( } } -TraceItem::ConnectEventInfo::ConnectEventInfo( - jsg::Lock& js, const Trace& trace, const tracing::ConnectEventInfo& eventInfo) {} - } // namespace workerd::api diff --git a/src/workerd/api/trace.h b/src/workerd/api/trace.h index 1fb7f17e3b7..7f2a8c537a0 100644 --- a/src/workerd/api/trace.h +++ b/src/workerd/api/trace.h @@ -73,7 +73,6 @@ class TraceItem final: public jsg::Object { class FetchEventInfo; class JsRpcEventInfo; - class ConnectEventInfo; class ScheduledEventInfo; class AlarmEventInfo; class QueueEventInfo; @@ -86,7 +85,6 @@ class TraceItem final: public jsg::Object { using EventInfo = kj::OneOf, jsg::Ref, - jsg::Ref, jsg::Ref, jsg::Ref, jsg::Ref, @@ -289,14 +287,6 @@ class TraceItem::JsRpcEventInfo final: public jsg::Object { kj::String rpcMethod; }; -class TraceItem::ConnectEventInfo final: public jsg::Object { - public: - explicit ConnectEventInfo( - jsg::Lock& js, const Trace& trace, const tracing::ConnectEventInfo& eventInfo); - - JSG_RESOURCE_TYPE(ConnectEventInfo) {} -}; - class TraceItem::ScheduledEventInfo final: public jsg::Object { public: explicit ScheduledEventInfo(const Trace& trace, const tracing::ScheduledEventInfo& eventInfo); @@ -660,12 +650,12 @@ class TraceCustomEvent final: public WorkerInterface::CustomEvent { #define EW_TRACE_ISOLATE_TYPES \ api::ScriptVersion, api::TailEvent, api::TraceItem, api::TraceItem::AlarmEventInfo, \ - api::TraceItem::ConnectEventInfo, api::TraceItem::CustomEventInfo, \ - api::TraceItem::ScheduledEventInfo, api::TraceItem::QueueEventInfo, \ - api::TraceItem::EmailEventInfo, api::TraceItem::TailEventInfo, \ - api::TraceItem::TailEventInfo::TailItem, api::TraceItem::FetchEventInfo, \ - api::TraceItem::FetchEventInfo::Request, api::TraceItem::FetchEventInfo::Response, \ - api::TraceItem::JsRpcEventInfo, api::TraceItem::HibernatableWebSocketEventInfo, \ + api::TraceItem::CustomEventInfo, api::TraceItem::ScheduledEventInfo, \ + api::TraceItem::QueueEventInfo, api::TraceItem::EmailEventInfo, \ + api::TraceItem::TailEventInfo, api::TraceItem::TailEventInfo::TailItem, \ + api::TraceItem::FetchEventInfo, api::TraceItem::FetchEventInfo::Request, \ + api::TraceItem::FetchEventInfo::Response, api::TraceItem::JsRpcEventInfo, \ + api::TraceItem::HibernatableWebSocketEventInfo, \ api::TraceItem::HibernatableWebSocketEventInfo::Message, \ api::TraceItem::HibernatableWebSocketEventInfo::Close, \ api::TraceItem::HibernatableWebSocketEventInfo::Error, api::TraceLog, api::TraceException, \ diff --git a/src/workerd/io/trace-stream.c++ b/src/workerd/io/trace-stream.c++ index cb7dcae0dae..d743c3ac575 100644 --- a/src/workerd/io/trace-stream.c++ +++ b/src/workerd/io/trace-stream.c++ @@ -24,7 +24,6 @@ namespace { V(CFJSON, "cfJson") \ V(CLOSE, "close") \ V(CODE, "code") \ - V(CONNECT, "connect") \ V(COUNT, "count") \ V(CPUTIME, "cpuTime") \ V(CRON, "cron") \ @@ -293,12 +292,6 @@ jsg::JsValue ToJs(jsg::Lock& js, const HibernatableWebSocketEventInfo& info, Str return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const ConnectEventInfo& info, StringCache& cache) { - auto obj = js.obj(); - obj.set(js, TYPE_STR, cache.get(js, CONNECT_STR)); - return obj; -} - jsg::JsValue ToJs(jsg::Lock& js, const CustomEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, CUSTOM_STR)); @@ -392,9 +385,6 @@ jsg::JsValue ToJs(jsg::Lock& js, const Onset& onset, StringCache& cache) { KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { obj.set(js, INFO_STR, ToJs(js, hws, cache)); } - KJ_CASE_ONEOF(connect, ConnectEventInfo) { - obj.set(js, INFO_STR, ToJs(js, connect, cache)); - } KJ_CASE_ONEOF(custom, CustomEventInfo) { obj.set(js, INFO_STR, ToJs(js, custom, cache)); } diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index 32f72e026b6..08f2734ba7d 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -326,16 +326,6 @@ static kj::HttpMethod validateMethod(capnp::HttpMethod method) { } // namespace -ConnectEventInfo::ConnectEventInfo() {} - -ConnectEventInfo::ConnectEventInfo(rpc::Trace::ConnectEventInfo::Reader reader) {} - -void ConnectEventInfo::copyTo(rpc::Trace::ConnectEventInfo::Builder builder) const {} - -ConnectEventInfo ConnectEventInfo::clone() const { - return ConnectEventInfo(); -} - FetchEventInfo::FetchEventInfo( kj::HttpMethod method, kj::String url, kj::String cfJson, kj::Array
headers) : method(method), @@ -791,10 +781,6 @@ void Trace::copyTo(rpc::Trace::Builder builder) const { auto jsRpcBuilder = eventInfoBuilder.initJsRpc(); jsRpc.copyTo(jsRpcBuilder); } - KJ_CASE_ONEOF(connect, tracing::ConnectEventInfo) { - auto connectBuilder = eventInfoBuilder.initConnect(); - connect.copyTo(connectBuilder); - } KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { auto scheduledBuilder = eventInfoBuilder.initScheduled(); scheduled.copyTo(scheduledBuilder); @@ -905,9 +891,6 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev case rpc::Trace::EventInfo::Which::JS_RPC: eventInfo = tracing::JsRpcEventInfo(e.getJsRpc()); break; - case rpc::Trace::EventInfo::Which::CONNECT: - eventInfo = tracing::ConnectEventInfo(e.getConnect()); - break; case rpc::Trace::EventInfo::Which::SCHEDULED: eventInfo = tracing::ScheduledEventInfo(e.getScheduled()); break; @@ -1124,9 +1107,6 @@ Onset::Info readOnsetInfo(const rpc::Trace::Onset::Info::Reader& info) { case rpc::Trace::Onset::Info::JS_RPC: { return JsRpcEventInfo(info.getJsRpc()); } - case rpc::Trace::Onset::Info::CONNECT: { - return ConnectEventInfo(info.getConnect()); - } case rpc::Trace::Onset::Info::SCHEDULED: { return ScheduledEventInfo(info.getScheduled()); } @@ -1157,9 +1137,6 @@ void writeOnsetInfo(const Onset::Info& info, rpc::Trace::Onset::Info::Builder& i KJ_CASE_ONEOF(fetch, FetchEventInfo) { fetch.copyTo(infoBuilder.initFetch()); } - KJ_CASE_ONEOF(connect, ConnectEventInfo) { - connect.copyTo(infoBuilder.initConnect()); - } KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { jsrpc.copyTo(infoBuilder.initJsRpc()); } @@ -1312,9 +1289,6 @@ EventInfo cloneEventInfo(const EventInfo& info) { KJ_CASE_ONEOF(fetch, FetchEventInfo) { return fetch.clone(); } - KJ_CASE_ONEOF(connect, ConnectEventInfo) { - return connect.clone(); - } KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { return jsrpc.clone(); } diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index e6377dc6e70..75aac486182 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -362,15 +362,6 @@ struct JsRpcEventInfo final { kj::String toString() const; }; -class ConnectEventInfo { - public: - explicit ConnectEventInfo(); - explicit ConnectEventInfo(rpc::Trace::ConnectEventInfo::Reader reader); - - void copyTo(rpc::Trace::ConnectEventInfo::Builder builder) const; - ConnectEventInfo clone() const; -}; - // Describes a scheduled request struct ScheduledEventInfo final { explicit ScheduledEventInfo(double scheduledTime, kj::String cron); @@ -586,7 +577,6 @@ using EventInfo = kj::OneOf; EventInfo cloneEventInfo(const EventInfo& info); diff --git a/src/workerd/io/worker-entrypoint.c++ b/src/workerd/io/worker-entrypoint.c++ index 17fc33247a7..d83937f30a0 100644 --- a/src/workerd/io/worker-entrypoint.c++ +++ b/src/workerd/io/worker-entrypoint.c++ @@ -249,30 +249,6 @@ void WorkerEntrypoint::init(kj::Own worker, .attach(kj::mv(actor)); } -kj::Exception exceptionToPropagate(bool isInternalException, kj::Exception&& exception) { - if (isInternalException) { - // We've already logged it here, the only thing that matters to the client is that we failed - // due to an internal error. Note that this does not need to be labeled "remote." since jsg - // will sanitize it as an internal error. Note that we use `setDescription()` to preserve - // the exception type for `jsg::exceptionToJs(...)` downstream. - exception.setDescription(kj::str("worker_do_not_log; Request failed due to internal error")); - return kj::mv(exception); - } else { - // We do not care how many remote capnp servers this went through since we are returning - // it to the worker via jsg. - // TODO(someday) We also do this stripping when making the tunneled exception for - // `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type - // instead of `loggedExceptionEarlier`. It would save use some work. - auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription()); - if (!description.startsWith("remote.")) { - // If we already were annotated as remote from some other worker entrypoint, no point - // adding an additional prefix. - exception.setDescription(kj::str("remote.", description)); - } - return kj::mv(exception); - } -} - kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, kj::StringPtr url, const kj::HttpHeaders& headers, @@ -459,6 +435,31 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, } } + auto exceptionToPropagate = [&]() { + if (isInternalException) { + // We've already logged it here, the only thing that matters to the client is that we failed + // due to an internal error. Note that this does not need to be labeled "remote." since jsg + // will sanitize it as an internal error. Note that we use `setDescription()` to preserve + // the exception type for `jsg::exceptionToJs(...)` downstream. + exception.setDescription( + kj::str("worker_do_not_log; Request failed due to internal error")); + return kj::mv(exception); + } else { + // We do not care how many remote capnp servers this went through since we are returning + // it to the worker via jsg. + // TODO(someday) We also do this stripping when making the tunneled exception for + // `jsg::isTunneledException(...)`. It would be lovely if we could simply store some type + // instead of `loggedExceptionEarlier`. It would save use some work. + auto description = jsg::stripRemoteExceptionPrefix(exception.getDescription()); + if (!description.startsWith("remote.")) { + // If we already were annotated as remote from some other worker entrypoint, no point + // adding an additional prefix. + exception.setDescription(kj::str("remote.", description)); + } + return kj::mv(exception); + } + }; + if (wrappedResponse->isSent()) { // We can't fail open if the response was already sent, so set `failOpenService` null so that // that branch isn't taken below. @@ -470,7 +471,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another // worker, not just for actors (and W2W below), but getting that right will require cleaning // up error handling more generally. - return exceptionToPropagate(isInternalException, kj::mv(exception)); + return exceptionToPropagate(); } else KJ_IF_SOME(service, failOpenService) { // Fall back to origin. @@ -504,7 +505,7 @@ kj::Promise WorkerEntrypoint::request(kj::HttpMethod method, // Like with the isActor check, we want to return exceptions back to the caller. // We don't want to handle this case the same as the isActor case though, since we want // fail-open to operate normally, which means this case must happen after fail-open handling. - return exceptionToPropagate(isInternalException, kj::mv(exception)); + return exceptionToPropagate(); } else { // Return error. @@ -541,17 +542,19 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, auto incomingRequest = kj::mv(KJ_REQUIRE_NONNULL(this->incomingRequest, "connect() can only be called once")); this->incomingRequest = kj::none; + // Whenever we implement incoming connections over the `connect` handler we need to remember to + // add tracing `onset` and `return` events using setEventInfo()/setReturn(), as with the other + // event types here. + incomingRequest->delivered(); auto& context = incomingRequest->getContext(); - auto featureFlags = context.getWorker().getIsolate().getApi().getFeatureFlags(); - if (featureFlags.getConnectPassThrough()) { - incomingRequest->delivered(); + KJ_DEFER({ + // Since we called incomingRequest->delivered, we are obliged to call `drain()`. + auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); + waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); + }); - KJ_DEFER({ - // Since we called incomingRequest->delivered, we are obliged to call `drain()`. - auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); - waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); - }); + if (context.getWorker().getIsolate().getApi().getFeatureFlags().getConnectPassThrough()) { // connect_pass_through feature flag means we should just forward the connect request on to // the global outbound. @@ -561,100 +564,9 @@ kj::Promise WorkerEntrypoint::connect(kj::StringPtr host, // Note: Intentionally return without co_await so that the `incomingRequest` is destroyed, // because we don't have any need to keep the context around. return next->connect(host, headers, connection, response, settings); - } else if (!featureFlags.getWorkerdExperimental()) { - JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); - } - - // TODO(soon): Implement basic TLS support for connect handler. - JSG_REQUIRE(!settings.useTls, Error, "Incoming CONNECT with TLS not supported"); - // Capture workerTracer, see request() for rationale. - kj::Maybe workerTracer; - - bool isActor = context.getActor() != kj::none; - - KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) { - t.setEventInfo(*incomingRequest, tracing::ConnectEventInfo()); - workerTracer = t; } - incomingRequest->delivered(); - - auto metricsForCatch = kj::addRef(incomingRequest->getMetrics()); - - return context - .run( - [this, &headers, &context, &connection, &response, entrypointName = entrypointName, - versionInfo = kj::mv(versionInfo), host = kj::str(host)](Worker::Lock& lock) mutable { - jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock); - - return lock.getGlobalScope().connect(kj::mv(host), headers, connection, response, lock, - lock.getExportedHandler( - entrypointName, kj::mv(versionInfo), kj::mv(props), context.getActor())); - }) - .then([&context, workerTracer]() { - KJ_IF_SOME(t, workerTracer) { - t.setReturn(context.now()); - } - }) - .catch_([this, &context](kj::Exception&& exception) mutable -> kj::Promise { - // Log JS exceptions to the JS console, if fiddle is attached. This also has the effect of - // logging internal errors to syslog. - loggedExceptionEarlier = true; - context.logUncaughtExceptionAsync(UncaughtExceptionSource::REQUEST_HANDLER, kj::cp(exception)); - - // Do not allow the exception to escape the isolate without waiting for the output gate to - // open. Note that in the success path, this is taken care of in `FetchEvent::respondWith()`. - return context.waitForOutputLocks().then( - [exception = kj::mv(exception)]() mutable -> kj::Promise { - return kj::mv(exception); - }); - }) - .attach(kj::defer([this, incomingRequest = kj::mv(incomingRequest), &context]() mutable { - // The request has been canceled, but allow it to continue executing in the background. - auto promise = incomingRequest->drain().attach(kj::mv(incomingRequest)); - waitUntilTasks.add(maybeAddGcPassForTest(context, kj::mv(promise))); - })) - .catch_([this, isActor, &response, metrics = kj::mv(metricsForCatch), workerTracer]( - kj::Exception&& exception) mutable -> kj::Promise { - // Don't return errors to end user. - auto isInternalException = !jsg::isTunneledException(exception.getDescription()) && - !jsg::isDoNotLogException(exception.getDescription()); - if (!loggedExceptionEarlier) { - // This exception seems to have originated during the deferred proxy task, so it was not - // logged to the IoContext earlier. - if (exception.getType() != kj::Exception::Type::DISCONNECTED && isInternalException) { - LOG_EXCEPTION("workerEntrypoint", exception); - } else { - KJ_LOG(INFO, exception); // Run with --verbose to see exception logs. - } - } - if (isActor || tunnelExceptions) { - // We want to tunnel exceptions from actors back to the caller. - // TODO(cleanup): We'd really like to tunnel exceptions any time a worker is calling another - // worker, not just for actors (and W2W below), but getting that right will require cleaning - // up error handling more generally. - return exceptionToPropagate(isInternalException, kj::mv(exception)); - } else { - // Return error. - - // We're catching the exception and replacing it with 5xx, but metrics should still indicate - // an exception. - metrics->reportFailure(exception); - - kj::HttpHeaders headers(threadContext.getHeaderTable()); - if (exception.getType() == kj::Exception::Type::OVERLOADED) { - response.reject(503, "Service Unavailable", headers, static_cast(0)); - } else { - response.reject(500, "Internal Server Error", headers, static_cast(0)); - } - // TODO(o11y): Should we also indicate a return response code for TCP? - KJ_IF_SOME(t, workerTracer) { - t.setReturn(kj::none); - } - - return kj::READY_NOW; - } - }); + JSG_FAIL_REQUIRE(TypeError, "Incoming CONNECT on a worker not supported"); } kj::Promise WorkerEntrypoint::prewarm(kj::StringPtr url) { diff --git a/src/workerd/io/worker-interface.capnp b/src/workerd/io/worker-interface.capnp index 6d70a4a90e5..05d88c0b91c 100644 --- a/src/workerd/io/worker-interface.capnp +++ b/src/workerd/io/worker-interface.capnp @@ -97,7 +97,6 @@ struct Trace @0x8e8d911203762d34 { email @16 :EmailEventInfo; trace @18 :TraceEventInfo; hibernatableWebSocket @20 :HibernatableWebSocketEventInfo; - connect @29 :ConnectEventInfo; } struct FetchEventInfo { method @0 :HttpMethod; @@ -115,9 +114,6 @@ struct Trace @0x8e8d911203762d34 { methodName @0 :Text; } - struct ConnectEventInfo { - } - struct ScheduledEventInfo { scheduledTime @0 :Float64; cron @1 :Text; @@ -274,7 +270,6 @@ struct Trace @0x8e8d911203762d34 { email @5 :EmailEventInfo; trace @6 :TraceEventInfo; hibernatableWebSocket @7 :HibernatableWebSocketEventInfo; - connect @9 :ConnectEventInfo; custom @8 :CustomEventInfo; } } diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index c1c9581b566..75f998bd68d 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -34,7 +34,6 @@ #include #include #include -#include #include #include #include @@ -5350,20 +5349,17 @@ class Server::HttpListener final: public kj::Refcounted { kj::AsyncIoStream& connection, ConnectResponse& response, kj::HttpConnectSettings settings) override { - TRACE_EVENT("workerd", "Connection:connect()"); KJ_IF_SOME(h, parent.rewriter->getCapnpConnectHost()) { if (h == host) { // Client is requesting to open a capnp session! response.accept(200, "OK", kj::HttpHeaders(parent.headerTable)); - co_return co_await parent.acceptCapnpConnection(connection); + return parent.acceptCapnpConnection(connection); } } - IoChannelFactory::SubrequestMetadata metadata; - metadata.cfBlobJson = mapCopyString(cfBlobJson); - - auto worker = parent.service->startRequest(kj::mv(metadata)); - co_return co_await worker->connect(host, headers, connection, response, kj::mv(settings)); + // TODO(someday): Deliver connect() event to to worker? For now we call the default + // implementation which throws an exception. + return kj::HttpService::connect(host, headers, connection, response, kj::mv(settings)); } // --------------------------------------------------------------------------- @@ -5383,57 +5379,6 @@ class Server::HttpListener final: public kj::Refcounted { }; }; -class Server::TcpListener final: public kj::Refcounted { - public: - TcpListener(Server& owner, - kj::Own listener, - kj::Own service, - kj::HttpHeaderTable& headerTable, - kj::StringPtr addrStr) - : owner(owner), - listener(kj::mv(listener)), - service(kj::mv(service)), - headerTable(headerTable), - addrStr(addrStr) {} - - kj::Promise run() { - TRACE_EVENT("workerd", "TcpListener::run"); - for (;;) { - kj::AuthenticatedStream stream = co_await listener->acceptAuthenticated(); - TRACE_EVENT("workerd", "TcpListener handle connection"); - - IoChannelFactory::SubrequestMetadata metadata; - auto req = service->startRequest(kj::mv(metadata)); - auto response = kj::heap(); - kj::HttpHeaders headers(headerTable); - owner.tasks.add(req->connect(addrStr, headers, *stream.stream, *response, {}) - .attach(kj::mv(stream.stream), kj::mv(response)) - .attach(kj::mv(req))); - } - } - - private: - Server& owner; - kj::Own listener; - kj::Own service; - kj::HttpHeaderTable& headerTable; - kj::StringPtr addrStr; - - struct ResponseWrapper final: public kj::HttpService::ConnectResponse { - void accept( - uint statusCode, kj::StringPtr statusText, const kj::HttpHeaders& headers) override { - // Ok.. we're accepting the connection... anything to do? - } - kj::Own reject(uint statusCode, - kj::StringPtr statusText, - const kj::HttpHeaders& headers, - kj::Maybe expectedBodySize = kj::none) override { - // Doh... we're rejecting the connection... anything to do? - return newNullOutputStream(); - } - }; -}; - kj::Promise Server::listenHttp(kj::Own listener, kj::Own service, kj::StringPtr physicalProtocol, @@ -5444,13 +5389,6 @@ kj::Promise Server::listenHttp(kj::Own listener, co_return co_await obj->run(); } -kj::Promise Server::listenTcp( - kj::Own listener, kj::Own service, kj::StringPtr addrStr) { - auto obj = kj::refcounted( - *this, kj::mv(listener), kj::mv(service), globalContext->headerTable, addrStr); - co_return co_await obj->run(); -} - // ======================================================================================= // Debug port for exposing all services via RPC @@ -5851,39 +5789,6 @@ kj::Promise Server::startServices(jsg::V8System& v8System, } } -kj::Maybe Server::parseSocketType( - config::Socket::Reader sock, kj::StringPtr name) { - switch (sock.which()) { - case config::Socket::HTTP: { - SocketTypeConfig result; - result.defaultPort = 80; - result.httpOptions = sock.getHttp(); - result.physicalProtocol = "http"; - return kj::mv(result); - } - case config::Socket::HTTPS: { - auto https = sock.getHttps(); - SocketTypeConfig result; - result.defaultPort = 443; - result.httpOptions = https.getOptions(); - result.tls = makeTlsContext(https.getTlsOptions()); - result.physicalProtocol = "https"; - return kj::mv(result); - } - case config::Socket::TCP: { - auto tcp = sock.getTcp(); - SocketTypeConfig result; - if (tcp.hasTlsOptions()) { - result.tls = makeTlsContext(tcp.getTlsOptions()); - } - return kj::mv(result); - } - } - reportConfigError(kj::str("Encountered unknown socket type in \"", name, - "\". Was the config compiled with a newer version of the schema?")); - return kj::none; -} - kj::Promise Server::listenOnSockets(config::Config::Reader config, kj::HttpHeaderTable::Builder& headerTableBuilder, kj::ForkedPromise& forkedDrainWhen, @@ -5920,10 +5825,31 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, continue; } - auto maybeSocketConfig = parseSocketType(sock, name); - if (maybeSocketConfig == kj::none) continue; - auto& socketConfig = KJ_ASSERT_NONNULL(maybeSocketConfig); + uint defaultPort = 0; + config::HttpOptions::Reader httpOptions; + kj::Maybe> tls; + kj::StringPtr physicalProtocol; + switch (sock.which()) { + case config::Socket::HTTP: + defaultPort = 80; + httpOptions = sock.getHttp(); + physicalProtocol = "http"; + goto validSocket; + case config::Socket::HTTPS: { + auto https = sock.getHttps(); + defaultPort = 443; + httpOptions = https.getOptions(); + tls = makeTlsContext(https.getTlsOptions()); + physicalProtocol = "https"; + goto validSocket; + } + } + reportConfigError(kj::str("Encountered unknown socket type in \"", name, + "\". Was the config compiled with a " + "newer version of the schema?")); + continue; + validSocket: using PromisedReceived = kj::Promise>; PromisedReceived listener = nullptr; KJ_IF_SOME(l, listenerOverride) { @@ -5932,10 +5858,10 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, listener = ([](kj::Promise> promise) -> PromisedReceived { auto parsed = co_await promise; co_return parsed->listen(); - })(network.parseAddress(addrStr, socketConfig.defaultPort)); + })(network.parseAddress(addrStr, defaultPort)); } - KJ_IF_SOME(t, socketConfig.tls) { + KJ_IF_SOME(t, tls) { listener = ([](kj::Promise> promise, kj::Own tls) -> PromisedReceived { auto port = co_await promise; @@ -5945,19 +5871,12 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, // Need to create rewriter before waiting on anything since `headerTableBuilder` will no longer // be available later. - auto rewriter = kj::heap(socketConfig.httpOptions, headerTableBuilder); + auto rewriter = kj::heap(httpOptions, headerTableBuilder); auto handle = kj::coCapture( - [this, service = kj::mv(service), rewriter = kj::mv(rewriter), - physicalProtocol = socketConfig.physicalProtocol, name, - isHttp = sock.which() != config::Socket::TCP, addrStr]( + [this, service = kj::mv(service), rewriter = kj::mv(rewriter), physicalProtocol, name]( kj::Promise> promise) mutable -> kj::Promise { - if (isHttp) { - TRACE_EVENT("workerd", "setup listenHttp"); - } else { - TRACE_EVENT("workerd", "setup listenTcp"); - } - + TRACE_EVENT("workerd", "setup listenHttp"); auto listener = co_await promise; KJ_IF_SOME(stream, controlOverride) { auto message = kj::str("{\"event\":\"listen\",\"socket\":\"", name, @@ -5968,12 +5887,7 @@ kj::Promise Server::listenOnSockets(config::Config::Reader config, KJ_LOG(ERROR, e); } } - - if (isHttp) { - co_await listenHttp(kj::mv(listener), kj::mv(service), physicalProtocol, kj::mv(rewriter)); - } else { - co_await listenTcp(kj::mv(listener), kj::mv(service), addrStr); - } + co_await listenHttp(kj::mv(listener), kj::mv(service), physicalProtocol, kj::mv(rewriter)); }); tasks.add(handle(kj::mv(listener)).exclusiveJoin(forkedDrainWhen.addBranch())); } diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 7f17abdd66f..07569fa5539 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -286,9 +286,6 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::StringPtr physicalProtocol, kj::Own rewriter); - kj::Promise listenTcp( - kj::Own listener, kj::Own service, kj::StringPtr addrStr); - kj::Promise listenDebugPort(kj::Own listener); class InvalidConfigService; @@ -301,7 +298,6 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl class WorkerEntrypointService; class WorkerdBootstrapImpl; class HttpListener; - class TcpListener; class DebugPortListener; struct ErrorReporter; @@ -323,16 +319,6 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl kj::ForkedPromise& forkedDrainWhen, bool forTest = false); - // Parsed socket protocol/TLS config. Extracted from the switch in listenOnSockets() to avoid - // goto-over-initialization inside a coroutine, which triggers a clang optimizer crash. - struct SocketTypeConfig { - uint defaultPort = 0; - config::HttpOptions::Reader httpOptions; - kj::Maybe> tls; - kj::StringPtr physicalProtocol; - }; - kj::Maybe parseSocketType(config::Socket::Reader sock, kj::StringPtr name); - void unlinkWorkerLoaders(); kj::Promise preloadPython( diff --git a/src/workerd/server/workerd.capnp b/src/workerd/server/workerd.capnp index 9be9b11854b..9e1cb23b62a 100644 --- a/src/workerd/server/workerd.capnp +++ b/src/workerd/server/workerd.capnp @@ -143,11 +143,8 @@ struct Socket { options @3 :HttpOptions; tlsOptions @4 :TlsOptions; } - tcp :group { - tlsOptions @6 :TlsOptions; - } - # TODO(someday): TCP proxy, SMTP, Cap'n Proto, ... + # TODO(someday): TCP, TCP proxy, SMTP, Cap'n Proto, ... } service @5 :ServiceDesignator; diff --git a/types/defines/rpc.d.ts b/types/defines/rpc.d.ts index ae73fc15b97..fe4b87d5378 100644 --- a/types/defines/rpc.d.ts +++ b/types/defines/rpc.d.ts @@ -239,7 +239,6 @@ declare namespace CloudflareWorkersModule { email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -261,7 +260,6 @@ declare namespace CloudflareWorkersModule { alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer diff --git a/types/defines/trace.d.ts b/types/defines/trace.d.ts index f316c6a3dee..79ce6fd5e40 100644 --- a/types/defines/trace.d.ts +++ b/types/defines/trace.d.ts @@ -74,10 +74,6 @@ interface FetchResponseInfo { readonly statusCode: number; } -interface ConnectEventInfo { - readonly type: "connect"; -} - type EventOutcome = "ok" | "canceled" | "exception" | "unknown" | "killSwitch" | "daemonDown" | "exceededCpu" | "exceededMemory" | "loadShed" | "responseStreamDisconnected" | "scriptNotFound"; @@ -99,10 +95,10 @@ interface Onset { readonly scriptName?: string; readonly scriptTags?: string[]; readonly scriptVersion?: ScriptVersion; - readonly info: FetchEventInfo | ConnectEventInfo | JsRpcEventInfo | - ScheduledEventInfo | AlarmEventInfo | QueueEventInfo | - EmailEventInfo | TraceEventInfo | - HibernatableWebSocketEventInfo | CustomEventInfo; + readonly info: FetchEventInfo | JsRpcEventInfo | ScheduledEventInfo | + AlarmEventInfo | QueueEventInfo | EmailEventInfo | + TraceEventInfo | HibernatableWebSocketEventInfo | + CustomEventInfo; } interface Outcome { diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index 87d7dc1376f..b42c699ec90 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -511,11 +511,6 @@ type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; -type ExportedHandlerConnectHandler = ( - socket: Socket, - env: Env, - ctx: ExecutionContext, -) => void | Promise; type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -557,7 +552,6 @@ interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -590,7 +584,6 @@ declare abstract class ColoLocalActorNamespace { } interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -608,7 +601,7 @@ type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3280,7 +3273,6 @@ interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo - | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3310,7 +3302,6 @@ interface TraceItem { interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } -interface TraceItemConnectEventInfo {} interface TraceItemCustomEventInfo {} interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -13262,7 +13253,6 @@ declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -13283,7 +13273,6 @@ declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -14283,9 +14272,6 @@ declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } - interface ConnectEventInfo { - readonly type: "connect"; - } type EventOutcome = | "ok" | "canceled" @@ -14316,7 +14302,6 @@ declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo - | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index d52a5c56a9b..646e4246f84 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -513,11 +513,6 @@ export type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; -export type ExportedHandlerConnectHandler = ( - socket: Socket, - env: Env, - ctx: ExecutionContext, -) => void | Promise; export type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -559,7 +554,6 @@ export interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -592,7 +586,6 @@ export declare abstract class ColoLocalActorNamespace { } export interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -610,7 +603,7 @@ export type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3286,7 +3279,6 @@ export interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo - | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3316,7 +3308,6 @@ export interface TraceItem { export interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } -export interface TraceItemConnectEventInfo {} export interface TraceItemCustomEventInfo {} export interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -13229,7 +13220,6 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -13250,7 +13240,6 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -14240,9 +14229,6 @@ export declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } - interface ConnectEventInfo { - readonly type: "connect"; - } type EventOutcome = | "ok" | "canceled" @@ -14273,7 +14259,6 @@ export declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo - | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/latest/index.d.ts b/types/generated-snapshot/latest/index.d.ts index 8e6b76d564d..65c259fb2b3 100755 --- a/types/generated-snapshot/latest/index.d.ts +++ b/types/generated-snapshot/latest/index.d.ts @@ -489,11 +489,6 @@ type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; -type ExportedHandlerConnectHandler = ( - socket: Socket, - env: Env, - ctx: ExecutionContext, -) => void | Promise; type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -535,7 +530,6 @@ interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -564,7 +558,6 @@ interface Cloudflare { } interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -582,7 +575,7 @@ type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3129,7 +3122,6 @@ interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo - | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3159,7 +3151,6 @@ interface TraceItem { interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } -interface TraceItemConnectEventInfo {} interface TraceItemCustomEventInfo {} interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -12549,7 +12540,6 @@ declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -12570,7 +12560,6 @@ declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -13570,9 +13559,6 @@ declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } - interface ConnectEventInfo { - readonly type: "connect"; - } type EventOutcome = | "ok" | "canceled" @@ -13603,7 +13589,6 @@ declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo - | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo diff --git a/types/generated-snapshot/latest/index.ts b/types/generated-snapshot/latest/index.ts index f99255aea9e..223e1c0539a 100755 --- a/types/generated-snapshot/latest/index.ts +++ b/types/generated-snapshot/latest/index.ts @@ -491,11 +491,6 @@ export type ExportedHandlerFetchHandler< env: Env, ctx: ExecutionContext, ) => Response | Promise; -export type ExportedHandlerConnectHandler = ( - socket: Socket, - env: Env, - ctx: ExecutionContext, -) => void | Promise; export type ExportedHandlerTailHandler = ( events: TraceItem[], env: Env, @@ -537,7 +532,6 @@ export interface ExportedHandler< Props = unknown, > { fetch?: ExportedHandlerFetchHandler; - connect?: ExportedHandlerConnectHandler; tail?: ExportedHandlerTailHandler; trace?: ExportedHandlerTraceHandler; tailStream?: ExportedHandlerTailStreamHandler; @@ -566,7 +560,6 @@ export interface Cloudflare { } export interface DurableObject { fetch(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; webSocketMessage?( ws: WebSocket, @@ -584,7 +577,7 @@ export type DurableObjectStub< T extends Rpc.DurableObjectBranded | undefined = undefined, > = Fetcher< T, - "alarm" | "connect" | "webSocketMessage" | "webSocketClose" | "webSocketError" + "alarm" | "webSocketMessage" | "webSocketClose" | "webSocketError" > & { readonly id: DurableObjectId; readonly name?: string; @@ -3135,7 +3128,6 @@ export interface TraceItem { | ( | TraceItemFetchEventInfo | TraceItemJsRpcEventInfo - | TraceItemConnectEventInfo | TraceItemScheduledEventInfo | TraceItemAlarmEventInfo | TraceItemQueueEventInfo @@ -3165,7 +3157,6 @@ export interface TraceItem { export interface TraceItemAlarmEventInfo { readonly scheduledTime: Date; } -export interface TraceItemConnectEventInfo {} export interface TraceItemCustomEventInfo {} export interface TraceItemScheduledEventInfo { readonly scheduledTime: number; @@ -12516,7 +12507,6 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: ExecutionContext, env: Env); email?(message: ForwardableEmailMessage): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; queue?(batch: MessageBatch): void | Promise; scheduled?(controller: ScheduledController): void | Promise; tail?(events: TraceItem[]): void | Promise; @@ -12537,7 +12527,6 @@ export declare namespace CloudflareWorkersModule { constructor(ctx: DurableObjectState, env: Env); alarm?(alarmInfo?: AlarmInvocationInfo): void | Promise; fetch?(request: Request): Response | Promise; - connect?(socket: Socket): void | Promise; webSocketMessage?( ws: WebSocket, message: string | ArrayBuffer, @@ -13527,9 +13516,6 @@ export declare namespace TailStream { readonly type: "fetch"; readonly statusCode: number; } - interface ConnectEventInfo { - readonly type: "connect"; - } type EventOutcome = | "ok" | "canceled" @@ -13560,7 +13546,6 @@ export declare namespace TailStream { readonly scriptVersion?: ScriptVersion; readonly info: | FetchEventInfo - | ConnectEventInfo | JsRpcEventInfo | ScheduledEventInfo | AlarmEventInfo