diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 316e7f995fc3..279513d2d1d1 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -108,7 +108,7 @@ import { FullNodeCheckpointsBuilder, NodeKeystoreAdapter, ValidatorClient, - createBlockProposalHandler, + createProposalHandler, createValidatorClient, } from '@aztec/validator-client'; import { createWorldStateSynchronizer } from '@aztec/world-state'; @@ -390,19 +390,21 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { } } - // If there's no validator client, create a BlockProposalHandler to handle block proposals + // If there's no validator client, create a ProposalHandler to handle block and checkpoint proposals // for monitoring or reexecution. Reexecution (default) allows us to follow the pending chain, // while non-reexecution is used for validating the proposals and collecting their txs. + // Checkpoint proposals are handled if the blob client can upload blobs. if (!validatorClient) { const reexecute = !!config.alwaysReexecuteBlockProposals; - log.info(`Setting up block proposal handler` + (reexecute ? ' with reexecution of proposals' : '')); - createBlockProposalHandler(config, { + log.info(`Setting up proposal handler` + (reexecute ? ' with reexecution of proposals' : '')); + createProposalHandler(config, { checkpointsBuilder: validatorCheckpointsBuilder, worldState: worldStateSynchronizer, epochCache, blockSource: archiver, l1ToL2MessageSource: archiver, p2pClient, + blobClient, dateProvider, telemetry, }).register(p2pClient, reexecute); diff --git a/yarn-project/validator-client/src/factory.ts b/yarn-project/validator-client/src/factory.ts index 6c706c5dc855..79c8dc36166e 100644 --- a/yarn-project/validator-client/src/factory.ts +++ b/yarn-project/validator-client/src/factory.ts @@ -8,12 +8,12 @@ import type { ValidatorClientFullConfig, WorldStateSynchronizer } from '@aztec/s import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging'; import type { TelemetryClient } from '@aztec/telemetry-client'; -import { BlockProposalHandler } from './block_proposal_handler.js'; import type { FullNodeCheckpointsBuilder } from './checkpoint_builder.js'; import { ValidatorMetrics } from './metrics.js'; +import { ProposalHandler } from './proposal_handler.js'; import { ValidatorClient } from './validator.js'; -export function createBlockProposalHandler( +export function createProposalHandler( config: ValidatorClientFullConfig, deps: { checkpointsBuilder: FullNodeCheckpointsBuilder; @@ -22,6 +22,7 @@ export function createBlockProposalHandler( l1ToL2MessageSource: L1ToL2MessageSource; p2pClient: P2PClient; epochCache: EpochCache; + blobClient: BlobClientInterface; dateProvider: DateProvider; telemetry: TelemetryClient; }, @@ -31,7 +32,7 @@ export function createBlockProposalHandler( txsPermitted: !config.disableTransactions, maxTxsPerBlock: config.validateMaxTxsPerBlock ?? config.validateMaxTxsPerCheckpoint, }); - return new BlockProposalHandler( + return new ProposalHandler( deps.checkpointsBuilder, deps.worldState, deps.blockSource, @@ -40,6 +41,7 @@ export function createBlockProposalHandler( blockProposalValidator, deps.epochCache, config, + deps.blobClient, metrics, deps.dateProvider, deps.telemetry, diff --git a/yarn-project/validator-client/src/index.ts b/yarn-project/validator-client/src/index.ts index e1bb317f9f81..1cef663abc9b 100644 --- a/yarn-project/validator-client/src/index.ts +++ b/yarn-project/validator-client/src/index.ts @@ -1,4 +1,4 @@ -export * from './block_proposal_handler.js'; +export * from './proposal_handler.js'; export * from './checkpoint_builder.js'; export * from './config.js'; export * from './factory.js'; diff --git a/yarn-project/validator-client/src/metrics.ts b/yarn-project/validator-client/src/metrics.ts index 160ac8c17280..7142399023df 100644 --- a/yarn-project/validator-client/src/metrics.ts +++ b/yarn-project/validator-client/src/metrics.ts @@ -11,7 +11,7 @@ import { createUpDownCounterWithDefault, } from '@aztec/telemetry-client'; -import type { BlockProposalValidationFailureReason } from './block_proposal_handler.js'; +import type { BlockProposalValidationFailureReason } from './proposal_handler.js'; export class ValidatorMetrics { private failedReexecutionCounter: UpDownCounter; diff --git a/yarn-project/validator-client/src/block_proposal_handler.ts b/yarn-project/validator-client/src/proposal_handler.ts similarity index 67% rename from yarn-project/validator-client/src/block_proposal_handler.ts rename to yarn-project/validator-client/src/proposal_handler.ts index 1582c74b334c..92733970caf4 100644 --- a/yarn-project/validator-client/src/block_proposal_handler.ts +++ b/yarn-project/validator-client/src/proposal_handler.ts @@ -1,20 +1,29 @@ +import type { BlobClientInterface } from '@aztec/blob-client/client'; +import { type Blob, getBlobsPerL1Block } from '@aztec/blob-lib'; import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants'; import type { EpochCache } from '@aztec/epoch-cache'; +import { validateFeeAssetPriceModifier } from '@aztec/ethereum/contracts'; import { BlockNumber, CheckpointNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { pick } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; import { TimeoutError } from '@aztec/foundation/error'; +import type { LogData } from '@aztec/foundation/log'; import { createLogger } from '@aztec/foundation/log'; import { retryUntil } from '@aztec/foundation/retry'; import { DateProvider, Timer } from '@aztec/foundation/timer'; import type { P2P, PeerId } from '@aztec/p2p'; import { BlockProposalValidator } from '@aztec/p2p/msg_validators'; import type { BlockData, L2Block, L2BlockSink, L2BlockSource } from '@aztec/stdlib/block'; +import { validateCheckpoint } from '@aztec/stdlib/checkpoint'; import { getEpochAtSlot, getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; import { Gas } from '@aztec/stdlib/gas'; import type { ITxProvider, ValidatorClientFullConfig, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; -import { type L1ToL2MessageSource, computeInHashFromL1ToL2Messages } from '@aztec/stdlib/messaging'; -import type { BlockProposal } from '@aztec/stdlib/p2p'; +import { + type L1ToL2MessageSource, + accumulateCheckpointOutHashes, + computeInHashFromL1ToL2Messages, +} from '@aztec/stdlib/messaging'; +import type { BlockProposal, CheckpointProposalCore } from '@aztec/stdlib/p2p'; import { MerkleTreeId } from '@aztec/stdlib/trees'; import type { CheckpointGlobalVariables, FailedTx, Tx } from '@aztec/stdlib/tx'; import { @@ -66,11 +75,14 @@ export type BlockProposalValidationFailureResult = { export type BlockProposalValidationResult = BlockProposalValidationSuccessResult | BlockProposalValidationFailureResult; +export type CheckpointProposalValidationResult = { isValid: true } | { isValid: false; reason: string }; + type CheckpointComputationResult = | { checkpointNumber: CheckpointNumber; reason?: undefined } | { checkpointNumber?: undefined; reason: 'invalid_proposal' | 'global_variables_mismatch' }; -export class BlockProposalHandler { +/** Handles block and checkpoint proposals for both validator and non-validator nodes. */ +export class ProposalHandler { public readonly tracer: Tracer; constructor( @@ -82,21 +94,26 @@ export class BlockProposalHandler { private blockProposalValidator: BlockProposalValidator, private epochCache: EpochCache, private config: ValidatorClientFullConfig, + private blobClient: BlobClientInterface, private metrics?: ValidatorMetrics, private dateProvider: DateProvider = new DateProvider(), telemetry: TelemetryClient = getTelemetryClient(), - private log = createLogger('validator:block-proposal-handler'), + private log = createLogger('validator:proposal-handler'), ) { if (config.fishermanMode) { this.log = this.log.createChild('[FISHERMAN]'); } - this.tracer = telemetry.getTracer('BlockProposalHandler'); + this.tracer = telemetry.getTracer('ProposalHandler'); } - register(p2pClient: P2P, shouldReexecute: boolean): BlockProposalHandler { + /** + * Registers non-validator handlers for block and checkpoint proposals on the p2p client. + * Block proposals are always registered. Checkpoint proposals are registered if the blob client can upload. + */ + register(p2pClient: P2P, shouldReexecute: boolean): ProposalHandler { // Non-validator handler that processes or re-executes for monitoring but does not attest. // Returns boolean indicating whether the proposal was valid. - const handler = async (proposal: BlockProposal, proposalSender: PeerId): Promise => { + const blockHandler = async (proposal: BlockProposal, proposalSender: PeerId): Promise => { try { const { slotNumber, blockNumber } = proposal; const result = await this.handleBlockProposal(proposal, proposalSender, shouldReexecute); @@ -123,7 +140,35 @@ export class BlockProposalHandler { } }; - p2pClient.registerBlockProposalHandler(handler); + p2pClient.registerBlockProposalHandler(blockHandler); + + // Register checkpoint proposal handler if blob uploads are enabled and we are reexecuting + if (this.blobClient.canUpload() && shouldReexecute) { + const checkpointHandler = async (checkpoint: CheckpointProposalCore, _sender: PeerId) => { + try { + const proposalInfo = { + proposalSlotNumber: checkpoint.slotNumber, + archive: checkpoint.archive.toString(), + proposer: checkpoint.getSender()?.toString(), + }; + const result = await this.handleCheckpointProposal(checkpoint, proposalInfo); + if (result.isValid) { + this.log.info(`Non-validator checkpoint proposal at slot ${checkpoint.slotNumber} handled`, proposalInfo); + } else { + this.log.warn( + `Non-validator checkpoint proposal at slot ${checkpoint.slotNumber} failed: ${result.reason}`, + proposalInfo, + ); + } + } catch (error) { + this.log.error('Error processing checkpoint proposal in non-validator handler', error); + } + // Non-validators don't attest + return undefined; + }; + p2pClient.registerCheckpointProposalHandler(checkpointHandler); + } + return this; } @@ -623,4 +668,233 @@ export class BlockProposalHandler { totalManaUsed, }; } + + /** + * Validates a checkpoint proposal and uploads blobs if configured. + * Used by both non-validator nodes (via register) and the validator client (via delegation). + */ + async handleCheckpointProposal( + proposal: CheckpointProposalCore, + proposalInfo: LogData, + ): Promise { + const proposer = proposal.getSender(); + if (!proposer) { + this.log.warn(`Received checkpoint proposal with invalid signature for slot ${proposal.slotNumber}`); + return { isValid: false, reason: 'invalid_signature' }; + } + + if (!validateFeeAssetPriceModifier(proposal.feeAssetPriceModifier)) { + this.log.warn( + `Received checkpoint proposal with invalid feeAssetPriceModifier ${proposal.feeAssetPriceModifier} for slot ${proposal.slotNumber}`, + ); + return { isValid: false, reason: 'invalid_fee_asset_price_modifier' }; + } + + const result = await this.validateCheckpointProposal(proposal, proposalInfo); + + // Upload blobs to filestore if validation passed (fire and forget) + if (result.isValid) { + this.tryUploadBlobsForCheckpoint(proposal, proposalInfo); + } + + return result; + } + + /** + * Validates a checkpoint proposal by building the full checkpoint and comparing it with the proposal. + * @returns Validation result with isValid flag and reason if invalid. + */ + async validateCheckpointProposal( + proposal: CheckpointProposalCore, + proposalInfo: LogData, + ): Promise { + const slot = proposal.slotNumber; + + // Timeout block syncing at the start of the next slot + const config = this.checkpointsBuilder.getConfig(); + const nextSlotTimestampSeconds = Number(getTimestampForSlot(SlotNumber(slot + 1), config)); + const timeoutSeconds = Math.max(1, nextSlotTimestampSeconds - Math.floor(this.dateProvider.now() / 1000)); + + // Wait for last block to sync by archive + let lastBlockHeader; + try { + lastBlockHeader = await retryUntil( + async () => { + await this.blockSource.syncImmediate(); + return this.blockSource.getBlockHeaderByArchive(proposal.archive); + }, + `waiting for block with archive ${proposal.archive.toString()} for slot ${slot}`, + timeoutSeconds, + 0.5, + ); + } catch (err) { + if (err instanceof TimeoutError) { + this.log.warn(`Timed out waiting for block with archive matching checkpoint proposal`, proposalInfo); + return { isValid: false, reason: 'last_block_not_found' }; + } + this.log.error(`Error fetching last block for checkpoint proposal`, err, proposalInfo); + return { isValid: false, reason: 'block_fetch_error' }; + } + + if (!lastBlockHeader) { + this.log.warn(`Last block not found for checkpoint proposal`, proposalInfo); + return { isValid: false, reason: 'last_block_not_found' }; + } + + // Get all full blocks for the slot and checkpoint + const blocks = await this.blockSource.getBlocksForSlot(slot); + if (blocks.length === 0) { + this.log.warn(`No blocks found for slot ${slot}`, proposalInfo); + return { isValid: false, reason: 'no_blocks_for_slot' }; + } + + // Ensure the last block for this slot matches the archive in the checkpoint proposal + if (!blocks.at(-1)?.archive.root.equals(proposal.archive)) { + this.log.warn(`Last block archive mismatch for checkpoint proposal`, proposalInfo); + return { isValid: false, reason: 'last_block_archive_mismatch' }; + } + + this.log.debug(`Found ${blocks.length} blocks for slot ${slot}`, { + ...proposalInfo, + blockNumbers: blocks.map(b => b.number), + }); + + // Get checkpoint constants from first block + const firstBlock = blocks[0]; + const constants = this.extractCheckpointConstants(firstBlock); + const checkpointNumber = firstBlock.checkpointNumber; + + // Get L1-to-L2 messages for this checkpoint + const l1ToL2Messages = await this.l1ToL2MessageSource.getL1ToL2Messages(checkpointNumber); + + // Collect the out hashes of all the checkpoints before this one in the same epoch + const epoch = getEpochAtSlot(slot, this.epochCache.getL1Constants()); + const previousCheckpointOutHashes = (await this.blockSource.getCheckpointsDataForEpoch(epoch)) + .filter(c => c.checkpointNumber < checkpointNumber) + .map(c => c.checkpointOutHash); + + // Fork world state at the block before the first block + const parentBlockNumber = BlockNumber(firstBlock.number - 1); + const fork = await this.worldState.fork(parentBlockNumber); + + try { + // Create checkpoint builder with all existing blocks + const checkpointBuilder = await this.checkpointsBuilder.openCheckpoint( + checkpointNumber, + constants, + proposal.feeAssetPriceModifier, + l1ToL2Messages, + previousCheckpointOutHashes, + fork, + blocks, + this.log.getBindings(), + ); + + // Complete the checkpoint to get computed values + const computedCheckpoint = await checkpointBuilder.completeCheckpoint(); + + // Compare checkpoint header with proposal + if (!computedCheckpoint.header.equals(proposal.checkpointHeader)) { + this.log.warn(`Checkpoint header mismatch`, { + ...proposalInfo, + computed: computedCheckpoint.header.toInspect(), + proposal: proposal.checkpointHeader.toInspect(), + }); + return { isValid: false, reason: 'checkpoint_header_mismatch' }; + } + + // Compare archive root with proposal + if (!computedCheckpoint.archive.root.equals(proposal.archive)) { + this.log.warn(`Archive root mismatch`, { + ...proposalInfo, + computed: computedCheckpoint.archive.root.toString(), + proposal: proposal.archive.toString(), + }); + return { isValid: false, reason: 'archive_mismatch' }; + } + + // Check that the accumulated epoch out hash matches the value in the proposal. + // The epoch out hash is the accumulated hash of all checkpoint out hashes in the epoch. + const checkpointOutHash = computedCheckpoint.getCheckpointOutHash(); + const computedEpochOutHash = accumulateCheckpointOutHashes([...previousCheckpointOutHashes, checkpointOutHash]); + const proposalEpochOutHash = proposal.checkpointHeader.epochOutHash; + if (!computedEpochOutHash.equals(proposalEpochOutHash)) { + this.log.warn(`Epoch out hash mismatch`, { + proposalEpochOutHash: proposalEpochOutHash.toString(), + computedEpochOutHash: computedEpochOutHash.toString(), + checkpointOutHash: checkpointOutHash.toString(), + previousCheckpointOutHashes: previousCheckpointOutHashes.map(h => h.toString()), + ...proposalInfo, + }); + return { isValid: false, reason: 'out_hash_mismatch' }; + } + + // Final round of validations on the checkpoint, just in case. + try { + validateCheckpoint(computedCheckpoint, { + rollupManaLimit: this.checkpointsBuilder.getConfig().rollupManaLimit, + maxDABlockGas: this.config.validateMaxDABlockGas, + maxL2BlockGas: this.config.validateMaxL2BlockGas, + maxTxsPerBlock: this.config.validateMaxTxsPerBlock, + maxTxsPerCheckpoint: this.config.validateMaxTxsPerCheckpoint, + }); + } catch (err) { + this.log.warn(`Checkpoint validation failed: ${err}`, proposalInfo); + return { isValid: false, reason: 'checkpoint_validation_failed' }; + } + + this.log.verbose(`Checkpoint proposal validation successful for slot ${slot}`, proposalInfo); + return { isValid: true }; + } finally { + await fork.close(); + } + } + + /** Extracts checkpoint global variables from a block. */ + private extractCheckpointConstants(block: L2Block): CheckpointGlobalVariables { + const gv = block.header.globalVariables; + return { + chainId: gv.chainId, + version: gv.version, + slotNumber: gv.slotNumber, + timestamp: gv.timestamp, + coinbase: gv.coinbase, + feeRecipient: gv.feeRecipient, + gasFees: gv.gasFees, + }; + } + + /** Triggers blob upload for a checkpoint if the blob client can upload (fire and forget). */ + protected tryUploadBlobsForCheckpoint(proposal: CheckpointProposalCore, proposalInfo: LogData): void { + if (this.blobClient.canUpload()) { + void this.uploadBlobsForCheckpoint(proposal, proposalInfo); + } + } + + /** Uploads blobs for a checkpoint to the filestore. */ + protected async uploadBlobsForCheckpoint(proposal: CheckpointProposalCore, proposalInfo: LogData): Promise { + try { + const lastBlockHeader = await this.blockSource.getBlockHeaderByArchive(proposal.archive); + if (!lastBlockHeader) { + this.log.warn(`Failed to get last block header for blob upload`, proposalInfo); + return; + } + + const blocks = await this.blockSource.getBlocksForSlot(proposal.slotNumber); + if (blocks.length === 0) { + this.log.warn(`No blocks found for blob upload`, proposalInfo); + return; + } + + const blobFields = blocks.flatMap(b => b.toBlobFields()); + const blobs: Blob[] = await getBlobsPerL1Block(blobFields); + await this.blobClient.sendBlobsToFilestore(blobs); + this.log.debug(`Uploaded ${blobs.length} blobs to filestore for checkpoint at slot ${proposal.slotNumber}`, { + ...proposalInfo, + numBlobs: blobs.length, + }); + } catch (err) { + this.log.warn(`Failed to upload blobs for checkpoint: ${err}`, proposalInfo); + } + } } diff --git a/yarn-project/validator-client/src/validator.ha.integration.test.ts b/yarn-project/validator-client/src/validator.ha.integration.test.ts index 80c7bd532974..185b9c734556 100644 --- a/yarn-project/validator-client/src/validator.ha.integration.test.ts +++ b/yarn-project/validator-client/src/validator.ha.integration.test.ts @@ -33,12 +33,12 @@ import { afterEach, beforeEach, describe, expect, it } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; import { type PrivateKeyAccount, generatePrivateKey, privateKeyToAccount } from 'viem/accounts'; -import { BlockProposalHandler } from './block_proposal_handler.js'; import type { FullNodeCheckpointsBuilder } from './checkpoint_builder.js'; import type { ValidatorClientConfig } from './config.js'; import { HAKeyStore } from './key_store/ha_key_store.js'; import { NodeKeystoreAdapter } from './key_store/node_keystore_adapter.js'; import { ValidatorMetrics } from './metrics.js'; +import { ProposalHandler } from './proposal_handler.js'; import { ValidatorClient } from './validator.js'; describe('ValidatorClient HA Integration', () => { @@ -195,7 +195,7 @@ describe('ValidatorClient HA Integration', () => { txsPermitted: true, maxTxsPerBlock: undefined, }); - const blockProposalHandler = new BlockProposalHandler( + const proposalHandler = new ProposalHandler( checkpointsBuilder, worldState, blockSource, @@ -204,6 +204,7 @@ describe('ValidatorClient HA Integration', () => { blockProposalValidator, epochCache, config, + blobClient, metrics, dateProvider, getTelemetryClient(), @@ -215,13 +216,10 @@ describe('ValidatorClient HA Integration', () => { haKeyStore, epochCache, p2pClient, - blockProposalHandler, - blockSource, - checkpointsBuilder, - worldState, - l1ToL2MessageSource, + proposalHandler, config, blobClient, + haSigner, dateProvider, getTelemetryClient(), ) as ValidatorClient; diff --git a/yarn-project/validator-client/src/validator.test.ts b/yarn-project/validator-client/src/validator.test.ts index 52d8916c5a2f..b905f049f041 100644 --- a/yarn-project/validator-client/src/validator.test.ts +++ b/yarn-project/validator-client/src/validator.test.ts @@ -51,6 +51,7 @@ import type { } from './checkpoint_builder.js'; import { type ValidatorClientConfig, validatorClientConfigMappings } from './config.js'; import { HAKeyStore } from './key_store/ha_key_store.js'; +import { ProposalHandler } from './proposal_handler.js'; import { ValidatorClient } from './validator.js'; function makeKeyStore(validator: { @@ -84,7 +85,7 @@ describe('ValidatorClient', () => { > & { disableTransactions: boolean; }; - let validatorClient: TestValidatorClient; + let validatorClient: ValidatorClient; let p2pClient: MockProxy; let blockSource: MockProxy; let l1ToL2MessageSource: MockProxy; @@ -92,6 +93,7 @@ describe('ValidatorClient', () => { let checkpointsBuilder: MockProxy; let worldState: MockProxy; let validatorAccounts: PrivateKeyAccount[]; + let validatorPrivateKeys: ReturnType[]; let dateProvider: TestDateProvider; let txProvider: MockProxy; let keyStoreManager: KeystoreManager; @@ -135,7 +137,7 @@ describe('ValidatorClient', () => { haKeyStore.start.mockImplementation(() => Promise.resolve()); haKeyStore.stop.mockImplementation(() => Promise.resolve()); - const validatorPrivateKeys = [generatePrivateKey(), generatePrivateKey()]; + validatorPrivateKeys = [generatePrivateKey(), generatePrivateKey()]; validatorAccounts = validatorPrivateKeys.map(privateKey => privateKeyToAccount(privateKey)); haKeyStore.getAddresses.mockReturnValue(validatorAccounts.map(account => EthAddress.fromString(account.address))); @@ -172,7 +174,7 @@ describe('ValidatorClient', () => { keyStoreManager, blobClient, dateProvider, - )) as TestValidatorClient; + )) as ValidatorClient; }); describe('createBlockProposal', () => { @@ -386,10 +388,27 @@ describe('ValidatorClient', () => { expect(isValid).toBe(true); }); + it('should process block proposal from own validator key (HA peer)', async () => { + const selfSigner = new Secp256k1Signer(Buffer32.fromString(validatorPrivateKeys[0])); + const emptyInHash = computeInHashFromL1ToL2Messages([]); + const selfProposal = await makeBlockProposal({ + blockHeader: proposal.blockHeader, + inHash: emptyInHash, + signer: selfSigner, + }); + + epochCache.getProposerAttesterAddressInSlot.mockResolvedValue(selfSigner.address); + + const handleSpy = jest.spyOn(validatorClient.getProposalHandler(), 'handleBlockProposal'); + const isValid = await validatorClient.validateBlockProposal(selfProposal, sender); + expect(isValid).toBe(true); + expect(handleSpy).toHaveBeenCalled(); + }); + it('should return early when escape hatch is open', async () => { epochCache.isEscapeHatchOpenAtSlot.mockResolvedValueOnce(true); - const handleSpy = jest.spyOn(validatorClient.getBlockProposalHandler(), 'handleBlockProposal'); + const handleSpy = jest.spyOn(validatorClient.getProposalHandler(), 'handleBlockProposal'); const isValid = await validatorClient.validateBlockProposal(proposal, sender); expect(isValid).toBe(false); @@ -458,7 +477,10 @@ describe('ValidatorClient', () => { it('should attest to a checkpoint proposal after validating a block for that slot', async () => { const addCheckpointAttestationsSpy = jest.spyOn(p2pClient, 'addOwnCheckpointAttestations'); - const uploadBlobsSpy = jest.spyOn(validatorClient, 'uploadBlobsForCheckpoint'); + const uploadBlobsSpy = jest.spyOn( + validatorClient.getProposalHandler() as TestProposalHandler, + 'tryUploadBlobsForCheckpoint', + ); const didValidate = await validatorClient.validateBlockProposal(proposal, sender); expect(didValidate).toBe(true); @@ -473,10 +495,15 @@ describe('ValidatorClient', () => { }, }); + // Mock validateCheckpointProposal to pass, so handleCheckpointProposal runs its + // own checks (signature, fee modifier) and then proceeds to blob upload. + const validateCheckpointSpy = jest + .spyOn(validatorClient.getProposalHandler(), 'validateCheckpointProposal') + .mockResolvedValue({ isValid: true }); + // Enable blob upload for this attestation blobClient.canUpload.mockReturnValue(true); - validatorClient.updateConfig({ skipCheckpointProposalValidation: true }); const attestations = await validatorClient.attestToCheckpointProposal(checkpointProposal, sender); expect(attestations).toBeDefined(); @@ -485,6 +512,7 @@ describe('ValidatorClient', () => { expect(uploadBlobsSpy).toHaveBeenCalled(); uploadBlobsSpy.mockRestore(); + validateCheckpointSpy.mockRestore(); }); it('should not attest to a checkpoint proposal that references a middle block instead of the last', async () => { @@ -776,7 +804,7 @@ describe('ValidatorClient', () => { // blocks in the same checkpoint share the same checkpointNumber, they will always // compute the same inHash from the same L1 messages. If a malicious proposal has a // different inHash, it will fail the existing validation at lines 192-200 in - // block_proposal_handler.ts. + // proposal_handler.ts. }); it('should validate proposals in fisherman mode but not create or broadcast attestations', async () => { @@ -881,7 +909,10 @@ describe('ValidatorClient', () => { blockSource.getBlocksForSlot.mockResolvedValue([mockBlock]); const proposal = await makeCheckpointProposal({ lastBlock: {} }); - await validatorClient.uploadBlobsForCheckpoint(proposal, proposalInfo); + await (validatorClient.getProposalHandler() as TestProposalHandler).uploadBlobsForCheckpoint( + proposal, + proposalInfo, + ); expect(blockSource.getBlocksForSlot).toHaveBeenCalledWith(proposal.slotNumber); expect(blobClient.sendBlobsToFilestore).toHaveBeenCalled(); @@ -891,7 +922,10 @@ describe('ValidatorClient', () => { blockSource.getBlockHeaderByArchive.mockResolvedValue(undefined); const proposal = await makeCheckpointProposal({ lastBlock: {} }); - await validatorClient.uploadBlobsForCheckpoint(proposal, proposalInfo); + await (validatorClient.getProposalHandler() as TestProposalHandler).uploadBlobsForCheckpoint( + proposal, + proposalInfo, + ); expect(blobClient.sendBlobsToFilestore).not.toHaveBeenCalled(); }); @@ -903,7 +937,9 @@ describe('ValidatorClient', () => { blobClient.sendBlobsToFilestore.mockRejectedValue(new Error('upload failed')); const proposal = await makeCheckpointProposal({ lastBlock: {} }); - await expect(validatorClient.uploadBlobsForCheckpoint(proposal, proposalInfo)).resolves.toBeUndefined(); + await expect( + (validatorClient.getProposalHandler() as TestProposalHandler).uploadBlobsForCheckpoint(proposal, proposalInfo), + ).resolves.toBeUndefined(); }); }); @@ -1068,8 +1104,11 @@ describe('ValidatorClient', () => { }); /** Exposes protected methods for direct testing */ -class TestValidatorClient extends ValidatorClient { +class TestProposalHandler extends ProposalHandler { declare public uploadBlobsForCheckpoint: ( - ...args: Parameters + ...args: Parameters ) => Promise; + declare public tryUploadBlobsForCheckpoint: ( + ...args: Parameters + ) => void; } diff --git a/yarn-project/validator-client/src/validator.ts b/yarn-project/validator-client/src/validator.ts index 6293175d54da..6b37f0606c0d 100644 --- a/yarn-project/validator-client/src/validator.ts +++ b/yarn-project/validator-client/src/validator.ts @@ -1,7 +1,5 @@ import type { BlobClientInterface } from '@aztec/blob-client/client'; -import { type Blob, getBlobsPerL1Block } from '@aztec/blob-lib'; import type { EpochCache } from '@aztec/epoch-cache'; -import { validateFeeAssetPriceModifier } from '@aztec/ethereum/contracts'; import { BlockNumber, CheckpointNumber, @@ -10,11 +8,9 @@ import { SlotNumber, } from '@aztec/foundation/branded-types'; import { Fr } from '@aztec/foundation/curves/bn254'; -import { TimeoutError } from '@aztec/foundation/error'; import type { EthAddress } from '@aztec/foundation/eth-address'; import type { Signature } from '@aztec/foundation/eth-signature'; -import { type LogData, type Logger, createLogger } from '@aztec/foundation/log'; -import { retryUntil } from '@aztec/foundation/retry'; +import { type Logger, createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { sleep } from '@aztec/foundation/sleep'; import { DateProvider } from '@aztec/foundation/timer'; @@ -23,9 +19,8 @@ import type { DuplicateAttestationInfo, DuplicateProposalInfo, P2P, PeerId } fro import { AuthRequest, AuthResponse, BlockProposalValidator, ReqRespSubProtocol } from '@aztec/p2p'; import { OffenseType, WANT_TO_SLASH_EVENT, type Watcher, type WatcherEmitter } from '@aztec/slasher'; import type { AztecAddress } from '@aztec/stdlib/aztec-address'; -import type { CommitteeAttestationsAndSigners, L2Block, L2BlockSink, L2BlockSource } from '@aztec/stdlib/block'; -import { validateCheckpoint } from '@aztec/stdlib/checkpoint'; -import { getEpochAtSlot, getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; +import type { CommitteeAttestationsAndSigners, L2BlockSink, L2BlockSource } from '@aztec/stdlib/block'; +import { getEpochAtSlot } from '@aztec/stdlib/epoch-helpers'; import type { CreateCheckpointProposalLastBlockData, ITxProvider, @@ -33,7 +28,7 @@ import type { ValidatorClientFullConfig, WorldStateSynchronizer, } from '@aztec/stdlib/interfaces/server'; -import { type L1ToL2MessageSource, accumulateCheckpointOutHashes } from '@aztec/stdlib/messaging'; +import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging'; import { type BlockProposal, type BlockProposalOptions, @@ -43,7 +38,7 @@ import { type CheckpointProposalOptions, } from '@aztec/stdlib/p2p'; import type { CheckpointHeader } from '@aztec/stdlib/rollup'; -import type { BlockHeader, CheckpointGlobalVariables, Tx } from '@aztec/stdlib/tx'; +import type { BlockHeader, Tx } from '@aztec/stdlib/tx'; import { AttestationTimeoutError } from '@aztec/stdlib/validators'; import { type TelemetryClient, type Tracer, getTelemetryClient } from '@aztec/telemetry-client'; import { createHASigner } from '@aztec/validator-ha-signer/factory'; @@ -53,13 +48,13 @@ import type { ValidatorHASigner } from '@aztec/validator-ha-signer/validator-ha- import { EventEmitter } from 'events'; import type { TypedDataDefinition } from 'viem'; -import { BlockProposalHandler, type BlockProposalValidationFailureReason } from './block_proposal_handler.js'; import type { FullNodeCheckpointsBuilder } from './checkpoint_builder.js'; import { ValidationService } from './duties/validation_service.js'; import { HAKeyStore } from './key_store/ha_key_store.js'; import type { ExtendedValidatorKeyStore } from './key_store/interface.js'; import { NodeKeystoreAdapter } from './key_store/node_keystore_adapter.js'; import { ValidatorMetrics } from './metrics.js'; +import { type BlockProposalValidationFailureReason, ProposalHandler } from './proposal_handler.js'; // We maintain a set of proposers who have proposed invalid blocks. // Just cap the set to avoid unbounded growth. @@ -102,11 +97,7 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) private keyStore: ExtendedValidatorKeyStore, private epochCache: EpochCache, private p2pClient: P2P, - private blockProposalHandler: BlockProposalHandler, - private blockSource: L2BlockSource, - private checkpointsBuilder: FullNodeCheckpointsBuilder, - private worldState: WorldStateSynchronizer, - private l1ToL2MessageSource: L1ToL2MessageSource, + private proposalHandler: ProposalHandler, private config: ValidatorClientFullConfig, private blobClient: BlobClientInterface, private haSigner: ValidatorHASigner | undefined, @@ -203,7 +194,7 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) txsPermitted: !config.disableTransactions, maxTxsPerBlock: config.validateMaxTxsPerBlock, }); - const blockProposalHandler = new BlockProposalHandler( + const proposalHandler = new ProposalHandler( checkpointsBuilder, worldState, blockSource, @@ -212,6 +203,7 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) blockProposalValidator, epochCache, config, + blobClient, metrics, dateProvider, telemetry, @@ -235,11 +227,7 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) validatorKeyStore, epochCache, p2pClient, - blockProposalHandler, - blockSource, - checkpointsBuilder, - worldState, - l1ToL2MessageSource, + proposalHandler, config, blobClient, haSigner, @@ -256,8 +244,8 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) .filter(addr => !this.config.disabledValidators.some(disabled => disabled.equals(addr))); } - public getBlockProposalHandler() { - return this.blockProposalHandler; + public getProposalHandler() { + return this.proposalHandler; } public signWithAddress(addr: EthAddress, msg: TypedDataDefinition, context: SigningContext) { @@ -385,13 +373,12 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) return false; } - // Ignore proposals from ourselves (may happen in HA setups) + // Log self-proposals from HA peers (same validator key on different nodes) if (this.getValidatorAddresses().some(addr => addr.equals(proposer))) { - this.log.debug(`Ignoring block proposal from self for slot ${slotNumber}`, { + this.log.verbose(`Processing block proposal from HA peer for slot ${slotNumber}`, { proposer: proposer.toString(), slotNumber, }); - return false; } // Check if we're in the committee (for metrics purposes) @@ -416,7 +403,7 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) alwaysReexecuteBlockProposals || this.blobClient.canUpload(); - const validationResult = await this.blockProposalHandler.handleBlockProposal( + const validationResult = await this.proposalHandler.handleBlockProposal( proposal, proposalSender, !!shouldReexecute && !escapeHatchOpen, @@ -490,14 +477,8 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) return undefined; } - // Reject proposals with invalid signatures - if (!proposer) { - this.log.warn(`Received checkpoint proposal with invalid signature for slot ${slotNumber}`); - return undefined; - } - // Ignore proposals from ourselves (may happen in HA setups) - if (this.getValidatorAddresses().some(addr => addr.equals(proposer))) { + if (proposer && this.getValidatorAddresses().some(addr => addr.equals(proposer))) { this.log.debug(`Ignoring block proposal from self for slot ${slotNumber}`, { proposer: proposer.toString(), slotNumber, @@ -505,44 +486,31 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) return undefined; } - // Validate fee asset price modifier is within allowed range - if (!validateFeeAssetPriceModifier(proposal.feeAssetPriceModifier)) { - this.log.warn( - `Received checkpoint proposal with invalid feeAssetPriceModifier ${proposal.feeAssetPriceModifier} for slot ${slotNumber}`, - ); - return undefined; - } - - // Check that I have any address in current committee before attesting + // Check that I have any address in the committee where this checkpoint will land before attesting const inCommittee = await this.epochCache.filterInCommittee(slotNumber, this.getValidatorAddresses()); const partOfCommittee = inCommittee.length > 0; const proposalInfo = { slotNumber, archive: proposal.archive.toString(), - proposer: proposer.toString(), + proposer: proposer?.toString(), }; this.log.info(`Received checkpoint proposal for slot ${slotNumber}`, { ...proposalInfo, fishermanMode: this.config.fishermanMode || false, }); - // Validate the checkpoint proposal before attesting (unless skipCheckpointProposalValidation is set) + // Validate the checkpoint proposal and upload blobs (unless skipCheckpointProposalValidation is set) if (this.config.skipCheckpointProposalValidation) { this.log.warn(`Skipping checkpoint proposal validation for slot ${slotNumber}`, proposalInfo); } else { - const validationResult = await this.validateCheckpointProposal(proposal, proposalInfo); + const validationResult = await this.proposalHandler.handleCheckpointProposal(proposal, proposalInfo); if (!validationResult.isValid) { this.log.warn(`Checkpoint proposal validation failed: ${validationResult.reason}`, proposalInfo); return undefined; } } - // Upload blobs to filestore if we can (fire and forget) - if (this.blobClient.canUpload()) { - void this.uploadBlobsForCheckpoint(proposal, proposalInfo); - } - // Check that I have any address in current committee before attesting // In fisherman mode, we still create attestations for validation even if not in committee if (!partOfCommittee && !this.config.fishermanMode) { @@ -637,201 +605,6 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) return attestations; } - /** - * Validates a checkpoint proposal by building the full checkpoint and comparing it with the proposal. - * @returns Validation result with isValid flag and reason if invalid. - */ - private async validateCheckpointProposal( - proposal: CheckpointProposalCore, - proposalInfo: LogData, - ): Promise<{ isValid: true } | { isValid: false; reason: string }> { - const slot = proposal.slotNumber; - - // Timeout block syncing at the start of the next slot - const config = this.checkpointsBuilder.getConfig(); - const nextSlotTimestampSeconds = Number(getTimestampForSlot(SlotNumber(slot + 1), config)); - const timeoutSeconds = Math.max(1, nextSlotTimestampSeconds - Math.floor(this.dateProvider.now() / 1000)); - - // Wait for last block to sync by archive - let lastBlockHeader: BlockHeader | undefined; - try { - lastBlockHeader = await retryUntil( - async () => { - await this.blockSource.syncImmediate(); - return this.blockSource.getBlockHeaderByArchive(proposal.archive); - }, - `waiting for block with archive ${proposal.archive.toString()} for slot ${slot}`, - timeoutSeconds, - 0.5, - ); - } catch (err) { - if (err instanceof TimeoutError) { - this.log.warn(`Timed out waiting for block with archive matching checkpoint proposal`, proposalInfo); - return { isValid: false, reason: 'last_block_not_found' }; - } - this.log.error(`Error fetching last block for checkpoint proposal`, err, proposalInfo); - return { isValid: false, reason: 'block_fetch_error' }; - } - - if (!lastBlockHeader) { - this.log.warn(`Last block not found for checkpoint proposal`, proposalInfo); - return { isValid: false, reason: 'last_block_not_found' }; - } - - // Get all full blocks for the slot and checkpoint - const blocks = await this.blockSource.getBlocksForSlot(slot); - if (blocks.length === 0) { - this.log.warn(`No blocks found for slot ${slot}`, proposalInfo); - return { isValid: false, reason: 'no_blocks_for_slot' }; - } - - // Ensure the last block for this slot matches the archive in the checkpoint proposal - if (!blocks.at(-1)?.archive.root.equals(proposal.archive)) { - this.log.warn(`Last block archive mismatch for checkpoint proposal`, proposalInfo); - return { isValid: false, reason: 'last_block_archive_mismatch' }; - } - - this.log.debug(`Found ${blocks.length} blocks for slot ${slot}`, { - ...proposalInfo, - blockNumbers: blocks.map(b => b.number), - }); - - // Get checkpoint constants from first block - const firstBlock = blocks[0]; - const constants = this.extractCheckpointConstants(firstBlock); - const checkpointNumber = firstBlock.checkpointNumber; - - // Get L1-to-L2 messages for this checkpoint - const l1ToL2Messages = await this.l1ToL2MessageSource.getL1ToL2Messages(checkpointNumber); - - // Collect the out hashes of all the checkpoints before this one in the same epoch - const epoch = getEpochAtSlot(slot, this.epochCache.getL1Constants()); - const previousCheckpointOutHashes = (await this.blockSource.getCheckpointsDataForEpoch(epoch)) - .filter(c => c.checkpointNumber < checkpointNumber) - .map(c => c.checkpointOutHash); - - // Fork world state at the block before the first block - const parentBlockNumber = BlockNumber(firstBlock.number - 1); - const fork = await this.worldState.fork(parentBlockNumber); - - try { - // Create checkpoint builder with all existing blocks - const checkpointBuilder = await this.checkpointsBuilder.openCheckpoint( - checkpointNumber, - constants, - proposal.feeAssetPriceModifier, - l1ToL2Messages, - previousCheckpointOutHashes, - fork, - blocks, - this.log.getBindings(), - ); - - // Complete the checkpoint to get computed values - const computedCheckpoint = await checkpointBuilder.completeCheckpoint(); - - // Compare checkpoint header with proposal - if (!computedCheckpoint.header.equals(proposal.checkpointHeader)) { - this.log.warn(`Checkpoint header mismatch`, { - ...proposalInfo, - computed: computedCheckpoint.header.toInspect(), - proposal: proposal.checkpointHeader.toInspect(), - }); - return { isValid: false, reason: 'checkpoint_header_mismatch' }; - } - - // Compare archive root with proposal - if (!computedCheckpoint.archive.root.equals(proposal.archive)) { - this.log.warn(`Archive root mismatch`, { - ...proposalInfo, - computed: computedCheckpoint.archive.root.toString(), - proposal: proposal.archive.toString(), - }); - return { isValid: false, reason: 'archive_mismatch' }; - } - - // Check that the accumulated epoch out hash matches the value in the proposal. - // The epoch out hash is the accumulated hash of all checkpoint out hashes in the epoch. - const checkpointOutHash = computedCheckpoint.getCheckpointOutHash(); - const computedEpochOutHash = accumulateCheckpointOutHashes([...previousCheckpointOutHashes, checkpointOutHash]); - const proposalEpochOutHash = proposal.checkpointHeader.epochOutHash; - if (!computedEpochOutHash.equals(proposalEpochOutHash)) { - this.log.warn(`Epoch out hash mismatch`, { - proposalEpochOutHash: proposalEpochOutHash.toString(), - computedEpochOutHash: computedEpochOutHash.toString(), - checkpointOutHash: checkpointOutHash.toString(), - previousCheckpointOutHashes: previousCheckpointOutHashes.map(h => h.toString()), - ...proposalInfo, - }); - return { isValid: false, reason: 'out_hash_mismatch' }; - } - - // Final round of validations on the checkpoint, just in case. - try { - validateCheckpoint(computedCheckpoint, { - rollupManaLimit: this.checkpointsBuilder.getConfig().rollupManaLimit, - maxDABlockGas: this.config.validateMaxDABlockGas, - maxL2BlockGas: this.config.validateMaxL2BlockGas, - maxTxsPerBlock: this.config.validateMaxTxsPerBlock, - maxTxsPerCheckpoint: this.config.validateMaxTxsPerCheckpoint, - }); - } catch (err) { - this.log.warn(`Checkpoint validation failed: ${err}`, proposalInfo); - return { isValid: false, reason: 'checkpoint_validation_failed' }; - } - - this.log.verbose(`Checkpoint proposal validation successful for slot ${slot}`, proposalInfo); - return { isValid: true }; - } finally { - await fork.close(); - } - } - - /** - * Extract checkpoint global variables from a block. - */ - private extractCheckpointConstants(block: L2Block): CheckpointGlobalVariables { - const gv = block.header.globalVariables; - return { - chainId: gv.chainId, - version: gv.version, - slotNumber: gv.slotNumber, - timestamp: gv.timestamp, - coinbase: gv.coinbase, - feeRecipient: gv.feeRecipient, - gasFees: gv.gasFees, - }; - } - - /** - * Uploads blobs for a checkpoint to the filestore (fire and forget). - */ - protected async uploadBlobsForCheckpoint(proposal: CheckpointProposalCore, proposalInfo: LogData): Promise { - try { - const lastBlockHeader = await this.blockSource.getBlockHeaderByArchive(proposal.archive); - if (!lastBlockHeader) { - this.log.warn(`Failed to get last block header for blob upload`, proposalInfo); - return; - } - - const blocks = await this.blockSource.getBlocksForSlot(proposal.slotNumber); - if (blocks.length === 0) { - this.log.warn(`No blocks found for blob upload`, proposalInfo); - return; - } - - const blobFields = blocks.flatMap(b => b.toBlobFields()); - const blobs: Blob[] = await getBlobsPerL1Block(blobFields); - await this.blobClient.sendBlobsToFilestore(blobs); - this.log.debug(`Uploaded ${blobs.length} blobs to filestore for checkpoint at slot ${proposal.slotNumber}`, { - ...proposalInfo, - numBlobs: blobs.length, - }); - } catch (err) { - this.log.warn(`Failed to upload blobs for checkpoint: ${err}`, proposalInfo); - } - } - private slashInvalidBlock(proposal: BlockProposal) { const proposer = proposal.getSender();