Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(BREAKING): command pipe writes may now return a promise #225

Merged
merged 2 commits into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions mod.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Deno.test("should not get stderr when set to writer", async () => {
assertThrows(
() => output.stderr,
Error,
`Stderr was not piped (was streamed). Call .stderr(\"piped\") or .stderr(\"inheritPiped\") when building the command.`,
`Stderr was streamed to another source and is no longer available.`,
);
});

Expand Down Expand Up @@ -374,14 +374,11 @@ Deno.test("should handle boolean list 'and'", async () => {

Deno.test("should support custom command handlers", async () => {
const builder = new CommandBuilder()
.registerCommand("zardoz-speaks", (context) => {
.registerCommand("zardoz-speaks", async (context) => {
if (context.args.length != 1) {
context.stderr.writeLine("zardoz-speaks: expected 1 argument");
return {
code: 1,
};
return context.error("zardoz-speaks: expected 1 argument");
}
context.stdout.writeLine(`zardoz speaks to ${context.args[0]}`);
await context.stdout.writeLine(`zardoz speaks to ${context.args[0]}`);
return {
code: 0,
};
Expand Down Expand Up @@ -802,7 +799,7 @@ Deno.test("piping to stdin", async () => {
.stderr("piped")
.noThrow();
assertEquals(result.code, 1);
assertEquals(result.stderr, "stdin pipe broken. Error: Exited with code: 1\n");
assertEquals(result.stderr, "stdin pipe broken. Exited with code: 1\n");
}
});

Expand Down Expand Up @@ -862,13 +859,9 @@ Deno.test("piping to a writable that throws", async () => {
throw new Error("failed");
},
});
await assertRejects(
async () => {
await $`echo 1`.stdout(writableStream);
},
Error,
"failed",
);
const result = await $`echo 1`.stdout(writableStream).stderr("piped").noThrow();
assertEquals(result.code, 1);
assertEquals(result.stderr, "echo: failed\n");
});

Deno.test("piping stdout/stderr to a file", async () => {
Expand Down Expand Up @@ -1030,7 +1023,7 @@ Deno.test("streaming api errors while streaming", async () => {
.stdout("piped")
.stderr("piped")
.spawn();
assertEquals(result.stderr, "stdin pipe broken. Error: Exited with code: 1\n");
assertEquals(result.stderr, "stdin pipe broken. Exited with code: 1\n");
assertEquals(result.stdout, "1\n2\n");
}
});
Expand Down
88 changes: 45 additions & 43 deletions src/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@ import { Delay } from "./common.ts";
import { Buffer, colors, path, readerFromStreamReader } from "./deps.ts";
import {
CapturingBufferWriter,
CapturingBufferWriterSync,
InheritStaticTextBypassWriter,
NullPipeWriter,
PipedBuffer,
Reader,
ShellPipeReaderKind,
ShellPipeWriter,
ShellPipeWriterKind,
Writer,
WriterSync,
} from "./pipes.ts";
import { parseCommand, spawn } from "./shell.ts";
import { isShowingProgressBars } from "./console/progress/interval.ts";
import { PathRef } from "./path.ts";
import { RequestBuilder } from "./request.ts";
import { writerFromStreamWriter } from "https://deno.land/std@0.213.0/streams/writer_from_stream_writer.ts";

type BufferStdio = "inherit" | "null" | "streamed" | Buffer;

Expand Down Expand Up @@ -823,74 +826,57 @@ export function parseAndSpawnCommand(state: CommandBuilderState) {
}
const combinedBuffer = new Buffer();
return [
new CapturingBufferWriter(stdoutBuffer, combinedBuffer),
new CapturingBufferWriter(stderrBuffer, combinedBuffer),
getCapturingBuffer(stdoutBuffer, combinedBuffer),
getCapturingBuffer(stderrBuffer, combinedBuffer),
combinedBuffer,
] as const;
}
return [stdoutBuffer, stderrBuffer, undefined] as const;

function getOutputBuffer(innerWriter: WriterSync, { kind, options }: ShellPipeWriterKindWithOptions) {
function getCapturingBuffer(buffer: Writer | WriterSync, combinedBuffer: Buffer) {
if ("write" in buffer) {
return new CapturingBufferWriter(buffer, combinedBuffer);
} else {
return new CapturingBufferWriterSync(buffer, combinedBuffer);
}
}

function getOutputBuffer(inheritWriter: WriterSync, { kind, options }: ShellPipeWriterKindWithOptions) {
if (typeof kind === "object") {
if (kind instanceof PathRef) {
const file = kind.openSync({ write: true, truncate: true, create: true });
disposables.push(file);
return file;
} else if (kind instanceof WritableStream) {
// this is sketch
const writer = kind.getWriter();
const promiseMap = new Map<number, Promise<void>>();
let hadError = false;
let foundErr: unknown = undefined;
let index = 0;
const streamWriter = kind.getWriter();
asyncDisposables.push({
async [Symbol.asyncDispose]() {
await Promise.all(promiseMap.values());
if (foundErr) {
throw foundErr;
}
if (!options?.preventClose && !hadError) {
await writer.close();
streamWriter.releaseLock();
if (!options?.preventClose) {
try {
await kind.close();
} catch {
// ignore, the stream have errored
}
}
},
});
return {
writeSync(buffer: Uint8Array) {
if (foundErr) {
const errorToThrow = foundErr;
foundErr = undefined;
throw errorToThrow;
}
const newIndex = index++;
promiseMap.set(
newIndex,
writer.write(buffer).catch((err) => {
if (err != null) {
foundErr = err;
hadError = true;
}
}).finally(() => {
promiseMap.delete(newIndex);
}),
);
return buffer.length;
},
};
return writerFromStreamWriter(streamWriter);
} else {
return kind;
}
}
switch (kind) {
case "inherit":
if (hasProgressBars) {
return new InheritStaticTextBypassWriter(innerWriter);
return new InheritStaticTextBypassWriter(inheritWriter);
} else {
return "inherit";
}
case "piped":
return new PipedBuffer();
case "inheritPiped":
return new CapturingBufferWriter(innerWriter, new Buffer());
return new CapturingBufferWriterSync(inheritWriter, new Buffer());
case "null":
return "null";
default: {
Expand All @@ -902,9 +888,17 @@ export function parseAndSpawnCommand(state: CommandBuilderState) {
}

function finalizeCommandResultBuffer(
buffer: PipedBuffer | "inherit" | "null" | CapturingBufferWriter | InheritStaticTextBypassWriter | WriterSync,
buffer:
| PipedBuffer
| "inherit"
| "null"
| CapturingBufferWriter
| CapturingBufferWriterSync
| InheritStaticTextBypassWriter
| Writer
| WriterSync,
): BufferStdio {
if (buffer instanceof CapturingBufferWriter) {
if (buffer instanceof CapturingBufferWriterSync || buffer instanceof CapturingBufferWriter) {
return buffer.getBuffer();
} else if (buffer instanceof InheritStaticTextBypassWriter) {
buffer.flush(); // this is line buffered, so flush anything left
Expand All @@ -920,7 +914,15 @@ export function parseAndSpawnCommand(state: CommandBuilderState) {
}

function finalizeCommandResultBufferForError(
buffer: PipedBuffer | "inherit" | "null" | CapturingBufferWriter | InheritStaticTextBypassWriter | WriterSync,
buffer:
| PipedBuffer
| "inherit"
| "null"
| CapturingBufferWriter
| CapturingBufferWriterSync
| InheritStaticTextBypassWriter
| Writer
| WriterSync,
error: Error,
) {
if (buffer instanceof InheritStaticTextBypassWriter) {
Expand Down Expand Up @@ -1013,7 +1015,7 @@ export class CommandResult {

/** Raw stderr bytes. */
get stderrBytes(): Uint8Array {
if (this.#stdout === "streamed") {
if (this.#stderr === "streamed") {
throw new Error(
`Stderr was streamed to another source and is no longer available.`,
);
Expand Down
14 changes: 9 additions & 5 deletions src/command_handler.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { ExecuteResult } from "./result.ts";
import type { KillSignal } from "./command.ts";
import { Reader, WriterSync } from "./pipes.ts";
import { Reader } from "./pipes.ts";

/** Used to read from stdin. */
export type CommandPipeReader = "inherit" | "null" | Reader;

/** Used to write to stdout or stderr. */
export interface CommandPipeWriter extends WriterSync {
writeSync(p: Uint8Array): number;
writeText(text: string): void;
writeLine(text: string): void;
export interface CommandPipeWriter {
write(p: Uint8Array): Promise<number> | number;
writeText(text: string): Promise<void> | void;
writeLine(text: string): Promise<void> | void;
}

/** Context of the currently executing command. */
Expand All @@ -21,6 +21,10 @@ export interface CommandContext {
get stdout(): CommandPipeWriter;
get stderr(): CommandPipeWriter;
get signal(): KillSignal;
/// Helper function for writing a line to stderr and returning a 1 exit code.
error(message: string): Promise<ExecuteResult> | ExecuteResult;
/// Helper function for writing a line to stderr and returning the provided exit code.
error(code: number, message: string): Promise<ExecuteResult> | ExecuteResult;
}

/** Handler for executing a command. */
Expand Down
41 changes: 30 additions & 11 deletions src/commands/cat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ export async function catCommand(
const code = await executeCat(context);
return { code };
} catch (err) {
context.stderr.writeLine(`cat: ${err?.message ?? err}`);
return { code: 1 };
return context.error(`cat: ${err?.message ?? err}`);
}
}

Expand All @@ -29,8 +28,14 @@ async function executeCat(context: CommandContext) {
if (typeof context.stdin === "object") { // stdin is a Reader
while (!context.signal.aborted) {
const size = await context.stdin.read(buf);
if (!size || size === 0) break;
else context.stdout.writeSync(buf.slice(0, size));
if (!size || size === 0) {
break;
} else {
const maybePromise = context.stdout.write(buf.slice(0, size));
if (maybePromise instanceof Promise) {
await maybePromise;
}
}
}
exitCode = context.signal.abortedExitCode ?? 0;
} else {
Expand All @@ -44,15 +49,24 @@ async function executeCat(context: CommandContext) {
while (!context.signal.aborted) {
// NOTE: rust supports cancellation here
const size = file.readSync(buf);
if (!size || size === 0) break;
else context.stdout.writeSync(buf.slice(0, size));
if (!size || size === 0) {
break;
} else {
const maybePromise = context.stdout.write(buf.slice(0, size));
if (maybePromise instanceof Promise) {
await maybePromise;
}
}
}
exitCode = context.signal.abortedExitCode ?? 0;
} catch (err) {
context.stderr.writeLine(`cat ${path}: ${err}`);
const maybePromise = context.stderr.writeLine(`cat ${path}: ${err?.message ?? err}`);
if (maybePromise instanceof Promise) {
await maybePromise;
}
exitCode = 1;
} finally {
if (file) file.close();
file?.close();
}
}
}
Expand All @@ -62,10 +76,15 @@ async function executeCat(context: CommandContext) {
export function parseCatArgs(args: string[]): CatFlags {
const paths = [];
for (const arg of parseArgKinds(args)) {
if (arg.kind === "Arg") paths.push(arg.arg);
else bailUnsupported(arg); // for now, we don't support any arguments
if (arg.kind === "Arg") {
paths.push(arg.arg);
} else {
bailUnsupported(arg); // for now, we don't support any arguments
}
}

if (paths.length === 0) paths.push("-");
if (paths.length === 0) {
paths.push("-");
}
return { paths };
}
3 changes: 1 addition & 2 deletions src/commands/cd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ export async function cdCommand(context: CommandContext): Promise<ExecuteResult>
}],
};
} catch (err) {
context.stderr.writeLine(`cd: ${err?.message ?? err}`);
return { code: 1 };
return context.error(`cd: ${err?.message ?? err}`);
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/commands/cp_mv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ export async function cpCommand(
await executeCp(context.cwd, context.args);
return { code: 0 };
} catch (err) {
context.stderr.writeLine(`cp: ${err?.message ?? err}`);
return { code: 1 };
return context.error(`cp: ${err?.message ?? err}`);
}
}

Expand Down Expand Up @@ -101,8 +100,7 @@ export async function mvCommand(
await executeMove(context.cwd, context.args);
return { code: 0 };
} catch (err) {
context.stderr.writeLine(`mv: ${err?.message ?? err}`);
return { code: 1 };
return context.error(`mv: ${err?.message ?? err}`);
}
}

Expand Down
18 changes: 15 additions & 3 deletions src/commands/echo.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
import { CommandContext } from "../command_handler.ts";
import { ExecuteResult } from "../result.ts";

export function echoCommand(context: CommandContext): ExecuteResult {
context.stdout.writeLine(context.args.join(" "));
return { code: 0 };
export function echoCommand(context: CommandContext): ExecuteResult | Promise<ExecuteResult> {
try {
const maybePromise = context.stdout.writeLine(context.args.join(" "));
if (maybePromise instanceof Promise) {
return maybePromise.then(() => ({ code: 0 })).catch((err) => handleFailure(context, err));
} else {
return { code: 0 };
}
} catch (err) {
return handleFailure(context, err);
}
}

function handleFailure(context: CommandContext, err: any) {
return context.error(`echo: ${err?.message ?? err}`);
}
Loading
Loading