From 48365ffdd5e6b627e6f4320d61f6773eea1ec7fa Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 30 May 2023 06:15:56 +0000 Subject: [PATCH] Shift subnet backbone structure (attnets revamp) (#4304) This PR address the following spec change: https://github.com/ethereum/consensus-specs/pull/3312 Instead of subscribing to a long-lived subnet for every attached validator to a beacon node, all beacon nodes will subscribe to `SUBNETS_PER_NODE` long-lived subnets. This is currently set to 2 for mainnet. This PR does not include any scoring or advanced discovery mechanisms. A future PR will improve discovery and we can implement scoring after the next hard fork when we expect all client teams and all implementations to respect this spec change. This will be a significant change in the subnet network structure for consensus clients and we will likely have to monitor and tweak our peer management logic. --- beacon_node/network/Cargo.toml | 6 +- beacon_node/network/src/service.rs | 3 +- .../src/subnet_service/attestation_subnets.rs | 388 +++--------------- .../network/src/subnet_service/tests/mod.rs | 155 ++++--- .../gnosis/config.yaml | 4 + .../mainnet/config.yaml | 4 + .../prater/config.yaml | 4 + .../sepolia/config.yaml | 4 + consensus/types/src/chain_spec.rs | 38 +- consensus/types/src/config_and_preset.rs | 4 - consensus/types/src/subnet_id.rs | 93 ++++- 11 files changed, 276 insertions(+), 427 deletions(-) diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index a234165d113..c9917289944 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -46,8 +46,4 @@ derivative = "2.2.0" delay_map = "0.3.0" ethereum-types = { version = "0.14.1", optional = true } operation_pool = { path = "../operation_pool" } -execution_layer = { path = "../execution_layer" } - -[features] -deterministic_long_lived_attnets = [ "ethereum-types" ] -# default = ["deterministic_long_lived_attnets"] +execution_layer = { path = "../execution_layer" } \ No newline at end of file diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index edc1d5c2ef5..2c919233fca 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -317,8 +317,7 @@ impl NetworkService { // attestation subnet service let attestation_service = AttestationService::new( beacon_chain.clone(), - #[cfg(feature = "deterministic_long_lived_attnets")] - network_globals.local_enr().node_id().raw().into(), + network_globals.local_enr().node_id(), config, &network_log, ); diff --git a/beacon_node/network/src/subnet_service/attestation_subnets.rs b/beacon_node/network/src/subnet_service/attestation_subnets.rs index e46a52cfb21..b4f52df39d3 100644 --- a/beacon_node/network/src/subnet_service/attestation_subnets.rs +++ b/beacon_node/network/src/subnet_service/attestation_subnets.rs @@ -3,7 +3,6 @@ //! determines whether attestations should be aggregated and/or passed to the beacon node. use super::SubnetServiceMessage; -#[cfg(any(test, feature = "deterministic_long_lived_attnets"))] use std::collections::HashSet; use std::collections::{HashMap, VecDeque}; use std::pin::Pin; @@ -14,10 +13,8 @@ use std::time::Duration; use beacon_chain::{BeaconChain, BeaconChainTypes}; use delay_map::{HashMapDelay, HashSetDelay}; use futures::prelude::*; -use lighthouse_network::{NetworkConfig, Subnet, SubnetDiscovery}; -#[cfg(not(feature = "deterministic_long_lived_attnets"))] -use rand::seq::SliceRandom; -use slog::{debug, error, o, trace, warn}; +use lighthouse_network::{discv5::enr::NodeId, NetworkConfig, Subnet, SubnetDiscovery}; +use slog::{debug, error, info, o, trace, warn}; use slot_clock::SlotClock; use types::{Attestation, EthSpec, Slot, SubnetId, ValidatorSubscription}; @@ -27,10 +24,6 @@ use crate::metrics; /// slot is less than this number, skip the peer discovery process. /// Subnet discovery query takes at most 30 secs, 2 slots take 24s. pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2; -/// The time (in slots) before a last seen validator is considered absent and we unsubscribe from -/// the random gossip topics that we subscribed to due to the validator connection. -#[cfg(not(feature = "deterministic_long_lived_attnets"))] -const LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS: u32 = 150; /// The fraction of a slot that we subscribe to a subnet before the required slot. /// /// Currently a whole slot ahead. @@ -67,30 +60,23 @@ pub struct AttestationService { /// Subnets we are currently subscribed to as short lived subscriptions. /// /// Once they expire, we unsubscribe from these. + /// We subscribe to subnets when we are an aggregator for an exact subnet. short_lived_subscriptions: HashMapDelay, /// Subnets we are currently subscribed to as long lived subscriptions. /// /// We advertise these in our ENR. When these expire, the subnet is removed from our ENR. - #[cfg(feature = "deterministic_long_lived_attnets")] + /// These are required of all beacon nodes. The exact number is determined by the chain + /// specification. long_lived_subscriptions: HashSet, - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - long_lived_subscriptions: HashMapDelay, - /// Short lived subscriptions that need to be done in the future. + /// Short lived subscriptions that need to be executed in the future. scheduled_short_lived_subscriptions: HashSetDelay, /// A collection timeouts to track the existence of aggregate validator subscriptions at an /// `ExactSubnet`. aggregate_validators_on_subnet: Option>, - /// A collection of seen validators. These dictate how many random subnets we should be - /// subscribed to. As these time out, we unsubscribe for the required random subnets and update - /// our ENR. - /// This is a set of validator indices. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - known_validators: HashSetDelay, - /// The waker for the current thread. waker: Option, @@ -100,16 +86,10 @@ pub struct AttestationService { /// We are always subscribed to all subnets. subscribe_all_subnets: bool, - /// For how many slots we subscribe to long lived subnets. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - long_lived_subnet_subscription_slots: u64, - /// Our Discv5 node_id. - #[cfg(feature = "deterministic_long_lived_attnets")] - node_id: ethereum_types::U256, + node_id: NodeId, /// Future used to manage subscribing and unsubscribing from long lived subnets. - #[cfg(feature = "deterministic_long_lived_attnets")] next_long_lived_subscription_event: Pin>, /// Whether this node is a block proposer-only node. @@ -122,62 +102,22 @@ pub struct AttestationService { impl AttestationService { /* Public functions */ - #[cfg(not(feature = "deterministic_long_lived_attnets"))] + /// Establish the service based on the passed configuration. pub fn new( beacon_chain: Arc>, + node_id: NodeId, config: &NetworkConfig, log: &slog::Logger, ) -> Self { let log = log.new(o!("service" => "attestation_service")); - // Calculate the random subnet duration from the spec constants. - let spec = &beacon_chain.spec; let slot_duration = beacon_chain.slot_clock.slot_duration(); - let long_lived_subnet_subscription_slots = spec - .epochs_per_random_subnet_subscription - .saturating_mul(T::EthSpec::slots_per_epoch()); - let long_lived_subscription_duration = Duration::from_millis( - slot_duration.as_millis() as u64 * long_lived_subnet_subscription_slots, - ); - - // Panics on overflow. Ensure LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS is not too large. - let last_seen_val_timeout = slot_duration - .checked_mul(LAST_SEEN_VALIDATOR_TIMEOUT_SLOTS) - .expect("LAST_SEEN_VALIDATOR_TIMEOUT must not be ridiculously large"); - let track_validators = !config.import_all_attestations; - let aggregate_validators_on_subnet = - track_validators.then(|| HashSetDelay::new(slot_duration)); - AttestationService { - events: VecDeque::with_capacity(10), - beacon_chain, - short_lived_subscriptions: HashMapDelay::new(slot_duration), - long_lived_subscriptions: HashMapDelay::new(long_lived_subscription_duration), - scheduled_short_lived_subscriptions: HashSetDelay::default(), - aggregate_validators_on_subnet, - known_validators: HashSetDelay::new(last_seen_val_timeout), - waker: None, - discovery_disabled: config.disable_discovery, - proposer_only: config.proposer_only, - subscribe_all_subnets: config.subscribe_all_subnets, - long_lived_subnet_subscription_slots, - log, + if config.subscribe_all_subnets { + slog::info!(log, "Subscribing to all subnets"); + } else { + slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node, "subscription_duration_in_epochs" => beacon_chain.spec.epochs_per_subnet_subscription); } - } - - #[cfg(feature = "deterministic_long_lived_attnets")] - pub fn new( - beacon_chain: Arc>, - node_id: ethereum_types::U256, - config: &NetworkConfig, - log: &slog::Logger, - ) -> Self { - let log = log.new(o!("service" => "attestation_service")); - - // Calculate the random subnet duration from the spec constants. - let slot_duration = beacon_chain.slot_clock.slot_duration(); - - slog::info!(log, "Deterministic long lived subnets enabled"; "subnets_per_node" => beacon_chain.spec.subnets_per_node); let track_validators = !config.import_all_attestations; let aggregate_validators_on_subnet = @@ -198,9 +138,15 @@ impl AttestationService { // value with a smarter timing Box::pin(tokio::time::sleep(Duration::from_secs(1))) }, + proposer_only: config.proposer_only, log, }; - service.recompute_long_lived_subnets(); + + // If we are not subscribed to all subnets, handle the deterministic set of subnets + if !config.subscribe_all_subnets { + service.recompute_long_lived_subnets(); + } + service } @@ -210,20 +156,12 @@ impl AttestationService { if self.subscribe_all_subnets { self.beacon_chain.spec.attestation_subnet_count as usize } else { - #[cfg(feature = "deterministic_long_lived_attnets")] let count = self .short_lived_subscriptions .keys() .chain(self.long_lived_subscriptions.iter()) .collect::>() .len(); - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - let count = self - .short_lived_subscriptions - .keys() - .chain(self.long_lived_subscriptions.keys()) - .collect::>() - .len(); count } } @@ -236,20 +174,20 @@ impl AttestationService { subscription_kind: SubscriptionKind, ) -> bool { match subscription_kind { - #[cfg(feature = "deterministic_long_lived_attnets")] SubscriptionKind::LongLived => self.long_lived_subscriptions.contains(subnet_id), - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - SubscriptionKind::LongLived => self.long_lived_subscriptions.contains_key(subnet_id), SubscriptionKind::ShortLived => self.short_lived_subscriptions.contains_key(subnet_id), } } + #[cfg(test)] + pub(crate) fn long_lived_subscriptions(&self) -> &HashSet { + &self.long_lived_subscriptions + } + /// Processes a list of validator subscriptions. /// /// This will: /// - Register new validators as being known. - /// - Subscribe to the required number of random subnets. - /// - Update the local ENR for new random subnets due to seeing new validators. /// - Search for peers for required subnets. /// - Request subscriptions for subnets on specific slots when required. /// - Build the timeouts for each of these events. @@ -267,18 +205,17 @@ impl AttestationService { // Maps each subnet_id subscription to it's highest slot let mut subnets_to_discover: HashMap = HashMap::new(); + + // Registers the validator with the attestation service. for subscription in subscriptions { metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_REQUESTS); - // Registers the validator with the attestation service. - // This will subscribe to long-lived random subnets if required. trace!(self.log, "Validator subscription"; "subscription" => ?subscription, ); - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - self.add_known_validator(subscription.validator_index); + // Compute the subnet that is associated with this subscription let subnet_id = match SubnetId::compute_subnet::( subscription.slot, subscription.attestation_committee_index, @@ -316,7 +253,7 @@ impl AttestationService { if subscription.is_aggregator { metrics::inc_counter(&metrics::SUBNET_SUBSCRIPTION_AGGREGATOR_REQUESTS); - if let Err(e) = self.subscribe_to_subnet(exact_subnet) { + if let Err(e) = self.subscribe_to_short_lived_subnet(exact_subnet) { warn!(self.log, "Subscription to subnet error"; "error" => e, @@ -347,14 +284,13 @@ impl AttestationService { Ok(()) } - #[cfg(feature = "deterministic_long_lived_attnets")] fn recompute_long_lived_subnets(&mut self) { // Ensure the next computation is scheduled even if assigning subnets fails. let next_subscription_event = self .recompute_long_lived_subnets_inner() .unwrap_or_else(|_| self.beacon_chain.slot_clock.slot_duration()); - debug!(self.log, "Recomputing deterministic long lived attnets"); + debug!(self.log, "Recomputing deterministic long lived subnets"); self.next_long_lived_subscription_event = Box::pin(tokio::time::sleep(next_subscription_event)); @@ -365,14 +301,13 @@ impl AttestationService { /// Gets the long lived subnets the node should be subscribed to during the current epoch and /// the remaining duration for which they remain valid. - #[cfg(feature = "deterministic_long_lived_attnets")] fn recompute_long_lived_subnets_inner(&mut self) -> Result { let current_epoch = self.beacon_chain.epoch().map_err( |e| error!(self.log, "Failed to get the current epoch from clock"; "err" => ?e), )?; let (subnets, next_subscription_epoch) = SubnetId::compute_subnets_for_epoch::( - self.node_id, + self.node_id.raw().into(), current_epoch, &self.beacon_chain.spec, ) @@ -396,17 +331,12 @@ impl AttestationService { Ok(next_subscription_event) } - #[cfg(all(test, feature = "deterministic_long_lived_attnets"))] - pub fn update_long_lived_subnets_testing(&mut self, subnets: HashSet) { - self.update_long_lived_subnets(subnets) - } - /// Updates the long lived subnets. /// /// New subnets are registered as subscribed, removed subnets as unsubscribed and the Enr /// updated accordingly. - #[cfg(feature = "deterministic_long_lived_attnets")] fn update_long_lived_subnets(&mut self, mut subnets: HashSet) { + info!(self.log, "Subscribing to long-lived subnets"; "subnets" => ?subnets.iter().collect::>()); for subnet in &subnets { // Add the events for those subnets that are new as long lived subscriptions. if !self.long_lived_subscriptions.contains(subnet) { @@ -430,28 +360,15 @@ impl AttestationService { } } - // Check for subnets that are being removed + // Update the long_lived_subnets set and check for subnets that are being removed std::mem::swap(&mut self.long_lived_subscriptions, &mut subnets); for subnet in subnets { if !self.long_lived_subscriptions.contains(&subnet) { - if !self.short_lived_subscriptions.contains_key(&subnet) { - debug!(self.log, "Unsubscribing from subnet"; "subnet" => ?subnet, "subscription_kind" => ?SubscriptionKind::LongLived); - self.queue_event(SubnetServiceMessage::Unsubscribe(Subnet::Attestation( - subnet, - ))); - } - - self.queue_event(SubnetServiceMessage::EnrRemove(Subnet::Attestation(subnet))); + self.handle_removed_subnet(subnet, SubscriptionKind::LongLived); } } } - /// Overwrites the long lived subscriptions for testing. - #[cfg(all(test, feature = "deterministic_long_lived_attnets"))] - pub fn set_long_lived_subscriptions(&mut self, subnets: HashSet) { - self.long_lived_subscriptions = subnets - } - /// Checks if we have subscribed aggregate validators for the subnet. If not, checks the gossip /// verification, re-propagates and returns false. pub fn should_process_attestation( @@ -535,7 +452,7 @@ impl AttestationService { } // Subscribes to the subnet if it should be done immediately, or schedules it if required. - fn subscribe_to_subnet( + fn subscribe_to_short_lived_subnet( &mut self, ExactSubnet { subnet_id, slot }: ExactSubnet, ) -> Result<(), &'static str> { @@ -564,12 +481,7 @@ impl AttestationService { // immediately. if time_to_subscription_start.is_zero() { // This is a current or past slot, we subscribe immediately. - self.subscribe_to_subnet_immediately( - subnet_id, - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - SubscriptionKind::ShortLived, - slot + 1, - )?; + self.subscribe_to_short_lived_subnet_immediately(subnet_id, slot + 1)?; } else { // This is a future slot, schedule subscribing. trace!(self.log, "Scheduling subnet subscription"; "subnet" => ?subnet_id, "time_to_subscription_start" => ?time_to_subscription_start); @@ -580,79 +492,6 @@ impl AttestationService { Ok(()) } - /// Updates the `known_validators` mapping and subscribes to long lived subnets if required. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - fn add_known_validator(&mut self, validator_index: u64) { - let previously_known = self.known_validators.contains_key(&validator_index); - // Add the new validator or update the current timeout for a known validator. - self.known_validators.insert(validator_index); - if !previously_known { - // New validator has subscribed. - // Subscribe to random topics and update the ENR if needed. - self.subscribe_to_random_subnets(); - } - } - - /// Subscribe to long-lived random subnets and update the local ENR bitfield. - /// The number of subnets to subscribe depends on the number of active validators and number of - /// current subscriptions. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - fn subscribe_to_random_subnets(&mut self) { - if self.subscribe_all_subnets { - // This case is not handled by this service. - return; - } - - let max_subnets = self.beacon_chain.spec.attestation_subnet_count; - // Calculate how many subnets we need, - let required_long_lived_subnets = { - let subnets_for_validators = self - .known_validators - .len() - .saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize); - subnets_for_validators // How many subnets we need - .min(max_subnets as usize) // Capped by the max - .saturating_sub(self.long_lived_subscriptions.len()) // Minus those we have - }; - - if required_long_lived_subnets == 0 { - // Nothing to do. - return; - } - - // Build a list of the subnets that we are not currently advertising. - let available_subnets = (0..max_subnets) - .map(SubnetId::new) - .filter(|subnet_id| !self.long_lived_subscriptions.contains_key(subnet_id)) - .collect::>(); - - let subnets_to_subscribe: Vec<_> = available_subnets - .choose_multiple(&mut rand::thread_rng(), required_long_lived_subnets) - .cloned() - .collect(); - - // Calculate in which slot does this subscription end. - let end_slot = match self.beacon_chain.slot_clock.now() { - Some(slot) => slot + self.long_lived_subnet_subscription_slots, - None => { - return debug!( - self.log, - "Failed to calculate end slot of long lived subnet subscriptions." - ) - } - }; - - for subnet_id in &subnets_to_subscribe { - if let Err(e) = self.subscribe_to_subnet_immediately( - *subnet_id, - SubscriptionKind::LongLived, - end_slot, - ) { - debug!(self.log, "Failed to subscribe to long lived subnet"; "subnet" => ?subnet_id, "err" => e); - } - } - } - /* A collection of functions that handle the various timeouts */ /// Registers a subnet as subscribed. @@ -662,11 +501,9 @@ impl AttestationService { /// out the appropriate events. /// /// On determinist long lived subnets, this is only used for short lived subscriptions. - fn subscribe_to_subnet_immediately( + fn subscribe_to_short_lived_subnet_immediately( &mut self, subnet_id: SubnetId, - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - subscription_kind: SubscriptionKind, end_slot: Slot, ) -> Result<(), &'static str> { if self.subscribe_all_subnets { @@ -685,25 +522,12 @@ impl AttestationService { return Err("Time when subscription would end has already passed."); } - #[cfg(feature = "deterministic_long_lived_attnets")] let subscription_kind = SubscriptionKind::ShortLived; // We need to check and add a subscription for the right kind, regardless of the presence // of the subnet as a subscription of the other kind. This is mainly since long lived // subscriptions can be removed at any time when a validator goes offline. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - let (subscriptions, already_subscribed_as_other_kind) = match subscription_kind { - SubscriptionKind::ShortLived => ( - &mut self.short_lived_subscriptions, - self.long_lived_subscriptions.contains_key(&subnet_id), - ), - SubscriptionKind::LongLived => ( - &mut self.long_lived_subscriptions, - self.short_lived_subscriptions.contains_key(&subnet_id), - ), - }; - #[cfg(feature = "deterministic_long_lived_attnets")] let (subscriptions, already_subscribed_as_other_kind) = ( &mut self.short_lived_subscriptions, self.long_lived_subscriptions.contains(&subnet_id), @@ -738,57 +562,19 @@ impl AttestationService { subnet_id, ))); } - - // If this is a new long lived subscription, send out the appropriate events. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - if SubscriptionKind::LongLived == subscription_kind { - let subnet = Subnet::Attestation(subnet_id); - // Advertise this subnet in our ENR. - self.long_lived_subscriptions.insert_at( - subnet_id, - end_slot, - time_to_subscription_end, - ); - self.queue_event(SubnetServiceMessage::EnrAdd(subnet)); - - if !self.discovery_disabled { - self.queue_event(SubnetServiceMessage::DiscoverPeers(vec![ - SubnetDiscovery { - subnet, - min_ttl: None, - }, - ])) - } - } } } Ok(()) } - /// A random subnet has expired. - /// - /// This function selects a new subnet to join, or extends the expiry if there are no more - /// available subnets to choose from. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - fn handle_random_subnet_expiry(&mut self, subnet_id: SubnetId) { - self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived); - - // Remove the ENR bitfield bit and choose a new random on from the available subnets - // Subscribe to a new random subnet. - self.subscribe_to_random_subnets(); - } - // Unsubscribes from a subnet that was removed if it does not continue to exist as a // subscription of the other kind. For long lived subscriptions, it also removes the // advertisement from our ENR. fn handle_removed_subnet(&mut self, subnet_id: SubnetId, subscription_kind: SubscriptionKind) { let exists_in_other_subscriptions = match subscription_kind { SubscriptionKind::LongLived => self.short_lived_subscriptions.contains_key(&subnet_id), - #[cfg(feature = "deterministic_long_lived_attnets")] SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains(&subnet_id), - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - SubscriptionKind::ShortLived => self.long_lived_subscriptions.contains_key(&subnet_id), }; if !exists_in_other_subscriptions { @@ -806,48 +592,6 @@ impl AttestationService { ))); } } - - /// A known validator has not sent a subscription in a while. They are considered offline and the - /// beacon node no longer needs to be subscribed to the allocated random subnets. - /// - /// We don't keep track of a specific validator to random subnet, rather the ratio of active - /// validators to random subnets. So when a validator goes offline, we can simply remove the - /// allocated amount of random subnets. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - fn handle_known_validator_expiry(&mut self) { - // Calculate how many subnets should we remove. - let extra_subnet_count = { - let max_subnets = self.beacon_chain.spec.attestation_subnet_count; - let subnets_for_validators = self - .known_validators - .len() - .saturating_mul(self.beacon_chain.spec.random_subnets_per_validator as usize) - .min(max_subnets as usize); - - self.long_lived_subscriptions - .len() - .saturating_sub(subnets_for_validators) - }; - - if extra_subnet_count == 0 { - // Nothing to do - return; - } - - let advertised_subnets = self - .long_lived_subscriptions - .keys() - .cloned() - .collect::>(); - let to_remove_subnets = advertised_subnets - .choose_multiple(&mut rand::thread_rng(), extra_subnet_count) - .cloned(); - - for subnet_id in to_remove_subnets { - self.long_lived_subscriptions.remove(&subnet_id); - self.handle_removed_subnet(subnet_id, SubscriptionKind::LongLived); - } - } } impl Stream for AttestationService { @@ -868,37 +612,34 @@ impl Stream for AttestationService { return Poll::Ready(Some(event)); } - // Process first any known validator expiries, since these affect how many long lived - // subnets we need. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - match self.known_validators.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(_validator_index))) => { - self.handle_known_validator_expiry(); - } - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for random subnet cycles"; "error"=> e); + // If we aren't subscribed to all subnets, handle the deterministic long-lived subnets + if !self.subscribe_all_subnets { + match self.next_long_lived_subscription_event.as_mut().poll(cx) { + Poll::Ready(_) => { + self.recompute_long_lived_subnets(); + // We re-wake the task as there could be other subscriptions to process + self.waker + .as_ref() + .expect("Waker has been set") + .wake_by_ref(); + } + Poll::Pending => {} } - Poll::Ready(None) | Poll::Pending => {} - } - - #[cfg(feature = "deterministic_long_lived_attnets")] - match self.next_long_lived_subscription_event.as_mut().poll(cx) { - Poll::Ready(_) => self.recompute_long_lived_subnets(), - Poll::Pending => {} } // Process scheduled subscriptions that might be ready, since those can extend a soon to // expire subscription. match self.scheduled_short_lived_subscriptions.poll_next_unpin(cx) { Poll::Ready(Some(Ok(ExactSubnet { subnet_id, slot }))) => { - if let Err(e) = self.subscribe_to_subnet_immediately( - subnet_id, - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - SubscriptionKind::ShortLived, - slot + 1, - ) { + if let Err(e) = + self.subscribe_to_short_lived_subnet_immediately(subnet_id, slot + 1) + { debug!(self.log, "Failed to subscribe to short lived subnet"; "subnet" => ?subnet_id, "err" => e); } + self.waker + .as_ref() + .expect("Waker has been set") + .wake_by_ref(); } Poll::Ready(Some(Err(e))) => { error!(self.log, "Failed to check for scheduled subnet subscriptions"; "error"=> e); @@ -910,6 +651,11 @@ impl Stream for AttestationService { match self.short_lived_subscriptions.poll_next_unpin(cx) { Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => { self.handle_removed_subnet(subnet_id, SubscriptionKind::ShortLived); + // We re-wake the task as there could be other subscriptions to process + self.waker + .as_ref() + .expect("Waker has been set") + .wake_by_ref(); } Poll::Ready(Some(Err(e))) => { error!(self.log, "Failed to check for subnet unsubscription times"; "error"=> e); @@ -917,18 +663,6 @@ impl Stream for AttestationService { Poll::Ready(None) | Poll::Pending => {} } - // Process any random subnet expiries. - #[cfg(not(feature = "deterministic_long_lived_attnets"))] - match self.long_lived_subscriptions.poll_next_unpin(cx) { - Poll::Ready(Some(Ok((subnet_id, _end_slot)))) => { - self.handle_random_subnet_expiry(subnet_id) - } - Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for random subnet cycles"; "error"=> e); - } - Poll::Ready(None) | Poll::Pending => {} - } - // Poll to remove entries on expiration, no need to act on expiration events. if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() { if let Poll::Ready(Some(Err(e))) = tracked_vals.poll_next_unpin(cx) { diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index a407fe1bcf8..3b8c89a442e 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -126,10 +126,7 @@ fn get_attestation_service( AttestationService::new( beacon_chain, - #[cfg(feature = "deterministic_long_lived_attnets")] - lighthouse_network::discv5::enr::NodeId::random() - .raw() - .into(), + lighthouse_network::discv5::enr::NodeId::random(), &config, &log, ) @@ -179,9 +176,6 @@ async fn get_events + Unpin>( mod attestation_service { - #[cfg(feature = "deterministic_long_lived_attnets")] - use std::collections::HashSet; - #[cfg(not(windows))] use crate::subnet_service::attestation_subnets::MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD; @@ -192,8 +186,8 @@ mod attestation_service { attestation_committee_index: CommitteeIndex, slot: Slot, committee_count_at_slot: u64, + is_aggregator: bool, ) -> ValidatorSubscription { - let is_aggregator = true; ValidatorSubscription { validator_index, attestation_committee_index, @@ -203,11 +197,11 @@ mod attestation_service { } } - #[cfg(not(feature = "deterministic_long_lived_attnets"))] fn get_subscriptions( validator_count: u64, slot: Slot, committee_count_at_slot: u64, + is_aggregator: bool, ) -> Vec { (0..validator_count) .map(|validator_index| { @@ -216,6 +210,7 @@ mod attestation_service { validator_index, slot, committee_count_at_slot, + is_aggregator, ) }) .collect() @@ -229,6 +224,7 @@ mod attestation_service { // Keep a low subscription slot so that there are no additional subnet discovery events. let subscription_slot = 0; let committee_count = 1; + let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(None); @@ -243,6 +239,7 @@ mod attestation_service { committee_index, current_slot + Slot::new(subscription_slot), committee_count, + true, )]; // submit the subscriptions @@ -266,16 +263,19 @@ mod attestation_service { // Wait for 1 slot duration to get the unsubscription event let events = get_events( &mut attestation_service, - Some(5), + Some(subnets_per_node * 3 + 2), (MainnetEthSpec::slots_per_epoch() * 3) as u32, ) .await; matches::assert_matches!( - events[..3], + events[..6], [ SubnetServiceMessage::Subscribe(_any1), SubnetServiceMessage::EnrAdd(_any3), SubnetServiceMessage::DiscoverPeers(_), + SubnetServiceMessage::Subscribe(_), + SubnetServiceMessage::EnrAdd(_), + SubnetServiceMessage::DiscoverPeers(_), ] ); @@ -284,10 +284,10 @@ mod attestation_service { if !attestation_service .is_subscribed(&subnet_id, attestation_subnets::SubscriptionKind::LongLived) { - assert_eq!(expected[..], events[3..]); + assert_eq!(expected[..], events[subnets_per_node * 3..]); } - // Should be subscribed to only 1 long lived subnet after unsubscription. - assert_eq!(attestation_service.subscription_count(), 1); + // Should be subscribed to only subnets_per_node long lived subnet after unsubscription. + assert_eq!(attestation_service.subscription_count(), subnets_per_node); } /// Test to verify that we are not unsubscribing to a subnet before a required subscription. @@ -297,6 +297,7 @@ mod attestation_service { // subscription config let validator_index = 1; let committee_count = 1; + let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; // Makes 2 validator subscriptions to the same subnet but at different slots. // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). @@ -318,6 +319,7 @@ mod attestation_service { com1, current_slot + Slot::new(subscription_slot1), committee_count, + true, ); let sub2 = get_subscription( @@ -325,6 +327,7 @@ mod attestation_service { com2, current_slot + Slot::new(subscription_slot2), committee_count, + true, ); let subnet_id1 = SubnetId::compute_subnet::( @@ -366,16 +369,22 @@ mod attestation_service { let expected = SubnetServiceMessage::Subscribe(Subnet::Attestation(subnet_id1)); - // Should be still subscribed to 1 long lived and 1 short lived subnet if both are + // Should be still subscribed to 2 long lived and up to 1 short lived subnet if both are // different. if !attestation_service.is_subscribed( &subnet_id1, attestation_subnets::SubscriptionKind::LongLived, ) { - assert_eq!(expected, events[3]); - assert_eq!(attestation_service.subscription_count(), 2); + // The index is 3*subnets_per_node (because we subscribe + discover + enr per long lived + // subnet) + 1 + let index = 3 * subnets_per_node; + assert_eq!(expected, events[index]); + assert_eq!( + attestation_service.subscription_count(), + subnets_per_node + 1 + ); } else { - assert_eq!(attestation_service.subscription_count(), 1); + assert!(attestation_service.subscription_count() == subnets_per_node); } // Get event for 1 more slot duration, we should get the unsubscribe event now. @@ -395,17 +404,17 @@ mod attestation_service { ); } - // Should be subscribed to only 1 long lived subnet after unsubscription. - assert_eq!(attestation_service.subscription_count(), 1); + // Should be subscribed 2 long lived subnet after unsubscription. + assert_eq!(attestation_service.subscription_count(), subnets_per_node); } - #[cfg(not(feature = "deterministic_long_lived_attnets"))] #[tokio::test] - async fn subscribe_all_random_subnets() { + async fn subscribe_all_subnets() { let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; - let subscription_slot = 10; + let subscription_slot = 3; let subscription_count = attestation_subnet_count; let committee_count = 1; + let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; // create the attestation service and subscriptions let mut attestation_service = get_attestation_service(None); @@ -419,6 +428,7 @@ mod attestation_service { subscription_count, current_slot + subscription_slot, committee_count, + true, ); // submit the subscriptions @@ -426,42 +436,52 @@ mod attestation_service { .validator_subscriptions(subscriptions) .unwrap(); - let events = get_events(&mut attestation_service, None, 3).await; + let events = get_events(&mut attestation_service, Some(131), 10).await; let mut discover_peer_count = 0; let mut enr_add_count = 0; let mut unexpected_msg_count = 0; + let mut unsubscribe_event_count = 0; for event in &events { match event { SubnetServiceMessage::DiscoverPeers(_) => discover_peer_count += 1, SubnetServiceMessage::Subscribe(_any_subnet) => {} SubnetServiceMessage::EnrAdd(_any_subnet) => enr_add_count += 1, + SubnetServiceMessage::Unsubscribe(_) => unsubscribe_event_count += 1, _ => unexpected_msg_count += 1, } } + // There should be a Subscribe Event, and Enr Add event and a DiscoverPeers event for each + // long-lived subnet initially. The next event should be a bulk discovery event. + let bulk_discovery_index = 3 * subnets_per_node; // The bulk discovery request length should be equal to validator_count - let bulk_discovery_event = events.last().unwrap(); + let bulk_discovery_event = &events[bulk_discovery_index]; if let SubnetServiceMessage::DiscoverPeers(d) = bulk_discovery_event { assert_eq!(d.len(), attestation_subnet_count as usize); } else { panic!("Unexpected event {:?}", bulk_discovery_event); } - // 64 `DiscoverPeer` requests of length 1 corresponding to random subnets + // 64 `DiscoverPeer` requests of length 1 corresponding to deterministic subnets // and 1 `DiscoverPeer` request corresponding to bulk subnet discovery. - assert_eq!(discover_peer_count, subscription_count + 1); - assert_eq!(attestation_service.subscription_count(), 64); - assert_eq!(enr_add_count, 64); + assert_eq!(discover_peer_count, subnets_per_node + 1); + assert_eq!(attestation_service.subscription_count(), subnets_per_node); + assert_eq!(enr_add_count, subnets_per_node); + assert_eq!( + unsubscribe_event_count, + attestation_subnet_count - subnets_per_node as u64 + ); assert_eq!(unexpected_msg_count, 0); // test completed successfully } - #[cfg(not(feature = "deterministic_long_lived_attnets"))] #[tokio::test] - async fn subscribe_all_random_subnets_plus_one() { + async fn subscribe_correct_number_of_subnets() { let attestation_subnet_count = MainnetEthSpec::default_spec().attestation_subnet_count; let subscription_slot = 10; + let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; + // the 65th subscription should result in no more messages than the previous scenario let subscription_count = attestation_subnet_count + 1; let committee_count = 1; @@ -478,6 +498,7 @@ mod attestation_service { subscription_count, current_slot + subscription_slot, committee_count, + true, ); // submit the subscriptions @@ -506,12 +527,12 @@ mod attestation_service { } else { panic!("Unexpected event {:?}", bulk_discovery_event); } - // 64 `DiscoverPeer` requests of length 1 corresponding to random subnets + // subnets_per_node `DiscoverPeer` requests of length 1 corresponding to long-lived subnets // and 1 `DiscoverPeer` request corresponding to the bulk subnet discovery. - // For the 65th subscription, the call to `subscribe_to_random_subnets` is not made because we are at capacity. - assert_eq!(discover_peer_count, 64 + 1); - assert_eq!(attestation_service.subscription_count(), 64); - assert_eq!(enr_add_count, 64); + + assert_eq!(discover_peer_count, subnets_per_node + 1); + assert_eq!(attestation_service.subscription_count(), subnets_per_node); + assert_eq!(enr_add_count, subnets_per_node); assert_eq!(unexpected_msg_count, 0); } @@ -521,6 +542,7 @@ mod attestation_service { // subscription config let validator_index = 1; let committee_count = 1; + let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; // Makes 2 validator subscriptions to the same subnet but at different slots. // There should be just 1 unsubscription event for the later slot subscription (subscription_slot2). @@ -542,6 +564,7 @@ mod attestation_service { com1, current_slot + Slot::new(subscription_slot1), committee_count, + true, ); let sub2 = get_subscription( @@ -549,6 +572,7 @@ mod attestation_service { com2, current_slot + Slot::new(subscription_slot2), committee_count, + true, ); let subnet_id1 = SubnetId::compute_subnet::( @@ -596,11 +620,10 @@ mod attestation_service { &subnet_id1, attestation_subnets::SubscriptionKind::LongLived, ) { - assert_eq!(expected_subscription, events[3]); - // fourth is a discovery event - assert_eq!(expected_unsubscription, events[5]); + assert_eq!(expected_subscription, events[subnets_per_node * 3]); + assert_eq!(expected_unsubscription, events[subnets_per_node * 3 + 2]); } - assert_eq!(attestation_service.subscription_count(), 1); + assert_eq!(attestation_service.subscription_count(), 2); println!("{events:?}"); let subscription_slot = current_slot + subscription_slot2 - 1; // one less do to the @@ -633,40 +656,44 @@ mod attestation_service { } #[tokio::test] - #[cfg(feature = "deterministic_long_lived_attnets")] async fn test_update_deterministic_long_lived_subnets() { let mut attestation_service = get_attestation_service(None); - let new_subnet = SubnetId::new(1); - let maintained_subnet = SubnetId::new(2); - let removed_subnet = SubnetId::new(3); + let subnets_per_node = MainnetEthSpec::default_spec().subnets_per_node as usize; - attestation_service - .set_long_lived_subscriptions(HashSet::from([removed_subnet, maintained_subnet])); - // clear initial events - let _events = get_events(&mut attestation_service, None, 1).await; + let current_slot = attestation_service + .beacon_chain + .slot_clock + .now() + .expect("Could not get current slot"); + let subscriptions = get_subscriptions(20, current_slot, 30, false); + + // submit the subscriptions attestation_service - .update_long_lived_subnets_testing(HashSet::from([maintained_subnet, new_subnet])); + .validator_subscriptions(subscriptions) + .unwrap(); - let events = get_events(&mut attestation_service, None, 1).await; - let new_subnet = Subnet::Attestation(new_subnet); - let removed_subnet = Subnet::Attestation(removed_subnet); + // There should only be the same subscriptions as there are in the specification, + // regardless of subscriptions assert_eq!( - events, + attestation_service.long_lived_subscriptions().len(), + subnets_per_node + ); + + let events = get_events(&mut attestation_service, None, 4).await; + + // Check that we attempt to subscribe and register ENRs + matches::assert_matches!( + events[..6], [ - // events for the new subnet - SubnetServiceMessage::Subscribe(new_subnet), - SubnetServiceMessage::EnrAdd(new_subnet), - SubnetServiceMessage::DiscoverPeers(vec![SubnetDiscovery { - subnet: new_subnet, - min_ttl: None - }]), - // events for the removed subnet - SubnetServiceMessage::Unsubscribe(removed_subnet), - SubnetServiceMessage::EnrRemove(removed_subnet), + SubnetServiceMessage::Subscribe(_), + SubnetServiceMessage::EnrAdd(_), + SubnetServiceMessage::DiscoverPeers(_), + SubnetServiceMessage::Subscribe(_), + SubnetServiceMessage::EnrAdd(_), + SubnetServiceMessage::DiscoverPeers(_), ] ); - println!("{events:?}") } } diff --git a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml index ca1d1e88a86..95ca9d01080 100644 --- a/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/gnosis/config.yaml @@ -86,3 +86,7 @@ PROPOSER_SCORE_BOOST: 40 DEPOSIT_CHAIN_ID: 100 DEPOSIT_NETWORK_ID: 100 DEPOSIT_CONTRACT_ADDRESS: 0x0B98057eA310F4d31F2a452B414647007d1645d9 + +# Network +# --------------------------------------------------------------- +SUBNETS_PER_NODE: 4 \ No newline at end of file diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index 0bbf873a3fb..7b26b30a6ce 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -86,3 +86,7 @@ PROPOSER_SCORE_BOOST: 40 DEPOSIT_CHAIN_ID: 1 DEPOSIT_NETWORK_ID: 1 DEPOSIT_CONTRACT_ADDRESS: 0x00000000219ab540356cBB839Cbe05303d7705Fa + +# Network +# --------------------------------------------------------------- +SUBNETS_PER_NODE: 2 \ No newline at end of file diff --git a/common/eth2_network_config/built_in_network_configs/prater/config.yaml b/common/eth2_network_config/built_in_network_configs/prater/config.yaml index 69d65ca8fc8..63b3d45db9a 100644 --- a/common/eth2_network_config/built_in_network_configs/prater/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/prater/config.yaml @@ -86,3 +86,7 @@ DEPOSIT_CHAIN_ID: 5 DEPOSIT_NETWORK_ID: 5 # Prater test deposit contract on Goerli Testnet DEPOSIT_CONTRACT_ADDRESS: 0xff50ed3d0ec03aC01D4C79aAd74928BFF48a7b2b + +# Network +# --------------------------------------------------------------- +SUBNETS_PER_NODE: 2 \ No newline at end of file diff --git a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml index 29465728995..8489f085f4c 100644 --- a/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/sepolia/config.yaml @@ -74,3 +74,7 @@ PROPOSER_SCORE_BOOST: 40 DEPOSIT_CHAIN_ID: 11155111 DEPOSIT_NETWORK_ID: 11155111 DEPOSIT_CONTRACT_ADDRESS: 0x7f02C3E3c98b133055B8B348B2Ac625669Ed295D + +# Network +# --------------------------------------------------------------- +SUBNETS_PER_NODE: 2 \ No newline at end of file diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 2b25cc1d593..5253dcc4b03 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -168,11 +168,9 @@ pub struct ChainSpec { pub maximum_gossip_clock_disparity_millis: u64, pub target_aggregators_per_committee: u64, pub attestation_subnet_count: u64, - pub random_subnets_per_validator: u64, - pub epochs_per_random_subnet_subscription: u64, pub subnets_per_node: u8, pub epochs_per_subnet_subscription: u64, - attestation_subnet_extra_bits: u8, + pub attestation_subnet_extra_bits: u8, /* * Application params @@ -455,17 +453,7 @@ impl ChainSpec { #[allow(clippy::integer_arithmetic)] pub const fn attestation_subnet_prefix_bits(&self) -> u32 { - // maybe use log2 when stable https://github.com/rust-lang/rust/issues/70887 - - // NOTE: this line is here simply to guarantee that if self.attestation_subnet_count type - // is changed, a compiler warning will be raised. This code depends on the type being u64. - let attestation_subnet_count: u64 = self.attestation_subnet_count; - let attestation_subnet_count_bits = if attestation_subnet_count == 0 { - 0 - } else { - 63 - attestation_subnet_count.leading_zeros() - }; - + let attestation_subnet_count_bits = self.attestation_subnet_count.ilog2(); self.attestation_subnet_extra_bits as u32 + attestation_subnet_count_bits } @@ -625,13 +613,11 @@ impl ChainSpec { network_id: 1, // mainnet network id attestation_propagation_slot_range: 32, attestation_subnet_count: 64, - random_subnets_per_validator: 1, - subnets_per_node: 1, + subnets_per_node: 2, maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, - epochs_per_random_subnet_subscription: 256, epochs_per_subnet_subscription: 256, - attestation_subnet_extra_bits: 6, + attestation_subnet_extra_bits: 0, /* * Application specific @@ -852,13 +838,11 @@ impl ChainSpec { network_id: 100, // Gnosis Chain network id attestation_propagation_slot_range: 32, attestation_subnet_count: 64, - random_subnets_per_validator: 1, - subnets_per_node: 1, + subnets_per_node: 4, // Make this larger than usual to avoid network damage maximum_gossip_clock_disparity_millis: 500, target_aggregators_per_committee: 16, - epochs_per_random_subnet_subscription: 256, epochs_per_subnet_subscription: 256, - attestation_subnet_extra_bits: 6, + attestation_subnet_extra_bits: 0, /* * Application specific @@ -946,6 +930,9 @@ pub struct Config { shard_committee_period: u64, #[serde(with = "serde_utils::quoted_u64")] eth1_follow_distance: u64, + #[serde(default = "default_subnets_per_node")] + #[serde(with = "serde_utils::quoted_u8")] + subnets_per_node: u8, #[serde(with = "serde_utils::quoted_u64")] inactivity_score_bias: u64, @@ -1002,6 +989,10 @@ fn default_safe_slots_to_import_optimistically() -> u64 { 128u64 } +fn default_subnets_per_node() -> u8 { + 2u8 +} + impl Default for Config { fn default() -> Self { let chain_spec = MainnetEthSpec::default_spec(); @@ -1084,6 +1075,7 @@ impl Config { min_validator_withdrawability_delay: spec.min_validator_withdrawability_delay, shard_committee_period: spec.shard_committee_period, eth1_follow_distance: spec.eth1_follow_distance, + subnets_per_node: spec.subnets_per_node, inactivity_score_bias: spec.inactivity_score_bias, inactivity_score_recovery_rate: spec.inactivity_score_recovery_rate, @@ -1130,6 +1122,7 @@ impl Config { min_validator_withdrawability_delay, shard_committee_period, eth1_follow_distance, + subnets_per_node, inactivity_score_bias, inactivity_score_recovery_rate, ejection_balance, @@ -1162,6 +1155,7 @@ impl Config { min_validator_withdrawability_delay, shard_committee_period, eth1_follow_distance, + subnets_per_node, inactivity_score_bias, inactivity_score_recovery_rate, ejection_balance, diff --git a/consensus/types/src/config_and_preset.rs b/consensus/types/src/config_and_preset.rs index b10ad7557b9..01f86d3480a 100644 --- a/consensus/types/src/config_and_preset.rs +++ b/consensus/types/src/config_and_preset.rs @@ -86,10 +86,6 @@ pub fn get_extra_fields(spec: &ChainSpec) -> HashMap { "domain_application_mask".to_uppercase()=> u32_hex(spec.domain_application_mask), "target_aggregators_per_committee".to_uppercase() => spec.target_aggregators_per_committee.to_string().into(), - "random_subnets_per_validator".to_uppercase() => - spec.random_subnets_per_validator.to_string().into(), - "epochs_per_random_subnet_subscription".to_uppercase() => - spec.epochs_per_random_subnet_subscription.to_string().into(), "domain_contribution_and_proof".to_uppercase() => u32_hex(spec.domain_contribution_and_proof), "domain_sync_committee".to_uppercase() => u32_hex(spec.domain_sync_committee), diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs index b885f89f7d4..6793fe55740 100644 --- a/consensus/types/src/subnet_id.rs +++ b/consensus/types/src/subnet_id.rs @@ -80,15 +80,26 @@ impl SubnetId { epoch: Epoch, spec: &ChainSpec, ) -> Result<(impl Iterator, Epoch), &'static str> { + // Simplify the variable name + let subscription_duration = spec.epochs_per_subnet_subscription; + let node_id_prefix = (node_id >> (256 - spec.attestation_subnet_prefix_bits() as usize)).as_usize(); - let subscription_event_idx = epoch.as_u64() / spec.epochs_per_subnet_subscription; + // NOTE: The as_u64() panics if the number is larger than u64::max_value(). This cannot be + // true as spec.epochs_per_subnet_subscription is a u64. + let node_offset = (node_id % ethereum_types::U256::from(subscription_duration)).as_u64(); + + // Calculate at which epoch this node needs to re-evaluate + let valid_until_epoch = epoch.as_u64() + + subscription_duration + .saturating_sub((epoch.as_u64() + node_offset) % subscription_duration); + + let subscription_event_idx = (epoch.as_u64() + node_offset) / subscription_duration; let permutation_seed = ethereum_hashing::hash(&int_to_bytes::int_to_bytes8(subscription_event_idx)); let num_subnets = 1 << spec.attestation_subnet_prefix_bits(); - let permutated_prefix = compute_shuffled_index( node_id_prefix, num_subnets, @@ -107,7 +118,6 @@ impl SubnetId { let subnet_set_generator = (0..subnets_per_node).map(move |idx| { SubnetId::new((permutated_prefix + idx as u64) % attestation_subnet_count) }); - let valid_until_epoch = (subscription_event_idx + 1) * spec.epochs_per_subnet_subscription; Ok((subnet_set_generator, valid_until_epoch.into())) } } @@ -149,3 +159,80 @@ impl AsRef for SubnetId { subnet_id_to_string(self.0) } } + +#[cfg(test)] +mod tests { + use super::*; + + /// A set of tests compared to the python specification + #[test] + fn compute_subnets_for_epoch_unit_test() { + // Randomized variables used generated with the python specification + let node_ids = [ + "0", + "88752428858350697756262172400162263450541348766581994718383409852729519486397", + "18732750322395381632951253735273868184515463718109267674920115648614659369468", + "27726842142488109545414954493849224833670205008410190955613662332153332462900", + "39755236029158558527862903296867805548949739810920318269566095185775868999998", + "31899136003441886988955119620035330314647133604576220223892254902004850516297", + "58579998103852084482416614330746509727562027284701078483890722833654510444626", + "28248042035542126088870192155378394518950310811868093527036637864276176517397", + "60930578857433095740782970114409273483106482059893286066493409689627770333527", + "103822458477361691467064888613019442068586830412598673713899771287914656699997", + ] + .into_iter() + .map(|v| ethereum_types::U256::from_dec_str(v).unwrap()) + .collect::>(); + + let epochs = [ + 54321u64, 1017090249, 1827566880, 846255942, 766597383, 1204990115, 1616209495, + 1774367616, 1484598751, 3525502229, + ] + .into_iter() + .map(Epoch::from) + .collect::>(); + + // Test mainnet + let spec = ChainSpec::mainnet(); + + // Calculated by hand + let expected_valid_time: Vec = [ + 54528, 1017090371, 1827567108, 846256076, 766597570, 1204990135, 1616209582, + 1774367723, 1484598953, 3525502371, + ] + .into(); + + // Calculated from pyspec + let expected_subnets = vec![ + vec![4u64, 5u64], + vec![61, 62], + vec![23, 24], + vec![38, 39], + vec![53, 54], + vec![39, 40], + vec![48, 49], + vec![39, 40], + vec![34, 35], + vec![37, 38], + ]; + + for x in 0..node_ids.len() { + println!("Test: {}", x); + println!( + "NodeId: {}\n Epoch: {}\n, expected_update_time: {}\n, expected_subnets: {:?}", + node_ids[x], epochs[x], expected_valid_time[x], expected_subnets[x] + ); + + let (computed_subnets, valid_time) = SubnetId::compute_subnets_for_epoch::< + crate::MainnetEthSpec, + >(node_ids[x], epochs[x], &spec) + .unwrap(); + + assert_eq!(Epoch::from(expected_valid_time[x]), valid_time); + assert_eq!( + expected_subnets[x], + computed_subnets.map(SubnetId::into).collect::>() + ); + } + } +}