Skip to content

Commit

Permalink
fix: reduce duplicate code (#1612)
Browse files Browse the repository at this point in the history
* fix: reduce duplicate code

* fix: removed unused dependency

* fix: npm run fix

* apply Dan's suggestion

* add error handling back in

* lint
  • Loading branch information
leahecole committed Jun 6, 2024
1 parent d3e3bf9 commit c540ef6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 58 deletions.
66 changes: 18 additions & 48 deletions gax/src/streamingCalls/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
*/
streamHandoffHelper(stream: CancellableStream, retry: RetryOptions): void {
let enteredError = false;
const eventsToForward = ['metadata', 'response', 'status'];

eventsToForward.forEach(event => {
stream.on(event, this.emit.bind(this, event));
});
this.eventForwardHelper(stream);

stream.on('error', error => {
enteredError = true;
Expand All @@ -296,20 +292,14 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
});
}

/**
* Forward events from an API request stream to the user's stream.
* @param {Stream} stream - The API request stream.
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
*/

forwardEvents(stream: Stream) {
eventForwardHelper(stream: Stream) {
const eventsToForward = ['metadata', 'response', 'status'];
eventsToForward.forEach(event => {
stream.on(event, this.emit.bind(this, event));
});
}

statusMetadataHelper(stream: Stream) {
// gRPC is guaranteed emit the 'status' event but not 'metadata', and 'status' is the last event to emit.
// Emit the 'response' event if stream has no 'metadata' event.
// This avoids the stream swallowing the other events, such as 'end'.
Expand Down Expand Up @@ -340,6 +330,18 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
});
this._responseHasSent = true;
});
}
/**
* Forward events from an API request stream to the user's stream.
* @param {Stream} stream - The API request stream.
* @param {RetryOptions} retry - Configures the exceptions upon which the
* function should retry, and the parameters to the exponential backoff retry
* algorithm.
*/
forwardEvents(stream: Stream) {
this.eventForwardHelper(stream);
this.statusMetadataHelper(stream);

stream.on('error', error => {
GoogleError.parseGRPCStatusDetails(error);
});
Expand Down Expand Up @@ -368,40 +370,8 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
retry: RetryOptions
): CancellableStream | undefined {
let retryStream = this.stream;
const eventsToForward = ['metadata', 'response', 'status'];
eventsToForward.forEach(event => {
stream.on(event, this.emit.bind(this, event));
});
// gRPC is guaranteed emit the 'status' event but not 'metadata', and 'status' is the last event to emit.
// Emit the 'response' event if stream has no 'metadata' event.
// This avoids the stream swallowing the other events, such as 'end'.
stream.on('status', () => {
if (!this._responseHasSent) {
stream.emit('response', {
code: 200,
details: '',
message: 'OK',
});
}
});

// We also want to supply the status data as 'response' event to support
// the behavior of google-cloud-node expects.
// see:
// https://github.com/GoogleCloudPlatform/google-cloud-node/pull/1775#issuecomment-259141029
// https://github.com/GoogleCloudPlatform/google-cloud-node/blob/116436fa789d8b0f7fc5100b19b424e3ec63e6bf/packages/common/src/grpc-service.js#L355
stream.on('metadata', metadata => {
// Create a response object with succeeds.
// TODO: unify this logic with the decoration of gRPC response when it's
// added. see: https://github.com/googleapis/gax-nodejs/issues/65
stream.emit('response', {
code: 200,
details: '',
message: 'OK',
metadata,
});
this._responseHasSent = true;
});
this.eventForwardHelper(stream);
this.statusMetadataHelper(stream);

stream.on('error', error => {
const timeout = retry.backoffSettings.totalTimeoutMillis;
Expand Down
10 changes: 2 additions & 8 deletions gax/src/streamingRetryRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const requestOps = null;
const objectMode = true; // we don't support objectMode being false

interface streamingRetryRequestOptions {
request?: Function;
request: Function;
maxRetries?: number;
}
/**
Expand All @@ -40,14 +40,8 @@ interface streamingRetryRequestOptions {
*/
export function streamingRetryRequest(opts: streamingRetryRequestOptions) {
opts = Object.assign({}, DEFAULTS, opts);

if (opts.request === undefined) {
try {
// eslint-disable-next-line node/no-unpublished-require
opts.request = require('request');
} catch (e) {
throw new Error('A request library must be provided to retry-request.');
}
throw new Error('A request function must be provided');
}

let numNoResponseAttempts = 0;
Expand Down
4 changes: 2 additions & 2 deletions gax/test/unit/streamingRetryRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ describe('retry-request', () => {
});
assert.strictEqual(retryStream._readableState.objectMode, true);
});

it('throws request error', done => {
try {
const opts = {};
//@ts-expect-error
streamingRetryRequest(opts);
} catch (err) {
assert(err instanceof Error);
assert.match(err.message, /A request library must be provided/);
assert.match(err.message, /A request function must be provided/);
done();
}
});
Expand Down

0 comments on commit c540ef6

Please sign in to comment.