Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor(2.3): extract function update_max_ts_of_levels #2004

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
111 changes: 98 additions & 13 deletions tskv/src/tseries_family.rs
Expand Up @@ -546,18 +546,7 @@ impl Version {
}
}

// Find the last level that has files and update the time_range of all levels.
let mut next_level_max_ts = Option::<Timestamp>::None;
for i in (1..=4).rev() {
let lv = &mut new_levels[i];
if lv.files.is_empty() {
if let Some(ts) = next_level_max_ts {
lv.time_range = TimeRange::new(ts + 1, ts + 1);
}
} else {
next_level_max_ts = Some(lv.time_range.max_ts);
}
}
Self::update_max_ts_of_levels(&mut new_levels);

let mut new_version = Self {
ts_family_id: self.ts_family_id,
Expand Down Expand Up @@ -589,6 +578,24 @@ impl Version {
self.max_level_ts = max_ts;
}

/// Find the last level that has files, then update the time_range of previous levels that have no files.
fn update_max_ts_of_levels(levels: &mut [LevelInfo; 5]) {
// TODO(zipper): How to handle the case that the latest level has the time_range(-∞, +∞)?
// For example: level-1: (none), level-2: (none), level-3: (none), level-4: (-∞, +∞).
let mut next_level_max_ts = Option::<Timestamp>::None;
for i in (1..=4).rev() {
let lv = &mut levels[i];
if lv.files.is_empty() {
if let Some(ts) = next_level_max_ts {
let pre_level_ts = ts.checked_add(1).unwrap_or(ts);
lv.time_range = TimeRange::new(pre_level_ts, pre_level_ts);
}
} else {
next_level_max_ts = Some(lv.time_range.max_ts);
}
}
}

pub async fn unmark_compacting_files(&self, files_ids: &HashSet<ColumnFileId>) {
if files_ids.is_empty() {
return;
Expand Down Expand Up @@ -1247,6 +1254,7 @@ pub mod test_tseries_family {
use parking_lot::RwLock;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::RwLock as AsyncRwLock;
use utils::BloomFilter;

use super::{ColumnFile, LevelInfo};
use crate::compaction::test::default_table_schema;
Expand All @@ -1258,10 +1266,87 @@ pub mod test_tseries_family {
use crate::memcache::{FieldVal, MemCache, RowData, RowGroup};
use crate::summary::{CompactMeta, SummaryTask, VersionEdit};
use crate::tseries_family::{TimeRange, TseriesFamily, Version};
use crate::tsm::TsmTombstone;
use crate::tsm::{TsmReader, TsmTombstone};
use crate::version_set::VersionSet;
use crate::TseriesFamilyId;

#[test]
fn test_version_update_max_ts_of_levels() {
let opt = Arc::new(StorageOptions::default());
let db = Arc::new("test".to_string());
let bloom_filter = Arc::new(BloomFilter::new(2));
let lru_cache = ShardedAsyncCache::create_lru_sharded_cache(2);
let reader_cache: Arc<ShardedAsyncCache<String, Arc<TsmReader>>> = Arc::new(lru_cache);
{
let mut levels = LevelInfo::init_levels(db.clone(), 1, opt.clone());
Version::update_max_ts_of_levels(&mut levels);
assert!(levels[0].time_range.is_none());
assert!(levels[1].time_range.is_none());
assert!(levels[2].time_range.is_none());
assert!(levels[3].time_range.is_none());
assert!(levels[4].time_range.is_none());
}
{
let mut levels = LevelInfo::init_levels(db.clone(), 1, opt.clone());
#[rustfmt::skip]
levels[4].push_compact_meta(
&CompactMeta {
file_id: 1, file_size: 1, tsf_id: 1, level: 4, high_seq: 1, low_seq: 1, is_delta: false,
min_ts: 1000, max_ts: 2000,
},
bloom_filter.clone(), Arc::downgrade(&reader_cache),
);
Version::update_max_ts_of_levels(&mut levels);
assert!(levels[0].time_range.is_none());
assert_eq!(levels[1].time_range, (2001, 2001).into());
assert_eq!(levels[2].time_range, (2001, 2001).into());
assert_eq!(levels[3].time_range, (2001, 2001).into());
assert_eq!(levels[4].time_range, (1000, 2000).into());
}
{
let mut levels = LevelInfo::init_levels(db.clone(), 1, opt.clone());
#[rustfmt::skip]
levels[1].push_compact_meta(
&CompactMeta {
file_id: 1, file_size: 1, tsf_id: 1, level: 1, high_seq: 1, low_seq: 1, is_delta: false,
min_ts: 3001, max_ts: 4000,
},
bloom_filter.clone(), Arc::downgrade(&reader_cache),
);
#[rustfmt::skip]
levels[3].push_compact_meta(
&CompactMeta {
file_id: 1, file_size: 1, tsf_id: 1, level: 3, high_seq: 1, low_seq: 1, is_delta: false,
min_ts: 2001, max_ts: 3000,
},
bloom_filter.clone(), Arc::downgrade(&reader_cache),
);
Version::update_max_ts_of_levels(&mut levels);
assert!(levels[0].time_range.is_none());
assert_eq!(levels[1].time_range, (3001, 4000).into());
assert_eq!(levels[2].time_range, (3001, 3001).into());
assert_eq!(levels[3].time_range, (2001, 3000).into());
assert!(levels[4].time_range.is_none());
}
{
let mut levels = LevelInfo::init_levels(db, 1, opt);
#[rustfmt::skip]
levels[4].push_compact_meta(
&CompactMeta {
file_id: 1, file_size: 1, tsf_id: 1, level: 4, high_seq: 1, low_seq: 1, is_delta: false,
min_ts: 1, max_ts: i64::MAX,
},
bloom_filter, Arc::downgrade(&reader_cache),
);
Version::update_max_ts_of_levels(&mut levels);
assert!(levels[0].time_range.is_none());
assert_eq!(levels[1].time_range, (i64::MAX, i64::MAX).into());
assert_eq!(levels[2].time_range, (i64::MAX, i64::MAX).into());
assert_eq!(levels[3].time_range, (i64::MAX, i64::MAX).into());
assert_eq!(levels[4].time_range, (1, i64::MAX).into());
}
}

#[tokio::test]
async fn test_version_apply_version_edits_1() {
//! There is a Version with two levels:
Expand Down