Skip to content

Commit

Permalink
feat: Implement dedup for the new memtable and expose the config (#3377)
Browse files Browse the repository at this point in the history
* fix: KeyValues num_fields() is incorrect

* chore: fix warnings

* feat: support dedup

* feat: allow using the new memtable

* feat: serde default for config

* fix: reset pk index after finishing a dict
  • Loading branch information
evenyag committed Feb 25, 2024
1 parent 606309f commit e481f07
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 146 deletions.
5 changes: 5 additions & 0 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};

use crate::error::Result;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// Default max running background job.
Expand Down Expand Up @@ -102,6 +103,9 @@ pub struct MitoConfig {

/// Inverted index configs.
pub inverted_index: InvertedIndexConfig,

/// Experimental memtable.
pub experimental_memtable: Option<MergeTreeConfig>,
}

impl Default for MitoConfig {
Expand All @@ -127,6 +131,7 @@ impl Default for MitoConfig {
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
experimental_memtable: None,
};

// Adjust buffer and cache size according to system memory if we can.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,11 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;

let region_stat = region.region_usage().await;
assert_eq!(region_stat.wal_usage, 0);
assert_eq!(region_stat.sst_usage, 2962);

// region total usage
assert_eq!(region_stat.disk_usage(), 4028);
// Some memtables may share items.
assert!(region_stat.disk_usage() >= 4028);
}

#[tokio::test]
Expand Down
48 changes: 41 additions & 7 deletions src/mito2/src/memtable/key_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl<'a> KeyValue<'a> {

/// Get number of field columns.
pub fn num_fields(&self) -> usize {
self.row.values.len() - self.helper.num_primary_key_column - 1
self.helper.indices.len() - self.helper.num_primary_key_column - 1
}

/// Get sequence.
Expand Down Expand Up @@ -261,7 +261,13 @@ mod tests {
}
}

fn check_key_values(kvs: &KeyValues, num_rows: usize, keys: &[i64], ts: i64, values: &[i64]) {
fn check_key_values(
kvs: &KeyValues,
num_rows: usize,
keys: &[Option<i64>],
ts: i64,
values: &[Option<i64>],
) {
assert_eq!(num_rows, kvs.num_rows());
let mut expect_seq = START_SEQ;
let expect_ts = ValueRef::Int64(ts);
Expand All @@ -273,10 +279,10 @@ mod tests {
assert_eq!(values.len(), kv.num_fields());

assert_eq!(expect_ts, kv.timestamp());
let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::Int64(*k)).collect();
let expect_keys: Vec<_> = keys.iter().map(|k| ValueRef::from(*k)).collect();
let actual_keys: Vec<_> = kv.primary_keys().collect();
assert_eq!(expect_keys, actual_keys);
let expect_values: Vec<_> = values.iter().map(|v| ValueRef::Int64(*v)).collect();
let expect_values: Vec<_> = values.iter().map(|v| ValueRef::from(*v)).collect();
let actual_values: Vec<_> = kv.fields().collect();
assert_eq!(expect_values, actual_values);
}
Expand Down Expand Up @@ -312,7 +318,7 @@ mod tests {
// KeyValues
// keys: [k0=2, k1=0]
// ts: 1,
check_key_values(&kvs, 3, &[2, 0], 1, &[]);
check_key_values(&kvs, 3, &[Some(2), Some(0)], 1, &[]);
}

#[test]
Expand All @@ -325,7 +331,7 @@ mod tests {
// KeyValues (note that v0 is in front of v1 in region schema)
// ts: 2,
// fields: [v0=1, v1=0]
check_key_values(&kvs, 3, &[], 2, &[1, 0]);
check_key_values(&kvs, 3, &[], 2, &[Some(1), Some(0)]);
}

#[test]
Expand All @@ -339,6 +345,34 @@ mod tests {
// keys: [k0=0, k1=3]
// ts: 2,
// fields: [v0=1, v1=4]
check_key_values(&kvs, 3, &[0, 3], 2, &[1, 4]);
check_key_values(&kvs, 3, &[Some(0), Some(3)], 2, &[Some(1), Some(4)]);
}

#[test]
fn test_sparse_field() {
    let metadata = new_region_metadata(2, 2);
    // Row layout written by the mutation: k0=0, v0=1, ts=2, k1=3.
    // Column v1 is absent from the mutation, so it must read back as null.
    let mutation = new_mutation(&["k0", "v0", "ts", "k1"], 3);
    let key_values = KeyValues::new(&metadata, mutation).unwrap();
    // Expected view per row: keys [k0=0, k1=3], ts=2, fields [v0=1, v1=null].
    check_key_values(&key_values, 3, &[Some(0), Some(3)], 2, &[Some(1), None]);
}

#[test]
fn test_sparse_tag_field() {
    let metadata = new_region_metadata(2, 2);
    // Row layout written by the mutation: k0=0, v0=1, ts=2.
    // Both the tag k1 and the field v1 are absent, so both must read back as null.
    let mutation = new_mutation(&["k0", "v0", "ts"], 3);
    let key_values = KeyValues::new(&metadata, mutation).unwrap();
    // Expected view per row: keys [k0=0, k1=null], ts=2, fields [v0=1, v1=null].
    check_key_values(&key_values, 3, &[Some(0), None], 2, &[Some(1), None]);
}
}
16 changes: 11 additions & 5 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::fmt;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
Expand All @@ -54,7 +55,8 @@ struct PkId {
}

