Skip to content

Commit

Permalink
feat: retry BatchGetDocuments RPCs that fail with errors (#1544)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jun 25, 2021
1 parent 7473343 commit b39dd3c
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 187 deletions.
182 changes: 182 additions & 0 deletions dev/src/document-reader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*!
* Copyright 2021 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {DocumentSnapshot, DocumentSnapshotBuilder} from './document';
import {DocumentReference} from './reference';
import {FieldPath} from './path';
import {isPermanentRpcError} from './util';
import {google} from '../protos/firestore_v1_proto_api';
import {logger} from './logger';
import {Firestore} from './index';
import {DocumentData} from '@google-cloud/firestore';
import api = google.firestore.v1;

/**
* A wrapper around BatchGetDocumentsRequest that retries request upon stream
* failure and returns ordered results.
*
* @private
*/
export class DocumentReader<T> {
/** An optional field mask to apply to this read. */
fieldMask?: FieldPath[];
/** An optional transaction ID to use for this read. */
transactionId?: Uint8Array;

private outstandingDocuments = new Set<string>();
private retrievedDocuments = new Map<string, DocumentSnapshot>();

/**
* Creates a new DocumentReader that fetches the provided documents (via
* `get()`).
*
* @param firestore The Firestore instance to use.
* @param allDocuments The documents to get.
*/
constructor(
private firestore: Firestore,
private allDocuments: Array<DocumentReference<T>>
) {
for (const docRef of this.allDocuments) {
this.outstandingDocuments.add(docRef.formattedName);
}
}

/**
* Invokes the BatchGetDocuments RPC and returns the results.
*
* @param requestTag A unique client-assigned identifier for this request.
*/
async get(requestTag: string): Promise<Array<DocumentSnapshot<T>>> {
await this.fetchDocuments(requestTag);

// BatchGetDocuments doesn't preserve document order. We use the request
// order to sort the resulting documents.
const orderedDocuments: Array<DocumentSnapshot<T>> = [];

for (const docRef of this.allDocuments) {
const document = this.retrievedDocuments.get(docRef.formattedName);
if (document !== undefined) {
// Recreate the DocumentSnapshot with the DocumentReference
// containing the original converter.
const finalDoc = new DocumentSnapshotBuilder(
docRef as DocumentReference<T>
);
finalDoc.fieldsProto = document._fieldsProto;
finalDoc.readTime = document.readTime;
finalDoc.createTime = document.createTime;
finalDoc.updateTime = document.updateTime;
orderedDocuments.push(finalDoc.build());
} else {
throw new Error(`Did not receive document for "${docRef.path}".`);
}
}

return orderedDocuments;
}

private async fetchDocuments(requestTag: string): Promise<void> {
if (!this.outstandingDocuments.size) {
return;
}

const request: api.IBatchGetDocumentsRequest = {
database: this.firestore.formattedName,
transaction: this.transactionId,
documents: Array.from(this.outstandingDocuments),
};

if (this.fieldMask) {
const fieldPaths = this.fieldMask.map(
fieldPath => fieldPath.formattedName
);
request.mask = {fieldPaths};
}

let resultCount = 0;

try {
const stream = await this.firestore.requestStream(
'batchGetDocuments',
request,
requestTag
);
stream.resume();

for await (const response of stream) {
let snapshot: DocumentSnapshot<DocumentData>;

if (response.found) {
logger(
'DocumentReader.fetchDocuments',
requestTag,
'Received document: %s',
response.found.name!
);
snapshot = this.firestore.snapshot_(
response.found,
response.readTime!
);
} else {
logger(
'DocumentReader.fetchDocuments',
requestTag,
'Document missing: %s',
response.missing!
);
snapshot = this.firestore.snapshot_(
response.missing!,
response.readTime!
);
}

const path = snapshot.ref.formattedName;
this.outstandingDocuments.delete(path);
this.retrievedDocuments.set(path, snapshot);
++resultCount;
}
} catch (error) {
const shouldRetry =
// Transactional reads are retried via the transaction runner.
!this.transactionId &&
// Only retry if we made progress.
resultCount > 0 &&
// Don't retry permanent errors.
error.code !== undefined &&
!isPermanentRpcError(error, 'batchGetDocuments');

logger(
'DocumentReader.fetchDocuments',
requestTag,
'BatchGetDocuments failed with error: %s. Retrying: %s',
error,
shouldRetry
);
if (shouldRetry) {
return this.fetchDocuments(requestTag);
} else {
throw error;
}
} finally {
logger(
'DocumentReader.fetchDocuments',
requestTag,
'Received %d results',
resultCount
);
}
}
}
137 changes: 7 additions & 130 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {ExponentialBackoff, ExponentialBackoffSetting} from './backoff';
import {BulkWriter} from './bulk-writer';
import {BundleBuilder} from './bundle';
import {fieldsFromJson, timestampFromJson} from './convert';
import {DocumentReader} from './document-reader';
import {
DocumentSnapshot,
DocumentSnapshotBuilder,
Expand All @@ -34,14 +35,12 @@ import {
import {logger, setLibVersion} from './logger';
import {
DEFAULT_DATABASE_ID,
FieldPath,
QualifiedResourcePath,
ResourcePath,
validateResourcePath,
} from './path';
import {ClientPool} from './pool';
import {CollectionReference} from './reference';
import {DocumentReference} from './reference';
import {CollectionReference, DocumentReference} from './reference';
import {Serializer} from './serializer';
import {Timestamp} from './timestamp';
import {parseGetAllArguments, Transaction} from './transaction';
Expand Down Expand Up @@ -1080,138 +1079,16 @@ export class Firestore implements firestore.Firestore {
const stack = Error().stack!;

return this.initializeIfNeeded(tag)
.then(() => this.getAll_(documents, fieldMask, tag))
.then(() => {
const reader = new DocumentReader(this, documents);
reader.fieldMask = fieldMask || undefined;
return reader.get(tag);
})
.catch(err => {
throw wrapError(err, stack);
});
}

/**
* Internal method to retrieve multiple documents from Firestore, optionally
* as part of a transaction.
*
* @private
* @param docRefs The documents to receive.
* @param fieldMask An optional field mask to apply to this read.
* @param requestTag A unique client-assigned identifier for this request.
* @param transactionId The transaction ID to use for this read.
* @returns A Promise that contains an array with the resulting documents.
*/
getAll_<T>(
docRefs: Array<firestore.DocumentReference<T>>,
fieldMask: firestore.FieldPath[] | null,
requestTag: string,
transactionId?: Uint8Array
): Promise<Array<DocumentSnapshot<T>>> {
const requestedDocuments = new Set<string>();
const retrievedDocuments = new Map<string, DocumentSnapshot>();

for (const docRef of docRefs) {
requestedDocuments.add((docRef as DocumentReference<T>).formattedName);
}

const request: api.IBatchGetDocumentsRequest = {
database: this.formattedName,
transaction: transactionId,
documents: Array.from(requestedDocuments),
};

if (fieldMask) {
const fieldPaths = fieldMask.map(
fieldPath => (fieldPath as FieldPath).formattedName
);
request.mask = {fieldPaths};
}

return this.requestStream('batchGetDocuments', request, requestTag).then(
stream => {
return new Promise<Array<DocumentSnapshot<T>>>((resolve, reject) => {
stream
.on('error', err => {
logger(
'Firestore.getAll_',
requestTag,
'GetAll failed with error:',
err
);
reject(err);
})
.on('data', (response: api.IBatchGetDocumentsResponse) => {
try {
let document;

if (response.found) {
logger(
'Firestore.getAll_',
requestTag,
'Received document: %s',
response.found.name!
);
document = this.snapshot_(response.found, response.readTime!);
} else {
logger(
'Firestore.getAll_',
requestTag,
'Document missing: %s',
response.missing!
);
document = this.snapshot_(
response.missing!,
response.readTime!
);
}

const path = document.ref.path;
retrievedDocuments.set(path, document);
} catch (err) {
logger(
'Firestore.getAll_',
requestTag,
'GetAll failed with exception:',
err
);
reject(err);
}
})
.on('end', () => {
logger(
'Firestore.getAll_',
requestTag,
'Received %d results',
retrievedDocuments.size
);

// BatchGetDocuments doesn't preserve document order. We use
// the request order to sort the resulting documents.
const orderedDocuments: Array<DocumentSnapshot<T>> = [];

for (const docRef of docRefs) {
const document = retrievedDocuments.get(docRef.path);
if (document !== undefined) {
// Recreate the DocumentSnapshot with the DocumentReference
// containing the original converter.
const finalDoc = new DocumentSnapshotBuilder(
docRef as DocumentReference<T>
);
finalDoc.fieldsProto = document._fieldsProto;
finalDoc.readTime = document.readTime;
finalDoc.createTime = document.createTime;
finalDoc.updateTime = document.updateTime;
orderedDocuments.push(finalDoc.build());
} else {
reject(
new Error(`Did not receive document for "${docRef.path}".`)
);
}
}
resolve(orderedDocuments);
});
stream.resume();
});
}
);
}

/**
* Registers a listener on this client, incrementing the listener count. This
* is used to verify that all listeners are unsubscribed when terminate() is
Expand Down
25 changes: 8 additions & 17 deletions dev/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import {
validateMinNumberOfArguments,
validateOptional,
} from './validate';

import {DocumentReader} from './document-reader';
import api = proto.google.firestore.v1;

/*!
Expand Down Expand Up @@ -125,16 +125,9 @@ export class Transaction implements firestore.Transaction {
}

if (refOrQuery instanceof DocumentReference) {
return this._firestore
.getAll_(
[refOrQuery],
/* fieldMask= */ null,
this._requestTag,
this._transactionId
)
.then(res => {
return Promise.resolve(res[0]);
});
const documentReader = new DocumentReader(this._firestore, [refOrQuery]);
documentReader.transactionId = this._transactionId;
return documentReader.get(this._requestTag).then(([res]) => res);
}

if (refOrQuery instanceof Query) {
Expand Down Expand Up @@ -191,12 +184,10 @@ export class Transaction implements firestore.Transaction {
documentRefsOrReadOptions
);

return this._firestore.getAll_(
documents,
fieldMask,
this._requestTag,
this._transactionId
);
const documentReader = new DocumentReader(this._firestore, documents);
documentReader.fieldMask = fieldMask || undefined;
documentReader.transactionId = this._transactionId;
return documentReader.get(this._requestTag);
}

/**
Expand Down
Loading

0 comments on commit b39dd3c

Please sign in to comment.