Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Doc: add example raft-kv #156

Merged
merged 3 commits into from
Feb 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ members = [
"openraft",
"memstore",
]
exclude = [
"example-raft-kv",
]
7 changes: 7 additions & 0 deletions example-raft-kv/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Directory Ignores ##########################################################
target
vendor
.idea

# File Ignores ###############################################################
Cargo.lock
42 changes: 42 additions & 0 deletions example-raft-kv/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[package]
name = "example-raft-key-value"
version = "0.1.0"
edition = "2021"
authors = [
"drdr xp <drdr.xp@gmail.com>",
"Pedro Paulo de Amorim <pepa.amorim@gmail.com>"
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/datafuselabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT/Apache-2.0"
repository = "https://github.com/datafuselabs/openraft"
readme = "README.md"

[[bin]]
name = "raft-key-value"
path = "src/bin/main.rs"

[dependencies]
actix-web = "4.0.0-rc.2"
anyerror = { version = "0.1.1"}
async-trait = "0.1.36"
clap = { version = "3.0.13", features = ["derive", "env"] }
env_logger = "0.9.0"
openraft = { version="0.6", path= "../openraft" }
Copy link
Contributor

@ppamorim ppamorim Feb 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drmingdrmer I would set the crates source to prevent any conflict with future versions, we can keep the demo updated accordingly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not get it. What do you want it to look like? 🤔

Copy link
Contributor

@ppamorim ppamorim Feb 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean replace that with openraft = "0.6.4" to force cargo to get the build from the stable release, but I noticed that you did not that because there are changes to the core, understandable for the moment. I will be closing this item.

reqwest = { version = "0.11.9", features = ["json"] }
serde = { version="1.0.114", features=["derive"] }
serde_json = "1.0.57"
tokio = { version="1.0", default-features=false, features=["sync"] }
tracing = "0.1.29"
tracing-futures = "0.2.4"

[dev-dependencies]
maplit = "1.0.2"

[features]
docinclude = [] # Used only for activating `doc(include="...")` on nightly.

[package.metadata.docs.rs]
features = ["docinclude"] # Activate `docinclude` during docs.rs build.
119 changes: 119 additions & 0 deletions example-raft-kv/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
# Example distributed key-value store built upon openraft.

It is an example of how to build a real-world key-value store with `openraft`.
Includes:
- An in-memory `RaftStorage` implementation [store](./src/store/store.rs).

- A server is based on [actix-web](https://docs.rs/actix-web/4.0.0-rc.2).
Includes:
- raft-internal network APIs for replication and voting.
- Admin APIs to add nodes, change-membership etc.
- Application APIs to write a value by key or read a value by key.

- Client and `RaftNetwork`([rpc](./src/network/rpc.rs)) are built upon [reqwest](https://docs.rs/reqwest).

## Run it

If you want to see a simulation of 3 nodes running and sharing data, you can run the cluster demo:

```shell
./test-cluster.sh
```

if you want to compile the application, run:

```shell
cargo build
```

(If you append `--release` to make it compile in production, but we don't recommend to use
this project in production yet.)

To run it, get the binary `raft-key-value` inside `target/debug` and run:

```shell
./raft-key-value --id 1 --http-addr 127.0.0.1:21001
```

It will start a node.

To start the following nodes:

```shell
./raft-key-value --id 2 --http-addr 127.0.0.1:21002
```

You can continue replicating the nodes by changing the `id` and `http-addr`.

After that, call the first node created:

```
POST - 127.0.0.1:21001/init
```

It will define the first node created as the leader.

After that you will need to notify the leader node about the other nodes:

```
POST - 127.0.0.1:21001/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}'
POST - 127.0.0.1:21001/write '{"AddNode":{"id":2,"addr":"127.0.0.1:21002"}}'
...
```

Then you need to inform to the leader that these nodes are learners:

```
POST - 127.0.0.1:21001/add-learner "2"
```

Now you need to tell the leader to add all learners as members of the cluster:

```
POST - 127.0.0.1:21001/change-membership "[1, 2]"
```

Write some data in any of the nodes:

```
POST - 127.0.0.1:21001/write "{"Set":{"key":"foo","value":"bar"}}"
```

Read the data from any node:

```
POST - 127.0.0.1:21002/read "foo"
```

You should be able to read that on the another instance even if you did not sync any data!


## How it's structured.

The application is separated in 4 modules:

- `bin`: You can find the `main()` function in [main](./src/bin/main.rs) the file where the setup for the server happens.
- `network`: You can find the [api](./src/network/api.rs) that implements the endpoints used by the public API and [rpc](./src/network/rpc.rs) where all the raft communication from the node happens. [management](./src/network/management.rs) is where all the administration endpoints are present, those are used to add orremove nodes, promote and more. [raft](./src/network/raft.rs) is where all the communication are received from other nodes.
- `store`: You can find the file [store](./src/store/mod.rs) where all the key-value implementation is done. Here is where your data application will be managed.

## Where is my data?

The data is store inside state machines, each state machine represents a point of data and
raft enforces that all nodes have the same data in synchronization. You can have a look of
the struct [ExampleStateMachine](./src/store/mod.rs)

## Cluster management

The raft itself does not store node addresses.
But in a real-world application, the implementation of `RaftNetwork` needs to know the addresses.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: I saw that rite_raft works by this way:

  • Start the leader informing the server address and raft address (aren't or could they be the same here?)
  • When adding a node, inform the server address, leader address and raft address.

By doing that, the node will connect to the leader and append itself as an active instance. Could we do that in the demo? I am aware that you did that on rpc 21001/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}' but is that better? Just a question.

Copy link
Contributor

@ppamorim ppamorim Feb 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drmingdrmer I can open a PR in your fork that includes this option, if the invocation includes the flag leader_addr it will auto include the leaner to after the server gets started.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Node addresses have to be stored and replicated, i.e., adding nodes has to be done through raft protocol.
Otherwise, a new leader won't know about the addresses of other nodes.

Adding a node as a member in a single step is can be done by just sending a join(leader_address) request to an uninitialized node and letting it do all these 3 steps:

  • Write a raft log entry to the leader to add the address of it: rpc <leader>/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}'
  • Inform the leader to replicate logs to it: rpc <leader>/add-learner.
  • Inform the leader to change to a new membership config that includes it: rpc <leader>/change-members.


Thus, in this example application:

- The storage layer has to store nodes' information.
- The network layer keeps a reference to the store so that it is able to get the address of a target node to send RPC to.

To add a node to a cluster, it includes 3 steps:

- Write a `node` through raft protocol to the storage.
- Add the node as a `Learner` to let it start receiving replication data from the leader.
- Invoke `change-membership` to change the learner node to a member.
16 changes: 16 additions & 0 deletions example-raft-kv/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use std::sync::Arc;

use openraft::Config;
use openraft::NodeId;

use crate::ExampleRaft;
use crate::ExampleStore;

// Representation of an application state. This struct can be shared around to share
// instances of raft, store and more.
pub struct ExampleApp {
pub id: NodeId,
pub raft: ExampleRaft,
pub store: Arc<ExampleStore>,
pub config: Arc<Config>,
}
88 changes: 88 additions & 0 deletions example-raft-kv/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::sync::Arc;

use actix_web::middleware;
use actix_web::middleware::Logger;
use actix_web::web::Data;
use actix_web::App;
use actix_web::HttpServer;
use clap::Parser;
use env_logger::Env;
use example_raft_key_value::app::ExampleApp;
use example_raft_key_value::network::api;
use example_raft_key_value::network::management;
use example_raft_key_value::network::raft;
use example_raft_key_value::network::rpc::ExampleNetwork;
use example_raft_key_value::store::ExampleRequest;
use example_raft_key_value::store::ExampleResponse;
use example_raft_key_value::store::ExampleStore;
use openraft::Config;
use openraft::Raft;

pub type ExampleRaft = Raft<ExampleRequest, ExampleResponse, ExampleNetwork, ExampleStore>;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
#[clap(long)]
pub id: u64,

#[clap(long)]
pub http_addr: String,
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Setup the logger
env_logger::init_from_env(Env::default().default_filter_or("info"));

// Parse the parameters passed by arguments.
let options = Opt::parse();
let node_id = options.id;

// Create a configuration for the raft instance.
let config = Arc::new(Config::default().validate().unwrap());

// Create a instance of where the Raft data will be stored.
let store = Arc::new(ExampleStore::default());

// Create the network layer that will connect and communicate the raft instances and
// will be used in conjunction with the store created above.
let network = Arc::new(ExampleNetwork { store: store.clone() });

// Create a local raft instance.
let raft = Raft::new(node_id, config.clone(), network, store.clone());

// Create an application that will store all the instances created above, this will
// be later used on the actix-web services.
let app = Data::new(ExampleApp {
id: options.id,
raft,
store,
config,
});

// Start the actix-web server.
HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
.wrap(middleware::Compress::default())
.app_data(app.clone())
// raft internal RPC
.service(raft::append)
.service(raft::snapshot)
.service(raft::vote)
// admin API
.service(management::init)
.service(management::add_learner)
.service(management::change_membership)
.service(management::metrics)
.service(management::list_nodes)
// application API
.service(api::write)
.service(api::read)
})
.bind(options.http_addr)?
.run()
.await
}
12 changes: 12 additions & 0 deletions example-raft-kv/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use openraft::Raft;

