diff --git a/.expeditor/verify.pipeline.yml b/.expeditor/verify.pipeline.yml index 838c99e34d..41fcb20ed2 100644 --- a/.expeditor/verify.pipeline.yml +++ b/.expeditor/verify.pipeline.yml @@ -657,7 +657,7 @@ steps: shell: [ "powershell", "-Command" ] always-pull: true propagate-environment: true - timeout_in_minutes: 5 + timeout_in_minutes: 20 retry: automatic: limit: 1 diff --git a/components/butterfly/src/main.rs b/components/butterfly/src/main.rs index cec4d128f5..517626b29f 100644 --- a/components/butterfly/src/main.rs +++ b/components/butterfly/src/main.rs @@ -52,10 +52,10 @@ fn main() { member.address = format!("{}", addr.ip()); member.swim_port = addr.port(); member.gossip_port = addr.port(); - server.member_list.add_initial_member(member); + server.member_list.add_initial_member_imlw(member); } - server.start(server::timing::Timing::default()) + server.start_mlr(&server::timing::Timing::default()) .expect("Cannot start server"); loop { println!("{:#?}", server.member_list); diff --git a/components/butterfly/src/member.rs b/components/butterfly/src/member.rs index fb1a04a2bb..a91303d9a2 100644 --- a/components/butterfly/src/member.rs +++ b/components/butterfly/src/member.rs @@ -330,6 +330,9 @@ pub struct MemberList { } impl Serialize for MemberList { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error> where S: Serializer { @@ -364,10 +367,16 @@ impl MemberList { update_counter: AtomicUsize::new(0), } } + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. fn read_entries(&self) -> ReadGuard<'_, HashMap<String, member_list::Entry>> { self.entries.read() } + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. fn write_entries(&self) -> WriteGuard<'_, HashMap<String, member_list::Entry>> { self.entries.write() } @@ -382,15 +391,29 @@ impl MemberList { pub fn get_update_counter(&self) -> usize { self.update_counter.load(Ordering::Relaxed) } - pub fn len_initial_members(&self) -> usize { self.initial_members_read().len() } + /// # Locking + /// * `MemberList::initial_members` (read) This method must not be called while any + /// MemberList::initial_members lock is held. + pub fn len_initial_members_imlr(&self) -> usize { self.initial_members_read().len() } - pub fn add_initial_member(&self, member: Member) { self.initial_members_write().push(member); } + /// # Locking + /// * `MemberList::initial_members` (write) This method must not be called while any + /// MemberList::initial_members lock is held. + pub fn add_initial_member_imlw(&self, member: Member) { + self.initial_members_write().push(member); + } - pub fn set_initial_members(&self, members: Vec<Member>) { + /// # Locking + /// * `MemberList::initial_members` (write) This method must not be called while any + /// MemberList::initial_members lock is held. + pub fn set_initial_members_imlw(&self, members: Vec<Member>) { *self.initial_members_write() = members; }
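
The renames above follow a naming convention this patch introduces: a `_mlr`/`_mlw` suffix means the method takes the `MemberList::entries` lock for reading/writing, and `_imlr`/`_imlw` means the same for the `initial_members` lock. A minimal sketch of the convention, using `std::sync::RwLock` in place of habitat's own lock wrappers (the types and bodies here are illustrative only, not code from this patch):

```rust
use std::sync::RwLock;

struct MemberList {
    entries: RwLock<Vec<String>>,         // stands in for the real entry map ("ml")
    initial_members: RwLock<Vec<String>>, // seed peers ("iml")
}

impl MemberList {
    /// `_mlr`: takes `MemberList::entries` for reading.
    fn len_mlr(&self) -> usize { self.entries.read().unwrap().len() }

    /// `_imlw`: takes `MemberList::initial_members` for writing.
    fn add_initial_member_imlw(&self, id: String) {
        self.initial_members.write().unwrap().push(id);
    }
}

fn main() {
    let ml = MemberList { entries: RwLock::new(vec![]),
                          initial_members: RwLock::new(vec![]) };
    ml.add_initial_member_imlw("peer-1".to_string());
    assert_eq!(ml.len_mlr(), 0); // only the initial-members list was touched
}
```
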
- pub fn with_initial_members(&self, mut with_closure: impl FnMut(&Member)) { + /// # Locking + /// * `MemberList::initial_members` (read) This method must not be called while any + /// MemberList::initial_members lock is held. + pub fn with_initial_members_imlr(&self, with_closure: impl Fn(&Member)) { for member in self.initial_members_read().iter() { with_closure(member); } @@ -459,13 +482,20 @@ impl MemberList { /// | Suspect | | | propagate | propagate | /// | Confirmed | | | | propagate | /// | Departed | | | | | + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. // TODO (CM): why don't we just insert a membership record here? - pub fn insert(&self, incoming_member: Member, incoming_health: Health) -> bool { - self.insert_membership(Membership { member: incoming_member, - health: incoming_health, }) + pub fn insert_mlw(&self, incoming_member: Member, incoming_health: Health) -> bool { + self.insert_membership_mlw(Membership { member: incoming_member, + health: incoming_health, }) } - fn insert_membership(&self, incoming: Membership) -> bool { + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. + fn insert_membership_mlw(&self, incoming: Membership) -> bool { // Is this clone necessary, or can a key be a reference to a field contained in the value? // Maybe the members we store should not contain the ID to reduce the duplication? let modified = match self.write_entries().entry(incoming.member.id.clone()) { @@ -490,13 +520,16 @@ impl MemberList { if modified { self.increment_update_counter(); - self.calculate_peer_health_metrics(); + self.calculate_peer_health_metrics_mlr(); } modified } - pub fn set_departed(&self, member_id: &str) { + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. + pub fn set_departed_mlw(&self, member_id: &str) { if let Some(member_list::Entry { member, health, .. }) = self.write_entries().get_mut(member_id) { @@ -510,7 +543,10 @@ impl MemberList { } } - fn calculate_peer_health_metrics(&self) { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + fn calculate_peer_health_metrics_mlr(&self) { let mut health_counts: HashMap<Health, i64> = HashMap::new(); for entry in self.read_entries().values() { @@ -528,10 +564,20 @@ impl MemberList { } /// Returns the health of the member, if the member exists. - pub fn health_of(&self, member: &Member) -> Option<Health> { self.health_of_by_id(&member.id) } + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn health_of_mlr(&self, member: &Member) -> Option<Health> { + self.health_of_by_id_mlr(&member.id) + }
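
The propagation table above, and `insert_membership_mlw` with it, reduce to two rules: a higher incarnation number always supersedes the stored record, and at an equal incarnation only strictly "worse" health propagates. A standalone sketch of that decision (the `Health` ordering here is assumed from the table; this is not the patch's own code):

```rust
use std::cmp::Ordering;

// "Worse" health is greater in this ordering, matching the table's rows.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum Health { Alive, Suspect, Confirmed, Departed }

/// Should the incoming (incarnation, health) pair replace the stored one?
fn supersedes(current: (u64, Health), incoming: (u64, Health)) -> bool {
    match incoming.0.cmp(&current.0) {
        Ordering::Greater => true,                 // newer incarnation always wins
        Ordering::Equal => incoming.1 > current.1, // same incarnation: only worse health
        Ordering::Less => false,                   // stale rumor: always a no-op
    }
}

fn main() {
    assert!(supersedes((1, Health::Alive), (1, Health::Suspect)));
    assert!(!supersedes((1, Health::Confirmed), (1, Health::Suspect)));
    assert!(!supersedes((2, Health::Confirmed), (1, Health::Departed)));
}
```

The insert tests later in this file exercise exactly these three cases (higher, equal, and lower incarnation).
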
/// Returns the health of the member, if the member exists. - pub fn health_of_by_id(&self, member_id: &str) -> Option<Health> { + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn health_of_by_id_mlr(&self, member_id: &str) -> Option<Health> { self.read_entries() .get(member_id) .map(|member_list::Entry { health, .. }| *health) @@ -539,13 +585,17 @@ impl MemberList { /// Returns the health of the member, blocking for a limited timeout /// - /// Errors: + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + /// + /// # Errors: /// * `Error::Timeout` if the health data can't be accessed within `timeout` /// * `Error::UnknownMember` if the member does not exist - pub fn health_of_by_id_with_timeout(&self, - member_id: &str, - timeout: std::time::Duration) - -> Result<Health> { + pub fn health_of_by_id_with_timeout_mlr(&self, + member_id: &str, + timeout: std::time::Duration) + -> Result<Health> { let entries = self.entries.try_read_for(timeout); if entries.is_none() { @@ -561,11 +611,15 @@ impl MemberList { /// Returns true if the member is alive, suspect, or persistent; used during the target /// selection phase of the outbound thread. - pub fn pingable(&self, member: &Member) -> bool { + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn pingable_mlr(&self, member: &Member) -> bool { if member.persistent { return true; } - match self.health_of(member) { + match self.health_of_mlr(member) { Some(Health::Alive) | Some(Health::Suspect) => true, _ => false, } @@ -573,12 +627,20 @@ impl MemberList { /// Returns true if we are pinging this member because they are persistent, but we think they /// are gone. - pub fn persistent_and_confirmed(&self, member: &Member) -> bool { - member.persistent && self.health_of(member) == Some(Health::Confirmed) + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn persistent_and_confirmed_mlr(&self, member: &Member) -> bool { + member.persistent && self.health_of_mlr(member) == Some(Health::Confirmed) } /// Returns a protobuf membership record for the given member id. - pub fn membership_for(&self, member_id: &str) -> Option<Membership> { + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn membership_for_mlr(&self, member_id: &str) -> Option<Membership> { self.read_entries() .get(member_id) .map(|member_list::Entry { member, health, .. }| { @@ -588,12 +650,23 @@ impl MemberList { } /// Returns the number of entries. - pub fn len(&self) -> usize { self.read_entries().len() } + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn len_mlr(&self) -> usize { self.read_entries().len() } - pub fn is_empty(&self) -> bool { self.read_entries().is_empty() } + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn is_empty_mlr(&self) -> bool { self.read_entries().is_empty() } /// A randomized list of members to check. - pub fn check_list(&self, exclude_id: &str) -> Vec<Member> { + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn check_list_mlr(&self, exclude_id: &str) -> Vec<Member> { let mut members: Vec<_> = self.read_entries() .values() .map(|member_list::Entry { member, .. }| member) @@ -605,10 +678,15 @@ impl MemberList { } /// Takes a function whose first argument is a member, and calls it for every pingreq target. + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held.
Additionally `with_closure` is called with this lock held, so the closure + /// must not call any functions which take this lock. + pub fn with_pingreq_targets_mlr(&self, + sending_member_id: &str, + target_member_id: &str, + mut with_closure: impl FnMut(&Member)) { for member_list::Entry { member, .. } in self.read_entries() .values() @@ -625,29 +703,26 @@ impl MemberList { /// If an owned `Member` is required, use this. If a shared reference is /// good enough, use `with_member`. - pub fn get_cloned(&self, member_id: &str) -> Option<Member> { + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn get_cloned_mlr(&self, member_id: &str) -> Option<Member> { self.read_entries() .get(member_id) .map(|member_list::Entry { member, .. }| member.clone()) } - /// Calls a function whose argument is a reference to a membership entry matching the given ID. - pub fn with_member(&self, member_id: &str, mut with_closure: impl FnMut(Option<&Member>)) { - with_closure(self.read_entries() - .get(member_id) - .map(|member_list::Entry { member, .. }| member)); - } - - /// Iterates over the member list, calling the function for each member. - pub fn with_members(&self, mut with_closure: impl FnMut(&Member)) { - for member_list::Entry { member, .. } in self.read_entries().values() { - with_closure(member); - } - } - - pub fn with_memberships<T: Default>(&self, - mut with_closure: impl FnMut(Membership) -> Result<T>) - -> Result<T> { + /// Iterates over the memberships list, calling the function for each membership. + /// This could return Result instead, but there's only the one caller now. + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. Additionally `with_closure` is called with this lock held, so the closure + /// must not call any functions which take this lock. + pub fn with_memberships_mlr<T: Default>(&self, + mut with_closure: impl FnMut(Membership) -> Result<T>) + -> Result<T> { let mut ok = Ok(T::default()); for membership in self.read_entries() .values() @@ -665,15 +740,23 @@ impl MemberList { /// have now expired to Confirmed. Health is updated /// appropriately, and a list of newly-Confirmed Member IDs is /// returned. - pub fn members_expired_to_confirmed(&self, timeout: Duration) -> Vec<String> { - self.members_expired_to(Health::Confirmed, timeout) + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. + pub fn members_expired_to_confirmed_mlw(&self, timeout: Duration) -> Vec<String> { + self.members_expired_to_mlw(Health::Confirmed, timeout) } /// Query the list of aging Confirmed members to find those which /// have now expired to Departed. Health is updated appropriately, /// and a list of newly-Departed Member IDs is returned. - pub fn members_expired_to_departed(&self, timeout: Duration) -> Vec<String> { - self.members_expired_to(Health::Departed, timeout) + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. + pub fn members_expired_to_departed_mlw(&self, timeout: Duration) -> Vec<String> { + self.members_expired_to_mlw(Health::Departed, timeout) }
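
Note the caveat that `with_pingreq_targets_mlr` and `with_memberships_mlr` carry in their docs: the closure runs while the `entries` read lock is held, so it must not re-enter the `MemberList` API. A contrived sketch of the hazard and the safe alternative (hypothetical types; habitat's locks, like `parking_lot`'s, are not reentrant):

```rust
use std::sync::RwLock;

struct List { entries: RwLock<Vec<String>> }

impl List {
    /// The closure runs while the read guard is live, so it must not
    /// call anything that takes `entries` again.
    fn with_members_mlr(&self, mut with_closure: impl FnMut(&str)) {
        let guard = self.entries.read().unwrap();
        for member in guard.iter() { with_closure(member); }
    }

    fn set_departed_mlw(&self, id: &str) {
        self.entries.write().unwrap().retain(|m| m != id);
    }
}

fn main() {
    let list = List { entries: RwLock::new(vec!["a".to_string(), "b".to_string()]) };

    // WRONG: re-entering the API inside the closure tries to take the write
    // lock while the read lock is held and deadlocks with a non-reentrant lock:
    // list.with_members_mlr(|m| list.set_departed_mlw(m));

    // RIGHT: copy out what you need, let the guard drop, then mutate.
    let mut departed = Vec::new();
    list.with_members_mlr(|m| departed.push(m.to_string()));
    for id in &departed { list.set_departed_mlw(id); }
    assert_eq!(list.entries.read().unwrap().len(), 0);
}
```
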
/// Return the member IDs of all members that have "timed out" to @@ -687,8 +770,12 @@ impl MemberList { /// `Confirmed` for longer than the given `timeout`. /// /// The newly-updated health status is recorded properly. + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. // TODO (CM): Better return type than Vec - fn members_expired_to(&self, expiring_to: Health, timeout: Duration) -> Vec<String> { + fn members_expired_to_mlw(&self, expiring_to: Health, timeout: Duration) -> Vec<String> { let now = SteadyTime::now(); let precursor_health = match expiring_to { Health::Confirmed => Health::Suspect, @@ -720,7 +807,10 @@ impl MemberList { expired } - pub fn contains_member(&self, member_id: &str) -> bool { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn contains_member_mlr(&self, member_id: &str) -> bool { self.read_entries().contains_key(member_id) } } @@ -733,6 +823,9 @@ impl<'a> MemberListProxy<'a> { } impl<'a> Serialize for MemberListProxy<'a> { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error> where S: Serializer { @@ -779,6 +872,9 @@ mod tests { // This is a remnant of when the MemberList::members entries were // simple Member structs. The tests that use this should be replaced, // but until then, this keeps them working. + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any + /// MemberList::entries lock is held. fn with_member_iter<T>(&self, mut with_closure: impl FnMut(hash_map::Values<'_, String, Member>) -> T) @@ -837,7 +933,7 @@ mod tests { let ml = MemberList::new(); for _x in 0..size { let m = Member::default(); - ml.insert(m, Health::Alive); + ml.insert_mlw(m, Health::Alive); } ml } @@ -845,33 +941,43 @@ mod tests { #[test] fn new() { let ml = MemberList::new(); - assert!(ml.is_empty()); + assert!(ml.is_empty_mlr()); } #[test] fn insert_several_members() { let ml = populated_member_list(4); - assert_eq!(ml.len(), 4); + assert_eq!(ml.len_mlr(), 4); } #[test] fn check_list() { let ml = populated_member_list(1000); - let list_a = ml.check_list("foo"); - let list_b = ml.check_list("foo"); + let list_a = ml.check_list_mlr("foo"); + let list_b = ml.check_list_mlr("foo"); assert!(list_a != list_b); } #[test] fn health_of() { let ml = populated_member_list(1); - ml.with_memberships(|Membership { health, .. }| { + ml.with_memberships_mlr(|Membership { health, .. }| { assert_eq!(health, Health::Alive); Ok(()) }) .ok(); } + #[test] + fn health_of_with_memberships() { + let ml = populated_member_list(1); + ml.with_memberships_mlr(|Membership { health, ..
}| { + assert_eq!(health, Health::Alive); + Ok(0) + }) + .ok(); + } + #[test] fn pingreq_targets() { let ml = populated_member_list(10); @@ -879,7 +985,7 @@ mod tests { let from = i.nth(0).unwrap(); let target = i.nth(1).unwrap(); let mut counter: usize = 0; - ml.with_pingreq_targets(&from.id, &target.id, |_m| counter += 1); + ml.with_pingreq_targets_mlr(&from.id, &target.id, |_m| counter += 1); assert_eq!(counter, PINGREQ_TARGETS); }); } @@ -891,7 +997,7 @@ mod tests { let from = i.nth(0).unwrap(); let target = i.nth(1).unwrap(); let mut excluded_appears: bool = false; - ml.with_pingreq_targets(&from.id, &target.id, |m| { + ml.with_pingreq_targets_mlr(&from.id, &target.id, |m| { if m.id == from.id { excluded_appears = true } @@ -907,7 +1013,7 @@ mod tests { let from = i.nth(0).unwrap(); let target = i.nth(1).unwrap(); let mut excluded_appears: bool = false; - ml.with_pingreq_targets(&from.id, &target.id, |m| { + ml.with_pingreq_targets_mlr(&from.id, &target.id, |m| { if m.id == target.id { excluded_appears = true } @@ -923,7 +1029,7 @@ mod tests { let from = i.nth(0).unwrap(); let target = i.nth(1).unwrap(); let mut counter: isize = 0; - ml.with_pingreq_targets(&from.id, &target.id, |_m| counter += 1); + ml.with_pingreq_targets_mlr(&from.id, &target.id, |_m| counter += 1); assert_eq!(counter, 1); }); } @@ -933,8 +1039,8 @@ mod tests { let ml = MemberList::new(); let member = Member::default(); let mcheck = member.clone(); - assert_eq!(ml.insert(member, Health::Alive), true); - assert_eq!(ml.health_of(&mcheck), Some(Health::Alive)); + assert_eq!(ml.insert_mlw(member, Health::Alive), true); + assert_eq!(ml.health_of_mlr(&mcheck), Some(Health::Alive)); } /// Tests of MemberList::insert @@ -957,7 +1063,7 @@ mod tests { m }; - assert!(ml.insert(member.clone(), from_health), + assert!(ml.insert_mlw(member.clone(), from_health), "Could not insert member into list initially"); let update_counter_value_checkpoint_1 = ml.get_update_counter(); @@ -965,7 +1071,7 @@ mod tests { initial_update_counter_value + 1, "Update counter should have incremented by one"); - assert_eq!(ml.health_of(&member), + assert_eq!(ml.health_of_mlr(&member), Some(from_health), "Member should have had health {:?}, but didn't", from_health); @@ -976,7 +1082,7 @@ mod tests { m }; - assert!(!ml.insert(member_with_lower_incarnation, to_health), + assert!(!ml.insert_mlw(member_with_lower_incarnation, to_health), "Inserting with {:?}->{:?} should be a no-op with a lower incarnation \ number", from_health, @@ -985,7 +1091,7 @@ mod tests { update_counter_value_checkpoint_1, "Update counter should not have been incremented after trying to \ insert a lower-incarnation-number rumor"); - assert_eq!(ml.health_of(&member), + assert_eq!(ml.health_of_mlr(&member), Some(from_health), "Member should still have health {:?} following attempt to \ insert lower-incarnation-number rumor, but didn't", @@ -1036,7 +1142,7 @@ mod tests { m }; - assert!(ml.insert(member.clone(), from_health), + assert!(ml.insert_mlw(member.clone(), from_health), "Could not insert member into list initially"); let update_counter_value_checkpoint_1 = ml.get_update_counter(); @@ -1044,7 +1150,7 @@ mod tests { initial_update_counter_value + 1, "Update counter should have incremented by one"); - assert_eq!(ml.health_of(&member), + assert_eq!(ml.health_of_mlr(&member), Some(from_health), "Member should have had health {:?}, but didn't", from_health); @@ -1055,7 +1161,7 @@ mod tests { m }; - assert!(ml.insert(member_with_higher_incarnation, to_health), +
assert!(ml.insert_mlw(member_with_higher_incarnation, to_health), "Inserting with {:?}->{:?} should always work with a higher \ incarnation number", from_health, @@ -1064,7 +1170,7 @@ mod tests { update_counter_value_checkpoint_1 + 1, "Update counter should increment by 1 when inserting a \ higher-incarnation-number rumor"); - assert_eq!(ml.health_of(&member), + assert_eq!(ml.health_of_mlr(&member), Some(to_health), "Member should have health {:?} following insertion of \ higher-incarnation-number rumor", @@ -1115,7 +1221,7 @@ mod tests { m }; - assert!(ml.insert(member.clone(), from_health), + assert!(ml.insert_mlw(member.clone(), from_health), "Could not insert member into list initially"); let update_counter_value_checkpoint_1 = ml.get_update_counter(); @@ -1123,7 +1229,7 @@ mod tests { initial_update_counter_value + 1, "Update counter should have incremented by one"); - assert_eq!(ml.health_of(&member), + assert_eq!(ml.health_of_mlr(&member), Some(from_health), "Member should have had health {:?}, but didn't", from_health); @@ -1131,7 +1237,7 @@ mod tests { let member_with_same_incarnation = member.clone(); if to_health > from_health { - assert!(ml.insert(member_with_same_incarnation, to_health), + assert!(ml.insert_mlw(member_with_same_incarnation, to_health), "Inserting with {:?}->{:?} should work with an identical incarnation \ number", from_health, @@ -1140,13 +1246,13 @@ mod tests { update_counter_value_checkpoint_1 + 1, "Update counter should increment by 1 when inserting a \ same-incarnation-number rumor with worse health"); - assert_eq!(ml.health_of(&member), + assert_eq!(ml.health_of_mlr(&member), Some(to_health), "Member should have health {:?} following insertion of \ same-incarnation-number rumor with worse health", to_health); } else { - assert!(!ml.insert(member_with_same_incarnation, to_health), + assert!(!ml.insert_mlw(member_with_same_incarnation, to_health), "Inserting with {from:?}->{to:?} should never work with an identical \ incarnation number, because {to:?} is not \"worse than\" {from:?}", from = from_health, @@ -1155,7 +1261,7 @@ mod tests { update_counter_value_checkpoint_1, "Update counter should not increment when inserting a \ same-incarnation-number rumor without worse health"); - assert_eq!(ml.health_of(&member), + assert_eq!(ml.health_of_mlr(&member), Some(from_health), "Member should still have health {:?} following insertion of \ same-incarnation-number rumor without worse health", @@ -1202,10 +1308,10 @@ mod tests { let ml = MemberList::new(); let member_one = Member::default(); - assert!(ml.insert(member_one.clone(), from_health), + assert!(ml.insert_mlw(member_one.clone(), from_health), "Should be able to insert initial health of {:?} into empty MemberList", from_health); - assert_eq!(ml.health_of(&member_one) + assert_eq!(ml.health_of_mlr(&member_one) .expect("Expected member to exist in health after initial insert, \ but it didn't"), from_health, @@ -1215,7 +1321,7 @@ mod tests { let update_counter_before = ml.get_update_counter(); if from_health == to_health { - assert!(!ml.insert(member_one.clone(), to_health), + assert!(!ml.insert_mlw(member_one.clone(), to_health), "Transitioning from {:?} to {:?} (i.e., no change) should be a no-op", from_health, to_health); @@ -1225,14 +1331,14 @@ mod tests { increment update counter", from_health, to_health); - assert_eq!(ml.health_of(&member_one) + assert_eq!(ml.health_of_mlr(&member_one) .expect("Expected member to exist in health after update, but \ it didn't"), from_health, "Member should still
have initial health {:?}", from_health); } else if to_health > from_health { - assert!(ml.insert(member_one.clone(), to_health), + assert!(ml.insert_mlw(member_one.clone(), to_health), "Transitioning from {:?} to {:?} (i.e., worse health) should NOT be \ a no-op", from_health, @@ -1243,7 +1349,7 @@ mod tests { increment update counter by one", from_health, to_health); - assert_eq!(ml.health_of(&member_one) + assert_eq!(ml.health_of_mlr(&member_one) .expect("Expected member to exist in health after update, but \ it didn't"), to_health, @@ -1251,7 +1357,7 @@ mod tests { from_health, to_health); } else { - assert!(!ml.insert(member_one.clone(), to_health), + assert!(!ml.insert_mlw(member_one.clone(), to_health), "Transitioning from {:?} to {:?} (i.e., no worse health) should be a \ no-op", from_health, @@ -1262,7 +1368,7 @@ mod tests { not increment update counter", from_health, to_health); - assert_eq!(ml.health_of(&member_one) + assert_eq!(ml.health_of_mlr(&member_one) .expect("Expected member to exist in health after update, but \ it didn't"), from_health, @@ -1303,8 +1409,8 @@ mod tests { /// Testing of /// - /// - MemberList::members_expired_to_confirmed - /// - MemberList::members_expired_to_departed + /// - MemberList::members_expired_to_confirmed_mlw + /// - MemberList::members_expired_to_departed_mlw mod timed_expiration { use crate::member::{Health, Member, @@ -1326,28 +1432,31 @@ mod tests { let large_timeout = Duration::from_std(StdDuration::from_secs(large_seconds)).unwrap(); - assert!(ml.members_expired_to_confirmed(small_timeout).is_empty(), + assert!(ml.members_expired_to_confirmed_mlw(small_timeout) + .is_empty(), "An empty MemberList shouldn't have anything that's timing out to being \ Confirmed"); - assert!(ml.insert(member_one.clone(), Health::Alive)); + assert!(ml.insert_mlw(member_one.clone(), Health::Alive)); - assert!(ml.members_expired_to_confirmed(small_timeout).is_empty(), + assert!(ml.members_expired_to_confirmed_mlw(small_timeout) + .is_empty(), "Should be no newly Confirmed members when they're all Alive"); - assert!(ml.insert(member_one.clone(), Health::Suspect)); + assert!(ml.insert_mlw(member_one.clone(), Health::Suspect)); - assert!(ml.members_expired_to_confirmed(large_timeout).is_empty(), + assert!(ml.members_expired_to_confirmed_mlw(large_timeout) + .is_empty(), "Nothing should have timed out to Confirmed with a large timeout"); // Allow the Suspect to age thread::sleep(StdDuration::from_secs(small_seconds)); - let newly_confirmed = ml.members_expired_to_confirmed(small_timeout); + let newly_confirmed = ml.members_expired_to_confirmed_mlw(small_timeout); assert!(newly_confirmed.contains(&member_one.id), "Member should be newly Confirmed after timing out"); - assert_eq!(ml.health_of(&member_one), + assert_eq!(ml.health_of_mlr(&member_one), Some(Health::Confirmed), "Member should have a health of Confirmed after timing out"); } @@ -1365,34 +1474,34 @@ mod tests { let large_timeout = Duration::from_std(StdDuration::from_secs(large_seconds)).unwrap(); - assert!(ml.members_expired_to_departed(small_timeout).is_empty(), + assert!(ml.members_expired_to_departed_mlw(small_timeout).is_empty(), "An empty MemberList shouldn't have anything that's timing out to being \ Departed"); - assert!(ml.insert(member_one.clone(), Health::Alive)); - assert!(ml.members_expired_to_departed(small_timeout).is_empty(), + assert!(ml.insert_mlw(member_one.clone(), Health::Alive)); + assert!(ml.members_expired_to_departed_mlw(small_timeout).is_empty(), "Should be no newly Departed 
members when they're all Alive"); - assert!(ml.insert(member_one.clone(), Health::Suspect)); - assert!(ml.members_expired_to_departed(small_timeout).is_empty(), + assert!(ml.insert_mlw(member_one.clone(), Health::Suspect)); + assert!(ml.members_expired_to_departed_mlw(small_timeout).is_empty(), "Should be no newly Departed members when they're all Confirmed"); - assert!(ml.insert(member_one.clone(), Health::Confirmed)); + assert!(ml.insert_mlw(member_one.clone(), Health::Confirmed)); - assert!(ml.members_expired_to_departed(small_timeout).is_empty(), + assert!(ml.members_expired_to_departed_mlw(small_timeout).is_empty(), "Should be no newly Departed members when they're all Confirmed"); - assert!(ml.members_expired_to_departed(large_timeout).is_empty(), + assert!(ml.members_expired_to_departed_mlw(large_timeout).is_empty(), "Nothing should have timed out to Departed with a large timeout"); // Allow the Confirmed to age thread::sleep(StdDuration::from_secs(small_seconds)); - let newly_departed = ml.members_expired_to_departed(small_timeout); + let newly_departed = ml.members_expired_to_departed_mlw(small_timeout); assert!(newly_departed.contains(&member_one.id), "Member should be newly Departed after timing out"); - assert_eq!(ml.health_of(&member_one), + assert_eq!(ml.health_of_mlr(&member_one), Some(Health::Departed), "Member should have a health of Departed after timing out"); } @@ -1404,15 +1513,15 @@ mod tests { let member_2 = Member::default(); let member_3 = Member::default(); - assert!(ml.insert(member_1.clone(), Health::Suspect)); + assert!(ml.insert_mlw(member_1.clone(), Health::Suspect)); thread::sleep(StdDuration::from_secs(1)); - assert!(ml.insert(member_2.clone(), Health::Suspect)); + assert!(ml.insert_mlw(member_2.clone(), Health::Suspect)); thread::sleep(StdDuration::from_secs(2)); // Give us a bit of padding - assert!(ml.insert(member_3.clone(), Health::Suspect)); + assert!(ml.insert_mlw(member_3.clone(), Health::Suspect)); let timeout = Duration::from_std(StdDuration::from_secs(2)).unwrap(); - let newly_confirmed = ml.members_expired_to_confirmed(timeout); + let newly_confirmed = ml.members_expired_to_confirmed_mlw(timeout); assert!(newly_confirmed.contains(&member_1.id), "Member 1 should be newly Confirmed after timing out"); assert!(newly_confirmed.contains(&member_2.id), @@ -1420,13 +1529,13 @@ mod tests { assert!(!newly_confirmed.contains(&member_3.id), "Member 3 should NOT be newly Confirmed, because it hasn't timed out yet"); - assert_eq!(ml.health_of(&member_1), + assert_eq!(ml.health_of_mlr(&member_1), Some(Health::Confirmed), "Member 1 should have a health of Confirmed after timing out"); - assert_eq!(ml.health_of(&member_2), + assert_eq!(ml.health_of_mlr(&member_2), Some(Health::Confirmed), "Member 2 should have a health of Confirmed after timing out"); - assert_eq!(ml.health_of(&member_3), + assert_eq!(ml.health_of_mlr(&member_3), Some(Health::Suspect), "Member 3 should still have a health of Suspect, because it hasn't \ timed out yet"); @@ -1439,15 +1548,15 @@ mod tests { let member_2 = Member::default(); let member_3 = Member::default(); - assert!(ml.insert(member_1.clone(), Health::Confirmed)); + assert!(ml.insert_mlw(member_1.clone(), Health::Confirmed)); thread::sleep(StdDuration::from_secs(1)); - assert!(ml.insert(member_2.clone(), Health::Confirmed)); + assert!(ml.insert_mlw(member_2.clone(), Health::Confirmed)); thread::sleep(StdDuration::from_secs(2)); // Give us a bit of padding - assert!(ml.insert(member_3.clone(), Health::Confirmed)); + 
assert!(ml.insert_mlw(member_3.clone(), Health::Confirmed)); let timeout = Duration::from_std(StdDuration::from_secs(2)).unwrap(); - let newly_departed = ml.members_expired_to_departed(timeout); + let newly_departed = ml.members_expired_to_departed_mlw(timeout); assert!(newly_departed.contains(&member_1.id), "Member 1 should be newly Departed after timing out"); assert!(newly_departed.contains(&member_2.id), @@ -1455,13 +1564,13 @@ mod tests { assert!(!newly_departed.contains(&member_3.id), "Member 3 should NOT be newly Departed, because it hasn't timed out yet"); - assert_eq!(ml.health_of(&member_1), + assert_eq!(ml.health_of_mlr(&member_1), Some(Health::Departed), "Member 1 should have a health of Departed after timing out"); - assert_eq!(ml.health_of(&member_2), + assert_eq!(ml.health_of_mlr(&member_2), Some(Health::Departed), "Member 2 should have a health of Departed after timing out"); - assert_eq!(ml.health_of(&member_3), + assert_eq!(ml.health_of_mlr(&member_3), Some(Health::Confirmed), "Member 3 should still have a health of Confirmed, because it hasn't \ timed out yet"); diff --git a/components/butterfly/src/rumor/dat_file.rs b/components/butterfly/src/rumor/dat_file.rs index 6d18964796..b3ac8fbb39 100644 --- a/components/butterfly/src/rumor/dat_file.rs +++ b/components/butterfly/src/rumor/dat_file.rs @@ -57,7 +57,10 @@ impl DatFile { pub fn path(&self) -> &Path { &self.path } - pub fn read_into(&mut self, server: &Server) -> Result<()> { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn read_into_mlr(&mut self, server: &Server) -> Result<()> { let mut version = [0; 1]; let mut size_buf = [0; 8]; // JW: Resizing this buffer is terrible for performance, but it's the easiest way to @@ -164,7 +167,7 @@ impl DatFile { reader.read_exact(&mut rumor_buf) .map_err(|err| Error::DatFileIO(self.path.clone(), err))?; let rumor = Election::from_bytes(&rumor_buf)?; - server.insert_election(rumor); + server.insert_election_mlr(rumor); bytes_read += size_buf.len() as u64 + rumor_size; } @@ -182,7 +185,7 @@ impl DatFile { reader.read_exact(&mut rumor_buf) .map_err(|err| Error::DatFileIO(self.path.clone(), err))?; let rumor = ElectionUpdate::from_bytes(&rumor_buf)?; - server.insert_update_election(rumor); + server.insert_update_election_mlr(rumor); bytes_read += size_buf.len() as u64 + rumor_size; } @@ -209,6 +212,9 @@ impl DatFile { Ok(()) } + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. pub fn write(&self, server: &Server) -> Result { let mut header = Header::default(); let w = @@ -216,7 +222,7 @@ impl DatFile { w.with_writer(|mut f| { let mut writer = BufWriter::new(&mut f); self.init(&mut writer)?; - header.member_len = self.write_member_list(&mut writer, &server.member_list)?; + header.member_len = self.write_member_list_mlr(&mut writer, &server.member_list)?; header.service_len = self.write_rumor_store(&mut writer, &server.service_store)?; header.service_config_len = self.write_rumor_store(&mut writer, &server.service_config_store)?; @@ -281,11 +287,15 @@ impl DatFile { Ok(total) } - fn write_member_list(&self, writer: &mut W, member_list: &MemberList) -> Result - where W: Write - { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. 
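
The rewritten signature below also moves from a named `W: Write` type parameter to `&mut impl Write`. The two forms are equivalent at the call site and in codegen; `impl Trait` in argument position only drops the name (and turbofish syntax). A quick comparison:

```rust
use std::io::Write;

// A named type parameter...
fn write_header_generic<W: Write>(writer: &mut W) -> std::io::Result<()> {
    writer.write_all(b"HEADER")
}

// ...and `impl Trait` in argument position compile to the same thing.
fn write_header_impl(writer: &mut impl Write) -> std::io::Result<()> {
    writer.write_all(b"HEADER")
}

fn main() -> std::io::Result<()> {
    let mut buf: Vec<u8> = Vec::new();
    write_header_generic(&mut buf)?;
    write_header_impl(&mut buf)?;
    assert_eq!(buf, b"HEADERHEADER".to_vec());
    Ok(())
}
```
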
+ fn write_member_list_mlr(&self, + writer: &mut impl Write, + member_list: &MemberList) + -> Result { let mut total = 0; - member_list.with_memberships(|membership| { + member_list.with_memberships_mlr(|membership| { total += self.write_member(writer, &membership)?; Ok(total) }) diff --git a/components/butterfly/src/server/expire.rs b/components/butterfly/src/server/expire.rs index daae8811e5..9dbc7360d4 100644 --- a/components/butterfly/src/server/expire.rs +++ b/components/butterfly/src/server/expire.rs @@ -12,42 +12,35 @@ use crate::{rumor::{RumorKey, const LOOP_DELAY_MS: u64 = 500; -pub struct Expire { - pub server: Server, - pub timing: Timing, +pub fn spawn_thread(name: String, server: Server, timing: Timing) -> std::io::Result<()> { + thread::Builder::new().name(name) + .spawn(move || run_loop(&server, &timing)) + .map(|_| ()) } -impl Expire { - pub fn new(server: Server, timing: Timing) -> Expire { Expire { server, timing } } - - pub fn run(&self) { - loop { - habitat_common::sync::mark_thread_alive(); - - let newly_confirmed_members = - self.server - .member_list - .members_expired_to_confirmed(self.timing.suspicion_timeout_duration()); - - for id in newly_confirmed_members { - self.server - .rumor_heat - .start_hot_rumor(RumorKey::new(RumorType::Member, &id, "")); - } - - let newly_departed_members = - self.server - .member_list - .members_expired_to_departed(self.timing.departure_timeout_duration()); - - for id in newly_departed_members { - self.server.rumor_heat.purge(&id); - self.server - .rumor_heat - .start_hot_rumor(RumorKey::new(RumorType::Member, &id, "")); - } - - thread::sleep(Duration::from_millis(LOOP_DELAY_MS)); +fn run_loop(server: &Server, timing: &Timing) -> ! { + loop { + habitat_common::sync::mark_thread_alive(); + + let newly_confirmed_members = + server.member_list + .members_expired_to_confirmed_mlw(timing.suspicion_timeout_duration()); + + for id in newly_confirmed_members { + server.rumor_heat + .start_hot_rumor(RumorKey::new(RumorType::Member, &id, "")); + } + + let newly_departed_members = + server.member_list + .members_expired_to_departed_mlw(timing.departure_timeout_duration()); + + for id in newly_departed_members { + server.rumor_heat.purge(&id); + server.rumor_heat + .start_hot_rumor(RumorKey::new(RumorType::Member, &id, "")); } + + thread::sleep(Duration::from_millis(LOOP_DELAY_MS)); } } diff --git a/components/butterfly/src/server/inbound.rs b/components/butterfly/src/server/inbound.rs index 8163998697..4ebd8ed9f1 100644 --- a/components/butterfly/src/server/inbound.rs +++ b/components/butterfly/src/server/inbound.rs @@ -33,198 +33,205 @@ lazy_static! { &["type", "mode"]).unwrap(); } -/// Takes the Server and a channel to send received Acks to the outbound thread. -pub struct Inbound { - pub server: Server, - pub socket: UdpSocket, - pub tx_outbound: AckSender, +pub fn spawn_thread(name: String, + server: Server, + socket: UdpSocket, + tx_outbound: AckSender) + -> std::io::Result<()> { + thread::Builder::new().name(name) + .spawn(move || run_loop(&server, &socket, &tx_outbound)) + .map(|_| ()) } -impl Inbound { - /// Create a new Inbound. - pub fn new(server: Server, socket: UdpSocket, tx_outbound: AckSender) -> Inbound { - Inbound { server, - socket, - tx_outbound } - } - - /// Run the thread. Listens for messages up to 1k in size, and then processes them accordingly. - pub fn run(&self) { - let mut recv_buffer: Vec = vec![0; 1024]; +/// Run the thread. Listens for messages up to 1k in size, and then processes them accordingly. 
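
The `Expire` struct above is replaced by a free `spawn_thread` function returning `std::io::Result<()>`, so a failure to spawn now surfaces to the caller instead of being dropped with `let _ =`; the same shape is applied to `Inbound` here. Reduced to its essentials, the pattern looks like this (names and loop body are illustrative):

```rust
use std::{thread, time::Duration};

fn run_loop() -> ! {
    loop {
        // mark_thread_alive(), expire members, etc. would go here
        thread::sleep(Duration::from_millis(500));
    }
}

/// Spawn a named worker thread. `.map(|_| ())` drops the JoinHandle but keeps
/// the io::Result, so callers can propagate spawn failures with `?`.
fn spawn_thread(name: String) -> std::io::Result<()> {
    thread::Builder::new().name(name)
                          .spawn(|| run_loop())
                          .map(|_| ())
}

fn main() -> std::io::Result<()> {
    spawn_thread("expire-demo".to_string())?;
    thread::sleep(Duration::from_millis(100)); // let the worker run briefly
    Ok(())
}
```
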
+/// Takes the Server and a channel to send received Acks to the outbound thread. +/// +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held. +pub fn run_loop(server: &Server, socket: &UdpSocket, tx_outbound: &AckSender) -> ! { + let mut recv_buffer: Vec = vec![0; 1024]; + + loop { + habitat_common::sync::mark_thread_alive(); + + if server.paused() { + thread::sleep(Duration::from_millis(100)); + continue; + } - loop { - habitat_common::sync::mark_thread_alive(); + match socket.recv_from(&mut recv_buffer[..]) { + Ok((length, addr)) => { + let swim_payload = match server.unwrap_wire(&recv_buffer[0..length]) { + Ok(swim_payload) => swim_payload, + Err(e) => { + // NOTE: In the future, we might want to block people who send us + // garbage all the time. + error!("Error unwrapping protocol message, {}", e); + let label_values = &["unwrap_wire", "failure"]; + SWIM_BYTES_RECEIVED.with_label_values(label_values) + .set(length.to_i64()); + SWIM_MESSAGES_RECEIVED.with_label_values(label_values).inc(); + continue; + } + }; - if self.server.paused() { - thread::sleep(Duration::from_millis(100)); - continue; - } + let bytes_received = swim_payload.len(); + let msg = match Swim::decode(&swim_payload) { + Ok(msg) => msg, + Err(e) => { + // NOTE: In the future, we might want to block people who send us + // garbage all the time. + error!("Error decoding protocol message, {}", e); + let label_values = &["undecodable", "failure"]; + SWIM_BYTES_RECEIVED.with_label_values(label_values) + .set(bytes_received.to_i64()); + SWIM_MESSAGES_RECEIVED.with_label_values(label_values).inc(); + continue; + } + }; - match self.socket.recv_from(&mut recv_buffer[..]) { - Ok((length, addr)) => { - let swim_payload = match self.server.unwrap_wire(&recv_buffer[0..length]) { - Ok(swim_payload) => swim_payload, - Err(e) => { - // NOTE: In the future, we might want to block people who send us - // garbage all the time. - error!("Error unwrapping protocol message, {}", e); - let label_values = &["unwrap_wire", "failure"]; - SWIM_BYTES_RECEIVED.with_label_values(label_values) - .set(length.to_i64()); - SWIM_MESSAGES_RECEIVED.with_label_values(label_values).inc(); + // Setting a label_values variable here throws errors about moving borrowed + // content that I couldn't solve w/o clones. Leaving this for now. I'm sure + // there's a better way. + SWIM_BYTES_RECEIVED.with_label_values(&[msg.kind.as_str(), "success"]) + .set(bytes_received.to_i64()); + SWIM_MESSAGES_RECEIVED.with_label_values(&[msg.kind.as_str(), "success"]) + .inc(); + + trace!("SWIM Message: {:?}", msg); + match msg.kind { + SwimKind::Ping(ping) => { + if server.is_member_blocked(&ping.from.id) { + debug!("Not processing message from {} - it is blocked", + ping.from.id); continue; } - }; - - let bytes_received = swim_payload.len(); - let msg = match Swim::decode(&swim_payload) { - Ok(msg) => msg, - Err(e) => { - // NOTE: In the future, we might want to block people who send us - // garbage all the time. 
- error!("Error decoding protocol message, {}", e); - let label_values = &["undecodable", "failure"]; - SWIM_BYTES_RECEIVED.with_label_values(label_values) - .set(bytes_received.to_i64()); - SWIM_MESSAGES_RECEIVED.with_label_values(label_values).inc(); + process_ping(server, socket, addr, ping); + } + SwimKind::Ack(ack) => { + if server.is_member_blocked(&ack.from.id) && ack.forward_to.is_none() { + debug!("Not processing message from {} - it is blocked", + ack.from.id); continue; } - }; - - // Setting a label_values variable here throws errors about moving borrowed - // content that I couldn't solve w/o clones. Leaving this for now. I'm sure - // there's a better way. - SWIM_BYTES_RECEIVED.with_label_values(&[msg.kind.as_str(), "success"]) - .set(bytes_received.to_i64()); - SWIM_MESSAGES_RECEIVED.with_label_values(&[msg.kind.as_str(), "success"]) - .inc(); - - trace!("SWIM Message: {:?}", msg); - match msg.kind { - SwimKind::Ping(ping) => { - if self.server.is_member_blocked(&ping.from.id) { - debug!("Not processing message from {} - it is blocked", - ping.from.id); - continue; - } - self.process_ping(addr, ping); - } - SwimKind::Ack(ack) => { - if self.server.is_member_blocked(&ack.from.id) - && ack.forward_to.is_none() - { - debug!("Not processing message from {} - it is blocked", - ack.from.id); - continue; - } - self.process_ack(addr, ack); - } - SwimKind::PingReq(pingreq) => { - if self.server.is_member_blocked(&pingreq.from.id) { - debug!("Not processing message from {} - it is blocked", - pingreq.from.id); - continue; - } - self.process_pingreq(addr, pingreq); + process_ack(server, socket, tx_outbound, addr, ack); + } + SwimKind::PingReq(pingreq) => { + if server.is_member_blocked(&pingreq.from.id) { + debug!("Not processing message from {} - it is blocked", + pingreq.from.id); + continue; } + process_pingreq_mlr(server, socket, addr, pingreq); } } - Err(e) => { - // TODO: We can't use magic numbers here because the Supervisor runs on more - // than one platform. I'm sure these were added as specific OS errors for Linux - // but we need to also handle Windows & Mac. - match e.raw_os_error() { - Some(35) | Some(11) | Some(10035) | Some(10060) => { - // This is the normal non-blocking result, or a timeout - } - Some(_) => { - error!("UDP Receive error: {}", e); - debug!("UDP Receive error debug: {:?}", e); - } - None => { - error!("UDP Receive error: {}", e); - } + } + Err(e) => { + // TODO: We can't use magic numbers here because the Supervisor runs on more + // than one platform. I'm sure these were added as specific OS errors for Linux + // but we need to also handle Windows & Mac. + match e.raw_os_error() { + Some(35) | Some(11) | Some(10035) | Some(10060) => { + // This is the normal non-blocking result, or a timeout + } + Some(_) => { + error!("UDP Receive error: {}", e); + debug!("UDP Receive error debug: {:?}", e); + } + None => { + error!("UDP Receive error: {}", e); } } } } } +} - /// Process pingreq messages. - fn process_pingreq(&self, addr: SocketAddr, mut msg: PingReq) { - trace_it!(SWIM: &self.server, TraceKind::RecvPingReq, &msg.from.id, addr, &msg); +/// Process pingreq messages. +/// +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held. 
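
On the TODO above about magic OS error numbers (35 and 11 are `EAGAIN` on macOS and Linux; 10035 and 10060 are `WSAEWOULDBLOCK` and `WSAETIMEDOUT` on Windows): matching on `std::io::ErrorKind` would express the same intent portably. A possible shape, offered as a suggestion rather than as part of this patch:

```rust
use std::io;

/// Portable version of the magic-number match: WouldBlock covers
/// EAGAIN/EWOULDBLOCK/WSAEWOULDBLOCK and TimedOut covers read-timeout
/// expiry on Linux, macOS, and Windows alike.
fn is_expected_recv_error(e: &io::Error) -> bool {
    match e.kind() {
        io::ErrorKind::WouldBlock | io::ErrorKind::TimedOut => true,
        _ => false,
    }
}

fn main() {
    assert!(is_expected_recv_error(&io::Error::from(io::ErrorKind::WouldBlock)));
    assert!(!is_expected_recv_error(&io::Error::from(io::ErrorKind::AddrInUse)));
}
```
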
+fn process_pingreq_mlr(server: &Server, socket: &UdpSocket, addr: SocketAddr, mut msg: PingReq) { + trace_it!(SWIM: server, + TraceKind::RecvPingReq, + &msg.from.id, + addr, + &msg); + if let Some(target) = server.member_list.get_cloned_mlr(&msg.target.id) { msg.from.address = addr.ip().to_string(); - let id = msg.target.id.clone(); // TODO: see if we can eliminate this clone - self.server.member_list.with_member(&id, |target| { - if let Some(target) = target { - // Set the route-back address to the one we received the - // pingreq from - outbound::ping(&self.server, - &self.socket, - target, - target.swim_socket_address(), - Some(&msg.from)); - } else { - error!("PingReq request {:?} for invalid target", msg); - } - }); + let ping_msg = Ping { membership: vec![], + from: server.member.read().unwrap().as_member(), + forward_to: Some(msg.from.clone()), }; + let swim = outbound::populate_membership_rumors_mlr(server, &target, ping_msg); + // Set the route-back address to the one we received the + // pingreq from + outbound::ping(server, + socket, + &target, + target.swim_socket_address(), + Some(&msg.from), + &swim); + } else { + error!("PingReq request {:?} for invalid target", msg); } +} - /// Process ack messages; forwards to the outbound thread. - fn process_ack(&self, addr: SocketAddr, mut msg: Ack) { - trace_it!(SWIM: &self.server, TraceKind::RecvAck, &msg.from.id, addr, &msg); - trace!("Ack from {}@{}", msg.from.id, addr); - if msg.forward_to.is_some() && *self.server.member_id != msg.forward_to.as_ref().unwrap().id - { - let (forward_to_addr, from_addr) = { - let forward_to = msg.forward_to.as_ref().unwrap(); - let forward_addr_str = format!("{}:{}", forward_to.address, forward_to.swim_port); - let forward_to_addr = match forward_addr_str.parse() { - Ok(addr) => addr, - Err(e) => { - error!("Abandoning Ack forward: cannot parse member address: {}:{}, {}", - forward_to.address, forward_to.swim_port, e); - return; - } - }; - trace!("Forwarding Ack from {}@{} to {}@{}", - msg.from.id, - addr, - forward_to.id, - forward_to.address,); - (forward_to_addr, addr.ip().to_string()) - }; - msg.from.address = from_addr; - outbound::forward_ack(&self.server, &self.socket, forward_to_addr, msg); - return; - } - let memberships = msg.membership.clone(); - match self.tx_outbound.send((addr, msg)) { - Ok(()) => { - for membership in memberships { - self.server - .insert_member_from_rumor(membership.member, membership.health); +/// Process ack messages; forwards to the outbound thread. 
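
Note the shape change in `process_pingreq_mlr` above: instead of running `outbound::ping` inside the old `with_member` closure, with the `entries` read lock held across network I/O, the target is copied out with `get_cloned_mlr` and the lock is released before any packet is sent. The pattern in miniature (illustrative types):

```rust
use std::{collections::HashMap, sync::RwLock};

struct Registry { entries: RwLock<HashMap<String, String>> }

impl Registry {
    /// Clone the value out; the read guard is dropped at the end of the
    /// statement, before the caller does anything slow with the result.
    fn get_cloned_mlr(&self, id: &str) -> Option<String> {
        self.entries.read().unwrap().get(id).cloned()
    }
}

fn ping(addr: &str) { println!("pinging {}", addr); } // stands in for outbound::ping

fn main() {
    let registry = Registry { entries: RwLock::new(HashMap::new()) };
    registry.entries.write().unwrap().insert("m1".to_string(), "10.0.0.1:9638".to_string());
    if let Some(addr) = registry.get_cloned_mlr("m1") {
        ping(&addr); // network I/O happens with no lock held
    }
}
```
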
+fn process_ack(server: &Server, + socket: &UdpSocket, + tx_outbound: &AckSender, + addr: SocketAddr, + mut msg: Ack) { + trace_it!(SWIM: server, TraceKind::RecvAck, &msg.from.id, addr, &msg); + trace!("Ack from {}@{}", msg.from.id, addr); + if msg.forward_to.is_some() && *server.member_id != msg.forward_to.as_ref().unwrap().id { + let (forward_to_addr, from_addr) = { + let forward_to = msg.forward_to.as_ref().unwrap(); + let forward_addr_str = format!("{}:{}", forward_to.address, forward_to.swim_port); + let forward_to_addr = match forward_addr_str.parse() { + Ok(addr) => addr, + Err(e) => { + error!("Abandoning Ack forward: cannot parse member address: {}:{}, {}", + forward_to.address, forward_to.swim_port, e); + return; } + }; + trace!("Forwarding Ack from {}@{} to {}@{}", + msg.from.id, + addr, + forward_to.id, + forward_to.address,); + (forward_to_addr, addr.ip().to_string()) + }; + msg.from.address = from_addr; + outbound::forward_ack(server, socket, forward_to_addr, msg); + return; + } + let memberships = msg.membership.clone(); + match tx_outbound.send((addr, msg)) { + Ok(()) => { + for membership in memberships { + server.insert_member_from_rumor(membership.member, membership.health); } - Err(e) => panic!("Outbound thread has died - this shouldn't happen: #{:?}", e), } + Err(e) => panic!("Outbound thread has died - this shouldn't happen: #{:?}", e), } +} - /// Process ping messages. - fn process_ping(&self, addr: SocketAddr, mut msg: Ping) { - trace_it!(SWIM: &self.server, TraceKind::RecvPing, &msg.from.id, addr, &msg); - outbound::ack(&self.server, &self.socket, &msg.from, addr, msg.forward_to); - // Populate the member for this sender with its remote address - msg.from.address = addr.ip().to_string(); - trace!("Ping from {}@{}", msg.from.id, addr); - if msg.from.departed { - self.server.insert_member(msg.from, Health::Departed); - } else { - self.server.insert_member(msg.from, Health::Alive); - } - for membership in msg.membership { - self.server - .insert_member_from_rumor(membership.member, membership.health); - } +fn process_ping(server: &Server, socket: &UdpSocket, addr: SocketAddr, mut msg: Ping) { + trace_it!(SWIM: server, TraceKind::RecvPing, &msg.from.id, addr, &msg); + outbound::ack(server, socket, &msg.from, addr, msg.forward_to); + // Populate the member for this sender with its remote address + msg.from.address = addr.ip().to_string(); + trace!("Ping from {}@{}", msg.from.id, addr); + if msg.from.departed { + server.insert_member(msg.from, Health::Departed); + } else { + server.insert_member(msg.from, Health::Alive); + } + for membership in msg.membership { + server.insert_member_from_rumor(membership.member, membership.health); } } diff --git a/components/butterfly/src/server/mod.rs b/components/butterfly/src/server/mod.rs index 941ba86b3e..602580598e 100644 --- a/components/butterfly/src/server/mod.rs +++ b/components/butterfly/src/server/mod.rs @@ -404,12 +404,16 @@ impl Server { /// Start the server, along with a `Timing` for outbound connections. Spawns the `inbound`, /// `outbound`, and `expire` threads. /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. 
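
In `start_mlr` below, the hand-rolled `match` blocks on `UdpSocket::bind` and `try_clone` give way to `?`, which works because butterfly's `Error` can be built from `std::io::Error`. The mechanism in isolation (the error type here is a stand-in, not butterfly's):

```rust
use std::net::UdpSocket;

#[derive(Debug)]
enum Error { Io(std::io::Error) }

// This impl is what lets `?` convert an io::Error into our error type.
impl From<std::io::Error> for Error {
    fn from(e: std::io::Error) -> Self { Error::Io(e) }
}

fn bind_and_clone() -> Result<(UdpSocket, UdpSocket), Error> {
    let socket = UdpSocket::bind("127.0.0.1:0")?; // was: match ... CannotBind
    let clone = socket.try_clone()?;              // was: match ... SocketCloneError
    Ok((socket, clone))
}

fn main() { bind_and_clone().expect("could not bind UDP socket"); }
```

One reviewable side effect: bind and clone failures now flow through the generic conversion rather than the dedicated `CannotBind`/`SocketCloneError` variants the old code returned.
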
+ /// /// # Errors /// /// * Returns `Error::CannotBind` if the socket cannot be bound /// * Returns `Error::SocketSetReadTimeout` if the socket read timeout cannot be set /// * Returns `Error::SocketSetWriteTimeout` if the socket write timeout cannot be set - pub fn start(&mut self, timing: timing::Timing) -> Result<()> { + pub fn start_mlr(&mut self, timing: &timing::Timing) -> Result<()> { debug!("entering habitat_butterfly::server::Server::start"); let (tx_outbound, rx_inbound) = channel(); if let Some(ref path) = self.data_path { @@ -418,7 +422,7 @@ impl Server { } let mut file = DatFile::new(&self.member_id, path); if file.path().exists() { - match file.read_into(self) { + match file.read_into_mlr(self) { Ok(_) => { debug!("Successfully ingested rumors from {}", file.path().display()) @@ -442,81 +446,47 @@ impl Server { } } - let socket = match UdpSocket::bind(self.swim_addr) { - Ok(socket) => socket, - Err(e) => return Err(Error::CannotBind(e)), - }; + let socket = UdpSocket::bind(self.swim_addr)?; socket.set_read_timeout(Some(Duration::from_millis(1000))) .map_err(Error::SocketSetReadTimeout)?; socket.set_write_timeout(Some(Duration::from_millis(1000))) .map_err(Error::SocketSetReadTimeout)?; - let server_a = self.clone(); - let socket_a = match socket.try_clone() { - Ok(socket_a) => socket_a, - Err(_) => return Err(Error::SocketCloneError), - }; - let socket_expire = match socket.try_clone() { - Ok(socket_expire) => socket_expire, - Err(_) => return Err(Error::SocketCloneError), - }; - self.socket = Some(socket_expire); - - let _ = - thread::Builder::new().name(format!("inbound-{}", self.name())) - .spawn(move || { - inbound::Inbound::new(server_a, socket_a, tx_outbound).run(); - panic!("You should never, ever get here, judy"); - }); - - let server_b = self.clone(); - let socket_b = match socket.try_clone() { - Ok(socket_b) => socket_b, - Err(_) => return Err(Error::SocketCloneError), - }; - let timing_b = timing.clone(); - let _ = thread::Builder::new().name(format!("outbound-{}", self.name())) - .spawn(move || { - outbound::Outbound::new(server_b, socket_b, rx_inbound, - timing_b).run(); - panic!("You should never, ever get here, bob"); - }); - - let server_c = self.clone(); - let timing_c = timing.clone(); - let _ = thread::Builder::new().name(format!("expire-{}", self.name())) - .spawn(move || { - expire::Expire::new(server_c, timing_c).run(); - panic!("You should never, ever get here, frank"); - }); - - let server_d = self.clone(); - let _ = thread::Builder::new().name(format!("pull-{}", self.name())) - .spawn(move || { - pull::Pull::new(server_d).run(); - panic!("You should never, ever get here, davey"); - }); - - let server_e = self.clone(); - let _ = thread::Builder::new().name(format!("push-{}", self.name())) - .spawn(move || { - push::Push::new(server_e, timing).run(); - panic!("You should never, ever get here, liu"); - }); + self.socket = Some(socket.try_clone()?); + + inbound::spawn_thread(format!("inbound-{}", self.name()), + self.clone(), + socket.try_clone()?, + tx_outbound)?; + + outbound::spawn_thread(format!("outbound-{}", self.name()), + self.clone(), + socket, + rx_inbound, + timing.clone())?; + + expire::spawn_thread(format!("expire-{}", self.name()), + self.clone(), + timing.clone())?; + + pull::spawn_thread(format!("pull-{}", self.name()), self.clone())?; + + push::spawn_thread(format!("push-{}", self.name()), + self.clone(), + timing.clone())?; if self.dat_file.is_some() { - let server_f = self.clone(); - let _ = 
thread::Builder::new().name(format!("persist-{}", self.name())) - .spawn(move || { - persist_loop(&server_f); - panic!("Data persistence loop unexpectedly quit!"); - }); + spawn_persist_thread(format!("persist-{}", self.name()), self.clone())?; } Ok(()) } - pub fn need_peer_seeding(&self) -> bool { self.member_list.is_empty() } + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. Additionally `with_closure` is called with this lock held, so the closure + /// must not call any functions which take this lock. + pub fn need_peer_seeding_mlr(&self) -> bool { self.member_list.is_empty_mlr() } /// Persistently block a given address, causing no traffic to be seen. pub fn add_to_block_list(&self, member_id: String) { @@ -567,6 +537,10 @@ impl Server { pub fn name(&self) -> &str { &self.name } /// Insert a member to the `MemberList`, and update its `RumorKey` appropriately. + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. pub fn insert_member(&self, member: Member, health: Health) { let rk: RumorKey = RumorKey::from(&member); // NOTE: This sucks so much right here. Check out how we allocate no matter what, because @@ -575,7 +549,7 @@ impl Server { let member_id = member.id.clone(); let trace_incarnation = member.incarnation; let trace_health = health; - if self.member_list.insert(member, health) { + if self.member_list.insert_mlw(member, health) { trace_it!(MEMBERSHIP: self, TraceKind::MemberUpdate, member_id, @@ -596,7 +570,11 @@ impl Server { /// Set our member to departed, then send up to 10 out of order ack messages to other /// members to seed our status. - pub fn set_departed(&self) { + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. + pub fn set_departed_mlw(&self) { if self.socket.is_some() { { let mut me = self.member.write().expect("Member lock is poisoned"); @@ -605,7 +583,7 @@ impl Server { // actually needed. me.mark_departed(); - self.member_list.set_departed(&self.member_id); + self.member_list.set_departed_mlw(&self.member_id); trace_it!(MEMBERSHIP: self, TraceKind::MemberUpdate, self.member_id.clone(), @@ -622,7 +600,7 @@ impl Server { self.rumor_heat .start_hot_rumor(RumorKey::new(RumorType::Member, &self.member_id, "")); - let check_list = self.member_list.check_list(&self.member_id); + let check_list = self.member_list.check_list_mlr(&self.member_id); // TODO (CM): Even though we marked the rumor as hot // above, when we gossip, we send out the 5 "coolest but @@ -641,6 +619,10 @@ impl Server { } /// Given a membership record and some health, insert it into the Member List. + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. 
fn insert_member_from_rumor(&self, member: Member, mut health: Health) { let rk: RumorKey = RumorKey::from(&member); if member.id == self.member_id() && health != Health::Alive { @@ -657,7 +639,7 @@ impl Server { let trace_incarnation = member.incarnation; let trace_health = health; - if self.member_list.insert(member, health) { + if self.member_list.insert_mlw(member, health) { trace_it!(MEMBERSHIP: self, TraceKind::MemberUpdate, member_id, @@ -682,6 +664,10 @@ impl Server { /// /// See https://github.com/habitat-sh/habitat/issues/1994 /// See Server::check_quorum + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. pub fn insert_service(&self, service: Service) { Self::insert_service_impl(service, &self.service_store, @@ -707,10 +693,10 @@ impl Server { if inserting_new_group_member && !check_quorum(service_group) { if let Some(member_id_to_depart) = service_store.min_member_id_with(service_group, |id| { - member_list.health_of_by_id(id) == Some(Health::Confirmed) + member_list.health_of_by_id_mlr(id) == Some(Health::Confirmed) }) { - member_list.set_departed(&member_id_to_depart); + member_list.set_departed_mlw(&member_id_to_depart); rumor_heat.purge(&member_id_to_depart); rumor_heat.start_hot_rumor(RumorKey::new(RumorType::Member, &member_id_to_depart, @@ -738,6 +724,10 @@ impl Server { } /// Insert a departure rumor into the departure store. + /// + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. pub fn insert_departure(&self, departure: Departure) { let rk = RumorKey::from(&departure); if *self.member_id == departure.member_id { @@ -745,7 +735,7 @@ impl Server { .compare_and_swap(false, true, Ordering::Relaxed); } - self.member_list.set_departed(&departure.member_id); + self.member_list.set_departed_mlw(&departure.member_id); self.rumor_heat.purge(&departure.member_id); self.rumor_heat @@ -758,10 +748,14 @@ impl Server { /// Get all the Member ID's who are present in a given service group, and eligible to vote /// (alive) - fn get_electorate(&self, key: &str) -> Vec { + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + fn get_electorate_mlr(&self, key: &str) -> Vec { let mut electorate = vec![]; self.service_store.with_rumors(key, |s| { - if self.member_list.health_of_by_id(&s.member_id) + if self.member_list.health_of_by_id_mlr(&s.member_id) == Some(Health::Alive) { electorate.push(s.member_id.clone()); @@ -770,18 +764,25 @@ impl Server { electorate } - fn check_in_voting_population_by_id(&self, member_id: &str) -> bool { - match self.member_list.health_of_by_id(member_id) { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + fn check_in_voting_population_by_id_mlr(&self, member_id: &str) -> bool { + match self.member_list.health_of_by_id_mlr(member_id) { Some(Health::Alive) | Some(Health::Suspect) | Some(Health::Confirmed) => true, Some(Health::Departed) | None => false, } } /// Get all the Member ID's who are present in a given service group, and count towards quorum. - fn get_total_population(&self, key: &str) -> Vec { + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. 
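
`get_electorate_mlr` above and `get_total_population_mlr` below differ only in their health predicate: voters must be `Alive`, while the quorum denominator counts everyone who has not `Departed`. Stripped of the rumor-store plumbing, the filtering looks like this (hypothetical data shapes):

```rust
#[derive(Clone, Copy, PartialEq)]
enum Health { Alive, Suspect, Confirmed, Departed }

/// Voters: only Alive members may vote.
fn electorate(members: &[(String, Health)]) -> Vec<String> {
    members.iter()
           .filter(|m| m.1 == Health::Alive)
           .map(|m| m.0.clone())
           .collect()
}

/// Quorum denominator: everyone who has not Departed.
fn voting_population(members: &[(String, Health)]) -> Vec<String> {
    members.iter()
           .filter(|m| m.1 != Health::Departed)
           .map(|m| m.0.clone())
           .collect()
}

fn main() {
    let members = vec![("a".to_string(), Health::Alive),
                       ("b".to_string(), Health::Suspect),
                       ("c".to_string(), Health::Departed)];
    assert_eq!(electorate(&members), vec!["a".to_string()]);
    assert_eq!(voting_population(&members).len(), 2);
}
```
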
+ fn get_total_population_mlr(&self, key: &str) -> Vec<String> { let mut total_pop = vec![]; self.service_store.with_rumors(key, |s| { - if self.check_in_voting_population_by_id(&s.member_id) { + if self.check_in_voting_population_by_id_mlr(&s.member_id) { total_pop.push(s.member_id.clone()); } }); @@ -791,9 +792,13 @@ impl Server { /// Check if a given service group has quorum to run an election. /// /// A group has quorum if a majority of its non-departed members are alive. + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. fn check_quorum(&self, key: &str) -> bool { - let electorate = self.get_electorate(key); - let service_group_members = self.get_total_population(key); + let electorate = self.get_electorate_mlr(key); + let service_group_members = self.get_total_population_mlr(key); let total_population = service_group_members.len(); let alive_population = electorate.len(); let has_quorum = alive_population > total_population / 2; @@ -842,10 +847,13 @@ impl Server { self.update_store.insert(e); } - fn elections_to_restart<T>(&self, - elections: &RumorStore<T>, - feature_flags: FeatureFlag) - -> Vec<(String, u64)> + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + fn elections_to_restart_mlr<T>(&self, + elections: &RumorStore<T>, + feature_flags: FeatureFlag) + -> Vec<(String, u64)> where T: Rumor + ElectionRumor + Debug { Self::elections_to_restart_impl(elections, @@ -857,6 +865,9 @@ impl Server { &self.data_path) } + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. fn elections_to_restart_impl<T>(elections: &RumorStore<T>, service_store: &RumorStore<Service>, myself_member_id: &str, @@ -905,7 +916,7 @@ impl Server { } } else if election.is_finished() { let leader_health = - member_list.health_of_by_id(election.member_id()) + member_list.health_of_by_id_mlr(election.member_id()) .unwrap_or_else(|| { debug!("No health information for {}; \ treating as Departed", @@ -931,15 +942,20 @@ impl Server { /// /// a) We are the leader, and we have lost quorum with the rest of the group. /// b) We are not the leader, and we have detected that the leader is confirmed dead. - pub fn restart_elections(&self, feature_flags: FeatureFlag) { - let elections_to_restart = self.elections_to_restart(&self.election_store, feature_flags); + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn restart_elections_mlr(&self, feature_flags: FeatureFlag) { + let elections_to_restart = + self.elections_to_restart_mlr(&self.election_store, feature_flags); // TODO (CM): not currently triggering update elections! // There's only one kind of sentinel file at the moment, and // that's for non-update elections. If that file existed, // it'll be gone by the time we get here. let update_elections_to_restart = - self.elections_to_restart(&self.update_store, feature_flags); + self.elections_to_restart_mlr(&self.update_store, feature_flags); for (service_group, old_term) in elections_to_restart { let term = old_term + 1; @@ -959,7 +975,11 @@ impl Server { /// Insert an election into the election store. Handles creating a new election rumor for this /// member on receipt of an election rumor for a service this server cares about. Also handles /// stopping the election if we are the winner and we have enough votes.
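The quorum rule stated in the doc comment above is small enough to pin down as a standalone, runnable sketch (the function name here is hypothetical):

// A group has quorum when strictly more than half of the counted
// population is alive; an exact 50% split is not a majority.
fn has_quorum(alive_population: usize, total_population: usize) -> bool {
    alive_population > total_population / 2
}

fn main() {
    assert!(has_quorum(2, 3));
    assert!(!has_quorum(1, 2)); // a 50/50 partition cannot elect a leader
    assert!(!has_quorum(0, 0)); // an empty group never has quorum
}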
- pub fn insert_election(&self, mut election: Election) { + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn insert_election_mlr(&self, mut election: Election) { debug!("insert_election: {:?}", election); let rk = RumorKey::from(&election); @@ -989,7 +1009,7 @@ impl Server { // election is over! If it is, mark this election as final before you process it. if self.member_id() == election.member_id { if self.check_quorum(election.key()) { - let electorate = self.get_electorate(election.key()); + let electorate = self.get_electorate_mlr(election.key()); let mut num_votes = 0; for vote in election.votes.iter() { if electorate.contains(vote) { @@ -1049,7 +1069,10 @@ impl Server { } } - pub fn insert_update_election(&self, mut election: ElectionUpdate) { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn insert_update_election_mlr(&self, mut election: ElectionUpdate) { debug!("insert_update_election: {:?}", election); let rk = RumorKey::from(&election); @@ -1079,7 +1102,7 @@ impl Server { // election is over! If it is, mark this election as final before you process it. if self.member_id() == election.member_id { if self.check_quorum(election.key()) { - let electorate = self.get_electorate(election.key()); + let electorate = self.get_electorate_mlr(election.key()); let mut num_votes = 0; for vote in election.votes.iter() { if electorate.contains(vote) { @@ -1141,22 +1164,6 @@ impl Server { pub fn is_departed(&self) -> bool { self.departed.load(Ordering::Relaxed) } } -impl Serialize for Server { - fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error> - where S: Serializer - { - let mut strukt = serializer.serialize_struct("butterfly", 7)?; - strukt.serialize_field("member", &self.member_list)?; - strukt.serialize_field("service", &self.service_store)?; - strukt.serialize_field("service_config", &self.service_config_store)?; - strukt.serialize_field("service_file", &self.service_file_store)?; - strukt.serialize_field("election", &self.election_store)?; - strukt.serialize_field("election_update", &self.update_store)?; - strukt.serialize_field("departure", &self.departure_store)?; - strukt.end() - } -} - impl fmt::Display for Server { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, @@ -1167,7 +1174,13 @@ impl fmt::Display for Server { } } -fn persist_loop(server: &Server) { +fn spawn_persist_thread(name: String, server: Server) -> std::io::Result<()> { + thread::Builder::new().name(name) + .spawn(move || persist_loop(&server)) + .map(|_| ()) +} + +fn persist_loop(server: &Server) -> ! { habitat_core::env_config_duration!(PersistLoopPeriod, HAB_PERSIST_LOOP_PERIOD_SECS => from_secs, Duration::from_secs(30)); @@ -1202,6 +1215,9 @@ impl<'a> ServerProxy<'a> { } impl<'a> Serialize for ServerProxy<'a> { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held.
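`spawn_persist_thread` above follows a shape this patch repeats for the outbound, pull, and push threads: build a named thread, detach it, and surface only the spawn error. A minimal sketch with a hypothetical worker loop:

use std::thread;
use std::time::Duration;

fn spawn_worker(name: String) -> std::io::Result<()> {
    thread::Builder::new().name(name)
                          .spawn(move || {
                              loop {
                                  // periodic work (e.g. persisting state) would go here
                                  thread::sleep(Duration::from_secs(30));
                              }
                          })
                          .map(|_| ()) // drop the JoinHandle; keep only the spawn result
}

fn main() -> std::io::Result<()> {
    spawn_worker(String::from("persist-example"))
}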
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error> where S: Serializer { @@ -1369,7 +1385,7 @@ mod tests { service_store.insert(service.clone()); - member_list.insert(departed_leader, Health::Departed); + member_list.insert_mlw(departed_leader, Health::Departed); let to_restart = Server::elections_to_restart_impl(&elections, &service_store, @@ -1418,8 +1434,8 @@ mod tests { let member_list = MemberList::new(); let rumor_heat = RumorHeat::default(); - member_list.insert(alive_member.clone(), Health::Alive); - member_list.insert(confirmed_member.clone(), Health::Confirmed); + member_list.insert_mlw(alive_member.clone(), Health::Alive); + member_list.insert_mlw(confirmed_member.clone(), Health::Confirmed); Server::insert_service_impl(confirmed_member_service_rumor.clone(), &service_store, @@ -1427,7 +1443,7 @@ mod tests { &rumor_heat, check_quorum_returns(false)); - assert_eq!(member_list.health_of(&confirmed_member), + assert_eq!(member_list.health_of_mlr(&confirmed_member), Some(Health::Confirmed)); Server::insert_service_impl(alive_member_service_rumor.clone(), @@ -1436,7 +1452,7 @@ mod tests { &rumor_heat, check_quorum_returns(false)); - assert_eq!(member_list.health_of(&confirmed_member), + assert_eq!(member_list.health_of_mlr(&confirmed_member), Some(Health::Departed)); } @@ -1450,10 +1466,10 @@ mod tests { let member_list = MemberList::new(); let rumor_heat = RumorHeat::default(); - member_list.insert(alive_member.clone(), Health::Alive); + member_list.insert_mlw(alive_member.clone(), Health::Alive); // This member will become confirmed later. If it's already Confirmed // when inserted, it could be departed immediately - member_list.insert(confirmed_member.clone(), Health::Alive); + member_list.insert_mlw(confirmed_member.clone(), Health::Alive); Server::insert_service_impl(alive_member_service_rumor.clone(), &service_store, @@ -1467,7 +1483,7 @@ mod tests { &rumor_heat, check_quorum_returns(false)); - member_list.insert(confirmed_member.clone(), Health::Confirmed); + member_list.insert_mlw(confirmed_member.clone(), Health::Confirmed); Server::insert_service_impl(Service { incarnation: alive_member_service_rumor.incarnation + 1, @@ -1477,7 +1493,7 @@ mod tests { &rumor_heat, check_quorum_returns(false)); - assert_eq!(member_list.health_of(&confirmed_member), + assert_eq!(member_list.health_of_mlr(&confirmed_member), Some(Health::Confirmed)); } @@ -1491,8 +1507,8 @@ mod tests { let member_list = MemberList::new(); let rumor_heat = RumorHeat::default(); - member_list.insert(alive_member.clone(), Health::Alive); - member_list.insert(confirmed_member.clone(), Health::Confirmed); + member_list.insert_mlw(alive_member.clone(), Health::Alive); + member_list.insert_mlw(confirmed_member.clone(), Health::Confirmed); Server::insert_service_impl(confirmed_member_service_rumor.clone(), &service_store, @@ -1500,7 +1516,7 @@ mod tests { &rumor_heat, check_quorum_returns(true)); - assert_eq!(member_list.health_of(&confirmed_member), + assert_eq!(member_list.health_of_mlr(&confirmed_member), Some(Health::Confirmed)); Server::insert_service_impl(alive_member_service_rumor.clone(), @@ -1509,7 +1525,7 @@ mod tests { &rumor_heat, check_quorum_returns(true)); - assert_eq!(member_list.health_of(&confirmed_member), + assert_eq!(member_list.health_of_mlr(&confirmed_member), Some(Health::Confirmed)); } mod myself { @@ -1657,14 +1673,14 @@ mod tests { fn new_with_corrupt_rumor_file() { let tmpdir = TempDir::new().unwrap(); let mut server = start_with_corrupt_rumor_file(&tmpdir); -
server.start(Timing::default()) + server.start_mlr(&Timing::default()) .expect("Server failed to start"); } #[test] fn start_listener() { let mut server = start_server(); - server.start(Timing::default()) + server.start_mlr(&Timing::default()) .expect("Server failed to start"); } } diff --git a/components/butterfly/src/server/outbound.rs b/components/butterfly/src/server/outbound.rs index 3ee3593209..87075c1c34 100644 --- a/components/butterfly/src/server/outbound.rs +++ b/components/butterfly/src/server/outbound.rs @@ -66,224 +66,234 @@ impl fmt::Display for AckFrom { } } -/// The outbound thread -pub struct Outbound { - pub server: Server, - pub socket: UdpSocket, - pub rx_inbound: AckReceiver, - pub timing: Timing, +pub fn spawn_thread(name: String, + server: Server, + socket: UdpSocket, + rx_inbound: AckReceiver, + timing: Timing) + -> std::io::Result<()> { + thread::Builder::new().name(name) + .spawn(move || run_loop(&server, &socket, &rx_inbound, &timing)) + .map(|_| ()) } -impl Outbound { - /// Creates a new Outbound struct. - pub fn new(server: Server, - socket: UdpSocket, - rx_inbound: AckReceiver, - timing: Timing) - -> Outbound { - Outbound { server, - socket, - rx_inbound, - timing } - } - - /// Run the outbound thread. Gets a list of members to ping, then walks the list, probing each - /// member. - /// - /// If the probe completes before the next protocol period is scheduled, waits for the protocol - /// period to finish before starting the next probe. - pub fn run(&mut self) { - let mut have_members = false; - loop { - habitat_common::sync::mark_thread_alive(); - - if !have_members { - let num_initial = self.server.member_list.len_initial_members(); - if num_initial != 0 { - // The minimum that's strictly more than half - let min_to_start = num_initial / 2 + 1; - - if self.server.member_list.len() >= min_to_start { - have_members = true; - } else { - self.server.member_list.with_initial_members(|member| { - ping(&self.server, - &self.socket, - &member, - member.swim_socket_address(), - None); - }); - } +/// Run the outbound thread. Gets a list of members to ping, then walks the list, probing each +/// member. +/// +/// If the probe completes before the next protocol period is scheduled, waits for the protocol +/// period to finish before starting the next probe. +fn run_loop(server: &Server, socket: &UdpSocket, rx_inbound: &AckReceiver, timing: &Timing) -> ! 
{ + let mut have_members = false; + loop { + habitat_common::sync::mark_thread_alive(); + + if !have_members { + let num_initial = server.member_list.len_initial_members_imlr(); + if num_initial != 0 { + // The minimum that's strictly more than half + let min_to_start = num_initial / 2 + 1; + + if server.member_list.len_mlr() >= min_to_start { + have_members = true; + } else { + server.member_list.with_initial_members_imlr(|member| { + ping_mlr(&server, + &socket, + &member, + member.swim_socket_address(), + None); + }); } } + } - if self.server.paused() { - thread::sleep(Duration::from_millis(100)); - continue; - } + if server.paused() { + thread::sleep(Duration::from_millis(100)); + continue; + } - self.server.update_swim_round(); + server.update_swim_round(); - let long_wait = self.timing.next_protocol_period(); + let long_wait = timing.next_protocol_period(); - let check_list = self.server.member_list.check_list(&self.server.member_id); + let check_list = server.member_list.check_list_mlr(&server.member_id); - for member in check_list { - if self.server.member_list.pingable(&member) { - // This is the timeout for the next protocol period - if we - // complete faster than this, we want to wait in the end - // until this timer expires. - let next_protocol_period = self.timing.next_protocol_period(); + for member in check_list { + if server.member_list.pingable_mlr(&member) { + // This is the timeout for the next protocol period - if we + // complete faster than this, we want to wait in the end + // until this timer expires. + let next_protocol_period = timing.next_protocol_period(); - self.probe(member); + probe_mlr(&server, &socket, &rx_inbound, &timing, member); - if SteadyTime::now() <= next_protocol_period { - let wait_time = - (next_protocol_period - SteadyTime::now()).num_milliseconds(); - if wait_time > 0 { - debug!("Waiting {} until the next protocol period", wait_time); - thread::sleep(Duration::from_millis(wait_time as u64)); - } + if SteadyTime::now() <= next_protocol_period { + let wait_time = (next_protocol_period - SteadyTime::now()).num_milliseconds(); + if wait_time > 0 { + debug!("Waiting {} until the next protocol period", wait_time); + thread::sleep(Duration::from_millis(wait_time as u64)); } } } + } - if SteadyTime::now() <= long_wait { - let wait_time = (long_wait - SteadyTime::now()).num_milliseconds(); - if wait_time > 0 { - thread::sleep(Duration::from_millis(wait_time as u64)); - } + if SteadyTime::now() <= long_wait { + let wait_time = (long_wait - SteadyTime::now()).num_milliseconds(); + if wait_time > 0 { + thread::sleep(Duration::from_millis(wait_time as u64)); } } } +} - /// Probe Loop - /// - /// First, we send the ping to the remote address. This operation never blocks - we just - /// pass the data straight on to the kernel for UDP goodness. Then we grab a timer for how - /// long we're willing to run this phase, and start listening for Ack packets from the - /// Inbound thread. If we receive an Ack that is for any Member other than the one we are - /// currently pinging, we discard it. Otherwise, we set the address for the Member whose Ack - /// we received to the one we saw on the wire, and insert it into the MemberList. - /// - /// If we don't receive anything on the channel, we check if the current time has exceeded - /// our timeout. If it has, we break out of the Ping loop, and proceed to the PingReq loop. - /// If the timer has not been exceeded, we park this thread for - /// PING_RECV_QUEUE_EMPTY_SLEEP_MS, and try again. 
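The period handling in `run_loop` above reduces to: note the deadline, do the work, then sleep out whatever remains. A sketch using `std::time::Instant` in place of `time::SteadyTime`:

use std::thread;
use std::time::{Duration, Instant};

fn paced_round(period: Duration, probe: impl FnOnce()) {
    let deadline = Instant::now() + period;
    probe();
    let now = Instant::now();
    if now < deadline {
        // The probe finished early; hold the protocol cadence steady.
        thread::sleep(deadline - now);
    }
}

fn main() {
    let start = Instant::now();
    paced_round(Duration::from_millis(50), || { /* a fast probe */ });
    assert!(start.elapsed() >= Duration::from_millis(50));
}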
- /// - /// If we don't receive anything at all in the Ping/PingReq loop, we mark the member as Suspect. - fn probe(&mut self, member: Member) { - let pa_timer = SWIM_PROBE_DURATION.with_label_values(&["ping/ack"]) - .start_timer(); - let mut pr_timer: Option<HistogramTimer> = None; - let addr = member.swim_socket_address(); - - trace_it!(PROBE: &self.server, TraceKind::ProbeBegin, &member.id, addr); - - // Ping the member, and wait for the ack. - SWIM_PROBES_SENT.with_label_values(&["ping"]).inc(); - ping(&self.server, &self.socket, &member, addr, None); - - if self.recv_ack(&member, addr, AckFrom::Ping) { - trace_it!(PROBE: &self.server, TraceKind::ProbeAckReceived, &member.id, addr); - trace_it!(PROBE: &self.server, TraceKind::ProbeComplete, &member.id, addr); - SWIM_PROBES_SENT.with_label_values(&["ack"]).inc(); - pa_timer.observe_duration(); - return; - } +/// Probe Loop +/// +/// First, we send the ping to the remote address. This operation never blocks - we just +/// pass the data straight on to the kernel for UDP goodness. Then we grab a timer for how +/// long we're willing to run this phase, and start listening for Ack packets from the +/// Inbound thread. If we receive an Ack that is for any Member other than the one we are +/// currently pinging, we discard it. Otherwise, we set the address for the Member whose Ack +/// we received to the one we saw on the wire, and insert it into the MemberList. +/// +/// If we don't receive anything on the channel, we check if the current time has exceeded +/// our timeout. If it has, we break out of the Ping loop, and proceed to the PingReq loop. +/// If the timer has not been exceeded, we park this thread for +/// PING_RECV_QUEUE_EMPTY_SLEEP_MS, and try again. +/// +/// If we don't receive anything at all in the Ping/PingReq loop, we mark the member as Suspect. +/// +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held. +fn probe_mlr(server: &Server, + socket: &UdpSocket, + rx_inbound: &AckReceiver, + timing: &Timing, + member: Member) { + let pa_timer = SWIM_PROBE_DURATION.with_label_values(&["ping/ack"]) + .start_timer(); + let mut pr_timer: Option<HistogramTimer> = None; + let addr = member.swim_socket_address(); + + trace_it!(PROBE: server, TraceKind::ProbeBegin, &member.id, addr); + + // Ping the member, and wait for the ack. + SWIM_PROBES_SENT.with_label_values(&["ping"]).inc(); + ping_mlr(server, socket, &member, addr, None); + + if recv_ack(server, rx_inbound, timing, &member, addr, AckFrom::Ping) { + trace_it!(PROBE: server, TraceKind::ProbeAckReceived, &member.id, addr); + trace_it!(PROBE: server, TraceKind::ProbeComplete, &member.id, addr); + SWIM_PROBES_SENT.with_label_values(&["ack"]).inc(); + pa_timer.observe_duration(); + return; + } - self.server.member_list.with_pingreq_targets( - self.server.member_id(), - &member.id, - |pingreq_target| { - trace_it!(PROBE: &self.server, - TraceKind::ProbePingReq, - &pingreq_target.id, - &pingreq_target.address); - SWIM_PROBES_SENT.with_label_values(&["pingreq"]).inc(); - pr_timer = Some( - SWIM_PROBE_DURATION - .with_label_values(&["pingreq/ack"]) - .start_timer(), - ); - pingreq(&self.server, &self.socket, pingreq_target, &member); - }, - ); - - if self.recv_ack(&member, addr, AckFrom::PingReq) { - SWIM_PROBES_SENT.with_label_values(&["ack"]).inc(); - trace_it!(PROBE: &self.server, TraceKind::ProbeComplete, &member.id, addr); - } else { - // We mark as suspect when we fail to get a response from the PingReq.
That moves us - // into the suspicion phase, where anyone marked as suspect has a certain number of - // protocol periods to recover. - warn!("Marking {} as Suspect", &member.id); - trace_it!(PROBE: &self.server, TraceKind::ProbeSuspect, &member.id, addr); - trace_it!(PROBE: &self.server, TraceKind::ProbeComplete, &member.id, addr); - self.server.insert_member(member, Health::Suspect); - SWIM_PROBES_SENT.with_label_values(&["pingreq/failure"]) - .inc(); - } + let pingreq_message = PingReq { membership: vec![], + from: server.member.read().unwrap().as_member(), + target: member.clone(), }; + let swim = populate_membership_rumors_mlr(server, &member, pingreq_message); + + server.member_list + .with_pingreq_targets_mlr(server.member_id(), &member.id, |pingreq_target| { + trace_it!(PROBE: server, + TraceKind::ProbePingReq, + &pingreq_target.id, + &pingreq_target.address); + SWIM_PROBES_SENT.with_label_values(&["pingreq"]).inc(); + pr_timer = Some(SWIM_PROBE_DURATION.with_label_values(&["pingreq/ack"]) + .start_timer()); + pingreq(server, socket, pingreq_target, &member, &swim); + }); + + if recv_ack(server, rx_inbound, timing, &member, addr, AckFrom::PingReq) { + SWIM_PROBES_SENT.with_label_values(&["ack"]).inc(); + trace_it!(PROBE: server, TraceKind::ProbeComplete, &member.id, addr); + } else { + // We mark as suspect when we fail to get a response from the PingReq. That moves us + // into the suspicion phase, where anyone marked as suspect has a certain number of + // protocol periods to recover. + warn!("Marking {} as Suspect", &member.id); + trace_it!(PROBE: server, TraceKind::ProbeSuspect, &member.id, addr); + trace_it!(PROBE: server, TraceKind::ProbeComplete, &member.id, addr); + server.insert_member(member, Health::Suspect); + SWIM_PROBES_SENT.with_label_values(&["pingreq/failure"]) + .inc(); + } - if pr_timer.is_some() { - pr_timer.unwrap().observe_duration(); - } + if pr_timer.is_some() { + pr_timer.unwrap().observe_duration(); } +} - /// Listen for an ack from the `Inbound` thread. - fn recv_ack(&mut self, member: &Member, addr: SocketAddr, ack_from: AckFrom) -> bool { - let timeout = match ack_from { - AckFrom::Ping => self.timing.ping_timeout(), - AckFrom::PingReq => self.timing.pingreq_timeout(), - }; - loop { - match self.rx_inbound.try_recv() { - Ok((real_addr, mut ack)) => { - // If this was forwarded to us, we want to retain the address of the member who - // sent the ack, not the one we received on the socket. - if ack.forward_to.is_none() { - ack.from.address = real_addr.ip().to_string(); - } - if member.id != ack.from.id { - if ack.from.departed { - self.server.insert_member(ack.from, Health::Departed); - } else { - self.server.insert_member(ack.from, Health::Alive); - } - // Keep listening, we want the ack we expected - continue; +/// Listen for an ack from the `Inbound` thread. +fn recv_ack(server: &Server, + rx_inbound: &AckReceiver, + timing: &Timing, + member: &Member, + addr: SocketAddr, + ack_from: AckFrom) + -> bool { + let timeout = match ack_from { + AckFrom::Ping => timing.ping_timeout(), + AckFrom::PingReq => timing.pingreq_timeout(), + }; + loop { + match rx_inbound.try_recv() { + Ok((real_addr, mut ack)) => { + // If this was forwarded to us, we want to retain the address of the member who + // sent the ack, not the one we received on the socket. 
+ if ack.forward_to.is_none() { + ack.from.address = real_addr.ip().to_string(); + } + if member.id != ack.from.id { + if ack.from.departed { + server.insert_member(ack.from, Health::Departed); } else { - // We got the ack we are looking for; return. - if ack.from.departed { - self.server.insert_member(ack.from, Health::Departed); - } else { - self.server.insert_member(ack.from, Health::Alive); - } - return true; + server.insert_member(ack.from, Health::Alive); } - } - Err(mpsc::TryRecvError::Empty) => { - if SteadyTime::now() > timeout { - warn!("Timed out waiting for Ack from {}@{}", &member.id, addr); - return false; + // Keep listening, we want the ack we expected + continue; + } else { + // We got the ack we are looking for; return. + if ack.from.departed { + server.insert_member(ack.from, Health::Departed); + } else { + server.insert_member(ack.from, Health::Alive); } - thread::sleep(Duration::from_millis(PING_RECV_QUEUE_EMPTY_SLEEP_MS)); + return true; } - Err(mpsc::TryRecvError::Disconnected) => { - panic!("Outbound thread has disconnected! This is fatal."); + } + Err(mpsc::TryRecvError::Empty) => { + if SteadyTime::now() > timeout { + warn!("Timed out waiting for Ack from {}@{}", &member.id, addr); + return false; } + thread::sleep(Duration::from_millis(PING_RECV_QUEUE_EMPTY_SLEEP_MS)); + } + Err(mpsc::TryRecvError::Disconnected) => { + panic!("Outbound thread has disconnected! This is fatal."); } } } } -/// Populate a SWIM message with rumors. -pub fn populate_membership_rumors(server: &Server, target: &Member, swim: &mut Swim) { +/// Create a SWIM message from the given `message` template and populate it with rumors. +/// +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held. +pub fn populate_membership_rumors_mlr(server: &Server, + target: &Member, + message: impl Into<Swim>) + -> Swim { + let mut swim = message.into(); + // If this isn't the first time we are communicating with this target, we want to include this // targets current status. This ensures that members always get a "Confirmed" rumor, before we // have the chance to flip it to "Alive", which helps make sure we heal from a partition. - if server.member_list.contains_member(&target.id) { - if let Some(always_target) = server.member_list.membership_for(&target.id) { + if server.member_list.contains_member_mlr(&target.id) { + if let Some(always_target) = server.member_list.membership_for_mlr(&target.id) { swim.membership.push(always_target); } } @@ -299,26 +309,29 @@ pub fn populate_membership_rumors(server: &Server, target: &Member, swim: &mut S .collect(); for rkey in rumors.iter() { - if let Some(member) = server.member_list.membership_for(&rkey.key()) { + if let Some(member) = server.member_list.membership_for_mlr(&rkey.key()) { swim.membership.push(member); } } // We don't want to update the heat for rumors that we know we are sending to a target that is // confirmed dead; the odds are, they won't receive them. Lets spam them a little harder with // rumors. - if !server.member_list.persistent_and_confirmed(target) { + if !server.member_list.persistent_and_confirmed_mlr(target) { server.rumor_heat.cool_rumors(&target.id, &rumors); } + + swim } -/// Send a PingReq.
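The `populate_membership_rumors_mlr` refactor just above replaces "mutate a caller-built `Swim`" with "consume any message convertible into one and return the populated value". A sketch of that `impl Into` template pattern, with hypothetical stand-in types:

struct Swim {
    membership: Vec<String>,
}

struct Ping;

impl From<Ping> for Swim {
    fn from(_: Ping) -> Swim {
        Swim { membership: Vec::new() }
    }
}

// Consume the template, fill in rumors, hand back the finished message.
fn populate(message: impl Into<Swim>, rumors: &[String]) -> Swim {
    let mut swim = message.into();
    swim.membership.extend_from_slice(rumors);
    swim
}

fn main() {
    let swim = populate(Ping, &[String::from("member-1")]);
    assert_eq!(swim.membership, vec![String::from("member-1")]);
}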
-pub fn pingreq(server: &Server, socket: &UdpSocket, pingreq_target: &Member, target: &Member) { - let pingreq = PingReq { membership: vec![], - from: server.member.read().unwrap().as_member(), - target: target.clone(), }; - let mut swim: Swim = pingreq.into(); +/// Send a PingReq: request `pingreq_target` to ping `target` on behalf of `server` to see if +/// `target` is alive despite not being directly reachable from `server`. In other words, +/// `pingreq_target` is the proxy and `target` is the final destination. +fn pingreq(server: &Server, // TODO: eliminate this arg + socket: &UdpSocket, + pingreq_target: &Member, + target: &Member, + swim: &Swim) { let addr = pingreq_target.swim_socket_address(); - populate_membership_rumors(server, target, &mut swim); let bytes = match swim.clone().encode() { Ok(bytes) => bytes, Err(e) => { @@ -362,18 +375,57 @@ pub fn pingreq(server: &Server, socket: &UdpSocket, pingreq_target: &Member, tar } /// Send a Ping. +/// +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held. +pub fn ping_mlr(server: &Server, + socket: &UdpSocket, + target: &Member, + addr: SocketAddr, + forward_to: Option<&Member>) { + let ping_msg = Ping { membership: vec![], + from: server.member.read().unwrap().as_member(), + forward_to: forward_to.cloned(), /* TODO: see if we can eliminate this + * clone */ }; + let swim = populate_membership_rumors_mlr(server, target, ping_msg); + let bytes = match swim.clone().encode() { + Ok(bytes) => bytes, + Err(e) => { + error!("Generating protocol message failed: {}", e); + return; + } + }; + let payload = match server.generate_wire(bytes) { + Ok(payload) => payload, + Err(e) => { + error!("Generating protocol message failed: {}", e); + return; + } + }; + match socket.send_to(&payload, addr) { + Ok(_s) => { + let label_values = &["ping"]; + SWIM_MESSAGES_SENT.with_label_values(label_values).inc(); + SWIM_BYTES_SENT.with_label_values(label_values) + .set(payload.len().to_i64()); + let on_behalf_of = match forward_to { + Some(x) => format!(" on behalf of {}@{}", x.id, x.address), + None => "".into(), + }; + trace!("Sent Ping to {}{}", addr, on_behalf_of); + } + Err(e) => error!("Failed Ping to {}: {}", addr, e), + } + trace_it!(SWIM: server, TraceKind::SendPing, &target.id, addr, &swim); +} + pub fn ping(server: &Server, socket: &UdpSocket, target: &Member, addr: SocketAddr, - forward_to: Option<&Member>) { - let ping = Ping { - membership: vec![], - from: server.member.read().unwrap().as_member(), - forward_to: forward_to.cloned(), // TODO: see if we can eliminate this clone - }; - let mut swim: Swim = ping.into(); - populate_membership_rumors(server, target, &mut swim); + forward_to: Option<&Member>, + swim: &Swim) { let bytes = match swim.clone().encode() { Ok(bytes) => bytes, Err(e) => { @@ -435,17 +487,20 @@ pub fn forward_ack(server: &Server, socket: &UdpSocket, addr: SocketAddr, msg: A } /// Send an Ack. +/// +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held.
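`ping_mlr`, `pingreq`, and `ack` all end in the same encode-wrap-send tail: serialize the SWIM message, wrap it for the wire, then fire one UDP datagram, logging failures rather than propagating them. A stripped-down sketch (the wrapping step is a stand-in for `server.generate_wire`):

use std::net::{SocketAddr, UdpSocket};

fn send_swim(socket: &UdpSocket, addr: SocketAddr, bytes: Vec<u8>) {
    // Stand-in for generate_wire: framing/encryption would happen here.
    let payload = bytes;
    match socket.send_to(&payload, addr) {
        Ok(_) => println!("sent {} bytes to {}", payload.len(), addr),
        // As in the real senders, a failure is logged, not returned.
        Err(e) => eprintln!("failed send to {}: {}", addr, e),
    }
}

fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:0")?;
    let addr: SocketAddr = "127.0.0.1:9638".parse().unwrap();
    send_swim(&socket, addr, b"swim payload".to_vec());
    Ok(())
}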
pub fn ack(server: &Server, socket: &UdpSocket, target: &Member, addr: SocketAddr, forward_to: Option) { - let ack = Ack { membership: vec![], - from: server.member.read().unwrap().as_member(), - forward_to: forward_to.map(Member::from), }; - let member_id = ack.from.id.clone(); - let mut swim: Swim = ack.into(); - populate_membership_rumors(server, target, &mut swim); + let ack_msg = Ack { membership: vec![], + from: server.member.read().unwrap().as_member(), + forward_to: forward_to.map(Member::from), }; + let member_id = ack_msg.from.id.clone(); + let swim = populate_membership_rumors_mlr(server, target, ack_msg); let bytes = match swim.clone().encode() { Ok(bytes) => bytes, Err(e) => { diff --git a/components/butterfly/src/server/pull.rs b/components/butterfly/src/server/pull.rs index e51cbe12ce..5e54f7f590 100644 --- a/components/butterfly/src/server/pull.rs +++ b/components/butterfly/src/server/pull.rs @@ -27,120 +27,113 @@ lazy_static! { &["type", "mode", "blocked"]).unwrap(); } -/// Takes a reference to the server itself -pub struct Pull { - pub server: Server, +pub fn spawn_thread(name: String, server: Server) -> std::io::Result<()> { + thread::Builder::new().name(name) + .spawn(move || run_loop(&server)) + .map(|_| ()) } -impl Pull { - /// Create a new Pull - pub fn new(server: Server) -> Pull { Pull { server } } +fn run_loop(server: &Server) -> ! { + habitat_core::env_config_int!(RecvTimeoutMillis, i32, HAB_PULL_RECV_TIMEOUT_MS, 5_000); + + let socket = (**ZMQ_CONTEXT).as_mut() + .socket(zmq::PULL) + .expect("Failure to create the ZMQ pull socket"); + socket.set_linger(0) + .expect("Failure to set the ZMQ Pull socket to not linger"); + socket.set_tcp_keepalive(0) + .expect("Failure to set the ZMQ Pull socket to not use keepalive"); + socket.set_rcvtimeo(RecvTimeoutMillis::configured_value().into()) + .expect("Failure to set the ZMQ Pull socket receive timeout"); + socket.bind(&format!("tcp://{}", server.gossip_addr())) + .expect("Failure to bind the ZMQ Pull socket to the port"); + 'recv: loop { + if let Ok(-1) = socket.get_rcvtimeo() { + trace!("Skipping thread liveliness checks due to infinite recv timeout"); + } else { + habitat_common::sync::mark_thread_alive(); + } - /// Run this thread. Creates a socket, binds to the `gossip_addr`, then processes messages as - /// they are received. Uses a ZMQ pull socket, so inbound messages are fair-queued. 
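The finite `rcvtimeo` set on the pull socket above is what keeps liveness checks ticking: a blocked `recv_msg` returns `EAGAIN` instead of parking the thread forever. A sketch of that socket shape, assuming the same rust `zmq` crate butterfly already depends on:

fn pull_loop(gossip_addr: &str) -> zmq::Result<()> {
    let ctx = zmq::Context::new();
    let socket = ctx.socket(zmq::PULL)?;
    socket.set_linger(0)?;
    socket.set_rcvtimeo(5_000)?; // milliseconds; -1 would block forever
    socket.bind(&format!("tcp://{}", gossip_addr))?;
    loop {
        match socket.recv_msg(0) {
            Ok(msg) => println!("received {} bytes", msg.len()),
            // No data within the timeout; a mark_thread_alive call fits here.
            Err(zmq::Error::EAGAIN) => continue,
            Err(e) => return Err(e),
        }
    }
}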
- pub fn run(&mut self) { - habitat_core::env_config_int!(RecvTimeoutMillis, i32, HAB_PULL_RECV_TIMEOUT_MS, 5_000); + if server.paused() { + thread::sleep(Duration::from_millis(100)); + continue; + } - let socket = (**ZMQ_CONTEXT).as_mut() - .socket(zmq::PULL) - .expect("Failure to create the ZMQ pull socket"); - socket.set_linger(0) - .expect("Failure to set the ZMQ Pull socket to not linger"); - socket.set_tcp_keepalive(0) - .expect("Failure to set the ZMQ Pull socket to not use keepalive"); - socket.set_rcvtimeo(RecvTimeoutMillis::configured_value().into()) - .expect("Failure to set the ZMQ Pull socket receive timeout"); - socket.bind(&format!("tcp://{}", self.server.gossip_addr())) - .expect("Failure to bind the ZMQ Pull socket to the port"); - 'recv: loop { - if let Ok(-1) = socket.get_rcvtimeo() { - trace!("Skipping thread liveliness checks due to infinite recv timeout"); - } else { - habitat_common::sync::mark_thread_alive(); + let msg = match socket.recv_msg(0) { + Ok(msg) => msg, + Err(e) => { + // We intentionally set a timeout above so that `mark_thread_alive` can be + // used to show this thread is alive even when there's no data to receive. + if e != zmq::Error::EAGAIN { + error!("Error receiving message: {:?}", e); + } + continue 'recv; } - - if self.server.paused() { - thread::sleep(Duration::from_millis(100)); + }; + + let payload = match server.unwrap_wire(&msg) { + Ok(payload) => payload, + Err(e) => { + // NOTE: In the future, we might want to block people who send us + // garbage all the time. + error!("Error parsing protocol message: {:?}", e); + let label_values = &["unwrap_wire", "failure", "unknown"]; + GOSSIP_BYTES_RECEIVED.with_label_values(label_values) + .set(msg.len().to_i64()); + GOSSIP_MESSAGES_RECEIVED.with_label_values(label_values) + .inc(); continue; } + }; + + let proto = match RumorEnvelope::decode(&payload) { + Ok(proto) => proto, + Err(e) => { + error!("Error parsing protocol message: {:?}", e); + let label_values = &["undecodable", "failure", "unknown"]; + GOSSIP_BYTES_RECEIVED.with_label_values(label_values) + .set(payload.len().to_i64()); + GOSSIP_MESSAGES_RECEIVED.with_label_values(label_values) + .inc(); + continue 'recv; + } + }; - let msg = match socket.recv_msg(0) { - Ok(msg) => msg, - Err(e) => { - // We intentionally set a timeout above so that `mark_thread_alive` can be - // used to show this thread is alive even when there's no data to receive. - if e != zmq::Error::EAGAIN { - error!("Error receiving message: {:?}", e); - } - continue 'recv; - } - }; - - let payload = match self.server.unwrap_wire(&msg) { - Ok(payload) => payload, - Err(e) => { - // NOTE: In the future, we might want to block people who send us - // garbage all the time. 
- error!("Error parsing protocol message: {:?}", e); - let label_values = &["unwrap_wire", "failure", "unknown"]; - GOSSIP_BYTES_RECEIVED.with_label_values(label_values) - .set(msg.len().to_i64()); - GOSSIP_MESSAGES_RECEIVED.with_label_values(label_values) - .inc(); - continue; - } - }; - - let proto = match RumorEnvelope::decode(&payload) { - Ok(proto) => proto, - Err(e) => { - error!("Error parsing protocol message: {:?}", e); - let label_values = &["undecodable", "failure", "unknown"]; - GOSSIP_BYTES_RECEIVED.with_label_values(label_values) - .set(payload.len().to_i64()); - GOSSIP_MESSAGES_RECEIVED.with_label_values(label_values) - .inc(); - continue 'recv; - } - }; + let blocked = server.is_member_blocked(&proto.from_id); + let blocked_label = if blocked { "true" } else { "false" }; + let label_values = &[&proto.r#type.to_string(), "success", blocked_label]; - let blocked = self.server.is_member_blocked(&proto.from_id); - let blocked_label = if blocked { "true" } else { "false" }; - let label_values = &[&proto.r#type.to_string(), "success", blocked_label]; + GOSSIP_MESSAGES_RECEIVED.with_label_values(label_values) + .inc(); + GOSSIP_BYTES_RECEIVED.with_label_values(label_values) + .set(payload.len().to_i64()); - GOSSIP_MESSAGES_RECEIVED.with_label_values(label_values) - .inc(); - GOSSIP_BYTES_RECEIVED.with_label_values(label_values) - .set(payload.len().to_i64()); + if blocked { + warn!("Not processing message from {} - it is blocked", + proto.from_id); + continue 'recv; + } - if blocked { - warn!("Not processing message from {} - it is blocked", - proto.from_id); - continue 'recv; + trace_it!(GOSSIP: server, TraceKind::RecvRumor, &proto.from_id, &proto); + match proto.kind { + RumorKind::Membership(membership) => { + server.insert_member_from_rumor(membership.member, membership.health); } - - trace_it!(GOSSIP: &self.server, TraceKind::RecvRumor, &proto.from_id, &proto); - match proto.kind { - RumorKind::Membership(membership) => { - self.server - .insert_member_from_rumor(membership.member, membership.health); - } - RumorKind::Service(service) => self.server.insert_service(*service), - RumorKind::ServiceConfig(service_config) => { - self.server.insert_service_config(service_config); - } - RumorKind::ServiceFile(service_file) => { - self.server.insert_service_file(service_file); - } - RumorKind::Election(election) => { - self.server.insert_election(election); - } - RumorKind::ElectionUpdate(election) => { - self.server.insert_update_election(election); - } - RumorKind::Departure(departure) => { - self.server.insert_departure(departure); - } + RumorKind::Service(service) => server.insert_service(*service), + RumorKind::ServiceConfig(service_config) => { + server.insert_service_config(service_config); + } + RumorKind::ServiceFile(service_file) => { + server.insert_service_file(service_file); + } + RumorKind::Election(election) => { + server.insert_election_mlr(election); + } + RumorKind::ElectionUpdate(election) => { + server.insert_update_election_mlr(election); + } + RumorKind::Departure(departure) => { + server.insert_departure(departure); } } } diff --git a/components/butterfly/src/server/push.rs b/components/butterfly/src/server/push.rs index a39230f8b1..b338a10de6 100644 --- a/components/butterfly/src/server/push.rs +++ b/components/butterfly/src/server/push.rs @@ -36,330 +36,305 @@ lazy_static! 
{ &["type", "mode"]).unwrap(); } -/// The Push server -#[derive(Debug)] -pub struct Push { - pub server: Server, - pub timing: Timing, +pub fn spawn_thread(name: String, server: Server, timing: Timing) -> std::io::Result<()> { + thread::Builder::new().name(name) + .spawn(move || run_loop(&server, &timing)) + .map(|_| ()) } -impl Push { - /// Creates a new Push instance from a Server and Timing - pub fn new(server: Server, timing: Timing) -> Push { Push { server, timing } } +/// Executes the Push thread. Gets a list of members to talk to that are not Confirmed; then +/// proceeds to process the list in `FANOUT` sized chunks. If we finish sending the messages to +/// all FANOUT targets faster than `Timing::GOSSIP_PERIOD_DEFAULT_MS`, we will block until we +/// exceed that time. +fn run_loop(server: &Server, timing: &Timing) -> ! { + loop { + habitat_common::sync::mark_thread_alive(); - /// Executes the Push thread. Gets a list of members to talk to that are not Confirmed; then - /// proceeds to process the list in `FANOUT` sized chunks. If we finish sending the messages to - /// all FANOUT targets faster than `Timing::GOSSIP_PERIOD_DEFAULT_MS`, we will block until we - /// exceed that time. - pub fn run(&mut self) { - loop { - habitat_common::sync::mark_thread_alive(); - - if self.server.paused() { - thread::sleep(Duration::from_millis(100)); - continue; - } - - self.server.update_gossip_round(); + if server.paused() { + thread::sleep(Duration::from_millis(100)); + continue; + } - let mut check_list = self.server.member_list.check_list(self.server.member_id()); - let long_wait = self.timing.gossip_timeout(); + server.update_gossip_round(); - 'fanout: loop { - let mut thread_list = Vec::with_capacity(FANOUT); - if check_list.is_empty() { - break 'fanout; - } - let drain_length = if check_list.len() >= FANOUT { - FANOUT - } else { - check_list.len() - }; - let next_gossip = self.timing.gossip_timeout(); - for member in check_list.drain(0..drain_length) { - if self.server.is_member_blocked(&member.id) { - debug!("Not sending rumors to {} - it is blocked", member.id); + let mut check_list = server.member_list.check_list_mlr(server.member_id()); + let long_wait = timing.gossip_timeout(); - continue; - } - // Unlike the SWIM mechanism, we don't actually want to send gossip traffic to - // persistent members that are confirmed dead. When the failure detector thread - // finds them alive again, we'll go ahead and get back to the business at hand. 
- if self.server.member_list.pingable(&member) - && !self.server.member_list.persistent_and_confirmed(&member) - { - let rumors = self.server.rumor_heat.currently_hot_rumors(&member.id); - if !rumors.is_empty() { - let sc = self.server.clone(); + 'fanout: loop { + let mut thread_list = Vec::with_capacity(FANOUT); + if check_list.is_empty() { + break 'fanout; + } + let drain_length = if check_list.len() >= FANOUT { + FANOUT + } else { + check_list.len() + }; + let next_gossip = timing.gossip_timeout(); + for member in check_list.drain(0..drain_length) { + if server.is_member_blocked(&member.id) { + debug!("Not sending rumors to {} - it is blocked", member.id); - let guard = match thread::Builder::new() - .name(String::from("push-worker")) - .spawn(move || { - PushWorker::new(sc).send_rumors(&member, &rumors); - }) { - Ok(guard) => guard, - Err(e) => { - error!("Could not spawn thread: {}", e); - continue; - } - }; - thread_list.push(guard); - } - } - } - let num_threads = thread_list.len(); - for guard in thread_list.drain(0..num_threads) { - let _ = guard.join() - .map_err(|e| error!("Push worker died: {:?}", e)); + continue; } - if SteadyTime::now() < next_gossip { - let wait_time = (next_gossip - SteadyTime::now()).num_milliseconds(); - if wait_time > 0 { - thread::sleep(Duration::from_millis(wait_time as u64)); + // Unlike the SWIM mechanism, we don't actually want to send gossip traffic to + // persistent members that are confirmed dead. When the failure detector thread + // finds them alive again, we'll go ahead and get back to the business at hand. + if server.member_list.pingable_mlr(&member) + && !server.member_list.persistent_and_confirmed_mlr(&member) + { + let rumors = server.rumor_heat.currently_hot_rumors(&member.id); + if !rumors.is_empty() { + let sc = server.clone(); + let guard = match thread::Builder::new().name(String::from("push-worker")) + .spawn(move || { + send_rumors(&sc, &member, + &rumors) + }) { + Ok(guard) => guard, + Err(e) => { + error!("Could not spawn thread: {}", e); + continue; + } + }; + thread_list.push(guard); } } } - if SteadyTime::now() < long_wait { - let wait_time = (long_wait - SteadyTime::now()).num_milliseconds(); + let num_threads = thread_list.len(); + for guard in thread_list.drain(0..num_threads) { + let _ = guard.join() + .map_err(|e| error!("Push worker died: {:?}", e)); + } + if SteadyTime::now() < next_gossip { + let wait_time = (next_gossip - SteadyTime::now()).num_milliseconds(); if wait_time > 0 { thread::sleep(Duration::from_millis(wait_time as u64)); } } } + if SteadyTime::now() < long_wait { + let wait_time = (long_wait - SteadyTime::now()).num_milliseconds(); + if wait_time > 0 { + thread::sleep(Duration::from_millis(wait_time as u64)); + } + } } } -/// A worker thread for pushing messages to a target -struct PushWorker { - pub server: Server, -} - -impl PushWorker { - /// Create a new PushWorker. - pub fn new(server: Server) -> PushWorker { PushWorker { server } } - - /// Send the list of rumors to a given member. This method creates an outbound socket and then - /// closes the connection as soon as we are done sending rumors. ZeroMQ may choose to keep the - /// connection and socket open for 1 second longer - so it is possible, but unlikely, that this - /// method can lose messages. 
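The fanout loop above is easier to see with the gossip details stripped away: drain the check list in `FANOUT`-sized batches, spawn one worker per target, and join the whole batch before starting the next. A sketch with hypothetical names:

use std::thread;

const FANOUT: usize = 5;

fn fanout_batches<T: Send + 'static>(mut check_list: Vec<T>, work: fn(T)) {
    while !check_list.is_empty() {
        let drain_length = check_list.len().min(FANOUT);
        let batch: Vec<_> = check_list.drain(0..drain_length)
                                      .map(|target| thread::spawn(move || work(target)))
                                      .collect();
        for worker in batch {
            // One failed worker must not abort the rest of the round.
            let _ = worker.join();
        }
    }
}

fn main() {
    fanout_batches((0..12).collect::<Vec<i32>>(), |n| println!("gossip to member {}", n));
}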
- // If we ever need to modify this function, it would be an excellent opportunity to - // simplify the redundant aspects and remove this allow(clippy::cognitive_complexity), - // but changing it in the absence of other necessity seems like too much risk for the - // expected reward. - #[allow(clippy::cognitive_complexity)] - fn send_rumors(&self, member: &Member, rumors: &[RumorKey]) { - let socket = (**ZMQ_CONTEXT).as_mut() - .socket(zmq::PUSH) - .expect("Failure to create the ZMQ push socket"); - socket.set_linger(1000) - .expect("Failure to set the ZMQ push socket to not linger"); - socket.set_tcp_keepalive(0) - .expect("Failure to set the ZMQ push socket to not use keepalive"); - socket.set_immediate(true) - .expect("Failure to set the ZMQ push socket to immediate"); - socket.set_sndhwm(1000) - .expect("Failure to set the ZMQ push socket hwm"); - socket.set_sndtimeo(500) - .expect("Failure to set the ZMQ send timeout"); - let to_addr = format!("{}:{}", member.address, member.gossip_port); - match socket.connect(&format!("tcp://{}", to_addr)) { - Ok(()) => debug!("Connected push socket to {:?}", member), - Err(e) => { - error!("Cannot connect push socket to {:?}: {:?}", member, e); - let label_values = &["socket_connect", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); - return; - } +/// Send the list of rumors to a given member. This method creates an outbound socket and then +/// closes the connection as soon as we are done sending rumors. ZeroMQ may choose to keep the +/// connection and socket open for 1 second longer - so it is possible, but unlikely, that this +/// method can lose messages. +/// +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held. +// If we ever need to modify this function, it would be an excellent opportunity to +// simplify the redundant aspects and remove this allow(clippy::cognitive_complexity), +// but changing it in the absence of other necessity seems like too much risk for the +// expected reward. 
+#[allow(clippy::cognitive_complexity)] +fn send_rumors(server: &Server, member: &Member, rumors: &[RumorKey]) { + let socket = (**ZMQ_CONTEXT).as_mut() + .socket(zmq::PUSH) + .expect("Failure to create the ZMQ push socket"); + socket.set_linger(1000) + .expect("Failure to set the ZMQ push socket to not linger"); + socket.set_tcp_keepalive(0) + .expect("Failure to set the ZMQ push socket to not use keepalive"); + socket.set_immediate(true) + .expect("Failure to set the ZMQ push socket to immediate"); + socket.set_sndhwm(1000) + .expect("Failure to set the ZMQ push socket hwm"); + socket.set_sndtimeo(500) + .expect("Failure to set the ZMQ send timeout"); + let to_addr = format!("{}:{}", member.address, member.gossip_port); + match socket.connect(&format!("tcp://{}", to_addr)) { + Ok(()) => debug!("Connected push socket to {:?}", member), + Err(e) => { + error!("Cannot connect push socket to {:?}: {:?}", member, e); + let label_values = &["socket_connect", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); + return; } - 'rumorlist: for rumor_key in rumors.iter() { - let rumor_as_bytes = match rumor_key.kind { - RumorType::Member => { - let send_rumor = match self.create_member_rumor(&rumor_key) { - Some(rumor) => rumor, - None => continue 'rumorlist, - }; - trace_it!(GOSSIP: &self.server, - TraceKind::SendRumor, - &member.id, - &send_rumor); - match send_rumor.encode() { - Ok(bytes) => bytes, - Err(e) => { - error!("Could not write our own rumor to bytes; abandoning sending \ - rumor: {:?}", - e); - let label_values = &["member_rumor_encode", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); - continue 'rumorlist; - } + } + 'rumorlist: for rumor_key in rumors.iter() { + let rumor_as_bytes = match rumor_key.kind { + RumorType::Member => { + let send_rumor = match create_member_rumor_mlr(&server, &rumor_key) { + Some(rumor) => rumor, + None => continue 'rumorlist, + }; + trace_it!(GOSSIP: &server, + TraceKind::SendRumor, + &member.id, + &send_rumor); + match send_rumor.encode() { + Ok(bytes) => bytes, + Err(e) => { + error!("Could not write our own rumor to bytes; abandoning sending \ + rumor: {:?}", + e); + let label_values = &["member_rumor_encode", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); + continue 'rumorlist; } } - RumorType::Service => { - // trace_it!(GOSSIP: &self.server, - // TraceKind::SendRumor, - // &member.id, - // &send_rumor); - match self.server - .service_store - .encode(&rumor_key.key, &rumor_key.id) - { - Ok(bytes) => bytes, - Err(e) => { - error!("Could not write our own rumor to bytes; abandoning sending \ - rumor: {:?}", - e); - let label_values = &["service_rumor_encode", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); - continue 'rumorlist; - } + } + RumorType::Service => { + // trace_it!(GOSSIP: &server, + // TraceKind::SendRumor, + // &member.id, + // &send_rumor); + match server.service_store.encode(&rumor_key.key, &rumor_key.id) { + Ok(bytes) => bytes, + Err(e) => { + error!("Could not write our own rumor to bytes; abandoning sending \ + rumor: {:?}", + e); + let label_values = &["service_rumor_encode", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + 
GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); + continue 'rumorlist; } } - RumorType::ServiceConfig => { - // trace_it!(GOSSIP: &self.server, - // TraceKind::SendRumor, - // &member.id, - // &send_rumor); - match self.server - .service_config_store - .encode(&rumor_key.key, &rumor_key.id) - { - Ok(bytes) => bytes, - Err(e) => { - error!("Could not write our own rumor to bytes; abandoning sending \ - rumor: {:?}", - e); - let label_values = &["service_config_rumor_encode", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); - continue 'rumorlist; - } + } + RumorType::ServiceConfig => { + // trace_it!(GOSSIP: &server, + // TraceKind::SendRumor, + // &member.id, + // &send_rumor); + match server.service_config_store + .encode(&rumor_key.key, &rumor_key.id) + { + Ok(bytes) => bytes, + Err(e) => { + error!("Could not write our own rumor to bytes; abandoning sending \ + rumor: {:?}", + e); + let label_values = &["service_config_rumor_encode", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); + continue 'rumorlist; } } - RumorType::ServiceFile => { - // trace_it!(GOSSIP: &self.server, - // TraceKind::SendRumor, - // &member.id, - // &send_rumor); - match self.server - .service_file_store - .encode(&rumor_key.key, &rumor_key.id) - { - Ok(bytes) => bytes, - Err(e) => { - error!("Could not write our own rumor to bytes; abandoning sending \ - rumor: {:?}", - e); - let label_values = &["service_file_rumor_encode", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); - continue 'rumorlist; - } + } + RumorType::ServiceFile => { + // trace_it!(GOSSIP: &server, + // TraceKind::SendRumor, + // &member.id, + // &send_rumor); + match server.service_file_store + .encode(&rumor_key.key, &rumor_key.id) + { + Ok(bytes) => bytes, + Err(e) => { + error!("Could not write our own rumor to bytes; abandoning sending \ + rumor: {:?}", + e); + let label_values = &["service_file_rumor_encode", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); + continue 'rumorlist; } } - RumorType::Departure => { - match self.server - .departure_store - .encode(&rumor_key.key, &rumor_key.id) - { - Ok(bytes) => bytes, - Err(e) => { - error!("Could not write our own rumor to bytes; abandoning sending \ - rumor: {:?}", - e); - let label_values = &["departure_rumor_encode", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); - continue 'rumorlist; - } + } + RumorType::Departure => { + match server.departure_store.encode(&rumor_key.key, &rumor_key.id) { + Ok(bytes) => bytes, + Err(e) => { + error!("Could not write our own rumor to bytes; abandoning sending \ + rumor: {:?}", + e); + let label_values = &["departure_rumor_encode", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); + continue 'rumorlist; } } - RumorType::Election => { - // trace_it!(GOSSIP: &self.server, - // TraceKind::SendRumor, - // &member.id, - // &send_rumor); - match self.server - .election_store - .encode(&rumor_key.key, &rumor_key.id) - { - Ok(bytes) => bytes, - Err(e) => { - error!("Could not write our own rumor to bytes; abandoning sending \ - rumor: {:?}", - e); - let 
label_values = &["election_rumor_encode", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); - continue 'rumorlist; - } + } + RumorType::Election => { + // trace_it!(GOSSIP: &server, + // TraceKind::SendRumor, + // &member.id, + // &send_rumor); + match server.election_store.encode(&rumor_key.key, &rumor_key.id) { + Ok(bytes) => bytes, + Err(e) => { + error!("Could not write our own rumor to bytes; abandoning sending \ + rumor: {:?}", + e); + let label_values = &["election_rumor_encode", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); + continue 'rumorlist; } } - RumorType::ElectionUpdate => { - match self.server - .update_store - .encode(&rumor_key.key, &rumor_key.id) - { - Ok(bytes) => bytes, - Err(e) => { - error!("Could not write our own rumor to bytes; abandoning sending \ - rumor: {:?}", - e); - let label_values = &["election_update_rumor_encode", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); - continue 'rumorlist; - } + } + RumorType::ElectionUpdate => { + match server.update_store.encode(&rumor_key.key, &rumor_key.id) { + Ok(bytes) => bytes, + Err(e) => { + error!("Could not write our own rumor to bytes; abandoning sending \ + rumor: {:?}", + e); + let label_values = &["election_update_rumor_encode", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + GOSSIP_BYTES_SENT.with_label_values(label_values).set(0); + continue 'rumorlist; } } - RumorType::Fake | RumorType::Fake2 => { - debug!("You have fake rumors; how odd!"); - continue 'rumorlist; - } - }; - let rumor_len = rumor_as_bytes.len().to_i64(); - let payload = match self.server.generate_wire(rumor_as_bytes) { - Ok(payload) => payload, - Err(e) => { - error!("Generating protobuf failed: {}", e); - let label_values = &["generate_wire", "failure"]; - GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); - GOSSIP_BYTES_SENT.with_label_values(label_values) - .set(rumor_len); - continue 'rumorlist; - } - }; - match socket.send(&payload, 0) { - Ok(()) => { - GOSSIP_MESSAGES_SENT.with_label_values(&[&rumor_key.kind.to_string(), - "success"]) - .inc(); - GOSSIP_BYTES_SENT.with_label_values(&[&rumor_key.kind.to_string(), "success"]) - .set(payload.len().to_i64()); - debug!("Sent rumor {:?} to {:?}", rumor_key, member); - } - Err(e) => { - warn!("Could not send rumor to {:?} @ {:?}; ZMQ said: {:?}", - member.id, to_addr, e) - } + } + RumorType::Fake | RumorType::Fake2 => { + debug!("You have fake rumors; how odd!"); + continue 'rumorlist; + } + }; + let rumor_len = rumor_as_bytes.len().to_i64(); + let payload = match server.generate_wire(rumor_as_bytes) { + Ok(payload) => payload, + Err(e) => { + error!("Generating protobuf failed: {}", e); + let label_values = &["generate_wire", "failure"]; + GOSSIP_MESSAGES_SENT.with_label_values(label_values).inc(); + GOSSIP_BYTES_SENT.with_label_values(label_values) + .set(rumor_len); + continue 'rumorlist; + } + }; + match socket.send(&payload, 0) { + Ok(()) => { + GOSSIP_MESSAGES_SENT.with_label_values(&[&rumor_key.kind.to_string(), "success"]) + .inc(); + GOSSIP_BYTES_SENT.with_label_values(&[&rumor_key.kind.to_string(), "success"]) + .set(payload.len().to_i64()); + debug!("Sent rumor {:?} to {:?}", rumor_key, member); + } + Err(e) => { + warn!("Could not send rumor to {:?} @ {:?}; ZMQ said: {:?}", + member.id, 
to_addr, e) } } - self.server.rumor_heat.cool_rumors(&member.id, &rumors); } + server.rumor_heat.cool_rumors(&member.id, &rumors); +} - /// Given a rumorkey, creates a protobuf rumor for sharing. - fn create_member_rumor(&self, rumor_key: &RumorKey) -> Option<RumorEnvelope> { - let member = self.server.member_list.get_cloned(&rumor_key.key())?; - let payload = Membership { member, - health: self.server - .member_list - .health_of_by_id(&rumor_key.key()) - .unwrap() }; - let rumor = RumorEnvelope { r#type: RumorType::Member, - from_id: self.server.member_id().to_string(), - kind: RumorKind::Membership(payload), }; - Some(rumor) - } +/// Given a rumorkey, creates a protobuf rumor for sharing. +/// +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held. +fn create_member_rumor_mlr(server: &Server, rumor_key: &RumorKey) -> Option<RumorEnvelope> { + let member = server.member_list.get_cloned_mlr(&rumor_key.key())?; + let payload = Membership { member, + health: server.member_list + .health_of_by_id_mlr(&rumor_key.key()) + .unwrap() }; + let rumor = RumorEnvelope { r#type: RumorType::Member, + from_id: server.member_id().to_string(), + kind: RumorKind::Membership(payload), }; + Some(rumor) } diff --git a/components/butterfly/tests/common/mod.rs b/components/butterfly/tests/common/mod.rs index 460d6ae99c..7023ee1906 100644 --- a/components/butterfly/tests/common/mod.rs +++ b/components/butterfly/tests/common/mod.rs @@ -61,7 +61,7 @@ pub fn start_server(name: &str, ring_key: Option<SymKey>, suitability: u64) -> S Some(String::from(name)), None, Box::new(NSuitability(suitability))).unwrap(); - server.start(Timing::default()) + server.start_mlr(&Timing::default()) .expect("Cannot start server"); server } @@ -166,7 +166,10 @@ impl SwimNet { from.remove_from_block_list(to.member_id()); } - pub fn health_of(&self, from_entry: usize, to_entry: usize) -> Option<Health> { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn health_of_mlr(&self, from_entry: usize, to_entry: usize) -> Option<Health> { /// To avoid deadlocking in a test, we use `health_of_by_id_with_timeout` rather than /// `health_of_by_id`. const HEALTH_OF_TIMEOUT: Duration = Duration::from_secs(5); @@ -180,7 +183,7 @@ impl SwimNet { .expect("Asked for a network member who is out of bounds"); match from.member_list - .health_of_by_id_with_timeout(to.member_id(), HEALTH_OF_TIMEOUT) + .health_of_by_id_with_timeout_mlr(to.member_id(), HEALTH_OF_TIMEOUT) { Ok(health) => Some(health), Err(Error::UnknownMember(_)) => None, @@ -195,14 +198,17 @@ impl SwimNet { } } - pub fn network_health_of(&self, to_check: usize) -> Vec<Option<Health>> { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn network_health_of_mlr(&self, to_check: usize) -> Vec<Option<Health>> { let mut health_summary = Vec::with_capacity(self.members.len() - 1); let length = self.members.len(); for x in 0..length { if x == to_check { continue; } - health_summary.push(self.health_of(x, to_check)); + health_summary.push(self.health_of_mlr(x, to_check)); } health_summary } @@ -352,10 +358,17 @@ impl SwimNet { } } - pub fn wait_for_health_of(&self, from_entry: usize, to_check: usize, health: Health) -> bool { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held.
+ pub fn wait_for_health_of_mlr(&self, + from_entry: usize, + to_check: usize, + health: Health) + -> bool { let rounds_in = self.rounds_in(self.max_rounds()); loop { - if let Some(real_health) = self.health_of(from_entry, to_check) { + if let Some(real_health) = self.health_of_mlr(from_entry, to_check) { if real_health == health { trace_it!(TEST: &self.members[from_entry], format!("Health {} {} as {}", self.members[to_check].name(), self.members[to_check].member_id(), health)); return true; @@ -371,10 +384,13 @@ impl SwimNet { } } - pub fn wait_for_network_health_of(&self, to_check: usize, health: Health) -> bool { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + pub fn wait_for_network_health_of_mlr(&self, to_check: usize, health: Health) -> bool { let rounds_in = self.rounds_in(self.max_rounds()); loop { - let network_health = self.network_health_of(to_check); + let network_health = self.network_health_of_mlr(to_check); if network_health.iter().all(|&x| x == Some(health)) { trace_it!(TEST_NET: self, format!("Health {} {} as {}", @@ -453,7 +469,7 @@ impl SwimNet { #[macro_export] macro_rules! assert_health_of { ($network:expr, $to:expr, $health:expr) => { - assert!($network.network_health_of($to) + assert!($network.network_health_of_mlr($to) .into_iter() .all(|x| x == $health), "Member {} does not always have health {}", @@ -470,7 +486,7 @@ macro_rules! assert_health_of { } #[macro_export] -macro_rules! assert_wait_for_health_of { +macro_rules! assert_wait_for_health_of_mlr { ($network:expr,[$from:expr, $to:expr], $health:expr) => { let left: Vec = $from.collect(); let right: Vec = $to.collect(); @@ -479,12 +495,12 @@ macro_rules! assert_wait_for_health_of { if l == r { continue; } - assert!($network.wait_for_health_of(*l, *r, $health), + assert!($network.wait_for_health_of_mlr(*l, *r, $health), "Member {} does not see {} as {}", l, r, $health); - assert!($network.wait_for_health_of(*r, *l, $health), + assert!($network.wait_for_health_of_mlr(*r, *l, $health), "Member {} does not see {} as {}", r, l, @@ -493,13 +509,13 @@ macro_rules! 
assert_wait_for_health_of { } }; ($network:expr, $to:expr, $health:expr) => { - assert!($network.wait_for_network_health_of($to, $health), + assert!($network.wait_for_network_health_of_mlr($to, $health), "Member {} does not always have health {}", $to, $health); }; ($network:expr, $from:expr, $to:expr, $health:expr) => { - assert!($network.wait_for_health_of($from, $to, $health), + assert!($network.wait_for_health_of_mlr($from, $to, $health), "Member {} does not see {} as {}", $from, $to, diff --git a/components/butterfly/tests/encryption/mod.rs b/components/butterfly/tests/encryption/mod.rs index 1d52a3faba..97a95df1fa 100644 --- a/components/butterfly/tests/encryption/mod.rs +++ b/components/butterfly/tests/encryption/mod.rs @@ -9,7 +9,7 @@ fn symmetric_encryption_of_wire_payloads() { memory symkey"); let mut net = btest::SwimNet::new_ring_encryption(2, &ring_key); net.connect(0, 1); - assert_wait_for_health_of!(net, [0..2, 0..2], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..2, 0..2], Health::Alive); net.add_service(0, "core/beast/1.2.3/20161208121212"); net.wait_for_gossip_rounds(2); assert!(net[1].service_store diff --git a/components/butterfly/tests/integration.rs b/components/butterfly/tests/integration.rs index 6afd99b11d..b56a7322e7 100644 --- a/components/butterfly/tests/integration.rs +++ b/components/butterfly/tests/integration.rs @@ -12,13 +12,13 @@ use habitat_butterfly::{self, fn two_members_meshed_confirm_one_member() { let mut net = btest::SwimNet::new(2); net.mesh(); - assert_wait_for_health_of!(net, 0, 1, Health::Alive); - assert_wait_for_health_of!(net, 1, 0, Health::Alive); + assert_wait_for_health_of_mlr!(net, 0, 1, Health::Alive); + assert_wait_for_health_of_mlr!(net, 1, 0, Health::Alive); trace_it!(TEST: &net[0], "Paused"); net[0].pause(); - assert_wait_for_health_of!(net, 1, 0, Health::Suspect); - assert_wait_for_health_of!(net, 1, 0, Health::Confirmed); + assert_wait_for_health_of_mlr!(net, 1, 0, Health::Suspect); + assert_wait_for_health_of_mlr!(net, 1, 0, Health::Confirmed); } #[test] @@ -27,7 +27,7 @@ fn six_members_meshed_confirm_one_member() { net.mesh(); trace_it!(TEST: &net[0], "Paused"); net[0].pause(); - assert_wait_for_health_of!(net, 0, Health::Confirmed); + assert_wait_for_health_of_mlr!(net, 0, Health::Confirmed); } #[test] @@ -37,16 +37,16 @@ fn six_members_meshed_partition_one_node_from_another_node_remains_alive() { net.mesh(); net.block(0, 1); net.wait_for_rounds(2); - assert_wait_for_health_of!(net, 1, Health::Alive); + assert_wait_for_health_of_mlr!(net, 1, Health::Alive); } #[test] fn six_members_meshed_partition_half_of_nodes_from_each_other_both_sides_confirmed() { let mut net = btest::SwimNet::new(6); net.mesh(); - assert_wait_for_health_of!(net, 0, Health::Alive); + assert_wait_for_health_of_mlr!(net, 0, Health::Alive); net.partition(0..3, 3..6); - assert_wait_for_health_of!(net, [0..3, 3..6], Health::Confirmed); + assert_wait_for_health_of_mlr!(net, [0..3, 3..6], Health::Confirmed); } #[test] @@ -57,7 +57,7 @@ fn six_members_unmeshed_become_fully_meshed_via_gossip() { net.connect(2, 3); net.connect(3, 4); net.connect(4, 5); - assert_wait_for_health_of!(net, [0..6, 0..6], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..6, 0..6], Health::Alive); } #[test] @@ -68,9 +68,9 @@ fn six_members_unmeshed_confirm_one_member() { net.connect(2, 3); net.connect(3, 4); net.connect(4, 5); - assert_wait_for_health_of!(net, [0..6, 0..6], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..6, 0..6], Health::Alive); net[0].pause(); 
- assert_wait_for_health_of!(net, 0, Health::Confirmed); + assert_wait_for_health_of_mlr!(net, 0, Health::Confirmed); } #[test] @@ -81,11 +81,11 @@ fn six_members_unmeshed_partition_and_rejoin_no_persistent_peers() { net.connect(2, 3); net.connect(3, 4); net.connect(4, 5); - assert_wait_for_health_of!(net, [0..6, 0..6], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..6, 0..6], Health::Alive); net.partition(0..3, 3..6); - assert_wait_for_health_of!(net, [0..3, 3..6], Health::Confirmed); + assert_wait_for_health_of_mlr!(net, [0..3, 3..6], Health::Confirmed); net.unpartition(0..3, 3..6); - assert_wait_for_health_of!(net, [0..3, 3..6], Health::Confirmed); + assert_wait_for_health_of_mlr!(net, [0..3, 3..6], Health::Confirmed); } #[test] @@ -104,11 +104,11 @@ fn six_members_unmeshed_partition_and_rejoin_persistent_peers() { net.connect(2, 3); net.connect(3, 4); net.connect(4, 5); - assert_wait_for_health_of!(net, [0..6, 0..6], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..6, 0..6], Health::Alive); net.partition(0..3, 3..6); - assert_wait_for_health_of!(net, [0..3, 3..6], Health::Confirmed); + assert_wait_for_health_of_mlr!(net, [0..3, 3..6], Health::Confirmed); net.unpartition(0..3, 3..6); - assert_wait_for_health_of!(net, [0..3, 3..6], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..3, 3..6], Health::Alive); } #[test] @@ -119,12 +119,12 @@ fn six_members_unmeshed_allows_graceful_departure() { net.connect(2, 3); net.connect(3, 4); net.connect(4, 5); - assert_wait_for_health_of!(net, [0..6, 0..6], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..6, 0..6], Health::Alive); trace_it!(TEST: &net[0], "Departing"); - net[0].set_departed(); + net[0].set_departed_mlw(); trace_it!(TEST: &net[0], "Paused"); net[0].pause(); - assert_wait_for_health_of!(net, 0, Health::Departed); + assert_wait_for_health_of_mlr!(net, 0, Health::Departed); } #[test] @@ -132,5 +132,5 @@ fn ten_members_meshed_confirm_one_member() { let mut net = btest::SwimNet::new(10); net.mesh(); net[0].pause(); - assert_wait_for_health_of!(net, 0, Health::Confirmed); + assert_wait_for_health_of_mlr!(net, 0, Health::Confirmed); } diff --git a/components/butterfly/tests/rumor/departure.rs b/components/butterfly/tests/rumor/departure.rs index 586816aa75..8cf813dbf2 100644 --- a/components/butterfly/tests/rumor/departure.rs +++ b/components/butterfly/tests/rumor/departure.rs @@ -27,5 +27,5 @@ fn departure_via_client() { net.wait_for_gossip_rounds(1); assert!(net[2].departure_store .contains_rumor("departure", net[1].member_id())); - assert_wait_for_health_of!(net, 1, Health::Departed); + assert_wait_for_health_of_mlr!(net, 1, Health::Departed); } diff --git a/components/butterfly/tests/rumor/election.rs b/components/butterfly/tests/rumor/election.rs index be555427b0..4103714fde 100644 --- a/components/butterfly/tests/rumor/election.rs +++ b/components/butterfly/tests/rumor/election.rs @@ -60,11 +60,11 @@ fn five_members_elect_a_new_leader_when_the_old_one_dies() { } net[paused].pause(); let paused_id = net[paused].member_id(); - assert_wait_for_health_of!(net, paused, Health::Confirmed); + assert_wait_for_health_of_mlr!(net, paused, Health::Confirmed); if paused == 0 { - net[1].restart_elections(FeatureFlag::empty()); + net[1].restart_elections_mlr(FeatureFlag::empty()); } else { - net[0].restart_elections(FeatureFlag::empty()); + net[0].restart_elections_mlr(FeatureFlag::empty()); } for i in 0..5 { @@ -107,7 +107,7 @@ fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() { net.connect(1, 
2); net.connect(2, 3); net.connect(3, 4); - assert_wait_for_health_of!(net, [0..5, 0..5], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..5, 0..5], Health::Alive); assert_wait_for_election_status!(net, [0..5], "witcher.prod", ElectionStatus::Finished); assert_wait_for_equal_election!(net, [0..5, 0..5], "witcher.prod"); @@ -129,9 +129,9 @@ fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() { let mut new_leader_id = String::from(""); net.partition(0..2, 2..5); - assert_wait_for_health_of!(net, [0..2, 2..5], Health::Confirmed); - net[0].restart_elections(FeatureFlag::empty()); - net[4].restart_elections(FeatureFlag::empty()); + assert_wait_for_health_of_mlr!(net, [0..2, 2..5], Health::Confirmed); + net[0].restart_elections_mlr(FeatureFlag::empty()); + net[4].restart_elections_mlr(FeatureFlag::empty()); assert_wait_for_election_status!(net, 0, "witcher.prod", ElectionStatus::NoQuorum); assert_wait_for_election_status!(net, 1, "witcher.prod", ElectionStatus::NoQuorum); assert_wait_for_election_status!(net, 2, "witcher.prod", ElectionStatus::Finished); @@ -150,7 +150,7 @@ fn five_members_elect_a_new_leader_when_they_are_quorum_partitioned() { assert!(leader_id != new_leader_id); println!("Leader {} New {}", leader_id, new_leader_id); net.unpartition(0..2, 2..5); - assert_wait_for_health_of!(net, [0..5, 0..5], Health::Alive); + assert_wait_for_health_of_mlr!(net, [0..5, 0..5], Health::Alive); assert_wait_for_election_status!(net, 0, "witcher.prod", ElectionStatus::Finished); assert_wait_for_election_status!(net, 1, "witcher.prod", ElectionStatus::Finished); diff --git a/components/core/src/os/process/windows_child.rs b/components/core/src/os/process/windows_child.rs index 71aaf9b78c..eb0689829d 100644 --- a/components/core/src/os/process/windows_child.rs +++ b/components/core/src/os/process/windows_child.rs @@ -1133,7 +1133,7 @@ fn null_stdio_handle() -> Result { Ok(File::open(Path::new("NUL"), &opts).map(File::into_handle)?) } -unsafe fn read_to_end_uninitialized(r: &mut Read, buf: &mut Vec) -> io::Result { +unsafe fn read_to_end_uninitialized(r: &mut impl Read, buf: &mut Vec) -> io::Result { let start_len = buf.len(); buf.reserve(16); diff --git a/components/sup/src/census.rs b/components/sup/src/census.rs index d1b0c7db4a..817d58ea84 100644 --- a/components/sup/src/census.rs +++ b/components/sup/src/census.rs @@ -1,7 +1,8 @@ use crate::error::Error; use habitat_butterfly::{member::{Health, Member, - MemberList}, + MemberList, + Membership}, rumor::{election::{Election as ElectionRumor, ElectionStatus as ElectionStatusRumor, ElectionUpdate as ElectionUpdateRumor}, @@ -112,6 +113,10 @@ impl CensusRing { /// /// (Butterfly provides the health, the ServiceRumors provide the /// rest). + /// + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. 
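+ ///
+ /// A sketch of the single-pass iteration used in the body below
+ /// (illustrative only; `Membership` is the type newly imported at the top
+ /// of this file):
+ ///
+ /// ```ignore
+ /// member_list.with_memberships_mlr(|Membership { member, health }| {
+ ///     // Each entry arrives together with its health, so no per-member
+ ///     // health lookup (and extra lock acquisition) is needed.
+ ///     Ok(())
+ /// })
+ /// .ok();
+ /// ```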
fn populate_census(&mut self, service_rumors: &RumorStore, member_list: &MemberList) { @@ -136,15 +141,16 @@ impl CensusRing { } }); - member_list.with_members(|member| { - let health = member_list.health_of(&member).unwrap(); + member_list.with_memberships_mlr(|Membership { member, health }| { for group in self.census_groups.values_mut() { if let Some(census_member) = group.find_member_mut(&member.id) { census_member.update_from_member(&member); census_member.update_from_health(health); } } - }); + Ok(()) + }) + .ok(); } fn update_from_election_store(&mut self, election_rumors: &RumorStore) { diff --git a/components/sup/src/main.rs b/components/sup/src/main.rs index 506936123e..5625ff7409 100644 --- a/components/sup/src/main.rs +++ b/components/sup/src/main.rs @@ -78,7 +78,7 @@ fn main() { logger::init(); let mut ui = UI::default_with_env(); let flags = FeatureFlag::from_env(&mut ui); - let result = start(flags); + let result = start_mlr(flags); let exit_code = match result { Ok(_) => 0, Err(ref err) => { @@ -109,7 +109,10 @@ fn boot() -> Option { } } -fn start(feature_flags: FeatureFlag) -> Result<()> { +/// # Locking +/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries lock +/// is held. +fn start_mlr(feature_flags: FeatureFlag) -> Result<()> { if feature_flags.contains(FeatureFlag::TEST_BOOT_FAIL) { outputln!("Simulating boot failure"); return Err(Error::TestBootFail); @@ -139,7 +142,7 @@ fn start(feature_flags: FeatureFlag) -> Result<()> { ("bash", Some(_)) => sub_bash(), ("run", Some(m)) => { let launcher = launcher.ok_or(Error::NoLauncher)?; - sub_run(m, launcher, feature_flags) + sub_run_mlw_imlw(m, launcher, feature_flags) } ("sh", Some(_)) => sub_sh(), ("term", Some(_)) => sub_term(), @@ -149,11 +152,19 @@ fn start(feature_flags: FeatureFlag) -> Result<()> { fn sub_bash() -> Result<()> { command::shell::bash() } -fn sub_run(m: &ArgMatches, launcher: LauncherCli, feature_flags: FeatureFlag) -> Result<()> { +/// # Locking
+/// * `MemberList::entries` (write) This method must not be called while any MemberList::entries
+/// lock is held.
+/// * `MemberList::initial_members` (write) This method must not be called while any
+/// MemberList::initial_members lock is held.
+fn sub_run_mlw_imlw(m: &ArgMatches, + launcher: LauncherCli, + feature_flags: FeatureFlag) + -> Result<()> { set_supervisor_logging_options(m); let cfg = mgrcfg_from_sup_run_matches(m, feature_flags)?; - let manager = Manager::load(cfg, launcher)?; + let manager = Manager::load_imlw(cfg, launcher)?; // We need to determine if we have an initial service to start let svc = if let Some(pkg) = m.value_of("PKG_IDENT_OR_ARTIFACT") { @@ -186,7 +197,7 @@ fn sub_run(m: &ArgMatches, launcher: LauncherCli, feature_flags: FeatureFlag) -> } else { None }; - manager.run(svc) + manager.run_mlw_imlw(svc) } fn sub_sh() -> Result<()> { command::shell::sh() } diff --git a/components/sup/src/manager/mod.rs b/components/sup/src/manager/mod.rs index fd20c82b33..ed5d2ade62 100644 --- a/components/sup/src/manager/mod.rs +++ b/components/sup/src/manager/mod.rs @@ -423,7 +423,11 @@ impl Manager { /// /// The returned Manager will be pre-populated with any cached data from disk from a previous /// run if available. - pub fn load(cfg: ManagerConfig, launcher: LauncherCli) -> Result { + /// + /// # Locking + /// * `MemberList::initial_members` (write) This method must not be called while any + /// MemberList::initial_members lock is held.
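+ ///
+ /// A call sketch, as in `sub_run_mlw_imlw` above (`cfg`, `launcher`, and
+ /// `svc` are assumed to come from the caller):
+ ///
+ /// ```ignore
+ /// let manager = Manager::load_imlw(cfg, launcher)?;
+ /// manager.run_mlw_imlw(svc)?;
+ /// ```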
+ pub fn load_imlw(cfg: ManagerConfig, launcher: LauncherCli) -> Result { let state_path = cfg.sup_root(); let fs_cfg = FsCfg::new(state_path); Self::create_state_path_dirs(&fs_cfg)?; @@ -433,7 +437,7 @@ impl Manager { } obtain_process_lock(&fs_cfg)?; - Self::new(cfg, fs_cfg, launcher) + Self::new_imlw(cfg, fs_cfg, launcher) } pub fn term(proc_lock_file: &Path) -> Result<()> { @@ -452,7 +456,10 @@ impl Manager { } } - fn new(cfg: ManagerConfig, fs_cfg: FsCfg, launcher: LauncherCli) -> Result { + /// # Locking + /// * `MemberList::initial_members` (write) This method must not be called while any + /// MemberList::initial_members lock is held. + fn new_imlw(cfg: ManagerConfig, fs_cfg: FsCfg, launcher: LauncherCli) -> Result { debug!("new(cfg: {:?}, fs_cfg: {:?}", cfg, fs_cfg); let current = PackageIdent::from_str(&format!("{}/{}", SUP_PKG_IDENT, VERSION)).unwrap(); outputln!("{} ({})", SUP_PKG_IDENT, current); @@ -488,7 +495,7 @@ impl Manager { peer.address = format!("{}", peer_addr.ip()); peer.swim_port = peer_addr.port(); peer.gossip_port = peer_addr.port(); - server.member_list.add_initial_member(peer); + server.member_list.add_initial_member_imlw(peer); } let peer_watcher = if let Some(path) = cfg.watch_peer_file { @@ -705,8 +712,13 @@ impl Manager { // simplify the redundant aspects and remove this allow(clippy::cognitive_complexity), // but changing it in the absence of other necessity seems like too much risk for the // expected reward. + /// # Locking + /// * `MemberList::entries` (write) This method must not be called while any MemberList::entries + /// lock is held. + /// * `MemberList::initial_members` (write) This method must not be called while any + /// MemberList::initial_members lock is held. #[allow(clippy::cognitive_complexity)] - pub fn run(mut self, svc: Option) -> Result<()> { + pub fn run_mlw_imlw(mut self, svc: Option) -> Result<()> { let main_hist = RUN_LOOP_DURATION.with_label_values(&["sup"]); let service_hist = RUN_LOOP_DURATION.with_label_values(&["service"]); let mut next_cpu_measurement = SteadyTime::now(); @@ -747,9 +759,9 @@ impl Manager { outputln!("Starting gossip-listener on {}", self.butterfly.gossip_addr()); - self.butterfly.start(Timing::default())?; + self.butterfly.start_mlr(&Timing::default())?; debug!("gossip-listener started"); - self.persist_state(); + self.persist_state_mlr(); let http_listen_addr = self.sys.http_listen(); let ctl_listen_addr = self.sys.ctl_listen(); let ctl_secret_key = ctl_gateway::readgen_secret_key(&self.fs_cfg.sup_root)?; @@ -976,14 +988,14 @@ impl Manager { self.maybe_spawn_service_futures(&mut runtime); } - self.update_peers_from_watch_file()?; + self.update_peers_from_watch_file_mlr_imlw()?; self.update_running_services_from_user_config_watcher(); for f in self.stop_services_with_updates() { runtime.spawn(f); } - self.restart_elections(self.feature_flags); + self.restart_elections_mlr(self.feature_flags); self.census_ring .update_from_rumors(&self.state.cfg.cache_key_path, &self.butterfly.service_store, @@ -994,11 +1006,11 @@ impl Manager { &self.butterfly.service_file_store); if self.check_for_changed_services() { - self.persist_state(); + self.persist_state_mlr(); } if self.census_ring.changed() { - self.persist_state(); + self.persist_state_mlr(); } for service in self.state .services @@ -1053,7 +1065,7 @@ impl Manager { ShutdownMode::Restarting => {} ShutdownMode::Normal | ShutdownMode::Departed => { outputln!("Gracefully departing from butterfly network."); - self.butterfly.set_departed(); +
self.butterfly.set_departed_mlw(); let mut svcs = self.state .services @@ -1188,11 +1200,14 @@ impl Manager { } } - fn persist_state(&self) { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + fn persist_state_mlr(&self) { debug!("Updating census state"); self.persist_census_state(); debug!("Updating butterfly state"); - self.persist_butterfly_state(); + self.persist_butterfly_state_mlr(); debug!("Updating services state"); self.persist_services_state(); } @@ -1207,7 +1222,10 @@ impl Manager { .census_data = json; } - fn persist_butterfly_state(&self) { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + fn persist_butterfly_state_mlr(&self) { let bs = ServerProxy::new(&self.butterfly); let json = serde_json::to_string(&bs).unwrap(); self.state @@ -1267,8 +1285,8 @@ impl Manager { } /// Check if any elections need restarting. - fn restart_elections(&mut self, feature_flags: FeatureFlag) { - self.butterfly.restart_elections(feature_flags); + fn restart_elections_mlr(&mut self, feature_flags: FeatureFlag) { + self.butterfly.restart_elections_mlr(feature_flags); } /// Create a future for stopping a Service. The Service is assumed @@ -1541,8 +1559,13 @@ impl Manager { .collect() } - fn update_peers_from_watch_file(&mut self) -> Result<()> { - if !self.butterfly.need_peer_seeding() { + /// # Locking + /// * `MemberList::entries` (read) This method must not be called while any MemberList::entries + /// lock is held. + /// * `MemberList::initial_members` (write) This method must not be called while any + /// MemberList::initial_members lock is held. + fn update_peers_from_watch_file_mlr_imlw(&mut self) -> Result<()> { + if !self.butterfly.need_peer_seeding_mlr() { return Ok(()); } match self.peer_watcher { @@ -1550,7 +1573,7 @@ impl Manager { Some(ref watcher) => { if watcher.has_fs_events() { let members = watcher.get_members()?; - self.butterfly.member_list.set_initial_members(members); + self.butterfly.member_list.set_initial_members_imlw(members); } Ok(()) }