feat: add onWriteError() and onWriteResult() handlers to BulkWriter #1315
Codecov Report
@@ Coverage Diff @@
## master #1315 +/- ##
==========================================
- Coverage 98.56% 98.55% -0.02%
==========================================
Files 32 32
Lines 19115 19274 +159
Branches 1373 1386 +13
==========================================
+ Hits 18841 18995 +154
- Misses 271 276 +5
Partials 3 3
Continue to review full report at Codecov.
This looks good. Before you address the feedback, we should see if we want to incorporate @bcoe's API suggestion.
dev/src/bulk-writer.ts
const pendingOp = bulkCommitBatch.getLastOp();
resultPromise.catch(err => {
  const operation = new BulkWriterOperation(
    documentRef,
    'create',
    pendingOp
  );
  this.errorFn(err, operation);
});
Ideally, we would only attach our own Promise if `errorFn` is defined, but that would require users to register an error function before any other API call. I think we could re-throw the original exception here, though, if the error callback is not defined. This is a simple change if you make the default error function just throw.
I don't fully follow along here. If we change the default function to throw, that would cause an `UnhandledPromiseRejection` to surface again. Isn't the point of attaching a `.catch()` to the existing promise to prevent the `UnhandledPromiseRejection` from being thrown in the first place?
Something like:
let defaultCatch = (e) => { throw e; };
set() {
  this.op().catch(e => defaultCatch(e));
}
onError(errorFn) {
  defaultCatch = errorFn;
}
This should suppress the "uncaught Promise rejection" only once `onError` has been called.
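A minimal runnable sketch of this suggestion, using a hypothetical `SketchWriter` class rather than the real `BulkWriter`: the default handler rethrows, so an unobserved failure still rejects, while a registered handler swallows it. Because the `.catch()` callback reads `this.errorFn` lazily, a handler registered right after `set()` still applies.

```typescript
// Hypothetical sketch; not the actual BulkWriter implementation.
type ErrorFn = (err: Error) => void;

class SketchWriter {
  // Default: rethrow, so a failure with no handler still surfaces as a
  // rejected Promise (and an unhandled rejection if nobody observes it).
  private errorFn: ErrorFn = err => {
    throw err;
  };

  // `fail` simulates whether the underlying write RPC fails.
  set(fail: boolean): Promise<void> {
    const op: Promise<void> = fail
      ? Promise.reject(new Error('write failed'))
      : Promise.resolve();
    // Return the promise with the handler attached: it rejects if errorFn
    // rethrows and resolves if errorFn swallows the error.
    return op.catch(err => this.errorFn(err));
  }

  onWriteError(errorFn: ErrorFn): void {
    this.errorFn = errorFn;
  }
}
```

Returning the chained promise (rather than the raw write promise) is what lets the default handler rethrow while a user handler prevents the unhandled rejection.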
…e into bc/bulk-error
dev/src/bulk-writer.ts
@@ -449,10 +574,10 @@ export class BulkWriter {
   * delete.
   * @param {Timestamp=} precondition.lastUpdateTime If set, enforces that the
   * document was last updated at lastUpdateTime. Fails the batch if the
-  * document doesn't exist or was last updated at a different time.
+  * document doesn't exist or was last updatedra at a different time.
A typo slipped in here (`updatedra`).
dev/src/bulk-writer.ts
if (this.batchQueue.length > 0) {
  await this._flush();
  await this.closedDeferred.promise;
}
I don't see how this can be the case after awaiting the flush above, because it's not possible to add retries after `this.closeCalled = true`, right? If it is expected, does this need to be in a loop?
I now see that `verifyNotClosed` only throws if fully closed when called from `retry`. In that case, I think my second question still stands. Do we know that `await this._flush()` won't cause more retries? Do we need to do this in a loop, or at least await another flush after `this.closedDeferred.promise` resolves?
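The loop alternative raised here can be sketched as follows, assuming a hypothetical model in which sending a batch resolves with any retry batches it spawned (`flushUntilQuiescent` and `SendBatch` are illustrative names, not library APIs):

```typescript
// Hypothetical model: sending a batch resolves with the retry batches it
// spawned (empty if every write in it succeeded or failed permanently).
type SendBatch = (batch: string) => Promise<string[]>;

async function flushUntilQuiescent(
  queue: string[],
  send: SendBatch
): Promise<number> {
  let rounds = 0;
  // A single flush is not enough if its results enqueue retries, so keep
  // draining until a round completes without adding new batches.
  while (queue.length > 0) {
    const inFlight = queue.splice(0, queue.length);
    const spawned = await Promise.all(inFlight.map(batch => send(batch)));
    for (const retries of spawned) {
      queue.push(...retries);
    }
    rounds++;
  }
  return rounds;
}
```

The loop terminates only if retries eventually stop, for example because the error handler caps the number of attempts.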
I think we can use Promise chaining here, as proposed in the diff above. Instead of requiring `void` as the return type of `onWriteError`, we can require the user to call `retry` and to pass the result of `retry` back to us from the error handler. The signature would then be:
onWriteError((err: BulkWriterError) => Promise<WriteResult>)
Once we do that, we can chain the result of this error callback and block on the retry attempt in the flush call.
If we don't do this, we have to somehow ensure that all error callbacks have fired once we process the results. This would be a bit tricky and may not port to Java or other languages:
error.reject(...);
setImmediate(() => {
// This only works in Node, but due to the way that ticks are processed in the VM, we can use a `setImmediate`
// to ensure that our code only runs once all Promise chains are processed.
});
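The `setImmediate` trick relies on Node's event-loop ordering: Promise reaction callbacks (microtasks) are drained before `setImmediate` callbacks run in the "check" phase. A small Node-only demonstration (it assumes the Node.js typings are available; the labels are made up):

```typescript
// Node-only: setImmediate comes from the Node.js runtime and typings.
const order: string[] = [];

// Scheduled first, but runs later, in the event loop's check phase.
setImmediate(() => {
  order.push('setImmediate');
});

// An already-rejected promise: its handlers run as microtasks, which Node
// drains completely before entering the check phase.
Promise.reject(new Error('write failed'))
  .catch(() => {
    order.push('error callback');
  })
  .then(() => {
    order.push('chained follow-up');
  });
```

This only covers chains that can make progress without I/O; a callback that itself awaits a network call would still be pending when the `setImmediate` callback fires, which is part of why this approach is fragile.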
That would be elegant, I think. The ultimate result of the original write would become the result of the retry. It would just be important to document this well so that callers don't forget to return the result of `retry()` from their error handler; otherwise the retry promise would end up unhandled and the original write would resolve with `undefined`.
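A minimal sketch of the proposed chaining, with hypothetical stand-ins (`SketchResult`, `SketchWriteError`, `ChainingWriter`) in place of `WriteResult`, `BulkWriterError` and `BulkWriter`. The handler's returned promise becomes the result of the original write, so `flush()` waits on retries without any extra bookkeeping:

```typescript
// Hypothetical stand-in for WriteResult.
interface SketchResult {
  attempt: number;
}

// Hypothetical error type that carries a way to retry the failed write.
class SketchWriteError extends Error {
  constructor(
    readonly failedAttempts: number,
    readonly retry: () => Promise<SketchResult>
  ) {
    super(`write failed after ${failedAttempts + 1} attempt(s)`);
  }
}

type WriteErrorHandler = (err: SketchWriteError) => Promise<SketchResult>;

class ChainingWriter {
  // Default: propagate the failure to the caller of write().
  private errorHandler: WriteErrorHandler = err => Promise.reject(err);
  private readonly pending = new Set<Promise<SketchResult>>();

  onWriteError(handler: WriteErrorHandler): void {
    this.errorHandler = handler;
  }

  // `rpc` simulates one attempt of the underlying write.
  write(rpc: (attempt: number) => Promise<SketchResult>): Promise<SketchResult> {
    const attempt = (n: number): Promise<SketchResult> =>
      rpc(n).catch(() =>
        // Whatever the handler returns (typically err.retry()) becomes the
        // result of the original write, so retries extend the same chain.
        this.errorHandler(new SketchWriteError(n, () => attempt(n + 1)))
      );
    const result = attempt(0);
    this.pending.add(result);
    const untrack = (): void => {
      this.pending.delete(result);
    };
    result.then(untrack, untrack);
    return result;
  }

  // Retries are chained into each write's promise, so waiting on the tracked
  // promises also waits for every retry a handler returned.
  async flush(): Promise<void> {
    await Promise.all(
      Array.from(this.pending).map(p => p.catch(() => undefined))
    );
  }
}
```

Typing the handler as returning `Promise<SketchResult>` also makes the compiler reject a handler that calls `retry()` but forgets to return it.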
@@ -40,6 +40,9 @@ import {
   validateInteger,
   validateOptional,
 } from './validate';
+// TODO: How do I get rid of this?
+// eslint-disable-next-line no-undef
https://gist.github.com/schmidt-sebastian/ebb63b96c048f3c43187edac7523f9c4
The idea here is that you can use `GrpcStatus` in the typings file and the Gax status in code. They are interchangeable.
Please address.
I think this is a necessary evil unfortunately.
dev/src/bulk-writer.ts
readonly operation: BulkWriterOperation,
readonly code: GrpcStatus,
readonly message: string
Do these show up in JSDoc? Should we add comments?
dev/src/bulk-writer.ts
/**
 * Whether this BulkWriter instance is closed. Once closed, it cannot be
 * opened again.
 */
private closed = false;
private isClosed = false;
We may want to align this with the status codes in BulkWriterBatch and use an enum: either a new enum or one specific to BulkWriter (open/closing/closed).
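One way the enum could look, as a sketch with hypothetical names (`BulkWriterState`, `EnumStateWriter`): a single state field replaces the two booleans, and `CLOSING` covers the window in which `close()` has been called but the final flush has not finished.

```typescript
// Hypothetical lifecycle states, analogous to the per-batch state enum.
enum BulkWriterState {
  OPEN = 'OPEN',
  CLOSING = 'CLOSING',
  CLOSED = 'CLOSED',
}

class EnumStateWriter {
  private state = BulkWriterState.OPEN;

  get currentState(): BulkWriterState {
    return this.state;
  }

  // `flush` stands in for the writer's final flush of pending writes.
  async close(flush: () => Promise<void>): Promise<void> {
    if (this.state !== BulkWriterState.OPEN) {
      return;
    }
    this.state = BulkWriterState.CLOSING;
    await flush();
    this.state = BulkWriterState.CLOSED;
  }
}
```

A write check would then reject anything but `OPEN`, while a retry check would reject only `CLOSED`.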
dev/src/bulk-writer.ts

/** @hideconstructor */
constructor(
  private readonly firestore: Firestore,
  options?: firestore.BulkWriterOptions
) {
  this.firestore._incrementBulkWritersCount();
  this.errorFn = () => {};
By default, you probably want this error function to throw. You still want "Unhandled Promise Rejections" when there is no error handler and no other callback attached.
dev/src/bulk-writer.ts
  .catch(err => {
    this.errorFn(err);
  });
I think you want to return the Promise that has the error handler attached. That way, the default error handler can re-throw and any user specified error handler can prevent Unhandled Promise Rejections.
dev/src/bulk-writer.ts
  return this._flush();
}

private async _flush(): Promise<void> {
  const trackedBatches = this.batchQueue;
  const writePromises = trackedBatches.map(batch => batch.awaitBulkCommit());
  this.sendReadyBatches();
Random thought: should the already built-in retry use the new public APIs? This would simplify some of the code a bit (https://gist.github.com/schmidt-sebastian/302c19baa2eda89b23435da41afb077f) and would allow us to use consistent backoff for all retries.
dev/src/bulk-writer.ts
private verifyNotClosed(checkIsClosed = false): void {
  if (!checkIsClosed && this.closeCalled) {
    throw new Error('BulkWriter has already been closed.');
  }

  if (checkIsClosed && this.isClosed) {
    throw new Error('BulkWriter has already been closed.');
  }
}
Should these be two different functions?
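Splitting the flag-driven check into two methods could look like the sketch below; the method names `verifyNotFullyClosed`, `markCloseCalled` and `markClosed` are hypothetical, while the two booleans mirror `closeCalled` and `isClosed` from the quoted code.

```typescript
// Hypothetical split of verifyNotClosed(checkIsClosed) into two methods.
class SplitChecks {
  // Set when close() is called; new writes are rejected from then on.
  private closeCalled = false;
  // Set once close() has finished flushing; retries are rejected too.
  private isClosed = false;

  // Guards user-initiated writes (create/set/update/delete).
  verifyNotClosed(): void {
    if (this.closeCalled) {
      throw new Error('BulkWriter has already been closed.');
    }
  }

  // Guards retries, which may still run while close() is flushing.
  verifyNotFullyClosed(): void {
    if (this.isClosed) {
      throw new Error('BulkWriter has already been closed.');
    }
  }

  // Hypothetical hooks standing in for the start and end of close().
  markCloseCalled(): void {
    this.closeCalled = true;
  }

  markClosed(): void {
    this.isClosed = true;
  }
}
```

Each call site then states its intent by name instead of passing a boolean argument.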
@tilgovi Quick update: I'm still working on it in a separate branch, #1330. At this point, I have the auto-retry functionality working.
Remove usage of BulkWriterOperation
Branch updated from 9b4fd13 to 166952b.
Whee. Very nice. Some nits are left, but it looks like we are basically good to go.
import {
  invalidArgumentMessage,
  validateInteger,
  validateOptional,
} from './validate';
// TODO(chenbrian): Figure some way to get rid of this.
// eslint-disable-next-line no-undef
import GrpcStatus = FirebaseFirestore.GrpcStatus;
Please move this line below all other imports. This will help us pretend that we care about import statement order.
done :)
dev/src/bulk-writer.ts
assert(
  this.state === BatchState.OPEN,
  'Batch should be OPEN when adding writes'
);
const deferred = new Deferred<BatchWriteResult>();
this.pendingOps.push({
  writeBatchIndex: this.opCount,
  key: documentRef.path,
  deferred: deferred,
});
Could `pendingOps` be an array mapping `writeBatchIndex` to `deferred` (rather than a list of these elements)?
done.
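The restructuring can be sketched as follows, assuming a minimal hypothetical `Deferred` helper and batch class rather than the library's own types. Because writes are appended in order, a write's position in the array is its `writeBatchIndex`, so the index no longer needs to be stored per element:

```typescript
// Minimal Deferred helper (hypothetical; similar in spirit to the codebase's).
class Deferred<T> {
  readonly promise: Promise<T>;
  resolve: (value: T) => void = () => {};
  reject: (reason?: unknown) => void = () => {};
  constructor() {
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

// Hypothetical stand-in for BatchWriteResult.
interface SketchBatchResult {
  writeTime: number;
}

class SketchBatch {
  // Indexed by writeBatchIndex: the i-th write added owns pendingOps[i].
  private readonly pendingOps: Array<Deferred<SketchBatchResult>> = [];

  add(): Promise<SketchBatchResult> {
    const deferred = new Deferred<SketchBatchResult>();
    this.pendingOps.push(deferred);
    return deferred.promise;
  }

  // Simulated commit response: one status per write, in request order.
  processResults(statuses: Array<SketchBatchResult | Error>): void {
    statuses.forEach((status, writeBatchIndex) => {
      const deferred = this.pendingOps[writeBatchIndex];
      if (status instanceof Error) {
        deferred.reject(status);
      } else {
        deferred.resolve(status);
      }
    });
  }
}
```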
/**
 * Schedules and runs the provided operation.
 */
private async _executeWrite(
  documentRef: firestore.DocumentReference,
  operationType: 'create' | 'set' | 'update' | 'delete',
  operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult>
): Promise<WriteResult> {
  // A deferred promise that resolves when operationFn completes.
  const operationCompletedDeferred = new Deferred<void>();
  this._pendingOps.add(operationCompletedDeferred.promise);
  try {
    for (let failedAttempts = 0; ; ++failedAttempts) {
      const batchQueue =
        failedAttempts > 0 ? this._retryBatchQueue : this._batchQueue;
      const bulkCommitBatch = this.getEligibleBatch(batchQueue);

      // Send ready batches if this is the first attempt. Subsequent retry
      // batches are scheduled after the initial batch returns.
      if (failedAttempts === 0) {
        this.sendReadyBatches(batchQueue);
      }

      try {
        const operationResult = await operationFn(bulkCommitBatch);
        this._successFn(documentRef, operationResult);
        return operationResult;
      } catch (error) {
        const bulkWriterError = new BulkWriterError(
          error.code,
          error.message,
          documentRef,
          operationType,
          failedAttempts
        );
        const shouldRetry = this._errorFn(bulkWriterError);
        logger(
          'BulkWriter.errorFn',
          null,
          'Running error callback on error code:',
          error.code,
          ', shouldRetry:',
          shouldRetry
        );
        if (!shouldRetry) {
          throw bulkWriterError;
        }
      }
    }
  } finally {
    operationCompletedDeferred.resolve();
    this._pendingOps.delete(operationCompletedDeferred.promise);
  }
}
My last effort cleanup attempt:
/**
 * Schedules and runs the provided operation.
 */
private async _executeWrite(
  documentRef: firestore.DocumentReference,
  operationType: 'create' | 'set' | 'update' | 'delete',
  operationFn: (bulkCommitBatch: BulkCommitBatch) => Promise<WriteResult>
): Promise<WriteResult> {
  let result: Promise<WriteResult> | undefined;
  // A deferred promise that resolves when operationFn completes.
  const operationCompletedDeferred = new Deferred<void>();
  this._pendingOps.add(operationCompletedDeferred.promise);
  let batchQueue = this._batchQueue;
  this.sendReadyBatches(batchQueue);
  // Keep attempting until a result (success or final failure) is recorded.
  for (let failedAttempts = 0; result === undefined; ++failedAttempts) {
    const bulkCommitBatch = this.getEligibleBatch(batchQueue);
    try {
      const writeResult = await operationFn(bulkCommitBatch);
      this._successFn(documentRef, writeResult);
      result = Promise.resolve(writeResult);
    } catch (error) {
      const bulkWriterError = new BulkWriterError(
        error.code,
        error.message,
        documentRef,
        operationType,
        failedAttempts
      );
      const shouldRetry = this._errorFn(bulkWriterError);
      logger(
        'BulkWriter.errorFn',
        null,
        'Running error callback on error code:',
        error.code,
        ', shouldRetry:',
        shouldRetry
      );
      if (shouldRetry) {
        batchQueue = this._retryBatchQueue;
      } else {
        result = Promise.reject(bulkWriterError);
      }
    }
  }
  // Remove the pending op whether the write succeeded or failed.
  const cleanup = (): void => {
    operationCompletedDeferred.resolve();
    this._pendingOps.delete(operationCompletedDeferred.promise);
  };
  result!.then(cleanup, cleanup);
  return result!;
}
Let me know what you think.
Post-discussion: the main issue with this implementation is that if the user's error callback throws, the deferred operation is never resolved. The nested try blocks (an inner try/catch inside an outer try/finally), while ugly, ensure that the deferred operation is always removed.
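The cleanup guarantee of the nested structure can be shown in isolation. This is a simplified, hypothetical model (`CleanupWriter` is not a library class, and `rpc` stands in for `operationFn`): even when the error callback itself throws, the outer `finally` still resolves and removes the pending operation.

```typescript
// Hypothetical, simplified model of _executeWrite's cleanup guarantee.
class CleanupWriter {
  readonly pendingOps = new Set<Promise<void>>();

  constructor(
    private readonly errorFn: (err: Error, failedAttempts: number) => boolean
  ) {}

  async executeWrite(
    rpc: (failedAttempts: number) => Promise<string>
  ): Promise<string> {
    let markDone: () => void = () => {};
    const done = new Promise<void>(resolve => {
      markDone = () => resolve();
    });
    this.pendingOps.add(done);
    try {
      for (let failedAttempts = 0; ; ++failedAttempts) {
        try {
          return await rpc(failedAttempts);
        } catch (error) {
          // If errorFn itself throws, that exception propagates from here
          // and the outer `finally` still runs.
          if (!this.errorFn(error as Error, failedAttempts)) {
            throw error;
          }
        }
      }
    } finally {
      // Runs on success, on a final failure, and when errorFn throws.
      markDone();
      this.pendingOps.delete(done);
    }
  }
}
```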
Co-authored-by: Sebastian Schmidt <mrschmidt@google.com>
Whee!
Fixes #1282.
- `onWriteError()` error and `onWriteResult()` success handlers to `BulkWriter` instances.
- `BulkWriter.retry(operation: BulkWriterOperation)` method to retry failed operations.
- … `BulkWriterError` that contain an underlying `BulkWriterOperation`.