Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make batch sizes dynamic for eth1 fetch of blocks/logs #4532

Merged
merged 5 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 17 additions & 1 deletion dashboards/lodestar_execution_engine.json
Original file line number Diff line number Diff line change
Expand Up @@ -1001,9 +1001,25 @@
"interval": "",
"legendFormat": "eth1_follow_distance_dynamic",
"refId": "A"
},
{
"exemplar": false,
"expr": "lodestar_eth1_blocks_batch_size_dynamic",
"hide": false,
"interval": "",
"legendFormat": "eth1_blocks_batch_size_dynamic",
"refId": "B"
},
{
"exemplar": false,
"expr": "lodestar_eth1_logs_batch_size_dynamic",
"hide": false,
"interval": "",
"legendFormat": "eth1_logs_batch_size_dynamic",
"refId": "C"
}
],
"title": "Eth1 Follow Distance Dynamic",
"title": "Eth1 Dynamic Stats",
"type": "timeseries"
},
{
Expand Down
67 changes: 54 additions & 13 deletions packages/beacon-node/src/eth1/eth1DepositDataTracker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {phase0, ssz} from "@lodestar/types";
import {IChainForkConfig} from "@lodestar/config";
import {BeaconStateAllForks, becomesNewEth1Data} from "@lodestar/state-transition";
import {ErrorAborted, fromHex, ILogger, isErrorAborted, sleep} from "@lodestar/utils";
import {ErrorAborted, TimeoutError, fromHex, ILogger, isErrorAborted, sleep} from "@lodestar/utils";

import {IBeaconDb} from "../db/index.js";
import {IMetrics} from "../metrics/index.js";
import {Eth1DepositsCache} from "./eth1DepositsCache.js";
Expand All @@ -12,9 +13,14 @@ import {Eth1DataAndDeposits, IEth1Provider} from "./interface.js";
import {Eth1Options} from "./options.js";
import {HttpRpcError} from "./provider/jsonRpcHttpClient.js";
import {parseEth1Block} from "./provider/eth1Provider.js";
import {isJsonRpcTruncatedError} from "./provider/utils.js";

const MAX_BLOCKS_PER_BLOCK_QUERY = 1000;
const MIN_BLOCKS_PER_BLOCK_QUERY = 10;

const MAX_BLOCKS_PER_LOG_QUERY = 1000;
const MIN_BLOCKS_PER_LOG_QUERY = 10;

/** Eth1 blocks happen every 14s approx, not need to update too often once synced */
const AUTO_UPDATE_PERIOD_MS = 60 * 1000;
/** Prevent infinite loops */
Expand Down Expand Up @@ -53,7 +59,13 @@ export class Eth1DepositDataTracker {
private depositsCache: Eth1DepositsCache;
private eth1DataCache: Eth1DataCache;
private lastProcessedDepositBlockNumber: number | null = null;

/** Dynamically adjusted follow distance */
private eth1FollowDistance: number;
/** Dynamically adusted batch size to fetch deposit logs */
private eth1GetBlocksBatchSizeDynamic = MAX_BLOCKS_PER_BLOCK_QUERY;
/** Dynamically adusted batch size to fetch deposit logs */
private eth1GetLogsBatchSizeDynamic = MAX_BLOCKS_PER_LOG_QUERY;
private readonly forcedEth1DataVote: phase0.Eth1Data | null;

constructor(
Expand Down Expand Up @@ -81,16 +93,20 @@ export class Eth1DepositDataTracker {
if (metrics) {
// Set constant value once
metrics?.eth1.eth1FollowDistanceSecondsConfig.set(config.SECONDS_PER_ETH1_BLOCK * config.ETH1_FOLLOW_DISTANCE);
metrics.eth1.eth1FollowDistanceDynamic.addCollect(() =>
metrics.eth1.eth1FollowDistanceDynamic.set(this.eth1FollowDistance)
);
metrics.eth1.eth1FollowDistanceDynamic.addCollect(() => {
metrics.eth1.eth1FollowDistanceDynamic.set(this.eth1FollowDistance);
metrics.eth1.eth1GetBlocksBatchSizeDynamic.set(this.eth1GetBlocksBatchSizeDynamic);
metrics.eth1.eth1GetLogsBatchSizeDynamic.set(this.eth1GetLogsBatchSizeDynamic);
});
}

this.runAutoUpdate().catch((e: Error) => {
if (!(e instanceof ErrorAborted)) {
this.logger.error("Error on eth1 loop", {}, e);
}
});
if (opts.enabled) {
this.runAutoUpdate().catch((e: Error) => {
if (!(e instanceof ErrorAborted)) {
this.logger.error("Error on eth1 loop", {}, e);
}
});
}
}

/**
Expand Down Expand Up @@ -202,9 +218,22 @@ export class Eth1DepositDataTracker {
// The DB may contain deposits from a different chain making lastProcessedDepositBlockNumber > current chain tip
// The Math.min() fixes those rare scenarios where fromBlock > toBlock
const fromBlock = Math.min(remoteFollowBlock, this.getFromBlockToFetch(lastProcessedDepositBlockNumber));
const toBlock = Math.min(remoteFollowBlock, fromBlock + MAX_BLOCKS_PER_LOG_QUERY - 1);
const toBlock = Math.min(remoteFollowBlock, fromBlock + this.eth1GetLogsBatchSizeDynamic - 1);

let depositEvents;
try {
depositEvents = await this.eth1Provider.getDepositEvents(fromBlock, toBlock);
this.eth1GetLogsBatchSizeDynamic = Math.min(MAX_BLOCKS_PER_LOG_QUERY, this.eth1GetLogsBatchSizeDynamic * 2);
} catch (e) {
if (isJsonRpcTruncatedError(e as Error) || e instanceof TimeoutError) {
this.eth1GetLogsBatchSizeDynamic = Math.max(
MIN_BLOCKS_PER_LOG_QUERY,
Math.floor(this.eth1GetLogsBatchSizeDynamic / 2)
);
}
throw e;
}

const depositEvents = await this.eth1Provider.getDepositEvents(fromBlock, toBlock);
this.logger.verbose("Fetched deposits", {depositCount: depositEvents.length, fromBlock, toBlock});
this.metrics?.eth1.depositEventsFetched.inc(depositEvents.length);

Expand Down Expand Up @@ -253,11 +282,23 @@ export class Eth1DepositDataTracker {
);
const toBlock = Math.min(
remoteFollowBlock,
fromBlock + MAX_BLOCKS_PER_BLOCK_QUERY - 1, // Block range is inclusive
fromBlock + this.eth1GetBlocksBatchSizeDynamic - 1, // Block range is inclusive
lastProcessedDepositBlockNumber
);

const blocksRaw = await this.eth1Provider.getBlocksByNumber(fromBlock, toBlock);
let blocksRaw;
try {
blocksRaw = await this.eth1Provider.getBlocksByNumber(fromBlock, toBlock);
this.eth1GetBlocksBatchSizeDynamic = Math.min(MAX_BLOCKS_PER_BLOCK_QUERY, this.eth1GetBlocksBatchSizeDynamic * 2);
} catch (e) {
if (isJsonRpcTruncatedError(e as Error) || e instanceof TimeoutError) {
this.eth1GetBlocksBatchSizeDynamic = Math.max(
MIN_BLOCKS_PER_BLOCK_QUERY,
Math.floor(this.eth1GetBlocksBatchSizeDynamic / 2)
);
}
throw e;
}
const blocks = blocksRaw.map(parseEth1Block);

this.logger.verbose("Fetched eth1 blocks", {blockCount: blocks.length, fromBlock, toBlock});
Expand Down
60 changes: 10 additions & 50 deletions packages/beacon-node/src/eth1/provider/eth1Provider.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import {toHexString} from "@chainsafe/ssz";
import {phase0} from "@lodestar/types";
import {IChainConfig} from "@lodestar/config";
import {fromHex, retry} from "@lodestar/utils";
import {fromHex} from "@lodestar/utils";

import {chunkifyInclusiveRange} from "../../util/chunkify.js";
import {linspace} from "../../util/numpy.js";
import {depositEventTopics, parseDepositLog} from "../utils/depositContract.js";
import {Eth1Block, IEth1Provider} from "../interface.js";
Expand Down Expand Up @@ -77,32 +76,12 @@ export class Eth1Provider implements IEth1Provider {
}

async getDepositEvents(fromBlock: number, toBlock: number): Promise<phase0.DepositEvent[]> {
const logsRawArr = await retry(
(attempt) => {
// Large log requests can return with code 200 but truncated, with broken JSON
// This retry will split a given block range into smaller ranges exponentially
// The underlying http client should handle network errors and retry
const chunkCount = 2 ** (attempt - 1);
const blockRanges = chunkifyInclusiveRange(fromBlock, toBlock, chunkCount);
return Promise.all(
blockRanges.map(([from, to]) => {
const options = {
fromBlock: from,
toBlock: to,
address: this.depositContractAddress,
topics: depositEventTopics,
};
return this.getLogs(options);
})
);
},
{
retries: 3,
retryDelay: 3000,
shouldRetry: isJsonRpcTruncatedError,
}
);

const logsRawArr = await this.getLogs({
fromBlock,
toBlock,
address: this.depositContractAddress,
topics: depositEventTopics,
});
return logsRawArr.flat(1).map((log) => parseDepositLog(log));
}

Expand All @@ -111,29 +90,10 @@ export class Eth1Provider implements IEth1Provider {
*/
async getBlocksByNumber(fromBlock: number, toBlock: number): Promise<EthJsonRpcBlockRaw[]> {
const method = "eth_getBlockByNumber";
const blocksArr = await retry(
(attempt) => {
// Large batch requests can return with code 200 but truncated, with broken JSON
// This retry will split a given block range into smaller ranges exponentially
// The underlying http client should handle network errors and retry
const chunkCount = 2 ** (attempt - 1);
const blockRanges = chunkifyInclusiveRange(fromBlock, toBlock, chunkCount);
return Promise.all(
blockRanges.map(([from, to]) =>
this.rpc.fetchBatch<IEthJsonRpcReturnTypes[typeof method]>(
linspace(from, to).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]})),
getBlocksByNumberOpts
)
)
);
},
{
retries: 3,
retryDelay: 3000,
shouldRetry: isJsonRpcTruncatedError,
}
const blocksArr = await this.rpc.fetchBatch<IEthJsonRpcReturnTypes[typeof method]>(
linspace(fromBlock, toBlock).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]})),
getBlocksByNumberOpts
);

