Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: disable retry-request, add exponential backoff in mutateRows and readRows #1060

Merged
merged 18 commits into from
Mar 31, 2022
Merged
80 changes: 0 additions & 80 deletions src/decorateStatus.ts

This file was deleted.

2 changes: 0 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import {
CreateInstanceResponse,
IInstance,
} from './instance';
import {shouldRetryRequest} from './decorateStatus';
import {google} from '../protos/protos';
import {ServiceError} from 'google-gax';
import * as v2 from './v2';
Expand Down Expand Up @@ -842,7 +841,6 @@ export class Bigtable {
currentRetryAttempt: 0,
noResponseRetries: 0,
objectMode: true,
shouldRetryFn: shouldRetryRequest,
},
config.retryOpts
);
Expand Down
121 changes: 93 additions & 28 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import {ServiceError} from 'google-gax';
import {decorateStatus} from './decorateStatus';
import {BackoffSettings} from 'google-gax/build/src/gax';
import {PassThrough, Transform} from 'stream';

// eslint-disable-next-line @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -46,9 +46,16 @@ import {Duplex} from 'stream';
// gRPC status codes used by the client-side retry logic in this file.
// See protos/google/rpc/code.proto
// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
// Checked against the error code of a failed row stream before a retry
// is scheduled.
const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]);
// (4=DEADLINE_EXCEEDED, 14=UNAVAILABLE)
// Same as RETRYABLE_STATUS_CODES but without 10=ABORTED. Checked against
// both the RPC-level error and each per-entry status of a mutate-rows
// response when deciding whether the mutation may be sent again.
const IDEMPOTENT_RETRYABLE_STATUS_CODES = new Set([4, 14]);
// (1=CANCELLED)
const IGNORED_STATUS_CODES = new Set([1]);

// Fallback exponential-backoff parameters, used when the caller does not
// supply `gaxOptions.retry.backoffSettings`. All delays are in milliseconds.
const DEFAULT_BACKOFF_SETTINGS: BackoffSettings = {
  // Base delay that is scaled by the multiplier on each retry.
  initialRetryDelayMillis: 10,
  // The delay doubles with every additional consecutive error.
  retryDelayMultiplier: 2,
  // Upper bound on any single retry delay (60 seconds).
  maxRetryDelayMillis: 60000,
};

/**
* @typedef {object} Policy
* @property {number} [version] Specifies the format of the policy.
Expand Down Expand Up @@ -735,7 +742,8 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
const rowsLimit = options.limit || 0;
const hasLimit = rowsLimit !== 0;
let rowsRead = 0;
let numRequestsMade = 0;
let numConsecutiveErrors = 0;
let retryTimer: NodeJS.Timeout | null;

rowKeys = options.keys || [];

Expand Down Expand Up @@ -788,13 +796,20 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
if (activeRequestStream) {
activeRequestStream.abort();
}
if (retryTimer) {
clearTimeout(retryTimer);
}
return end();
};

let chunkTransformer: ChunkTransformer;
let rowStream: Duplex;

const makeNewRequest = () => {
// Avoid cancelling an expired timer if user
// cancelled the stream in the middle of a retry
retryTimer = null;

const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
chunkTransformer = new ChunkTransformer({decode: options.decode} as any);
Expand All @@ -805,7 +820,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
} as google.bigtable.v2.IReadRowsRequest;

const retryOpts = {
currentRetryAttempt: numRequestsMade,
currentRetryAttempt: numConsecutiveErrors,
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {
return false;
},
};

if (lastRowKey) {
Expand Down Expand Up @@ -915,7 +936,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
) {
return next();
}
numRequestsMade = 0;
rowsRead++;
const row = this.row(rowData.key);
row.data = rowData.data;
Expand All @@ -936,20 +956,32 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
userStream.end();
return;
}
numConsecutiveErrors++;
if (
numRequestsMade <= maxRetries &&
numConsecutiveErrors <= maxRetries &&
RETRYABLE_STATUS_CODES.has(error.code)
) {
makeNewRequest();
const backOffSettings =
options.gaxOptions?.retry?.backoffSettings ||
DEFAULT_BACKOFF_SETTINGS;
const nextRetryDelay = getNextDelay(
numConsecutiveErrors,
backOffSettings
);
retryTimer = setTimeout(makeNewRequest, nextRetryDelay);
} else {
userStream.emit('error', error);
}
})
.on('data', _ => {
// Reset error count after a successful read so the backoff
// time won't keep increasing when a stream had multiple errors
numConsecutiveErrors = 0;
mutianf marked this conversation as resolved.
Show resolved Hide resolved
})
.on('end', () => {
activeRequestStream = null;
});
rowStream.pipe(userStream);
numRequestsMade++;
};

makeNewRequest();
Expand Down Expand Up @@ -1504,23 +1536,43 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
);
const mutationErrorsByEntryIndex = new Map();

const onBatchResponse = (
err: ServiceError | PartialFailureError | null
) => {
// TODO: enable retries when the entire RPC fails
if (err) {
// The error happened before a request was even made, don't retry.
// Decides whether another batch request should be scheduled.
// `err` is the RPC-level error, or null when the RPC itself succeeded.
const isRetryable = (err: ServiceError | null) => {
  // Nothing left to send: every entry has either succeeded or failed
  // with a non-retryable status.
  const nothingPending = pendingEntryIndices.size === 0;
  if (nothingPending) {
    return false;
  }

  // The initial request plus `maxRetries` retries have all been used.
  const attemptsExhausted = numRequestsMade >= maxRetries + 1;
  if (attemptsExhausted) {
    return false;
  }

  // A successful RPC that still leaves entries pending means some entries
  // came back with a retryable status, so the batch is sent again.
  if (!err) {
    return true;
  }

  // Otherwise retry only when the RPC failed with an idempotent-safe code.
  return IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code);
};

const onBatchResponse = (err: ServiceError | null) => {
// Return if the error happened before a request was made
if (numRequestsMade === 0) {
callback(err);
return;
}
if (pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries) {
makeNextBatchRequest();

if (isRetryable(err)) {
const backOffSettings =
options.gaxOptions?.retry?.backoffSettings ||
DEFAULT_BACKOFF_SETTINGS;
const nextDelay = getNextDelay(numRequestsMade, backOffSettings);
setTimeout(makeNextBatchRequest, nextDelay);
return;
}

// If there are no more pending mutations, set the error
// to null
if (pendingEntryIndices.size === 0) {
err = null;
}

if (mutationErrorsByEntryIndex.size !== 0) {
const mutationErrors = Array.from(mutationErrorsByEntryIndex.values());
err = new PartialFailureError(mutationErrors);
callback(new PartialFailureError(mutationErrors, err));
return;
}

callback(err);
mutianf marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -1541,6 +1593,12 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);

const retryOpts = {
currentRetryAttempt: numRequestsMade,
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {
return false;
},
};

this.bigtable
Expand All @@ -1552,13 +1610,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
retryOpts,
})
.on('error', (err: ServiceError) => {
// TODO: this check doesn't actually do anything, onBatchResponse
// currently doesn't retry RPC errors, only entry failures
if (numRequestsMade === 0) {
callback(err); // Likely a "projectId not detected" error.
return;
}

onBatchResponse(err);
})
.on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => {
Expand All @@ -1572,13 +1623,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
mutationErrorsByEntryIndex.delete(originalEntriesIndex);
return;
}
if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) {
if (!IDEMPOTENT_RETRYABLE_STATUS_CODES.has(entry.status!.code!)) {
pendingEntryIndices.delete(originalEntriesIndex);
}
const status = decorateStatus(entry.status);
const errorDetails = entry.status;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(status as any).entry = originalEntry;
mutationErrorsByEntryIndex.set(originalEntriesIndex, status);
(errorDetails as any).entry = originalEntry;
mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails);
});
})
.on('end', onBatchResponse);
Expand Down Expand Up @@ -1997,14 +2048,25 @@ promisifyAll(Table, {
exclude: ['family', 'row'],
});

/**
 * Computes how long to wait, in milliseconds, before the next retry.
 *
 * The delay grows exponentially with the number of consecutive errors:
 * `initialRetryDelayMillis * retryDelayMultiplier ^ numConsecutiveErrors`,
 * plus a random whole-millisecond jitter in the range [0, 100). The result
 * is capped at `maxRetryDelayMillis`.
 *
 * @param numConsecutiveErrors Exponent applied to the delay multiplier.
 * @param config Backoff parameters (initial delay, multiplier, max delay).
 * @returns The delay in milliseconds, never above `maxRetryDelayMillis`.
 */
