Feature: TSKV Optimizations to reduce memory usage #1199

Merged
3 commits, merged on May 25, 2023
2 changes: 1 addition & 1 deletion common/models/src/schema.rs
@@ -682,7 +682,7 @@ pub struct DatabaseOptions {
vnode_duration: Option<Duration>,

replica: Option<u64>,
// timestamp percision
// timestamp precision
precision: Option<Precision>,
}

17 changes: 14 additions & 3 deletions tskv/src/compaction/compact.rs
@@ -534,7 +534,11 @@ pub async fn run_compaction_job(
let mut iter = CompactIterator::new(tsm_readers, max_data_block_size, false);
let tsm_dir = storage_opt.tsm_dir(&request.database, tsf_id);
let mut tsm_writer = tsm::new_tsm_writer(&tsm_dir, kernel.file_id_next(), false, 0).await?;
info!("Compaction: File {} been created.", tsm_writer.sequence());
info!(
    "Compaction: File: {} has been created (level: {}).",
    tsm_writer.sequence(),
    request.out_level
);
let mut version_edit = VersionEdit::new(tsf_id);
let mut file_metas: HashMap<ColumnFileId, Arc<BloomFilter>> = HashMap::new();

@@ -578,10 +582,17 @@
version_edit.add_file(cm, version.max_level_ts);
tsm_writer =
tsm::new_tsm_writer(&tsm_dir, kernel.file_id_next(), false, 0).await?;
info!("Compaction: File {} been created.", tsm_writer.sequence());
info!(
    "Compaction: File: {} has been created (level: {}).",
    tsm_writer.sequence(),
    request.out_level
);
}
tsm::WriteTsmError::Finished { path } => {
error!("Tsm writer finished: {}", path.display());
error!(
    "Trying to write with a finished tsm writer: {}",
    path.display()
);
}
}
}
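The hunks above sit on the compaction job's writer-rotation path: when the current output file cannot take more data, the job seals it, records it in the version edit, and opens a fresh TSM writer for the same output level, which the updated log lines now report. The following is a minimal, self-contained sketch of that rotation pattern; MockWriter, MAX_FILE_SIZE, and the explicit size check are hypothetical stand-ins, since the real code reacts to tsm::WriteTsmError variants from the writer instead.

// A sketch of writer rotation during compaction. All names here are
// illustrative; they are not the tsm module's real API.
struct MockWriter {
    sequence: u64, // file id, as reported by the new log lines
    size: u64,
}

impl MockWriter {
    fn write_block(&mut self, block: &[u8]) {
        self.size += block.len() as u64;
    }
}

// Hypothetical size limit that triggers rotation.
const MAX_FILE_SIZE: u64 = 128;

fn main() {
    let out_level = 2_u16;
    let mut next_sequence = 1_u64;
    let mut new_writer = || {
        let w = MockWriter {
            sequence: next_sequence,
            size: 0,
        };
        next_sequence += 1;
        println!(
            "Compaction: File: {} has been created (level: {}).",
            w.sequence, out_level
        );
        w
    };

    let mut sealed: Vec<u64> = Vec::new();
    let mut writer = new_writer();
    for block in std::iter::repeat(vec![0_u8; 40]).take(10) {
        // Seal the current file and rotate before it would overflow.
        if writer.size + block.len() as u64 > MAX_FILE_SIZE {
            sealed.push(writer.sequence);
            writer = new_writer();
        }
        writer.write_block(&block);
    }
    sealed.push(writer.sequence);
    println!("Sealed files: {:?}", sealed);
}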
13 changes: 4 additions & 9 deletions tskv/src/compaction/flush.rs
@@ -61,11 +61,6 @@ impl FlushTask {
version_edits: &mut Vec<VersionEdit>,
file_metas: &mut HashMap<ColumnFileId, Arc<BloomFilter>>,
) -> Result<()> {
info!(
"Flush: Running flush job on ts_family: {} with {} MemCaches, collecting informations.",
version.ts_family_id,
self.mem_caches.len(),
);
let (mut high_seq, mut low_seq) = (0, u64::MAX);
let mut total_memcache_size = 0_u64;

@@ -196,7 +191,7 @@ pub async fn run_flush_memtable_job(
compact_task_sender: Option<Sender<CompactTask>>,
) -> Result<()> {
info!(
"Flush: Running flush job for {} of {} MemCaches",
"Flush: Running flush job for ts_family {} with {} MemCaches",
req.ts_family_id,
req.mems.len()
);
@@ -253,7 +248,7 @@ pub async fn run_flush_memtable_job(
}

info!(
"Flush: Flush for {} finished, version edits: {:?}",
"Flush: Run flush job for ts_family {} finished, version edits: {:?}",
req.ts_family_id, version_edits
);

@@ -397,7 +392,7 @@ impl WriterWrapper {
None => {
let writer = flush_task.new_tsm_writer(level == 0).await?;
info!(
"Flush: File {}(level={}) been created.",
"Flush: File: {} been created (level={}).",
writer.sequence(),
level
);
@@ -420,7 +415,7 @@
w.write_index().await.context(error::WriteTsmSnafu)?;
w.finish().await.context(error::WriteTsmSnafu)?;
info!(
"Flush: File: {}(level={}) write finished ({} B).",
"Flush: File: {} write finished (level: {}, {} B).",
w.sequence(),
level,
w.size()
251 changes: 169 additions & 82 deletions tskv/src/compaction/job.rs
@@ -1,16 +1,45 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::runtime::Runtime;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{oneshot, RwLock, Semaphore};
use tokio::time::Instant;
use trace::{error, info};

use crate::compaction::{flush, CompactTask, LevelCompactionPicker, Picker};
use crate::context::{GlobalContext, GlobalSequenceContext};
use crate::kv_option::StorageOptions;
use crate::summary::SummaryTask;
use crate::version_set::VersionSet;
use crate::TseriesFamilyId;

const COMPACT_BATCH_CHECKING_SECONDS: u64 = 1;

struct CompactProcessor {
vnode_ids: HashMap<TseriesFamilyId, bool>,
}

impl CompactProcessor {
fn insert(&mut self, vnode_id: TseriesFamilyId, should_flush: bool) {
let old_should_flush = self.vnode_ids.entry(vnode_id).or_insert(should_flush);
if should_flush && !*old_should_flush {
*old_should_flush = should_flush
}
}

fn take(&mut self) -> HashMap<TseriesFamilyId, bool> {
std::mem::replace(&mut self.vnode_ids, HashMap::with_capacity(32))
}
}

impl Default for CompactProcessor {
fn default() -> Self {
Self {
vnode_ids: HashMap::with_capacity(32),
}
}
}

pub fn run(
storage_opt: Arc<StorageOptions>,
@@ -22,100 +51,158 @@ pub fn run(
summary_task_sender: Sender<SummaryTask>,
) {
let runtime_inner = runtime.clone();

let compact_processor = Arc::new(RwLock::new(CompactProcessor::default()));
let compact_batch_processor = compact_processor.clone();
runtime.spawn(async move {
// TODO: Concurrent compactions should not exceed the argument $cpu.
let compaction_limit = Arc::new(Semaphore::new(
storage_opt.max_concurrent_compaction as usize,
));
let mut check_interval =
tokio::time::interval(Duration::from_secs(COMPACT_BATCH_CHECKING_SECONDS));

loop {
check_interval.tick().await;
let processor = compact_batch_processor.read().await;
if processor.vnode_ids.is_empty() {
continue;
}
drop(processor);
let vnode_ids = compact_batch_processor.write().await.take();
let vnode_ids_for_debug = vnode_ids.clone();
let now = Instant::now();
info!("Compacting on vnode(job start): {:?}", &vnode_ids_for_debug);
for (vnode_id, flush_vnode) in vnode_ids {
let ts_family = version_set
.read()
.await
.get_tsfamily_by_tf_id(vnode_id)
.await;
if let Some(tsf) = ts_family {
info!("Starting compaction on ts_family {}", vnode_id);
let start = Instant::now();
if !tsf.read().await.can_compaction() {
info!("forbidden compaction on moving vnode {}", vnode_id);
return;
}
let picker = LevelCompactionPicker::new(storage_opt.clone());
let version = tsf.read().await.version();
let compact_req = picker.pick_compaction(version);
if let Some(req) = compact_req {
let database = req.database.clone();
let compact_ts_family = req.ts_family_id;
let out_level = req.out_level;

let ctx_inner = ctx.clone();
let seq_ctx_inner = seq_ctx.clone();
let version_set_inner = version_set.clone();
let summary_task_sender_inner = summary_task_sender.clone();

// Method acquire_owned() will return AcquireError if the semaphore has been closed.
let permit = compaction_limit.clone().acquire_owned().await.unwrap();
runtime_inner.spawn(async move {
if flush_vnode {
let mut tsf_wlock = tsf.write().await;
tsf_wlock.switch_to_immutable();
let flush_req = tsf_wlock.build_flush_req(true);
drop(tsf_wlock);
if let Some(req) = flush_req {
if let Err(e) = flush::run_flush_memtable_job(
req,
ctx_inner.clone(),
seq_ctx_inner.clone(),
version_set_inner,
summary_task_sender_inner.clone(),
None,
)
.await
{
error!("Failed to flush vnode {}: {:?}", vnode_id, e);
}
}
}

match super::run_compaction_job(req, ctx_inner).await {
Ok(Some((version_edit, file_metas))) => {
metrics::incr_compaction_success();
let (summary_tx, _summary_rx) = oneshot::channel();
let _ = summary_task_sender_inner
.send(SummaryTask::new(
vec![version_edit],
Some(file_metas),
None,
summary_tx,
))
.await;

metrics::sample_tskv_compaction_duration(
database.as_str(),
compact_ts_family.to_string().as_str(),
out_level.to_string().as_str(),
start.elapsed().as_secs_f64(),
)
// TODO Handle summary result using summary_rx.
}
Ok(None) => {
info!("There is nothing to compact.");
}
Err(e) => {
metrics::incr_compaction_failed();
error!("Compaction job failed: {:?}", e);
}
}
drop(permit);
});
}
}
}
info!(
    "Compacting on vnodes (job finished): {:?}, cost {} sec",
    vnode_ids_for_debug,
    now.elapsed().as_secs()
);
}
});

runtime.spawn(async move {
while let Some(compact_task) = receiver.recv().await {
// Vnode id to compact & whether the vnode should be flushed before compaction
let (vnode_id, flush_vnode) = match compact_task {
CompactTask::Vnode(id) => (id, false),
CompactTask::ColdVnode(id) => (id, true),
};
let ts_family = version_set
    .read()
    .await
    .get_tsfamily_by_tf_id(vnode_id)
    .await;
if let Some(tsf) = ts_family {
info!("Starting compaction on ts_family {}", vnode_id);
let start = Instant::now();
if !tsf.read().await.can_compaction() {
info!("forbidden compaction on moving vnode {}", vnode_id);
return;
}
let picker = LevelCompactionPicker::new(storage_opt.clone());
let version = tsf.read().await.version();
let compact_req = picker.pick_compaction(version);
if let Some(req) = compact_req {
let database = req.database.clone();
let compact_ts_family = req.ts_family_id;
let out_level = req.out_level;

let ctx_inner = ctx.clone();
let seq_ctx_inner = seq_ctx.clone();
let version_set_inner = version_set.clone();
let summary_task_sender_inner = summary_task_sender.clone();

let permit = compaction_limit.clone().acquire_owned().await.unwrap();
runtime_inner.spawn(async move {
if flush_vnode {
let mut tsf_wlock = tsf.write().await;
tsf_wlock.switch_to_immutable();
let flush_req = tsf_wlock.build_flush_req(true);
drop(tsf_wlock);
if let Some(req) = flush_req {
if let Err(e) = flush::run_flush_memtable_job(
req,
ctx_inner.clone(),
seq_ctx_inner.clone(),
version_set_inner,
summary_task_sender_inner.clone(),
None,
)
.await
{
error!("Failed to flush vnode {}: {:?}", vnode_id, e);
}
}
}

match super::run_compaction_job(req, ctx_inner).await {
Ok(Some((version_edit, file_metas))) => {
metrics::incr_compaction_success();
let (summary_tx, _summary_rx) = oneshot::channel();
let _ = summary_task_sender_inner
.send(SummaryTask::new(
vec![version_edit],
Some(file_metas),
None,
summary_tx,
))
.await;

metrics::sample_tskv_compaction_duration(
database.as_str(),
compact_ts_family.to_string().as_str(),
out_level.to_string().as_str(),
start.elapsed().as_secs_f64(),
)
// TODO Handle summary result using summary_rx.
}
Ok(None) => {
info!("There is nothing to compact.");
}
Err(e) => {
metrics::incr_compaction_failed();
error!("Compaction job failed: {:?}", e);
}
}
drop(permit);
});
}
}
compact_processor
    .write()
    .await
    .insert(vnode_id, flush_vnode);
}
});
}

#[cfg(test)]
mod test {
use crate::compaction::job::CompactProcessor;
use crate::TseriesFamilyId;

#[test]
fn test_build_compact_batch() {
let mut compact_batch_builder = CompactProcessor::default();
compact_batch_builder.insert(1, false);
compact_batch_builder.insert(2, false);
compact_batch_builder.insert(1, true);
compact_batch_builder.insert(3, true);
assert_eq!(compact_batch_builder.vnode_ids.len(), 3);
let mut keys: Vec<TseriesFamilyId> =
compact_batch_builder.vnode_ids.keys().cloned().collect();
keys.sort();
assert_eq!(keys, vec![1, 2, 3]);
assert_eq!(compact_batch_builder.vnode_ids.get(&1), Some(&true));
assert_eq!(compact_batch_builder.vnode_ids.get(&2), Some(&false));
assert_eq!(compact_batch_builder.vnode_ids.get(&3), Some(&true));
let vnode_ids = compact_batch_builder.take();
assert_eq!(vnode_ids.len(), 3);
assert_eq!(vnode_ids.get(&1), Some(&true));
assert_eq!(vnode_ids.get(&2), Some(&false));
assert_eq!(vnode_ids.get(&3), Some(&true));
}
}
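The core of the rewritten job.rs is CompactProcessor. Compaction requests arriving on the channel are no longer executed immediately; the receiver loop only records them as vnode_id -> should_flush entries, where re-inserting a vnode can only upgrade its flush flag (a logical OR). A background task drains the map once per COMPACT_BATCH_CHECKING_SECONDS tick and fans the batch out, with a Semaphore bounding the number of concurrent compactions. Below is a condensed, self-contained sketch of the same scheme under stated assumptions: VnodeId, do_compaction, and the limit of 4 are illustrative stand-ins, std::mem::take is used where the real code swaps in a HashMap::with_capacity(32), and tokio is assumed with the macros, rt-multi-thread, sync, and time features.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{RwLock, Semaphore};

type VnodeId = u32;

#[derive(Default)]
struct CompactProcessor {
    // Vnode id -> whether the vnode should be flushed before compaction.
    vnode_ids: HashMap<VnodeId, bool>,
}

impl CompactProcessor {
    // Re-queuing a vnode only upgrades should_flush (logical OR), so many
    // queued tasks for one vnode collapse into a single batched entry.
    fn insert(&mut self, vnode_id: VnodeId, should_flush: bool) {
        let old = self.vnode_ids.entry(vnode_id).or_insert(should_flush);
        if should_flush && !*old {
            *old = true;
        }
    }

    fn take(&mut self) -> HashMap<VnodeId, bool> {
        std::mem::take(&mut self.vnode_ids)
    }
}

// Stand-in for the real flush + compaction work.
async fn do_compaction(vnode_id: VnodeId, flush_first: bool) {
    println!("compacting vnode {vnode_id} (flush first: {flush_first})");
}

#[tokio::main]
async fn main() {
    let processor = Arc::new(RwLock::new(CompactProcessor::default()));
    let limit = Arc::new(Semaphore::new(4)); // max concurrent compactions

    // Producer side: duplicate requests for vnode 1 collapse into one entry.
    {
        let mut p = processor.write().await;
        p.insert(1, false);
        p.insert(1, true); // upgrades the flush flag
        p.insert(2, false);
    }

    // Consumer side: drain the whole batch once per tick, then fan out
    // semaphore-bounded tasks.
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    tick.tick().await;
    let batch = processor.write().await.take();
    let mut handles = Vec::new();
    for (vnode_id, flush_first) in batch {
        let permit = limit.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            do_compaction(vnode_id, flush_first).await;
            drop(permit); // release the concurrency slot
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}

Because duplicates collapse, at most one pending entry exists per vnode between ticks, bounding the backlog of queued compaction work instead of letting one task accumulate per request.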
1 change: 1 addition & 0 deletions tskv/src/compute/count.rs
@@ -75,6 +75,7 @@ async fn count_non_null_values_inner(
let read_tasks =
create_file_read_tasks(&super_version, &column_files, &counting_object, &time_range)
.await?;
// TODO(zipper): Inserting into a very big HashSet may be slow.
let (cached_timestamps, cached_time_range) = match counting_object {
CountingObject::Field(field_id) => {
get_field_timestamps_in_caches(&super_version, field_id, &time_range)