Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support append-only mode in time-series memtable #3540

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/mito2/benches/memtable_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn write_rows(c: &mut Criterion) {
});
});
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None);
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
let kvs =
memtable_util::build_key_values(&metadata, "hello".to_string(), 42, &timestamps, 1);
b.iter(|| {
Expand Down Expand Up @@ -83,7 +83,7 @@ fn full_scan(c: &mut Criterion) {
});
});
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None);
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}
Expand Down Expand Up @@ -121,7 +121,7 @@ fn filter_1_host(c: &mut Criterion) {
});
});
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None);
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true);
for kvs in generator.iter() {
memtable.write(&kvs).unwrap();
}
Expand Down
61 changes: 48 additions & 13 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder {
metadata.clone(),
id,
self.write_buffer_manager.clone(),
true, // todo(hl): set according to region option
))
}
}
Expand All @@ -84,13 +85,15 @@ pub struct TimeSeriesMemtable {
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
dedup: bool,
}

impl TimeSeriesMemtable {
pub fn new(
region_metadata: RegionMetadataRef,
id: MemtableId,
write_buffer_manager: Option<WriteBufferManagerRef>,
dedup: bool,
) -> Self {
let row_codec = Arc::new(McmpRowCodec::new(
region_metadata
Expand All @@ -107,6 +110,7 @@ impl TimeSeriesMemtable {
alloc_tracker: AllocTracker::new(write_buffer_manager),
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
dedup,
}
}

Expand Down Expand Up @@ -232,7 +236,7 @@ impl Memtable for TimeSeriesMemtable {
.collect()
};

let iter = self.series_set.iter_series(projection, filters);
let iter = self.series_set.iter_series(projection, filters, self.dedup);
Ok(Box::new(iter))
}

Expand Down Expand Up @@ -277,6 +281,7 @@ impl Memtable for TimeSeriesMemtable {
metadata.clone(),
id,
self.alloc_tracker.write_buffer_manager(),
self.dedup,
))
}
}
Expand Down Expand Up @@ -336,7 +341,12 @@ impl SeriesSet {
}

/// Iterates all series in [SeriesSet].
fn iter_series(&self, projection: HashSet<ColumnId>, predicate: Option<Predicate>) -> Iter {
fn iter_series(
&self,
projection: HashSet<ColumnId>,
predicate: Option<Predicate>,
dedup: bool,
) -> Iter {
let primary_key_schema = primary_key_schema(&self.region_metadata);
let primary_key_datatypes = self
.region_metadata
Expand All @@ -352,6 +362,7 @@ impl SeriesSet {
primary_key_schema,
primary_key_datatypes,
self.codec.clone(),
dedup,
)
}
}
Expand Down Expand Up @@ -398,10 +409,12 @@ struct Iter {
pk_schema: arrow::datatypes::SchemaRef,
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<McmpRowCodec>,
dedup: bool,
metrics: Metrics,
}

