Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(io): iterateReader[Sync]() #4247

Merged
merged 10 commits into from
Feb 19, 2024
Merged
104 changes: 104 additions & 0 deletions io/iterate_reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.

import { DEFAULT_BUFFER_SIZE } from "./_constants.ts";
import type { Reader, ReaderSync } from "./types.ts";

export type { Reader, ReaderSync };

/**
* Turns a {@linkcode Reader} into an async iterator.
*
* @example
* ```ts
* import { iterateReader } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts";
*
* using file = await Deno.open("/etc/passwd");
* for await (const chunk of iterateReader(file)) {
* console.log(chunk);
* }
* ```
*
* Second argument can be used to tune size of a buffer.
* Default size of the buffer is 32kB.
*
* @example
* ```ts
* import { iterateReader } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts";
*
* using file = await Deno.open("/etc/passwd");
* const iter = iterateReader(file, {
* bufSize: 1024 * 1024
* });
* for await (const chunk of iter) {
* console.log(chunk);
* }
* ```
*/
export async function* iterateReader(
reader: Reader,
options?: {
bufSize?: number;
},
): AsyncIterableIterator<Uint8Array> {
const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE;
const b = new Uint8Array(bufSize);
while (true) {
const result = await reader.read(b);
if (result === null) {
break;
}

yield b.slice(0, result);
}
}

/**
* Turns a {@linkcode ReaderSync} into an iterator.
*
* ```ts
* import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts";
*
* using file = Deno.openSync("/etc/passwd");
* for (const chunk of iterateReaderSync(file)) {
* console.log(chunk);
* }
* ```
*
* Second argument can be used to tune size of a buffer.
* Default size of the buffer is 32kB.
*
* ```ts
* import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/io/iterate_reader.ts";

* using file = await Deno.open("/etc/passwd");
* const iter = iterateReaderSync(file, {
* bufSize: 1024 * 1024
* });
* for (const chunk of iter) {
* console.log(chunk);
* }
* ```
*
* Iterator uses an internal buffer of fixed size for efficiency; it returns
* a view on that buffer on each iteration. It is therefore caller's
* responsibility to copy contents of the buffer if needed; otherwise the
* next iteration will overwrite contents of previously returned chunk.
*/
export function* iterateReaderSync(
reader: ReaderSync,
options?: {
bufSize?: number;
},
): IterableIterator<Uint8Array> {
const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE;
const b = new Uint8Array(bufSize);
while (true) {
const result = reader.readSync(b);
if (result === null) {
break;
}

yield b.slice(0, result);
}
}
111 changes: 111 additions & 0 deletions io/iterate_reader_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { assertEquals } from "../assert/assert_equals.ts";
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

import { iterateReader, iterateReaderSync } from "./iterate_reader.ts";
import { readerFromIterable } from "../streams/reader_from_iterable.ts";
import { delay } from "../async/delay.ts";
import type { Reader, ReaderSync } from "./types.ts";

