Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 301 additions & 8 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
#include "container.h"

#include <workerd/api/http.h>
#include <workerd/api/streams/readable.h>
#include <workerd/api/streams/writable.h>
#include <workerd/api/system-streams.h>
#include <workerd/io/features.h>
#include <workerd/io/io-context.h>

#include <capnp/compat/byte-stream.h>
#include <kj/filesystem.h>

namespace workerd::api {
Expand All @@ -30,8 +34,175 @@ kj::Maybe<kj::Path> parseRestorePath(kj::StringPtr path) {
}
}

jsg::BufferSource copyBytes(jsg::Lock& js, kj::ArrayPtr<const kj::byte> bytes) {
auto backing = jsg::BackingStore::alloc<v8::ArrayBuffer>(js, bytes.size());
backing.asArrayPtr().copyFrom(bytes);
return jsg::BufferSource(js, kj::mv(backing));
}

void requireValidEnvNameAndValue(kj::StringPtr name, kj::StringPtr value) {
JSG_REQUIRE(name.findFirst('=') == kj::none, Error,
"Environment variable names cannot contain '=': ", name);
JSG_REQUIRE(name.findFirst('\0') == kj::none, Error,
"Environment variable names cannot contain '\\0': ", name);
JSG_REQUIRE(value.findFirst('\0') == kj::none, Error,
"Environment variable values cannot contain '\\0': ", name);
}

kj::String getExecOutputMode(jsg::Optional<kj::String> maybeMode, kj::StringPtr kind) {
auto mode = kj::mv(maybeMode).orDefault(kj::str("pipe"));
JSG_REQUIRE(mode == "pipe" || mode == "ignore" || (kind == "stderr" && mode == "combined"),
TypeError, "Invalid ", kind, " option: ", mode);
return mode;
}

kj::Array<kj::byte> emptyByteArray() {
return kj::heapArray<kj::byte>(0);
}

capnp::ByteStream::Client makeExecPipe(
capnp::ByteStreamFactory& factory, kj::Own<kj::AsyncOutputStream> output) {
return factory.kjToCapnp(capnp::ExplicitEndOutputStream::wrap(kj::mv(output), []() {}));
}

} // namespace

// =======================================================================================
// ExecOutput / ExecProcess

ExecOutput::ExecOutput(
kj::Array<kj::byte> stdoutBytes, kj::Array<kj::byte> stderrBytes, int exitCode)
: stdoutBytes(kj::mv(stdoutBytes)),
stderrBytes(kj::mv(stderrBytes)),
exitCode(exitCode) {}

jsg::BufferSource ExecOutput::getStdout(jsg::Lock& js) {
return copyBytes(js, stdoutBytes);
}

jsg::BufferSource ExecOutput::getStderr(jsg::Lock& js) {
return copyBytes(js, stderrBytes);
}

ExecProcess::ExecProcess(jsg::Optional<jsg::Ref<WritableStream>> stdinStream,
jsg::Optional<jsg::Ref<ReadableStream>> stdoutStream,
jsg::Optional<jsg::Ref<ReadableStream>> stderrStream,
int pid,
rpc::Container::ProcessHandle::Client handle)
: stdinStream(kj::mv(stdinStream)),
stdoutStream(kj::mv(stdoutStream)),
stderrStream(kj::mv(stderrStream)),
pid(pid),
handle(IoContext::current().addObject(kj::heap(kj::mv(handle)))) {}

jsg::Optional<jsg::Ref<WritableStream>> ExecProcess::getStdin() {
return stdinStream.map([](jsg::Ref<WritableStream>& stream) { return stream.addRef(); });
}

jsg::Optional<jsg::Ref<ReadableStream>> ExecProcess::getStdout() {
return stdoutStream.map([](jsg::Ref<ReadableStream>& stream) { return stream.addRef(); });
}

jsg::Optional<jsg::Ref<ReadableStream>> ExecProcess::getStderr() {
return stderrStream.map([](jsg::Ref<ReadableStream>& stream) { return stream.addRef(); });
}

