Skip to content

Commit

Permalink
feat(mito): filters memtables by their time ranges (#2686)
Browse files Browse the repository at this point in the history
* feat: filter memtable by time range

* fix: incorrect time range returned by time series memtable

* test: test memtable pruning
  • Loading branch information
evenyag committed Nov 3, 2023
1 parent 0dca63b commit 395632c
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 3 deletions.
77 changes: 77 additions & 0 deletions src/mito2/src/engine/prune_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,80 @@ async fn test_prune_tag_and_field() {
)
.await;
}

/// Creates a time range `[start_sec, end_sec)`
fn time_range_expr(start_sec: i64, end_sec: i64) -> Expr {
Expr::from(
col("ts")
.gt_eq(lit(ScalarValue::TimestampMillisecond(
Some(start_sec * 1000),
None,
)))
.and(col("ts").lt(lit(ScalarValue::TimestampMillisecond(
Some(end_sec * 1000),
None,
)))),
)
}

#[tokio::test]
async fn test_prune_memtable() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);

engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

// 5 ~ 10 in SST
put_rows(
&engine,
region_id,
Rows {
schema: column_schemas.clone(),
rows: build_rows(5, 10),
},
)
.await;
flush_region(&engine, region_id, Some(5)).await;

// 20 ~ 30 in memtable
put_rows(
&engine,
region_id,
Rows {
schema: column_schemas.clone(),
rows: build_rows(20, 30),
},
)
.await;

let stream = engine
.handle_query(
region_id,
ScanRequest {
filters: vec![time_range_expr(0, 20)],
..Default::default()
},
)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
12 changes: 11 additions & 1 deletion src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl Memtable for TimeSeriesMemtable {
let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
MemtableStats {
estimated_bytes,
time_range: Some((max_timestamp, min_timestamp)),
time_range: Some((min_timestamp, max_timestamp)),
}
}
}
Expand Down Expand Up @@ -1047,6 +1047,16 @@ mod tests {
.map(|v| v.unwrap().0.value())
.collect::<HashSet<_>>();
assert_eq!(expected_ts, read);

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
assert_eq!(
Some((
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(99)
)),
stats.time_range()
);
}

#[test]
Expand Down
16 changes: 14 additions & 2 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,22 @@ impl ScanRegion {
}

let memtables = self.version.memtables.list_memtables();
// Skip empty memtables.
// Skip empty memtables and memtables out of time range.
let memtables: Vec<_> = memtables
.into_iter()
.filter(|mem| !mem.is_empty())
.filter(|mem| {
if mem.is_empty() {
return false;
}
let stats = mem.stats();
let Some((start, end)) = stats.time_range() else {
return true;
};

// The time range of the memtable is inclusive.
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
memtable_range.intersects(&time_range)
})
.collect();

debug!(
Expand Down

0 comments on commit 395632c

Please sign in to comment.