Skip to content

Commit

Permalink
feat(util-stream-node): provide handling utilities for Node.js stream (
Browse files Browse the repository at this point in the history
…#3778)

* feat(util-sdk-stream): implement sdk stream utility mixin

* feat: types of the SDK Stream

feat(util-stream-node): merge util-sdk-stream into util-stream-node(browser)

feeat(util-stream-node): rename transformToBuffer to transformToByteArray; unit test

* feat(util-stream-node): update sdkStreamMixin input to unknown
  • Loading branch information
AllanZhengYP committed Jul 12, 2022
1 parent 5b2dc89 commit 0ef4af6
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 0 deletions.
2 changes: 2 additions & 0 deletions packages/util-stream-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
},
"license": "Apache-2.0",
"dependencies": {
"@aws-sdk/node-http-handler": "*",
"@aws-sdk/types": "*",
"@aws-sdk/util-buffer-from": "*",
"tslib": "^2.3.1"
},
"devDependencies": {
Expand Down
1 change: 1 addition & 0 deletions packages/util-stream-node/src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./getAwsChunkedEncodingStream";
export * from "./sdk-stream-mixin";
166 changes: 166 additions & 0 deletions packages/util-stream-node/src/sdk-stream-mixin.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { SdkStreamMixin } from "@aws-sdk/types";
import { fromArrayBuffer } from "@aws-sdk/util-buffer-from";
import { PassThrough, Readable, Writable } from "stream";

import { sdkStreamMixin } from "./sdk-stream-mixin";

jest.mock("@aws-sdk/util-buffer-from");

describe(sdkStreamMixin.name, () => {
const writeDataToStream = (stream: Writable, data: Array<ArrayBufferLike>): Promise<void> =>
new Promise((resolve, reject) => {
data.forEach((chunk) => {
stream.write(chunk, (err) => {
if (err) reject(err);
});
});
stream.end(resolve);
});
const byteArrayFromBuffer = (buf: Buffer) => new Uint8Array(buf.buffer, buf.byteOffset, buf.byteLength);
let passThrough: PassThrough;
const expectAllTransformsToFail = async (sdkStream: SdkStreamMixin) => {
const transformMethods: Array<keyof SdkStreamMixin> = [
"transformToByteArray",
"transformToString",
"transformToWebStream",
];
for (const method of transformMethods) {
try {
await sdkStream[method]();
fail(new Error("expect subsequent tranform to fail"));
} catch (error) {
expect(error.message).toContain("The stream has already been transformed");
}
}
};

beforeEach(() => {
passThrough = new PassThrough();
});

it("should throw if unexpected stream implementation is supplied", () => {
try {
const payload = {};
sdkStreamMixin(payload);
fail("should throw when unexpected stream is supplied");
} catch (error) {
expect(error.message).toContain("Unexpected stream implementation");
}
});

describe("transformToByteArray", () => {
it("should transform binary stream to byte array", async () => {
const mockData = [Buffer.from("foo"), Buffer.from("bar"), Buffer.from("buzz")];
const expected = byteArrayFromBuffer(Buffer.from("foobarbuzz"));
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, mockData);
expect(await sdkStream.transformToByteArray()).toEqual(expected);
});

it("should fail any subsequent tranform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("abc")]);
expect(await sdkStream.transformToByteArray()).toEqual(byteArrayFromBuffer(Buffer.from("abc")));
await expectAllTransformsToFail(sdkStream);
});
});

describe("transformToString", () => {
const toStringMock = jest.fn();
beforeAll(() => {
jest.resetAllMocks();
});

it("should transform the stream to string with utf-8 encoding by default", async () => {
(fromArrayBuffer as jest.Mock).mockImplementation(
jest.requireActual("@aws-sdk/util-buffer-from").fromArrayBuffer
);
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("foo")]);
const transformed = await sdkStream.transformToString();
expect(transformed).toEqual("foo");
});

it.each([undefined, "utf-8", "ascii", "base64", "latin1", "binary"])(
"should transform the stream to string with %s encoding",
async (encoding) => {
(fromArrayBuffer as jest.Mock).mockReturnValue({ toString: toStringMock });
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("foo")]);
await sdkStream.transformToString(encoding);
expect(toStringMock).toBeCalledWith(encoding);
}
);

