From 6f07d6915515a2b7634dc2b2210c9037a1324114 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 15 Jan 2024 17:08:07 +0800 Subject: [PATCH] feat(mito): enable inverted index (#3158) * feat(mito): enable inverted index Signed-off-by: Zhenchi * fix typos Signed-off-by: Zhenchi * fix typos Signed-off-by: Zhenchi * accidentally resolved the incorrect filtering issue within the Metric Engine Signed-off-by: Zhenchi * fix test Signed-off-by: Zhenchi * Update src/mito2/src/access_layer.rs * Update src/mito2/src/test_util/scheduler_util.rs Co-authored-by: Yingwen * fix: format -> join_dir Signed-off-by: Zhenchi * refactor: move intermediate_manager from arg of write_and_upload_sst to field of WriteCache Signed-off-by: Zhenchi * refactor: add IndexerBuilder Signed-off-by: Zhenchi * fix clippy Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi Co-authored-by: Yingwen --- config/datanode.example.toml | 19 ++ config/standalone.example.toml | 19 ++ src/datanode/src/config.rs | 1 + src/datanode/src/datanode.rs | 16 +- src/index/src/inverted_index/create.rs | 2 +- src/index/src/lib.rs | 2 - src/mito2/src/access_layer.rs | 50 +++- src/mito2/src/cache/write_cache.rs | 83 +++-- src/mito2/src/compaction.rs | 10 +- src/mito2/src/compaction/twcs.rs | 23 +- src/mito2/src/config.rs | 89 +++++- src/mito2/src/engine.rs | 9 +- src/mito2/src/engine/basic_test.rs | 4 +- src/mito2/src/flush.rs | 10 +- src/mito2/src/read/scan_region.rs | 13 + src/mito2/src/region/opener.rs | 16 +- src/mito2/src/sst/file_purger.rs | 24 +- src/mito2/src/sst/index.rs | 283 +++++++++++++++++- src/mito2/src/sst/index/creator.rs | 33 +- .../src/sst/index/creator/temp_provider.rs | 40 ++- src/mito2/src/sst/index/intermediate.rs | 153 ++++++++++ src/mito2/src/sst/location.rs | 70 ----- src/mito2/src/sst/parquet.rs | 22 +- src/mito2/src/sst/parquet/writer.rs | 49 ++- src/mito2/src/test_util.rs | 44 ++- src/mito2/src/test_util/scheduler_util.rs | 15 +- src/mito2/src/worker.rs | 28 +- 
src/mito2/src/worker/handle_catchup.rs | 1 + src/mito2/src/worker/handle_create.rs | 1 + src/mito2/src/worker/handle_open.rs | 1 + tests-integration/tests/http.rs | 7 + .../common/insert/logical_metric_table.result | 11 +- .../common/insert/logical_metric_table.sql | 1 - tests/runner/src/env.rs | 2 +- 34 files changed, 916 insertions(+), 235 deletions(-) create mode 100644 src/mito2/src/sst/index/intermediate.rs diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 6ed277cf185..62502cabe53 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -116,6 +116,25 @@ parallel_scan_channel_size = 32 # Whether to allow stale WAL entries read during replay. allow_stale_entries = false +[region_engine.mito.inverted_index] +# Whether to create the index on flush. +# - "auto": automatically +# - "disable": never +create_on_flush = "auto" +# Whether to create the index on compaction. +# - "auto": automatically +# - "disable": never +create_on_compaction = "auto" +# Whether to apply the index on query +# - "auto": automatically +# - "disable": never +apply_on_query = "auto" +# Memory threshold for performing an external sort during index creation. +# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. +mem_threshold_on_create = "64MB" +# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). +intermediate_path = "" + # Log options, see `standalone.example.toml` # [logging] # dir = "/tmp/greptimedb/logs" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 565c94cdb01..a49ffa835e9 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -216,6 +216,25 @@ parallel_scan_channel_size = 32 # Whether to allow stale WAL entries read during replay. allow_stale_entries = false +[region_engine.mito.inverted_index] +# Whether to create the index on flush. 
+# - "auto": automatically +# - "disable": never +create_on_flush = "auto" +# Whether to create the index on compaction. +# - "auto": automatically +# - "disable": never +create_on_compaction = "auto" +# Whether to apply the index on query +# - "auto": automatically +# - "disable": never +apply_on_query = "auto" +# Memory threshold for performing an external sort during index creation. +# Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. +mem_threshold_on_create = "64M" +# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). +intermediate_path = "" + # Log options # [logging] # Specify logs directory. diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 9f469f0f484..ac8d3a48b60 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -284,6 +284,7 @@ impl DatanodeOptions { } } +#[allow(clippy::large_enum_variant)] #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub enum RegionEngineConfig { #[serde(rename = "mito")] diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 2d40ad81245..64446013099 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -41,7 +41,7 @@ use metric_engine::engine::MetricEngine; use mito2::config::MitoConfig; use mito2::engine::MitoEngine; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; -use object_store::util::{join_dir, normalize_dir}; +use object_store::util::normalize_dir; use query::QueryEngineFactory; use servers::export_metrics::ExportMetricsTask; use servers::server::{start_server, ServerHandlers}; @@ -374,19 +374,11 @@ impl DatanodeBuilder { async fn build_mito_engine( opts: &DatanodeOptions, object_store_manager: ObjectStoreManagerRef, - mut config: MitoConfig, + config: MitoConfig, ) -> Result { - // Sets write cache path if it is empty. 
- if config.experimental_write_cache_path.is_empty() { - config.experimental_write_cache_path = join_dir(&opts.storage.data_home, "write_cache"); - info!( - "Sets write cache path to {}", - config.experimental_write_cache_path - ); - } - let mito_engine = match &opts.wal { WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new( + &opts.storage.data_home, config, Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config) .await?, @@ -394,7 +386,9 @@ impl DatanodeBuilder { ) .await .context(BuildMitoEngineSnafu)?, + WalConfig::Kafka(kafka_config) => MitoEngine::new( + &opts.storage.data_home, config, Self::build_kafka_log_store(kafka_config).await?, object_store_manager, diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index e17f987b5c6..15674d696cd 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -23,7 +23,7 @@ use crate::inverted_index::BytesRef; /// `InvertedIndexCreator` provides functionality to construct an inverted index #[async_trait] -pub trait InvertedIndexCreator { +pub trait InvertedIndexCreator: Send { /// Adds a value to the named index. A `None` value represents an absence of data (null) /// /// - `index_name`: Identifier for the index being built diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index 296efb315d0..e7f448c398e 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -13,7 +13,5 @@ // limitations under the License. 
#![feature(iter_partition_in_place)] -// TODO(zhongzc): remove once further code is added -#![allow(dead_code)] pub mod inverted_index; diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index ecbeea2f0b0..dddf5e7b3f7 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -25,6 +25,8 @@ use crate::cache::CacheManagerRef; use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; use crate::read::Source; use crate::sst::file::{FileHandle, FileId, FileMeta}; +use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::IndexerBuilder; use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; @@ -37,6 +39,8 @@ pub struct AccessLayer { region_dir: String, /// Target object store. object_store: ObjectStore, + /// Intermediate manager for inverted index. + intermediate_manager: IntermediateManager, } impl std::fmt::Debug for AccessLayer { @@ -49,10 +53,15 @@ impl std::fmt::Debug for AccessLayer { impl AccessLayer { /// Returns a new [AccessLayer] for specific `region_dir`. - pub fn new(region_dir: impl Into, object_store: ObjectStore) -> AccessLayer { + pub fn new( + region_dir: impl Into, + object_store: ObjectStore, + intermediate_manager: IntermediateManager, + ) -> AccessLayer { AccessLayer { region_dir: region_dir.into(), object_store, + intermediate_manager, } } @@ -105,16 +114,15 @@ impl AccessLayer { let file_path = location::sst_file_path(&self.region_dir, request.file_id); let index_file_path = location::index_file_path(&self.region_dir, request.file_id); let region_id = request.metadata.region_id; + let file_id = request.file_id; + let cache_manager = request.cache_manager.clone(); - let sst_info = if let Some(write_cache) = request.cache_manager.write_cache() { + let sst_info = if let Some(write_cache) = cache_manager.write_cache() { // Write to the write cache. 
write_cache .write_and_upload_sst( + request, SstUploadRequest { - file_id: request.file_id, - metadata: request.metadata, - source: request.source, - storage: request.storage, upload_path: file_path, index_upload_path: index_file_path, remote_store: self.object_store.clone(), @@ -124,19 +132,30 @@ impl AccessLayer { .await? } else { // Write cache is disabled. - let mut writer = - ParquetWriter::new(file_path, request.metadata, self.object_store.clone()); + let indexer = IndexerBuilder { + create_inverted_index: request.create_inverted_index, + mem_threshold_index_create: request.mem_threshold_index_create, + file_id, + file_path: index_file_path, + metadata: &request.metadata, + row_group_size: write_opts.row_group_size, + object_store: self.object_store.clone(), + intermediate_manager: self.intermediate_manager.clone(), + } + .build(); + let mut writer = ParquetWriter::new( + file_path, + request.metadata, + self.object_store.clone(), + indexer, + ); writer.write_all(request.source, write_opts).await? }; // Put parquet metadata to cache manager. if let Some(sst_info) = &sst_info { if let Some(parquet_metadata) = &sst_info.file_metadata { - request.cache_manager.put_parquet_meta_data( - region_id, - request.file_id, - parquet_metadata.clone(), - ) + cache_manager.put_parquet_meta_data(region_id, file_id, parquet_metadata.clone()) } } @@ -150,7 +169,12 @@ pub(crate) struct SstWriteRequest { pub(crate) metadata: RegionMetadataRef, pub(crate) source: Source, pub(crate) cache_manager: CacheManagerRef, + #[allow(dead_code)] pub(crate) storage: Option, + /// Whether to create inverted index. + pub(crate) create_inverted_index: bool, + /// The threshold of memory size to create inverted index. + pub(crate) mem_threshold_index_create: Option, } /// Creates a fs object store with atomic write dir. 
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 5871853b99f..7a23cda4718 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -27,12 +27,14 @@ use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; -use crate::access_layer::new_fs_object_store; +use crate::access_layer::{new_fs_object_store, SstWriteRequest}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; use crate::read::Source; use crate::sst::file::FileId; +use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::{Indexer, IndexerBuilder}; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::parquet::{SstInfo, WriteOptions}; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; @@ -45,6 +47,8 @@ pub struct WriteCache { file_cache: FileCacheRef, /// Object store manager. object_store_manager: ObjectStoreManagerRef, + /// Intermediate manager for inverted index. 
+ intermediate_manager: IntermediateManager, } pub type WriteCacheRef = Arc; @@ -56,6 +60,7 @@ impl WriteCache { local_store: ObjectStore, object_store_manager: ObjectStoreManagerRef, cache_capacity: ReadableSize, + intermediate_manager: IntermediateManager, ) -> Result { let file_cache = FileCache::new(local_store, cache_capacity); file_cache.recover().await?; @@ -63,6 +68,7 @@ impl WriteCache { Ok(Self { file_cache: Arc::new(file_cache), object_store_manager, + intermediate_manager, }) } @@ -71,11 +77,18 @@ impl WriteCache { cache_dir: &str, object_store_manager: ObjectStoreManagerRef, cache_capacity: ReadableSize, + intermediate_manager: IntermediateManager, ) -> Result { info!("Init write cache on {cache_dir}, capacity: {cache_capacity}"); let local_store = new_fs_object_store(cache_dir).await?; - Self::new(local_store, object_store_manager, cache_capacity).await + Self::new( + local_store, + object_store_manager, + cache_capacity, + intermediate_manager, + ) + .await } /// Returns the file cache of the write cache. @@ -84,27 +97,42 @@ impl WriteCache { } /// Writes SST to the cache and then uploads it to the remote object store. 
- pub async fn write_and_upload_sst( + pub(crate) async fn write_and_upload_sst( &self, - request: SstUploadRequest, + write_request: SstWriteRequest, + upload_request: SstUploadRequest, write_opts: &WriteOptions, ) -> Result> { let timer = FLUSH_ELAPSED .with_label_values(&["write_sst"]) .start_timer(); - let region_id = request.metadata.region_id; - let file_id = request.file_id; + let region_id = write_request.metadata.region_id; + let file_id = write_request.file_id; let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet); + let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); + + let indexer = IndexerBuilder { + create_inverted_index: write_request.create_inverted_index, + mem_threshold_index_create: write_request.mem_threshold_index_create, + file_id, + file_path: self.file_cache.cache_file_path(puffin_key), + metadata: &write_request.metadata, + row_group_size: write_opts.row_group_size, + object_store: self.file_cache.local_store(), + intermediate_manager: self.intermediate_manager.clone(), + } + .build(); // Write to FileCache. 
let mut writer = ParquetWriter::new( self.file_cache.cache_file_path(parquet_key), - request.metadata, + write_request.metadata, self.file_cache.local_store(), + indexer, ); - let sst_info = writer.write_all(request.source, write_opts).await?; + let sst_info = writer.write_all(write_request.source, write_opts).await?; timer.stop_and_record(); @@ -114,13 +142,13 @@ impl WriteCache { return Ok(None); }; - let parquet_path = &request.upload_path; - let remote_store = &request.remote_store; + let parquet_path = &upload_request.upload_path; + let remote_store = &upload_request.remote_store; self.upload(parquet_key, parquet_path, remote_store).await?; if sst_info.inverted_index_available { let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin); - let puffin_path = &request.index_upload_path; + let puffin_path = &upload_request.index_upload_path; self.upload(puffin_key, puffin_path, remote_store).await?; } @@ -193,10 +221,6 @@ impl WriteCache { /// Request to write and upload a SST. pub struct SstUploadRequest { - pub file_id: FileId, - pub metadata: RegionMetadataRef, - pub source: Source, - pub storage: Option, /// Path to upload the file. pub upload_path: String, /// Path to upload the index file. @@ -212,6 +236,7 @@ mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::manager::ObjectStoreManager; use object_store::services::Fs; + use object_store::util::join_dir; use object_store::ObjectStore; use store_api::storage::RegionId; @@ -230,10 +255,14 @@ mod tests { // TODO(QuenKar): maybe find a way to create some object server for testing, // and now just use local file system to mock. 
let mut env = TestEnv::new(); + let data_home = env.data_home().display().to_string(); let mock_store = env.init_object_store_manager(); let file_id = FileId::random(); let upload_path = sst_file_path("test", file_id); let index_upload_path = index_file_path("test", file_id); + let intm_mgr = IntermediateManager::init_fs(join_dir(&data_home, "intm")) + .await + .unwrap(); // Create WriteCache let local_dir = create_temp_dir(""); @@ -243,6 +272,7 @@ mod tests { local_store.clone(), object_store_manager, ReadableSize::mb(10), + intm_mgr, ) .await .unwrap(); @@ -256,13 +286,19 @@ mod tests { new_batch_by_range(&["b", "h"], 100, 200), ]); - let request = SstUploadRequest { + let write_request = SstWriteRequest { file_id, metadata, source, storage: None, + create_inverted_index: true, + mem_threshold_index_create: None, + cache_manager: Default::default(), + }; + + let request = SstUploadRequest { upload_path: upload_path.clone(), - index_upload_path, + index_upload_path: index_upload_path.clone(), remote_store: mock_store.clone(), }; @@ -273,7 +309,7 @@ mod tests { // Write to cache and upload sst to mock remote store let sst_info = write_cache - .write_and_upload_sst(request, &write_opts) + .write_and_upload_sst(write_request, request, &write_opts) .await .unwrap() .unwrap(); @@ -289,5 +325,16 @@ mod tests { .await .unwrap(); assert_eq!(remote_data, cache_data); + + // Check write cache contains the index key + let index_key = IndexKey::new(region_id, file_id, FileType::Puffin); + assert!(write_cache.file_cache.contains_key(&index_key)); + + let remote_index_data = mock_store.read(&index_upload_path).await.unwrap(); + let cache_index_data = local_store + .read(&write_cache.file_cache.cache_file_path(index_key)) + .await + .unwrap(); + assert_eq!(remote_index_data, cache_index_data); } } diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 0ddcec61d0f..000a6e2a88c 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ 
-21,7 +21,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; -use common_base::readable_size::ReadableSize; use common_telemetry::{debug, error}; pub use picker::CompactionPickerRef; use snafu::ResultExt; @@ -44,6 +43,7 @@ use crate::sst::file_purger::FilePurgerRef; /// Region compaction request. pub struct CompactionRequest { + pub(crate) engine_config: Arc, pub(crate) current_version: VersionRef, pub(crate) access_layer: AccessLayerRef, /// Sender to send notification to the region worker. @@ -53,8 +53,6 @@ pub struct CompactionRequest { pub(crate) file_purger: FilePurgerRef, /// Start time of compaction task. pub(crate) start_time: Instant, - /// Buffering threshold while writing SST files. - pub(crate) sst_write_buffer_size: ReadableSize, pub(crate) cache_manager: CacheManagerRef, } @@ -331,13 +329,13 @@ impl CompactionStatus { let current_version = self.version_control.current().version; let start_time = Instant::now(); let mut req = CompactionRequest { + engine_config, current_version, access_layer: self.access_layer.clone(), request_sender: request_sender.clone(), waiters: Vec::new(), file_purger: self.file_purger.clone(), start_time, - sst_write_buffer_size: engine_config.sst_write_buffer_size, cache_manager, }; @@ -363,7 +361,7 @@ mod tests { #[tokio::test] async fn test_schedule_empty() { - let env = SchedulerEnv::new(); + let env = SchedulerEnv::new().await; let (tx, _rx) = mpsc::channel(4); let mut scheduler = env.mock_compaction_scheduler(tx); let mut builder = VersionControlBuilder::new(); @@ -432,7 +430,7 @@ mod tests { #[tokio::test] async fn test_schedule_on_finished() { let job_scheduler = Arc::new(VecScheduler::default()); - let env = SchedulerEnv::new().scheduler(job_scheduler.clone()); + let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); let (tx, _rx) = mpsc::channel(4); let mut scheduler = env.mock_compaction_scheduler(tx); let mut builder = VersionControlBuilder::new(); diff --git 
a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 3bc96dc2dda..b0adc897b6a 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -17,7 +17,6 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::{Duration, Instant}; -use common_base::readable_size::ReadableSize; use common_telemetry::{debug, error, info}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; @@ -32,6 +31,7 @@ use crate::access_layer::{AccessLayerRef, SstWriteRequest}; use crate::cache::CacheManagerRef; use crate::compaction::picker::{CompactionTask, Picker}; use crate::compaction::CompactionRequest; +use crate::config::MitoConfig; use crate::error::{self, CompactRegionSnafu}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; use crate::read::projection::ProjectionMapper; @@ -123,13 +123,13 @@ impl TwcsPicker { impl Picker for TwcsPicker { fn pick(&self, req: CompactionRequest) -> Option> { let CompactionRequest { + engine_config, current_version, access_layer, request_sender, waiters, file_purger, start_time, - sst_write_buffer_size, cache_manager, } = req; @@ -173,12 +173,12 @@ impl Picker for TwcsPicker { return None; } let task = TwcsCompactionTask { + engine_config, region_id, metadata: region_metadata, sst_layer: access_layer, outputs, expired_ssts, - sst_write_buffer_size, compaction_time_window: Some(time_window_size), request_sender, waiters, @@ -234,12 +234,12 @@ fn find_latest_window_in_seconds<'a>( } pub(crate) struct TwcsCompactionTask { + pub engine_config: Arc, pub region_id: RegionId, pub metadata: RegionMetadataRef, pub sst_layer: AccessLayerRef, pub outputs: Vec, pub expired_ssts: Vec, - pub sst_write_buffer_size: ReadableSize, pub compaction_time_window: Option, pub file_purger: FilePurgerRef, /// Request sender to notify the worker. 
@@ -301,9 +301,20 @@ impl TwcsCompactionTask { ); let write_opts = WriteOptions { - write_buffer_size: self.sst_write_buffer_size, + write_buffer_size: self.engine_config.sst_write_buffer_size, ..Default::default() }; + let create_inverted_index = self + .engine_config + .inverted_index + .create_on_compaction + .auto(); + let mem_threshold_index_create = self + .engine_config + .inverted_index + .mem_threshold_on_create + .map(|m| m.as_bytes() as _); + let metadata = self.metadata.clone(); let sst_layer = self.sst_layer.clone(); let region_id = self.region_id; @@ -321,6 +332,8 @@ impl TwcsCompactionTask { source: Source::Reader(reader), cache_manager, storage, + create_inverted_index, + mem_threshold_index_create, }, &write_opts, ) diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 0723c702ae7..dce77a1233f 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -18,10 +18,11 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_telemetry::warn; +use object_store::util::join_dir; use serde::{Deserialize, Serialize}; -use snafu::ensure; +use serde_with::{serde_as, NoneAsEmptyString}; -use crate::error::{InvalidConfigSnafu, Result}; +use crate::error::Result; /// Default max running background job. const DEFAULT_MAX_BG_JOB: usize = 4; @@ -72,7 +73,7 @@ pub struct MitoConfig { pub page_cache_size: ReadableSize, /// Whether to enable the experimental write cache. pub enable_experimental_write_cache: bool, - /// Path for write cache. + /// File system path for write cache, defaults to `{data_home}/write_cache`. pub experimental_write_cache_path: String, /// Capacity for write cache. pub experimental_write_cache_size: ReadableSize, @@ -89,6 +90,9 @@ pub struct MitoConfig { pub parallel_scan_channel_size: usize, /// Whether to allow stale entries read during replay. pub allow_stale_entries: bool, + + /// Inverted index configs. 
+ pub inverted_index: InvertedIndexConfig, } impl Default for MitoConfig { @@ -113,6 +117,7 @@ impl Default for MitoConfig { scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, + inverted_index: InvertedIndexConfig::default(), } } } @@ -121,7 +126,7 @@ impl MitoConfig { /// Sanitize incorrect configurations. /// /// Returns an error if there is a configuration that unable to sanitize. - pub(crate) fn sanitize(&mut self) -> Result<()> { + pub(crate) fn sanitize(&mut self, data_home: &str) -> Result<()> { // Use default value if `num_workers` is 0. if self.num_workers == 0 { self.num_workers = divide_num_cpus(2); @@ -167,13 +172,75 @@ impl MitoConfig { ); } - if self.enable_experimental_write_cache { - ensure!( - !self.experimental_write_cache_path.is_empty(), - InvalidConfigSnafu { - reason: "experimental_write_cache_path should not be empty", - } - ); + // Sets write cache path if it is empty. + if self.experimental_write_cache_path.is_empty() { + self.experimental_write_cache_path = join_dir(data_home, "write_cache"); + } + + self.inverted_index.sanitize(data_home)?; + + Ok(()) + } +} + +/// Operational mode for certain actions. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum Mode { + /// The action is performed automatically based on internal criteria. + #[default] + Auto, + /// The action is explicitly disabled. + Disable, +} + +impl Mode { + /// Whether the action is disabled. + pub fn disabled(&self) -> bool { + matches!(self, Mode::Disable) + } + + /// Whether the action is automatic. + pub fn auto(&self) -> bool { + matches!(self, Mode::Auto) + } +} + +/// Configuration options for the inverted index. +#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct InvertedIndexConfig { + /// Whether to create the index on flush: automatically or never. 
+ pub create_on_flush: Mode, + /// Whether to create the index on compaction: automatically or never. + pub create_on_compaction: Mode, + /// Whether to apply the index on query: automatically or never. + pub apply_on_query: Mode, + /// Memory threshold for performing an external sort during index creation. + /// `None` means all sorting will happen in memory. + #[serde_as(as = "NoneAsEmptyString")] + pub mem_threshold_on_create: Option, + /// File system path to store intermediate files for external sort, defaults to `{data_home}/index_intermediate`. + pub intermediate_path: String, +} + +impl Default for InvertedIndexConfig { + fn default() -> Self { + Self { + create_on_flush: Mode::Auto, + create_on_compaction: Mode::Auto, + apply_on_query: Mode::Auto, + mem_threshold_on_create: Some(ReadableSize::mb(64)), + intermediate_path: String::new(), + } + } +} + +impl InvertedIndexConfig { + pub fn sanitize(&mut self, data_home: &str) -> Result<()> { + if self.intermediate_path.is_empty() { + self.intermediate_path = join_dir(data_home, "index_intermediate"); } Ok(()) diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 577d5131d0c..02c7533c75c 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -78,11 +78,12 @@ pub struct MitoEngine { impl MitoEngine { /// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`. 
pub async fn new( + data_home: &str, mut config: MitoConfig, log_store: Arc, object_store_manager: ObjectStoreManagerRef, ) -> Result { - config.sanitize()?; + config.sanitize(data_home)?; Ok(MitoEngine { inner: Arc::new(EngineInner::new(config, log_store, object_store_manager).await?), @@ -192,7 +193,8 @@ impl EngineInner { request, Some(cache_manager), ) - .with_parallelism(scan_parallelism); + .with_parallelism(scan_parallelism) + .ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()); scan_region.scanner() } @@ -315,13 +317,14 @@ impl RegionEngine for MitoEngine { impl MitoEngine { /// Returns a new [MitoEngine] for tests. pub async fn new_for_test( + data_home: &str, mut config: MitoConfig, log_store: Arc, object_store_manager: ObjectStoreManagerRef, write_buffer_manager: Option, listener: Option, ) -> Result { - config.sanitize()?; + config.sanitize(data_home)?; let config = Arc::new(config); Ok(MitoEngine { diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 2dff27290ca..47aed723cf3 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -550,8 +550,8 @@ async fn test_region_usage() { let region_stat = region.region_usage().await; assert_eq!(region_stat.wal_usage, 0); - assert_eq!(region_stat.sst_usage, 2742); + assert_eq!(region_stat.sst_usage, 3006); // region total usage - assert_eq!(region_stat.disk_usage(), 3791); + assert_eq!(region_stat.disk_usage(), 4072); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index f79d811ba47..6ee585e40c2 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -315,6 +315,12 @@ impl RegionFlushTask { let file_id = FileId::random(); let iter = mem.iter(None, None); let source = Source::Iter(iter); + let create_inverted_index = self.engine_config.inverted_index.create_on_flush.auto(); + let mem_threshold_index_create = self + .engine_config + .inverted_index + .mem_threshold_on_create + .map(|m| 
m.as_bytes() as _); // Flush to level 0. let write_request = SstWriteRequest { @@ -323,6 +329,8 @@ impl RegionFlushTask { source, cache_manager: self.cache_manager.clone(), storage: version.options.storage.clone(), + create_inverted_index, + mem_threshold_index_create, }; let Some(sst_info) = self .access_layer @@ -732,7 +740,7 @@ mod tests { #[tokio::test] async fn test_schedule_empty() { - let env = SchedulerEnv::new(); + let env = SchedulerEnv::new().await; let (tx, _rx) = mpsc::channel(4); let mut scheduler = env.mock_flush_scheduler(); let builder = VersionControlBuilder::new(); diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index c2b0939d9b2..d5f7dbe1002 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -122,6 +122,8 @@ pub(crate) struct ScanRegion { cache_manager: Option, /// Parallelism to scan. parallelism: ScanParallism, + /// Whether to ignore inverted index. + ignore_inverted_index: bool, } impl ScanRegion { @@ -138,6 +140,7 @@ impl ScanRegion { request, cache_manager, parallelism: ScanParallism::default(), + ignore_inverted_index: false, } } @@ -148,6 +151,12 @@ impl ScanRegion { self } + #[must_use] + pub(crate) fn ignore_inverted_index(mut self, ignore: bool) -> Self { + self.ignore_inverted_index = ignore; + self + } + /// Returns a [Scanner] to scan the region. pub(crate) fn scanner(self) -> Result { self.seq_scan().map(Scanner::Seq) @@ -234,6 +243,10 @@ impl ScanRegion { /// Use the latest schema to build the index applier. 
fn build_index_applier(&self) -> Option { + if self.ignore_inverted_index { + return None; + } + let file_cache = || -> Option { let cache_manager = self.cache_manager.as_ref()?; let write_cache = cache_manager.write_cache()?; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 80116ea9fdd..9e53f52b8bb 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -45,6 +45,7 @@ use crate::region_write_ctx::RegionWriteCtx; use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::file_purger::LocalFilePurger; +use crate::sst::index::intermediate::IntermediateManager; use crate::wal::{EntryId, Wal}; /// Builder to create a new [MitoRegion] or open an existing one. @@ -58,6 +59,7 @@ pub(crate) struct RegionOpener { options: Option, cache_manager: Option, skip_wal_replay: bool, + intermediate_manager: IntermediateManager, } impl RegionOpener { @@ -68,6 +70,7 @@ impl RegionOpener { memtable_builder: MemtableBuilderRef, object_store_manager: ObjectStoreManagerRef, scheduler: SchedulerRef, + intermediate_manager: IntermediateManager, ) -> RegionOpener { RegionOpener { region_id, @@ -79,6 +82,7 @@ impl RegionOpener { options: None, cache_manager: None, skip_wal_replay: false, + intermediate_manager, } } @@ -170,7 +174,11 @@ impl RegionOpener { .options(options) .build(); let version_control = Arc::new(VersionControl::new(version)); - let access_layer = Arc::new(AccessLayer::new(self.region_dir, object_store)); + let access_layer = Arc::new(AccessLayer::new( + self.region_dir, + object_store, + self.intermediate_manager, + )); Ok(MitoRegion { region_id, @@ -240,7 +248,11 @@ impl RegionOpener { let region_id = self.region_id; let object_store = self.object_store(®ion_options.storage)?.clone(); - let access_layer = Arc::new(AccessLayer::new(self.region_dir.clone(), object_store)); + let access_layer = Arc::new(AccessLayer::new( + self.region_dir.clone(), + object_store, + 
self.intermediate_manager.clone(), + )); let file_purger = Arc::new(LocalFilePurger::new( self.scheduler.clone(), access_layer.clone(), diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index cc913c1a7e2..623e5695aad 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -97,6 +97,7 @@ impl FilePurger for LocalFilePurger { mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; + use object_store::util::join_dir; use object_store::ObjectStore; use smallvec::SmallVec; @@ -104,6 +105,7 @@ mod tests { use crate::access_layer::AccessLayer; use crate::schedule::scheduler::{LocalScheduler, Scheduler}; use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType}; + use crate::sst::index::intermediate::IntermediateManager; use crate::sst::location; #[tokio::test] @@ -111,17 +113,21 @@ mod tests { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("file-purge"); + let dir_path = dir.path().display().to_string(); let mut builder = Fs::default(); - builder.root(dir.path().to_str().unwrap()); - let object_store = ObjectStore::new(builder).unwrap().finish(); + builder.root(&dir_path); let sst_file_id = FileId::random(); let sst_dir = "table1"; let path = location::sst_file_path(sst_dir, sst_file_id); + let intm_mgr = IntermediateManager::init_fs(join_dir(&dir_path, "intm")) + .await + .unwrap(); + let object_store = ObjectStore::new(builder).unwrap().finish(); object_store.write(&path, vec![0; 4096]).await.unwrap(); let scheduler = Arc::new(LocalScheduler::new(3)); - let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone())); + let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone(), intm_mgr)); let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); @@ -152,13 +158,17 @@ mod tests { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("file-purge"); + let dir_path = 
dir.path().display().to_string(); let mut builder = Fs::default(); - builder.root(dir.path().to_str().unwrap()); - let object_store = ObjectStore::new(builder).unwrap().finish(); + builder.root(&dir_path); let sst_file_id = FileId::random(); let sst_dir = "table1"; - + let intm_mgr = IntermediateManager::init_fs(join_dir(&dir_path, "intm")) + .await + .unwrap(); let path = location::sst_file_path(sst_dir, sst_file_id); + + let object_store = ObjectStore::new(builder).unwrap().finish(); object_store.write(&path, vec![0; 4096]).await.unwrap(); let index_path = location::index_file_path(sst_dir, sst_file_id); @@ -168,7 +178,7 @@ mod tests { .unwrap(); let scheduler = Arc::new(LocalScheduler::new(3)); - let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone())); + let layer = Arc::new(AccessLayer::new(sst_dir, object_store.clone(), intm_mgr)); let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None)); diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 7e6cefa9929..3e18cabe62a 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -12,13 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![allow(dead_code)] - -pub mod applier; +pub(crate) mod applier; mod codec; -pub mod creator; +pub(crate) mod creator; +pub(crate) mod intermediate; mod store; +use std::num::NonZeroUsize; + +use common_telemetry::{debug, warn}; +use creator::SstIndexCreator; +use object_store::ObjectStore; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; + +use crate::read::Batch; +use crate::sst::file::FileId; +use crate::sst::index::intermediate::IntermediateManager; + const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; // TODO(zhongzc): how to determine this value? @@ -27,3 +38,267 @@ const MIN_MEMORY_USAGE_THRESHOLD: usize = 8192; /// The buffer size for the pipe used to send index data to the puffin blob. 
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; + +/// The index creator that hides the error handling details. +#[derive(Default)] +pub struct Indexer { + file_id: FileId, + region_id: RegionId, + inner: Option, +} + +impl Indexer { + /// Update the index with the given batch. + pub async fn update(&mut self, batch: &Batch) { + if let Some(creator) = self.inner.as_mut() { + if let Err(err) = creator.update(batch).await { + warn!( + err; "Failed to update index, skip creating index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + + // Skip index creation if error occurs. + self.inner = None; + } + } + } + + /// Finish the index creation. + /// Returns the number of bytes written if success or None if failed. + pub async fn finish(&mut self) -> Option { + if let Some(mut creator) = self.inner.take() { + match creator.finish().await { + Ok((row_count, byte_count)) => { + debug!( + "Create index successfully, region_id: {}, file_id: {}, bytes: {}, rows: {}", + self.region_id, self.file_id, byte_count, row_count + ); + return Some(byte_count); + } + Err(err) => { + warn!( + err; "Failed to create index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } + } + + None + } + + /// Abort the index creation. 
+ pub async fn abort(&mut self) { + if let Some(mut creator) = self.inner.take() { + if let Err(err) = creator.abort().await { + warn!( + err; "Failed to abort index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + } + } +} + +pub(crate) struct IndexerBuilder<'a> { + pub(crate) create_inverted_index: bool, + pub(crate) mem_threshold_index_create: Option, + pub(crate) file_id: FileId, + pub(crate) file_path: String, + pub(crate) metadata: &'a RegionMetadataRef, + pub(crate) row_group_size: usize, + pub(crate) object_store: ObjectStore, + pub(crate) intermediate_manager: IntermediateManager, +} + +impl<'a> IndexerBuilder<'a> { + /// Sanity check for arguments and create a new [Indexer] + /// with inner [SstIndexCreator] if arguments are valid. + pub(crate) fn build(self) -> Indexer { + if !self.create_inverted_index { + debug!( + "Skip creating index due to request, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return Indexer::default(); + } + + if self.metadata.primary_key.is_empty() { + debug!( + "No tag columns, skip creating index, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return Indexer::default(); + } + + let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else { + warn!( + "Row group size is 0, skip creating index, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return Indexer::default(); + }; + + let creator = SstIndexCreator::new( + self.file_path, + self.file_id, + self.metadata, + self.object_store, + self.intermediate_manager, + self.mem_threshold_index_create, + row_group_size, + ); + + Indexer { + file_id: self.file_id, + region_id: self.metadata.region_id, + inner: Some(creator), + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::SemanticType; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use object_store::services::Memory; + use 
store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + + use super::*; + + fn mock_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "c", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![1]); + + Arc::new(builder.build().unwrap()) + } + + fn no_tag_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "c", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }); + + Arc::new(builder.build().unwrap()) + } + + fn mock_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + fn mock_intm_mgr() -> IntermediateManager { + IntermediateManager::new(mock_object_store()) + } + + #[test] + fn test_build_indexer_basic() { + let metadata = mock_region_metadata(); + let indexer = IndexerBuilder { + create_inverted_index: true, + 
mem_threshold_index_create: Some(1024), + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + object_store: mock_object_store(), + intermediate_manager: mock_intm_mgr(), + } + .build(); + + assert!(indexer.inner.is_some()); + } + + #[test] + fn test_build_indexer_disable_create() { + let metadata = mock_region_metadata(); + let indexer = IndexerBuilder { + create_inverted_index: false, + mem_threshold_index_create: Some(1024), + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + object_store: mock_object_store(), + intermediate_manager: mock_intm_mgr(), + } + .build(); + + assert!(indexer.inner.is_none()); + } + + #[test] + fn test_build_indexer_no_tag() { + let metadata = no_tag_region_metadata(); + let indexer = IndexerBuilder { + create_inverted_index: true, + mem_threshold_index_create: Some(1024), + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + object_store: mock_object_store(), + intermediate_manager: mock_intm_mgr(), + } + .build(); + + assert!(indexer.inner.is_none()); + } + + #[test] + fn test_build_indexer_zero_row_group() { + let metadata = mock_region_metadata(); + let indexer = IndexerBuilder { + create_inverted_index: true, + mem_threshold_index_create: Some(1024), + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 0, + object_store: mock_object_store(), + intermediate_manager: mock_intm_mgr(), + } + .build(); + + assert!(indexer.inner.is_none()); + } +} diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index b88b47c1394..c143bc9aacd 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -43,22 +43,19 @@ use crate::sst::file::FileId; use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; use crate::sst::index::creator::statistics::Statistics; 
use crate::sst::index::creator::temp_provider::TempFileProvider; +use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; use crate::sst::index::store::InstrumentedStore; use crate::sst::index::{ INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB, }; -use crate::sst::location::{self, IntermediateLocation}; type ByteCount = usize; type RowCount = usize; /// Creates SST index. pub struct SstIndexCreator { - /// Directory of the region. - region_dir: String, - /// ID of the SST file. - sst_file_id: FileId, - + /// Path of index file to write. + file_path: String, /// The store to write index files. store: InstrumentedStore, /// The index creator. @@ -81,11 +78,11 @@ impl SstIndexCreator { /// Creates a new `SstIndexCreator`. /// Should ensure that the number of tag columns is greater than 0. pub fn new( - region_dir: String, + file_path: String, sst_file_id: FileId, metadata: &RegionMetadataRef, index_store: ObjectStore, - intermediate_store: ObjectStore, // prefer to use local store + intermediate_manager: IntermediateManager, memory_usage_threshold: Option, row_group_size: NonZeroUsize, ) -> Self { @@ -95,16 +92,15 @@ impl SstIndexCreator { (threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD) }); let temp_file_provider = Arc::new(TempFileProvider::new( - IntermediateLocation::new(®ion_dir, &sst_file_id), - InstrumentedStore::new(intermediate_store), + IntermediateLocation::new(&metadata.region_id, &sst_file_id), + intermediate_manager, )); let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold); let index_creator = Box::new(SortIndexCreator::new(sorter, row_group_size)); let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns()); Self { - region_dir, - sst_file_id, + file_path, store: InstrumentedStore::new(index_store), codec, index_creator, @@ -129,10 +125,7 @@ impl SstIndexCreator { if let Err(update_err) = self.do_update(batch).await { // 
clean up garbage if failed to update if let Err(err) = self.do_cleanup().await { - warn!( - err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}", - self.region_dir, self.sst_file_id, - ); + warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path); } return Err(update_err); } @@ -153,10 +146,7 @@ impl SstIndexCreator { let finish_res = self.do_finish().await; // clean up garbage no matter finish successfully or not if let Err(err) = self.do_cleanup().await { - warn!( - err; "Failed to clean up index creator, region_dir: {}, sst_file_id: {}", - self.region_dir, self.sst_file_id, - ); + warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path); } finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count())) @@ -216,11 +206,10 @@ impl SstIndexCreator { async fn do_finish(&mut self) -> Result<()> { let mut guard = self.stats.record_finish(); - let file_path = location::index_file_path(&self.region_dir, self.sst_file_id); let file_writer = self .store .writer( - &file_path, + &self.file_path, &INDEX_PUFFIN_WRITE_BYTES_TOTAL, &INDEX_PUFFIN_WRITE_OP_TOTAL, &INDEX_PUFFIN_FLUSH_OP_TOTAL, diff --git a/src/mito2/src/sst/index/creator/temp_provider.rs b/src/mito2/src/sst/index/creator/temp_provider.rs index d8dfff3d7d2..d938b236c86 100644 --- a/src/mito2/src/sst/index/creator/temp_provider.rs +++ b/src/mito2/src/sst/index/creator/temp_provider.rs @@ -27,16 +27,15 @@ use crate::metrics::{ INDEX_INTERMEDIATE_READ_OP_TOTAL, INDEX_INTERMEDIATE_SEEK_OP_TOTAL, INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_OP_TOTAL, }; -use crate::sst::index::store::InstrumentedStore; -use crate::sst::location::IntermediateLocation; +use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; /// `TempFileProvider` implements `ExternalTempFileProvider`. /// It uses `InstrumentedStore` to create and read intermediate files. 
pub(crate) struct TempFileProvider { /// Provides the location of intermediate files. location: IntermediateLocation, - /// Provides access to files in the object store. - store: InstrumentedStore, + /// Provides store to access to intermediate files. + manager: IntermediateManager, } #[async_trait] @@ -48,7 +47,8 @@ impl ExternalTempFileProvider for TempFileProvider { ) -> IndexResult> { let path = self.location.file_path(column_id, file_id); let writer = self - .store + .manager + .store() .writer( &path, &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL, @@ -67,7 +67,8 @@ impl ExternalTempFileProvider for TempFileProvider { ) -> IndexResult>> { let column_path = self.location.column_path(column_id); let entries = self - .store + .manager + .store() .list(&column_path) .await .map_err(BoxedError::new) @@ -81,7 +82,8 @@ impl ExternalTempFileProvider for TempFileProvider { } let reader = self - .store + .manager + .store() .reader( entry.path(), &INDEX_INTERMEDIATE_READ_BYTES_TOTAL, @@ -100,30 +102,35 @@ impl ExternalTempFileProvider for TempFileProvider { impl TempFileProvider { /// Creates a new `TempFileProvider`. - pub fn new(location: IntermediateLocation, store: InstrumentedStore) -> Self { - Self { location, store } + pub fn new(location: IntermediateLocation, manager: IntermediateManager) -> Self { + Self { location, manager } } /// Removes all intermediate files. 
pub async fn cleanup(&self) -> Result<()> { - self.store.remove_all(self.location.root_path()).await + self.manager + .store() + .remove_all(self.location.root_path()) + .await } } #[cfg(test)] mod tests { + use common_test_util::temp_dir; use futures::{AsyncReadExt, AsyncWriteExt}; - use object_store::services::Memory; - use object_store::ObjectStore; + use store_api::storage::RegionId; use super::*; use crate::sst::file::FileId; #[tokio::test] async fn test_temp_file_provider_basic() { - let location = IntermediateLocation::new("region_dir", &FileId::random()); - let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); - let store = InstrumentedStore::new(object_store); + let temp_dir = temp_dir::create_temp_dir("intermediate"); + let path = temp_dir.path().display().to_string(); + + let location = IntermediateLocation::new(&RegionId::new(0, 0), &FileId::random()); + let store = IntermediateManager::init_fs(path).await.unwrap(); let provider = TempFileProvider::new(location.clone(), store); let column_name = "tag0"; @@ -163,7 +170,8 @@ mod tests { provider.cleanup().await.unwrap(); assert!(provider - .store + .manager + .store() .list(location.root_path()) .await .unwrap() diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs new file mode 100644 index 00000000000..ab10b0e6803 --- /dev/null +++ b/src/mito2/src/sst/index/intermediate.rs @@ -0,0 +1,153 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::warn; +use object_store::util::{self, normalize_dir}; +use store_api::storage::RegionId; +use uuid::Uuid; + +use crate::access_layer::new_fs_object_store; +use crate::error::Result; +use crate::sst::file::FileId; +use crate::sst::index::store::InstrumentedStore; + +const INTERMEDIATE_DIR: &str = "__intm"; + +/// `IntermediateManager` provides store to access to intermediate files. +#[derive(Clone)] +pub struct IntermediateManager { + store: InstrumentedStore, +} + +impl IntermediateManager { + /// Create a new `IntermediateManager` with the given root path. + /// It will clean up all garbage intermediate files from previous runs. + pub async fn init_fs(root_path: impl AsRef) -> Result { + let store = new_fs_object_store(&normalize_dir(root_path.as_ref())).await?; + let store = InstrumentedStore::new(store); + + // Remove all garbage intermediate files from previous runs. + if let Err(err) = store.remove_all(INTERMEDIATE_DIR).await { + warn!(err; "Failed to remove garbage intermediate files"); + } + + Ok(Self { store }) + } + + /// Returns the store to access to intermediate files. + pub(crate) fn store(&self) -> &InstrumentedStore { + &self.store + } + + #[cfg(test)] + pub(crate) fn new(store: object_store::ObjectStore) -> Self { + Self { + store: InstrumentedStore::new(store), + } + } +} + +/// `IntermediateLocation` produces paths for intermediate files +/// during external sorting. +#[derive(Debug, Clone)] +pub struct IntermediateLocation { + root_path: String, +} + +impl IntermediateLocation { + /// Create a new `IntermediateLocation`. Set the root directory to + /// `__intm/{region_id}/{sst_file_id}/{uuid}/`, incorporating + /// uuid to differentiate active sorting files from orphaned data due to unexpected + /// process termination. 
+ pub fn new(region_id: &RegionId, sst_file_id: &FileId) -> Self { + let region_id = region_id.as_u64(); + let uuid = Uuid::new_v4(); + Self { + root_path: format!("{INTERMEDIATE_DIR}/{region_id}/{sst_file_id}/{uuid}/"), + } + } + + /// Returns the root directory of the intermediate files + pub fn root_path(&self) -> &str { + &self.root_path + } + + /// Returns the path of the directory for intermediate files associated with a column: + /// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/` + pub fn column_path(&self, column_id: &str) -> String { + util::join_path(&self.root_path, &format!("{column_id}/")) + } + + /// Returns the path of the intermediate file with the given id for a column: + /// `__intm/{region_id}/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im` + pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String { + util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im")) + } +} + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir; + use regex::Regex; + + use super::*; + + #[tokio::test] + async fn test_manager() { + let temp_dir = temp_dir::create_temp_dir("index_intermediate"); + let path = temp_dir.path().to_str().unwrap(); + + // write a garbage file + tokio::fs::create_dir_all(format!("{path}/{INTERMEDIATE_DIR}")) + .await + .unwrap(); + tokio::fs::write(format!("{path}/{INTERMEDIATE_DIR}/garbage.im"), "blahblah") + .await + .unwrap(); + + let _manager = IntermediateManager::init_fs(path).await.unwrap(); + + // cleaned up by `init_fs` + assert!(!tokio::fs::try_exists(format!("{path}/{INTERMEDIATE_DIR}")) + .await + .unwrap()); + } + + #[test] + fn test_intermediate_location() { + let sst_file_id = FileId::random(); + let location = IntermediateLocation::new(&RegionId::new(0, 0), &sst_file_id); + + let re = Regex::new(&format!( + "{INTERMEDIATE_DIR}/0/{sst_file_id}/{}/", + r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}" + )) + .unwrap(); + assert!(re.is_match(location.root_path())); + + let uuid = 
location.root_path().split('/').nth(3).unwrap(); + + let column_id = "1"; + assert_eq!( + location.column_path(column_id), + format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/") + ); + + let im_file_id = "000000000010"; + assert_eq!( + location.file_path(column_id, im_file_id), + format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im") + ); + } +} diff --git a/src/mito2/src/sst/location.rs b/src/mito2/src/sst/location.rs index d3b69d9c73d..179e9159c94 100644 --- a/src/mito2/src/sst/location.rs +++ b/src/mito2/src/sst/location.rs @@ -13,7 +13,6 @@ // limitations under the License. use object_store::util; -use uuid::Uuid; use crate::sst::file::FileId; @@ -30,48 +29,8 @@ pub fn index_file_path(region_dir: &str, sst_file_id: FileId) -> String { util::join_path(&dir, &sst_file_id.as_puffin()) } -/// `IntermediateLocation` produces paths for intermediate files -/// during external sorting. -#[derive(Debug, Clone)] -pub struct IntermediateLocation { - root_path: String, -} - -impl IntermediateLocation { - /// Create a new `IntermediateLocation`. Set the root directory to - /// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/`, incorporating - /// uuid to differentiate active sorting files from orphaned data due to unexpected - /// process termination. 
- pub fn new(region_dir: &str, sst_file_id: &FileId) -> Self { - let uuid = Uuid::new_v4(); - let child = format!("index/__intermediate/{sst_file_id}/{uuid}/"); - Self { - root_path: util::join_path(region_dir, &child), - } - } - - /// Returns the root directory of the intermediate files - pub fn root_path(&self) -> &str { - &self.root_path - } - - /// Returns the path of the directory for intermediate files associated with a column: - /// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/` - pub fn column_path(&self, column_id: &str) -> String { - util::join_path(&self.root_path, &format!("{column_id}/")) - } - - /// Returns the path of the intermediate file with the given id for a column: - /// `{region_dir}/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im` - pub fn file_path(&self, column_id: &str, im_file_id: &str) -> String { - util::join_path(&self.column_path(column_id), &format!("{im_file_id}.im")) - } -} - #[cfg(test)] mod tests { - use regex::Regex; - use super::*; #[test] @@ -91,33 +50,4 @@ mod tests { format!("region_dir/index/{file_id}.puffin") ); } - - #[test] - fn test_intermediate_location() { - let sst_file_id = FileId::random(); - let location = IntermediateLocation::new("region_dir", &sst_file_id); - - let re = Regex::new(&format!( - "region_dir/index/__intermediate/{sst_file_id}/{}/", - r"\w{8}-\w{4}-\w{4}-\w{4}-\w{12}" - )) - .unwrap(); - assert!(re.is_match(location.root_path())); - - let uuid = location.root_path().split('/').nth(4).unwrap(); - - let column_id = "1"; - assert_eq!( - location.column_path(column_id), - format!("region_dir/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/") - ); - - let im_file_id = "000000000010"; - assert_eq!( - location.file_path(column_id, im_file_id), - format!( - "region_dir/index/__intermediate/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im" - ) - ); - } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 6c9ec3e9802..924df76e1dd 
100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -80,6 +80,7 @@ mod tests { use super::*; use crate::cache::{CacheManager, PageKey}; + use crate::sst::index::Indexer; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; use crate::test_util::sst_util::{ @@ -107,7 +108,12 @@ mod tests { ..Default::default() }; - let mut writer = ParquetWriter::new(file_path, metadata, object_store.clone()); + let mut writer = ParquetWriter::new( + file_path, + metadata, + object_store.clone(), + Indexer::default(), + ); let info = writer .write_all(source, &write_opts) .await @@ -156,7 +162,12 @@ mod tests { ..Default::default() }; // Prepare data. - let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone()); + let mut writer = ParquetWriter::new( + file_path, + metadata.clone(), + object_store.clone(), + Indexer::default(), + ); writer .write_all(source, &write_opts) .await @@ -225,7 +236,12 @@ mod tests { // write the sst file and get sst info // sst info contains the parquet metadata, which is converted from FileMetaData - let mut writer = ParquetWriter::new(file_path, metadata.clone(), object_store.clone()); + let mut writer = ParquetWriter::new( + file_path, + metadata.clone(), + object_store.clone(), + Indexer::default(), + ); let sst_info = writer .write_all(source, &write_opts) .await diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index e1d8765f5f4..d2d9cc2d492 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use common_datasource::file_format::parquet::BufferedWriter; use common_telemetry::debug; use common_time::Timestamp; +use futures::TryFutureExt; use object_store::ObjectStore; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; @@ -28,10 +29,11 @@ use snafu::ResultExt; use 
store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; -use super::helper::parse_parquet_metadata; use crate::error::{InvalidMetadataSnafu, Result, WriteBufferSnafu}; use crate::read::{Batch, Source}; +use crate::sst::index::Indexer; use crate::sst::parquet::format::WriteFormat; +use crate::sst::parquet::helper::parse_parquet_metadata; use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY}; /// Parquet SST writer. @@ -41,6 +43,7 @@ pub struct ParquetWriter { /// Region metadata of the source and the target SST. metadata: RegionMetadataRef, object_store: ObjectStore, + indexer: Indexer, } impl ParquetWriter { @@ -49,11 +52,13 @@ impl ParquetWriter { file_path: String, metadata: RegionMetadataRef, object_store: ObjectStore, + indexer: Indexer, ) -> ParquetWriter { ParquetWriter { file_path, metadata, object_store, + indexer, } } @@ -90,16 +95,22 @@ impl ParquetWriter { .context(WriteBufferSnafu)?; let mut stats = SourceStats::default(); - while let Some(batch) = source.next_batch().await? { + while let Some(batch) = write_next_batch(&mut source, &write_format, &mut buffered_writer) + .or_else(|err| async { + // abort index creation if error occurs. + self.indexer.abort().await; + Err(err) + }) + .await? 
+ { stats.update(&batch); - let arrow_batch = write_format.convert_batch(&batch)?; - - buffered_writer - .write(&arrow_batch) - .await - .context(WriteBufferSnafu)?; + self.indexer.update(&batch).await; } + let index_size = self.indexer.finish().await; + let inverted_index_available = index_size.is_some(); + let index_file_size = index_size.unwrap_or(0) as u64; + if stats.num_rows == 0 { debug!( "No data written, try to stop the writer: {}", @@ -124,8 +135,8 @@ impl ParquetWriter { file_size, num_rows: stats.num_rows, file_metadata: Some(Arc::new(parquet_metadata)), - inverted_index_available: false, - index_file_size: 0, + inverted_index_available, + index_file_size, })) } @@ -149,6 +160,24 @@ impl ParquetWriter { } } +async fn write_next_batch( + source: &mut Source, + write_format: &WriteFormat, + buffered_writer: &mut BufferedWriter, +) -> Result> { + let Some(batch) = source.next_batch().await? else { + return Ok(None); + }; + + let arrow_batch = write_format.convert_batch(&batch)?; + buffered_writer + .write(&arrow_batch) + .await + .context(WriteBufferSnafu)?; + + Ok(Some(batch)) +} + #[derive(Default)] struct SourceStats { /// Number of rows fetched. 
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 73795744ffc..63d73c776e2 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -136,7 +136,8 @@ impl TestEnv { let object_store_manager = Arc::new(object_store_manager); self.logstore = Some(logstore.clone()); self.object_store_manager = Some(object_store_manager.clone()); - MitoEngine::new(config, logstore, object_store_manager) + let data_home = self.data_home().display().to_string(); + MitoEngine::new(&data_home, config, logstore, object_store_manager) .await .unwrap() } @@ -145,8 +146,8 @@ impl TestEnv { pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine { let logstore = self.logstore.as_ref().unwrap().clone(); let object_store_manager = self.object_store_manager.as_ref().unwrap().clone(); - - MitoEngine::new(config, logstore, object_store_manager) + let data_home = self.data_home().display().to_string(); + MitoEngine::new(&data_home, config, logstore, object_store_manager) .await .unwrap() } @@ -164,9 +165,19 @@ impl TestEnv { let object_store_manager = Arc::new(object_store_manager); self.logstore = Some(logstore.clone()); self.object_store_manager = Some(object_store_manager.clone()); - MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener) - .await - .unwrap() + + let data_home = self.data_home().display().to_string(); + + MitoEngine::new_for_test( + &data_home, + config, + logstore, + object_store_manager, + manager, + listener, + ) + .await + .unwrap() } pub async fn create_engine_with_multiple_object_stores( @@ -195,9 +206,18 @@ impl TestEnv { let object_store_manager = Arc::new(object_store_manager); self.logstore = Some(logstore.clone()); self.object_store_manager = Some(object_store_manager.clone()); - MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener) - .await - .unwrap() + let data_home = self.data_home().display().to_string(); + + MitoEngine::new_for_test( + 
&data_home, + config, + logstore, + object_store_manager, + manager, + listener, + ) + .await + .unwrap() } /// Reopen the engine. @@ -205,6 +225,7 @@ impl TestEnv { engine.stop().await.unwrap(); MitoEngine::new( + &self.data_home().display().to_string(), config, self.logstore.clone().unwrap(), self.object_store_manager.clone().unwrap(), @@ -216,6 +237,7 @@ impl TestEnv { /// Open the engine. pub async fn open_engine(&mut self, config: MitoConfig) -> MitoEngine { MitoEngine::new( + &self.data_home().display().to_string(), config, self.logstore.clone().unwrap(), self.object_store_manager.clone().unwrap(), @@ -231,9 +253,11 @@ impl TestEnv { } /// Creates a new [WorkerGroup] with specific config under this env. - pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { + pub(crate) async fn create_worker_group(&self, mut config: MitoConfig) -> WorkerGroup { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; + let data_home = self.data_home().display().to_string(); + config.sanitize(&data_home).unwrap(); WorkerGroup::start( Arc::new(config), Arc::new(log_store), diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 445151f12f5..4a080993918 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use object_store::services::Fs; +use object_store::util::join_dir; use object_store::ObjectStore; use tokio::sync::mpsc::Sender; @@ -27,6 +28,7 @@ use crate::compaction::CompactionScheduler; use crate::flush::FlushScheduler; use crate::request::WorkerRequest; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; +use crate::sst::index::intermediate::IntermediateManager; /// Scheduler mocker. 
pub(crate) struct SchedulerEnv { @@ -39,15 +41,20 @@ pub(crate) struct SchedulerEnv { impl SchedulerEnv { /// Creates a new mocker. - pub(crate) fn new() -> SchedulerEnv { + pub(crate) async fn new() -> SchedulerEnv { let path = create_temp_dir(""); + let path_str = path.path().display().to_string(); let mut builder = Fs::default(); - builder.root(path.path().to_str().unwrap()); + builder.root(&path_str); + + let intm_mgr = IntermediateManager::init_fs(join_dir(&path_str, "intm")) + .await + .unwrap(); let object_store = ObjectStore::new(builder).unwrap().finish(); - let access_layer = Arc::new(AccessLayer::new("", object_store.clone())); + let access_layer = Arc::new(AccessLayer::new("", object_store.clone(), intm_mgr)); SchedulerEnv { - path: create_temp_dir(""), + path, access_layer, scheduler: None, } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 09cb59aa1b1..ef54b42514d 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -55,6 +55,7 @@ use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, }; use crate::schedule::scheduler::{LocalScheduler, SchedulerRef}; +use crate::sst::index::intermediate::IntermediateManager; use crate::wal::Wal; /// Identifier for a worker. 
@@ -120,8 +121,15 @@ impl WorkerGroup { let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new( config.global_write_buffer_size.as_bytes() as usize, )); + let intermediate_manager = + IntermediateManager::init_fs(&config.inverted_index.intermediate_path).await?; let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let write_cache = write_cache_from_config(&config, object_store_manager.clone()).await?; + let write_cache = write_cache_from_config( + &config, + object_store_manager.clone(), + intermediate_manager.clone(), + ) + .await?; let cache_manager = Arc::new( CacheManager::builder() .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes()) @@ -142,6 +150,7 @@ impl WorkerGroup { scheduler: scheduler.clone(), listener: WorkerListener::default(), cache_manager: cache_manager.clone(), + intermediate_manager: intermediate_manager.clone(), } .start() }) @@ -222,7 +231,14 @@ impl WorkerGroup { )) }); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let write_cache = write_cache_from_config(&config, object_store_manager.clone()).await?; + let intermediate_manager = + IntermediateManager::init_fs(&config.inverted_index.intermediate_path).await?; + let write_cache = write_cache_from_config( + &config, + object_store_manager.clone(), + intermediate_manager.clone(), + ) + .await?; let cache_manager = Arc::new( CacheManager::builder() .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes()) @@ -231,7 +247,6 @@ impl WorkerGroup { .write_cache(write_cache) .build(), ); - let workers = (0..config.num_workers) .map(|id| { WorkerStarter { @@ -243,6 +258,7 @@ impl WorkerGroup { scheduler: scheduler.clone(), listener: WorkerListener::new(listener.clone()), cache_manager: cache_manager.clone(), + intermediate_manager: intermediate_manager.clone(), } .start() }) @@ -263,6 +279,7 @@ fn value_to_index(value: usize, num_workers: usize) -> usize { async fn write_cache_from_config( config: &MitoConfig, 
object_store_manager: ObjectStoreManagerRef, + intermediate_manager: IntermediateManager, ) -> Result> { if !config.enable_experimental_write_cache { return Ok(None); @@ -275,6 +292,7 @@ async fn write_cache_from_config( &config.experimental_write_cache_path, object_store_manager, config.experimental_write_cache_size, + intermediate_manager, ) .await?; Ok(Some(Arc::new(cache))) @@ -290,6 +308,7 @@ struct WorkerStarter { scheduler: SchedulerRef, listener: WorkerListener, cache_manager: CacheManagerRef, + intermediate_manager: IntermediateManager, } impl WorkerStarter { @@ -323,6 +342,7 @@ impl WorkerStarter { stalled_requests: StalledRequests::default(), listener: self.listener, cache_manager: self.cache_manager, + intermediate_manager: self.intermediate_manager, }; let handle = common_runtime::spawn_write(async move { worker_thread.run().await; @@ -479,6 +499,8 @@ struct RegionWorkerLoop { listener: WorkerListener, /// Cache. cache_manager: CacheManagerRef, + /// Intermediate manager for inverted index. 
+ intermediate_manager: IntermediateManager, } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_catchup.rs b/src/mito2/src/worker/handle_catchup.rs index 9841c4eb43f..3622793273a 100644 --- a/src/mito2/src/worker/handle_catchup.rs +++ b/src/mito2/src/worker/handle_catchup.rs @@ -54,6 +54,7 @@ impl RegionWorkerLoop { self.memtable_builder.clone(), self.object_store_manager.clone(), self.scheduler.clone(), + self.intermediate_manager.clone(), ) .cache(Some(self.cache_manager.clone())) .options(region.version().options.clone()) diff --git a/src/mito2/src/worker/handle_create.rs b/src/mito2/src/worker/handle_create.rs index 0af60793a0e..0a87ba2ed54 100644 --- a/src/mito2/src/worker/handle_create.rs +++ b/src/mito2/src/worker/handle_create.rs @@ -61,6 +61,7 @@ impl RegionWorkerLoop { self.memtable_builder.clone(), self.object_store_manager.clone(), self.scheduler.clone(), + self.intermediate_manager.clone(), ) .metadata(metadata) .parse_options(request.options)? diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index a2a7f7d6609..9163b6f174c 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -68,6 +68,7 @@ impl RegionWorkerLoop { self.memtable_builder.clone(), self.object_store_manager.clone(), self.scheduler.clone(), + self.intermediate_manager.clone(), ) .skip_wal_replay(request.skip_wal_replay) .parse_options(request.options)? 
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 620d1bf4c56..cbe18e7fbf8 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -777,6 +777,13 @@ sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 allow_stale_entries = false +[datanode.region_engine.mito.inverted_index] +create_on_flush = "auto" +create_on_compaction = "auto" +apply_on_query = "auto" +mem_threshold_on_create = "64.0MiB" +intermediate_path = "" + [[datanode.region_engine]] [datanode.region_engine.file] diff --git a/tests/cases/standalone/common/insert/logical_metric_table.result b/tests/cases/standalone/common/insert/logical_metric_table.result index d3f59786830..09ddac341c9 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.result +++ b/tests/cases/standalone/common/insert/logical_metric_table.result @@ -19,19 +19,14 @@ SELECT * from t1; | 1970-01-01T00:00:00 | 0.0 | host1 | +-------------------------+-----+-------+ --- TODO(ruihang): fix this. 
t2 should not contains data from t1 CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); Affected Rows: 0 SELECT * from t2; -+-------------------------+-----+-----+ -| ts | job | val | -+-------------------------+-----+-----+ -| 1970-01-01T00:00:00.001 | | 1.0 | -| 1970-01-01T00:00:00 | | 0.0 | -+-------------------------+-----+-----+ +++ +++ INSERT INTO t2 VALUES (0, 'job1', 0), (1, 'job2', 1); @@ -42,8 +37,6 @@ SELECT * from t2; +-------------------------+------+-----+ | ts | job | val | +-------------------------+------+-----+ -| 1970-01-01T00:00:00.001 | | 1.0 | -| 1970-01-01T00:00:00 | | 0.0 | | 1970-01-01T00:00:00.001 | job2 | 1.0 | | 1970-01-01T00:00:00 | job1 | 0.0 | +-------------------------+------+-----+ diff --git a/tests/cases/standalone/common/insert/logical_metric_table.sql b/tests/cases/standalone/common/insert/logical_metric_table.sql index 6583833de51..fa2b6e0b6d7 100644 --- a/tests/cases/standalone/common/insert/logical_metric_table.sql +++ b/tests/cases/standalone/common/insert/logical_metric_table.sql @@ -6,7 +6,6 @@ INSERT INTO t1 VALUES (0, 0, 'host1'), (1, 1, 'host2'); SELECT * from t1; --- TODO(ruihang): fix this. t2 should not contains data from t1 CREATE TABLE t2 (ts timestamp time index, job string primary key, val double) engine = metric with ("on_physical_table" = "phy"); SELECT * from t2; diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 3df634d5f9b..2378f088513 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -322,7 +322,7 @@ impl Env { } } - /// Setup kafka wal cluster if needed. The conterpart is in [GreptimeDB::stop]. + /// Setup kafka wal cluster if needed. The counterpart is in [GreptimeDB::stop]. fn setup_wal(&self) { if matches!(self.wal, WalConfig::Kafka { needs_kafka_cluster, .. } if needs_kafka_cluster) { util::setup_wal();