From 09dc7935e29c0deb0c8d3a6417e2e3084d7e9316 Mon Sep 17 00:00:00 2001 From: ZuoTiJia Date: Fri, 22 Sep 2023 10:25:12 +0800 Subject: [PATCH] improve: add time_range exclude method * add data block exclude time range method --- common/models/src/predicate/domain.rs | 90 ++++++++++++++++- tskv/src/tsm/block.rs | 137 ++++++++++++++++++++++++++ tskv/src/tsm/tombstone.rs | 19 ++++ 3 files changed, 245 insertions(+), 1 deletion(-) diff --git a/common/models/src/predicate/domain.rs b/common/models/src/predicate/domain.rs index 197c856d0a..6f13fe1702 100644 --- a/common/models/src/predicate/domain.rs +++ b/common/models/src/predicate/domain.rs @@ -1,4 +1,4 @@ -use std::cmp::{self, Ordering}; +use std::cmp::{self, max, min, Ordering}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Display; use std::hash::Hash; @@ -54,6 +54,7 @@ impl From<(StdBound, StdBound)> for TimeRange { } impl TimeRange { + /// notice: when min_ts > max_ts, time_range is considered [`Self::none()`] pub fn new(min_ts: i64, max_ts: i64) -> Self { Self { min_ts, max_ts } } @@ -116,6 +117,34 @@ impl TimeRange { self.min_ts = self.min_ts.min(other.min_ts); self.max_ts = self.max_ts.max(other.max_ts); } + + pub fn exclude(&self, other: &TimeRange) -> (Option, Option) { + // other + // |__________| + // self + // |____________________| + let left = if self.min_ts < other.min_ts { + Some(TimeRange::new( + self.min_ts, + min(self.max_ts, other.min_ts - 1), + )) + } else { + None + }; + // other + // |___________| + // self + // |________________| + let right = if self.max_ts > other.max_ts { + Some(TimeRange::new( + max(self.min_ts, other.max_ts + 1), + self.max_ts, + )) + } else { + None + }; + (left, right) + } } impl From<(Timestamp, Timestamp)> for TimeRange { @@ -324,6 +353,27 @@ impl TimeRanges { pub fn max_time_range(&self) -> TimeRange { TimeRange::new(self.min_ts, self.max_ts) } + + pub fn exclude(&self, time_range: &TimeRange) -> TimeRanges { + let trs = self + .inner + .iter() + .flat_map(|tr| { + let tr_tuple = tr.exclude(time_range); + [tr_tuple.0, tr_tuple.1] + }) + .collect::>>() + .into_iter() + .flatten() + .collect::>(); + TimeRanges::new(trs) + } + + pub fn exclude_time_ranges(&self, other: &[&TimeRange]) -> TimeRanges { + let mut res = self.clone(); + other.iter().for_each(|o| res = res.exclude(o)); + res + } } impl AsRef<[TimeRange]> for TimeRanges { @@ -1710,6 +1760,44 @@ mod tests { ); } + #[test] + fn test_time_range_exclude() { + let tr = TimeRange::new(1, 5); + let exclude = tr.exclude(&TimeRange::new(2, 6)); + assert_eq!(exclude.0.unwrap(), TimeRange::new(1, 1)); + + let tr = TimeRange::new(5, 10); + let exclude = tr.exclude(&TimeRange::new(1, 7)); + assert_eq!(exclude.1.unwrap(), TimeRange::new(8, 10)); + + let tr = TimeRange::new(3, 4); + let exclude = tr.exclude(&TimeRange::new(1, 5)); + assert!(exclude.0.is_none()); + assert!(exclude.1.is_none()); + + let tr = TimeRange::new(1, 5); + let exclude = tr.exclude(&TimeRange::new(3, 4)); + assert_eq!(exclude.0.unwrap(), TimeRange::new(1, 2)); + assert_eq!(exclude.1.unwrap(), TimeRange::new(5, 5)); + } + + #[test] + fn test_time_ranges_exclude() { + let trs = vec![ + TimeRange::new(1, 2), + TimeRange::new(4, 5), + TimeRange::new(7, 9), + ]; + let trs = TimeRanges::new(trs); + let exclude_ranges = trs.exclude(&TimeRange::new(3, 8)); + let expected = TimeRanges::new(vec![(1, 2).into(), (9, 9).into()]); + assert_eq!(expected, exclude_ranges); + + let exclude_ranges = + trs.exclude_time_ranges(&[&TimeRange::new(3, 4), &TimeRange::new(5, 8)]); + assert_eq!(expected, exclude_ranges); + } + #[test] fn test_of_ranges() { let f1 = Range::lt(&DataType::Float64, &ScalarValue::Float64(Some(-1000000.1))); diff --git a/tskv/src/tsm/block.rs b/tskv/src/tsm/block.rs index 9a9734ba7d..afeb0bbcdf 100644 --- a/tskv/src/tsm/block.rs +++ b/tskv/src/tsm/block.rs @@ -544,6 +544,89 @@ impl DataBlock { } } + pub fn exclude_time_ranges(&self, exclude_time_ranges: &[&TimeRange]) -> Self { + let time_ranges = match self.time_range() { + Some((min, max)) => TimeRanges::new(vec![TimeRange::new(min, max)]), + None => return self.clone(), + }; + + let indexs = time_ranges + .exclude_time_ranges(exclude_time_ranges) + .time_ranges() + .iter() + .filter_map(|tr| self.index_range(tr)) + .collect::>(); + + match self { + DataBlock::U64 { ts, val, enc } => { + let mut new_ts = vec![]; + let mut new_val = vec![]; + indexs.into_iter().for_each(|(min, max)| { + new_ts.extend_from_slice(&ts[min..=max]); + new_val.extend_from_slice(&val[min..=max]) + }); + DataBlock::U64 { + ts: new_ts, + val: new_val, + enc: *enc, + } + } + DataBlock::I64 { ts, val, enc } => { + let mut new_ts = vec![]; + let mut new_val = vec![]; + indexs.into_iter().for_each(|(min, max)| { + new_ts.extend_from_slice(&ts[min..=max]); + new_val.extend_from_slice(&val[min..=max]) + }); + DataBlock::I64 { + ts: new_ts, + val: new_val, + enc: *enc, + } + } + DataBlock::Str { ts, val, enc } => { + let mut new_ts = vec![]; + let mut new_val = vec![]; + indexs.into_iter().for_each(|(min, max)| { + new_ts.extend_from_slice(&ts[min..=max]); + new_val.extend_from_slice(&val[min..=max]) + }); + DataBlock::Str { + ts: new_ts, + val: new_val, + enc: *enc, + } + } + DataBlock::F64 { ts, val, enc } => { + let mut new_ts = vec![]; + let mut new_val = vec![]; + indexs.into_iter().for_each(|(min, max)| { + new_ts.extend_from_slice(&ts[min..=max]); + new_val.extend_from_slice(&val[min..=max]) + }); + DataBlock::F64 { + ts: new_ts, + val: new_val, + enc: *enc, + } + } + DataBlock::Bool { ts, val, enc } => { + let mut new_ts = vec![]; + let mut new_val = vec![]; + indexs.into_iter().for_each(|(min, max)| { + new_ts.extend_from_slice(&ts[min..=max]); + new_val.extend_from_slice(&val[min..=max]) + }); + DataBlock::Bool { + ts: new_ts, + val: new_val, + enc: *enc, + } + } + } + } + + // left close, right open [min, max] pub fn index_range(&self, time_range: &TimeRange) -> Option<(usize, usize)> { if self.is_empty() { return None; @@ -1182,4 +1265,58 @@ pub mod test { } ); } + + #[test] + fn test_data_block_exclude_3() { + let mut blk = DataBlock::U64 { + ts: vec![0, 1, 2, 3], + val: vec![10, 11, 12, 13], + enc: DataBlockEncoding::default(), + }; + let exclude_time_range: TimeRange = (-2, 0).into(); + let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]); + blk.exclude(&exclude_time_range); + + assert_eq!(blk, new_blk); + + let mut blk = DataBlock::U64 { + ts: vec![0, 1, 2, 3], + val: vec![10, 11, 12, 13], + enc: DataBlockEncoding::default(), + }; + let exclude_time_range: TimeRange = (3, 5).into(); + let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]); + blk.exclude(&exclude_time_range); + assert_eq!(blk, new_blk); + + let mut blk = DataBlock::U64 { + ts: vec![0, 1, 2, 3], + val: vec![10, 11, 12, 13], + enc: DataBlockEncoding::default(), + }; + let exclude_time_range: TimeRange = (-3, -1).into(); + let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]); + blk.exclude(&exclude_time_range); + assert_eq!(blk, new_blk,); + + let mut blk = DataBlock::U64 { + ts: vec![0, 1, 2, 3], + val: vec![10, 11, 12, 13], + enc: DataBlockEncoding::default(), + }; + let exclude_time_range: TimeRange = (5, 7).into(); + let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]); + blk.exclude(&exclude_time_range); + assert_eq!(blk, new_blk); + + let mut blk = DataBlock::U64 { + ts: vec![0, 1, 2, 3, 7, 8, 9, 10], + val: vec![10, 11, 12, 13, 17, 18, 19, 20], + enc: DataBlockEncoding::default(), + }; + let exclude_time_range: TimeRange = (5, 6).into(); + let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]); + blk.exclude(&exclude_time_range); + assert_eq!(blk, new_blk); + } } diff --git a/tskv/src/tsm/tombstone.rs b/tskv/src/tsm/tombstone.rs index 119b7b97dc..ad6fdb39cf 100644 --- a/tskv/src/tsm/tombstone.rs +++ b/tskv/src/tsm/tombstone.rs @@ -202,6 +202,25 @@ impl TsmTombstone { } } } + + // if no exclude, return None + pub fn data_block_exclude_tombstones_new( + &self, + field_id: FieldId, + data_block: &DataBlock, + ) -> Option { + let block_tr: TimeRange = data_block.time_range()?.into(); + let tombstone = self + .tombstones + .get(&field_id)? + .iter() + .filter(|tr| tr.overlaps(&block_tr)) + .collect::>(); + if tombstone.is_empty() { + return None; + } + Some(data_block.exclude_time_ranges(tombstone.as_slice())) + } } #[cfg(test)]