feat: merge tree dedup reader #3375

Merged · 7 commits · Feb 24, 2024

Changes from 4 commits
4 changes: 4 additions & 0 deletions src/mito2/src/memtable/merge_tree.rs
@@ -15,6 +15,7 @@
//! Memtable implementation based on a merge tree.

mod data;
mod dedup;
mod dict;
mod merger;
mod metrics;
@@ -59,13 +60,16 @@ pub struct MergeTreeConfig {
pub index_max_keys_per_shard: usize,
/// Number of rows to freeze a data part.
pub data_freeze_threshold: usize,
/// Whether to delete duplicate rows.
pub dedup: bool,
}

impl Default for MergeTreeConfig {
fn default() -> Self {
Self {
index_max_keys_per_shard: 8192,
data_freeze_threshold: 102400,
dedup: true,
}
}
}
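For illustration, the new option is toggled when building the config; a minimal sketch (not part of this diff, assuming the struct's fields stay public):

// Hypothetical usage of the `dedup` flag introduced above.
let config = MergeTreeConfig {
    dedup: false, // keep duplicate rows instead of retaining only the newest write
    ..MergeTreeConfig::default()
};
assert!(!config.dedup);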
174 changes: 117 additions & 57 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -63,11 +63,11 @@ pub(crate) struct DataBatchRange {

impl DataBatchRange {
pub(crate) fn len(&self) -> usize {
- (self.start..self.end).len()
+ self.end - self.start
}

pub(crate) fn is_empty(&self) -> bool {
- (self.start..self.end).is_empty()
+ self.len() == 0
}
}

@@ -163,6 +163,10 @@ impl<'a> DataBatch<'a> {
},
}
}

pub(crate) fn num_rows(&self) -> usize {
self.range.len()
}
}

/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
@@ -180,11 +184,13 @@ pub struct DataBuffer {
op_type_builder: UInt8VectorBuilder,
/// Builders for field columns.
field_builders: Vec<LazyMutableVectorBuilder>,

dedup: bool,
}

impl DataBuffer {
/// Creates a `DataBuffer` instance with given schema and capacity.
- pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize) -> Self {
+ pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize, dedup: bool) -> Self {
let ts_builder = metadata
.time_index_column()
.column_schema
@@ -209,6 +215,7 @@ impl DataBuffer {
sequence_builder,
op_type_builder,
field_builders,
dedup,
}
}

@@ -237,7 +244,13 @@ impl DataBuffer {
pk_weights: Option<&[u16]>,
replace_pk_index: bool,
) -> Result<DataPart> {
- let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None, replace_pk_index);
+ let encoder = DataPartEncoder::new(
+ &self.metadata,
+ pk_weights,
+ None,
+ replace_pk_index,
+ self.dedup,
+ );
let parts = encoder.write(self)?;
Ok(parts)
}
@@ -246,13 +259,12 @@ impl DataBuffer {
/// If pk_weights is present, yielded rows are sorted according to weights,
/// otherwise rows are sorted by `pk_index` values, which are then used as the weights.
pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
- // todo(hl): control whether to dedup while invoking `read`.
let batch = data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
pk_weights,
true,
- true,
+ self.dedup,
// replace_pk_index is always set to false since:
// - for DataBuffer in ShardBuilder, pk dict is not frozen
// - for DataBuffer in Shard, values in the pk_index column have already been replaced during `freeze`.
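As context for the comment above, this is how a caller drives the resulting reader, mirroring the loop used in this PR's tests (a sketch with error handling and batch consumption elided; `buffer` is a `DataBuffer`):

// Read back sorted (and, when `dedup` is set, deduplicated) rows.
let mut reader = buffer.read(Some(&[0])).unwrap(); // weights for pk_index 0
while reader.is_valid() {
    let batch = reader.current_data_batch(); // duplicates already removed if `dedup` is true
    // ... consume `batch` ...
    reader.next().unwrap();
}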
@@ -629,6 +641,7 @@ struct DataPartEncoder<'a> {
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
dedup: bool,
}

impl<'a> DataPartEncoder<'a> {
@@ -637,13 +650,15 @@ impl<'a> DataPartEncoder<'a> {
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
dedup: bool,
) -> DataPartEncoder<'a> {
let schema = memtable_schema_to_encoded_schema(metadata);
Self {
schema,
pk_weights,
row_group_size,
replace_pk_index,
dedup,
}
}

@@ -663,7 +678,7 @@ impl<'a> DataPartEncoder<'a> {
source,
self.pk_weights,
false,
- true,
+ self.dedup,
self.replace_pk_index,
)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
@@ -803,9 +818,9 @@ pub struct DataParts {
}

impl DataParts {
- pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize) -> Self {
+ pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize, dedup: bool) -> Self {
Self {
- active: DataBuffer::with_capacity(metadata, capacity),
+ active: DataBuffer::with_capacity(metadata, capacity, dedup),
frozen: Vec::new(),
}
}
@@ -868,6 +883,47 @@ impl DataPartsReader {
}
}

#[cfg(test)]
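/// Test helper: writes one row per timestamp under `pk_index`; sequence numbers
/// start at `sequence` and, as `check_data_buffer_dedup` below relies on, increase
/// by one per row.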
pub(crate) fn write_rows_to_buffer(
buffer: &mut DataBuffer,
schema: &RegionMetadataRef,
pk_index: u16,
ts: Vec<i64>,
v0: Vec<Option<f64>>,
sequence: u64,
) {
let kvs = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
schema,
"whatever".to_string(),
1,
ts.into_iter(),
v0.into_iter(),
sequence,
);

for kv in kvs.iter() {
buffer.write_row(pk_index, kv);
}
}

#[cfg(test)]
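/// Test helper: returns the batch's `pk_index` together with its `(timestamp, sequence)` pairs.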
pub(crate) fn extract_data_batch(batch: &DataBatch) -> (u16, Vec<(i64, u64)>) {
let rb = batch.slice_record_batch();
let ts = timestamp_array_to_i64_slice(rb.column(1));
let seq = rb
.column(2)
.as_any()
.downcast_ref::<UInt64Array>()
.unwrap()
.values();
let ts_and_seq = ts
.iter()
.zip(seq.iter())
.map(|(ts, seq)| (*ts, *seq))
.collect::<Vec<_>>();
(batch.pk_index(), ts_and_seq)
}

#[cfg(test)]
mod tests {
use datafusion::arrow::array::Float64Array;
@@ -876,7 +932,7 @@ mod tests {
use parquet::data_type::AsBytes;

use super::*;
- use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
+ use crate::test_util::memtable_util::metadata_for_test;

#[test]
fn test_lazy_mutable_vector_builder() {
@@ -900,7 +956,7 @@

fn check_test_data_buffer_to_record_batches(keep_data: bool) {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
@@ -968,10 +1024,50 @@ mod tests {
check_test_data_buffer_to_record_batches(false);
}

fn check_data_buffer_dedup(dedup: bool) {
let metadata = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
write_rows_to_buffer(
&mut buffer,
&metadata,
0,
vec![2, 3],
vec![Some(1.0), Some(2.0)],
0,
);
write_rows_to_buffer(
&mut buffer,
&metadata,
0,
vec![1, 2],
vec![Some(1.1), Some(2.1)],
2,
);

let mut reader = buffer.read(Some(&[0])).unwrap();
let mut res = vec![];
while reader.is_valid() {
let batch = reader.current_data_batch();
res.push(extract_data_batch(&batch));
reader.next().unwrap();
}
if dedup {
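// ts=2 was written twice; dedup keeps only the later write (sequence 3).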
assert_eq!(vec![(0, vec![(1, 2), (2, 3), (3, 1)])], res);
} else {
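// Without dedup both ts=2 rows remain, ordered by descending sequence.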
assert_eq!(vec![(0, vec![(1, 2), (2, 3), (2, 0), (3, 1)])], res);
}
}

#[test]
fn test_data_buffer_dedup() {
check_data_buffer_dedup(true);
check_data_buffer_dedup(false);
}
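The expectations above pin down the dedup rule: rows come out sorted by (pk_index, ts ascending, sequence descending), and deduplication keeps only the first row of each (pk_index, ts) group, i.e. the newest write. A standalone sketch of that rule (hypothetical helper, not the PR's implementation):

// Rows are (pk_index, ts, sequence), pre-sorted as described above.
fn dedup_sorted(rows: &[(u16, i64, u64)]) -> Vec<(u16, i64, u64)> {
    let mut out: Vec<(u16, i64, u64)> = Vec::new();
    for &row in rows {
        match out.last() {
            // Same (pk_index, ts) as the last kept row: an older duplicate, skip it.
            Some(&(pk, ts, _)) if pk == row.0 && ts == row.1 => {}
            _ => out.push(row),
        }
    }
    out
}

// dedup_sorted(&[(0, 1, 2), (0, 2, 3), (0, 2, 0), (0, 3, 1)])
// yields [(0, 1, 2), (0, 2, 3), (0, 3, 1)], matching the `dedup == true` case.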

#[test]
fn test_data_buffer_to_record_batches_with_dedup() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![2], vec![Some(1.1)], 2);
@@ -1026,7 +1122,7 @@ mod tests {
#[test]
fn test_data_buffer_to_record_batches_without_dedup() {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
@@ -1064,35 +1160,13 @@ mod tests {
);
}

- fn write_rows_to_buffer(
- buffer: &mut DataBuffer,
- schema: &RegionMetadataRef,
- pk_index: u16,
- ts: Vec<i64>,
- v0: Vec<Option<f64>>,
- sequence: u64,
- ) {
- let kvs = build_key_values_with_ts_seq_values(
- schema,
- "whatever".to_string(),
- 1,
- ts.into_iter(),
- v0.into_iter(),
- sequence,
- );
-
- for kv in kvs.iter() {
- buffer.write_row(pk_index, kv);
- }
- }

fn check_data_buffer_freeze(
pk_weights: Option<&[u16]>,
replace_pk_weights: bool,
expected: &[(u16, Vec<(i64, u64)>)],
) {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

// write rows with null values.
write_rows_to_buffer(
@@ -1113,21 +1187,7 @@ mod tests {
.unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
- let rb = batch.slice_record_batch();
- let ts = timestamp_array_to_i64_slice(rb.column(1));
- let sequence = rb
- .column(2)
- .as_any()
- .downcast_ref::<UInt64Array>()
- .unwrap()
- .values();
- let ts_and_seq = ts
- .iter()
- .zip(sequence.iter())
- .map(|(ts, seq)| (*ts, *seq))
- .collect::<Vec<_>>();
- res.push((batch.pk_index(), ts_and_seq));
+ res.push(extract_data_batch(&batch));
reader.next().unwrap();
}
assert_eq!(expected, res);
@@ -1163,7 +1223,7 @@ mod tests {
#[test]
fn test_encode_data_buffer() {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

// write rows with null values.
write_rows_to_buffer(
@@ -1181,7 +1241,7 @@ mod tests {

assert_eq!(4, buffer.num_rows());

- let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true);
+ let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true, true);
let encoded = match encoder.write(&mut buffer).unwrap() {
DataPart::Parquet(data) => data.data,
};
@@ -1228,7 +1288,7 @@ mod tests {

fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(
&mut buffer,
@@ -1268,7 +1328,7 @@ mod tests {
#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}
@@ -1294,7 +1354,7 @@ mod tests {

fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(
&mut buffer,
@@ -1323,7 +1383,7 @@ mod tests {
4,
);

- let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true);
+ let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true, true);
let encoded = encoder.write(&mut buffer).unwrap();

let mut iter = encoded.read().unwrap();