diff --git a/gax/src/streamingCalls/streaming.ts b/gax/src/streamingCalls/streaming.ts index 414e770b2..3b777d9e9 100644 --- a/gax/src/streamingCalls/streaming.ts +++ b/gax/src/streamingCalls/streaming.ts @@ -419,6 +419,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult { this.destroy(); return; //end chunk } else { + this.retries!++; retryStream = this.retry(stream, retry); this.stream = retryStream; return retryStream; diff --git a/gax/test/test-application/src/index.ts b/gax/test/test-application/src/index.ts index f7328414a..8df241036 100644 --- a/gax/test/test-application/src/index.ts +++ b/gax/test/test-application/src/index.ts @@ -150,6 +150,9 @@ async function testShowcase() { await testCollect(grpcClientWithServerStreamingRetries); await testChat(grpcClientWithServerStreamingRetries); await testWait(grpcClientWithServerStreamingRetries); + await testShouldFailOnThirdError( + grpcSequenceClientWithServerStreamingRetries + ); } function createStreamingSequenceRequestFactory( @@ -657,7 +660,7 @@ async function testServerStreamingRetrieswithRetryRequestOptions( const finalData: string[] = []; const retryRequestOptions = { objectMode: true, - retries: 1, + retries: 2, maxRetryDelay: 70, retryDelayMultiplier: 3, totalTimeout: 650, @@ -711,6 +714,74 @@ async function testServerStreamingRetrieswithRetryRequestOptions( }); } +// When maxRetries are set to 2 then on the third error from the server gax +// should throw an error that says the retry count has been exceeded. +async function testShouldFailOnThirdError(client: SequenceServiceClient) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + null + ); + const allowedCodes = [4, 5, 6]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); + backoffSettings.maxRetries = 2; + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + + const request = createStreamingSequenceRequestFactory( + [ + Status.DEADLINE_EXCEEDED, // Error code 4 + Status.NOT_FOUND, // Error code 5 + Status.ALREADY_EXISTS, // Error code 6 + Status.OK, + ], + [0.1, 0.1, 0.1, 0.1], + [0, 0, 0, 1], + 'This is testing the brand new and shiny StreamingSequence server 3' + ); + const response = await client.createStreamingSequence(request); + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', () => { + reject(new GoogleError('The stream should not receive any data')); + }); + attemptStream.on('error', (error: GoogleError) => { + try { + assert.strictEqual(error.code, 4); + assert.strictEqual( + error.message, + 'Exceeded maximum number of retries before any response was received' + ); + resolve(); + } catch (assertionError: unknown) { + reject(assertionError); + } + }); + attemptStream.on('end', () => { + reject( + new GoogleError('The stream should not end before it receives an error') + ); + }); + }); +} + // streaming call that retries twice with RetryRequestOpsions and resumes from where it left off async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy( client: SequenceServiceClient diff --git a/gax/test/unit/streaming.ts b/gax/test/unit/streaming.ts index e3807911e..6bc4f2f82 100644 --- a/gax/test/unit/streaming.ts +++ b/gax/test/unit/streaming.ts @@ -1348,19 +1348,20 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en ); call.on('error', err => { - assert(err instanceof GoogleError); - if (err.code !== 14) { - // ignore the error we are expecting - assert.strictEqual(err.code, 4); - // even though max retries is 2 - // the retry function will always be called maxRetries+1 - // the final call is where the failure happens - assert.strictEqual(retrySpy.callCount, 3); - assert.strictEqual( - err.message, - 'Exceeded maximum number of retries before any response was received' - ); - done(); + try { + assert(err instanceof GoogleError); + if (err.code !== 14) { + // ignore the error we are expecting + assert.strictEqual(err.code, 4); + assert.strictEqual(retrySpy.callCount, 2); + assert.strictEqual( + err.message, + 'Exceeded maximum number of retries before any response was received' + ); + done(); + } + } catch (error: unknown) { + done(error); } }); });