From 0e2d2cef4512cba3428ce70889e8952c0c04188b Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Sat, 28 Mar 2026 22:09:35 -0500 Subject: [PATCH 1/3] containers: Implement exec() in containers --- src/workerd/api/container.c++ | 304 ++++++++++++++++++++++++++++++++- src/workerd/api/container.h | 134 ++++++++++++++- src/workerd/io/container.capnp | 39 +++++ 3 files changed, 468 insertions(+), 9 deletions(-) diff --git a/src/workerd/api/container.c++ b/src/workerd/api/container.c++ index baf10f29e1a..a75e64ca9c0 100644 --- a/src/workerd/api/container.c++ +++ b/src/workerd/api/container.c++ @@ -5,10 +5,14 @@ #include "container.h" #include +#include +#include +#include #include #include #include +#include namespace workerd::api { @@ -30,8 +34,170 @@ kj::Maybe parseRestorePath(kj::StringPtr path) { } } +jsg::BufferSource copyBytes(jsg::Lock& js, kj::ArrayPtr bytes) { + auto backing = jsg::BackingStore::alloc(js, bytes.size()); + backing.asArrayPtr().copyFrom(bytes); + return jsg::BufferSource(js, kj::mv(backing)); +} + +void requireValidEnvNameAndValue(kj::StringPtr name, kj::StringPtr value) { + JSG_REQUIRE(name.findFirst('=') == kj::none, Error, + "Environment variable names cannot contain '=': ", name); + JSG_REQUIRE(name.findFirst('\0') == kj::none, Error, + "Environment variable names cannot contain '\\0': ", name); + JSG_REQUIRE(value.findFirst('\0') == kj::none, Error, + "Environment variable values cannot contain '\\0': ", name); +} + +kj::String getExecOutputMode(jsg::Optional maybeMode, kj::StringPtr kind) { + auto mode = kj::mv(maybeMode).orDefault(kj::str("pipe")); + JSG_REQUIRE(mode == "pipe" || mode == "ignore" || (kind == "stderr" && mode == "combined"), + TypeError, "Invalid ", kind, " option: ", mode); + return mode; +} + +kj::Array emptyByteArray() { + return kj::heapArray(0); +} + +capnp::ByteStream::Client makeExecPipe( + capnp::ByteStreamFactory& factory, kj::Own output) { + return 
factory.kjToCapnp(capnp::ExplicitEndOutputStream::wrap(kj::mv(output), []() {})); +} + } // namespace +// ======================================================================================= +// ExecOutput / ExecProcess + +ExecOutput::ExecOutput(kj::Array stdout, kj::Array stderr, int exitCode) + : stdout(kj::mv(stdout)), + stderr(kj::mv(stderr)), + exitCode(exitCode) {} + +jsg::BufferSource ExecOutput::getStdout(jsg::Lock& js) { + return copyBytes(js, stdout); +} + +jsg::BufferSource ExecOutput::getStderr(jsg::Lock& js) { + return copyBytes(js, stderr); +} + +ExecProcess::ExecProcess(jsg::Optional> stdin, + jsg::Optional> stdout, + jsg::Optional> stderr, + int pid, + rpc::Container::ProcessHandle::Client handle) + : stdin(kj::mv(stdin)), + stdout(kj::mv(stdout)), + stderr(kj::mv(stderr)), + pid(pid), + handle(IoContext::current().addObject(kj::heap(kj::mv(handle)))) {} + +jsg::Optional> ExecProcess::getStdin() { + return stdin.map([](jsg::Ref& stream) { return stream.addRef(); }); +} + +jsg::Optional> ExecProcess::getStdout() { + return stdout.map([](jsg::Ref& stream) { return stream.addRef(); }); +} + +jsg::Optional> ExecProcess::getStderr() { + return stderr.map([](jsg::Ref& stream) { return stream.addRef(); }); +} + +void ExecProcess::ensureExitCodePromise(jsg::Lock& js) { + if (exitCodePromise != kj::none) { + return; + } + + // jsg::Promise is single-use. Keep the original Promise as the public `exitCode` + // property and a separate whenResolved() branch for helpers like output(). 
+ auto self = JSG_THIS; + auto promise = IoContext::current().awaitIo(js, + handle->waitRequest(capnp::MessageSize{4, 0}) + .send() + .then([self = kj::mv(self)]( + capnp::Response&& results) mutable { + auto exitCode = results.getExitCode(); + self->resolvedExitCode = exitCode; + return exitCode; + })); + + exitCodePromiseCopy = promise.whenResolved(js); + exitCodePromise = jsg::MemoizedIdentity>(kj::mv(promise)); +} + +jsg::Promise ExecProcess::getExitCodeForOutput(jsg::Lock& js) { + ensureExitCodePromise(js); + + KJ_IF_SOME(exitCode, resolvedExitCode) { + return js.resolvedPromise(static_cast(exitCode)); + } + + auto self = JSG_THIS; + return KJ_ASSERT_NONNULL(exitCodePromiseCopy) + .whenResolved(js) + .then(js, [self = kj::mv(self)](jsg::Lock&) -> int { + return static_cast(KJ_ASSERT_NONNULL(self->resolvedExitCode)); + }); +} + +jsg::MemoizedIdentity>& ExecProcess::getExitCode(jsg::Lock& js) { + ensureExitCodePromise(js); + return KJ_ASSERT_NONNULL(exitCodePromise); +} + +jsg::Promise> ExecProcess::output(jsg::Lock& js) { + auto stdoutPromise = js.resolvedPromise(emptyByteArray()); + KJ_IF_SOME(stream, stdout) { + JSG_REQUIRE(!stream->isDisturbed(), TypeError, + "Cannot call output() after stdout has started being consumed."); + stdoutPromise = stream->getController() + .readAllBytes(js, kj::maxValue) + .then(js, [](jsg::Lock&, jsg::BufferSource bytes) { + return kj::heapArray(bytes.asArrayPtr()); + }); + } + + auto stderrPromise = js.resolvedPromise(emptyByteArray()); + KJ_IF_SOME(stream, stderr) { + JSG_REQUIRE(!stream->isDisturbed(), TypeError, + "Cannot call output() after stderr has started being consumed."); + stderrPromise = stream->getController() + .readAllBytes(js, kj::maxValue) + .then(js, [](jsg::Lock&, jsg::BufferSource bytes) { + return kj::heapArray(bytes.asArrayPtr()); + }); + } + + auto exitCodePromise = getExitCodeForOutput(js); + + return stdoutPromise.then(js, + [stderrPromise = kj::mv(stderrPromise), exitCodePromise = 
kj::mv(exitCodePromise)]( + jsg::Lock& js, + kj::Array stdoutBytes) mutable -> jsg::Promise> { + return stderrPromise.then(js, + [stdoutBytes = kj::mv(stdoutBytes), exitCodePromise = kj::mv(exitCodePromise)]( + jsg::Lock& js, + kj::Array stderrBytes) mutable -> jsg::Promise> { + return exitCodePromise.then(js, + [stdoutBytes = kj::mv(stdoutBytes), stderrBytes = kj::mv(stderrBytes)]( + jsg::Lock& js, int exitCode) mutable -> jsg::Ref { + return js.alloc(kj::mv(stdoutBytes), kj::mv(stderrBytes), exitCode); + }); + }); + }); +} + +void ExecProcess::kill(jsg::Lock& js, jsg::Optional signal) { + auto signo = signal.orDefault(15); + JSG_REQUIRE(signo > 0 && signo <= 64, RangeError, "Invalid signal number."); + + auto req = handle->killRequest(capnp::MessageSize{4, 0}); + req.setSigno(signo); + IoContext::current().addTask(req.sendIgnoringResult()); +} // ======================================================================================= // Basic lifecycle methods @@ -58,14 +224,7 @@ void Container::start(jsg::Lock& js, jsg::Optional maybeOptions) auto list = req.initEnvironmentVariables(env.fields.size()); for (auto i: kj::indices(env.fields)) { auto field = &env.fields[i]; - JSG_REQUIRE(field->name.findFirst('=') == kj::none, Error, - "Environment variable names cannot contain '=': ", field->name); - - JSG_REQUIRE(field->name.findFirst('\0') == kj::none, Error, - "Environment variable names cannot contain '\\0': ", field->name); - - JSG_REQUIRE(field->value.findFirst('\0') == kj::none, Error, - "Environment variable values cannot contain '\\0': ", field->name); + requireValidEnvNameAndValue(field->name, field->value); list.set(i, str(field->name, "=", field->value)); } @@ -250,6 +409,135 @@ jsg::Promise Container::interceptOutboundHttps( return ioctx.awaitIo(js, req.sendIgnoringResult()); } +jsg::Promise> Container::exec( + jsg::Lock& js, kj::Array cmd, jsg::Optional maybeOptions) { + JSG_REQUIRE(running, Error, "exec() cannot be called on a container that is not 
running."); + JSG_REQUIRE(cmd.size() > 0, TypeError, "exec() requires a non-empty command array."); + + auto options = kj::mv(maybeOptions).orDefault({}); + auto stdoutMode = getExecOutputMode(kj::mv(options.stdout), "stdout"); + auto stderrMode = getExecOutputMode(kj::mv(options.stderr), "stderr"); + bool combinedOutput = stderrMode == "combined"; + JSG_REQUIRE(!combinedOutput || stdoutMode == "pipe", TypeError, + "stderr: \"combined\" requires stdout to be \"pipe\"."); + + auto& ioContext = IoContext::current(); + auto& byteStreamFactory = ioContext.getByteStreamFactory(); + + auto req = rpcClient->execRequest(); + auto cmdList = req.initCmd(cmd.size()); + for (auto i: kj::indices(cmd)) { + cmdList.set(i, cmd[i]); + } + + // Init the kj pipes to create the stdout/err bytestreams + kj::Maybe> stdoutInput; + if (stdoutMode == "pipe") { + auto pipe = kj::newOneWayPipe(); + req.setStdout(makeExecPipe(byteStreamFactory, kj::mv(pipe.out))); + stdoutInput = kj::mv(pipe.in); + } + + kj::Maybe> stderrInput; + if (!combinedOutput && stderrMode == "pipe") { + auto pipe = kj::newOneWayPipe(); + req.setStderr(makeExecPipe(byteStreamFactory, kj::mv(pipe.out))); + stderrInput = kj::mv(pipe.in); + } + + auto params = req.initParams(); + params.setCombinedOutput(combinedOutput); + + // Some basic validation... 
+ KJ_IF_SOME(cwd, options.cwd) { + JSG_REQUIRE(cwd.findFirst('\0') == kj::none, TypeError, "cwd cannot contain '\\0' characters."); + params.setWorkingDirectory(cwd); + } + + KJ_IF_SOME(user, options.user) { + JSG_REQUIRE( + user.findFirst('\0') == kj::none, TypeError, "user cannot contain '\\0' characters."); + params.setUser(user); + } + + KJ_IF_SOME(env, options.env) { + auto envList = params.initEnv(env.fields.size()); + for (auto i: kj::indices(env.fields)) { + auto field = &env.fields[i]; + requireValidEnvNameAndValue(field->name, field->value); + envList.set(i, str(field->name, "=", field->value)); + } + } + + // We have to await, because PID won't be available until the response resolves + return ioContext.awaitIo(js, req.send()) + .then(js, + [&ioContext, &byteStreamFactory, options = kj::mv(options), + stdoutInput = kj::mv(stdoutInput), stderrInput = kj::mv(stderrInput)]( + jsg::Lock& js, capnp::Response results) mutable + -> jsg::Ref { + auto process = results.getProcess(); + auto handle = process.getHandle(); + auto pid = process.getPid(); + + // Init the ReadableStreams (stdout/stderr) + jsg::Optional> stdoutStream = kj::none; + KJ_IF_SOME(input, stdoutInput) { + auto source = newSystemStream(kj::mv(input), StreamEncoding::IDENTITY, ioContext); + stdoutStream = js.alloc(ioContext, kj::mv(source)); + } + + // stderrInput is only set if using "pipe" on stderr and not "combined" + jsg::Optional> stderrStream = kj::none; + KJ_IF_SOME(input, stderrInput) { + auto source = newSystemStream(kj::mv(input), StreamEncoding::IDENTITY, ioContext); + stderrStream = js.alloc(ioContext, kj::mv(source)); + } + + jsg::Optional> stdinStream = kj::none; + + // If stdin is undefined, the JS API promises immediate EOF. We still use the pipelined stdin() + // capability so exec() doesn't wait on an extra round-trip. 
+ KJ_IF_SOME(stdinOption, options.stdin) { + auto stdinRequest = handle.stdinRequest(capnp::MessageSize{4, 0}); + // Get the stdin() ByteStream, use the pipelined capability + auto stdinPipeline = stdinRequest.send(); + // ... adapt bytestream into a writer + auto stdinWriter = byteStreamFactory.capnpToKjExplicitEnd(stdinPipeline.getStdin()); + + KJ_SWITCH_ONEOF(stdinOption) { + // user sets ReadableStream... + KJ_CASE_ONEOF(readable, jsg::Ref) { + auto sink = newSystemStream(kj::mv(stdinWriter), StreamEncoding::IDENTITY, ioContext); + auto pipePromise = + (ioContext.waitForDeferredProxy(readable->pumpTo(js, kj::mv(sink), true))); + ioContext.addTask(pipePromise.attach(readable.addRef())); + } + // user sets "pipe"... they want to consume the API with the stdin WritableStream + KJ_CASE_ONEOF(mode, kj::String) { + JSG_REQUIRE(mode == "pipe", TypeError, + "stdin must be a ReadableStream or the string \"pipe\"."); + auto sink = newSystemStream(kj::mv(stdinWriter), StreamEncoding::IDENTITY, ioContext); + auto writable = js.alloc(ioContext, kj::mv(sink), + ioContext.getMetrics().tryCreateWritableByteStreamObserver()); + stdinStream = kj::mv(writable); + } + } + + // all good, we have the stdinStream set + } else { + auto stdinRequest = handle.stdinRequest(capnp::MessageSize{4, 0}); + auto stdinPipeline = stdinRequest.send(); + auto stdinWriter = byteStreamFactory.capnpToKjExplicitEnd(stdinPipeline.getStdin()); + ioContext.addTask(stdinWriter->end().attach(kj::mv(stdinWriter))); + } + + // return the instance to the process after getting pipeline of the process handle + return js.alloc( + kj::mv(stdinStream), kj::mv(stdoutStream), kj::mv(stderrStream), pid, kj::mv(handle)); + }); +} + jsg::Promise Container::monitor(jsg::Lock& js) { JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running."); diff --git a/src/workerd/api/container.h b/src/workerd/api/container.h index 7651a958f16..44cc98cb7c3 100644 --- a/src/workerd/api/container.h +++ 
b/src/workerd/api/container.h @@ -5,14 +5,142 @@ #pragma once // Container management API for Durable Object-attached containers. // +#include +#include #include #include #include #include +#ifdef stdin +#undef stdin +#endif +#ifdef stdout +#undef stdout +#endif +#ifdef stderr +#undef stderr +#endif + namespace workerd::api { class Fetcher; +class ExecOutput: public jsg::Object { + public: + ExecOutput(kj::Array stdout, kj::Array stderr, int exitCode); + + jsg::BufferSource getStdout(jsg::Lock& js); + jsg::BufferSource getStderr(jsg::Lock& js); + int getExitCode() const { + return exitCode; + } + + JSG_RESOURCE_TYPE(ExecOutput) { + JSG_LAZY_READONLY_INSTANCE_PROPERTY(stdout, getStdout); + JSG_LAZY_READONLY_INSTANCE_PROPERTY(stderr, getStderr); + JSG_READONLY_PROTOTYPE_PROPERTY(exitCode, getExitCode); + + JSG_TS_OVERRIDE({ + readonly stdout: ArrayBuffer; + readonly stderr: ArrayBuffer; + readonly exitCode: number; + }); + } + + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("stdout", stdout); + tracker.trackField("stderr", stderr); + } + + private: + kj::Array stdout; + kj::Array stderr; + int exitCode; +}; + +struct ExecOptions { + jsg::Optional, kj::String>> stdin; + jsg::Optional stdout; + jsg::Optional stderr; + jsg::Optional cwd; + jsg::Optional> env; + jsg::Optional user; + + JSG_STRUCT(stdin, stdout, stderr, cwd, env, user); + JSG_STRUCT_TS_OVERRIDE(ContainerExecOptions { + stdin?: ReadableStream | "pipe"; + stdout?: "pipe" | "ignore"; + stderr?: "pipe" | "ignore" | "combined"; + cwd?: string; + env?: Record; + user?: string; + }); +}; + +class ExecProcess: public jsg::Object { + public: + ExecProcess(jsg::Optional> stdin, + jsg::Optional> stdout, + jsg::Optional> stderr, + int pid, + rpc::Container::ProcessHandle::Client handle); + + jsg::Optional> getStdin(); + jsg::Optional> getStdout(); + jsg::Optional> getStderr(); + int getPid() const { + return pid; + } + jsg::MemoizedIdentity>& getExitCode(jsg::Lock& js); + + 
jsg::Promise> output(jsg::Lock& js); + void kill(jsg::Lock& js, jsg::Optional signal); + + JSG_RESOURCE_TYPE(ExecProcess) { + JSG_READONLY_PROTOTYPE_PROPERTY(stdin, getStdin); + JSG_READONLY_PROTOTYPE_PROPERTY(stdout, getStdout); + JSG_READONLY_PROTOTYPE_PROPERTY(stderr, getStderr); + JSG_READONLY_PROTOTYPE_PROPERTY(pid, getPid); + JSG_READONLY_PROTOTYPE_PROPERTY(exitCode, getExitCode); + JSG_METHOD(output); + JSG_METHOD(kill); + + JSG_TS_OVERRIDE({ + readonly stdin: WritableStream | null; + readonly stdout: ReadableStream | null; + readonly stderr: ReadableStream | null; + readonly pid: number; + readonly exitCode: Promise; + output(): Promise; + kill(signal?: number): void; + }); + } + + void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { + tracker.trackField("stdin", stdin); + tracker.trackField("stdout", stdout); + tracker.trackField("stderr", stderr); + tracker.trackField("exitCodePromise", exitCodePromise); + tracker.trackField("exitCodePromiseCopy", exitCodePromiseCopy); + } + + private: + void ensureExitCodePromise(jsg::Lock& js); + jsg::Promise getExitCodeForOutput(jsg::Lock& js); + + jsg::Optional> stdin; + jsg::Optional> stdout; + jsg::Optional> stderr; + int pid; + IoOwn handle; + kj::Maybe>> exitCodePromise; + kj::Maybe> exitCodePromiseCopy; + kj::Maybe resolvedExitCode; + + void visitForGc(jsg::GcVisitor& visitor) { + visitor.visit(stdin, stdout, stderr, exitCodePromise, exitCodePromiseCopy); + } +}; // Implements the `ctx.container` API for durable-object-attached containers. 
This API allows // the DO to supervise the attached container (lightweight virtual machine), including starting, @@ -150,6 +278,8 @@ class Container: public jsg::Object { jsg::Promise snapshotDirectory( jsg::Lock& js, DirectorySnapshotOptions options); jsg::Promise snapshotContainer(jsg::Lock& js, SnapshotOptions options); + jsg::Promise> exec( + jsg::Lock& js, kj::Array cmd, jsg::Optional options); // TODO(containers): listenTcp() @@ -165,6 +295,7 @@ class Container: public jsg::Object { JSG_METHOD(interceptOutboundHttp); JSG_METHOD(interceptAllOutboundHttp); if (flags.getWorkerdExperimental()) { + JSG_METHOD(exec); JSG_METHOD(interceptOutboundHttps); JSG_METHOD(snapshotDirectory); JSG_METHOD(snapshotContainer); @@ -190,7 +321,8 @@ class Container: public jsg::Object { }; #define EW_CONTAINER_ISOLATE_TYPES \ - api::Container, api::Container::DirectorySnapshot, api::Container::DirectorySnapshotOptions, \ + api::ExecOutput, api::ExecOptions, api::ExecProcess, api::Container, \ + api::Container::DirectorySnapshot, api::Container::DirectorySnapshotOptions, \ api::Container::DirectorySnapshotRestoreParams, api::Container::Snapshot, \ api::Container::SnapshotOptions, api::Container::StartupOptions diff --git a/src/workerd/io/container.capnp b/src/workerd/io/container.capnp index f8f9b5d7f06..020d52ee90f 100644 --- a/src/workerd/io/container.capnp +++ b/src/workerd/io/container.capnp @@ -110,6 +110,38 @@ interface Container @0x9aaceefc06523bca { # Optional human-friendly name. Empty string means not set. } + struct ExecOptions { + env @0 :List(Text); + # Environment variables to add/override for the exec'd process, in NAME=VALUE format. + + workingDirectory @1 :Text; + # Working directory for the exec'd process. Empty string means use the container default. + + user @2 :Text; + # User for the exec'd process. Empty string means use the container default. + + combinedOutput @3 :Bool; + # If true, stderr is combined into stdout. 
If stdout is not set, combined output is discarded. + } + + struct Process { + pid @0 :Int32; + handle @1 :ProcessHandle; + } + + interface ProcessHandle { + wait @0 () -> (exitCode :Int32); + # Waits for the process to exit and returns its exit code. + + stdin @1 () -> (stdin :ByteStream); + # Retrieves a ByteStream handle to write to the process's stdin. + # If not called before wait(), stdin automatically EOFs. + # Throws an error if called after wait() has been called. + + kill @2 (signo :UInt32); + # Sends the given signal to the process. + } + monitor @2 () -> (exitCode: Int32); # Waits for the container to shut down. # @@ -204,4 +236,11 @@ interface Container @0x9aaceefc06523bca { snapshotContainer @11 SnapshotContainerParams -> (snapshot :ContainerSnapshot); # Creates a full container snapshot for the running container. + + exec @12 (cmd :List(Text), stdout :ByteStream, stderr :ByteStream, params :ExecOptions) + -> (process :Process); + # Executes a short-lived process in the running container. + # + # If stdout/stderr are not provided, output is discarded. If params.combinedOutput is true, + # stderr is merged into stdout and the stderr capability is ignored. 
} From 97aaf220b570fd077d000a1e6ab3acada3f9b65c Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Sat, 28 Mar 2026 22:09:52 -0500 Subject: [PATCH 2/3] containers: Implement exec() local dev --- src/workerd/api/container.c++ | 28 +- src/workerd/api/container.h | 11 +- src/workerd/io/container.capnp | 2 +- src/workerd/server/container-client.c++ | 624 ++++++++++++++++++ src/workerd/server/container-client.h | 15 + src/workerd/server/docker-api.capnp | 33 + .../container-client/container-client.wd-test | 2 +- .../server/tests/container-client/test.js | 225 +++++++ .../experimental/index.d.ts | 23 + .../generated-snapshot/experimental/index.ts | 23 + 10 files changed, 968 insertions(+), 18 deletions(-) diff --git a/src/workerd/api/container.c++ b/src/workerd/api/container.c++ index a75e64ca9c0..1f57392272d 100644 --- a/src/workerd/api/container.c++ +++ b/src/workerd/api/container.c++ @@ -4,6 +4,18 @@ #include "container.h" +// macOS defines stdin/stdout/stderr as macros, which collide with +// member names used by ExecProcess / ExecOutput. 
+#ifdef stdin +#undef stdin +#endif +#ifdef stdout +#undef stdout +#endif +#ifdef stderr +#undef stderr +#endif + #include #include #include @@ -11,8 +23,8 @@ #include #include -#include #include +#include namespace workerd::api { @@ -149,13 +161,17 @@ jsg::MemoizedIdentity>& ExecProcess::getExitCode(jsg::Lock& js } jsg::Promise> ExecProcess::output(jsg::Lock& js) { + JSG_REQUIRE(!outputCalled, TypeError, "output() can only be called once."); + outputCalled = true; + auto stdoutPromise = js.resolvedPromise(emptyByteArray()); KJ_IF_SOME(stream, stdout) { JSG_REQUIRE(!stream->isDisturbed(), TypeError, "Cannot call output() after stdout has started being consumed."); - stdoutPromise = stream->getController() - .readAllBytes(js, kj::maxValue) - .then(js, [](jsg::Lock&, jsg::BufferSource bytes) { + stdoutPromise = + stream->getController() + .readAllBytes(js, IoContext::current().getLimitEnforcer().getBufferingLimit()) + .then(js, [](jsg::Lock&, jsg::BufferSource bytes) { return kj::heapArray(bytes.asArrayPtr()); }); } @@ -515,8 +531,8 @@ jsg::Promise> Container::exec( } // user sets "pipe"... 
they want to consume the API with the stdin WritableStream KJ_CASE_ONEOF(mode, kj::String) { - JSG_REQUIRE(mode == "pipe", TypeError, - "stdin must be a ReadableStream or the string \"pipe\"."); + JSG_REQUIRE( + mode == "pipe", TypeError, "stdin must be a ReadableStream or the string \"pipe\"."); auto sink = newSystemStream(kj::mv(stdinWriter), StreamEncoding::IDENTITY, ioContext); auto writable = js.alloc(ioContext, kj::mv(sink), ioContext.getMetrics().tryCreateWritableByteStreamObserver()); diff --git a/src/workerd/api/container.h b/src/workerd/api/container.h index 44cc98cb7c3..935011d07ee 100644 --- a/src/workerd/api/container.h +++ b/src/workerd/api/container.h @@ -12,16 +12,6 @@ #include #include -#ifdef stdin -#undef stdin -#endif -#ifdef stdout -#undef stdout -#endif -#ifdef stderr -#undef stderr -#endif - namespace workerd::api { class Fetcher; @@ -136,6 +126,7 @@ class ExecProcess: public jsg::Object { kj::Maybe>> exitCodePromise; kj::Maybe> exitCodePromiseCopy; kj::Maybe resolvedExitCode; + bool outputCalled = false; void visitForGc(jsg::GcVisitor& visitor) { visitor.visit(stdin, stdout, stderr, exitCodePromise, exitCodePromiseCopy); diff --git a/src/workerd/io/container.capnp b/src/workerd/io/container.capnp index 020d52ee90f..1e4edbc785f 100644 --- a/src/workerd/io/container.capnp +++ b/src/workerd/io/container.capnp @@ -136,7 +136,7 @@ interface Container @0x9aaceefc06523bca { stdin @1 () -> (stdin :ByteStream); # Retrieves a ByteStream handle to write to the process's stdin. # If not called before wait(), stdin automatically EOFs. - # Throws an error if called after wait() has been called. + # Throws an error if called after wait(). kill @2 (signo :UInt32); # Sends the given signal to the process. 
diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ index 6d901aa8c00..70d013ba096 100644 --- a/src/workerd/server/container-client.c++ +++ b/src/workerd/server/container-client.c++ @@ -11,11 +11,24 @@ #include #include #include +#include #include #include #include +// macOS defines stdin/stdout/stderr as macros, which collide with +// capnp-generated method names on the ProcessHandle interface. +#ifdef stdin +#undef stdin +#endif +#ifdef stdout +#undef stdout +#endif +#ifdef stderr +#undef stderr +#endif + #include #include #include @@ -27,6 +40,8 @@ #include #include +#include + namespace workerd::server { namespace { @@ -100,6 +115,154 @@ kj::String parseSnapshotId(kj::StringPtr snapshotId) { } } +struct DockerStreamedResponse { + kj::uint statusCode; + kj::String statusText; + kj::Own connection; +}; + +// Really similar to BufferedInputStreamWrapper, but Async... +// We need this because of Docker's exec keeping a bidirectional connection +// needing to own the IoStream after writing and reading headers, as it does +// "Upgrade: tcp". 
+class BufferedAsyncIoStream final: public kj::AsyncIoStream { + public: + BufferedAsyncIoStream(kj::Own inner, kj::Array buffered) + : inner(kj::mv(inner)), + buffered(kj::mv(buffered)) {} + + kj::Promise tryRead(void* dst, size_t minBytes, size_t maxBytes) override { + KJ_REQUIRE(minBytes <= maxBytes, minBytes, maxBytes); + + auto out = kj::arrayPtr(reinterpret_cast(dst), maxBytes); + size_t copied = 0; + + auto bufferedRemaining = buffered.size() - bufferedOffset; + if (bufferedRemaining > 0) { + auto toCopy = kj::min(maxBytes, bufferedRemaining); + out.first(toCopy).copyFrom(buffered.asPtr().slice(bufferedOffset, bufferedOffset + toCopy)); + bufferedOffset += toCopy; + copied = toCopy; + + if (copied >= minBytes || copied == maxBytes) { + co_return copied; + } + } + + auto read = co_await inner->tryRead(out.begin() + copied, minBytes - copied, maxBytes - copied); + co_return copied + read; + } + + kj::Maybe tryGetLength() override { + KJ_IF_SOME(innerLength, inner->tryGetLength()) { + return innerLength + (buffered.size() - bufferedOffset); + } + return kj::none; + } + + kj::Promise pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override { + uint64_t pumped = 0; + auto bufferedRemaining = buffered.size() - bufferedOffset; + if (bufferedRemaining > 0) { + auto toWrite = static_cast(kj::min(amount, static_cast(bufferedRemaining))); + co_await output.write(buffered.asPtr().slice(bufferedOffset, bufferedOffset + toWrite)); + bufferedOffset += toWrite; + pumped += toWrite; + + if (pumped == amount) { + co_return pumped; + } + } + + co_return pumped + co_await inner->pumpTo(output, amount - pumped); + } + + kj::Promise write(kj::ArrayPtr buffer) override { + return inner->write(buffer); + } + kj::Promise write(kj::ArrayPtr> pieces) override { + return inner->write(pieces); + } + kj::Maybe> tryPumpFrom( + kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override { + return inner->tryPumpFrom(input, amount); + } + kj::Promise whenWriteDisconnected() 
override { + return inner->whenWriteDisconnected(); + } + void abortWrite(kj::Exception&& exception) override { + inner->abortWrite(kj::mv(exception)); + } + + void shutdownWrite() override { + inner->shutdownWrite(); + } + void abortRead() override { + inner->abortRead(); + } + void getsockopt(int level, int option, void* value, kj::uint* length) override { + inner->getsockopt(level, option, value, length); + } + void setsockopt(int level, int option, const void* value, kj::uint length) override { + inner->setsockopt(level, option, value, length); + } + void getsockname(struct sockaddr* addr, kj::uint* length) override { + inner->getsockname(addr, length); + } + void getpeername(struct sockaddr* addr, kj::uint* length) override { + inner->getpeername(addr, length); + } + kj::Maybe getFd() const override { + return inner->getFd(); + } + + private: + kj::Own inner; + kj::Array buffered; + size_t bufferedOffset = 0; +}; + +// Docker exec uses a single hijacked stream for stdin and stdout/stderr. Keep that stream in a +// small refcounted holder so the returned stdin ByteStream and the output demux task can share it. 
+class SharedExecConnection final: public kj::Refcounted { + public: + explicit SharedExecConnection(kj::Own connection) + : connection(kj::mv(connection)) {} + + kj::Own connection; + bool stdinOpened = false; + bool stdinClosed = false; +}; + +class DockerExecStdinStream final: public capnp::ExplicitEndOutputStream { + public: + explicit DockerExecStdinStream(kj::Own sharedConnection) + : sharedConnection(kj::mv(sharedConnection)) {} + + kj::Promise write(kj::ArrayPtr buffer) override { + return sharedConnection->connection->write(buffer); + } + + kj::Promise write(kj::ArrayPtr> pieces) override { + return sharedConnection->connection->write(pieces); + } + + kj::Promise whenWriteDisconnected() override { + return sharedConnection->connection->whenWriteDisconnected(); + } + + kj::Promise end() override { + if (!sharedConnection->stdinClosed) { + sharedConnection->connection->shutdownWrite(); + sharedConnection->stdinClosed = true; + } + return kj::READY_NOW; + } + + private: + kj::Own sharedConnection; +}; + // Strips a port suffix from a string, returning the host and port separately. // For IPv6, expects brackets: "[::1]:8080" -> ("::1", 8080) // For IPv4: "10.0.0.1:8080" -> ("10.0.0.1", 8080) @@ -468,6 +631,188 @@ kj::Promise removeContainer( } } +kj::Maybe tryFindHttpHeaderEnd(kj::ArrayPtr bytes) { + for (auto i: kj::zeroTo(bytes.size())) { + if (i + 4 > bytes.size()) { + return kj::none; + } + if (bytes[i] == '\r' && bytes[i + 1] == '\n' && bytes[i + 2] == '\r' && bytes[i + 3] == '\n') { + return i; + } + } + return kj::none; +} + +// readDockerStreamedResponse is necessary because Docker streamed responses +// require an open bidirectional stream after the response headers have been read. 
+kj::Promise readDockerStreamedResponse( + kj::Own connection) { + kj::Vector buffer; + auto& input = *connection; + + while (true) { + KJ_IF_SOME(headerEnd, tryFindHttpHeaderEnd(buffer.asPtr())) { + auto parsedHeaders = kj::heapArray(headerEnd + 2); + for (auto i: kj::zeroTo(parsedHeaders.size())) { + parsedHeaders[i] = static_cast(buffer[i]); + } + + kj::HttpHeaderTable headerTable; + kj::HttpHeaders headers(headerTable); + auto parsedResponse = headers.tryParseResponse(parsedHeaders.asPtr()); + headers.takeOwnership(kj::mv(parsedHeaders)); + + kj::uint statusCode = 0; + kj::String statusText; + KJ_SWITCH_ONEOF(parsedResponse) { + KJ_CASE_ONEOF(response, kj::HttpHeaders::Response) { + statusCode = response.statusCode; + statusText = kj::str(response.statusText); + } + KJ_CASE_ONEOF(protocolError, kj::HttpHeaders::ProtocolError) { + KJ_FAIL_REQUIRE("Docker streamed response returned malformed HTTP headers: ", + protocolError.statusMessage, ": ", protocolError.description); + } + } + + auto bodyOffset = headerEnd + 4; + auto prefetchedBytes = kj::heapArray(buffer.asPtr().slice(bodyOffset)); + kj::Own prefixedConnection = kj::mv(connection); + if (prefetchedBytes.size() > 0) { + prefixedConnection = + kj::heap(kj::mv(prefixedConnection), kj::mv(prefetchedBytes)); + } + co_return DockerStreamedResponse{ + .statusCode = statusCode, + .statusText = kj::mv(statusText), + .connection = kj::mv(prefixedConnection), + }; + } + + auto scratch = kj::heapArray(4096); + auto amount = co_await input.tryRead(scratch.begin(), 1, scratch.size()); + KJ_REQUIRE(amount > 0, "EOF while waiting for Docker streamed response headers"); + buffer.addAll(scratch.first(amount)); + KJ_REQUIRE(buffer.size() <= 65536, "Docker streamed response headers exceeded 64KiB"); + } +} + +kj::Promise dockerApiStreamedRequest(kj::Network& network, + kj::String dockerPath, + kj::HttpMethod method, + kj::String endpoint, + const kj::HttpHeaders& headers, + kj::Maybe> body = kj::none) { + auto address = 
co_await network.parseAddress(dockerPath); + auto connection = co_await address->connect(); + + auto requestHeaders = headers.serializeRequest(method, endpoint); + KJ_IF_SOME(requestBody, body) { + kj::ArrayPtr pieces[] = {requestHeaders.asBytes(), requestBody}; + co_await connection->write(kj::arrayPtr(pieces)); + } else { + co_await connection->write(requestHeaders.asBytes()); + } + + co_return co_await readDockerStreamedResponse(kj::mv(connection)); +} + +uint32_t parseDockerFrameLength(kj::ArrayPtr frameHeader) { + KJ_REQUIRE(frameHeader.size() >= 8, "Docker raw stream header too short"); + return (static_cast(frameHeader[4]) << 24) | + (static_cast(frameHeader[5]) << 16) | (static_cast(frameHeader[6]) << 8) | + static_cast(frameHeader[7]); +} + +void detachEnd(kj::Maybe> stream) { + KJ_IF_SOME(s, stream) { + s->end().attach(kj::mv(s)).detach([](kj::Exception&&) {}); + } +} + +// demuxDockerExecOutput demuxes the input from Docker to passed stdout/stderr. +kj::Promise demuxDockerExecOutput(kj::AsyncInputStream& input, + kj::Maybe> stdout, + kj::Maybe> stderr, + bool combinedOutput) { + kj::Vector buffer; + size_t offset = 0; + + auto compactBuffer = [&]() { + if (offset == 0) { + return; + } + + kj::Vector compacted; + compacted.addAll(buffer.asPtr().slice(offset)); + buffer = kj::mv(compacted); + offset = 0; + }; + + auto ensureBytes = [&](size_t count) -> kj::Promise { + while (buffer.size() - offset < count) { + compactBuffer(); + auto scratch = kj::heapArray(4096); + auto amount = co_await input.tryRead(scratch.begin(), 1, scratch.size()); + if (amount == 0) { + co_return false; + } + buffer.addAll(scratch.first(amount)); + } + co_return true; + }; + + try { + while (co_await ensureBytes(8)) { + auto frameHeader = buffer.asPtr().slice(offset, offset + 8); + auto streamId = frameHeader[0]; + auto frameLength = parseDockerFrameLength(frameHeader); + KJ_REQUIRE(co_await ensureBytes(8 + frameLength), + "Docker exec raw stream ended in the middle of a frame"); + 
+ auto payload = buffer.asPtr().slice(offset + 8, offset + 8 + frameLength); + if (streamId == 1) { + KJ_IF_SOME(out, stdout) { + co_await out->write(payload); + } + } else { + if (streamId == 2) { + if (combinedOutput) { + KJ_IF_SOME(out, stdout) { + co_await out->write(payload); + } + } else { + KJ_IF_SOME(err, stderr) { + co_await err->write(payload); + } + } + } + } + + offset += 8 + frameLength; + } + + if (buffer.size() != offset) { + KJ_FAIL_REQUIRE("Docker exec raw stream ended with a truncated frame header"); + } + + // We need to detach ourselves from the end() as the user might've + // decided to not read them altogether. + detachEnd(kj::mv(stdout)); + detachEnd(kj::mv(stderr)); + } catch (...) { + auto exception = kj::getCaughtExceptionAsKj(); + + KJ_IF_SOME(out, stdout) { + out->abortWrite(kj::cp(exception)); + } + KJ_IF_SOME(err, stderr) { + err->abortWrite(kj::cp(exception)); + } + kj::throwFatalException(kj::mv(exception)); + } +} + kj::String currentSnapshotVolumeTimestamp() { return kj::str((kj::systemPreciseCalendarClock().now() - kj::UNIX_EPOCH) / kj::SECONDS); } @@ -705,6 +1050,93 @@ class ContainerClient::DockerPort final: public rpc::Container::Port::Server { kj::Maybe> pumpTask; }; +class ContainerClient::DockerProcessHandle final: public rpc::Container::ProcessHandle::Server { + public: + DockerProcessHandle(ContainerClient& containerClient, + kj::String execId, + kj::Own connection, + kj::Maybe stdout, + kj::Maybe stderr, + bool combinedOutput) + : containerClient(containerClient.addRef()), + execId(kj::mv(execId)), + sharedConnection(kj::refcounted(kj::mv(connection))) { + kj::Maybe> stdoutStream = kj::none; + KJ_IF_SOME(out, stdout) { + stdoutStream = this->containerClient->byteStreamFactory.capnpToKjExplicitEnd(out); + } else { + stdoutStream = capnp::ExplicitEndOutputStream::wrap(newNullOutputStream(), []() {}); + } + + kj::Maybe> stderrStream = kj::none; + KJ_IF_SOME(err, stderr) { + stderrStream = 
this->containerClient->byteStreamFactory.capnpToKjExplicitEnd(err); + } else if (!combinedOutput) { + stderrStream = capnp::ExplicitEndOutputStream::wrap(newNullOutputStream(), []() {}); + } + + // Always drain the Docker exec stream. This lets wait() use stream closure as the primary + // process-completion signal, even when stdout/stderr are ignored. + auto task = demuxDockerExecOutput( + *sharedConnection->connection, kj::mv(stdoutStream), kj::mv(stderrStream), combinedOutput) + .attach(this->containerClient->addRef(), kj::addRef(*sharedConnection)); + streamClosedTask = kj::mv(task).fork(); + } + + kj::Promise wait(WaitContext context) override { + waitStarted = true; + if (!sharedConnection->stdinOpened && !sharedConnection->stdinClosed) { + sharedConnection->connection->shutdownWrite(); + sharedConnection->stdinClosed = true; + } + + co_await KJ_ASSERT_NONNULL(streamClosedTask).addBranch(); + + // Docker's exec-inspect state can lag slightly behind the hijacked stream closing, so after + // we observe EOF we allow a short bounded retry window to obtain the final exit code. 
+ for (auto attempt: kj::zeroTo(20)) { + auto inspect = co_await containerClient->inspectExec(execId); + if (!inspect.running) { + context.getResults().setExitCode(inspect.exitCode); + co_return; + } + + if (attempt + 1 < 20) { + co_await containerClient->timer.afterDelay(50 * kj::MILLISECONDS); + } + } + + JSG_FAIL_REQUIRE(Error, "Docker exec stream closed before exit status became available."); + } + + kj::Promise stdin(StdinContext context) override { + JSG_REQUIRE(!waitStarted, Error, "Process stdin() cannot be called after wait()."); + JSG_REQUIRE(!sharedConnection->stdinOpened, Error, "Process stdin() can only be called once."); + + sharedConnection->stdinOpened = true; + context.getResults().setStdin(containerClient->byteStreamFactory.kjToCapnp( + kj::heap(kj::addRef(*sharedConnection)))); + co_return; + } + + kj::Promise kill(KillContext context) override { + auto inspect = co_await containerClient->inspectExec(execId); + JSG_REQUIRE(inspect.pid > 0, Error, "Exec process does not have a visible pid to signal."); + + auto signal = kj::str("-", signalToString(context.getParams().getSigno())); + auto pid = kj::str(inspect.pid); + auto cmd = kj::arr(kj::str("kill"), kj::mv(signal), kj::mv(pid)); + co_await containerClient->runSimpleExec(cmd.asPtr()); + } + + private: + kj::Own containerClient; + kj::String execId; + kj::Own sharedConnection; + bool waitStarted = false; + kj::Maybe> streamClosedTask; +}; + // HTTP service that handles HTTP CONNECT requests from the container sidecar (proxy-everything). // When the sidecar intercepts container egress traffic, it sends HTTP CONNECT to this service. 
// After accepting the CONNECT, the tunnel carries the actual HTTP request from the container, @@ -1235,6 +1667,152 @@ kj::Promise ContainerClient::createContainer(kj::StringPtr effectiveImage, } } +kj::Promise ContainerClient::createExec(capnp::List::Reader cmd, + rpc::Container::ExecOptions::Reader params, + bool attachStdout, + bool attachStderr) { + capnp::JsonCodec codec; + codec.handleByAnnotation(); + + capnp::MallocMessageBuilder message; + auto request = message.initRoot(); + request.setAttachStdin(true); + request.setAttachStdout(attachStdout); + request.setAttachStderr(attachStderr); + request.setTty(false); + + auto jsonCmd = request.initCmd(cmd.size()); + for (auto i: kj::zeroTo(cmd.size())) { + jsonCmd.set(i, cmd[i]); + } + + if (params.hasEnv()) { + auto env = params.getEnv(); + auto jsonEnv = request.initEnv(env.size()); + for (auto i: kj::zeroTo(env.size())) { + jsonEnv.set(i, env[i]); + } + } + + if (params.hasWorkingDirectory()) { + request.setWorkingDir(params.getWorkingDirectory()); + } + + if (params.hasUser()) { + request.setUser(params.getUser()); + } + + auto response = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/containers/", containerName, "/exec"), codec.encode(request)); + JSG_REQUIRE(response.statusCode == 201, Error, "Creating Docker exec failed with [", + response.statusCode, "] ", response.body); + + auto parsed = decodeJsonResponse(response.body); + co_return kj::str(parsed->getRoot().getId()); +} + +kj::Promise> ContainerClient::startExec(kj::String execId) { + capnp::JsonCodec codec; + codec.handleByAnnotation(); + + capnp::MallocMessageBuilder message; + auto requestBody = message.initRoot(); + requestBody.setDetach(false); + requestBody.setTty(false); + auto encodedBody = codec.encode(requestBody); + + // Exec attach uses HTTP connection hijacking. 
A plain POST can succeed with 200 OK but then not + // behave like the raw stream Docker's CLI expects, so we must request the upgrade explicitly. + kj::HttpHeaderTable headerTable; + kj::HttpHeaders headers(headerTable); + headers.setPtr(kj::HttpHeaderId::HOST, "localhost"); + headers.setPtr(kj::HttpHeaderId::CONNECTION, "Upgrade"); + // ... Why not CONNECT or WebSockets, Docker? + headers.setPtr(kj::HttpHeaderId::UPGRADE, "tcp"); + headers.setPtr(kj::HttpHeaderId::CONTENT_TYPE, "application/json"); + headers.set(kj::HttpHeaderId::CONTENT_LENGTH, kj::str(encodedBody.size())); + kj::ArrayPtr encodedBodyBytes = encodedBody.asBytes(); + + auto response = co_await dockerApiStreamedRequest(network, kj::str(dockerPath), + kj::HttpMethod::POST, kj::str("/exec/", execId, "/start"), headers, encodedBodyBytes); + if (response.statusCode != 101) { + auto errorBodyBytes = co_await response.connection->readAllBytes(MAX_JSON_RESPONSE_SIZE); + auto errorBody = kj::str(errorBodyBytes.asChars()); + JSG_FAIL_REQUIRE(Error, "Starting Docker exec failed with [", response.statusCode, "] ", + response.statusText, " ", errorBody); + } + + co_return kj::mv(response.connection); +} + +kj::Promise ContainerClient::inspectExec( + kj::StringPtr execId) { + auto response = co_await dockerApiRequest( + network, kj::str(dockerPath), kj::HttpMethod::GET, kj::str("/exec/", execId, "/json")); + JSG_REQUIRE(response.statusCode == 200, Error, "Inspecting Docker exec failed with [", + response.statusCode, "] ", response.body); + + auto parsed = decodeJsonResponse(response.body); + auto root = parsed->getRoot(); + auto exitCodeValue = root.getExitCode(); + auto exitCode = exitCodeValue.isNumber() ? 
static_cast(exitCodeValue.getNumber()) : 0; + co_return ExecInspectResponse{ + .exitCode = exitCode, + .running = root.getRunning(), + .pid = root.getPid(), + }; +} + +kj::Promise ContainerClient::runSimpleExec(kj::ArrayPtr cmd) { + capnp::JsonCodec codec; + codec.handleByAnnotation(); + + capnp::MallocMessageBuilder createMessage; + auto createRequest = createMessage.initRoot(); + createRequest.setAttachStdin(false); + createRequest.setAttachStdout(false); + createRequest.setAttachStderr(false); + createRequest.setTty(false); + + auto jsonCmd = createRequest.initCmd(cmd.size()); + for (auto i: kj::indices(cmd)) { + jsonCmd.set(i, cmd[i]); + } + + auto createResponse = + co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/containers/", containerName, "/exec"), codec.encode(createRequest)); + JSG_REQUIRE(createResponse.statusCode == 201, Error, "Creating helper Docker exec failed with [", + createResponse.statusCode, "] ", createResponse.body); + + auto parsedCreate = + decodeJsonResponse(createResponse.body); + auto execId = kj::str(parsedCreate->getRoot().getId()); + + capnp::JsonCodec startCodec; + startCodec.handleByAnnotation(); + + capnp::MallocMessageBuilder startMessage; + auto startRequest = startMessage.initRoot(); + startRequest.setDetach(true); + startRequest.setTty(false); + + auto startResponse = co_await dockerApiRequest(network, kj::str(dockerPath), kj::HttpMethod::POST, + kj::str("/exec/", execId, "/start"), startCodec.encode(startRequest)); + JSG_REQUIRE(startResponse.statusCode == 200, Error, "Starting helper Docker exec failed with [", + startResponse.statusCode, "] ", startResponse.body); + + while (true) { + auto inspect = co_await inspectExec(execId); + if (!inspect.running) { + JSG_REQUIRE(inspect.exitCode == 0, Error, "Helper Docker exec failed with exit code ", + inspect.exitCode); + co_return; + } + co_await timer.afterDelay(50 * kj::MILLISECONDS); + } +} + kj::Promise ContainerClient::startContainer() 
{ auto endpoint = kj::str("/containers/", containerName, "/start"); // We have to send an empty body since docker API will throw an error if we don't. @@ -1668,6 +2246,52 @@ kj::Promise ContainerClient::signal(SignalContext context) { co_await killContainer(params.getSigno()); } +kj::Promise ContainerClient::exec(ExecContext context) { + auto [ready, done] = getRpcTurn(); + co_await ready; + KJ_DEFER(done->fulfill()); + + JSG_REQUIRE(containerStarted.load(std::memory_order_acquire), Error, + "exec() requires a running container."); + + auto request = context.getParams(); + auto execParams = request.getParams(); + // Always attach stdout/stderr to Docker so the hijacked stream lifetime continues to track the + // process even when the JS API requested "ignore". We discard ignored output locally. + bool attachStdout = true; + bool attachStderr = true; + + auto execId = co_await createExec(request.getCmd(), execParams, attachStdout, attachStderr); + kj::Own execConnection = co_await startExec(kj::str(execId)); + kj::Maybe stdout = kj::none; + if (request.hasStdout()) { + stdout = request.getStdout(); + } + + kj::Maybe stderr = kj::none; + if (request.hasStderr()) { + stderr = request.getStderr(); + } + + // Retrying is not great, however Docker's inspectExec might return running = false + // before it has fully spawned the process (as startExec() returns before + // even docker has spawned the process...) 
+ ExecInspectResponse inspect{.exitCode = 0, .running = false, .pid = 0}; + for (auto attempt: kj::zeroTo(20)) { + inspect = co_await inspectExec(execId); + if (inspect.pid != 0 || !inspect.running || attempt + 1 == 20) { + break; + } + + co_await timer.afterDelay(50 * kj::MILLISECONDS); + } + + auto process = context.getResults().initProcess(); + process.setPid(static_cast(inspect.pid)); + process.setHandle(kj::heap(*this, kj::mv(execId), kj::mv(execConnection), + kj::mv(stdout), kj::mv(stderr), execParams.getCombinedOutput())); +} + kj::Promise ContainerClient::setInactivityTimeout(SetInactivityTimeoutContext context) { auto [ready, done] = getRpcTurn(); co_await ready; diff --git a/src/workerd/server/container-client.h b/src/workerd/server/container-client.h index bfd26438985..1687542a1ad 100644 --- a/src/workerd/server/container-client.h +++ b/src/workerd/server/container-client.h @@ -71,6 +71,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::Promise monitor(MonitorContext context) override; kj::Promise destroy(DestroyContext context) override; kj::Promise signal(SignalContext context) override; + kj::Promise exec(ExecContext context) override; kj::Promise getTcpPort(GetTcpPortContext context) override; kj::Promise listenTcp(ListenTcpContext context) override; kj::Promise setInactivityTimeout(SetInactivityTimeoutContext context) override; @@ -110,6 +111,7 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte // Docker-specific Port implementation class DockerPort; + class DockerProcessHandle; // EgressHttpService handles CONNECT requests from proxy-anything sidecar friend class EgressHttpService; @@ -138,6 +140,12 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte uint64_t size; }; + struct ExecInspectResponse { + int32_t exitCode; + bool running; + uint32_t pid; + }; + kj::Promise inspectContainer(); kj::Promise updateSidecarEgressPort(uint16_t 
ingressHostPort, uint16_t egressPort); @@ -147,6 +155,13 @@ class ContainerClient final: public rpc::Container::Server, public kj::Refcounte kj::Maybe::Reader> environment, kj::ArrayPtr restoreMounts, rpc::Container::StartParams::Reader params); + kj::Promise createExec(capnp::List::Reader cmd, + rpc::Container::ExecOptions::Reader params, + bool attachStdout, + bool attachStderr); + kj::Promise> startExec(kj::String execId); + kj::Promise inspectExec(kj::StringPtr execId); + kj::Promise runSimpleExec(kj::ArrayPtr cmd); kj::Promise startContainer(); kj::Promise stopContainer(); kj::Promise killContainer(uint32_t signal); diff --git a/src/workerd/server/docker-api.capnp b/src/workerd/server/docker-api.capnp index ab2a9b9366b..9603f7b0639 100644 --- a/src/workerd/server/docker-api.capnp +++ b/src/workerd/server/docker-api.capnp @@ -284,6 +284,39 @@ struct Docker { size @1 :UInt64 $Json.name("Size"); } + struct ExecCreateRequest { + attachStdin @0 :Bool = false $Json.name("AttachStdin"); + attachStdout @1 :Bool = false $Json.name("AttachStdout"); + attachStderr @2 :Bool = false $Json.name("AttachStderr"); + tty @3 :Bool = false $Json.name("Tty"); + cmd @4 :List(Text) $Json.name("Cmd"); + env @5 :List(Text) $Json.name("Env"); + workingDir @6 :Text $Json.name("WorkingDir"); + user @7 :Text $Json.name("User"); + } + + struct ExecCreateResponse { + id @0 :Text $Json.name("Id"); + } + + struct ExecStartRequest { + detach @0 :Bool = false $Json.name("Detach"); + tty @1 :Bool = false $Json.name("Tty"); + } + + struct ExecInspectResponse { + canRemove @0 :Bool $Json.name("CanRemove"); + detachKeys @1 :Text $Json.name("DetachKeys"); + exitCode @2 :Json.Value $Json.name("ExitCode"); + id @3 :Text $Json.name("ID"); + openStderr @4 :Bool $Json.name("OpenStderr"); + openStdin @5 :Bool $Json.name("OpenStdin"); + openStdout @6 :Bool $Json.name("OpenStdout"); + processConfig @7 :Json.Value $Json.name("ProcessConfig"); + running @8 :Bool $Json.name("Running"); + pid @9 :UInt32 
$Json.name("Pid"); + } + struct Command { struct ContainerCreate { struct Params { diff --git a/src/workerd/server/tests/container-client/container-client.wd-test b/src/workerd/server/tests/container-client/container-client.wd-test index 750abf32ba0..776d6bc7841 100644 --- a/src/workerd/server/tests/container-client/container-client.wd-test +++ b/src/workerd/server/tests/container-client/container-client.wd-test @@ -8,7 +8,7 @@ const unitTests :Workerd.Config = ( modules = [ (name = "worker", esModule = embed "test.js") ], - compatibilityFlags = ["enable_ctx_exports", "nodejs_compat", "experimental", "containers_pid_namespace"], + compatibilityFlags = ["enable_ctx_exports", "nodejs_compat", "experimental", "containers_pid_namespace", "streams_enable_constructors"], containerEngine = (localDocker = (socketPath = "unix:/var/run/docker.sock", containerEgressInterceptorImage = "cloudflare/proxy-everything:main")), durableObjectNamespaces = [ ( className = "DurableObjectExample", diff --git a/src/workerd/server/tests/container-client/test.js b/src/workerd/server/tests/container-client/test.js index 1c0963865ff..e2dbc92273d 100644 --- a/src/workerd/server/tests/container-client/test.js +++ b/src/workerd/server/tests/container-client/test.js @@ -96,6 +96,220 @@ export class DurableObjectExample extends DurableObject { assert.strictEqual(container.running, false); } + async testExec() { + const container = this.ctx.container; + if (container.running) { + const monitor = container.monitor().catch((_err) => {}); + await container.destroy(); + await monitor; + } + + assert.strictEqual(container.running, false); + + container.start({ + env: { EXEC_BASE: 'from-start' }, + enableInternet: true, + }); + + const monitor = container.monitor().catch((_err) => {}); + const textEncoder = new TextEncoder(); + const textDecoder = new TextDecoder(); + const decode = (buffer) => textDecoder.decode(buffer); + const countStreamBytes = async (stream) => { + assert.ok(stream); + + const 
reader = stream.getReader(); + let total = 0; + try { + for (;;) { + const { value, done } = await reader.read(); + if (done) { + return total; + } + + total += value.byteLength; + } + } finally { + reader.releaseLock(); + } + }; + + await this.waitUntilContainerIsHealthy(); + + // 1. Read stdout directly as a stream. + { + const proc = await container.exec(['cat', '/etc/hostname']); + assert.ok(proc.pid > 0); + const stdout = await new Response(proc.stdout).text(); + assert.ok(stdout.trim().length > 0); + assert.strictEqual(await proc.exitCode, 0); + } + + // 2. Create a file from a ReadableStream stdin using tee. + { + const content = '{"hello":"world","kind":"stream"}\n'; + const proc = await container.exec(['tee', '/tmp/exec-stream.json'], { + stdin: new ReadableStream({ + start(controller) { + controller.enqueue(textEncoder.encode(content)); + controller.close(); + }, + }), + stdout: 'ignore', + }); + assert.strictEqual(await proc.exitCode, 0); + + const verify = await ( + await container.exec(['cat', '/tmp/exec-stream.json']) + ).output(); + + assert.strictEqual(decode(verify.stdout), content); + assert.strictEqual(verify.exitCode, 0); + } + + // 3. Feed stdin interactively through the exposed WritableStream. + { + const proc = await container.exec( + ['sh', '-lc', 'cat > /tmp/exec-pipe.txt'], + { + stdin: 'pipe', + stdout: 'ignore', + } + ); + assert.ok(proc.stdin); + + const writer = proc.stdin.getWriter(); + await writer.write(textEncoder.encode('alpha\n')); + await writer.write(textEncoder.encode('beta\n')); + await writer.close(); + + assert.strictEqual(await proc.exitCode, 0); + + const verify = await ( + await container.exec(['cat', '/tmp/exec-pipe.txt']) + ).output(); + assert.strictEqual(decode(verify.stdout), 'alpha\nbeta\n'); + } + + // 4. Override working directory for commands that rely on relative paths. 
+ { + const proc = await container.exec(['pwd'], { cwd: '/tmp' }); + const output = await proc.output(); + assert.strictEqual(decode(output.stdout).trim(), '/tmp'); + assert.strictEqual(output.exitCode, 0); + } + + // 5. Merge container env with per-exec overrides. + { + const proc = await container.exec( + ['sh', '-lc', 'printf "%s|%s" "$EXEC_BASE" "$EXEC_EXTRA"'], + { + env: { + EXEC_BASE: 'overridden', + EXEC_EXTRA: 'per-exec', + }, + } + ); + const output = await proc.output(); + assert.strictEqual(decode(output.stdout), 'overridden|per-exec'); + assert.strictEqual(output.exitCode, 0); + } + + // 6. Capture stdout and stderr separately. + { + const proc = await container.exec([ + 'sh', + '-lc', + 'printf "out"; printf "err" >&2', + ]); + const output = await proc.output(); + assert.strictEqual(decode(output.stdout), 'out'); + assert.strictEqual(decode(output.stderr), 'err'); + assert.strictEqual(output.exitCode, 0); + } + + // 7. Combine stderr into stdout for shell-style command output. + { + const proc = await container.exec( + ['sh', '-lc', 'printf "out"; printf "err" >&2'], + { stderr: 'combined' } + ); + const output = await proc.output(); + const stdout = decode(output.stdout); + if (stdout !== 'outerr') { + assert.strictEqual(decode(output.stdout), 'errout'); + } + + assert.strictEqual(decode(output.stderr), ''); + assert.strictEqual(output.exitCode, 0); + } + + // 8. Ignore stdout when only success/failure matters. + { + const proc = await container.exec(['sh', '-lc', 'printf "ignore-me"'], { + stdout: 'ignore', + }); + const output = await proc.output(); + assert.strictEqual(decode(output.stdout), ''); + assert.strictEqual(decode(output.stderr), ''); + assert.strictEqual(output.exitCode, 0); + } + + // 9. Preserve stderr and non-zero exit codes for failures. 
+ { + const proc = await container.exec([ + 'sh', + '-lc', + 'printf "boom" >&2; exit 7', + ]); + const output = await proc.output(); + assert.strictEqual(decode(output.stdout), ''); + assert.strictEqual(decode(output.stderr), 'boom'); + assert.strictEqual(output.exitCode, 7); + } + + // 10. Stream-consume large stdout and stderr payloads concurrently without buffering them in + // JS memory. + { + const expectedBytes = 64 * 1024 * 1024; + const proc = await container.exec([ + 'sh', + '-lc', + `head -c ${expectedBytes} /dev/zero & head -c ${expectedBytes} /dev/zero >&2 & wait`, + ]); + + const [stdoutBytes, stderrBytes, exitCode] = await Promise.all([ + countStreamBytes(proc.stdout), + countStreamBytes(proc.stderr), + proc.exitCode, + ]); + + assert.strictEqual(stdoutBytes, expectedBytes); + assert.strictEqual(stderrBytes, expectedBytes); + assert.strictEqual(exitCode, 0); + } + + // 11. Check we throw an error when calling output() after reading from stdout + { + const proc = await container.exec(['echo', 'hello']); + await proc.stdout.getReader().read(); + assert.rejects(() => proc.output(), { + name: 'TypeError', + message: + 'Cannot call output() after stdout has started being consumed.', + }); + } + + // 12. Make sure Stdin EOF's by default if not set + { + await container.exec(['cat']).then((p) => p.output()); + } + + await container.destroy(); + await monitor; + assert.strictEqual(container.running, false); + } + async testSetInactivityTimeout(timeout) { const container = this.ctx.container; if (container.running) { @@ -2072,6 +2286,17 @@ export const testBasics = { }, }; +// Test a variety of common exec() workflows. 
+export const testExec = { + async test(_ctrl, env) { + const id = env.MY_CONTAINER.idFromName( + getRandomDurableObjectName('testExec') + ); + const stub = env.MY_CONTAINER.get(id); + await stub.testExec(); + }, +}; + // Test exit code monitor functionality export const testExitCode = { async test(_ctrl, env) { diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index 72a3d8f7bc6..b6aa2087c43 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -3933,6 +3933,28 @@ interface EventSourceEventSourceInit { withCredentials?: boolean; fetcher?: Fetcher; } +interface ExecOutput { + readonly stdout: ArrayBuffer; + readonly stderr: ArrayBuffer; + readonly exitCode: number; +} +interface ContainerExecOptions { + stdin?: ReadableStream | "pipe"; + stdout?: "pipe" | "ignore"; + stderr?: "pipe" | "ignore" | "combined"; + cwd?: string; + env?: Record; + user?: string; +} +interface ExecProcess { + readonly stdin: WritableStream | null; + readonly stdout: ReadableStream | null; + readonly stderr: ReadableStream | null; + readonly pid: number; + readonly exitCode: Promise; + output(): Promise; + kill(signal?: number): void; +} interface Container { get running(): boolean; start(options?: ContainerStartupOptions): void; @@ -3943,6 +3965,7 @@ interface Container { setInactivityTimeout(durationMs: number | bigint): Promise; interceptOutboundHttp(addr: string, binding: Fetcher): Promise; interceptAllOutboundHttp(binding: Fetcher): Promise; + exec(cmd: string[], options?: ContainerExecOptions): Promise; interceptOutboundHttps(addr: string, binding: Fetcher): Promise; snapshotDirectory( options: ContainerDirectorySnapshotOptions, diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 1ba1e4f325b..43d0fa2d97d 100755 --- a/types/generated-snapshot/experimental/index.ts +++ 
b/types/generated-snapshot/experimental/index.ts @@ -3939,6 +3939,28 @@ export interface EventSourceEventSourceInit { withCredentials?: boolean; fetcher?: Fetcher; } +export interface ExecOutput { + readonly stdout: ArrayBuffer; + readonly stderr: ArrayBuffer; + readonly exitCode: number; +} +export interface ContainerExecOptions { + stdin?: ReadableStream | "pipe"; + stdout?: "pipe" | "ignore"; + stderr?: "pipe" | "ignore" | "combined"; + cwd?: string; + env?: Record; + user?: string; +} +export interface ExecProcess { + readonly stdin: WritableStream | null; + readonly stdout: ReadableStream | null; + readonly stderr: ReadableStream | null; + readonly pid: number; + readonly exitCode: Promise; + output(): Promise; + kill(signal?: number): void; +} export interface Container { get running(): boolean; start(options?: ContainerStartupOptions): void; @@ -3949,6 +3971,7 @@ export interface Container { setInactivityTimeout(durationMs: number | bigint): Promise; interceptOutboundHttp(addr: string, binding: Fetcher): Promise; interceptAllOutboundHttp(binding: Fetcher): Promise; + exec(cmd: string[], options?: ContainerExecOptions): Promise; interceptOutboundHttps(addr: string, binding: Fetcher): Promise; snapshotDirectory( options: ContainerDirectorySnapshotOptions, From e8718071997d958a1b2363aa4c9c0e12a6e2aac4 Mon Sep 17 00:00:00 2001 From: Gabi Villalonga Simon Date: Mon, 30 Mar 2026 17:50:22 -0500 Subject: [PATCH 3/3] containers: Use $ to prefix the std properties to avoid collision with stdio We also have to rename to (stdout|stderr|stdin)Writer all capnp variables and methods --- src/workerd/api/container.c++ | 65 ++++++++----------- src/workerd/api/container.h | 43 ++++++------ src/workerd/io/container.capnp | 10 +-- src/workerd/server/container-client.c++ | 61 +++++++---------- .../experimental/index.d.ts | 6 +- .../generated-snapshot/experimental/index.ts | 6 +- 6 files changed, 87 insertions(+), 104 deletions(-) diff --git a/src/workerd/api/container.c++ 
b/src/workerd/api/container.c++ index 1f57392272d..56838cbdeaa 100644 --- a/src/workerd/api/container.c++ +++ b/src/workerd/api/container.c++ @@ -4,18 +4,6 @@ #include "container.h" -// macOS defines stdin/stdout/stderr as macros, which collide with -// member names used by ExecProcess / ExecOutput. -#ifdef stdin -#undef stdin -#endif -#ifdef stdout -#undef stdout -#endif -#ifdef stderr -#undef stderr -#endif - #include #include #include @@ -82,40 +70,41 @@ capnp::ByteStream::Client makeExecPipe( // ======================================================================================= // ExecOutput / ExecProcess -ExecOutput::ExecOutput(kj::Array stdout, kj::Array stderr, int exitCode) - : stdout(kj::mv(stdout)), - stderr(kj::mv(stderr)), +ExecOutput::ExecOutput( + kj::Array stdoutBytes, kj::Array stderrBytes, int exitCode) + : stdoutBytes(kj::mv(stdoutBytes)), + stderrBytes(kj::mv(stderrBytes)), exitCode(exitCode) {} jsg::BufferSource ExecOutput::getStdout(jsg::Lock& js) { - return copyBytes(js, stdout); + return copyBytes(js, stdoutBytes); } jsg::BufferSource ExecOutput::getStderr(jsg::Lock& js) { - return copyBytes(js, stderr); + return copyBytes(js, stderrBytes); } -ExecProcess::ExecProcess(jsg::Optional> stdin, - jsg::Optional> stdout, - jsg::Optional> stderr, +ExecProcess::ExecProcess(jsg::Optional> stdinStream, + jsg::Optional> stdoutStream, + jsg::Optional> stderrStream, int pid, rpc::Container::ProcessHandle::Client handle) - : stdin(kj::mv(stdin)), - stdout(kj::mv(stdout)), - stderr(kj::mv(stderr)), + : stdinStream(kj::mv(stdinStream)), + stdoutStream(kj::mv(stdoutStream)), + stderrStream(kj::mv(stderrStream)), pid(pid), handle(IoContext::current().addObject(kj::heap(kj::mv(handle)))) {} jsg::Optional> ExecProcess::getStdin() { - return stdin.map([](jsg::Ref& stream) { return stream.addRef(); }); + return stdinStream.map([](jsg::Ref& stream) { return stream.addRef(); }); } jsg::Optional> ExecProcess::getStdout() { - return stdout.map([](jsg::Ref& stream) 
{ return stream.addRef(); }); + return stdoutStream.map([](jsg::Ref& stream) { return stream.addRef(); }); } jsg::Optional> ExecProcess::getStderr() { - return stderr.map([](jsg::Ref& stream) { return stream.addRef(); }); + return stderrStream.map([](jsg::Ref& stream) { return stream.addRef(); }); } void ExecProcess::ensureExitCodePromise(jsg::Lock& js) { @@ -165,7 +154,7 @@ jsg::Promise> ExecProcess::output(jsg::Lock& js) { outputCalled = true; auto stdoutPromise = js.resolvedPromise(emptyByteArray()); - KJ_IF_SOME(stream, stdout) { + KJ_IF_SOME(stream, stdoutStream) { JSG_REQUIRE(!stream->isDisturbed(), TypeError, "Cannot call output() after stdout has started being consumed."); stdoutPromise = @@ -177,7 +166,7 @@ jsg::Promise> ExecProcess::output(jsg::Lock& js) { } auto stderrPromise = js.resolvedPromise(emptyByteArray()); - KJ_IF_SOME(stream, stderr) { + KJ_IF_SOME(stream, stderrStream) { JSG_REQUIRE(!stream->isDisturbed(), TypeError, "Cannot call output() after stderr has started being consumed."); stderrPromise = stream->getController() @@ -431,8 +420,8 @@ jsg::Promise> Container::exec( JSG_REQUIRE(cmd.size() > 0, TypeError, "exec() requires a non-empty command array."); auto options = kj::mv(maybeOptions).orDefault({}); - auto stdoutMode = getExecOutputMode(kj::mv(options.stdout), "stdout"); - auto stderrMode = getExecOutputMode(kj::mv(options.stderr), "stderr"); + auto stdoutMode = getExecOutputMode(kj::mv(options.$stdout), "stdout"); + auto stderrMode = getExecOutputMode(kj::mv(options.$stderr), "stderr"); bool combinedOutput = stderrMode == "combined"; JSG_REQUIRE(!combinedOutput || stdoutMode == "pipe", TypeError, "stderr: \"combined\" requires stdout to be \"pipe\"."); @@ -450,14 +439,14 @@ jsg::Promise> Container::exec( kj::Maybe> stdoutInput; if (stdoutMode == "pipe") { auto pipe = kj::newOneWayPipe(); - req.setStdout(makeExecPipe(byteStreamFactory, kj::mv(pipe.out))); + req.setStdoutWriter(makeExecPipe(byteStreamFactory, kj::mv(pipe.out))); 
stdoutInput = kj::mv(pipe.in); } kj::Maybe> stderrInput; if (!combinedOutput && stderrMode == "pipe") { auto pipe = kj::newOneWayPipe(); - req.setStderr(makeExecPipe(byteStreamFactory, kj::mv(pipe.out))); + req.setStderrWriter(makeExecPipe(byteStreamFactory, kj::mv(pipe.out))); stderrInput = kj::mv(pipe.in); } @@ -514,12 +503,12 @@ jsg::Promise> Container::exec( // If stdin is undefined, the JS API promises immediate EOF. We still use the pipelined stdin() // capability so exec() doesn't wait on an extra round-trip. - KJ_IF_SOME(stdinOption, options.stdin) { - auto stdinRequest = handle.stdinRequest(capnp::MessageSize{4, 0}); - // Get the stdin() ByteStream, use the pipelined capability + KJ_IF_SOME(stdinOption, options.$stdin) { + auto stdinRequest = handle.stdinWriterRequest(capnp::MessageSize{4, 0}); + // Get the stdinWriter() ByteStream, use the pipelined capability auto stdinPipeline = stdinRequest.send(); // ... adapt bytestream into a writer - auto stdinWriter = byteStreamFactory.capnpToKjExplicitEnd(stdinPipeline.getStdin()); + auto stdinWriter = byteStreamFactory.capnpToKjExplicitEnd(stdinPipeline.getWriter()); KJ_SWITCH_ONEOF(stdinOption) { // user sets ReadableStream... 
@@ -542,9 +531,9 @@ jsg::Promise> Container::exec( // all good, we have the stdinStream set } else { - auto stdinRequest = handle.stdinRequest(capnp::MessageSize{4, 0}); + auto stdinRequest = handle.stdinWriterRequest(capnp::MessageSize{4, 0}); auto stdinPipeline = stdinRequest.send(); - auto stdinWriter = byteStreamFactory.capnpToKjExplicitEnd(stdinPipeline.getStdin()); + auto stdinWriter = byteStreamFactory.capnpToKjExplicitEnd(stdinPipeline.getWriter()); ioContext.addTask(stdinWriter->end().attach(kj::mv(stdinWriter))); } diff --git a/src/workerd/api/container.h b/src/workerd/api/container.h index 935011d07ee..004cd3188f6 100644 --- a/src/workerd/api/container.h +++ b/src/workerd/api/container.h @@ -17,7 +17,7 @@ namespace workerd::api { class Fetcher; class ExecOutput: public jsg::Object { public: - ExecOutput(kj::Array stdout, kj::Array stderr, int exitCode); + ExecOutput(kj::Array stdoutBytes, kj::Array stderrBytes, int exitCode); jsg::BufferSource getStdout(jsg::Lock& js); jsg::BufferSource getStderr(jsg::Lock& js); @@ -38,25 +38,27 @@ class ExecOutput: public jsg::Object { } void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { - tracker.trackField("stdout", stdout); - tracker.trackField("stderr", stderr); + tracker.trackField("stdout", stdoutBytes); + tracker.trackField("stderr", stderrBytes); } private: - kj::Array stdout; - kj::Array stderr; + kj::Array stdoutBytes; + kj::Array stderrBytes; int exitCode; }; struct ExecOptions { - jsg::Optional, kj::String>> stdin; - jsg::Optional stdout; - jsg::Optional stderr; + // $ prefix avoids collision with stdin/stdout/stderr macros from <stdio.h>; + // JSG_STRUCT strips the $ when exposing to JS.
+ jsg::Optional, kj::String>> $stdin; + jsg::Optional $stdout; + jsg::Optional $stderr; jsg::Optional cwd; jsg::Optional> env; jsg::Optional user; - JSG_STRUCT(stdin, stdout, stderr, cwd, env, user); + JSG_STRUCT($stdin, $stdout, $stderr, cwd, env, user); JSG_STRUCT_TS_OVERRIDE(ContainerExecOptions { stdin?: ReadableStream | "pipe"; stdout?: "pipe" | "ignore"; @@ -64,14 +66,17 @@ struct ExecOptions { cwd?: string; env?: Record; user?: string; + $stdin: never; + $stdout: never; + $stderr: never; }); }; class ExecProcess: public jsg::Object { public: - ExecProcess(jsg::Optional> stdin, - jsg::Optional> stdout, - jsg::Optional> stderr, + ExecProcess(jsg::Optional> stdinStream, + jsg::Optional> stdoutStream, + jsg::Optional> stderrStream, int pid, rpc::Container::ProcessHandle::Client handle); @@ -107,9 +112,9 @@ class ExecProcess: public jsg::Object { } void visitForMemoryInfo(jsg::MemoryTracker& tracker) const { - tracker.trackField("stdin", stdin); - tracker.trackField("stdout", stdout); - tracker.trackField("stderr", stderr); + tracker.trackField("stdin", stdinStream); + tracker.trackField("stdout", stdoutStream); + tracker.trackField("stderr", stderrStream); tracker.trackField("exitCodePromise", exitCodePromise); tracker.trackField("exitCodePromiseCopy", exitCodePromiseCopy); } @@ -118,9 +123,9 @@ class ExecProcess: public jsg::Object { void ensureExitCodePromise(jsg::Lock& js); jsg::Promise getExitCodeForOutput(jsg::Lock& js); - jsg::Optional> stdin; - jsg::Optional> stdout; - jsg::Optional> stderr; + jsg::Optional> stdinStream; + jsg::Optional> stdoutStream; + jsg::Optional> stderrStream; int pid; IoOwn handle; kj::Maybe>> exitCodePromise; @@ -129,7 +134,7 @@ class ExecProcess: public jsg::Object { bool outputCalled = false; void visitForGc(jsg::GcVisitor& visitor) { - visitor.visit(stdin, stdout, stderr, exitCodePromise, exitCodePromiseCopy); + visitor.visit(stdinStream, stdoutStream, stderrStream, exitCodePromise, exitCodePromiseCopy); } }; diff --git 
a/src/workerd/io/container.capnp b/src/workerd/io/container.capnp index 1e4edbc785f..028127a1ca2 100644 --- a/src/workerd/io/container.capnp +++ b/src/workerd/io/container.capnp @@ -133,7 +133,7 @@ interface Container @0x9aaceefc06523bca { wait @0 () -> (exitCode :Int32); # Waits for the process to exit and returns its exit code. - stdin @1 () -> (stdin :ByteStream); + stdinWriter @1 () -> (writer :ByteStream); # Retrieves a ByteStream handle to write to the process's stdin. # If not called before wait(), stdin automatically EOFs. # Throws an error if called after wait(). @@ -237,10 +237,10 @@ interface Container @0x9aaceefc06523bca { snapshotContainer @11 SnapshotContainerParams -> (snapshot :ContainerSnapshot); # Creates a full container snapshot for the running container. - exec @12 (cmd :List(Text), stdout :ByteStream, stderr :ByteStream, params :ExecOptions) - -> (process :Process); + exec @12 (cmd :List(Text), stdoutWriter :ByteStream, stderrWriter :ByteStream, + params :ExecOptions) -> (process :Process); # Executes a short-lived process in the running container. # - # If stdout/stderr are not provided, output is discarded. If params.combinedOutput is true, - # stderr is merged into stdout and the stderr capability is ignored. + # If stdoutWriter/stderrWriter are not provided, output is discarded. If params.combinedOutput + # is true, stderr is merged into stdout and the stderrWriter capability is ignored. } diff --git a/src/workerd/server/container-client.c++ b/src/workerd/server/container-client.c++ index 70d013ba096..1c10b112edc 100644 --- a/src/workerd/server/container-client.c++ +++ b/src/workerd/server/container-client.c++ @@ -17,18 +17,6 @@ #include -// macOS defines stdin/stdout/stderr as macros, which collide with -// capnp-generated method names on the ProcessHandle interface. 
-#ifdef stdin -#undef stdin -#endif -#ifdef stdout -#undef stdout -#endif -#ifdef stderr -#undef stderr -#endif - #include #include #include @@ -732,8 +720,8 @@ void detachEnd(kj::Maybe> stream) { // demuxDockerExecOutput demuxes the input from Docker to passed stdout/stderr. kj::Promise demuxDockerExecOutput(kj::AsyncInputStream& input, - kj::Maybe> stdout, - kj::Maybe> stderr, + kj::Maybe> stdoutWriter, + kj::Maybe> stderrWriter, bool combinedOutput) { kj::Vector buffer; size_t offset = 0; @@ -772,17 +760,17 @@ kj::Promise demuxDockerExecOutput(kj::AsyncInputStream& input, auto payload = buffer.asPtr().slice(offset + 8, offset + 8 + frameLength); if (streamId == 1) { - KJ_IF_SOME(out, stdout) { + KJ_IF_SOME(out, stdoutWriter) { co_await out->write(payload); } } else { if (streamId == 2) { if (combinedOutput) { - KJ_IF_SOME(out, stdout) { + KJ_IF_SOME(out, stdoutWriter) { co_await out->write(payload); } } else { - KJ_IF_SOME(err, stderr) { + KJ_IF_SOME(err, stderrWriter) { co_await err->write(payload); } } @@ -798,15 +786,15 @@ kj::Promise demuxDockerExecOutput(kj::AsyncInputStream& input, // We need to detach ourselves from the end() as the user might've // decided to not read them altogether. - detachEnd(kj::mv(stdout)); - detachEnd(kj::mv(stderr)); + detachEnd(kj::mv(stdoutWriter)); + detachEnd(kj::mv(stderrWriter)); } catch (...) 
{ auto exception = kj::getCaughtExceptionAsKj(); - KJ_IF_SOME(out, stdout) { + KJ_IF_SOME(out, stdoutWriter) { out->abortWrite(kj::cp(exception)); } - KJ_IF_SOME(err, stderr) { + KJ_IF_SOME(err, stderrWriter) { err->abortWrite(kj::cp(exception)); } kj::throwFatalException(kj::mv(exception)); @@ -1055,21 +1043,21 @@ class ContainerClient::DockerProcessHandle final: public rpc::Container::Process DockerProcessHandle(ContainerClient& containerClient, kj::String execId, kj::Own connection, - kj::Maybe stdout, - kj::Maybe stderr, + kj::Maybe stdoutWriter, + kj::Maybe stderrWriter, bool combinedOutput) : containerClient(containerClient.addRef()), execId(kj::mv(execId)), sharedConnection(kj::refcounted(kj::mv(connection))) { kj::Maybe> stdoutStream = kj::none; - KJ_IF_SOME(out, stdout) { + KJ_IF_SOME(out, stdoutWriter) { stdoutStream = this->containerClient->byteStreamFactory.capnpToKjExplicitEnd(out); } else { stdoutStream = capnp::ExplicitEndOutputStream::wrap(newNullOutputStream(), []() {}); } kj::Maybe> stderrStream = kj::none; - KJ_IF_SOME(err, stderr) { + KJ_IF_SOME(err, stderrWriter) { stderrStream = this->containerClient->byteStreamFactory.capnpToKjExplicitEnd(err); } else if (!combinedOutput) { stderrStream = capnp::ExplicitEndOutputStream::wrap(newNullOutputStream(), []() {}); @@ -1109,12 +1097,13 @@ class ContainerClient::DockerProcessHandle final: public rpc::Container::Process JSG_FAIL_REQUIRE(Error, "Docker exec stream closed before exit status became available."); } - kj::Promise stdin(StdinContext context) override { - JSG_REQUIRE(!waitStarted, Error, "Process stdin() cannot be called after wait()."); - JSG_REQUIRE(!sharedConnection->stdinOpened, Error, "Process stdin() can only be called once."); + kj::Promise stdinWriter(StdinWriterContext context) override { + JSG_REQUIRE(!waitStarted, Error, "Process stdinWriter() cannot be called after wait()."); + JSG_REQUIRE( + !sharedConnection->stdinOpened, Error, "Process stdinWriter() can only be called once."); 
sharedConnection->stdinOpened = true; - context.getResults().setStdin(containerClient->byteStreamFactory.kjToCapnp( + context.getResults().setWriter(containerClient->byteStreamFactory.kjToCapnp( kj::heap(kj::addRef(*sharedConnection)))); co_return; } @@ -2263,14 +2252,14 @@ kj::Promise ContainerClient::exec(ExecContext context) { auto execId = co_await createExec(request.getCmd(), execParams, attachStdout, attachStderr); kj::Own execConnection = co_await startExec(kj::str(execId)); - kj::Maybe stdout = kj::none; - if (request.hasStdout()) { - stdout = request.getStdout(); + kj::Maybe stdoutWriter = kj::none; + if (request.hasStdoutWriter()) { + stdoutWriter = request.getStdoutWriter(); } - kj::Maybe stderr = kj::none; - if (request.hasStderr()) { - stderr = request.getStderr(); + kj::Maybe stderrWriter = kj::none; + if (request.hasStderrWriter()) { + stderrWriter = request.getStderrWriter(); } // Retrying is not great, however Docker's inspectExec might return running = false @@ -2289,7 +2278,7 @@ kj::Promise ContainerClient::exec(ExecContext context) { auto process = context.getResults().initProcess(); process.setPid(static_cast(inspect.pid)); process.setHandle(kj::heap(*this, kj::mv(execId), kj::mv(execConnection), - kj::mv(stdout), kj::mv(stderr), execParams.getCombinedOutput())); + kj::mv(stdoutWriter), kj::mv(stderrWriter), execParams.getCombinedOutput())); } kj::Promise ContainerClient::setInactivityTimeout(SetInactivityTimeoutContext context) { diff --git a/types/generated-snapshot/experimental/index.d.ts b/types/generated-snapshot/experimental/index.d.ts index b6aa2087c43..958be179019 100755 --- a/types/generated-snapshot/experimental/index.d.ts +++ b/types/generated-snapshot/experimental/index.d.ts @@ -3939,12 +3939,12 @@ interface ExecOutput { readonly exitCode: number; } interface ContainerExecOptions { - stdin?: ReadableStream | "pipe"; - stdout?: "pipe" | "ignore"; - stderr?: "pipe" | "ignore" | "combined"; cwd?: string; env?: Record; user?: string; + 
stdin?: ReadableStream | "pipe"; + stdout?: "pipe" | "ignore"; + stderr?: "pipe" | "ignore" | "combined"; } interface ExecProcess { readonly stdin: WritableStream | null; diff --git a/types/generated-snapshot/experimental/index.ts b/types/generated-snapshot/experimental/index.ts index 43d0fa2d97d..f5aa01b29fa 100755 --- a/types/generated-snapshot/experimental/index.ts +++ b/types/generated-snapshot/experimental/index.ts @@ -3945,12 +3945,12 @@ export interface ExecOutput { readonly exitCode: number; } export interface ContainerExecOptions { - stdin?: ReadableStream | "pipe"; - stdout?: "pipe" | "ignore"; - stderr?: "pipe" | "ignore" | "combined"; cwd?: string; env?: Record; user?: string; + stdin?: ReadableStream | "pipe"; + stdout?: "pipe" | "ignore"; + stderr?: "pipe" | "ignore" | "combined"; } export interface ExecProcess { readonly stdin: WritableStream | null;