From ae3bbaa0c58b720614ac05b827c859994f01385b Mon Sep 17 00:00:00 2001
From: Caio
Date: Wed, 9 Nov 2022 11:23:14 +0100
Subject: [PATCH 1/6] clippy: tracing: allow needless_borrow on `record`

tracing-attributes 0.1.23 ships with a smarter `Span::record` that
allows owned values [^1]. This means that previously invalid code such
as:

    span.record("num_updates", num_items);

is now valid, which makes clippy emit a needless_borrow warning.

Fixing the lint, however, means that foca would require a strict version
check for tracing-attributes (otherwise anything pulling 0.1.22 or below
would not compile anymore).

I don't want a strict dependency on tracing: it's tangential to foca and
I'd like users to benefit from the improvements that might be shipped
along, so I fix the lint warning by ignoring it instead.

Once we get an (at least minor) version bump on `tracing`, this can be
removed and the call updated to skip the borrow.

[^1]: https://github.com/tokio-rs/tracing/pull/2212
---
 src/lib.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/lib.rs b/src/lib.rs
index 2529343..5014546 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1152,6 +1152,7 @@ where
             // Seek back and write the correct number of items added
             buf.get_mut()[tally_position..].as_mut().put_u16(num_items);
             #[cfg(feature = "tracing")]
+            #[allow(clippy::needless_borrow)]
             span.record("num_updates", &num_items);
         }
 
@@ -1168,11 +1169,13 @@ where
             #[cfg_attr(not(feature = "tracing"), allow(unused_variables))]
             let num_broadcasts = self.custom_broadcasts.fill(&mut buf, usize::MAX);
             #[cfg(feature = "tracing")]
+            #[allow(clippy::needless_borrow)]
             span.record("num_broadcasts", &num_broadcasts);
         }
 
         let data = buf.into_inner();
         #[cfg(feature = "tracing")]
+        #[allow(clippy::needless_borrow)]
         span.record("len", &data.len());
 
         #[cfg(feature = "tracing")]

From 9b9dc7bc94cacec42e70071aec4b5e7e8ea11a68 Mon Sep 17 00:00:00 2001
From: Caio
Date: Thu, 10 Nov 2022 09:44:32 +0100
Subject: [PATCH 2/6] Ability to notify active down members

When a member is declared down by the cluster, its messages get ignored
until the cluster "forgets" it went down (i.e.: after
`Config::remove_down_after` expires).

This patch introduces a new setting, `Config::notify_down_members`,
which makes foca send a lightweight message to said members when they're
down.
---
 src/config.rs  |  21 +++++++++
 src/lib.rs     | 126 ++++++++++++++++++++++++++++++++++++++++++-----
 src/payload.rs |   6 +++
 src/testing.rs |   4 ++
 4 files changed, 145 insertions(+), 12 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index 68e514d..62b7f70 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -94,10 +94,25 @@ pub struct Config {
     /// to a value smaller than your network's MTU. 1400 is a good
     /// value for a non-ancient network.
     pub max_packet_size: NonZeroUsize,
+
+    /// Whether foca should try to let members that are down know about it
+    ///
+    /// Whenever a member is declared down by the cluster, their messages
+    /// get ignored and there's no way for them to learn that this is
+    /// happening. With this setting enabled, foca will try to notify the
+    /// down member that their messages are being discarded.
+    ///
+    /// This feature is an extension to the SWIM protocol and should be left
+    /// disabled if you're aiming at pure SWIM behavior.
+    pub notify_down_members: bool,
 }
 
 impl Config {
     /// A simple configuration that should work well in a LAN scenario.
+    ///
+    /// This is Foca in its simplest form and has no extensions enabled.
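+    ///
+    /// A minimal usage sketch (the identity, rng and codec values are
+    /// placeholders for whatever your deployment uses, not something this
+    /// patch defines):
+    ///
+    /// ```ignore
+    /// let mut foca = Foca::new(my_identity, Config::simple(), my_rng, my_codec);
+    /// ```
+    ///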
+ /// Use this config if you are trying to get a grasp of how SWIM + /// works, without any additional behavior. pub fn simple() -> Self { Self { probe_period: Duration::from_millis(1500), @@ -110,6 +125,8 @@ impl Config { remove_down_after: Duration::from_secs(15), max_packet_size: NonZeroUsize::new(1400).unwrap(), + + notify_down_members: false, } } } @@ -144,6 +161,8 @@ impl Config { remove_down_after: Duration::from_secs(15), max_packet_size: NonZeroUsize::new(1400).unwrap(), + + notify_down_members: true, } } @@ -169,6 +188,8 @@ impl Config { remove_down_after: Duration::from_secs(15), max_packet_size: NonZeroUsize::new(1400).unwrap(), + + notify_down_members: true, } } diff --git a/src/lib.rs b/src/lib.rs index 5014546..bbdf3fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -543,7 +543,7 @@ where token, } => { if self.timer_token == token { - let as_down = Member::new(member_id, incarnation, State::Down); + let as_down = Member::new(member_id.clone(), incarnation, State::Down); if let Some(summary) = self .members // Down is terminal, so before doing that we ensure the member @@ -556,7 +556,14 @@ where { self.handle_apply_summary(&summary, as_down, &mut runtime)?; // Member went down we might need to adjust our internal state - self.adjust_connection_state(runtime); + self.adjust_connection_state(&mut runtime); + + if self.config.notify_down_members { + // As a courtesy, we send a lightweight message to the member + // we're declaring down so that if it manages to receive it, + // it can react accordingly + self.send_message(member_id, Message::TurnUndead, runtime)?; + } } else { #[cfg(feature = "tracing")] tracing::debug!("Member not found"); @@ -726,6 +733,10 @@ where #[cfg(feature = "tracing")] tracing::debug!("Discarded: Inactive sender"); + if self.config.notify_down_members { + self.send_message(src, Message::TurnUndead, runtime)?; + } + return Ok(()); } @@ -840,6 +851,9 @@ where } } Message::Announce => self.send_message(src, Message::Feed, runtime)?, + Message::TurnUndead => { + self.handle_self_update(Incarnation::default(), State::Down, runtime)? + } // Nothing to do. 
These messages do not expect any reply Message::Gossip | Message::Feed | Message::Broadcast => {} }; @@ -1105,8 +1119,8 @@ where .map_err(Error::Encode)?; let (needs_piggyback, only_active_members) = match header.message { - // Announce packets contain nothing but the header - Message::Announce => (false, false), + // Announce/TurnUndead packets contain nothing but the header + Message::Announce | Message::TurnUndead => (false, false), // Feed packets stuff active members at the tail Message::Feed => (true, true), // Broadcast packets stuffs only custom broadcasts @@ -2337,12 +2351,13 @@ mod tests { // to continue the probe cycle fn craft_probing_foca( num_members: u8, + config: Config, ) -> ( Foca, ID, Timer, ) { - let mut foca = Foca::new(ID::new(1), config(), rng(), codec()); + let mut foca = Foca::new(ID::new(1), config.clone(), rng(), codec()); let mut runtime = InMemoryRuntime::new(); assert!(num_members > 0); @@ -2355,7 +2370,7 @@ mod tests { // The runtime shoud've been instructed to schedule a // probe for later on let expected_timer = Timer::ProbeRandomMember(foca.timer_token()); - expect_scheduling!(runtime, expected_timer.clone(), config().probe_period); + expect_scheduling!(runtime, expected_timer.clone(), config.probe_period); // We'll trigger it right now instead runtime.clear(); @@ -2379,7 +2394,7 @@ mod tests { probed_id: probed, token: foca.timer_token(), }; - expect_scheduling!(runtime, send_indirect_probe.clone(), config().probe_rtt); + expect_scheduling!(runtime, send_indirect_probe.clone(), config.probe_rtt); (foca, probed, send_indirect_probe) } @@ -2391,7 +2406,7 @@ mod tests { // are no more active members in the cluster (thus going Idle) // A foca is probing - let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2); + let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config()); let mut runtime = InMemoryRuntime::new(); // Clippy gets it wrong here: can't use just the plain iterator @@ -2414,7 +2429,7 @@ mod tests { #[test] fn probe_ping_ack_cycle() { - let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5); + let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config()); let mut runtime = InMemoryRuntime::new(); // Now if probed replies before the timer fires, the probe @@ -2441,7 +2456,7 @@ mod tests { #[test] fn probe_cycle_requires_correct_probe_number() { - let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5); + let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config()); let mut runtime = InMemoryRuntime::new(); let incorrect_probe_number = foca.probe().probe_number() + 1; @@ -2478,7 +2493,7 @@ mod tests { // `num_indirect_probes + 1` so that we can verify that // we don't send more requests than the configured value. let (mut foca, probed, send_indirect_probe) = - craft_probing_foca((num_indirect_probes + 2) as u8); + craft_probing_foca((num_indirect_probes + 2) as u8, config()); let mut runtime = InMemoryRuntime::new(); // `probed` did NOT reply with an Ack before the timer @@ -3209,7 +3224,7 @@ mod tests { fn can_recover_from_incomplete_probe_cycle() { // Here we get a foca in the middle of a probe cycle. The correct // sequencing should submit `_send_indirect_probe` - let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2); + let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config()); let mut runtime = InMemoryRuntime::new(); // ... 
but we'll manually craft a ProbeRandomMember event instead // to trigger the validation failure @@ -3229,4 +3244,91 @@ mod tests { "didn't submit a new probe event" ); } + + #[test] + fn declaring_a_member_as_down_notifies_them() { + let config = { + let mut c = Config::simple(); + c.notify_down_members = true; + c + }; + + let (mut foca, probed, send_indirect_probe) = craft_probing_foca(2, config); + let mut runtime = InMemoryRuntime::new(); + + // `probed` did NOT reply with an Ack before the timer + assert_eq!(Ok(()), foca.handle_timer(send_indirect_probe, &mut runtime)); + // ... and nothing happens for the indirect cycle + + runtime.clear(); + // So by the time the ChangeSuspectToDown timer fires + assert_eq!( + Ok(()), + foca.handle_timer( + Timer::ChangeSuspectToDown { + member_id: probed, + incarnation: Incarnation::default(), + token: foca.timer_token() + }, + &mut runtime + ) + ); + + // The runtime should be instructed to send data to `probed` + let data = runtime + .take_data(probed) + .expect("probed should be sent a message"); + let header = codec().decode_header(&data[..]).expect("valid payload"); + + // And it should be a TurnUndead message + assert_eq!( + header.message, + Message::TurnUndead, + "member should have been sent a TurnUndead message" + ); + } + + #[test] + fn message_from_down_member_is_replied_with_turn_undead() { + let config = { + let mut c = config(); + c.notify_down_members = true; + c + }; + let mut runtime = InMemoryRuntime::new(); + + // We have a simple foca instance + let mut foca = Foca::new(ID::new(1), config, rng(), codec()); + let down_id = ID::new(2); + // That knows that ID=2 is down + assert_eq!(Ok(()), foca.apply(Member::down(down_id), &mut runtime)); + + // And we have a message from member ID=2 to ID=1 + let header = Header { + src: down_id, + src_incarnation: 1, + dst: ID::new(1), + message: Message::Announce, + }; + + let mut msg = Vec::new(); + codec() + .encode_header(&header, &mut msg) + .expect("codec works fine"); + + // When foca receives such message + assert_eq!(Ok(()), foca.handle_data(&msg[..], &mut runtime)); + + let data = runtime + .take_data(down_id) + .expect("should dispatch a message to ID=2"); + let h = codec().decode_header(&data[..]).expect("valid payload"); + + // It should send a message to ID=2 notifying it + assert_eq!( + h.message, + Message::TurnUndead, + "id=2 should have been sent a TurnUndead message" + ); + } } diff --git a/src/payload.rs b/src/payload.rs index a4f4539..afa87ab 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -134,6 +134,12 @@ pub enum Message { /// Deliberate dissemination of custom broadcasts. Broadcast /// messages do not contain cluster updates. Broadcast, + + /// Indicates that the receiver is considered down by the sender + /// + /// This is an optional message that Foca sends whenever a member + /// that's considered down sends a message. 
+    TurnUndead,
 }
 
 /// ProbeNumber is simply a bookkeeping mechanism to try and prevent

diff --git a/src/testing.rs b/src/testing.rs
index 092cbba..ae0e501 100644
--- a/src/testing.rs
+++ b/src/testing.rs
@@ -185,6 +185,7 @@ impl BadCodec {
             8 => Message::Announce,
             9 => Message::Feed,
             10 => Message::Broadcast,
+            11 => Message::TurnUndead,
             other => return Err(BadCodecError::BadMessageID(other)),
         };
 
@@ -252,6 +253,9 @@ impl BadCodec {
             Message::Broadcast => {
                 buf.put_u8(10);
             }
+            Message::TurnUndead => {
+                buf.put_u8(11);
+            }
         }
 
         Ok(())

From 950aebcc376b72f76bbcfc89b54246a9e302ed3a Mon Sep 17 00:00:00 2001
From: Caio
Date: Fri, 11 Nov 2022 10:51:02 +0100
Subject: [PATCH 3/6] extract choose_and_send method

We'll be repeating the pattern of picking random members and sending a
message to each.
---
 src/lib.rs | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index bbdf3fe..70a5665 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -351,17 +351,31 @@ where
     /// to attempt reducing the time it takes for information to
     /// propagate thoroughly.
     #[cfg_attr(feature = "tracing", tracing::instrument(skip(runtime)))]
-    pub fn gossip(&mut self, mut runtime: impl Runtime) -> Result<()> {
+    pub fn gossip(&mut self, runtime: impl Runtime) -> Result<()> {
+        self.choose_and_send(
+            self.config.num_indirect_probes.get(),
+            Message::Gossip,
+            runtime,
+        )
+    }
+
+    // Pick `num_members` random active members and send `msg` to them
+    fn choose_and_send(
+        &mut self,
+        num_members: usize,
+        msg: Message,
+        mut runtime: impl Runtime,
+    ) -> Result<()> {
         self.member_buf.clear();
         self.members.choose_active_members(
-            self.config.num_indirect_probes.get(),
+            num_members,
             &mut self.member_buf,
             &mut self.rng,
             |_| true,
         );
 
         while let Some(chosen) = self.member_buf.pop() {
-            self.send_message(chosen.into_identity(), Message::Gossip, &mut runtime)?;
+            self.send_message(chosen.into_identity(), msg.clone(), &mut runtime)?;
         }
 
         Ok(())

From 3d0cfa47bd14c2a34b1c55f3bc50d46732ef4afb Mon Sep 17 00:00:00 2001
From: Caio
Date: Fri, 11 Nov 2022 10:59:56 +0100
Subject: [PATCH 4/6] Ability to periodically announce to the cluster

This introduces Config::periodic_announce, which allows users to
instruct foca to periodically send an Announce message to random members
so that it learns about every cluster member faster.
---
 src/config.rs  |  45 +++++++++++++++++
 src/lib.rs     | 131 +++++++++++++++++++++++++++++++++++++++++--------
 src/runtime.rs |   4 ++
 3 files changed, 160 insertions(+), 20 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index 62b7f70..d896ee7 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -105,6 +105,39 @@ pub struct Config {
     /// This feature is an extension to the SWIM protocol and should be left
     /// disabled if you're aiming at pure SWIM behavior.
     pub notify_down_members: bool,
+
+    /// How often should foca ask its peers for more peers
+    ///
+    /// [`crate::Message::Announce`] is the mechanism foca uses to discover
+    /// members. After joining a sizeable cluster, it may take a while until
+    /// foca discovers every active member. Periodically announcing helps
+    /// speed up this process.
+    ///
+    /// This setting is helpful for any cluster size, but smaller ones can
+    /// get by without it if discovering the full active roster quickly is
+    /// not a requirement.
+    ///
+    /// As a rule of thumb, use large values for `frequency` (say, every
+    /// 30s, every minute, etc) and small values for `num_members`: just
+    /// one might be good enough for many clusters.
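+    ///
+    /// For example, a sketch that announces to a single random member
+    /// every 30 seconds (numbers are illustrative; tune them to your
+    /// cluster):
+    ///
+    /// ```ignore
+    /// config.periodic_announce = Some(PeriodicParams {
+    ///     frequency: Duration::from_secs(30),
+    ///     num_members: NonZeroUsize::new(1).unwrap(),
+    /// });
+    /// ```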
+ /// + /// Whilst you can change the parameters at runtime, foca prevents you from + /// changing it from `None` to `Some` to simplify reasoning. It's required + /// to recreate your foca instance in these cases. + /// + /// This feature is an extension to the SWIM protocol and should be left + /// disabled if you're aiming at pure SWIM behavior. + pub periodic_announce: Option, +} + +/// Configuration for a task that should happen periodically +#[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct PeriodicParams { + /// How often should the task be performed + pub frequency: Duration, + /// How many random members should be chosen + pub num_members: NonZeroUsize, } impl Config { @@ -127,6 +160,8 @@ impl Config { max_packet_size: NonZeroUsize::new(1400).unwrap(), notify_down_members: false, + + periodic_announce: None, } } } @@ -163,6 +198,11 @@ impl Config { max_packet_size: NonZeroUsize::new(1400).unwrap(), notify_down_members: true, + + periodic_announce: Some(PeriodicParams { + frequency: Duration::from_secs(30), + num_members: NonZeroUsize::new(1).unwrap(), + }), } } @@ -190,6 +230,11 @@ impl Config { max_packet_size: NonZeroUsize::new(1400).unwrap(), notify_down_members: true, + + periodic_announce: Some(PeriodicParams { + frequency: Duration::from_secs(60), + num_members: NonZeroUsize::new(2).unwrap(), + }), } } diff --git a/src/lib.rs b/src/lib.rs index 70a5665..6bec9c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -616,6 +616,25 @@ where Ok(()) } } + Timer::PeriodicAnnounce(token) => { + if token == self.timer_token && self.connection_state == ConnectionState::Connected + { + // The configuration may change during runtime, so we can't + // assume that this is Some() when the timer fires + if let Some(ref params) = self.config.periodic_announce { + // Re-schedule the event + runtime.submit_after( + Timer::PeriodicAnnounce(self.timer_token), + params.frequency, + ); + // And send the messages + self.choose_and_send(params.num_members.get(), Message::Announce, runtime)?; + } + } + // else: invalid token and/or not-connected: may happen if the + // instance gets declared down by the cluster + Ok(()) + } } } @@ -652,6 +671,7 @@ where pub fn set_config(&mut self, config: Config) -> Result<()> { if self.config.probe_period != config.probe_period || self.config.probe_rtt != config.probe_rtt + || (self.config.periodic_announce.is_none() && config.periodic_announce.is_some()) { Err(Error::InvalidConfig) } else { @@ -1093,6 +1113,10 @@ where self.config.probe_period, ); + if let Some(ref params) = self.config.periodic_announce { + runtime.submit_after(Timer::PeriodicAnnounce(self.timer_token), params.frequency); + } + runtime.notify(Notification::Active); } @@ -1626,6 +1650,20 @@ mod tests { }; } + macro_rules! expect_message { + ($runtime: expr, $member: expr, $message: expr) => { + let d = $runtime + .take_data($member) + .unwrap_or_else(|| panic!("Message to member {:?} not found", $member)); + let (header, _) = decode(d); + assert_eq!( + header.message, $message, + "Message to member {:?} is {:?}. 
Expected {:?}", + $member, header.message, $message + ); + }; + } + #[test] fn can_join_with_another_client() { let mut foca_one = Foca::new(ID::new(1), config(), rng(), codec()); @@ -3288,18 +3326,8 @@ mod tests { ) ); - // The runtime should be instructed to send data to `probed` - let data = runtime - .take_data(probed) - .expect("probed should be sent a message"); - let header = codec().decode_header(&data[..]).expect("valid payload"); - - // And it should be a TurnUndead message - assert_eq!( - header.message, - Message::TurnUndead, - "member should have been sent a TurnUndead message" - ); + // The runtime should be instructed to send a TurnUndead message to `probed` + expect_message!(runtime, probed, Message::::TurnUndead); } #[test] @@ -3333,16 +3361,79 @@ mod tests { // When foca receives such message assert_eq!(Ok(()), foca.handle_data(&msg[..], &mut runtime)); - let data = runtime - .take_data(down_id) - .expect("should dispatch a message to ID=2"); - let h = codec().decode_header(&data[..]).expect("valid payload"); - // It should send a message to ID=2 notifying it + expect_message!(runtime, down_id, Message::::TurnUndead); + } + + #[test] + fn periodic_announce_behaviour() { + let frequency = Duration::from_millis(500); + let num_members = NonZeroUsize::new(2).unwrap(); + let config = { + let mut c = config(); + c.periodic_announce = Some(config::PeriodicParams { + frequency, + num_members, + }); + c + }; + + // A foca instance that periodically announces + let mut foca = Foca::new(ID::new(1), config, rng(), codec()); + let mut runtime = InMemoryRuntime::new(); + + // When it becomes active (i.e.: has at least one active member) + assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime)); + assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(3)), &mut runtime)); + + // Should schedule an event for announcing + expect_scheduling!( + runtime, + Timer::::PeriodicAnnounce(foca.timer_token()), + frequency + ); + + runtime.clear(); + // After the event fires assert_eq!( - h.message, - Message::TurnUndead, - "id=2 should have been sent a TurnUndead message" + Ok(()), + foca.handle_timer(Timer::PeriodicAnnounce(foca.timer_token()), &mut runtime) + ); + + // It should've scheduled the event again + expect_scheduling!( + runtime, + Timer::::PeriodicAnnounce(foca.timer_token()), + frequency ); + + // And sent an announce message to `num_members` random members + // (since num_members=2 and this instance only knows about two, we know + // which should've been picked) + expect_message!(runtime, ID::new(2), Message::::Announce); + expect_message!(runtime, ID::new(3), Message::::Announce); + } + + #[test] + fn periodic_announce_cannot_be_enabled_at_runtime() { + let mut c = config(); + assert!(c.periodic_announce.is_none()); + + // A foca instance that's running without periodic announce + let mut foca = Foca::new(ID::new(1), c.clone(), rng(), codec()); + + c.periodic_announce = Some(config::PeriodicParams { + frequency: Duration::from_secs(5), + num_members: NonZeroUsize::new(1).unwrap(), + }); + + // Must not be able to enable it during runtime + assert_eq!(Err(Error::InvalidConfig), foca.set_config(c.clone())); + + // However, a foca that starts with periodic announce enabled + let mut foca = Foca::new(ID::new(1), c, rng(), codec()); + + // Is able to turn it off + assert_eq!(Ok(()), foca.set_config(config())); } } diff --git a/src/runtime.rs b/src/runtime.rs index e4e6c49..ccd5bbc 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -129,6 +129,10 @@ pub enum Timer { /// 
Forgets about dead member `T`, allowing them to join the
    /// cluster again with the same identity.
    RemoveDown(T),
+
+    /// Sends a [`crate::Message::Announce`] to randomly chosen members as
+    /// specified by [`crate::Config::periodic_announce`]
+    PeriodicAnnounce(TimerToken),
 }
 
 /// TimerToken is simply a bookkeeping mechanism to try and prevent

From 9ca29d03d34434dfefe8269c38d43673a83068a5 Mon Sep 17 00:00:00 2001
From: Caio
Date: Sat, 12 Nov 2022 11:15:27 +0100
Subject: [PATCH 5/6] Ability to periodically gossip

This introduces Config::periodic_gossip, which instructs foca to
disseminate cluster updates to random members at a fixed frequency so
that the cluster learns new information faster.

Periodic announce paved the way, so this was mostly copy-paste :-)
---
 src/config.rs  |  31 ++++++++++++++
 src/lib.rs     | 113 ++++++++++++++++++++++++++++++++++++++----------
 src/runtime.rs |   4 ++
 3 files changed, 123 insertions(+), 25 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index d896ee7..8c04fd7 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -128,6 +128,28 @@ pub struct Config {
     /// This feature is an extension to the SWIM protocol and should be left
     /// disabled if you're aiming at pure SWIM behavior.
     pub periodic_announce: Option,
+
+    /// How often should foca send cluster updates to peers
+    ///
+    /// By default, SWIM disseminates cluster updates during the direct and
+    /// indirect probe cycle (see [`crate::Message`]). This setting instructs
+    /// foca to also propagate updates periodically.
+    ///
+    /// Periodically gossiping influences the speed at which the cluster
+    /// learns new information, but gossiping too much is often unnecessary
+    /// since cluster changes are not (normally) high-rate events.
+    ///
+    /// A LAN cluster can afford high-frequency gossiping (every 200ms, for
+    /// example) without much worry; a WAN cluster might have better results
+    /// gossiping less often (500ms) but to more members at once.
+    ///
+    /// Whilst you can change the parameters at runtime, foca prevents you from
+    /// changing it from `None` to `Some` to simplify reasoning. It's required
+    /// to recreate your foca instance in these cases.
+    ///
+    /// This feature is an extension to the SWIM protocol and should be left
+    /// disabled if you're aiming at pure SWIM behavior.
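+    ///
+    /// For example, a sketch mirroring the LAN defaults below: gossip to
+    /// three random members every 200ms (numbers are illustrative):
+    ///
+    /// ```ignore
+    /// config.periodic_gossip = Some(PeriodicParams {
+    ///     frequency: Duration::from_millis(200),
+    ///     num_members: NonZeroUsize::new(3).unwrap(),
+    /// });
+    /// ```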
+ pub periodic_gossip: Option, } /// Configuration for a task that should happen periodically @@ -162,6 +184,7 @@ impl Config { notify_down_members: false, periodic_announce: None, + periodic_gossip: None, } } } @@ -203,6 +226,10 @@ impl Config { frequency: Duration::from_secs(30), num_members: NonZeroUsize::new(1).unwrap(), }), + periodic_gossip: Some(PeriodicParams { + frequency: Duration::from_millis(200), + num_members: NonZeroUsize::new(3).unwrap(), + }), } } @@ -235,6 +262,10 @@ impl Config { frequency: Duration::from_secs(60), num_members: NonZeroUsize::new(2).unwrap(), }), + periodic_gossip: Some(PeriodicParams { + frequency: Duration::from_millis(500), + num_members: NonZeroUsize::new(4).unwrap(), + }), } } diff --git a/src/lib.rs b/src/lib.rs index 6bec9c2..e0fc3a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -635,6 +635,20 @@ where // instance gets declared down by the cluster Ok(()) } + Timer::PeriodicGossip(token) => { + // Exact same thing as PeriodicAnnounce, just using different settings / messages + if token == self.timer_token && self.connection_state == ConnectionState::Connected + { + if let Some(ref params) = self.config.periodic_gossip { + runtime.submit_after( + Timer::PeriodicGossip(self.timer_token), + params.frequency, + ); + self.choose_and_send(params.num_members.get(), Message::Gossip, runtime)?; + } + } + Ok(()) + } } } @@ -672,6 +686,7 @@ where if self.config.probe_period != config.probe_period || self.config.probe_rtt != config.probe_rtt || (self.config.periodic_announce.is_none() && config.periodic_announce.is_some()) + || (self.config.periodic_gossip.is_none() && config.periodic_gossip.is_some()) { Err(Error::InvalidConfig) } else { @@ -1117,6 +1132,10 @@ where runtime.submit_after(Timer::PeriodicAnnounce(self.timer_token), params.frequency); } + if let Some(ref params) = self.config.periodic_gossip { + runtime.submit_after(Timer::PeriodicGossip(self.timer_token), params.frequency); + } + runtime.notify(Notification::Active); } @@ -3365,20 +3384,27 @@ mod tests { expect_message!(runtime, down_id, Message::::TurnUndead); } - #[test] - fn periodic_announce_behaviour() { + // There are multiple "do this thing periodically" settings. This + // helps test those. 
Takes: + // - something that knows which configuration to set + // - something that knows which event should be sent + // - the message that should be sent + fn check_periodic_behaviour(config_setter: F, mut event_maker: G, message: Message) + where + F: Fn(&mut Config, config::PeriodicParams), + G: FnMut(TimerToken) -> Timer, + { let frequency = Duration::from_millis(500); let num_members = NonZeroUsize::new(2).unwrap(); - let config = { - let mut c = config(); - c.periodic_announce = Some(config::PeriodicParams { - frequency, - num_members, - }); - c + let params = config::PeriodicParams { + frequency, + num_members, }; + let mut config = config(); + + // A foca with the given periodic config + config_setter(&mut config, params); - // A foca instance that periodically announces let mut foca = Foca::new(ID::new(1), config, rng(), codec()); let mut runtime = InMemoryRuntime::new(); @@ -3386,32 +3412,46 @@ mod tests { assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime)); assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(3)), &mut runtime)); - // Should schedule an event for announcing - expect_scheduling!( - runtime, - Timer::::PeriodicAnnounce(foca.timer_token()), - frequency - ); + // Should schedule the given event + expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency); runtime.clear(); // After the event fires assert_eq!( Ok(()), - foca.handle_timer(Timer::PeriodicAnnounce(foca.timer_token()), &mut runtime) + foca.handle_timer(event_maker(foca.timer_token()), &mut runtime) ); // It should've scheduled the event again - expect_scheduling!( - runtime, - Timer::::PeriodicAnnounce(foca.timer_token()), - frequency - ); + expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency); - // And sent an announce message to `num_members` random members + // And sent the message to `num_members` random members // (since num_members=2 and this instance only knows about two, we know // which should've been picked) - expect_message!(runtime, ID::new(2), Message::::Announce); - expect_message!(runtime, ID::new(3), Message::::Announce); + expect_message!(runtime, ID::new(2), message); + expect_message!(runtime, ID::new(3), message); + } + + #[test] + fn periodic_gossip_behaviour() { + check_periodic_behaviour( + |c: &mut Config, p: config::PeriodicParams| { + c.periodic_gossip = Some(p); + }, + |t: TimerToken| -> Timer { Timer::PeriodicGossip(t) }, + Message::Gossip, + ); + } + + #[test] + fn periodic_announce_behaviour() { + check_periodic_behaviour( + |c: &mut Config, p: config::PeriodicParams| { + c.periodic_announce = Some(p); + }, + |t: TimerToken| -> Timer { Timer::PeriodicAnnounce(t) }, + Message::Announce, + ); } #[test] @@ -3436,4 +3476,27 @@ mod tests { // Is able to turn it off assert_eq!(Ok(()), foca.set_config(config())); } + + #[test] + fn periodic_gossip_cannot_be_enabled_at_runtime() { + let mut c = config(); + assert!(c.periodic_gossip.is_none()); + + // A foca instance that's running without periodic gossip + let mut foca = Foca::new(ID::new(1), c.clone(), rng(), codec()); + + c.periodic_gossip = Some(config::PeriodicParams { + frequency: Duration::from_secs(5), + num_members: NonZeroUsize::new(1).unwrap(), + }); + + // Must not be able to enable it during runtime + assert_eq!(Err(Error::InvalidConfig), foca.set_config(c.clone())); + + // However, a foca that starts with periodic gossip enabled + let mut foca = Foca::new(ID::new(1), c, rng(), codec()); + + // Is able to turn it off + assert_eq!(Ok(()), foca.set_config(config())); + } } 
diff --git a/src/runtime.rs b/src/runtime.rs
index ccd5bbc..e767999 100644
--- a/src/runtime.rs
+++ b/src/runtime.rs
@@ -133,6 +133,10 @@ pub enum Timer {
     /// Sends a [`crate::Message::Announce`] to randomly chosen members as
     /// specified by [`crate::Config::periodic_announce`]
     PeriodicAnnounce(TimerToken),
+
+    /// Sends a [`crate::Message::Gossip`] to randomly chosen members as
+    /// specified by [`crate::Config::periodic_gossip`]
+    PeriodicGossip(TimerToken),
 }
 
 /// TimerToken is simply a bookkeeping mechanism to try and prevent

From 332bbf42fc3db5c9b50dde2ec81051fee262e640 Mon Sep 17 00:00:00 2001
From: Caio
Date: Sun, 13 Nov 2022 10:35:45 +0100
Subject: [PATCH 6/6] Periodic gossip: only send when there are updates

Technically, a gossip with no updates is still useful: it tells the
receiver that the sender is active, so it helps with discovery and, now,
with the member gaining awareness that the cluster thinks it's dead.

It is, however, too chatty. During normal operations the updates buffer
is mostly empty; if we start sending messages with no updates too often,
it won't be much different from a traditional gossip protocol with a
heartbeat.

So, since pings and announces already cover the case of talking to the
cluster unconditionally, it makes sense to make periodic gossiping
"smarter".
---
 src/broadcast.rs | 11 ++++-------
 src/lib.rs       | 10 +++++++++-
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/broadcast.rs b/src/broadcast.rs
index effd092..1a4f04f 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -207,6 +207,10 @@ where
             .windows(2)
             .all(|w| w[0].remaining_tx <= w[1].remaining_tx)
     }
+
+    pub fn is_empty(&self) -> bool {
+        self.storage.is_empty()
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -242,13 +246,6 @@ impl> PartialOrd for Entry {
     }
 }
 
-#[cfg(test)]
-impl Broadcasts {
-    pub fn is_empty(&self) -> bool {
-        self.storage.is_empty()
-    }
-}
-
 #[cfg(test)]
 mod tests {

diff --git a/src/lib.rs b/src/lib.rs
index e0fc3a3..c9170c4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -644,7 +644,15 @@ where
                         Timer::PeriodicGossip(self.timer_token),
                         params.frequency,
                     );
-                    self.choose_and_send(params.num_members.get(), Message::Gossip, runtime)?;
+
+                    // Only actually gossip if there are updates to send
+                    if !self.updates.is_empty() || !self.custom_broadcasts.is_empty() {
+                        self.choose_and_send(
+                            params.num_members.get(),
+                            Message::Gossip,
+                            runtime,
+                        )?;
+                    }
                 }
             }
             Ok(())
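
Taken together, the series adds three opt-in extensions. A rough sketch
of how a user might enable them all (field values are illustrative,
borrowed from the new LAN defaults; it assumes `PeriodicParams` is
exported alongside `Config`, which in this series is defined in
src/config.rs):

    use std::num::NonZeroUsize;
    use std::time::Duration;

    use foca::{Config, PeriodicParams};

    fn configured() -> Config {
        // Start from the extension-free baseline
        let mut config = Config::simple();

        // Patch 2: tell down members their traffic is being discarded
        config.notify_down_members = true;

        // Patch 4: ask one random peer for more peers every 30 seconds
        config.periodic_announce = Some(PeriodicParams {
            frequency: Duration::from_secs(30),
            num_members: NonZeroUsize::new(1).unwrap(),
        });

        // Patches 5 and 6: push cluster updates to three random peers
        // every 200ms, skipped automatically when there's nothing new
        config.periodic_gossip = Some(PeriodicParams {
            frequency: Duration::from_millis(200),
            num_members: NonZeroUsize::new(3).unwrap(),
        });

        config
    }

Since `set_config` rejects flipping the periodic settings from `None` to
`Some`, this choice has to be made before creating the Foca instance;
after that, the runtime only needs to feed the new
`Timer::PeriodicAnnounce`/`Timer::PeriodicGossip` events back through
`handle_timer` like any other scheduled timer.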