
Doc: add example raft-kv #156

Merged
merged 3 commits into datafuselabs:main on Feb 5, 2022

Conversation

drmingdrmer
Member

Add an example of a distributed kv store implementation: ./example-raft-kv.

Includes:

  • An in-memory RaftStorage implementation: store.

  • A server based on actix-web, which includes:

    • raft-internal network APIs for replication and voting.
    • Admin APIs to add nodes, change membership, etc.
    • Application APIs to write a value by key or read a value by key.

  • A client and RaftNetwork (network), both built upon reqwest.

@drmingdrmer
Member Author

@ppamorim
Have a look at the code :DDD
I've not yet had time to update the doc. :(

Contributor

ppamorim left a comment

I am super happy with this implementation! Finally I will be able to implement my project! Thank you VERY much!!!

I also pointed out some items in the review; please feel free to answer them any time. I will probably point out more things later (at the moment I don't have anything else, since I haven't tested the code yet; I'm working on other projects at the same time, but I will verify it later today).

Cargo.toml (outdated review thread, resolved)
## Cluster management

The raft itself does not store node addresses.
But in a real-world application, the implementation of `RaftNetwork` needs to know the addresses.
Contributor

Question: I saw that rite_raft works this way:

  • Start the leader, informing the server address and the raft address (aren't they, or couldn't they be, the same here?)
  • When adding a node, inform the server address, the leader address and the raft address.

By doing that, the node connects to the leader and adds itself as an active instance. Could we do that in the demo? I am aware that you did that with rpc 21001/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}', but is that approach better? Just a question.

Contributor

ppamorim Feb 2, 2022

@drmingdrmer I can open a PR in your fork that includes this option: if the invocation includes the flag leader_addr, it will automatically add the learner after the server starts.

Member Author

Node addresses have to be stored and replicated, i.e., adding nodes has to be done through the raft protocol.
Otherwise, a new leader won't know about the addresses of other nodes.

Adding a node as a member in a single step can be done by just sending a join(leader_address) request to an uninitialized node and letting it do all three of these steps (see the sketch after this list):

  • Write a raft log entry to the leader to add its address: rpc <leader>/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}'
  • Inform the leader to replicate logs to it: rpc <leader>/add-learner.
  • Inform the leader to change to a new membership config that includes it: rpc <leader>/change-members.
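
For illustration only, here is a client-side sketch of that join flow using reqwest, like the example's client. The endpoint names follow this thread, but the request bodies of the last two calls are assumptions and may differ from the actual example:

use serde_json::json;

// Hypothetical "join" helper: the three steps above, issued against the leader.
// Requires reqwest with the "json" feature; the payload shapes for /add-learner
// and /change-members are assumptions for illustration.
async fn join_cluster(leader: &str, id: u64, addr: &str) -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    // 1. Write a raft log entry that records the new node's address.
    client
        .post(format!("http://{}/write", leader))
        .json(&json!({ "AddNode": { "id": id, "addr": addr } }))
        .send()
        .await?;

    // 2. Ask the leader to start replicating logs to the new node (add it as a learner).
    client
        .post(format!("http://{}/add-learner", leader))
        .json(&json!(id))
        .send()
        .await?;

    // 3. Commit a membership change that includes the new node as a voter.
    client
        .post(format!("http://{}/change-members", leader))
        .json(&json!([id])) // assumed: the full set of member ids would go here
        .send()
        .await?;

    Ok(())
}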

@ppamorim
Contributor

ppamorim commented Feb 2, 2022

@drmingdrmer Hi again, I opened a PR in your fork (it's in draft at the moment; I will give it a few hours and check again that everything is right) with some fixes and how I think it should be implemented. I tried to avoid any abbreviations in the implementation, even if it looks verbose; I think it's easier to understand what is happening. Please feel free to check it.

async-trait = "0.1.36"
clap = { version = "3.0.13", features = ["derive", "env"] }
env_logger = "0.9.0"
openraft = { version="0.6", path= "../openraft" }
Contributor

ppamorim Feb 3, 2022

@drmingdrmer I would pin the crate source to prevent any conflict with future versions; we can keep the demo updated accordingly.

Member Author

I do not get it. What do you want it to look like? 🤔

Contributor

ppamorim Feb 3, 2022

I mean replace that with openraft = "0.6.4" to force cargo to fetch the build from the stable release, but I noticed that you did not do that because there are changes to the core, which is understandable for the moment. I will close this item.

@drmingdrmer
Member Author

@drmingdrmer Hi again, I opened a PR in your fork (it's in draft at the moment; I will give it a few hours and check again that everything is right) with some fixes and how I think it should be implemented. I tried to avoid any abbreviations in the implementation, even if it looks verbose; I think it's easier to understand what is happening. Please feel free to check it.

Great! Let me check it out!

value.unwrap_or_default()
};
Ok(Json(res))
}


I think this could result in stale reads if the request reaches a node that is not the leader, right?

IMHO an endpoint with potential stale reads could be very useful in many applications, but it should be explicit that it could be a stale read. Adding another endpoint based on client_read would be good.

Member Author

Good point!
This is a draft application.
I'm gonna add a read endpoint in another PR :DDD

Contributor

ppamorim Feb 4, 2022

@MikaelCall @drmingdrmer Is this outdated? Will placing the code below before all state machine calls sort this issue?

let _ = app.raft.client_read().await;

I suspect it will wait until the node gets confirmed to be in sync with the leader, no? It seems to be working but I have no means to confirm that it is really preventing stale reads.


@ppamorim
You have to handle the error. If the node that receives the request is not the leader, it will return a forward-to-leader error. Not handling the error (e.g. discarding it as in the `let _ =` snippet above) may still result in stale reads.

Search for client_read in this repository. The fn docs are good and there is a great test explaining the difference between calling client_read on leader vs follower nodes.
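
For what it's worth, here is a sketch of what such a handler could look like. The App type, the store fields, and the data map are assumptions about the example's shapes; the only point being illustrated is that the result of client_read() must not be discarded:

use actix_web::web::{Data, Json};
use actix_web::Responder;

// Sketch only: serve a read only if this node can confirm it is still the leader.
// `App`, `app.store.state_machine`, and the `data` map are assumed to match the
// example; the exact error type returned by client_read() is left opaque here.
#[actix_web::post("/consistent_read")]
async fn consistent_read(app: Data<App>, req: Json<String>) -> impl Responder {
    match app.raft.client_read().await {
        // We are the leader for a quorum: reading local state is safe.
        Ok(_) => {
            let sm = app.store.state_machine.read().await;
            let value = sm.data.get(req.as_str()).cloned().unwrap_or_default();
            Json(Ok(value))
        }
        // Not the leader (or no quorum): surface the error, which carries the
        // leader id, instead of silently returning possibly stale data.
        Err(e) => Json(Err(e.to_string())),
    }
}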

Contributor

@MikaelCall Nice! I was confused and thought it was part of other code; in my project I removed this completely and it's redirecting the request to the leader, as expected. I will read more about it too.

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer I did a test where:

  • I started a leader at port 8500.
  • Started a learner at port 8501 and set it as a member.

After that the nodes started heartbeating as expected, calling POST /raft-append

Then I left the server working for a moment and killed the leader at port 8500.

The node running at port 8501 called POST /raft-vote; it probably became the leader since it's the only node that kept running.

After that I created a new learner at port 8502 and tried to add it as a node to the new leader.

But when I call 8501/list-nodes it returns the node itself and the node that was killed at port 8500; 8502 is not present. Am I missing something?

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer I managed to make the application panic when calling /list-nodes on the leader.

The problem happened in the function send_rpc:

let addr = {
      let state_machine = self.store.state_machine.read().await;
      state_machine.nodes.get(&target).unwrap().clone() // <------ `unwrap()` caused the application to crash
};

@drmingdrmer
Member Author

But when I call 8501/list-nodes it returns the node itself and the node that was killed at port 8500; 8502 is not present. Am I missing something?

The cluster has only 2 nodes, thus a quorum (majority) is the entire cluster, i.e. 8500 and 8501.
After killing one of them, no consensus can be reached because a quorum requires 2 nodes, so no change can be made to a cluster that has only one online node, 8501.
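
To spell out the arithmetic: the majority quorum for n voting members is n / 2 + 1 (integer division), so a 2-node cluster needs both nodes up, while a 3-node cluster tolerates one failure. A tiny illustration, not part of the example:

// Majority quorum size for a cluster of n voting members.
fn quorum(n: usize) -> usize {
    n / 2 + 1
}

fn main() {
    assert_eq!(quorum(2), 2); // both nodes required: killing one blocks all progress
    assert_eq!(quorum(3), 2); // tolerates one failure
    assert_eq!(quorum(5), 3); // tolerates two failures
}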

@drmingdrmer
Member Author

@drmingdrmer I managed to make the application panic when calling /list-nodes on the leader.

The problem happened in the function send_rpc:

let addr = {
      let state_machine = self.store.state_machine.read().await;
      state_machine.nodes.get(&target).unwrap().clone() // <------ `unwrap()` caused the application to crash
};

Yes, it will panic if the node is not added.

A real-world application should check that the target is present and, if not, return an error.

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer Understood, so the minimum number of nodes is 3. Is there any way to recover from this state by adding 2 learners?

@drmingdrmer
Member Author

@drmingdrmer Understood, so the minimum number of nodes is 3. Is there any way to recover from this state by adding 2 learners?

No way. :(

A learner does not vote and it does not count as a member of a quorum.

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer I managed to make the application panic when calling /list-nodes on the leader.
The problem happened in the function send_rpc:

let addr = {
      let state_machine = self.store.state_machine.read().await;
      state_machine.nodes.get(&target).unwrap().clone() // <------ `unwrap()` caused the application to crash
};

Yes, it will panic if the node is not added.

A real-world application should check that the target is present and, if not, return an error.

@drmingdrmer Speaking of that: I am trying to handle the state machine error. I created a new enum called StateMachineError just to hold this error. You can see the implementation here: https://gist.github.com/ppamorim/8294fde7025cb66bb8639551c8abac2d

I had to do that because RPCError does not contain any generic error type that I can use (or that I am aware of) to wrap this state machine error. Does it make sense to implement From on my StateMachineError to convert it into an RPCError?

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer Ah, after reading a bit more and being less stupid, I understood that I can use RPCError::RemoteError(RemoteError::new(target, e))

@drmingdrmer
Member Author

@drmingdrmer I managed to make the application panic when calling /list-nodes on the leader.
The problem happened in the function send_rpc:

let addr = {
      let state_machine = self.store.state_machine.read().await;
      state_machine.nodes.get(&target).unwrap().clone() // <------ `unwrap()` caused the application to crash
};

Yes, it will panic if the node is not added.
A real-world application should check that the target is present and, if not, return an error.

@drmingdrmer Speaking of that: I am trying to handle the state machine error. I created a new enum called StateMachineError just to hold this error. You can see the implementation here: https://gist.github.com/ppamorim/8294fde7025cb66bb8639551c8abac2d

I had to do that because RPCError does not contain any generic error type that I can use (or that I am aware of) to wrap this state machine error. Does it make sense to implement From on my StateMachineError to convert it into an RPCError?

If a target is not found, it probably is an administrative mistake:
The administrator was trying to add a learner without writing the node address to the cluster.

At least it's not the state-machine's fault, IMHO.
What about making it an RPCError::Network(NetworkError)?
I.e., treat such an error as an unreachable-node error.

NetworkError can be built from any StdError:

impl NetworkError {
    pub fn new<E: Error + 'static>(e: &E) -> Self {
        Self {
            source: AnyError::new(e),
        }
    }
}

@drmingdrmer
Member Author

@drmingdrmer Ah, after reading a bit more and being less stupid, I understood that I can use RPCError::RemoteError(RemoteError::new(target, e))

Hmm... an RPCError::RemoteError is meant to be a wrapper for an error that happened on a remote peer.

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer I could implement it by doing:

pub struct NodeNotFound {
  pub id: NodeId,
}

impl NodeNotFound {
  fn new(id: NodeId) -> Self {
    Self { id }
  }
}

Then later using:

let addr = state_machine.nodes
      .get(&target)
      .ok_or(RPCError::Network(NetworkError::new(&NodeNotFound::new(target))))?
      .clone();

Just curious why the & is required here &NodeNotFound::new(target).

@drmingdrmer
Member Author

@drmingdrmer I could implement it by doing:

pub struct NodeNotFound {
  pub id: NodeId,
}

impl NodeNotFound {
  fn new(id: NodeId) -> Self {
    Self { id }
  }
}

Then later using:

let addr = state_machine.nodes
      .get(&target)
      .ok_or(RPCError::Network(NetworkError::new(&NodeNotFound::new(target))))?
      .clone();

Just curious why the & is required here &NodeNotFound::new(target).

Right :DDD

Because NetworkError::new() actually does not consume the input error.
It internally builds another object from &e.
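
One note on the NodeNotFound snippet above: since NetworkError::new() is bounded by E: Error + 'static, NodeNotFound also needs Debug, Display and std::error::Error implementations before it can be passed in. A minimal sketch, assuming NodeId is the example's u64 alias:

use std::fmt;

type NodeId = u64; // assumed to match the example's alias

#[derive(Debug)]
pub struct NodeNotFound {
    pub id: NodeId,
}

impl fmt::Display for NodeNotFound {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "node {} not found in the state machine", self.id)
    }
}

// With Debug + Display in place, the std Error impl can be empty, so
// NetworkError::new(&NodeNotFound::new(id)) satisfies the `E: Error` bound.
impl std::error::Error for NodeNotFound {}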

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

Question for further discussion:

I noticed that when I try to write data to the state machine using the learners (already members), I get this error:

{
	"Err": {
		"ForwardToLeader": {
			"leader_id": 3630289849000485989
		}
	}
}

I am aware that this is expected, and I understand that with this implementation only the leader accepts data writes. Would it be possible to make the learners dispatch the write to the leader in that case? I am asking based on the ForwardToLeader information that the learner returns. I expect the caller to forward the write to the leader, but could the same be done inside the learner using the management endpoints?

@drmingdrmer
Member Author

Question for further discussion:

I noticed that when I try to write data to the state machine using the learners (already members), I get this error:

{
	"Err": {
		"ForwardToLeader": {
			"leader_id": 3630289849000485989
		}
	}
}

I am aware that this is expected, and I understand that with this implementation only the leader accepts data writes. Would it be possible to make the learners dispatch the write to the leader in that case? I am asking based on the ForwardToLeader information that the learner returns. I expect the caller to forward the write to the leader, but could the same be done inside the learner using the management endpoints?

Yes, it's possible to embed a client that can deal with the ForwardToLeader error into an endpoint on a learner, and let users use a lightweight client that does not deal with such errors.
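
As an illustration of the lightweight-client idea, here is a sketch that retries against the leader when it sees the ForwardToLeader response shown above. leader_addr_of is a hypothetical lookup helper (e.g. backed by a node-listing endpoint), not part of the example:

use serde_json::Value;

// Sketch: post a write to any node and, if the response is the
// {"Err":{"ForwardToLeader":{"leader_id": ...}}} shape shown above,
// retry once against the leader. `leader_addr_of` is a hypothetical helper.
async fn write_with_forward(
    node_addr: &str,
    body: &Value,
    leader_addr_of: impl Fn(u64) -> String,
) -> Result<Value, reqwest::Error> {
    let client = reqwest::Client::new();

    let resp: Value = client
        .post(format!("http://{}/write", node_addr))
        .json(body)
        .send()
        .await?
        .json()
        .await?;

    if let Some(leader_id) = resp
        .pointer("/Err/ForwardToLeader/leader_id")
        .and_then(Value::as_u64)
    {
        // Not the leader: forward the same request to the leader's /write endpoint.
        let leader = leader_addr_of(leader_id);
        return client
            .post(format!("http://{}/write", leader))
            .json(body)
            .send()
            .await?
            .json()
            .await;
    }

    Ok(resp)
}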

drmingdrmer and others added 3 commits February 3, 2022 23:03
Add an example of a distributed kv store implementation: `./example-raft-kv`.

Includes:
- An in-memory `RaftStorage` implementation [store](./store).

- A server based on [actix-web](https://docs.rs/actix-web/4.0.0-rc.2), which includes:
  - raft-internal network APIs for replication and voting.
  - Admin APIs to add nodes, change membership, etc.
  - Application APIs to write a value by key or read a value by key.

- A client and `RaftNetwork` ([network](./network)), both built upon [reqwest](https://docs.rs/reqwest).
@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer I tried to create a batch interaction with the state machine with the implementation below:

https://gist.github.com/ppamorim/858bd2b48f779beecb3941d653cab0d8#file-mod-rs-L30-L45

I did that to prevent the need to call the endpoint for each entry, so I can insert multiple entries into the state machine's data at once. But it causes this error:

{
   "Err":{
      "Fatal":"Stopped"
   }
}

Is it some sort of heartbeat timeout? What do I need to do to prevent it? The data has around 25k entries.

Edit: I tested with a small batch (20 items) and it works. Will I need to add some sort of heartbeat verification for the batch write?

@drmingdrmer
Member Author

@drmingdrmer I tried to create a batch interaction with the state machine with the implementation below:

https://gist.github.com/ppamorim/858bd2b48f779beecb3941d653cab0d8#file-mod-rs-L30-L45

I did that to prevent the need to call the endpoint for each user, so I can insert multiple entries into the state machine's data at once. But it causes this error:

{
   "Err":{
      "Fatal":"Stopped"
   }
}

Is it some sort of heartbeat timeout? What do I need to do to prevent it? The data has around 25k entries.

Edit: I tested with a small batch (20 items) and it works. Will I need to add some sort of heartbeat verification for the batch write?

Normally a timeout does not cause a Fatal error.
Do you have a branch I can investigate with?

My guess is that there is a panic caused by an RPC payload that is too large.
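
For illustration, one client-side workaround while this is investigated could be to split the batch into smaller chunks, so that no single raft log entry (and no single RPC payload) carries all 25k entries. A sketch, assuming a batch endpoint like the /batch-user mentioned later in this thread; the chunk size is arbitrary:

use serde_json::Value;

// Sketch: send a large JSON array to the batch endpoint in smaller chunks.
// The /batch-user path and the chunk size of 500 are assumptions for illustration.
async fn post_in_chunks(addr: &str, entries: &[Value]) -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    for chunk in entries.chunks(500) {
        client
            .post(format!("http://{}/batch-user", addr))
            .json(&chunk)
            .send()
            .await?
            .error_for_status()?;
    }
    Ok(())
}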

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer I will add you to the project; after we sort out this issue I can replicate it in the example.

Edit: You should have access to the project now.

@drmingdrmer
Member Author

@drmingdrmer I will add you to the project; after we sort out this issue I can replicate it in the example.

Edit: You should have access to the project now.

Then what do I do to reproduce this problem? 🤔

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer You can create a massive JSON (more than 20k entries) with this format:

[
  {
    "email": "test@google.com",
    "region": "usa"
  },
  {
    "email": "spanish@google.com",
    "region": "spain"
  },
  ...
]

You need to call POST /batch-user and send this JSON array.

If you run cargo run it will start the server at 127.0.0.1:8100.

Note that the issue does not happen if only the leader is started; you can start 3 instances by running ./test-cluster.sh.

@ppamorim
Contributor

ppamorim commented Feb 3, 2022

@drmingdrmer I think the issue is around actix-web limiting the payload size; this could be a problem even for production-level synchronization.

Edit: It's not :(

@drmingdrmer
Member Author

@drmingdrmer You can create a massive JSON (more than 20k entries) with this format:

[
  {
    "email": "test@google.com",
    "region": "usa"
  },
  {
    "email": "spanish@google.com",
    "region": "spain"
  },
  ...
]

You need to call POST /batch-user and send this JSON array.

If you run cargo run it will start the server at 127.0.0.1:8100.

Note that the issue does not happen if only the leader is started; you can start 3 instances by running ./test-cluster.sh.

Got it. Let me check it out.

Is this PR good enough to merge?

drmingdrmer merged commit 4ce50a3 into datafuselabs:main on Feb 5, 2022