const blocks: EthJsonRpcBlockRaw[] = [];
for (const block of blocksArr.flat(1)) {
if (block) blocks.push(block);
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,14 @@ export function createLodestarMetrics(
name: "lodestar_eth1_follow_distance_dynamic",
help: "Eth1 dynamic follow distance changed by the deposit tracker if blocks are slow",
}),
eth1GetBlocksBatchSizeDynamic: register.gauge({
name: "lodestar_eth1_blocks_batch_size_dynamic",
help: "Dynamic batch size to fetch blocks",
}),
eth1GetLogsBatchSizeDynamic: register.gauge({
name: "lodestar_eth1_logs_batch_size_dynamic",
help: "Dynamic batch size to fetch deposit logs",
}),

// Merge Search info
eth1MergeStatus: register.gauge({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import {expect} from "chai";
import sinon from "sinon";
import {config} from "@lodestar/config/default";
import {TimeoutError} from "@lodestar/utils";

import {Eth1DepositDataTracker} from "../../../src/eth1/eth1DepositDataTracker.js";
import {Eth1Provider} from "../../../src/eth1/provider/eth1Provider.js";
import {testLogger} from "../../utils/logger.js";
import {defaultEth1Options} from "../../../src/eth1/options.js";
import {BeaconDb} from "../../../src/db/beacon.js";

describe("Eth1DepositDataTracker", function () {
const sandbox = sinon.createSandbox();
const controller = new AbortController();

const logger = testLogger();
const opts = {...defaultEth1Options, enabled: false};
const signal = controller.signal;
const eth1Provider = new Eth1Provider(config, opts, signal, null);
const db = sinon.createStubInstance(BeaconDb);

const eth1DepositDataTracker = new Eth1DepositDataTracker(
opts,
{config, db, logger, signal, metrics: null},
eth1Provider
);
sinon
.stub(
(eth1DepositDataTracker as never) as {
getLastProcessedDepositBlockNumber: typeof eth1DepositDataTracker["getLastProcessedDepositBlockNumber"];
},
"getLastProcessedDepositBlockNumber"
)
.resolves(0);

sinon.stub(eth1DepositDataTracker["eth1DataCache"], "getHighestCachedBlockNumber").resolves(0);
sinon.stub(eth1DepositDataTracker["eth1DataCache"], "add").resolves(void 0);

sinon.stub(eth1DepositDataTracker["depositsCache"], "getEth1DataForBlocks").resolves([]);
sinon.stub(eth1DepositDataTracker["depositsCache"], "add").resolves(void 0);
sinon.stub(eth1DepositDataTracker["depositsCache"], "getLowestDepositEventBlockNumber").resolves(0);

const getBlocksByNumberStub = sinon.stub(eth1Provider, "getBlocksByNumber");
const getDepositEventsStub = sinon.stub(eth1Provider, "getDepositEvents");

after(() => {
sandbox.restore();
});

it("Should dynamically adjust blocks batch size", async function () {
let expectedSize = 1000;
expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize);

// If there are timeerrors or parse errors then batch size should reduce
getBlocksByNumberStub.throws(new TimeoutError("timeout error"));
for (let i = 0; i < 10; i++) {
expectedSize = Math.max(Math.floor(expectedSize / 2), 10);
await eth1DepositDataTracker["updateBlockCache"](3000).catch((_e) => void 0);
expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize);
}
expect(expectedSize).to.be.equal(10);

getBlocksByNumberStub.resolves([]);
for (let i = 0; i < 10; i++) {
expectedSize = Math.min(expectedSize * 2, 1000);
await eth1DepositDataTracker["updateBlockCache"](3000);
expect(eth1DepositDataTracker["eth1GetBlocksBatchSizeDynamic"]).to.be.equal(expectedSize);
}
expect(expectedSize).to.be.equal(1000);
});

it("Should dynamically adjust logs batch size", async function () {
let expectedSize = 1000;
expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize);

// If there are timeerrors or parse errors then batch size should reduce
getDepositEventsStub.throws(new TimeoutError("timeout error"));
for (let i = 0; i < 10; i++) {
expectedSize = Math.max(Math.floor(expectedSize / 2), 10);
await eth1DepositDataTracker["updateDepositCache"](3000).catch((_e) => void 0);
expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize);
}
expect(expectedSize).to.be.equal(10);

getDepositEventsStub.resolves([]);
for (let i = 0; i < 10; i++) {
expectedSize = Math.min(expectedSize * 2, 1000);
await eth1DepositDataTracker["updateDepositCache"](3000);
expect(eth1DepositDataTracker["eth1GetLogsBatchSizeDynamic"]).to.be.equal(expectedSize);
}
expect(expectedSize).to.be.equal(1000);
});
});