void ExecProcess::ensureExitCodePromise(jsg::Lock& js) {
if (exitCodePromise != kj::none) {
return;
}

// jsg::Promise is single-use. Keep the original Promise<int> as the public `exitCode`
// property and a separate whenResolved() branch for helpers like output().
auto self = JSG_THIS;
auto promise = IoContext::current().awaitIo(js,
handle->waitRequest(capnp::MessageSize{4, 0})
.send()
.then([self = kj::mv(self)](
capnp::Response<rpc::Container::ProcessHandle::WaitResults>&& results) mutable {
auto exitCode = results.getExitCode();
self->resolvedExitCode = exitCode;
return exitCode;
}));

exitCodePromiseCopy = promise.whenResolved(js);
exitCodePromise = jsg::MemoizedIdentity<jsg::Promise<int>>(kj::mv(promise));
}

jsg::Promise<int> ExecProcess::getExitCodeForOutput(jsg::Lock& js) {
ensureExitCodePromise(js);

KJ_IF_SOME(exitCode, resolvedExitCode) {
return js.resolvedPromise(static_cast<int>(exitCode));
}

auto self = JSG_THIS;
return KJ_ASSERT_NONNULL(exitCodePromiseCopy)
.whenResolved(js)
.then(js, [self = kj::mv(self)](jsg::Lock&) -> int {
return static_cast<int>(KJ_ASSERT_NONNULL(self->resolvedExitCode));
});
}

jsg::MemoizedIdentity<jsg::Promise<int>>& ExecProcess::getExitCode(jsg::Lock& js) {
ensureExitCodePromise(js);
return KJ_ASSERT_NONNULL(exitCodePromise);
}

jsg::Promise<jsg::Ref<ExecOutput>> ExecProcess::output(jsg::Lock& js) {
JSG_REQUIRE(!outputCalled, TypeError, "output() can only be called once.");
outputCalled = true;

auto stdoutPromise = js.resolvedPromise(emptyByteArray());
KJ_IF_SOME(stream, stdoutStream) {
JSG_REQUIRE(!stream->isDisturbed(), TypeError,
"Cannot call output() after stdout has started being consumed.");
stdoutPromise =
stream->getController()
.readAllBytes(js, IoContext::current().getLimitEnforcer().getBufferingLimit())
.then(js, [](jsg::Lock&, jsg::BufferSource bytes) {
return kj::heapArray(bytes.asArrayPtr());
});
}

auto stderrPromise = js.resolvedPromise(emptyByteArray());
KJ_IF_SOME(stream, stderrStream) {
JSG_REQUIRE(!stream->isDisturbed(), TypeError,
"Cannot call output() after stderr has started being consumed.");
stderrPromise = stream->getController()
.readAllBytes(js, kj::maxValue)
.then(js, [](jsg::Lock&, jsg::BufferSource bytes) {
return kj::heapArray(bytes.asArrayPtr());
});
}

auto exitCodePromise = getExitCodeForOutput(js);

return stdoutPromise.then(js,
[stderrPromise = kj::mv(stderrPromise), exitCodePromise = kj::mv(exitCodePromise)](
jsg::Lock& js,
kj::Array<kj::byte> stdoutBytes) mutable -> jsg::Promise<jsg::Ref<ExecOutput>> {
return stderrPromise.then(js,
[stdoutBytes = kj::mv(stdoutBytes), exitCodePromise = kj::mv(exitCodePromise)](
jsg::Lock& js,
kj::Array<kj::byte> stderrBytes) mutable -> jsg::Promise<jsg::Ref<ExecOutput>> {
return exitCodePromise.then(js,
[stdoutBytes = kj::mv(stdoutBytes), stderrBytes = kj::mv(stderrBytes)](
jsg::Lock& js, int exitCode) mutable -> jsg::Ref<ExecOutput> {
return js.alloc<ExecOutput>(kj::mv(stdoutBytes), kj::mv(stderrBytes), exitCode);
});
});
});
}

