Skip to content

Commit

Permalink
update check logic of layered memtable enabling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachelint committed May 27, 2024
1 parent 9cadf32 commit a9c4041
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 40 deletions.
17 changes: 11 additions & 6 deletions src/analytic_engine/src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,18 @@ pub struct LayeredMemtableOptions {
pub mutable_segment_switch_threshold: ReadableSize,
}

impl LayeredMemtableOptions {
#[inline]
pub fn enable_layered_memtable(&self) -> bool {
self.enable && self.mutable_segment_switch_threshold.0 > 0
}
}

impl Default for LayeredMemtableOptions {
fn default() -> Self {
Self {
mutable_segment_switch_threshold: ReadableSize::mb(0),
enable: false,
mutable_segment_switch_threshold: ReadableSize::mb(3),
}
}
}
Expand All @@ -98,14 +105,12 @@ impl From<manifest::LayeredMemtableOptions> for LayeredMemtableOptions {
fn from(value: manifest::LayeredMemtableOptions) -> Self {
let enable = match value.enable {
Some(Enable::EnableOpt(enable)) => enable,
// For compatibility(enable field not exist in old horaedb version),
// if this field not exist in pb, set it to true.
None => true,
None => false,
};

Self {
mutable_segment_switch_threshold: ReadableSize(value.mutable_segment_switch_threshold),
enable,
mutable_segment_switch_threshold: ReadableSize(value.mutable_segment_switch_threshold),
}
}
}
Expand All @@ -114,7 +119,7 @@ impl From<LayeredMemtableOptions> for manifest::LayeredMemtableOptions {
fn from(value: LayeredMemtableOptions) -> Self {
Self {
mutable_segment_switch_threshold: value.mutable_segment_switch_threshold.0,
enable: Some(Enable::EnableOpt(value.enable))
enable: Some(Enable::EnableOpt(value.enable)),
}
}
}
Expand Down
44 changes: 31 additions & 13 deletions src/analytic_engine/src/table/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use id_allocator::IdAllocator;
use logger::{debug, info};
use macros::define_result;
use object_store::Path;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::table::{SchemaId, TableId};
use time_ext::ReadableDuration;

Expand Down Expand Up @@ -95,6 +95,9 @@ pub enum Error {

#[snafu(display("Failed to alloc file id, err:{}", source))]
AllocFileId { source: GenericError },

#[snafu(display("Found invalid table opts, msg:{msg}.\nBacktrace:\n{backtrace}"))]
InvalidTableOpts { msg: String, backtrace: Backtrace },
}

define_result!(Error);
Expand Down Expand Up @@ -323,13 +326,20 @@ impl TableData {
MemtableType::Column => Arc::new(ColumnarMemTableFactory),
};

