Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/googleapis/gax-nodejs into …
Browse files Browse the repository at this point in the history
…prepare-reset-counter-to-0-PR-fix

# Conflicts:
#	gax/test/test-application/src/index.ts
  • Loading branch information
danieljbruce committed May 22, 2024
2 parents 646dc3d + 4b77bf3 commit 9a5d03b
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 17 deletions.
6 changes: 4 additions & 2 deletions gax/src/streamingCalls/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
*/
streamHandoffHelper(stream: CancellableStream, retry: RetryOptions): void {
let enteredError = false;
const eventsToForward = ['metadata', 'response', 'status', 'data'];
const eventsToForward = ['metadata', 'response', 'status'];

eventsToForward.forEach(event => {
stream.on(event, this.emit.bind(this, event));
Expand All @@ -282,8 +282,9 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
this.streamHandoffErrorHandler(stream, retry, error);
});

stream.on('data', () => {
stream.on('data', (data: ResponseType) => {
this.retries = 0;
this.emit('data', data);
});

stream.on('end', () => {
Expand Down Expand Up @@ -423,6 +424,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
this.destroy();
return; //end chunk
} else {
this.retries!++;
retryStream = this.retry(stream, retry);
this.stream = retryStream;
return retryStream;
Expand Down
73 changes: 72 additions & 1 deletion gax/test/test-application/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ async function testShowcase() {
await testCollect(grpcClientWithServerStreamingRetries);
await testChat(grpcClientWithServerStreamingRetries);
await testWait(grpcClientWithServerStreamingRetries);
await testShouldFailOnThirdError(
grpcSequenceClientWithServerStreamingRetries
);
}

function createStreamingSequenceRequestFactory(
Expand Down Expand Up @@ -659,7 +662,7 @@ async function testServerStreamingRetrieswithRetryRequestOptions(
const finalData: string[] = [];
const retryRequestOptions = {
objectMode: true,
retries: 1,
retries: 2,
maxRetryDelay: 70,
retryDelayMultiplier: 3,
totalTimeout: 650,
Expand Down Expand Up @@ -788,6 +791,74 @@ async function testResetRetriesToZero(client: SequenceServiceClient) {
});
}

// When maxRetries are set to 2 then on the third error from the server gax
// should throw an error that says the retry count has been exceeded.
async function testShouldFailOnThirdError(client: SequenceServiceClient) {
const backoffSettings = createBackoffSettings(
100,
1.2,
1000,
null,
1.5,
3000,
null
);
const allowedCodes = [4, 5, 6];
const retryOptions = new RetryOptions(allowedCodes, backoffSettings);
backoffSettings.maxRetries = 2;

const settings = {
retry: retryOptions,
};

client.initialize();

const request = createStreamingSequenceRequestFactory(
[
Status.DEADLINE_EXCEEDED, // Error code 4
Status.NOT_FOUND, // Error code 5
Status.ALREADY_EXISTS, // Error code 6
Status.OK,
],
[0.1, 0.1, 0.1, 0.1],
[0, 0, 0, 1],
'This is testing the brand new and shiny StreamingSequence server 3'
);
const response = await client.createStreamingSequence(request);
await new Promise<void>((resolve, reject) => {
const sequence = response[0];

const attemptRequest =
new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest();
attemptRequest.name = sequence.name!;

const attemptStream = client.attemptStreamingSequence(
attemptRequest,
settings
);
attemptStream.on('data', () => {
reject(new GoogleError('The stream should not receive any data'));
});
attemptStream.on('error', (error: GoogleError) => {
try {
assert.strictEqual(error.code, 4);
assert.strictEqual(
error.message,
'Exceeded maximum number of retries before any response was received'
);
resolve();
} catch (assertionError: unknown) {
reject(assertionError);
}
});
attemptStream.on('end', () => {
reject(
new GoogleError('The stream should not end before it receives an error')
);
});
});
}

// streaming call that retries twice with RetryRequestOpsions and resumes from where it left off
async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy(
client: SequenceServiceClient
Expand Down
28 changes: 14 additions & 14 deletions gax/test/unit/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,6 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
const s = new PassThrough({
objectMode: true,
});
s.push('hello');
setImmediate(() => {
s.emit('metadata');
});
Expand Down Expand Up @@ -1348,19 +1347,20 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en
);

call.on('error', err => {
assert(err instanceof GoogleError);
if (err.code !== 14) {
// ignore the error we are expecting
assert.strictEqual(err.code, 4);
// even though max retries is 2
// the retry function will always be called maxRetries+1
// the final call is where the failure happens
assert.strictEqual(retrySpy.callCount, 3);
assert.strictEqual(
err.message,
'Exceeded maximum number of retries before any response was received'
);
done();
try {
assert(err instanceof GoogleError);
if (err.code !== 14) {
// ignore the error we are expecting
assert.strictEqual(err.code, 4);
assert.strictEqual(retrySpy.callCount, 2);
assert.strictEqual(
err.message,
'Exceeded maximum number of retries before any response was received'
);
done();
}
} catch (error: unknown) {
done(error);
}
});
});
Expand Down

0 comments on commit 9a5d03b

Please sign in to comment.