function getNextDelay(numConsecutiveErrors: number, config: BackoffSettings) {
  // Random jitter so that retries do not all use exactly the same delay.
  const jitterMillis = Math.floor(Math.random() * 100);

  const {initialRetryDelayMillis, retryDelayMultiplier, maxRetryDelayMillis} =
    config;
  const growthFactor = retryDelayMultiplier ** numConsecutiveErrors;
  const uncappedDelayMillis =
    initialRetryDelayMillis * growthFactor + jitterMillis;

  return Math.min(uncappedDelayMillis, maxRetryDelayMillis);
}

/**
 * Shape of a single per-entry failure collected into
 * `PartialFailureError.errors`. Both fields are optional.
 */
export interface GoogleInnerError {
  reason?: string;
  message?: string;
}

export class PartialFailureError extends Error {
errors?: GoogleInnerError[];
constructor(errors: GoogleInnerError[]) {
constructor(errors: GoogleInnerError[], rpcError?: ServiceError | null) {
super();
this.errors = errors;
this.name = 'PartialFailureError';
Expand All @@ -2017,5 +2079,8 @@ export class PartialFailureError extends Error {
messages.push('\n');
}
this.message = messages.join('\n');
if (rpcError) {
this.message += 'Request failed with: ' + rpcError.message;
}
}
}
2 changes: 1 addition & 1 deletion system-test/data/mutate-rows-retry-test.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
],
"responses": [
{ "code": 200, "entry_codes": [ 4, 4, 4, 4, 4, 1 ] },
{ "code": 200, "entry_codes": [ 10, 14, 10, 14, 0 ] },
{ "code": 200, "entry_codes": [ 4, 14, 14, 14, 0 ] },
{ "code": 200, "entry_codes": [ 1, 4, 4, 0 ] },
{ "code": 200, "entry_codes": [ 0, 4 ] },
{ "code": 200, "entry_codes": [ 4 ] },
Expand Down
4 changes: 2 additions & 2 deletions system-test/data/read-rows-retry-test.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

{
"name": "resets the retry counter after a successful read",
"max_retries": 3,
"max_retries": 4,
"request_options": [
{ "rowKeys": [],
"rowRanges": [{}]
Expand Down Expand Up @@ -211,7 +211,7 @@

{
"name": "does the previous 5 things in one giant test case",
"max_retries": 3,
"max_retries": 4,
"createReadStream_options": {
"limit": 10,
"ranges": [{
Expand Down
Loading