Skip to content

Commit

Permalink
Merge pull request #6662 from habitat-sh/remove-recusive-memberlist-l…
Browse files Browse the repository at this point in the history
…ocking

Remove recusive memberlist locking
  • Loading branch information
baumanj committed Jun 20, 2019
2 parents 25cf396 + 3dec16a commit a3380d9
Show file tree
Hide file tree
Showing 19 changed files with 1,344 additions and 1,130 deletions.
2 changes: 1 addition & 1 deletion .expeditor/verify.pipeline.yml
Expand Up @@ -657,7 +657,7 @@ steps:
shell: [ "powershell", "-Command" ]
always-pull: true
propagate-environment: true
timeout_in_minutes: 5
timeout_in_minutes: 20
retry:
automatic:
limit: 1
Expand Down
4 changes: 2 additions & 2 deletions components/butterfly/src/main.rs
Expand Up @@ -52,10 +52,10 @@ fn main() {
member.address = format!("{}", addr.ip());
member.swim_port = addr.port();
member.gossip_port = addr.port();
server.member_list.add_initial_member(member);
server.member_list.add_initial_member_imlw(member);
}

server.start(server::timing::Timing::default())
server.start_mlr(&server::timing::Timing::default())
.expect("Cannot start server");
loop {
println!("{:#?}", server.member_list);
Expand Down
351 changes: 230 additions & 121 deletions components/butterfly/src/member.rs

Large diffs are not rendered by default.

26 changes: 18 additions & 8 deletions components/butterfly/src/rumor/dat_file.rs
Expand Up @@ -57,7 +57,10 @@ impl DatFile {

pub fn path(&self) -> &Path { &self.path }

pub fn read_into(&mut self, server: &Server) -> Result<()> {
/// # Locking
/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries
/// lock is held.
pub fn read_into_mlr(&mut self, server: &Server) -> Result<()> {
let mut version = [0; 1];
let mut size_buf = [0; 8];
// JW: Resizing this buffer is terrible for performance, but it's the easiest way to
Expand Down Expand Up @@ -164,7 +167,7 @@ impl DatFile {
reader.read_exact(&mut rumor_buf)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
let rumor = Election::from_bytes(&rumor_buf)?;
server.insert_election(rumor);
server.insert_election_mlr(rumor);
bytes_read += size_buf.len() as u64 + rumor_size;
}

Expand All @@ -182,7 +185,7 @@ impl DatFile {
reader.read_exact(&mut rumor_buf)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
let rumor = ElectionUpdate::from_bytes(&rumor_buf)?;
server.insert_update_election(rumor);
server.insert_update_election_mlr(rumor);
bytes_read += size_buf.len() as u64 + rumor_size;
}

Expand All @@ -209,14 +212,17 @@ impl DatFile {
Ok(())
}

/// # Locking
/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries
/// lock is held.
pub fn write(&self, server: &Server) -> Result<usize> {
let mut header = Header::default();
let w =
AtomicWriter::new(&self.path).map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
w.with_writer(|mut f| {
let mut writer = BufWriter::new(&mut f);
self.init(&mut writer)?;
header.member_len = self.write_member_list(&mut writer, &server.member_list)?;
header.member_len = self.write_member_list_mlr(&mut writer, &server.member_list)?;
header.service_len = self.write_rumor_store(&mut writer, &server.service_store)?;
header.service_config_len =
self.write_rumor_store(&mut writer, &server.service_config_store)?;
Expand Down Expand Up @@ -281,11 +287,15 @@ impl DatFile {
Ok(total)
}

fn write_member_list<W>(&self, writer: &mut W, member_list: &MemberList) -> Result<u64>
where W: Write
{
/// # Locking
/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries
/// lock is held.
fn write_member_list_mlr(&self,
writer: &mut impl Write,
member_list: &MemberList)
-> Result<u64> {
let mut total = 0;
member_list.with_memberships(|membership| {
member_list.with_memberships_mlr(|membership| {
total += self.write_member(writer, &membership)?;
Ok(total)
})
Expand Down
61 changes: 27 additions & 34 deletions components/butterfly/src/server/expire.rs
Expand Up @@ -12,42 +12,35 @@ use crate::{rumor::{RumorKey,

const LOOP_DELAY_MS: u64 = 500;

pub struct Expire {
pub server: Server,
pub timing: Timing,
pub fn spawn_thread(name: String, server: Server, timing: Timing) -> std::io::Result<()> {
thread::Builder::new().name(name)
.spawn(move || run_loop(&server, &timing))
.map(|_| ())
}

impl Expire {
pub fn new(server: Server, timing: Timing) -> Expire { Expire { server, timing } }

pub fn run(&self) {
loop {
habitat_common::sync::mark_thread_alive();

let newly_confirmed_members =
self.server
.member_list
.members_expired_to_confirmed(self.timing.suspicion_timeout_duration());

for id in newly_confirmed_members {
self.server
.rumor_heat
.start_hot_rumor(RumorKey::new(RumorType::Member, &id, ""));
}

let newly_departed_members =
self.server
.member_list
.members_expired_to_departed(self.timing.departure_timeout_duration());

for id in newly_departed_members {
self.server.rumor_heat.purge(&id);
self.server
.rumor_heat
.start_hot_rumor(RumorKey::new(RumorType::Member, &id, ""));
}

thread::sleep(Duration::from_millis(LOOP_DELAY_MS));
fn run_loop(server: &Server, timing: &Timing) -> ! {
loop {
habitat_common::sync::mark_thread_alive();

let newly_confirmed_members =
server.member_list
.members_expired_to_confirmed_mlw(timing.suspicion_timeout_duration());

for id in newly_confirmed_members {
server.rumor_heat
.start_hot_rumor(RumorKey::new(RumorType::Member, &id, ""));
}

let newly_departed_members =
server.member_list
.members_expired_to_departed_mlw(timing.departure_timeout_duration());

for id in newly_departed_members {
server.rumor_heat.purge(&id);
server.rumor_heat
.start_hot_rumor(RumorKey::new(RumorType::Member, &id, ""));
}

thread::sleep(Duration::from_millis(LOOP_DELAY_MS));
}
}

0 comments on commit a3380d9

Please sign in to comment.