From 5d2063d2629f36f07ffd061d1f5252582941f2fc Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 18 Jul 2023 16:59:55 +1000 Subject: [PATCH] Single-pass epoch processing (#4483) --- beacon_node/beacon_chain/src/beacon_chain.rs | 1 + .../beacon_chain/src/block_verification.rs | 11 +- beacon_node/beacon_chain/src/builder.rs | 3 +- .../beacon_chain/src/canonical_head.rs | 6 +- beacon_node/beacon_chain/src/errors.rs | 1 + .../http_api/src/validator_inclusion.rs | 9 +- beacon_node/store/src/errors.rs | 9 +- beacon_node/store/src/hot_cold_store.rs | 11 +- consensus/fork_choice/src/fork_choice.rs | 202 +----- consensus/state_processing/src/all_caches.rs | 55 ++ .../src/common/slash_validator.rs | 6 +- .../update_progressive_balances_cache.rs | 80 +-- consensus/state_processing/src/epoch_cache.rs | 107 ++- consensus/state_processing/src/lib.rs | 2 + .../src/per_block_processing.rs | 6 +- .../process_operations.rs | 22 +- .../src/per_epoch_processing.rs | 12 +- .../src/per_epoch_processing/altair.rs | 67 +- .../altair/inactivity_updates.rs | 71 +- .../altair/justification_and_finalization.rs | 24 +- .../altair/participation_cache.rs | 8 +- .../altair/rewards_and_penalties.rs | 61 +- .../src/per_epoch_processing/base.rs | 6 +- .../src/per_epoch_processing/capella.rs | 86 --- .../effective_balance_updates.rs | 45 +- .../epoch_processing_summary.rs | 192 +++--- .../per_epoch_processing/registry_updates.rs | 16 + .../src/per_epoch_processing/single_pass.rs | 628 ++++++++++++++++++ .../src/per_epoch_processing/slashings.rs | 41 +- .../state_processing/src/upgrade/altair.rs | 1 + .../state_processing/src/upgrade/capella.rs | 1 + .../state_processing/src/upgrade/merge.rs | 1 + consensus/types/src/beacon_state.rs | 85 ++- .../types/src/beacon_state/compact_state.rs | 2 + .../progressive_balances_cache.rs | 218 ++++-- .../types/src/beacon_state/slashings_cache.rs | 63 ++ consensus/types/src/chain_spec.rs | 20 +- consensus/types/src/epoch_cache.rs | 39 +- lcli/src/skip_slots.rs | 3 +- lcli/src/transition_blocks.rs | 17 +- .../ef_tests/src/cases/epoch_processing.rs | 50 +- testing/ef_tests/src/cases/operations.rs | 2 +- 42 files changed, 1559 insertions(+), 731 deletions(-) create mode 100644 consensus/state_processing/src/all_caches.rs create mode 100644 consensus/state_processing/src/per_epoch_processing/single_pass.rs create mode 100644 consensus/types/src/beacon_state/slashings_cache.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index df5d7551df7..3db45e9d636 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4299,6 +4299,7 @@ impl BeaconChain { let attestation_packing_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES); + state.build_total_active_balance_cache_at(state.current_epoch(), &self.spec)?; let mut prev_filter_cache = HashMap::new(); let prev_attestation_filter = |att: &AttestationRef| { self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index af0bf9e785e..441bca0aeeb 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -78,7 +78,7 @@ use state_processing::{ block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError}, per_block_processing, per_slot_processing, state_advance::partial_state_advance, - BlockProcessingError, BlockSignatureStrategy, ConsensusContext, SlotProcessingError, + AllCaches, BlockProcessingError, BlockSignatureStrategy, ConsensusContext, SlotProcessingError, StateProcessingStrategy, VerifyBlockRoot, }; use std::borrow::Cow; @@ -1712,6 +1712,15 @@ fn load_parent( BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root)) })?; + if !state.all_caches_built() { + slog::warn!( + chain.log, + "Parent state lacks built caches"; + "block_slot" => block.slot(), + "state_slot" => state.slot(), + ); + } + if block.slot() != state.slot() { slog::warn!( chain.log, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index b8f305d69fa..fe72a1e964d 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -24,6 +24,7 @@ use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold}; use slasher::Slasher; use slog::{crit, error, info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; +use state_processing::AllCaches; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; @@ -447,7 +448,7 @@ where // Prime all caches before storing the state in the database and computing the tree hash // root. weak_subj_state - .build_caches(&self.spec) + .build_all_caches(&self.spec) .map_err(|e| format!("Error building caches on checkpoint state: {e:?}"))?; let computed_state_root = weak_subj_state diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 7470da30e23..fed632b91d1 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -50,6 +50,7 @@ use itertools::process_results; use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard}; use slog::{crit, debug, error, warn, Logger}; use slot_clock::SlotClock; +use state_processing::AllCaches; use std::sync::Arc; use std::time::Duration; use store::{iter::StateRootsIterator, KeyValueStoreOp, StoreItem}; @@ -666,7 +667,10 @@ impl BeaconChain { // Regardless of where we got the state from, attempt to build all the // caches except the tree hash cache. - new_snapshot.beacon_state.build_caches(&self.spec)?; + new_snapshot + .beacon_state + .build_all_caches(&self.spec) + .map_err(Error::HeadCacheError)?; let new_cached_head = CachedHead { snapshot: Arc::new(new_snapshot), diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index f4bce65164f..37f31ae6c1e 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -53,6 +53,7 @@ pub enum BeaconChainError { SlotClockDidNotStart, NoStateForSlot(Slot), BeaconStateError(BeaconStateError), + HeadCacheError(EpochCacheError), DBInconsistent(String), DBError(store::Error), ForkChoiceError(ForkChoiceError), diff --git a/beacon_node/http_api/src/validator_inclusion.rs b/beacon_node/http_api/src/validator_inclusion.rs index 0e078e7d035..ca26e9ec414 100644 --- a/beacon_node/http_api/src/validator_inclusion.rs +++ b/beacon_node/http_api/src/validator_inclusion.rs @@ -4,11 +4,8 @@ use eth2::{ lighthouse::{GlobalValidatorInclusionData, ValidatorInclusionData}, types::ValidatorId, }; -use state_processing::per_epoch_processing::{ - altair::participation_cache::Error as ParticipationCacheError, process_epoch, - EpochProcessingSummary, -}; -use types::{BeaconState, ChainSpec, Epoch, EthSpec}; +use state_processing::per_epoch_processing::{process_epoch, EpochProcessingSummary}; +use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec}; /// Returns the state in the last slot of `epoch`. fn end_of_epoch_state( @@ -35,7 +32,7 @@ fn get_epoch_processing_summary( .map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e))) } -fn convert_cache_error(error: ParticipationCacheError) -> warp::reject::Rejection { +fn convert_cache_error(error: BeaconStateError) -> warp::reject::Rejection { warp_utils::reject::custom_server_error(format!("{:?}", error)) } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index cd4f9895b75..43e58732de2 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -3,7 +3,7 @@ use crate::hdiff; use crate::hot_cold_store::HotColdDBError; use ssz::DecodeError; use state_processing::BlockReplayError; -use types::{milhouse, BeaconStateError, Epoch, Hash256, InconsistentFork, Slot}; +use types::{milhouse, BeaconStateError, Epoch, EpochCacheError, Hash256, InconsistentFork, Slot}; pub type Result = std::result::Result; @@ -71,6 +71,7 @@ pub enum Error { Hdiff(hdiff::Error), InconsistentFork(InconsistentFork), ZeroCacheSize, + CacheBuildError(EpochCacheError), } pub trait HandleUnavailable { @@ -141,6 +142,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: EpochCacheError) -> Error { + Error::CacheBuildError(e) + } +} + #[derive(Debug)] pub struct DBError { pub message: String, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 400cc15d20b..190f10064ac 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -30,7 +30,8 @@ use slog::{debug, error, info, warn, Logger}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use state_processing::{ - block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError, + block_replayer::PreSlotHook, AllCaches, BlockProcessingError, BlockReplayer, + SlotProcessingError, }; use std::cmp::min; use std::collections::VecDeque; @@ -1152,7 +1153,7 @@ impl, Cold: ItemStore> HotColdDB let state_cacher_hook = |opt_state_root: Option, state: &mut BeaconState<_>| { // Ensure all caches are built before attempting to cache. state.update_tree_hash_cache()?; - state.build_caches(&self.spec)?; + state.build_all_caches(&self.spec)?; if let Some(state_root) = opt_state_root { // Cache @@ -1246,7 +1247,7 @@ impl, Cold: ItemStore> HotColdDB )?; state.update_tree_hash_cache()?; - state.build_caches(&self.spec)?; + state.build_all_caches(&self.spec)?; } // Apply state diff. Block replay should have ensured that the diff is now applicable. @@ -1280,7 +1281,7 @@ impl, Cold: ItemStore> HotColdDB let tree_hash_ms = t.elapsed().as_millis(); let t = std::time::Instant::now(); - state.build_caches(&self.spec)?; + state.build_all_caches(&self.spec)?; let cache_ms = t.elapsed().as_millis(); debug!( @@ -1358,7 +1359,7 @@ impl, Cold: ItemStore> HotColdDB // Do a tree hash here so that the cache is fully built. state.update_tree_hash_cache()?; - state.build_caches(&self.spec)?; + state.build_all_caches(&self.spec)?; let latest_block_root = state.get_latest_block_root(*state_root); Ok((state, latest_block_root)) diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index b3c474dea36..47543ca11b9 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1,15 +1,10 @@ use crate::{ForkChoiceStore, InvalidationOperation}; -use per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError; use proto_array::{ Block as ProtoBlock, DisallowedReOrgOffsets, ExecutionStatus, ProposerHeadError, ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold, }; -use slog::{crit, debug, error, warn, Logger}; +use slog::{crit, debug, warn, Logger}; use ssz_derive::{Decode, Encode}; -use state_processing::per_epoch_processing::altair::ParticipationCache; -use state_processing::per_epoch_processing::{ - weigh_justification_and_finalization, JustificationAndFinalizationState, -}; use state_processing::{ per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing, }; @@ -17,13 +12,13 @@ use std::cmp::Ordering; use std::collections::BTreeSet; use std::marker::PhantomData; use std::time::Duration; +use types::ProgressiveBalancesMode; use types::{ consts::merge::INTERVALS_PER_SLOT, AbstractExecPayload, AttestationShufflingId, AttesterSlashing, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Checkpoint, Epoch, EthSpec, ExecPayload, ExecutionBlockHash, Hash256, IndexedAttestation, RelativeEpoch, SignedBeaconBlock, Slot, }; -use types::{ProgressiveBalancesCache, ProgressiveBalancesMode}; #[derive(Debug)] pub enum Error { @@ -77,7 +72,6 @@ pub enum Error { proposer_boost_root: Hash256, }, UnrealizedVoteProcessing(state_processing::EpochProcessingError), - ParticipationCacheError(ParticipationCacheError), ValidatorStatuses(BeaconStateError), ProgressiveBalancesCacheCheckFailed(String), } @@ -106,12 +100,6 @@ impl From for Error { } } -impl From for Error { - fn from(e: ParticipationCacheError) -> Self { - Error::ParticipationCacheError(e) - } -} - #[derive(Debug, Clone, Copy)] /// Controls how fork choice should behave when restoring from a persisted fork choice. pub enum ResetPayloadStatuses { @@ -654,6 +642,7 @@ where /// The supplied block **must** pass the `state_transition` function as it will not be run /// here. #[allow(clippy::too_many_arguments)] + // FIXME(sproul): remove progressive balances mode pub fn on_block>( &mut self, system_time_current_slot: Slot, @@ -662,9 +651,9 @@ where block_delay: Duration, state: &BeaconState, payload_verification_status: PayloadVerificationStatus, - progressive_balances_mode: ProgressiveBalancesMode, + _progressive_balances_mode: ProgressiveBalancesMode, spec: &ChainSpec, - log: &Logger, + _log: &Logger, ) -> Result<(), Error> { // If this block has already been processed we do not need to reprocess it. // We check this immediately in case re-processing the block mutates some property of the @@ -758,78 +747,37 @@ where parent_justified.epoch == block_epoch && parent_finalized.epoch + 1 >= block_epoch }); - let (unrealized_justified_checkpoint, unrealized_finalized_checkpoint) = if let Some(( - parent_justified, - parent_finalized, - )) = - parent_checkpoints - { - (parent_justified, parent_finalized) - } else { - let justification_and_finalization_state = match block { - BeaconBlockRef::Capella(_) - | BeaconBlockRef::Merge(_) - | BeaconBlockRef::Altair(_) => match progressive_balances_mode { - ProgressiveBalancesMode::Disabled => { - let participation_cache = ParticipationCache::new(state, spec)?; - per_epoch_processing::altair::process_justification_and_finalization( + let (unrealized_justified_checkpoint, unrealized_finalized_checkpoint) = + if let Some((parent_justified, parent_finalized)) = parent_checkpoints { + (parent_justified, parent_finalized) + } else { + let justification_and_finalization_state = match block { + BeaconBlockRef::Capella(_) + | BeaconBlockRef::Merge(_) + | BeaconBlockRef::Altair(_) => { + // FIXME(sproul): initialize progressive balances + per_epoch_processing::altair::process_justification_and_finalization(state)? + } + BeaconBlockRef::Base(_) => { + let mut validator_statuses = + per_epoch_processing::base::ValidatorStatuses::new(state, spec) + .map_err(Error::ValidatorStatuses)?; + validator_statuses + .process_attestations(state) + .map_err(Error::ValidatorStatuses)?; + per_epoch_processing::base::process_justification_and_finalization( state, - &participation_cache, + &validator_statuses.total_balances, + spec, )? } - ProgressiveBalancesMode::Fast - | ProgressiveBalancesMode::Checked - | ProgressiveBalancesMode::Strict => { - let maybe_participation_cache = progressive_balances_mode - .perform_comparative_checks() - .then(|| ParticipationCache::new(state, spec)) - .transpose()?; - - process_justification_and_finalization_from_progressive_cache::( - state, - maybe_participation_cache.as_ref(), - ) - .or_else(|e| { - if progressive_balances_mode != ProgressiveBalancesMode::Strict { - error!( - log, - "Processing with progressive balances cache failed"; - "info" => "falling back to the non-optimized processing method", - "error" => ?e, - ); - let participation_cache = maybe_participation_cache - .map(Ok) - .unwrap_or_else(|| ParticipationCache::new(state, spec))?; - per_epoch_processing::altair::process_justification_and_finalization( - state, - &participation_cache, - ).map_err(Error::from) - } else { - Err(e) - } - })? - } - }, - BeaconBlockRef::Base(_) => { - let mut validator_statuses = - per_epoch_processing::base::ValidatorStatuses::new(state, spec) - .map_err(Error::ValidatorStatuses)?; - validator_statuses - .process_attestations(state) - .map_err(Error::ValidatorStatuses)?; - per_epoch_processing::base::process_justification_and_finalization( - state, - &validator_statuses.total_balances, - spec, - )? - } - }; + }; - ( - justification_and_finalization_state.current_justified_checkpoint(), - justification_and_finalization_state.finalized_checkpoint(), - ) - }; + ( + justification_and_finalization_state.current_justified_checkpoint(), + justification_and_finalization_state.finalized_checkpoint(), + ) + }; // Update best known unrealized justified & finalized checkpoints if unrealized_justified_checkpoint.epoch @@ -1556,92 +1504,6 @@ where } } -/// Process justification and finalization using progressive cache. Also performs a comparative -/// check against the `ParticipationCache` if it is supplied. -/// -/// Returns an error if the cache is not initialized or if there is a mismatch on the comparative check. -fn process_justification_and_finalization_from_progressive_cache( - state: &BeaconState, - maybe_participation_cache: Option<&ParticipationCache>, -) -> Result, Error> -where - E: EthSpec, - T: ForkChoiceStore, -{ - let justification_and_finalization_state = JustificationAndFinalizationState::new(state); - if state.current_epoch() <= E::genesis_epoch() + 1 { - return Ok(justification_and_finalization_state); - } - - // Load cached balances - let progressive_balances_cache: &ProgressiveBalancesCache = state.progressive_balances_cache(); - let previous_target_balance = - progressive_balances_cache.previous_epoch_target_attesting_balance()?; - let current_target_balance = - progressive_balances_cache.current_epoch_target_attesting_balance()?; - let total_active_balance = state.get_total_active_balance()?; - - if let Some(participation_cache) = maybe_participation_cache { - check_progressive_balances::( - state, - participation_cache, - previous_target_balance, - current_target_balance, - total_active_balance, - )?; - } - - weigh_justification_and_finalization( - justification_and_finalization_state, - total_active_balance, - previous_target_balance, - current_target_balance, - ) - .map_err(Error::from) -} - -/// Perform comparative checks against `ParticipationCache`, will return error if there's a mismatch. -fn check_progressive_balances( - state: &BeaconState, - participation_cache: &ParticipationCache, - cached_previous_target_balance: u64, - cached_current_target_balance: u64, - cached_total_active_balance: u64, -) -> Result<(), Error> -where - E: EthSpec, - T: ForkChoiceStore, -{ - let slot = state.slot(); - let epoch = state.current_epoch(); - - // Check previous epoch target balances - let previous_target_balance = participation_cache.previous_epoch_target_attesting_balance()?; - if previous_target_balance != cached_previous_target_balance { - return Err(Error::ProgressiveBalancesCacheCheckFailed( - format!("Previous epoch target attesting balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, previous_target_balance, cached_previous_target_balance) - )); - } - - // Check current epoch target balances - let current_target_balance = participation_cache.current_epoch_target_attesting_balance()?; - if current_target_balance != cached_current_target_balance { - return Err(Error::ProgressiveBalancesCacheCheckFailed( - format!("Current epoch target attesting balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, current_target_balance, cached_current_target_balance) - )); - } - - // Check current epoch total balances - let total_active_balance = participation_cache.current_epoch_total_active_balance(); - if total_active_balance != cached_total_active_balance { - return Err(Error::ProgressiveBalancesCacheCheckFailed( - format!("Current epoch total active balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, total_active_balance, cached_total_active_balance) - )); - } - - Ok(()) -} - /// Helper struct that is used to encode/decode the state of the `ForkChoice` as SSZ bytes. /// /// This is used when persisting the state of the fork choice to disk. diff --git a/consensus/state_processing/src/all_caches.rs b/consensus/state_processing/src/all_caches.rs new file mode 100644 index 00000000000..36a5c68b253 --- /dev/null +++ b/consensus/state_processing/src/all_caches.rs @@ -0,0 +1,55 @@ +use crate::common::update_progressive_balances_cache::initialize_progressive_balances_cache; +use crate::epoch_cache::initialize_epoch_cache; +use types::{BeaconState, ChainSpec, EpochCacheError, EthSpec, Hash256, RelativeEpoch}; + +/// Mixin trait for the beacon state that provides operations on *all* caches. +/// +/// The reason this trait exists here away from `BeaconState` itself is that some caches are +/// computed by functions in `state_processing`. +pub trait AllCaches { + /// Build all caches. + /// + /// Note that this excludes milhouse's intrinsic tree-hash cache. That needs to be managed + /// separately. + fn build_all_caches(&mut self, spec: &ChainSpec) -> Result<(), EpochCacheError>; + + /// Return true if all caches are built. + /// + /// Note that this excludes milhouse's intrinsic tree-hash cache. That needs to be managed + /// separately. + fn all_caches_built(&self) -> bool; +} + +impl AllCaches for BeaconState { + fn build_all_caches(&mut self, spec: &ChainSpec) -> Result<(), EpochCacheError> { + self.build_caches(spec)?; + initialize_epoch_cache(self, spec)?; + initialize_progressive_balances_cache(self, None, spec)?; + Ok(()) + } + + fn all_caches_built(&self) -> bool { + let current_epoch = self.current_epoch(); + let epoch_cache_decision_block_root = + if let Ok(root) = self.proposer_shuffling_decision_root(Hash256::zero()) { + root + } else { + return false; + }; + self.get_total_active_balance_at_epoch(current_epoch) + .is_ok() + && self.committee_cache_is_initialized(RelativeEpoch::Previous) + && self.committee_cache_is_initialized(RelativeEpoch::Current) + && self.committee_cache_is_initialized(RelativeEpoch::Next) + && self + .progressive_balances_cache() + .is_initialized_at(current_epoch) + && self.pubkey_cache().len() == self.validators().len() + && self.exit_cache().check_initialized().is_ok() + && self.slashings_cache_is_initialized() + && self + .epoch_cache() + .check_validity::(current_epoch, epoch_cache_decision_block_root) + .is_ok() + } +} diff --git a/consensus/state_processing/src/common/slash_validator.rs b/consensus/state_processing/src/common/slash_validator.rs index b3b70b919e6..6e657fefca6 100644 --- a/consensus/state_processing/src/common/slash_validator.rs +++ b/consensus/state_processing/src/common/slash_validator.rs @@ -20,6 +20,7 @@ pub fn slash_validator( spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { let epoch = state.current_epoch(); + let latest_block_slot = state.latest_block_header().slot; initiate_validator_exit(state, slashed_index, spec)?; @@ -44,7 +45,10 @@ pub fn slash_validator( .safe_div(spec.min_slashing_penalty_quotient_for_state(state))?, )?; - update_progressive_balances_on_slashing(state, slashed_index)?; + update_progressive_balances_on_slashing(state, slashed_index, validator_effective_balance)?; + state + .slashings_cache_mut() + .record_validator_slashing(latest_block_slot, slashed_index)?; // Apply proposer and whistleblower rewards let proposer_index = ctxt.get_proposer_index(state, spec)? as usize; diff --git a/consensus/state_processing/src/common/update_progressive_balances_cache.rs b/consensus/state_processing/src/common/update_progressive_balances_cache.rs index 1d84ea5f1e8..37e00ad6ba9 100644 --- a/consensus/state_processing/src/common/update_progressive_balances_cache.rs +++ b/consensus/state_processing/src/common/update_progressive_balances_cache.rs @@ -7,10 +7,9 @@ use crate::per_epoch_processing::altair::ParticipationCache; use crate::{BlockProcessingError, EpochProcessingError}; use lighthouse_metrics::set_gauge; use std::borrow::Cow; -use types::consts::altair::TIMELY_TARGET_FLAG_INDEX; use types::{ - is_progressive_balances_enabled, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, - ParticipationFlags, ProgressiveBalancesCache, VList, + is_progressive_balances_enabled, BeaconState, BeaconStateError, ChainSpec, Epoch, + EpochTotalBalances, EthSpec, ProgressiveBalancesCache, }; /// Initializes the `ProgressiveBalancesCache` cache using balance values from the @@ -27,6 +26,7 @@ pub fn initialize_progressive_balances_cache( return Ok(()); } + // FIXME(sproul): simplify the participation cache let participation_cache = match maybe_participation_cache { Some(cache) => Cow::Borrowed(cache), None => { @@ -38,19 +38,22 @@ pub fn initialize_progressive_balances_cache( } }; - let previous_epoch_target_attesting_balance = participation_cache - .previous_epoch_target_attesting_balance_raw() - .map_err(|e| BeaconStateError::ParticipationCacheError(format!("{e:?}")))?; - - let current_epoch_target_attesting_balance = participation_cache - .current_epoch_target_attesting_balance_raw() - .map_err(|e| BeaconStateError::ParticipationCacheError(format!("{e:?}")))?; - let current_epoch = state.current_epoch(); + let previous_epoch_cache = EpochTotalBalances { + total_flag_balances: participation_cache + .previous_epoch_participation + .total_flag_balances, + }; + let current_epoch_cache = EpochTotalBalances { + total_flag_balances: participation_cache + .current_epoch_participation + .total_flag_balances, + }; + state.progressive_balances_cache_mut().initialize( current_epoch, - previous_epoch_target_attesting_balance, - current_epoch_target_attesting_balance, + previous_epoch_cache, + current_epoch_cache, ); update_progressive_balances_metrics(state.progressive_balances_cache())?; @@ -62,16 +65,16 @@ pub fn initialize_progressive_balances_cache( pub fn update_progressive_balances_on_attestation( state: &mut BeaconState, epoch: Epoch, - validator_index: usize, + flag_index: usize, + validator_effective_balance: u64, + validator_slashed: bool, ) -> Result<(), BlockProcessingError> { - if is_progressive_balances_enabled(state) { - let validator = state.get_validator(validator_index)?; - if !validator.slashed() { - let validator_effective_balance = validator.effective_balance(); - state - .progressive_balances_cache_mut() - .on_new_target_attestation(epoch, validator_effective_balance)?; - } + if is_progressive_balances_enabled(state) && !validator_slashed { + state.progressive_balances_cache_mut().on_new_attestation( + epoch, + flag_index, + validator_effective_balance, + )?; } Ok(()) } @@ -80,21 +83,22 @@ pub fn update_progressive_balances_on_attestation( pub fn update_progressive_balances_on_slashing( state: &mut BeaconState, validator_index: usize, + validator_effective_balance: u64, ) -> Result<(), BlockProcessingError> { if is_progressive_balances_enabled(state) { - let previous_epoch_participation = state.previous_epoch_participation()?; - let is_previous_epoch_target_attester = - is_target_attester_in_epoch::(previous_epoch_participation, validator_index)?; - - let current_epoch_participation = state.current_epoch_participation()?; - let is_current_epoch_target_attester = - is_target_attester_in_epoch::(current_epoch_participation, validator_index)?; + let previous_epoch_participation = *state + .previous_epoch_participation()? + .get(validator_index) + .ok_or(BeaconStateError::UnknownValidator(validator_index))?; - let validator_effective_balance = state.get_effective_balance(validator_index)?; + let current_epoch_participation = *state + .current_epoch_participation()? + .get(validator_index) + .ok_or(BeaconStateError::UnknownValidator(validator_index))?; state.progressive_balances_cache_mut().on_slashing( - is_previous_epoch_target_attester, - is_current_epoch_target_attester, + previous_epoch_participation, + current_epoch_participation, validator_effective_balance, )?; } @@ -133,15 +137,3 @@ pub fn update_progressive_balances_metrics( Ok(()) } - -fn is_target_attester_in_epoch( - epoch_participation: &VList, - validator_index: usize, -) -> Result { - let participation_flags = epoch_participation - .get(validator_index) - .ok_or(BeaconStateError::UnknownValidator(validator_index))?; - participation_flags - .has_flag(TIMELY_TARGET_FLAG_INDEX) - .map_err(|e| e.into()) -} diff --git a/consensus/state_processing/src/epoch_cache.rs b/consensus/state_processing/src/epoch_cache.rs index b26252e51a5..308b8327932 100644 --- a/consensus/state_processing/src/epoch_cache.rs +++ b/consensus/state_processing/src/epoch_cache.rs @@ -1,14 +1,85 @@ use crate::common::altair::BaseRewardPerIncrement; use crate::common::base::SqrtTotalActiveBalance; use crate::common::{altair, base}; +use safe_arith::SafeArith; use types::epoch_cache::{EpochCache, EpochCacheError, EpochCacheKey}; -use types::{ActivationQueue, BeaconState, ChainSpec, Epoch, EthSpec, Hash256}; +use types::{ActivationQueue, BeaconState, ChainSpec, EthSpec, Hash256}; + +/// Precursor to an `EpochCache`. +pub struct PreEpochCache { + epoch_key: EpochCacheKey, + effective_balances: Vec, +} + +impl PreEpochCache { + pub fn new_for_next_epoch(state: &BeaconState) -> Result { + // The decision block root for the next epoch is the latest block root from this epoch. + let latest_block_header = state.latest_block_header(); + + // State root should already have been filled in by `process_slot`, except in the case + // of a `partial_state_advance`. + let decision_block_root = latest_block_header.canonical_root(); + + let epoch_key = EpochCacheKey { + epoch: state.next_epoch()?, + decision_block_root, + }; + + Ok(Self { + epoch_key, + effective_balances: Vec::with_capacity(state.validators().len()), + }) + } + + pub fn push_effective_balance(&mut self, effective_balance: u64) { + self.effective_balances.push(effective_balance); + } + + pub fn into_epoch_cache( + self, + total_active_balance: u64, + activation_queue: ActivationQueue, + spec: &ChainSpec, + ) -> Result { + let epoch = self.epoch_key.epoch; + let sqrt_total_active_balance = SqrtTotalActiveBalance::new(total_active_balance); + let base_reward_per_increment = BaseRewardPerIncrement::new(total_active_balance, spec)?; + + let effective_balance_increment = spec.effective_balance_increment; + let max_effective_balance_eth = spec + .max_effective_balance + .safe_div(effective_balance_increment)?; + + let mut base_rewards = Vec::with_capacity(max_effective_balance_eth.safe_add(1)? as usize); + + for effective_balance_eth in 0..=max_effective_balance_eth { + let effective_balance = effective_balance_eth.safe_mul(effective_balance_increment)?; + let base_reward = if spec + .altair_fork_epoch + .map_or(false, |altair_epoch| epoch < altair_epoch) + { + base::get_base_reward(effective_balance, sqrt_total_active_balance, spec)? + } else { + altair::get_base_reward(effective_balance, base_reward_per_increment, spec)? + }; + base_rewards.push(base_reward); + } + + Ok(EpochCache::new( + self.epoch_key, + self.effective_balances, + base_rewards, + activation_queue, + spec, + )) + } +} pub fn initialize_epoch_cache( state: &mut BeaconState, - epoch: Epoch, spec: &ChainSpec, ) -> Result<(), EpochCacheError> { + let epoch = state.current_epoch(); let epoch_cache: &EpochCache = state.epoch_cache(); let decision_block_root = state .proposer_shuffling_decision_root(Hash256::zero()) @@ -22,42 +93,30 @@ pub fn initialize_epoch_cache( return Ok(()); } - // Compute base rewards. state.build_total_active_balance_cache_at(epoch, spec)?; let total_active_balance = state.get_total_active_balance_at_epoch(epoch)?; - let sqrt_total_active_balance = SqrtTotalActiveBalance::new(total_active_balance); - let base_reward_per_increment = BaseRewardPerIncrement::new(total_active_balance, spec)?; - - let mut base_rewards = Vec::with_capacity(state.validators().len()); - // Compute activation queue. + // Collect effective balances and compute activation queue. + let mut effective_balances = Vec::with_capacity(state.validators().len()); let mut activation_queue = ActivationQueue::default(); for (index, validator) in state.validators().iter().enumerate() { - let effective_balance = validator.effective_balance(); - - let base_reward = if spec - .altair_fork_epoch - .map_or(false, |altair_epoch| epoch < altair_epoch) - { - base::get_base_reward(effective_balance, sqrt_total_active_balance, spec)? - } else { - altair::get_base_reward(effective_balance, base_reward_per_increment, spec)? - }; - base_rewards.push(base_reward); + effective_balances.push(validator.effective_balance()); // Add to speculative activation queue. activation_queue.add_if_could_be_eligible_for_activation(index, validator, epoch, spec); } - *state.epoch_cache_mut() = EpochCache::new( - EpochCacheKey { + // Compute base rewards. + let pre_epoch_cache = PreEpochCache { + epoch_key: EpochCacheKey { epoch, decision_block_root, }, - base_rewards, - activation_queue, - ); + effective_balances, + }; + *state.epoch_cache_mut() = + pre_epoch_cache.into_epoch_cache(total_active_balance, activation_queue, spec)?; Ok(()) } diff --git a/consensus/state_processing/src/lib.rs b/consensus/state_processing/src/lib.rs index 9c8bde8c466..92def937ddc 100644 --- a/consensus/state_processing/src/lib.rs +++ b/consensus/state_processing/src/lib.rs @@ -16,6 +16,7 @@ mod macros; mod metrics; +pub mod all_caches; pub mod block_replayer; pub mod common; pub mod consensus_context; @@ -28,6 +29,7 @@ pub mod state_advance; pub mod upgrade; pub mod verify_operation; +pub use all_caches::AllCaches; pub use block_replayer::{BlockReplayError, BlockReplayer, StateProcessingStrategy}; pub use consensus_context::{ConsensusContext, ContextError}; pub use genesis::{ diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index 3077af831fd..524c458e53e 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -119,8 +119,9 @@ pub fn per_block_processing>( .map_err(BlockProcessingError::InconsistentStateFork)?; // Build epoch cache if it hasn't already been built, or if it is no longer valid - initialize_epoch_cache(state, state.current_epoch(), spec)?; + initialize_epoch_cache(state, spec)?; initialize_progressive_balances_cache(state, None, spec)?; + state.build_slashings_cache()?; let verify_signatures = match block_signature_strategy { BlockSignatureStrategy::VerifyBulk => { @@ -242,6 +243,9 @@ pub fn process_block_header( ); } + state + .slashings_cache_mut() + .update_latest_block_slot(block_header.slot); *state.latest_block_header_mut() = block_header; // Verify proposer is not slashed diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index f8474c47095..0c65b38e992 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -98,7 +98,6 @@ pub mod base { pub mod altair { use super::*; use crate::common::update_progressive_balances_cache::update_progressive_balances_on_attestation; - use types::consts::altair::TIMELY_TARGET_FLAG_INDEX; pub fn process_attestations( state: &mut BeaconState, @@ -151,6 +150,9 @@ pub mod altair { for index in &attesting_indices { let index = *index as usize; + let validator_effective_balance = state.epoch_cache().get_effective_balance(index)?; + let validator_slashed = state.slashings_cache().is_slashed(index); + for (flag_index, &weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() { let epoch_participation = state.get_epoch_participation_mut( data.target.epoch, @@ -168,13 +170,13 @@ pub mod altair { proposer_reward_numerator .safe_add_assign(state.get_base_reward(index)?.safe_mul(weight)?)?; - if flag_index == TIMELY_TARGET_FLAG_INDEX { - update_progressive_balances_on_attestation( - state, - data.target.epoch, - index, - )?; - } + update_progressive_balances_on_attestation( + state, + data.target.epoch, + flag_index, + validator_effective_balance, + validator_slashed, + )?; } } } @@ -201,6 +203,8 @@ pub fn process_proposer_slashings( ctxt: &mut ConsensusContext, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { + state.build_slashings_cache()?; + // Verify and apply proposer slashings in series. // We have to verify in series because an invalid block may contain multiple slashings // for the same validator, and we need to correctly detect and reject that. @@ -234,6 +238,8 @@ pub fn process_attester_slashings( ctxt: &mut ConsensusContext, spec: &ChainSpec, ) -> Result<(), BlockProcessingError> { + state.build_slashings_cache()?; + for (i, attester_slashing) in attester_slashings.iter().enumerate() { verify_attester_slashing(state, attester_slashing, verify_signatures, spec) .map_err(|e| e.into_with_index(i))?; diff --git a/consensus/state_processing/src/per_epoch_processing.rs b/consensus/state_processing/src/per_epoch_processing.rs index 26324b454c7..c34508b26ac 100644 --- a/consensus/state_processing/src/per_epoch_processing.rs +++ b/consensus/state_processing/src/per_epoch_processing.rs @@ -1,14 +1,14 @@ #![deny(clippy::wildcard_imports)] use crate::metrics; -pub use epoch_processing_summary::EpochProcessingSummary; +pub use epoch_processing_summary::{EpochProcessingSummary, ParticipationEpochSummary}; use errors::EpochProcessingError as Error; pub use justification_and_finalization_state::JustificationAndFinalizationState; use safe_arith::SafeArith; use types::{BeaconState, ChainSpec, EthSpec}; -pub use registry_updates::process_registry_updates; -pub use slashings::process_slashings; +pub use registry_updates::{process_registry_updates, process_registry_updates_slow}; +pub use slashings::{process_slashings, process_slashings_slow}; pub use weigh_justification_and_finalization::weigh_justification_and_finalization; pub mod altair; @@ -21,6 +21,7 @@ pub mod historical_roots_update; pub mod justification_and_finalization_state; pub mod registry_updates; pub mod resets; +pub mod single_pass; pub mod slashings; pub mod tests; pub mod weigh_justification_and_finalization; @@ -42,8 +43,9 @@ pub fn process_epoch( match state { BeaconState::Base(_) => base::process_epoch(state, spec), - BeaconState::Altair(_) | BeaconState::Merge(_) => altair::process_epoch(state, spec), - BeaconState::Capella(_) => capella::process_epoch(state, spec), + BeaconState::Altair(_) | BeaconState::Merge(_) | BeaconState::Capella(_) => { + altair::process_epoch(state, spec) + } } } diff --git a/consensus/state_processing/src/per_epoch_processing/altair.rs b/consensus/state_processing/src/per_epoch_processing/altair.rs index b236b5780a0..f5905aaa351 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair.rs @@ -1,18 +1,19 @@ -use super::{process_registry_updates, process_slashings, EpochProcessingSummary, Error}; +use super::{EpochProcessingSummary, Error}; use crate::common::update_progressive_balances_cache::{ initialize_progressive_balances_cache, update_progressive_balances_on_epoch_transition, }; use crate::epoch_cache::initialize_epoch_cache; +use crate::per_epoch_processing::single_pass::{process_epoch_single_pass, SinglePassConfig}; use crate::per_epoch_processing::{ - effective_balance_updates::process_effective_balance_updates, + capella::process_historical_summaries_update, historical_roots_update::process_historical_roots_update, resets::{process_eth1_data_reset, process_randao_mixes_reset, process_slashings_reset}, }; -pub use inactivity_updates::process_inactivity_updates; +pub use inactivity_updates::process_inactivity_updates_slow; pub use justification_and_finalization::process_justification_and_finalization; pub use participation_cache::ParticipationCache; pub use participation_flag_updates::process_participation_flag_updates; -pub use rewards_and_penalties::process_rewards_and_penalties; +pub use rewards_and_penalties::process_rewards_and_penalties_slow; pub use sync_committee_updates::process_sync_committee_updates; use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch}; @@ -27,53 +28,51 @@ pub fn process_epoch( state: &mut BeaconState, spec: &ChainSpec, ) -> Result, Error> { - // Ensure the committee caches are built. + // Ensure the required caches are built. state.build_committee_cache(RelativeEpoch::Previous, spec)?; state.build_committee_cache(RelativeEpoch::Current, spec)?; state.build_committee_cache(RelativeEpoch::Next, spec)?; state.build_total_active_balance_cache_at(state.current_epoch(), spec)?; - initialize_epoch_cache(state, state.current_epoch(), spec)?; + initialize_epoch_cache(state, spec)?; + initialize_progressive_balances_cache::(state, None, spec)?; - // Pre-compute participating indices and total balances. - let mut participation_cache = ParticipationCache::new(state, spec)?; let sync_committee = state.current_sync_committee()?.clone(); - initialize_progressive_balances_cache::(state, Some(&participation_cache), spec)?; // Justification and finalization. - let justification_and_finalization_state = - process_justification_and_finalization(state, &participation_cache)?; + let justification_and_finalization_state = process_justification_and_finalization(state)?; justification_and_finalization_state.apply_changes_to_state(state); - process_inactivity_updates(state, &mut participation_cache, spec)?; - - // Rewards and Penalties. - process_rewards_and_penalties(state, &participation_cache, spec)?; - - // Registry Updates. - process_registry_updates(state, spec)?; - - // Slashings. - process_slashings( - state, - Some(participation_cache.process_slashings_indices()), - participation_cache.current_epoch_total_active_balance(), - spec, - )?; + // In a single pass: + // - Inactivity updates + // - Rewards and penalties + // - Registry updates + // - Slashings + // - Effective balance updates + // + // The `process_eth1_data_reset` is not covered in the single pass, but happens afterwards + // without loss of correctness. + let current_epoch_progressive_balances = state.progressive_balances_cache().clone(); + let current_epoch_total_active_balance = state.get_total_active_balance()?; + let participation_summary = + process_epoch_single_pass(state, spec, SinglePassConfig::default())?; // Reset eth1 data votes. process_eth1_data_reset(state)?; - // Update effective balances with hysteresis (lag). - process_effective_balance_updates(state, Some(&participation_cache), spec)?; - // Reset slashings process_slashings_reset(state)?; // Set randao mix process_randao_mixes_reset(state)?; - // Set historical root accumulator - process_historical_roots_update(state)?; + // Set historical summaries accumulator + if state.historical_summaries().is_ok() { + // Post-Capella. + process_historical_summaries_update(state)?; + } else { + // Pre-Capella + process_historical_roots_update(state)?; + } // Rotate current/previous epoch participation process_participation_flag_updates(state)?; @@ -82,12 +81,12 @@ pub fn process_epoch( // Rotate the epoch caches to suit the epoch transition. state.advance_caches(spec)?; - initialize_epoch_cache(state, state.next_epoch()?, spec)?; - update_progressive_balances_on_epoch_transition(state, spec)?; Ok(EpochProcessingSummary::Altair { - participation_cache, + progressive_balances: current_epoch_progressive_balances, + current_epoch_total_active_balance, + participation: participation_summary, sync_committee, }) } diff --git a/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs b/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs index f568f5fc988..592ffc09616 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs @@ -1,68 +1,23 @@ -use super::ParticipationCache; +use crate::per_epoch_processing::single_pass::{process_epoch_single_pass, SinglePassConfig}; use crate::EpochProcessingError; -use safe_arith::SafeArith; -use std::cmp::min; use types::beacon_state::BeaconState; use types::chain_spec::ChainSpec; -use types::consts::altair::TIMELY_TARGET_FLAG_INDEX; use types::eth_spec::EthSpec; -pub fn process_inactivity_updates( +/// Slow version of `process_inactivity_updates`. +/// +/// Should only be used for testing. +pub fn process_inactivity_updates_slow( state: &mut BeaconState, - participation_cache: &mut ParticipationCache, spec: &ChainSpec, ) -> Result<(), EpochProcessingError> { - let previous_epoch = state.previous_epoch(); - // Score updates based on previous epoch participation, skip genesis epoch - if state.current_epoch() == T::genesis_epoch() { - return Ok(()); - } - - // Fast path: inactivity scores have already been pre-computed. - if let Some(inactivity_score_updates) = participation_cache.inactivity_score_updates.take() { - // We need to flush the existing inactivity scores in case tree hashing hasn't happened in - // a long time (e.g. during state reconstruction). - // FIXME(sproul): re-think this - state.inactivity_scores_mut()?.apply_updates()?; - state - .inactivity_scores_mut()? - .bulk_update(inactivity_score_updates)?; - return Ok(()); - } - - let is_in_inactivity_leak = state.is_in_inactivity_leak(previous_epoch, spec); - - let mut inactivity_scores = state.inactivity_scores_mut()?.iter_cow(); - - while let Some((index, inactivity_score)) = inactivity_scores.next_cow() { - let validator = match participation_cache.get_validator(index) { - Ok(val) if val.is_eligible => val, - _ => continue, - }; - - let inactivity_score_mut; - - // Increase inactivity score of inactive validators - if validator.is_unslashed_participating_index(TIMELY_TARGET_FLAG_INDEX)? { - // Avoid mutating when the inactivity score is 0 and can't go any lower -- the common - // case. - if *inactivity_score == 0 { - continue; - } - inactivity_score_mut = inactivity_score.to_mut(); - inactivity_score_mut.safe_sub_assign(1)?; - } else { - inactivity_score_mut = inactivity_score.to_mut(); - inactivity_score_mut.safe_add_assign(spec.inactivity_score_bias)?; - } - - // Decrease the score of all validators for forgiveness when not during a leak - if !is_in_inactivity_leak { - inactivity_score_mut.safe_sub_assign(min( - spec.inactivity_score_recovery_rate, - *inactivity_score_mut, - ))?; - } - } + process_epoch_single_pass( + state, + spec, + SinglePassConfig { + inactivity_updates: true, + ..SinglePassConfig::disable_all() + }, + )?; Ok(()) } diff --git a/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs b/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs index 9c619e57702..61b5c1ed5ab 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/justification_and_finalization.rs @@ -1,4 +1,3 @@ -use super::ParticipationCache; use crate::per_epoch_processing::Error; use crate::per_epoch_processing::{ weigh_justification_and_finalization, JustificationAndFinalizationState, @@ -6,20 +5,23 @@ use crate::per_epoch_processing::{ use safe_arith::SafeArith; use types::{BeaconState, EthSpec}; -/// Update the justified and finalized checkpoints for matching target attestations. -pub fn process_justification_and_finalization( - state: &BeaconState, - participation_cache: &ParticipationCache, -) -> Result, Error> { +/// Process justification and finalization using the progressive balances cache. +pub fn process_justification_and_finalization( + state: &BeaconState, +) -> Result, Error> { let justification_and_finalization_state = JustificationAndFinalizationState::new(state); - - if state.current_epoch() <= T::genesis_epoch().safe_add(1)? { + if state.current_epoch() <= E::genesis_epoch().safe_add(1)? { return Ok(justification_and_finalization_state); } - let total_active_balance = participation_cache.current_epoch_total_active_balance(); - let previous_target_balance = participation_cache.previous_epoch_target_attesting_balance()?; - let current_target_balance = participation_cache.current_epoch_target_attesting_balance()?; + // Load cached balances + let progressive_balances_cache = state.progressive_balances_cache(); + let previous_target_balance = + progressive_balances_cache.previous_epoch_target_attesting_balance()?; + let current_target_balance = + progressive_balances_cache.current_epoch_target_attesting_balance()?; + let total_active_balance = state.get_total_active_balance()?; + weigh_justification_and_finalization( justification_and_finalization_state, total_active_balance, diff --git a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs index 9fb4813e78d..ae975aaba5c 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs @@ -49,12 +49,12 @@ impl From for Error { /// Caches the participation values for one epoch (either the previous or current). #[derive(PartialEq, Debug, Clone)] -struct SingleEpochParticipationCache { +pub(crate) struct SingleEpochParticipationCache { /// Stores the sum of the balances for all validators in `self.unslashed_participating_indices` /// for all flags in `NUM_FLAG_INDICES`. /// /// A flag balance is only incremented if a validator is in that flag set. - total_flag_balances: [Balance; NUM_FLAG_INDICES], + pub(crate) total_flag_balances: [Balance; NUM_FLAG_INDICES], /// Stores the sum of all balances of all validators in `self.unslashed_participating_indices` /// (regardless of which flags are set). total_active_balance: Balance, @@ -170,10 +170,10 @@ impl ValidatorInfoCache { pub struct ParticipationCache { current_epoch: Epoch, /// Caches information about active validators pertaining to `self.current_epoch`. - current_epoch_participation: SingleEpochParticipationCache, + pub(crate) current_epoch_participation: SingleEpochParticipationCache, previous_epoch: Epoch, /// Caches information about active validators pertaining to `self.previous_epoch`. - previous_epoch_participation: SingleEpochParticipationCache, + pub(crate) previous_epoch_participation: SingleEpochParticipationCache, /// Caches validator information relevant to `process_epoch`. validators: ValidatorInfoCache, /// Caches the result of the `get_eligible_validator_indices` function. diff --git a/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs b/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs index 19987f01538..a4a7c8b97e6 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs @@ -1,61 +1,30 @@ use super::ParticipationCache; +use crate::per_epoch_processing::{ + single_pass::{process_epoch_single_pass, SinglePassConfig}, + Delta, Error, +}; use safe_arith::SafeArith; use types::consts::altair::{ PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX, WEIGHT_DENOMINATOR, }; -use types::{BeaconState, BeaconStateError, ChainSpec, EthSpec}; - -use crate::common::{decrease_balance_directly, increase_balance_directly}; -use crate::per_epoch_processing::{Delta, Error}; +use types::{BeaconState, ChainSpec, EthSpec}; /// Apply attester and proposer rewards. /// -/// Spec v1.1.0 -pub fn process_rewards_and_penalties( +/// This function should only be used for testing. +pub fn process_rewards_and_penalties_slow( state: &mut BeaconState, - participation_cache: &ParticipationCache, spec: &ChainSpec, ) -> Result<(), Error> { - if state.current_epoch() == T::genesis_epoch() { - return Ok(()); - } - - let mut deltas = vec![Delta::default(); state.validators().len()]; - - let total_active_balance = participation_cache.current_epoch_total_active_balance(); - - for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() { - get_flag_index_deltas( - &mut deltas, - state, - flag_index, - total_active_balance, - participation_cache, - spec, - )?; - } - - get_inactivity_penalty_deltas(&mut deltas, state, participation_cache, spec)?; - - // Apply the deltas, erroring on overflow above but not on overflow below (saturating at 0 - // instead). - let mut balances = state.balances_mut().iter_cow(); - - while let Some((i, balance)) = balances.next_cow() { - let delta = deltas - .get(i) - .ok_or(BeaconStateError::BalancesOutOfBounds(i))?; - - if delta.rewards == 0 && delta.penalties == 0 { - continue; - } - - let balance = balance.to_mut(); - increase_balance_directly(balance, delta.rewards)?; - decrease_balance_directly(balance, delta.penalties)?; - } - + process_epoch_single_pass( + state, + spec, + SinglePassConfig { + rewards_and_penalties: true, + ..SinglePassConfig::disable_all() + }, + )?; Ok(()) } diff --git a/consensus/state_processing/src/per_epoch_processing/base.rs b/consensus/state_processing/src/per_epoch_processing/base.rs index 5f270123529..b1fb0ca22b0 100644 --- a/consensus/state_processing/src/per_epoch_processing/base.rs +++ b/consensus/state_processing/src/per_epoch_processing/base.rs @@ -25,7 +25,7 @@ pub fn process_epoch( state.build_committee_cache(RelativeEpoch::Current, spec)?; state.build_committee_cache(RelativeEpoch::Next, spec)?; state.build_total_active_balance_cache_at(state.current_epoch(), spec)?; - initialize_epoch_cache(state, state.current_epoch(), spec)?; + initialize_epoch_cache(state, spec)?; // Load the struct we use to assign validators into sets based on their participation. // @@ -47,7 +47,6 @@ pub fn process_epoch( // Slashings. process_slashings( state, - None, validator_statuses.total_balances.current_epoch(), spec, )?; @@ -56,7 +55,7 @@ pub fn process_epoch( process_eth1_data_reset(state)?; // Update effective balances with hysteresis (lag). - process_effective_balance_updates(state, None, spec)?; + process_effective_balance_updates(state, spec)?; // Reset slashings process_slashings_reset(state)?; @@ -72,7 +71,6 @@ pub fn process_epoch( // Rotate the epoch caches to suit the epoch transition. state.advance_caches(spec)?; - initialize_epoch_cache(state, state.next_epoch()?, spec)?; Ok(EpochProcessingSummary::Base { total_balances: validator_statuses.total_balances, diff --git a/consensus/state_processing/src/per_epoch_processing/capella.rs b/consensus/state_processing/src/per_epoch_processing/capella.rs index d0df6ad1903..161bce54232 100644 --- a/consensus/state_processing/src/per_epoch_processing/capella.rs +++ b/consensus/state_processing/src/per_epoch_processing/capella.rs @@ -1,89 +1,3 @@ -use super::altair::inactivity_updates::process_inactivity_updates; -use super::altair::justification_and_finalization::process_justification_and_finalization; -use super::altair::participation_cache::ParticipationCache; -use super::altair::participation_flag_updates::process_participation_flag_updates; -use super::altair::rewards_and_penalties::process_rewards_and_penalties; -use super::altair::sync_committee_updates::process_sync_committee_updates; -use super::{process_registry_updates, process_slashings, EpochProcessingSummary, Error}; -use crate::per_epoch_processing::{ - effective_balance_updates::process_effective_balance_updates, - resets::{process_eth1_data_reset, process_randao_mixes_reset, process_slashings_reset}, -}; -use types::{BeaconState, ChainSpec, EthSpec, RelativeEpoch}; - -use crate::common::update_progressive_balances_cache::{ - initialize_progressive_balances_cache, update_progressive_balances_on_epoch_transition, -}; -use crate::epoch_cache::initialize_epoch_cache; pub use historical_summaries_update::process_historical_summaries_update; mod historical_summaries_update; - -pub fn process_epoch( - state: &mut BeaconState, - spec: &ChainSpec, -) -> Result, Error> { - // Ensure the committee caches are built. - state.build_committee_cache(RelativeEpoch::Previous, spec)?; - state.build_committee_cache(RelativeEpoch::Current, spec)?; - state.build_committee_cache(RelativeEpoch::Next, spec)?; - state.build_total_active_balance_cache_at(state.current_epoch(), spec)?; - initialize_epoch_cache(state, state.current_epoch(), spec)?; - - // Pre-compute participating indices and total balances. - let mut participation_cache = ParticipationCache::new(state, spec)?; - let sync_committee = state.current_sync_committee()?.clone(); - initialize_progressive_balances_cache(state, Some(&participation_cache), spec)?; - - // Justification and finalization. - let justification_and_finalization_state = - process_justification_and_finalization(state, &participation_cache)?; - justification_and_finalization_state.apply_changes_to_state(state); - - process_inactivity_updates(state, &mut participation_cache, spec)?; - - // Rewards and Penalties. - process_rewards_and_penalties(state, &participation_cache, spec)?; - - // Registry Updates. - process_registry_updates(state, spec)?; - - // Slashings. - process_slashings( - state, - Some(participation_cache.process_slashings_indices()), - participation_cache.current_epoch_total_active_balance(), - spec, - )?; - - // Reset eth1 data votes. - process_eth1_data_reset(state)?; - - // Update effective balances with hysteresis (lag). - process_effective_balance_updates(state, Some(&participation_cache), spec)?; - - // Reset slashings - process_slashings_reset(state)?; - - // Set randao mix - process_randao_mixes_reset(state)?; - - // Set historical summaries accumulator - process_historical_summaries_update(state)?; - - // Rotate current/previous epoch participation - process_participation_flag_updates(state)?; - - process_sync_committee_updates(state, spec)?; - - // Rotate the epoch caches to suit the epoch transition. - state.advance_caches(spec)?; - initialize_epoch_cache(state, state.next_epoch()?, spec)?; - - update_progressive_balances_on_epoch_transition(state, spec)?; - - Ok(EpochProcessingSummary::Altair { - participation_cache, - sync_committee, - }) -} diff --git a/consensus/state_processing/src/per_epoch_processing/effective_balance_updates.rs b/consensus/state_processing/src/per_epoch_processing/effective_balance_updates.rs index fb1ff475233..dccb03ac0de 100644 --- a/consensus/state_processing/src/per_epoch_processing/effective_balance_updates.rs +++ b/consensus/state_processing/src/per_epoch_processing/effective_balance_updates.rs @@ -1,13 +1,12 @@ use super::errors::EpochProcessingError; -use crate::per_epoch_processing::altair::ParticipationCache; +use crate::per_epoch_processing::single_pass::{process_epoch_single_pass, SinglePassConfig}; use safe_arith::SafeArith; use types::beacon_state::BeaconState; use types::chain_spec::ChainSpec; -use types::{BeaconStateError, EthSpec, ProgressiveBalancesCache}; +use types::{BeaconStateError, EthSpec}; pub fn process_effective_balance_updates( state: &mut BeaconState, - maybe_participation_cache: Option<&ParticipationCache>, spec: &ChainSpec, ) -> Result<(), EpochProcessingError> { // Compute new total active balance for the next epoch as a side-effect of iterating the @@ -20,8 +19,7 @@ pub fn process_effective_balance_updates( .safe_div(spec.hysteresis_quotient)?; let downward_threshold = hysteresis_increment.safe_mul(spec.hysteresis_downward_multiplier)?; let upward_threshold = hysteresis_increment.safe_mul(spec.hysteresis_upward_multiplier)?; - let (validators, balances, progressive_balances_cache) = - state.validators_and_balances_and_progressive_balances_mut(); + let (validators, balances, _) = state.validators_and_balances_and_progressive_balances_mut(); let mut validators_iter = validators.iter_cow(); while let Some((index, validator)) = validators_iter.next_cow() { @@ -47,18 +45,7 @@ pub fn process_effective_balance_updates( } if new_effective_balance != validator.effective_balance() { - let old_effective_balance = validator.effective_balance(); validator.to_mut().mutable.effective_balance = new_effective_balance; - - if let Some(participation_cache) = maybe_participation_cache { - update_progressive_balances( - participation_cache, - progressive_balances_cache, - index, - old_effective_balance, - new_effective_balance, - )?; - } } } @@ -67,21 +54,17 @@ pub fn process_effective_balance_updates( Ok(()) } -fn update_progressive_balances( - participation_cache: &ParticipationCache, - progressive_balances_cache: &mut ProgressiveBalancesCache, - index: usize, - old_effective_balance: u64, - new_effective_balance: u64, +pub fn process_effective_balance_updates_slow( + state: &mut BeaconState, + spec: &ChainSpec, ) -> Result<(), EpochProcessingError> { - if old_effective_balance != new_effective_balance { - let is_current_epoch_target_attester = - participation_cache.is_current_epoch_timely_target_attester(index)?; - progressive_balances_cache.on_effective_balance_change( - is_current_epoch_target_attester, - old_effective_balance, - new_effective_balance, - )?; - } + process_epoch_single_pass( + state, + spec, + SinglePassConfig { + effective_balance_updates: true, + ..SinglePassConfig::disable_all() + }, + )?; Ok(()) } diff --git a/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs b/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs index 984f93d550b..1fc96479006 100644 --- a/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs +++ b/consensus/state_processing/src/per_epoch_processing/epoch_processing_summary.rs @@ -1,10 +1,11 @@ -use super::{ - altair::{participation_cache::Error as ParticipationCacheError, ParticipationCache}, - base::{validator_statuses::InclusionInfo, TotalBalances, ValidatorStatus}, -}; +use super::base::{validator_statuses::InclusionInfo, TotalBalances, ValidatorStatus}; use crate::metrics; use std::sync::Arc; -use types::{EthSpec, SyncCommittee}; +use types::{ + consts::altair::{TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX}, + BeaconStateError, Epoch, EthSpec, ParticipationFlags, ProgressiveBalancesCache, SyncCommittee, + VList, Validator, +}; /// Provides a summary of validator participation during the epoch. #[derive(PartialEq, Debug)] @@ -14,15 +15,80 @@ pub enum EpochProcessingSummary { statuses: Vec, }, Altair { - participation_cache: ParticipationCache, + progressive_balances: ProgressiveBalancesCache, + current_epoch_total_active_balance: u64, + participation: ParticipationEpochSummary, sync_committee: Arc>, }, } +#[derive(PartialEq, Debug)] +pub struct ParticipationEpochSummary { + /// Copy of the validator registry prior to mutation. + validators: VList, + /// Copy of the participation flags for the previous epoch. + previous_epoch_participation: VList, + /// Copy of the participation flags for the current epoch. + current_epoch_participation: VList, + previous_epoch: Epoch, + current_epoch: Epoch, +} + +impl ParticipationEpochSummary { + pub fn new( + validators: VList, + previous_epoch_participation: VList, + current_epoch_participation: VList, + previous_epoch: Epoch, + current_epoch: Epoch, + ) -> Self { + Self { + validators, + previous_epoch_participation, + current_epoch_participation, + previous_epoch, + current_epoch, + } + } + + pub fn is_active_and_unslashed(&self, val_index: usize, epoch: Epoch) -> bool { + self.validators + .get(val_index) + .map(|validator| !validator.slashed() && validator.is_active_at(epoch)) + .unwrap_or(false) + } + + pub fn is_previous_epoch_unslashed_participating_index( + &self, + val_index: usize, + flag_index: usize, + ) -> Result { + Ok(self.is_active_and_unslashed(val_index, self.previous_epoch) + && self + .previous_epoch_participation + .get(val_index) + .ok_or(BeaconStateError::UnknownValidator(val_index))? + .has_flag(flag_index)?) + } + + pub fn is_current_epoch_unslashed_participating_index( + &self, + val_index: usize, + flag_index: usize, + ) -> Result { + Ok(self.is_active_and_unslashed(val_index, self.current_epoch) + && self + .current_epoch_participation + .get(val_index) + .ok_or(BeaconStateError::UnknownValidator(val_index))? + .has_flag(flag_index)?) + } +} + impl EpochProcessingSummary { /// Updates some Prometheus metrics with some values in `self`. #[cfg(feature = "metrics")] - pub fn observe_metrics(&self) -> Result<(), ParticipationCacheError> { + pub fn observe_metrics(&self) -> Result<(), BeaconStateError> { metrics::set_gauge( &metrics::PARTICIPATION_PREV_EPOCH_HEAD_ATTESTING_GWEI_TOTAL, self.previous_epoch_head_attesting_balance()? as i64, @@ -56,35 +122,30 @@ impl EpochProcessingSummary { match self { EpochProcessingSummary::Base { total_balances, .. } => total_balances.current_epoch(), EpochProcessingSummary::Altair { - participation_cache, + current_epoch_total_active_balance, .. - } => participation_cache.current_epoch_total_active_balance(), + } => *current_epoch_total_active_balance, } } /// Returns the sum of the effective balance of all validators in the current epoch who /// included an attestation that matched the target. - pub fn current_epoch_target_attesting_balance(&self) -> Result { + pub fn current_epoch_target_attesting_balance(&self) -> Result { match self { EpochProcessingSummary::Base { total_balances, .. } => { Ok(total_balances.current_epoch_target_attesters()) } EpochProcessingSummary::Altair { - participation_cache, + progressive_balances, .. - } => participation_cache.current_epoch_target_attesting_balance(), + } => progressive_balances.current_epoch_target_attesting_balance(), } } /// Returns the sum of the effective balance of all validators in the previous epoch. pub fn previous_epoch_total_active_balance(&self) -> u64 { - match self { - EpochProcessingSummary::Base { total_balances, .. } => total_balances.previous_epoch(), - EpochProcessingSummary::Altair { - participation_cache, - .. - } => participation_cache.previous_epoch_total_active_balance(), - } + // FIXME(sproul): this is not a useful concept and should be deleted + self.current_epoch_total_active_balance() } /// Returns `true` if `val_index` was included in the active validator indices in the current @@ -98,10 +159,9 @@ impl EpochProcessingSummary { EpochProcessingSummary::Base { statuses, .. } => statuses .get(val_index) .map_or(false, |s| s.is_active_in_current_epoch && !s.is_slashed), - EpochProcessingSummary::Altair { - participation_cache, - .. - } => participation_cache.is_active_unslashed_in_current_epoch(val_index), + EpochProcessingSummary::Altair { participation, .. } => { + participation.is_active_and_unslashed(val_index, participation.current_epoch) + } } } @@ -119,34 +179,30 @@ impl EpochProcessingSummary { pub fn is_current_epoch_target_attester( &self, val_index: usize, - ) -> Result { + ) -> Result { match self { EpochProcessingSummary::Base { statuses, .. } => Ok(statuses .get(val_index) .map_or(false, |s| s.is_current_epoch_target_attester)), - EpochProcessingSummary::Altair { - participation_cache, - .. - } => participation_cache - .is_current_epoch_timely_target_attester(val_index) - .or_else(|e| match e { - ParticipationCacheError::InvalidValidatorIndex(_) => Ok(false), - e => Err(e), - }), + EpochProcessingSummary::Altair { participation, .. } => participation + .is_current_epoch_unslashed_participating_index( + val_index, + TIMELY_TARGET_FLAG_INDEX, + ), } } /// Returns the sum of the effective balance of all validators in the previous epoch who /// included an attestation that matched the target. - pub fn previous_epoch_target_attesting_balance(&self) -> Result { + pub fn previous_epoch_target_attesting_balance(&self) -> Result { match self { EpochProcessingSummary::Base { total_balances, .. } => { Ok(total_balances.previous_epoch_target_attesters()) } EpochProcessingSummary::Altair { - participation_cache, + progressive_balances, .. - } => participation_cache.previous_epoch_target_attesting_balance(), + } => progressive_balances.previous_epoch_target_attesting_balance(), } } @@ -157,15 +213,15 @@ impl EpochProcessingSummary { /// /// - Base: any attestation can match the head. /// - Altair: only "timely" attestations can match the head. - pub fn previous_epoch_head_attesting_balance(&self) -> Result { + pub fn previous_epoch_head_attesting_balance(&self) -> Result { match self { EpochProcessingSummary::Base { total_balances, .. } => { Ok(total_balances.previous_epoch_head_attesters()) } EpochProcessingSummary::Altair { - participation_cache, + progressive_balances, .. - } => participation_cache.previous_epoch_head_attesting_balance(), + } => progressive_balances.previous_epoch_head_attesting_balance(), } } @@ -176,15 +232,15 @@ impl EpochProcessingSummary { /// /// - Base: any attestation can match the source. /// - Altair: only "timely" attestations can match the source. - pub fn previous_epoch_source_attesting_balance(&self) -> Result { + pub fn previous_epoch_source_attesting_balance(&self) -> Result { match self { EpochProcessingSummary::Base { total_balances, .. } => { Ok(total_balances.previous_epoch_attesters()) } EpochProcessingSummary::Altair { - participation_cache, + progressive_balances, .. - } => participation_cache.previous_epoch_source_attesting_balance(), + } => progressive_balances.previous_epoch_source_attesting_balance(), } } @@ -199,10 +255,9 @@ impl EpochProcessingSummary { EpochProcessingSummary::Base { statuses, .. } => statuses .get(val_index) .map_or(false, |s| s.is_active_in_previous_epoch && !s.is_slashed), - EpochProcessingSummary::Altair { - participation_cache, - .. - } => participation_cache.is_active_unslashed_in_previous_epoch(val_index), + EpochProcessingSummary::Altair { participation, .. } => { + participation.is_active_and_unslashed(val_index, participation.previous_epoch) + } } } @@ -215,20 +270,16 @@ impl EpochProcessingSummary { pub fn is_previous_epoch_target_attester( &self, val_index: usize, - ) -> Result { + ) -> Result { match self { EpochProcessingSummary::Base { statuses, .. } => Ok(statuses .get(val_index) .map_or(false, |s| s.is_previous_epoch_target_attester)), - EpochProcessingSummary::Altair { - participation_cache, - .. - } => participation_cache - .is_previous_epoch_timely_target_attester(val_index) - .or_else(|e| match e { - ParticipationCacheError::InvalidValidatorIndex(_) => Ok(false), - e => Err(e), - }), + EpochProcessingSummary::Altair { participation, .. } => participation + .is_previous_epoch_unslashed_participating_index( + val_index, + TIMELY_TARGET_FLAG_INDEX, + ), } } @@ -246,20 +297,13 @@ impl EpochProcessingSummary { pub fn is_previous_epoch_head_attester( &self, val_index: usize, - ) -> Result { + ) -> Result { match self { EpochProcessingSummary::Base { statuses, .. } => Ok(statuses .get(val_index) .map_or(false, |s| s.is_previous_epoch_head_attester)), - EpochProcessingSummary::Altair { - participation_cache, - .. - } => participation_cache - .is_previous_epoch_timely_head_attester(val_index) - .or_else(|e| match e { - ParticipationCacheError::InvalidValidatorIndex(_) => Ok(false), - e => Err(e), - }), + EpochProcessingSummary::Altair { participation, .. } => participation + .is_previous_epoch_unslashed_participating_index(val_index, TIMELY_HEAD_FLAG_INDEX), } } @@ -277,20 +321,16 @@ impl EpochProcessingSummary { pub fn is_previous_epoch_source_attester( &self, val_index: usize, - ) -> Result { + ) -> Result { match self { EpochProcessingSummary::Base { statuses, .. } => Ok(statuses .get(val_index) .map_or(false, |s| s.is_previous_epoch_attester)), - EpochProcessingSummary::Altair { - participation_cache, - .. - } => participation_cache - .is_previous_epoch_timely_source_attester(val_index) - .or_else(|e| match e { - ParticipationCacheError::InvalidValidatorIndex(_) => Ok(false), - e => Err(e), - }), + EpochProcessingSummary::Altair { participation, .. } => participation + .is_previous_epoch_unslashed_participating_index( + val_index, + TIMELY_SOURCE_FLAG_INDEX, + ), } } diff --git a/consensus/state_processing/src/per_epoch_processing/registry_updates.rs b/consensus/state_processing/src/per_epoch_processing/registry_updates.rs index 230fd6bd5a0..812f95a68a3 100644 --- a/consensus/state_processing/src/per_epoch_processing/registry_updates.rs +++ b/consensus/state_processing/src/per_epoch_processing/registry_updates.rs @@ -1,3 +1,4 @@ +use crate::per_epoch_processing::single_pass::{process_epoch_single_pass, SinglePassConfig}; use crate::{common::initiate_validator_exit, per_epoch_processing::Error}; use safe_arith::SafeArith; use types::{BeaconState, ChainSpec, EthSpec, Validator}; @@ -54,3 +55,18 @@ pub fn process_registry_updates( Ok(()) } + +pub fn process_registry_updates_slow( + state: &mut BeaconState, + spec: &ChainSpec, +) -> Result<(), Error> { + process_epoch_single_pass( + state, + spec, + SinglePassConfig { + registry_updates: true, + ..SinglePassConfig::disable_all() + }, + )?; + Ok(()) +} diff --git a/consensus/state_processing/src/per_epoch_processing/single_pass.rs b/consensus/state_processing/src/per_epoch_processing/single_pass.rs new file mode 100644 index 00000000000..f4b4b77c08d --- /dev/null +++ b/consensus/state_processing/src/per_epoch_processing/single_pass.rs @@ -0,0 +1,628 @@ +use crate::{ + common::update_progressive_balances_cache::initialize_progressive_balances_cache, + epoch_cache::{initialize_epoch_cache, PreEpochCache}, + per_epoch_processing::{Delta, Error, ParticipationEpochSummary}, +}; +use itertools::izip; +use safe_arith::{SafeArith, SafeArithIter}; +use std::cmp::{max, min}; +use std::collections::BTreeSet; +use types::{ + consts::altair::{ + NUM_FLAG_INDICES, PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, + TIMELY_TARGET_FLAG_INDEX, WEIGHT_DENOMINATOR, + }, + ActivationQueue, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, ExitCache, ForkName, + ParticipationFlags, ProgressiveBalancesCache, Unsigned, Validator, +}; + +pub struct SinglePassConfig { + pub inactivity_updates: bool, + pub rewards_and_penalties: bool, + pub registry_updates: bool, + pub slashings: bool, + pub effective_balance_updates: bool, +} + +impl Default for SinglePassConfig { + fn default() -> SinglePassConfig { + Self::enable_all() + } +} + +impl SinglePassConfig { + pub fn enable_all() -> SinglePassConfig { + Self { + inactivity_updates: true, + rewards_and_penalties: true, + registry_updates: true, + slashings: true, + effective_balance_updates: true, + } + } + + pub fn disable_all() -> SinglePassConfig { + SinglePassConfig { + inactivity_updates: false, + rewards_and_penalties: false, + registry_updates: false, + slashings: false, + effective_balance_updates: false, + } + } +} + +/// Values from the state that are immutable throughout epoch processing. +struct StateContext { + current_epoch: Epoch, + next_epoch: Epoch, + is_in_inactivity_leak: bool, + total_active_balance: u64, + churn_limit: u64, + fork_name: ForkName, +} + +struct RewardsAndPenaltiesContext { + unslashed_participating_increments_array: [u64; NUM_FLAG_INDICES], + active_increments: u64, +} + +struct SlashingsContext { + adjusted_total_slashing_balance: u64, + target_withdrawable_epoch: Epoch, +} + +struct EffectiveBalancesContext { + downward_threshold: u64, + upward_threshold: u64, +} + +#[derive(Debug, PartialEq, Clone)] +pub struct ValidatorInfo { + pub index: usize, + pub effective_balance: u64, + pub base_reward: u64, + pub is_eligible: bool, + pub is_slashed: bool, + pub is_active_current_epoch: bool, + pub is_active_previous_epoch: bool, + // Used for determining rewards. + pub previous_epoch_participation: ParticipationFlags, + // Used for updating the progressive balances cache for next epoch. + pub current_epoch_participation: ParticipationFlags, +} + +impl ValidatorInfo { + #[inline] + pub fn is_unslashed_participating_index(&self, flag_index: usize) -> Result { + Ok(self.is_active_previous_epoch + && !self.is_slashed + && self + .previous_epoch_participation + .has_flag(flag_index) + .map_err(|_| Error::InvalidFlagIndex(flag_index))?) + } +} + +pub fn process_epoch_single_pass( + state: &mut BeaconState, + spec: &ChainSpec, + conf: SinglePassConfig, +) -> Result, Error> { + initialize_epoch_cache(state, spec)?; + initialize_progressive_balances_cache(state, None, spec)?; + state.build_exit_cache(spec)?; + + let previous_epoch = state.previous_epoch(); + let current_epoch = state.current_epoch(); + let next_epoch = state.next_epoch()?; + let is_in_inactivity_leak = state.is_in_inactivity_leak(previous_epoch, spec); + let total_active_balance = state.get_total_active_balance()?; + let churn_limit = state.get_churn_limit(spec)?; + let finalized_checkpoint = state.finalized_checkpoint(); + let fork_name = state.fork_name_unchecked(); + + let state_ctxt = &StateContext { + current_epoch, + next_epoch, + is_in_inactivity_leak, + total_active_balance, + churn_limit, + fork_name, + }; + + // Contexts that require immutable access to `state`. + let slashings_ctxt = &SlashingsContext::new(state, state_ctxt, spec)?; + let mut next_epoch_cache = PreEpochCache::new_for_next_epoch(state)?; + + // Split the state into several disjoint mutable borrows. + let ( + validators, + balances, + previous_epoch_participation, + current_epoch_participation, + inactivity_scores, + progressive_balances, + exit_cache, + epoch_cache, + ) = state.mutable_validator_fields()?; + + let num_validators = validators.len(); + + // Take a snapshot of the validators and participation before mutating. This is used for + // informational purposes (e.g. by the validator monitor). + let summary = ParticipationEpochSummary::new( + validators.clone(), + previous_epoch_participation.clone(), + current_epoch_participation.clone(), + previous_epoch, + current_epoch, + ); + + // Compute shared values required for different parts of epoch processing. + let rewards_ctxt = &RewardsAndPenaltiesContext::new(progressive_balances, state_ctxt, spec)?; + let activation_queue = &epoch_cache + .activation_queue()? + .get_validators_eligible_for_activation(finalized_checkpoint.epoch, churn_limit as usize); + let effective_balances_ctxt = &EffectiveBalancesContext::new(spec)?; + + // Iterate over the validators and related fields in one pass. + let mut validators_iter = validators.iter_cow(); + let mut balances_iter = balances.iter_cow(); + let mut inactivity_scores_iter = inactivity_scores.iter_cow(); + + // Values computed for the next epoch transition. + let mut next_epoch_total_active_balance = 0; + let mut next_epoch_activation_queue = ActivationQueue::default(); + + for (index, &previous_epoch_participation, ¤t_epoch_participation) in izip!( + 0..num_validators, + previous_epoch_participation.iter(), + current_epoch_participation.iter(), + ) { + let (_, validator_cow) = validators_iter + .next_cow() + .ok_or(BeaconStateError::UnknownValidator(index))?; + let (_, balance_cow) = balances_iter + .next_cow() + .ok_or(BeaconStateError::UnknownValidator(index))?; + let (_, inactivity_score_cow) = inactivity_scores_iter + .next_cow() + .ok_or(BeaconStateError::UnknownValidator(index))?; + + let validator = validator_cow.to_mut(); + let balance = balance_cow.to_mut(); + let inactivity_score = inactivity_score_cow.to_mut(); + + let is_active_current_epoch = validator.is_active_at(current_epoch); + let is_active_previous_epoch = validator.is_active_at(previous_epoch); + let is_eligible = is_active_previous_epoch + || (validator.slashed() + && previous_epoch + Epoch::new(1) < validator.withdrawable_epoch()); + + let base_reward = if is_eligible { + epoch_cache.get_base_reward(index)? + } else { + 0 + }; + + let validator_info = &ValidatorInfo { + index, + effective_balance: validator.effective_balance(), + base_reward, + is_eligible, + is_slashed: validator.slashed(), + is_active_current_epoch, + is_active_previous_epoch, + previous_epoch_participation, + current_epoch_participation, + }; + + if current_epoch != E::genesis_epoch() { + // `process_inactivity_updates` + if conf.inactivity_updates { + process_single_inactivity_update( + inactivity_score, + validator_info, + state_ctxt, + spec, + )?; + } + + // `process_rewards_and_penalties` + if conf.rewards_and_penalties { + process_single_reward_and_penalty( + balance, + inactivity_score, + validator_info, + rewards_ctxt, + state_ctxt, + spec, + )?; + } + } + + // `process_registry_updates` + if conf.registry_updates { + process_single_registry_update( + validator, + validator_info, + exit_cache, + activation_queue, + &mut next_epoch_activation_queue, + state_ctxt, + spec, + )?; + } + + // `process_slashings` + if conf.slashings { + process_single_slashing(balance, validator, slashings_ctxt, state_ctxt, spec)?; + } + + // `process_effective_balance_updates` + if conf.effective_balance_updates { + process_single_effective_balance_update( + *balance, + validator, + validator_info, + &mut next_epoch_total_active_balance, + &mut next_epoch_cache, + progressive_balances, + effective_balances_ctxt, + state_ctxt, + spec, + )?; + } + } + + if conf.effective_balance_updates { + state.set_total_active_balance(next_epoch, next_epoch_total_active_balance); + *state.epoch_cache_mut() = next_epoch_cache.into_epoch_cache( + next_epoch_total_active_balance, + next_epoch_activation_queue, + spec, + )?; + } + + Ok(summary) +} + +fn process_single_inactivity_update( + inactivity_score: &mut u64, + validator_info: &ValidatorInfo, + state_ctxt: &StateContext, + spec: &ChainSpec, +) -> Result<(), Error> { + if !validator_info.is_eligible { + return Ok(()); + } + + // Increase inactivity score of inactive validators + if validator_info.is_unslashed_participating_index(TIMELY_TARGET_FLAG_INDEX)? { + // Avoid mutating when the inactivity score is 0 and can't go any lower -- the common + // case. + if *inactivity_score == 0 { + return Ok(()); + } + inactivity_score.safe_sub_assign(1)?; + } else { + inactivity_score.safe_add_assign(spec.inactivity_score_bias)?; + } + + // Decrease the score of all validators for forgiveness when not during a leak + if !state_ctxt.is_in_inactivity_leak { + inactivity_score + .safe_sub_assign(min(spec.inactivity_score_recovery_rate, *inactivity_score))?; + } + + Ok(()) +} + +fn process_single_reward_and_penalty( + balance: &mut u64, + inactivity_score: &u64, + validator_info: &ValidatorInfo, + rewards_ctxt: &RewardsAndPenaltiesContext, + state_ctxt: &StateContext, + spec: &ChainSpec, +) -> Result<(), Error> { + if !validator_info.is_eligible { + return Ok(()); + } + + let mut delta = Delta::default(); + for flag_index in 0..NUM_FLAG_INDICES { + get_flag_index_delta( + &mut delta, + validator_info, + flag_index, + rewards_ctxt, + state_ctxt, + )?; + } + get_inactivity_penalty_delta( + &mut delta, + validator_info, + inactivity_score, + state_ctxt, + spec, + )?; + + balance.safe_add_assign(delta.rewards)?; + *balance = balance.saturating_sub(delta.penalties); + + Ok(()) +} + +fn get_flag_index_delta( + delta: &mut Delta, + validator_info: &ValidatorInfo, + flag_index: usize, + rewards_ctxt: &RewardsAndPenaltiesContext, + state_ctxt: &StateContext, +) -> Result<(), Error> { + let base_reward = validator_info.base_reward; + let weight = get_flag_weight(flag_index)?; + let unslashed_participating_increments = + rewards_ctxt.get_unslashed_participating_increments(flag_index)?; + + if validator_info.is_unslashed_participating_index(flag_index)? { + if !state_ctxt.is_in_inactivity_leak { + let reward_numerator = base_reward + .safe_mul(weight)? + .safe_mul(unslashed_participating_increments)?; + delta.reward( + reward_numerator.safe_div( + rewards_ctxt + .active_increments + .safe_mul(WEIGHT_DENOMINATOR)?, + )?, + )?; + } + } else if flag_index != TIMELY_HEAD_FLAG_INDEX { + delta.penalize(base_reward.safe_mul(weight)?.safe_div(WEIGHT_DENOMINATOR)?)?; + } + Ok(()) +} + +/// Get the weight for a `flag_index` from the constant list of all weights. +fn get_flag_weight(flag_index: usize) -> Result { + PARTICIPATION_FLAG_WEIGHTS + .get(flag_index) + .copied() + .ok_or(Error::InvalidFlagIndex(flag_index)) +} + +fn get_inactivity_penalty_delta( + delta: &mut Delta, + validator_info: &ValidatorInfo, + inactivity_score: &u64, + state_ctxt: &StateContext, + spec: &ChainSpec, +) -> Result<(), Error> { + if !validator_info.is_unslashed_participating_index(TIMELY_TARGET_FLAG_INDEX)? { + let penalty_numerator = validator_info + .effective_balance + .safe_mul(*inactivity_score)?; + let penalty_denominator = spec + .inactivity_score_bias + .safe_mul(spec.inactivity_penalty_quotient_for_fork(state_ctxt.fork_name))?; + delta.penalize(penalty_numerator.safe_div(penalty_denominator)?)?; + } + Ok(()) +} + +impl RewardsAndPenaltiesContext { + fn new( + progressive_balances: &ProgressiveBalancesCache, + state_ctxt: &StateContext, + spec: &ChainSpec, + ) -> Result { + let mut unslashed_participating_increments_array = [0; NUM_FLAG_INDICES]; + for flag_index in 0..NUM_FLAG_INDICES { + let unslashed_participating_balance = + progressive_balances.previous_epoch_flag_attesting_balance(flag_index)?; + let unslashed_participating_increments = + unslashed_participating_balance.safe_div(spec.effective_balance_increment)?; + + *unslashed_participating_increments_array + .get_mut(flag_index) + .ok_or(Error::InvalidFlagIndex(flag_index))? = unslashed_participating_increments; + } + let active_increments = state_ctxt + .total_active_balance + .safe_div(spec.effective_balance_increment)?; + + Ok(Self { + unslashed_participating_increments_array, + active_increments, + }) + } + + fn get_unslashed_participating_increments(&self, flag_index: usize) -> Result { + self.unslashed_participating_increments_array + .get(flag_index) + .copied() + .ok_or(Error::InvalidFlagIndex(flag_index)) + } +} + +fn process_single_registry_update( + validator: &mut Validator, + validator_info: &ValidatorInfo, + exit_cache: &mut ExitCache, + activation_queue: &BTreeSet, + next_epoch_activation_queue: &mut ActivationQueue, + state_ctxt: &StateContext, + spec: &ChainSpec, +) -> Result<(), Error> { + let current_epoch = state_ctxt.current_epoch; + + if validator.is_eligible_for_activation_queue(spec) { + validator.mutable.activation_eligibility_epoch = current_epoch.safe_add(1)?; + } + + if validator.is_active_at(current_epoch) + && validator.effective_balance() <= spec.ejection_balance + { + initiate_validator_exit(validator, exit_cache, state_ctxt, spec)?; + } + + if activation_queue.contains(&validator_info.index) { + validator.mutable.activation_epoch = spec.compute_activation_exit_epoch(current_epoch)?; + } + + // Caching: add to speculative activation queue for next epoch. + next_epoch_activation_queue.add_if_could_be_eligible_for_activation( + validator_info.index, + validator, + state_ctxt.next_epoch, + spec, + ); + + Ok(()) +} + +fn initiate_validator_exit( + validator: &mut Validator, + exit_cache: &mut ExitCache, + state_ctxt: &StateContext, + spec: &ChainSpec, +) -> Result<(), Error> { + // Return if the validator already initiated exit + if validator.exit_epoch() != spec.far_future_epoch { + return Ok(()); + } + + // Compute exit queue epoch + let delayed_epoch = spec.compute_activation_exit_epoch(state_ctxt.current_epoch)?; + let mut exit_queue_epoch = exit_cache + .max_epoch()? + .map_or(delayed_epoch, |epoch| max(epoch, delayed_epoch)); + let exit_queue_churn = exit_cache.get_churn_at(exit_queue_epoch)?; + + if exit_queue_churn >= state_ctxt.churn_limit { + exit_queue_epoch.safe_add_assign(1)?; + } + + validator.mutable.exit_epoch = exit_queue_epoch; + validator.mutable.withdrawable_epoch = + exit_queue_epoch.safe_add(spec.min_validator_withdrawability_delay)?; + + exit_cache.record_validator_exit(exit_queue_epoch)?; + Ok(()) +} + +impl SlashingsContext { + fn new( + state: &BeaconState, + state_ctxt: &StateContext, + spec: &ChainSpec, + ) -> Result { + let sum_slashings = state.get_all_slashings().iter().copied().safe_sum()?; + let adjusted_total_slashing_balance = min( + sum_slashings.safe_mul(spec.proportional_slashing_multiplier_for_state(state))?, + state_ctxt.total_active_balance, + ); + + let target_withdrawable_epoch = state_ctxt + .current_epoch + .safe_add(E::EpochsPerSlashingsVector::to_u64().safe_div(2)?)?; + + Ok(Self { + adjusted_total_slashing_balance, + target_withdrawable_epoch, + }) + } +} + +fn process_single_slashing( + balance: &mut u64, + validator: &Validator, + slashings_ctxt: &SlashingsContext, + state_ctxt: &StateContext, + spec: &ChainSpec, +) -> Result<(), Error> { + if validator.slashed() + && slashings_ctxt.target_withdrawable_epoch == validator.withdrawable_epoch() + { + let increment = spec.effective_balance_increment; + let penalty_numerator = validator + .effective_balance() + .safe_div(increment)? + .safe_mul(slashings_ctxt.adjusted_total_slashing_balance)?; + let penalty = penalty_numerator + .safe_div(state_ctxt.total_active_balance)? + .safe_mul(increment)?; + + *balance = balance.saturating_sub(penalty); + } + Ok(()) +} + +impl EffectiveBalancesContext { + fn new(spec: &ChainSpec) -> Result { + let hysteresis_increment = spec + .effective_balance_increment + .safe_div(spec.hysteresis_quotient)?; + let downward_threshold = + hysteresis_increment.safe_mul(spec.hysteresis_downward_multiplier)?; + let upward_threshold = hysteresis_increment.safe_mul(spec.hysteresis_upward_multiplier)?; + + Ok(Self { + downward_threshold, + upward_threshold, + }) + } +} + +#[allow(clippy::too_many_arguments)] +fn process_single_effective_balance_update( + balance: u64, + validator: &mut Validator, + validator_info: &ValidatorInfo, + next_epoch_total_active_balance: &mut u64, + next_epoch_cache: &mut PreEpochCache, + progressive_balances: &mut ProgressiveBalancesCache, + eb_ctxt: &EffectiveBalancesContext, + state_ctxt: &StateContext, + spec: &ChainSpec, +) -> Result<(), Error> { + let old_effective_balance = validator.effective_balance(); + let new_effective_balance = if balance.safe_add(eb_ctxt.downward_threshold)? + < validator.effective_balance() + || validator + .effective_balance() + .safe_add(eb_ctxt.upward_threshold)? + < balance + { + min( + balance.safe_sub(balance.safe_rem(spec.effective_balance_increment)?)?, + spec.max_effective_balance, + ) + } else { + validator.effective_balance() + }; + + if validator.is_active_at(state_ctxt.next_epoch) { + next_epoch_total_active_balance.safe_add_assign(new_effective_balance)?; + } + + if new_effective_balance != old_effective_balance { + validator.mutable.effective_balance = new_effective_balance; + + // Update progressive balances cache for the *current* epoch, which will soon become the + // previous epoch once the epoch transition completes. + progressive_balances.on_effective_balance_change( + validator_info.current_epoch_participation, + old_effective_balance, + new_effective_balance, + )?; + } + + // Caching: update next epoch effective balances. + next_epoch_cache.push_effective_balance(new_effective_balance); + + Ok(()) +} diff --git a/consensus/state_processing/src/per_epoch_processing/slashings.rs b/consensus/state_processing/src/per_epoch_processing/slashings.rs index 1a14f8f9934..01636e7d169 100644 --- a/consensus/state_processing/src/per_epoch_processing/slashings.rs +++ b/consensus/state_processing/src/per_epoch_processing/slashings.rs @@ -1,12 +1,14 @@ use crate::common::decrease_balance; -use crate::per_epoch_processing::Error; +use crate::per_epoch_processing::{ + single_pass::{process_epoch_single_pass, SinglePassConfig}, + Error, +}; use safe_arith::{SafeArith, SafeArithIter}; use types::{BeaconState, ChainSpec, EthSpec, Unsigned}; /// Process slashings. pub fn process_slashings( state: &mut BeaconState, - indices: Option>, total_balance: u64, spec: &ChainSpec, ) -> Result<(), Error> { @@ -20,17 +22,15 @@ pub fn process_slashings( let target_withdrawable_epoch = epoch.safe_add(T::EpochsPerSlashingsVector::to_u64().safe_div(2)?)?; - let indices = indices.unwrap_or_else(|| { - state - .validators() - .iter() - .enumerate() - .filter(|(_, validator)| { - validator.slashed() && target_withdrawable_epoch == validator.withdrawable_epoch() - }) - .map(|(index, validator)| (index, validator.effective_balance())) - .collect() - }); + let indices = state + .validators() + .iter() + .enumerate() + .filter(|(_, validator)| { + validator.slashed() && target_withdrawable_epoch == validator.withdrawable_epoch() + }) + .map(|(index, validator)| (index, validator.effective_balance())) + .collect::>(); for (index, validator_effective_balance) in indices { let increment = spec.effective_balance_increment; @@ -46,3 +46,18 @@ pub fn process_slashings( Ok(()) } + +pub fn process_slashings_slow( + state: &mut BeaconState, + spec: &ChainSpec, +) -> Result<(), Error> { + process_epoch_single_pass( + state, + spec, + SinglePassConfig { + slashings: true, + ..SinglePassConfig::disable_all() + }, + )?; + Ok(()) +} diff --git a/consensus/state_processing/src/upgrade/altair.rs b/consensus/state_processing/src/upgrade/altair.rs index 310df07927d..836f5a74fcb 100644 --- a/consensus/state_processing/src/upgrade/altair.rs +++ b/consensus/state_processing/src/upgrade/altair.rs @@ -106,6 +106,7 @@ pub fn upgrade_to_altair( committee_caches: mem::take(&mut pre.committee_caches), pubkey_cache: mem::take(&mut pre.pubkey_cache), exit_cache: mem::take(&mut pre.exit_cache), + slashings_cache: mem::take(&mut pre.slashings_cache), epoch_cache: EpochCache::default(), }); diff --git a/consensus/state_processing/src/upgrade/capella.rs b/consensus/state_processing/src/upgrade/capella.rs index 2ffcfe72b8d..db00374de9c 100644 --- a/consensus/state_processing/src/upgrade/capella.rs +++ b/consensus/state_processing/src/upgrade/capella.rs @@ -68,6 +68,7 @@ pub fn upgrade_to_capella( committee_caches: mem::take(&mut pre.committee_caches), pubkey_cache: mem::take(&mut pre.pubkey_cache), exit_cache: mem::take(&mut pre.exit_cache), + slashings_cache: mem::take(&mut pre.slashings_cache), epoch_cache: EpochCache::default(), }); diff --git a/consensus/state_processing/src/upgrade/merge.rs b/consensus/state_processing/src/upgrade/merge.rs index cfe5e5d0347..02705743ceb 100644 --- a/consensus/state_processing/src/upgrade/merge.rs +++ b/consensus/state_processing/src/upgrade/merge.rs @@ -64,6 +64,7 @@ pub fn upgrade_to_bellatrix( committee_caches: mem::take(&mut pre.committee_caches), pubkey_cache: mem::take(&mut pre.pubkey_cache), exit_cache: mem::take(&mut pre.exit_cache), + slashings_cache: mem::take(&mut pre.slashings_cache), epoch_cache: EpochCache::default(), }); diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 6a16bd3c4b3..92b2471696b 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1,5 +1,4 @@ use self::committee_cache::get_active_validator_indices; -use self::exit_cache::ExitCache; use crate::test_utils::TestRandom; use crate::validator::ValidatorTrait; use crate::*; @@ -29,7 +28,9 @@ pub use self::committee_cache::{ CommitteeCache, }; pub use crate::beacon_state::balance::Balance; +pub use crate::beacon_state::exit_cache::ExitCache; pub use crate::beacon_state::progressive_balances_cache::*; +pub use crate::beacon_state::slashings_cache::SlashingsCache; use crate::epoch_cache::EpochCache; use crate::historical_summary::HistoricalSummary; pub use eth_spec::*; @@ -44,6 +45,7 @@ mod exit_cache; mod iter; mod progressive_balances_cache; mod pubkey_cache; +mod slashings_cache; mod tests; pub const CACHED_EPOCHS: usize = 3; @@ -103,6 +105,10 @@ pub enum Error { }, RelativeEpochError(RelativeEpochError), ExitCacheUninitialized, + SlashingsCacheUninitialized { + initialized_slot: Option, + latest_block_slot: Slot, + }, CommitteeCacheUninitialized(Option), SyncCommitteeCacheUninitialized, BlsError(bls::Error), @@ -153,6 +159,7 @@ pub enum Error { TotalActiveBalanceDiffUninitialized, MissingImmutableValidator(usize), IndexNotSupported(usize), + InvalidFlagIndex(usize), MerkleTreeError(merkle_proof::MerkleTreeError), } @@ -449,6 +456,12 @@ where #[test_random(default)] #[metastruct(exclude)] pub exit_cache: ExitCache, + #[serde(skip_serializing, skip_deserializing)] + #[ssz(skip_serializing, skip_deserializing)] + #[tree_hash(skip_hashing)] + #[test_random(default)] + #[metastruct(exclude)] + pub slashings_cache: SlashingsCache, /// Epoch cache of values that are useful for block processing that are static over an epoch. #[serde(skip_serializing, skip_deserializing)] #[ssz(skip_serializing, skip_deserializing)] @@ -516,6 +529,7 @@ impl BeaconState { ], pubkey_cache: PubkeyCache::default(), exit_cache: ExitCache::default(), + slashings_cache: SlashingsCache::default(), epoch_cache: EpochCache::default(), }) } @@ -1301,6 +1315,57 @@ impl BeaconState { } } + #[allow(clippy::type_complexity)] + pub fn mutable_validator_fields( + &mut self, + ) -> Result< + ( + &mut Validators, + &mut Balances, + &VList, + &VList, + &mut VList, + &mut ProgressiveBalancesCache, + &mut ExitCache, + &mut EpochCache, + ), + Error, + > { + match self { + BeaconState::Base(_) => Err(Error::IncorrectStateVariant), + BeaconState::Altair(state) => Ok(( + &mut state.validators, + &mut state.balances, + &state.previous_epoch_participation, + &state.current_epoch_participation, + &mut state.inactivity_scores, + &mut state.progressive_balances_cache, + &mut state.exit_cache, + &mut state.epoch_cache, + )), + BeaconState::Merge(state) => Ok(( + &mut state.validators, + &mut state.balances, + &state.previous_epoch_participation, + &state.current_epoch_participation, + &mut state.inactivity_scores, + &mut state.progressive_balances_cache, + &mut state.exit_cache, + &mut state.epoch_cache, + )), + BeaconState::Capella(state) => Ok(( + &mut state.validators, + &mut state.balances, + &state.previous_epoch_participation, + &state.current_epoch_participation, + &mut state.inactivity_scores, + &mut state.progressive_balances_cache, + &mut state.exit_cache, + &mut state.epoch_cache, + )), + } + } + /// Get a mutable reference to the balance of a single validator. pub fn get_balance_mut(&mut self, validator_index: usize) -> Result<&mut u64, Error> { self.balances_mut() @@ -1400,7 +1465,7 @@ impl BeaconState { epoch: Epoch, spec: &ChainSpec, ) -> Result { - Ok(epoch.safe_add(1)?.safe_add(spec.max_seed_lookahead)?) + Ok(spec.compute_activation_exit_epoch(epoch)?) } /// Return the churn limit for the current epoch (number of validators who can leave per epoch). @@ -1574,6 +1639,7 @@ impl BeaconState { self.build_all_committee_caches(spec)?; self.update_pubkey_cache()?; self.build_exit_cache(spec)?; + self.build_slashings_cache()?; Ok(()) } @@ -1594,6 +1660,20 @@ impl BeaconState { Ok(()) } + /// Build the slashings cache if it needs to be built. + pub fn build_slashings_cache(&mut self) -> Result<(), Error> { + let latest_block_slot = self.latest_block_header().slot; + if !self.slashings_cache().is_initialized(latest_block_slot) { + *self.slashings_cache_mut() = SlashingsCache::new(latest_block_slot, self.validators()); + } + Ok(()) + } + + pub fn slashings_cache_is_initialized(&self) -> bool { + let latest_block_slot = self.latest_block_header().slot; + self.slashings_cache().is_initialized(latest_block_slot) + } + /// Drop all caches on the state. pub fn drop_all_caches(&mut self) -> Result<(), Error> { self.drop_total_active_balance_cache(); @@ -1603,6 +1683,7 @@ impl BeaconState { self.drop_pubkey_cache(); self.drop_progressive_balances_cache(); *self.exit_cache_mut() = ExitCache::default(); + *self.slashings_cache_mut() = SlashingsCache::default(); *self.epoch_cache_mut() = EpochCache::default(); Ok(()) } diff --git a/consensus/types/src/beacon_state/compact_state.rs b/consensus/types/src/beacon_state/compact_state.rs index 4f332f92719..58d81576d09 100644 --- a/consensus/types/src/beacon_state/compact_state.rs +++ b/consensus/types/src/beacon_state/compact_state.rs @@ -52,6 +52,7 @@ macro_rules! full_to_compact { progressive_balances_cache: $s.progressive_balances_cache.clone(), pubkey_cache: $s.pubkey_cache.clone(), exit_cache: $s.exit_cache.clone(), + slashings_cache: $s.slashings_cache.clone(), epoch_cache: $s.epoch_cache.clone(), // Variant-specific fields @@ -114,6 +115,7 @@ macro_rules! compact_to_full { progressive_balances_cache: $inner.progressive_balances_cache, pubkey_cache: $inner.pubkey_cache, exit_cache: $inner.exit_cache, + slashings_cache: $inner.slashings_cache, epoch_cache: $inner.epoch_cache, // Variant-specific fields diff --git a/consensus/types/src/beacon_state/progressive_balances_cache.rs b/consensus/types/src/beacon_state/progressive_balances_cache.rs index 9f5c223d573..342b50f758f 100644 --- a/consensus/types/src/beacon_state/progressive_balances_cache.rs +++ b/consensus/types/src/beacon_state/progressive_balances_cache.rs @@ -1,5 +1,11 @@ use crate::beacon_state::balance::Balance; -use crate::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec}; +use crate::{ + consts::altair::{ + NUM_FLAG_INDICES, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, + TIMELY_TARGET_FLAG_INDEX, + }, + BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, ParticipationFlags, +}; use arbitrary::Arbitrary; use safe_arith::SafeArith; use serde_derive::{Deserialize, Serialize}; @@ -17,21 +23,110 @@ pub struct ProgressiveBalancesCache { #[derive(Debug, PartialEq, Arbitrary, Clone)] struct Inner { pub current_epoch: Epoch, - pub previous_epoch_target_attesting_balance: Balance, - pub current_epoch_target_attesting_balance: Balance, + pub previous_epoch_cache: EpochTotalBalances, + pub current_epoch_cache: EpochTotalBalances, +} + +/// Caches the participation values for one epoch (either the previous or current). +#[derive(PartialEq, Debug, Clone, Arbitrary)] +pub struct EpochTotalBalances { + /// Stores the sum of the balances for all validators in `self.unslashed_participating_indices` + /// for all flags in `NUM_FLAG_INDICES`. + /// + /// A flag balance is only incremented if a validator is in that flag set. + pub total_flag_balances: [Balance; NUM_FLAG_INDICES], +} + +impl EpochTotalBalances { + pub fn new(spec: &ChainSpec) -> Self { + let zero_balance = Balance::zero(spec.effective_balance_increment); + + Self { + total_flag_balances: [zero_balance; NUM_FLAG_INDICES], + } + } + + /// Returns the total balance of attesters who have `flag_index` set. + pub fn total_flag_balance(&self, flag_index: usize) -> Result { + self.total_flag_balances + .get(flag_index) + .map(Balance::get) + .ok_or(BeaconStateError::InvalidFlagIndex(flag_index)) + } + + /// Returns the raw total balance of attesters who have `flag_index` set. + pub fn total_flag_balance_raw(&self, flag_index: usize) -> Result { + self.total_flag_balances + .get(flag_index) + .copied() + .ok_or(BeaconStateError::InvalidFlagIndex(flag_index)) + } + + pub fn on_new_attestation( + &mut self, + flag_index: usize, + validator_effective_balance: u64, + ) -> Result<(), BeaconStateError> { + let balance = self + .total_flag_balances + .get_mut(flag_index) + .ok_or(BeaconStateError::InvalidFlagIndex(flag_index))?; + balance.safe_add_assign(validator_effective_balance)?; + Ok(()) + } + + pub fn on_slashing( + &mut self, + participation_flags: ParticipationFlags, + validator_effective_balance: u64, + ) -> Result<(), BeaconStateError> { + for flag_index in 0..NUM_FLAG_INDICES { + if participation_flags.has_flag(flag_index)? { + self.total_flag_balances + .get_mut(flag_index) + .ok_or(BeaconStateError::InvalidFlagIndex(flag_index))? + .safe_sub_assign(validator_effective_balance)?; + } + } + Ok(()) + } + + pub fn on_effective_balance_change( + &mut self, + current_epoch_participation_flags: ParticipationFlags, + old_effective_balance: u64, + new_effective_balance: u64, + ) -> Result<(), BeaconStateError> { + for flag_index in 0..NUM_FLAG_INDICES { + if current_epoch_participation_flags.has_flag(flag_index)? { + let total = self + .total_flag_balances + .get_mut(flag_index) + .ok_or(BeaconStateError::InvalidFlagIndex(flag_index))?; + if new_effective_balance > old_effective_balance { + total + .safe_add_assign(new_effective_balance.safe_sub(old_effective_balance)?)?; + } else { + total + .safe_sub_assign(old_effective_balance.safe_sub(new_effective_balance)?)?; + } + } + } + Ok(()) + } } impl ProgressiveBalancesCache { pub fn initialize( &mut self, current_epoch: Epoch, - previous_epoch_target_attesting_balance: Balance, - current_epoch_target_attesting_balance: Balance, + previous_epoch_cache: EpochTotalBalances, + current_epoch_cache: EpochTotalBalances, ) { self.inner = Some(Inner { current_epoch, - previous_epoch_target_attesting_balance, - current_epoch_target_attesting_balance, + previous_epoch_cache, + current_epoch_cache, }); } @@ -39,24 +134,31 @@ impl ProgressiveBalancesCache { self.inner.is_some() } + pub fn is_initialized_at(&self, epoch: Epoch) -> bool { + self.inner + .as_ref() + .map_or(false, |inner| inner.current_epoch == epoch) + } + /// When a new target attestation has been processed, we update the cached /// `current_epoch_target_attesting_balance` to include the validator effective balance. /// If the epoch is neither the current epoch nor the previous epoch, an error is returned. - pub fn on_new_target_attestation( + pub fn on_new_attestation( &mut self, epoch: Epoch, + flag_index: usize, validator_effective_balance: u64, ) -> Result<(), BeaconStateError> { let cache = self.get_inner_mut()?; if epoch == cache.current_epoch { cache - .current_epoch_target_attesting_balance - .safe_add_assign(validator_effective_balance)?; + .current_epoch_cache + .on_new_attestation(flag_index, validator_effective_balance)?; } else if epoch.safe_add(1)? == cache.current_epoch { cache - .previous_epoch_target_attesting_balance - .safe_add_assign(validator_effective_balance)?; + .previous_epoch_cache + .on_new_attestation(flag_index, validator_effective_balance)?; } else { return Err(BeaconStateError::ProgressiveBalancesCacheInconsistent); } @@ -68,21 +170,17 @@ impl ProgressiveBalancesCache { /// validator's effective balance to exclude the validator weight. pub fn on_slashing( &mut self, - is_previous_epoch_target_attester: bool, - is_current_epoch_target_attester: bool, + previous_epoch_participation: ParticipationFlags, + current_epoch_participation: ParticipationFlags, effective_balance: u64, ) -> Result<(), BeaconStateError> { let cache = self.get_inner_mut()?; - if is_previous_epoch_target_attester { - cache - .previous_epoch_target_attesting_balance - .safe_sub_assign(effective_balance)?; - } - if is_current_epoch_target_attester { - cache - .current_epoch_target_attesting_balance - .safe_sub_assign(effective_balance)?; - } + cache + .previous_epoch_cache + .on_slashing(previous_epoch_participation, effective_balance)?; + cache + .current_epoch_cache + .on_slashing(current_epoch_participation, effective_balance)?; Ok(()) } @@ -90,22 +188,16 @@ impl ProgressiveBalancesCache { /// its share of the target attesting balance in the cache. pub fn on_effective_balance_change( &mut self, - is_current_epoch_target_attester: bool, + current_epoch_participation: ParticipationFlags, old_effective_balance: u64, new_effective_balance: u64, ) -> Result<(), BeaconStateError> { let cache = self.get_inner_mut()?; - if is_current_epoch_target_attester { - if new_effective_balance > old_effective_balance { - cache - .current_epoch_target_attesting_balance - .safe_add_assign(new_effective_balance.safe_sub(old_effective_balance)?)?; - } else { - cache - .current_epoch_target_attesting_balance - .safe_sub_assign(old_effective_balance.safe_sub(new_effective_balance)?)?; - } - } + cache.current_epoch_cache.on_effective_balance_change( + current_epoch_participation, + old_effective_balance, + new_effective_balance, + )?; Ok(()) } @@ -114,25 +206,53 @@ impl ProgressiveBalancesCache { pub fn on_epoch_transition(&mut self, spec: &ChainSpec) -> Result<(), BeaconStateError> { let cache = self.get_inner_mut()?; cache.current_epoch.safe_add_assign(1)?; - cache.previous_epoch_target_attesting_balance = - cache.current_epoch_target_attesting_balance; - cache.current_epoch_target_attesting_balance = - Balance::zero(spec.effective_balance_increment); + cache.previous_epoch_cache = std::mem::replace( + &mut cache.current_epoch_cache, + EpochTotalBalances::new(spec), + ); Ok(()) } + pub fn previous_epoch_flag_attesting_balance( + &self, + flag_index: usize, + ) -> Result { + self.get_inner()? + .previous_epoch_cache + .total_flag_balance(flag_index) + } + + pub fn current_epoch_flag_attesting_balance( + &self, + flag_index: usize, + ) -> Result { + self.get_inner()? + .current_epoch_cache + .total_flag_balance(flag_index) + } + + pub fn previous_epoch_source_attesting_balance(&self) -> Result { + self.previous_epoch_flag_attesting_balance(TIMELY_SOURCE_FLAG_INDEX) + } + pub fn previous_epoch_target_attesting_balance(&self) -> Result { - Ok(self - .get_inner()? - .previous_epoch_target_attesting_balance - .get()) + self.previous_epoch_flag_attesting_balance(TIMELY_TARGET_FLAG_INDEX) + } + + pub fn previous_epoch_head_attesting_balance(&self) -> Result { + self.previous_epoch_flag_attesting_balance(TIMELY_HEAD_FLAG_INDEX) + } + + pub fn current_epoch_source_attesting_balance(&self) -> Result { + self.current_epoch_flag_attesting_balance(TIMELY_SOURCE_FLAG_INDEX) } pub fn current_epoch_target_attesting_balance(&self) -> Result { - Ok(self - .get_inner()? - .current_epoch_target_attesting_balance - .get()) + self.current_epoch_flag_attesting_balance(TIMELY_TARGET_FLAG_INDEX) + } + + pub fn current_epoch_head_attesting_balance(&self) -> Result { + self.current_epoch_flag_attesting_balance(TIMELY_HEAD_FLAG_INDEX) } fn get_inner_mut(&mut self) -> Result<&mut Inner, BeaconStateError> { @@ -158,7 +278,7 @@ pub enum ProgressiveBalancesMode { /// Enable the usage of progressive cache, with checks against the `ParticipationCache` and falls /// back to the existing calculation if there is a balance mismatch. Checked, - /// Enable the usage of progressive cache, with checks against the `ParticipationCache`. Errors + /// Enable the usage of progressive cache, with checks against the `ParticipationCache`. BeaconStateErrors /// if there is a balance mismatch. Used in testing only. Strict, /// Enable the usage of progressive cache, with no comparative checks against the diff --git a/consensus/types/src/beacon_state/slashings_cache.rs b/consensus/types/src/beacon_state/slashings_cache.rs new file mode 100644 index 00000000000..19813ebbfe1 --- /dev/null +++ b/consensus/types/src/beacon_state/slashings_cache.rs @@ -0,0 +1,63 @@ +use crate::{BeaconStateError, Slot, Validator}; +use arbitrary::Arbitrary; +use rpds::HashTrieSetSync as HashTrieSet; + +/// Persistent (cheap to clone) cache of all slashed validator indices. +#[derive(Debug, Default, Clone, PartialEq, Arbitrary)] +pub struct SlashingsCache { + latest_block_slot: Option, + #[arbitrary(default)] + slashed_validators: HashTrieSet, +} + +impl SlashingsCache { + /// Initialize a new cache for the given list of validators. + pub fn new<'a, V, I>(latest_block_slot: Slot, validators: V) -> Self + where + V: IntoIterator, + I: ExactSizeIterator + Iterator, + { + let slashed_validators = validators + .into_iter() + .enumerate() + .filter_map(|(i, validator)| validator.slashed().then_some(i)) + .collect(); + Self { + latest_block_slot: Some(latest_block_slot), + slashed_validators, + } + } + + pub fn is_initialized(&self, slot: Slot) -> bool { + self.latest_block_slot == Some(slot) + } + + pub fn check_initialized(&self, latest_block_slot: Slot) -> Result<(), BeaconStateError> { + if self.is_initialized(latest_block_slot) { + Ok(()) + } else { + Err(BeaconStateError::SlashingsCacheUninitialized { + initialized_slot: self.latest_block_slot, + latest_block_slot, + }) + } + } + + pub fn record_validator_slashing( + &mut self, + block_slot: Slot, + validator_index: usize, + ) -> Result<(), BeaconStateError> { + self.check_initialized(block_slot)?; + self.slashed_validators.insert_mut(validator_index); + Ok(()) + } + + pub fn is_slashed(&self, validator_index: usize) -> bool { + self.slashed_validators.contains(&validator_index) + } + + pub fn update_latest_block_slot(&mut self, latest_block_slot: Slot) { + self.latest_block_slot = Some(latest_block_slot); + } +} diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 24448691593..95a45ce12f0 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -1,6 +1,7 @@ use crate::application_domain::{ApplicationDomain, APPLICATION_DOMAIN_BUILDER}; use crate::*; use int_to_bytes::int_to_bytes4; +use safe_arith::{ArithError, SafeArith}; use serde::{Deserializer, Serialize, Serializer}; use serde_derive::Deserialize; use serde_utils::quoted_u64::MaybeQuoted; @@ -285,12 +286,17 @@ impl ChainSpec { } /// For a given `BeaconState`, return the inactivity penalty quotient associated with its variant. + // FIXME(sproul): delete once unused pub fn inactivity_penalty_quotient_for_state(&self, state: &BeaconState) -> u64 { - match state { - BeaconState::Base(_) => self.inactivity_penalty_quotient, - BeaconState::Altair(_) => self.inactivity_penalty_quotient_altair, - BeaconState::Merge(_) => self.inactivity_penalty_quotient_bellatrix, - BeaconState::Capella(_) => self.inactivity_penalty_quotient_bellatrix, + self.inactivity_penalty_quotient_for_fork(state.fork_name_unchecked()) + } + + pub fn inactivity_penalty_quotient_for_fork(&self, fork_name: ForkName) -> u64 { + match fork_name { + ForkName::Base => self.inactivity_penalty_quotient, + ForkName::Altair => self.inactivity_penalty_quotient_altair, + ForkName::Merge => self.inactivity_penalty_quotient_bellatrix, + ForkName::Capella => self.inactivity_penalty_quotient_bellatrix, } } @@ -458,6 +464,10 @@ impl ChainSpec { Hash256::from(domain) } + pub fn compute_activation_exit_epoch(&self, epoch: Epoch) -> Result { + epoch.safe_add(1)?.safe_add(self.max_seed_lookahead) + } + #[allow(clippy::integer_arithmetic)] pub const fn attestation_subnet_prefix_bits(&self) -> u32 { let attestation_subnet_count_bits = self.attestation_subnet_count.ilog2(); diff --git a/consensus/types/src/epoch_cache.rs b/consensus/types/src/epoch_cache.rs index 07dcb0c56eb..1f717f2fcaf 100644 --- a/consensus/types/src/epoch_cache.rs +++ b/consensus/types/src/epoch_cache.rs @@ -1,5 +1,5 @@ -use crate::{ActivationQueue, BeaconStateError, Epoch, EthSpec, Hash256, Slot}; -use safe_arith::ArithError; +use crate::{ActivationQueue, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, Slot}; +use safe_arith::{ArithError, SafeArith}; use std::sync::Arc; /// Cache of values which are uniquely determined at the start of an epoch. @@ -18,10 +18,16 @@ struct Inner { /// Unique identifier for this cache, which can be used to check its validity before use /// with any `BeaconState`. key: EpochCacheKey, - /// Base reward for every validator in this epoch. + /// Effective balance for every validator in this epoch. + effective_balances: Vec, + /// Base rewards for every effective balance increment (currently 0..32 ETH). + /// + /// Keyed by `effective_balance / effective_balance_increment`. base_rewards: Vec, /// Validator activation queue. activation_queue: ActivationQueue, + /// Effective balance increment. + effective_balance_increment: u64, } #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, arbitrary::Arbitrary)] @@ -35,6 +41,7 @@ pub enum EpochCacheError { IncorrectEpoch { cache: Epoch, state: Epoch }, IncorrectDecisionBlock { cache: Hash256, state: Hash256 }, ValidatorIndexOutOfBounds { validator_index: usize }, + EffectiveBalanceOutOfBounds { effective_balance_eth: usize }, InvalidSlot { slot: Slot }, Arith(ArithError), BeaconState(BeaconStateError), @@ -56,14 +63,18 @@ impl From for EpochCacheError { impl EpochCache { pub fn new( key: EpochCacheKey, + effective_balances: Vec, base_rewards: Vec, activation_queue: ActivationQueue, + spec: &ChainSpec, ) -> EpochCache { Self { inner: Some(Arc::new(Inner { key, + effective_balances, base_rewards, activation_queue, + effective_balance_increment: spec.effective_balance_increment, })), } } @@ -93,16 +104,34 @@ impl EpochCache { } #[inline] - pub fn get_base_reward(&self, validator_index: usize) -> Result { + pub fn get_effective_balance(&self, validator_index: usize) -> Result { self.inner .as_ref() .ok_or(EpochCacheError::CacheNotInitialized)? - .base_rewards + .effective_balances .get(validator_index) .copied() .ok_or(EpochCacheError::ValidatorIndexOutOfBounds { validator_index }) } + #[inline] + pub fn get_base_reward(&self, validator_index: usize) -> Result { + let inner = self + .inner + .as_ref() + .ok_or(EpochCacheError::CacheNotInitialized)?; + let effective_balance = self.get_effective_balance(validator_index)?; + let effective_balance_eth = + effective_balance.safe_div(inner.effective_balance_increment)? as usize; + inner + .base_rewards + .get(effective_balance_eth) + .copied() + .ok_or(EpochCacheError::EffectiveBalanceOutOfBounds { + effective_balance_eth, + }) + } + pub fn activation_queue(&self) -> Result<&ActivationQueue, EpochCacheError> { let inner = self .inner diff --git a/lcli/src/skip_slots.rs b/lcli/src/skip_slots.rs index 9a78d032b09..bd0fdbf823c 100644 --- a/lcli/src/skip_slots.rs +++ b/lcli/src/skip_slots.rs @@ -51,6 +51,7 @@ use environment::Environment; use eth2::{types::StateId, BeaconNodeHttpClient, SensitiveUrl, Timeouts}; use ssz::Encode; use state_processing::state_advance::{complete_state_advance, partial_state_advance}; +use state_processing::AllCaches; use std::fs::File; use std::io::prelude::*; use std::path::PathBuf; @@ -109,7 +110,7 @@ pub fn run(env: Environment, matches: &ArgMatches) -> Result<(), let target_slot = initial_slot + slots; state - .build_caches(spec) + .build_all_caches(spec) .map_err(|e| format!("Unable to build caches: {:?}", e))?; let state_root = if let Some(root) = cli_state_root.or(state_root) { diff --git a/lcli/src/transition_blocks.rs b/lcli/src/transition_blocks.rs index d2f921e90e8..92df22e0fc6 100644 --- a/lcli/src/transition_blocks.rs +++ b/lcli/src/transition_blocks.rs @@ -72,10 +72,9 @@ use eth2::{ BeaconNodeHttpClient, SensitiveUrl, Timeouts, }; use ssz::Encode; -use state_processing::epoch_cache::initialize_epoch_cache; use state_processing::{ block_signature_verifier::BlockSignatureVerifier, per_block_processing, per_slot_processing, - BlockSignatureStrategy, ConsensusContext, StateProcessingStrategy, VerifyBlockRoot, + AllCaches, BlockSignatureStrategy, ConsensusContext, StateProcessingStrategy, VerifyBlockRoot, }; use std::borrow::Cow; use std::fs::File; @@ -209,7 +208,7 @@ pub fn run(env: Environment, matches: &ArgMatches) -> Result<(), if config.exclude_cache_builds { pre_state - .build_caches(spec) + .build_all_caches(spec) .map_err(|e| format!("Unable to build caches: {:?}", e))?; let state_root = pre_state .update_tree_hash_cache() @@ -313,7 +312,7 @@ fn do_transition( if !config.exclude_cache_builds { let t = Instant::now(); pre_state - .build_caches(spec) + .build_all_caches(spec) .map_err(|e| format!("Unable to build caches: {:?}", e))?; debug!("Build caches: {:?}", t.elapsed()); @@ -343,9 +342,12 @@ fn do_transition( } debug!("Slot processing: {:?}", t.elapsed()); + // Slot and epoch processing should keep the caches fully primed. + assert!(pre_state.all_caches_built()); + let t = Instant::now(); pre_state - .build_caches(spec) + .build_all_caches(spec) .map_err(|e| format!("Unable to build caches: {:?}", e))?; debug!("Build all caches (again): {:?}", t.elapsed()); @@ -355,11 +357,6 @@ fn do_transition( let ctxt = ConsensusContext::new(pre_state.slot()) .set_current_block_root(block_root) .set_proposer_index(block.message().proposer_index()); - - if config.exclude_cache_builds { - let epoch = pre_state.current_epoch(); - initialize_epoch_cache(&mut pre_state, epoch, spec).map_err(|e| format!("{e:?}"))?; - } ctxt }; diff --git a/testing/ef_tests/src/cases/epoch_processing.rs b/testing/ef_tests/src/cases/epoch_processing.rs index b8dd2f47c56..1b15ae075ec 100644 --- a/testing/ef_tests/src/cases/epoch_processing.rs +++ b/testing/ef_tests/src/cases/epoch_processing.rs @@ -5,13 +5,17 @@ use crate::decode::{ssz_decode_state, yaml_decode_file}; use crate::type_name; use crate::type_name::TypeName; use serde_derive::Deserialize; +use state_processing::common::update_progressive_balances_cache::initialize_progressive_balances_cache; use state_processing::epoch_cache::initialize_epoch_cache; use state_processing::per_epoch_processing::capella::process_historical_summaries_update; -use state_processing::per_epoch_processing::effective_balance_updates::process_effective_balance_updates; +use state_processing::per_epoch_processing::effective_balance_updates::{ + process_effective_balance_updates, process_effective_balance_updates_slow, +}; use state_processing::per_epoch_processing::{ altair, base, historical_roots_update::process_historical_roots_update, - process_registry_updates, process_slashings, + process_registry_updates, process_registry_updates_slow, process_slashings, + process_slashings_slow, resets::{process_eth1_data_reset, process_randao_mixes_reset, process_slashings_reset}, }; use state_processing::EpochProcessingError; @@ -103,11 +107,9 @@ impl EpochTransition for JustificationAndFinalization { Ok(()) } BeaconState::Altair(_) | BeaconState::Merge(_) | BeaconState::Capella(_) => { + initialize_progressive_balances_cache(state, None, spec)?; let justification_and_finalization_state = - altair::process_justification_and_finalization( - state, - &altair::ParticipationCache::new(state, spec).unwrap(), - )?; + altair::process_justification_and_finalization(state)?; justification_and_finalization_state.apply_changes_to_state(state); Ok(()) } @@ -124,11 +126,7 @@ impl EpochTransition for RewardsAndPenalties { base::process_rewards_and_penalties(state, &mut validator_statuses, spec) } BeaconState::Altair(_) | BeaconState::Merge(_) | BeaconState::Capella(_) => { - altair::process_rewards_and_penalties( - state, - &altair::ParticipationCache::new(state, spec).unwrap(), - spec, - ) + altair::process_rewards_and_penalties_slow(state, spec) } } } @@ -136,8 +134,13 @@ impl EpochTransition for RewardsAndPenalties { impl EpochTransition for RegistryUpdates { fn run(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), EpochProcessingError> { - initialize_epoch_cache(state, state.current_epoch(), spec)?; - process_registry_updates(state, spec) + initialize_epoch_cache(state, spec)?; + + if let BeaconState::Base(_) = state { + process_registry_updates(state, spec) + } else { + process_registry_updates_slow(state, spec) + } } } @@ -149,19 +152,12 @@ impl EpochTransition for Slashings { validator_statuses.process_attestations(state)?; process_slashings( state, - None, validator_statuses.total_balances.current_epoch(), spec, )?; } BeaconState::Altair(_) | BeaconState::Merge(_) | BeaconState::Capella(_) => { - let mut cache = altair::ParticipationCache::new(state, spec).unwrap(); - process_slashings( - state, - Some(cache.process_slashings_indices()), - cache.current_epoch_total_active_balance(), - spec, - )?; + process_slashings_slow(state, spec)?; } }; Ok(()) @@ -176,7 +172,11 @@ impl EpochTransition for Eth1DataReset { impl EpochTransition for EffectiveBalanceUpdates { fn run(state: &mut BeaconState, spec: &ChainSpec) -> Result<(), EpochProcessingError> { - process_effective_balance_updates(state, None, spec) + if let BeaconState::Base(_) = state { + process_effective_balance_updates(state, spec) + } else { + process_effective_balance_updates_slow(state, spec) + } } } @@ -238,11 +238,7 @@ impl EpochTransition for InactivityUpdates { match state { BeaconState::Base(_) => Ok(()), BeaconState::Altair(_) | BeaconState::Merge(_) | BeaconState::Capella(_) => { - altair::process_inactivity_updates( - state, - &mut altair::ParticipationCache::new(state, spec).unwrap(), - spec, - ) + altair::process_inactivity_updates_slow(state, spec) } } } diff --git a/testing/ef_tests/src/cases/operations.rs b/testing/ef_tests/src/cases/operations.rs index 163d492e544..2d4844eb861 100644 --- a/testing/ef_tests/src/cases/operations.rs +++ b/testing/ef_tests/src/cases/operations.rs @@ -88,7 +88,7 @@ impl Operation for Attestation { spec: &ChainSpec, _: &Operations, ) -> Result<(), BlockProcessingError> { - initialize_epoch_cache(state, state.current_epoch(), spec)?; + initialize_epoch_cache(state, spec)?; let mut ctxt = ConsensusContext::new(state.slot()); match state { BeaconState::Base(_) => base::process_attestations(