use crate::network::rpc::ExampleNetwork;
use crate::store::ExampleRequest;
use crate::store::ExampleResponse;
use crate::store::ExampleStore;

pub mod app;
pub mod network;
pub mod store;

pub type ExampleRaft = Raft<ExampleRequest, ExampleResponse, ExampleNetwork, ExampleStore>;
34 changes: 34 additions & 0 deletions example-raft-kv/src/network/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use actix_web::post;
use actix_web::web;
use actix_web::web::Data;
use actix_web::Responder;
use openraft::raft::ClientWriteRequest;
use openraft::raft::EntryPayload;
use web::Json;

use crate::app::ExampleApp;
use crate::store::ExampleRequest;

/**
* Application API
*
* This is where you place your application, you can use the example below to create your
* API. The current implementation:
*
* - `POST - /write` saves a value in a key and sync the nodes.
* - `GET - /read` attempt to find a value from a given key.
*/
#[post("/write")]
pub async fn write(app: Data<ExampleApp>, req: Json<ExampleRequest>) -> actix_web::Result<impl Responder> {
let request = ClientWriteRequest::new(EntryPayload::Normal(req.0));
let response = app.raft.client_write(request).await;
Ok(Json(response))
}

#[post("/read")]
pub async fn read(app: Data<ExampleApp>, req: Json<String>) -> actix_web::Result<impl Responder> {
let state_machine = app.store.state_machine.read().await;
let key = req.0;
let value = state_machine.data.get(&key).cloned();
Ok(Json(value.unwrap_or_default()))
}
58 changes: 58 additions & 0 deletions example-raft-kv/src/network/management.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::collections::BTreeSet;

