Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "map-api"
description = "Raft state machine"
version = "0.2.1"
version = "0.2.2"
authors = ["Databend Authors <opensource@datafuselabs.com>"]
license = "Apache-2.0"
edition = "2021"
Expand All @@ -10,18 +10,21 @@ edition = "2021"

[dependencies]
async-trait = { version = "0.1.77" }
deepsize = { version = "0.2.0" }
futures = "0.3.24"
futures-util = "0.3.24"
log = { version = "0.4.21", features = ["serde", "kv_unstable_std"] }
serde = { version = "1.0.164", features = ["derive", "rc"] }
stream-more = { version = "0.1.3" }
thiserror = { version = "1" }

[dev-dependencies]
anyhow = { version = "1.0.65" }
async-trait = { version = "0.1.77" }
pretty_assertions = { version = "1.3.0" }
serde_json = { version = "1.0.85" }
tempfile = { version = "3.4.0" }
tokio = { version = "1.35.0", features = ["full"] }
async-trait = { version = "0.1.77" }

[[example]]
name = "basic_usage"
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub mod map_api_ro;
pub mod map_key;
pub mod map_value;
pub mod marked;
pub mod match_seq;
pub mod seq_value;
pub mod util;

Expand Down
24 changes: 24 additions & 0 deletions src/match_seq/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::Deserialize;
use serde::Serialize;

use crate::match_seq::MatchSeq;

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum ConflictSeq {
#[error("ConflictSeq: Want: {want}, Got: {got}")]
NotMatch { want: MatchSeq, got: u64 },
}
93 changes: 93 additions & 0 deletions src/match_seq/match_seq.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use serde::Deserialize;
use serde::Serialize;

/// Specifies the sequence number condition that an operation must satisfy to take effect.
///
/// In distributed systems, each value stored in the system has an associated sequence number
/// (`seq`) that represents its version. `MatchSeq` provides a way to express conditional
/// operations based on these sequence numbers:
///
/// - Match any sequence number (unconditional operation)
/// - Match an exact sequence number (compare-and-swap operations)
/// - Match sequence numbers greater than or equal to a value (update existing entries)
///
/// This is essential for implementing optimistic concurrency control and ensuring
/// consistency in distributed environments.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq, deepsize::DeepSizeOf)]
pub enum MatchSeq {
// TODO(xp): remove Any, it is equivalent to GE(0)
/// Any value is acceptable, i.e. does not check seq at all.
Any,

/// To match an exact value of seq.
///
/// E.g., CAS updates the exact version of some value,
/// and put-if-absent adds a value only when seq is 0.
Exact(u64),

/// To match a seq that is greater-or-equal some value.
///
/// E.g., GE(1) perform an update on any existent value.
GE(u64),
}

impl fmt::Display for MatchSeq {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
MatchSeq::Any => {
write!(f, "is any value")
}
MatchSeq::Exact(s) => {
write!(f, "== {}", s)
}
MatchSeq::GE(s) => {
write!(f, ">= {}", s)
}
}
}
}

#[cfg(test)]
mod tests {
use crate::match_seq::MatchSeq;

#[derive(serde::Serialize)]
struct Foo {
f: MatchSeq,
}

#[test]
fn test_match_seq_serde() -> anyhow::Result<()> {
//

let t = Foo { f: MatchSeq::Any };
let s = serde_json::to_string(&t)?;
println!("{s}");

Ok(())
}

#[test]
fn test_match_seq_display() -> anyhow::Result<()> {
assert_eq!("== 3", MatchSeq::Exact(3).to_string());
assert_eq!(">= 3", MatchSeq::GE(3).to_string());

Ok(())
}
}
96 changes: 96 additions & 0 deletions src/match_seq/match_seq_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::match_seq::errors::ConflictSeq;

/// Check if the sequence number satisfies the condition.
pub trait MatchSeqExt<T> {
/// Match against a some value containing seq by checking if the seq satisfies the condition.
fn match_seq(&self, sv: &T) -> Result<(), ConflictSeq>;
}

