Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Altair: Database for SyncCommittee #2377

Merged
merged 1 commit into from Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/db/src/schema.ts
Expand Up @@ -53,6 +53,8 @@ export enum Bucket {
altair_blockArchive = 28, // Slot -> altair.SignedBeaconBlock
altair_pendingBlock = 29, // Slot -> altair.SignedBeaconBlock
altair_stateArchive = 30, // Slot -> altair.BeaconState
altair_syncCommitteeSignature = 31, // Root => SyncCommitteeSignature
altair_contributionAndProof = 32, // Root => ContributionAndProof
}

export enum Key {
Expand Down
9 changes: 9 additions & 0 deletions packages/lodestar/src/db/beacon.ts
Expand Up @@ -22,6 +22,8 @@ import {
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single";
import {SeenAttestationCache} from "./seenAttestationCache";
import {PendingBlockRepository} from "./repositories/pendingBlock";
import {SyncCommitteeSignatureRepository} from "./repositories/syncCommitteeSignature";
import {ContributionAndProofRepository} from "./repositories/contributionAndProof";

export class BeaconDb extends DatabaseService implements IBeaconDb {
badBlock: BadBlockRepository;
Expand All @@ -43,6 +45,10 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
preGenesisState: PreGenesisState;
preGenesisStateLastProcessedBlock: PreGenesisStateLastProcessedBlock;

// altair
syncCommitteeSignature: SyncCommitteeSignatureRepository;
contributionAndProof: ContributionAndProofRepository;

constructor(opts: IDatabaseApiOptions) {
super(opts);
// Warning: If code is ever run in the constructor, must change this stub to not extend 'packages/lodestar/test/utils/stub/beaconDb.ts' -
Expand All @@ -62,6 +68,9 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
this.eth1Data = new Eth1DataRepository(this.config, this.db);
this.preGenesisState = new PreGenesisState(this.config, this.db);
this.preGenesisStateLastProcessedBlock = new PreGenesisStateLastProcessedBlock(this.config, this.db);
// altair
this.syncCommitteeSignature = new SyncCommitteeSignatureRepository(this.config, this.db);
this.contributionAndProof = new ContributionAndProofRepository(this.config, this.db);
}

/**
Expand Down
6 changes: 6 additions & 0 deletions packages/lodestar/src/db/interface.ts
Expand Up @@ -21,6 +21,8 @@ import {
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single";
import {SeenAttestationCache} from "./seenAttestationCache";
import {PendingBlockRepository} from "./repositories/pendingBlock";
import {SyncCommitteeSignatureRepository} from "./repositories/syncCommitteeSignature";
import {ContributionAndProofRepository} from "./repositories/contributionAndProof";

/**
* The DB service manages the data layer of the beacon chain
Expand Down Expand Up @@ -62,6 +64,10 @@ export interface IBeaconDb {
depositDataRoot: DepositDataRootRepository;
eth1Data: Eth1DataRepository;

// altair
syncCommitteeSignature: SyncCommitteeSignatureRepository;
contributionAndProof: ContributionAndProofRepository;

processBlockOperations(signedBlock: phase0.SignedBeaconBlock): Promise<void>;

/**
Expand Down
29 changes: 29 additions & 0 deletions packages/lodestar/src/db/repositories/contributionAndProof.ts
@@ -0,0 +1,29 @@
import {computeStartSlotAtEpoch} from "@chainsafe/lodestar-beacon-state-transition";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {Bucket, IDatabaseController, Repository} from "@chainsafe/lodestar-db";
import {altair, phase0} from "@chainsafe/lodestar-types";

/**
* Repository for ContributionAndProof.
* Added via gossip or api.
* Removed when it's old.
*/
export class ContributionAndProofRepository extends Repository<Uint8Array, altair.ContributionAndProof> {
constructor(config: IBeaconConfig, db: IDatabaseController<Buffer, Buffer>) {
super(config, db, Bucket.altair_contributionAndProof, config.types.altair.ContributionAndProof);
}

/**
* Id is hashTreeRoot of SyncCommitteeContribution
*/
getId(value: altair.ContributionAndProof): Uint8Array {
return this.config.types.altair.SyncCommitteeContribution.hashTreeRoot(value.contribution);
}

async pruneFinalized(finalizedEpoch: phase0.Epoch): Promise<void> {
const finalizedEpochStartSlot = computeStartSlotAtEpoch(this.config, finalizedEpoch);
const entries = await this.entries();
const idsToDelete = entries.filter((e) => e.value.contribution.slot < finalizedEpochStartSlot).map((e) => e.key);
await this.batchDelete(idsToDelete);
}
}
57 changes: 57 additions & 0 deletions packages/lodestar/src/db/repositories/syncCommitteeSignature.ts
@@ -0,0 +1,57 @@
import {computeStartSlotAtEpoch} from "@chainsafe/lodestar-beacon-state-transition";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {Bucket, IDatabaseController, Repository} from "@chainsafe/lodestar-db";
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {altair, phase0} from "@chainsafe/lodestar-types";
import {notNullish} from "@chainsafe/lodestar-utils";

/**
* Repository for SyncCommitteeSignature.
* Added via gossip or api.
* Removed when it's old.
*/
export class SyncCommitteeSignatureRepository extends Repository<Uint8Array, altair.SyncCommitteeSignature> {
// index to produce SyncCommitteeContribution
// for each slot, index SyncCommitteeSignature roots by block root hex
private contributionIndex = new Map<phase0.Slot, Map<string, string[]>>();
constructor(config: IBeaconConfig, db: IDatabaseController<Buffer, Buffer>) {
super(config, db, Bucket.altair_syncCommitteeSignature, config.types.altair.SyncCommitteeSignature);
}

async put(key: Uint8Array, value: altair.SyncCommitteeSignature): Promise<void> {
const blockRootHex = toHexString(value.beaconBlockRoot);
const slot = value.slot;
const rootHex = toHexString(key);
let slotIndex = this.contributionIndex.get(slot);
if (!slotIndex) {
slotIndex = new Map();
this.contributionIndex.set(slot, slotIndex);
}
if (!slotIndex.get(blockRootHex)) {
slotIndex.set(blockRootHex, [rootHex]);
} else {
const rootHexes = new Set([rootHex, ...(slotIndex.get(blockRootHex) || [])]);
slotIndex.set(blockRootHex, Array.from(rootHexes));
}
await super.put(key, value);
}

async getByBlock(slot: phase0.Slot, blockRootHex: phase0.Root): Promise<altair.SyncCommitteeSignature[]> {
const rootsByBlockRoot = this.contributionIndex.get(slot);
if (!rootsByBlockRoot) return [];
const rootHexes = rootsByBlockRoot.get(toHexString(blockRootHex));
if (!rootHexes) return [];
const syncCommitteeSignatures = await Promise.all(rootHexes.map((rootHex) => this.get(fromHexString(rootHex))));
return syncCommitteeSignatures.filter(notNullish);
}

async pruneFinalized(finalizedEpoch: phase0.Epoch): Promise<void> {
const finalizedEpochStartSlot = computeStartSlotAtEpoch(this.config, finalizedEpoch);
const entries = await this.entries();
const idsToDelete = entries.filter((e) => e.value.slot < finalizedEpochStartSlot).map((e) => e.key);
await this.batchDelete(idsToDelete);
for (const slot of this.contributionIndex.keys()) {
if (slot < finalizedEpochStartSlot) this.contributionIndex.delete(slot);
}
}
}
13 changes: 8 additions & 5 deletions packages/lodestar/src/tasks/index.ts
Expand Up @@ -86,16 +86,19 @@ export class TasksService {
// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);

const finalizedEpoch = finalized.epoch;
await Promise.all([
this.chain.checkpointStateCache.pruneFinalized(finalized.epoch),
this.chain.stateCache.deleteAllBeforeEpoch(finalized.epoch),
this.db.attestation.pruneFinalized(finalized.epoch),
this.db.aggregateAndProof.pruneFinalized(finalized.epoch),
this.chain.checkpointStateCache.pruneFinalized(finalizedEpoch),
this.chain.stateCache.deleteAllBeforeEpoch(finalizedEpoch),
this.db.attestation.pruneFinalized(finalizedEpoch),
this.db.aggregateAndProof.pruneFinalized(finalizedEpoch),
this.db.syncCommitteeSignature.pruneFinalized(finalizedEpoch),
this.db.contributionAndProof.pruneFinalized(finalizedEpoch),
]);

// tasks rely on extended fork choice
this.chain.forkChoice.prune(finalized.root);
this.logger.verbose("Finish processing finalized checkpoint", {epoch: finalized.epoch});
this.logger.verbose("Finish processing finalized checkpoint", {epoch: finalizedEpoch});
} catch (e) {
this.logger.error("Error processing finalized checkpoint", {epoch: finalized.epoch}, e);
}
Expand Down