impl Iter {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
metadata: RegionMetadataRef,
series: Arc<SeriesRwLockMap>,
Expand All @@ -410,6 +423,7 @@ impl Iter {
pk_schema: arrow::datatypes::SchemaRef,
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<McmpRowCodec>,
dedup: bool,
) -> Self {
let simple_filters = predicate
.map(|p| {
Expand All @@ -428,6 +442,7 @@ impl Iter {
pk_schema,
pk_datatypes,
codec,
dedup,
metrics: Metrics::default(),
}
}
Expand Down Expand Up @@ -484,8 +499,9 @@ impl Iterator for Iter {
self.last_key = Some(primary_key.clone());

let values = series.compact(&self.metadata);
let batch =
values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection));
let batch = values.and_then(|v| {
v.to_batch(primary_key, &self.metadata, &self.projection, self.dedup)
});

// Update metrics.
self.metrics.num_batches += 1;
Expand Down Expand Up @@ -703,6 +719,7 @@ impl Values {
primary_key: &[u8],
metadata: &RegionMetadataRef,
projection: &HashSet<ColumnId>,
dedup: bool,
) -> Result<Batch> {
let builder = BatchBuilder::with_required_columns(
primary_key.to_vec(),
Expand All @@ -723,7 +740,7 @@ impl Values {
.collect();

let mut batch = builder.with_fields(fields).build()?;
batch.sort_and_dedup()?;
batch.sort(dedup)?;
Ok(batch)
}

Expand Down Expand Up @@ -796,7 +813,7 @@ impl From<ValueBuilder> for Values {

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
Expand Down Expand Up @@ -983,7 +1000,12 @@ mod tests {
};

let batch = values
.to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect())
.to_batch(
b"test",
&schema,
&[0, 1, 2, 3, 4].into_iter().collect(),
true,
)
.unwrap();
check_value(
&batch,
Expand Down Expand Up @@ -1147,18 +1169,29 @@ mod tests {
#[test]
fn test_memtable() {
common_telemetry::init_default_ut_logging();
check_memtable_dedup(true);
check_memtable_dedup(false);
}

fn check_memtable_dedup(dedup: bool) {
let schema = schema_for_test();
let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
let memtable = TimeSeriesMemtable::new(schema, 42, None);
let memtable = TimeSeriesMemtable::new(schema, 42, None, dedup);
memtable.write(&kvs).unwrap();
memtable.write(&kvs).unwrap();

let expected_ts = kvs
let mut expected_ts: HashMap<i64, usize> = HashMap::new();
for ts in kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<HashSet<_>>();
{
*expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
}

let iter = memtable.iter(None, None).unwrap();
let read = iter
let mut read = HashMap::new();

for ts in iter
.flat_map(|batch| {
batch
.unwrap()
Expand All @@ -1171,7 +1204,9 @@ mod tests {
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<HashSet<_>>();
{
*read.entry(ts).or_default() += 1;
}
assert_eq!(expected_ts, read);

let stats = memtable.stats();
Expand All @@ -1190,7 +1225,7 @@ mod tests {
common_telemetry::init_default_ut_logging();
let schema = schema_for_test();
let kvs = build_key_values(&schema, "hello".to_string(), 42, 100);
let memtable = TimeSeriesMemtable::new(schema, 42, None);
let memtable = TimeSeriesMemtable::new(schema, 42, None, true);
memtable.write(&kvs).unwrap();

let iter = memtable.iter(Some(&[3]), None).unwrap();
Expand Down
92 changes: 66 additions & 26 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,13 @@ impl Batch {
Ok(())
}

/// Sorts and dedup rows in the batch.
/// Sorts rows in the batch. If `dedup` is true, it also removes
/// duplicated rows according to primary keys.
///
/// It orders rows by timestamp, sequence desc. When deduplicating, it only
/// keeps the latest row for the same timestamp. It doesn't consider op type
/// as sequence should already provide uniqueness for a row.
pub fn sort_and_dedup(&mut self) -> Result<()> {
pub fn sort(&mut self, dedup: bool) -> Result<()> {
// If building a converter each time is costly, we may allow passing a
// converter.
let converter = RowConverter::new(vec![
Expand All @@ -362,14 +363,16 @@ impl Batch {
let mut to_sort: Vec<_> = rows.iter().enumerate().collect();
to_sort.sort_unstable_by(|left, right| left.1.cmp(&right.1));

// Dedup by timestamps.
to_sort.dedup_by(|left, right| {
debug_assert_eq!(18, left.1.as_ref().len());
debug_assert_eq!(18, right.1.as_ref().len());
let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
// We only compare the timestamp part and ignore sequence.
left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
});
if dedup {
// Dedup by timestamps.
to_sort.dedup_by(|left, right| {
debug_assert_eq!(18, left.1.as_ref().len());
debug_assert_eq!(18, right.1.as_ref().len());
let (left_key, right_key) = (left.1.as_ref(), right.1.as_ref());
// We only compare the timestamp part and ignore sequence.
left_key[..TIMESTAMP_KEY_LEN] == right_key[..TIMESTAMP_KEY_LEN]
});
}

let indices = UInt32Vector::from_iter_values(to_sort.iter().map(|v| v.0 as u32));
self.take_in_place(&indices)
Expand Down Expand Up @@ -983,7 +986,7 @@ mod tests {

#[test]
fn test_sort_and_dedup() {
let mut batch = new_batch(
let original = new_batch(
&[2, 3, 1, 4, 5, 2],
&[1, 2, 3, 4, 5, 6],
&[
Expand All @@ -996,30 +999,67 @@ mod tests {
],
&[21, 22, 23, 24, 25, 26],
);
batch.sort_and_dedup().unwrap();

let mut batch = original.clone();
batch.sort(true).unwrap();
// It should only keep one timestamp 2.
let expect = new_batch(
&[1, 2, 3, 4, 5],
&[3, 6, 2, 4, 5],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[23, 26, 22, 24, 25],
assert_eq!(
new_batch(
&[1, 2, 3, 4, 5],
&[3, 6, 2, 4, 5],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[23, 26, 22, 24, 25],
),
batch
);
assert_eq!(expect, batch);

let mut batch = new_batch(
let mut batch = original.clone();
batch.sort(false).unwrap();

// It should keep both rows with timestamp 2.
assert_eq!(
new_batch(
&[1, 2, 2, 3, 4, 5],
&[3, 6, 1, 2, 4, 5],
&[
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
OpType::Put,
],
&[23, 26, 21, 22, 24, 25],
),
batch
);

let original = new_batch(
&[2, 2, 1],
&[1, 6, 1],
&[OpType::Delete, OpType::Put, OpType::Put],
&[21, 22, 23],
);
batch.sort_and_dedup().unwrap();

let mut batch = original.clone();
batch.sort(true).unwrap();
let expect = new_batch(&[1, 2], &[1, 6], &[OpType::Put, OpType::Put], &[23, 22]);
assert_eq!(expect, batch);

let mut batch = original.clone();
batch.sort(false).unwrap();
let expect = new_batch(
&[1, 2, 2],
&[1, 6, 1],
&[OpType::Put, OpType::Put, OpType::Delete],
&[23, 22, 21],
);
assert_eq!(expect, batch);
}
}