From 07216569c29d1fb30da76a8fc5d3738b3d6af4df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 2 Feb 2022 22:33:14 +0800 Subject: [PATCH 1/3] Doc: add example raft-kv Add an example of distributed kv store implementation: `./example-raft-kv`. Includes: - An in-memory `RaftStorage` implementation [store](./store). - A server is based on [actix-web](https://docs.rs/actix-web/4.0.0-rc.2). Includes: - raft-internal network APIs for replication and voting. - Admin APIs to add nodes, change-membership etc. - Application APIs to write a value by key or read a value by key. - Client and `RaftNetwork`([network](./network)) are built upon [reqwest](https://docs.rs/reqwest). --- Cargo.toml | 1 + example-raft-kv/Cargo.toml | 42 ++++ example-raft-kv/README.md | 36 +++ example-raft-kv/src/app.rs | 14 ++ example-raft-kv/src/bin/raftkv.rs | 74 ++++++ example-raft-kv/src/lib.rs | 13 ++ example-raft-kv/src/network.rs | 72 ++++++ example-raft-kv/src/rpc_handlers.rs | 114 ++++++++++ example-raft-kv/src/store.rs | 334 ++++++++++++++++++++++++++++ example-raft-kv/test-cluster.sh | 79 +++++++ openraft/src/error.rs | 12 +- openraft/src/raft.rs | 2 +- 12 files changed, 786 insertions(+), 7 deletions(-) create mode 100644 example-raft-kv/Cargo.toml create mode 100644 example-raft-kv/README.md create mode 100644 example-raft-kv/src/app.rs create mode 100644 example-raft-kv/src/bin/raftkv.rs create mode 100644 example-raft-kv/src/lib.rs create mode 100644 example-raft-kv/src/network.rs create mode 100644 example-raft-kv/src/rpc_handlers.rs create mode 100644 example-raft-kv/src/store.rs create mode 100755 example-raft-kv/test-cluster.sh diff --git a/Cargo.toml b/Cargo.toml index c473eb3cf..67ef6b5f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,4 +2,5 @@ members = [ "openraft", "memstore", + "example-raft-kv", ] diff --git a/example-raft-kv/Cargo.toml b/example-raft-kv/Cargo.toml new file mode 100644 index 000000000..1ffbb1bb7 --- /dev/null +++ b/example-raft-kv/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "example-raft-kv" +version = "0.1.0" +edition = "2021" +authors = [ "drdr xp ", ] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed kv-store built upon `openraft`." +#documentation = "https://docs.rs/memstore" +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT/Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" +readme = "README.md" + +[[bin]] +name = "raftkv" +path = "src/bin/raftkv.rs" + + +[dependencies] +actix-web = "4.0.0-rc.2" +anyerror = { version = "0.1.1"} +async-trait = "0.1.36" +clap = { version = "3.0.13", features = ["derive", "env"] } +env_logger = "0.9.0" +openraft = { version="0.6", path= "../openraft" } +reqwest = { version = "0.11.9", features = ["json"] } +serde = { version="1.0.114", features=["derive"] } +serde_json = "1.0.57" +tokio = { version="1.0", default-features=false, features=["sync"] } +tracing = "0.1.29" +tracing-futures = "0.2.4" + + +[dev-dependencies] +maplit = "1.0.2" + +[features] +docinclude = [] # Used only for activating `doc(include="...")` on nightly. + +[package.metadata.docs.rs] +features = ["docinclude"] # Activate `docinclude` during docs.rs build. diff --git a/example-raft-kv/README.md b/example-raft-kv/README.md new file mode 100644 index 000000000..ac2fd05c4 --- /dev/null +++ b/example-raft-kv/README.md @@ -0,0 +1,36 @@ +# Example distributed KV store built upon openraft + +It is an example of how to build a real-world kv store with openraft. +Includes: +- An in-memory `RaftStorage` implementation [store](./store). + +- A server is based on [actix-web](https://docs.rs/actix-web/4.0.0-rc.2). + Includes: + - raft-internal network APIs for replication and voting. + - Admin APIs to add nodes, change-membership etc. + - Application APIs to write a value by key or read a value by key. + +- Client and `RaftNetwork`([network](./network)) are built upon [reqwest](https://docs.rs/reqwest). + +## Run it + +```shell +cargo build +./example-raft-kv/test-cluster.sh +``` + +## Cluster management + +The raft itself does not store node addresses. +But in a real-world application, the implementation of `RaftNetwork` needs to know the addresses. + +Thus, in this example application: + +- The storage layer has to store nodes' information. +- The network layer keeps a reference to the store so that it is able to get the address of a target node to send RPC to. + +To add a node to a cluster, it includes 3 steps: + +- Write a `node` through raft protocol to the storage. +- Add the node as a `Learner` to let it start receiving replication data from the leader. +- Invoke `change-membership` to change the learner node to a member. \ No newline at end of file diff --git a/example-raft-kv/src/app.rs b/example-raft-kv/src/app.rs new file mode 100644 index 000000000..f34f669ce --- /dev/null +++ b/example-raft-kv/src/app.rs @@ -0,0 +1,14 @@ +use std::sync::Arc; + +use openraft::Config; +use openraft::NodeId; + +use crate::ExampleRaft; +use crate::ExampleStore; + +pub struct ExampleApp { + pub id: NodeId, + pub raft: ExampleRaft, + pub store: Arc, + pub config: Arc, +} diff --git a/example-raft-kv/src/bin/raftkv.rs b/example-raft-kv/src/bin/raftkv.rs new file mode 100644 index 000000000..57abfcaee --- /dev/null +++ b/example-raft-kv/src/bin/raftkv.rs @@ -0,0 +1,74 @@ +use std::sync::Arc; + +use actix_web::middleware; +use actix_web::middleware::Logger; +use actix_web::web::Data; +use actix_web::App; +use actix_web::HttpServer; +use clap::Parser; +use env_logger::Env; +use example_raft_kv::app::ExampleApp; +use example_raft_kv::network::ExampleNetwork; +use example_raft_kv::rpc_handlers; +use example_raft_kv::store::ExampleRequest; +use example_raft_kv::store::ExampleResponse; +use example_raft_kv::store::ExampleStore; +use openraft::Config; +use openraft::Raft; + +pub type ExampleRaft = Raft; + +#[derive(Parser, Clone, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct Opt { + #[clap(long)] + pub id: u64, + + #[clap(long)] + pub http_addr: String, +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + env_logger::init_from_env(Env::default().default_filter_or("info")); + + let opt = Opt::parse(); + let node_id = opt.id; + + let config = Arc::new(Config::default().validate().unwrap()); + let store = Arc::new(ExampleStore::default()); + let network = Arc::new(ExampleNetwork { store: store.clone() }); + + let raft = Raft::new(node_id, config.clone(), network, store.clone()); + + let app = Data::new(ExampleApp { + id: opt.id, + raft, + store, + config, + }); + + HttpServer::new(move || { + App::new() + .wrap(Logger::default()) + .wrap(Logger::new("%a %{User-Agent}i")) + .wrap(middleware::Compress::default()) + .app_data(app.clone()) + // raft internal RPC + .service(rpc_handlers::append) + .service(rpc_handlers::snapshot) + .service(rpc_handlers::vote) + // admin API + .service(rpc_handlers::init) + .service(rpc_handlers::add_learner) + .service(rpc_handlers::change_membership) + .service(rpc_handlers::metrics) + .service(rpc_handlers::list_nodes) + // application API + .service(rpc_handlers::write) + .service(rpc_handlers::read) + }) + .bind(opt.http_addr)? + .run() + .await +} diff --git a/example-raft-kv/src/lib.rs b/example-raft-kv/src/lib.rs new file mode 100644 index 000000000..4c6324478 --- /dev/null +++ b/example-raft-kv/src/lib.rs @@ -0,0 +1,13 @@ +use openraft::Raft; + +use crate::network::ExampleNetwork; +use crate::store::ExampleRequest; +use crate::store::ExampleResponse; +use crate::store::ExampleStore; + +pub mod app; +pub mod network; +pub mod rpc_handlers; +pub mod store; + +pub type ExampleRaft = Raft; diff --git a/example-raft-kv/src/network.rs b/example-raft-kv/src/network.rs new file mode 100644 index 000000000..24077ddbc --- /dev/null +++ b/example-raft-kv/src/network.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use openraft::error::AppendEntriesError; +use openraft::error::InstallSnapshotError; +use openraft::error::NetworkError; +use openraft::error::RPCError; +use openraft::error::RemoteError; +use openraft::error::VoteError; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; +use openraft::raft::InstallSnapshotRequest; +use openraft::raft::InstallSnapshotResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::NodeId; +use openraft::RaftNetwork; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::store::ExampleRequest; +use crate::ExampleStore; + +pub struct ExampleNetwork { + pub store: Arc, +} + +impl ExampleNetwork { + pub async fn send_rpc(&self, target: NodeId, uri: &str, req: Req) -> Result> + where + Req: Serialize, + Err: std::error::Error + DeserializeOwned, + Resp: DeserializeOwned, + { + let addr = { + let sm = self.store.sm.read().await; + sm.nodes.get(&target).unwrap().clone() + }; + + let url = format!("http://{}/{}", addr, uri); + let client = reqwest::Client::new(); + + let resp = client.post(url).json(&req).send().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + + let res: Result = resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + + res.map_err(|e| RPCError::RemoteError(RemoteError::new(target, e))) + } +} + +#[async_trait] +impl RaftNetwork for ExampleNetwork { + async fn send_append_entries( + &self, + target: NodeId, + req: AppendEntriesRequest, + ) -> Result> { + self.send_rpc(target, "raft-append", req).await + } + + async fn send_install_snapshot( + &self, + target: NodeId, + req: InstallSnapshotRequest, + ) -> Result> { + self.send_rpc(target, "raft-snapshot", req).await + } + + async fn send_vote(&self, target: NodeId, req: VoteRequest) -> Result> { + self.send_rpc(target, "raft-vote", req).await + } +} diff --git a/example-raft-kv/src/rpc_handlers.rs b/example-raft-kv/src/rpc_handlers.rs new file mode 100644 index 000000000..574341c19 --- /dev/null +++ b/example-raft-kv/src/rpc_handlers.rs @@ -0,0 +1,114 @@ +use std::collections::BTreeSet; + +use actix_web::get; +use actix_web::post; +use actix_web::web; +use actix_web::web::Data; +use actix_web::Responder; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::ClientWriteRequest; +use openraft::raft::EntryPayload; +use openraft::raft::InstallSnapshotRequest; +use openraft::raft::VoteRequest; +use openraft::NodeId; +use serde::Deserialize; +use serde::Serialize; +use web::Json; + +use crate::app::ExampleApp; +use crate::store::ExampleRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Empty {} + +// --- Raft communication + +#[post("/raft-vote")] +pub async fn vote(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.vote(req.0).await; + Ok(Json(res)) +} + +#[post("/raft-append")] +pub async fn append( + app: Data, + req: Json>, +) -> actix_web::Result { + let res = app.raft.append_entries(req.0).await; + Ok(Json(res)) +} + +#[post("/raft-snapshot")] +pub async fn snapshot(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.install_snapshot(req.0).await; + Ok(Json(res)) +} + +// --- Application API + +#[post("/write")] +pub async fn write(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.client_write(ClientWriteRequest::new(EntryPayload::Normal(req.0))).await; + Ok(Json(res)) +} + +#[post("/read")] +pub async fn read(app: Data, req: Json) -> actix_web::Result { + let res = { + let sm = app.store.sm.read().await; + let key = req.0; + let value = sm.kvs.get(&key).cloned(); + value.unwrap_or_default() + }; + Ok(Json(res)) +} + +// --- Cluster management + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. +/// This should be done before adding a node as a member into the cluster(by calling `change-membership`) +#[post("/add-learner")] +pub async fn add_learner(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.add_learner(req.0, true).await; + Ok(Json(res)) +} + +/// Changes specified learners to members, or remove members. +#[post("/change-membership")] +pub async fn change_membership( + app: Data, + req: Json>, +) -> actix_web::Result { + let res = app.raft.change_membership(req.0, true).await; + Ok(Json(res)) +} + +/// Initialize a single-node cluster. +#[post("/init")] +pub async fn init(app: Data, _req: Json) -> actix_web::Result { + let mut nodes = BTreeSet::new(); + nodes.insert(app.id); + + let res = app.raft.initialize(nodes).await; + Ok(Json(res)) +} + +/// Get the latest metrics of the cluster +#[get("/metrics")] +pub async fn metrics(app: Data) -> actix_web::Result { + let res = app.raft.metrics().borrow().clone(); + Ok(Json(res)) +} + +/// List known nodes of the cluster. +#[get("/list-nodes")] +pub async fn list_nodes(app: Data) -> actix_web::Result { + let res = { + let sm = app.store.sm.read().await; + sm.nodes.clone() + }; + + Ok(Json(res)) +} diff --git a/example-raft-kv/src/store.rs b/example-raft-kv/src/store.rs new file mode 100644 index 000000000..529e1e33e --- /dev/null +++ b/example-raft-kv/src/store.rs @@ -0,0 +1,334 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::io::Cursor; +use std::ops::RangeBounds; +use std::sync::Arc; +use std::sync::Mutex; + +use anyerror::AnyError; +use openraft::async_trait::async_trait; +use openraft::raft::Entry; +use openraft::raft::EntryPayload; +use openraft::storage::LogState; +use openraft::storage::Snapshot; +use openraft::AppData; +use openraft::AppDataResponse; +use openraft::EffectiveMembership; +use openraft::ErrorSubject; +use openraft::ErrorVerb; +use openraft::LogId; +use openraft::NodeId; +use openraft::RaftStorage; +use openraft::SnapshotMeta; +use openraft::StateMachineChanges; +use openraft::StorageError; +use openraft::StorageIOError; +use openraft::Vote; +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::RwLock; + +/// Request is a command to modify the state machine. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum ExampleRequest { + Set { key: String, value: String }, + AddNode { id: NodeId, addr: String }, +} + +impl AppData for ExampleRequest {} + +/// The state after applied a request. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExampleResponse { + pub value: Option, +} + +impl AppDataResponse for ExampleResponse {} + +#[derive(Debug)] +pub struct ExampleSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct ExampleStateMachine { + pub last_applied_log: Option, + + pub last_membership: Option, + + /// Node addresses in this cluster. + pub nodes: BTreeMap, + + /// Application data. + pub kvs: BTreeMap, +} + +#[derive(Debug, Default)] +pub struct ExampleStore { + last_purged_log_id: RwLock>, + + /// The Raft log. + log: RwLock>>, + + /// The Raft state machine. + pub sm: RwLock, + + /// The current granted vote. + vote: RwLock>, + + snapshot_idx: Arc>, + + current_snapshot: RwLock>, +} + +#[async_trait] +impl RaftStorage for ExampleStore { + type SnapshotData = Cursor>; + + #[tracing::instrument(level = "trace", skip(self))] + async fn save_vote(&self, vote: &Vote) -> Result<(), StorageError> { + let mut h = self.vote.write().await; + + *h = Some(*vote); + Ok(()) + } + + async fn read_vote(&self) -> Result, StorageError> { + Ok(*self.vote.read().await) + } + + async fn get_log_state(&self) -> Result { + let log = self.log.read().await; + let last = log.iter().rev().next().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.read().await; + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + async fn try_get_log_entries + Clone + Debug + Send + Sync>( + &self, + range: RB, + ) -> Result>, StorageError> { + let res = { + let log = self.log.read().await; + log.range(range.clone()).map(|(_, val)| val.clone()).collect::>() + }; + + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn append_to_log(&self, entries: &[&Entry]) -> Result<(), StorageError> { + let mut log = self.log.write().await; + for entry in entries { + log.insert(entry.log_id.index, (*entry).clone()); + } + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + { + let mut log = self.log.write().await; + + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn purge_logs_upto(&self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + { + let mut ld = self.last_purged_log_id.write().await; + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let mut log = self.log.write().await; + + let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + async fn last_applied_state(&self) -> Result<(Option, Option), StorageError> { + let sm = self.sm.read().await; + Ok((sm.last_applied_log, sm.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply_to_state_machine( + &self, + entries: &[&Entry], + ) -> Result, StorageError> { + let mut res = Vec::with_capacity(entries.len()); + + let mut sm = self.sm.write().await; + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied_log = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(ExampleResponse { value: None }), + EntryPayload::Normal(ref req) => match req { + ExampleRequest::Set { key, value } => { + sm.kvs.insert(key.clone(), value.clone()); + res.push(ExampleResponse { + value: Some(value.clone()), + }) + } + ExampleRequest::AddNode { id, addr } => { + sm.nodes.insert(*id, addr.clone()); + res.push(ExampleResponse { + value: Some(format!("cluster: {:?}", sm.nodes)), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = Some(EffectiveMembership { + log_id: entry.log_id, + membership: mem.clone(), + }); + res.push(ExampleResponse { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&self) -> Result, StorageError> { + let (data, last_applied_log); + + { + // Serialize the data of the state machine. + let sm = self.sm.read().await; + data = serde_json::to_vec(&*sm) + .map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?; + + last_applied_log = sm.last_applied_log; + } + + let last_applied_log = match last_applied_log { + None => { + panic!("can not compact empty state machine"); + } + Some(x) => x, + }; + + let snapshot_idx = { + let mut l = self.snapshot_idx.lock().unwrap(); + *l += 1; + *l + }; + + let snapshot_id = format!( + "{}-{}-{}", + last_applied_log.leader_id, last_applied_log.index, snapshot_idx + ); + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + snapshot_id, + }; + + let snapshot = ExampleSnapshot { + meta: meta.clone(), + data: data.clone(), + }; + + { + let mut current_snapshot = self.current_snapshot.write().await; + *current_snapshot = Some(snapshot); + } + + Ok(Snapshot { + meta, + snapshot: Box::new(Cursor::new(data)), + }) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot(&self) -> Result, StorageError> { + Ok(Box::new(Cursor::new(Vec::new()))) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &self, + meta: &SnapshotMeta, + snapshot: Box, + ) -> Result { + tracing::info!( + { snapshot_size = snapshot.get_ref().len() }, + "decoding snapshot for installation" + ); + + let new_snapshot = ExampleSnapshot { + meta: meta.clone(), + data: snapshot.into_inner(), + }; + + // Update the state machine. + { + let new_sm: ExampleStateMachine = serde_json::from_slice(&new_snapshot.data).map_err(|e| { + StorageIOError::new( + ErrorSubject::Snapshot(new_snapshot.meta.clone()), + ErrorVerb::Read, + AnyError::new(&e), + ) + })?; + let mut sm = self.sm.write().await; + *sm = new_sm; + } + + // Update current snapshot. + let mut current_snapshot = self.current_snapshot.write().await; + *current_snapshot = Some(new_snapshot); + Ok(StateMachineChanges { + last_applied: meta.last_log_id, + is_snapshot: true, + }) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&self) -> Result>, StorageError> { + match &*self.current_snapshot.read().await { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: Box::new(Cursor::new(data)), + })) + } + None => Ok(None), + } + } +} diff --git a/example-raft-kv/test-cluster.sh b/example-raft-kv/test-cluster.sh new file mode 100755 index 000000000..4c41c38e8 --- /dev/null +++ b/example-raft-kv/test-cluster.sh @@ -0,0 +1,79 @@ +#!/bin/sh + +set -o errexit + +cargo build + +rpc() { + local uri=$1 + local body="$2" + + echo '---'" rpc(:$uri, $body)" + + { + if [ ".$body" == "." ]; then + curl --silent "127.0.0.1:$uri" + else + curl --silent "127.0.0.1:$uri" -H "Content-Type: application/json" -d "$body" + fi + } | { + if which -s jq; then + jq + else + cat + fi + } + + echo + echo +} + +export RUST_LOG=debug + +echo "=== Kill all running raftkv" + +killall raftkv +sleep 1 + +echo "=== Start 3 uninitialized raftkv servers: 1, 2, 3" + +nohup ./target/debug/raftkv --id 1 --http-addr 127.0.0.1:21001 > n1.log & +nohup ./target/debug/raftkv --id 2 --http-addr 127.0.0.1:21002 > n2.log & +nohup ./target/debug/raftkv --id 3 --http-addr 127.0.0.1:21003 > n3.log & +sleep 1 + +echo "=== Initialize node-1 as a single-node cluster" + +rpc 21001/init '{}' +sleep 0.2 +rpc 21001/metrics + + +echo "=== Add 3 node addresses so that RaftNetwork is able to find peers by id" + +rpc 21001/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}' +rpc 21001/write '{"AddNode":{"id":2,"addr":"127.0.0.1:21002"}}' +rpc 21001/write '{"AddNode":{"id":3,"addr":"127.0.0.1:21003"}}' + +echo "=== List known nodes in clusters" + +rpc 21001/list-nodes + +echo "=== Add node-2 and node-2 as Learners, to receive log from leader node-1" + +rpc 21001/add-learner '2' +rpc 21001/add-learner '3' + +echo "=== Change membership from [1] to 3 nodes cluster: [1, 2, 3]" + +rpc 21001/change-membership '[1, 2, 3]' +rpc 21001/metrics + +echo "=== Write on leader and read on every node" + +rpc 21001/write '{"Set":{"key":"foo","value":"bar"}}' +sleep 0.1 + +rpc 21001/read '"foo"' +rpc 21002/read '"foo"' +rpc 21003/read '"foo"' diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 4fda7d496..4e4592fe2 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -17,7 +17,7 @@ use crate::StorageError; use crate::Vote; /// Fatal is unrecoverable and shuts down raft at once. -#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)] pub enum Fatal { #[error(transparent)] StorageError(#[from] StorageError), @@ -50,19 +50,19 @@ where E: TryInto + Clone } } -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum AppendEntriesError { #[error(transparent)] Fatal(#[from] Fatal), } -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum VoteError { #[error(transparent)] Fatal(#[from] Fatal), } -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum InstallSnapshotError { #[error(transparent)] SnapshotMismatch(#[from] SnapshotMismatch), @@ -72,7 +72,7 @@ pub enum InstallSnapshotError { } /// An error related to a client read request. -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum ClientReadError { #[error(transparent)] ForwardToLeader(#[from] ForwardToLeader), @@ -85,7 +85,7 @@ pub enum ClientReadError { } /// An error related to a client write request. -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum ClientWriteError { #[error(transparent)] ForwardToLeader(#[from] ForwardToLeader), diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 309878e07..55c0d8585 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -398,7 +398,7 @@ impl, S: RaftStorage> Cl pub(crate) type RaftRespTx = oneshot::Sender>; pub(crate) type RaftRespRx = oneshot::Receiver>; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AddLearnerResponse { pub matched: Option, } From 42748fe61b38b45b5bae70c1a532afa14179ed66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 3 Feb 2022 01:39:46 +0800 Subject: [PATCH 2/3] Fixup: exclude example-raft-kv from workspace --- Cargo.toml | 4 +++- example-raft-kv/README.md | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 67ef6b5f2..5cc80b28c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,7 @@ members = [ "openraft", "memstore", - "example-raft-kv", ] +exclude = [ + "example-raft-kv", +] \ No newline at end of file diff --git a/example-raft-kv/README.md b/example-raft-kv/README.md index ac2fd05c4..5664bf5f9 100644 --- a/example-raft-kv/README.md +++ b/example-raft-kv/README.md @@ -15,8 +15,9 @@ Includes: ## Run it ```shell +cd example-raft-kv cargo build -./example-raft-kv/test-cluster.sh +./test-cluster.sh ``` ## Cluster management From 17d16a14a5b87025a2b1fb044e828ae8fdc63a1e Mon Sep 17 00:00:00 2001 From: Pedro Paulo de Amorim Date: Wed, 2 Feb 2022 20:49:48 +0000 Subject: [PATCH 3/3] Fixup: Improve example --- example-raft-kv/.gitignore | 7 ++ example-raft-kv/Cargo.toml | 16 +-- example-raft-kv/README.md | 94 +++++++++++++- example-raft-kv/src/app.rs | 2 + example-raft-kv/src/bin/main.rs | 88 +++++++++++++ example-raft-kv/src/bin/raftkv.rs | 74 ----------- example-raft-kv/src/lib.rs | 3 +- example-raft-kv/src/network/api.rs | 34 +++++ example-raft-kv/src/network/management.rs | 58 +++++++++ example-raft-kv/src/network/mod.rs | 4 + example-raft-kv/src/network/raft.rs | 34 +++++ .../src/{network.rs => network/rpc.rs} | 4 +- example-raft-kv/src/rpc_handlers.rs | 114 ----------------- .../src/{store.rs => store/mod.rs} | 82 +++++++----- example-raft-kv/test-cluster.sh | 118 +++++++++++++++--- 15 files changed, 475 insertions(+), 257 deletions(-) create mode 100644 example-raft-kv/.gitignore create mode 100644 example-raft-kv/src/bin/main.rs delete mode 100644 example-raft-kv/src/bin/raftkv.rs create mode 100644 example-raft-kv/src/network/api.rs create mode 100644 example-raft-kv/src/network/management.rs create mode 100644 example-raft-kv/src/network/mod.rs create mode 100644 example-raft-kv/src/network/raft.rs rename example-raft-kv/src/{network.rs => network/rpc.rs} (94%) delete mode 100644 example-raft-kv/src/rpc_handlers.rs rename example-raft-kv/src/{store.rs => store/mod.rs} (78%) diff --git a/example-raft-kv/.gitignore b/example-raft-kv/.gitignore new file mode 100644 index 000000000..56c7098dd --- /dev/null +++ b/example-raft-kv/.gitignore @@ -0,0 +1,7 @@ +# Directory Ignores ########################################################## +target +vendor +.idea + +# File Ignores ############################################################### +Cargo.lock \ No newline at end of file diff --git a/example-raft-kv/Cargo.toml b/example-raft-kv/Cargo.toml index 1ffbb1bb7..6264b4210 100644 --- a/example-raft-kv/Cargo.toml +++ b/example-raft-kv/Cargo.toml @@ -1,11 +1,13 @@ [package] -name = "example-raft-kv" +name = "example-raft-key-value" version = "0.1.0" edition = "2021" -authors = [ "drdr xp ", ] +authors = [ + "drdr xp ", + "Pedro Paulo de Amorim " +] categories = ["algorithms", "asynchronous", "data-structures"] -description = "An example distributed kv-store built upon `openraft`." -#documentation = "https://docs.rs/memstore" +description = "An example distributed key-value store built upon `openraft`." homepage = "https://github.com/datafuselabs/openraft" keywords = ["raft", "consensus"] license = "MIT/Apache-2.0" @@ -13,9 +15,8 @@ repository = "https://github.com/datafuselabs/openraft" readme = "README.md" [[bin]] -name = "raftkv" -path = "src/bin/raftkv.rs" - +name = "raft-key-value" +path = "src/bin/main.rs" [dependencies] actix-web = "4.0.0-rc.2" @@ -31,7 +32,6 @@ tokio = { version="1.0", default-features=false, features=["sync"] } tracing = "0.1.29" tracing-futures = "0.2.4" - [dev-dependencies] maplit = "1.0.2" diff --git a/example-raft-kv/README.md b/example-raft-kv/README.md index 5664bf5f9..7052e4673 100644 --- a/example-raft-kv/README.md +++ b/example-raft-kv/README.md @@ -1,8 +1,8 @@ -# Example distributed KV store built upon openraft +# Example distributed key-value store built upon openraft. -It is an example of how to build a real-world kv store with openraft. +It is an example of how to build a real-world key-value store with `openraft`. Includes: -- An in-memory `RaftStorage` implementation [store](./store). +- An in-memory `RaftStorage` implementation [store](./src/store/store.rs). - A server is based on [actix-web](https://docs.rs/actix-web/4.0.0-rc.2). Includes: @@ -10,16 +10,98 @@ Includes: - Admin APIs to add nodes, change-membership etc. - Application APIs to write a value by key or read a value by key. -- Client and `RaftNetwork`([network](./network)) are built upon [reqwest](https://docs.rs/reqwest). +- Client and `RaftNetwork`([rpc](./src/network/rpc.rs)) are built upon [reqwest](https://docs.rs/reqwest). ## Run it +If you want to see a simulation of 3 nodes running and sharing data, you can run the cluster demo: + ```shell -cd example-raft-kv -cargo build ./test-cluster.sh ``` +if you want to compile the application, run: + +```shell +cargo build +``` + +(If you append `--release` to make it compile in production, but we don't recommend to use +this project in production yet.) + +To run it, get the binary `raft-key-value` inside `target/debug` and run: + +```shell +./raft-key-value --id 1 --http-addr 127.0.0.1:21001 +``` + +It will start a node. + +To start the following nodes: + +```shell +./raft-key-value --id 2 --http-addr 127.0.0.1:21002 +``` + +You can continue replicating the nodes by changing the `id` and `http-addr`. + +After that, call the first node created: + +``` +POST - 127.0.0.1:21001/init +``` + +It will define the first node created as the leader. + +After that you will need to notify the leader node about the other nodes: + +``` +POST - 127.0.0.1:21001/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}' +POST - 127.0.0.1:21001/write '{"AddNode":{"id":2,"addr":"127.0.0.1:21002"}}' +... +``` + +Then you need to inform to the leader that these nodes are learners: + +``` +POST - 127.0.0.1:21001/add-learner "2" +``` + +Now you need to tell the leader to add all learners as members of the cluster: + +``` +POST - 127.0.0.1:21001/change-membership "[1, 2]" +``` + +Write some data in any of the nodes: + +``` +POST - 127.0.0.1:21001/write "{"Set":{"key":"foo","value":"bar"}}" +``` + +Read the data from any node: + +``` +POST - 127.0.0.1:21002/read "foo" +``` + +You should be able to read that on the another instance even if you did not sync any data! + + +## How it's structured. + +The application is separated in 4 modules: + + - `bin`: You can find the `main()` function in [main](./src/bin/main.rs) the file where the setup for the server happens. + - `network`: You can find the [api](./src/network/api.rs) that implements the endpoints used by the public API and [rpc](./src/network/rpc.rs) where all the raft communication from the node happens. [management](./src/network/management.rs) is where all the administration endpoints are present, those are used to add orremove nodes, promote and more. [raft](./src/network/raft.rs) is where all the communication are received from other nodes. + - `store`: You can find the file [store](./src/store/mod.rs) where all the key-value implementation is done. Here is where your data application will be managed. + +## Where is my data? + +The data is store inside state machines, each state machine represents a point of data and +raft enforces that all nodes have the same data in synchronization. You can have a look of +the struct [ExampleStateMachine](./src/store/mod.rs) + ## Cluster management The raft itself does not store node addresses. diff --git a/example-raft-kv/src/app.rs b/example-raft-kv/src/app.rs index f34f669ce..f50dbd96e 100644 --- a/example-raft-kv/src/app.rs +++ b/example-raft-kv/src/app.rs @@ -6,6 +6,8 @@ use openraft::NodeId; use crate::ExampleRaft; use crate::ExampleStore; +// Representation of an application state. This struct can be shared around to share +// instances of raft, store and more. pub struct ExampleApp { pub id: NodeId, pub raft: ExampleRaft, diff --git a/example-raft-kv/src/bin/main.rs b/example-raft-kv/src/bin/main.rs new file mode 100644 index 000000000..69bb5a240 --- /dev/null +++ b/example-raft-kv/src/bin/main.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; + +use actix_web::middleware; +use actix_web::middleware::Logger; +use actix_web::web::Data; +use actix_web::App; +use actix_web::HttpServer; +use clap::Parser; +use env_logger::Env; +use example_raft_key_value::app::ExampleApp; +use example_raft_key_value::network::api; +use example_raft_key_value::network::management; +use example_raft_key_value::network::raft; +use example_raft_key_value::network::rpc::ExampleNetwork; +use example_raft_key_value::store::ExampleRequest; +use example_raft_key_value::store::ExampleResponse; +use example_raft_key_value::store::ExampleStore; +use openraft::Config; +use openraft::Raft; + +pub type ExampleRaft = Raft; + +#[derive(Parser, Clone, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct Opt { + #[clap(long)] + pub id: u64, + + #[clap(long)] + pub http_addr: String, +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + // Setup the logger + env_logger::init_from_env(Env::default().default_filter_or("info")); + + // Parse the parameters passed by arguments. + let options = Opt::parse(); + let node_id = options.id; + + // Create a configuration for the raft instance. + let config = Arc::new(Config::default().validate().unwrap()); + + // Create a instance of where the Raft data will be stored. + let store = Arc::new(ExampleStore::default()); + + // Create the network layer that will connect and communicate the raft instances and + // will be used in conjunction with the store created above. + let network = Arc::new(ExampleNetwork { store: store.clone() }); + + // Create a local raft instance. + let raft = Raft::new(node_id, config.clone(), network, store.clone()); + + // Create an application that will store all the instances created above, this will + // be later used on the actix-web services. + let app = Data::new(ExampleApp { + id: options.id, + raft, + store, + config, + }); + + // Start the actix-web server. + HttpServer::new(move || { + App::new() + .wrap(Logger::default()) + .wrap(Logger::new("%a %{User-Agent}i")) + .wrap(middleware::Compress::default()) + .app_data(app.clone()) + // raft internal RPC + .service(raft::append) + .service(raft::snapshot) + .service(raft::vote) + // admin API + .service(management::init) + .service(management::add_learner) + .service(management::change_membership) + .service(management::metrics) + .service(management::list_nodes) + // application API + .service(api::write) + .service(api::read) + }) + .bind(options.http_addr)? + .run() + .await +} diff --git a/example-raft-kv/src/bin/raftkv.rs b/example-raft-kv/src/bin/raftkv.rs deleted file mode 100644 index 57abfcaee..000000000 --- a/example-raft-kv/src/bin/raftkv.rs +++ /dev/null @@ -1,74 +0,0 @@ -use std::sync::Arc; - -use actix_web::middleware; -use actix_web::middleware::Logger; -use actix_web::web::Data; -use actix_web::App; -use actix_web::HttpServer; -use clap::Parser; -use env_logger::Env; -use example_raft_kv::app::ExampleApp; -use example_raft_kv::network::ExampleNetwork; -use example_raft_kv::rpc_handlers; -use example_raft_kv::store::ExampleRequest; -use example_raft_kv::store::ExampleResponse; -use example_raft_kv::store::ExampleStore; -use openraft::Config; -use openraft::Raft; - -pub type ExampleRaft = Raft; - -#[derive(Parser, Clone, Debug)] -#[clap(author, version, about, long_about = None)] -pub struct Opt { - #[clap(long)] - pub id: u64, - - #[clap(long)] - pub http_addr: String, -} - -#[actix_web::main] -async fn main() -> std::io::Result<()> { - env_logger::init_from_env(Env::default().default_filter_or("info")); - - let opt = Opt::parse(); - let node_id = opt.id; - - let config = Arc::new(Config::default().validate().unwrap()); - let store = Arc::new(ExampleStore::default()); - let network = Arc::new(ExampleNetwork { store: store.clone() }); - - let raft = Raft::new(node_id, config.clone(), network, store.clone()); - - let app = Data::new(ExampleApp { - id: opt.id, - raft, - store, - config, - }); - - HttpServer::new(move || { - App::new() - .wrap(Logger::default()) - .wrap(Logger::new("%a %{User-Agent}i")) - .wrap(middleware::Compress::default()) - .app_data(app.clone()) - // raft internal RPC - .service(rpc_handlers::append) - .service(rpc_handlers::snapshot) - .service(rpc_handlers::vote) - // admin API - .service(rpc_handlers::init) - .service(rpc_handlers::add_learner) - .service(rpc_handlers::change_membership) - .service(rpc_handlers::metrics) - .service(rpc_handlers::list_nodes) - // application API - .service(rpc_handlers::write) - .service(rpc_handlers::read) - }) - .bind(opt.http_addr)? - .run() - .await -} diff --git a/example-raft-kv/src/lib.rs b/example-raft-kv/src/lib.rs index 4c6324478..d6f458c59 100644 --- a/example-raft-kv/src/lib.rs +++ b/example-raft-kv/src/lib.rs @@ -1,13 +1,12 @@ use openraft::Raft; -use crate::network::ExampleNetwork; +use crate::network::rpc::ExampleNetwork; use crate::store::ExampleRequest; use crate::store::ExampleResponse; use crate::store::ExampleStore; pub mod app; pub mod network; -pub mod rpc_handlers; pub mod store; pub type ExampleRaft = Raft; diff --git a/example-raft-kv/src/network/api.rs b/example-raft-kv/src/network/api.rs new file mode 100644 index 000000000..dfae64f0e --- /dev/null +++ b/example-raft-kv/src/network/api.rs @@ -0,0 +1,34 @@ +use actix_web::post; +use actix_web::web; +use actix_web::web::Data; +use actix_web::Responder; +use openraft::raft::ClientWriteRequest; +use openraft::raft::EntryPayload; +use web::Json; + +use crate::app::ExampleApp; +use crate::store::ExampleRequest; + +/** + * Application API + * + * This is where you place your application, you can use the example below to create your + * API. The current implementation: + * + * - `POST - /write` saves a value in a key and sync the nodes. + * - `GET - /read` attempt to find a value from a given key. + */ +#[post("/write")] +pub async fn write(app: Data, req: Json) -> actix_web::Result { + let request = ClientWriteRequest::new(EntryPayload::Normal(req.0)); + let response = app.raft.client_write(request).await; + Ok(Json(response)) +} + +#[post("/read")] +pub async fn read(app: Data, req: Json) -> actix_web::Result { + let state_machine = app.store.state_machine.read().await; + let key = req.0; + let value = state_machine.data.get(&key).cloned(); + Ok(Json(value.unwrap_or_default())) +} diff --git a/example-raft-kv/src/network/management.rs b/example-raft-kv/src/network/management.rs new file mode 100644 index 000000000..442ccebf4 --- /dev/null +++ b/example-raft-kv/src/network/management.rs @@ -0,0 +1,58 @@ +use std::collections::BTreeSet; + +use actix_web::get; +use actix_web::post; +use actix_web::web; +use actix_web::web::Data; +use actix_web::Responder; +use openraft::NodeId; +use web::Json; + +use crate::app::ExampleApp; + +// --- Cluster management + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. +/// This should be done before adding a node as a member into the cluster +/// (by calling `change-membership`) +#[post("/add-learner")] +pub async fn add_learner(app: Data, req: Json) -> actix_web::Result { + let response = app.raft.add_learner(req.0, true).await; + Ok(Json(response)) +} + +/// Changes specified learners to members, or remove members. +#[post("/change-membership")] +pub async fn change_membership( + app: Data, + req: Json>, +) -> actix_web::Result { + let response = app.raft.change_membership(req.0, true).await; + Ok(Json(response)) +} + +/// Initialize a single-node cluster. +#[post("/init")] +pub async fn init(app: Data) -> actix_web::Result { + let mut nodes = BTreeSet::new(); + nodes.insert(app.id); + let response = app.raft.initialize(nodes).await; + Ok(Json(response)) +} + +/// Get the latest metrics of the cluster +#[get("/metrics")] +pub async fn metrics(app: Data) -> actix_web::Result { + let response = app.raft.metrics().borrow().clone(); + Ok(Json(response)) +} + +/// List known nodes of the cluster. +#[get("/list-nodes")] +pub async fn list_nodes(app: Data) -> actix_web::Result { + let state_machine = app.store.state_machine.read().await; + let response = state_machine.nodes.clone(); + Ok(Json(response)) +} diff --git a/example-raft-kv/src/network/mod.rs b/example-raft-kv/src/network/mod.rs new file mode 100644 index 000000000..62300907b --- /dev/null +++ b/example-raft-kv/src/network/mod.rs @@ -0,0 +1,4 @@ +pub mod api; +pub mod management; +pub mod raft; +pub mod rpc; diff --git a/example-raft-kv/src/network/raft.rs b/example-raft-kv/src/network/raft.rs new file mode 100644 index 000000000..f10c1e914 --- /dev/null +++ b/example-raft-kv/src/network/raft.rs @@ -0,0 +1,34 @@ +use actix_web::post; +use actix_web::web; +use actix_web::web::Data; +use actix_web::Responder; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::InstallSnapshotRequest; +use openraft::raft::VoteRequest; +use web::Json; + +use crate::app::ExampleApp; +use crate::store::ExampleRequest; + +// --- Raft communication + +#[post("/raft-vote")] +pub async fn vote(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.vote(req.0).await; + Ok(Json(res)) +} + +#[post("/raft-append")] +pub async fn append( + app: Data, + req: Json>, +) -> actix_web::Result { + let res = app.raft.append_entries(req.0).await; + Ok(Json(res)) +} + +#[post("/raft-snapshot")] +pub async fn snapshot(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.install_snapshot(req.0).await; + Ok(Json(res)) +} diff --git a/example-raft-kv/src/network.rs b/example-raft-kv/src/network/rpc.rs similarity index 94% rename from example-raft-kv/src/network.rs rename to example-raft-kv/src/network/rpc.rs index 24077ddbc..537fe22b9 100644 --- a/example-raft-kv/src/network.rs +++ b/example-raft-kv/src/network/rpc.rs @@ -33,8 +33,8 @@ impl ExampleNetwork { Resp: DeserializeOwned, { let addr = { - let sm = self.store.sm.read().await; - sm.nodes.get(&target).unwrap().clone() + let state_machine = self.store.state_machine.read().await; + state_machine.nodes.get(&target).unwrap().clone() }; let url = format!("http://{}/{}", addr, uri); diff --git a/example-raft-kv/src/rpc_handlers.rs b/example-raft-kv/src/rpc_handlers.rs deleted file mode 100644 index 574341c19..000000000 --- a/example-raft-kv/src/rpc_handlers.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::collections::BTreeSet; - -use actix_web::get; -use actix_web::post; -use actix_web::web; -use actix_web::web::Data; -use actix_web::Responder; -use openraft::raft::AppendEntriesRequest; -use openraft::raft::ClientWriteRequest; -use openraft::raft::EntryPayload; -use openraft::raft::InstallSnapshotRequest; -use openraft::raft::VoteRequest; -use openraft::NodeId; -use serde::Deserialize; -use serde::Serialize; -use web::Json; - -use crate::app::ExampleApp; -use crate::store::ExampleRequest; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Empty {} - -// --- Raft communication - -#[post("/raft-vote")] -pub async fn vote(app: Data, req: Json) -> actix_web::Result { - let res = app.raft.vote(req.0).await; - Ok(Json(res)) -} - -#[post("/raft-append")] -pub async fn append( - app: Data, - req: Json>, -) -> actix_web::Result { - let res = app.raft.append_entries(req.0).await; - Ok(Json(res)) -} - -#[post("/raft-snapshot")] -pub async fn snapshot(app: Data, req: Json) -> actix_web::Result { - let res = app.raft.install_snapshot(req.0).await; - Ok(Json(res)) -} - -// --- Application API - -#[post("/write")] -pub async fn write(app: Data, req: Json) -> actix_web::Result { - let res = app.raft.client_write(ClientWriteRequest::new(EntryPayload::Normal(req.0))).await; - Ok(Json(res)) -} - -#[post("/read")] -pub async fn read(app: Data, req: Json) -> actix_web::Result { - let res = { - let sm = app.store.sm.read().await; - let key = req.0; - let value = sm.kvs.get(&key).cloned(); - value.unwrap_or_default() - }; - Ok(Json(res)) -} - -// --- Cluster management - -/// Add a node as **Learner**. -/// -/// A Learner receives log replication from the leader but does not vote. -/// This should be done before adding a node as a member into the cluster(by calling `change-membership`) -#[post("/add-learner")] -pub async fn add_learner(app: Data, req: Json) -> actix_web::Result { - let res = app.raft.add_learner(req.0, true).await; - Ok(Json(res)) -} - -/// Changes specified learners to members, or remove members. -#[post("/change-membership")] -pub async fn change_membership( - app: Data, - req: Json>, -) -> actix_web::Result { - let res = app.raft.change_membership(req.0, true).await; - Ok(Json(res)) -} - -/// Initialize a single-node cluster. -#[post("/init")] -pub async fn init(app: Data, _req: Json) -> actix_web::Result { - let mut nodes = BTreeSet::new(); - nodes.insert(app.id); - - let res = app.raft.initialize(nodes).await; - Ok(Json(res)) -} - -/// Get the latest metrics of the cluster -#[get("/metrics")] -pub async fn metrics(app: Data) -> actix_web::Result { - let res = app.raft.metrics().borrow().clone(); - Ok(Json(res)) -} - -/// List known nodes of the cluster. -#[get("/list-nodes")] -pub async fn list_nodes(app: Data) -> actix_web::Result { - let res = { - let sm = app.store.sm.read().await; - sm.nodes.clone() - }; - - Ok(Json(res)) -} diff --git a/example-raft-kv/src/store.rs b/example-raft-kv/src/store/mod.rs similarity index 78% rename from example-raft-kv/src/store.rs rename to example-raft-kv/src/store/mod.rs index 529e1e33e..b2a08795c 100644 --- a/example-raft-kv/src/store.rs +++ b/example-raft-kv/src/store/mod.rs @@ -28,16 +28,29 @@ use serde::Deserialize; use serde::Serialize; use tokio::sync::RwLock; -/// Request is a command to modify the state machine. +/** + * Here you will set the types of request that will interact with the raft nodes. + * For example the `Set` will be used to write data (key and value) to the raft database. + * The `AddNode` will append a new node to the current existing shared list of nodes. + * You will want to add any request that can write data in all nodes here. + */ #[derive(Serialize, Deserialize, Debug, Clone)] pub enum ExampleRequest { Set { key: String, value: String }, AddNode { id: NodeId, addr: String }, } +// Inform to raft that `ExampleRequest` is an application data to be used by raft. impl AppData for ExampleRequest {} -/// The state after applied a request. +/** + * Here you will defined what type of answer you expect from reading the data of a node. + * In this example it will return a optional value from a given key in + * the `ExampleRequest.Set`. + * + * TODO: SHould we explain how to create multiple `AppDataResponse`? + * + */ #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ExampleResponse { pub value: Option, @@ -53,6 +66,12 @@ pub struct ExampleSnapshot { pub data: Vec, } +/** + * Here defines a state machine of the raft, this state represents a copy of the data + * between each node. Note that we are using `serde` to serialize the `data`, which has + * a implementation to be serialized. Note that for this test we set both the key and + * value as String, but you could set any type of value that has the serialization impl. + */ #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct ExampleStateMachine { pub last_applied_log: Option, @@ -63,7 +82,7 @@ pub struct ExampleStateMachine { pub nodes: BTreeMap, /// Application data. - pub kvs: BTreeMap, + pub data: BTreeMap, } #[derive(Debug, Default)] @@ -74,7 +93,7 @@ pub struct ExampleStore { log: RwLock>>, /// The Raft state machine. - pub sm: RwLock, + pub state_machine: RwLock, /// The current granted vote. vote: RwLock>, @@ -90,8 +109,8 @@ impl RaftStorage for ExampleStore { #[tracing::instrument(level = "trace", skip(self))] async fn save_vote(&self, vote: &Vote) -> Result<(), StorageError> { + // TODO: What `h` stands for? let mut h = self.vote.write().await; - *h = Some(*vote); Ok(()) } @@ -121,12 +140,9 @@ impl RaftStorage for ExampleStore { &self, range: RB, ) -> Result>, StorageError> { - let res = { - let log = self.log.read().await; - log.range(range.clone()).map(|(_, val)| val.clone()).collect::>() - }; - - Ok(res) + let log = self.log.read().await; + let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); + Ok(response) } #[tracing::instrument(level = "trace", skip(self, entries))] @@ -142,13 +158,10 @@ impl RaftStorage for ExampleStore { async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError> { tracing::debug!("delete_log: [{:?}, +oo)", log_id); - { - let mut log = self.log.write().await; - - let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); - for key in keys { - log.remove(&key); - } + let mut log = self.log.write().await; + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); } Ok(()) @@ -177,8 +190,8 @@ impl RaftStorage for ExampleStore { } async fn last_applied_state(&self) -> Result<(Option, Option), StorageError> { - let sm = self.sm.read().await; - Ok((sm.last_applied_log, sm.last_membership.clone())) + let state_machine = self.state_machine.read().await; + Ok((state_machine.last_applied_log, state_machine.last_membership.clone())) } #[tracing::instrument(level = "trace", skip(self, entries))] @@ -188,7 +201,7 @@ impl RaftStorage for ExampleStore { ) -> Result, StorageError> { let mut res = Vec::with_capacity(entries.len()); - let mut sm = self.sm.write().await; + let mut sm = self.state_machine.write().await; for entry in entries { tracing::debug!(%entry.log_id, "replicate to sm"); @@ -199,7 +212,7 @@ impl RaftStorage for ExampleStore { EntryPayload::Blank => res.push(ExampleResponse { value: None }), EntryPayload::Normal(ref req) => match req { ExampleRequest::Set { key, value } => { - sm.kvs.insert(key.clone(), value.clone()); + sm.data.insert(key.clone(), value.clone()); res.push(ExampleResponse { value: Some(value.clone()), }) @@ -229,11 +242,11 @@ impl RaftStorage for ExampleStore { { // Serialize the data of the state machine. - let sm = self.sm.read().await; - data = serde_json::to_vec(&*sm) + let state_machine = self.state_machine.read().await; + data = serde_json::to_vec(&*state_machine) .map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?; - last_applied_log = sm.last_applied_log; + last_applied_log = state_machine.last_applied_log; } let last_applied_log = match last_applied_log { @@ -298,15 +311,16 @@ impl RaftStorage for ExampleStore { // Update the state machine. { - let new_sm: ExampleStateMachine = serde_json::from_slice(&new_snapshot.data).map_err(|e| { - StorageIOError::new( - ErrorSubject::Snapshot(new_snapshot.meta.clone()), - ErrorVerb::Read, - AnyError::new(&e), - ) - })?; - let mut sm = self.sm.write().await; - *sm = new_sm; + let updated_state_machine: ExampleStateMachine = + serde_json::from_slice(&new_snapshot.data).map_err(|e| { + StorageIOError::new( + ErrorSubject::Snapshot(new_snapshot.meta.clone()), + ErrorVerb::Read, + AnyError::new(&e), + ) + })?; + let mut state_machine = self.state_machine.write().await; + *state_machine = updated_state_machine; } // Update current snapshot. diff --git a/example-raft-kv/test-cluster.sh b/example-raft-kv/test-cluster.sh index 4c41c38e8..3bb916c1b 100755 --- a/example-raft-kv/test-cluster.sh +++ b/example-raft-kv/test-cluster.sh @@ -4,6 +4,17 @@ set -o errexit cargo build +kill() { + if [ "$(uname)" == "Darwin" ]; then + SERVICE='raft-key-value' + if pgrep -xq -- "${SERVICE}"; then + pkill -f "${SERVICE}" + fi + else + killall raft-key-value + fi +} + rpc() { local uri=$1 local body="$2" @@ -30,50 +41,123 @@ rpc() { export RUST_LOG=debug -echo "=== Kill all running raftkv" +echo "Killing all running raft-key-value" + +kill -killall raftkv sleep 1 -echo "=== Start 3 uninitialized raftkv servers: 1, 2, 3" +echo "Start 3 uninitialized raft-key-value servers..." -nohup ./target/debug/raftkv --id 1 --http-addr 127.0.0.1:21001 > n1.log & -nohup ./target/debug/raftkv --id 2 --http-addr 127.0.0.1:21002 > n2.log & -nohup ./target/debug/raftkv --id 3 --http-addr 127.0.0.1:21003 > n3.log & +nohup ./target/debug/raft-key-value --id 1 --http-addr 127.0.0.1:21001 > n1.log & sleep 1 +echo "Server 1 started" -echo "=== Initialize node-1 as a single-node cluster" +nohup ./target/debug/raft-key-value --id 2 --http-addr 127.0.0.1:21002 > n2.log & +sleep 1 +echo "Server 2 started" + +nohup ./target/debug/raft-key-value --id 3 --http-addr 127.0.0.1:21003 > n3.log & +sleep 1 +echo "Server 3 started" +sleep 1 +echo "Initialize server 1 as a single-node cluster" +sleep 2 +echo rpc 21001/init '{}' -sleep 0.2 -rpc 21001/metrics +echo "Server 1 is a leader now" + +sleep 2 -echo "=== Add 3 node addresses so that RaftNetwork is able to find peers by id" +echo "Get metrics from the leader" +sleep 2 +echo +rpc 21001/metrics +sleep 1 +echo "Add 3 node addresses so that RaftNetwork is able to find peers by the ID" +echo +sleep 2 +echo "Adding node 1" +echo rpc 21001/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}' +sleep 2 +echo "Node 1 added" +echo +sleep 2 +echo "Adding node 2" +echo rpc 21001/write '{"AddNode":{"id":2,"addr":"127.0.0.1:21002"}}' +sleep 2 +echo "Node 2 added" +echo +sleep 2 +echo "Adding node 3" +echo rpc 21001/write '{"AddNode":{"id":3,"addr":"127.0.0.1:21003"}}' +sleep 2 +echo "Node 3 added" +echo -echo "=== List known nodes in clusters" +sleep 2 +echo "Listing all known nodes in the clusters..." +echo rpc 21001/list-nodes -echo "=== Add node-2 and node-2 as Learners, to receive log from leader node-1" +sleep 1 + +echo "Adding node 2 and node 3 as learners, to receive log from leader node 1" +sleep 1 +echo rpc 21001/add-learner '2' +echo "Node 2 added as leaner" +sleep 1 +echo rpc 21001/add-learner '3' +echo "Node 3 added as leaner" +sleep 1 -echo "=== Change membership from [1] to 3 nodes cluster: [1, 2, 3]" - +echo "Changing membership from [1] to 3 nodes cluster: [1, 2, 3]" +echo rpc 21001/change-membership '[1, 2, 3]' -rpc 21001/metrics +sleep 1 +echo "Membership changed" +sleep 1 -echo "=== Write on leader and read on every node" +echo "Get metrics from the leader again" +sleep 1 +echo +rpc 21001/metrics +sleep 1 +echo "Write data on leader" +sleep 1 +echo rpc 21001/write '{"Set":{"key":"foo","value":"bar"}}' -sleep 0.1 +sleep 1 +echo "Data written" +sleep 1 +echo "Read on every node, including the leader" +sleep 1 +echo "Read from node 1" +echo rpc 21001/read '"foo"' +echo "Read from node 2" +echo rpc 21002/read '"foo"' +echo "Read from node 3" +echo rpc 21003/read '"foo"' + +echo "Killing all nodes in 3s..." +sleep 1 +echo "Killing all nodes in 2s..." +sleep 1 +echo "Killing all nodes in 1s..." +sleep 1 +kill \ No newline at end of file