Skip to content

Commit

Permalink
Change: Membership.nodes remove Option from value
Browse files Browse the repository at this point in the history
Before this commit, the value of `Membership.nodes` is `Option<N:
Node>`: `Membership.nodes: BTreeMap<NID, Option<N>>`

The value does not have to be an `Option`.
If an application does not need openraft to store the `Node` data, it
can just implement `trait Node` with an empty struct, or just use
`BasicNode` as a placeholder.

- Using `Option<N>` as the value is a legacy and since #480 is merged, we
  do not need the `Option` any more.
  • Loading branch information
drmingdrmer committed Aug 4, 2022
1 parent e9382ff commit c836355
Show file tree
Hide file tree
Showing 21 changed files with 109 additions and 252 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn add_learner(
) -> actix_web::Result<impl Responder> {
let node_id = req.0 .0;
let node = BasicNode { addr: req.0 .1.clone() };
let res = app.raft.add_learner(node_id, Some(node), true).await;
let res = app.raft.add_learner(node_id, node, true).await;
Ok(Json(res))
}

Expand Down
16 changes: 8 additions & 8 deletions examples/raft-kv-memstore/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl ExampleNetwork {
pub async fn send_rpc<Req, Resp, Err>(
&self,
target: ExampleNodeId,
target_node: Option<&BasicNode>,
target_node: &BasicNode,
uri: &str,
req: Req,
) -> Result<Resp, RPCError<ExampleNodeId, BasicNode, Err>>
Expand All @@ -35,7 +35,7 @@ impl ExampleNetwork {
Err: std::error::Error + DeserializeOwned,
Resp: DeserializeOwned,
{
let addr = target_node.map(|x| &x.addr).unwrap();
let addr = &target_node.addr;

let url = format!("http://{}/{}", addr, uri);
let client = reqwest::Client::new();
Expand All @@ -57,20 +57,20 @@ impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
async fn connect(
&mut self,
target: ExampleNodeId,
node: Option<&BasicNode>,
node: &BasicNode,
) -> Result<Self::Network, Self::ConnectionError> {
Ok(ExampleNetworkConnection {
owner: ExampleNetwork {},
target,
target_node: node.cloned(),
target_node: node.clone(),
})
}
}

pub struct ExampleNetworkConnection {
owner: ExampleNetwork,
target: ExampleNodeId,
target_node: Option<BasicNode>,
target_node: BasicNode,
}

#[async_trait]
Expand All @@ -82,7 +82,7 @@ impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
AppendEntriesResponse<ExampleNodeId>,
RPCError<ExampleNodeId, BasicNode, AppendEntriesError<ExampleNodeId>>,
> {
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-append", req).await
self.owner.send_rpc(self.target, &self.target_node, "raft-append", req).await
}

async fn send_install_snapshot(
Expand All @@ -92,13 +92,13 @@ impl RaftNetwork<ExampleTypeConfig> for ExampleNetworkConnection {
InstallSnapshotResponse<ExampleNodeId>,
RPCError<ExampleNodeId, BasicNode, InstallSnapshotError<ExampleNodeId>>,
> {
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-snapshot", req).await
self.owner.send_rpc(self.target, &self.target_node, "raft-snapshot", req).await
}

async fn send_vote(
&mut self,
req: VoteRequest<ExampleNodeId>,
) -> Result<VoteResponse<ExampleNodeId>, RPCError<ExampleNodeId, BasicNode, VoteError<ExampleNodeId>>> {
self.owner.send_rpc(self.target, self.target_node.as_ref(), "raft-vote", req).await
self.owner.send_rpc(self.target, &self.target_node, "raft-vote", req).await
}
}
6 changes: 3 additions & 3 deletions examples/raft-kv-memstore/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ async fn test_cluster() -> anyhow::Result<()> {
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
assert_eq!(
btreemap! {
1 => Some(BasicNode::new("127.0.0.1:21001")),
2 => Some(BasicNode::new("127.0.0.1:21002")),
3 => Some(BasicNode::new("127.0.0.1:21003")),
1 => BasicNode::new("127.0.0.1:21001"),
2 => BasicNode::new("127.0.0.1:21002"),
3 => BasicNode::new("127.0.0.1:21003"),
},
nodes_in_cluster
);
Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn rest(app: &mut Server) {
async fn add_learner(mut req: Request<Arc<ExampleApp>>) -> tide::Result {
let (node_id, api_addr, rpc_addr): (ExampleNodeId, String, String) = req.body_json().await?;
let node = ExampleNode { rpc_addr, api_addr };
let res = req.state().raft.add_learner(node_id, Some(node), true).await;
let res = req.state().raft.add_learner(node_id, node, true).await;
Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build())
}

Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-rocksdb/src/network/raft_network_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ impl RaftNetworkFactory<ExampleTypeConfig> for ExampleNetwork {
async fn connect(
&mut self,
target: ExampleNodeId,
node: Option<&ExampleNode>,
node: &ExampleNode,
) -> Result<Self::Network, Self::ConnectionError> {
dbg!(&node);
let addr = node.map(|x| format!("ws://{}", x.rpc_addr)).unwrap();
let addr = format!("ws://{}", node.rpc_addr);
let client = Client::dial_websocket(&addr).await.ok();
Ok(ExampleNetworkConnection { addr, client, target })
}
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-rocksdb/tests/cluster/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ async fn test_cluster() -> Result<(), Box<dyn std::error::Error>> {
x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::<BTreeMap<_, _>>();
assert_eq!(
btreemap! {
1 => Some(ExampleNode{rpc_addr: get_rpc_addr(1), api_addr: get_addr(1)}),
2 => Some(ExampleNode{rpc_addr: get_rpc_addr(2), api_addr: get_addr(2)}),
3 => Some(ExampleNode{rpc_addr: get_rpc_addr(3), api_addr: get_addr(3)}),
1 => ExampleNode{rpc_addr: get_rpc_addr(1), api_addr: get_addr(1)},
2 => ExampleNode{rpc_addr: get_rpc_addr(2), api_addr: get_addr(2)},
3 => ExampleNode{rpc_addr: get_rpc_addr(3), api_addr: get_addr(3)},
},
nodes_in_cluster
);
Expand Down
23 changes: 10 additions & 13 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
};

let my_id = self.id;
let target_node = self.engine.state.membership_state.effective.get_node(&target).cloned();
let mut network = match self.network.connect(target, target_node.as_ref()).await {
let target_node = self.engine.state.membership_state.effective.get_node(&target).clone();
let mut network = match self.network.connect(target, &target_node).await {
Ok(n) => n,
Err(e) => {
tracing::error!(target = display(target), "{}", e);
Expand Down Expand Up @@ -479,7 +479,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(super) async fn add_learner(
&mut self,
target: C::NodeId,
node: Option<C::Node>,
node: C::Node,
tx: RaftAddLearnerTx<C::NodeId, C::Node>,
) -> Result<(), Fatal<C::NodeId>> {
if let Some(l) = &self.leader_data {
Expand Down Expand Up @@ -517,7 +517,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

// Ensure the node is connectable
let conn_res = self.network.connect(target, node.clone().as_ref()).await;
let conn_res = self.network.connect(target, &node).await;
if let Err(e) = conn_res {
let net_err = NetworkError::new(&anyerror::AnyError::new(&e));
let _ = tx.send(Err(AddLearnerError::NetworkError(net_err)));
Expand Down Expand Up @@ -780,9 +780,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) async fn handle_initialize(
&mut self,
member_nodes: BTreeMap<C::NodeId, Option<C::Node>>,
member_nodes: BTreeMap<C::NodeId, C::Node>,
) -> Result<(), InitializeError<C::NodeId, C::Node>> {
let membership = Membership::try_from(member_nodes)?;
let membership = Membership::from(member_nodes);
let payload = EntryPayload::<C>::Membership(membership);

let mut entry_refs = [EntryRef::new(&payload)];
Expand Down Expand Up @@ -967,10 +967,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

pub(crate) fn get_leader_node(&self, leader_id: Option<C::NodeId>) -> Option<C::Node> {
match leader_id {
None => None,
Some(id) => self.engine.state.membership_state.effective.get_node(&id).cloned(),
}
leader_id.map(|id| self.engine.state.membership_state.effective.get_node(&id).clone())
}

#[tracing::instrument(level = "debug", skip_all)]
Expand Down Expand Up @@ -1071,7 +1068,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

Ok(ReplicationCore::<C, N, S>::spawn(
target,
target_node.cloned(),
target_node.clone(),
self.engine.state.vote,
self.config.clone(),
self.engine.state.last_log_id(),
Expand Down Expand Up @@ -1259,8 +1256,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}

let req = vote_req.clone();
let target_node = self.engine.state.membership_state.effective.get_node(&target).cloned();
let mut network = match self.network.connect(target, target_node.as_ref()).await {
let target_node = self.engine.state.membership_state.effective.get_node(&target).clone();
let mut network = match self.network.connect(target, &target_node).await {
Ok(n) => n,
Err(err) => {
tracing::error!({error=%err, target=display(target)}, "while requesting vote");
Expand Down
1 change: 1 addition & 0 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ pub struct NotAllowed<NID: NodeId> {
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("node {node_id} {reason}")]
// TODO: remove it
pub struct MissingNodeInfo<NID: NodeId> {
pub node_id: NID,
pub reason: String,
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/membership/effective_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,12 @@ where
}

/// Get a the node(either voter or learner) by node id.
pub fn get_node(&self, node_id: &NID) -> Option<&N> {
pub fn get_node(&self, node_id: &NID) -> &N {
self.membership.get_node(node_id)
}

/// Returns an Iterator of all nodes(voters and learners).
pub fn nodes(&self) -> impl Iterator<Item = (&NID, &Option<N>)> {
pub fn nodes(&self) -> impl Iterator<Item = (&NID, &N)> {
self.membership.nodes()
}

Expand Down
70 changes: 19 additions & 51 deletions openraft/src/membership/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,21 @@ use crate::quorum::QuorumSet;
use crate::MessageSummary;
use crate::NodeId;

/// BTreeMap for mapping node-id the node.
pub type NodeMap<NID, N> = BTreeMap<NID, Option<N>>;
/// Convert other types into the internal data structure for node infos
pub trait IntoOptionNodes<NID, N>
where
N: Node,
NID: NodeId,
{
fn into_option_nodes(self) -> NodeMap<NID, N>;
fn into_option_nodes(self) -> BTreeMap<NID, N>;
}

impl<NID, N> IntoOptionNodes<NID, N> for ()
where
N: Node,
NID: NodeId,
{
fn into_option_nodes(self) -> NodeMap<NID, N> {
fn into_option_nodes(self) -> BTreeMap<NID, N> {
btreemap! {}
}
}
Expand All @@ -40,8 +38,8 @@ where
N: Node,
NID: NodeId,
{
fn into_option_nodes(self) -> NodeMap<NID, N> {
self.into_iter().map(|node_id| (node_id, None)).collect()
fn into_option_nodes(self) -> BTreeMap<NID, N> {
self.into_iter().map(|node_id| (node_id, N::default())).collect()
}
}

Expand All @@ -50,17 +48,7 @@ where
N: Node,
NID: NodeId,
{
fn into_option_nodes(self) -> NodeMap<NID, N> {
self.into_iter().map(|(node_id, n)| (node_id, Some(n))).collect()
}
}

impl<NID, N> IntoOptionNodes<NID, N> for NodeMap<NID, N>
where
N: Node,
NID: NodeId,
{
fn into_option_nodes(self) -> NodeMap<NID, N> {
fn into_option_nodes(self) -> BTreeMap<NID, N> {
self
}
}
Expand All @@ -84,22 +72,19 @@ where
/// Additional info of all nodes, e.g., the connecting host and port.
///
/// A node-id key that is in `nodes` but is not in `configs` is a **learner**.
/// The values in this map must all be `Some` or `None`.
nodes: BTreeMap<NID, Option<N>>,
nodes: BTreeMap<NID, N>,
}

impl<NID, N> TryFrom<BTreeMap<NID, Option<N>>> for Membership<NID, N>
impl<NID, N> From<BTreeMap<NID, N>> for Membership<NID, N>
where
N: Node,
NID: NodeId,
{
type Error = MissingNodeInfo<NID>;

fn try_from(b: BTreeMap<NID, Option<N>>) -> Result<Self, Self::Error> {
fn from(b: BTreeMap<NID, N>) -> Self {
let member_ids = b.keys().cloned().collect::<BTreeSet<NID>>();

let membership = Membership::with_nodes(vec![member_ids], b)?;
Ok(membership)
// Safe unwrap: every node-id in `member_ids` present in `b`.
Membership::with_nodes(vec![member_ids], b).unwrap()
}
}

Expand All @@ -122,9 +107,8 @@ where
}
res.push(format!("{}", node_id));

if let Some(n) = self.get_node(node_id) {
res.push(format!(":{{{}}}", n));
}
let n = self.get_node(node_id);
res.push(format!(":{{{}}}", n));
}
res.push("}".to_string());
}
Expand All @@ -141,9 +125,8 @@ where

res.push(format!("{}", learner_id));

if let Some(n) = self.get_node(learner_id) {
res.push(format!(":{{{}}}", n));
}
let n = self.get_node(learner_id);
res.push(format!(":{{{}}}", n));
}
res.push("]".to_string());
res.join("")
Expand Down Expand Up @@ -194,28 +177,14 @@ where
}
}

let has_some = nodes.values().any(|x| x.is_some());
if has_some {
let first_none = nodes.iter().find(|(_node_id, v)| v.is_none());
if let Some(first_none) = first_none {
return Err(MissingNodeInfo {
node_id: *first_none.0,
reason: "is None".to_string(),
});
}
}

Ok(Membership { configs, nodes })
}

/// Extends nodes btreemap with another.
///
/// Node that present in `old` will **NOT** be replaced because changing the address of a node potentially breaks
/// consensus guarantee.
pub(crate) fn extend_nodes(
old: BTreeMap<NID, Option<N>>,
new: &BTreeMap<NID, Option<N>>,
) -> BTreeMap<NID, Option<N>> {
pub(crate) fn extend_nodes(old: BTreeMap<NID, N>, new: &BTreeMap<NID, N>) -> BTreeMap<NID, N> {
let mut res = old;

for (k, v) in new.iter() {
Expand All @@ -233,7 +202,7 @@ where
self.configs.len() > 1
}

pub(crate) fn add_learner(&self, node_id: NID, node: Option<N>) -> Result<Self, MissingNodeInfo<NID>> {
pub(crate) fn add_learner(&self, node_id: NID, node: N) -> Result<Self, MissingNodeInfo<NID>> {
let configs = self.configs.clone();

let nodes = Self::extend_nodes(self.nodes.clone(), &btreemap! {node_id=>node});
Expand Down Expand Up @@ -289,13 +258,12 @@ where
}

/// Get a the node(either voter or learner) by node id.
pub(crate) fn get_node(&self, node_id: &NID) -> Option<&N> {
let x = self.nodes.get(node_id)?;
x.as_ref()
pub(crate) fn get_node(&self, node_id: &NID) -> &N {
&self.nodes[node_id]
}

/// Returns an Iterator of all nodes(voters and learners).
pub fn nodes(&self) -> impl Iterator<Item = (&NID, &Option<N>)> {
pub fn nodes(&self) -> impl Iterator<Item = (&NID, &N)> {
self.nodes.iter()
}

Expand Down
Loading

0 comments on commit c836355

Please sign in to comment.