Skip to content

Commit

Permalink
improve: add time_range exclude method
Browse files Browse the repository at this point in the history
* add data block exclude time range method
  • Loading branch information
ZuoTiJia committed Oct 13, 2023
1 parent abdbcb1 commit dce5157
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 1 deletion.
90 changes: 89 additions & 1 deletion common/models/src/predicate/domain.rs
@@ -1,4 +1,4 @@
use std::cmp::{self, Ordering};
use std::cmp::{self, max, min, Ordering};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Display;
use std::hash::Hash;
Expand Down Expand Up @@ -54,6 +54,7 @@ impl From<(StdBound<i64>, StdBound<i64>)> for TimeRange {
}

impl TimeRange {
/// notice: when min_ts > max_ts, time_range is considered [`Self::none()`]
pub fn new(min_ts: i64, max_ts: i64) -> Self {
Self { min_ts, max_ts }
}
Expand Down Expand Up @@ -116,6 +117,34 @@ impl TimeRange {
self.min_ts = self.min_ts.min(other.min_ts);
self.max_ts = self.max_ts.max(other.max_ts);
}

pub fn exclude(&self, other: &TimeRange) -> (Option<TimeRange>, Option<TimeRange>) {
// other
// |__________|
// self
// |____________________|
let left = if self.min_ts < other.min_ts {
Some(TimeRange::new(
self.min_ts,
min(self.max_ts, other.min_ts - 1),
))
} else {
None
};
// other
// |___________|
// self
// |________________|
let right = if self.max_ts > other.max_ts {
Some(TimeRange::new(
max(self.min_ts, other.max_ts + 1),
self.max_ts,
))
} else {
None
};
(left, right)
}
}

impl From<(Timestamp, Timestamp)> for TimeRange {
Expand Down Expand Up @@ -324,6 +353,27 @@ impl TimeRanges {
pub fn max_time_range(&self) -> TimeRange {
TimeRange::new(self.min_ts, self.max_ts)
}

pub fn exclude(&self, time_range: &TimeRange) -> TimeRanges {
let trs = self
.inner
.iter()
.flat_map(|tr| {
let tr_tuple = tr.exclude(time_range);
[tr_tuple.0, tr_tuple.1]
})
.collect::<Vec<Option<TimeRange>>>()
.into_iter()
.flatten()
.collect::<Vec<TimeRange>>();
TimeRanges::new(trs)
}

pub fn exclude_time_ranges(&self, other: &[&TimeRange]) -> TimeRanges {
let mut res = self.clone();
other.iter().for_each(|o| res = res.exclude(o));
res
}
}

impl AsRef<[TimeRange]> for TimeRanges {
Expand Down Expand Up @@ -1710,6 +1760,44 @@ mod tests {
);
}

#[test]
fn test_time_range_exclude() {
let tr = TimeRange::new(1, 5);
let exclude = tr.exclude(&TimeRange::new(2, 6));
assert_eq!(exclude.0.unwrap(), TimeRange::new(1, 1));

let tr = TimeRange::new(5, 10);
let exclude = tr.exclude(&TimeRange::new(1, 7));
assert_eq!(exclude.1.unwrap(), TimeRange::new(8, 10));

let tr = TimeRange::new(3, 4);
let exclude = tr.exclude(&TimeRange::new(1, 5));
assert!(exclude.0.is_none());
assert!(exclude.1.is_none());

let tr = TimeRange::new(1, 5);
let exclude = tr.exclude(&TimeRange::new(3, 4));
assert_eq!(exclude.0.unwrap(), TimeRange::new(1, 2));
assert_eq!(exclude.1.unwrap(), TimeRange::new(5, 5));
}

#[test]
fn test_time_ranges_exclude() {
let trs = vec![
TimeRange::new(1, 2),
TimeRange::new(4, 5),
TimeRange::new(7, 9),
];
let trs = TimeRanges::new(trs);
let exclude_ranges = trs.exclude(&TimeRange::new(3, 8));
let expected = TimeRanges::new(vec![(1, 2).into(), (9, 9).into()]);
assert_eq!(expected, exclude_ranges);

let exclude_ranges =
trs.exclude_time_ranges(&[&TimeRange::new(3, 4), &TimeRange::new(5, 8)]);
assert_eq!(expected, exclude_ranges);
}

#[test]
fn test_of_ranges() {
let f1 = Range::lt(&DataType::Float64, &ScalarValue::Float64(Some(-1000000.1)));
Expand Down
137 changes: 137 additions & 0 deletions tskv/src/tsm/block.rs
Expand Up @@ -550,6 +550,89 @@ impl DataBlock {
}
}

pub fn exclude_time_ranges(&self, exclude_time_ranges: &[&TimeRange]) -> Self {
let time_ranges = match self.time_range() {
Some((min, max)) => TimeRanges::new(vec![TimeRange::new(min, max)]),
None => return self.clone(),
};

let indexs = time_ranges
.exclude_time_ranges(exclude_time_ranges)
.time_ranges()
.iter()
.filter_map(|tr| self.index_range(tr))
.collect::<Vec<_>>();

match self {
DataBlock::U64 { ts, val, enc } => {
let mut new_ts = vec![];
let mut new_val = vec![];
indexs.into_iter().for_each(|(min, max)| {
new_ts.extend_from_slice(&ts[min..=max]);
new_val.extend_from_slice(&val[min..=max])
});
DataBlock::U64 {
ts: new_ts,
val: new_val,
enc: *enc,
}
}
DataBlock::I64 { ts, val, enc } => {
let mut new_ts = vec![];
let mut new_val = vec![];
indexs.into_iter().for_each(|(min, max)| {
new_ts.extend_from_slice(&ts[min..=max]);
new_val.extend_from_slice(&val[min..=max])
});
DataBlock::I64 {
ts: new_ts,
val: new_val,
enc: *enc,
}
}
DataBlock::Str { ts, val, enc } => {
let mut new_ts = vec![];
let mut new_val = vec![];
indexs.into_iter().for_each(|(min, max)| {
new_ts.extend_from_slice(&ts[min..=max]);
new_val.extend_from_slice(&val[min..=max])
});
DataBlock::Str {
ts: new_ts,
val: new_val,
enc: *enc,
}
}
DataBlock::F64 { ts, val, enc } => {
let mut new_ts = vec![];
let mut new_val = vec![];
indexs.into_iter().for_each(|(min, max)| {
new_ts.extend_from_slice(&ts[min..=max]);
new_val.extend_from_slice(&val[min..=max])
});
DataBlock::F64 {
ts: new_ts,
val: new_val,
enc: *enc,
}
}
DataBlock::Bool { ts, val, enc } => {
let mut new_ts = vec![];
let mut new_val = vec![];
indexs.into_iter().for_each(|(min, max)| {
new_ts.extend_from_slice(&ts[min..=max]);
new_val.extend_from_slice(&val[min..=max])
});
DataBlock::Bool {
ts: new_ts,
val: new_val,
enc: *enc,
}
}
}
}

// left close, right open [min, max]
pub fn index_range(&self, time_range: &TimeRange) -> Option<(usize, usize)> {
if self.is_empty() {
return None;
Expand Down Expand Up @@ -1188,4 +1271,58 @@ pub mod test {
}
);
}

#[test]
fn test_data_block_exclude_3() {
let mut blk = DataBlock::U64 {
ts: vec![0, 1, 2, 3],
val: vec![10, 11, 12, 13],
enc: DataBlockEncoding::default(),
};
let exclude_time_range: TimeRange = (-2, 0).into();
let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]);
blk.exclude(&exclude_time_range);