#[cfg(test)]
mod tests {
use crate::match_seq::errors::ConflictSeq;
use crate::match_seq::MatchSeq;
use crate::match_seq::MatchSeqExt;

type SeqV = crate::seq_value::SeqV<u64, u64>;

#[test]
fn test_match_seq_match_seq_value() -> anyhow::Result<()> {
assert_eq!(MatchSeq::GE(0).match_seq(&Some(SeqV::new(0, 1))), Ok(()));
assert_eq!(MatchSeq::GE(0).match_seq(&Some(SeqV::new(1, 1))), Ok(()));

//

assert_eq!(
MatchSeq::Exact(3).match_seq(&None::<SeqV>),
Err(ConflictSeq::NotMatch {
want: MatchSeq::Exact(3),
got: 0
})
);
assert_eq!(
MatchSeq::Exact(3).match_seq(&Some(SeqV::new(0, 1))),
Err(ConflictSeq::NotMatch {
want: MatchSeq::Exact(3),
got: 0
})
);
assert_eq!(
MatchSeq::Exact(3).match_seq(&Some(SeqV::new(2, 1))),
Err(ConflictSeq::NotMatch {
want: MatchSeq::Exact(3),
got: 2
})
);
assert_eq!(MatchSeq::Exact(3).match_seq(&Some(SeqV::new(3, 1))), Ok(()));
assert_eq!(
MatchSeq::Exact(3).match_seq(&Some(SeqV::new(4, 1))),
Err(ConflictSeq::NotMatch {
want: MatchSeq::Exact(3),
got: 4
})
);

//

assert_eq!(
MatchSeq::GE(3).match_seq(&None::<SeqV>),
Err(ConflictSeq::NotMatch {
want: MatchSeq::GE(3),
got: 0
})
);
assert_eq!(
MatchSeq::GE(3).match_seq(&Some(SeqV::new(0, 1))),
Err(ConflictSeq::NotMatch {
want: MatchSeq::GE(3),
got: 0
})
);
assert_eq!(
MatchSeq::GE(3).match_seq(&Some(SeqV::new(2, 1))),
Err(ConflictSeq::NotMatch {
want: MatchSeq::GE(3),
got: 2
})
);
assert_eq!(MatchSeq::GE(3).match_seq(&Some(SeqV::new(3, 1))), Ok(()));
assert_eq!(MatchSeq::GE(3).match_seq(&Some(SeqV::new(4, 1))), Ok(()));

Ok(())
}
}
51 changes: 51 additions & 0 deletions src/match_seq/match_seq_ext_impls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::match_seq::errors::ConflictSeq;
use crate::match_seq::MatchSeq;
use crate::match_seq::MatchSeqExt;
use crate::seq_value::SeqV;

impl MatchSeqExt<u64> for MatchSeq {
fn match_seq(&self, seq: &u64) -> Result<(), ConflictSeq> {
match self {
MatchSeq::Any => Ok(()),
MatchSeq::Exact(s) if seq == s => Ok(()),
MatchSeq::GE(s) if seq >= s => Ok(()),
_ => Err(ConflictSeq::NotMatch {
want: *self,
got: *seq,
}),
}
}
}

impl<M, T> MatchSeqExt<SeqV<M, T>> for MatchSeq {
fn match_seq(&self, sv: &SeqV<M, T>) -> Result<(), ConflictSeq> {
self.match_seq(&sv.seq)
}
}

impl<M, T> MatchSeqExt<Option<&SeqV<M, T>>> for MatchSeq {
fn match_seq(&self, sv: &Option<&SeqV<M, T>>) -> Result<(), ConflictSeq> {
let seq = sv.map_or(0, |sv| sv.seq);
self.match_seq(&seq)
}
}

impl<M, T> MatchSeqExt<Option<SeqV<M, T>>> for MatchSeq {
fn match_seq(&self, sv: &Option<SeqV<M, T>>) -> Result<(), ConflictSeq> {
self.match_seq(&sv.as_ref())
}
}
23 changes: 23 additions & 0 deletions src/match_seq/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod errors;

#[allow(clippy::module_inception)]
mod match_seq;
mod match_seq_ext;
mod match_seq_ext_impls;

pub use match_seq::MatchSeq;
pub use match_seq_ext::MatchSeqExt;
Loading