Skip to content

Commit

Permalink
fix: stabilization predecessor notify logic (#542)
Browse files Browse the repository at this point in the history
* Fix stabilization predecessor notify logic

* Fix stabilization test node order

* Expand messages in desc order stabilization

* Use random node in 1_3_2 test

* Fix wait for msgs in tests

* Update docs of notify

* Fix clippy warning

* Fix Cargo.lock after merge
  • Loading branch information
Ma233 committed Feb 1, 2024
1 parent 7b61351 commit b7bce92
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 84 deletions.
69 changes: 66 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ web-sys = { version = "0.3.56", optional = true, features = [
] }

[dev-dependencies]
pretty_assertions = "1.4.0"
tracing-subscriber = { version = "0.3.15", features = ["ansi"] }
tracing-test = "0.2.4"
tracing-wasm = "0.2.1"
wasm-bindgen-test = "0.3.0"

Expand Down
15 changes: 8 additions & 7 deletions crates/core/src/dht/chord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,27 +309,28 @@ impl Chord<PeerRingAction> for PeerRing {
succ
}

/// Handle notification from a node that thinks it is the predecessor of current node.
/// The `did` in parameters is the Did of that node.
/// Handle notification from a node that thinks a did is the predecessor of current node.
/// The `did` in parameters is the Did of that predecessor.
/// If that node is closer to current node or current node has no predecessor, set it to the did.
/// This method will return that did if it is set to the predecessor.
fn notify(&self, did: Did) -> Result<Option<Did>> {
/// This method will return current predecessor after setting.
fn notify(&self, did: Did) -> Result<Did> {
let mut predecessor = self.lock_predecessor()?;

match *predecessor {
Some(pre) => {
// If the did is closer to self than predecessor, set it to the predecessor.
// Otherwise tell the real predecessor back.
if self.bias(pre) < self.bias(did) {
*predecessor = Some(did);
Ok(Some(did))
Ok(did)
} else {
Ok(None)
Ok(pre)
}
}
None => {
// Self has no predecessor, set it to the did directly.
*predecessor = Some(did);
Ok(Some(did))
Ok(did)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/dht/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ pub trait Chord<Action> {

/// Notify the DHT that a node is its predecessor.
/// According to the paper, this method should be called periodically.
fn notify(&self, did: Did) -> Result<Option<Did>>;
/// This method should return the predecessor after updating.
fn notify(&self, did: Did) -> Result<Did>;

/// Fix finger table by finding the successor for each finger.
/// According to the paper, this method should be called periodically.
Expand Down
62 changes: 51 additions & 11 deletions crates/core/src/message/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ impl MessageHandler {
) -> Result<Vec<MessageHandlerEvent>> {
let message: Message = payload.transaction.data()?;

#[cfg(test)]
{
println!("{} got msg {}", self.dht.did, &message);
}
tracing::debug!(
"START HANDLE MESSAGE: {} {}",
&payload.transaction.tx_id,
Expand Down Expand Up @@ -146,6 +142,7 @@ impl MessageHandler {
#[cfg(not(feature = "wasm"))]
#[cfg(test)]
pub mod tests {
use dashmap::DashMap;
use futures::lock::Mutex;
use tokio::time::sleep;
use tokio::time::Duration;
Expand Down Expand Up @@ -280,13 +277,56 @@ pub mod tests {
}

pub async fn wait_for_msgs(node1: &Swarm, node2: &Swarm, node3: &Swarm) {
loop {
tokio::select! {
_ = node1.listen_once() => {}
_ = node2.listen_once() => {}
_ = node3.listen_once() => {}
_ = sleep(Duration::from_secs(3)) => break
let did_names: DashMap<Did, &str, _> = DashMap::new();
did_names.insert(node1.did(), "node1");
did_names.insert(node2.did(), "node2");
did_names.insert(node3.did(), "node3");

let listen1 = async {
loop {
tokio::select! {
Some((payload, _)) = node1.listen_once() => {
println!(
"Msg {} => node1 : {:?}",
*did_names.get(&payload.signer()).unwrap(),
payload.transaction.data::<Message>().unwrap()
)
}
_ = sleep(Duration::from_secs(3)) => break
}
}
}
};

let listen2 = async {
loop {
tokio::select! {
Some((payload, _)) = node2.listen_once() => {
println!(
"Msg {} => node2 : {:?}",
*did_names.get(&payload.signer()).unwrap(),
payload.transaction.data::<Message>().unwrap()
)
}
_ = sleep(Duration::from_secs(3)) => break
}
}
};

let listen3 = async {
loop {
tokio::select! {
Some((payload, _)) = node3.listen_once() => {
println!(
"Msg {} => node3 : {:?}",
*did_names.get(&payload.signer()).unwrap(),
payload.transaction.data::<Message>().unwrap()
)
}
_ = sleep(Duration::from_secs(3)) => break
}
}
};

futures::join!(listen1, listen2, listen3);
}
}

0 comments on commit b7bce92

Please sign in to comment.