Feature(2.3): metrics of delta_compactions; some improvements (#2055)
* refactor(compaction,2.3): metrics for delta_compaction; normal compaction scheduler

* refactor(compaction,2.3): simplify codes of compaction job & scheduler
zipper-meng committed Apr 10, 2024
1 parent 0ef625e commit 2c416ee
Showing 12 changed files with 765 additions and 453 deletions.
10 changes: 10 additions & 0 deletions common/models/src/meta_data.rs
@@ -108,6 +108,16 @@ pub enum VnodeStatus {
Broken,
}

impl std::fmt::Display for VnodeStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
VnodeStatus::Running => write!(f, "running"),
VnodeStatus::Copying => write!(f, "copying"),
VnodeStatus::Broken => write!(f, "broken"),
}
}
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct VnodeAllInfo {
pub vnode_id: VnodeId,
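The Display impl above gives VnodeStatus a stable lowercase text form that later code (log lines, metric labels) can rely on. A minimal usage sketch; the models::meta_data import path is assumed from the file location and the asserts are only illustrative:

use models::meta_data::VnodeStatus;

fn main() {
    // to_string() comes for free from the blanket ToString impl over Display.
    assert_eq!(VnodeStatus::Running.to_string(), "running");
    // The same lowercase form appears when the status is interpolated into messages.
    assert_eq!(format!("vnode is {}", VnodeStatus::Broken), "vnode is broken");
}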
1 change: 1 addition & 0 deletions config/config_8902.toml
@@ -32,6 +32,7 @@ compact_trigger_file_num = 4
compact_trigger_cold_duration = "1h"
max_compact_size = "2G" # 2147483648
max_concurrent_compaction = 4
collect_compaction_metrics = false
strict_write = false
reserve_space = "0"
copyinto_trigger_flush_size = "128M" # 134217728
6 changes: 4 additions & 2 deletions config/config_8912.toml
@@ -27,10 +27,12 @@ max_summary_size = "128M" # 134217728
base_file_size = "16M" # 16777216
flush_req_channel_cap = 16
max_cached_readers = 32
enable_compaction = true
compact_trigger_file_num = 4
compact_trigger_cold_duration = "1h"
max_compact_size = "2G" # 2147483648
max_concurrent_compaction = 4
collect_compaction_metrics = false
strict_write = false
reserve_space = "0"
copyinto_trigger_flush_size = "128M" # 134217728
@@ -47,7 +49,7 @@ sync_interval = "0"
[cache]
max_buffer_size = "128M" # 134217728
max_immutable_number = 4
partition = 16 # default memcache partition number
partition = 16 # default memcache partition number

[log]
level = 'info'
@@ -69,7 +71,7 @@ tcp_listen_port = 8915
vector_listen_port = 8916

[node_basic]
node_id = 2001
node_id = 2001
store_metrics = true

[heartbeat]
17 changes: 17 additions & 0 deletions config/src/storage_config.rs
@@ -50,6 +50,9 @@ pub struct StorageConfig {
#[serde(default = "StorageConfig::default_max_concurrent_compaction")]
pub max_concurrent_compaction: u16,

#[serde(default = "StorageConfig::default_collect_compaction_metrics")]
pub collect_compaction_metrics: bool,

#[serde(default = "StorageConfig::default_strict_write")]
pub strict_write: bool,

@@ -108,6 +111,10 @@ impl StorageConfig {
4
}

fn default_collect_compaction_metrics() -> bool {
false
}

fn default_reserve_space() -> u64 {
0
}
@@ -133,6 +140,12 @@ impl StorageConfig {
if let Ok(size) = std::env::var("CNOSDB_FLUSH_REQ_CHANNEL_CAP") {
self.flush_req_channel_cap = size.parse::<usize>().unwrap();
}
if let Ok(size) = std::env::var("CNOSDB_STORAGE_MAX_CACHED_READERS") {
self.max_cached_readers = size.parse::<usize>().unwrap();
}
if let Ok(flag) = std::env::var("CNOSDB_STORAGE_ENABLE_COMPACTION") {
self.enable_compaction = flag.parse::<bool>().unwrap();
}
if let Ok(size) = std::env::var("CNOSDB_STORAGE_MAX_LEVEL") {
self.max_level = size.parse::<u16>().unwrap();
}
@@ -148,6 +161,9 @@
if let Ok(size) = std::env::var("CNOSDB_STORAGE_MAX_CONCURRENT_COMPACTION") {
self.max_concurrent_compaction = size.parse::<u16>().unwrap();
}
if let Ok(flag) = std::env::var("CNOSDB_STORAGE_COLLECT_COMPACTION_METRICS") {
self.collect_compaction_metrics = flag.parse::<bool>().unwrap();
}
if let Ok(size) = std::env::var("CNOSDB_STORAGE_STRICT_WRITE") {
self.strict_write = size.parse::<bool>().unwrap();
}
@@ -179,6 +195,7 @@ impl Default for StorageConfig {
compact_trigger_cold_duration: Self::default_compact_trigger_cold_duration(),
max_compact_size: Self::default_max_compact_size(),
max_concurrent_compaction: Self::default_max_concurrent_compaction(),
collect_compaction_metrics: Self::default_collect_compaction_metrics(),
strict_write: Self::default_strict_write(),
reserve_space: Self::default_reserve_space(),
copyinto_trigger_flush_size: Self::default_copyinto_trigger_flush_size(),
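The new collect_compaction_metrics option follows the same pattern as the other storage settings: it defaults to false through serde, and CNOSDB_STORAGE_COLLECT_COMPACTION_METRICS overrides whatever the config file says. A self-contained sketch of that pattern, assuming serde and toml are available; the struct below is a stripped-down stand-in, not the real StorageConfig:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct StorageOpts {
    // A missing key deserializes to false, mirroring default_collect_compaction_metrics().
    #[serde(default)]
    collect_compaction_metrics: bool,
}

impl StorageOpts {
    // Same shape as StorageConfig::override_by_env in the diff above.
    fn override_by_env(&mut self) {
        if let Ok(flag) = std::env::var("CNOSDB_STORAGE_COLLECT_COMPACTION_METRICS") {
            self.collect_compaction_metrics = flag.parse::<bool>().unwrap();
        }
    }
}

fn main() {
    let mut opts: StorageOpts = toml::from_str("").unwrap();
    assert!(!opts.collect_compaction_metrics); // file default: metrics collection off

    // Simulate an operator exporting the variable before starting the server.
    std::env::set_var("CNOSDB_STORAGE_COLLECT_COMPACTION_METRICS", "true");
    opts.override_by_env();
    assert!(opts.collect_compaction_metrics); // the env var wins over the file value
}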
64 changes: 46 additions & 18 deletions tskv/src/compaction/delta_compact.rs
@@ -10,6 +10,7 @@ use utils::BloomFilter;

use super::CompactTask;
use crate::compaction::compact::{CompactingBlock, CompactingBlockMeta, CompactingFile};
use crate::compaction::metric::{self, CompactMetrics, FakeMetricStore, MetricStore};
use crate::compaction::{CompactReq, CompactingBlocks};
use crate::context::GlobalContext;
use crate::error::{self, Result};
@@ -79,7 +80,28 @@ pub async fn run_compaction_job(
let mut merged_blks = Vec::with_capacity(32);

let mut curr_fid: Option<FieldId> = None;
while let Some(fid) = state.next(&mut merging_blk_meta_groups).await {

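    // Pick the metric sink once per job: CompactMetrics records real timings when
    // collect_compaction_metrics is enabled, otherwise FakeMetricStore (presumably a
    // no-op recorder) keeps the per-block bookkeeping out of the hot path.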
let mut compact_metrics: Box<dyn MetricStore> = if request
.version
.borrowed_storage_opt()
.collect_compaction_metrics
{
Box::new(CompactMetrics::default(compact_task))
} else {
Box::new(FakeMetricStore)
};

compact_metrics.begin_all();
loop {
compact_metrics.begin(metric::NEXT_FIELD);
let fid = match state.next(&mut merging_blk_meta_groups).await {
Some(fid) => fid,
None => break,
};
compact_metrics.finish(metric::NEXT_FIELD);

compact_metrics.begin(metric::MERGE_FIELD);

for blk_meta_group in merging_blk_meta_groups.drain(..) {
trace::trace!("merging meta group: {blk_meta_group}");
if let Some(c_fid) = curr_fid {
@@ -90,20 +112,25 @@
trace::trace!(
"write the previous compacting block (fid={curr_fid:?}): {blk}"
);
compact_metrics.begin(metric::WRITE_BLOCK);
writer_wrapper.write(blk).await?;
compact_metrics.finish(metric::WRITE_BLOCK);
}
}
}
curr_fid = Some(fid);

compact_metrics.begin(metric::MERGE_BLOCK);
blk_meta_group
.merge_with_previous_block(
previous_merged_block.take(),
max_block_size,
&out_time_range,
&mut merged_blks,
&mut compact_metrics,
)
.await?;
compact_metrics.finish(metric::MERGE_BLOCK);
if merged_blks.is_empty() {
continue;
}
@@ -112,7 +139,6 @@
previous_merged_block = Some(merged_blks.remove(0));
continue;
}

let last_blk_idx = merged_blks.len() - 1;
for (i, blk) in merged_blks.drain(..).enumerate() {
if i == last_blk_idx && blk.len() < max_block_size {
@@ -123,16 +149,25 @@
break;
}
trace::trace!("write compacting block(fid={fid}): {blk}");
compact_metrics.begin(metric::WRITE_BLOCK);
writer_wrapper.write(blk).await?;
compact_metrics.finish(metric::WRITE_BLOCK);
}
}

compact_metrics.finish(metric::MERGE_FIELD);
}
if let Some(blk) = previous_merged_block {
trace::trace!("write the final compacting block(fid={curr_fid:?}): {blk}");
compact_metrics.begin(metric::WRITE_BLOCK);
writer_wrapper.write(blk).await?;
compact_metrics.finish(metric::WRITE_BLOCK);
}

let (mut version_edit, file_metas) = writer_wrapper.close().await?;

compact_metrics.finish_all();

// Level 0 files that can be deleted after compaction.
version_edit.del_files = l0_file_metas_will_delete;
if let Some(f) = level_file {
@@ -383,6 +418,7 @@ impl CompactingBlockMetaGroup {
max_block_size: usize,
time_range: &TimeRange,
compacting_blocks: &mut Vec<CompactingBlock>,
metrics: &mut Box<dyn MetricStore>,
) -> Result<()> {
compacting_blocks.clear();
if self.blk_metas.is_empty() {
@@ -399,7 +435,9 @@
// Only one compacting block and has no tombstone, write as raw block.
trace::trace!("only one compacting block without tombstone and time_range is entirely included by target level, handled as raw block");
let head_meta = &self.blk_metas[0].meta;
metrics.begin(metric::READ_BLOCK);
let buf = self.blk_metas[0].get_raw_data().await?;
metrics.finish(metric::READ_BLOCK);

if head_meta.size() >= max_block_size as u64 {
// Raw data block is full, so do not merge with the previous, directly return.
@@ -444,7 +482,9 @@

let (mut head_block, mut head_i) = (Option::<DataBlock>::None, 0_usize);
for (i, meta) in self.blk_metas.iter().enumerate() {
metrics.begin(metric::READ_BLOCK);
if let Some(blk) = meta.get_data_block_intersection(time_range).await? {
metrics.finish(metric::READ_BLOCK);
head_block = Some(blk);
head_i = i;
break;
@@ -460,26 +500,14 @@
}

trace::trace!("=== Resolving {} blocks", self.blk_metas.len() - head_i - 1);
const BLOCK_BATCH_SIZE: usize = 64;
let mut batch_blk = Vec::with_capacity(BLOCK_BATCH_SIZE);
for blk_meta in self.blk_metas.iter_mut().skip(head_i + 1) {
// Merge decoded data block.
metrics.begin(metric::READ_BLOCK);
if let Some(blk) = blk_meta.get_data_block_intersection(time_range).await? {
batch_blk.push(blk);
if batch_blk.len() >= BLOCK_BATCH_SIZE {
let batch_blk = std::mem::replace(
&mut batch_blk,
Vec::with_capacity(BLOCK_BATCH_SIZE),
);
if let Some(blk) = DataBlock::merge_blocks(batch_blk) {
head_blk = head_blk.merge(blk);
}
}
}
}
if !batch_blk.is_empty() {
if let Some(blk) = DataBlock::merge_blocks(batch_blk) {
metrics.finish(metric::READ_BLOCK);
metrics.begin(metric::MERGE_BLOCK_BATCH);
head_blk = head_blk.merge(blk);
metrics.finish(metric::MERGE_BLOCK_BATCH);
}
}
trace::trace!("Compaction(delta): Finished task to merge data blocks");
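run_compaction_job drives its metric sink only through begin_all/finish_all and begin/finish with phase constants (NEXT_FIELD, MERGE_FIELD, MERGE_BLOCK, WRITE_BLOCK, READ_BLOCK, MERGE_BLOCK_BATCH), so the enabled and disabled paths share one call site behind Box<dyn MetricStore>. compaction/metric.rs is not part of this diff, so the following is only a guessed sketch of an interface with that shape; every name and signature here is an assumption, not the actual CnosDB API:

use std::collections::HashMap;
use std::time::{Duration, Instant};

// Hypothetical phase labels standing in for the constants referenced above.
pub const MERGE_FIELD: &str = "merge_field";
pub const WRITE_BLOCK: &str = "write_block";

pub trait MetricStore {
    fn begin_all(&mut self);
    fn finish_all(&mut self);
    fn begin(&mut self, phase: &'static str);
    fn finish(&mut self, phase: &'static str);
}

// No-op sink: selecting it costs only a virtual call per begin/finish.
pub struct FakeMetricStore;

impl MetricStore for FakeMetricStore {
    fn begin_all(&mut self) {}
    fn finish_all(&mut self) {}
    fn begin(&mut self, _phase: &'static str) {}
    fn finish(&mut self, _phase: &'static str) {}
}

// Accumulating sink: sums wall-clock time per phase. (The real CompactMetrics is
// constructed with the compact task and presumably reports elsewhere; omitted here.)
#[derive(Default)]
pub struct CompactMetrics {
    started: HashMap<&'static str, Instant>,
    elapsed: HashMap<&'static str, Duration>,
}

impl MetricStore for CompactMetrics {
    fn begin_all(&mut self) { self.begin("total"); }
    fn finish_all(&mut self) { self.finish("total"); }
    fn begin(&mut self, phase: &'static str) {
        self.started.insert(phase, Instant::now());
    }
    fn finish(&mut self, phase: &'static str) {
        if let Some(t0) = self.started.remove(phase) {
            *self.elapsed.entry(phase).or_default() += t0.elapsed();
        }
    }
}

fn main() {
    let collect_compaction_metrics = true; // stands in for the storage option
    let mut metrics: Box<dyn MetricStore> = if collect_compaction_metrics {
        Box::new(CompactMetrics::default())
    } else {
        Box::new(FakeMetricStore)
    };
    metrics.begin_all();
    metrics.begin(MERGE_FIELD);
    metrics.begin(WRITE_BLOCK);
    metrics.finish(WRITE_BLOCK);
    metrics.finish(MERGE_FIELD);
    metrics.finish_all();
}

Passing &mut Box<dyn MetricStore> into helpers such as merge_with_previous_block, as the diff does, then lets the inner read and merge steps be timed with the same begin/finish pairs without the helper knowing which implementation is behind the box.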
