diff --git a/Cargo.toml b/Cargo.toml index c473eb3cf..5cc80b28c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,3 +3,6 @@ members = [ "openraft", "memstore", ] +exclude = [ + "example-raft-kv", +] \ No newline at end of file diff --git a/example-raft-kv/.gitignore b/example-raft-kv/.gitignore new file mode 100644 index 000000000..56c7098dd --- /dev/null +++ b/example-raft-kv/.gitignore @@ -0,0 +1,7 @@ +# Directory Ignores ########################################################## +target +vendor +.idea + +# File Ignores ############################################################### +Cargo.lock \ No newline at end of file diff --git a/example-raft-kv/Cargo.toml b/example-raft-kv/Cargo.toml new file mode 100644 index 000000000..6264b4210 --- /dev/null +++ b/example-raft-kv/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "example-raft-key-value" +version = "0.1.0" +edition = "2021" +authors = [ + "drdr xp ", + "Pedro Paulo de Amorim " +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed key-value store built upon `openraft`." +homepage = "https://github.com/datafuselabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT/Apache-2.0" +repository = "https://github.com/datafuselabs/openraft" +readme = "README.md" + +[[bin]] +name = "raft-key-value" +path = "src/bin/main.rs" + +[dependencies] +actix-web = "4.0.0-rc.2" +anyerror = { version = "0.1.1"} +async-trait = "0.1.36" +clap = { version = "3.0.13", features = ["derive", "env"] } +env_logger = "0.9.0" +openraft = { version="0.6", path= "../openraft" } +reqwest = { version = "0.11.9", features = ["json"] } +serde = { version="1.0.114", features=["derive"] } +serde_json = "1.0.57" +tokio = { version="1.0", default-features=false, features=["sync"] } +tracing = "0.1.29" +tracing-futures = "0.2.4" + +[dev-dependencies] +maplit = "1.0.2" + +[features] +docinclude = [] # Used only for activating `doc(include="...")` on nightly. 
+ +[package.metadata.docs.rs] +features = ["docinclude"] # Activate `docinclude` during docs.rs build. diff --git a/example-raft-kv/README.md b/example-raft-kv/README.md new file mode 100644 index 000000000..7052e4673 --- /dev/null +++ b/example-raft-kv/README.md @@ -0,0 +1,119 @@ +# Example distributed key-value store built upon openraft. + +It is an example of how to build a real-world key-value store with `openraft`. +Includes: +- An in-memory `RaftStorage` implementation [store](./src/store/mod.rs). + +- A server based on [actix-web](https://docs.rs/actix-web/4.0.0-rc.2). + Includes: + - raft-internal network APIs for replication and voting. + - Admin APIs to add nodes, change-membership etc. + - Application APIs to write a value by key or read a value by key. + +- Client and `RaftNetwork`([rpc](./src/network/rpc.rs)) are built upon [reqwest](https://docs.rs/reqwest). + +## Run it + +If you want to see a simulation of 3 nodes running and sharing data, you can run the cluster demo: + +```shell +./test-cluster.sh +``` + +If you want to compile the application, run: + +```shell +cargo build +``` + +(You can append `--release` to build in release mode, but we don't recommend using +this project in production yet.) + +To run it, get the binary `raft-key-value` inside `target/debug` and run: + +```shell +./raft-key-value --id 1 --http-addr 127.0.0.1:21001 +``` + +It will start a node. + +To start the following nodes: + +```shell +./raft-key-value --id 2 --http-addr 127.0.0.1:21002 +``` + +You can continue replicating the nodes by changing the `id` and `http-addr`. + +After that, call the first node created: + +``` +POST - 127.0.0.1:21001/init +``` + +It will define the first node created as the leader. + +After that you will need to notify the leader node about the other nodes: + +``` +POST - 127.0.0.1:21001/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}' +POST - 127.0.0.1:21001/write '{"AddNode":{"id":2,"addr":"127.0.0.1:21002"}}' +... 
+``` + +Then you need to inform the leader that these nodes are learners: + +``` +POST - 127.0.0.1:21001/add-learner "2" +``` + +Now you need to tell the leader to add all learners as members of the cluster: + +``` +POST - 127.0.0.1:21001/change-membership "[1, 2]" +``` + +Write some data in any of the nodes: + +``` +POST - 127.0.0.1:21001/write '{"Set":{"key":"foo","value":"bar"}}' +``` + +Read the data from any node: + +``` +POST - 127.0.0.1:21002/read "foo" +``` + +You should be able to read that on another instance even if you did not sync any data! + + +## How it's structured. + +The application is separated in 3 modules: + + - `bin`: You can find the `main()` function in [main](./src/bin/main.rs), the file where the setup for the server happens. + - `network`: You can find the [api](./src/network/api.rs) that implements the endpoints used by the public API and [rpc](./src/network/rpc.rs) where all the raft communication from the node happens. [management](./src/network/management.rs) is where all the administration endpoints are present, those are used to add or remove nodes, promote and more. [raft](./src/network/raft.rs) is where all the communication from other nodes is received. + - `store`: You can find the file [store](./src/store/mod.rs) where all the key-value implementation is done. Here is where your data application will be managed. + +## Where is my data? + +The data is stored inside state machines, each state machine represents a point of data and +raft enforces that all nodes have the same data in synchronization. You can have a look at +the struct [ExampleStateMachine](./src/store/mod.rs) + +## Cluster management + +The raft itself does not store node addresses. +But in a real-world application, the implementation of `RaftNetwork` needs to know the addresses. + +Thus, in this example application: + +- The storage layer has to store nodes' information. 
+- The network layer keeps a reference to the store so that it is able to get the address of a target node to send RPC to. + +To add a node to a cluster, it includes 3 steps: + +- Write a `node` through raft protocol to the storage. +- Add the node as a `Learner` to let it start receiving replication data from the leader. +- Invoke `change-membership` to change the learner node to a member. \ No newline at end of file diff --git a/example-raft-kv/src/app.rs b/example-raft-kv/src/app.rs new file mode 100644 index 000000000..f50dbd96e --- /dev/null +++ b/example-raft-kv/src/app.rs @@ -0,0 +1,16 @@ +use std::sync::Arc; + +use openraft::Config; +use openraft::NodeId; + +use crate::ExampleRaft; +use crate::ExampleStore; + +// Representation of an application state. This struct can be shared around to share +// instances of raft, store and more. +pub struct ExampleApp { + pub id: NodeId, + pub raft: ExampleRaft, + pub store: Arc, + pub config: Arc, +} diff --git a/example-raft-kv/src/bin/main.rs b/example-raft-kv/src/bin/main.rs new file mode 100644 index 000000000..69bb5a240 --- /dev/null +++ b/example-raft-kv/src/bin/main.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; + +use actix_web::middleware; +use actix_web::middleware::Logger; +use actix_web::web::Data; +use actix_web::App; +use actix_web::HttpServer; +use clap::Parser; +use env_logger::Env; +use example_raft_key_value::app::ExampleApp; +use example_raft_key_value::network::api; +use example_raft_key_value::network::management; +use example_raft_key_value::network::raft; +use example_raft_key_value::network::rpc::ExampleNetwork; +use example_raft_key_value::store::ExampleRequest; +use example_raft_key_value::store::ExampleResponse; +use example_raft_key_value::store::ExampleStore; +use openraft::Config; +use openraft::Raft; + +pub type ExampleRaft = Raft; + +#[derive(Parser, Clone, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct Opt { + #[clap(long)] + pub id: u64, + + #[clap(long)] + pub 
http_addr: String, +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + // Setup the logger + env_logger::init_from_env(Env::default().default_filter_or("info")); + + // Parse the parameters passed by arguments. + let options = Opt::parse(); + let node_id = options.id; + + // Create a configuration for the raft instance. + let config = Arc::new(Config::default().validate().unwrap()); + + // Create a instance of where the Raft data will be stored. + let store = Arc::new(ExampleStore::default()); + + // Create the network layer that will connect and communicate the raft instances and + // will be used in conjunction with the store created above. + let network = Arc::new(ExampleNetwork { store: store.clone() }); + + // Create a local raft instance. + let raft = Raft::new(node_id, config.clone(), network, store.clone()); + + // Create an application that will store all the instances created above, this will + // be later used on the actix-web services. + let app = Data::new(ExampleApp { + id: options.id, + raft, + store, + config, + }); + + // Start the actix-web server. + HttpServer::new(move || { + App::new() + .wrap(Logger::default()) + .wrap(Logger::new("%a %{User-Agent}i")) + .wrap(middleware::Compress::default()) + .app_data(app.clone()) + // raft internal RPC + .service(raft::append) + .service(raft::snapshot) + .service(raft::vote) + // admin API + .service(management::init) + .service(management::add_learner) + .service(management::change_membership) + .service(management::metrics) + .service(management::list_nodes) + // application API + .service(api::write) + .service(api::read) + }) + .bind(options.http_addr)? 
+ .run() + .await +} diff --git a/example-raft-kv/src/lib.rs b/example-raft-kv/src/lib.rs new file mode 100644 index 000000000..d6f458c59 --- /dev/null +++ b/example-raft-kv/src/lib.rs @@ -0,0 +1,12 @@ +use openraft::Raft; + +use crate::network::rpc::ExampleNetwork; +use crate::store::ExampleRequest; +use crate::store::ExampleResponse; +use crate::store::ExampleStore; + +pub mod app; +pub mod network; +pub mod store; + +pub type ExampleRaft = Raft; diff --git a/example-raft-kv/src/network/api.rs b/example-raft-kv/src/network/api.rs new file mode 100644 index 000000000..dfae64f0e --- /dev/null +++ b/example-raft-kv/src/network/api.rs @@ -0,0 +1,34 @@ +use actix_web::post; +use actix_web::web; +use actix_web::web::Data; +use actix_web::Responder; +use openraft::raft::ClientWriteRequest; +use openraft::raft::EntryPayload; +use web::Json; + +use crate::app::ExampleApp; +use crate::store::ExampleRequest; + +/** + * Application API + * + * This is where you place your application, you can use the example below to create your + * API. The current implementation: + * + * - `POST - /write` saves a value under a key and syncs the nodes. + * - `POST - /read` attempts to find a value from a given key. 
+ */ +#[post("/write")] +pub async fn write(app: Data, req: Json) -> actix_web::Result { + let request = ClientWriteRequest::new(EntryPayload::Normal(req.0)); + let response = app.raft.client_write(request).await; + Ok(Json(response)) +} + +#[post("/read")] +pub async fn read(app: Data, req: Json) -> actix_web::Result { + let state_machine = app.store.state_machine.read().await; + let key = req.0; + let value = state_machine.data.get(&key).cloned(); + Ok(Json(value.unwrap_or_default())) +} diff --git a/example-raft-kv/src/network/management.rs b/example-raft-kv/src/network/management.rs new file mode 100644 index 000000000..442ccebf4 --- /dev/null +++ b/example-raft-kv/src/network/management.rs @@ -0,0 +1,58 @@ +use std::collections::BTreeSet; + +use actix_web::get; +use actix_web::post; +use actix_web::web; +use actix_web::web::Data; +use actix_web::Responder; +use openraft::NodeId; +use web::Json; + +use crate::app::ExampleApp; + +// --- Cluster management + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. +/// This should be done before adding a node as a member into the cluster +/// (by calling `change-membership`) +#[post("/add-learner")] +pub async fn add_learner(app: Data, req: Json) -> actix_web::Result { + let response = app.raft.add_learner(req.0, true).await; + Ok(Json(response)) +} + +/// Changes specified learners to members, or remove members. +#[post("/change-membership")] +pub async fn change_membership( + app: Data, + req: Json>, +) -> actix_web::Result { + let response = app.raft.change_membership(req.0, true).await; + Ok(Json(response)) +} + +/// Initialize a single-node cluster. 
+#[post("/init")] +pub async fn init(app: Data) -> actix_web::Result { + let mut nodes = BTreeSet::new(); + nodes.insert(app.id); + let response = app.raft.initialize(nodes).await; + Ok(Json(response)) +} + +/// Get the latest metrics of the cluster +#[get("/metrics")] +pub async fn metrics(app: Data) -> actix_web::Result { + let response = app.raft.metrics().borrow().clone(); + Ok(Json(response)) +} + +/// List known nodes of the cluster. +#[get("/list-nodes")] +pub async fn list_nodes(app: Data) -> actix_web::Result { + let state_machine = app.store.state_machine.read().await; + let response = state_machine.nodes.clone(); + Ok(Json(response)) +} diff --git a/example-raft-kv/src/network/mod.rs b/example-raft-kv/src/network/mod.rs new file mode 100644 index 000000000..62300907b --- /dev/null +++ b/example-raft-kv/src/network/mod.rs @@ -0,0 +1,4 @@ +pub mod api; +pub mod management; +pub mod raft; +pub mod rpc; diff --git a/example-raft-kv/src/network/raft.rs b/example-raft-kv/src/network/raft.rs new file mode 100644 index 000000000..f10c1e914 --- /dev/null +++ b/example-raft-kv/src/network/raft.rs @@ -0,0 +1,34 @@ +use actix_web::post; +use actix_web::web; +use actix_web::web::Data; +use actix_web::Responder; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::InstallSnapshotRequest; +use openraft::raft::VoteRequest; +use web::Json; + +use crate::app::ExampleApp; +use crate::store::ExampleRequest; + +// --- Raft communication + +#[post("/raft-vote")] +pub async fn vote(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.vote(req.0).await; + Ok(Json(res)) +} + +#[post("/raft-append")] +pub async fn append( + app: Data, + req: Json>, +) -> actix_web::Result { + let res = app.raft.append_entries(req.0).await; + Ok(Json(res)) +} + +#[post("/raft-snapshot")] +pub async fn snapshot(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.install_snapshot(req.0).await; + Ok(Json(res)) +} diff --git a/example-raft-kv/src/network/rpc.rs 
b/example-raft-kv/src/network/rpc.rs new file mode 100644 index 000000000..537fe22b9 --- /dev/null +++ b/example-raft-kv/src/network/rpc.rs @@ -0,0 +1,72 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use openraft::error::AppendEntriesError; +use openraft::error::InstallSnapshotError; +use openraft::error::NetworkError; +use openraft::error::RPCError; +use openraft::error::RemoteError; +use openraft::error::VoteError; +use openraft::raft::AppendEntriesRequest; +use openraft::raft::AppendEntriesResponse; +use openraft::raft::InstallSnapshotRequest; +use openraft::raft::InstallSnapshotResponse; +use openraft::raft::VoteRequest; +use openraft::raft::VoteResponse; +use openraft::NodeId; +use openraft::RaftNetwork; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::store::ExampleRequest; +use crate::ExampleStore; + +pub struct ExampleNetwork { + pub store: Arc, +} + +impl ExampleNetwork { + pub async fn send_rpc(&self, target: NodeId, uri: &str, req: Req) -> Result> + where + Req: Serialize, + Err: std::error::Error + DeserializeOwned, + Resp: DeserializeOwned, + { + let addr = { + let state_machine = self.store.state_machine.read().await; + state_machine.nodes.get(&target).unwrap().clone() + }; + + let url = format!("http://{}/{}", addr, uri); + let client = reqwest::Client::new(); + + let resp = client.post(url).json(&req).send().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + + let res: Result = resp.json().await.map_err(|e| RPCError::Network(NetworkError::new(&e)))?; + + res.map_err(|e| RPCError::RemoteError(RemoteError::new(target, e))) + } +} + +#[async_trait] +impl RaftNetwork for ExampleNetwork { + async fn send_append_entries( + &self, + target: NodeId, + req: AppendEntriesRequest, + ) -> Result> { + self.send_rpc(target, "raft-append", req).await + } + + async fn send_install_snapshot( + &self, + target: NodeId, + req: InstallSnapshotRequest, + ) -> Result> { + self.send_rpc(target, "raft-snapshot", req).await + } 
+ + async fn send_vote(&self, target: NodeId, req: VoteRequest) -> Result> { + self.send_rpc(target, "raft-vote", req).await + } +} diff --git a/example-raft-kv/src/store/mod.rs b/example-raft-kv/src/store/mod.rs new file mode 100644 index 000000000..b2a08795c --- /dev/null +++ b/example-raft-kv/src/store/mod.rs @@ -0,0 +1,348 @@ +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::io::Cursor; +use std::ops::RangeBounds; +use std::sync::Arc; +use std::sync::Mutex; + +use anyerror::AnyError; +use openraft::async_trait::async_trait; +use openraft::raft::Entry; +use openraft::raft::EntryPayload; +use openraft::storage::LogState; +use openraft::storage::Snapshot; +use openraft::AppData; +use openraft::AppDataResponse; +use openraft::EffectiveMembership; +use openraft::ErrorSubject; +use openraft::ErrorVerb; +use openraft::LogId; +use openraft::NodeId; +use openraft::RaftStorage; +use openraft::SnapshotMeta; +use openraft::StateMachineChanges; +use openraft::StorageError; +use openraft::StorageIOError; +use openraft::Vote; +use serde::Deserialize; +use serde::Serialize; +use tokio::sync::RwLock; + +/** + * Here you will set the types of request that will interact with the raft nodes. + * For example the `Set` will be used to write data (key and value) to the raft database. + * The `AddNode` will append a new node to the current existing shared list of nodes. + * You will want to add any request that can write data in all nodes here. + */ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum ExampleRequest { + Set { key: String, value: String }, + AddNode { id: NodeId, addr: String }, +} + +// Inform to raft that `ExampleRequest` is an application data to be used by raft. +impl AppData for ExampleRequest {} + +/** + * Here you will defined what type of answer you expect from reading the data of a node. + * In this example it will return a optional value from a given key in + * the `ExampleRequest.Set`. 
+ * + * TODO: SHould we explain how to create multiple `AppDataResponse`? + * + */ +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExampleResponse { + pub value: Option, +} + +impl AppDataResponse for ExampleResponse {} + +#[derive(Debug)] +pub struct ExampleSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. + pub data: Vec, +} + +/** + * Here defines a state machine of the raft, this state represents a copy of the data + * between each node. Note that we are using `serde` to serialize the `data`, which has + * a implementation to be serialized. Note that for this test we set both the key and + * value as String, but you could set any type of value that has the serialization impl. + */ +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct ExampleStateMachine { + pub last_applied_log: Option, + + pub last_membership: Option, + + /// Node addresses in this cluster. + pub nodes: BTreeMap, + + /// Application data. + pub data: BTreeMap, +} + +#[derive(Debug, Default)] +pub struct ExampleStore { + last_purged_log_id: RwLock>, + + /// The Raft log. + log: RwLock>>, + + /// The Raft state machine. + pub state_machine: RwLock, + + /// The current granted vote. + vote: RwLock>, + + snapshot_idx: Arc>, + + current_snapshot: RwLock>, +} + +#[async_trait] +impl RaftStorage for ExampleStore { + type SnapshotData = Cursor>; + + #[tracing::instrument(level = "trace", skip(self))] + async fn save_vote(&self, vote: &Vote) -> Result<(), StorageError> { + // TODO: What `h` stands for? 
+ let mut h = self.vote.write().await; + *h = Some(*vote); + Ok(()) + } + + async fn read_vote(&self) -> Result, StorageError> { + Ok(*self.vote.read().await) + } + + async fn get_log_state(&self) -> Result { + let log = self.log.read().await; + let last = log.iter().rev().next().map(|(_, ent)| ent.log_id); + + let last_purged = *self.last_purged_log_id.read().await; + + let last = match last { + None => last_purged, + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id: last_purged, + last_log_id: last, + }) + } + + async fn try_get_log_entries + Clone + Debug + Send + Sync>( + &self, + range: RB, + ) -> Result>, StorageError> { + let log = self.log.read().await; + let response = log.range(range.clone()).map(|(_, val)| val.clone()).collect::>(); + Ok(response) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn append_to_log(&self, entries: &[&Entry]) -> Result<(), StorageError> { + let mut log = self.log.write().await; + for entry in entries { + log.insert(entry.log_id.index, (*entry).clone()); + } + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn delete_conflict_logs_since(&self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + let mut log = self.log.write().await; + let keys = log.range(log_id.index..).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + + Ok(()) + } + + #[tracing::instrument(level = "debug", skip(self))] + async fn purge_logs_upto(&self, log_id: LogId) -> Result<(), StorageError> { + tracing::debug!("delete_log: [{:?}, +oo)", log_id); + + { + let mut ld = self.last_purged_log_id.write().await; + assert!(*ld <= Some(log_id)); + *ld = Some(log_id); + } + + { + let mut log = self.log.write().await; + + let keys = log.range(..=log_id.index).map(|(k, _v)| *k).collect::>(); + for key in keys { + log.remove(&key); + } + } + + Ok(()) + } + + async fn last_applied_state(&self) -> Result<(Option, Option), 
StorageError> { + let state_machine = self.state_machine.read().await; + Ok((state_machine.last_applied_log, state_machine.last_membership.clone())) + } + + #[tracing::instrument(level = "trace", skip(self, entries))] + async fn apply_to_state_machine( + &self, + entries: &[&Entry], + ) -> Result, StorageError> { + let mut res = Vec::with_capacity(entries.len()); + + let mut sm = self.state_machine.write().await; + + for entry in entries { + tracing::debug!(%entry.log_id, "replicate to sm"); + + sm.last_applied_log = Some(entry.log_id); + + match entry.payload { + EntryPayload::Blank => res.push(ExampleResponse { value: None }), + EntryPayload::Normal(ref req) => match req { + ExampleRequest::Set { key, value } => { + sm.data.insert(key.clone(), value.clone()); + res.push(ExampleResponse { + value: Some(value.clone()), + }) + } + ExampleRequest::AddNode { id, addr } => { + sm.nodes.insert(*id, addr.clone()); + res.push(ExampleResponse { + value: Some(format!("cluster: {:?}", sm.nodes)), + }) + } + }, + EntryPayload::Membership(ref mem) => { + sm.last_membership = Some(EffectiveMembership { + log_id: entry.log_id, + membership: mem.clone(), + }); + res.push(ExampleResponse { value: None }) + } + }; + } + Ok(res) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn build_snapshot(&self) -> Result, StorageError> { + let (data, last_applied_log); + + { + // Serialize the data of the state machine. 
+ let state_machine = self.state_machine.read().await; + data = serde_json::to_vec(&*state_machine) + .map_err(|e| StorageIOError::new(ErrorSubject::StateMachine, ErrorVerb::Read, AnyError::new(&e)))?; + + last_applied_log = state_machine.last_applied_log; + } + + let last_applied_log = match last_applied_log { + None => { + panic!("can not compact empty state machine"); + } + Some(x) => x, + }; + + let snapshot_idx = { + let mut l = self.snapshot_idx.lock().unwrap(); + *l += 1; + *l + }; + + let snapshot_id = format!( + "{}-{}-{}", + last_applied_log.leader_id, last_applied_log.index, snapshot_idx + ); + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + snapshot_id, + }; + + let snapshot = ExampleSnapshot { + meta: meta.clone(), + data: data.clone(), + }; + + { + let mut current_snapshot = self.current_snapshot.write().await; + *current_snapshot = Some(snapshot); + } + + Ok(Snapshot { + meta, + snapshot: Box::new(Cursor::new(data)), + }) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn begin_receiving_snapshot(&self) -> Result, StorageError> { + Ok(Box::new(Cursor::new(Vec::new()))) + } + + #[tracing::instrument(level = "trace", skip(self, snapshot))] + async fn install_snapshot( + &self, + meta: &SnapshotMeta, + snapshot: Box, + ) -> Result { + tracing::info!( + { snapshot_size = snapshot.get_ref().len() }, + "decoding snapshot for installation" + ); + + let new_snapshot = ExampleSnapshot { + meta: meta.clone(), + data: snapshot.into_inner(), + }; + + // Update the state machine. + { + let updated_state_machine: ExampleStateMachine = + serde_json::from_slice(&new_snapshot.data).map_err(|e| { + StorageIOError::new( + ErrorSubject::Snapshot(new_snapshot.meta.clone()), + ErrorVerb::Read, + AnyError::new(&e), + ) + })?; + let mut state_machine = self.state_machine.write().await; + *state_machine = updated_state_machine; + } + + // Update current snapshot. 
+ let mut current_snapshot = self.current_snapshot.write().await; + *current_snapshot = Some(new_snapshot); + Ok(StateMachineChanges { + last_applied: meta.last_log_id, + is_snapshot: true, + }) + } + + #[tracing::instrument(level = "trace", skip(self))] + async fn get_current_snapshot(&self) -> Result>, StorageError> { + match &*self.current_snapshot.read().await { + Some(snapshot) => { + let data = snapshot.data.clone(); + Ok(Some(Snapshot { + meta: snapshot.meta.clone(), + snapshot: Box::new(Cursor::new(data)), + })) + } + None => Ok(None), + } + } +} diff --git a/example-raft-kv/test-cluster.sh b/example-raft-kv/test-cluster.sh new file mode 100755 index 000000000..3bb916c1b --- /dev/null +++ b/example-raft-kv/test-cluster.sh @@ -0,0 +1,163 @@ +#!/bin/sh + +set -o errexit + +cargo build + +kill() { + if [ "$(uname)" == "Darwin" ]; then + SERVICE='raft-key-value' + if pgrep -xq -- "${SERVICE}"; then + pkill -f "${SERVICE}" + fi + else + killall raft-key-value + fi +} + +rpc() { + local uri=$1 + local body="$2" + + echo '---'" rpc(:$uri, $body)" + + { + if [ ".$body" == "." ]; then + curl --silent "127.0.0.1:$uri" + else + curl --silent "127.0.0.1:$uri" -H "Content-Type: application/json" -d "$body" + fi + } | { + if which -s jq; then + jq + else + cat + fi + } + + echo + echo +} + +export RUST_LOG=debug + +echo "Killing all running raft-key-value" + +kill + +sleep 1 + +echo "Start 3 uninitialized raft-key-value servers..." 
+ +nohup ./target/debug/raft-key-value --id 1 --http-addr 127.0.0.1:21001 > n1.log & +sleep 1 +echo "Server 1 started" + +nohup ./target/debug/raft-key-value --id 2 --http-addr 127.0.0.1:21002 > n2.log & +sleep 1 +echo "Server 2 started" + +nohup ./target/debug/raft-key-value --id 3 --http-addr 127.0.0.1:21003 > n3.log & +sleep 1 +echo "Server 3 started" +sleep 1 + +echo "Initialize server 1 as a single-node cluster" +sleep 2 +echo +rpc 21001/init '{}' + +echo "Server 1 is a leader now" + +sleep 2 + +echo "Get metrics from the leader" +sleep 2 +echo +rpc 21001/metrics +sleep 1 + +echo "Add 3 node addresses so that RaftNetwork is able to find peers by the ID" +echo +sleep 2 +echo "Adding node 1" +echo +rpc 21001/write '{"AddNode":{"id":1,"addr":"127.0.0.1:21001"}}' +sleep 2 +echo "Node 1 added" +echo +sleep 2 +echo "Adding node 2" +echo +rpc 21001/write '{"AddNode":{"id":2,"addr":"127.0.0.1:21002"}}' +sleep 2 +echo "Node 2 added" +echo +sleep 2 +echo "Adding node 3" +echo +rpc 21001/write '{"AddNode":{"id":3,"addr":"127.0.0.1:21003"}}' +sleep 2 +echo "Node 3 added" +echo + +sleep 2 +echo "Listing all known nodes in the clusters..." 
+ +echo +rpc 21001/list-nodes + +sleep 1 + +echo "Adding node 2 and node 3 as learners, to receive log from leader node 1" + +sleep 1 +echo +rpc 21001/add-learner '2' +echo "Node 2 added as leaner" +sleep 1 +echo +rpc 21001/add-learner '3' +echo "Node 3 added as leaner" +sleep 1 + +echo "Changing membership from [1] to 3 nodes cluster: [1, 2, 3]" +echo +rpc 21001/change-membership '[1, 2, 3]' +sleep 1 +echo "Membership changed" +sleep 1 + +echo "Get metrics from the leader again" +sleep 1 +echo +rpc 21001/metrics +sleep 1 + +echo "Write data on leader" +sleep 1 +echo +rpc 21001/write '{"Set":{"key":"foo","value":"bar"}}' +sleep 1 +echo "Data written" +sleep 1 + +echo "Read on every node, including the leader" +sleep 1 +echo "Read from node 1" +echo +rpc 21001/read '"foo"' +echo "Read from node 2" +echo +rpc 21002/read '"foo"' +echo "Read from node 3" +echo +rpc 21003/read '"foo"' + +echo "Killing all nodes in 3s..." +sleep 1 +echo "Killing all nodes in 2s..." +sleep 1 +echo "Killing all nodes in 1s..." +sleep 1 +kill \ No newline at end of file diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 4fda7d496..4e4592fe2 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -17,7 +17,7 @@ use crate::StorageError; use crate::Vote; /// Fatal is unrecoverable and shuts down raft at once. 
-#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)] pub enum Fatal { #[error(transparent)] StorageError(#[from] StorageError), @@ -50,19 +50,19 @@ where E: TryInto + Clone } } -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum AppendEntriesError { #[error(transparent)] Fatal(#[from] Fatal), } -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum VoteError { #[error(transparent)] Fatal(#[from] Fatal), } -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum InstallSnapshotError { #[error(transparent)] SnapshotMismatch(#[from] SnapshotMismatch), @@ -72,7 +72,7 @@ pub enum InstallSnapshotError { } /// An error related to a client read request. -#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum ClientReadError { #[error(transparent)] ForwardToLeader(#[from] ForwardToLeader), @@ -85,7 +85,7 @@ pub enum ClientReadError { } /// An error related to a client write request. 
-#[derive(Debug, Clone, thiserror::Error, derive_more::TryInto)] +#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error, derive_more::TryInto)] pub enum ClientWriteError { #[error(transparent)] ForwardToLeader(#[from] ForwardToLeader), diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 309878e07..55c0d8585 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -398,7 +398,7 @@ impl, S: RaftStorage> Cl pub(crate) type RaftRespTx = oneshot::Sender>; pub(crate) type RaftRespRx = oneshot::Receiver>; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AddLearnerResponse { pub matched: Option, }