assert_eq!(blk, new_blk);

let mut blk = DataBlock::U64 {
ts: vec![0, 1, 2, 3],
val: vec![10, 11, 12, 13],
enc: DataBlockEncoding::default(),
};
let exclude_time_range: TimeRange = (3, 5).into();
let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]);
blk.exclude(&exclude_time_range);
assert_eq!(blk, new_blk);

let mut blk = DataBlock::U64 {
ts: vec![0, 1, 2, 3],
val: vec![10, 11, 12, 13],
enc: DataBlockEncoding::default(),
};
let exclude_time_range: TimeRange = (-3, -1).into();
let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]);
blk.exclude(&exclude_time_range);
assert_eq!(blk, new_blk,);

let mut blk = DataBlock::U64 {
ts: vec![0, 1, 2, 3],
val: vec![10, 11, 12, 13],
enc: DataBlockEncoding::default(),
};
let exclude_time_range: TimeRange = (5, 7).into();
let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]);
blk.exclude(&exclude_time_range);
assert_eq!(blk, new_blk);

let mut blk = DataBlock::U64 {
ts: vec![0, 1, 2, 3, 7, 8, 9, 10],
val: vec![10, 11, 12, 13, 17, 18, 19, 20],
enc: DataBlockEncoding::default(),
};
let exclude_time_range: TimeRange = (5, 6).into();
let new_blk = blk.exclude_time_ranges(&[&exclude_time_range]);
blk.exclude(&exclude_time_range);
assert_eq!(blk, new_blk);
}
}
19 changes: 19 additions & 0 deletions tskv/src/tsm/tombstone.rs
Expand Up @@ -202,6 +202,25 @@ impl TsmTombstone {
}
}
}

// if no exclude, return None
pub fn data_block_exclude_tombstones_new(
&self,
field_id: FieldId,
data_block: &DataBlock,
) -> Option<DataBlock> {
let block_tr: TimeRange = data_block.time_range()?.into();
let tombstone = self
.tombstones
.get(&field_id)?
.iter()
.filter(|tr| tr.overlaps(&block_tr))
.collect::<Vec<_>>();
if tombstone.is_empty() {
return None;
}
Some(data_block.exclude_time_ranges(tombstone.as_slice()))
}
}

#[cfg(test)]
Expand Down

0 comments on commit dce5157

Please sign in to comment.