void ExecProcess::kill(jsg::Lock& js, jsg::Optional<int> signal) {
auto signo = signal.orDefault(15);
JSG_REQUIRE(signo > 0 && signo <= 64, RangeError, "Invalid signal number.");

auto req = handle->killRequest(capnp::MessageSize{4, 0});
req.setSigno(signo);
IoContext::current().addTask(req.sendIgnoringResult());
}
// =======================================================================================
// Basic lifecycle methods

Expand All @@ -58,14 +229,7 @@ void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions)
auto list = req.initEnvironmentVariables(env.fields.size());
for (auto i: kj::indices(env.fields)) {
auto field = &env.fields[i];
JSG_REQUIRE(field->name.findFirst('=') == kj::none, Error,
"Environment variable names cannot contain '=': ", field->name);

JSG_REQUIRE(field->name.findFirst('\0') == kj::none, Error,
"Environment variable names cannot contain '\\0': ", field->name);

JSG_REQUIRE(field->value.findFirst('\0') == kj::none, Error,
"Environment variable values cannot contain '\\0': ", field->name);
requireValidEnvNameAndValue(field->name, field->value);

list.set(i, str(field->name, "=", field->value));
}
Expand Down Expand Up @@ -250,6 +414,135 @@ jsg::Promise<void> Container::interceptOutboundHttps(
return ioctx.awaitIo(js, req.sendIgnoringResult());
}

