Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions src/common/base/src/base/drop_callback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

/// Call a callback when dropped.
pub struct DropCallback {
callback: Option<Box<dyn FnOnce() + Send + 'static>>,
}

impl fmt::Debug for DropCallback {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "DropCallback")
}
}

impl DropCallback {
pub fn new(callback: impl FnOnce() + Send + 'static) -> Self {
DropCallback {
callback: Some(Box::new(callback)),
}
}
}

impl Drop for DropCallback {
fn drop(&mut self) {
if let Some(callback) = self.callback.take() {
callback();
}
}
}

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use super::*;

#[test]
fn test_drop_callback() {
let called = Arc::new(AtomicBool::new(false));
let called_clone = called.clone();
{
let _drop_callback = DropCallback::new(move || {
called_clone.store(true, Ordering::SeqCst);
});
assert!(!called.load(Ordering::SeqCst));
}
assert!(called.load(Ordering::SeqCst));
}
}
2 changes: 2 additions & 0 deletions src/common/base/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod build_info;
mod dma;
mod drop_callback;
mod net;
mod ordered_float;
mod profiling;
Expand All @@ -37,6 +38,7 @@ pub use dma::dma_write_file_vectored;
pub use dma::Alignment;
pub use dma::DmaAllocator;
pub use dma::DmaWriteBuf;
pub use drop_callback::DropCallback;
pub use net::get_free_tcp_port;
pub use net::get_free_udp_port;
pub use ordered_float::OrderedFloat;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/raft-store/src/applier/applier_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use crate::sm_v003::OnChange;

