diff --git a/docs/docs/core/flow_def.mdx b/docs/docs/core/flow_def.mdx index 5d17121d1..5f96dad11 100644 --- a/docs/docs/core/flow_def.mdx +++ b/docs/docs/core/flow_def.mdx @@ -327,6 +327,44 @@ Following metrics are supported: | L2Distance | [L2 distance (a.k.a. Euclidean distance)](https://en.wikipedia.org/wiki/Euclidean_distance) | Smaller is more similar | | InnerProduct | [Inner product](https://en.wikipedia.org/wiki/Inner_product_space) | Larger is more similar | +### Full-Text Search (FTS) Index + +*Full-text search index* is specified by `fts_indexes` (`Sequence[FtsIndexDef]`). `FtsIndexDef` has the following fields: + +* `field_name`: the field to create FTS index. +* `parameters` (optional): a dictionary of parameters to pass to the target's FTS index creation. The supported parameters vary by target. + +For example, with LanceDB: + + + + +```python +@cocoindex.flow_def(name="DemoFlow") +def demo_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): + ... + demo_collector = data_scope.add_collector() + ... + demo_collector.export( + "demo_target", DemoTargetSpec(...), + primary_key_fields=["id"], + fts_indexes=[ + # Basic FTS index with default tokenizer + cocoindex.FtsIndexDef("content"), + # FTS index with custom tokenizer + cocoindex.FtsIndexDef("description", parameters={"language": "English"}) + ]) +``` + + + + +:::note + +FTS indexes are currently only supported for LanceDB target on its enterprise edition. Other targets will raise an error if FTS indexes are specified. + +::: + ## Miscellaneous ### Getting App Namespace diff --git a/docs/docs/targets/lancedb.md b/docs/docs/targets/lancedb.md index cda7fdf81..95219ce9f 100644 --- a/docs/docs/targets/lancedb.md +++ b/docs/docs/targets/lancedb.md @@ -20,7 +20,6 @@ Here's how CocoIndex data elements map to LanceDB elements during export: | a collected row | a row | | a field | a column | - ::::info Installation and import This target is provided via an optional dependency `[lancedb]`: @@ -41,14 +40,15 @@ import cocoindex.targets.lancedb as coco_lancedb The spec `coco_lancedb.LanceDB` takes the following fields: -* `db_uri` (`str`, required): The LanceDB database location (e.g. `./lancedb_data`). -* `table_name` (`str`, required): The name of the table to export the data to. -* `db_options` (`coco_lancedb.DatabaseOptions`, optional): Advanced database options. - * `storage_options` (`dict[str, Any]`, optional): Passed through to LanceDB when connecting. +* `db_uri` (`str`, required): The LanceDB database location (e.g. `./lancedb_data`). +* `table_name` (`str`, required): The name of the table to export the data to. +* `db_options` (`coco_lancedb.DatabaseOptions`, optional): Advanced database options. + * `storage_options` (`dict[str, Any]`, optional): Passed through to LanceDB when connecting. Additional notes: -* Exactly one primary key field is required for LanceDB targets. We create B-Tree index on this key column. +* Exactly one primary key field is required for LanceDB targets. We create B-Tree index on this key column. +* **Full-Text Search (FTS) indexes** are supported via the `fts_indexes` parameter. Note that FTS functionality requires [LanceDB Enterprise](https://lancedb.com/docs/indexing/fts-index/). You can pass any parameters supported by the target's FTS index creation API (e.g., `tokenizer_name` for LanceDB). See [LanceDB FTS documentation](https://lancedb.com/docs/indexing/fts-index/) for full parameter details. :::info @@ -59,6 +59,38 @@ If you want to use vector indexes, you can run the flow once to populate the tar You can find an end-to-end example here: [examples/text_embedding_lancedb](https://github.com/cocoindex-io/cocoindex/tree/main/examples/text_embedding_lancedb). +### FTS Index Example + +```python +import cocoindex +import cocoindex.targets.lancedb as coco_lancedb + +@cocoindex.flow_def(name="DocumentSearchFlow") +def document_search_flow(flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope): + # ... source and transformations ... + + doc_collector = data_scope.add_collector() + # ... collect document data ... + + doc_collector.export( + "documents", + coco_lancedb.LanceDB( + db_uri="./lancedb_data", + table_name="documents" + ), + primary_key_fields=["id"], + # Add FTS indexes for full-text search + fts_indexes=[ + # Basic FTS index with default tokenizer + cocoindex.FtsIndexDef("content"), + # FTS index with stemming for better search recall + cocoindex.FtsIndexDef("description", parameters={"tokenizer_name": "en_stem"}), + # FTS index with position tracking for phrase searches + cocoindex.FtsIndexDef("title", parameters={"tokenizer_name": "default", "with_position": True}) + ] + ) +``` + ## `connect_async()` helper We provide a helper to obtain a shared `AsyncConnection` that is reused across your process and shared with CocoIndex's writer for strong read-after-write consistency: @@ -85,6 +117,7 @@ Once `db_uri` matches, it automatically reuses the same connection instance with This achieves strong consistency between your indexing and querying logic, if they run in the same process. ## Example + None: @@ -432,6 +433,7 @@ def export( index_options = index.IndexOptions( primary_key_fields=primary_key_fields, vector_indexes=vector_indexes, + fts_indexes=fts_indexes, ) self._flow_builder_state.engine_flow_builder.export( target_name, diff --git a/python/cocoindex/index.py b/python/cocoindex/index.py index 6c5e11cb2..b03e25789 100644 --- a/python/cocoindex/index.py +++ b/python/cocoindex/index.py @@ -1,6 +1,6 @@ from enum import Enum from dataclasses import dataclass -from typing import Sequence, Union +from typing import Sequence, Union, Any class VectorSimilarityMetric(Enum): @@ -40,6 +40,19 @@ class VectorIndexDef: method: VectorIndexMethod | None = None +@dataclass +class FtsIndexDef: + """ + Define a full-text search index on a field. + + The parameters field can contain any keyword arguments supported by the target's + FTS index creation API (e.g., tokenizer_name for LanceDB). + """ + + field_name: str + parameters: dict[str, Any] | None = None + + @dataclass class IndexOptions: """ @@ -48,3 +61,4 @@ class IndexOptions: primary_key_fields: Sequence[str] vector_indexes: Sequence[VectorIndexDef] = () + fts_indexes: Sequence[FtsIndexDef] = () diff --git a/python/cocoindex/targets/lancedb.py b/python/cocoindex/targets/lancedb.py index 08bfadcbd..0445b5676 100644 --- a/python/cocoindex/targets/lancedb.py +++ b/python/cocoindex/targets/lancedb.py @@ -20,7 +20,7 @@ VectorTypeSchema, TableType, ) -from ..index import VectorIndexDef, IndexOptions, VectorSimilarityMetric +from ..index import VectorIndexDef, FtsIndexDef, IndexOptions, VectorSimilarityMetric _logger = logging.getLogger(__name__) @@ -48,11 +48,19 @@ class _VectorIndex: metric: VectorSimilarityMetric +@dataclasses.dataclass +class _FtsIndex: + name: str + field_name: str + parameters: dict[str, Any] | None = None + + @dataclasses.dataclass class _State: key_field_schema: FieldSchema value_fields_schema: list[FieldSchema] vector_indexes: list[_VectorIndex] | None = None + fts_indexes: list[_FtsIndex] | None = None db_options: DatabaseOptions | None = None @@ -318,6 +326,18 @@ def get_setup_state( if index_options.vector_indexes is not None else None ), + fts_indexes=( + [ + _FtsIndex( + name=f"__{index.field_name}__fts__idx", + field_name=index.field_name, + parameters=index.parameters, + ) + for index in index_options.fts_indexes + ] + if index_options.fts_indexes is not None + else None + ), ) @staticmethod @@ -412,6 +432,30 @@ async def apply_setup_change( if vector_index_name in existing_vector_indexes: await table.drop_index(vector_index_name) + # Handle FTS indexes + unseen_prev_fts_indexes = { + index.name for index in (previous and previous.fts_indexes) or [] + } + existing_fts_indexes = {index.name for index in await table.list_indices()} + + for fts_index in current.fts_indexes or []: + if fts_index.name in unseen_prev_fts_indexes: + unseen_prev_fts_indexes.remove(fts_index.name) + else: + try: + # Create FTS index using create_fts_index() API + # Pass parameters as kwargs to support any future FTS index options + kwargs = fts_index.parameters if fts_index.parameters else {} + await table.create_fts_index(fts_index.field_name, **kwargs) + except Exception as e: # pylint: disable=broad-exception-caught + raise RuntimeError( + f"Exception in creating FTS index on field {fts_index.field_name}: {e}" + ) from e + + for fts_index_name in unseen_prev_fts_indexes: + if fts_index_name in existing_fts_indexes: + await table.drop_index(fts_index_name) + @staticmethod async def prepare( spec: LanceDB, diff --git a/rust/cocoindex/src/base/spec.rs b/rust/cocoindex/src/base/spec.rs index 7b7a7fe72..899e3e759 100644 --- a/rust/cocoindex/src/base/spec.rs +++ b/rust/cocoindex/src/base/spec.rs @@ -459,12 +459,33 @@ impl fmt::Display for VectorIndexDef { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct FtsIndexDef { + pub field_name: FieldName, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub parameters: Option>, +} + +impl fmt::Display for FtsIndexDef { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.parameters { + None => write!(f, "{}", self.field_name), + Some(params) => { + let params_str = serde_json::to_string(params).unwrap_or_else(|_| "{}".to_string()); + write!(f, "{}:{}", self.field_name, params_str) + } + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct IndexOptions { #[serde(default, skip_serializing_if = "Option::is_none")] pub primary_key_fields: Option>, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub vector_indexes: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub fts_indexes: Vec, } impl IndexOptions { @@ -490,7 +511,16 @@ impl fmt::Display for IndexOptions { .map(|v| v.to_string()) .collect::>() .join(","); - write!(f, "keys={primary_keys}, indexes={vector_indexes}") + let fts_indexes = self + .fts_indexes + .iter() + .map(|f| f.to_string()) + .collect::>() + .join(","); + write!( + f, + "keys={primary_keys}, vector_indexes={vector_indexes}, fts_indexes={fts_indexes}" + ) } } diff --git a/rust/cocoindex/src/ops/targets/kuzu.rs b/rust/cocoindex/src/ops/targets/kuzu.rs index fbf9c9da7..48a999b6a 100644 --- a/rust/cocoindex/src/ops/targets/kuzu.rs +++ b/rust/cocoindex/src/ops/targets/kuzu.rs @@ -772,6 +772,9 @@ impl TargetFactoryBase for Factory { if !data_coll.index_options.vector_indexes.is_empty() { api_bail!("Vector indexes are not supported for Kuzu yet"); } + if !data_coll.index_options.fts_indexes.is_empty() { + api_bail!("FTS indexes are not supported for Kuzu target"); + } fn to_dep_table( field_mapping: &AnalyzedGraphElementFieldMapping, ) -> Result { diff --git a/rust/cocoindex/src/ops/targets/neo4j.rs b/rust/cocoindex/src/ops/targets/neo4j.rs index ce992a818..ad2f54b45 100644 --- a/rust/cocoindex/src/ops/targets/neo4j.rs +++ b/rust/cocoindex/src/ops/targets/neo4j.rs @@ -557,6 +557,9 @@ impl SetupState { .iter() .map(|f| (f.name.as_str(), &f.value_type.typ)) .collect::>(); + if !index_options.fts_indexes.is_empty() { + api_bail!("FTS indexes are not supported for Neo4j target"); + } for index_def in index_options.vector_indexes.iter() { sub_components.push(ComponentState { object_label: schema.elem_type.clone(), diff --git a/rust/cocoindex/src/ops/targets/postgres.rs b/rust/cocoindex/src/ops/targets/postgres.rs index ae808361d..f37198b98 100644 --- a/rust/cocoindex/src/ops/targets/postgres.rs +++ b/rust/cocoindex/src/ops/targets/postgres.rs @@ -384,6 +384,9 @@ impl SetupState { index_options: &IndexOptions, column_options: &HashMap, ) -> Result { + if !index_options.fts_indexes.is_empty() { + api_bail!("FTS indexes are not supported for Postgres target"); + } Ok(Self { columns: TableColumnsSchema { key_columns: key_fields_schema diff --git a/rust/cocoindex/src/ops/targets/qdrant.rs b/rust/cocoindex/src/ops/targets/qdrant.rs index cdb0e2f43..fb62d2221 100644 --- a/rust/cocoindex/src/ops/targets/qdrant.rs +++ b/rust/cocoindex/src/ops/targets/qdrant.rs @@ -409,6 +409,9 @@ impl TargetFactoryBase for Factory { }); } + if !d.index_options.fts_indexes.is_empty() { + api_bail!("FTS indexes are not supported for Qdrant target"); + } let mut specified_vector_fields = HashSet::new(); for vector_index in d.index_options.vector_indexes { match vector_def.get_mut(&vector_index.field_name) {