feat: implement peerDAS on electra
add some presets

add further params and types

add data column to types repo and network

move max request data columns to preset

add the datacolumns data in blockinput and fix breaking errors in seen gossip blockinput

handle data columns in gossip and the seengossip

further propagate forkaware blockdata and resolve build/type issues

further handle data columns sync by range / by root and forkaware data handling

fix issues

chore: update c-kzg to peerDas version

feat: add peerDas ckzg functions to interface

fix the lookups

handle the publishing flow

various sync try fixes

fixes

compute blob sidecar

various misc debugging and fixes

debug and apply fixes and get range and by root sync to work with full custody

enable syncing with lower custody requirement

use node peerid rather than a dummy string

get and use the nodeid from enr and correctly compute subnets and column indexes (see the custody column sketch after this list)

filter out and connect only to peers matching our custody requirement

try adding custody requirement

add protection for subnet calc

get the sync working with devnet 0

correctly set the enr with custody subnet info

rebase fixes
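
The subnet and column computation mentioned above ("correctly compute subnets and column indexes") follows the EIP-7594 custody scheme: repeatedly hash the node id until enough distinct subnet ids are drawn, then expand each custody subnet into its column indexes. Below is a minimal TypeScript sketch, assuming devnet-0-era constants (DATA_COLUMN_SIDECAR_SUBNET_COUNT = 32, NUMBER_OF_COLUMNS = 128) and the spec's little-endian integer serialization; the names are illustrative, not Lodestar's actual helpers.

```ts
import {createHash} from "node:crypto";

// Assumed devnet-0-era preset values; the real preset may differ.
const DATA_COLUMN_SIDECAR_SUBNET_COUNT = 32;
const NUMBER_OF_COLUMNS = 128;

// Spec's uint_to_bytes for a uint256: 32 little-endian bytes.
function uint256ToBytesLE(n: bigint): Uint8Array {
  const out = new Uint8Array(32);
  for (let i = 0; i < 32; i++) out[i] = Number((n >> BigInt(8 * i)) & 0xffn);
  return out;
}

// Spec's bytes_to_uint64: first 8 bytes of the digest, little-endian.
function bytesToUint64LE(b: Uint8Array): bigint {
  let v = 0n;
  for (let i = 0; i < 8; i++) v |= BigInt(b[i]) << BigInt(8 * i);
  return v;
}

// Draw distinct custody subnets from hash(nodeId), hash(nodeId + 1), ...,
// then expand each subnet to its columns and sort.
export function getCustodyColumns(nodeId: bigint, custodySubnetCount: number): number[] {
  const subnetIds = new Set<number>();
  let current = nodeId;
  while (subnetIds.size < custodySubnetCount) {
    const digest = createHash("sha256").update(uint256ToBytesLE(current)).digest();
    subnetIds.add(Number(bytesToUint64LE(digest) % BigInt(DATA_COLUMN_SIDECAR_SUBNET_COUNT)));
    current = (current + 1n) % (1n << 256n); // wrap as a uint256, as the spec does
  }
  const columnsPerSubnet = NUMBER_OF_COLUMNS / DATA_COLUMN_SIDECAR_SUBNET_COUNT;
  const columns: number[] = [];
  for (const subnetId of subnetIds) {
    for (let i = 0; i < columnsPerSubnet; i++) {
      columns.push(DATA_COLUMN_SIDECAR_SUBNET_COUNT * i + subnetId);
    }
  }
  return columns.sort((a, b) => a - b);
}
```

This is also why the ENR must carry the custody subnet count ("correctly set the enr with custody subnet info"): peers recompute each other's custody columns from the node id and that count when filtering connections.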
g11tech committed Jun 2, 2024
1 parent 8a26fac commit 99671a2
Showing 70 changed files with 1,967 additions and 236 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/package.json
@@ -132,7 +132,7 @@
"@lodestar/utils": "^1.18.1",
"@lodestar/validator": "^1.18.1",
"@multiformats/multiaddr": "^12.1.3",
"c-kzg": "^2.1.2",
"c-kzg": "matthewkeil/c-kzg-4844#67bf9367817f0fa5ebd390aeb8c3ae88bdbc170e",
"datastore-core": "^9.1.1",
"datastore-level": "^10.1.1",
"deepmerge": "^4.3.1",
50 changes: 39 additions & 11 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
@@ -1,20 +1,23 @@
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {routes, ServerApi, ResponseFormat} from "@lodestar/api";
import {computeEpochAtSlot, computeTimeAtSlot, reconstructFullBlockOrContents} from "@lodestar/state-transition";
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {SLOTS_PER_HISTORICAL_ROOT, ForkName} from "@lodestar/params";
import {sleep, toHex} from "@lodestar/utils";
import {allForks, deneb, isSignedBlockContents, ProducedBlockSource} from "@lodestar/types";
import {allForks, deneb, electra, isSignedBlockContents, ProducedBlockSource} from "@lodestar/types";
import {
BlockSource,
getBlockInput,
ImportBlockOpts,
BlockInput,
BlobsSource,
BlockInputDataBlobs,
BlockInputDataDataColumns,
DataColumnsSource,
BlockInputData,
} from "../../../../chain/blocks/types.js";
import {promiseAllMaybeAsync} from "../../../../util/promises.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {computeBlobSidecars} from "../../../../util/blobs.js";
import {computeBlobSidecars, computeDataColumnSidecars} from "../../../../util/blobs.js";
import {BlockError, BlockErrorCode, BlockGossipError} from "../../../../chain/errors/index.js";
import {OpSource} from "../../../../metrics/validatorMonitor.js";
import {NetworkEvent} from "../../../../network/index.js";
@@ -49,17 +52,40 @@ export function getBeaconBlockApi({
opts: PublishBlockOpts = {}
) => {
const seenTimestampSec = Date.now() / 1000;
let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, blobSidecars: deneb.BlobSidecars;
let blockForImport: BlockInput,
signedBlock: allForks.SignedBeaconBlock,
blobSidecars: deneb.BlobSidecars,
dataColumnSidecars: electra.DataColumnSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
({signedBlock} = signedBlockOrContents);
blobSidecars = computeBlobSidecars(config, signedBlock, signedBlockOrContents);
const blockData = {
fork: config.getForkName(signedBlock.message.slot),
blobs: blobSidecars,
blobsSource: BlobsSource.api,
blobsBytes: blobSidecars.map(() => null),
} as BlockInputDataBlobs;
const fork = config.getForkName(signedBlock.message.slot);
let blockData: BlockInputData;
if (fork === ForkName.electra) {
dataColumnSidecars = computeDataColumnSidecars(config, signedBlock, signedBlockOrContents);
blockData = {
fork,
numColumns: dataColumnSidecars.length,
// custodyColumns is a 1-based index: the i-th custodied column is dataColumns[custodyColumns[i-1]]
custodyColumns: new Uint8Array(Array.from({length: dataColumnSidecars.length}, (_, j) => 1 + j)),
dataColumns: dataColumnSidecars,
dataColumnsBytes: dataColumnSidecars.map(() => null),
dataColumnsSource: DataColumnsSource.api,
} as BlockInputDataDataColumns;
blobSidecars = [];
} else if (fork === ForkName.deneb) {
blobSidecars = computeBlobSidecars(config, signedBlock, signedBlockOrContents);
blockData = {
fork,
blobs: blobSidecars,
blobsSource: BlobsSource.api,
blobsBytes: blobSidecars.map(() => null),
} as BlockInputDataBlobs;
dataColumnSidecars = [];
} else {
throw Error(`Invalid data fork=${fork} for publish`);
}

blockForImport = getBlockInput.availableData(
config,
signedBlock,
@@ -71,6 +97,7 @@
} else {
signedBlock = signedBlockOrContents;
blobSidecars = [];
dataColumnSidecars = [];
// TODO: Once API supports submitting data as SSZ, replace null with blockBytes
blockForImport = getBlockInput.preData(config, signedBlock, BlockSource.api, null);
}
@@ -206,6 +233,7 @@ export function getBeaconBlockApi({
// b) they might require more hops to reach recipients in peerDAS kind of setup where
// blobs might need to hop between nodes because of partial subnet subscription
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
...dataColumnSidecars.map((dataColumnSidecar) => () => network.publishDataColumnSidecar(dataColumnSidecar)),
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
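The electra publish branch above depends on computeDataColumnSidecars, which this commit adds to util/blobs.ts but which is not shown in this excerpt. Below is a rough sketch of the core cell-to-column rotation, assuming a hypothetical cell-oriented KZG binding (computeCells is a placeholder, not necessarily the pinned c-kzg fork's actual export); the real helper would also attach the block's KZG commitments, proofs, and signed header to each sidecar.

```ts
// Placeholder for a cell-oriented KZG binding; the pinned c-kzg fork's
// actual export name/signature may differ.
declare function computeCells(blob: Uint8Array): Uint8Array[];

// Rotate row-major blob cells into columns: column j holds cell j of every
// blob, so each DataColumnSidecar carries one vertical slice of the matrix.
export function computeDataColumns(blobs: Uint8Array[]): Uint8Array[][] {
  const rows = blobs.map((blob) => computeCells(blob));
  const numColumns = rows[0]?.length ?? 0;
  const columns: Uint8Array[][] = [];
  for (let j = 0; j < numColumns; j++) {
    columns.push(rows.map((row) => row[j]));
  }
  return columns;
}
```

Note the contrast with deneb: blobs are published whole on blob subnets, while electra spreads column sidecars across data column subnets, which is why the publish flow above gains a publishDataColumnSidecar call per column.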
31 changes: 18 additions & 13 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
@@ -1,6 +1,6 @@
import {toHexString} from "@chainsafe/ssz";
import {capella, ssz, allForks, altair} from "@lodestar/types";
import {ForkSeq, INTERVALS_PER_SLOT, MAX_SEED_LOOKAHEAD, SLOTS_PER_EPOCH} from "@lodestar/params";
import {ForkName, ForkSeq, INTERVALS_PER_SLOT, MAX_SEED_LOOKAHEAD, SLOTS_PER_EPOCH} from "@lodestar/params";
import {
CachedBeaconStateAltair,
computeEpochAtSlot,
@@ -113,18 +113,23 @@ export async function importBlock(
// out of data range blocks and import them in forkchoice although one would not be able to
// attest and propose with such head similar to optimistic sync
if (blockInput.type === BlockInputType.availableData) {
const {blobsSource, blobs} = blockInput.blockData;

this.metrics?.importBlock.blobsBySource.inc({blobsSource});
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
const {blockData} = blockInput;
if (blockData.fork === ForkName.deneb) {
const {blobsSource, blobs} = blockData;

this.metrics?.importBlock.blobsBySource.inc({blobsSource});
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
} else if (blockData.fork === ForkName.electra) {
// TODO peerDAS: build and emit the event for the data columns (an illustrative sketch follows this file's diff)
}
}
});
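The electra branch in importBlock leaves event emission as a TODO. For symmetry with the deneb blobSidecar events, the eventual payload would presumably carry the block root, slot, and per-column identity; here is a purely illustrative sketch, where the event shape (and any EventType.dataColumnSidecar route) is an assumption, not existing Lodestar API.

```ts
import {toHexString} from "@chainsafe/ssz";
import {electra} from "@lodestar/types";

// Assumed event payload, mirroring the deneb blobSidecar emission;
// no such event type exists yet in this diff.
type DataColumnSidecarEvent = {
  blockRoot: string;
  slot: number;
  index: number;
  kzgCommitments: string[];
};

function buildDataColumnEvents(
  blockRootHex: string,
  blockSlot: number,
  dataColumns: electra.DataColumnSidecars
): DataColumnSidecarEvent[] {
  // One event per custodied column; a column sidecar carries the block's
  // full commitment list rather than a single blob commitment.
  return dataColumns.map((sidecar) => ({
    blockRoot: blockRootHex,
    slot: blockSlot,
    index: sidecar.index,
    kzgCommitments: sidecar.kzgCommitments.map(toHexString),
  }));
}
```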
58 changes: 52 additions & 6 deletions packages/beacon-node/src/chain/blocks/types.ts
@@ -1,6 +1,6 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus, DataAvailabilityStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot, RootHex} from "@lodestar/types";
import {allForks, deneb, Slot, RootHex, electra, ColumnIndex} from "@lodestar/types";
import {ForkSeq, ForkName} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

@@ -28,26 +28,52 @@ export enum BlobsSource {
byRoot = "req_resp_by_root",
}

export enum DataColumnsSource {
gossip = "gossip",
api = "api",
byRange = "req_resp_by_range",
byRoot = "req_resp_by_root",
}

export enum GossipedInputType {
block = "block",
blob = "blob",
dataColumn = "dataColumn",
}

export type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type DataColumnsCache = Map<
number,
{dataColumnSidecar: electra.DataColumnSidecar; dataColumnBytes: Uint8Array | null}
>;

type ForkBlobsInfo = {fork: ForkName.deneb};
type BlobsData = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]; blobsSource: BlobsSource};
export type BlockInputDataBlobs = ForkBlobsInfo & BlobsData;
export type BlockInputData = BlockInputDataBlobs;

type BlobsInputCache = {blobsCache: BlobsCache};
export type BlockInputCacheBlobs = ForkBlobsInfo & BlobsInputCache;
type ForkDataColumnsInfo = {fork: ForkName.electra};
type DataColumnsData = {
// markers of which columns are to be custodied
numColumns: number;
custodyColumns: Uint8Array;
dataColumns: electra.DataColumnSidecars;
dataColumnsBytes: (Uint8Array | null)[];
dataColumnsSource: DataColumnsSource;
};
export type BlockInputDataDataColumns = ForkDataColumnsInfo & DataColumnsData;

export type BlockInputData = BlockInputDataBlobs | BlockInputDataDataColumns;

export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]; blobsSource: BlobsSource};
type Availability<T> = {availabilityPromise: Promise<T>; resolveAvailability: (data: T) => void};

type BlobsInputCache = {blobsCache: BlobsCache};
export type BlockInputCacheBlobs = ForkBlobsInfo & BlobsInputCache;
type CachedBlobs = BlobsInputCache & Availability<BlockInputDataBlobs>;
export type CachedData = ForkBlobsInfo & CachedBlobs;

type DataColumnsInputCache = {dataColumnsCache: DataColumnsCache};
type CachedDataColumns = DataColumnsInputCache & Availability<BlockInputDataDataColumns>;

export type CachedData = (ForkBlobsInfo & CachedBlobs) | (ForkDataColumnsInfo & CachedDataColumns);

export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preData | BlockInputType.outOfRangeData}
@@ -157,6 +183,26 @@ export function getBlockInputBlobs(blobsCache: BlobsCache): Omit<BlobsData, "blo
return {blobs, blobsBytes};
}

export function getBlockInputDataColumns(
dataColumnsCache: DataColumnsCache,
columnIndexes: ColumnIndex[]
): Omit<DataColumnsData, "numColumns" | "custodyColumns" | "dataColumnsSource"> {
const dataColumns = [];
const dataColumnsBytes = [];

for (const index of columnIndexes) {
const dataColumnCache = dataColumnsCache.get(index);
if (dataColumnCache === undefined) {
// check if the index is correct as per the custody columns
throw Error(`Missing dataColumnCache at index=${index}`);
}
const {dataColumnSidecar, dataColumnBytes} = dataColumnCache;
dataColumns.push(dataColumnSidecar);
dataColumnsBytes.push(dataColumnBytes);
}
return {dataColumns, dataColumnsBytes};
}

export enum AttestationImportOpt {
Skip,
Force,
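getBlockInputDataColumns mirrors getBlockInputBlobs: given the cache populated from gossip or req/resp and the node's custody column indexes, it returns the sidecars and their serialized bytes in custody order, throwing if any requested column has not been seen. A usage sketch, with placeholder sidecars and the types above assumed in scope:

```ts
import {electra} from "@lodestar/types";

declare const columnFive: electra.DataColumnSidecar; // placeholder sidecars
declare const columnThirtySeven: electra.DataColumnSidecar;

// Cache as filled by gossip/req-resp for a node custodying columns 5 and 37.
const cache: DataColumnsCache = new Map([
  [5, {dataColumnSidecar: columnFive, dataColumnBytes: null}],
  [37, {dataColumnSidecar: columnThirtySeven, dataColumnBytes: null}],
]);

// Returns {dataColumns, dataColumnsBytes} aligned with [5, 37]; requesting a
// missing index (e.g. 12) throws "Missing dataColumnCache at index=12".
const {dataColumns, dataColumnsBytes} = getBlockInputDataColumns(cache, [5, 37]);
```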
@@ -3,10 +3,19 @@ import {DataAvailabilityStatus} from "@lodestar/fork-choice";
import {ChainForkConfig} from "@lodestar/config";
import {deneb, UintNum64} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {ForkName} from "@lodestar/params";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
import {validateDataColumnsSidecars} from "../validation/dataColumnSidecar.js";
import {Metrics} from "../../metrics/metrics.js";
import {BlockInput, BlockInputType, ImportBlockOpts, BlobSidecarValidation, getBlockInput} from "./types.js";
import {
BlockInput,
BlockInputType,
ImportBlockOpts,
BlobSidecarValidation,
getBlockInput,
BlockInputData,
} from "./types.js";

// we can now wait for full 12 seconds because unavailable block sync will try pulling
// the blobs from the network anyway after 500ms of seeing the block
@@ -88,27 +97,37 @@ async function maybeValidateBlobs(
// run full validation
const {block} = blockInput;
const blockSlot = block.message.slot;

const blobsData =
blockInput.type === BlockInputType.availableData
? blockInput.blockData
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise);
const {blobs} = blobsData;

const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
const beaconBlockRoot = chain.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);

// if the blob sidecars have been individually verified then we can skip kzg proof check
// but other checks to match blobs with block data still need to be performed
const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual;
validateBlobSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, blobs, {skipProofsCheck});
const blockData =
blockInput.type === BlockInputType.availableData
? blockInput.blockData
: await raceWithCutoff(
chain,
blockInput,
blockInput.cachedData.availabilityPromise as Promise<BlockInputData>
);

if (blockData.fork === ForkName.deneb) {
const {blobs} = blockData;

// if the blob sidecars have been individually verified then we can skip kzg proof check
// but other checks to match blobs with block data still need to be performed
const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual;
validateBlobSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, blobs, {skipProofsCheck});
} else if (blockData.fork === ForkName.electra) {
const {dataColumns} = blockData;
const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual;
// might require numColumns, custodyColumns from blockData as input to below
validateDataColumnsSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, dataColumns, {skipProofsCheck});
}

const availableBlockInput = getBlockInput.availableData(
chain.config,
blockInput.block,
blockInput.source,
blockInput.blockBytes,
blobsData
blockData
);
return {dataAvailabilityStatus: DataAvailabilityStatus.Available, availableBlockInput: availableBlockInput};
}
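maybeValidateBlobs races the cached availabilityPromise against a cutoff via raceWithCutoff; per the comment at the top of this file, the cutoff can span the full 12-second slot because unavailable-block sync re-pulls the data 500ms after seeing the block anyway. A generic sketch of that cutoff-race pattern, assuming a plain timeout rather than Lodestar's actual slot-clock-based implementation:

```ts
// A minimal sketch, not Lodestar's raceWithCutoff; in practice cutoffMs is
// derived from the block's slot start time via the chain's clock.
async function raceWithCutoffSketch<T>(availabilityPromise: Promise<T>, cutoffMs: number): Promise<T> {
  let timer: NodeJS.Timeout | undefined;
  const cutoff = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error("NOT_AVAILABLE: cutoff reached")), cutoffMs);
  });
  try {
    // Resolves once gossip/req-resp delivers the blobs or columns in time;
    // rejects at the cutoff so the caller can mark the block unavailable.
    return await Promise.race([availabilityPromise, cutoff]);
  } finally {
    clearTimeout(timer);
  }
}
```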