pub(crate) struct ApplierData {
/// Hold a unique permit to serialize all apply operations to the state machine.
pub(crate) _permit: WriterPermit,
///
/// Wrapping it in a Mutex to make it `Sync` while the permit itself is only `Send`.
pub(crate) _permit: Mutex<WriterPermit>,

pub(crate) view: StateMachineView,

Expand Down
6 changes: 3 additions & 3 deletions src/meta/raft-store/src/leveled_store/db_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ impl DBBuilder {
lm: &mut LeveledMap,
make_snapshot_id: impl FnOnce(&SysData) -> String + Send,
) -> Result<DB, io::Error> {
lm.testing_freeze_writable();
lm.freeze_writable_without_permit();

let compacting_data = lm.new_compacting_data();
let (sys_data, strm) = compacting_data.compact_into_stream().await?;
let immutable_data = lm.immutable_data();
let (sys_data, strm) = immutable_data.compact_into_stream().await?;

self.append_kv_stream(strm).await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async fn test_db_scoped_seq_bounded_read() -> anyhow::Result<()> {

a.commit().await?;

sm.levels_mut().testing_freeze_writable();
sm.levels_mut().freeze_writable_without_permit();

let mut a = sm.new_applier().await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,80 +14,26 @@

use std::fmt;
use std::io;
use std::ops::Deref;
use std::sync::Arc;

use databend_common_meta_types::snapshot_db::DB;
use databend_common_meta_types::sys_data::SysData;
use futures_util::future;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use map_api::mvcc::ScopedSeqBoundedRange;
use map_api::IOResultStream;
use map_api::MapKV;
use rotbl::v001::SeqMarked;
use seq_marked::SeqMarked;
use state_machine_api::ExpireKey;
use state_machine_api::UserKey;
use stream_more::KMerge;
use stream_more::StreamMore;

use crate::leveled_store::immutable_data::ImmutableData;
use crate::leveled_store::immutable_levels::ImmutableLevels;
use crate::leveled_store::rotbl_codec::RotblCodec;
use crate::leveled_store::util;
use crate::utils::add_cooperative_yielding;

/// The data to compact.
///
/// Including several in-memory immutable levels and an optional persisted db.
#[derive(Debug)]
pub(crate) struct CompactingData {
pub(crate) immutable: Arc<ImmutableData>,
}

impl Deref for CompactingData {
type Target = Arc<ImmutableData>;

fn deref(&self) -> &Self::Target {
&self.immutable
}
}

impl CompactingData {
pub fn new(immutable: Arc<ImmutableData>) -> Self {
Self { immutable }
}

// Testing only
#[allow(dead_code)]
pub(crate) fn new_from_levels_and_persisted(
levels: ImmutableLevels,
persisted: Option<DB>,
) -> Self {
let immutable = ImmutableData::new(levels, persisted);
Self {
immutable: Arc::new(immutable),
}
}

/// Compact in-memory immutable levels(excluding on disk db)
/// into one level and keep tombstone record.
///
/// When compact mem levels, do not remove tombstone,
/// because tombstones are still required when compacting with the underlying db.
///
/// This is only used for test
pub async fn compact_immutable_in_place(&mut self) -> Result<(), io::Error> {
// TODO: test: after compaction in place, the data should be the same, the base_seq and newest_seq should be the same.
let immutable_levels = self.immutable.levels().clone();

let levels = immutable_levels.compact_all().await;
let immutable = ImmutableData::new(levels, self.immutable.persisted().cloned());
self.immutable = Arc::new(immutable);

Ok(())
}

impl ImmutableData {
/// Compacted all data into a stream.
///
/// Tombstones are removed because no more compact with lower levels.
Expand All @@ -97,6 +43,7 @@ impl CompactingData {
/// The stream Item is 2 items tuple of key, and value with seq.
///
/// The exported stream contains encoded `String` key and rotbl value [`SeqMarked`]
// TODO: mvcc snapshot_seq
pub async fn compact_into_stream(
&self,
) -> Result<(SysData, IOResultStream<(String, SeqMarked)>), io::Error> {
Expand All @@ -107,7 +54,7 @@ impl CompactingData {
)
}

let immutable_levels = self.immutable.levels();
let immutable_levels = self.levels();
let d = immutable_levels.newest().unwrap();

let sys_data = d.sys_data().clone();
Expand Down Expand Up @@ -141,7 +88,7 @@ impl CompactingData {
let mut kmerge = KMerge::by(util::rotbl_by_key_seq);
kmerge = kmerge.merge(strm);

if let Some(db) = self.immutable.persisted() {
if let Some(db) = self.persisted() {
let db_strm = db.inner_range();
kmerge = kmerge.merge(db_strm);
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta/raft-store/src/leveled_store/immutable_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use crate::leveled_store::map_api::MapKeyEncode;
use crate::leveled_store::value_convert::ValueConvert;
use crate::leveled_store::ScopedSeqBoundedRead;

mod compact_into_stream;

#[derive(Debug, Default, Clone)]
pub struct ImmutableData {
/// The last sequence of the immutable data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,24 @@ impl ImmutableLevels {

#[cfg(test)]
mod tests {
use std::ops::Deref;

use databend_common_meta_types::raft_types::Membership;
use databend_common_meta_types::raft_types::StoredMembership;
use futures_util::TryStreamExt;
use map_api::mvcc::ScopedSeqBoundedRange;
use openraft::testing::log_id;
use seq_marked::SeqMarked;
use state_machine_api::ExpireKey;
use state_machine_api::KVMeta;
use state_machine_api::UserKey;

use crate::sm_v003::compact_immutable_levels_test::build_3_levels;
use crate::sm_v003::compact_immutable_levels_test::build_sm_with_expire;

fn s(x: impl ToString) -> String {
x.to_string()
}

fn b(x: impl ToString) -> Vec<u8> {
x.to_string().as_bytes().to_vec()
Expand All @@ -82,7 +90,7 @@ mod tests {
async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> {
let lm = build_3_levels().await?;

lm.testing_freeze_writable();
lm.freeze_writable_without_permit();
let immutable_levels = lm.immutable_levels();

// Capture the original newest level's index before compaction
Expand Down Expand Up @@ -132,4 +140,55 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_compact_expire_index() -> anyhow::Result<()> {
let sm = build_sm_with_expire().await?;

let immutable_levels = {
sm.leveled_map().freeze_writable_without_permit();
let compactor = sm.acquire_compactor("").await;
let immutable_levels = compactor.immutable_levels();
immutable_levels.compact_all().await
};

let d = immutable_levels.newest().unwrap().deref();

let got = d
.range(UserKey::default().., u64::MAX)
.await?
.try_collect::<Vec<_>>()
.await?;

assert_eq!(got, vec![
//
(
user_key("a"),
SeqMarked::new_normal(4, (Some(KVMeta::new_expires_at(15)), b("a1")))
),
(
user_key("b"),
SeqMarked::new_normal(2, (Some(KVMeta::new_expires_at(5)), b("b0")))
),
(
user_key("c"),
SeqMarked::new_normal(3, (Some(KVMeta::new_expires_at(20)), b("c0")))
),
]);

let got = d
.range(ExpireKey::default().., u64::MAX)
.await?
.try_collect::<Vec<_>>()
.await?;
assert_eq!(got, vec![
//
(ExpireKey::new(5_000, 2), SeqMarked::new_normal(2, s("b"))),
(ExpireKey::new(10_000, 1), SeqMarked::new_tombstone(4)),
(ExpireKey::new(15_000, 4), SeqMarked::new_normal(4, s("a"))),
(ExpireKey::new(20_000, 3), SeqMarked::new_normal(3, s("c"))),
]);

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use crate::sm_v003::SMV003;
async fn test_blocking_wait_timeout() -> anyhow::Result<()> {
let lm = SMV003::default();

let _c = lm.acquire_compactor().await;
let _c = lm.acquire_compactor("").await;

let (tx, rx) = oneshot::channel();

let _ = timeout(std::time::Duration::from_secs(1), async {
let _got = lm.acquire_compactor().await;
let _got = lm.acquire_compactor("").await;
let _ = tx.send(true);
})
.await;
Expand All @@ -43,11 +43,11 @@ async fn test_blocking_wait_timeout() -> anyhow::Result<()> {
async fn test_blocking_wait_ok() -> anyhow::Result<()> {
let lm = SMV003::default();

let _c = lm.acquire_compactor().await;
let _c = lm.acquire_compactor("").await;

let (tx, rx) = oneshot::channel();
databend_common_base::runtime::spawn(async move {
let _got = lm.acquire_compactor().await;
let _got = lm.acquire_compactor("").await;
let _ = tx.send(true);
});

Expand Down
Loading