// Wrap it by `LayeredMemtable`.
let mutable_segment_switch_threshold = opts
.layered_memtable_opts
.mutable_segment_switch_threshold
.0 as usize;
let enable_layered_memtable = mutable_segment_switch_threshold > 0;
let enable_layered_memtable = opts.layered_memtable_opts.enable;
let memtable_factory = if enable_layered_memtable {
let mutable_segment_switch_threshold = opts
.layered_memtable_opts
.mutable_segment_switch_threshold
.0 as usize;

ensure!(
mutable_segment_switch_threshold > 0,
InvalidTableOpts {
msg: format!("layered memtable is enabled but mutable_switch_threshold is 0")
}
);

Arc::new(LayeredMemtableFactory::new(
memtable_factory,
mutable_segment_switch_threshold,
Expand Down Expand Up @@ -403,13 +413,21 @@ impl TableData {
MemtableType::Column => Arc::new(ColumnarMemTableFactory),
};
// Maybe wrap it by `LayeredMemtable`.
let mutable_segment_switch_threshold = add_meta
.opts
.layered_memtable_opts
.mutable_segment_switch_threshold
.0 as usize;
let enable_layered_memtable = mutable_segment_switch_threshold > 0;
let enable_layered_memtable = add_meta.opts.layered_memtable_opts.enable;
let memtable_factory = if enable_layered_memtable {
let mutable_segment_switch_threshold = add_meta
.opts
.layered_memtable_opts
.mutable_segment_switch_threshold
.0 as usize;

ensure!(
mutable_segment_switch_threshold > 0,
InvalidTableOpts {
msg: format!("layered memtable is enabled but mutable_switch_threshold is 0")
}
);

Arc::new(LayeredMemtableFactory::new(
memtable_factory,
mutable_segment_switch_threshold,
Expand Down
65 changes: 46 additions & 19 deletions src/analytic_engine/src/table_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

//! Constants for table options.

use std::{collections::HashMap, string::ToString, time::Duration};
use std::{collections::HashMap, str::FromStr, string::ToString, time::Duration};

use anyhow::ensure;
use common_types::{
time::Timestamp, ARENA_BLOCK_SIZE, COMPACTION_STRATEGY, COMPRESSION, ENABLE_TTL, MEMTABLE_TYPE,
MUTABLE_SEGMENT_SWITCH_THRESHOLD, NUM_ROWS_PER_ROW_GROUP, OPTION_KEY_ENABLE_TTL,
SEGMENT_DURATION, STORAGE_FORMAT, TTL, UPDATE_MODE, WRITE_BUFFER_SIZE,
time::Timestamp, ARENA_BLOCK_SIZE, COMPACTION_STRATEGY, COMPRESSION, ENABLE_TTL,
LAYERED_ENABLE, LAYERED_MUTABLE_SWITCH_THRESHOLD, MEMTABLE_TYPE, NUM_ROWS_PER_ROW_GROUP,
OPTION_KEY_ENABLE_TTL, SEGMENT_DURATION, STORAGE_FORMAT, TTL, UPDATE_MODE, WRITE_BUFFER_SIZE,
};
use datafusion::parquet::basic::Compression as ParquetCompression;
use generic_error::{BoxError, GenericError};
Expand Down Expand Up @@ -141,12 +142,9 @@ pub enum Error {
HybridDeprecated { backtrace: Backtrace },

#[snafu(display(
"Failed to parse layered memtable options, err:{source}.\nBacktrace:\n{backtrace}",
"Failed to parse layered memtable options, msg:{msg}.\nBacktrace:\n{backtrace}",
))]
ParseLayeredMemtableOptions {
source: GenericError,
backtrace: Backtrace,
},
ParseLayeredMemtableOptions { msg: String, backtrace: Backtrace },

#[snafu(display("Layered memtable options is missing.\nBacktrace:\n{backtrace}",))]
MissingLayeredMemtableOptions { backtrace: Backtrace },
Expand Down Expand Up @@ -470,7 +468,11 @@ impl TableOptions {
),
(MEMTABLE_TYPE.to_string(), self.memtable_type.to_string()),
(
MUTABLE_SEGMENT_SWITCH_THRESHOLD.to_string(),
LAYERED_ENABLE.to_string(),
self.layered_memtable_opts.enable.to_string(),
),
(
LAYERED_MUTABLE_SWITCH_THRESHOLD.to_string(),
self.layered_memtable_opts
.mutable_segment_switch_threshold
.0
Expand All @@ -488,14 +490,21 @@ impl TableOptions {
/// If invalid, Some(reason) will be returned.
/// If valid, None will be returned
pub fn check_validity(&self) -> Option<String> {
// layered memtable is not support in overwrite mode
if self.need_dedup()
if self.layered_memtable_opts.enable
&& self
.layered_memtable_opts
.mutable_segment_switch_threshold
.0
> 0
== 0
{
return Some(format!(
"layered memtable is enabled but mutable_switch_threshold is 0, layered_memtable_opts:{:?}",
self.layered_memtable_opts,
));
}

// layered memtable is not support in overwrite mode
if self.need_dedup() && self.layered_memtable_opts.enable {
return Some(format!(
"layered memtable is enabled for table needing dedup, layered_memtable_opts:{:?}, update_mode:{:?}",
self.layered_memtable_opts, self.update_mode,
Expand Down Expand Up @@ -821,14 +830,32 @@ fn merge_table_options(
if let Some(v) = options.get(MEMTABLE_TYPE) {
base_table_opts.memtable_type = MemtableType::parse_from(v);
}
if let Some(v) = options.get(MUTABLE_SEGMENT_SWITCH_THRESHOLD) {
let threshold = v
.parse::<u64>()
.box_err()
.context(ParseLayeredMemtableOptions)?;
if let Some(v) = options.get(LAYERED_ENABLE) {
let enable = match v.parse::<bool>() {
Ok(v) => v,
Err(e) => {
return ParseLayeredMemtableOptions {
msg: format!("invalid layered_enable setting, err:{e}"),
}
.fail()
}
};
base_table_opts.layered_memtable_opts.enable = enable;
}
if let Some(v) = options.get(LAYERED_MUTABLE_SWITCH_THRESHOLD) {
let threshold = match ReadableSize::from_str(v) {
Ok(v) => v,
Err(e) => {
return ParseLayeredMemtableOptions {
msg: format!("invalid layered_mutable_switch_threshold setting, err:{e}"),
}
.fail()
}
};

base_table_opts
.layered_memtable_opts
.mutable_segment_switch_threshold = ReadableSize(threshold);
.mutable_segment_switch_threshold = threshold;
}

Ok(base_table_opts)
Expand Down
3 changes: 2 additions & 1 deletion src/common_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ pub const UPDATE_MODE: &str = "update_mode";
pub const COMPRESSION: &str = "compression";
pub const STORAGE_FORMAT: &str = "storage_format";
pub const MEMTABLE_TYPE: &str = "memtable_type";
pub const MUTABLE_SEGMENT_SWITCH_THRESHOLD: &str = "mutable_segment_switch_threshold";
pub const LAYERED_MUTABLE_SWITCH_THRESHOLD: &str = "layered_mutable_switch_threshold";
pub const LAYERED_ENABLE: &str = "layered_enable";

#[cfg(any(test, feature = "test"))]
pub mod tests;
2 changes: 1 addition & 1 deletion src/system_catalog/src/sys_catalog_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl SysCatalogTable {
);
// Disable layered memtable for system catalog table.
options.insert(
common_types::MUTABLE_SEGMENT_SWITCH_THRESHOLD.to_string(),
common_types::LAYERED_MUTABLE_SWITCH_THRESHOLD.to_string(),
0.to_string(),
);
let params = CreateTableParams {
Expand Down

0 comments on commit a9c4041

Please sign in to comment.