Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ url = "2.5.4"
uuid = { version = "1.10.0", features = ["std", "serde", "v4", "v7"] }
volo-thrift = "0.10"
walkdir = "2.3.2"
watcher = { version = "0.2.0" }
watcher = { version = "0.4.0" }
wiremock = "0.6"
wkt = "0.11.1"
xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] }
Expand Down Expand Up @@ -648,5 +648,5 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" }
tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" }
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.2.0" }
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.0" }
xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" }
23 changes: 9 additions & 14 deletions src/meta/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,21 @@ The following is an illustration of the latest query-meta compatibility:
`[, a.b.c)` denotes the range of versions from previous version(on the left column)(inclusive)
upto `a.b.c` (exclusive).

TODO: xx is the version in which semaphore is added. update it when merged.

| `Meta\Query` | 1.1.34) | [, 1.2.287) | [, 1.2.361) | [, xx) | [, +∞) |
|:-------------------|:--------|:------------|:------------|:-------|:-----------------|
| [0.8.30, 0.8.35) | | ❌ | ❌ | ❌ | ❌ |
| [0.8.35, 0.9.23) | | ❌ | ❌ | ❌ | ❌ |
| [0.9.23, 0.9.42) | | ❌ | ❌ | ❌ | ❌ |
| [0.9.42, 1.1.32) | | ❌ | ❌ | ❌ | ❌ |
| [1.1.32, 1.2.63) | | ✅ | ❌ | ❌ | ❌ |
| [1.2.63, 1.2.226) | | ✅ | ❌ | ❌ | ❌ |
| [1.2.226, 1.2.258) | | ✅ | ✅ | ❌ | ❌ |
| [1.2.258, 1.2.663) | | ✅ | ✅ | ✅ | ✅(no semaphore) |
| [1.2.663, 1.2.677) | | ❌ | ✅ | ✅ | ✅(no semaphore) |
| [1.2.677, +∞) | | ❌ | ✅ | ✅ | ✅ |

| `Meta\Query` | 1.2.287) | [, 1.2.361) | [, 1.2.715) | [, 1.2.726) | [, +∞) |
|:-------------------|:---------|:------------|:------------|:-----------------|:-------|
| [1.2.63, 1.2.226) | | ❌ | ❌ | ❌ | ❌ |
| [1.2.226, 1.2.258) | | ✅ | ❌ | ❌ | ❌ |
| [1.2.258, 1.2.663) | | ✅ | ✅ | ✅(no semaphore) | ❌ |
| [1.2.663, 1.2.677) | | ✅ | ✅ | ✅(no semaphore) | ❌ |
| [1.2.677, +∞) | | ✅ | ✅ | ✅ | ✅ |

History versions that are not included in the above chart:

- Query `[0.7.59, 0.8.80)` is compatible with Meta `[0.8.30, 0.9.23)`.
- Query `[0.8.80, 0.9.41)` is compatible with Meta `[0.8.35, 0.9.42)`.
- Query `[0.9.41, 1.1.34)` is compatible with Meta `[0.8.35, 1.2.663)`.
- Query `[1.1.34, 1.2.287)` is compatible with Meta `[1.1.32, 1.2.63)`.


