Skip to content

Commit

Permalink
feat(util-stream-browser): provide handling utilities for browser str…
Browse files Browse the repository at this point in the history
…eam (#3783)

* feat(util-stream-browser): intial commit to sdk stream mixin

* feat(util-stream-browser): implement browser stream utilities

* feat(util-stream-browser): update sdkStreamMixin input to unknown

* fix(util-stream-browser): address feedbacks

Co-authored-by: Trivikram Kamat <16024985+trivikr@users.noreply.github.com>
  • Loading branch information
AllanZhengYP and trivikr committed Jul 13, 2022
1 parent 0ef4af6 commit 2255877
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 2 deletions.
7 changes: 5 additions & 2 deletions packages/util-stream-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"build:types": "tsc -p tsconfig.types.json",
"build:types:downlevel": "downlevel-dts dist-types dist-types/ts3.4",
"clean": "rimraf ./dist-* && rimraf *.tsbuildinfo",
"test": "exit 0"
"test": "jest"
},
"main": "./dist-es/index.js",
"module": "./dist-es/index.js",
Expand All @@ -18,11 +18,14 @@
},
"license": "Apache-2.0",
"dependencies": {
"@aws-sdk/fetch-http-handler": "*",
"@aws-sdk/types": "*",
"@aws-sdk/util-base64-browser": "*",
"@aws-sdk/util-hex-encoding": "*",
"@aws-sdk/util-utf8-browser": "*",
"tslib": "^2.3.1"
},
"devDependencies": {
"@types/node": "^10.0.0",
"concurrently": "7.0.0",
"downlevel-dts": "0.7.0",
"rimraf": "3.0.2",
Expand Down
1 change: 1 addition & 0 deletions packages/util-stream-browser/src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./getAwsChunkedEncodingStream";
export * from "./sdk-stream-mixin";
222 changes: 222 additions & 0 deletions packages/util-stream-browser/src/sdk-stream-mixin.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// @jest-environment jsdom
import { streamCollector } from "@aws-sdk/fetch-http-handler";
import { SdkStreamMixin } from "@aws-sdk/types";
import { toBase64 } from "@aws-sdk/util-base64-browser";
import { toHex } from "@aws-sdk/util-hex-encoding";
import { toUtf8 } from "@aws-sdk/util-utf8-browser";

import { sdkStreamMixin } from "./sdk-stream-mixin";

jest.mock("@aws-sdk/fetch-http-handler");
jest.mock("@aws-sdk/util-base64-browser");
jest.mock("@aws-sdk/util-hex-encoding");
jest.mock("@aws-sdk/util-utf8-browser");

const mockStreamCollectorReturn = Uint8Array.from([117, 112, 113]);
(streamCollector as jest.Mock).mockReturnValue(mockStreamCollectorReturn);

describe(sdkStreamMixin.name, () => {
const expectAllTransformsToFail = async (sdkStream: SdkStreamMixin) => {
const transformMethods: Array<keyof SdkStreamMixin> = [
"transformToByteArray",
"transformToString",
"transformToWebStream",
];
for (const method of transformMethods) {
try {
await sdkStream[method]();
fail(new Error("expect subsequent tranform to fail"));
} catch (error) {
expect(error.message).toContain("The stream has already been transformed");
}
}
};

let originalReadableStreamCtr = global.ReadableStream;
const mockReadableStream = jest.fn();
class ReadableStream {
constructor() {
mockReadableStream();
}
}

let payloadStream: ReadableStream;

beforeAll(() => {
global.ReadableStream = ReadableStream as any;
});

beforeEach(() => {
originalReadableStreamCtr = global.ReadableStream;
jest.clearAllMocks();
payloadStream = new ReadableStream();
});

afterEach(() => {
global.ReadableStream = originalReadableStreamCtr;
});

it("should throw if input stream is not a Blob or Web Stream instance", () => {
const originalBlobCtr = global.Blob;
global.Blob = undefined;
global.ReadableStream = undefined;
try {
sdkStreamMixin({});
fail("expect unexpected stream to fail");
} catch (e) {
expect(e.message).toContain("nexpected stream implementation");
global.Blob = originalBlobCtr;
}
});

describe("transformToByteArray", () => {
it("should transform binary stream to byte array", async () => {
const sdkStream = sdkStreamMixin(payloadStream);
const byteArray = await sdkStream.transformToByteArray();
expect(streamCollector as jest.Mock).toBeCalledWith(payloadStream);
expect(byteArray).toEqual(mockStreamCollectorReturn);
});

it("should fail any subsequent tranform calls", async () => {
const sdkStream = sdkStreamMixin(payloadStream);
await sdkStream.transformToByteArray();
await expectAllTransformsToFail(sdkStream);
});
});

describe("transformToString", () => {
let originalTextDecoder = global.TextDecoder;
const mockDecode = jest.fn();
global.TextDecoder = jest.fn().mockImplementation(function () {
return { decode: mockDecode };
});

beforeEach(() => {
originalTextDecoder = global.TextDecoder;
jest.clearAllMocks();
});

afterEach(() => {
global.TextDecoder = originalTextDecoder;
});

it.each([
[undefined, toUtf8],
["utf8", toUtf8],
["utf-8", toUtf8],
["base64", toBase64],
["hex", toHex],
])("should transform to string with %s encoding", async (encoding, encodingFn) => {
const mockEncodedStringValue = `a string with ${encoding} encoding`;
(encodingFn as jest.Mock).mockReturnValueOnce(mockEncodedStringValue);
const sdkStream = sdkStreamMixin(payloadStream);
const str = await sdkStream.transformToString(encoding);
expect(streamCollector).toBeCalled();
expect(encodingFn).toBeCalledWith(mockStreamCollectorReturn);
expect(str).toEqual(mockEncodedStringValue);
});

it("should use TexDecoder to handle other encodings", async () => {
const utfLabel = "windows-1251";
mockDecode.mockReturnValue(`a string with ${utfLabel} encoding`);
const sdkStream = sdkStreamMixin(payloadStream);
const str = await sdkStream.transformToString(utfLabel);
expect(global.TextDecoder).toBeCalledWith(utfLabel);
expect(str).toEqual(`a string with ${utfLabel} encoding`);
});

it("should throw if TextDecoder is not available", async () => {
global.TextDecoder = null;
const utfLabel = "windows-1251";
const sdkStream = sdkStreamMixin(payloadStream);
try {
await sdkStream.transformToString(utfLabel);
fail("expect transformToString to throw when TextDecoder is not available");
} catch (error) {
expect(error.message).toContain("TextDecoder is not available");
}
});

it("should fail any subsequent tranform calls", async () => {
const sdkStream = sdkStreamMixin(payloadStream);
await sdkStream.transformToString();
await expectAllTransformsToFail(sdkStream);
});
});

describe("transformToWebStream with ReadableStream payload", () => {
it("should return the payload if it is Web Stream instance", () => {
const payloadStream = new ReadableStream();
const sdkStream = sdkStreamMixin(payloadStream as any);
const transformed = sdkStream.transformToWebStream();
expect(transformed).toBe(payloadStream);
});

it("should fail any subsequent tranform calls", async () => {
const payloadStream = new ReadableStream();
const sdkStream = sdkStreamMixin(payloadStream as any);
sdkStream.transformToWebStream();
await expectAllTransformsToFail(sdkStream);
});
});

describe("transformToWebStream with Blob payload", () => {
let originalBlobCtr = global.Blob;
const mockBlob = jest.fn();
const mockBlobStream = jest.fn();
class Blob {
constructor() {
mockBlob();
}

stream() {
return mockBlobStream();
}
}
global.Blob = Blob as any;

beforeEach(() => {
global.ReadableStream = undefined;
originalBlobCtr = global.Blob;
jest.clearAllMocks();
});

afterEach(() => {
global.Blob = originalBlobCtr;
});

it("should transform blob to web stream with Blob.stream()", () => {
mockBlobStream.mockReturnValue("transformed");
const payloadStream = new Blob();
const sdkStream = sdkStreamMixin(payloadStream as any);
const transformed = sdkStream.transformToWebStream();
expect(transformed).toBe("transformed");
expect(mockBlobStream).toBeCalled();
});

it("should fail if Blob.stream() is not available", async () => {
class Blob {
constructor() {
mockBlob();
}
}

global.Blob = Blob as any;
const payloadStream = new Blob();
const sdkStream = sdkStreamMixin(payloadStream as any);
try {
sdkStream.transformToWebStream();
fail("expect to fail");
} catch (e) {
expect(e.message).toContain("Please make sure the Blob.stream() is polyfilled");
}
});

it("should fail any subsequent tranform calls", async () => {
const payloadStream = new Blob();
const sdkStream = sdkStreamMixin(payloadStream as any);
sdkStream.transformToWebStream();
await expectAllTransformsToFail(sdkStream);
});
});
});
79 changes: 79 additions & 0 deletions packages/util-stream-browser/src/sdk-stream-mixin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { streamCollector } from "@aws-sdk/fetch-http-handler";
import { SdkStream, SdkStreamMixin } from "@aws-sdk/types";
import { toBase64 } from "@aws-sdk/util-base64-browser";
import { toHex } from "@aws-sdk/util-hex-encoding";
import { toUtf8 } from "@aws-sdk/util-utf8-browser";

const ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED = "The stream has already been transformed.";

/**
* The stream handling utility functions for browsers and React Native
*
* @internal
*/
export const sdkStreamMixin = (stream: unknown): SdkStream<ReadableStream | Blob> => {
if (!isBlobInstance(stream) && !isReadableStreamInstance(stream)) {
//@ts-ignore
const name = stream?.__proto__?.constructor?.name || stream;
throw new Error(`Unexpected stream implementation, expect Blob or ReadableStream, got ${name}`);
}

let transformed = false;
const transformToByteArray = async () => {
if (transformed) {
throw new Error(ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED);
}
transformed = true;
return await streamCollector(stream);
};

const blobToWebStream = (blob: Blob) => {
if (typeof blob.stream !== "function") {
throw new Error(
"Cannot transform payload Blob to web stream. Please make sure the Blob.stream() is polyfilled.\n" +
"If you are using React Native, this API is not yet supported, see: https://react-native.canny.io/feature-requests/p/fetch-streaming-body"
);
}
return blob.stream();
};

return Object.assign<ReadableStream | Blob, SdkStreamMixin>(stream, {
transformToByteArray: transformToByteArray,

transformToString: async (encoding?: string) => {
const buf = await transformToByteArray();
if (encoding === "base64") {
return toBase64(buf);
} else if (encoding === "hex") {
return toHex(buf);
} else if (encoding === undefined || encoding === "utf8" || encoding === "utf-8") {
// toUtf8() itself will use TextDecoder and fallback to pure JS implementation.
return toUtf8(buf);
} else if (typeof TextDecoder === "function") {
return new TextDecoder(encoding).decode(buf);
} else {
throw new Error("TextDecoder is not available, please make sure polyfill is provided.");
}
},

transformToWebStream: () => {
if (transformed) {
throw new Error(ERR_MSG_STREAM_HAS_BEEN_TRANSFORMED);
}
transformed = true;
if (isBlobInstance(stream)) {
// ReadableStream is undefined in React Native
return blobToWebStream(stream);
} else if (isReadableStreamInstance(stream)) {
return stream;
} else {
throw new Error(`Cannot transform payload to web stream, got ${stream}`);
}
},
});
};

const isBlobInstance = (stream: unknown): stream is Blob => typeof Blob === "function" && stream instanceof Blob;

const isReadableStreamInstance = (stream: unknown): stream is ReadableStream =>
typeof ReadableStream === "function" && stream instanceof ReadableStream;

0 comments on commit 2255877

Please sign in to comment.