Skip to content

Commit

Permalink
Merge 1b8a4d6 into 8ccbcb8
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Jul 18, 2022
2 parents 8ccbcb8 + 1b8a4d6 commit 01ba855
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 168 deletions.
24 changes: 19 additions & 5 deletions packages/beacon-node/src/sync/constants.ts
Expand Up @@ -10,8 +10,19 @@ export const MAX_BATCH_DOWNLOAD_ATTEMPTS = 5;
/** Consider batch faulty after downloading and processing this number of times */
export const MAX_BATCH_PROCESSING_ATTEMPTS = 3;

/** Batch range excludes the first block of the epoch. @see Batch */
export const BATCH_SLOT_OFFSET = 1;
/**
* Number of slots to offset batches.
*
* Before Jul2022 an offset of 1 was required to download the checkpoint block during finalized sync. Otherwise
* the block necessary to switch from Finalized sync to Head sync would not be in the fork-choice and range sync would
* be stuck in a loop downloading the epoch previous to the finalized epoch, until we get rate-limited.
*
* After Jul2022 during finalized sync the entire finalized epoch will be downloaded, fulfilling the goal
* of switching to Head sync later. This does not affect performance nor sync speed and just downloads a few extra
* blocks that would be required by Head sync anyway. However, having an offset of 0 allows sending the processor
* blocks that belong to the same epoch, which enables batch verification optimizations.
*/
export const BATCH_SLOT_OFFSET = 0;

/** First epoch to allow to start gossip */
export const MIN_EPOCH_TO_START_GOSSIP = -1;
Expand All @@ -23,14 +34,17 @@ export const MIN_EPOCH_TO_START_GOSSIP = -1;
* we will negatively report peers with poor bandwidth. This can be set arbitrarily high, in which
* case the responder will fill the response up to the max request size, assuming they have the
* bandwidth to do so.
*
* Jul2022: Current batch block processor wants only blocks in the same epoch. So we'll process only
* one batch at a time. Preliminary tests show speed is as good; metrics can confirm.
*/
export const EPOCHS_PER_BATCH = 2;
export const EPOCHS_PER_BATCH = 1;

/**
* The maximum number of batches to queue before requesting more.
* In good network conditions downloading batches is much faster than processing them
* A number > 5 results in wasted progress when the chain completes syncing
* A number > 10 epochs' worth results in wasted progress when the chain completes syncing
*
* TODO: When switching branches usually all batches in AwaitingProcessing are dropped, could it be optimized?
*/
export const BATCH_BUFFER_SIZE = 5;
export const BATCH_BUFFER_SIZE = Math.ceil(10 / EPOCHS_PER_BATCH);
26 changes: 10 additions & 16 deletions packages/beacon-node/src/sync/range/batch.ts
@@ -1,16 +1,10 @@
import PeerId from "peer-id";
import {allForks, Epoch, phase0} from "@lodestar/types";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {IChainForkConfig} from "@lodestar/config";
import {LodestarError} from "@lodestar/utils";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {BATCH_SLOT_OFFSET, MAX_BATCH_DOWNLOAD_ATTEMPTS, MAX_BATCH_PROCESSING_ATTEMPTS} from "../constants.js";
import {MAX_BATCH_DOWNLOAD_ATTEMPTS, MAX_BATCH_PROCESSING_ATTEMPTS} from "../constants.js";
import {BlockError, BlockErrorCode} from "../../chain/errors/index.js";
import {hashBlocks} from "./utils/index.js";

export type BatchOpts = {
epochsPerBatch: Epoch;
};
import {getBatchSlotRange, hashBlocks} from "./utils/index.js";

