Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: skip serializing gossip block when persisting to db #5426

Merged
merged 3 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 10 additions & 4 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export async function importBlock(
opts: ImportBlockOpts
): Promise<void> {
const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block} = blockInput;
const {block, serializedData} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
Expand All @@ -60,8 +60,14 @@ export async function importBlock(
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;

// 1. Persist block to hot DB (pre-emptively)

await this.db.block.add(block);
if (serializedData) {
// skip serializing data if we already have it
this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc();
await this.db.block.putBinary(this.db.block.getId(block), serializedData);
} else {
this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc();
await this.db.block.add(block);
dapplion marked this conversation as resolved.
Show resolved Hide resolved
}
this.logger.debug("Persisted block to hot DB", {
slot: block.message.slot,
root: blockRootHex,
Expand Down Expand Up @@ -245,7 +251,7 @@ export async function importBlock(
// Only track "recent" blocks. Otherwise sync can distort this metrics heavily.
// We want to track recent blocks coming from gossip, unknown block sync, and API.
if (delaySec < 64 * this.config.SECONDS_PER_SLOT) {
this.metrics.elapsedTimeTillBecomeHead.observe(delaySec);
this.metrics.importBlock.elapsedTimeTillBecomeHead.observe(delaySec);
}
}

Expand Down
10 changes: 5 additions & 5 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {allForks} from "@lodestar/types";
import {WithOptionalBytes, allForks} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {JobItemQueue} from "../../util/queue/index.js";
import {Metrics} from "../../metrics/metrics.js";
Expand All @@ -18,10 +18,10 @@ const QUEUE_MAX_LENGTH = 256;
* BlockProcessor processes block jobs in a queued fashion, one after the other.
*/
export class BlockProcessor {
readonly jobQueue: JobItemQueue<[BlockInput[], ImportBlockOpts], void>;
readonly jobQueue: JobItemQueue<[WithOptionalBytes<BlockInput>[], ImportBlockOpts], void>;

constructor(chain: BeaconChain, metrics: Metrics | null, opts: BlockProcessOpts, signal: AbortSignal) {
this.jobQueue = new JobItemQueue<[BlockInput[], ImportBlockOpts], void>(
this.jobQueue = new JobItemQueue<[WithOptionalBytes<BlockInput>[], ImportBlockOpts], void>(
(job, importOpts) => {
return processBlocks.call(chain, job, {...opts, ...importOpts});
},
Expand All @@ -30,7 +30,7 @@ export class BlockProcessor {
);
}

async processBlocksJob(job: BlockInput[], opts: ImportBlockOpts = {}): Promise<void> {
async processBlocksJob(job: WithOptionalBytes<BlockInput>[], opts: ImportBlockOpts = {}): Promise<void> {
await this.jobQueue.push(job, opts);
}
}
Expand All @@ -47,7 +47,7 @@ export class BlockProcessor {
*/
export async function processBlocks(
this: BeaconChain,
blocks: BlockInput[],
blocks: WithOptionalBytes<BlockInput>[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<void> {
if (blocks.length === 0) {
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot} from "@lodestar/types";
import {allForks, deneb, Slot, WithOptionalBytes} from "@lodestar/types";
import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

Expand Down Expand Up @@ -92,7 +92,7 @@ export type ImportBlockOpts = {
* A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and ready to import
*/
export type FullyVerifiedBlock = {
blockInput: BlockInput;
blockInput: WithOptionalBytes<BlockInput>;
postState: CachedBeaconStateAllForks;
parentBlockSlot: Slot;
proposerBalanceDelta: number;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {ChainForkConfig} from "@lodestar/config";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Slot} from "@lodestar/types";
import {Slot, WithOptionalBytes} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";
import {IClock} from "../../util/clock.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
Expand All @@ -21,14 +21,14 @@ import {BlockInput, ImportBlockOpts} from "./types.js";
*/
export function verifyBlocksSanityChecks(
chain: {forkChoice: IForkChoice; clock: IClock; config: ChainForkConfig},
blocks: BlockInput[],
blocks: WithOptionalBytes<BlockInput>[],
opts: ImportBlockOpts
): {relevantBlocks: BlockInput[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null} {
): {relevantBlocks: WithOptionalBytes<BlockInput>[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null} {
if (blocks.length === 0) {
throw Error("Empty partiallyVerifiedBlocks");
}

const relevantBlocks: BlockInput[] = [];
const relevantBlocks: WithOptionalBytes<BlockInput>[] = [];
const parentSlots: Slot[] = [];
let parentBlock: ProtoBlock | null = null;

Expand Down
18 changes: 15 additions & 3 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@ import {
PubkeyIndexMap,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, deneb, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
deneb,
Wei,
WithOptionalBytes,
} from "@lodestar/types";
import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {ProcessShutdownCallback} from "@lodestar/validator";
import {Logger, pruneSetToMax, toHex} from "@lodestar/utils";
Expand Down Expand Up @@ -450,11 +462,11 @@ export class BeaconChain implements IBeaconChain {
return blobsSidecar;
}

async processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise<void> {
async processBlock(block: WithOptionalBytes<BlockInput>, opts?: ImportBlockOpts): Promise<void> {
return this.blockProcessor.processBlocksJob([block], opts);
}

async processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise<void> {
async processChainSegment(blocks: WithOptionalBytes<BlockInput>[], opts?: ImportBlockOpts): Promise<void> {
return this.blockProcessor.processBlocksJob(blocks, opts);
}

Expand Down
18 changes: 15 additions & 3 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, deneb, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
deneb,
Wei,
WithOptionalBytes,
} from "@lodestar/types";
import {CachedBeaconStateAllForks, Index2PubkeyCache, PubkeyIndexMap} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {CompositeTypeAny, TreeView, Type} from "@chainsafe/ssz";
Expand Down Expand Up @@ -122,9 +134,9 @@ export interface IBeaconChain {
produceBlindedBlock(blockAttributes: BlockAttributes): Promise<{block: allForks.BlindedBeaconBlock; blockValue: Wei}>;

/** Process a block until complete */
processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise<void>;
processBlock(block: WithOptionalBytes<BlockInput>, opts?: ImportBlockOpts): Promise<void>;
/** Process a chain of blocks until complete */
processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise<void>;
processChainSegment(blocks: WithOptionalBytes<BlockInput>[], opts?: ImportBlockOpts): Promise<void>;

getStatus(): phase0.Status;

Expand Down
20 changes: 15 additions & 5 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,11 +496,21 @@ export function createLodestarMetrics(
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
},
elapsedTimeTillBecomeHead: register.histogram({
name: "lodestar_gossip_block_elapsed_time_till_become_head",
help: "Time elapsed between block slot time and the time block becomes head",
buckets: [0.5, 1, 2, 4, 6, 12],
}),
importBlock: {
persistBlockNoSerializedDataCount: register.gauge({
name: "lodestar_import_block_persist_block_no_serialized_data_count",
help: "Count persisting block with no serialized data",
}),
persistBlockWithSerializedDataCount: register.gauge({
name: "lodestar_import_block_persist_block_with_serialized_data_count",
help: "Count persisting block with serialized data",
}),
elapsedTimeTillBecomeHead: register.histogram({
name: "lodestar_gossip_block_elapsed_time_till_become_head",
help: "Time elapsed between block slot time and the time block becomes head",
buckets: [0.5, 1, 2, 4, 6, 12],
}),
},
engineNotifyNewPayloadResult: register.gauge<"result">({
name: "lodestar_execution_engine_notify_new_payload_result_total",
help: "The total result of calling notifyNewPayload execution engine api",
Expand Down
12 changes: 8 additions & 4 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {peerIdFromString} from "@libp2p/peer-id";
import {toHexString} from "@chainsafe/ssz";
import {BeaconConfig} from "@lodestar/config";
import {Logger, prettyBytes} from "@lodestar/utils";
import {Root, Slot, ssz} from "@lodestar/types";
import {Root, Slot, ssz, WithBytes} from "@lodestar/types";
import {ForkName, ForkSeq} from "@lodestar/params";
import {Metrics} from "../../metrics/index.js";
import {OpSource} from "../../metrics/validatorMonitor.js";
Expand Down Expand Up @@ -124,7 +124,11 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
}
}

function handleValidBeaconBlock(blockInput: BlockInput, peerIdStr: string, seenTimestampSec: number): void {
function handleValidBeaconBlock(
blockInput: WithBytes<BlockInput>,
peerIdStr: string,
seenTimestampSec: number
): void {
const signedBlock = blockInput.block;

// Handler - MUST NOT `await`, to allow validation result to be propagated
Expand Down Expand Up @@ -178,7 +182,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH

const blockInput = getBlockInput.preDeneb(config, signedBlock);
await validateBeaconBlock(blockInput, topic.fork, peerIdStr, seenTimestampSec);
handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);
handleValidBeaconBlock({...blockInput, serializedData}, peerIdStr, seenTimestampSec);
},

[GossipType.beacon_block_and_blobs_sidecar]: async ({serializedData}, topic, peerIdStr, seenTimestampSec) => {
Expand All @@ -193,7 +197,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
const blockInput = getBlockInput.postDeneb(config, beaconBlock, blobsSidecar);
await validateBeaconBlock(blockInput, topic.fork, peerIdStr, seenTimestampSec);
validateGossipBlobsSidecar(beaconBlock, blobsSidecar);
handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);
handleValidBeaconBlock({...blockInput, serializedData}, peerIdStr, seenTimestampSec);
},

[GossipType.beacon_aggregate_and_proof]: async ({serializedData}, topic, _peer, seenTimestampSec) => {
Expand Down
2 changes: 2 additions & 0 deletions packages/types/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ export enum BlockSource {

export type SlotRootHex = {slot: Slot; root: RootHex};
export type SlotOptionalRoot = {slot: Slot; root?: RootHex};
export type WithBytes<T extends Record<string, unknown>> = T & {serializedData: Uint8Array};
export type WithOptionalBytes<T extends Record<string, unknown>> = T & {serializedData?: Uint8Array};