From 5046ab4e995f5cb8a09151d59225c50b84c04b1e Mon Sep 17 00:00:00 2001 From: Zhang Yanpo Date: Thu, 8 May 2025 13:24:44 +0800 Subject: [PATCH] feat(meta-service): add initialization complete flag for watch When a watch requires initialization flush, the server now sends a flag message indicating when the initialization phase is completed. With this flag, the `cache` module can determine whether data has been fully initialized and is ready to serve, improving reliability of the caching system. --- Cargo.lock | 4 +- Cargo.toml | 4 +- src/meta/README.md | 23 ++++----- src/meta/client/src/lib.rs | 5 +- .../src/meta_event_subscriber/processor.rs | 2 +- src/meta/service/src/api/grpc/grpc_service.rs | 17 +++++-- src/meta/service/src/meta_service/watcher.rs | 10 +++- .../tests/it/grpc/metasrv_grpc_watch.rs | 35 ++++++++++++-- src/meta/types/proto/meta.proto | 19 +++++++- .../types/src/proto_display/watch_display.rs | 28 +++++++++-- src/meta/types/src/proto_ext/watch_ext.rs | 48 +++++++++++++++++-- 11 files changed, 157 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c29e214a57fd3..2766b18af9356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16202,8 +16202,8 @@ dependencies = [ [[package]] name = "watcher" -version = "0.2.0" -source = "git+https://github.com/databendlabs/watcher?tag=v0.2.0#7bff8b42604a95ddda90fe42f329f1b72a526f06" +version = "0.4.0" +source = "git+https://github.com/databendlabs/watcher?tag=v0.4.0#a550d7cb0ac809cf719d55b4c4984aafb78f88e8" dependencies = [ "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index d2529a69cc4be..cbdcfabbbb9d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -524,7 +524,7 @@ url = "2.5.4" uuid = { version = "1.10.0", features = ["std", "serde", "v4", "v7"] } volo-thrift = "0.10" walkdir = "2.3.2" -watcher = { version = "0.2.0" } +watcher = { version = "0.4.0" } wiremock = "0.6" wkt = "0.11.1" xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] } @@ -648,5 +648,5 @@ sled = { 
git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" } tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" } tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" } -watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.2.0" } +watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.0" } xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" } diff --git a/src/meta/README.md b/src/meta/README.md index 0086b0d08abc1..0868c124b4ee7 100644 --- a/src/meta/README.md +++ b/src/meta/README.md @@ -79,26 +79,21 @@ The following is an illustration of the latest query-meta compatibility: `[, a.b.c)` denotes the range of versions from previous version(on the left column)(inclusive) upto `a.b.c` (exclusive). -TODO: xx is the version in which semaphore is added. update it when merged. 
- -| `Meta\Query` | 1.1.34) | [, 1.2.287) | [, 1.2.361) | [, xx) | [, +∞) | -|:-------------------|:--------|:------------|:------------|:-------|:-----------------| -| [0.8.30, 0.8.35) | | ❌ | ❌ | ❌ | ❌ | -| [0.8.35, 0.9.23) | | ❌ | ❌ | ❌ | ❌ | -| [0.9.23, 0.9.42) | | ❌ | ❌ | ❌ | ❌ | -| [0.9.42, 1.1.32) | | ❌ | ❌ | ❌ | ❌ | -| [1.1.32, 1.2.63) | | ✅ | ❌ | ❌ | ❌ | -| [1.2.63, 1.2.226) | | ✅ | ❌ | ❌ | ❌ | -| [1.2.226, 1.2.258) | | ✅ | ✅ | ❌ | ❌ | -| [1.2.258, 1.2.663) | | ✅ | ✅ | ✅ | ✅(no semaphore) | -| [1.2.663, 1.2.677) | | ❌ | ✅ | ✅ | ✅(no semaphore) | -| [1.2.677, +∞) | | ❌ | ✅ | ✅ | ✅ | + +| `Meta\Query` | 1.2.287) | [, 1.2.361) | [, 1.2.715) | [, 1.2.726) | [, +∞) | +|:-------------------|:---------|:------------|:------------|:-----------------|:-------| +| [1.2.63, 1.2.226) | | ❌ | ❌ | ❌ | ❌ | +| [1.2.226, 1.2.258) | | ✅ | ❌ | ❌ | ❌ | +| [1.2.258, 1.2.663) | | ✅ | ✅ | ✅(no semaphore) | ❌ | +| [1.2.663, 1.2.677) | | ✅ | ✅ | ✅(no semaphore) | ❌ | +| [1.2.677, +∞) | | ✅ | ✅ | ✅ | ✅ | History versions that are not included in the above chart: - Query `[0.7.59, 0.8.80)` is compatible with Meta `[0.8.30, 0.9.23)`. - Query `[0.8.80, 0.9.41)` is compatible with Meta `[0.8.35, 0.9.42)`. - Query `[0.9.41, 1.1.34)` is compatible with Meta `[0.8.35, 1.2.663)`. +- Query `[1.1.34, 1.2.287)` is compatible with Meta `[1.1.32, 1.2.63)`. ## Compatibility between databend-meta diff --git a/src/meta/client/src/lib.rs b/src/meta/client/src/lib.rs index 151761d62c8f0..910f77b8d356f 100644 --- a/src/meta/client/src/lib.rs +++ b/src/meta/client/src/lib.rs @@ -134,9 +134,12 @@ pub static METACLI_COMMIT_SEMVER: LazyLock = LazyLock::new(|| { /// 👥 client: semaphore(watch) requires `WatchRequest::initial_flush`(`1,2.677`), /// other RPC does not require `1.2.677`, requires only `1.2.259`. /// -/// - 2025-04-15: since TODO: add version when merged. +/// - 2025-04-15: since 1.2.726 /// 👥 client: requires `1,2.677`. /// +/// - 2025-05-08: since TODO: add version when merged. 
+/// 🖥 server: add `WatchResponse::is_initialization`, +/// /// /// Server feature set: /// ```yaml diff --git a/src/meta/semaphore/src/meta_event_subscriber/processor.rs b/src/meta/semaphore/src/meta_event_subscriber/processor.rs index 1f1f5d57948c4..b25dfc17cea19 100644 --- a/src/meta/semaphore/src/meta_event_subscriber/processor.rs +++ b/src/meta/semaphore/src/meta_event_subscriber/processor.rs @@ -225,7 +225,7 @@ mod tests { let prev = PermitEntry::new("a", 1); let current = PermitEntry::new("b", 2); - let watch_response = WatchResponse::new3( + let watch_response = WatchResponse::new_change_event( sem_key.format_key(), Some(SeqV::new(1, prev.encode_to_vec()?)), Some(SeqV::new(2, current.encode_to_vec()?)), diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 8bc77f740a69c..2914861bb8c01 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -74,7 +74,7 @@ use tonic::Response; use tonic::Status; use tonic::Streaming; use watcher::key_range::build_key_range; -use watcher::util::new_watch_sink; +use watcher::util::new_initialization_sink; use watcher::util::try_forward; use watcher::watch_stream::WatchStream; use watcher::watch_stream::WatchStreamSender; @@ -459,10 +459,21 @@ impl MetaService for MetaServiceImpl { let ctx = "watch-Dispatcher"; if let Some(sender) = sm.event_sender() { - let snk = new_watch_sink::(tx, ctx); + let snk = new_initialization_sink::(tx.clone(), ctx); let strm = sm.range_kv(key_range).await?; - let fu = try_forward(strm, snk, ctx); + let fu = async move { + try_forward(strm, snk, ctx).await; + + // Send an empty message with `is_initialization=false` to indicate + // the end of the initialization flush. 
+ tx.send(Ok(WatchResponse::new_initialization_complete())) + .await + .map_err(|e| { + error!("failed to send flush complete message: {}", e); + }) + .ok(); + }; let fu = Box::pin(fu); info!( diff --git a/src/meta/service/src/meta_service/watcher.rs b/src/meta/service/src/meta_service/watcher.rs index 1db39d240e860..8686a60dc6374 100644 --- a/src/meta/service/src/meta_service/watcher.rs +++ b/src/meta/service/src/meta_service/watcher.rs @@ -28,7 +28,9 @@ use tonic::Status; use watcher::dispatch::Command; use watcher::dispatch::DispatcherHandle as GenericDispatcherHandle; use watcher::type_config::KVChange; +use watcher::type_config::KeyOf; use watcher::type_config::TypeConfig; +use watcher::type_config::ValueOf; use crate::metrics::server_metrics; @@ -42,8 +44,12 @@ impl TypeConfig for WatchTypes { type Response = WatchResponse; type Error = Status; - fn new_response(change: KVChange) -> Self::Response { - WatchResponse::new3(change.0, change.1, change.2) + fn new_initialize_response(key: KeyOf, value: ValueOf) -> Self::Response { + WatchResponse::new_initialization_event(key, value) + } + + fn new_change_response(change: KVChange) -> Self::Response { + WatchResponse::new_change_event(change.0, change.1, change.2) } fn data_error(error: Error) -> Self::Error { diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs index cdc23b140fc3f..ffa95255dd673 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs @@ -343,7 +343,7 @@ async fn test_watch() -> anyhow::Result<()> { #[test(harness = meta_service_test_harness)] #[fastrace::trace] -async fn test_watch_initial_flush() -> anyhow::Result<()> { +async fn test_watch_initialization_flush() -> anyhow::Result<()> { let (tc, _addr) = crate::tests::start_metasrv().await?; let updates = vec![ UpsertKV::update("a", b"a"), @@ -369,11 +369,25 @@ async fn test_watch_initial_flush() -> 
anyhow::Result<()> { }; let cache = Arc::new(Mutex::new(BTreeMap::new())); - let c = cache.clone(); + + let is_initialization_completed = Arc::new(Mutex::new(false)); + let init_compl = is_initialization_completed.clone(); + + let flags = Arc::new(Mutex::new(vec![])); + let f = flags.clone(); + let cache_updater = async move { while let Ok(Some(resp)) = strm.message().await { - let event = resp.event.unwrap(); + f.lock().unwrap().push(resp.is_initialization); + + if resp.is_initialization_complete_flag() { + *init_compl.lock().unwrap() = true; + } + + let Some(event) = resp.event else { + continue; + }; let mut cache = c.lock().unwrap(); if let Some(value) = event.current { @@ -387,6 +401,14 @@ async fn test_watch_initial_flush() -> anyhow::Result<()> { let _h = databend_common_base::runtime::spawn(cache_updater); tokio::time::sleep(Duration::from_secs(1)).await; + + assert_eq!(flags.lock().unwrap().clone(), vec![ + true, true, true, true, // existent key-values + false, // initialization complete + ]); + + assert!(*is_initialization_completed.lock().unwrap()); + let keys = { let cache = cache.lock().unwrap(); cache.keys().cloned().collect::>() @@ -398,6 +420,13 @@ async fn test_watch_initial_flush() -> anyhow::Result<()> { client.upsert_kv(UpsertKV::delete("c")).await?; tokio::time::sleep(Duration::from_secs(1)).await; + + assert_eq!(flags.lock().unwrap().clone(), vec![ + true, true, true, true, // existent key-values + false, // initialization complete + false, false, // changes + ]); + let values = { let cache = cache.lock().unwrap(); cache diff --git a/src/meta/types/proto/meta.proto b/src/meta/types/proto/meta.proto index 0903e4cd43e72..e627e4fc3ae5e 100644 --- a/src/meta/types/proto/meta.proto +++ b/src/meta/types/proto/meta.proto @@ -100,7 +100,24 @@ message Event { optional SeqV prev = 3; } -message WatchResponse {Event event = 1;} +// Response in a watch stream from server to the client. 
+message WatchResponse { + // The event containing key-value change information or a value emitted during initial-flush. + // This includes the key, current value (if any), and previous value (if any). + Event event = 1; + + // Indicates whether this event is part of the initialization. + // + // When true: + // - The event represents an existing key-value record at the time the watch was established + // - These events are sent when initial_flush=true was specified in WatchRequest + // + // When false: + // - The event represents a real-time change that occurred after the watch was established + // - A special event with no key-value data may be sent to indicate the completion + // of the initial flush phase + bool is_initialization = 2; +} // messages for txn message TxnCondition { diff --git a/src/meta/types/src/proto_display/watch_display.rs b/src/meta/types/src/proto_display/watch_display.rs index 4f82c82e46773..846d979ac42c7 100644 --- a/src/meta/types/src/proto_display/watch_display.rs +++ b/src/meta/types/src/proto_display/watch_display.rs @@ -34,7 +34,12 @@ impl fmt::Display for Event { impl fmt::Display for WatchResponse { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.event.display()) + let typ = if self.is_initialization { + "INIT" + } else { + "CHANGE" + }; + write!(f, "{typ}:{}", self.event.display()) } } @@ -79,7 +84,7 @@ mod tests { #[test] fn test_watch_response_display() { - let watch_response = WatchResponse { + let mut watch_response = WatchResponse { event: Some(Event { key: "test_key".to_string(), prev: Some(SeqV { @@ -95,11 +100,24 @@ mod tests { meta: None, }), }), + is_initialization: false, + }; + assert_eq!(watch_response.to_string(), "CHANGE:(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 'test_current'))"); + + watch_response.is_initialization = true; + assert_eq!(watch_response.to_string(), "INIT:(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 
'test_current'))"); + + let watch_response = WatchResponse { + event: None, + is_initialization: true, }; - assert_eq!(watch_response.to_string(), "(test_key: (seq=1 [expire=1970-01-01T00:16:40.000] 'test_prev') -> (seq=2 [] 'test_current'))"); + assert_eq!(watch_response.to_string(), "INIT:None"); - let watch_response = WatchResponse { event: None }; - assert_eq!(watch_response.to_string(), "None"); + let watch_response = WatchResponse { + event: None, + is_initialization: false, + }; + assert_eq!(watch_response.to_string(), "CHANGE:None"); } #[test] diff --git a/src/meta/types/src/proto_ext/watch_ext.rs b/src/meta/types/src/proto_ext/watch_ext.rs index 064af2f9c4d04..b07a4c02ab633 100644 --- a/src/meta/types/src/proto_ext/watch_ext.rs +++ b/src/meta/types/src/proto_ext/watch_ext.rs @@ -55,15 +55,47 @@ impl WatchRequest { } impl WatchResponse { - /// Create a new `WatchResponse` with `key`, `prev` and `current` values. - pub fn new3(key: String, prev: Option, current: Option) -> Self { + /// Create a new `WatchResponse` that marks the end of initialization phase. + /// + /// This response is used to signal that the initial-flush phase has completed, + /// and the watch stream can now proceed with real-time change notifications. + pub fn new_initialization_complete() -> Self { + WatchResponse { + event: None, + is_initialization: false, + } + } + + /// Create a new `WatchResponse` for an initial key-value pair. + /// This represents the initial state of a key, not a change event. + /// + /// The response will have `is_initialization` set to true to indicate + /// this is part of the initial data load rather than a real-time change. + pub fn new_initialization_event(key: String, value: SeqV) -> Self { + let ev = pb::Event { + key, + prev: None, + current: Some(pb::SeqV::from(value)), + }; + + WatchResponse { + event: Some(ev), + is_initialization: true, + } + } + + /// Create a new `WatchResponse` for a key-value change event. 
+ pub fn new_change_event(key: String, prev: Option, current: Option) -> Self { let ev = pb::Event { key, prev: prev.map(pb::SeqV::from), current: current.map(pb::SeqV::from), }; - WatchResponse { event: Some(ev) } + WatchResponse { + event: Some(ev), + is_initialization: false, + } } pub fn new(change: &Change, String>) -> Option { @@ -73,7 +105,15 @@ impl WatchResponse { current: change.result.clone().map(pb::SeqV::from), }; - Some(WatchResponse { event: Some(ev) }) + Some(WatchResponse { + event: Some(ev), + is_initialization: false, + }) + } + + /// Check if the response is an empty indicator that only signals initialization completion. + pub fn is_initialization_complete_flag(&self) -> bool { + !self.is_initialization && self.event.is_none() } pub fn unpack(self) -> Option<(String, Option, Option)> {