Skip to content

Commit

Permalink
feat(compaction,2.3): add delta compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng committed Jan 4, 2024
1 parent 6765f19 commit e189a94
Show file tree
Hide file tree
Showing 19 changed files with 3,639 additions and 973 deletions.
12 changes: 9 additions & 3 deletions common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ pub static COMPACTION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
.namespace(NAMESPACE)
.subsystem(TSKV_SUBSYSTEM)
.buckets(linear_buckets(0.0, 300.0, 2400).unwrap()),
&["db", "ts_family", "level"],
&["db", "ts_family", "in_level", "out_level"],
)
.expect("tskv metric cannot be created")
});
Expand All @@ -231,9 +231,15 @@ pub fn incr_compaction_failed() {
COMPACTION_FAILED.inc();
}

pub fn sample_tskv_compaction_duration(db: &str, ts_family: &str, level: &str, delta: f64) {
pub fn sample_tskv_compaction_duration(
db: &str,
ts_family: &str,
in_level: &str,
out_level: &str,
delta: f64,
) {
COMPACTION_DURATION
.with_label_values(&[db, ts_family, level])
.with_label_values(&[db, ts_family, in_level, out_level])
.observe(delta)
}

Expand Down
120 changes: 120 additions & 0 deletions common/models/src/predicate/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,41 @@ impl TimeRange {
self.min_ts = self.min_ts.min(other.min_ts);
self.max_ts = self.max_ts.max(other.max_ts);
}

pub fn compact(time_ranges: &mut Vec<TimeRange>) {
if time_ranges.is_empty() {
return;
}

// Start compact with sorted ascending time ranges
time_ranges.sort();

fn can_compact(a: &TimeRange, b: &TimeRange) -> bool {
(a.min_ts <= b.max_ts && a.max_ts >= b.min_ts)
|| match a.max_ts.checked_add(1) {
Some(t) => t == b.min_ts,
None => false,
}
|| match b.max_ts.checked_add(1) {
Some(t) => t == a.min_ts,
None => false,
}
}

let mut i = 0;
while i < time_ranges.len() - 1 {
if can_compact(&time_ranges[i], &time_ranges[i + 1]) {
let mut j = i + 1;
while j < time_ranges.len() && can_compact(&time_ranges[i], &time_ranges[j]) {
time_ranges[i].max_ts = time_ranges[i].max_ts.max(time_ranges[j].max_ts);
j += 1;
}
time_ranges.drain(i + 1..j);
} else {
i += 1;
}
}
}
}

impl From<(Timestamp, Timestamp)> for TimeRange {
Expand Down Expand Up @@ -1533,6 +1568,7 @@ mod tests {
assert!(tr.overlaps(&TimeRange::new(4, 7)));
assert!(tr.overlaps(&TimeRange::new(1, 5)));
assert!(tr.overlaps(&TimeRange::new(2, 4)));
assert!(tr.overlaps(&TimeRange::new(5, 6)));
assert!(!tr.overlaps(&TimeRange::new(-3, -1)));
assert!(!tr.overlaps(&TimeRange::new(-1, 0)));
assert!(!tr.overlaps(&TimeRange::new(6, 7)));
Expand Down Expand Up @@ -1692,6 +1728,90 @@ mod tests {
);
}

#[test]
fn test_time_range_sort() {
let mut time_range_vec: Vec<TimeRange> = vec![
(1, 1).into(),
(1, 2).into(),
(1, 0).into(),
(3, 4).into(),
(2, 3).into(),
(2, 1).into(),
];
time_range_vec.sort();
assert_eq!(
time_range_vec,
vec![
(1, 0).into(),
(1, 1).into(),
(1, 2).into(),
(2, 1).into(),
(2, 3).into(),
(3, 4).into(),
]
);
}

#[test]
fn test_time_range_compact() {
{
let mut time_range_vec: Vec<TimeRange> = vec![];
TimeRange::compact(&mut time_range_vec);
assert_eq!(time_range_vec, vec![]);
}
{
let mut time_range_vec: Vec<TimeRange> = vec![(1, 2).into()];
TimeRange::compact(&mut time_range_vec);
assert_eq!(time_range_vec, vec![(1, 2).into()]);
}
{
let mut time_range_vec: Vec<TimeRange> =
vec![(Timestamp::MIN, 999).into(), (1_000, Timestamp::MAX).into()];
TimeRange::compact(&mut time_range_vec);
assert_eq!(
time_range_vec,
vec![(Timestamp::MIN, Timestamp::MAX).into()]
);
}
{
let mut time_range_vec: Vec<TimeRange> =
vec![(Timestamp::MIN, Timestamp::MAX).into(), (999, 1_000).into()];
TimeRange::compact(&mut time_range_vec);
assert_eq!(
time_range_vec,
vec![(Timestamp::MIN, Timestamp::MAX).into()]
);
}
{
let mut time_range_vec: Vec<TimeRange> =
vec![(Timestamp::MIN, 998).into(), (1_000, Timestamp::MAX).into()];
TimeRange::compact(&mut time_range_vec);
assert_eq!(
time_range_vec,
vec![(Timestamp::MIN, 998).into(), (1_000, Timestamp::MAX).into()]
);
}
{
let mut time_range_vec: Vec<TimeRange> =
vec![(1, 2).into(), (3, 4).into(), (9, 10).into()];
TimeRange::compact(&mut time_range_vec);
assert_eq!(time_range_vec, vec![(1, 4).into(), (9, 10).into()]);
}
{
let mut time_range_vec: Vec<TimeRange> = vec![
(5, 5).into(),
(1, 10).into(),
(0, 5).into(),
(3, 6).into(),
(5, 9).into(),
(20, 30).into(),
(21, 29).into(),
];
TimeRange::compact(&mut time_range_vec);
assert_eq!(time_range_vec, vec![(0, 10).into(), (20, 30).into()]);
}
}

#[test]
fn test_of_ranges() {
let f1 = Range::lt(&DataType::Float64, &ScalarValue::Float64(Some(-1000000.1)));
Expand Down
Loading

0 comments on commit e189a94

Please sign in to comment.