Skip to content

Commit

Permalink
fix(compaction,2.3): typos; add field CompactReq::out_time_range, it …
Browse files Browse the repository at this point in the history
…inits in pickers; delta picker hasn't reset ColumnFile::compacting if delta-files-num is lesser than the trigger; TimeRanges::add_time_range() will join (t,t1) and (t1+1,t2)
  • Loading branch information
zipper-meng committed Mar 1, 2024
1 parent de675d9 commit 88f72d9
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 142 deletions.
171 changes: 169 additions & 2 deletions common/models/src/predicate/domain.rs
Expand Up @@ -236,9 +236,11 @@ impl TimeRanges {
self.max_ts = max(self.max_ts, time_range.max_ts);
let timestamps = self
.inner
.range(..=time_range.max_ts)
.range(..=time_range.max_ts.checked_add(1).unwrap_or(Timestamp::MAX))
.rev()
.take_while(|(_, &max)| max >= time_range.min_ts)
.take_while(|(_, &max)| {
max >= time_range.min_ts.checked_sub(1).unwrap_or(Timestamp::MIN)
})
.map(|(&min, _)| min)
.collect::<Vec<_>>();
let mut new_range = (time_range.min_ts, time_range.max_ts);
Expand Down Expand Up @@ -1803,6 +1805,171 @@ mod tests {
assert!(trs_all.is_boundless());
}

#[test]
fn test_time_ranges_push() {
{
let mut trs = TimeRanges::new(vec![TimeRange::new(2, 3)]);
assert!(trs.includes(&(2, 3).into()));
trs.push(TimeRange::new(4, 5));
assert!(trs.includes(&(2, 3).into()));
assert!(trs.includes(&(3, 4).into()));
assert!(trs.includes(&(4, 5).into()));
assert!(trs.includes(&(2, 5).into()));
assert!(!trs.includes(&(1, 2).into()));
assert!(!trs.includes(&(5, 6).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(2, 5)]
);

trs.push(TimeRange::new(2, 3));
assert!(trs.includes(&(2, 3).into()));
assert!(trs.includes(&(3, 4).into()));
assert!(trs.includes(&(4, 5).into()));
assert!(trs.includes(&(2, 5).into()));
assert!(!trs.includes(&(1, 2).into()));
assert!(!trs.includes(&(5, 6).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(2, 5)]
);

trs.push(TimeRange::new(4, 5));
assert!(trs.includes(&(2, 3).into()));
assert!(trs.includes(&(3, 4).into()));
assert!(trs.includes(&(4, 5).into()));
assert!(trs.includes(&(2, 5).into()));
assert!(!trs.includes(&(1, 2).into()));
assert!(!trs.includes(&(5, 6).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(2, 5)]
);

trs.push(TimeRange::new(5, 6));
assert!(trs.includes(&(2, 3).into()));
assert!(trs.includes(&(3, 4).into()));
assert!(trs.includes(&(4, 5).into()));
assert!(trs.includes(&(2, 5).into()));
assert!(!trs.includes(&(1, 2).into()));
assert!(trs.includes(&(4, 5).into()));
assert!(trs.includes(&(4, 6).into()));
assert!(trs.includes(&(5, 6).into()));
assert!(!trs.includes(&(7, 8).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(2, 6)]
);
}
{
let mut trs = TimeRanges::new(vec![TimeRange::new(2, 3)]);
assert!(trs.includes(&(2, 3).into()));
assert!(!trs.includes(&(1, 2).into()));
assert!(!trs.includes(&(1, 3).into()));
trs.push(TimeRange::new(1, 2));
assert!(trs.includes(&(1, 2).into()));
assert!(trs.includes(&(1, 3).into()));
assert!(trs.includes(&(2, 3).into()));
assert!(!trs.includes(&(3, 4).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(1, 3)]
);
}
{
let mut trs = TimeRanges::new(vec![TimeRange::new(2, 3)]);

trs.push(TimeRange::new(5, 6));
assert!(!trs.includes(&(1, 2).into()));
assert!(trs.includes(&(2, 3).into()));
assert!(!trs.includes(&(3, 4).into()));
assert!(!trs.includes(&(4, 5).into()));
assert!(!trs.includes(&(4, 6).into()));
assert!(trs.includes(&(5, 6).into()));
assert!(!trs.includes(&(2, 5).into()));
assert!(!trs.includes(&(2, 6).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(2, 3), TimeRange::new(5, 6)]
);

trs.push(TimeRange::new(4, 5));
assert!(trs.includes(&(2, 3).into()));
assert!(trs.includes(&(3, 4).into()));
assert!(trs.includes(&(4, 5).into()));
assert!(trs.includes(&(2, 5).into()));
assert!(!trs.includes(&(1, 2).into()));
assert!(trs.includes(&(4, 5).into()));
assert!(trs.includes(&(4, 6).into()));
assert!(trs.includes(&(5, 6).into()));
assert!(!trs.includes(&(7, 8).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(2, 6)]
);
}
{
let mut trs = TimeRanges::new(vec![TimeRange::new(2, 3)]);
trs.push(TimeRange::new(5, 6));
trs.push(TimeRange::new(1, 6));
assert!(!trs.includes(&(0, 1).into()));
assert!(trs.includes(&(1, 3).into()));
assert!(trs.includes(&(1, 6).into()));
assert!(!trs.includes(&(1, 7).into()));
assert!(trs.includes(&(5, 6).into()));
assert!(!trs.includes(&(7, 8).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(1, 6)]
);
}
{
let mut trs = TimeRanges::new(vec![TimeRange::new(2, 3)]);

trs.push(TimeRange::new(2, 3));
assert!(!trs.includes(&(1, 2).into()));
assert!(trs.includes(&(2, 3).into()));
assert!(!trs.includes(&(3, 4).into()));
assert!(!trs.includes(&(4, 5).into()));
assert!(!trs.includes(&(2, 4).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(2, 3)]
);

trs.push(TimeRange::new(1, 3));
assert!(!trs.includes(&(0, 1).into()));
assert!(!trs.includes(&(0, 2).into()));
assert!(trs.includes(&(1, 2).into()));
assert!(trs.includes(&(1, 3).into()));
assert!(!trs.includes(&(1, 4).into()));
assert!(trs.includes(&(2, 3).into()));
assert!(!trs.includes(&(2, 4).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(1, 3)]
);
}
{
let mut trs = TimeRanges::new(vec![TimeRange::new(2, 3)]);

trs.push(TimeRange::new(1, 4));
assert!(!trs.includes(&(0, 1).into()));
assert!(!trs.includes(&(0, 2).into()));
assert!(trs.includes(&(1, 2).into()));
assert!(trs.includes(&(1, 3).into()));
assert!(trs.includes(&(1, 4).into()));
assert!(!trs.includes(&(1, 5).into()));
assert!(trs.includes(&(2, 3).into()));
assert!(trs.includes(&(2, 4).into()));
assert!(!trs.includes(&(2, 5).into()));
assert_eq!(
trs.time_ranges().collect::<Vec<_>>(),
vec![TimeRange::new(1, 4)]
);
}
}

#[test]
fn test_time_range_sort() {
let mut time_range_vec: Vec<TimeRange> = vec![
Expand Down
22 changes: 13 additions & 9 deletions tskv/src/compaction/compact.rs
Expand Up @@ -1202,17 +1202,20 @@ pub mod test {
}
}

pub fn create_options(base_dir: String) -> Arc<Options> {
pub fn create_options(base_dir: String, always_compact: bool) -> Arc<Options> {
let mut config = config::get_config_for_test();
config.storage.path = base_dir.clone();
if always_compact {
config.storage.compact_trigger_file_num = 1;
}
config.log.path = base_dir;
Arc::new(Options::from(&config))
}

#[test]
fn test_create_options() {
let dir = "/tmp/test/compaction/test_create_options";
let opt = create_options(dir.to_string());
let opt = create_options(dir.to_string(), false);
assert_eq!(opt.storage.path.to_string_lossy(), dir);
}

Expand All @@ -1239,6 +1242,7 @@ pub mod test {
files,
in_level: 1,
out_level: 2,
out_time_range: TimeRange::all(),
};
let context = Arc::new(GlobalContext::new());
context.set_file_id(next_file_id);
Expand Down Expand Up @@ -1288,7 +1292,7 @@ pub mod test {
let dir = "/tmp/test/compaction/0";
let _ = std::fs::remove_dir_all(dir);
let tenant_database = Arc::new("cnosdb.dba".to_string());
let opt = create_options(dir.to_string());
let opt = create_options(dir.to_string(), true);
let dir = opt.storage.tsm_dir(&tenant_database, 1);
let max_level_ts = 9;

Expand Down Expand Up @@ -1336,7 +1340,7 @@ pub mod test {
let dir = "/tmp/test/compaction/1";
let _ = std::fs::remove_dir_all(dir);
let tenant_database = Arc::new("cnosdb.dba".to_string());
let opt = create_options(dir.to_string());
let opt = create_options(dir.to_string(), true);
let dir = opt.storage.tsm_dir(&tenant_database, 1);
let max_level_ts = 9;

Expand Down Expand Up @@ -1383,7 +1387,7 @@ pub mod test {
let dir = "/tmp/test/compaction/2";
let _ = std::fs::remove_dir_all(dir);
let tenant_database = Arc::new("cnosdb.dba".to_string());
let opt = create_options(dir.to_string());
let opt = create_options(dir.to_string(), true);
let dir = opt.storage.tsm_dir(&tenant_database, 1);
let max_level_ts = 9;

Expand Down Expand Up @@ -1424,7 +1428,7 @@ pub mod test {
let dir = "/tmp/test/compaction/3";
let _ = std::fs::remove_dir_all(dir);
let tenant_database = Arc::new("cnosdb.dba".to_string());
let opt = create_options(dir.to_string());
let opt = create_options(dir.to_string(), true);
let dir = opt.storage.tsm_dir(&tenant_database, 1);
let max_level_ts = 9;

Expand Down Expand Up @@ -1676,7 +1680,7 @@ pub mod test {
let dir = "/tmp/test/compaction/big_1";
let _ = std::fs::remove_dir_all(dir);
let tenant_database = Arc::new("cnosdb.dba".to_string());
let opt = create_options(dir.to_string());
let opt = create_options(dir.to_string(), true);
let dir = opt.storage.tsm_dir(&tenant_database, 1);
if !file_manager::try_exists(&dir) {
std::fs::create_dir_all(&dir).unwrap();
Expand Down Expand Up @@ -1744,7 +1748,7 @@ pub mod test {
let dir = "/tmp/test/compaction/big_2";
let _ = std::fs::remove_dir_all(dir);
let tenant_database = Arc::new("cnosdb.dba".to_string());
let opt = create_options(dir.to_string());
let opt = create_options(dir.to_string(), true);
let dir = opt.storage.tsm_dir(&tenant_database, 1);
if !file_manager::try_exists(&dir) {
std::fs::create_dir_all(&dir).unwrap();
Expand Down Expand Up @@ -1857,7 +1861,7 @@ pub mod test {
let dir = "/tmp/test/compaction/big_3";
let _ = std::fs::remove_dir_all(dir);
let tenant_database = Arc::new("cnosdb.dba".to_string());
let opt = create_options(dir.to_string());
let opt = create_options(dir.to_string(), true);
let dir = opt.storage.tsm_dir(&tenant_database, 1);
if !file_manager::try_exists(&dir) {
std::fs::create_dir_all(&dir).unwrap();
Expand Down

0 comments on commit 88f72d9

Please sign in to comment.