diff --git a/Cargo.lock b/Cargo.lock index 6809f8b1a1a..0f5c4473517 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5998,6 +5998,7 @@ dependencies = [ "bytes", "crossbeam-channel", "futures", + "ic-base-types", "ic-interfaces", "ic-logger", "ic-metrics", diff --git a/rs/p2p/consensus_manager/BUILD.bazel b/rs/p2p/consensus_manager/BUILD.bazel index de2ec249277..e709cf5f87a 100644 --- a/rs/p2p/consensus_manager/BUILD.bazel +++ b/rs/p2p/consensus_manager/BUILD.bazel @@ -13,6 +13,7 @@ DEPENDENCIES = [ "//rs/protobuf", "//rs/p2p/quic_transport", "//rs/types/types", + "//rs/types/base_types", "@crate_index//:axum_0_7_0", "@crate_index//:backoff", "@crate_index//:bytes", diff --git a/rs/p2p/consensus_manager/Cargo.toml b/rs/p2p/consensus_manager/Cargo.toml index 8e2ade03fa2..6fb4a1a31c7 100644 --- a/rs/p2p/consensus_manager/Cargo.toml +++ b/rs/p2p/consensus_manager/Cargo.toml @@ -11,11 +11,12 @@ backoff = { workspace = true } bytes = { workspace = true } crossbeam-channel = { workspace = true } futures = { workspace = true } +ic-base-types = { path = "../../types/base_types" } ic-interfaces = { path = "../../interfaces" } ic-logger = { path = "../../monitoring/logger" } -ic-protobuf = { path = "../../protobuf" } ic-metrics = { path = "../../monitoring/metrics" } ic-quic-transport = { path = "../quic_transport" } +ic-protobuf = { path = "../../protobuf" } ic-types = { path = "../../types/types" } phantom_newtype = { path = "../../phantom_newtype" } prometheus = { workspace = true } diff --git a/rs/p2p/consensus_manager/src/lib.rs b/rs/p2p/consensus_manager/src/lib.rs index 2521a80128a..3d05d4cd5b5 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -1,8 +1,13 @@ use std::sync::{Arc, RwLock}; -use crate::metrics::ConsensusManagerMetrics; +use crate::{ + metrics::ConsensusManagerMetrics, + receiver::{build_axum_router, ConsensusManagerReceiver}, + sender::ConsensusManagerSender, +}; use axum::Router; use crossbeam_channel::Sender as CrossbeamSender; +use ic_base_types::NodeId; use ic_interfaces::p2p::{ artifact_manager::ArtifactProcessorEvent, consensus::{PriorityFnAndFilterProducer, ValidatedPoolReader}, @@ -15,11 +20,7 @@ use ic_protobuf::{ }; use ic_quic_transport::{ConnId, SubnetTopology, Transport}; use ic_types::artifact::{ArtifactKind, UnvalidatedArtifactMutation}; -use ic_types::NodeId; use phantom_newtype::AmountOf; -use receiver::build_axum_router; -use receiver::ConsensusManagerReceiver; -use sender::ConsensusManagerSender; use tokio::{ runtime::Handle, sync::{mpsc::Receiver, watch}, @@ -108,7 +109,7 @@ fn start_consensus_manager( // Locally produced adverts to send to the node's peers. adverts_to_send: Receiver>, // Adverts received from peers - adverts_received: Receiver<(AdvertUpdate, NodeId, ConnId)>, + adverts_received: Receiver<(SlotUpdate, NodeId, ConnId)>, raw_pool: Arc>, priority_fn_producer: Arc>, sender: CrossbeamSender>, @@ -142,7 +143,7 @@ fn start_consensus_manager( ); } -pub(crate) struct AdvertUpdate { +pub(crate) struct SlotUpdate { slot_number: SlotNumber, commit_id: CommitId, update: Update, @@ -153,22 +154,22 @@ pub(crate) enum Update { Advert((Artifact::Id, Artifact::Attribute)), } -impl From> for pb::AdvertUpdate { +impl From> for pb::SlotUpdate { fn from( - AdvertUpdate { + SlotUpdate { slot_number, commit_id, update, - }: AdvertUpdate, + }: SlotUpdate, ) -> Self { Self { commit_id: commit_id.get(), slot_id: slot_number.get(), update: Some(match update { Update::Artifact(artifact) => { - pb::advert_update::Update::Artifact(Artifact::PbMessage::proxy_encode(artifact)) + pb::slot_update::Update::Artifact(Artifact::PbMessage::proxy_encode(artifact)) } - Update::Advert((id, attribute)) => pb::advert_update::Update::Advert(pb::Advert { + Update::Advert((id, attribute)) => pb::slot_update::Update::Advert(pb::Advert { id: Artifact::PbId::proxy_encode(id), attribute: Artifact::PbAttribute::proxy_encode(attribute), }), @@ -177,22 +178,20 @@ impl From> for pb::AdvertUpdate { } } -impl TryFrom for AdvertUpdate { +impl TryFrom for SlotUpdate { type Error = ProxyDecodeError; - fn try_from(value: pb::AdvertUpdate) -> Result { + fn try_from(value: pb::SlotUpdate) -> Result { Ok(Self { slot_number: SlotNumber::from(value.slot_id), commit_id: CommitId::from(value.commit_id), update: match try_from_option_field(value.update, "update")? { - pb::advert_update::Update::Artifact(artifact) => { + pb::slot_update::Update::Artifact(artifact) => { Update::Artifact(Artifact::PbMessage::proxy_decode(&artifact)?) } - pb::advert_update::Update::Advert(pb::Advert { id, attribute }) => { - Update::Advert(( - Artifact::PbId::proxy_decode(&id)?, - Artifact::PbAttribute::proxy_decode(&attribute)?, - )) - } + pb::slot_update::Update::Advert(pb::Advert { id, attribute }) => Update::Advert(( + Artifact::PbId::proxy_decode(&id)?, + Artifact::PbAttribute::proxy_decode(&attribute)?, + )), }, }) } diff --git a/rs/p2p/consensus_manager/src/metrics.rs b/rs/p2p/consensus_manager/src/metrics.rs index d87fba05149..8de74c91891 100644 --- a/rs/p2p/consensus_manager/src/metrics.rs +++ b/rs/p2p/consensus_manager/src/metrics.rs @@ -43,9 +43,9 @@ pub(crate) struct ConsensusManagerMetrics { pub send_view_send_to_peer_delivered_total: IntCounter, pub send_view_resend_reconnect_total: IntCounter, - // Slot manager - pub slot_manager_used_slots: IntGauge, - pub slot_manager_maximum_slots_total: IntCounter, + // Available slot set + pub slot_set_in_use_slots: IntGauge, + pub slot_set_allocated_slots_total: IntCounter, } impl ConsensusManagerMetrics { @@ -250,17 +250,17 @@ impl ConsensusManagerMetrics { .unwrap(), ), - slot_manager_used_slots: metrics_registry.register( + slot_set_in_use_slots: metrics_registry.register( IntGauge::with_opts(opts!( - "ic_consensus_manager_slot_manager_used_slots", + "ic_consensus_manager_slot_set_in_use_slots", "Active slots in use.", const_labels.clone(), )) .unwrap(), ), - slot_manager_maximum_slots_total: metrics_registry.register( + slot_set_allocated_slots_total: metrics_registry.register( IntCounter::with_opts(opts!( - "ic_consensus_manager_slot_manager_maximum_slots_total", + "ic_consensus_manager_slot_set_allocated_slots_total", "Maximum of slots simultaneously used.", const_labels.clone(), )) diff --git a/rs/p2p/consensus_manager/src/receiver.rs b/rs/p2p/consensus_manager/src/receiver.rs index c75cee4522b..43ead30d375 100644 --- a/rs/p2p/consensus_manager/src/receiver.rs +++ b/rs/p2p/consensus_manager/src/receiver.rs @@ -1,9 +1,15 @@ +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + sync::{Arc, RwLock}, + time::Duration, +}; + use crate::{ metrics::{ ConsensusManagerMetrics, DOWNLOAD_TASK_RESULT_ALL_PEERS_DELETED, DOWNLOAD_TASK_RESULT_COMPLETED, DOWNLOAD_TASK_RESULT_DROP, }, - uri_prefix, AdvertUpdate, CommitId, SlotNumber, Update, + uri_prefix, CommitId, SlotNumber, SlotUpdate, Update, }; use axum::{ extract::State, @@ -14,18 +20,13 @@ use axum::{ use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use bytes::Bytes; use crossbeam_channel::Sender as CrossbeamSender; +use ic_base_types::NodeId; use ic_interfaces::p2p::consensus::{PriorityFnAndFilterProducer, ValidatedPoolReader}; use ic_logger::{error, warn, ReplicaLogger}; use ic_protobuf::{p2p::v1 as pb, proxy::ProtoProxy}; use ic_quic_transport::{ConnId, SubnetTopology, Transport}; use ic_types::artifact::{ArtifactKind, Priority, PriorityFn, UnvalidatedArtifactMutation}; -use ic_types::NodeId; use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng}; -use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, - sync::{Arc, RwLock}, - time::Duration, -}; use tokio::{ runtime::Handle, select, @@ -42,13 +43,13 @@ const MAX_ARTIFACT_RPC_TIMEOUT: Duration = Duration::from_secs(120); const PRIORITY_FUNCTION_UPDATE_INTERVAL: Duration = Duration::from_secs(3); type ValidatedPoolReaderRef = Arc + Send + Sync>>; -type ReceivedAdvertSender = Sender<(AdvertUpdate, NodeId, ConnId)>; +type ReceivedAdvertSender = Sender<(SlotUpdate, NodeId, ConnId)>; #[allow(unused)] pub fn build_axum_router( log: ReplicaLogger, pool: ValidatedPoolReaderRef, -) -> (Router, Receiver<(AdvertUpdate, NodeId, ConnId)>) { +) -> (Router, Receiver<(SlotUpdate, NodeId, ConnId)>) { let (update_tx, update_rx) = tokio::sync::mpsc::channel(100); let router = Router::new() .route( @@ -90,8 +91,8 @@ async fn update_handler( Extension(conn_id): Extension, payload: Bytes, ) -> Result<(), StatusCode> { - let update: AdvertUpdate = - pb::AdvertUpdate::proxy_decode(&payload).map_err(|_| StatusCode::BAD_REQUEST)?; + let update: SlotUpdate = + pb::SlotUpdate::proxy_decode(&payload).map_err(|_| StatusCode::BAD_REQUEST)?; if sender.send((update, peer, conn_id)).await.is_err() { error!( @@ -182,7 +183,7 @@ pub(crate) struct ConsensusManagerReceiver - ConsensusManagerReceiver, NodeId, ConnId)> + ConsensusManagerReceiver, NodeId, ConnId)> where Pool: 'static + Send + Sync + ValidatedPoolReader, Artifact: ArtifactKind, @@ -191,7 +192,7 @@ where log: ReplicaLogger, metrics: ConsensusManagerMetrics, rt_handle: Handle, - adverts_received: Receiver<(AdvertUpdate, NodeId, ConnId)>, + adverts_received: Receiver<(SlotUpdate, NodeId, ConnId)>, raw_pool: Arc>, priority_fn_producer: Arc>, sender: CrossbeamSender>, @@ -326,12 +327,12 @@ where pub(crate) fn handle_advert_receive( &mut self, - advert_update: AdvertUpdate, + advert_update: SlotUpdate, peer_id: NodeId, connection_id: ConnId, ) { self.metrics.slot_table_updates_total.inc(); - let AdvertUpdate { + let SlotUpdate { slot_number, commit_id, update, @@ -733,7 +734,7 @@ mod tests { rt_handle: Handle, // Adverts received from peers - adverts_received: Receiver<(AdvertUpdate, NodeId, ConnId)>, + adverts_received: Receiver<(SlotUpdate, NodeId, ConnId)>, raw_pool: Arc>, priority_fn_producer: Arc< dyn PriorityFnAndFilterProducer, @@ -744,7 +745,7 @@ mod tests { ) -> ConsensusManagerReceiver< U64Artifact, MockValidatedPoolReader, - (AdvertUpdate, NodeId, ConnId), + (SlotUpdate, NodeId, ConnId), > { let priority_fn = priority_fn_producer.get_priority_function(&raw_pool.read().unwrap()); let (current_priority_fn, _) = watch::channel(priority_fn); @@ -799,7 +800,7 @@ mod tests { pfn_rx, ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -825,7 +826,7 @@ mod tests { assert_eq!(mgr.artifact_processor_tasks.len(), 1); // Send stale advert with lower commit id. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(0), update: Update::Advert((0, ())), @@ -848,7 +849,7 @@ mod tests { ); // Send stale advert with lower conn id mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(0), update: Update::Advert((0, ())), @@ -871,7 +872,7 @@ mod tests { ); // Send stale advert with lower conn id but higher commit id mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(10), update: Update::Advert((0, ())), @@ -894,7 +895,7 @@ mod tests { ); // Send stale advert with lower conn id and lower commit id mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(0), update: Update::Advert((0, ())), @@ -956,7 +957,7 @@ mod tests { ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -983,7 +984,7 @@ mod tests { assert_eq!(mgr.artifact_processor_tasks.len(), 1); // Send advert with higher conn id. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(0), update: Update::Advert((1, ())), @@ -1052,7 +1053,7 @@ mod tests { ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1062,7 +1063,7 @@ mod tests { ); // Second advert for advert 0. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1112,7 +1113,7 @@ mod tests { ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1122,7 +1123,7 @@ mod tests { ); // Overwrite advert to close the download task. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(2), update: Update::Advert((1, ())), @@ -1137,7 +1138,7 @@ mod tests { .unwrap(); // Simulate that a new peer was added for this advert while closing. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(3), update: Update::Advert((0, ())), @@ -1193,7 +1194,7 @@ mod tests { pfn_rx, ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1209,7 +1210,7 @@ mod tests { mgr.handle_pfn_timer_tick(); // Overwrite existing advert to finish download task. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(2), update: Update::Advert((1, ())), @@ -1276,7 +1277,7 @@ mod tests { ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1331,7 +1332,7 @@ mod tests { pfn_rx, ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1340,7 +1341,7 @@ mod tests { ConnId::from(1), ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1421,7 +1422,7 @@ mod tests { pfn_rx, ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1484,7 +1485,7 @@ mod tests { ); // Add id 0 on slot 1. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1494,7 +1495,7 @@ mod tests { ); // Add id 0 on slot 2. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(2), commit_id: CommitId::from(2), update: Update::Advert((0, ())), @@ -1504,7 +1505,7 @@ mod tests { ); // Overwrite id 0 on slot 1. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(3), update: Update::Advert((1, ())), @@ -1525,7 +1526,7 @@ mod tests { assert_eq!(mgr.artifact_processor_tasks.len(), 2); // Overwrite remaining id 0 at slot 2. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(2), commit_id: CommitId::from(4), update: Update::Advert((1, ())), @@ -1582,7 +1583,7 @@ mod tests { pfn_rx, ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1591,7 +1592,7 @@ mod tests { ConnId::from(1), ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(2), commit_id: CommitId::from(2), update: Update::Advert((1, ())), @@ -1611,7 +1612,7 @@ mod tests { }); // Overwrite id 1 with id 0. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(2), commit_id: CommitId::from(3), update: Update::Advert((0, ())), @@ -1666,7 +1667,7 @@ mod tests { pfn_rx, ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1688,7 +1689,7 @@ mod tests { ); // Advertise id 0 again on same slot. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(2), update: Update::Advert((0, ())), @@ -1728,7 +1729,7 @@ mod tests { // Overwrite id 0. mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(3), update: Update::Advert((2, ())), @@ -1802,7 +1803,7 @@ mod tests { pfn_rx, ); mgr.handle_advert_receive( - AdvertUpdate { + SlotUpdate { slot_number: SlotNumber::from(1), commit_id: CommitId::from(1), update: Update::Advert((0, ())), @@ -1871,7 +1872,7 @@ mod tests { ConsensusManagerReceiver::< U64Artifact, MockValidatedPoolReader, - (AdvertUpdate, NodeId, ConnId), + (SlotUpdate, NodeId, ConnId), >::download_artifact( no_op_logger(), &0, diff --git a/rs/p2p/consensus_manager/src/sender.rs b/rs/p2p/consensus_manager/src/sender.rs index cac49ef1da9..d3a86b6f02b 100644 --- a/rs/p2p/consensus_manager/src/sender.rs +++ b/rs/p2p/consensus_manager/src/sender.rs @@ -7,6 +7,7 @@ use std::{ use axum::http::Request; use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use bytes::Bytes; +use ic_base_types::NodeId; use ic_interfaces::p2p::{ artifact_manager::ArtifactProcessorEvent, consensus::ValidatedPoolReader, }; @@ -14,18 +15,16 @@ use ic_logger::{error, warn, ReplicaLogger}; use ic_protobuf::{p2p::v1 as pb, proxy::ProtoProxy}; use ic_quic_transport::{ConnId, Transport}; use ic_types::artifact::{Advert, ArtifactKind}; -use ic_types::NodeId; -use tokio::task::AbortHandle; use tokio::{ runtime::Handle, select, sync::mpsc::Receiver, - task::{JoinHandle, JoinSet}, + task::{AbortHandle, JoinHandle, JoinSet}, time, }; use crate::{ - metrics::ConsensusManagerMetrics, uri_prefix, AdvertUpdate, CommitId, SlotNumber, Update, + metrics::ConsensusManagerMetrics, uri_prefix, CommitId, SlotNumber, SlotUpdate, Update, }; /// The size threshold for an artifact to be pushed. Artifacts smaller than this constant @@ -46,7 +45,7 @@ pub(crate) struct ConsensusManagerSender { pool_reader: Arc + Send + Sync>>, transport: Arc, adverts_to_send: Receiver>, - slot_manager: SlotManager, + slot_manager: AvailableSlotSet, current_commit_id: CommitId, active_adverts: HashMap, SlotNumber)>, } @@ -60,7 +59,8 @@ impl ConsensusManagerSender { transport: Arc, adverts_to_send: Receiver>, ) { - let slot_manager = SlotManager::new(log.clone(), metrics.clone(), Artifact::TAG.into()); + let slot_manager = + AvailableSlotSet::new(log.clone(), metrics.clone(), Artifact::TAG.into()); let manager = Self { log, @@ -114,7 +114,7 @@ impl ConsensusManagerSender { if let Some((send_task, free_slot)) = self.active_adverts.remove(id) { self.metrics.send_view_consensus_purge_active_total.inc(); send_task.abort(); - self.slot_manager.return_slot(free_slot); + self.slot_manager.push(free_slot); } else { self.metrics.send_view_consensus_dup_purge_total.inc(); } @@ -126,7 +126,7 @@ impl ConsensusManagerSender { if let Entry::Vacant(entry) = entry { self.metrics.send_view_consensus_new_adverts_total.inc(); - let slot = self.slot_manager.slot(); + let slot = self.slot_manager.pop(); let send_future = Self::send_advert_to_all_peers( self.rt_handle.clone(), @@ -182,7 +182,7 @@ impl ConsensusManagerSender { None }; - let advert_update: AdvertUpdate = AdvertUpdate { + let advert_update: SlotUpdate = SlotUpdate { slot_number, commit_id, update: match artifact { @@ -191,7 +191,7 @@ impl ConsensusManagerSender { }, }; - let body = Bytes::from(pb::AdvertUpdate::proxy_encode(advert_update)); + let body = Bytes::from(pb::SlotUpdate::proxy_encode(advert_update)); let mut in_progress_transmissions = JoinSet::new(); // Stores the connection ID and the `AbortHandle` of the last successful transmission task to a peer. @@ -272,7 +272,7 @@ async fn send_advert_to_peer( } } -struct SlotManager { +struct AvailableSlotSet { next_free_slot: SlotNumber, free_slots: Vec, log: ReplicaLogger, @@ -280,7 +280,7 @@ struct SlotManager { service_name: &'static str, } -impl SlotManager { +impl AvailableSlotSet { fn new( log: ReplicaLogger, metrics: ConsensusManagerMetrics, @@ -295,13 +295,14 @@ impl SlotManager { } } - fn return_slot(&mut self, slot: SlotNumber) { + fn push(&mut self, slot: SlotNumber) { self.free_slots.push(slot); - self.metrics.slot_manager_used_slots.dec(); + self.metrics.slot_set_in_use_slots.dec(); } - fn slot(&mut self) -> SlotNumber { - self.metrics.slot_manager_used_slots.inc(); + /// Returns available slot. + fn pop(&mut self) -> SlotNumber { + self.metrics.slot_set_in_use_slots.inc(); match self.free_slots.pop() { Some(slot) => slot, None => { @@ -317,7 +318,7 @@ impl SlotManager { let new_slot = self.next_free_slot; self.next_free_slot.inc_assign(); - self.metrics.slot_manager_maximum_slots_total.inc(); + self.metrics.slot_set_allocated_slots_total.inc(); new_slot } @@ -596,8 +597,8 @@ mod tests { .expect_push() .times(3) .returning(move |_, r| { - let advert: AdvertUpdate = - pb::AdvertUpdate::proxy_decode(&r.into_body()).unwrap(); + let advert: SlotUpdate = + pb::SlotUpdate::proxy_decode(&r.into_body()).unwrap(); commit_id_tx.send(advert.commit_id).unwrap(); Ok(()) }); @@ -664,8 +665,8 @@ mod tests { .expect_push() .times(2) .returning(move |_, r| { - let advert: AdvertUpdate = - pb::AdvertUpdate::proxy_decode(&r.into_body()).unwrap(); + let advert: SlotUpdate = + pb::SlotUpdate::proxy_decode(&r.into_body()).unwrap(); commit_id_tx.send(advert.commit_id).unwrap(); Ok(()) }); @@ -708,7 +709,7 @@ mod tests { /// Test that we can take more slots than SLOT_TABLE_THRESHOLD #[test] fn slot_manager_unrestricted() { - let mut sm = SlotManager::new( + let mut sm = AvailableSlotSet::new( no_op_logger(), ConsensusManagerMetrics::new::(&MetricsRegistry::default()), "test", @@ -716,13 +717,13 @@ mod tests { // Take more than SLOT_TABLE_THRESHOLD number of slots for i in 0..(SLOT_TABLE_THRESHOLD * 5) { - assert_eq!(sm.slot().get(), i); + assert_eq!(sm.pop().get(), i); } // Give back all the slots. for i in 0..(SLOT_TABLE_THRESHOLD * 5) { - sm.return_slot(SlotNumber::from(i)); + sm.push(SlotNumber::from(i)); } // Check that we get the slot that was returned last - assert_eq!(sm.slot().get(), SLOT_TABLE_THRESHOLD * 5 - 1); + assert_eq!(sm.pop().get(), SLOT_TABLE_THRESHOLD * 5 - 1); } } diff --git a/rs/protobuf/def/p2p/v1/consensus_manager.proto b/rs/protobuf/def/p2p/v1/consensus_manager.proto index 0978dce703c..4c1b8235046 100644 --- a/rs/protobuf/def/p2p/v1/consensus_manager.proto +++ b/rs/protobuf/def/p2p/v1/consensus_manager.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package p2p.v1; -message AdvertUpdate { +message SlotUpdate { uint64 commit_id = 1; uint64 slot_id = 2; oneof update { diff --git a/rs/protobuf/src/gen/p2p/p2p.v1.rs b/rs/protobuf/src/gen/p2p/p2p.v1.rs index de7a23e5e11..15fbe33d5a7 100644 --- a/rs/protobuf/src/gen/p2p/p2p.v1.rs +++ b/rs/protobuf/src/gen/p2p/p2p.v1.rs @@ -26,16 +26,16 @@ pub struct StateSyncChunkResponse { #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct AdvertUpdate { +pub struct SlotUpdate { #[prost(uint64, tag = "1")] pub commit_id: u64, #[prost(uint64, tag = "2")] pub slot_id: u64, - #[prost(oneof = "advert_update::Update", tags = "3, 4")] - pub update: ::core::option::Option, + #[prost(oneof = "slot_update::Update", tags = "3, 4")] + pub update: ::core::option::Option, } -/// Nested message and enum types in `AdvertUpdate`. -pub mod advert_update { +/// Nested message and enum types in `SlotUpdate`. +pub mod slot_update { #[derive(serde::Serialize, serde::Deserialize)] #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Oneof)] diff --git a/rs/tests/dashboards/IC/p2pv2.json b/rs/tests/dashboards/IC/p2pv2.json index 333bdabaea5..7b6b96a6177 100644 --- a/rs/tests/dashboards/IC/p2pv2.json +++ b/rs/tests/dashboards/IC/p2pv2.json @@ -2382,7 +2382,7 @@ }, "editorMode": "code", "exemplar": true, - "expr": "label_replace(\nlabel_replace(\n ic_consensus_manager_slot_manager_used_slots{job=\"replica\",ic=\"$ic\",ic_subnet=~\"$ic_subnet\",instance=~\"$instance\",client=~\"$client\"},\n \"ic_subnet\", \"$1\", \"ic_subnet\", \"([^-]+)-.*\"),\n \"instance\", \"$1:...:$2\", \"instance\", \"(\\\\[(?:[^:]+[.:]){4}).*(:[^:]+\\\\]?):[0-9]+\"\n)", + "expr": "label_replace(\nlabel_replace(\n ic_consensus_manager_slot_set_in_use_slots{job=\"replica\",ic=\"$ic\",ic_subnet=~\"$ic_subnet\",instance=~\"$instance\",client=~\"$client\"},\n \"ic_subnet\", \"$1\", \"ic_subnet\", \"([^-]+)-.*\"),\n \"instance\", \"$1:...:$2\", \"instance\", \"(\\\\[(?:[^:]+[.:]){4}).*(:[^:]+\\\\]?):[0-9]+\"\n)", "interval": "", "legendFormat": "{{ic_subnet}} {{instance}} {{result}}", "range": true, @@ -2482,7 +2482,7 @@ }, "editorMode": "code", "exemplar": true, - "expr": "label_replace(\nlabel_replace(\n ic_consensus_manager_slot_manager_maximum_slots_total{job=\"replica\",ic=\"$ic\",ic_subnet=~\"$ic_subnet\",instance=~\"$instance\",client=~\"$client\"},\n \"ic_subnet\", \"$1\", \"ic_subnet\", \"([^-]+)-.*\"),\n \"instance\", \"$1:...:$2\", \"instance\", \"(\\\\[(?:[^:]+[.:]){4}).*(:[^:]+\\\\]?):[0-9]+\"\n)", + "expr": "label_replace(\nlabel_replace(\n ic_consensus_manager_slot_set_allocated_slots_total{job=\"replica\",ic=\"$ic\",ic_subnet=~\"$ic_subnet\",instance=~\"$instance\",client=~\"$client\"},\n \"ic_subnet\", \"$1\", \"ic_subnet\", \"([^-]+)-.*\"),\n \"instance\", \"$1:...:$2\", \"instance\", \"(\\\\[(?:[^:]+[.:]){4}).*(:[^:]+\\\\]?):[0-9]+\"\n)", "interval": "", "legendFormat": "{{ic_subnet}} {{instance}} {{result}}", "range": true,