diff --git a/src/broadcast.rs b/src/broadcast.rs
index effd092..1a4f04f 100644
--- a/src/broadcast.rs
+++ b/src/broadcast.rs
@@ -207,6 +207,10 @@ where
             .windows(2)
             .all(|w| w[0].remaining_tx <= w[1].remaining_tx)
     }
+
+    pub fn is_empty(&self) -> bool {
+        self.storage.is_empty()
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -242,13 +246,6 @@ impl> PartialOrd for Entry {
     }
 }
 
-#[cfg(test)]
-impl Broadcasts {
-    pub fn is_empty(&self) -> bool {
-        self.storage.is_empty()
-    }
-}
-
 #[cfg(test)]
 mod tests {
diff --git a/src/config.rs b/src/config.rs
index 68e514d..8c04fd7 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -94,10 +94,80 @@ pub struct Config {
     /// to a value smaller than your network's MTU. 1400 is a good
     /// value for a in a non-ancient network.
     pub max_packet_size: NonZeroUsize,
+
+    /// Whether foca should try to let members that are down know about it
+    ///
+    /// Whenever a member is declared down by the cluster, their messages
+    /// get ignored and there's no way for them to learn that this is happening.
+    /// With this setting enabled, foca will try to notify the down member that
+    /// their messages are being discarded.
+    ///
+    /// This feature is an extension to the SWIM protocol and should be left
+    /// disabled if you're aiming at pure SWIM behavior.
+    pub notify_down_members: bool,
+
+    /// How often should foca ask its peers for more peers
+    ///
+    /// [`crate::Message::Announce`] is the mechanism foca uses to discover
+    /// members. After joining a sizeable cluster, it may take a while until
+    /// foca discovers every active member. Periodically announcing helps speed
+    /// up this process.
+    ///
+    /// This setting is helpful for any cluster size, but smaller ones can
+    /// get by without it if discovering the full active roster quickly is
+    /// not a requirement.
+    ///
+    /// As a rule of thumb, use large values for `frequency` (say, every
+    /// 30s, every minute, etc) and small values for `num_members`: just
+    /// one might be good enough for many clusters.
+    ///
+    /// Whilst you can change the parameters at runtime, foca prevents you
+    /// from changing this setting from `None` to `Some` to simplify
+    /// reasoning. You'll need to recreate your foca instance in that case.
+    ///
+    /// This feature is an extension to the SWIM protocol and should be left
+    /// disabled if you're aiming at pure SWIM behavior.
+    pub periodic_announce: Option<PeriodicParams>,
+
+    /// How often should foca send cluster updates to peers
+    ///
+    /// By default, SWIM disseminates cluster updates during the direct and
+    /// indirect probe cycle (See [`crate::Message`]). This setting instructs
+    /// foca to also propagate updates periodically.
+    ///
+    /// Periodic gossiping influences the speed at which the cluster learns
+    /// new information, but gossiping too much is often unnecessary since
+    /// cluster changes are not (normally) high-rate events.
+    ///
+    /// A LAN cluster can afford high-frequency gossiping (every 200ms, for
+    /// example) without much worry; a WAN cluster might have better results
+    /// gossiping less often (500ms) but to more members at once.
+    ///
+    /// Whilst you can change the parameters at runtime, foca prevents you
+    /// from changing this setting from `None` to `Some` to simplify
+    /// reasoning. You'll need to recreate your foca instance in that case.
+    ///
+    /// This feature is an extension to the SWIM protocol and should be left
+    /// disabled if you're aiming at pure SWIM behavior.
+    pub periodic_gossip: Option<PeriodicParams>,
+}
+
+/// Configuration for a task that should happen periodically
+#[derive(Clone, Debug)]
+#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
+pub struct PeriodicParams {
+    /// How often should the task be performed
+    pub frequency: Duration,
+    /// How many random members should be chosen
+    pub num_members: NonZeroUsize,
 }
 
 impl Config {
     /// A simple configuration that should work well in a LAN scenario.
+    ///
+    /// This is Foca in its simplest form and has no extensions enabled.
+    /// Use this config if you are trying to get a grasp of how SWIM
+    /// works, without any additional behavior.
     pub fn simple() -> Self {
         Self {
             probe_period: Duration::from_millis(1500),
@@ -110,6 +180,11 @@ impl Config {
             remove_down_after: Duration::from_secs(15),
 
             max_packet_size: NonZeroUsize::new(1400).unwrap(),
+
+            notify_down_members: false,
+
+            periodic_announce: None,
+            periodic_gossip: None,
         }
     }
 }
@@ -144,6 +219,17 @@ impl Config {
             remove_down_after: Duration::from_secs(15),
 
             max_packet_size: NonZeroUsize::new(1400).unwrap(),
+
+            notify_down_members: true,
+
+            periodic_announce: Some(PeriodicParams {
+                frequency: Duration::from_secs(30),
+                num_members: NonZeroUsize::new(1).unwrap(),
+            }),
+            periodic_gossip: Some(PeriodicParams {
+                frequency: Duration::from_millis(200),
+                num_members: NonZeroUsize::new(3).unwrap(),
+            }),
         }
     }
 
@@ -169,6 +255,17 @@ impl Config {
             remove_down_after: Duration::from_secs(15),
 
             max_packet_size: NonZeroUsize::new(1400).unwrap(),
+
+            notify_down_members: true,
+
+            periodic_announce: Some(PeriodicParams {
+                frequency: Duration::from_secs(60),
+                num_members: NonZeroUsize::new(2).unwrap(),
+            }),
+            periodic_gossip: Some(PeriodicParams {
+                frequency: Duration::from_millis(500),
+                num_members: NonZeroUsize::new(4).unwrap(),
+            }),
         }
     }
diff --git a/src/lib.rs b/src/lib.rs
index 2529343..c9170c4 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -351,17 +351,31 @@ where
     /// to attempt reducing the time it takes for information to
     /// propagate thoroughly.
#[cfg_attr(feature = "tracing", tracing::instrument(skip(runtime)))] - pub fn gossip(&mut self, mut runtime: impl Runtime) -> Result<()> { + pub fn gossip(&mut self, runtime: impl Runtime) -> Result<()> { + self.choose_and_send( + self.config.num_indirect_probes.get(), + Message::Gossip, + runtime, + ) + } + + // Pick `num_members` random active members and send `msg` to them + fn choose_and_send( + &mut self, + num_members: usize, + msg: Message, + mut runtime: impl Runtime, + ) -> Result<()> { self.member_buf.clear(); self.members.choose_active_members( - self.config.num_indirect_probes.get(), + num_members, &mut self.member_buf, &mut self.rng, |_| true, ); while let Some(chosen) = self.member_buf.pop() { - self.send_message(chosen.into_identity(), Message::Gossip, &mut runtime)?; + self.send_message(chosen.into_identity(), msg.clone(), &mut runtime)?; } Ok(()) @@ -543,7 +557,7 @@ where token, } => { if self.timer_token == token { - let as_down = Member::new(member_id, incarnation, State::Down); + let as_down = Member::new(member_id.clone(), incarnation, State::Down); if let Some(summary) = self .members // Down is terminal, so before doing that we ensure the member @@ -556,7 +570,14 @@ where { self.handle_apply_summary(&summary, as_down, &mut runtime)?; // Member went down we might need to adjust our internal state - self.adjust_connection_state(runtime); + self.adjust_connection_state(&mut runtime); + + if self.config.notify_down_members { + // As a courtesy, we send a lightweight message to the member + // we're declaring down so that if it manages to receive it, + // it can react accordingly + self.send_message(member_id, Message::TurnUndead, runtime)?; + } } else { #[cfg(feature = "tracing")] tracing::debug!("Member not found"); @@ -595,6 +616,47 @@ where Ok(()) } } + Timer::PeriodicAnnounce(token) => { + if token == self.timer_token && self.connection_state == ConnectionState::Connected + { + // The configuration may change during runtime, so we can't + // assume that this is Some() when the timer fires + if let Some(ref params) = self.config.periodic_announce { + // Re-schedule the event + runtime.submit_after( + Timer::PeriodicAnnounce(self.timer_token), + params.frequency, + ); + // And send the messages + self.choose_and_send(params.num_members.get(), Message::Announce, runtime)?; + } + } + // else: invalid token and/or not-connected: may happen if the + // instance gets declared down by the cluster + Ok(()) + } + Timer::PeriodicGossip(token) => { + // Exact same thing as PeriodicAnnounce, just using different settings / messages + if token == self.timer_token && self.connection_state == ConnectionState::Connected + { + if let Some(ref params) = self.config.periodic_gossip { + runtime.submit_after( + Timer::PeriodicGossip(self.timer_token), + params.frequency, + ); + + // Only actually gossip if there are updates to send + if !self.updates.is_empty() || !self.custom_broadcasts.is_empty() { + self.choose_and_send( + params.num_members.get(), + Message::Gossip, + runtime, + )?; + } + } + } + Ok(()) + } } } @@ -631,6 +693,8 @@ where pub fn set_config(&mut self, config: Config) -> Result<()> { if self.config.probe_period != config.probe_period || self.config.probe_rtt != config.probe_rtt + || (self.config.periodic_announce.is_none() && config.periodic_announce.is_some()) + || (self.config.periodic_gossip.is_none() && config.periodic_gossip.is_some()) { Err(Error::InvalidConfig) } else { @@ -726,6 +790,10 @@ where #[cfg(feature = "tracing")] tracing::debug!("Discarded: Inactive 
sender"); + if self.config.notify_down_members { + self.send_message(src, Message::TurnUndead, runtime)?; + } + return Ok(()); } @@ -840,6 +908,9 @@ where } } Message::Announce => self.send_message(src, Message::Feed, runtime)?, + Message::TurnUndead => { + self.handle_self_update(Incarnation::default(), State::Down, runtime)? + } // Nothing to do. These messages do not expect any reply Message::Gossip | Message::Feed | Message::Broadcast => {} }; @@ -1065,6 +1136,14 @@ where self.config.probe_period, ); + if let Some(ref params) = self.config.periodic_announce { + runtime.submit_after(Timer::PeriodicAnnounce(self.timer_token), params.frequency); + } + + if let Some(ref params) = self.config.periodic_gossip { + runtime.submit_after(Timer::PeriodicGossip(self.timer_token), params.frequency); + } + runtime.notify(Notification::Active); } @@ -1105,8 +1184,8 @@ where .map_err(Error::Encode)?; let (needs_piggyback, only_active_members) = match header.message { - // Announce packets contain nothing but the header - Message::Announce => (false, false), + // Announce/TurnUndead packets contain nothing but the header + Message::Announce | Message::TurnUndead => (false, false), // Feed packets stuff active members at the tail Message::Feed => (true, true), // Broadcast packets stuffs only custom broadcasts @@ -1152,6 +1231,7 @@ where // Seek back and write the correct number of items added buf.get_mut()[tally_position..].as_mut().put_u16(num_items); #[cfg(feature = "tracing")] + #[allow(clippy::needless_borrow)] span.record("num_updates", &num_items); } @@ -1168,11 +1248,13 @@ where #[cfg_attr(not(feature = "tracing"), allow(unused_variables))] let num_broadcasts = self.custom_broadcasts.fill(&mut buf, usize::MAX); #[cfg(feature = "tracing")] + #[allow(clippy::needless_borrow)] span.record("num_broadcasts", &num_broadcasts); } let data = buf.into_inner(); #[cfg(feature = "tracing")] + #[allow(clippy::needless_borrow)] span.record("len", &data.len()); #[cfg(feature = "tracing")] @@ -1595,6 +1677,20 @@ mod tests { }; } + macro_rules! expect_message { + ($runtime: expr, $member: expr, $message: expr) => { + let d = $runtime + .take_data($member) + .unwrap_or_else(|| panic!("Message to member {:?} not found", $member)); + let (header, _) = decode(d); + assert_eq!( + header.message, $message, + "Message to member {:?} is {:?}. 
Expected {:?}", + $member, header.message, $message + ); + }; + } + #[test] fn can_join_with_another_client() { let mut foca_one = Foca::new(ID::new(1), config(), rng(), codec()); @@ -2334,12 +2430,13 @@ mod tests { // to continue the probe cycle fn craft_probing_foca( num_members: u8, + config: Config, ) -> ( Foca, ID, Timer, ) { - let mut foca = Foca::new(ID::new(1), config(), rng(), codec()); + let mut foca = Foca::new(ID::new(1), config.clone(), rng(), codec()); let mut runtime = InMemoryRuntime::new(); assert!(num_members > 0); @@ -2352,7 +2449,7 @@ mod tests { // The runtime shoud've been instructed to schedule a // probe for later on let expected_timer = Timer::ProbeRandomMember(foca.timer_token()); - expect_scheduling!(runtime, expected_timer.clone(), config().probe_period); + expect_scheduling!(runtime, expected_timer.clone(), config.probe_period); // We'll trigger it right now instead runtime.clear(); @@ -2376,7 +2473,7 @@ mod tests { probed_id: probed, token: foca.timer_token(), }; - expect_scheduling!(runtime, send_indirect_probe.clone(), config().probe_rtt); + expect_scheduling!(runtime, send_indirect_probe.clone(), config.probe_rtt); (foca, probed, send_indirect_probe) } @@ -2388,7 +2485,7 @@ mod tests { // are no more active members in the cluster (thus going Idle) // A foca is probing - let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2); + let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config()); let mut runtime = InMemoryRuntime::new(); // Clippy gets it wrong here: can't use just the plain iterator @@ -2411,7 +2508,7 @@ mod tests { #[test] fn probe_ping_ack_cycle() { - let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5); + let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config()); let mut runtime = InMemoryRuntime::new(); // Now if probed replies before the timer fires, the probe @@ -2438,7 +2535,7 @@ mod tests { #[test] fn probe_cycle_requires_correct_probe_number() { - let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5); + let (mut foca, probed, send_indirect_probe) = craft_probing_foca(5, config()); let mut runtime = InMemoryRuntime::new(); let incorrect_probe_number = foca.probe().probe_number() + 1; @@ -2475,7 +2572,7 @@ mod tests { // `num_indirect_probes + 1` so that we can verify that // we don't send more requests than the configured value. let (mut foca, probed, send_indirect_probe) = - craft_probing_foca((num_indirect_probes + 2) as u8); + craft_probing_foca((num_indirect_probes + 2) as u8, config()); let mut runtime = InMemoryRuntime::new(); // `probed` did NOT reply with an Ack before the timer @@ -3206,7 +3303,7 @@ mod tests { fn can_recover_from_incomplete_probe_cycle() { // Here we get a foca in the middle of a probe cycle. The correct // sequencing should submit `_send_indirect_probe` - let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2); + let (mut foca, _probed, _send_indirect_probe) = craft_probing_foca(2, config()); let mut runtime = InMemoryRuntime::new(); // ... 
         // to trigger the validation failure
@@ -3226,4 +3323,188 @@
             "didn't submit a new probe event"
         );
     }
+
+    #[test]
+    fn declaring_a_member_as_down_notifies_them() {
+        let config = {
+            let mut c = Config::simple();
+            c.notify_down_members = true;
+            c
+        };
+
+        let (mut foca, probed, send_indirect_probe) = craft_probing_foca(2, config);
+        let mut runtime = InMemoryRuntime::new();
+
+        // `probed` did NOT reply with an Ack before the timer
+        assert_eq!(Ok(()), foca.handle_timer(send_indirect_probe, &mut runtime));
+        // ... and nothing happens for the indirect cycle
+
+        runtime.clear();
+        // So by the time the ChangeSuspectToDown timer fires
+        assert_eq!(
+            Ok(()),
+            foca.handle_timer(
+                Timer::ChangeSuspectToDown {
+                    member_id: probed,
+                    incarnation: Incarnation::default(),
+                    token: foca.timer_token()
+                },
+                &mut runtime
+            )
+        );
+
+        // The runtime should be instructed to send a TurnUndead message to `probed`
+        expect_message!(runtime, probed, Message::<ID>::TurnUndead);
+    }
+
+    #[test]
+    fn message_from_down_member_is_replied_with_turn_undead() {
+        let config = {
+            let mut c = config();
+            c.notify_down_members = true;
+            c
+        };
+        let mut runtime = InMemoryRuntime::new();
+
+        // We have a simple foca instance
+        let mut foca = Foca::new(ID::new(1), config, rng(), codec());
+        let down_id = ID::new(2);
+        // That knows that ID=2 is down
+        assert_eq!(Ok(()), foca.apply(Member::down(down_id), &mut runtime));
+
+        // And we have a message from member ID=2 to ID=1
+        let header = Header {
+            src: down_id,
+            src_incarnation: 1,
+            dst: ID::new(1),
+            message: Message::Announce,
+        };
+
+        let mut msg = Vec::new();
+        codec()
+            .encode_header(&header, &mut msg)
+            .expect("codec works fine");
+
+        // When foca receives such message
+        assert_eq!(Ok(()), foca.handle_data(&msg[..], &mut runtime));
+
+        // It should send a message to ID=2 notifying it
+        expect_message!(runtime, down_id, Message::<ID>::TurnUndead);
+    }
+
+    // There are multiple "do this thing periodically" settings. This
+    // helps test those. Takes:
+    // - something that knows which configuration to set
+    // - something that knows which event should be sent
+    // - the message that should be sent
+    fn check_periodic_behaviour<F, G>(config_setter: F, mut event_maker: G, message: Message)
+    where
+        F: Fn(&mut Config, config::PeriodicParams),
+        G: FnMut(TimerToken) -> Timer,
+    {
+        let frequency = Duration::from_millis(500);
+        let num_members = NonZeroUsize::new(2).unwrap();
+        let params = config::PeriodicParams {
+            frequency,
+            num_members,
+        };
+        let mut config = config();
+
+        // A foca with the given periodic config
+        config_setter(&mut config, params);
+
+        let mut foca = Foca::new(ID::new(1), config, rng(), codec());
+        let mut runtime = InMemoryRuntime::new();
+
+        // When it becomes active (i.e.: has at least one active member)
+        assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(2)), &mut runtime));
+        assert_eq!(Ok(()), foca.apply(Member::alive(ID::new(3)), &mut runtime));
+
+        // Should schedule the given event
+        expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency);
+
+        runtime.clear();
+        // After the event fires
+        assert_eq!(
+            Ok(()),
+            foca.handle_timer(event_maker(foca.timer_token()), &mut runtime)
+        );
+
+        // It should've scheduled the event again
+        expect_scheduling!(runtime, event_maker(foca.timer_token()), frequency);
+
+        // And sent the message to `num_members` random members
+        // (since num_members=2 and this instance only knows about two, we know
+        // which should've been picked)
+        expect_message!(runtime, ID::new(2), message);
+        expect_message!(runtime, ID::new(3), message);
+    }
+
+    #[test]
+    fn periodic_gossip_behaviour() {
+        check_periodic_behaviour(
+            |c: &mut Config, p: config::PeriodicParams| {
+                c.periodic_gossip = Some(p);
+            },
+            |t: TimerToken| -> Timer { Timer::PeriodicGossip(t) },
+            Message::Gossip,
+        );
+    }
+
+    #[test]
+    fn periodic_announce_behaviour() {
+        check_periodic_behaviour(
+            |c: &mut Config, p: config::PeriodicParams| {
+                c.periodic_announce = Some(p);
+            },
+            |t: TimerToken| -> Timer { Timer::PeriodicAnnounce(t) },
+            Message::Announce,
+        );
+    }
+
+    #[test]
+    fn periodic_announce_cannot_be_enabled_at_runtime() {
+        let mut c = config();
+        assert!(c.periodic_announce.is_none());
+
+        // A foca instance that's running without periodic announce
+        let mut foca = Foca::new(ID::new(1), c.clone(), rng(), codec());
+
+        c.periodic_announce = Some(config::PeriodicParams {
+            frequency: Duration::from_secs(5),
+            num_members: NonZeroUsize::new(1).unwrap(),
+        });
+
+        // Must not be able to enable it during runtime
+        assert_eq!(Err(Error::InvalidConfig), foca.set_config(c.clone()));
+
+        // However, a foca that starts with periodic announce enabled
+        let mut foca = Foca::new(ID::new(1), c, rng(), codec());
+
+        // Is able to turn it off
+        assert_eq!(Ok(()), foca.set_config(config()));
+    }
+
+    #[test]
+    fn periodic_gossip_cannot_be_enabled_at_runtime() {
+        let mut c = config();
+        assert!(c.periodic_gossip.is_none());
+
+        // A foca instance that's running without periodic gossip
+        let mut foca = Foca::new(ID::new(1), c.clone(), rng(), codec());
+
+        c.periodic_gossip = Some(config::PeriodicParams {
+            frequency: Duration::from_secs(5),
+            num_members: NonZeroUsize::new(1).unwrap(),
+        });
+
+        // Must not be able to enable it during runtime
+        assert_eq!(Err(Error::InvalidConfig), foca.set_config(c.clone()));
+
+        // However, a foca that starts with periodic gossip enabled
+        let mut foca = Foca::new(ID::new(1), c, rng(), codec());
+
+        // Is able to turn it off
+        assert_eq!(Ok(()), foca.set_config(config()));
+    }
 }
diff --git a/src/payload.rs b/src/payload.rs
index a4f4539..afa87ab 100644
--- a/src/payload.rs
+++ b/src/payload.rs
@@ -134,6 +134,12 @@ pub enum Message {
     /// Deliberate dissemination of custom broadcasts. Broadcast
     /// messages do not contain cluster updates.
     Broadcast,
+
+    /// Indicates that the receiver is considered down by the sender
+    ///
+    /// This is an optional message that Foca sends whenever a member
+    /// that's considered down sends a message.
+    TurnUndead,
 }
 
 /// ProbeNumber is simply a bookkeeping mechanism to try and prevent
diff --git a/src/runtime.rs b/src/runtime.rs
index e4e6c49..e767999 100644
--- a/src/runtime.rs
+++ b/src/runtime.rs
@@ -129,6 +129,14 @@ pub enum Timer {
     /// Forgets about dead member `T`, allowing them to join the
     /// cluster again with the same identity.
     RemoveDown(T),
+
+    /// Sends a [`crate::Message::Announce`] to randomly chosen members as
+    /// specified by [`crate::Config::periodic_announce`]
+    PeriodicAnnounce(TimerToken),
+
+    /// Sends a [`crate::Message::Gossip`] to randomly chosen members as
+    /// specified by [`crate::Config::periodic_gossip`]
+    PeriodicGossip(TimerToken),
 }
 
 /// TimerToken is simply a bookkeeping mechanism to try and prevent
diff --git a/src/testing.rs b/src/testing.rs
index 092cbba..ae0e501 100644
--- a/src/testing.rs
+++ b/src/testing.rs
@@ -185,6 +185,7 @@ impl BadCodec {
             8 => Message::Announce,
             9 => Message::Feed,
             10 => Message::Broadcast,
+            11 => Message::TurnUndead,
             other => return Err(BadCodecError::BadMessageID(other)),
         };
 
@@ -252,6 +253,9 @@ impl BadCodec {
             Message::Broadcast => {
                 buf.put_u8(10);
             }
+            Message::TurnUndead => {
+                buf.put_u8(11);
+            }
         }
 
         Ok(())
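
For downstream users, the new knobs are plain `Config` fields, so enabling them is a matter of building the config before constructing the instance. Below is a minimal sketch of what that might look like on the application side; the field names, `PeriodicParams`, `Config::simple()` and `Foca::new` come from this diff, while the crate-root import path for `PeriodicParams` and the concrete frequencies/counts are assumptions for illustration (within this crate's own tests it is referenced as `config::PeriodicParams`):

    use std::num::NonZeroUsize;
    use std::time::Duration;

    use foca::{Config, PeriodicParams};

    fn my_config() -> Config {
        // Start from the plain-SWIM baseline and opt into the extensions
        let mut config = Config::simple();

        // Reply with TurnUndead when a member we consider down talks to us
        config.notify_down_members = true;

        // Ask one random peer for more peers every minute
        config.periodic_announce = Some(PeriodicParams {
            frequency: Duration::from_secs(60),
            num_members: NonZeroUsize::new(1).unwrap(),
        });

        // Push pending updates to two random peers every 500ms
        config.periodic_gossip = Some(PeriodicParams {
            frequency: Duration::from_millis(500),
            num_members: NonZeroUsize::new(2).unwrap(),
        });

        config
    }

Since `set_config` now rejects flipping either periodic setting from `None` to `Some`, a config like this has to be the one handed to `Foca::new` at startup; turning the periodic tasks off later, or adjusting their parameters, remains allowed.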