Skip to content

Commit

Permalink
Elastic scaling: runtime dependency tracking and enactment (paritytec…
Browse files Browse the repository at this point in the history
…h#3479)

Changes needed to implement the runtime part of elastic scaling:
paritytech#3131,
paritytech#3132,
paritytech#3202

Also fixes paritytech#3675

TODOs:

- [x] storage migration
- [x] optimise process_candidates from O(N^2)
- [x] drop backable candidates which form cycles
- [x] fix unit tests
- [x] add more unit tests
- [x] check the runtime APIs which use the pending availability storage.
We need to expose all of them, see
paritytech#3576
- [x] optimise the candidate selection. we're currently picking randomly
until we satisfy the weight limit. we need to be smart about not
breaking candidate chains while being fair to all paras -
paritytech#3573

Relies on the changes made in
paritytech#3233 in terms of the
inclusion policy and the candidate ordering

---------

Signed-off-by: alindima <alin@parity.io>
Co-authored-by: command-bot <>
Co-authored-by: eskimor <eskimor@users.noreply.github.com>
  • Loading branch information
2 people authored and dharjeezy committed Mar 24, 2024
1 parent 8c6c86d commit 82aafe7
Show file tree
Hide file tree
Showing 18 changed files with 4,075 additions and 1,748 deletions.
11 changes: 6 additions & 5 deletions polkadot/roadmap/implementers-guide/src/runtime/inclusion.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,16 @@ All failed checks should lead to an unrecoverable error making the block invalid
// return a vector of cleaned-up core IDs.
}
```
* `force_enact(ParaId)`: Forcibly enact the candidate with the given ID as though it had been deemed available by
bitfields. Is a no-op if there is no candidate pending availability for this para-id. This should generally not be
used but it is useful during execution of Runtime APIs, where the changes to the state are expected to be discarded
directly after.
* `force_enact(ParaId)`: Forcibly enact the pending candidates of the given paraid as though they had been deemed
available by bitfields. Is a no-op if there is no candidate pending availability for this para-id.
If there are multiple candidates pending availability for this para-id, it will enact all of
them. This should generally not be used but it is useful during execution of Runtime APIs,
where the changes to the state are expected to be discarded directly after.
* `candidate_pending_availability(ParaId) -> Option<CommittedCandidateReceipt>`: returns the `CommittedCandidateReceipt`
pending availability for the para provided, if any.
* `pending_availability(ParaId) -> Option<CandidatePendingAvailability>`: returns the metadata around the candidate
pending availability for the para, if any.
* `collect_disputed(disputed: Vec<CandidateHash>) -> Vec<CoreIndex>`: Sweeps through all paras pending availability. If
* `free_disputed(disputed: Vec<CandidateHash>) -> Vec<CoreIndex>`: Sweeps through all paras pending availability. If
the candidate hash is one of the disputed candidates, then clean up the corresponding storage for that candidate and
the commitments. Return a vector of cleaned-up core IDs.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ There are a couple of important notes to the operations in this inherent as they
this fork.
1. When disputes are initiated, we remove the block from pending availability. This allows us to roll back chains to the
block before blocks are included as opposed to backing. It's important to do this before processing bitfields.
1. `Inclusion::collect_disputed` is kind of expensive so it's important to gate this on whether there are actually any
1. `Inclusion::free_disputed` is kind of expensive so it's important to gate this on whether there are actually any
new disputes. Which should be never.
1. And we don't accept parablocks that have open disputes or disputes that have concluded against the candidate. It's
important to import dispute statements before backing, but this is already the case as disputes are imported before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ No finalization routine runs for this module.
- This clears them from `Scheduled` and marks each corresponding `core` in the `AvailabilityCores` as occupied.
- Since both the availability cores and the newly-occupied cores lists are sorted ascending, this method can be
implemented efficiently.
- `core_para(CoreIndex) -> ParaId`: return the currently-scheduled or occupied ParaId for the given core.
- `group_validators(GroupIndex) -> Option<Vec<ValidatorIndex>>`: return all validators in a given group, if the group
index is valid for this session.
- `availability_timeout_predicate() -> Option<impl Fn(CoreIndex, BlockNumber) -> bool>`: returns an optional predicate
Expand Down
109 changes: 68 additions & 41 deletions polkadot/runtime/parachains/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sp_runtime::{
RuntimeAppPublic,
};
use sp_std::{
collections::{btree_map::BTreeMap, vec_deque::VecDeque},
collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
prelude::Vec,
vec,
};
Expand Down Expand Up @@ -104,6 +104,8 @@ pub(crate) struct BenchBuilder<T: paras_inherent::Config> {
code_upgrade: Option<u32>,
/// Specifies whether the claimqueue should be filled.
fill_claimqueue: bool,
/// Cores which should not be available when being populated with pending candidates.
unavailable_cores: Vec<u32>,
_phantom: sp_std::marker::PhantomData<T>,
}

Expand Down Expand Up @@ -133,6 +135,7 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
elastic_paras: Default::default(),
code_upgrade: None,
fill_claimqueue: true,
unavailable_cores: vec![],
_phantom: sp_std::marker::PhantomData::<T>,
}
}
Expand All @@ -149,6 +152,12 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
self
}

/// Set the cores which should not be available when being populated with pending candidates.
pub(crate) fn set_unavailable_cores(mut self, unavailable_cores: Vec<u32>) -> Self {
self.unavailable_cores = unavailable_cores;
self
}

/// Set a map from para id seed to number of validity votes.
pub(crate) fn set_backed_and_concluding_paras(
mut self,
Expand All @@ -159,7 +168,6 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
}

/// Set a map from para id seed to number of cores assigned to it.
#[cfg(feature = "runtime-benchmarks")]
pub(crate) fn set_elastic_paras(mut self, elastic_paras: BTreeMap<u32, u8>) -> Self {
self.elastic_paras = elastic_paras;
self
Expand Down Expand Up @@ -284,11 +292,13 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
core_idx: CoreIndex,
candidate_hash: CandidateHash,
availability_votes: BitVec<u8, BitOrderLsb0>,
commitments: CandidateCommitments,
) -> inclusion::CandidatePendingAvailability<T::Hash, BlockNumberFor<T>> {
inclusion::CandidatePendingAvailability::<T::Hash, BlockNumberFor<T>>::new(
core_idx, // core
candidate_hash, // hash
Self::candidate_descriptor_mock(), // candidate descriptor
commitments, // commitments
availability_votes, // availability votes
Default::default(), // backers
Zero::zero(), // relay parent
Expand All @@ -309,12 +319,6 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
availability_votes: BitVec<u8, BitOrderLsb0>,
candidate_hash: CandidateHash,
) {
let candidate_availability = Self::candidate_availability_mock(
group_idx,
core_idx,
candidate_hash,
availability_votes,
);
let commitments = CandidateCommitments::<u32> {
upward_messages: Default::default(),
horizontal_messages: Default::default(),
Expand All @@ -323,16 +327,29 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
processed_downward_messages: 0,
hrmp_watermark: 0u32.into(),
};
inclusion::PendingAvailability::<T>::insert(para_id, candidate_availability);
inclusion::PendingAvailabilityCommitments::<T>::insert(&para_id, commitments);
let candidate_availability = Self::candidate_availability_mock(
group_idx,
core_idx,
candidate_hash,
availability_votes,
commitments,
);
inclusion::PendingAvailability::<T>::mutate(para_id, |maybe_andidates| {
if let Some(candidates) = maybe_andidates {
candidates.push_back(candidate_availability);
} else {
*maybe_andidates =
Some([candidate_availability].into_iter().collect::<VecDeque<_>>());
}
});
}

/// Create an `AvailabilityBitfield` where `concluding` is a map where each key is a core index
/// that is concluding and `cores` is the total number of cores in the system.
fn availability_bitvec(concluding: &BTreeMap<u32, u32>, cores: usize) -> AvailabilityBitfield {
fn availability_bitvec(concluding_cores: &BTreeSet<u32>, cores: usize) -> AvailabilityBitfield {
let mut bitfields = bitvec::bitvec![u8, bitvec::order::Lsb0; 0; 0];
for i in 0..cores {
if concluding.get(&(i as u32)).is_some() {
if concluding_cores.contains(&(i as u32)) {
bitfields.push(true);
} else {
bitfields.push(false)
Expand All @@ -356,13 +373,13 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
}
}

/// Register `cores` count of parachains.
/// Register `n_paras` count of parachains.
///
/// Note that this must be called at least 2 sessions before the target session as there is a
/// n+2 session delay for the scheduled actions to take effect.
fn setup_para_ids(cores: usize) {
fn setup_para_ids(n_paras: usize) {
// make sure parachains exist prior to session change.
for i in 0..cores {
for i in 0..n_paras {
let para_id = ParaId::from(i as u32);
let validation_code = mock_validation_code();

Expand Down Expand Up @@ -472,24 +489,8 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
let validators =
self.validators.as_ref().expect("must have some validators prior to calling");

let availability_bitvec = Self::availability_bitvec(concluding_paras, total_cores);

let bitfields: Vec<UncheckedSigned<AvailabilityBitfield>> = validators
.iter()
.enumerate()
.map(|(i, public)| {
let unchecked_signed = UncheckedSigned::<AvailabilityBitfield>::benchmark_sign(
public,
availability_bitvec.clone(),
&self.signing_context(),
ValidatorIndex(i as u32),
);

unchecked_signed
})
.collect();

let mut current_core_idx = 0u32;
let mut concluding_cores = BTreeSet::new();

for (seed, _) in concluding_paras.iter() {
// make sure the candidates that will be concluding are marked as pending availability.
Expand All @@ -505,13 +506,34 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
para_id,
core_idx,
group_idx,
Self::validator_availability_votes_yes(validators.len()),
// No validators have made this candidate available yet.
bitvec::bitvec![u8, bitvec::order::Lsb0; 0; validators.len()],
CandidateHash(H256::from(byte32_slice_from(current_core_idx))),
);
if !self.unavailable_cores.contains(&current_core_idx) {
concluding_cores.insert(current_core_idx);
}
current_core_idx += 1;
}
}

let availability_bitvec = Self::availability_bitvec(&concluding_cores, total_cores);

let bitfields: Vec<UncheckedSigned<AvailabilityBitfield>> = validators
.iter()
.enumerate()
.map(|(i, public)| {
let unchecked_signed = UncheckedSigned::<AvailabilityBitfield>::benchmark_sign(
public,
availability_bitvec.clone(),
&self.signing_context(),
ValidatorIndex(i as u32),
);

unchecked_signed
})
.collect();

bitfields
}

Expand All @@ -522,7 +544,7 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
/// validity votes.
fn create_backed_candidates(
&self,
cores_with_backed_candidates: &BTreeMap<u32, u32>,
paras_with_backed_candidates: &BTreeMap<u32, u32>,
elastic_paras: &BTreeMap<u32, u8>,
includes_code_upgrade: Option<u32>,
) -> Vec<BackedCandidate<T::Hash>> {
Expand All @@ -531,7 +553,7 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
let config = configuration::Pallet::<T>::config();

let mut current_core_idx = 0u32;
cores_with_backed_candidates
paras_with_backed_candidates
.iter()
.flat_map(|(seed, num_votes)| {
assert!(*num_votes <= validators.len() as u32);
Expand Down Expand Up @@ -760,7 +782,7 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {

// NOTE: there is an n+2 session delay for these actions to take effect.
// We are currently in Session 0, so these changes will take effect in Session 2.
Self::setup_para_ids(used_cores);
Self::setup_para_ids(used_cores - extra_cores);
configuration::ActiveConfig::<T>::mutate(|c| {
c.scheduler_params.num_cores = used_cores as u32;
});
Expand All @@ -782,19 +804,19 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {

let disputes = builder.create_disputes(
builder.backed_and_concluding_paras.len() as u32,
used_cores as u32,
(used_cores - extra_cores) as u32,
builder.dispute_sessions.as_slice(),
);
let mut disputed_cores = (builder.backed_and_concluding_paras.len() as u32..
used_cores as u32)
((used_cores - extra_cores) as u32))
.into_iter()
.map(|idx| (idx, 0))
.collect::<BTreeMap<_, _>>();

let mut all_cores = builder.backed_and_concluding_paras.clone();
all_cores.append(&mut disputed_cores);

assert_eq!(inclusion::PendingAvailability::<T>::iter().count(), used_cores as usize,);
assert_eq!(inclusion::PendingAvailability::<T>::iter().count(), used_cores - extra_cores);

// Mark all the used cores as occupied. We expect that there are
// `backed_and_concluding_paras` that are pending availability and that there are
Expand Down Expand Up @@ -831,7 +853,7 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
.keys()
.flat_map(|para_id| {
(0..elastic_paras.get(&para_id).cloned().unwrap_or(1))
.map(|_para_local_core_idx| {
.filter_map(|_para_local_core_idx| {
let ttl = configuration::Pallet::<T>::config().scheduler_params.ttl;
// Load an assignment into provider so that one is present to pop
let assignment =
Expand All @@ -844,8 +866,13 @@ impl<T: paras_inherent::Config> BenchBuilder<T> {
CoreIndex(core_idx),
[ParasEntry::new(assignment, now + ttl)].into(),
);
let res = if builder.unavailable_cores.contains(&core_idx) {
None
} else {
Some(entry)
};
core_idx += 1;
entry
res
})
.collect::<Vec<(CoreIndex, VecDeque<ParasEntry<_>>)>>()
})
Expand Down
Loading

0 comments on commit 82aafe7

Please sign in to comment.