chore(streams): complete documentation #3893

Merged 2 commits on Dec 4, 2023
19 changes: 14 additions & 5 deletions streams/buffer.ts
@@ -41,19 +41,25 @@ export class Buffer {
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
get readable() {

/** Getter returning the instance's {@linkcode ReadableStream}. */
get readable(): ReadableStream<Uint8Array> {
return this.#readable;
}

#writable = new WritableStream<Uint8Array>({
write: (chunk) => {
const m = this.#grow(chunk.byteLength);
copy(chunk, this.#buf, m);
},
});
get writable() {

/** Getter returning the instance's {@linkcode WritableStream}. */
get writable(): WritableStream<Uint8Array> {
return this.#writable;
}

/** Constructs a new instance. */
constructor(ab?: ArrayBufferLike | ArrayLike<number>) {
this.#buf = ab === undefined ? new Uint8Array(0) : new Uint8Array(ab);
}
@@ -87,10 +93,12 @@ export class Buffer {
return this.#buf.buffer.byteLength;
}

/** Discards all but the first `n` unread bytes from the buffer but
/**
* Discards all but the first `n` unread bytes from the buffer but
* continues to use the same allocated storage. It throws if `n` is
* negative or greater than the length of the buffer. */
truncate(n: number) {
* negative or greater than the length of the buffer.
*/
truncate(n: number): void {
if (n === 0) {
this.reset();
return;
Expand All @@ -101,6 +109,7 @@ export class Buffer {
this.#reslice(this.#off + n);
}

/** Resets to an empty buffer. */
reset() {
this.#reslice(0);
this.#off = 0;
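
For context on the getters documented above, a minimal sketch of round-tripping bytes through `Buffer` — the API (`writable`, `readable`, `truncate`) comes from the diff, while the sample data, variable names, and single-chunk read are illustrative assumptions:

```ts
import { Buffer } from "https://deno.land/std@$STD_VERSION/streams/buffer.ts";

const buf = new Buffer();

// Write five bytes through the WritableStream side.
const writer = buf.writable.getWriter();
await writer.write(new Uint8Array([1, 2, 3, 4, 5]));
writer.releaseLock();

// Keep only the first two unread bytes; storage is reused, not reallocated.
buf.truncate(2);

// Read them back through the ReadableStream side.
const reader = buf.readable.getReader();
const { value } = await reader.read();
console.log(value); // Uint8Array [1, 2] (assuming the unread bytes arrive in one chunk)
reader.releaseLock();
```
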
5 changes: 4 additions & 1 deletion streams/byte_slice_stream.ts
@@ -4,11 +4,13 @@
import { assert } from "../assert/assert.ts";

/**
* A transform stream that only transforms from the zero-indexed `start` and `end` bytes (both inclusive).
* A transform stream that only transforms from the zero-indexed `start` and
* `end` bytes (both inclusive).
*
* @example
* ```ts
* import { ByteSliceStream } from "https://deno.land/std@$STD_VERSION/streams/byte_slice_stream.ts";
*
* const response = await fetch("https://example.com");
* const rangedStream = response.body!
* .pipeThrough(new ByteSliceStream(3, 8));
@@ -18,6 +20,7 @@ export class ByteSliceStream extends TransformStream<Uint8Array, Uint8Array> {
#offsetStart = 0;
#offsetEnd = 0;

/** Constructs a new instance. */
constructor(start = 0, end = Infinity) {
super({
start: () => {
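
Since the inclusive-bounds wording above is easy to misread, a small self-contained sketch (the input bytes and expected output are illustrative):

```ts
import { ByteSliceStream } from "https://deno.land/std@$STD_VERSION/streams/byte_slice_stream.ts";

const bytes = ReadableStream.from([
  new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
]);

// Zero-indexed, both ends inclusive: bytes 3 through 8.
const sliced = bytes.pipeThrough(new ByteSliceStream(3, 8));

console.log(await Array.fromAsync(sliced)); // [Uint8Array [3, 4, 5, 6, 7, 8]]
```
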
5 changes: 3 additions & 2 deletions streams/copy.ts
@@ -5,8 +5,6 @@ import { DEFAULT_BUFFER_SIZE } from "./_common.ts";
import type { Reader, Writer } from "../io/types.d.ts";

/**
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream.pipeTo} instead.
*
* Copies from `src` to `dst` until either EOF (`null`) is read from `src` or
* an error occurs. It resolves to the number of bytes copied or rejects with
* the first error encountered while copying.
@@ -23,6 +21,9 @@ import type { Reader, Writer } from "../io/types.d.ts";
* @param src The source to copy from
* @param dst The destination to copy to
* @param options Can be used to tune size of the buffer. Default size is 32kB
*
* @deprecated (will be removed after 1.0.0) Use
* {@linkcode ReadableStream.pipeTo} instead.
*/
export async function copy(
src: Reader,
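
Since `copy` is deprecated in favor of `ReadableStream.pipeTo`, a sketch of the suggested replacement (the file names are placeholders):

```ts
const src = await Deno.open("input.txt", { read: true });
const dst = await Deno.create("output.txt");

// pipeTo consumes src.readable and closes dst.writable on completion,
// which in turn closes both files — no manual Reader/Writer loop needed.
await src.readable.pipeTo(dst.writable);
```
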
8 changes: 3 additions & 5 deletions streams/delimiter_stream.ts
@@ -4,7 +4,7 @@
import { concat } from "../bytes/concat.ts";
import { createLPS } from "./_common.ts";

/** Disposition of the delimiter. */
/** Disposition of the delimiter for {@linkcode DelimiterStreamOptions}. */
export type DelimiterDisposition =
/** Include delimiter in the found chunk. */
| "suffix"
@@ -14,6 +14,7 @@ export type DelimiterDisposition =
| "discard" // delimiter discarded
;

/** Options for {@linkcode DelimiterStream}. */
export interface DelimiterStreamOptions {
/** Disposition of the delimiter. */
disposition?: DelimiterDisposition;
@@ -46,10 +47,6 @@ export interface DelimiterStreamOptions {
* )
* .pipeThrough(new TextDecoderStream());
* ```
*
* @param delimiter Delimiter byte sequence
* @param options Options for the transform stream
* @returns Transform stream
*/
export class DelimiterStream extends TransformStream<Uint8Array, Uint8Array> {
#bufs: Uint8Array[] = [];
@@ -58,6 +55,7 @@ export class DelimiterStream extends TransformStream<Uint8Array, Uint8Array> {
#delimLPS: Uint8Array | null;
#disp: DelimiterDisposition;

/** Constructs a new instance. */
constructor(
delimiter: Uint8Array,
options?: DelimiterStreamOptions,
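
A small sketch of the `disposition` option documented above — the input string and expected output are illustrative:

```ts
import { DelimiterStream } from "https://deno.land/std@$STD_VERSION/streams/delimiter_stream.ts";

const input = ReadableStream.from([new TextEncoder().encode("a;b;c")]);

const parts = input
  // "discard" drops the delimiter; "prefix"/"suffix" would keep it on a chunk edge.
  .pipeThrough(new DelimiterStream(new TextEncoder().encode(";"), {
    disposition: "discard",
  }))
  .pipeThrough(new TextDecoderStream());

console.log(await Array.fromAsync(parts)); // ["a", "b", "c"]
```
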
11 changes: 11 additions & 0 deletions streams/early_zip_readable_streams.ts
@@ -5,6 +5,17 @@
* Merge multiple streams into a single one, taking order into account, and each stream
* will wait for a chunk to enqueue before the next stream can append another chunk.
* If a stream ends before other ones, the others will be cancelled.
*
* @example
* ```ts
* import { earlyZipReadableStreams } from "https://deno.land/std@$STD_VERSION/streams/early_zip_readable_streams.ts";
*
* const stream1 = ReadableStream.from(["1", "2", "3"]);
* const stream2 = ReadableStream.from(["a", "b", "c"]);
* const zippedStream = earlyZipReadableStreams(stream1, stream2);
*
* await Array.fromAsync(zippedStream); // ["1", "a", "2", "b", "3", "c"];
* ```
*/
export function earlyZipReadableStreams<T>(
...streams: ReadableStream<T>[]
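
The early-cancellation behavior is described but not exemplified above; a sketch, assuming the strict alternation the docs describe:

```ts
import { earlyZipReadableStreams } from "https://deno.land/std@$STD_VERSION/streams/early_zip_readable_streams.ts";

const short = ReadableStream.from(["1"]);
const long = ReadableStream.from(["a", "b", "c"]);

// Chunks alternate between the streams; once `short` ends, `long` is cancelled.
const zipped = earlyZipReadableStreams(short, long);
console.log(await Array.fromAsync(zipped)); // ["1", "a"]
```
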
16 changes: 10 additions & 6 deletions streams/iterate_reader.ts
@@ -4,11 +4,12 @@
import { DEFAULT_BUFFER_SIZE } from "./_common.ts";
import type { Reader, ReaderSync } from "../io/types.d.ts";

export type { Reader, ReaderSync };

/**
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} instead.
*
* Turns a Reader, `r`, into an async iterator.
* Turns a {@linkcode Reader}, `r`, into an async iterator.
*
* @example
* ```ts
* import { iterateReader } from "https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts";
*
@@ -22,6 +23,7 @@ import type { Reader, ReaderSync } from "../io/types.d.ts";
* Second argument can be used to tune size of a buffer.
* Default size of the buffer is 32kB.
*
* @example
* ```ts
* import { iterateReader } from "https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts";
*
@@ -34,6 +36,8 @@ import type { Reader, ReaderSync } from "../io/types.d.ts";
* }
* f.close();
* ```
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} instead.
*/
export async function* iterateReader(
r: Reader,
@@ -54,9 +58,7 @@ export async function* iterateReader(
}

/**
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} instead.
*
* Turns a ReaderSync, `r`, into an iterator.
* Turns a {@linkcode ReaderSync}, `r`, into an iterator.
*
* ```ts
* import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts";
@@ -88,6 +90,8 @@ export async function* iterateReader(
* a view on that buffer on each iteration. It is therefore caller's
* responsibility to copy contents of the buffer if needed; otherwise the
* next iteration will overwrite contents of previously returned chunk.
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} instead.
*/
export function* iterateReaderSync(
r: ReaderSync,
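
Since both iterators above are deprecated in favor of `ReadableStream`, a sketch of the modern equivalent of the `iterateReader` example:

```ts
// A ReadableStream is itself async-iterable, so no helper is needed.
const file = await Deno.open("/etc/passwd");
for await (const chunk of file.readable) {
  console.log(new TextDecoder().decode(chunk));
}
// file.readable closes the file once it is fully consumed.
```
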
13 changes: 9 additions & 4 deletions streams/limited_bytes_transform_stream.ts
@@ -1,15 +1,18 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.

/** A TransformStream that will only read & enqueue `size` amount of bytes.
* This operation is chunk based and not BYOB based,
* and as such will read more than needed.
/**
* A {@linkcode TransformStream} that will only read & enqueue `size` amount of
* bytes. This operation is chunk based and not BYOB based, and as such will
* read more than needed.
*
* if options.error is set, then instead of terminating the stream,
* If `options.error` is set, then instead of terminating the stream,
* an error will be thrown.
*
* @example
* ```ts
* import { LimitedBytesTransformStream } from "https://deno.land/std@$STD_VERSION/streams/limited_bytes_transform_stream.ts";
*
* const res = await fetch("https://example.com");
* const parts = res.body!
* .pipeThrough(new LimitedBytesTransformStream(512 * 1024));
@@ -18,6 +21,8 @@
export class LimitedBytesTransformStream
extends TransformStream<Uint8Array, Uint8Array> {
#read = 0;

/** Constructs a new instance. */
constructor(size: number, options: { error?: boolean } = {}) {
super({
transform: (chunk, controller) => {
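
A sketch of the `error` option on a small in-memory stream; the sizes and the shape of the thrown error are illustrative assumptions:

```ts
import { LimitedBytesTransformStream } from "https://deno.land/std@$STD_VERSION/streams/limited_bytes_transform_stream.ts";

const source = ReadableStream.from([new Uint8Array(1024)]);

// With { error: true } the stream errors instead of silently terminating
// once more than 512 bytes have been read.
const limited = source.pipeThrough(
  new LimitedBytesTransformStream(512, { error: true }),
);

try {
  await Array.fromAsync(limited);
} catch (err) {
  console.error("limit exceeded:", err);
}
```
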
9 changes: 7 additions & 2 deletions streams/limited_transform_stream.ts
@@ -1,11 +1,14 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.

/** A TransformStream that will only read & enqueue `size` amount of chunks.
/**
* A {@linkcode TransformStream} that will only read & enqueue `size` amount of
* chunks.
*
* if options.error is set, then instead of terminating the stream,
* If `options.error` is set, then instead of terminating the stream,
* an error will be thrown.
*
* @example
* ```ts
* import { LimitedTransformStream } from "https://deno.land/std@$STD_VERSION/streams/limited_transform_stream.ts";
* const res = await fetch("https://example.com");
@@ -14,6 +17,8 @@
*/
export class LimitedTransformStream<T> extends TransformStream<T, T> {
#read = 0;

/** Constructs a new {@linkcode LimitedTransformStream} instance. */
constructor(size: number, options: { error?: boolean } = {}) {
super({
transform: (chunk, controller) => {
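
And the chunk-counting variant, by contrast, in a sketch with inline data (values illustrative):

```ts
import { LimitedTransformStream } from "https://deno.land/std@$STD_VERSION/streams/limited_transform_stream.ts";

const words = ReadableStream.from(["one", "two", "three"]);

// Without { error: true }, the stream simply terminates after `size` chunks.
const firstTwo = words.pipeThrough(new LimitedTransformStream<string>(2));

console.log(await Array.fromAsync(firstTwo)); // ["one", "two"]
```
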
11 changes: 11 additions & 0 deletions streams/merge_readable_streams.ts
@@ -4,6 +4,17 @@
* Merge multiple streams into a single one, not taking order into account.
* If a stream ends before other ones, the other will continue adding data,
* and the finished one will not add any more data.
*
* @example
* ```ts
* import { mergeReadableStreams } from "https://deno.land/std@$STD_VERSION/streams/merge_readable_streams.ts";
*
* const stream1 = ReadableStream.from(["1", "2", "3"]);
* const stream2 = ReadableStream.from(["a", "b", "c"]);
*
* // ["2", "c", "a", "b", "3", "1"]
* await Array.fromAsync(mergeReadableStreams(stream1, stream2));
* ```
*/
export function mergeReadableStreams<T>(
...streams: ReadableStream<T>[]
20 changes: 12 additions & 8 deletions streams/read_all.ts
@@ -5,11 +5,10 @@ import { Buffer } from "../io/buffer.ts";
import type { Reader, ReaderSync } from "../io/types.d.ts";

/**
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} and {@linkcode import("./to_array_buffer.ts").toArrayBuffer} instead.
*
* Read Reader `r` until EOF (`null`) and resolve to the content as
* Uint8Array`.
* Read {@linkcode Reader} `r` until EOF (`null`) and resolve to the content as
* {@linkcode Uint8Array}.
*
* @example
* ```ts
* import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
* import { readAll } from "https://deno.land/std@$STD_VERSION/streams/read_all.ts";
@@ -28,6 +27,9 @@ import type { Reader, ReaderSync } from "../io/types.d.ts";
* const reader = new Buffer(myData.buffer);
* const bufferContent = await readAll(reader);
* ```
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream}
* and {@linkcode toArrayBuffer} instead.
*/
export async function readAll(r: Reader): Promise<Uint8Array> {
const buf = new Buffer();
@@ -36,11 +38,10 @@ export async function readAll(r: Reader): Promise<Uint8Array> {
}

/**
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} and {@linkcode import("./to_array_buffer.ts").toArrayBuffer} instead.
*
* Synchronously reads Reader `r` until EOF (`null`) and returns the content
* as `Uint8Array`.
* Synchronously reads {@linkcode Reader} `r` until EOF (`null`) and returns
* the content as {@linkcode Uint8Array}.
*
* @example
* ```ts
* import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
* import { readAllSync } from "https://deno.land/std@$STD_VERSION/streams/read_all.ts";
@@ -59,6 +60,9 @@ export async function readAll(r: Reader): Promise<Uint8Array> {
* const reader = new Buffer(myData.buffer);
* const bufferContent = readAllSync(reader);
* ```
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} and
* {@linkcode toArrayBuffer} instead.
*/
export function readAllSync(r: ReaderSync): Uint8Array {
const buf = new Buffer();
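
A sketch of the replacement the deprecation notices above point to, `toArrayBuffer` over a `ReadableStream` (the file path is a placeholder):

```ts
import { toArrayBuffer } from "https://deno.land/std@$STD_VERSION/streams/to_array_buffer.ts";

const file = await Deno.open("./file.txt", { read: true });

// Drains the stream and resolves to a single ArrayBuffer of its contents.
const content = new Uint8Array(await toArrayBuffer(file.readable));
console.log(new TextDecoder().decode(content));
```
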
15 changes: 11 additions & 4 deletions streams/readable_stream_from_reader.ts
@@ -3,6 +3,7 @@

import { DEFAULT_CHUNK_SIZE } from "./_common.ts";
import type { Closer, Reader } from "../io/types.d.ts";
export type { Closer };

function isCloser(value: unknown): value is Closer {
return typeof value === "object" && value !== null && value !== undefined &&
@@ -11,7 +12,12 @@ function isCloser(value: unknown): value is Closer {
typeof (value as Record<string, any>)["close"] === "function";
}

/** @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} directly instead. */
/**
* Options for {@linkcode readableStreamFromReader}.
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream}
* directly instead.
*/
export interface ReadableStreamFromReaderOptions {
/** If the `reader` is also a `Closer`, automatically close the `reader`
* when `EOF` is encountered, or a read error occurs.
@@ -29,9 +35,8 @@ export interface ReadableStreamFromReaderOptions {
}

/**
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} directly instead.
*
* Create a `ReadableStream<Uint8Array>` from a `Reader`.
* Create a {@linkcode ReadableStream} of {@linkcode Uint8Array}s from a
* {@linkcode Reader}.
*
* When the pull algorithm is called on the stream, a chunk from the reader
* will be read. When `null` is returned from the reader, the stream will be
@@ -45,6 +50,8 @@ export interface ReadableStreamFromReaderOptions {
* const file = await Deno.open("./file.txt", { read: true });
* const fileStream = readableStreamFromReader(file);
* ```
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} directly instead.
*/
export function readableStreamFromReader(
reader: Reader | (Reader & Closer),
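
For the deprecation note above, the direct replacement is the `readable` property Deno resources already expose — a minimal sketch:

```ts
// Before:
//   const file = await Deno.open("./file.txt", { read: true });
//   const fileStream = readableStreamFromReader(file);

// After — Deno.FsFile already exposes a ReadableStream<Uint8Array>:
const file = await Deno.open("./file.txt", { read: true });
const fileStream = file.readable;
```
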