From e5366fed67147fcbd5a7c10ceb57ade1e6e1f502 Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Sun, 16 Mar 2025 12:04:46 +0800 Subject: [PATCH 1/2] feat: add `MatchSeq` to assert seq `MatchSeq` specifies the sequence number condition that an operation must satisfy to take effect. In distributed systems, each value stored in the system has an associated sequence number (`seq`) that represents its version. `MatchSeq` provides a way to express conditional operations based on these sequence numbers: - Match any sequence number (unconditional operation) - Match an exact sequence number (compare-and-swap operations) - Match sequence numbers greater than or equal to a value (update existing entries) This is essential for implementing optimistic concurrency control and ensuring consistency in distributed environments. --- Cargo.toml | 5 +- src/lib.rs | 1 + src/match_seq/errors.rs | 24 +++++++ src/match_seq/match_seq.rs | 93 +++++++++++++++++++++++++++ src/match_seq/match_seq_ext.rs | 96 ++++++++++++++++++++++++++++ src/match_seq/match_seq_ext_impls.rs | 51 +++++++++++++++ src/match_seq/mod.rs | 23 +++++++ 7 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 src/match_seq/errors.rs create mode 100644 src/match_seq/match_seq.rs create mode 100644 src/match_seq/match_seq_ext.rs create mode 100644 src/match_seq/match_seq_ext_impls.rs create mode 100644 src/match_seq/mod.rs diff --git a/Cargo.toml b/Cargo.toml index baa61eb..cc68b52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,18 +10,21 @@ edition = "2021" [dependencies] async-trait = { version = "0.1.77" } +deepsize = { version = "0.2.0" } futures = "0.3.24" futures-util = "0.3.24" log = { version = "0.4.21", features = ["serde", "kv_unstable_std"] } serde = { version = "1.0.164", features = ["derive", "rc"] } stream-more = { version = "0.1.3" } +thiserror = { version = "1" } [dev-dependencies] anyhow = { version = "1.0.65" } +async-trait = { version = "0.1.77" } pretty_assertions = { version = "1.3.0" } +serde_json = { version = "1.0.85" } tempfile = { version = "3.4.0" } tokio = { version = "1.35.0", features = ["full"] } -async-trait = { version = "0.1.77" } [[example]] name = "basic_usage" diff --git a/src/lib.rs b/src/lib.rs index 46da5cc..4c748d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -74,6 +74,7 @@ pub mod map_api_ro; pub mod map_key; pub mod map_value; pub mod marked; +pub mod match_seq; pub mod seq_value; pub mod util; diff --git a/src/match_seq/errors.rs b/src/match_seq/errors.rs new file mode 100644 index 0000000..a0e7aad --- /dev/null +++ b/src/match_seq/errors.rs @@ -0,0 +1,24 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::Deserialize; +use serde::Serialize; + +use crate::match_seq::MatchSeq; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum ConflictSeq { + #[error("ConflictSeq: Want: {want}, Got: {got}")] + NotMatch { want: MatchSeq, got: u64 }, +} diff --git a/src/match_seq/match_seq.rs b/src/match_seq/match_seq.rs new file mode 100644 index 0000000..ced05c1 --- /dev/null +++ b/src/match_seq/match_seq.rs @@ -0,0 +1,93 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; + +use serde::Deserialize; +use serde::Serialize; + +/// Specifies the sequence number condition that an operation must satisfy to take effect. +/// +/// In distributed systems, each value stored in the system has an associated sequence number +/// (`seq`) that represents its version. `MatchSeq` provides a way to express conditional +/// operations based on these sequence numbers: +/// +/// - Match any sequence number (unconditional operation) +/// - Match an exact sequence number (compare-and-swap operations) +/// - Match sequence numbers greater than or equal to a value (update existing entries) +/// +/// This is essential for implementing optimistic concurrency control and ensuring +/// consistency in distributed environments. +#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, deepsize::DeepSizeOf)] +pub enum MatchSeq { + // TODO(xp): remove Any, it is equivalent to GE(0) + /// Any value is acceptable, i.e. does not check seq at all. + Any, + + /// To match an exact value of seq. + /// + /// E.g., CAS updates the exact version of some value, + /// and put-if-absent adds a value only when seq is 0. + Exact(u64), + + /// To match a seq that is greater-or-equal some value. + /// + /// E.g., GE(1) perform an update on any existent value. + GE(u64), +} + +impl fmt::Display for MatchSeq { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + MatchSeq::Any => { + write!(f, "is any value") + } + MatchSeq::Exact(s) => { + write!(f, "== {}", s) + } + MatchSeq::GE(s) => { + write!(f, ">= {}", s) + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::match_seq::MatchSeq; + + #[derive(serde::Serialize)] + struct Foo { + f: MatchSeq, + } + + #[test] + fn test_match_seq_serde() -> anyhow::Result<()> { + // + + let t = Foo { f: MatchSeq::Any }; + let s = serde_json::to_string(&t)?; + println!("{s}"); + + Ok(()) + } + + #[test] + fn test_match_seq_display() -> anyhow::Result<()> { + assert_eq!("== 3", MatchSeq::Exact(3).to_string()); + assert_eq!(">= 3", MatchSeq::GE(3).to_string()); + + Ok(()) + } +} diff --git a/src/match_seq/match_seq_ext.rs b/src/match_seq/match_seq_ext.rs new file mode 100644 index 0000000..6ed82b8 --- /dev/null +++ b/src/match_seq/match_seq_ext.rs @@ -0,0 +1,96 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::match_seq::errors::ConflictSeq; + +/// Check if the sequence number satisfies the condition. +pub trait MatchSeqExt { + /// Match against a some value containing seq by checking if the seq satisfies the condition. + fn match_seq(&self, sv: &T) -> Result<(), ConflictSeq>; +} + +#[cfg(test)] +mod tests { + use crate::match_seq::errors::ConflictSeq; + use crate::match_seq::MatchSeq; + use crate::match_seq::MatchSeqExt; + + type SeqV = crate::seq_value::SeqV; + + #[test] + fn test_match_seq_match_seq_value() -> anyhow::Result<()> { + assert_eq!(MatchSeq::GE(0).match_seq(&Some(SeqV::new(0, 1))), Ok(())); + assert_eq!(MatchSeq::GE(0).match_seq(&Some(SeqV::new(1, 1))), Ok(())); + + // + + assert_eq!( + MatchSeq::Exact(3).match_seq(&None::), + Err(ConflictSeq::NotMatch { + want: MatchSeq::Exact(3), + got: 0 + }) + ); + assert_eq!( + MatchSeq::Exact(3).match_seq(&Some(SeqV::new(0, 1))), + Err(ConflictSeq::NotMatch { + want: MatchSeq::Exact(3), + got: 0 + }) + ); + assert_eq!( + MatchSeq::Exact(3).match_seq(&Some(SeqV::new(2, 1))), + Err(ConflictSeq::NotMatch { + want: MatchSeq::Exact(3), + got: 2 + }) + ); + assert_eq!(MatchSeq::Exact(3).match_seq(&Some(SeqV::new(3, 1))), Ok(())); + assert_eq!( + MatchSeq::Exact(3).match_seq(&Some(SeqV::new(4, 1))), + Err(ConflictSeq::NotMatch { + want: MatchSeq::Exact(3), + got: 4 + }) + ); + + // + + assert_eq!( + MatchSeq::GE(3).match_seq(&None::), + Err(ConflictSeq::NotMatch { + want: MatchSeq::GE(3), + got: 0 + }) + ); + assert_eq!( + MatchSeq::GE(3).match_seq(&Some(SeqV::new(0, 1))), + Err(ConflictSeq::NotMatch { + want: MatchSeq::GE(3), + got: 0 + }) + ); + assert_eq!( + MatchSeq::GE(3).match_seq(&Some(SeqV::new(2, 1))), + Err(ConflictSeq::NotMatch { + want: MatchSeq::GE(3), + got: 2 + }) + ); + assert_eq!(MatchSeq::GE(3).match_seq(&Some(SeqV::new(3, 1))), Ok(())); + assert_eq!(MatchSeq::GE(3).match_seq(&Some(SeqV::new(4, 1))), Ok(())); + + Ok(()) + } +} diff --git a/src/match_seq/match_seq_ext_impls.rs b/src/match_seq/match_seq_ext_impls.rs new file mode 100644 index 0000000..ac597ae --- /dev/null +++ b/src/match_seq/match_seq_ext_impls.rs @@ -0,0 +1,51 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::match_seq::errors::ConflictSeq; +use crate::match_seq::MatchSeq; +use crate::match_seq::MatchSeqExt; +use crate::seq_value::SeqV; + +impl MatchSeqExt for MatchSeq { + fn match_seq(&self, seq: &u64) -> Result<(), ConflictSeq> { + match self { + MatchSeq::Any => Ok(()), + MatchSeq::Exact(s) if seq == s => Ok(()), + MatchSeq::GE(s) if seq >= s => Ok(()), + _ => Err(ConflictSeq::NotMatch { + want: *self, + got: *seq, + }), + } + } +} + +impl MatchSeqExt> for MatchSeq { + fn match_seq(&self, sv: &SeqV) -> Result<(), ConflictSeq> { + self.match_seq(&sv.seq) + } +} + +impl MatchSeqExt>> for MatchSeq { + fn match_seq(&self, sv: &Option<&SeqV>) -> Result<(), ConflictSeq> { + let seq = sv.map_or(0, |sv| sv.seq); + self.match_seq(&seq) + } +} + +impl MatchSeqExt>> for MatchSeq { + fn match_seq(&self, sv: &Option>) -> Result<(), ConflictSeq> { + self.match_seq(&sv.as_ref()) + } +} diff --git a/src/match_seq/mod.rs b/src/match_seq/mod.rs new file mode 100644 index 0000000..66df130 --- /dev/null +++ b/src/match_seq/mod.rs @@ -0,0 +1,23 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod errors; + +#[allow(clippy::module_inception)] +mod match_seq; +mod match_seq_ext; +mod match_seq_ext_impls; + +pub use match_seq::MatchSeq; +pub use match_seq_ext::MatchSeqExt; From a1d933e4793d9d4018fa994922068491b5bf614a Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Sun, 16 Mar 2025 12:07:41 +0800 Subject: [PATCH 2/2] BumpVer: 0.2.2 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index cc68b52..72a0ad0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "map-api" description = "Raft state machine" -version = "0.2.1" +version = "0.2.2" authors = ["Databend Authors "] license = "Apache-2.0" edition = "2021"