-
-
Notifications
You must be signed in to change notification settings - Fork 266
/
index.ts
175 lines (158 loc) Β· 7.21 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import {WithOptionalBytes, allForks} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {JobItemQueue} from "../../util/queue/index.js";
import {Metrics} from "../../metrics/metrics.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import type {BeaconChain} from "../chain.js";
import {verifyBlocksInEpoch} from "./verifyBlock.js";
import {importBlock} from "./importBlock.js";
import {assertLinearChainSegment} from "./utils/chainSegment.js";
import {BlockInput, FullyVerifiedBlock, ImportBlockOpts} from "./types.js";
import {verifyBlocksSanityChecks} from "./verifyBlocksSanityChecks.js";
import {removeEagerlyPeristedBlockInputs} from "./writeBlockInputToDb.js";
export {ImportBlockOpts, AttestationImportOpt} from "./types.js";
const QUEUE_MAX_LENGTH = 256;
/**
* BlockProcessor processes block jobs in a queued fashion, one after the other.
*/
export class BlockProcessor {
readonly jobQueue: JobItemQueue<[WithOptionalBytes<BlockInput>[], ImportBlockOpts], void>;
constructor(chain: BeaconChain, metrics: Metrics | null, opts: BlockProcessOpts, signal: AbortSignal) {
this.jobQueue = new JobItemQueue<[WithOptionalBytes<BlockInput>[], ImportBlockOpts], void>(
(job, importOpts) => {
return processBlocks.call(chain, job, {...opts, ...importOpts});
},
{maxLength: QUEUE_MAX_LENGTH, noYieldIfOneItem: true, signal},
metrics?.blockProcessorQueue ?? undefined
);
}
async processBlocksJob(job: WithOptionalBytes<BlockInput>[], opts: ImportBlockOpts = {}): Promise<void> {
await this.jobQueue.push(job, opts);
}
}
/**
* Validate and process a block
*
* The only effects of running this are:
* - forkChoice update, in the case of a valid block
* - various events emitted: checkpoint, forkChoice:*, head, block, error:block
* - (state cache update, from state regeneration)
*
* All other effects are provided by downstream event handlers
*/
export async function processBlocks(
this: BeaconChain,
blocks: WithOptionalBytes<BlockInput>[],
opts: BlockProcessOpts & ImportBlockOpts
): Promise<void> {
if (blocks.length === 0) {
return; // TODO: or throw?
} else if (blocks.length > 1) {
assertLinearChainSegment(this.config, blocks);
}
try {
const {relevantBlocks, dataAvailabilityStatuses, parentSlots, parentBlock} = verifyBlocksSanityChecks(
this,
blocks,
opts
);
// No relevant blocks, skip verifyBlocksInEpoch()
if (relevantBlocks.length === 0 || parentBlock === null) {
// parentBlock can only be null if relevantBlocks are empty
return;
}
// Fully verify a block to be imported immediately after. Does not produce any side-effects besides adding intermediate
// states in the state cache through regen.
const {postStates, proposerBalanceDeltas, segmentExecStatus} = await verifyBlocksInEpoch.call(
this,
parentBlock,
relevantBlocks,
dataAvailabilityStatuses,
opts
);
// If segmentExecStatus has lvhForkchoice then, the entire segment should be invalid
// and we need to further propagate
if (segmentExecStatus.execAborted !== null) {
if (segmentExecStatus.invalidSegmentLHV !== undefined) {
this.forkChoice.validateLatestHash(segmentExecStatus.invalidSegmentLHV);
}
throw segmentExecStatus.execAborted.execError;
}
const {executionStatuses} = segmentExecStatus;
const fullyVerifiedBlocks = relevantBlocks.map(
(block, i): FullyVerifiedBlock => ({
blockInput: block,
postState: postStates[i],
parentBlockSlot: parentSlots[i],
executionStatus: executionStatuses[i],
// Currently dataAvailableStatus is not used upstream but that can change if we
// start supporting optimistic syncing/processing
dataAvailableStatus: dataAvailabilityStatuses[i],
proposerBalanceDelta: proposerBalanceDeltas[i],
// TODO: Make this param mandatory and capture in gossip
seenTimestampSec: opts.seenTimestampSec ?? Math.floor(Date.now() / 1000),
})
);
for (const fullyVerifiedBlock of fullyVerifiedBlocks) {
// No need to sleep(0) here since `importBlock` includes a disk write
// TODO: Consider batching importBlock too if it takes significant time
await importBlock.call(this, fullyVerifiedBlock, opts);
}
} catch (e) {
// above functions should only throw BlockError
const err = getBlockError(e, blocks[0].block);
// TODO: De-duplicate with logic above
// ChainEvent.errorBlock
if (!(err instanceof BlockError)) {
this.logger.error("Non BlockError received", {}, err);
} else if (!opts.disableOnBlockError) {
this.logger.error("Block error", {slot: err.signedBlock.message.slot}, err);
if (err.type.code === BlockErrorCode.INVALID_SIGNATURE) {
const {signedBlock} = err;
const blockSlot = signedBlock.message.slot;
const {state} = err.type;
const forkTypes = this.config.getForkTypes(blockSlot);
this.persistInvalidSszValue(forkTypes.SignedBeaconBlock, signedBlock, `${blockSlot}_invalid_signature`);
this.persistInvalidSszView(state, `${state.slot}_invalid_signature`);
} else if (err.type.code === BlockErrorCode.INVALID_STATE_ROOT) {
const {signedBlock} = err;
const blockSlot = signedBlock.message.slot;
const {preState, postState} = err.type;
const forkTypes = this.config.getForkTypes(blockSlot);
const invalidRoot = toHex(postState.hashTreeRoot());
const suffix = `slot_${blockSlot}_invalid_state_root_${invalidRoot}`;
this.persistInvalidSszValue(forkTypes.SignedBeaconBlock, signedBlock, suffix);
this.persistInvalidSszView(preState, `${suffix}_preState`);
this.persistInvalidSszView(postState, `${suffix}_postState`);
}
}
// Clean db if we don't have blocks in forkchoice but already persisted them to db
//
// NOTE: this function is awaited to ensure that DB size remains constant, otherwise an attacker may bloat the
// disk with big malicious payloads. Our sequential block importer will wait for this promise before importing
// another block. The removal call error is not propagated since that would halt the chain.
//
// LOG: Because the error is not propagated and there's a risk of db bloat, the error is logged at warn level
// to alert the user of potential db bloat. This error _should_ never happen user must act and report to us
await removeEagerlyPeristedBlockInputs.call(this, blocks).catch((e) => {
this.logger.warn(
"Error pruning eagerly imported block inputs, DB may grow in size if this error happens frequently",
{slot: blocks.map((block) => block.block.message.slot).join(",")},
e
);
});
throw err;
}
}
function getBlockError(e: unknown, block: allForks.SignedBeaconBlock): BlockError {
if (e instanceof BlockError) {
return e;
} else if (e instanceof Error) {
const blockError = new BlockError(block, {code: BlockErrorCode.BEACON_CHAIN_ERROR, error: e as Error});
blockError.stack = e.stack;
return blockError;
} else {
return new BlockError(block, {code: BlockErrorCode.BEACON_CHAIN_ERROR, error: e as Error});
}
}