it("should fail any subsequent tranform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("foo")]);
await sdkStream.transformToString();
await expectAllTransformsToFail(sdkStream);
});
});

describe("transformToWebStream", () => {
it("should throw if any event listener is attached on the underlying stream", async () => {
passThrough.on("data", console.log);
const sdkStream = sdkStreamMixin(passThrough);
try {
sdkStream.transformToWebStream();
fail(new Error("expect web stream transformation to fail"));
} catch (error) {
expect(error.message).toContain("The stream has been consumed by other callbacks");
}
});

describe("when Readable.toWeb() is not supported", () => {
// @ts-expect-error
const originalToWebImpl = Readable.toWeb;
beforeAll(() => {
// @ts-expect-error
Readable.toWeb = undefined;
});
afterAll(() => {
// @ts-expect-error
Readable.toWeb = originalToWebImpl;
});

it("should throw", async () => {
const sdkStream = sdkStreamMixin(passThrough);
try {
sdkStream.transformToWebStream();
fail(new Error("expect web stream transformation to fail"));
} catch (error) {
expect(error.message).toContain("Readable.toWeb() is not supported");
}
});
});

describe("when Readable.toWeb() is supported", () => {
// @ts-expect-error
const originalToWebImpl = Readable.toWeb;
beforeAll(() => {
// @ts-expect-error
Readable.toWeb = jest.fn().mockReturnValue("A web stream");
});

afterAll(() => {
// @ts-expect-error
Readable.toWeb = originalToWebImpl;
});

it("should tranform Node stream to web stream", async () => {
const sdkStream = sdkStreamMixin(passThrough);
sdkStream.transformToWebStream();
// @ts-expect-error
expect(Readable.toWeb).toBeCalled();
});

it("should fail any subsequent tranform calls", async () => {
const sdkStream = sdkStreamMixin(passThrough);
await writeDataToStream(passThrough, [Buffer.from("foo")]);
await sdkStream.transformToWebStream();
await expectAllTransformsToFail(sdkStream);
});
});
});
});
54 changes: 54 additions & 0 deletions packages/util-stream-node/src/sdk-stream-mixin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { streamCollector } from "@aws-sdk/node-http-handler";
import { SdkStream, SdkStreamMixin } from "@aws-sdk/types";
import { fromArrayBuffer } from "@aws-sdk/util-buffer-from";
import { Readable } from "stream";

const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed.";

/**
* The function that mixes in the utility functions to help consuming runtime-specific payload stream.
*
* @internal
*/
export const sdkStreamMixin = (stream: unknown): SdkStream<Readable> => {
if (!(stream instanceof Readable)) {
// @ts-ignore
const name = stream?.__proto__?.constructor?.name || stream;
throw new Error(`Unexpected stream implementation, expect Stream.Readable instance, got ${name}`);
}

let transformed = false;
const transformToByteArray = async () => {
if (transformed) {
throw new Error(ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED);
}
transformed = true;
return await streamCollector(stream);
};

return Object.assign<Readable, SdkStreamMixin>(stream, {
transformToByteArray,
transformToString: async (encoding?: string) => {
const buf = await transformToByteArray();
return fromArrayBuffer(buf.buffer, buf.byteOffset, buf.byteLength).toString(encoding);
},
transformToWebStream: () => {
if (transformed) {
throw new Error(ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED);
}
if (stream.readableFlowing !== null) {
// Prevent side effect of consuming webstream.
throw new Error("The stream has been consumed by other callbacks.");
}
// @ts-expect-error toWeb() is only available in Node.js >= 17.0.0
if (typeof Readable.toWeb !== "function") {
throw new Error(
"Readable.toWeb() is not supported. Please make sure you are using Node.js >= 17.0.0, or polyfill is available."
);
}
transformed = true;
// @ts-expect-error toWeb() is only available in Node.js >= 17.0.0
return Readable.toWeb(stream);
},
});
};

0 comments on commit 0ef4af6

Please sign in to comment.