Skip to content

Commit

Permalink
[refactor]: Refactor and add tests for topology
Browse files Browse the repository at this point in the history
Signed-off-by: Shanin Roman <shanin1000@yandex.ru>
  • Loading branch information
Erigara authored and appetrosyan committed May 19, 2023
1 parent f4a7b41 commit 5afcdef
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 39 deletions.
9 changes: 1 addition & 8 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,7 @@ impl Sumeragi {
.collect::<Vec<PublicKey>>(),
);
topology.rotate_set_a();
topology.update_peer_list(
&self
.wsv
.peers_ids()
.iter()
.map(|id| id.clone())
.collect::<Vec<PeerId>>(),
);
topology.update_peer_list(self.wsv.peers_ids().iter().map(|id| id.clone()).collect());
self.current_topology = topology;
self.connect_peers(&self.current_topology);
}
Expand Down
113 changes: 82 additions & 31 deletions core/src/sumeragi/network_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
clippy::arithmetic_side_effects
)]

use std::collections::HashSet;

use derive_more::Display;
use iroha_crypto::{PublicKey, SignatureOf};
use iroha_data_model::prelude::PeerId;
Expand Down Expand Up @@ -128,47 +130,23 @@ impl Topology {
&self.sorted_peers[self.min_votes_for_commit()]
}
/// Add or remove peers from the topology.
pub fn update_peer_list(&mut self, new_peer_list: &[PeerId]) {
let mut i = 0;
while i < self.sorted_peers.len() {
if new_peer_list.iter().any(|p| p == &self.sorted_peers[i]) {
i += 1;
} else {
self.sorted_peers.remove(i);
}
}
self.sorted_peers.extend(
new_peer_list
.iter()
.filter(|p| !self.sorted_peers.contains(p))
.cloned()
.collect::<Vec<PeerId>>(),
);
pub fn update_peer_list(&mut self, mut new_peers: HashSet<PeerId>) {
self.sorted_peers.retain(|peer| new_peers.remove(peer));
self.sorted_peers.extend(new_peers);
}
/// Rotate peers after each failed attempt to create a block.
pub fn rotate_all(&mut self) {
self.sorted_peers.rotate_left(1);
}
/// Re-arrange the set of peers after each successful block commit.
pub fn rotate_set_a(&mut self) {
let top = self.sorted_peers.remove(0);
self.sorted_peers.insert(
self.min_votes_for_commit().min(self.sorted_peers.len()),
top,
);
let rotate_at = self.min_votes_for_commit().min(self.sorted_peers.len());
self.sorted_peers[..rotate_at].rotate_left(1);
}
/// Pull peers up in the topology to the top of the a set while preserving local order.
pub fn lift_up_peers(&mut self, to_lift_up: &[PublicKey]) {
let mut observing = Vec::new();
let mut i = 0;
while i < self.sorted_peers.len() {
if to_lift_up.contains(&self.sorted_peers[i].public_key) {
i += 1;
} else {
observing.insert(0, self.sorted_peers.remove(i)); // This has to be insert(0) and not push in order to preserve order.
}
}
self.sorted_peers.extend(observing);
self.sorted_peers
.sort_by_cached_key(|peer| !to_lift_up.contains(&peer.public_key));
}
}

Expand All @@ -184,3 +162,76 @@ pub enum Role {
/// Proxy Tail.
ProxyTail,
}

#[cfg(test)]
mod tests {
use iroha_crypto::KeyPair;

use super::*;

macro_rules! peers {
($($id:literal),+$(,)?) => {
vec![
$(PeerId::new(&(([0, 0, 0, 0], $id).into()), KeyPair::generate().expect("Failed to generate key pair").public_key())),+
]
};
}

fn topology() -> Topology {
let peers = peers![0, 1, 2, 3, 4, 5, 6];
Topology::new(peers)
}

fn extract_ports(topology: &Topology) -> Vec<u16> {
topology
.sorted_peers
.iter()
.map(|peer| peer.address.port())
.collect()
}

#[test]
fn rotate_all() {
let mut topology = topology();
topology.rotate_all();
assert_eq!(extract_ports(&topology), vec![1, 2, 3, 4, 5, 6, 0])
}

#[test]
fn rotate_set_a() {
let mut topology = topology();
topology.rotate_set_a();
assert_eq!(extract_ports(&topology), vec![1, 2, 3, 4, 0, 5, 6])
}

#[test]
fn lift_up_peers() {
let mut topology = topology();
// Will lift up 1, 2, 4, 6
let to_lift_up = &[
topology.sorted_peers[1].public_key().clone(),
topology.sorted_peers[2].public_key().clone(),
topology.sorted_peers[4].public_key().clone(),
topology.sorted_peers[6].public_key().clone(),
];
topology.lift_up_peers(to_lift_up);
assert_eq!(extract_ports(&topology), vec![1, 2, 4, 6, 0, 3, 5])
}

#[test]
fn update_peer_list() {
let mut topology = topology();
// New peers will be 0, 2, 5, 7
let new_peers = {
let mut peers = HashSet::from([
topology.sorted_peers[0].clone(),
topology.sorted_peers[5].clone(),
topology.sorted_peers[2].clone(),
]);
peers.extend(peers![7]);
peers
};
topology.update_peer_list(new_peers);
assert_eq!(extract_ports(&topology), vec![0, 2, 5, 7])
}
}
9 changes: 9 additions & 0 deletions primitives/src/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,15 @@ impl SocketAddr {
}
}

/// Extracts port from [`Self`]
pub fn port(&self) -> u16 {
match self {
SocketAddr::Ipv4(addr) => addr.port,
SocketAddr::Ipv6(addr) => addr.port,
SocketAddr::Host(addr) => addr.port,
}
}

/// Serialize the data contained in this [`SocketAddr`] for use in hashing.
pub fn payload(&self) -> Vec<u8> {
let mut bytes = Vec::new();
Expand Down

0 comments on commit 5afcdef

Please sign in to comment.