Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pull block from peers after a cutoff if corresponding gossip blobs are seen #6534

Merged
merged 3 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 8 additions & 7 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot} from "@lodestar/types";
import {allForks, deneb, Slot, RootHex} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

Expand All @@ -25,17 +25,18 @@ export enum GossipedInputType {

export type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};
type CachedBlobs = {
blobsCache: BlobsCache;
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
};

export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
| {
type: BlockInputType.blobsPromise;
blobsCache: BlobsCache;
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
}
| ({type: BlockInputType.blobsPromise} & CachedBlobs)
);
export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise<BlockInput>} & CachedBlobs;

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
return (
Expand Down
82 changes: 58 additions & 24 deletions packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {BLOBSIDECAR_FIXED_SIZE, ForkSeq} from "@lodestar/params";

import {
BlockInput,
NullBlockInput,
getBlockInput,
BlockSource,
BlockInputBlobs,
Expand All @@ -28,9 +29,12 @@ type BlockInputCacheType = {
block?: allForks.SignedBeaconBlock;
blockBytes?: Uint8Array | null;
blobsCache: BlobsCache;
// promise and its callback cached for delayed resolution
// blobs promise and its callback cached for delayed resolution
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
// block promise and its callback cached for delayed resolution
blockInputPromise: Promise<BlockInput>;
resolveBlockInput: (blockInput: BlockInput) => void;
};

const MAX_GOSSIPINPUT_CACHE = 5;
Expand Down Expand Up @@ -66,7 +70,10 @@ export class SeenGossipBlockInput {
blockInput: BlockInput;
blockInputMeta: {pending: GossipedInputType.blob | null; haveBlobs: number; expectedBlobs: number};
}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}} {
| {
blockInput: NullBlockInput;
blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null};
} {
let blockHex;
let blockCache;

Expand Down Expand Up @@ -98,7 +105,15 @@ export class SeenGossipBlockInput {
this.blockInputCache.set(blockHex, blockCache);
}

const {block: signedBlock, blockBytes, blobsCache, availabilityPromise, resolveAvailability} = blockCache;
const {
block: signedBlock,
blockBytes,
blobsCache,
availabilityPromise,
resolveAvailability,
blockInputPromise,
resolveBlockInput,
} = blockCache;

if (signedBlock !== undefined) {
if (config.getForkSeq(signedBlock.message.slot) < ForkSeq.deneb) {
Expand All @@ -123,28 +138,34 @@ export class SeenGossipBlockInput {
resolveAvailability(allBlobs);
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP});
const {blobs, blobsBytes} = allBlobs;
const blockInput = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobs,
blockBytes ?? null,
blobsBytes
);

resolveBlockInput(blockInput);
return {
blockInput: getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobs,
blockBytes ?? null,
blobsBytes
),
blockInput,
blockInputMeta: {pending: null, haveBlobs: blobs.length, expectedBlobs: blobKzgCommitments.length},
};
} else {
const blockInput = getBlockInput.blobsPromise(
config,
signedBlock,
BlockSource.gossip,
blobsCache,
blockBytes ?? null,
availabilityPromise,
resolveAvailability
);

resolveBlockInput(blockInput);
return {
blockInput: getBlockInput.blobsPromise(
config,
signedBlock,
BlockSource.gossip,
blobsCache,
blockBytes ?? null,
availabilityPromise,
resolveAvailability
),
blockInput,
blockInputMeta: {
pending: GossipedInputType.blob,
haveBlobs: blobsCache.size,
Expand All @@ -155,23 +176,36 @@ export class SeenGossipBlockInput {
} else {
// will need to wait for the block to showup
return {
blockInput: null,
blockInput: {
block: null,
blockRootHex: blockHex,
blobsCache,
availabilityPromise,
resolveAvailability,
blockInputPromise,
},
blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blobsCache.size, expectedBlobs: null},
};
}
}
}

