Skip to content

Commit

Permalink
feat: allow using the new memtable
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Feb 25, 2024
1 parent 638cf29 commit 9452519
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 10 deletions.
5 changes: 5 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};

use crate::error::Result;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// Default max running background job.
Expand Down Expand Up @@ -102,6 +103,9 @@ pub struct MitoConfig {

/// Inverted index configs.
pub inverted_index: InvertedIndexConfig,

/// Experimental memtable.
pub experimental_memtable: Option<MergeTreeConfig>,
}

impl Default for MitoConfig {
Expand All @@ -127,6 +131,7 @@ impl Default for MitoConfig {
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
experimental_memtable: None,
};

// Adjust buffer and cache size according to system memory if we can.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,11 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;

let region_stat = region.region_usage().await;
assert_eq!(region_stat.wal_usage, 0);
assert_eq!(region_stat.sst_usage, 2962);

// region total usage
assert_eq!(region_stat.disk_usage(), 4028);
// Some memtables may share items.
assert!(region_stat.disk_usage() >= 4028);
}

#[tokio::test]
Expand Down
15 changes: 10 additions & 5 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::fmt;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
Expand All @@ -54,7 +55,7 @@ struct PkId {
}

/// Config for the merge tree memtable.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MergeTreeConfig {
/// Max keys in an index shard.
pub index_max_keys_per_shard: usize,
Expand Down Expand Up @@ -248,16 +249,19 @@ impl MergeTreeMemtable {
/// Builder to build a [MergeTreeMemtable].
#[derive(Debug, Default)]
pub struct MergeTreeMemtableBuilder {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: MergeTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl MergeTreeMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
pub fn new(
config: MergeTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
Self {
config,
write_buffer_manager,
config: MergeTreeConfig::default(),
}
}
}
Expand Down Expand Up @@ -420,7 +424,8 @@ mod tests {
memtable_util::metadata_with_primary_key(vec![], false)
};
// Try to build a memtable via the builder.
let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata);
let memtable =
MergeTreeMemtableBuilder::new(MergeTreeConfig::default(), None).build(1, &metadata);

let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
Expand Down
15 changes: 12 additions & 3 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::config::MitoConfig;
use crate::error::{InvalidRequestSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::manifest::action::RegionEdit;
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::MemtableBuilderRef;
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
Expand Down Expand Up @@ -323,6 +324,16 @@ impl<S: LogStore> WorkerStarter<S> {
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

let running = Arc::new(AtomicBool::new(true));
let memtable_builder = if let Some(config) = &self.config.experimental_memtable {
Arc::new(MergeTreeMemtableBuilder::new(
config.clone(),
Some(self.write_buffer_manager.clone()),
)) as _
} else {
Arc::new(TimeSeriesMemtableBuilder::new(Some(
self.write_buffer_manager.clone(),
))) as _
};
let mut worker_thread = RegionWorkerLoop {
id: self.id,
config: self.config,
Expand All @@ -333,9 +344,7 @@ impl<S: LogStore> WorkerStarter<S> {
wal: Wal::new(self.log_store),
object_store_manager: self.object_store_manager.clone(),
running: running.clone(),
memtable_builder: Arc::new(TimeSeriesMemtableBuilder::new(Some(
self.write_buffer_manager.clone(),
))),
memtable_builder,
scheduler: self.scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
flush_scheduler: FlushScheduler::new(self.scheduler.clone()),
Expand Down

0 comments on commit 9452519

Please sign in to comment.