diff --git a/Cargo.lock b/Cargo.lock index d4f719e470463..57b1fd2e8611a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4952,7 +4952,11 @@ dependencies = [ "databend-common-base", "databend-common-catalog", "databend-common-exception", + "databend-common-expression", "databend-common-meta-app", + "databend-common-pipeline-core", + "databend-common-storages-fuse", + "databend-storages-common-table-meta", ] [[package]] diff --git a/src/query/ee/src/storages/fuse/operations/mod.rs b/src/query/ee/src/storages/fuse/operations/mod.rs index 95e551c45f35c..528f11775e5c0 100644 --- a/src/query/ee/src/storages/fuse/operations/mod.rs +++ b/src/query/ee/src/storages/fuse/operations/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod handler; +pub mod ngram_index; pub mod vacuum_drop_tables; pub mod vacuum_table; pub mod vacuum_table_v2; diff --git a/src/query/ee/src/storages/fuse/operations/ngram_index.rs b/src/query/ee/src/storages/fuse/operations/ngram_index.rs new file mode 100644 index 0000000000000..14a036946406b --- /dev/null +++ b/src/query/ee/src/storages/fuse/operations/ngram_index.rs @@ -0,0 +1,422 @@ +// Copyright 2023 Databend Cloud +// +// Licensed under the Elastic License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.elastic.co/licensing/elastic-license +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::slice; +use std::sync::Arc; + +use databend_common_catalog::plan::Projection; +use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::ColumnId; +use databend_common_expression::DataBlock; +use databend_common_expression::TableSchemaRef; +use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_sources::AsyncSource; +use databend_common_pipeline_sources::AsyncSourcer; +use databend_common_pipeline_transforms::AsyncTransform; +use databend_common_pipeline_transforms::TransformPipelineHelper; +use databend_common_sql::executor::physical_plans::MutationKind; +use databend_common_storages_fuse::index::filters::BlockFilter; +use databend_common_storages_fuse::index::BloomIndexBuilder; +use databend_common_storages_fuse::index::BloomIndexMeta; +use databend_common_storages_fuse::io::read::bloom::block_filter_reader::load_bloom_filter_by_columns; +use databend_common_storages_fuse::io::read::bloom::block_filter_reader::load_index_meta; +use databend_common_storages_fuse::io::write_data; +use databend_common_storages_fuse::io::BlockReader; +use databend_common_storages_fuse::io::BloomIndexState; +use databend_common_storages_fuse::io::MetaReaders; +use databend_common_storages_fuse::io::NewNgramIndexColumn; +use databend_common_storages_fuse::io::TableMetaLocationGenerator; +use databend_common_storages_fuse::operations::BlockMetaIndex; +use databend_common_storages_fuse::operations::CommitSink; +use 
databend_common_storages_fuse::operations::MutationGenerator; +use databend_common_storages_fuse::operations::MutationLogEntry; +use databend_common_storages_fuse::operations::MutationLogs; +use databend_common_storages_fuse::operations::TableMutationAggregator; +use databend_common_storages_fuse::FuseStorageFormat; +use databend_common_storages_fuse::FuseTable; +use databend_query::storages::index::BloomIndex; +use databend_query::storages::index::NgramArgs; +use databend_storages_common_cache::CacheAccessor; +use databend_storages_common_cache::CacheManager; +use databend_storages_common_cache::LoadParams; +use databend_storages_common_io::ReadSettings; +use databend_storages_common_table_meta::meta::BlockMeta; +use databend_storages_common_table_meta::meta::ExtendedBlockMeta; +use databend_storages_common_table_meta::meta::Location; +use databend_storages_common_table_meta::meta::Statistics; +use databend_storages_common_table_meta::meta::Versioned; +use opendal::Operator; + +pub async fn do_refresh_ngram_index( + fuse_table: &FuseTable, + ctx: Arc, + index_name: String, + _index_schema: TableSchemaRef, + segment_locs: Option>, + pipeline: &mut Pipeline, +) -> Result<()> { + let index = fuse_table + .get_table_info() + .meta + .indexes + .get(&index_name) + .ok_or_else(|| { + ErrorCode::RefreshIndexError(format!("Ngram index: {index_name} not found")) + })?; + let Some(snapshot) = fuse_table.read_table_snapshot().await? else { + // no snapshot + return Ok(()); + }; + let table_schema = &fuse_table.get_table_info().meta.schema; + let index_column_id = index.column_ids[0]; + let projection = Projection::Columns(vec![table_schema + .fields() + .iter() + .position(|f| f.column_id() == index_column_id) + .unwrap()]); + let index_ngram_args = FuseTable::create_ngram_index_args( + &fuse_table.get_table_info().meta, + &projection.project_schema(table_schema), + )? + .pop() + .unwrap(); + + let block_reader = + fuse_table.create_block_reader(ctx.clone(), projection, false, false, false)?; + + let segment_reader = + MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone()); + + // If no segment locations are specified, iterates through all segments + let segment_locs = if let Some(segment_locs) = segment_locs { + segment_locs + .into_iter() + .filter(|s| snapshot.segments.contains(s)) + .collect() + } else { + snapshot.segments.clone() + }; + + if segment_locs.is_empty() { + return Ok(()); + } + let operator = fuse_table.get_operator_ref(); + + // Read the segment infos and collect the block metas that need to generate the index. + let mut block_metas = VecDeque::new(); + let mut block_meta_index_map = HashMap::new(); + for (segment_idx, (segment_loc, ver)) in segment_locs.into_iter().enumerate() { + let segment_info = segment_reader + .read(&LoadParams { + location: segment_loc.to_string(), + len_hint: None, + ver, + put_cache: false, + }) + .await?; + + for (block_idx, block_meta) in segment_info.block_metas()?.into_iter().enumerate() { + let index_location = + TableMetaLocationGenerator::gen_bloom_index_location_from_block_location( + &block_meta.location.0, + ); + // only generate bloom index if it is not exist. 
+ let bloom_meta = if let Ok(content_length) = operator + .stat(&index_location) + .await + .map(|meta| meta.content_length()) + { + let bloom_index_meta = + load_index_meta(operator.clone(), &index_location, content_length).await?; + + let ngram_index_name = BloomIndex::build_filter_ngram_name( + index_column_id, + index_ngram_args.gram_size(), + index_ngram_args.bloom_size(), + ); + if bloom_index_meta + .columns + .iter() + .any(|(column_name, _)| column_name == &ngram_index_name) + { + continue; + } + Some(bloom_index_meta) + } else { + None + }; + block_meta_index_map.insert( + block_meta.location.clone(), + ( + BlockMetaIndex { + segment_idx, + block_idx, + }, + bloom_meta, + ), + ); + block_metas.push_back(block_meta); + } + } + if block_metas.is_empty() { + return Ok(()); + } + + let settings = ReadSettings::from_ctx(&ctx)?; + let write_settings = fuse_table.get_write_settings(); + let storage_format = write_settings.storage_format; + + pipeline.add_source( + |output| { + let inner = NgramIndexSource::new( + settings, + storage_format, + block_reader.clone(), + block_metas.clone(), + ); + AsyncSourcer::create(ctx.clone(), output, inner) + }, + 1, + )?; + + let block_nums = block_metas.len(); + let max_threads = ctx.get_settings().get_max_threads()? as usize; + let max_threads = std::cmp::min(block_nums, max_threads); + pipeline.try_resize(max_threads)?; + pipeline.add_async_transformer(|| { + NgramIndexTransform::new( + ctx.clone(), + operator.clone(), + index_ngram_args.clone(), + block_meta_index_map.clone(), + index_column_id, + ) + }); + + pipeline.try_resize(1)?; + let table_meta_timestamps = + ctx.get_table_meta_timestamps(fuse_table, Some(snapshot.clone()))?; + pipeline.add_async_accumulating_transformer(|| { + TableMutationAggregator::create( + fuse_table, + ctx.clone(), + vec![], + vec![], + vec![], + Statistics::default(), + MutationKind::Update, + table_meta_timestamps, + ) + }); + + let prev_snapshot_id = snapshot.snapshot_id; + let snapshot_gen = MutationGenerator::new(Some(snapshot), MutationKind::Update); + pipeline.add_sink(|input| { + CommitSink::try_create( + fuse_table, + ctx.clone(), + None, + vec![], + snapshot_gen.clone(), + input, + None, + Some(prev_snapshot_id), + None, + table_meta_timestamps, + ) + })?; + + Ok(()) +} + +pub struct NgramIndexSource { + settings: ReadSettings, + storage_format: FuseStorageFormat, + block_reader: Arc, + block_metas: VecDeque>, + is_finished: bool, +} + +impl NgramIndexSource { + pub fn new( + settings: ReadSettings, + storage_format: FuseStorageFormat, + block_reader: Arc, + block_metas: VecDeque>, + ) -> Self { + Self { + settings, + storage_format, + block_reader, + block_metas, + is_finished: false, + } + } +} + +#[async_trait::async_trait] +impl AsyncSource for NgramIndexSource { + const NAME: &'static str = "NgramIndexSource"; + + #[async_backtrace::framed] + async fn generate(&mut self) -> Result> { + if self.is_finished { + return Ok(None); + } + + match self.block_metas.pop_front() { + Some(block_meta) => { + let block = self + .block_reader + .read_by_meta(&self.settings, &block_meta, &self.storage_format) + .await?; + let block = block.add_meta(Some(Box::new(Arc::unwrap_or_clone(block_meta))))?; + Ok(Some(block)) + } + None => { + self.is_finished = true; + Ok(None) + } + } + } +} + +pub struct NgramIndexTransform { + ctx: Arc, + operator: Operator, + index_ngram_args: NgramArgs, + block_meta_index_map: HashMap>)>, + ngram_column_id: ColumnId, +} + +impl NgramIndexTransform { + pub fn new( + ctx: Arc, + operator: 
Operator, + index_ngram_args: NgramArgs, + block_meta_index_map: HashMap>)>, + ngram_column_id: ColumnId, + ) -> Self { + Self { + ctx, + operator, + index_ngram_args, + block_meta_index_map, + ngram_column_id, + } + } +} + +#[async_trait::async_trait] +impl AsyncTransform for NgramIndexTransform { + const NAME: &'static str = "NgramIndexTransform"; + + #[async_backtrace::framed] + async fn transform(&mut self, data_block: DataBlock) -> Result { + let block_meta = data_block + .get_meta() + .and_then(BlockMeta::downcast_ref_from) + .unwrap(); + + let index_path = TableMetaLocationGenerator::gen_bloom_index_location_from_block_location( + &block_meta.location.0, + ); + + let mut new_block_meta = block_meta.clone(); + let index_location = (index_path.clone(), BlockFilter::VERSION); + + let (block_meta_index, bloom_index_meta) = self + .block_meta_index_map + .remove(&block_meta.location) + .unwrap(); + + let mut builder = BloomIndexBuilder::create( + self.ctx.get_function_context()?, + BTreeMap::new(), + slice::from_ref(&self.index_ngram_args), + )?; + builder.add_block(&data_block)?; + + if let Some(new_ngram_index) = builder.finalize()? { + let (index, new_ngram_index_column) = if let Some(bloom_index_meta) = bloom_index_meta { + let index_columns = bloom_index_meta + .columns + .iter() + .map(|(name, _)| name.to_string()) + .collect::>(); + let filter = load_bloom_filter_by_columns( + self.operator.clone(), + &index_columns, + &index_path, + block_meta.bloom_filter_index_size, + ) + .await?; + let old_index = BloomIndex::from_filter_block( + self.ctx.get_function_context()?, + filter.filter_schema, + filter.filters, + index_location.1, + )?; + let new_ngram_column = new_ngram_index + .serialize_to_data_block()? + .take_columns() + .pop() + .unwrap(); + + ( + old_index, + Some(NewNgramIndexColumn::new( + self.ngram_column_id, + new_ngram_column, + self.index_ngram_args.clone(), + )), + ) + } else { + (new_ngram_index, None) + }; + let state = + BloomIndexState::from_bloom_index(&index, index_location, new_ngram_index_column)?; + + // remove old bloom index meta + if let Some(cache) = CacheManager::instance().get_bloom_index_meta_cache() { + cache.evict(&index_path); + } + + new_block_meta.bloom_filter_index_size = state.size(); + new_block_meta.ngram_filter_index_size = state.ngram_size(); + write_data(state.data(), &self.operator, &index_path).await?; + } + let extended_block_meta = ExtendedBlockMeta { + block_meta: new_block_meta, + draft_virtual_block_meta: None, + }; + + let entry = MutationLogEntry::ReplacedBlock { + index: block_meta_index, + block_meta: Arc::new(extended_block_meta), + }; + let meta = MutationLogs { + entries: vec![entry], + }; + let new_block = DataBlock::empty_with_meta(Box::new(meta)); + Ok(new_block) + } +} diff --git a/src/query/ee/src/table_index/table_index_handler.rs b/src/query/ee/src/table_index/table_index_handler.rs index 6936894b08f61..8b5066f936768 100644 --- a/src/query/ee/src/table_index/table_index_handler.rs +++ b/src/query/ee/src/table_index/table_index_handler.rs @@ -16,11 +16,20 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_catalog::catalog::Catalog; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::TableSchemaRef; use databend_common_meta_app::schema::CreateTableIndexReq; use databend_common_meta_app::schema::DropTableIndexReq; +use 
databend_common_meta_app::schema::TableIndexType; +use databend_common_pipeline_core::Pipeline; +use databend_common_storages_fuse::FuseTable; use databend_enterprise_table_index::TableIndexHandler; use databend_enterprise_table_index::TableIndexHandlerWrapper; +use databend_storages_common_table_meta::meta::Location; + +use crate::storages::fuse::operations::ngram_index::do_refresh_ngram_index; pub struct RealTableIndexHandler {} @@ -43,6 +52,36 @@ impl TableIndexHandler for RealTableIndexHandler { ) -> Result<()> { catalog.drop_table_index(req).await } + + #[async_backtrace::framed] + async fn do_refresh_table_index( + &self, + index_ty: TableIndexType, + table: &FuseTable, + ctx: Arc, + index_name: String, + index_schema: TableSchemaRef, + segment_locs: Option>, + pipeline: &mut Pipeline, + ) -> Result<()> { + match index_ty { + TableIndexType::Ngram => { + do_refresh_ngram_index( + table, + ctx, + index_name, + index_schema, + segment_locs, + pipeline, + ) + .await?; + } + _ => { + return Err(ErrorCode::RefreshIndexError("Only Ngram support Refresh")); + } + } + Ok(()) + } } impl RealTableIndexHandler { diff --git a/src/query/ee/tests/it/inverted_index/index_refresh.rs b/src/query/ee/tests/it/inverted_index/index_refresh.rs index 80171a6ffdcec..ca6ffdcee4331 100644 --- a/src/query/ee/tests/it/inverted_index/index_refresh.rs +++ b/src/query/ee/tests/it/inverted_index/index_refresh.rs @@ -91,6 +91,7 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> { assert!(res.is_ok()); let refresh_index_plan = RefreshTableIndexPlan { + index_type: databend_common_ast::ast::TableIndexType::Inverted, catalog: fixture.default_catalog_name(), database: fixture.default_db_name(), table: fixture.default_table_name(), diff --git a/src/query/ee/tests/it/inverted_index/pruning.rs b/src/query/ee/tests/it/inverted_index/pruning.rs index 5c7077272ea2c..570713175154a 100644 --- a/src/query/ee/tests/it/inverted_index/pruning.rs +++ b/src/query/ee/tests/it/inverted_index/pruning.rs @@ -525,6 +525,7 @@ async fn test_block_pruner() -> Result<()> { ]); let refresh_index_plan = RefreshTableIndexPlan { + index_type: databend_common_ast::ast::TableIndexType::Inverted, catalog: fixture.default_catalog_name(), database: fixture.default_db_name(), table: test_tbl_name.to_string(), diff --git a/src/query/ee/tests/it/main.rs b/src/query/ee/tests/it/main.rs index a13eb96c05ef8..8b5c2646c8640 100644 --- a/src/query/ee/tests/it/main.rs +++ b/src/query/ee/tests/it/main.rs @@ -16,5 +16,6 @@ mod aggregating_index; mod inverted_index; mod license; +mod ngram_index; mod storages; mod stream; diff --git a/src/query/ee/tests/it/ngram_index/index_refresh.rs b/src/query/ee/tests/it/ngram_index/index_refresh.rs new file mode 100644 index 0000000000000..b543f408a606e --- /dev/null +++ b/src/query/ee/tests/it/ngram_index/index_refresh.rs @@ -0,0 +1,110 @@ +// Copyright 2023 Databend Cloud +// +// Licensed under the Elastic License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.elastic.co/licensing/elastic-license +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use databend_common_base::base::tokio; +use databend_common_exception::Result; +use databend_common_storage::DataOperator; +use databend_common_storages_fuse::index::BloomIndexMeta; +use databend_common_storages_fuse::io::read::bloom::block_filter_reader::load_index_meta; +use databend_enterprise_query::test_kits::context::EESetup; +use databend_query::test_kits::TestFixture; +use databend_storages_common_table_meta::meta::SingleColumnMeta; +use futures_util::StreamExt; + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuse_do_refresh_ngram_index() -> Result<()> { + let fixture = TestFixture::setup_with_custom(EESetup::new()).await?; + fixture + .default_session() + .get_settings() + .set_data_retention_time_in_days(0)?; + fixture.create_default_database().await?; + + // Create table + fixture + .execute_command("CREATE TABLE default.t3 (a int, b int, c int, d string, e string) storage_format = 'parquet'") + .await?; + // Insert data + fixture + .execute_command("INSERT INTO default.t3 VALUES(1,2,3, 'aaaaaaaaaa', 'aaaaaaaaaaaaa'),(4,5,6,'xxxxxxxxxxx','yyyyyyyyyyy');") + .await?; + fixture + .execute_command("CREATE NGRAM INDEX idx2 ON default.t3(d);") + .await?; + + let meta_0 = get_bloom_index_meta(&fixture).await?; + assert_eq!(meta_0.columns.len(), 5); + fixture + .execute_command("REFRESH NGRAM INDEX idx2 ON default.t3;") + .await?; + let meta_1 = get_bloom_index_meta(&fixture).await?; + + assert_eq!(&meta_0.columns[..], &meta_1.columns[..5]); + assert_eq!(meta_1.columns.len(), 6); + assert_eq!( + &meta_1.columns[5], + &("Ngram(3)_3_1048576".to_string(), SingleColumnMeta { + offset: 424, + len: 1048644, + num_values: 1, + }) + ); + + fixture + .execute_command("DROP NGRAM INDEX idx2 ON default.t3;") + .await?; + fixture + .execute_command("CREATE NGRAM INDEX idx2 ON default.t3(d) gram_size = 5;") + .await?; + fixture + .execute_command("REFRESH NGRAM INDEX idx2 ON default.t3;") + .await?; + let meta_2 = get_bloom_index_meta(&fixture).await?; + assert_eq!(meta_2.columns.len(), 6); + assert_eq!( + &meta_2.columns[5], + &("Ngram(3)_5_1048576".to_string(), SingleColumnMeta { + offset: 424, + len: 1048644, + num_values: 1, + }) + ); + + Ok(()) +} + +async fn get_bloom_index_meta(fixture: &TestFixture) -> Result> { + let block = fixture + .execute_query( + "select bloom_filter_location, bloom_filter_size from fuse_block('default', 't3');", + ) + .await? + .next() + .await + .transpose()? + .unwrap(); + let path = block.columns()[0].to_column().remove_nullable(); + let path_scalar = path.as_string().unwrap().index(0).unwrap(); + let length = block.columns()[1].to_column(); + let length_scalar = length.as_number().unwrap().index(0).unwrap(); + + load_index_meta( + DataOperator::instance().operator(), + path_scalar, + *length_scalar.as_u_int64().unwrap(), + ) + .await +} diff --git a/src/query/ee/tests/it/ngram_index/mod.rs b/src/query/ee/tests/it/ngram_index/mod.rs new file mode 100644 index 0000000000000..32fdcd5ee2145 --- /dev/null +++ b/src/query/ee/tests/it/ngram_index/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Databend Cloud +// +// Licensed under the Elastic License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.elastic.co/licensing/elastic-license +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod index_refresh; diff --git a/src/query/ee_features/table_index/Cargo.toml b/src/query/ee_features/table_index/Cargo.toml index 056646d8c628d..2859509ee1ba5 100644 --- a/src/query/ee_features/table_index/Cargo.toml +++ b/src/query/ee_features/table_index/Cargo.toml @@ -13,7 +13,11 @@ async-trait = { workspace = true } databend-common-base = { workspace = true } databend-common-catalog = { workspace = true } databend-common-exception = { workspace = true } +databend-common-expression = { workspace = true } databend-common-meta-app = { workspace = true } +databend-common-pipeline-core = { workspace = true } +databend-common-storages-fuse = { workspace = true } +databend-storages-common-table-meta = { workspace = true } [build-dependencies] diff --git a/src/query/ee_features/table_index/src/table_index_handler.rs b/src/query/ee_features/table_index/src/table_index_handler.rs index c4744c48f83aa..ca70a510cc18e 100644 --- a/src/query/ee_features/table_index/src/table_index_handler.rs +++ b/src/query/ee_features/table_index/src/table_index_handler.rs @@ -16,9 +16,15 @@ use std::sync::Arc; use databend_common_base::base::GlobalInstance; use databend_common_catalog::catalog::Catalog; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use databend_common_expression::TableSchemaRef; use databend_common_meta_app::schema::CreateTableIndexReq; use databend_common_meta_app::schema::DropTableIndexReq; +use databend_common_meta_app::schema::TableIndexType; +use databend_common_pipeline_core::Pipeline; +use databend_common_storages_fuse::FuseTable; +use databend_storages_common_table_meta::meta::Location; #[async_trait::async_trait] pub trait TableIndexHandler: Sync + Send { @@ -33,6 +39,17 @@ pub trait TableIndexHandler: Sync + Send { catalog: Arc, req: DropTableIndexReq, ) -> Result<()>; + + async fn do_refresh_table_index( + &self, + index_ty: TableIndexType, + table: &FuseTable, + ctx: Arc, + index_name: String, + index_schema: TableSchemaRef, + segment_locs: Option>, + pipeline: &mut Pipeline, + ) -> Result<()>; } pub struct TableIndexHandlerWrapper { @@ -61,6 +78,30 @@ impl TableIndexHandlerWrapper { ) -> Result<()> { self.handler.do_drop_table_index(catalog, req).await } + + #[async_backtrace::framed] + pub async fn do_refresh_table_index( + &self, + index_ty: TableIndexType, + table: &FuseTable, + ctx: Arc, + index_name: String, + index_schema: TableSchemaRef, + segment_locs: Option>, + pipeline: &mut Pipeline, + ) -> Result<()> { + self.handler + .do_refresh_table_index( + index_ty, + table, + ctx, + index_name, + index_schema, + segment_locs, + pipeline, + ) + .await + } } pub fn get_table_index_handler() -> Arc { diff --git a/src/query/expression/src/block.rs b/src/query/expression/src/block.rs index 9f94e5e06e02f..03c12fc8f6962 100644 --- a/src/query/expression/src/block.rs +++ b/src/query/expression/src/block.rs @@ -503,6 +503,11 @@ impl DataBlock { self.add_entry(column.into()); } + #[inline] + pub fn remove_column(&mut self, index: usize) { + self.entries.remove(index); + } + #[inline] pub fn add_const_column(&mut 
self, scalar: Scalar, data_type: DataType) { self.entries.push(BlockEntry::new_const_column( diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index 05ee008b0eb89..3589c79459a59 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_ast::ast; use databend_common_base::runtime::GlobalIORuntime; use databend_common_catalog::catalog::CatalogManager; use databend_common_catalog::table::Table; @@ -21,6 +22,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_meta_app::schema::IndexMeta; use databend_common_meta_app::schema::ListIndexesByIdReq; +use databend_common_meta_app::schema::TableIndexType; use databend_common_meta_types::MetaId; use databend_common_pipeline_core::always_callback; use databend_common_pipeline_core::ExecutionInfo; @@ -263,7 +265,13 @@ async fn generate_refresh_inverted_index_plan( if index.sync_creation { continue; } + let index_type = match index.index_type { + TableIndexType::Inverted => ast::TableIndexType::Inverted, + TableIndexType::Ngram => ast::TableIndexType::Ngram, + TableIndexType::Vector => ast::TableIndexType::Vector, + }; let plan = RefreshTableIndexPlan { + index_type, catalog: desc.catalog.clone(), database: desc.database.clone(), table: desc.table.clone(), diff --git a/src/query/service/src/interpreters/interpreter_table_index_create.rs b/src/query/service/src/interpreters/interpreter_table_index_create.rs index 24968878913db..fcff22b739d6c 100644 --- a/src/query/service/src/interpreters/interpreter_table_index_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_index_create.rs @@ -81,7 +81,14 @@ impl Interpreter for CreateTableIndexInterpreter { )); } ast::TableIndexType::Inverted => TableIndexType::Inverted, - ast::TableIndexType::Ngram => TableIndexType::Ngram, + ast::TableIndexType::Ngram => { + if column_ids.len() != 1 { + return Err(ErrorCode::InvalidArgument( + "Ngram Index has one and only one column", + )); + } + TableIndexType::Ngram + } ast::TableIndexType::Vector => TableIndexType::Vector, }; diff --git a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs index caa3001290c81..889f06a5ac140 100644 --- a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs @@ -14,15 +14,18 @@ use std::sync::Arc; +use databend_common_ast::ast; use databend_common_catalog::table::TableExt; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::TableSchemaRefExt; use databend_common_license::license::Feature; use databend_common_license::license_manager::LicenseManagerSwitch; +use databend_common_meta_app::schema; use databend_common_sql::plans::RefreshTableIndexPlan; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::TableContext; +use databend_enterprise_table_index::get_table_index_handler; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; @@ -51,8 +54,21 @@ impl Interpreter for RefreshTableIndexInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { - LicenseManagerSwitch::instance() - .check_enterprise_enabled(self.ctx.get_license_key(), 
Feature::InvertedIndex)?; + match self.plan.index_type { + ast::TableIndexType::Inverted => { + LicenseManagerSwitch::instance() + .check_enterprise_enabled(self.ctx.get_license_key(), Feature::InvertedIndex)?; + } + ast::TableIndexType::Ngram => { + LicenseManagerSwitch::instance() + .check_enterprise_enabled(self.ctx.get_license_key(), Feature::NgramIndex)?; + } + ast::TableIndexType::Vector | ast::TableIndexType::Aggregating => { + return Err(ErrorCode::RefreshIndexError( + "Only Inverted and Ngram support Refresh", + )); + } + } let table = self .ctx @@ -66,8 +82,8 @@ impl Interpreter for RefreshTableIndexInterpreter { let table_meta = &table.get_table_info().meta; let Some(index) = table_meta.indexes.get(&index_name) else { return Err(ErrorCode::RefreshIndexError(format!( - "Inverted index {} does not exist", - index_name + "{} index {} does not exist", + self.plan.index_type, index_name ))); }; let mut index_fields = Vec::with_capacity(index.column_ids.len()); @@ -81,8 +97,8 @@ impl Interpreter for RefreshTableIndexInterpreter { } if index_fields.len() != index.column_ids.len() { return Err(ErrorCode::RefreshIndexError(format!( - "Inverted index {} is invalid", - index_name + "{} index {} is invalid", + self.plan.index_type, index_name ))); } let index_version = index.version.clone(); @@ -91,17 +107,39 @@ impl Interpreter for RefreshTableIndexInterpreter { let mut build_res = PipelineBuildResult::create(); let fuse_table = FuseTable::try_from_table(table.as_ref())?; - fuse_table - .do_refresh_inverted_index( - self.ctx.clone(), - index_name, - index_version, - &index.options, - index_schema, - segment_locs, - &mut build_res.main_pipeline, - ) - .await?; + + match self.plan.index_type { + ast::TableIndexType::Inverted => { + fuse_table + .do_refresh_inverted_index( + self.ctx.clone(), + index_name, + index_version, + &index.options, + index_schema, + segment_locs, + &mut build_res.main_pipeline, + ) + .await?; + } + ast::TableIndexType::Ngram => { + let handler = get_table_index_handler(); + let _ = handler + .do_refresh_table_index( + schema::TableIndexType::Ngram, + fuse_table, + self.ctx.clone(), + index_name, + index_schema, + segment_locs, + &mut build_res.main_pipeline, + ) + .await?; + } + ast::TableIndexType::Vector | ast::TableIndexType::Aggregating => { + unreachable!() + } + } Ok(build_res) } diff --git a/src/query/sql/src/planner/binder/ddl/index.rs b/src/query/sql/src/planner/binder/ddl/index.rs index 3362a64055710..dbf5c8bde93a6 100644 --- a/src/query/sql/src/planner/binder/ddl/index.rs +++ b/src/query/sql/src/planner/binder/ddl/index.rs @@ -813,7 +813,7 @@ impl Binder { limit: _, } = stmt; - if !matches!(index_type, TableIndexType::Inverted) { + if !matches!(index_type, TableIndexType::Inverted | TableIndexType::Ngram) { return Err(ErrorCode::UnsupportedIndex(format!( "Table index {} does not support refresh", index_type @@ -825,6 +825,7 @@ impl Binder { let index_name = self.normalize_object_identifier(index_name); let plan = RefreshTableIndexPlan { + index_type: *index_type, catalog, database, table, diff --git a/src/query/sql/src/planner/plans/ddl/index.rs b/src/query/sql/src/planner/plans/ddl/index.rs index fa3c228c13102..41928df9ccbb2 100644 --- a/src/query/sql/src/planner/plans/ddl/index.rs +++ b/src/query/sql/src/planner/plans/ddl/index.rs @@ -78,6 +78,7 @@ pub struct DropTableIndexPlan { #[derive(Clone, Debug, PartialEq, Eq)] pub struct RefreshTableIndexPlan { + pub index_type: TableIndexType, pub catalog: String, pub database: String, pub table: String, diff 
--git a/src/query/storages/common/index/src/bloom_index.rs b/src/query/storages/common/index/src/bloom_index.rs index e24cc5b3e6dc8..bf5ec35035e64 100644 --- a/src/query/storages/common/index/src/bloom_index.rs +++ b/src/query/storages/common/index/src/bloom_index.rs @@ -489,8 +489,12 @@ impl BloomIndex { } } - pub fn build_filter_ngram_name(field: &TableField, gram_size: usize) -> String { - format!("Ngram({})_{gram_size}", field.column_id()) + pub fn build_filter_ngram_name( + column_id: ColumnId, + gram_size: usize, + bloom_size: u64, + ) -> String { + format!("Ngram({column_id})_{gram_size}_{bloom_size}") } fn find( @@ -508,7 +512,11 @@ impl BloomIndex { // The column doesn't have a Ngram Arg. return Ok(FilterEvalResult::Uncertain); }; - BloomIndex::build_filter_ngram_name(table_field, ngram_arg.gram_size) + BloomIndex::build_filter_ngram_name( + table_field.column_id(), + ngram_arg.gram_size, + ngram_arg.bloom_size, + ) } else { BloomIndex::build_filter_bloom_name(self.version, table_field)? }; @@ -572,6 +580,7 @@ struct ColumnFilterBuilder { index: FieldIndex, field: TableField, gram_size: usize, + bloom_size: u64, builder: FilterImplBuilder, } @@ -619,6 +628,7 @@ impl BloomIndexBuilder { index, field: field.clone(), gram_size: 0, + bloom_size: 0, builder: FilterImplBuilder::Xor(Xor8Builder::create()), }); } @@ -627,6 +637,7 @@ impl BloomIndexBuilder { index: arg.index, field: arg.field.clone(), gram_size: arg.gram_size, + bloom_size: arg.bloom_size, builder: FilterImplBuilder::Ngram(BloomBuilder::create( arg.bloom_size, NGRAM_HASH_SEED, @@ -792,8 +803,11 @@ impl BloomIndexBuilder { } for ngram_column in self.ngram_columns.iter_mut() { let filter = ngram_column.builder.build()?; - let filter_name = - BloomIndex::build_filter_ngram_name(&ngram_column.field, ngram_column.gram_size); + let filter_name = BloomIndex::build_filter_ngram_name( + ngram_column.field.column_id(), + ngram_column.gram_size, + ngram_column.bloom_size, + ); filter_fields.push(TableField::new(&filter_name, TableDataType::Binary)); filters.push(Arc::new(filter)); } diff --git a/src/query/storages/fuse/src/io/mod.rs b/src/query/storages/fuse/src/io/mod.rs index 63b43a9ff785f..227147dd0e818 100644 --- a/src/query/storages/fuse/src/io/mod.rs +++ b/src/query/storages/fuse/src/io/mod.rs @@ -50,6 +50,7 @@ pub use write::CachedMetaWriter; pub use write::InvertedIndexBuilder; pub use write::InvertedIndexWriter; pub use write::MetaWriter; +pub use write::NewNgramIndexColumn; pub(crate) use write::StreamBlockBuilder; pub(crate) use write::StreamBlockProperties; pub use write::VirtualColumnBuilder; diff --git a/src/query/storages/fuse/src/io/read/bloom/block_filter_reader.rs b/src/query/storages/fuse/src/io/read/bloom/block_filter_reader.rs index 3d5bc526aeaf9..477fb109247fe 100644 --- a/src/query/storages/fuse/src/io/read/bloom/block_filter_reader.rs +++ b/src/query/storages/fuse/src/io/read/bloom/block_filter_reader.rs @@ -82,7 +82,7 @@ impl BloomBlockFilterReader for Location { /// load index column data #[fastrace::trace] -async fn load_bloom_filter_by_columns<'a>( +pub async fn load_bloom_filter_by_columns<'a>( dal: Operator, column_needed: &'a [String], index_path: &'a str, @@ -174,7 +174,11 @@ async fn load_column_bloom_filter<'a>( /// Loads index meta data /// read data from cache, or populate cache items if possible #[fastrace::trace] -async fn load_index_meta(dal: Operator, path: &str, length: u64) -> Result> { +pub async fn load_index_meta( + dal: Operator, + path: &str, + length: u64, +) -> Result> { let path_owned 
= path.to_owned(); async move { let reader = MetaReaders::bloom_index_meta_reader(dal); diff --git a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs index d8fc9328970b2..35970dae03458 100644 --- a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::borrow::Cow; use std::collections::BTreeMap; use std::collections::HashMap; use std::sync::Arc; @@ -19,10 +20,13 @@ use std::sync::Arc; use databend_common_catalog::plan::Projection; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use databend_common_expression::BlockEntry; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; use databend_common_expression::FieldIndex; +use databend_common_expression::TableDataType; use databend_common_expression::TableField; +use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE; use databend_storages_common_blocks::blocks_to_parquet; @@ -40,6 +44,22 @@ use opendal::Operator; use crate::io::BlockReader; use crate::FuseStorageFormat; +pub struct NewNgramIndexColumn { + column_id: ColumnId, + column: BlockEntry, + ngram_args: NgramArgs, +} + +impl NewNgramIndexColumn { + pub fn new(column_id: ColumnId, column: BlockEntry, ngram_args: NgramArgs) -> Self { + Self { + column_id, + column, + ngram_args, + } + } +} + pub struct BloomIndexState { pub(crate) data: Vec, pub(crate) size: u64, @@ -49,11 +69,42 @@ pub struct BloomIndexState { } impl BloomIndexState { - pub fn from_bloom_index(bloom_index: &BloomIndex, location: Location) -> Result { - let index_block = bloom_index.serialize_to_data_block()?; + pub fn from_bloom_index( + bloom_index: &BloomIndex, + location: Location, + new_index_column: Option, + ) -> Result { + let mut index_block = bloom_index.serialize_to_data_block()?; + let filter_schema = if let Some(NewNgramIndexColumn { + column_id, + column: new_column, + ngram_args, + }) = new_index_column + { + let ngram_name = BloomIndex::build_filter_ngram_name( + column_id, + ngram_args.gram_size(), + ngram_args.bloom_size(), + ); + let mut new_filter_schema = TableSchema::clone(&bloom_index.filter_schema); + let ngram_field = TableField::new(&ngram_name, TableDataType::Binary); + + if let Some(pos) = new_filter_schema + .fields() + .iter() + .position(|f| f.name().starts_with(format!("Ngram({column_id})").as_str())) + { + new_filter_schema.fields.remove(pos); + index_block.remove_column(pos); + } + new_filter_schema.add_columns(&[ngram_field])?; + index_block.add_entry(new_column); + Cow::Owned(Arc::new(new_filter_schema)) + } else { + Cow::Borrowed(&bloom_index.filter_schema) + }; // Calculate ngram index size - let ngram_indexes = &bloom_index - .filter_schema + let ngram_indexes = filter_schema .fields() .iter() .enumerate() @@ -63,7 +114,7 @@ impl BloomIndexState { let ngram_size = if !ngram_indexes.is_empty() { let mut ngram_size = 0; for i in ngram_indexes { - let column = index_block.get_by_offset(*i); + let column = index_block.get_by_offset(i); ngram_size += column.value().memory_size() as u64; } Some(ngram_size) @@ -72,7 +123,7 @@ impl BloomIndexState { }; let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE); let _ = 
blocks_to_parquet( - &bloom_index.filter_schema, + &filter_schema, vec![index_block], &mut data, TableCompression::None, @@ -100,13 +151,26 @@ impl BloomIndexState { builder.add_block(block)?; let maybe_bloom_index = builder.finalize()?; if let Some(bloom_index) = maybe_bloom_index { - Ok(Some(Self::from_bloom_index(&bloom_index, location)?)) + Ok(Some(Self::from_bloom_index(&bloom_index, location, None)?)) } else { Ok(None) } } + + pub fn size(&self) -> u64 { + self.size + } + + pub fn data(self) -> Vec { + self.data + } + + pub fn ngram_size(&self) -> Option { + self.ngram_size + } } +#[derive(Clone)] pub struct BloomIndexRebuilder { pub table_ctx: Arc, pub table_schema: TableSchemaRef, @@ -165,7 +229,11 @@ impl BloomIndexRebuilder { match maybe_bloom_index { None => Ok(None), Some(bloom_index) => Ok(Some(( - BloomIndexState::from_bloom_index(&bloom_index, bloom_index_location.clone())?, + BloomIndexState::from_bloom_index( + &bloom_index, + bloom_index_location.clone(), + None, + )?, bloom_index, ))), } diff --git a/src/query/storages/fuse/src/io/write/mod.rs b/src/query/storages/fuse/src/io/write/mod.rs index b0af3633055dc..8d61274f45a6f 100644 --- a/src/query/storages/fuse/src/io/write/mod.rs +++ b/src/query/storages/fuse/src/io/write/mod.rs @@ -27,6 +27,7 @@ pub use block_writer::BlockSerialization; pub use block_writer::BlockWriter; pub use bloom_index_writer::BloomIndexRebuilder; pub use bloom_index_writer::BloomIndexState; +pub use bloom_index_writer::NewNgramIndexColumn; pub(crate) use inverted_index_writer::create_index_schema; pub(crate) use inverted_index_writer::create_inverted_index_builders; pub(crate) use inverted_index_writer::create_tokenizer_manager; diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 543d6663a8aa5..589079b9acb5f 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -279,6 +279,7 @@ impl StreamBlockBuilder { Some(BloomIndexState::from_bloom_index( &bloom_index, bloom_index_location, + None, )?) 
} else { None @@ -388,7 +389,10 @@ impl StreamBlockProperties { let bloom_columns_map = table .bloom_index_cols .bloom_index_fields(source_schema.clone(), BloomIndex::supported_type)?; - let ngram_args = FuseTable::create_ngram_index_args(&table.table_info.meta)?; + let ngram_args = FuseTable::create_ngram_index_args( + &table.table_info.meta, + &table.table_info.meta.schema, + )?; let bloom_column_ids = bloom_columns_map .values() .map(|v| v.column_id()) diff --git a/src/query/storages/fuse/src/operations/changes.rs b/src/query/storages/fuse/src/operations/changes.rs index b8e72be455d15..e3a48a7244f2c 100644 --- a/src/query/storages/fuse/src/operations/changes.rs +++ b/src/query/storages/fuse/src/operations/changes.rs @@ -297,7 +297,8 @@ impl FuseTable { ) }; let bloom_index_cols = self.bloom_index_cols(); - let ngram_args = Self::create_ngram_index_args(&self.table_info.meta)?; + let ngram_args = + Self::create_ngram_index_args(&self.table_info.meta, &self.table_info.meta.schema)?; let mut pruner = FusePruner::create_with_pages( &ctx, self.get_operator(), diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs index 78e8c3667bac9..83d2cc46a9d7e 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs @@ -151,7 +151,10 @@ impl TransformSerializeBlock { let bloom_columns_map = table .bloom_index_cols .bloom_index_fields(source_schema.clone(), BloomIndex::supported_type)?; - let ngram_args = FuseTable::create_ngram_index_args(&table.table_info.meta)?; + let ngram_args = FuseTable::create_ngram_index_args( + &table.table_info.meta, + &table.table_info.meta.schema, + )?; let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta); let virtual_column_builder = if ctx diff --git a/src/query/storages/fuse/src/operations/merge.rs b/src/query/storages/fuse/src/operations/merge.rs index 2c2f0d5f8d015..49a6e2b89f300 100644 --- a/src/query/storages/fuse/src/operations/merge.rs +++ b/src/query/storages/fuse/src/operations/merge.rs @@ -92,7 +92,10 @@ impl FuseTable { let bloom_columns_map = self .bloom_index_cols() .bloom_index_fields(new_schema.clone(), BloomIndex::supported_type)?; - let ngram_args = FuseTable::create_ngram_index_args(&self.table_info.meta)?; + let ngram_args = FuseTable::create_ngram_index_args( + &self.table_info.meta, + &self.table_info.meta.schema, + )?; let inverted_index_builders = create_inverted_index_builders(&self.table_info.meta); let block_builder = BlockBuilder { diff --git a/src/query/storages/fuse/src/operations/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation_source.rs index 37c7a761cb406..75b7c5d167bf6 100644 --- a/src/query/storages/fuse/src/operations/mutation_source.rs +++ b/src/query/storages/fuse/src/operations/mutation_source.rs @@ -190,7 +190,7 @@ impl FuseTable { self.schema_with_stream(), &push_down, self.bloom_index_cols(), - Self::create_ngram_index_args(&self.table_info.meta)?, + Self::create_ngram_index_args(&self.table_info.meta, &self.table_info.meta.schema)?, None, )?; diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 327c9fca861e1..0c63345596c7b 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ 
b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -37,6 +37,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::Scalar; +use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRef; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_meta_app::schema::TableIndexType; @@ -634,7 +635,8 @@ impl FuseTable { table_schema: TableSchemaRef, dal: Operator, ) -> Result { - let ngram_args = Self::create_ngram_index_args(&self.table_info.meta)?; + let ngram_args = + Self::create_ngram_index_args(&self.table_info.meta, &self.table_info.meta.schema)?; let bloom_index_builder = if ctx .get_settings() .get_enable_auto_fix_missing_bloom_index()? @@ -686,7 +688,10 @@ impl FuseTable { Ok(pruner) } - pub fn create_ngram_index_args(table_meta: &TableMeta) -> Result> { + pub fn create_ngram_index_args( + table_meta: &TableMeta, + table_schema: &TableSchema, + ) -> Result> { let mut ngram_index_args = Vec::with_capacity(table_meta.indexes.len()); for index in table_meta.indexes.values() { if !matches!(index.index_type, TableIndexType::Ngram) { @@ -696,8 +701,7 @@ impl FuseTable { continue; } - let Some((pos, field)) = table_meta - .schema + let Some((pos, field)) = table_schema .fields() .iter() .find_position(|field| field.column_id() == index.column_ids[0]) diff --git a/src/query/storages/fuse/src/pruning/bloom_pruner.rs b/src/query/storages/fuse/src/pruning/bloom_pruner.rs index 2f7df2a735bf8..b2563748e2d3c 100644 --- a/src/query/storages/fuse/src/pruning/bloom_pruner.rs +++ b/src/query/storages/fuse/src/pruning/bloom_pruner.rs @@ -168,8 +168,9 @@ impl BloomPrunerCreator { } if let Some(ngram_arg) = self.ngram_args.iter().find(|arg| arg.field() == field) { acc.push(BloomIndex::build_filter_ngram_name( - field, + field.column_id(), ngram_arg.gram_size(), + ngram_arg.bloom_size(), )); } Ok::<_, ErrorCode>(acc) diff --git a/tests/sqllogictests/suites/ee/08_ee_ngram_index/08_0000_ngram_index_base.test b/tests/sqllogictests/suites/ee/08_ee_ngram_index/08_0000_ngram_index_base.test index 0bb45d7148607..91fafcec865c6 100644 --- a/tests/sqllogictests/suites/ee/08_ee_ngram_index/08_0000_ngram_index_base.test +++ b/tests/sqllogictests/suites/ee/08_ee_ngram_index/08_0000_ngram_index_base.test @@ -34,10 +34,10 @@ INSERT INTO t1 VALUES query III select row_count, bloom_filter_size, ngram_index_size from fuse_block('test_gram_index', 't1') ---- -4 1049458 1048617 +4 1049482 1048617 statement ok -CREATE TABLE t2 (id int, content string) +CREATE TABLE t2 (id int, content string, name string) statement error CREATE NGRAM INDEX idx2 ON t2(content) gram_size = 0 @@ -51,6 +51,9 @@ CREATE NGRAM INDEX idx2 ON t2(content) bloom_size = 511 statement error CREATE NGRAM INDEX idx2 ON t2(content) bloom_size = 10485761 +statement error +CREATE NGRAM INDEX idx2 ON t2(content, name) gram_size = 5 bloom_size = 1048576 + statement ok CREATE NGRAM INDEX idx2 ON t2(content) gram_size = 5 bloom_size = 1048576 @@ -76,6 +79,32 @@ show create table t1; ---- t1 CREATE TABLE t1 ( id INT NULL, content VARCHAR NULL, SYNC NGRAM INDEX idx1 (content) bloom_size = '1048576', gram_size = '5' ) ENGINE=FUSE +statement ok +CREATE OR REPLACE TABLE t1 (id int, content string) + +statement ok +INSERT INTO t1 VALUES +(1, 'The quick brown fox jumps over the lazy dog'), +(2, 'A picture is worth a thousand words'), +(3, 'The early bird catches the worm'), +(4, 'Actions speak louder 
than words');
+
+query II
+select block_size, bloom_filter_size, ngram_index_size from fuse_block('test_gram_index', 't1');
+----
+206 654 NULL
+
+statement ok
+CREATE NGRAM INDEX idx1 ON t1(content) gram_size = 5 bloom_size = 1048576
+
+statement ok
+REFRESH NGRAM INDEX idx1 ON t1
+
+query II
+select block_size, bloom_filter_size, ngram_index_size from fuse_block('test_gram_index', 't1');
+----
+206 1049482 1048617
+
 statement ok
 use default
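
For quick reference, the Ngram filter column naming used throughout this patch encodes the column id, gram size and bloom size in the filter's column name (see `BloomIndex::build_filter_ngram_name` in `bloom_index.rs`). Below is a minimal standalone sketch of that naming scheme, using the values asserted by the new tests (column `d` of table `t3` has column id 3); the free function here is only for illustration and is not part of the patch:

```rust
// Sketch only: mirrors the format string in BloomIndex::build_filter_ngram_name,
// "Ngram({column_id})_{gram_size}_{bloom_size}". Values below match the new
// ngram_index test assertions, they are not universal defaults.
fn build_filter_ngram_name(column_id: u32, gram_size: usize, bloom_size: u64) -> String {
    format!("Ngram({column_id})_{gram_size}_{bloom_size}")
}

fn main() {
    // Default gram_size = 3, default bloom_size = 1048576 for column id 3.
    assert_eq!(build_filter_ngram_name(3, 3, 1_048_576), "Ngram(3)_3_1048576");
    // After recreating the index with gram_size = 5.
    assert_eq!(build_filter_ngram_name(3, 5, 1_048_576), "Ngram(3)_5_1048576");
}
```

Because the gram size and bloom size are part of the name, the refresh transform can tell whether an existing bloom index file already contains a filter built with the current index arguments, and skip or rebuild accordingly.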