/// Config for the merge tree memtable.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MergeTreeConfig {
/// Max keys in an index shard.
pub index_max_keys_per_shard: usize,
Expand Down Expand Up @@ -248,16 +250,19 @@ impl MergeTreeMemtable {
/// Builder to build a [MergeTreeMemtable].
#[derive(Debug, Default)]
pub struct MergeTreeMemtableBuilder {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: MergeTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
}

impl MergeTreeMemtableBuilder {
/// Creates a new builder with specific `write_buffer_manager`.
pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
pub fn new(
config: MergeTreeConfig,
write_buffer_manager: Option<WriteBufferManagerRef>,
) -> Self {
Self {
config,
write_buffer_manager,
config: MergeTreeConfig::default(),
}
}
}
Expand Down Expand Up @@ -420,7 +425,8 @@ mod tests {
memtable_util::metadata_with_primary_key(vec![], false)
};
// Try to build a memtable via the builder.
let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata);
let memtable =
MergeTreeMemtableBuilder::new(MergeTreeConfig::default(), None).build(1, &metadata);

let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
Expand Down
27 changes: 3 additions & 24 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,29 +883,6 @@ impl DataPartsReader {
}
}

#[cfg(test)]
pub(crate) fn write_rows_to_buffer(
    buffer: &mut DataBuffer,
    schema: &RegionMetadataRef,
    pk_index: u16,
    ts: Vec<i64>,
    v0: Vec<Option<f64>>,
    sequence: u64,
) {
    // Build key-values for a single fixed key ("whatever", pk value 1) carrying
    // the given timestamps and v0 values, then append every row to the buffer
    // under the supplied primary-key index.
    let key_values = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
        schema,
        "whatever".to_string(),
        1,
        ts.into_iter(),
        v0.into_iter(),
        sequence,
    );

    key_values
        .iter()
        .for_each(|kv| buffer.write_row(pk_index, kv));
}

