Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): adjust seg size of inverted index to finer granularity instead of row group level #3289

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion src/index/src/inverted_index/search/index_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ pub use predicates_apply::PredicatesIndexApplier;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::reader::InvertedIndexReader;

/// Result of applying an index applier to an inverted index.
///
/// Bundles the ids of the segments that matched with the index's total row
/// count and its per-segment row count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApplyOutput {
    /// Ordered set of ids of the segments that match the predicates.
    pub matched_segment_ids: BTreeSet<usize>,

    /// Total number of rows in the index.
    pub total_row_count: usize,

    /// Number of rows in each segment.
    pub segment_row_count: usize,
}

/// A trait for processing and transforming indices obtained from an inverted index.
///
/// Applier instances are reusable and work with various `InvertedIndexReader` instances,
Expand All @@ -35,7 +48,7 @@ pub trait IndexApplier: Send + Sync {
&self,
context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a),
) -> Result<BTreeSet<usize>>;
) -> Result<ApplyOutput>;

/// Returns the memory usage of the applier.
fn memory_usage(&self) -> usize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::inverted_index::search::fst_apply::{
};
use crate::inverted_index::search::fst_values_mapper::FstValuesMapper;
use crate::inverted_index::search::index_apply::{
IndexApplier, IndexNotFoundStrategy, SearchContext,
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use crate::inverted_index::search::predicate::Predicate;

Expand All @@ -48,8 +48,13 @@ impl IndexApplier for PredicatesIndexApplier {
&self,
context: SearchContext,
reader: &mut (dyn InvertedIndexReader + 'a),
) -> Result<BTreeSet<usize>> {
) -> Result<ApplyOutput> {
let metadata = reader.metadata().await?;
let mut output = ApplyOutput {
matched_segment_ids: BTreeSet::default(),
total_row_count: metadata.total_row_count as _,
segment_row_count: metadata.segment_row_count as _,
};

let mut bitmap = Self::bitmap_full_range(&metadata);
// TODO(zhongzc): optimize the order of applying to make it quicker to return empty.
Expand All @@ -61,7 +66,7 @@ impl IndexApplier for PredicatesIndexApplier {
let Some(meta) = metadata.metas.get(name) else {
match context.index_not_found_strategy {
IndexNotFoundStrategy::ReturnEmpty => {
return Ok(BTreeSet::default());
return Ok(output);
}
IndexNotFoundStrategy::Ignore => {
continue;
Expand All @@ -81,7 +86,8 @@ impl IndexApplier for PredicatesIndexApplier {
bitmap &= bm;
}

Ok(bitmap.iter_ones().collect())
output.matched_segment_ids = bitmap.iter_ones().collect();
evenyag marked this conversation as resolved.
Show resolved Hide resolved
Ok(output)
}

/// Returns the memory usage of the applier.
Expand Down Expand Up @@ -206,11 +212,14 @@ mod tests {
_ => unreachable!(),
}
});
let indices = applier
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, BTreeSet::from_iter([0, 2, 4, 6]));
assert_eq!(
output.matched_segment_ids,
BTreeSet::from_iter([0, 2, 4, 6])
);

// An index reader with a single tag "tag-0" but without value "tag-0_value-0"
let mut mock_reader = MockInvertedIndexReader::new();
Expand All @@ -223,11 +232,11 @@ mod tests {
"tag-0" => Ok(FstMap::from_iter([(b"tag-0_value-1", fst_value(2, 1))]).unwrap()),
_ => unreachable!(),
});
let indices = applier
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert!(indices.is_empty());
assert!(output.matched_segment_ids.is_empty());
}

#[tokio::test]
Expand Down Expand Up @@ -260,11 +269,11 @@ mod tests {
}
});

let indices = applier
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, BTreeSet::from_iter([0, 4, 6]));
assert_eq!(output.matched_segment_ids, BTreeSet::from_iter([0, 4, 6]));
}

#[tokio::test]
Expand All @@ -278,11 +287,14 @@ mod tests {
.expect_metadata()
.returning(|| Ok(mock_metas(["tag-0"])));

let indices = applier
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); // full range to scan
assert_eq!(
output.matched_segment_ids,
BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])
); // full range to scan
}

#[tokio::test]
Expand All @@ -303,11 +315,11 @@ mod tests {
fst_appliers: vec![(s("tag-0"), Box::new(mock_fst_applier))],
};

let indices = applier
let output = applier
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert!(indices.is_empty());
assert!(output.matched_segment_ids.is_empty());
}

