refactor(compaction,2.3): simplify codes of compaction job & scheduler
zipper-meng committed Apr 9, 2024
1 parent e93a447 commit bae6507
Showing 4 changed files with 198 additions and 179 deletions.
78 changes: 62 additions & 16 deletions tskv/src/compaction/job.rs
@@ -2,16 +2,18 @@ use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::Future;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{oneshot, OwnedSemaphorePermit, RwLock, Semaphore};
use trace::{error, info, warn};
use trace::{debug, error, info, warn};

use crate::compaction::{picker, CompactTask};
use crate::context::{GlobalContext, GlobalSequenceContext};
use crate::kv_option::StorageOptions;
use crate::summary::SummaryTask;
use crate::version_set::VersionSet;
use crate::TseriesFamilyId;

const COMPACT_BATCH_CHECKING_SECONDS: u64 = 1;

@@ -113,6 +115,7 @@ impl CompactionJob {
mut compact_task_receiver,
} = self;

// Background job to collect unique compact tasks.
runtime.spawn(async move {
while let Some(compact_task) = compact_task_receiver.recv().await {
compact_task_group_producer
@@ -121,16 +124,16 @@
.push_back(compact_task);
}
});

let runtime_inner = runtime.clone();
// Background job to run compaction tasks.
runtime.spawn(async move {
// Each vnode can only have one compaction task running at a time.
let compacting_vnodes: Arc<RwLock<HashSet<TseriesFamilyId>>> =
Arc::new(RwLock::new(HashSet::new()));
let mut check_interval =
tokio::time::interval(Duration::from_secs(COMPACT_BATCH_CHECKING_SECONDS));
loop {
check_interval.tick().await;
let compact_task = match compact_task_group_consumer.write().await.pop_front() {
Some(t) => t,
None => continue,
};
let permit = match limiter.clone().acquire_owned().await {
Ok(l) => l,
Err(e) => {
@@ -139,10 +142,35 @@
break;
}
};
let compact_task_opt = compact_task_group_consumer.write().await.pop_front();
let (compact_task, after_compact) = match compact_task_opt {
Some(t) => {
let vnode_id = t.ts_family_id();
if compacting_vnodes.read().await.contains(&vnode_id) {
// If the vnode is compacting, put the task back into the compact task group.
debug!("vnode {vnode_id} is compacting, skip this time");
compact_task_group_consumer.write().await.push_back(t);
continue;
} else {
// If vnode is not compacting, mark it as compacting.
compacting_vnodes.write().await.insert(vnode_id);
let compacting_vnodes_inner = compacting_vnodes.clone();
let d = DeferGuard::new(runtime_inner.clone(), async move {
compacting_vnodes_inner.write().await.remove(&vnode_id);
});
(t, d)
}
}
None => {
check_interval.tick().await;
continue;
}
};
runtime_inner.spawn(Self::run_compact_task(
compact_task,
compaction_context.clone(),
permit,
after_compact,
));
}
});
@@ -152,8 +180,8 @@
task: CompactTask,
context: Arc<CompactionContext>,
_permit: OwnedSemaphorePermit,
_after_compact: DeferGuard<impl Future<Output = ()> + Send>,
) {
info!("Starting compaction: {task}");
let start = Instant::now();

let vnode_id = task.ts_family_id();
@@ -167,10 +195,6 @@
};
let vnode = vnode.read().await;
if !vnode.can_compaction() {
info!(
"Compaction skipped: vnode_id: {vnode_id}, status: {}",
vnode.status()
);
return;
}
vnode.version()
@@ -179,15 +203,15 @@
let compact_req = match picker::pick_compaction(task, version).await {
Some(req) => req,
None => {
info!("Finished compaction, did nothing");
debug!("Finished compactionb: {task}, did nothing");
return;
}
};
let database = compact_req.version.database();
let in_level = compact_req.in_level;
let out_level = compact_req.out_level;

info!("Running compaction job: {task}, sending to summary write.");
info!("Running compaction job: {task}.");
match super::run_compaction_job(compact_req, context.ctx.clone()).await {
Ok(Some((version_edit, file_metas))) => {
info!("Finished compaction, sending to summary write: {task}.");
@@ -210,7 +234,7 @@
out_level.to_string().as_str(),
start.elapsed().as_secs_f64(),
);
info!("Finished compaction, waiting for summary write: {task}.");
info!("Finished compaction, waiting for summary write: {task}");
match summary_rx.await {
Ok(Ok(())) => {
info!("Finished compaction, summary write success: {version_edit:?}");
@@ -226,16 +250,38 @@
}
}
Ok(None) => {
info!("Finished compaction: There is nothing to compact.");
info!("Finished compaction, nothing compacted");
}
Err(e) => {
metrics::incr_compaction_failed();
error!("Compaction: job failed: {}", e);
error!("Compaction: job failed: {e}");
}
}
}
}

pub struct DeferGuard<F: Future<Output = ()> + Send + 'static> {
runtime: Arc<Runtime>,
f: Option<F>,
}

impl<F: Future<Output = ()> + Send + 'static> DeferGuard<F> {
pub fn new(runtime: Arc<Runtime>, f: F) -> Self {
Self {
runtime,
f: Some(f),
}
}
}

impl<F: Future<Output = ()> + Send + 'static> Drop for DeferGuard<F> {
fn drop(&mut self) {
if let Some(f) = self.f.take() {
self.runtime.spawn(f);
}
}
}

#[cfg(test)]
mod test {
use crate::compaction::job::CompactTaskGroup;
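
The new `DeferGuard` added in this file is a small drop guard that spawns an async cleanup future when it goes out of scope; in the job loop above it is what removes a vnode from the `compacting_vnodes` set once its compaction task finishes, regardless of which early-return path was taken. Below is a minimal standalone sketch of that pattern: the guard itself is restated from the diff, while the `main` function, the `compacting` set name, and the tokio/futures dependencies are illustrative assumptions rather than part of the commit.

```rust
// Standalone sketch (not part of the commit): the drop-guard pattern used by
// the job loop above. The guard is restated from the diff; `main` and the
// `compacting` set are illustrative only.
// Assumed Cargo deps: tokio = { version = "1", features = ["full"] }, futures = "0.3".
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use futures::Future;
use tokio::runtime::Runtime;
use tokio::sync::RwLock;

pub struct DeferGuard<F: Future<Output = ()> + Send + 'static> {
    runtime: Arc<Runtime>,
    f: Option<F>,
}

impl<F: Future<Output = ()> + Send + 'static> DeferGuard<F> {
    pub fn new(runtime: Arc<Runtime>, f: F) -> Self {
        Self { runtime, f: Some(f) }
    }
}

impl<F: Future<Output = ()> + Send + 'static> Drop for DeferGuard<F> {
    // Spawn the deferred cleanup future as soon as the guard goes out of scope.
    fn drop(&mut self) {
        if let Some(f) = self.f.take() {
            self.runtime.spawn(f);
        }
    }
}

fn main() {
    let runtime = Arc::new(Runtime::new().unwrap());
    let compacting: Arc<RwLock<HashSet<u32>>> = Arc::new(RwLock::new(HashSet::new()));

    let rt = runtime.clone();
    let compacting_task = compacting.clone();
    runtime.block_on(async move {
        let vnode_id = 1_u32;
        // Mark the vnode as compacting, as the job loop does before spawning.
        compacting_task.write().await.insert(vnode_id);

        let compacting_inner = compacting_task.clone();
        // The cleanup future runs when `_guard` is dropped, whichever
        // early-return path the compaction task takes.
        let _guard = DeferGuard::new(rt, async move {
            compacting_inner.write().await.remove(&vnode_id);
        });

        // ... the actual compaction work would run here ...
    });

    // Give the spawned cleanup a moment to run, then inspect the set.
    std::thread::sleep(Duration::from_millis(100));
    let empty = runtime.block_on(async { compacting.read().await.is_empty() });
    println!("compacting set empty after guard drop: {empty}");
}
```

Spawning the cleanup from `Drop` (rather than awaiting it) keeps the guard usable in a synchronous drop path, at the cost of the vnode being unmarked asynchronously, shortly after the task ends.
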
4 changes: 2 additions & 2 deletions tskv/src/compaction/picker.rs
@@ -70,7 +70,7 @@ impl LevelCompactionPicker {
in_level = start_lvl;
out_level = out_lvl;
} else {
info!("Picker(level): picked no level");
debug!("Picker(level): picked no level");
return None;
}

@@ -83,7 +83,7 @@
files.sort_by(Self::compare_column_file);
let picking_files: Vec<Arc<ColumnFile>> =
Self::pick_files(files, storage_opt.max_compact_size).await;
info!(
debug!(
"Picker(level): Picked files: [ {} ]",
ColumnFiles(&picking_files)
);
43 changes: 8 additions & 35 deletions tskv/src/tseries_family.rs
@@ -1273,41 +1273,14 @@ pub fn schedule_vnode_compaction(

// Check whether level-0 has enough files to trigger a delta compaction.
let version = vnode.read().await.super_version().version.clone();
let mut triggered_delta_compaction = false;
if version.levels_info()[0].files.len() as u32 >= compact_trigger_file_num {
let mut level_0_files = 0_u32;
for file in version.levels_info()[0].files.iter() {
if file.is_deleted() || file.is_compacting().await {
continue;
}
level_0_files += 1;
if level_0_files >= compact_trigger_file_num {
triggered_delta_compaction = true;
let task = CompactTask::Delta(tsf_id);
if let Err(e) = compact_task_sender.send(task).await {
warn!("Scheduler(vnode: {tsf_id}): Failed to send compact task: {task}: {e}");
}
break;
}
}
}
if !triggered_delta_compaction {
let mut level_14_files = 0_u32;
for level in &version.levels_info()[1..] {
for file in level.files.iter() {
if file.is_deleted() || file.is_compacting().await {
continue;
}
level_14_files += 1;
if level_14_files >= compact_trigger_file_num {
let task = CompactTask::Normal(tsf_id);
if let Err(e) = compact_task_sender.send(task).await {
warn!("Scheduler(vnode: {tsf_id}): Failed to send compact task: {task}: {e}");
}
break;
}
}
}
let levels = version.levels_info();
let task = if levels[0].files.len() as u32 >= compact_trigger_file_num {
CompactTask::Delta(tsf_id)
} else {
CompactTask::Normal(tsf_id)
};
if let Err(e) = compact_task_sender.send(task).await {
warn!("Scheduler(vnode: {tsf_id}): Failed to send compact task: {task}: {e}");
}
}
_ = cancellation_token.cancelled() => {
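
The scheduler hunk above replaces the old per-level counting loops (which skipped deleted or already-compacting files) with a single decision: if level 0 holds at least `compact_trigger_file_num` files, send a `CompactTask::Delta`, otherwise send a `CompactTask::Normal`. A tiny standalone sketch of that decision follows; `pick_task` is a hypothetical helper and the `CompactTask` enum here only mirrors the two variants used in the diff.

```rust
// Illustrative sketch, not part of the commit: the refactored trigger
// decision reduced to a pure function so it can be exercised in isolation.
type TseriesFamilyId = u32;

#[derive(Debug, PartialEq, Eq)]
enum CompactTask {
    // Compact delta (level-0) files into the main levels.
    Delta(TseriesFamilyId),
    // Compact files among the higher levels.
    Normal(TseriesFamilyId),
}

// Hypothetical helper: pick a delta compaction once level 0 reaches the
// trigger threshold, otherwise fall back to a normal compaction.
fn pick_task(
    tsf_id: TseriesFamilyId,
    level_0_file_num: usize,
    compact_trigger_file_num: usize,
) -> CompactTask {
    if level_0_file_num >= compact_trigger_file_num {
        CompactTask::Delta(tsf_id)
    } else {
        CompactTask::Normal(tsf_id)
    }
}

fn main() {
    assert_eq!(pick_task(9, 4, 4), CompactTask::Delta(9));
    assert_eq!(pick_task(9, 3, 4), CompactTask::Normal(9));
    println!("trigger decision: delta at or above the threshold, normal otherwise");
}
```

Note the behavioral shift visible in the diff: the new code counts every level-0 file toward the threshold (no longer filtering out deleted or compacting ones) and always sends exactly one task, leaving it to the picker in job.rs to return `None` and do nothing when there is no compactable work.
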