#[cfg(test)]
mod tests {
use datafusion::arrow::array::Float64Array;
Expand All @@ -914,7 +891,9 @@ mod tests {
use parquet::data_type::AsBytes;

use super::*;
use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};
use crate::test_util::memtable_util::{
extract_data_batch, metadata_for_test, write_rows_to_buffer,
};

#[test]
fn test_lazy_mutable_vector_builder() {
Expand Down
85 changes: 35 additions & 50 deletions src/mito2/src/memtable/merge_tree/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,19 @@ use std::ops::Range;

use crate::error::Result;
use crate::memtable::merge_tree::data::DataBatch;
use crate::memtable::merge_tree::shard::DataBatchSource;
use crate::memtable::merge_tree::PkId;

pub trait DedupSource {
/// Returns whether current source is still valid.
fn is_valid(&self) -> bool;

/// Advances source to next data batch.
fn next(&mut self) -> Result<()>;

/// Returns current pk id.
/// # Panics
/// If source is not valid.
fn current_pk_id(&self) -> PkId;

/// Returns the current primary key bytes.
/// # Panics
/// If source is not valid.
fn current_key(&self) -> &[u8];

/// Returns the data part.
/// # Panics
/// If source is not valid.
fn current_data_batch(&self) -> DataBatch;
}

struct DedupReader<T> {
/// A reader that dedup sorted batches from a merger.
pub struct DedupReader<T> {
prev_batch_last_row: Option<(PkId, i64)>,
current_batch_range: Option<Range<usize>>,
inner: T,
}

impl<T: DedupSource> DedupReader<T> {
fn try_new(inner: T) -> Result<Self> {
impl<T: DataBatchSource> DedupReader<T> {
/// Creates a new dedup reader.
pub fn try_new(inner: T) -> Result<Self> {
let mut res = Self {
prev_batch_last_row: None,
current_batch_range: None,
Expand All @@ -57,24 +37,13 @@ impl<T: DedupSource> DedupReader<T> {
res.next()?;
Ok(res)
}
}

impl<T: DataBatchSource> DataBatchSource for DedupReader<T> {
fn is_valid(&self) -> bool {
self.current_batch_range.is_some()
}

/// Returns current encoded primary key.
/// # Panics
/// If inner reader is exhausted.
fn current_key(&self) -> &[u8] {
self.inner.current_key()
}

fn current_data_batch(&self) -> DataBatch {
let range = self.current_batch_range.as_ref().unwrap();
let data_batch = self.inner.current_data_batch();
data_batch.slice(range.start, range.len())
}

fn next(&mut self) -> Result<()> {
loop {
match &mut self.prev_batch_last_row {
Expand Down Expand Up @@ -122,40 +91,56 @@ impl<T: DedupSource> DedupReader<T> {
}
Ok(())
}

fn current_pk_id(&self) -> PkId {
self.inner.current_pk_id()
}

fn current_key(&self) -> Option<&[u8]> {
self.inner.current_key()
}

fn current_data_batch(&self) -> DataBatch {
let range = self.current_batch_range.as_ref().unwrap();
let data_batch = self.inner.current_data_batch();
data_batch.slice(range.start, range.len())
}
}

#[cfg(test)]
mod tests {
use store_api::metadata::RegionMetadataRef;

use super::*;
use crate::memtable::merge_tree::data::{
write_rows_to_buffer, DataBuffer, DataParts, DataPartsReader,
use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DataPartsReader};
use crate::test_util::memtable_util::{
extract_data_batch, metadata_for_test, write_rows_to_buffer,
};
use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};

impl DedupSource for DataPartsReader {
struct MockSource(DataPartsReader);

impl DataBatchSource for MockSource {
fn is_valid(&self) -> bool {
self.is_valid()
self.0.is_valid()
}

fn next(&mut self) -> Result<()> {
self.next()
self.0.next()
}

fn current_pk_id(&self) -> PkId {
PkId {
shard_id: 0,
pk_index: self.current_data_batch().pk_index(),
pk_index: self.0.current_data_batch().pk_index(),
}
}

fn current_key(&self) -> &[u8] {
b"abcf"
fn current_key(&self) -> Option<&[u8]> {
None
}

fn current_data_batch(&self) -> DataBatch {
self.current_data_batch()
self.0.current_data_batch()
}
}

Expand Down Expand Up @@ -194,7 +179,7 @@ mod tests {
let mut parts = DataParts::new(meta, 10, true).with_frozen(frozens);

let mut res = Vec::with_capacity(expected.len());
let mut reader = DedupReader::try_new(parts.read().unwrap()).unwrap();
let mut reader = DedupReader::try_new(MockSource(parts.read().unwrap())).unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
res.push(extract_data_batch(&batch));
Expand Down

0 comments on commit e481f07

Please sign in to comment.