## Compatibility between databend-meta
Expand Down
5 changes: 4 additions & 1 deletion src/meta/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,12 @@ pub static METACLI_COMMIT_SEMVER: LazyLock<Version> = LazyLock::new(|| {
/// 👥 client: semaphore(watch) requires `WatchRequest::initial_flush`(`1,2.677`),
/// other RPC does not require `1.2.677`, requires only `1.2.259`.
///
/// - 2025-04-15: since TODO: add version when merged.
/// - 2025-04-15: since 1.2.726
/// 👥 client: requires `1,2.677`.
///
/// - 2025-05-08: since TODO: add version when merged.
/// 🖥 server: add `WatchResponse::is_initialization`,
///
///
/// Server feature set:
/// ```yaml
Expand Down
2 changes: 1 addition & 1 deletion src/meta/semaphore/src/meta_event_subscriber/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ mod tests {
let prev = PermitEntry::new("a", 1);
let current = PermitEntry::new("b", 2);

let watch_response = WatchResponse::new3(
let watch_response = WatchResponse::new_change_event(
sem_key.format_key(),
Some(SeqV::new(1, prev.encode_to_vec()?)),
Some(SeqV::new(2, current.encode_to_vec()?)),
Expand Down
17 changes: 14 additions & 3 deletions src/meta/service/src/api/grpc/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ use tonic::Response;
use tonic::Status;
use tonic::Streaming;
use watcher::key_range::build_key_range;
use watcher::util::new_watch_sink;
use watcher::util::new_initialization_sink;
use watcher::util::try_forward;
use watcher::watch_stream::WatchStream;
use watcher::watch_stream::WatchStreamSender;
Expand Down Expand Up @@ -459,10 +459,21 @@ impl MetaService for MetaServiceImpl {
let ctx = "watch-Dispatcher";

if let Some(sender) = sm.event_sender() {
let snk = new_watch_sink::<WatchTypes>(tx, ctx);
let snk = new_initialization_sink::<WatchTypes>(tx.clone(), ctx);
let strm = sm.range_kv(key_range).await?;

let fu = try_forward(strm, snk, ctx);
let fu = async move {
try_forward(strm, snk, ctx).await;

// Send an empty message with `is_initialization=false` to indicate
// the end of the initialization flush.
tx.send(Ok(WatchResponse::new_initialization_complete()))
.await
.map_err(|e| {
error!("failed to send flush complete message: {}", e);
})
.ok();
};
let fu = Box::pin(fu);

info!(
Expand Down
10 changes: 8 additions & 2 deletions src/meta/service/src/meta_service/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use tonic::Status;
use watcher::dispatch::Command;
use watcher::dispatch::DispatcherHandle as GenericDispatcherHandle;
use watcher::type_config::KVChange;
use watcher::type_config::KeyOf;
use watcher::type_config::TypeConfig;
use watcher::type_config::ValueOf;

use crate::metrics::server_metrics;

Expand All @@ -42,8 +44,12 @@ impl TypeConfig for WatchTypes {
type Response = WatchResponse;
type Error = Status;

fn new_response(change: KVChange<Self>) -> Self::Response {
WatchResponse::new3(change.0, change.1, change.2)
fn new_initialize_response(key: KeyOf<Self>, value: ValueOf<Self>) -> Self::Response {
WatchResponse::new_initialization_event(key, value)
}

fn new_change_response(change: KVChange<Self>) -> Self::Response {
WatchResponse::new_change_event(change.0, change.1, change.2)
}

fn data_error(error: Error) -> Self::Error {
Expand Down
35 changes: 32 additions & 3 deletions src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ async fn test_watch() -> anyhow::Result<()> {

#[test(harness = meta_service_test_harness)]
#[fastrace::trace]
async fn test_watch_initial_flush() -> anyhow::Result<()> {
async fn test_watch_initialization_flush() -> anyhow::Result<()> {
let (tc, _addr) = crate::tests::start_metasrv().await?;
let updates = vec![
UpsertKV::update("a", b"a"),
Expand All @@ -369,11 +369,25 @@ async fn test_watch_initial_flush() -> anyhow::Result<()> {
};

let cache = Arc::new(Mutex::new(BTreeMap::new()));

let c = cache.clone();

let is_initialization_completed = Arc::new(Mutex::new(false));
let init_compl = is_initialization_completed.clone();

let flags = Arc::new(Mutex::new(vec![]));
let f = flags.clone();

let cache_updater = async move {
while let Ok(Some(resp)) = strm.message().await {
let event = resp.event.unwrap();
f.lock().unwrap().push(resp.is_initialization);

if resp.is_initialization_complete_flag() {
*init_compl.lock().unwrap() = true;
}

let Some(event) = resp.event else {
continue;
};

let mut cache = c.lock().unwrap();
if let Some(value) = event.current {
Expand All @@ -387,6 +401,14 @@ async fn test_watch_initial_flush() -> anyhow::Result<()> {
let _h = databend_common_base::runtime::spawn(cache_updater);

tokio::time::sleep(Duration::from_secs(1)).await;

assert_eq!(flags.lock().unwrap().clone(), vec![
true, true, true, true, // existent key-values
false, // initialization complete
]);

assert!(*is_initialization_completed.lock().unwrap());

let keys = {
let cache = cache.lock().unwrap();
cache.keys().cloned().collect::<Vec<_>>()
Expand All @@ -398,6 +420,13 @@ async fn test_watch_initial_flush() -> anyhow::Result<()> {
client.upsert_kv(UpsertKV::delete("c")).await?;

tokio::time::sleep(Duration::from_secs(1)).await;

assert_eq!(flags.lock().unwrap().clone(), vec![
true, true, true, true, // existent key-values
false, // initialization complete
false, false, // changes
]);

let values = {
let cache = cache.lock().unwrap();
cache
Expand Down
19 changes: 18 additions & 1 deletion src/meta/types/proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,24 @@ message Event {
optional SeqV prev = 3;
}

message WatchResponse {Event event = 1;}
// Response in a watch stream from server to the client.
message WatchResponse {
// The event containing key-value change information or a value emitted during initial-flush.
// This includes the key, current value (if any), and previous value (if any).
Event event = 1;

// Indicates whether this event is part of the initialization.
//
// When true:
// - The event represents an existing key-value record at the time the watch was established
// - These events are sent when initial_flush=true was specified in WatchRequest
//
// When false:
// - The event represents a real-time change that occurred after the watch was established
// - A special event with no key-value data may be sent to indicate the completion
// of the initial flush phase
bool is_initialization = 2;
}

// messages for txn
message TxnCondition {
Expand Down
28 changes: 23 additions & 5 deletions src/meta/types/src/proto_display/watch_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ impl fmt::Display for Event {

impl fmt::Display for WatchResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.event.display())
let typ = if self.is_initialization {
"INIT"
} else {
"CHANGE"
};
write!(f, "{typ}:{}", self.event.display())
}
}

Expand Down Expand Up @@ -79,7 +84,7 @@ mod tests {

#[test]
fn test_watch_response_display() {
let watch_response = WatchResponse {
let mut watch_response = WatchResponse {
event: Some(Event {
key: "test_key".to_string(),
prev: Some(SeqV {
Expand All @@ -95,11 +100,24 @@ mod tests {
meta: None,
}),
}),
is_initialization: false,
};
assert_eq!(watch_response.to_string(), "CHANGE:(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 'test_current'))");

watch_response.is_initialization = true;
assert_eq!(watch_response.to_string(), "INIT:(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 'test_current'))");

let watch_response = WatchResponse {
event: None,
is_initialization: true,
};
assert_eq!(watch_response.to_string(), "(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 'test_current'))");
assert_eq!(watch_response.to_string(), "INIT:None");

let watch_response = WatchResponse { event: None };
assert_eq!(watch_response.to_string(), "None");
let watch_response = WatchResponse {
event: None,
is_initialization: false,
};
assert_eq!(watch_response.to_string(), "CHANGE:None");
}

#[test]
Expand Down
48 changes: 44 additions & 4 deletions src/meta/types/src/proto_ext/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,47 @@ impl WatchRequest {
}

impl WatchResponse {
/// Create a new `WatchResponse` with `key`, `prev` and `current` values.
pub fn new3(key: String, prev: Option<SeqV>, current: Option<SeqV>) -> Self {
/// Create a new `WatchResponse` that marks the end of initialization phase.
///
/// This response is used to signal that the initial-flush phase has completed,
/// and the watch stream can now proceed with real-time change notifications.
pub fn new_initialization_complete() -> Self {
WatchResponse {
event: None,
is_initialization: false,
}
}

/// Create a new `WatchResponse` for an initial key-value pair.
/// This represents the initial state of a key, not a change event.
///
/// The response will have `is_initialization` set to true to indicate
/// this is part of the initial data load rather than a real-time change.
pub fn new_initialization_event(key: String, value: SeqV) -> Self {
let ev = pb::Event {
key,
prev: None,
current: Some(pb::SeqV::from(value)),
};

WatchResponse {
event: Some(ev),
is_initialization: true,
}
}

/// Create a new `WatchResponse` for a key-value change event.
pub fn new_change_event(key: String, prev: Option<SeqV>, current: Option<SeqV>) -> Self {
let ev = pb::Event {
key,
prev: prev.map(pb::SeqV::from),
current: current.map(pb::SeqV::from),
};

WatchResponse { event: Some(ev) }
WatchResponse {
event: Some(ev),
is_initialization: false,
}
}

pub fn new(change: &Change<Vec<u8>, String>) -> Option<Self> {
Expand All @@ -73,7 +105,15 @@ impl WatchResponse {
current: change.result.clone().map(pb::SeqV::from),
};

Some(WatchResponse { event: Some(ev) })
Some(WatchResponse {
event: Some(ev),
is_initialization: false,
})
}

/// Check if the response is an empty indicator just to indicating initialization completion.
pub fn is_initialization_complete_flag(&self) -> bool {
!self.is_initialization && self.event.is_none()
}

pub fn unpack(self) -> Option<(String, Option<SeqV>, Option<SeqV>)> {
Expand Down
Loading