Skip to content

Commit

Permalink
fix: retry ABORTED for non-transactional commits (#1111)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jun 10, 2020
1 parent 9a24cc0 commit f175236
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 26 deletions.
37 changes: 28 additions & 9 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import {CallOptions, GoogleError} from 'google-gax';
import {CallOptions, RetryOptions, Status} from 'google-gax';
import {Duplex, PassThrough} from 'stream';
import * as through2 from 'through2';
import {URL} from 'url';
Expand Down Expand Up @@ -53,7 +53,13 @@ import {
Settings,
UnaryMethod,
} from './types';
import {Deferred, isPermanentRpcError, requestTag, wrapError} from './util';
import {
Deferred,
getRetryParams,
isPermanentRpcError,
requestTag,
wrapError,
} from './util';
import {
validateBoolean,
validateFunction,
Expand Down Expand Up @@ -1127,15 +1133,25 @@ export class Firestore {
* Returns GAX call options that set the cloud resource header.
* @private
*/
private createCallOptions(): CallOptions {
return {
private createCallOptions(
methodName: string,
retryCodes?: number[]
): CallOptions {
const callOptions: CallOptions = {
otherArgs: {
headers: {
[CLOUD_RESOURCE_HEADER]: this.formattedName,
...this._settings.customHeaders,
},
},
};

if (retryCodes) {
const retryParams = getRetryParams(methodName);
callOptions.retry = new RetryOptions(retryCodes, retryParams);
}

return callOptions;
}

/**
Expand All @@ -1158,7 +1174,7 @@ export class Firestore {
* and GAX options.
* @param requestTag A unique client-assigned identifier for this request.
* @param func Method returning a Promise than can be retried.
* @returns - A Promise with the function's result if successful within
* @returns A Promise with the function's result if successful within
* `attemptsRemaining`. Otherwise, returns the last rejected Promise.
*/
private async _retry<T>(
Expand Down Expand Up @@ -1188,7 +1204,7 @@ export class Firestore {
} catch (err) {
lastError = err;

if (isPermanentRpcError(err, methodName, serviceConfig)) {
if (isPermanentRpcError(err, methodName)) {
break;
}
}
Expand Down Expand Up @@ -1321,14 +1337,17 @@ export class Firestore {
* and GAX options.
* @param request The Protobuf request to send.
* @param requestTag A unique client-assigned identifier for this request.
* @param retryCodes If provided, a custom list of retry codes. If not
* provided, retry is based on the behavior as defined in the ServiceConfig.
* @returns A Promise with the request result.
*/
request<Req, Resp>(
methodName: FirestoreUnaryMethod,
request: Req,
requestTag: string
requestTag: string,
retryCodes?: number[]
): Promise<Resp> {
const callOptions = this.createCallOptions();
const callOptions = this.createCallOptions(methodName, retryCodes);

return this._clientPool.run(requestTag, async gapicClient => {
try {
Expand Down Expand Up @@ -1371,7 +1390,7 @@ export class Firestore {
request: {},
requestTag: string
): Promise<Duplex> {
const callOptions = this.createCallOptions();
const callOptions = this.createCallOptions(methodName);

const bidirectional = methodName === 'listen';

Expand Down
43 changes: 35 additions & 8 deletions dev/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,26 @@
*/

import {randomBytes} from 'crypto';
import {GoogleError, ServiceConfig, Status} from 'google-gax';
import {
CallSettings,
ClientConfig,
constructSettings,
createDefaultBackoffSettings,
GoogleError,
Status,
} from 'google-gax';
import {BackoffSettings} from 'google-gax/build/src/gax';
import * as gapicConfig from './v1/firestore_client_config.json';

import {DocumentData} from './types';

const serviceConfig = constructSettings(
'google.firestore.v1.Firestore',
gapicConfig as ClientConfig,
{},
Status
) as {[k: string]: CallSettings};

/**
* A Promise implementation that supports deferred resolution.
* @private
Expand Down Expand Up @@ -132,21 +148,32 @@ export function isFunction(value: unknown): boolean {
*/
export function isPermanentRpcError(
err: GoogleError,
methodName: string,
config: ServiceConfig
methodName: string
): boolean {
if (err.code !== undefined) {
const serviceConfigName = methodName[0].toUpperCase() + methodName.slice(1);
const retryCodeNames = config.methods[serviceConfigName]!.retry_codes_name!;
const retryCodes = config.retry_codes![retryCodeNames].map(
errorName => Status[errorName as keyof typeof Status]
);
const retryCodes = getRetryCodes(methodName);
return retryCodes.indexOf(err.code) === -1;
} else {
return false;
}
}

/**
* Returns the list of retryable error codes specified in the service
* configuration.
*/
export function getRetryCodes(methodName: string): number[] {
return serviceConfig[methodName]?.retry?.retryCodes ?? [];
}

/** Returns the backoff setting from the service configuration. */
export function getRetryParams(methodName: string): BackoffSettings {
return (
serviceConfig[methodName]?.retry?.backoffSettings ??
createDefaultBackoffSettings()
);
}

/**
* Wraps the provided error in a new error that includes the provided stack.
*
Expand Down
24 changes: 18 additions & 6 deletions dev/src/write-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
* limitations under the License.
*/

import * as assert from 'assert';
import {describe, it} from 'mocha';

import {google} from '../protos/firestore_v1_proto_api';
import {
DocumentMask,
Expand All @@ -37,7 +34,13 @@ import {
UpdateMap,
} from './types';
import {DocumentData} from './types';
import {isObject, isPlainObject, requestTag, wrapError} from './util';
import {
getRetryCodes,
isObject,
isPlainObject,
requestTag,
wrapError,
} from './util';
import {
customObjectMessage,
invalidArgumentMessage,
Expand Down Expand Up @@ -565,10 +568,12 @@ export class WriteBatch {
request.writes!.push(req.write);
}

const retryCodes = [Status.ABORTED, ...getRetryCodes('commit')];

const response = await this._firestore.request<
api.IBatchWriteRequest,
api.BatchWriteResponse
>('batchWrite', request, tag);
>('batchWrite', request, tag, retryCodes);

return (response.writeResults || []).map((result, i) => {
const status = response.status[i];
Expand Down Expand Up @@ -637,13 +642,20 @@ export class WriteBatch {
request.writes!.length
);

let retryCodes: number[] | undefined;

if (explicitTransaction) {
request.transaction = explicitTransaction;
} else {
// Commits outside of transaction should also be retried when they fail
// with status code ABORTED.
retryCodes = [Status.ABORTED, ...getRetryCodes('commit')];
}

const response = await this._firestore.request<
api.ICommitRequest,
api.CommitResponse
>('commit', request, tag);
>('commit', request, tag, retryCodes);

return (response.writeResults || []).map(
writeResult =>
Expand Down
4 changes: 3 additions & 1 deletion dev/test/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ describe('BulkWriter', () => {
enforceSingleConcurrentRequest = false
): Promise<BulkWriter> {
const overrides: ApiOverride = {
batchWrite: async request => {
batchWrite: async (request, options) => {
expect(options!.retry!.retryCodes).contains(Status.ABORTED);

expect(request).to.deep.eq({
database: `projects/${PROJECT_ID}/databases/(default)`,
writes: mock[requestCounter].request.writes,
Expand Down
6 changes: 5 additions & 1 deletion dev/test/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,11 @@ function runTransaction<T>(
return response(request.response as api.IBeginTransactionResponse);
}
},
commit: actual => {
commit: (actual, options) => {
// Ensure that we do not specify custom retry behavior for transactional
// commits.
expect(options!.retry).to.be.undefined;

const request = expectedRequests.shift()!;
expect(request.type).to.equal('commit');
expect(actual).to.deep.eq(request.request);
Expand Down
4 changes: 3 additions & 1 deletion dev/test/write-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,9 @@ describe('batch support', () => {

beforeEach(() => {
const overrides: ApiOverride = {
commit: request => {
commit: (request, options) => {
expect(options!.retry!.retryCodes).contains(Status.ABORTED);

expect(request).to.deep.eq({
database: `projects/${PROJECT_ID}/databases/(default)`,
writes: [
Expand Down

0 comments on commit f175236

Please sign in to comment.