Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): add options to ignore building index for specific column ids #3295

Merged
merged 2 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::IndexerBuilder;
Expand Down Expand Up @@ -143,6 +144,7 @@ impl AccessLayer {
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
}
.build();
let mut writer = ParquetWriter::new(
Expand Down Expand Up @@ -187,6 +189,8 @@ pub(crate) struct SstWriteRequest {
pub(crate) mem_threshold_index_create: Option<usize>,
/// The size of write buffer for index.
pub(crate) index_write_buffer_size: Option<usize>,
/// The options of the index for the region.
pub(crate) index_options: IndexOptions,
}

/// Creates a fs object store with atomic write dir.
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl WriteCache {
row_group_size: write_opts.row_group_size,
object_store: self.file_cache.local_store(),
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
}
.build();

Expand Down Expand Up @@ -235,6 +236,7 @@ mod tests {
use super::*;
use crate::cache::test_util::new_fs_store;
use crate::cache::CacheManager;
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::sst::parquet::reader::ParquetReaderBuilder;
Expand Down Expand Up @@ -279,6 +281,7 @@ mod tests {
mem_threshold_index_create: None,
index_write_buffer_size: None,
cache_manager: Default::default(),
index_options: IndexOptions::default(),
};

let upload_request = SstUploadRequest {
Expand Down Expand Up @@ -363,6 +366,7 @@ mod tests {
mem_threshold_index_create: None,
index_write_buffer_size: None,
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,
Expand Down
6 changes: 6 additions & 0 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::{BoxedBatchReader, Source};
use crate::region::options::IndexOptions;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
Expand Down Expand Up @@ -186,6 +187,7 @@ impl Picker for TwcsPicker {
start_time,
cache_manager,
storage: current_version.options.storage.clone(),
index_options: current_version.options.index_options.clone(),
};
Some(Box::new(task))
}
Expand Down Expand Up @@ -251,6 +253,8 @@ pub(crate) struct TwcsCompactionTask {
pub(crate) cache_manager: CacheManagerRef,
/// Target storage of the region.
pub(crate) storage: Option<String>,
/// Index options of the region.
pub(crate) index_options: IndexOptions,
}

impl Debug for TwcsCompactionTask {
Expand Down Expand Up @@ -327,6 +331,7 @@ impl TwcsCompactionTask {
let file_id = output.output_file_id;
let cache_manager = self.cache_manager.clone();
let storage = self.storage.clone();
let index_options = self.index_options.clone();
futs.push(async move {
let reader =
build_sst_reader(metadata.clone(), sst_layer.clone(), &output.inputs).await?;
Expand All @@ -341,6 +346,7 @@ impl TwcsCompactionTask {
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options,
},
&write_opts,
)
Expand Down
6 changes: 6 additions & 0 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::error::{
use crate::memtable::MemtableBuilderRef;
use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest,
Expand Down Expand Up @@ -203,6 +204,9 @@ pub(crate) struct RegionFlushTask {
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) row_group_size: Option<usize>,
pub(crate) cache_manager: CacheManagerRef,

/// Index options for the region.
pub(crate) index_options: IndexOptions,
}

impl RegionFlushTask {
Expand Down Expand Up @@ -338,6 +342,7 @@ impl RegionFlushTask {
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options: self.index_options.clone(),
};
let Some(sst_info) = self
.access_layer
Expand Down Expand Up @@ -766,6 +771,7 @@ mod tests {
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
cache_manager: Arc::new(CacheManager::default()),
index_options: IndexOptions::default(),
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
Expand Down
8 changes: 8 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ impl ScanRegion {
self.access_layer.object_store().clone(),
file_cache,
self.version.metadata.as_ref(),
self.version
.options
.index_options
.inverted_index
.ignore_column_ids
.iter()
.copied()
.collect(),
)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build index applier"))
Expand Down
64 changes: 63 additions & 1 deletion src/mito2/src/region/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use std::collections::HashMap;
use std::time::Duration;

use common_wal::options::{WalOptions, WAL_OPTIONS_KEY};
use serde::Deserialize;
use serde::de::Error as _;
use serde::{Deserialize, Deserializer};
use serde_json::Value;
use serde_with::{serde_as, with_prefix, DisplayFromStr};
use snafu::ResultExt;
use store_api::storage::ColumnId;

use crate::error::{Error, JsonOptionsSnafu, Result};

Expand All @@ -40,6 +42,8 @@ pub struct RegionOptions {
pub storage: Option<String>,
/// Wal options.
pub wal_options: WalOptions,
/// Index options.
pub index_options: IndexOptions,
}

impl TryFrom<&HashMap<String, String>> for RegionOptions {
Expand All @@ -64,11 +68,14 @@ impl TryFrom<&HashMap<String, String>> for RegionOptions {
},
)?;

let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?;

Ok(RegionOptions {
ttl: options.ttl,
compaction,
storage: options.storage,
wal_options,
index_options,
})
}
}
Expand Down Expand Up @@ -152,6 +159,40 @@ impl Default for RegionOptionsWithoutEnum {
}
}

with_prefix!(prefix_inverted_index "index.inverted_index.");
waynexia marked this conversation as resolved.
Show resolved Hide resolved

/// Options for index.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize)]
#[serde(default)]
pub struct IndexOptions {
/// Options for the inverted index.
#[serde(flatten, with = "prefix_inverted_index")]
pub inverted_index: InvertedIndexOptions,
}

/// Options for the inverted index.
#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize)]
#[serde(default)]
pub struct InvertedIndexOptions {
/// The column ids that should be ignored when building the inverted index.
/// The column ids are separated by commas. For example, "1,2,3".
evenyag marked this conversation as resolved.
Show resolved Hide resolved
#[serde(deserialize_with = "deserialize_ignore_column_ids")]
pub ignore_column_ids: Vec<ColumnId>,
}

fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result<Vec<ColumnId>, D::Error>
where
D: Deserializer<'de>,
{
let s: String = Deserialize::deserialize(deserializer)?;
let mut column_ids = Vec::new();
for item in s.split(',') {
let column_id = item.parse().map_err(D::Error::custom)?;
column_ids.push(column_id);
}
Ok(column_ids)
}

/// Converts the `options` map to a json object.
///
/// Converts all key-values to lowercase and replaces "null" strings by `null` json values.
Expand Down Expand Up @@ -257,6 +298,21 @@ mod tests {
expect == got
}

#[test]
fn test_with_index() {
let map = make_map(&[("index.inverted_index.ignore_column_ids", "1,2,3")]);
let options = RegionOptions::try_from(&map).unwrap();
let expect = RegionOptions {
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
},
},
..Default::default()
};
assert_eq!(expect, options);
}

// No need to add compatible tests for RegionOptions since the above tests already check for compatibility.
#[test]
fn test_with_any_wal_options() {
Expand All @@ -281,6 +337,7 @@ mod tests {
("compaction.twcs.time_window", "2h"),
("compaction.type", "twcs"),
("storage", "S3"),
("index.inverted_index.ignore_column_ids", "1,2,3"),
(
WAL_OPTIONS_KEY,
&serde_json::to_string(&wal_options).unwrap(),
Expand All @@ -296,6 +353,11 @@ mod tests {
}),
storage: Some("s3".to_string()),
wal_options,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
ignore_column_ids: vec![1, 2, 3],
},
},
};
assert_eq!(expect, options);
}
Expand Down
16 changes: 15 additions & 1 deletion src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;

use crate::read::Batch;
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;

Expand Down Expand Up @@ -132,6 +133,7 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) segment_row_count: usize,
pub(crate) object_store: ObjectStore,
pub(crate) intermediate_manager: IntermediateManager,
pub(crate) index_options: IndexOptions,
}

impl<'a> IndexerBuilder<'a> {
Expand Down Expand Up @@ -184,7 +186,15 @@ impl<'a> IndexerBuilder<'a> {
self.mem_threshold_index_create,
segment_row_count,
)
.with_buffer_size(self.write_buffer_size);
.with_buffer_size(self.write_buffer_size)
.with_ignore_column_ids(
self.index_options
.inverted_index
.ignore_column_ids
.iter()
.map(|i| i.to_string())
.collect(),
);

Indexer {
file_id: self.file_id,
Expand Down Expand Up @@ -281,6 +291,7 @@ mod tests {
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
}
.build();

Expand All @@ -301,6 +312,7 @@ mod tests {
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
}
.build();

Expand All @@ -321,6 +333,7 @@ mod tests {
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
}
.build();

Expand All @@ -341,6 +354,7 @@ mod tests {
row_group_size: 0,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
}
.build();

Expand Down