Skip to content

Commit

Permalink
Merge 318259d into c4f5f05
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed May 8, 2023
2 parents c4f5f05 + 318259d commit 547030c
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 30 deletions.
14 changes: 10 additions & 4 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export async function importBlock(
opts: ImportBlockOpts
): Promise<void> {
const {blockInput, postState, parentBlockSlot, executionStatus} = fullyVerifiedBlock;
const {block} = blockInput;
const {block, serializedData} = blockInput;
const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
Expand All @@ -60,8 +60,14 @@ export async function importBlock(
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;

// 1. Persist block to hot DB (pre-emptively)

await this.db.block.add(block);
if (serializedData) {
// skip serializing data if we already have it
this.metrics?.importBlock.persistBlockWithSerializedDataCount.inc();
await this.db.block.putBinary(this.db.block.getId(block), serializedData);
} else {
this.metrics?.importBlock.persistBlockNoSerializedDataCount.inc();
await this.db.block.add(block);
}
this.logger.debug("Persisted block to hot DB", {
slot: block.message.slot,
root: blockRootHex,
Expand Down Expand Up @@ -245,7 +251,7 @@ export async function importBlock(
// Only track "recent" blocks. Otherwise sync can distort this metrics heavily.
// We want to track recent blocks coming from gossip, unknown block sync, and API.
if (delaySec < 64 * this.config.SECONDS_PER_SLOT) {
this.metrics.elapsedTimeTillBecomeHead.observe(delaySec);
this.metrics.importBlock.elapsedTimeTillBecomeHead.observe(delaySec);
}
}

Expand Down
10 changes: 5 additions & 5 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {allForks} from "@lodestar/types";
import {WithOptionalBytes, allForks} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {JobItemQueue} from "../../util/queue/index.js";
import {Metrics} from "../../metrics/metrics.js";
Expand All @@ -18,10 +18,10 @@ const QUEUE_MAX_LENGTH = 256;
* BlockProcessor processes block jobs in a queued fashion, one after the other.
*/
export class BlockProcessor {
readonly jobQueue: JobItemQueue<[BlockInput[], ImportBlockOpts], void>;
readonly jobQueue: JobItemQueue<[WithOptionalBytes<BlockInput>[], ImportBlockOpts], void>;

constructor(chain: BeaconChain, metrics: Metrics | null, opts: BlockProcessOpts, signal: AbortSignal) {
this.jobQueue = new JobItemQueue<[BlockInput[], ImportBlockOpts], void>(
this.jobQueue = new JobItemQueue<[WithOptionalBytes<BlockInput>[], ImportBlockOpts], void>(
(job, importOpts) => {
return processBlocks.call(chain, job, {...opts, ...importOpts});
},
Expand All @@ -30,7 +30,7 @@ export class BlockProcessor {
);
}

async processBlocksJob(job: BlockInput[], opts: ImportBlockOpts = {}): Promise<void> {
async processBlocksJob(job: WithOptionalBytes<BlockInput>[], opts: ImportBlockOpts = {}): Promise<void> {
await this.jobQueue.push(job, opts);
}
}
Expand All @@ -47,7 +47,7 @@ export class BlockProcessor {
*/
export async function processBlocks(
this: BeaconChain,
blocks: BlockInput[],
blocks: WithOptionalBytes<BlockInput>[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<void> {
if (blocks.length === 0) {
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot} from "@lodestar/types";
import {allForks, deneb, Slot, WithOptionalBytes} from "@lodestar/types";
import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

Expand Down Expand Up @@ -92,7 +92,7 @@ export type ImportBlockOpts = {
* A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and ready to import
*/
export type FullyVerifiedBlock = {
blockInput: BlockInput;
blockInput: WithOptionalBytes<BlockInput>;
postState: CachedBeaconStateAllForks;
parentBlockSlot: Slot;
proposerBalanceDelta: number;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {ChainForkConfig} from "@lodestar/config";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Slot} from "@lodestar/types";
import {Slot, WithOptionalBytes} from "@lodestar/types";
import {toHexString} from "@lodestar/utils";
import {IClock} from "../../util/clock.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
Expand All @@ -21,14 +21,14 @@ import {BlockInput, ImportBlockOpts} from "./types.js";
*/
export function verifyBlocksSanityChecks(
chain: {forkChoice: IForkChoice; clock: IClock; config: ChainForkConfig},
blocks: BlockInput[],
blocks: WithOptionalBytes<BlockInput>[],
opts: ImportBlockOpts
): {relevantBlocks: BlockInput[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null} {
): {relevantBlocks: WithOptionalBytes<BlockInput>[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null} {
if (blocks.length === 0) {
throw Error("Empty partiallyVerifiedBlocks");
}

const relevantBlocks: BlockInput[] = [];
const relevantBlocks: WithOptionalBytes<BlockInput>[] = [];
const parentSlots: Slot[] = [];
let parentBlock: ProtoBlock | null = null;

Expand Down
18 changes: 15 additions & 3 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,19 @@ import {
PubkeyIndexMap,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, deneb, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
deneb,
Wei,
WithOptionalBytes,
} from "@lodestar/types";
import {CheckpointWithHex, ExecutionStatus, IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {ProcessShutdownCallback} from "@lodestar/validator";
import {Logger, pruneSetToMax, toHex} from "@lodestar/utils";
Expand Down Expand Up @@ -450,11 +462,11 @@ export class BeaconChain implements IBeaconChain {
return blobsSidecar;
}

async processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise<void> {
async processBlock(block: WithOptionalBytes<BlockInput>, opts?: ImportBlockOpts): Promise<void> {
return this.blockProcessor.processBlocksJob([block], opts);
}

async processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise<void> {
async processChainSegment(blocks: WithOptionalBytes<BlockInput>[], opts?: ImportBlockOpts): Promise<void> {
return this.blockProcessor.processBlocksJob(blocks, opts);
}

Expand Down
18 changes: 15 additions & 3 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex, deneb, Wei} from "@lodestar/types";
import {
allForks,
UintNum64,
Root,
phase0,
Slot,
RootHex,
Epoch,
ValidatorIndex,
deneb,
Wei,
WithOptionalBytes,
} from "@lodestar/types";
import {CachedBeaconStateAllForks, Index2PubkeyCache, PubkeyIndexMap} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {CompositeTypeAny, TreeView, Type} from "@chainsafe/ssz";
Expand Down Expand Up @@ -122,9 +134,9 @@ export interface IBeaconChain {
produceBlindedBlock(blockAttributes: BlockAttributes): Promise<{block: allForks.BlindedBeaconBlock; blockValue: Wei}>;

/** Process a block until complete */
processBlock(block: BlockInput, opts?: ImportBlockOpts): Promise<void>;
processBlock(block: WithOptionalBytes<BlockInput>, opts?: ImportBlockOpts): Promise<void>;
/** Process a chain of blocks until complete */
processChainSegment(blocks: BlockInput[], opts?: ImportBlockOpts): Promise<void>;
processChainSegment(blocks: WithOptionalBytes<BlockInput>[], opts?: ImportBlockOpts): Promise<void>;

getStatus(): phase0.Status;

Expand Down
20 changes: 15 additions & 5 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,11 +497,21 @@ export function createLodestarMetrics(
buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4],
}),
},
elapsedTimeTillBecomeHead: register.histogram({
name: "lodestar_gossip_block_elapsed_time_till_become_head",
help: "Time elapsed between block slot time and the time block becomes head",
buckets: [0.5, 1, 2, 4, 6, 12],
}),
importBlock: {
persistBlockNoSerializedDataCount: register.gauge({
name: "lodestar_import_block_persist_block_no_serialized_data_count",
help: "Count persisting block with no serialized data",
}),
persistBlockWithSerializedDataCount: register.gauge({
name: "lodestar_import_block_persist_block_with_serialized_data_count",
help: "Count persisting block with serialized data",
}),
elapsedTimeTillBecomeHead: register.histogram({
name: "lodestar_gossip_block_elapsed_time_till_become_head",
help: "Time elapsed between block slot time and the time block becomes head",
buckets: [0.5, 1, 2, 4, 6, 12],
}),
},
engineNotifyNewPayloadResult: register.gauge<"result">({
name: "lodestar_execution_engine_notify_new_payload_result_total",
help: "The total result of calling notifyNewPayload execution engine api",
Expand Down
12 changes: 8 additions & 4 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {peerIdFromString} from "@libp2p/peer-id";
import {toHexString} from "@chainsafe/ssz";
import {BeaconConfig} from "@lodestar/config";
import {Logger, prettyBytes} from "@lodestar/utils";
import {Root, Slot, ssz} from "@lodestar/types";
import {Root, Slot, ssz, WithBytes} from "@lodestar/types";
import {ForkName, ForkSeq} from "@lodestar/params";
import {Metrics} from "../../metrics/index.js";
import {OpSource} from "../../metrics/validatorMonitor.js";
Expand Down Expand Up @@ -124,7 +124,11 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
}
}

function handleValidBeaconBlock(blockInput: BlockInput, peerIdStr: string, seenTimestampSec: number): void {
function handleValidBeaconBlock(
blockInput: WithBytes<BlockInput>,
peerIdStr: string,
seenTimestampSec: number
): void {
const signedBlock = blockInput.block;

// Handler - MUST NOT `await`, to allow validation result to be propagated
Expand Down Expand Up @@ -178,7 +182,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH

const blockInput = getBlockInput.preDeneb(config, signedBlock);
await validateBeaconBlock(blockInput, topic.fork, peerIdStr, seenTimestampSec);
handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);
handleValidBeaconBlock({...blockInput, serializedData}, peerIdStr, seenTimestampSec);
},

[GossipType.beacon_block_and_blobs_sidecar]: async ({serializedData}, topic, peerIdStr, seenTimestampSec) => {
Expand All @@ -193,7 +197,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
const blockInput = getBlockInput.postDeneb(config, beaconBlock, blobsSidecar);
await validateBeaconBlock(blockInput, topic.fork, peerIdStr, seenTimestampSec);
validateGossipBlobsSidecar(beaconBlock, blobsSidecar);
handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);
handleValidBeaconBlock({...blockInput, serializedData}, peerIdStr, seenTimestampSec);
},

[GossipType.beacon_aggregate_and_proof]: async ({serializedData}, topic, _peer, seenTimestampSec) => {
Expand Down
2 changes: 2 additions & 0 deletions packages/types/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ export enum BlockSource {

export type SlotRootHex = {slot: Slot; root: RootHex};
export type SlotOptionalRoot = {slot: Slot; root?: RootHex};
export type WithBytes<T extends Record<string, unknown>> = T & {serializedData: Uint8Array};
export type WithOptionalBytes<T extends Record<string, unknown>> = T & {serializedData?: Uint8Array};

0 comments on commit 547030c

Please sign in to comment.