/**
* Current state of a batch
Expand Down Expand Up @@ -54,14 +48,15 @@ export type BatchMetadata = {
};

/**
* Batches are downloaded excluding the first block of the epoch assuming it has already been
* downloaded.
* Batches are downloaded starting at the first block of the epoch.
*
* For example:
*
* Epoch boundary | |
* ... | 30 | 31 | 32 | 33 | 34 | ... | 61 | 62 | 63 | 64 | 65 |
* Batch 1 | Batch 2 | Batch 3
* Batch 1 | Batch 2 | Batch 3
*
* Jul2022: Offset changed from 1 to 0, see rationale in {@link BATCH_SLOT_OFFSET}
*/
export class Batch {
readonly startEpoch: Epoch;
Expand All @@ -77,15 +72,14 @@ export class Batch {
private readonly failedDownloadAttempts: PeerId[] = [];
private readonly config: IChainForkConfig;

constructor(startEpoch: Epoch, config: IChainForkConfig, opts: BatchOpts) {
const startSlot = computeStartSlotAtEpoch(startEpoch) + BATCH_SLOT_OFFSET;
const endSlot = startSlot + opts.epochsPerBatch * SLOTS_PER_EPOCH;
constructor(startEpoch: Epoch, config: IChainForkConfig) {
const {startSlot, count} = getBatchSlotRange(startEpoch);

this.config = config;
this.startEpoch = startEpoch;
this.request = {
startSlot: startSlot,
count: endSlot - startSlot,
startSlot,
count,
step: 1,
};
}
Expand Down
83 changes: 45 additions & 38 deletions packages/beacon-node/src/sync/range/chain.ts
@@ -1,6 +1,5 @@
import PeerId from "peer-id";
import {Epoch, Root, Slot, phase0, allForks} from "@lodestar/types";
import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {ErrorAborted, ILogger} from "@lodestar/utils";
import {IChainForkConfig} from "@lodestar/config";
import {toHexString} from "@chainsafe/ssz";
Expand All @@ -10,20 +9,20 @@ import {byteArrayEquals} from "../../util/bytes.js";
import {PeerMap} from "../../util/peerMap.js";
import {wrapError} from "../../util/wrapError.js";
import {RangeSyncType} from "../utils/remoteSyncType.js";
import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH, BATCH_SLOT_OFFSET} from "../constants.js";
import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchOpts, BatchStatus} from "./batch.js";
import {BATCH_BUFFER_SIZE, EPOCHS_PER_BATCH} from "../constants.js";
import {Batch, BatchError, BatchErrorCode, BatchMetadata, BatchStatus} from "./batch.js";
import {
validateBatchesStatus,
getNextBatchToProcess,
toBeProcessedStartEpoch,
toBeDownloadedStartEpoch,
toArr,
ChainPeersBalancer,
computeMostCommonTarget,
batchStartEpochIsAfterSlot,
isSyncChainDone,
getBatchSlotRange,
} from "./utils/index.js";

export type SyncChainOpts = Partial<BatchOpts>;

export type SyncChainModules = {
config: IChainForkConfig;
logger: ILogger;
Expand Down Expand Up @@ -70,7 +69,7 @@ export type SyncChainDebugState = {
export enum SyncChainStatus {
Stopped = "Stopped",
Syncing = "Syncing",
Synced = "Synced",
Done = "Done",
Error = "Error",
}

Expand All @@ -94,8 +93,12 @@ export class SyncChain {
/** Number of validated epochs. For the SyncRange to prevent switching chains too fast */
validatedEpochs = 0;

/** The start of the chain segment. Any epoch previous to this one has been validated. */
private startEpoch: Epoch;
readonly firstBatchEpoch: Epoch;
/**
* The start of the chain segment. Any epoch previous to this one has been validated.
* But the `lastEpochWithProcessBlocks` epoch itself may not be entirely valid.
*/
private lastEpochWithProcessBlocks: Epoch;
private status = SyncChainStatus.Stopped;

private readonly processChainSegment: SyncChainFns["processChainSegment"];
Expand All @@ -109,25 +112,23 @@ export class SyncChain {

private readonly logger: ILogger;
private readonly config: IChainForkConfig;
private readonly opts: BatchOpts;

constructor(
startEpoch: Epoch,
initialBatchEpoch: Epoch,
initialTarget: ChainTarget,
syncType: RangeSyncType,
fns: SyncChainFns,
modules: SyncChainModules,
opts?: SyncChainOpts
modules: SyncChainModules
) {
this.startEpoch = startEpoch;
this.firstBatchEpoch = initialBatchEpoch;
this.lastEpochWithProcessBlocks = initialBatchEpoch;
this.target = initialTarget;
this.syncType = syncType;
this.processChainSegment = fns.processChainSegment;
this.downloadBeaconBlocksByRange = fns.downloadBeaconBlocksByRange;
this.reportPeer = fns.reportPeer;
this.config = modules.config;
this.logger = modules.logger;
this.opts = {epochsPerBatch: opts?.epochsPerBatch ?? EPOCHS_PER_BATCH};
this.logId = `${syncType}`;

// Trigger event on parent class
Expand All @@ -148,17 +149,24 @@ export class SyncChain {
case SyncChainStatus.Syncing:
return; // Skip, already started
case SyncChainStatus.Error:
case SyncChainStatus.Synced:
case SyncChainStatus.Done:
throw new SyncChainStartError(`Attempted to start an ended SyncChain ${this.status}`);
}

this.status = SyncChainStatus.Syncing;

this.logger.debug("SyncChain startSyncing", {
localFinalizedEpoch,
lastEpochWithProcessBlocks: this.lastEpochWithProcessBlocks,
targetSlot: this.target.slot,
});

// to avoid dropping local progress, we advance the chain with its batch boundaries.
// get the aligned epoch that produces a batch containing the `localFinalizedEpoch`
const localFinalizedEpochAligned =
this.startEpoch + Math.floor((localFinalizedEpoch - this.startEpoch) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH;
this.advanceChain(localFinalizedEpochAligned);
const lastEpochWithProcessBlocksAligned =
this.lastEpochWithProcessBlocks +
Math.floor((localFinalizedEpoch - this.lastEpochWithProcessBlocks) / EPOCHS_PER_BATCH) * EPOCHS_PER_BATCH;
this.advanceChain(lastEpochWithProcessBlocksAligned);

// Potentially download new batches and process pending
this.triggerBatchDownloader();
Expand Down Expand Up @@ -205,16 +213,17 @@ export class SyncChain {
return toArr(this.batches).map((batch) => batch.getMetadata());
}

get startEpochValue(): Epoch {
return this.startEpoch;
get lastValidatedSlot(): Slot {
// Last slot before the start of the batch beginning at `lastEpochWithProcessBlocks + EPOCHS_PER_BATCH`
return getBatchSlotRange(this.lastEpochWithProcessBlocks + EPOCHS_PER_BATCH).startSlot - 1;
}

get isSyncing(): boolean {
return this.status === SyncChainStatus.Syncing;
}

get isRemovable(): boolean {
return this.status === SyncChainStatus.Error || this.status === SyncChainStatus.Synced;
return this.status === SyncChainStatus.Error || this.status === SyncChainStatus.Done;
}

get peers(): number {
Expand All @@ -232,7 +241,7 @@ export class SyncChain {
targetSlot: this.target.slot,
syncType: this.syncType,
status: this.status,
startEpoch: this.startEpoch,
startEpoch: this.lastEpochWithProcessBlocks,
peers: this.peers,
batches: this.getBatchesState(),
};
Expand Down Expand Up @@ -260,9 +269,8 @@ export class SyncChain {
// TODO: Consider running this check less often after the sync is well tested
validateBatchesStatus(toArr(this.batches));

// If startEpoch of the next batch to be processed > targetEpoch -> Done
const toBeProcessedEpoch = toBeProcessedStartEpoch(toArr(this.batches), this.startEpoch, this.opts);
if (computeStartSlotAtEpoch(toBeProcessedEpoch) >= this.target.slot) {
// Returns true if SyncChain has processed all possible blocks with slot <= target.slot
if (isSyncChainDone(toArr(this.batches), this.lastEpochWithProcessBlocks, this.target.slot)) {
break;
}

Expand All @@ -271,8 +279,8 @@ export class SyncChain {
if (batch) await this.processBatch(batch);
}

this.status = SyncChainStatus.Synced;
this.logger.verbose("SyncChain Synced", {id: this.logId});
this.status = SyncChainStatus.Done;
this.logger.verbose("SyncChain Done", {id: this.logId});
} catch (e) {
if (e instanceof ErrorAborted) {
return; // Ignore
Expand Down Expand Up @@ -366,11 +374,10 @@ export class SyncChain {
}

// This line decides the starting epoch of the next batch. MUST ensure no duplicate batch for the same startEpoch
const startEpoch = toBeDownloadedStartEpoch(batches, this.startEpoch, this.opts);
const toBeDownloadedSlot = computeStartSlotAtEpoch(startEpoch) + BATCH_SLOT_OFFSET;
const startEpoch = toBeDownloadedStartEpoch(batches, this.lastEpochWithProcessBlocks);

// Don't request batches beyond the target head slot
if (toBeDownloadedSlot > this.target.slot) {
// Don't request batches beyond the target head slot: skip when the to-be-downloaded batch starts strictly after target.slot
if (batchStartEpochIsAfterSlot(startEpoch, this.target.slot)) {
return null;
}

Expand All @@ -379,7 +386,7 @@ export class SyncChain {
return null;
}

const batch = new Batch(startEpoch, this.config, this.opts);
const batch = new Batch(startEpoch, this.config);
this.batches.set(startEpoch, batch);
return batch;
}
Expand Down Expand Up @@ -459,16 +466,16 @@ export class SyncChain {
}

/**
* Drops any batches previous to `newStartEpoch` and updates the chain boundaries
* Drops any batches previous to `newLastEpochWithProcessBlocks` and updates the chain boundaries
*/
private advanceChain(newStartEpoch: Epoch): void {
private advanceChain(newLastEpochWithProcessBlocks: Epoch): void {
// make sure this epoch produces an advancement
if (newStartEpoch <= this.startEpoch) {
if (newLastEpochWithProcessBlocks <= this.lastEpochWithProcessBlocks) {
return;
}

for (const [batchKey, batch] of this.batches.entries()) {
if (batch.startEpoch < newStartEpoch) {
if (batch.startEpoch < newLastEpochWithProcessBlocks) {
this.batches.delete(batchKey);
this.validatedEpochs += EPOCHS_PER_BATCH;

Expand All @@ -488,7 +495,7 @@ export class SyncChain {
}
}

this.startEpoch = newStartEpoch;
this.lastEpochWithProcessBlocks = newLastEpochWithProcessBlocks;
}
}

Expand Down

0 comments on commit 01ba855

Please sign in to comment.