Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): enable inverted index #3158

Merged
merged 11 commits into from Jan 15, 2024
19 changes: 19 additions & 0 deletions config/datanode.example.toml
Expand Up @@ -116,6 +116,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query.
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"
Expand Down
19 changes: 19 additions & 0 deletions config/standalone.example.toml
Expand Up @@ -216,6 +216,25 @@ parallel_scan_channel_size = 32
# Whether to allow stale WAL entries read during replay.
allow_stale_entries = false

[region_engine.mito.inverted_index]
# Whether to create the index on flush.
# - "auto": automatically
# - "disable": never
create_on_flush = "auto"
# Whether to create the index on compaction.
# - "auto": automatically
# - "disable": never
create_on_compaction = "auto"
# Whether to apply the index on query.
# - "auto": automatically
# - "disable": never
apply_on_query = "auto"
# Memory threshold for performing an external sort during index creation.
# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64MB"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

# Log options
# [logging]
# Specify logs directory.
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/config.rs
Expand Up @@ -284,6 +284,7 @@ impl DatanodeOptions {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum RegionEngineConfig {
#[serde(rename = "mito")]
Expand Down
16 changes: 5 additions & 11 deletions src/datanode/src/datanode.rs
Expand Up @@ -42,7 +42,7 @@ use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::{join_dir, normalize_dir};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::builder::GrpcServerBuilder;
Expand Down Expand Up @@ -457,27 +457,21 @@ impl DatanodeBuilder {
async fn build_mito_engine(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
mut config: MitoConfig,
config: MitoConfig,
) -> Result<MitoEngine> {
// Sets write cache path if it is empty.
if config.experimental_write_cache_path.is_empty() {
config.experimental_write_cache_path = join_dir(&opts.storage.data_home, "write_cache");
info!(
"Sets write cache path to {}",
config.experimental_write_cache_path
);
}

let mito_engine = match &opts.wal {
WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
)
.await
.context(BuildMitoEngineSnafu)?,

WalConfig::Kafka(kafka_config) => MitoEngine::new(
&opts.storage.data_home,
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/inverted_index/create.rs
Expand Up @@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef;

/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]
pub trait InvertedIndexCreator {
pub trait InvertedIndexCreator: Send {
/// Adds a value to the named index. A `None` value represents an absence of data (null)
///
/// - `index_name`: Identifier for the index being built
Expand Down
2 changes: 0 additions & 2 deletions src/index/src/lib.rs
Expand Up @@ -13,7 +13,5 @@
// limitations under the License.

#![feature(iter_partition_in_place)]
// TODO(zhongzc): remove once further code is added
#![allow(dead_code)]

pub mod inverted_index;
50 changes: 37 additions & 13 deletions src/mito2/src/access_layer.rs
Expand Up @@ -25,6 +25,8 @@ use crate::cache::CacheManagerRef;
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::IndexerBuilder;
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
Expand All @@ -37,6 +39,8 @@ pub struct AccessLayer {
region_dir: String,
/// Target object store.
object_store: ObjectStore,
/// Intermediate manager for inverted index.
intermediate_manager: IntermediateManager,
}

impl std::fmt::Debug for AccessLayer {
Expand All @@ -49,10 +53,15 @@ impl std::fmt::Debug for AccessLayer {

impl AccessLayer {
/// Returns a new [AccessLayer] for specific `region_dir`.
pub fn new(region_dir: impl Into<String>, object_store: ObjectStore) -> AccessLayer {
pub fn new(
region_dir: impl Into<String>,
object_store: ObjectStore,
intermediate_manager: IntermediateManager,
) -> AccessLayer {
AccessLayer {
region_dir: region_dir.into(),
object_store,
intermediate_manager,
}
}

Expand Down Expand Up @@ -105,16 +114,15 @@ impl AccessLayer {
let file_path = location::sst_file_path(&self.region_dir, request.file_id);
let index_file_path = location::index_file_path(&self.region_dir, request.file_id);
let region_id = request.metadata.region_id;
let file_id = request.file_id;
let cache_manager = request.cache_manager.clone();

let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() {
let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
// Write to the write cache.
write_cache
.write_and_upload_sst(
request,
SstUploadRequest {
file_id: request.file_id,
metadata: request.metadata,
source: request.source,
storage: request.storage,
upload_path: file_path,
index_upload_path: index_file_path,
remote_store: self.object_store.clone(),
Expand All @@ -124,19 +132,30 @@ impl AccessLayer {
.await?
} else {
// Write cache is disabled.
let mut writer =
ParquetWriter::new(file_path, request.metadata, self.object_store.clone());
let indexer = IndexerBuilder {
create_inverted_index: request.create_inverted_index,
mem_threshold_index_create: request.mem_threshold_index_create,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
intermediate_manager: self.intermediate_manager.clone(),
}
.build();
let mut writer = ParquetWriter::new(
file_path,
request.metadata,
self.object_store.clone(),
indexer,
);
writer.write_all(request.source, write_opts).await?
};

// Put parquet metadata to cache manager.
if let Some(sst_info) = &sst_info {
if let Some(parquet_metadata) = &sst_info.file_metadata {
request.cache_manager.put_parquet_meta_data(
region_id,
request.file_id,
parquet_metadata.clone(),
)
cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone())
}
}

Expand All @@ -150,7 +169,12 @@ pub(crate) struct SstWriteRequest {
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
/// Whether to create inverted index.
pub(crate) create_inverted_index: bool,
/// The threshold of memory size to create inverted index.
pub(crate) mem_threshold_index_create: Option<usize>,
}

/// Creates a fs object store with atomic write dir.
Expand Down