Skip to content

Commit

Permalink
fix(eventstream-handler-node): start streaming without waiting for re…
Browse files Browse the repository at this point in the history
…sponse (#6311)

* fix(eventstream-handler-node): start streaming without waiting for response

* test(middleware-eventstream): use dummy asynciterator in integ tests
  • Loading branch information
kuhe committed Jul 25, 2024
1 parent ff30a3e commit 0072f42
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ jest.mock("./EventSigningStream");
jest.mock("@smithy/eventstream-codec");

describe(EventStreamPayloadHandler.name, () => {
const collectData = (stream: Readable) => {
const chunks: any = [];
return new Promise((resolve, reject) => {
stream.on("data", (chunk) => chunks.push(chunk));
stream.on("error", reject);
stream.on("end", () => resolve(Buffer.concat(chunks).toString("utf8")));
});
};

const mockMessageSigner: MessageSigner = {
sign: jest.fn(),
signMessage: jest.fn(),
Expand Down Expand Up @@ -49,7 +58,7 @@ describe(EventStreamPayloadHandler.name, () => {
utf8Decoder: mockUtf8Decoder,
utf8Encoder: mockUtf8encoder,
});
const mockRequest = { body: new Readable() } as HttpRequest;
const mockRequest = { body: new PassThrough() } as HttpRequest;

try {
await handler.handle(mockNextHandler, {
Expand Down Expand Up @@ -126,6 +135,42 @@ describe(EventStreamPayloadHandler.name, () => {
});
});

it("should start piping regardless of whether the downstream resolves", async () => {
const authorization =
"AWS4-HMAC-SHA256 Credential=AKID/20200510/us-west-2/foo/aws4_request, SignedHeaders=host, Signature=1234567890";
const originalPayload = new PassThrough();
const mockRequest = {
body: originalPayload,
headers: { authorization },
} as any;
const handler = new EventStreamPayloadHandler({
messageSigner: () => Promise.resolve(mockMessageSigner),
utf8Decoder: mockUtf8Decoder,
utf8Encoder: mockUtf8encoder,
});

(mockNextHandler as any).mockImplementationOnce(async (args: FinalizeHandlerArguments<any>) => {
const handledRequest = args.request as HttpRequest;

originalPayload.end("Some Data");
const collected = await collectData(handledRequest.body);

// this means the stream is flowing without this downstream middleware
// having resolved yet.
expect(collected).toEqual("Some Data");

return Promise.resolve({ output: { handledRequest } });
});

const {
output: { handledRequest },
} = await handler.handle(mockNextHandler, {
request: mockRequest,
input: {},
});
expect(handledRequest.body).not.toBe(originalPayload);
});

it("should start piping to request payload through event signer if downstream middleware returns", async () => {
const authorization =
"AWS4-HMAC-SHA256 Credential=AKID/20200510/us-west-2/foo/aws4_request, SignedHeaders=host, Signature=1234567890";
Expand Down Expand Up @@ -155,14 +200,6 @@ describe(EventStreamPayloadHandler.name, () => {
expect(handledRequest.body).not.toBe(originalPayload);
// Expect the data from the output payload from eventstream payload handler the same as from the
// stream supplied to the handler.
const collectData = (stream: Readable) => {
const chunks: any = [];
return new Promise((resolve, reject) => {
stream.on("data", (chunk) => chunks.push(chunk));
stream.on("error", reject);
stream.on("end", () => resolve(Buffer.concat(chunks).toString("utf8")));
});
};
originalPayload.end("Some Data");
const collected = await collectData(handledRequest.body);
expect(collected).toEqual("Some Data");
Expand Down
25 changes: 12 additions & 13 deletions packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,9 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
objectMode: true,
});

let result: FinalizeHandlerOutput<any>;
try {
result = await next(args);
} catch (e) {
// Close the payload stream otherwise the retry would hang
// because of the previous connection.
request.body.end();
throw e;
}

// If response is successful, start piping the payload stream
const match = (request.headers["authorization"] || "").match(/Signature=([\w]+)$/);
const match = request.headers?.authorization?.match(/Signature=([\w]+)$/);
// Sign the eventstream based on the signature from initial request.
const priorSignature = (match || [])[1] || (query && (query["X-Amz-Signature"] as string)) || "";
const priorSignature = match?.[1] ?? (query?.["X-Amz-Signature"] as string) ?? "";
const signingStream = new EventSigningStream({
priorSignature,
eventStreamCodec: this.eventStreamCodec,
Expand All @@ -91,6 +80,16 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
}
});

let result: FinalizeHandlerOutput<any>;
try {
result = await next(args);
} catch (e) {
// Close the payload stream otherwise the retry would hang
// because of the previous connection.
request.body.end();
throw e;
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,7 @@ describe("middleware-eventstream", () => {
botAliasId: "undefined",
localeId: "undefined",
sessionId: "undefined",
requestEventStream: {
[Symbol.asyncIterator]() {
return {
next() {
return this as any;
},
};
},
},
requestEventStream: (async function* () {})(),
});

expect.assertions(2);
Expand All @@ -61,15 +53,7 @@ describe("middleware-eventstream", () => {
VideoWidth: "undefined",
VideoHeight: "undefined",
ChallengeVersions: "undefined",
LivenessRequestStream: {
[Symbol.asyncIterator]() {
return {
next() {
return this as any;
},
};
},
},
LivenessRequestStream: (async function* () {})(),
});

expect.assertions(2);
Expand All @@ -91,15 +75,7 @@ describe("middleware-eventstream", () => {
await client.startStreamTranscription({
MediaSampleRateHertz: 144,
MediaEncoding: "ogg-opus",
AudioStream: {
[Symbol.asyncIterator]() {
return {
next() {
return this as any;
},
};
},
},
AudioStream: (async function* () {})(),
});

expect.assertions(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,7 @@ describe("middleware-websocket", () => {
VideoWidth: "1024",
VideoHeight: "1024",
ChallengeVersions: "a,b,c",
LivenessRequestStream: {
[Symbol.asyncIterator]() {
return this as any;
},
},
LivenessRequestStream: (async function* () {})(),
});
});
});
Expand Down

0 comments on commit 0072f42

Please sign in to comment.