diff --git a/src/common/base/src/base/drop_callback.rs b/src/common/base/src/base/drop_callback.rs new file mode 100644 index 0000000000000..6ac664e175a42 --- /dev/null +++ b/src/common/base/src/base/drop_callback.rs @@ -0,0 +1,64 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; + +/// Call a callback when dropped. +pub struct DropCallback { + callback: Option>, +} + +impl fmt::Debug for DropCallback { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "DropCallback") + } +} + +impl DropCallback { + pub fn new(callback: impl FnOnce() + Send + 'static) -> Self { + DropCallback { + callback: Some(Box::new(callback)), + } + } +} + +impl Drop for DropCallback { + fn drop(&mut self) { + if let Some(callback) = self.callback.take() { + callback(); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering; + use std::sync::Arc; + + use super::*; + + #[test] + fn test_drop_callback() { + let called = Arc::new(AtomicBool::new(false)); + let called_clone = called.clone(); + { + let _drop_callback = DropCallback::new(move || { + called_clone.store(true, Ordering::SeqCst); + }); + assert!(!called.load(Ordering::SeqCst)); + } + assert!(called.load(Ordering::SeqCst)); + } +} diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index 89cffa7273f99..4f98ac033fa04 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -14,6 +14,7 @@ mod build_info; mod dma; +mod drop_callback; mod net; mod ordered_float; mod profiling; @@ -37,6 +38,7 @@ pub use dma::dma_write_file_vectored; pub use dma::Alignment; pub use dma::DmaAllocator; pub use dma::DmaWriteBuf; +pub use drop_callback::DropCallback; pub use net::get_free_tcp_port; pub use net::get_free_udp_port; pub use ordered_float::OrderedFloat; diff --git a/src/meta/raft-store/src/applier/applier_data/mod.rs b/src/meta/raft-store/src/applier/applier_data/mod.rs index 7b75025caa0ae..01400f2608e0e 100644 --- a/src/meta/raft-store/src/applier/applier_data/mod.rs +++ b/src/meta/raft-store/src/applier/applier_data/mod.rs @@ -22,7 +22,9 @@ use crate::sm_v003::OnChange; pub(crate) struct ApplierData { /// Hold a unique permit to serialize all apply operations to the state machine. - pub(crate) _permit: WriterPermit, + /// + /// Wrapping it in a Mutex to make it `Sync` while the permit itself is only `Send`. + pub(crate) _permit: Mutex, pub(crate) view: StateMachineView, diff --git a/src/meta/raft-store/src/leveled_store/db_builder.rs b/src/meta/raft-store/src/leveled_store/db_builder.rs index 4284e6dd8cac3..cf1d9be707761 100644 --- a/src/meta/raft-store/src/leveled_store/db_builder.rs +++ b/src/meta/raft-store/src/leveled_store/db_builder.rs @@ -137,10 +137,10 @@ impl DBBuilder { lm: &mut LeveledMap, make_snapshot_id: impl FnOnce(&SysData) -> String + Send, ) -> Result { - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); - let compacting_data = lm.new_compacting_data(); - let (sys_data, strm) = compacting_data.compact_into_stream().await?; + let immutable_data = lm.immutable_data(); + let (sys_data, strm) = immutable_data.compact_into_stream().await?; self.append_kv_stream(strm).await?; diff --git a/src/meta/raft-store/src/leveled_store/db_scoped_seq_bounded_read_test.rs b/src/meta/raft-store/src/leveled_store/db_scoped_seq_bounded_read_test.rs index 603112e9e5986..f334c15c64aa5 100644 --- a/src/meta/raft-store/src/leveled_store/db_scoped_seq_bounded_read_test.rs +++ b/src/meta/raft-store/src/leveled_store/db_scoped_seq_bounded_read_test.rs @@ -41,7 +41,7 @@ async fn test_db_scoped_seq_bounded_read() -> anyhow::Result<()> { a.commit().await?; - sm.levels_mut().testing_freeze_writable(); + sm.levels_mut().freeze_writable_without_permit(); let mut a = sm.new_applier().await; diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs b/src/meta/raft-store/src/leveled_store/immutable_data/compact_into_stream.rs similarity index 65% rename from src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs rename to src/meta/raft-store/src/leveled_store/immutable_data/compact_into_stream.rs index d82c24462493c..246166368b2aa 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/compacting_data.rs +++ b/src/meta/raft-store/src/leveled_store/immutable_data/compact_into_stream.rs @@ -14,10 +14,7 @@ use std::fmt; use std::io; -use std::ops::Deref; -use std::sync::Arc; -use databend_common_meta_types::snapshot_db::DB; use databend_common_meta_types::sys_data::SysData; use futures_util::future; use futures_util::StreamExt; @@ -25,69 +22,18 @@ use futures_util::TryStreamExt; use map_api::mvcc::ScopedSeqBoundedRange; use map_api::IOResultStream; use map_api::MapKV; -use rotbl::v001::SeqMarked; +use seq_marked::SeqMarked; use state_machine_api::ExpireKey; use state_machine_api::UserKey; use stream_more::KMerge; use stream_more::StreamMore; use crate::leveled_store::immutable_data::ImmutableData; -use crate::leveled_store::immutable_levels::ImmutableLevels; use crate::leveled_store::rotbl_codec::RotblCodec; use crate::leveled_store::util; use crate::utils::add_cooperative_yielding; -/// The data to compact. -/// -/// Including several in-memory immutable levels and an optional persisted db. -#[derive(Debug)] -pub(crate) struct CompactingData { - pub(crate) immutable: Arc, -} - -impl Deref for CompactingData { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.immutable - } -} - -impl CompactingData { - pub fn new(immutable: Arc) -> Self { - Self { immutable } - } - - // Testing only - #[allow(dead_code)] - pub(crate) fn new_from_levels_and_persisted( - levels: ImmutableLevels, - persisted: Option, - ) -> Self { - let immutable = ImmutableData::new(levels, persisted); - Self { - immutable: Arc::new(immutable), - } - } - - /// Compact in-memory immutable levels(excluding on disk db) - /// into one level and keep tombstone record. - /// - /// When compact mem levels, do not remove tombstone, - /// because tombstones are still required when compacting with the underlying db. - /// - /// This is only used for test - pub async fn compact_immutable_in_place(&mut self) -> Result<(), io::Error> { - // TODO: test: after compaction in place, the data should be the same, the base_seq and newest_seq should be the same. - let immutable_levels = self.immutable.levels().clone(); - - let levels = immutable_levels.compact_all().await; - let immutable = ImmutableData::new(levels, self.immutable.persisted().cloned()); - self.immutable = Arc::new(immutable); - - Ok(()) - } - +impl ImmutableData { /// Compacted all data into a stream. /// /// Tombstones are removed because no more compact with lower levels. @@ -97,6 +43,7 @@ impl CompactingData { /// The stream Item is 2 items tuple of key, and value with seq. /// /// The exported stream contains encoded `String` key and rotbl value [`SeqMarked`] + // TODO: mvcc snapshot_seq pub async fn compact_into_stream( &self, ) -> Result<(SysData, IOResultStream<(String, SeqMarked)>), io::Error> { @@ -107,7 +54,7 @@ impl CompactingData { ) } - let immutable_levels = self.immutable.levels(); + let immutable_levels = self.levels(); let d = immutable_levels.newest().unwrap(); let sys_data = d.sys_data().clone(); @@ -141,7 +88,7 @@ impl CompactingData { let mut kmerge = KMerge::by(util::rotbl_by_key_seq); kmerge = kmerge.merge(strm); - if let Some(db) = self.immutable.persisted() { + if let Some(db) = self.persisted() { let db_strm = db.inner_range(); kmerge = kmerge.merge(db_strm); } diff --git a/src/meta/raft-store/src/leveled_store/immutable_data/mod.rs b/src/meta/raft-store/src/leveled_store/immutable_data/mod.rs index c5d1211b6b991..542f0e7bc9a5e 100644 --- a/src/meta/raft-store/src/leveled_store/immutable_data/mod.rs +++ b/src/meta/raft-store/src/leveled_store/immutable_data/mod.rs @@ -37,6 +37,8 @@ use crate::leveled_store::map_api::MapKeyEncode; use crate::leveled_store::value_convert::ValueConvert; use crate::leveled_store::ScopedSeqBoundedRead; +mod compact_into_stream; + #[derive(Debug, Default, Clone)] pub struct ImmutableData { /// The last sequence of the immutable data. diff --git a/src/meta/raft-store/src/leveled_store/immutable_levels/compact_all.rs b/src/meta/raft-store/src/leveled_store/immutable_levels/compact_all.rs index 3c2f5aef33cb3..be15360b886f2 100644 --- a/src/meta/raft-store/src/leveled_store/immutable_levels/compact_all.rs +++ b/src/meta/raft-store/src/leveled_store/immutable_levels/compact_all.rs @@ -59,6 +59,8 @@ impl ImmutableLevels { #[cfg(test)] mod tests { + use std::ops::Deref; + use databend_common_meta_types::raft_types::Membership; use databend_common_meta_types::raft_types::StoredMembership; use futures_util::TryStreamExt; @@ -66,9 +68,15 @@ mod tests { use openraft::testing::log_id; use seq_marked::SeqMarked; use state_machine_api::ExpireKey; + use state_machine_api::KVMeta; use state_machine_api::UserKey; use crate::sm_v003::compact_immutable_levels_test::build_3_levels; + use crate::sm_v003::compact_immutable_levels_test::build_sm_with_expire; + + fn s(x: impl ToString) -> String { + x.to_string() + } fn b(x: impl ToString) -> Vec { x.to_string().as_bytes().to_vec() @@ -82,7 +90,7 @@ mod tests { async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> { let lm = build_3_levels().await?; - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); let immutable_levels = lm.immutable_levels(); // Capture the original newest level's index before compaction @@ -132,4 +140,55 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_compact_expire_index() -> anyhow::Result<()> { + let sm = build_sm_with_expire().await?; + + let immutable_levels = { + sm.leveled_map().freeze_writable_without_permit(); + let compactor = sm.acquire_compactor("").await; + let immutable_levels = compactor.immutable_levels(); + immutable_levels.compact_all().await + }; + + let d = immutable_levels.newest().unwrap().deref(); + + let got = d + .range(UserKey::default().., u64::MAX) + .await? + .try_collect::>() + .await?; + + assert_eq!(got, vec![ + // + ( + user_key("a"), + SeqMarked::new_normal(4, (Some(KVMeta::new_expires_at(15)), b("a1"))) + ), + ( + user_key("b"), + SeqMarked::new_normal(2, (Some(KVMeta::new_expires_at(5)), b("b0"))) + ), + ( + user_key("c"), + SeqMarked::new_normal(3, (Some(KVMeta::new_expires_at(20)), b("c0"))) + ), + ]); + + let got = d + .range(ExpireKey::default().., u64::MAX) + .await? + .try_collect::>() + .await?; + assert_eq!(got, vec![ + // + (ExpireKey::new(5_000, 2), SeqMarked::new_normal(2, s("b"))), + (ExpireKey::new(10_000, 1), SeqMarked::new_tombstone(4)), + (ExpireKey::new(15_000, 4), SeqMarked::new_normal(4, s("a"))), + (ExpireKey::new(20_000, 3), SeqMarked::new_normal(3, s("c"))), + ]); + + Ok(()) + } } diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/acquire_compactor_test.rs b/src/meta/raft-store/src/leveled_store/leveled_map/acquire_compactor_test.rs index beca8a2e4328e..0c69d5a399955 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/acquire_compactor_test.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/acquire_compactor_test.rs @@ -21,12 +21,12 @@ use crate::sm_v003::SMV003; async fn test_blocking_wait_timeout() -> anyhow::Result<()> { let lm = SMV003::default(); - let _c = lm.acquire_compactor().await; + let _c = lm.acquire_compactor("").await; let (tx, rx) = oneshot::channel(); let _ = timeout(std::time::Duration::from_secs(1), async { - let _got = lm.acquire_compactor().await; + let _got = lm.acquire_compactor("").await; let _ = tx.send(true); }) .await; @@ -43,11 +43,11 @@ async fn test_blocking_wait_timeout() -> anyhow::Result<()> { async fn test_blocking_wait_ok() -> anyhow::Result<()> { let lm = SMV003::default(); - let _c = lm.acquire_compactor().await; + let _c = lm.acquire_compactor("").await; let (tx, rx) = oneshot::channel(); databend_common_base::runtime::spawn(async move { - let _got = lm.acquire_compactor().await; + let _got = lm.acquire_compactor("").await; let _ = tx.send(true); }); diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/applier_acquirer.rs b/src/meta/raft-store/src/leveled_store/leveled_map/applier_acquirer.rs index f8b0a4ede39d4..533da8cb75d0f 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/applier_acquirer.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/applier_acquirer.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; use std::sync::Arc; use std::time::Instant; +use databend_common_base::base::DropCallback; use log::info; use tokio::sync::OwnedSemaphorePermit; use tokio::sync::Semaphore; @@ -24,6 +26,12 @@ pub struct WriterAcquirer { sem: Arc, } +impl fmt::Display for WriterAcquirer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "WriterAcquirer") + } +} + impl WriterAcquirer { pub fn new(sem: Arc) -> Self { WriterAcquirer { sem } @@ -36,7 +44,9 @@ impl WriterAcquirer { let permit = self.sem.acquire_owned().await.unwrap(); info!("WriterPermit-Acquire: total: {:?}", start.elapsed()); WriterPermit { - start: Instant::now(), + _drop: DropCallback::new(move || { + info!("WriterPermit-Drop: total: {:?}", start.elapsed()); + }), _permit: permit, } } @@ -44,12 +54,6 @@ impl WriterAcquirer { /// ApplierPermit is used to acquire a permit for applying changes to the state machine. pub struct WriterPermit { - start: Instant, _permit: OwnedSemaphorePermit, -} - -impl Drop for WriterPermit { - fn drop(&mut self) { - info!("WriterPermit-Drop: total: {:?}", self.start.elapsed()); - } + _drop: DropCallback, } diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/compactor.rs b/src/meta/raft-store/src/leveled_store/leveled_map/compactor.rs index a1b0cd693f690..a1c18a307767f 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/compactor.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/compactor.rs @@ -13,14 +13,15 @@ // limitations under the License. use std::io; +use std::sync::Arc; use databend_common_meta_types::snapshot_db::DB; use databend_common_meta_types::sys_data::SysData; use map_api::IOResultStream; use rotbl::v001::SeqMarked; +use crate::leveled_store::immutable_data::ImmutableData; use crate::leveled_store::immutable_levels::ImmutableLevels; -use crate::leveled_store::leveled_map::compacting_data::CompactingData; use crate::leveled_store::leveled_map::compactor_acquirer::CompactorPermit; /// Compactor is responsible for compacting the immutable levels and db. @@ -32,16 +33,20 @@ pub struct Compactor { /// /// This is used to ensure that only one compactor can run at a time. pub(crate) _permit: CompactorPermit, - pub(crate) compacting_data: CompactingData, + pub(crate) immutable_data: Arc, } impl Compactor { + pub fn immutable_data(&self) -> Arc { + self.immutable_data.clone() + } + pub fn immutable_levels(&self) -> ImmutableLevels { - self.compacting_data.immutable.levels().clone() + self.immutable_data.levels().clone() } pub fn db(&self) -> Option { - self.compacting_data.immutable.persisted().cloned() + self.immutable_data.persisted().cloned() } /// Compact in-memory immutable levels(excluding on disk db) @@ -50,7 +55,13 @@ impl Compactor { /// When compact mem levels, do not remove tombstone, /// because tombstones are still required when compacting with the underlying db. pub async fn compact_immutable_in_place(&mut self) -> Result<(), io::Error> { - self.compacting_data.compact_immutable_in_place().await + let immutable_levels = self.immutable_data.levels().clone(); + + let levels = immutable_levels.compact_all().await; + let immutable = ImmutableData::new(levels, self.immutable_data.persisted().cloned()); + self.immutable_data = Arc::new(immutable); + + Ok(()) } /// Compacted all data into a stream. @@ -64,6 +75,6 @@ impl Compactor { pub async fn compact_into_stream( &mut self, ) -> Result<(SysData, IOResultStream<(String, SeqMarked)>), io::Error> { - self.compacting_data.compact_into_stream().await + self.immutable_data.compact_into_stream().await } } diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/compactor_acquirer.rs b/src/meta/raft-store/src/leveled_store/leveled_map/compactor_acquirer.rs index 923a5c0164d50..ea6403914937a 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/compactor_acquirer.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/compactor_acquirer.rs @@ -12,30 +12,65 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; use std::sync::Arc; +use std::time::Instant; +use databend_common_base::base::DropCallback; +use log::info; use tokio::sync::OwnedSemaphorePermit; use tokio::sync::Semaphore; /// Acquirer is used to acquire a permit for compaction, without holding lock to the state machine. pub struct CompactorAcquirer { + name: String, sem: Arc, } +impl fmt::Display for CompactorAcquirer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "CompactorAcquirer({})", self.name) + } +} + impl CompactorAcquirer { - pub fn new(sem: Arc) -> Self { - CompactorAcquirer { sem } + pub fn new(sem: Arc, name: impl ToString) -> Self { + CompactorAcquirer { + name: name.to_string(), + sem, + } } pub async fn acquire(self) -> CompactorPermit { + let start = Instant::now(); + // Safe unwrap: it returns error only when semaphore is closed. // This semaphore does not close. let permit = self.sem.acquire_owned().await.unwrap(); - CompactorPermit { _permit: permit } + + let name = self.name.clone(); + + info!( + "CompactorPermit({})-Acquire: total: {:?}", + name, + start.elapsed() + ); + + CompactorPermit { + _permit: permit, + _drop: DropCallback::new(move || { + info!( + "CompactorPermit({})-Drop: total: {:?}", + name, + start.elapsed() + ); + }), + } } } #[derive(Debug)] pub struct CompactorPermit { _permit: OwnedSemaphorePermit, + _drop: DropCallback, } diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs b/src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs index 5537249908ee1..ca6dd834edb50 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/leveled_map_test.rs @@ -39,7 +39,7 @@ async fn test_freeze() -> anyhow::Result<()> { view.commit().await?; // Insert the same entry at level 1 - l.testing_freeze_writable(); + l.freeze_writable_without_permit(); // println!("{:#?}", l); let mut view = l.to_view(); @@ -182,7 +182,7 @@ async fn test_two_levels() -> anyhow::Result<()> { // Create a new level - l.testing_freeze_writable(); + l.freeze_writable_without_permit(); let mut view = l.to_view(); // Override @@ -273,7 +273,7 @@ async fn build_3_levels() -> anyhow::Result { view.set(user_key("d"), Some((None, b("d0")))); view.commit().await?; - l.testing_freeze_writable(); + l.freeze_writable_without_permit(); let mut view = l.to_view(); // internal_seq: 4 @@ -282,7 +282,7 @@ async fn build_3_levels() -> anyhow::Result { view.set(user_key("e"), Some((None, b("e1")))); view.commit().await?; - l.testing_freeze_writable(); + l.freeze_writable_without_permit(); let mut view = l.to_view(); // internal_seq: 6 @@ -464,7 +464,7 @@ async fn build_2_level_with_meta() -> anyhow::Result { ); view.commit().await?; - l.testing_freeze_writable(); + l.freeze_writable_without_permit(); let mut view = l.to_view(); // internal_seq: 3 @@ -520,7 +520,7 @@ async fn build_2_level_consecutive_delete() -> anyhow::Result { view.set(user_key("b"), None); view.commit().await?; - l.testing_freeze_writable(); + l.freeze_writable_without_permit(); let mut view = l.to_view(); // internal_seq: 3 diff --git a/src/meta/raft-store/src/leveled_store/leveled_map/mod.rs b/src/meta/raft-store/src/leveled_store/leveled_map/mod.rs index 3be7fe31bffc9..78e66c475b9c2 100644 --- a/src/meta/raft-store/src/leveled_store/leveled_map/mod.rs +++ b/src/meta/raft-store/src/leveled_store/leveled_map/mod.rs @@ -31,7 +31,7 @@ use crate::leveled_store::immutable::Immutable; use crate::leveled_store::immutable_data::ImmutableData; use crate::leveled_store::immutable_levels::ImmutableLevels; use crate::leveled_store::leveled_map::applier_acquirer::WriterPermit; -use crate::leveled_store::leveled_map::compacting_data::CompactingData; +use crate::leveled_store::leveled_map::compactor_acquirer::CompactorPermit; use crate::leveled_store::leveled_map::leveled_map_data::LeveledMapData; use crate::leveled_store::snapshot::MvccSnapshot; use crate::leveled_store::snapshot::StateMachineSnapshot; @@ -41,7 +41,6 @@ use crate::leveled_store::view::StateMachineView; mod acquire_compactor_test; pub mod applier_acquirer; -pub mod compacting_data; pub mod compactor; pub mod compactor_acquirer; mod impl_commit; @@ -100,17 +99,21 @@ impl LeveledMap { /// Freeze the current writable level and create a new empty writable level. /// - /// Need writer permit and compactor permit - pub fn freeze_writable(&self, _writer_permit: &mut WriterPermit) { + /// Need writer permit to reset the writable level, and compactor permit to add a new immutable level. + pub fn freeze_writable( + &self, + _writer_permit: &mut WriterPermit, + _compactor_permit: &mut CompactorPermit, + ) { self.do_freeze_writable() } /// For testing, requires no permit - pub fn testing_freeze_writable(&self) { + pub fn freeze_writable_without_permit(&self) { self.do_freeze_writable() } - pub fn do_freeze_writable(&self) { + fn do_freeze_writable(&self) { let mut inner = self.data.lock().unwrap(); let new_writable = inner.writable.new_level(); @@ -187,8 +190,8 @@ impl LeveledMap { /// **Important**: Do not drop the compactor within this function when called /// under a state machine lock, as dropping may take ~250ms. pub fn replace_with_compacted(&self, compactor: &mut Compactor, db: DB) { - let upto = compactor.compacting_data.latest_level_index(); - let compactor_indexes = compactor.compacting_data.levels().indexes(); + let upto = compactor.immutable_data.latest_level_index(); + let compactor_indexes = compactor.immutable_data.levels().indexes(); self.with_inner(|inner| { let mut levels = inner.immutable.levels().clone(); @@ -211,8 +214,7 @@ impl LeveledMap { info!("replace_with_compacted: finished replacing the db"); } - pub(crate) fn new_compacting_data(&self) -> CompactingData { - let immutable = self.with_inner(|inner| inner.immutable.clone()); - CompactingData::new(immutable) + pub(crate) fn immutable_data(&self) -> Arc { + self.with_inner(|inner| inner.immutable.clone()) } } diff --git a/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs b/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs index b77442902fd0a..d8d9a4d7c8804 100644 --- a/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs +++ b/src/meta/raft-store/src/sm_v003/compact_immutable_levels_test.rs @@ -12,143 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Deref; - use databend_common_meta_types::node::Node; use databend_common_meta_types::raft_types::Membership; use databend_common_meta_types::raft_types::StoredMembership; use databend_common_meta_types::Endpoint; use databend_common_meta_types::UpsertKV; use futures_util::TryStreamExt; -use map_api::mvcc::ScopedSeqBoundedRange; use map_api::mvcc::ScopedSet; use maplit::btreemap; use openraft::testing::log_id; use pretty_assertions::assert_eq; -use seq_marked::SeqMarked; -use state_machine_api::ExpireKey; -use state_machine_api::KVMeta; use state_machine_api::UserKey; -use crate::leveled_store::leveled_map::compacting_data::CompactingData; use crate::leveled_store::leveled_map::LeveledMap; use crate::sm_v003::sm_v003::SMV003; -#[tokio::test] -async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> { - let lm = build_3_levels().await?; - - lm.testing_freeze_writable(); - let immutable_levels = lm.immutable_levels(); - - let compacted = { - let mut compacting_data = - CompactingData::new_from_levels_and_persisted(immutable_levels.clone(), None); - - compacting_data.compact_immutable_in_place().await?; - compacting_data.levels().clone() - }; - - let d = compacted.newest().unwrap().deref(); - - assert_eq!(compacted.newest_to_oldest().count(), 1); - assert_eq!( - d.last_membership(), - StoredMembership::new( - Some(log_id(3, 3, 3)), - Membership::new_with_defaults(vec![], []) - ) - ); - assert_eq!(d.last_applied(), Some(log_id(3, 3, 3))); - assert_eq!( - d.nodes(), - btreemap! {3=>Node::new("3", Endpoint::new("3", 3))} - ); - - let got = d - .range(UserKey::default().., u64::MAX) - .await? - .try_collect::>() - .await?; - assert_eq!(got, vec![ - // - (user_key("a"), SeqMarked::new_normal(1, (None, b("a0")))), - (user_key("b"), SeqMarked::new_tombstone(4)), - (user_key("c"), SeqMarked::new_tombstone(6)), - (user_key("d"), SeqMarked::new_normal(7, (None, b("d2")))), - (user_key("e"), SeqMarked::new_normal(6, (None, b("e1")))), - ]); - - let got = d - .range(ExpireKey::default().., u64::MAX) - .await? - .try_collect::>() - .await?; - assert_eq!(got, vec![]); - - Ok(()) -} - -#[tokio::test] -async fn test_compact_expire_index() -> anyhow::Result<()> { - let sm = build_sm_with_expire().await?; - - let compacted = { - sm.levels().testing_freeze_writable(); - let mut compactor = sm.acquire_compactor().await; - compactor.compact_immutable_in_place().await?; - compactor.immutable_levels() - }; - - let d = compacted.newest().unwrap().deref(); - - let got = d - .range(UserKey::default().., u64::MAX) - .await? - .try_collect::>() - .await?; - assert_eq!(got, vec![ - // - ( - user_key("a"), - SeqMarked::new_normal(4, (Some(KVMeta::new_expires_at(15)), b("a1"))) - ), - ( - user_key("b"), - SeqMarked::new_normal(2, (Some(KVMeta::new_expires_at(5)), b("b0"))) - ), - ( - user_key("c"), - SeqMarked::new_normal(3, (Some(KVMeta::new_expires_at(20)), b("c0"))) - ), - ]); - - let got = d - .range(ExpireKey::default().., u64::MAX) - .await? - .try_collect::>() - .await?; - assert_eq!(got, vec![ - // - (ExpireKey::new(5_000, 2), SeqMarked::new_normal(2, s("b"))), - (ExpireKey::new(10_000, 1), SeqMarked::new_tombstone(4)), - (ExpireKey::new(15_000, 4), SeqMarked::new_normal(4, s("a"))), - (ExpireKey::new(20_000, 3), SeqMarked::new_normal(3, s("c"))), - ]); - - Ok(()) -} - #[tokio::test] async fn test_compact_3_level() -> anyhow::Result<()> { let lm = build_3_levels().await?; println!("{:#?}", lm); - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); - let compacting_data = lm.new_compacting_data(); + let immutable_data = lm.immutable_data(); - let (sys_data, strm) = compacting_data.compact_into_stream().await?; + let (sys_data, strm) = immutable_data.compact_into_stream().await?; assert_eq!( r#"{"last_applied":{"leader_id":{"term":3,"node_id":3},"index":3},"last_membership":{"log_id":{"leader_id":{"term":3,"node_id":3},"index":3},"membership":{"configs":[],"nodes":{}}},"nodes":{"3":{"name":"3","endpoint":{"addr":"3","port":3},"grpc_api_advertise_address":null}},"sequence":7,"data_seq":2}"#, serde_json::to_string(&sys_data).unwrap() @@ -171,9 +59,9 @@ async fn test_compact_3_level() -> anyhow::Result<()> { #[tokio::test] async fn test_export_2_level_with_meta() -> anyhow::Result<()> { let sm = build_sm_with_expire().await?; - sm.levels().testing_freeze_writable(); + sm.leveled_map().freeze_writable_without_permit(); - let mut compactor = sm.acquire_compactor().await; + let mut compactor = sm.acquire_compactor("").await; let (sys_data, strm) = compactor.compact_into_stream().await?; let got = strm @@ -224,7 +112,7 @@ pub(crate) async fn build_3_levels() -> anyhow::Result { view.commit().await?; - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); lm.with_sys_data(|sd| { *sd.last_membership_mut() = StoredMembership::new( @@ -242,7 +130,7 @@ pub(crate) async fn build_3_levels() -> anyhow::Result { view.set(user_key("e"), Some((None, b("e1")))); view.commit().await?; - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); lm.with_sys_data(|sd| { *sd.last_membership_mut() = StoredMembership::new( @@ -270,7 +158,7 @@ pub(crate) async fn build_3_levels() -> anyhow::Result { /// l1 | a₄ c₃ | 10,1₄ -> ø 15,4₄ -> a 20,3₃ -> c /// ------------------------------------------------------------ /// l0 | a₁ b₂ | 5,2₂ -> b 10,1₁ -> a -async fn build_sm_with_expire() -> anyhow::Result { +pub(crate) async fn build_sm_with_expire() -> anyhow::Result { let mut sm = SMV003::default(); let mut a = sm.new_applier().await; @@ -281,7 +169,7 @@ async fn build_sm_with_expire() -> anyhow::Result { a.commit().await?; - sm.map_mut().testing_freeze_writable(); + sm.map_mut().freeze_writable_without_permit(); let mut a = sm.new_applier().await; a.upsert_kv(&UpsertKV::update("c", b"c0").with_expire_sec(20)) @@ -293,10 +181,6 @@ async fn build_sm_with_expire() -> anyhow::Result { Ok(sm) } -fn s(x: impl ToString) -> String { - x.to_string() -} - fn b(x: impl ToString) -> Vec { x.to_string().as_bytes().to_vec() } diff --git a/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs b/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs index 136fcdc0e21ca..c1ab89ae4d546 100644 --- a/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs +++ b/src/meta/raft-store/src/sm_v003/compact_with_db_test.rs @@ -96,7 +96,7 @@ async fn test_leveled_query_with_db() -> anyhow::Result<()> { async fn test_leveled_query_with_expire_index() -> anyhow::Result<()> { let (sm, _g) = build_sm_with_expire().await?; - let lm = sm.into_levels(); + let lm = sm.into_leveled_map(); assert_eq!(lm.curr_seq(), 4); assert_eq!( @@ -146,7 +146,7 @@ async fn test_leveled_query_with_expire_index() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn test_compact() -> anyhow::Result<()> { let (mut lm, _g) = build_3_levels().await?; - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); let temp_dir = tempfile::tempdir()?; let path = temp_dir.path(); @@ -203,10 +203,12 @@ async fn test_compact_expire_index() -> anyhow::Result<()> { let (sm, _g) = build_sm_with_expire().await?; { let mut permit = sm.new_writer_acquirer().acquire().await; - sm.levels().freeze_writable(&mut permit); + let mut compactor_permit = sm.new_compactor_acquirer("").acquire().await; + sm.leveled_map() + .freeze_writable(&mut permit, &mut compactor_permit); } - let mut lm = sm.into_levels(); + let mut lm = sm.into_leveled_map(); let temp_dir = tempfile::tempdir()?; let path = temp_dir.path(); @@ -269,11 +271,11 @@ async fn test_compact_expire_index() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn test_compact_output_3_level() -> anyhow::Result<()> { let (lm, _g) = build_3_levels().await?; - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); - let compacting_data = lm.new_compacting_data(); + let immutable_data = lm.immutable_data(); - let (sys_data, strm) = compacting_data.compact_into_stream().await?; + let (sys_data, strm) = immutable_data.compact_into_stream().await?; assert_eq!(sys_data.curr_seq(), 7); assert_eq!( @@ -328,7 +330,7 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> { view.set(user_key("d"), Some((None, b("d0")))); view.commit().await?; - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); lm.with_sys_data(|sd| { *sd.last_membership_mut() = StoredMembership::new( Some(log_id(2, 2, 2)), @@ -345,7 +347,7 @@ async fn build_3_levels() -> anyhow::Result<(LeveledMap, impl Drop)> { view.set(user_key("e"), Some((None, b("e1")))); view.commit().await?; - lm.testing_freeze_writable(); + lm.freeze_writable_without_permit(); lm.with_sys_data(|sd| { *sd.last_membership_mut() = StoredMembership::new( @@ -388,7 +390,7 @@ async fn build_sm_with_expire() -> anyhow::Result<(SMV003, impl Drop)> { .await?; a.commit().await?; - sm.map_mut().testing_freeze_writable(); + sm.map_mut().freeze_writable_without_permit(); let mut a = sm.new_applier().await; a.upsert_kv(&UpsertKV::update("c", b"c0").with_expire_sec(20)) diff --git a/src/meta/raft-store/src/sm_v003/mod.rs b/src/meta/raft-store/src/sm_v003/mod.rs index 79a232bb126ad..e2583bf341490 100644 --- a/src/meta/raft-store/src/sm_v003/mod.rs +++ b/src/meta/raft-store/src/sm_v003/mod.rs @@ -32,7 +32,7 @@ pub(crate) mod compact_immutable_levels_test; #[cfg(test)] mod compact_with_db_test; #[cfg(test)] -mod sm_v003_test; +pub(crate) mod sm_v003_test; pub use sm_v003::OnChange; pub use sm_v003::SMV003; diff --git a/src/meta/raft-store/src/sm_v003/sm_v003.rs b/src/meta/raft-store/src/sm_v003/sm_v003.rs index c4b35b8ce13c7..f8b7a3660b5ac 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003.rs @@ -50,7 +50,7 @@ use crate::sm_v003::sm_v003_kv_api::SMV003KVApi; pub type OnChange = Box, Option)) + Send + Sync>; pub struct SMV003 { - levels: LeveledMap, + leveled_map: LeveledMap, /// A semaphore that permits at most one compactor to run. pub(crate) compaction_semaphore: Arc, @@ -72,7 +72,7 @@ pub struct SMV003 { impl Default for SMV003 { fn default() -> Self { Self { - levels: Default::default(), + leveled_map: Default::default(), // Only one compactor is allowed a time. compaction_semaphore: Arc::new(Semaphore::new(1)), // Only one writer is allowed a time. @@ -86,7 +86,7 @@ impl Default for SMV003 { impl fmt::Debug for SMV003 { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("SMV003") - .field("levels", &self.levels) + .field("levels", &self.leveled_map) .field( "on_change_applied", &self @@ -148,11 +148,11 @@ impl StateMachineApi for ApplierData { impl SMV003 { /// Return a mutable reference to the map that stores app data. pub(in crate::sm_v003) fn map_mut(&mut self) -> &mut LeveledMap { - &mut self.levels + &mut self.leveled_map } pub fn to_state_machine_snapshot(&self) -> StateMachineSnapshot { - self.levels.to_state_machine_snapshot() + self.leveled_map.to_state_machine_snapshot() } pub fn kv_api(&self) -> SMV003KVApi { @@ -164,7 +164,7 @@ impl SMV003 { let sys_data = db.sys_data().clone(); let new_sm = SMV003 { - levels: LeveledMap { + leveled_map: LeveledMap { data: Arc::new(Mutex::new(LeveledMapData { writable: Default::default(), immutable: Arc::new(ImmutableData::new(Default::default(), Some(db))), @@ -176,17 +176,17 @@ impl SMV003 { on_change_applied: self.on_change_applied.clone(), }; - new_sm.levels.with_sys_data(|s| *s = sys_data); + new_sm.leveled_map.with_sys_data(|s| *s = sys_data); new_sm } pub fn get_snapshot(&self) -> Option { - self.levels.persisted() + self.leveled_map.persisted() } pub fn data(&self) -> &LeveledMap { - &self.levels + &self.leveled_map } pub async fn get_maybe_expired_kv(&self, key: &str) -> Result, io::Error> { @@ -199,9 +199,9 @@ impl SMV003 { pub(crate) async fn new_applier(&self) -> Applier { let permit = self.acquire_writer_permit().await; - let view = self.levels.to_view(); + let view = self.leveled_map.to_view(); let applier_data = ApplierData { - _permit: permit, + _permit: Mutex::new(permit), view, cleanup_start_time: self.cleanup_start_time.clone(), on_change_applied: self.get_on_change_applied(), @@ -217,7 +217,6 @@ impl SMV003 { pub async fn acquire_writer_permit(&self) -> WriterPermit { let acquirer = self.new_writer_acquirer(); let permit = acquirer.acquire().await; - debug!("WriterPermit acquired"); permit } @@ -251,19 +250,19 @@ impl SMV003 { } pub fn sys_data(&self) -> SysData { - self.levels.with_sys_data(|x| x.clone()) + self.leveled_map.with_sys_data(|x| x.clone()) } pub fn with_sys_data(&self, f: impl FnOnce(&mut SysData) -> T) -> T { - self.levels.with_sys_data(f) + self.leveled_map.with_sys_data(f) } - pub fn into_levels(self) -> LeveledMap { - self.levels + pub fn into_leveled_map(self) -> LeveledMap { + self.leveled_map } - pub fn levels(&self) -> &LeveledMap { - &self.levels + pub fn leveled_map(&self) -> &LeveledMap { + &self.leveled_map } pub fn levels_mut(&mut self) -> &mut LeveledMap { @@ -276,19 +275,19 @@ impl SMV003 { } /// Get a singleton `Compactor` instance specific to `self`. - pub async fn acquire_compactor(&self) -> Compactor { - let permit = self.new_compactor_acquirer().acquire().await; + pub async fn acquire_compactor(&self, name: impl ToString) -> Compactor { + let permit = self.new_compactor_acquirer(name).acquire().await; self.new_compactor(permit) } pub fn new_compactor(&self, permit: CompactorPermit) -> Compactor { Compactor { _permit: permit, - compacting_data: self.levels.new_compacting_data(), + immutable_data: self.leveled_map.immutable_data(), } } - pub fn new_compactor_acquirer(&self) -> CompactorAcquirer { - CompactorAcquirer::new(self.compaction_semaphore.clone()) + pub fn new_compactor_acquirer(&self, name: impl ToString) -> CompactorAcquirer { + CompactorAcquirer::new(self.compaction_semaphore.clone(), name) } } diff --git a/src/meta/raft-store/src/sm_v003/sm_v003_test.rs b/src/meta/raft-store/src/sm_v003/sm_v003_test.rs index d56cf87e1cfe9..b87b7870bf6e9 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003_test.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003_test.rs @@ -97,7 +97,7 @@ async fn test_two_level_upsert_get_range() -> anyhow::Result<()> { a.upsert_kv(&UpsertKV::update("c", b"c0")).await?; a.commit().await?; - sm.map_mut().testing_freeze_writable(); + sm.map_mut().freeze_writable_without_permit(); let mut a = sm.new_applier().await; // internal_seq = 3 @@ -180,7 +180,7 @@ async fn build_sm_with_expire() -> anyhow::Result { a.commit().await?; - sm.map_mut().testing_freeze_writable(); + sm.map_mut().freeze_writable_without_permit(); let mut a = sm.new_applier().await; a.upsert_kv(&UpsertKV::update("c", b"c0").with_expire_sec(20)) diff --git a/src/meta/service/src/store/raft_state_machine_impl.rs b/src/meta/service/src/store/raft_state_machine_impl.rs index 8c12849ffa780..ed6e04d75b463 100644 --- a/src/meta/service/src/store/raft_state_machine_impl.rs +++ b/src/meta/service/src/store/raft_state_machine_impl.rs @@ -128,7 +128,7 @@ impl RaftStateMachine for RaftStore { info!(id = self.id; "get snapshot start"); let r = self.state_machine(); - let db = r.levels().persisted(); + let db = r.leveled_map().persisted(); let snapshot = db.map(|x| Snapshot { meta: x.snapshot_meta().clone(), diff --git a/src/meta/service/src/store/store_inner.rs b/src/meta/service/src/store/store_inner.rs index 664f8c5b75282..c363b306f4933 100644 --- a/src/meta/service/src/store/store_inner.rs +++ b/src/meta/service/src/store/store_inner.rs @@ -189,15 +189,18 @@ impl RaftStoreInner { let _guard = SnapshotBuilding::guard(); + let mut compactor_permit = self + .new_compactor_acquirer("build_snapshot") + .acquire() + .await; { let mut writer_permit = self.state_machine().acquire_writer_permit().await; self.state_machine() - .levels() - .freeze_writable(&mut writer_permit); + .leveled_map() + .freeze_writable(&mut writer_permit, &mut compactor_permit); } - let permit = self.new_compactor_acquirer().acquire().await; - let mut compactor = self.state_machine().new_compactor(permit); + let mut compactor = self.state_machine().new_compactor(compactor_permit); info!("do_build_snapshot compactor created: {:?}", compactor); @@ -281,7 +284,7 @@ impl RaftStoreInner { { self.state_machine() - .levels() + .leveled_map() .replace_with_compacted(&mut compactor, db.clone()); info!("do_build_snapshot-replace_with_compacted returned"); } @@ -313,7 +316,7 @@ impl RaftStoreInner { /// It returns None if there is no snapshot or there is an error parsing snapshot meta or id. pub(crate) async fn try_get_snapshot_key_count(&self) -> Option { let sm = self.state_machine(); - let db = sm.levels().persisted()?; + let db = sm.leveled_map().persisted()?; Some(db.stat().key_num) } @@ -326,7 +329,7 @@ impl RaftStoreInner { /// Returns an empty map if no snapshot exists. pub(crate) async fn get_snapshot_key_space_stat(&self) -> BTreeMap { let sm = self.state_machine(); - let Some(db) = sm.levels().persisted() else { + let Some(db) = sm.leveled_map().persisted() else { return Default::default(); }; db.sys_data().key_counts().clone() @@ -335,7 +338,7 @@ impl RaftStoreInner { /// Get the statistics of the snapshot database. pub(crate) async fn get_snapshot_db_stat(&self) -> DBStat { let sm = self.state_machine(); - let Some(db) = sm.levels().persisted() else { + let Some(db) = sm.leveled_map().persisted() else { return Default::default(); }; db.db_stat() @@ -404,7 +407,7 @@ impl RaftStoreInner { Ok(line) } - let permit = self.new_compactor_acquirer().acquire().await; + let permit = self.new_compactor_acquirer("export").acquire().await; let mut dump = { let log = self.log.read().await; @@ -533,8 +536,8 @@ impl RaftStoreInner { self.state_machine.lock().unwrap().clone() } - fn new_compactor_acquirer(&self) -> CompactorAcquirer { + fn new_compactor_acquirer(&self, name: impl ToString) -> CompactorAcquirer { let sm = self.state_machine(); - sm.new_compactor_acquirer() + sm.new_compactor_acquirer(name) } }