# Build a Custom Vector Database Operator

This tutorial demonstrates how to build a custom Vector Database (VDB) operator for NV-Ingest using the abstract `VDB` class. We implement a complete OpenSearch operator as an example.

**Important:** NVIDIA makes no claim about accuracy, performance, or functionality of any vector database except Milvus. If you use a different vector database, it's your responsibility to test and maintain it.

## Overview

NV-Ingest provides an abstract base class `VDB` located in `client/src/nv_ingest_client/util/vdb/adt_vdb.py` that defines the interface for vector database operations. By inheriting from this class and implementing its abstract methods, you can create custom operators for any vector database.

In this tutorial, we build an OpenSearch operator. OpenSearch is an open-source, distributed search and analytics engine that was forked from Elasticsearch in 2021 and is now governed by the OpenSearch Software Foundation.

## Prerequisites

Before implementing your custom VDB operator, you must understand how to interact with your chosen vector database. For OpenSearch, you should be familiar with the following topics.

### Essential Knowledge Requirements

1. **Python Client Integration**: How to interact with the database using Python
2. **Database Deployment**: How to run and manage the database instance
3. **Connection Management**: How to establish and maintain database connections
4. **Index Operations**: How to create, manage, and configure indexes
5. **Data Ingestion**: How to load documents and records into indexes
6. **Query Operations**: How to retrieve relevant documents using vector similarity search

For OpenSearch, comprehensive documentation is available at [https://docs.opensearch.org/docs/latest/](https://docs.opensearch.org/docs/latest/).

**Note**: This tutorial assumes that OpenSearch is available in your environment. Step 1 shows one way to run it alongside NV-Ingest with Docker Compose.

## Step 1: Database Configuration

### Docker Compose Setup

To integrate OpenSearch with the NV-Ingest ecosystem, add the following services under the `services:` key of your `docker-compose.yml` file:

```yaml
opensearch:
  image: opensearchproject/opensearch:3.1.0
  ports:
    - "9200:9200"
    - "9300:9300"
  environment:
    - cluster.name=opensearch-cluster
    - node.name=opensearch-node
    - discovery.type=single-node
    - bootstrap.memory_lock=true
    - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=10g"
    # No quotes around the value: in list syntax they would become part of the password
    - OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123@456
    - DISABLE_SECURITY_PLUGIN=true
  ulimits:
    # Allows bootstrap.memory_lock=true to actually lock the JVM heap
    memlock:
      soft: -1
      hard: -1
  volumes:
    - ./.volumes/opensearch:/usr/share/opensearch/data
  healthcheck:
    test: [ "CMD", "curl", "-f", "http://localhost:9200" ]
    interval: 30s
    timeout: 20s
    retries: 3
  profiles:
    - retrieval

opensearch-dashboards:
  # Keep the Dashboards version in step with the OpenSearch version
  image: opensearchproject/opensearch-dashboards:3.1.0
  container_name: opensearch-dashboards
  ports:
    - "5601:5601"
  environment:
    OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
    DISABLE_SECURITY_DASHBOARDS_PLUGIN: 'true'
  depends_on:
    - opensearch
  profiles:
    - retrieval
```

### Starting the Services

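After adding the configuration, start the OpenSearch services. The `opensearch` service belongs to the `retrieval` profile, so enable that profile when you start it:

```bash
docker compose --profile retrieval up -d opensearch opensearch-dashboards
```

This launches the OpenSearch and OpenSearch Dashboards containers. Once they are healthy, the OpenSearch REST API is available at `http://localhost:9200` and Dashboards at `http://localhost:5601`.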
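OpenSearch can take several seconds to start accepting requests after `docker compose` returns. The following is a minimal sketch of a readiness check, assuming the `opensearch-py` client, whose `OpenSearch.ping()` returns `True` once the cluster answers. The helper name `wait_for_opensearch` is hypothetical, and it accepts any object with a `ping()` method so that it can be tested without a cluster:

```python
import time


def wait_for_opensearch(client, timeout_s: float = 60.0, interval_s: float = 2.0) -> bool:
    """Poll client.ping() until it succeeds or the timeout expires (hypothetical helper).

    `client` is expected to be an opensearchpy.OpenSearch instance, but any object
    with a ping() method that returns a bool will work.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if client.ping():
                return True
        except Exception:
            # Connection errors are expected while the container is still booting.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

In practice, pass `opensearchpy.OpenSearch(hosts=[{"host": "localhost", "port": 9200}])` as `client`, and stop with an error if the function returns `False`.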

## Step 2: Understanding the VDB Abstract Class

The `VDB` abstract class provides a lightweight interface that defines the essential operations for vector database integration. It contains the following abstract methods:

- `__init__(**kwargs)`: Initialize the operator with configuration parameters
- `create_index(**kwargs)`: Create or configure database indexes
- `write_to_index(records: list, **kwargs)`: Write records to the database
- `retrieval(queries: list, **kwargs)`: Perform similarity search queries
- `run(records)`: Main entry point for the NV-Ingest pipeline
- `reindex(records: list, **kwargs)`: Rebuild indexes with new data (optional)

The design philosophy emphasizes flexibility: every method except `run` accepts `**kwargs`, so developers can implement custom parameter handling to suit their specific requirements.
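To see what an abstract base class enforces, here is a self-contained stand-in that mirrors the method list above. It is an illustration only. The real `VDB` in `adt_vdb.py` may differ in details, such as which methods are abstract and what `__init__` does. In this sketch, `StandInVDB`, `Incomplete`, and `Complete` are hypothetical names. Python's `abc` machinery refuses to instantiate a subclass until it overrides every abstract method:

```python
from abc import ABC, abstractmethod


class StandInVDB(ABC):
    """Hypothetical mirror of the VDB interface described above (not the real class)."""

    def __init__(self, **kwargs):
        # Keep arbitrary configuration so subclasses can read it later.
        self.config = dict(kwargs)

    @abstractmethod
    def create_index(self, **kwargs): ...

    @abstractmethod
    def write_to_index(self, records: list, **kwargs): ...

    @abstractmethod
    def retrieval(self, queries: list, **kwargs): ...

    @abstractmethod
    def run(self, records): ...

    def reindex(self, records: list, **kwargs):
        # Optional hook: subclasses override it only if they support reindexing.
        raise NotImplementedError


class Incomplete(StandInVDB):
    # Missing write_to_index, retrieval, and run, so it cannot be instantiated.
    def create_index(self, **kwargs):
        pass


class Complete(StandInVDB):
    def create_index(self, **kwargs):
        return kwargs.get("recreate", False)

    def write_to_index(self, records: list, **kwargs):
        return len(records)

    def retrieval(self, queries: list, **kwargs):
        return [[] for _ in queries]

    def run(self, records):
        self.create_index()
        return self.write_to_index(records)
```

Calling `Incomplete()` raises `TypeError`, while `Complete(index_name="demo")` works and keeps its keyword arguments in `config`.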

## Step 3: Creating the Custom Operator Class

Begin by creating a new class that inherits from the `VDB` abstract class. This establishes the foundation for your custom operator:

```python
from nv_ingest_client.util.vdb.adt_vdb import VDB

class OpenSearch(VDB):
    def __init__(self, **kwargs):
        pass

    def create_index(self, **kwargs):
        pass

    def write_to_index(self, records: list, **kwargs):
        pass

    def retrieval(self, queries: list, **kwargs):
        pass

    def reindex(self, records: list, **kwargs):
        pass

    def run(self, records):
        pass
```

This skeleton class provides the structure for implementing each required method. The `run` method serves as the primary entry point for NV-Ingest integration, but we will implement it last after completing the core functionality.
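Before wiring up OpenSearch, it can help to see the same five methods filled in against a plain Python dictionary. The class below is a hypothetical, dependency-free toy with several simplifications:

- It does not inherit from `VDB`, so it runs without NV-Ingest installed.
- It takes pre-computed query vectors instead of text.
- It uses brute-force Euclidean distance where OpenSearch would use an HNSW index.
- Its records use a simplified version of the `{"metadata": {"embedding": ..., "content": ...}}` shape used later in this tutorial.

```python
import math


class InMemoryVDB:
    """Toy operator with the same method names as the skeleton (hypothetical)."""

    def __init__(self, **kwargs):
        self.index_name = kwargs.get("index_name", "toy")
        self.indexes = {}

    def create_index(self, **kwargs):
        # Mirror the recreate semantics used in Step 5.
        if kwargs.get("recreate", False) or self.index_name not in self.indexes:
            self.indexes[self.index_name] = []

    def write_to_index(self, records: list, **kwargs):
        # NV-Ingest passes one list of records per source document.
        for record_set in records:
            for record in record_set:
                meta = record["metadata"]
                if meta.get("embedding") is not None and meta.get("content"):
                    self.indexes[self.index_name].append(
                        {"dense": meta["embedding"], "text": meta["content"]}
                    )

    def retrieval(self, queries: list, **kwargs):
        # Each query is already a vector here; the real operator embeds text first.
        top_k = kwargs.get("top_k", 10)
        docs = self.indexes.get(self.index_name, [])
        results = []
        for vector in queries:
            ranked = sorted(docs, key=lambda d: math.dist(d["dense"], vector))
            # Drop the embedding from each hit, as the OpenSearch operator does.
            results.append([{"text": d["text"]} for d in ranked[:top_k]])
        return results

    def reindex(self, records: list, **kwargs):
        self.create_index(recreate=True)
        self.write_to_index(records)

    def run(self, records):
        self.create_index()
        self.write_to_index(records)
```

Note that calling `run` twice appends to the existing index, while `reindex` starts from an empty one.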

## Step 4: Implementing the Constructor

The `__init__` method is responsible for:
1. Extracting and storing configuration parameters
2. Setting default values for optional parameters
3. Initializing the database client connection

### Configuration Parameters

Define the parameters your operator will accept, including connection details, index configuration, and content type filters:

```python
import opensearchpy as opensearch  # provided by the `opensearch-py` package


def __init__(self, **kwargs):
    # Connection parameters
    self.host = kwargs.get("host", "localhost")
    self.port = kwargs.get("port", 9200)
    self.username = kwargs.get("username", "admin")
    self.password = kwargs.get("password", "admin")
    self.use_ssl = kwargs.get("use_ssl", False)
    self.verify_certs = kwargs.get("verify_certs", False)
    self.http_compress = kwargs.get("http_compress", False)

    # Index configuration
    self.dense_dim = kwargs.get("dense_dim", 2048)
    self.index_name = kwargs.get("index_name", "nv_ingest_test")

    # Content type filters
    self.enable_text = kwargs.get("enable_text", True)
    self.enable_charts = kwargs.get("enable_charts", True)
    self.enable_tables = kwargs.get("enable_tables", True)
    self.enable_images = kwargs.get("enable_images", True)
    self.enable_infographics = kwargs.get("enable_infographics", True)
    self.enable_audio = kwargs.get("enable_audio", True)

    # Initialize parent class
    super().__init__(**kwargs)

    # Initialize OpenSearch client. The Docker Compose setup above disables the
    # security plugin; if you enable it, also pass http_auth=(self.username, self.password).
    self.client = opensearch.OpenSearch(
        hosts=[{"host": self.host, "port": self.port}],
        http_compress=self.http_compress,
        use_ssl=self.use_ssl,
        verify_certs=self.verify_certs,
    )
```

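Every parameter has a default, so constructing the operator with no arguments targets an unsecured OpenSearch instance on `localhost:9200` and an index named after `nv_ingest_test`. You can override any of these settings through keyword arguments.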
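The constructor stores `username` and `password` but never passes them to the client. This works only because the Docker Compose setup disables the security plugin. The following minimal sketch shows one way to assemble the connection arguments so that the client sends credentials only when requested. `build_client_kwargs` and its `use_auth` switch are hypothetical. `hosts`, `http_compress`, `http_auth`, `use_ssl`, and `verify_certs` are standard keyword arguments of `opensearchpy.OpenSearch`:

```python
def build_client_kwargs(
    host: str = "localhost",
    port: int = 9200,
    username: str = "admin",
    password: str = "admin",
    use_ssl: bool = False,
    verify_certs: bool = False,
    http_compress: bool = False,
    use_auth: bool = False,
) -> dict:
    """Collect keyword arguments for opensearchpy.OpenSearch (hypothetical helper)."""
    kwargs = {
        "hosts": [{"host": host, "port": port}],
        "http_compress": http_compress,
        "use_ssl": use_ssl,
        "verify_certs": verify_certs,
    }
    if use_auth:
        # HTTP basic auth, needed once the security plugin is enabled.
        kwargs["http_auth"] = (username, password)
    return kwargs
```

Inside the constructor, this could replace the explicit arguments: `self.client = opensearch.OpenSearch(**build_client_kwargs(...))`.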

## Step 5: Implementing Index Creation

The `create_index` method handles the creation and configuration of database indexes. For OpenSearch, this involves setting up a k-NN index optimized for vector similarity search.

### Key Implementation Considerations

1. **Index Existence Check**: Verify if the index already exists before creation
2. **Recreation Logic**: Provide option to recreate indexes when needed
3. **Index Configuration**: Define appropriate mappings and settings for vector search

```python
def create_index(self, **kwargs):
    recreate = kwargs.get("recreate", False)
    exists = self.client.indices.exists(index=f"{self.index_name}_dense")
    
    # Handle index recreation if requested
    if recreate and exists:
        self.client.indices.delete(index=f"{self.index_name}_dense")
        exists = False
    
    # Create index if it doesn't exist
    if not exists:
        index_body = {
            "settings": {
                "index.knn": True,
                "index.knn.algo_param.ef_search": 100,
            },
            "mappings": {
                "properties": {
                    "dense": {
                        "type": "knn_vector",
                        "dimension": self.dense_dim,
                        "method": {
                            "name": "hnsw",
                            "engine": "faiss",
                            "space_type": "l2",
                            "parameters": {"m": 16, "ef_construction": 100},
                        },
                    },
                    "text": {"type": "text"},
                    "id": {"type": "keyword"},
                    "metadata": {"type": "object"},
                }
            },
        }
        
        self.client.indices.create(index=f"{self.index_name}_dense", body=index_body)
```

This implementation creates a k-NN index that uses the HNSW (Hierarchical Navigable Small World) algorithm, backed by the Faiss engine with L2 (Euclidean) distance, for efficient approximate nearest neighbor search.
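Because `dense_dim` must match the length of the vectors your embedding model produces, it can be convenient to build the index body from parameters. The following sketch takes that approach. `build_knn_index_body` is a hypothetical helper that reproduces the body above and exposes the HNSW settings as arguments. Which `space_type` values are valid depends on the engine, so check the OpenSearch k-NN documentation before changing it from `l2`:

```python
def build_knn_index_body(
    dense_dim: int,
    space_type: str = "l2",
    m: int = 16,
    ef_construction: int = 100,
    ef_search: int = 100,
) -> dict:
    """Return the settings and mappings used in create_index, parameterized (hypothetical)."""
    if dense_dim <= 0:
        raise ValueError("dense_dim must be positive")
    return {
        "settings": {
            "index.knn": True,
            "index.knn.algo_param.ef_search": ef_search,
        },
        "mappings": {
            "properties": {
                "dense": {
                    "type": "knn_vector",
                    "dimension": dense_dim,
                    "method": {
                        "name": "hnsw",
                        "engine": "faiss",
                        "space_type": space_type,
                        "parameters": {"m": m, "ef_construction": ef_construction},
                    },
                },
                "text": {"type": "text"},
                "id": {"type": "keyword"},
                "metadata": {"type": "object"},
            }
        },
    }
```

Inside `create_index`, the call becomes `self.client.indices.create(index=f"{self.index_name}_dense", body=build_knn_index_body(self.dense_dim))`. Raising `m` or `ef_construction` generally improves recall at the cost of memory and indexing time.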

## Step 6: Implementing Data Ingestion

The `write_to_index` method processes and stores records in the vector database. This involves:

1. **Record Transformation**: Converting NV-Ingest records to database-compatible format
2. **Content Filtering**: Applying content type filters based on configuration
3. **Data Validation**: Ensuring records contain required fields (embeddings, text)
4. **Batch Processing**: Iterating over the nested batches that NV-Ingest passes in (one list of records per source document) and writing each accepted record to the index

### Core Implementation

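```python
def write_to_index(self, records: list, **kwargs):
    index_name = f"{self.index_name}_dense"
    # NV-Ingest returns one list of records per source document
    for record_set in records:
        for record in record_set:
            transform_record = self.transform_record(record)
            if transform_record:
                # Omitting `id` lets OpenSearch assign a unique document ID
                self.client.index(index=index_name, body=transform_record)
    # Refresh so the newly written documents are immediately searchable
    self.client.indices.refresh(index=index_name)
```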
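The loop above issues one HTTP request per record. For larger ingests, the `opensearch-py` package provides `opensearchpy.helpers.bulk(client, actions)`, which sends many documents per request. The following is a sketch of a batched variant that assumes the same `transform_record` helper. The function names are hypothetical. The action generator is kept separate so that you can check it without a running cluster:

```python
from typing import Callable, Iterable, Iterator, Optional


def iter_bulk_actions(
    records: Iterable[list],
    index_name: str,
    transform: Callable[[dict], Optional[dict]],
) -> Iterator[dict]:
    """Yield one bulk 'index' action per record that survives `transform`."""
    for record_set in records:
        for record in record_set:
            doc = transform(record)
            if doc:
                # No "_id": OpenSearch assigns a unique ID to each document.
                yield {"_index": index_name, "_source": doc}


def bulk_write(client, records: list, index_name: str, transform) -> int:
    """Send all actions with opensearchpy.helpers.bulk and return the success count."""
    from opensearchpy import helpers  # imported lazily; requires opensearch-py

    success, _errors = helpers.bulk(client, iter_bulk_actions(records, index_name, transform))
    # Make the new documents visible to search immediately.
    client.indices.refresh(index=index_name)
    return success
```

Inside the operator, the call would be `bulk_write(self.client, records, f"{self.index_name}_dense", self.transform_record)`.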

### Record Transformation Helper

The `transform_record` method extracts and validates the necessary data from NV-Ingest records:

```python
def transform_record(self, record: dict):
    text = _pull_text(
        record,
        self.enable_text,
        self.enable_charts,
        self.enable_tables,
        self.enable_images,
        self.enable_infographics,
        self.enable_audio,
    )
    if text:
        return {
            "dense": record["metadata"]["embedding"],
            "text": text,
            "metadata": record["metadata"]["content_metadata"],
        }
    else:
        return None
```

### Content Extraction and Validation

The `_pull_text` function extracts content from each document type. It relies on a small `verify_embedding` check and a module-level logger:

```python
import logging

logger = logging.getLogger(__name__)


def verify_embedding(element):
    # A record is indexable only if the embedding step produced a vector for it
    return element["metadata"]["embedding"] is not None


def _pull_text(
    element,
    enable_text: bool,
    enable_charts: bool,
    enable_tables: bool,
    enable_images: bool,
    enable_infographics: bool,
    enable_audio: bool,
):
    text = None

    # Extract text based on document type and enabled content types
    if element["document_type"] == "text" and enable_text:
        text = element["metadata"]["content"]
    elif element["document_type"] == "structured":
        text = element["metadata"]["table_metadata"]["table_content"]
        if element["metadata"]["content_metadata"]["subtype"] == "chart" and not enable_charts:
            text = None
        elif element["metadata"]["content_metadata"]["subtype"] == "table" and not enable_tables:
            text = None
        elif element["metadata"]["content_metadata"]["subtype"] == "infographic" and not enable_infographics:
            text = None
    elif element["document_type"] == "image" and enable_images:
        text = element["metadata"]["image_metadata"]["caption"]
    elif element["document_type"] == "audio" and enable_audio:
        text = element["metadata"]["audio_metadata"]["audio_transcript"]

    # Validate embedding and text requirements
    verify_emb = verify_embedding(element)
    if not text or not verify_emb:
        source_name = element["metadata"]["source_metadata"]["source_name"]
        pg_num = element["metadata"]["content_metadata"].get("page_number", None)
        doc_type = element["document_type"]
        if not verify_emb:
            logger.debug(f"failed to find embedding for entity: {source_name} page: {pg_num} type: {doc_type}")
        if not text:
            logger.debug(f"failed to find text for entity: {source_name} page: {pg_num} type: {doc_type}")
        # Drop the record even if text was found but the embedding is missing
        text = None

    # Handle text length limitations
    if text and len(text) > 65535:
        logger.warning(
            "Text is too long, skipping. It is advised to use SplitTask to make smaller chunk sizes. "
            f"text_length: {len(text)}, file_name: {element['metadata']['source_metadata'].get('source_name', None)} "
            f"page_number: {element['metadata']['content_metadata'].get('page_number', None)}"
        )
        text = None

    return text
```

This implementation ensures data quality by validating embeddings, filtering content types, and handling text length limitations.
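The branching in `_pull_text` encodes a mapping from document type to an enable flag and a text field (for structured content, the mapping also depends on the subtype). You can also express the same rules as data. The following is a hypothetical refactor shown for clarity, not the code that NV-Ingest ships. It covers only the type filtering and text lookup, not the embedding or length checks. Like the original, it keeps structured elements with an unrecognized subtype:

```python
from typing import Optional

# (document_type, subtype) -> (enable flag name or None, path to the text field)
TEXT_SOURCES = {
    ("text", None): ("enable_text", ("content",)),
    ("structured", None): (None, ("table_metadata", "table_content")),  # fallback
    ("structured", "chart"): ("enable_charts", ("table_metadata", "table_content")),
    ("structured", "table"): ("enable_tables", ("table_metadata", "table_content")),
    ("structured", "infographic"): ("enable_infographics", ("table_metadata", "table_content")),
    ("image", None): ("enable_images", ("image_metadata", "caption")),
    ("audio", None): ("enable_audio", ("audio_metadata", "audio_transcript")),
}


def select_text(element: dict, flags: dict) -> Optional[str]:
    """Return the text for `element` if its content type is enabled in `flags`."""
    doc_type = element["document_type"]
    subtype = None
    if doc_type == "structured":
        subtype = element["metadata"]["content_metadata"].get("subtype")
    # Exact (type, subtype) match first, then the per-type fallback.
    source = TEXT_SOURCES.get((doc_type, subtype)) or TEXT_SOURCES.get((doc_type, None))
    if source is None:
        return None
    flag_name, path = source
    if flag_name is not None and not flags.get(flag_name, True):
        return None
    value = element["metadata"]
    for key in path:
        value = value[key]
    return value
```

With this layout, adding a new content type requires only one new table entry instead of another `elif` branch.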

## Step 7: Implementing the Main Entry Point

The `run` method serves as the primary interface between NV-Ingest and your custom operator. It orchestrates the index creation and data ingestion processes.

### Implementation

```python
def run(self, records):
    self.create_index()
    self.write_to_index(records)
```

This method is called by the NV-Ingest Ingestor class during the ingestion pipeline. For more information on how operators are integrated into NV-Ingest, refer to the [interface implementation](https://github.com/NVIDIA/nv-ingest/blob/release/25.6.2/client/src/nv_ingest_client/client/interface.py#L324).

Although short, this method matters: it guarantees that the index exists and is configured before any data is written.
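Because `run` calls `create_index()` with no arguments, `recreate` defaults to `False`. As a result, running the pipeline again writes into the existing index instead of starting from an empty one. One possible variation reads a `recreate` flag at construction time and forwards it. The following hypothetical sketch uses a stand-in class that records calls instead of talking to a live cluster:

```python
class RecreatingRunDemo:
    """Stand-in showing how run() could forward a constructor-level recreate flag."""

    def __init__(self, **kwargs):
        self.recreate = kwargs.get("recreate", False)  # hypothetical option
        self.calls = []

    def create_index(self, **kwargs):
        self.calls.append(("create_index", kwargs.get("recreate", False)))

    def write_to_index(self, records: list, **kwargs):
        self.calls.append(("write_to_index", len(records)))

    def run(self, records):
        # Index setup must finish before any record is written.
        self.create_index(recreate=self.recreate)
        self.write_to_index(records)
```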

## Step 8: Implementing Vector Search Retrieval

The `retrieval` method enables similarity search functionality by:

1. **Query Embedding**: Converting text queries to vector embeddings
2. **Vector Search**: Performing k-NN search against stored vectors
3. **Result Processing**: Formatting and cleaning search results
4. **Response Optimization**: Removing unnecessary data from responses

### Implementation

```python
# ClientConfigSchema comes from nv_ingest_client; this import path is assumed
# and may differ between releases.
from nv_ingest_client.util.util import ClientConfigSchema


def retrieval(self, queries: list, **kwargs):
    client_config = ClientConfigSchema()
    # Base index name; the "_dense" suffix is added when searching, matching create_index
    index_name = kwargs.get("index_name", self.index_name)
    top_k = kwargs.get("top_k", 10)
    # Required by NVIDIAEmbedding when the endpoint is the NVIDIA build API
    nvidia_api_key = kwargs.get("nvidia_api_key", client_config.nvidia_api_key)
    embedding_endpoint = kwargs.get("embedding_endpoint", client_config.embedding_nim_endpoint)
    model_name = kwargs.get("model_name", client_config.embedding_nim_model_name)

    from llama_index.embeddings.nvidia import NVIDIAEmbedding

    # Initialize embedding model for query vectorization
    dense_model = NVIDIAEmbedding(
        base_url=embedding_endpoint,
        model=model_name,
        nvidia_api_key=nvidia_api_key,
    )

    results = []
    for query in queries:
        # Generate query embedding
        embedding = dense_model.get_query_embedding(query)

        # Construct k-NN query; "size" caps the number of hits returned (default 10)
        query_body = {
            "size": top_k,
            "query": {
                "knn": {
                    "dense": {
                        "vector": embedding,
                        "k": top_k,
                    },
                },
            },
        }

        # Execute search and keep only each hit's stored document
        response = [
            hit["_source"]
            for hit in self.client.search(index=f"{index_name}_dense", body=query_body)["hits"]["hits"]
        ]

        # Remove dense embeddings from response to reduce payload size
        for res in response:
            res.pop("dense", None)

        results.append(response)

    return results
```

### Key Features

- **Configurable Search Parameters**: Supports custom `top_k` values and index names
- **NVIDIA Embedding Integration**: Uses NVIDIA's embedding models for query vectorization
- **Result Optimization**: Removes vector embeddings from responses to reduce payload size
- **Multiple Queries**: Accepts a list of queries and returns one list of hits per query, in the same order

Together, these pieces complete the operator's query path: text queries go in, and ranked source documents (text plus metadata) come out.
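Two details of the search request are easy to miss:

- The `k` inside the `knn` clause controls how many nearest neighbors the k-NN search retrieves, while the top-level `size` (default 10) caps how many hits the response contains. A `top_k` above 10 therefore needs `size` as well.
- Stripping `dense` from each hit keeps responses small.

The following hypothetical standalone helpers sketch both details and can be tested without a cluster:

```python
def build_knn_query(vector: list, top_k: int = 10) -> dict:
    """k-NN query against the `dense` field; `size` caps the hits returned."""
    return {
        "size": top_k,
        "query": {"knn": {"dense": {"vector": vector, "k": top_k}}},
    }


def clean_hits(search_response: dict, drop_fields=("dense",)) -> list:
    """Extract `_source` from each hit and drop bulky fields such as the embedding."""
    cleaned = []
    for hit in search_response["hits"]["hits"]:
        source = dict(hit["_source"])  # copy so the raw response is left untouched
        for field in drop_fields:
            source.pop(field, None)
        cleaned.append(source)
    return cleaned
```

Inside `retrieval`, the search step could then read `clean_hits(self.client.search(index=f"{index_name}_dense", body=build_knn_query(embedding, top_k)))`.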

To recap, the `run` method is deliberately simple. It creates the index using the arguments that the VDB operator was instantiated with, then writes the records to that index. The `retrieval` method takes a single required parameter, `queries`, which is the list of queries to retrieve results for from the vector database (VDB). It contains all the logic needed to submit those queries to the VDB and format the responses. In this operator, that formatting removes an unnecessary element from every hit: the dense embedding.

## Conclusion

Congratulations! You have successfully learned how to create a custom Vector Database operator for NV-Ingest using the abstract `VDB` class. This tutorial has demonstrated the complete implementation of an OpenSearch operator, showcasing all the essential components required for vector database integration.

### Production-Ready Implementation Available

The OpenSearch operator demonstrated in this tutorial has been fully implemented and is available for immediate use in your NV-Ingest projects. You can find the complete, production-ready implementation at:

**`client/src/nv_ingest_client/util/vdb/opensearch.py`**

This implementation includes all the features covered in this tutorial:

- ✅ Complete OpenSearch integration with k-NN vector search
- ✅ Configurable connection parameters and index settings
- ✅ Robust data validation and content filtering
- ✅ Efficient batch processing and error handling
- ✅ NVIDIA embedding model integration for query vectorization
- ✅ Optimized response formatting and payload management

### Getting Started with the OpenSearch Operator

To use the pre-built OpenSearch operator in your NV-Ingest pipeline:

```python
from nv_ingest_client.util.vdb.opensearch import OpenSearch

# Initialize the operator with your configuration
opensearch_vdb = OpenSearch(
    host="localhost",
    port=9200,
    index_name="my_custom_index",
    dense_dim=2048,
)

# Use in your NV-Ingest pipeline. `ingestor` is assumed to be an existing
# nv_ingest_client Ingestor with files, extraction, and embedding already configured.
ingestor = ingestor.vdb_upload(vdb_op=opensearch_vdb)
results = ingestor.ingest()
```

### Next Steps

1. **Explore the Implementation**: Review the complete source code in `opensearch.py` to understand advanced features and optimizations
2. **Customize for Your Needs**: Modify the operator parameters to match your specific requirements
3. **Deploy in Production**: Integrate the operator into your NV-Ingest pipeline for production use
4. **Build Your Own**: Use this tutorial as a template to create operators for other vector databases

For additional support and advanced usage patterns, refer to the [NV-Ingest documentation](https://github.com/NVIDIA/nv-ingest) and the OpenSearch operator source code.