Deno.test("iterateReader()", async () => {
// ref: https://github.com/denoland/deno/issues/2330
const encoder = new TextEncoder();

class TestReader implements Reader {
#offset = 0;
#buf: Uint8Array;

constructor(s: string) {
this.#buf = new Uint8Array(encoder.encode(s));
}

read(p: Uint8Array): Promise<number | null> {
const n = Math.min(p.byteLength, this.#buf.byteLength - this.#offset);
p.set(this.#buf.slice(this.#offset, this.#offset + n));
this.#offset += n;

if (n === 0) {
return Promise.resolve(null);
}

return Promise.resolve(n);
}
}

const reader = new TestReader("hello world!");

let totalSize = 0;
await Array.fromAsync(
iterateReader(reader),
(buf) => totalSize += buf.byteLength,
);

assertEquals(totalSize, 12);
});

Deno.test("iterateReader() works with slow consumer", async () => {
const a = new Uint8Array([97]);
const b = new Uint8Array([98]);
const iter = iterateReader(readerFromIterable([a, b]));
const promises = [];
for await (const bytes of iter) {
promises.push(delay(10).then(() => bytes));
}
assertEquals([a, b], await Promise.all(promises));
});

Deno.test("iterateReaderSync()", () => {
// ref: https://github.com/denoland/deno/issues/2330
const encoder = new TextEncoder();

class TestReader implements ReaderSync {
#offset = 0;
#buf: Uint8Array;

constructor(s: string) {
this.#buf = new Uint8Array(encoder.encode(s));
}

readSync(p: Uint8Array): number | null {
const n = Math.min(p.byteLength, this.#buf.byteLength - this.#offset);
p.set(this.#buf.slice(this.#offset, this.#offset + n));
this.#offset += n;

if (n === 0) {
return null;
}

return n;
}
}

const reader = new TestReader("hello world!");

let totalSize = 0;
for (const buf of iterateReaderSync(reader)) {
totalSize += buf.byteLength;
}

assertEquals(totalSize, 12);
});

Deno.test("iterateReaderSync() works with slow consumer", async () => {
const a = new Uint8Array([97]);
const b = new Uint8Array([98]);
const data = [a, b];
const readerSync = {
readSync(u8: Uint8Array) {
const bytes = data.shift();
if (bytes) {
u8.set(bytes);
return bytes.length;
}
return null;
},
};
const iter = iterateReaderSync(readerSync);
const promises = [];
for (const bytes of iter) {
promises.push(delay(10).then(() => bytes));
}
assertEquals([a, b], await Promise.all(promises));
});
1 change: 1 addition & 0 deletions io/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export * from "./buf_writer.ts";
export * from "./buffer.ts";
export * from "./copy.ts";
export * from "./copy_n.ts";
export * from "./iterate_reader.ts";
export * from "./limited_reader.ts";
export * from "./multi_reader.ts";
export * from "./read_all.ts";
Expand Down
10 changes: 6 additions & 4 deletions io/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ export interface Reader {
*
* Implementations should not retain a reference to `p`.
*
* Use iterateReader() from https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts to turn a Reader into an
* AsyncIterator.
* Use
* {@linkcode https://deno.land/std@$STD_VERSION/io/to_iterator.ts?s=toIterator}
* to turn a {@linkcode Reader} into an {@linkcode AsyncIterableIterator}.
*/
read(p: Uint8Array): Promise<number | null>;
}
Expand All @@ -52,8 +53,9 @@ export interface ReaderSync {
*
* Implementations should not retain a reference to `p`.
*
* Use iterateReaderSync() from https://deno.land/std@$STD_VERSION/streams/iterate_reader.ts to turn a ReaderSync
* into an Iterator.
* Use
* {@linkcode https://deno.land/std@$STD_VERSION/io/to_iterator.ts?s=toIteratorSync}
* to turn a {@linkcode ReaderSync} into an {@linkcode IterableIterator}.
*/
readSync(p: Uint8Array): number | null;
}
Expand Down
35 changes: 10 additions & 25 deletions streams/iterate_reader.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.

import { DEFAULT_BUFFER_SIZE } from "./_common.ts";
import {
iterateReader as _iterateReader,
iterateReaderSync as _iterateReaderSync,
} from "../io/iterate_reader.ts";
import type { Reader, ReaderSync } from "../io/types.ts";

export type { Reader, ReaderSync };
Expand Down Expand Up @@ -35,24 +38,15 @@ export type { Reader, ReaderSync };
* }
* ```
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStreamDefaultReader} instead.
* @deprecated (will be removed in 1.0.0) Use {@linkcode iterateReader} instead.
iuioiua marked this conversation as resolved.
Show resolved Hide resolved
*/
export async function* iterateReader(
export function iterateReader(
r: Reader,
options?: {
bufSize?: number;
},
): AsyncIterableIterator<Uint8Array> {
const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE;
const b = new Uint8Array(bufSize);
while (true) {
const result = await r.read(b);
if (result === null) {
break;
}

yield b.slice(0, result);
}
return _iterateReader(r, options);
}

/**
Expand Down Expand Up @@ -87,22 +81,13 @@ export async function* iterateReader(
* responsibility to copy contents of the buffer if needed; otherwise the
* next iteration will overwrite contents of previously returned chunk.
*
* @deprecated (will be removed after 1.0.0) Use {@linkcode ReadableStream} instead.
* @deprecated (will be removed in 1.0.0) Use {@linkcode iterateReaderSync} instead.
*/
export function* iterateReaderSync(
export function iterateReaderSync(
r: ReaderSync,
options?: {
bufSize?: number;
},
): IterableIterator<Uint8Array> {
const bufSize = options?.bufSize ?? DEFAULT_BUFFER_SIZE;
const b = new Uint8Array(bufSize);
while (true) {
const result = r.readSync(b);
if (result === null) {
break;
}

yield b.slice(0, result);
}
return _iterateReaderSync(r, options);
}