Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch io operations when verifying & importing block #5473

Merged
merged 5 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export function getBeaconBlockApi({
return this.publishBlock(signedBlock, {ignoreIfKnown: true});
},

async publishBlock(signedBlock, opts?: ImportBlockOpts) {
async publishBlock(signedBlock, opts: ImportBlockOpts = {}) {
const seenTimestampSec = Date.now() / 1000;

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
Expand Down Expand Up @@ -225,7 +225,8 @@ export function getBeaconBlockApi({
() => network.gossip.publishBeaconBlockMaybeBlobs(blockForImport) as Promise<unknown>,

() =>
chain.processBlock(blockForImport, opts).catch((e) => {
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
network.events.emit(NetworkEvent.unknownBlockParent, blockForImport, network.peerId.toString());
}
Expand Down
51 changes: 18 additions & 33 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import {ChainEvent, ReorgEventData} from "../emitter.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {RegenCaller} from "../regen/interface.js";
import type {BeaconChain} from "../chain.js";
import {BlockInputType, FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt} from "./types.js";
import {FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt} from "./types.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";

/**
* Fork-choice allows to import attestations from current (0) or past (1) epoch.
Expand Down Expand Up @@ -51,7 +52,7 @@ export async function importBlock(
opts: ImportBlockOpts
): Promise<void> {
const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block, serializedData, source} = blockInput;
const {block, source} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
Expand All @@ -60,28 +61,9 @@ export async function importBlock(
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;

// 1. Persist block to hot DB (pre-emptively)
if (serializedData) {
// skip serializing data if we already have it
this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc();
await this.db.block.putBinary(this.db.block.getId(block), serializedData);
} else {
this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc();
await this.db.block.add(block);
}
this.logger.debug("Persisted block to hot DB", {
slot: block.message.slot,
root: blockRootHex,
});

if (blockInput.type === BlockInputType.postDeneb) {
const {blobs} = blockInput;
// NOTE: Old blobs are pruned on archive
await this.db.blobsSidecar.add(blobs);
this.logger.debug("Persisted blobsSidecar to hot DB", {
blobsLen: blobs.blobs.length,
slot: blobs.beaconBlockSlot,
root: toHexString(blobs.beaconBlockRoot),
});
// If eagerPersistBlock = true we do that in verifyBlocksInEpoch to batch all I/O operations to save block time to head
if (!opts.eagerPersistBlock) {
await writeBlockInputToDb.call(this, [blockInput]);
}

// 2. Import block to fork choice
Expand Down Expand Up @@ -286,15 +268,18 @@ export async function importBlock(
// - Persist state witness
// - Use block's syncAggregate
if (blockEpoch >= this.config.ALTAIR_FORK_EPOCH) {
try {
this.lightClientServer.onImportBlockHead(
block.message as allForks.AllForksLightClient["BeaconBlock"],
postState as CachedBeaconStateAltair,
parentBlockSlot
);
} catch (e) {
this.logger.error("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
}
// we want to import block asap so do this in the next event loop
setTimeout(() => {
try {
this.lightClientServer.onImportBlockHead(
block.message as allForks.AllForksLightClient["BeaconBlock"],
postState as CachedBeaconStateAltair,
parentBlockSlot
);
} catch (e) {
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: block.message.slot}, e as Error);
}
}, 0);
}
}

Expand Down
19 changes: 19 additions & 0 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {importBlock} from "./importBlock.js";
import {assertLinearChainSegment} from "./utils/chainSegment.js";
import {BlockInput, FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {verifyBlocksSanityChecks} from "./verifyBlocksSanityChecks.js";
import {removeEagerlyPersistedBlockInputs} from "./writeBlockInputToDb.js";
export {ImportBlockOpts, AttestationImportOpt} from "./types.js";

const QUEUE_MAX_LENGTH = 256;
Expand Down Expand Up @@ -141,6 +142,24 @@ export async function processBlocks(
}
}

// Clean db if we don't have blocks in forkchoice but already persisted them to db
//
// NOTE: this function is awaited to ensure that DB size remains constant, otherwise an attacker may bloat the
// disk with big malicious payloads. Our sequential block importer will wait for this promise before importing
// another block. The removal call error is not propagated since that would halt the chain.
//
// LOG: Because the error is not propagated and there's a risk of db bloat, the error is logged at warn level
// to alert the user of potential db bloat. This error _should_ never happen user must act and report to us
if (opts.eagerPersistBlock) {
await removeEagerlyPersistedBlockInputs.call(this, blocks).catch((e) => {
this.logger.warn(
"Error pruning eagerly imported block inputs, DB may grow in size if this error happens frequently",
{slot: blocks.map((block) => block.block.message.slot).join(",")},
e
);
});
}

throw err;
}
}
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ export type ImportBlockOpts = {
validBlobsSidecar?: boolean;
/** Seen timestamp seconds */
seenTimestampSec?: number;
/** Set to true if persist block right at verification time */
eagerPersistBlock?: boolean;
};

/**
Expand Down
11 changes: 9 additions & 2 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
isStateValidatorsNodesPopulated,
DataAvailableStatus,
} from "@lodestar/state-transition";
import {bellatrix} from "@lodestar/types";
import {WithOptionalBytes, bellatrix} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {toHexString} from "@chainsafe/ssz";
import {ProtoBlock} from "@lodestar/fork-choice";
Expand All @@ -20,6 +20,7 @@ import {CAPELLA_OWL_BANNER} from "./utils/ownBanner.js";
import {verifyBlocksStateTransitionOnly} from "./verifyBlocksStateTransitionOnly.js";
import {verifyBlocksSignatures} from "./verifyBlocksSignatures.js";
import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExecutionPayloads.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";

/**
* Verifies 1 or more blocks are fully valid; from a linear sequence of blocks.
Expand All @@ -35,7 +36,7 @@ import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExe
export async function verifyBlocksInEpoch(
this: BeaconChain,
parentBlock: ProtoBlock,
blocksInput: BlockInput[],
blocksInput: WithOptionalBytes<BlockInput>[],
dataAvailabilityStatuses: DataAvailableStatus[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<{
Expand Down Expand Up @@ -84,6 +85,7 @@ export async function verifyBlocksInEpoch(
const abortController = new AbortController();

try {
// batch all I/O operations to reduce overhead
const [segmentExecStatus, {postStates, proposerBalanceDeltas}] = await Promise.all([
// Execution payloads
verifyBlocksExecutionPayload(this, parentBlock, blocks, preState0, abortController.signal, opts),
Expand All @@ -101,6 +103,11 @@ export async function verifyBlocksInEpoch(

// All signatures at once
verifyBlocksSignatures(this.bls, this.logger, this.metrics, preState0, blocks, opts),

// ideally we want to only persist blocks after verifying them however the reality is there are
// rarely invalid blocks we'll batch all I/O operation here to reduce the overhead if there's
// an error, we'll remove blocks not in forkchoice
opts.eagerPersistBlock ? writeBlockInputToDb.call(this, blocksInput) : Promise.resolve(),
]);

if (segmentExecStatus.execAborted === null && segmentExecStatus.mergeBlockFound !== null) {
Expand Down
81 changes: 81 additions & 0 deletions packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import {WithOptionalBytes, allForks, deneb} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {BeaconChain} from "../chain.js";
import {BlockInput, BlockInputType} from "./types.js";

/**
* Persists block input data to DB. This operation must be eventually completed if a block is imported to the fork-choice.
* Else the node will be in an inconsistent state that can lead to being stuck.
*
* This operation may be performed before, during or after importing to the fork-choice. As long as errors
* are handled properly for eventual consistency.
*/
export async function writeBlockInputToDb(
this: BeaconChain,
blocksInput: WithOptionalBytes<BlockInput>[]
): Promise<void> {
const fnPromises: Promise<void>[] = [];

for (const blockInput of blocksInput) {
const {block, serializedData, type} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHex(blockRoot);
if (serializedData) {
// skip serializing data if we already have it
this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc();
fnPromises.push(this.db.block.putBinary(this.db.block.getId(block), serializedData));
} else {
this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc();
fnPromises.push(this.db.block.add(block));
}
this.logger.debug("Persist block to hot DB", {
slot: block.message.slot,
root: blockRootHex,
});

if (type === BlockInputType.postDeneb) {
const {blobs} = blockInput;
// NOTE: Old blobs are pruned on archive
fnPromises.push(this.db.blobsSidecar.add(blobs));
this.logger.debug("Persist blobsSidecar to hot DB", {
blobsLen: blobs.blobs.length,
slot: blobs.beaconBlockSlot,
root: toHex(blobs.beaconBlockRoot),
});
}
}

await Promise.all(fnPromises);
}

/**
* Prunes eagerly persisted block inputs only if not known to the fork-choice
*/
export async function removeEagerlyPersistedBlockInputs(
this: BeaconChain,
blockInputs: WithOptionalBytes<BlockInput>[]
): Promise<void> {
const blockToRemove: allForks.SignedBeaconBlock[] = [];
const blobsToRemove: deneb.BlobsSidecar[] = [];

for (const blockInput of blockInputs) {
const {block, type} = blockInput;
const blockRoot = toHex(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message));
if (!this.forkChoice.hasBlockHex(blockRoot)) {
blockToRemove.push(block);

if (type === BlockInputType.postDeneb) {
blobsToRemove.push(blockInput.blobs);
this.db.blobsSidecar.remove(blockInput.blobs).catch((e) => {
this.logger.verbose("Error removing eagerly imported blobsSidecar", {blockRoot}, e);
});
}
}
}

await Promise.all([
// TODO: Batch DB operations not with Promise.all but with level db ops
this.db.block.batchRemove(blockToRemove),
this.db.blobsSidecar.batchRemove(blobsToRemove),
]);
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
blsVerifyOnMainThread: true,
// to track block process steps
seenTimestampSec,
// gossip block is validated, we want to process it asap
eagerPersistBlock: true,
})
.then(() => {
// Returns the delay between the start of `block.slot` and `current time`
Expand Down
3 changes: 3 additions & 0 deletions packages/beacon-node/src/sync/range/range.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
// when this runs, syncing is the most important thing and gossip is not likely to run
// so we can utilize worker threads to verify signatures
blsVerifyOnMainThread: false,
// we want to be safe to only persist blocks after verifying it to avoid any attacks that may cause our DB
// to grow too much
eagerPersistBlock: false,
};

if (this.opts?.disableProcessAsChainSegment) {
Expand Down
7 changes: 6 additions & 1 deletion packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@ export class UnknownBlockSync {
// otherwise we can't utilize bls thread pool capacity and Gossip Job Wait Time can't be kept low consistently.
// See https://github.com/ChainSafe/lodestar/issues/3792
const res = await wrapError(
this.chain.processBlock(pendingBlock.blockInput, {ignoreIfKnown: true, blsVerifyOnMainThread: true})
this.chain.processBlock(pendingBlock.blockInput, {
ignoreIfKnown: true,
blsVerifyOnMainThread: true,
// block is validated with correct root, we want to process it as soon as possible
eagerPersistBlock: true,
})
);
pendingBlock.status = PendingBlockStatus.pending;

Expand Down