use actix_web::get;
use actix_web::post;
use actix_web::web;
use actix_web::web::Data;
use actix_web::Responder;
use openraft::NodeId;
use web::Json;

use crate::app::ExampleApp;

// --- Cluster management

/// Add a node as **Learner**.
///
/// A Learner receives log replication from the leader but does not vote.
/// This should be done before adding a node as a member into the cluster
/// (by calling `change-membership`)
#[post("/add-learner")]
pub async fn add_learner(app: Data<ExampleApp>, req: Json<NodeId>) -> actix_web::Result<impl Responder> {
let response = app.raft.add_learner(req.0, true).await;
Ok(Json(response))
}

/// Changes specified learners to members, or remove members.
#[post("/change-membership")]
pub async fn change_membership(
app: Data<ExampleApp>,
req: Json<BTreeSet<NodeId>>,
) -> actix_web::Result<impl Responder> {
let response = app.raft.change_membership(req.0, true).await;
Ok(Json(response))
}

/// Initialize a single-node cluster.
#[post("/init")]
pub async fn init(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
let mut nodes = BTreeSet::new();
nodes.insert(app.id);
let response = app.raft.initialize(nodes).await;
Ok(Json(response))
}

/// Get the latest metrics of the cluster
#[get("/metrics")]
pub async fn metrics(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
let response = app.raft.metrics().borrow().clone();
Ok(Json(response))
}

/// List known nodes of the cluster.
#[get("/list-nodes")]
pub async fn list_nodes(app: Data<ExampleApp>) -> actix_web::Result<impl Responder> {
let state_machine = app.store.state_machine.read().await;
let response = state_machine.nodes.clone();
Ok(Json(response))
}
4 changes: 4 additions & 0 deletions example-raft-kv/src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod api;
pub mod management;
pub mod raft;
pub mod rpc;
Loading