function getEmptyBlockInputCacheEntry(): BlockInputCacheType {
// Capture both the promise and its callbacks.
// Capture both the promise and its callbacks for blockInput and final availability
// It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately
let resolveBlockInput: ((block: BlockInput) => void) | null = null;
const blockInputPromise = new Promise<BlockInput>((resolveCB) => {
resolveBlockInput = resolveCB;
});

let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null;
const availabilityPromise = new Promise<BlockInputBlobs>((resolveCB) => {
resolveAvailability = resolveCB;
});
if (resolveAvailability === null) {

if (resolveAvailability === null || resolveBlockInput === null) {
throw Error("Promise Constructor was not executed immediately");
}
const blobsCache = new Map();
return {availabilityPromise, resolveAvailability, blobsCache};
return {blockInputPromise, resolveBlockInput, availabilityPromise, resolveAvailability, blobsCache};
}
4 changes: 2 additions & 2 deletions packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {EventEmitter} from "events";
import {PeerId, TopicValidatorResult} from "@libp2p/interface";
import {phase0, RootHex} from "@lodestar/types";
import {BlockInput} from "../chain/blocks/types.js";
import {BlockInput, NullBlockInput} from "../chain/blocks/types.js";
import {StrictEventEmitterSingleArg} from "../util/strictEvents.js";
import {PeerIdStr} from "../util/peerId.js";
import {EventDirection} from "../util/workerEvents.js";
Expand Down Expand Up @@ -32,7 +32,7 @@ export type NetworkEventData = {
[NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId};
[NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr};
[NetworkEvent.unknownBlock]: {rootHex: RootHex; peer?: PeerIdStr};
[NetworkEvent.unknownBlockInput]: {blockInput: BlockInput; peer?: PeerIdStr};
[NetworkEvent.unknownBlockInput]: {blockInput: BlockInput | NullBlockInput; peer?: PeerIdStr};
[NetworkEvent.pendingGossipsubMessage]: PendingGossipsubMessage;
[NetworkEvent.gossipMessageValidationResult]: {
msgId: string;
Expand Down
84 changes: 65 additions & 19 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import {toHexString} from "@chainsafe/ssz";
import {BeaconConfig} from "@lodestar/config";
import {BeaconConfig, ChainForkConfig} from "@lodestar/config";
import {LogLevel, Logger, prettyBytes} from "@lodestar/utils";
import {Root, Slot, ssz, allForks, deneb} from "@lodestar/types";
import {Root, Slot, ssz, allForks, deneb, UintNum64} from "@lodestar/types";
import {ForkName, ForkSeq} from "@lodestar/params";
import {routes} from "@lodestar/api";
import {computeTimeAtSlot} from "@lodestar/state-transition";
import {Metrics} from "../../metrics/index.js";
import {OpSource} from "../../metrics/validatorMonitor.js";
import {
Expand Down Expand Up @@ -45,7 +46,13 @@ import {PeerAction} from "../peers/index.js";
import {validateLightClientFinalityUpdate} from "../../chain/validation/lightClientFinalityUpdate.js";
import {validateLightClientOptimisticUpdate} from "../../chain/validation/lightClientOptimisticUpdate.js";
import {validateGossipBlobSidecar} from "../../chain/validation/blobSidecar.js";
import {BlockInput, GossipedInputType, BlobSidecarValidation, BlockInputType} from "../../chain/blocks/types.js";
import {
BlockInput,
GossipedInputType,
BlobSidecarValidation,
BlockInputType,
NullBlockInput,
} from "../../chain/blocks/types.js";
import {sszDeserialize} from "../gossip/topic.js";
import {INetworkCore} from "../core/index.js";
import {INetwork} from "../interface.js";
Expand Down Expand Up @@ -73,6 +80,7 @@ export type ValidatorFnsModules = {
};

const MAX_UNKNOWN_BLOCK_ROOT_RETRIES = 1;
const BLOCK_AVAILABILITY_CUTOFF_MS = 3_000;

/**
* Gossip handlers perform validation + handling in a single function.
Expand Down Expand Up @@ -129,7 +137,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
);
const blockInput = blockInputRes.blockInput;
// blockInput can't be returned null, improve by enforcing via return types
if (blockInput === null) {
if (blockInput.block === null) {
throw Error(
`Invalid null blockInput returned by getGossipBlockInput for type=${GossipedInputType.block} blockHex=${blockHex} slot=${slot}`
);
Expand Down Expand Up @@ -182,7 +190,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
gossipIndex: number,
peerIdStr: string,
seenTimestampSec: number
): Promise<BlockInput | null> {
): Promise<BlockInput | NullBlockInput> {
const blobBlockHeader = blobSidecar.signedBlockHeader.message;
const slot = blobBlockHeader.slot;
const blockRoot = ssz.phase0.BeaconBlockHeader.hashTreeRoot(blobBlockHeader);
Expand Down Expand Up @@ -226,7 +234,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
} catch (e) {
if (e instanceof BlobSidecarGossipError) {
// Don't trigger this yet if full block and blobs haven't arrived yet
if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput !== null) {
if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput.block !== null) {
logger.debug("Gossip blob has error", {slot, root: blockHex, code: e.type.code});
events.emit(NetworkEvent.unknownBlockParent, {blockInput, peer: peerIdStr});
}
Expand All @@ -252,7 +260,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
metrics?.registerBeaconBlock(OpSource.gossip, seenTimestampSec, signedBlock.message);
// if blobs are not yet fully available start an aggressive blob pull
if (blockInput.type === BlockInputType.blobsPromise) {
events.emit(NetworkEvent.unknownBlockInput, {blockInput: blockInput, peer: peerIdStr});
events.emit(NetworkEvent.unknownBlockInput, {blockInput, peer: peerIdStr});
}

chain
Expand Down Expand Up @@ -351,7 +359,10 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
}: GossipHandlerParamGeneric<GossipType.blob_sidecar>) => {
const {serializedData} = gossipData;
const blobSidecar = sszDeserialize(topic, serializedData);
if (config.getForkSeq(blobSidecar.signedBlockHeader.message.slot) < ForkSeq.deneb) {
const blobSlot = blobSidecar.signedBlockHeader.message.slot;
const index = blobSidecar.index;

if (config.getForkSeq(blobSlot) < ForkSeq.deneb) {
throw new GossipActionError(GossipAction.REJECT, {code: "PRE_DENEB_BLOCK"});
}
const blockInput = await validateBeaconBlob(
Expand All @@ -361,20 +372,39 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
peerIdStr,
seenTimestampSec
);
if (blockInput !== null) {
// TODO DENEB:
//
// With blobsPromise the block import would have been attempted with the receipt of the block gossip
// and should have resolved the availability promise, however we could track if the block processing
// was halted and requeue it
if (blockInput.block !== null) {
// we can just queue up the blockInput in the processor, but block gossip handler would have already
// queued it up.
//
// handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);
} else {
// TODO DENEB:
//
// If block + blobs not fully received in the slot within some deadline, we should trigger block/blob
// pull using req/resp by root pre-emptively even though it will be trigged on seeing any block/blob
// gossip on next slot via missing parent checks
// wait for the block to arrive till some cutoff else emit unknownBlockInput event
chain.logger.debug("Block not yet available, racing with cutoff", {blobSlot, index});
const normalBlockInput = await raceWithCutoff(
chain,
blobSlot,
blockInput.blockInputPromise,
BLOCK_AVAILABILITY_CUTOFF_MS
).catch((_e) => {
return null;
});

if (normalBlockInput !== null) {
chain.logger.debug("Block corresponding to blob is now available for processing", {blobSlot, index});
// we can directly send it for processing but block gossip handler will queue it up anyway
// if we see any issues later, we can send it to handleValidBeaconBlock
//
// handleValidBeaconBlock(normalBlockInput, peerIdStr, seenTimestampSec);
//
// however we can emit the event which will atleast add the peer to the list of peers to pull
// data from
if (normalBlockInput.type === BlockInputType.blobsPromise) {
events.emit(NetworkEvent.unknownBlockInput, {blockInput: normalBlockInput, peer: peerIdStr});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after block comes and there are not enough blobs, we already emit this event in handleValidBeaconBlock(), then we also emit it here. Is this a duplicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is a duplicate aimed to add this peer to the peers for unknown blobs pull

}
} else {
chain.logger.debug("Block not available till BLOCK_AVAILABILITY_CUTOFF_MS", {blobSlot, index});
events.emit(NetworkEvent.unknownBlockInput, {blockInput, peer: peerIdStr});
}
}
},

Expand Down Expand Up @@ -735,3 +765,19 @@ export async function validateGossipFnRetryUnknownRoot<T>(
}
}
}

async function raceWithCutoff<T>(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockSlot: Slot,
availabilityPromise: Promise<T>,
cutoffMsFromSlotStart: number
): Promise<T> {
const cutoffTimeMs = Math.max(
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + cutoffMsFromSlotStart - Date.now(),
0
);
const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTimeMs));
await Promise.race([availabilityPromise, cutoffTimeout]);
// we can only be here if availabilityPromise has resolved else an error will be thrown
return availabilityPromise;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import {fromHexString} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {phase0, deneb} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {BlockInput, BlockInputType, BlockSource, getBlockInputBlobs, getBlockInput} from "../../chain/blocks/types.js";
import {
BlockInput,
BlockInputType,
BlockSource,
getBlockInputBlobs,
getBlockInput,
NullBlockInput,
} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork} from "../interface.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
Expand Down Expand Up @@ -46,16 +54,26 @@ export async function unavailableBeaconBlobsByRoot(
config: ChainForkConfig,
network: INetwork,
peerId: PeerIdStr,
unavailableBlockInput: BlockInput,
unavailableBlockInput: BlockInput | NullBlockInput,
metrics: Metrics | null
): Promise<BlockInput> {
if (unavailableBlockInput.type !== BlockInputType.blobsPromise) {
return unavailableBlockInput;
if (unavailableBlockInput.block !== null && unavailableBlockInput.type !== BlockInputType.blobsPromise) {
return unavailableBlockInput as BlockInput;
}

const blobIdentifiers: deneb.BlobIdentifier[] = [];
const {block, blobsCache, resolveAvailability, blockBytes} = unavailableBlockInput;
// resolve the block if thats unavailable
let block, blobsCache, blockBytes, resolveAvailability;
if (unavailableBlockInput.block === null) {
const allBlocks = await network.sendBeaconBlocksByRoot(peerId, [fromHexString(unavailableBlockInput.blockRootHex)]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now the block source lodestar_import_block_by_source_total is very tricky, it could be wrong for now

for block, we want to know if it comes from gossip, or this function, or the above beaconBlocksMaybeBlobsByRoot() function

same for blobs, we want to know % of them coming from 3 different places as above

if it's too much for this PR please have a follow up PR or track in another issue @g11tech

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes lets do a followup on this

block = allBlocks[0].data;
blockBytes = allBlocks[0].bytes;
({blobsCache, resolveAvailability} = unavailableBlockInput);
} else {
({block, blobsCache, resolveAvailability, blockBytes} = unavailableBlockInput);
}

// resolve missing blobs
const blobIdentifiers: deneb.BlobIdentifier[] = [];
const slot = block.message.slot;
const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message);

Expand Down