Skip to content

Commit

Permalink
feat(mito): enable inverted index (#3158)
Browse files Browse the repository at this point in the history
* feat(mito): enable inverted index

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix typos

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix typos

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* accidentally resolved the incorrect filtering issue within the Metric Engine

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Update src/mito2/src/access_layer.rs

* Update src/mito2/src/test_util/scheduler_util.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix: format -> join_dir

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: move intermediate_manager from arg of write_and_upload_sst to field of WriteCache

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: add IndexerBuilder

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
  • Loading branch information
zhongzc and evenyag committed Jan 15, 2024
1 parent 816d948 commit 6f07d69
Show file tree
Hide file tree
Showing 34 changed files with 916 additions and 235 deletions.
19 changes: 19 additions & 0 deletions config/datanode.example.toml
Expand Up @@ -116,6 +116,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query.
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting it to empty disables external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"
Expand Down
19 changes: 19 additions & 0 deletions config/standalone.example.toml
Expand Up @@ -216,6 +216,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query.
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting it to empty disables external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

# Log options
# [logging]
# Specify logs directory.
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/config.rs
Expand Up @@ -284,6 +284,7 @@ impl DatanodeOptions {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum RegionEngineConfig {
#[serde(rename = "mito")]
Expand Down
16 changes: 5 additions & 11 deletions src/datanode/src/datanode.rs
Expand Up @@ -41,7 +41,7 @@ use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::{join_dir, normalize_dir};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::{start_server, ServerHandlers};
Expand Down Expand Up @@ -374,27 +374,21 @@ impl DatanodeBuilder {
async fn build_mito_engine(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
mut config: MitoConfig,
config: MitoConfig,
) -> Result<MitoEngine> {
// Sets write cache path if it is empty.
if config.experimental_write_cache_path.is_empty() {
config.experimental_write_cache_path = join_dir(&opts.storage.data_home, "write_cache");
info!(
"Sets write cache path to {}",
config.experimental_write_cache_path
);
}

let mito_engine = match &opts.wal {
WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
)
.await
.context(BuildMitoEngineSnafu)?,

WalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/create.rs
Expand Up @@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef;

/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]
pub trait InvertedIndexCreator {
pub trait InvertedIndexCreator: Send {
/// Adds a value to the named index. A `None` value represents an absence of data (null)
///
/// - `index_name`: Identifier for the index being built
Expand Down
2 changes: 0 additions & 2 deletions src/index/src/lib.rs
Expand Up @@ -13,7 +13,5 @@
// limitations under the License.

#![feature(iter_partition_in_place)]
// TODO(zhongzc): remove once further code is added
#![allow(dead_code)]

pub mod inverted_index;
50 changes: 37 additions & 13 deletions src/mito2/src/access_layer.rs
Expand Up @@ -25,6 +25,8 @@ use crate::cache::CacheManagerRef;
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::IndexerBuilder;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
Expand All @@ -37,6 +39,8 @@ pub struct AccessLayer {
region_dir: String,
/// Target object store.
object_store: ObjectStore,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
}

impl std::fmt::Debug for AccessLayer {
Expand All @@ -49,10 +53,15 @@ impl std::fmt::Debug for AccessLayer {

impl AccessLayer {
/// Returns a new [AccessLayer] for specific `region_dir`.
pub fn new(region_dir: impl Into<String>, object_store: ObjectStore) -> AccessLayer {
pub fn new(
region_dir: impl Into<String>,
object_store: ObjectStore,
intermediate_manager: IntermediateManager,
) -> AccessLayer {
// `region_dir` is converted to an owned String here; `intermediate_manager`
// manages intermediate files for the inverted index (see the field comment
// on `AccessLayer`).
AccessLayer {
region_dir: region_dir.into(),
object_store,
intermediate_manager,
}
}

Expand Down Expand Up @@ -105,16 +114,15 @@ impl AccessLayer {
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let cache_manager = request.cache_manager.clone();

let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
// Write to the write cache.
write_cache
.write_and_upload_sst(
request,
SstUploadRequest {
file_id: request.file_id,
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: file_path,
index_upload_path: index_file_path,
remote_store: self.object_store.clone(),
Expand All @@ -124,19 +132,30 @@ impl AccessLayer {
.await?
} else {
// Write cache is disabled.
let mut writer =
ParquetWriter::new(file_path, request.metadata, self.object_store.clone());
let indexer = IndexerBuilder {
create_inverted_index: request.create_inverted_index,
mem_threshold_index_create: request.mem_threshold_index_create,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
intermediate_manager: self.intermediate_manager.clone(),
}
.build();
let mut writer = ParquetWriter::new(
file_path,
request.metadata,
self.object_store.clone(),
indexer,
);
writer.write_all(request.source, write_opts).await?
};

// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
request.cache_manager.put_parquet_meta_data(
region_id,
request.file_id,
parquet_metadata.clone(),
)
cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
}
}

Expand All @@ -150,7 +169,12 @@ pub(crate) struct SstWriteRequest {
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
/// Whether to create inverted index.
pub(crate) create_inverted_index: bool,
/// The threshold of memory size to create inverted index.
pub(crate) mem_threshold_index_create: Option<usize>,
}

/// Creates a fs object store with atomic write dir.
Expand Down

0 comments on commit 6f07d69

Please sign in to comment.