jsg::Promise<jsg::Ref<ExecProcess>> Container::exec(
jsg::Lock& js, kj::Array<kj::String> cmd, jsg::Optional<ExecOptions> maybeOptions) {
JSG_REQUIRE(running, Error, "exec() cannot be called on a container that is not running.");
JSG_REQUIRE(cmd.size() > 0, TypeError, "exec() requires a non-empty command array.");

auto options = kj::mv(maybeOptions).orDefault({});
auto stdoutMode = getExecOutputMode(kj::mv(options.$stdout), "stdout");
auto stderrMode = getExecOutputMode(kj::mv(options.$stderr), "stderr");
bool combinedOutput = stderrMode == "combined";
JSG_REQUIRE(!combinedOutput || stdoutMode == "pipe", TypeError,
"stderr: \"combined\" requires stdout to be \"pipe\".");

auto& ioContext = IoContext::current();
auto& byteStreamFactory = ioContext.getByteStreamFactory();

auto req = rpcClient->execRequest();
auto cmdList = req.initCmd(cmd.size());
for (auto i: kj::indices(cmd)) {
cmdList.set(i, cmd[i]);
}

// Init the kj pipes to create the stdout/err bytestreams
kj::Maybe<kj::Own<kj::AsyncInputStream>> stdoutInput;
if (stdoutMode == "pipe") {
auto pipe = kj::newOneWayPipe();
req.setStdout(makeExecPipe(byteStreamFactory, kj::mv(pipe.out)));
stdoutInput = kj::mv(pipe.in);
}

kj::Maybe<kj::Own<kj::AsyncInputStream>> stderrInput;
if (!combinedOutput && stderrMode == "pipe") {
auto pipe = kj::newOneWayPipe();
req.setStderr(makeExecPipe(byteStreamFactory, kj::mv(pipe.out)));
stderrInput = kj::mv(pipe.in);
}

auto params = req.initParams();
params.setCombinedOutput(combinedOutput);

// Some basic validation...
KJ_IF_SOME(cwd, options.cwd) {
JSG_REQUIRE(cwd.findFirst('\0') == kj::none, TypeError, "cwd cannot contain '\\0' characters.");
params.setWorkingDirectory(cwd);
}

KJ_IF_SOME(user, options.user) {
JSG_REQUIRE(
user.findFirst('\0') == kj::none, TypeError, "user cannot contain '\\0' characters.");
params.setUser(user);
}

KJ_IF_SOME(env, options.env) {
auto envList = params.initEnv(env.fields.size());
for (auto i: kj::indices(env.fields)) {
auto field = &env.fields[i];
requireValidEnvNameAndValue(field->name, field->value);
envList.set(i, str(field->name, "=", field->value));
}
}

// We have to await, because PID won't be available until the response resolves
return ioContext.awaitIo(js, req.send())
.then(js,
[&ioContext, &byteStreamFactory, options = kj::mv(options),
stdoutInput = kj::mv(stdoutInput), stderrInput = kj::mv(stderrInput)](
jsg::Lock& js, capnp::Response<rpc::Container::ExecResults> results) mutable
-> jsg::Ref<ExecProcess> {
auto process = results.getProcess();
auto handle = process.getHandle();
auto pid = process.getPid();

// Init the ReadableStreams (stdout/stderr)
jsg::Optional<jsg::Ref<ReadableStream>> stdoutStream = kj::none;
KJ_IF_SOME(input, stdoutInput) {
auto source = newSystemStream(kj::mv(input), StreamEncoding::IDENTITY, ioContext);
stdoutStream = js.alloc<ReadableStream>(ioContext, kj::mv(source));
}

// stderrInput is only set if using "pipe" on stderr and not "combined"
jsg::Optional<jsg::Ref<ReadableStream>> stderrStream = kj::none;
KJ_IF_SOME(input, stderrInput) {
auto source = newSystemStream(kj::mv(input), StreamEncoding::IDENTITY, ioContext);
stderrStream = js.alloc<ReadableStream>(ioContext, kj::mv(source));
}

jsg::Optional<jsg::Ref<WritableStream>> stdinStream = kj::none;

// If stdin is undefined, the JS API promises immediate EOF. We still use the pipelined stdin()
// capability so exec() doesn't wait on an extra round-trip.
KJ_IF_SOME(stdinOption, options.$stdin) {
auto stdinRequest = handle.stdinRequest(capnp::MessageSize{4, 0});
// Get the stdin() ByteStream, use the pipelined capability
auto stdinPipeline = stdinRequest.send();
// ... adapt bytestream into a writer
auto stdinWriter = byteStreamFactory.capnpToKjExplicitEnd(stdinPipeline.getStdin());

KJ_SWITCH_ONEOF(stdinOption) {
// user sets ReadableStream...
KJ_CASE_ONEOF(readable, jsg::Ref<ReadableStream>) {
auto sink = newSystemStream(kj::mv(stdinWriter), StreamEncoding::IDENTITY, ioContext);
auto pipePromise =
(ioContext.waitForDeferredProxy(readable->pumpTo(js, kj::mv(sink), true)));
ioContext.addTask(pipePromise.attach(readable.addRef()));
}
// user sets "pipe"... they want to consume the API with the stdin WritableStream
KJ_CASE_ONEOF(mode, kj::String) {
JSG_REQUIRE(
mode == "pipe", TypeError, "stdin must be a ReadableStream or the string \"pipe\".");
auto sink = newSystemStream(kj::mv(stdinWriter), StreamEncoding::IDENTITY, ioContext);
auto writable = js.alloc<WritableStream>(ioContext, kj::mv(sink),
ioContext.getMetrics().tryCreateWritableByteStreamObserver());
stdinStream = kj::mv(writable);
}
}

// all good, we have the stdinStream set
} else {
auto stdinRequest = handle.stdinRequest(capnp::MessageSize{4, 0});
auto stdinPipeline = stdinRequest.send();
auto stdinWriter = byteStreamFactory.capnpToKjExplicitEnd(stdinPipeline.getStdin());
ioContext.addTask(stdinWriter->end().attach(kj::mv(stdinWriter)));
}

// return the instance to the process after getting pipeline of the process handle
return js.alloc<ExecProcess>(
kj::mv(stdinStream), kj::mv(stdoutStream), kj::mv(stderrStream), pid, kj::mv(handle));
});
}

jsg::Promise<void> Container::monitor(jsg::Lock& js) {
JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running.");

Expand Down
Loading
Loading