#[tokio::test]
Expand All @@ -334,7 +346,7 @@ mod tests {
.await;
assert!(matches!(result, Err(Error::IndexNotFound { .. })));

let indices = applier
let output = applier
.apply(
SearchContext {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
Expand All @@ -343,9 +355,9 @@ mod tests {
)
.await
.unwrap();
assert!(indices.is_empty());
assert!(output.matched_segment_ids.is_empty());

let indices = applier
let output = applier
.apply(
SearchContext {
index_not_found_strategy: IndexNotFoundStrategy::Ignore,
Expand All @@ -354,7 +366,10 @@ mod tests {
)
.await
.unwrap();
assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7]));
assert_eq!(
output.matched_segment_ids,
BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])
);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ datatypes.workspace = true
futures.workspace = true
humantime-serde.workspace = true
index.workspace = true
itertools.workspace = true
lazy_static = "1.4"
log-store = { workspace = true, optional = true }
memcomparable = "0.2"
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl AccessLayer {
file_id,
file_path: index_file_path,
metadata: &request.metadata,
segment_row_count: write_opts.index_segment_row_count,
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
intermediate_manager: self.intermediate_manager.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl WriteCache {
file_id,
file_path: self.file_cache.cache_file_path(puffin_key),
metadata: &write_request.metadata,
segment_row_count: write_opts.index_segment_row_count,
row_group_size: write_opts.row_group_size,
object_store: self.file_cache.local_store(),
intermediate_manager: self.intermediate_manager.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,10 @@ async fn test_region_usage() {

let region_stat = region.region_usage().await;
assert_eq!(region_stat.wal_usage, 0);
assert_eq!(region_stat.sst_usage, 3006);
assert_eq!(region_stat.sst_usage, 3005);

// region total usage
assert_eq!(region_stat.disk_usage(), 4072);
assert_eq!(region_stat.disk_usage(), 4071);
}

#[tokio::test]
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ lazy_static! {
/// Counter of filtered rows by precise filter.
pub static ref PRECISE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap();
pub static ref READ_ROWS_IN_ROW_GROUP_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_in_row_group_total", "mito read rows in row group total", &[TYPE_LABEL]).unwrap();
// ------- End of query metrics.

// Cache related metrics.
Expand Down
20 changes: 19 additions & 1 deletion src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) file_path: String,
pub(crate) metadata: &'a RegionMetadataRef,
pub(crate) row_group_size: usize,
pub(crate) segment_row_count: usize,
pub(crate) object_store: ObjectStore,
pub(crate) intermediate_manager: IntermediateManager,
}
Expand All @@ -153,6 +154,14 @@ impl<'a> IndexerBuilder<'a> {
return Indexer::default();
}

let Some(mut segment_row_count) = NonZeroUsize::new(self.segment_row_count) else {
warn!(
"Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
return Indexer::default();
};

let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
warn!(
"Row group size is 0, skip creating index, region_id: {}, file_id: {}",
Expand All @@ -161,14 +170,19 @@ impl<'a> IndexerBuilder<'a> {
return Indexer::default();
};

// If the segment row count does not evenly divide the row group size, fall back to
// the row group size so that segments stay aligned with row groups.
if row_group_size.get() % segment_row_count.get() != 0 {
segment_row_count = row_group_size;
}

let creator = SstIndexCreator::new(
self.file_path,
self.file_id,
self.metadata,
self.object_store,
self.intermediate_manager,
self.mem_threshold_index_create,
row_group_size,
segment_row_count,
)
.with_buffer_size(self.write_buffer_size);

Expand Down Expand Up @@ -263,6 +277,7 @@ mod tests {
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
segment_row_count: 16,
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
Expand All @@ -282,6 +297,7 @@ mod tests {
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
segment_row_count: 16,
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
Expand All @@ -301,6 +317,7 @@ mod tests {
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
segment_row_count: 16,
row_group_size: 1024,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
Expand All @@ -320,6 +337,7 @@ mod tests {
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
segment_row_count: 0,
row_group_size: 0,
object_store: mock_object_store(),
intermediate_manager: mock_intm_mgr(),
Expand Down
28 changes: 20 additions & 8 deletions src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

pub mod builder;

use std::collections::BTreeSet;
use std::sync::Arc;

use futures::{AsyncRead, AsyncSeek};
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
IndexApplier, IndexNotFoundStrategy, SearchContext,
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
Expand Down Expand Up @@ -84,7 +83,7 @@ impl SstIndexApplier {
}

/// Applies predicates to the provided SST file id and returns the matched segment ids
/// together with the total row count and the per-segment row count of the index.
pub async fn apply(&self, file_id: FileId) -> Result<BTreeSet<usize>> {
pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED.start_timer();

let context = SearchContext {
Expand Down Expand Up @@ -175,6 +174,8 @@ impl Drop for SstIndexApplier {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use futures::io::Cursor;
use index::inverted_index::search::index_apply::MockIndexApplier;
use object_store::services::Memory;
Expand Down Expand Up @@ -203,9 +204,13 @@ mod tests {

let mut mock_index_applier = MockIndexApplier::new();
mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier
.expect_apply()
.returning(|_, _| Ok(BTreeSet::from_iter([1, 2, 3])));
mock_index_applier.expect_apply().returning(|_, _| {
Ok(ApplyOutput {
matched_segment_ids: BTreeSet::from_iter([1, 2, 3]),
total_row_count: 100,
segment_row_count: 10,
})
});

let sst_index_applier = SstIndexApplier::new(
region_dir.clone(),
Expand All @@ -214,8 +219,15 @@ mod tests {
None,
Box::new(mock_index_applier),
);
let ids = sst_index_applier.apply(file_id).await.unwrap();
assert_eq!(ids, BTreeSet::from_iter([1, 2, 3]));
let output = sst_index_applier.apply(file_id).await.unwrap();
assert_eq!(
output,
ApplyOutput {
matched_segment_ids: BTreeSet::from_iter([1, 2, 3]),
total_row_count: 100,
segment_row_count: 10,
}
);
}

#[tokio::test]
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/sst/index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl SstIndexCreator {
index_store: ObjectStore,
intermediate_manager: IntermediateManager,
memory_usage_threshold: Option<usize>,
row_group_size: NonZeroUsize,
segment_row_count: NonZeroUsize,
) -> Self {
// `memory_usage_threshold` is the total memory usage threshold of the index creation,
// so we need to divide it by the number of columns
Expand All @@ -96,7 +96,7 @@ impl SstIndexCreator {
intermediate_manager,
));
let sorter = ExternalSorter::factory(temp_file_provider.clone() as _, memory_threshold);
let index_creator = Box::new(SortIndexCreator::new(sorter, row_group_size));
let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));

let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
Self {
Expand Down