fix: fork closure in epoch proving jobs #23390
spalladino left a comment
LGTM. Just left a few comments on possible refactors.
    // temporary stack to control fork lifetime
    await using cleanup = new AsyncDisposableStack();
    private async processCheckpoints(
      parallelism: number,
      processCheckpoint: (checkpoint: Checkpoint) => Promise<void>,
    ): Promise<void> {
      let hasError = false;
      let firstError: unknown;

      await asyncPool(Math.max(parallelism, 1), this.checkpoints, async checkpoint => {
        if (hasError || this.abortController.signal.aborted) {
          return;
        }

        try {
          this.checkState();
          await processCheckpoint(checkpoint);
        } catch (err) {
          if (!hasError) {
            hasError = true;
            firstError = err;
            this.failProcessing();
          }
        }
      });

      if (hasError) {
        throw firstError;
      }

      if (this.abortController.signal.aborted) {
        this.checkState();
      }
    }
Sounds like this is something we could bake into the asyncPool helper directly?
I was wondering the same thing, but I held off because the pool was forked from another repo.
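For illustration, the fail-fast and abort handling discussed above could live inside a pool helper along these lines. This is only a sketch: `asyncPoolFailFast` and its signature are hypothetical, and it is not the forked `asyncPool` used in the PR.

```typescript
// Hypothetical helper (not the PR's asyncPool): runs `worker` over `items`
// with bounded concurrency, stops claiming new items after the first failure
// or once `signal` aborts, and rethrows the first error.
async function asyncPoolFailFast<T>(
  concurrency: number,
  items: readonly T[],
  worker: (item: T) => Promise<void>,
  signal?: AbortSignal,
): Promise<void> {
  let failed = false;
  let firstError: unknown;
  let next = 0;

  // Each runner claims the next unprocessed item until the list is exhausted,
  // an item has failed, or the caller's signal has aborted.
  const runner = async (): Promise<void> => {
    while (next < items.length && !failed && !signal?.aborted) {
      const item = items[next++];
      try {
        await worker(item);
      } catch (err) {
        // Only the first failure is recorded; later ones are dropped.
        if (!failed) {
          failed = true;
          firstError = err;
        }
      }
    }
  };

  const runnerCount = Math.min(Math.max(concurrency, 1), items.length);
  await Promise.all(Array.from({ length: runnerCount }, () => runner()));

  if (failed) {
    throw firstError;
  }
  // Surface cancellation if the signal is aborted by the time the pool drains.
  signal?.throwIfAborted();
}
```

Items already in flight when the first error occurs still run to completion; only the claiming of new items stops, which matches the early `return` in the callback above.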
      return await execWithSignal(
        () => processFn(),
        processingSignal,
        signal =>
          signal.reason?.name === 'TimeoutError' ? new PublicProcessorTimeoutError() : new PublicProcessorAbortError(),
      );
    }

    private getProcessingSignal(tx: Tx, deadline: Date | undefined, signal: AbortSignal | undefined) {
      if (!deadline) {
        return signal;
      }

      const timeout = +deadline - this.dateProvider.now();
      if (timeout <= 0) {
        throw new PublicProcessorTimeoutError();
      }

      const txHash = tx.getTxHash();
      this.log.debug(`Processing tx ${txHash.toString()} within ${timeout}ms`, {
        deadline: deadline.toISOString(),
        now: new Date(this.dateProvider.now()).toISOString(),
        txHash,
      });

      return await executeTimeout(
        () => processFn(),
        timeout,
        () => new PublicProcessorTimeoutError(),
      );
      const timeoutSignal = AbortSignal.timeout(timeout);
      return signal ? AbortSignal.any([signal, timeoutSignal]) : timeoutSignal;
    }
Seems like this is calling for an overload of executeTimeout or execWithSignal that has both timeout and signal?
Honestly, I'd remove executeTimeout in favour of execWithSignal now that we can use AbortSignal.timeout in recent node versions.
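As a rough sketch of what a single helper accepting both a timeout and a signal might look like: `execWithTimeoutAndSignal`, its options object, and the plain `Error` messages are all hypothetical and do not reflect the actual `execWithSignal` signature. It assumes a Node version that provides `AbortSignal.timeout` and `AbortSignal.any`.

```typescript
// Hypothetical helper; the name and signature are assumptions for illustration.
// Runs `fn` and rejects early if the optional timeout elapses or the optional
// caller-supplied signal aborts, whichever happens first.
async function execWithTimeoutAndSignal<T>(
  fn: (signal: AbortSignal) => Promise<T>,
  opts: { timeoutMs?: number; signal?: AbortSignal } = {},
  toError: (signal: AbortSignal) => Error = s =>
    s.reason?.name === 'TimeoutError' ? new Error('Timed out') : new Error('Aborted'),
): Promise<T> {
  const signals: AbortSignal[] = [];
  if (opts.signal) signals.push(opts.signal);
  if (opts.timeoutMs !== undefined) signals.push(AbortSignal.timeout(opts.timeoutMs));

  // Nothing to race against: hand `fn` a signal that never aborts.
  if (signals.length === 0) {
    return fn(new AbortController().signal);
  }

  const combined = signals.length === 1 ? signals[0] : AbortSignal.any(signals);
  if (combined.aborted) {
    throw toError(combined);
  }

  return await new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(toError(combined));
    combined.addEventListener('abort', onAbort, { once: true });
    // Settle with fn's outcome unless the abort listener has already rejected,
    // then detach the listener either way.
    fn(combined)
      .then(resolve, reject)
      .finally(() => combined.removeEventListener('abort', onAbort));
  });
}
```

Rejecting on abort does not stop `fn` by itself; `fn` receives the combined signal so it can check it and stop its own work.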
Flakey Tests🤖 says: This CI run detected 1 tests that failed, but were tolerated due to a .test_patterns.yml entry.
Handle fork lifetime correctly around checkpoints that might be cancelled part way through processing.
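A minimal sketch of the lifetime rule described here, under stated assumptions: `Fork`, `openFork`, and `withFork` are hypothetical stand-ins, and the sketch uses `try`/`finally` rather than the `await using` and `AsyncDisposableStack` approach from the diff so that it runs without explicit resource management support. The idea is that the fork is closed exactly once, and only after the work using it has settled, whether it succeeded, failed, or was cancelled.

```typescript
// Hypothetical stand-in for a fork handle; only close() matters for the sketch.
interface Fork {
  close(): Promise<void>;
}

// Opens a fork, runs `work` against it, and closes the fork afterwards.
async function withFork<T>(
  openFork: () => Promise<Fork>,
  work: (fork: Fork) => Promise<T>,
): Promise<T> {
  const fork = await openFork();
  try {
    // Await the work so the fork is not closed while it is still in use.
    return await work(fork);
  } finally {
    // Runs on success and on any rejection, including a cancellation
    // that `work` surfaces as an error.
    await fork.close();
  }
}
```

If `work` returned without awaiting its own in-flight operations, the fork could still be closed underneath them, so the guarantee only holds when cancellation is propagated through the awaited promise.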