Skip to content

Commit

Permalink
add migration for AtStake stale entries + autocompound, fix for remov…
Browse files Browse the repository at this point in the history
…ing candidates without blocks from rewarded rounds (moonbeam-foundation#1878)

* add migration for AtStake

* add validations

* add migrations to remove the stale AtStake entries before autoCompound migrations

* better logs

* fix AtStake not cleaned up for candidates not producing blocks

* Allow fork test to start from the expected round index

Co-authored-by: Crystalin <alan@purestake.com>
  • Loading branch information
2 people authored and imstar15 committed May 16, 2023
1 parent baba4dc commit 79a84e6
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 150 deletions.
45 changes: 5 additions & 40 deletions pallets/parachain-staking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,11 +580,6 @@ pub mod pallet {
ValueQuery,
>;

/// Migration storage holding value for collators already migrated to the new snapshot variant
#[pallet::storage]
#[pallet::getter(fn migrated_at_stake)]
pub type MigratedAtStake<T: Config> = StorageValue<_, RoundIndex, OptionQuery>;

#[pallet::storage]
#[pallet::getter(fn delayed_payouts)]
/// Delayed payouts
Expand Down Expand Up @@ -1521,6 +1516,10 @@ pub mod pallet {
// clean up storage items that we no longer need
<DelayedPayouts<T>>::remove(paid_for_round);
<Points<T>>::remove(paid_for_round);

// remove up to 1000 candidates that did not produce any blocks for
// the given round
let _ = <AtStake<T>>::clear_prefix(paid_for_round, 1000, None);
}
result.1 // weight consumed by pay_one_collator_reward
} else {
Expand Down Expand Up @@ -1561,36 +1560,7 @@ pub mod pallet {
let mut amt_due = total_paid;
// Take the snapshot of block author and delegations

// Decode [CollatorSnapshot] depending upon when the storage was migrated
let is_at_stake_migrated = <MigratedAtStake<T>>::get()
.map_or(false, |migrated_at_round| {
paid_for_round >= migrated_at_round
});
#[allow(deprecated)]
let state = if is_at_stake_migrated {
let at_stake: CollatorSnapshot<T::AccountId, BalanceOf<T>> =
<AtStake<T>>::take(paid_for_round, &collator);
at_stake
} else {
// storage still not migrated, decode as deprecated CollatorSnapshot.
let key = <AtStake<T>>::hashed_key_for(paid_for_round, &collator);
let at_stake: deprecated::CollatorSnapshot<T::AccountId, BalanceOf<T>> =
frame_support::storage::unhashed::get(&key).unwrap_or_default();

CollatorSnapshot {
bond: at_stake.bond,
delegations: at_stake
.delegations
.into_iter()
.map(|d| BondWithAutoCompound {
owner: d.owner,
amount: d.amount,
auto_compound: Percent::zero(),
})
.collect(),
total: at_stake.total,
}
};
let state = <AtStake<T>>::take(paid_for_round, &collator);

let num_delegators = state.delegations.len();
if state.delegations.is_empty() {
Expand Down Expand Up @@ -1736,11 +1706,6 @@ pub mod pallet {
delegations: rewardable_delegations,
total: total_counted,
};
<MigratedAtStake<T>>::mutate(|v| {
if v.is_none() {
*v = Some(now);
}
});
<AtStake<T>>::insert(now, account, snapshot);
Self::deposit_event(Event::CollatorChosen {
round: now,
Expand Down
261 changes: 256 additions & 5 deletions pallets/parachain-staking/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
use crate::delegation_requests::{DelegationAction, ScheduledRequest};
use crate::pallet::{DelegationScheduledRequests, DelegatorState, Total};
#[allow(deprecated)]
use crate::types::deprecated::{DelegationChange, Delegator as OldDelegator};
use crate::types::Delegator;
use crate::types::deprecated::{
CollatorSnapshot as OldCollatorSnapshot, DelegationChange, Delegator as OldDelegator,
};
use crate::types::{CollatorSnapshot, Delegator};
use crate::{
BalanceOf, Bond, BottomDelegations, CandidateInfo, CandidateMetadata, CapacityStatus,
CollatorCandidate, Config, Delegations, Event, Pallet, Points, Round, Staked, TopDelegations,
AtStake, BalanceOf, Bond, BondWithAutoCompound, BottomDelegations, CandidateInfo,
CandidateMetadata, CapacityStatus, CollatorCandidate, Config, DelayedPayouts, Delegations,
Event, Pallet, Points, Round, RoundIndex, Staked, TopDelegations,
};
#[cfg(feature = "try-runtime")]
use frame_support::traits::OnRuntimeUpgradeHelpersExt;
Expand All @@ -42,7 +45,255 @@ use frame_support::{
#[cfg(feature = "try-runtime")]
use scale_info::prelude::string::String;
use sp_runtime::traits::{Saturating, Zero};
use sp_std::{convert::TryInto, vec::Vec};
use sp_runtime::Percent;
use sp_std::collections::btree_set::BTreeSet;
use sp_std::{convert::TryInto, vec, vec::Vec};

/// Migration `AtStake` storage item to include auto-compound value
pub struct MigrateAtStakeAutoCompound<T>(PhantomData<T>);
impl<T: Config> MigrateAtStakeAutoCompound<T> {
const PALLET_PREFIX: &'static [u8] = b"ParachainStaking";
const AT_STAKE_PREFIX: &'static [u8] = b"AtStake";
}
impl<T: Config> OnRuntimeUpgrade for MigrateAtStakeAutoCompound<T> {
#[allow(deprecated)]
fn on_runtime_upgrade() -> Weight {
use sp_std::collections::btree_set::BTreeSet;

log::info!(target: "MigrateAtStakeAutoCompound", "running migration to add auto-compound values");
let mut reads = 0u64;
let mut writes = 0u64;

let max_unpaid_round = <Round<T>>::get()
.current
.saturating_sub(T::RewardPaymentDelay::get());

// validate only from `max_unpaid_round`, since we have some stale entries and this adds
// to the PoV size during try-runtime
<AtStake<T>>::translate(
|round, candidate, old_state: OldCollatorSnapshot<T::AccountId, BalanceOf<T>>| {
reads = reads.saturating_add(1);
writes = writes.saturating_add(1);

log::info!(
target: "MigrateAtStakeAutoCompound",
"migration from old format round {:?}, candidate {:?}", round, candidate
);
Some(CollatorSnapshot {
bond: old_state.bond,
delegations: old_state
.delegations
.into_iter()
.map(|d| BondWithAutoCompound {
owner: d.owner,
amount: d.amount,
auto_compound: Percent::zero(),
})
.collect(),
total: old_state.total,
})
},
);

T::DbWeight::get().reads_writes(reads, writes)
}

#[allow(deprecated)]
#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<(), &'static str> {
use frame_support::storage;

let mut num_to_update = 0u32;
let mut rounds_candidates = vec![];

// validate only from `max_unpaid_round`, since we have some stale entries and this adds
// to the PoV size during try-runtime
let current_round = <Round<T>>::get().current;
let max_unpaid_round = current_round.saturating_sub(T::RewardPaymentDelay::get());
for round in max_unpaid_round..=current_round {
for candidate in <AtStake<T>>::iter_key_prefix(round) {
let key = <AtStake<T>>::hashed_key_for(round.clone(), candidate.clone());
let state: OldCollatorSnapshot<T::AccountId, BalanceOf<T>> =
storage::unhashed::get(&key).expect("unable to decode value");

num_to_update = num_to_update.saturating_add(1);
rounds_candidates.push((round.clone(), candidate.clone()));
let mut delegation_str = vec![];
for d in state.delegations {
delegation_str.push(format!(
"owner={:?}_amount={:?}_autoCompound=0%",
d.owner, d.amount
));
}
Self::set_temp_storage(
format!(
"bond={:?}_total={:?}_delegations={:?}",
state.bond, state.total, delegation_str
),
&*format!("round_{:?}_candidate_{:?}", round, candidate),
);
}
}

rounds_candidates.sort();
Self::set_temp_storage(format!("{:?}", rounds_candidates), "rounds_candidates");
Self::set_temp_storage(num_to_update, "num_to_update");
Ok(())
}

#[cfg(feature = "try-runtime")]
fn post_upgrade() -> Result<(), &'static str> {
let mut num_updated = 0u32;
let mut rounds_candidates = vec![];
let max_unpaid_round = <Round<T>>::get()
.current
.saturating_sub(T::RewardPaymentDelay::get());
for (round, candidate, state) in <AtStake<T>>::iter() {
if round < max_unpaid_round {
log::warn!(
target: "MigrateAtStakeAutoCompound",
"skipping storage check for round {:?}, this round entry should not exist",
round
);
continue;
}

num_updated = num_updated.saturating_add(1);
rounds_candidates.push((round.clone(), candidate.clone()));
let mut delegation_str = vec![];
for d in state.delegations {
delegation_str.push(format!(
"owner={:?}_amount={:?}_autoCompound={:?}",
d.owner, d.amount, d.auto_compound
));
}
assert_eq!(
Some(format!(
"bond={:?}_total={:?}_delegations={:?}",
state.bond, state.total, delegation_str
)),
Self::get_temp_storage(&*format!("round_{:?}_candidate_{:?}", round, candidate)),
"incorrect delegations migration for round_{:?}_candidate_{:?}",
round,
candidate,
);
}

rounds_candidates.sort();
assert_eq!(
Some(format!("{:?}", rounds_candidates)),
Self::get_temp_storage("rounds_candidates")
);
assert_eq!(Some(num_updated), Self::get_temp_storage("num_to_update"));
Ok(())
}
}

/// Removes old entries for paid rounds from the `AtStake` storage item.
pub struct RemovePaidRoundsFromAtStake<T>(PhantomData<T>);
impl<T: Config> OnRuntimeUpgrade for RemovePaidRoundsFromAtStake<T> {
#[allow(deprecated)]
fn on_runtime_upgrade() -> Weight {
use sp_std::collections::btree_set::BTreeSet;

let mut reads = 0u64;
let mut writes = 0u64;
let max_unpaid_round = <Round<T>>::get()
.current
.saturating_sub(T::RewardPaymentDelay::get());

log::info!(
target: "RemovePaidRoundsFromAtStake",
"running migration to remove entries for paid rounds < {:?}",
max_unpaid_round,
);

// Remove all the keys that are older than the last possible unpaid round. As an additional
// check we also verify that the `Points` & `DelayedPayouts` storage item have already been
// removed to avoid the risk to removing the snapshot with outstanding errors.
<AtStake<T>>::iter_keys()
.filter(|(round, _)| {
round < &max_unpaid_round
&& !<Points<T>>::contains_key(round)
&& !<DelayedPayouts<T>>::contains_key(round)
})
.map(|(round, _)| round)
.collect::<BTreeSet<_>>()
.iter()
.for_each(|round| {
writes = writes.saturating_add(1);
log::info!(target: "RemovePaidRoundsFromAtStake", "removing round {:?}", round);
<AtStake<T>>::remove_prefix(round, None);
});

T::DbWeight::get().reads_writes(reads, writes)
}

#[allow(deprecated)]
#[cfg(feature = "try-runtime")]
fn pre_upgrade() -> Result<(), &'static str> {
let max_unpaid_round = <Round<T>>::get()
.current
.saturating_sub(T::RewardPaymentDelay::get());

let rounds_to_keep = <AtStake<T>>::iter_keys()
.filter(|(round, _)| {
if round >= &max_unpaid_round {
true
} else {
let points_exist = <Points<T>>::contains_key(round);
let delayed_payouts_exist = <DelayedPayouts<T>>::contains_key(round);
if points_exist {
log::info!(
target: "RemovePaidRoundsFromAtStake",
"Points storage still exists for round {:?}, max_unpaid_round {:?}, \
entry will not be removed",
round,
max_unpaid_round
);
};
if delayed_payouts_exist {
log::info!(
target: "RemovePaidRoundsFromAtStake",
"DelayedPayouts storage still exists for round {:?}, max_unpaid_round {:?}, \
entry will not be removed",
round,
max_unpaid_round
);
};
points_exist || delayed_payouts_exist
}
})
.map(|(round, _)| round)
.collect::<BTreeSet<_>>();
Self::set_temp_storage(format!("{:?}", rounds_to_keep), "rounds_to_keep");
Ok(())
}

#[cfg(feature = "try-runtime")]
fn post_upgrade() -> Result<(), &'static str> {
let max_unpaid_round = <Round<T>>::get()
.current
.saturating_sub(T::RewardPaymentDelay::get());
let rounds_kept = <AtStake<T>>::iter_keys()
.map(|(round, _)| {
assert!(
round >= max_unpaid_round,
"unexpected stale round storage item, max_unpaid_round={:?}, got={:?}",
max_unpaid_round,
round
);
round
})
.collect::<BTreeSet<_>>();

assert_eq!(
Some(format!("{:?}", rounds_kept)),
Self::get_temp_storage("rounds_to_keep")
);
Ok(())
}
}

/// Migration to move delegator requests towards a delegation, from [DelegatorState] into
/// [DelegationScheduledRequests] storage item.
Expand Down

0 comments on commit 79a84e6

Please sign in to comment.