Skip to content

Commit

Permalink
Make SyncChain target dynamic
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 2, 2021
1 parent d3f8fdc commit 14dc53a
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 39 deletions.
53 changes: 31 additions & 22 deletions packages/lodestar/src/sync/range/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import {PeerAction} from "../../network";
import {ChainSegmentError} from "../../chain/errors";
import {ItTrigger} from "../../util/itTrigger";
import {byteArrayEquals} from "../../util/bytes";
import {PeerMap} from "../../util/peerMap";
import {wrapError} from "../../util/wrapError";
import {RangeSyncType} from "../utils";
import {PeerSet} from "../../util/peerMap";
import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, BATCH_SLOT_OFFSET} from "../constants";
import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchOpts, BatchStatus} from "./batch";
import {
Expand All @@ -19,6 +20,7 @@ import {
toBeDownloadedStartEpoch,
toArr,
ChainPeersBalancer,
computeMostCommonTarget,
} from "./utils";

export type SyncChainOpts = BatchOpts;
Expand Down Expand Up @@ -57,8 +59,8 @@ export type ChainTarget = {
export class SyncChainStartError extends Error {}

export type SyncChainDebugState = {
targetRoot: string;
targetSlot: number;
targetRoot: string | null;
targetSlot: number | null;
syncType: RangeSyncType;
status: SyncChainStatus;
startEpoch: number;
Expand All @@ -74,15 +76,18 @@ export enum SyncChainStatus {
}

/**
* Dynamic target sync chain. Peers with multiple targets but with the same syncType are added
* through the `addPeer()` hook.
*
* A chain of blocks that need to be downloaded. Peers who claim to contain the target head
* root are grouped into the peer pool and queried for batches when downloading the chain.
*/
export class SyncChain {
/** Short string id to identify this SyncChain in logs */
readonly logId: string;
/** Should sync up until this slot, then stop */
readonly target: ChainTarget;
readonly syncType: RangeSyncType;
/** Should sync up until this slot, then stop */
target: ChainTarget | null = null;

/** Number of validated epochs. For the SyncRange to prevent switching chains too fast */
validatedEpochs = 0;
Expand All @@ -98,30 +103,28 @@ export class SyncChain {
private readonly batchProcessor = new ItTrigger();
/** Sorted map of batches undergoing some kind of processing. */
private readonly batches = new Map<Epoch, Batch>();
private readonly peerset = new PeerSet();
private readonly peerset = new PeerMap<ChainTarget>();

private readonly logger: ILogger;
private readonly config: IBeaconConfig;
private readonly opts: SyncChainOpts;

constructor(
startEpoch: Epoch,
target: ChainTarget,
syncType: RangeSyncType,
fns: SyncChainFns,
modules: SyncChainModules,
opts?: SyncChainOpts
) {
this.startEpoch = startEpoch;
this.target = target;
this.syncType = syncType;
this.processChainSegment = fns.processChainSegment;
this.downloadBeaconBlocksByRange = fns.downloadBeaconBlocksByRange;
this.reportPeer = fns.reportPeer;
this.config = modules.config;
this.logger = modules.logger;
this.opts = {epochsPerBatch: opts?.epochsPerBatch ?? EPOCHS_PER_BATCH};
this.logId = `${syncType}-${target.slot}-${toHexString(target.root).slice(0, 6)}`;
this.logId = `${syncType}`;

// Trigger event on parent class
this.sync().then(
Expand Down Expand Up @@ -175,19 +178,20 @@ export class SyncChain {
/**
* Add peer to the chain and request batches if active
*/
addPeer(peerId: PeerId): void {
if (!this.peerset.has(peerId)) {
this.peerset.add(peerId);
this.triggerBatchDownloader();
}
addPeer(peer: PeerId, target: ChainTarget): void {
this.peerset.set(peer, target);
this.computeTarget();
this.triggerBatchDownloader();
}

/**
* Returns true if the peer existed and has been removed
* NOTE: The RangeSync will take care of deleting the SyncChain if peers = 0
*/
removePeer(peerId: PeerId): boolean {
return this.peerset.delete(peerId);
const deleted = this.peerset.delete(peerId);
this.computeTarget();
return deleted;
}

/**
Expand All @@ -210,14 +214,14 @@ export class SyncChain {
}

getPeers(): PeerId[] {
return this.peerset.values();
return this.peerset.keys();
}

/** Full debug state for lodestar API */
getDebugState(): SyncChainDebugState {
return {
targetRoot: toHexString(this.target.root),
targetSlot: this.target.slot,
targetRoot: this.target && toHexString(this.target.root),
targetSlot: this.target && this.target.slot,
syncType: this.syncType,
status: this.status,
startEpoch: this.startEpoch,
Expand All @@ -226,6 +230,11 @@ export class SyncChain {
};
}

private computeTarget(): void {
const targets = this.peerset.values();
this.target = computeMostCommonTarget(targets);
}

/**
* Main Promise that handles the sync process. Will resolve when initial sync completes
* i.e. when it successfully processes a epoch >= than this chain `targetEpoch`
Expand All @@ -243,7 +252,7 @@ export class SyncChain {

// If startEpoch of the next batch to be processed > targetEpoch -> Done
const toBeProcessedEpoch = toBeProcessedStartEpoch(toArr(this.batches), this.startEpoch, this.opts);
if (computeStartSlotAtEpoch(this.config, toBeProcessedEpoch) >= this.target.slot) {
if (this.target && computeStartSlotAtEpoch(this.config, toBeProcessedEpoch) >= this.target.slot) {
break;
}

Expand All @@ -267,7 +276,7 @@ export class SyncChain {
// We drop the chain and report all peers.
// There are some edge cases with forks that could cause this situation, but it's unlikely.
if (e instanceof BatchError && e.type.code === BatchErrorCode.MAX_PROCESSING_ATTEMPTS) {
for (const peer of this.peerset.values()) {
for (const peer of this.peerset.keys()) {
this.reportPeer(peer, PeerAction.LowToleranceError, "SyncChainMaxProcessingAttempts");
}
}
Expand All @@ -291,7 +300,7 @@ export class SyncChain {
*/
private triggerBatchDownloader(): void {
try {
this.requestBatches(this.peerset.values());
this.requestBatches(this.peerset.keys());
} catch (e) {
// bubble the error up to the main async iterable loop
this.batchProcessor.end(e);
Expand Down Expand Up @@ -352,7 +361,7 @@ export class SyncChain {
const toBeDownloadedSlot = computeStartSlotAtEpoch(this.config, startEpoch) + BATCH_SLOT_OFFSET;

// Don't request batches beyond the target head slot
if (toBeDownloadedSlot > this.target.slot) {
if (this.target && toBeDownloadedSlot > this.target.slot) {
return null;
}

Expand Down
25 changes: 11 additions & 14 deletions packages/lodestar/src/sync/range/range.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@chainsafe/lodestar-b
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {Epoch, Slot, phase0} from "@chainsafe/lodestar-types";
import {ILogger} from "@chainsafe/lodestar-utils";
import {toHexString} from "@chainsafe/ssz";
import {IBeaconChain} from "../../chain";
import {INetwork} from "../../network";
import {IBeaconMetrics} from "../../metrics";
import {IMetrics} from "../../metrics";
import {RangeSyncType, getRangeSyncType} from "../utils";
import {updateChains, shouldRemoveChain} from "./utils";
import {ChainTarget, SyncChainFns, SyncChain, SyncChainOpts, SyncChainDebugState} from "./chain";
Expand Down Expand Up @@ -40,7 +39,7 @@ type RangeSyncState =
export type RangeSyncModules = {
chain: IBeaconChain;
network: INetwork;
metrics?: IBeaconMetrics;
metrics: IMetrics | null;
config: IBeaconConfig;
logger: ILogger;
};
Expand Down Expand Up @@ -74,10 +73,11 @@ export type RangeSyncOpts = SyncChainOpts;
export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
private readonly chain: IBeaconChain;
private readonly network: INetwork;
private readonly metrics?: IBeaconMetrics;
private readonly metrics: IMetrics | null;
private readonly config: IBeaconConfig;
private readonly logger: ILogger;
private readonly chains = new Map<string, SyncChain>();
/** There is a single chain per type, 1 finalized sync, 1 head sync */
private readonly chains = new Map<RangeSyncType, SyncChain>();

private opts?: SyncChainOpts;

Expand Down Expand Up @@ -156,7 +156,7 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
get state(): RangeSyncState {
const syncingHeadTargets: ChainTarget[] = [];
for (const chain of this.chains.values()) {
if (chain.isSyncing) {
if (chain.isSyncing && chain.target) {
if (chain.syncType === RangeSyncType.Finalized) {
return {status: RangeSyncStatus.Finalized, target: chain.target};
} else {
Expand All @@ -181,7 +181,7 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {

/** Convenience method for `SyncChain` */
private processChainSegment: SyncChainFns["processChainSegment"] = async (blocks) => {
await this.chain.processChainSegment(blocks, {prefinalized: true, trusted: false}); // Verify signatures
await this.chain.processChainSegment(blocks, {prefinalized: true, trusted: false}); // Not trusted, verify signatures
};

/** Convenience method for `SyncChain` */
Expand All @@ -202,13 +202,10 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
};

private addPeerOrCreateChain(startEpoch: Epoch, target: ChainTarget, peer: PeerId, syncType: RangeSyncType): void {
const id = `${syncType}-${target.slot}-${toHexString(target.root)}`;

let syncChain = this.chains.get(id);
let syncChain = this.chains.get(syncType);
if (!syncChain) {
syncChain = new SyncChain(
startEpoch,
target,
syncType,
{
processChainSegment: this.processChainSegment,
Expand All @@ -219,11 +216,11 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
{config: this.config, logger: this.logger},
this.opts
);
this.chains.set(id, syncChain);
this.logger.verbose("New syncChain", {id: syncChain.logId});
this.chains.set(syncType, syncChain);
this.logger.verbose("New syncChain", {syncType});
}

syncChain.addPeer(peer);
syncChain.addPeer(peer, target);
}

private update(localFinalizedEpoch: Epoch): void {
Expand Down
31 changes: 31 additions & 0 deletions packages/lodestar/src/sync/range/utils/chainTarget.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {Root, Slot} from "@chainsafe/lodestar-types";
import {toHexString} from "@chainsafe/ssz";

/**
* Sync this up to this target. Uses slot instead of epoch to re-use logic for finalized sync
* and head sync. The root is used to uniquely identify this chain on different forks
*/
export type ChainTarget = {
slot: Slot;
root: Root;
};

export function computeMostCommonTarget(targets: ChainTarget[]): ChainTarget | null {
const targetsById = new Map<string, ChainTarget>();
const countById = new Map<string, number>();

for (const target of targets) {
const targetId = `${target.slot}-${toHexString(target.root)}`;
targetsById.set(targetId, target);
countById.set(targetId, 1 + (countById.get(targetId) ?? 0));
}

let mostCommon: {count: number; targetId: string} | null = null;
for (const [targetId, count] of countById.entries()) {
if (!mostCommon || count > mostCommon.count) {
mostCommon = {count, targetId};
}
}

return mostCommon && (targetsById.get(mostCommon.targetId) ?? null);
}
2 changes: 1 addition & 1 deletion packages/lodestar/src/sync/range/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export * from "./batches";
export * from "./chainTarget";
export * from "./hashBlocks";
export * from "./peerBalancer";
export * from "./shouldRemoveChain";
export * from "./updateChains";
export * from "./wrapError";
4 changes: 2 additions & 2 deletions packages/lodestar/src/sync/range/utils/shouldRemoveChain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export function shouldRemoveChain(syncChain: SyncChain, localFinalizedSlot: Slot
// Sync chain has no more peers to download from
syncChain.peers === 0 ||
// Outdated: our chain has progressed beyond this sync chain
syncChain.target.slot < localFinalizedSlot ||
chain.forkChoice.hasBlock(syncChain.target.root)
(syncChain.target !== null &&
(syncChain.target.slot < localFinalizedSlot || chain.forkChoice.hasBlock(syncChain.target.root)))
);
}

0 comments on commit 14dc53a

Please sign in to comment.