# Vector Search Endpoint and Index Creation

In this notebook, we will create a Databricks Vector Search endpoint and index to enable semantic search over our document collection.

## What this notebook does:
1. Creates a Vector Search endpoint (compute infrastructure for vector operations)
2. Enables Change Data Feed on our source table for incremental updates
3. Creates a Delta Sync Vector Search Index that automatically embeds and indexes our documents

This index will be used by our RAG chain to retrieve relevant document chunks.

## Install Required Packages

Install the necessary libraries:
- `databricks-vectorsearch`: Client library for managing Vector Search endpoints and indexes
- `langchain`: Framework for building applications with LLMs (used for integrating the index with the RAG chain)

In [0]:
%pip install --quiet -U langchain==0.3.25 databricks-vectorsearch==0.60 


In [0]:
# Restart Python kernel to load newly installed packages
dbutils.library.restartPython()

## Load Unity Catalog Configuration

Load the catalog and schema configuration to ensure we're working in the correct namespace.
This configuration is shared across all notebooks in the project for consistency.

In [0]:
%run "../_config/config_unity_catalog"

## 1- Helper Functions for Vector Search Management

These utility functions help us:
- Check if endpoints and indexes already exist (avoid recreation)
- Wait for asynchronous operations to complete
- Handle rate limiting and error conditions gracefully
- Provide clear status updates during long-running operations

Vector Search endpoints and indexes are created asynchronously, so we need polling logic to wait for them to be ready.
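
The polling pattern itself can be sketched independently of the Vector Search API. The helper below is a minimal illustration, not part of the `databricks-vectorsearch` library: `poll_until`, its parameters, and the simulated state sequence are all hypothetical names introduced here. It assumes the service reports a state string such as `PROVISIONING` or `ONLINE`, as the helpers in this notebook do.

```python
import time
from typing import Callable, Iterable


def poll_until(get_state: Callable[[], str],
               ready_states: Iterable[str] = ("ONLINE",),
               pending_states: Iterable[str] = ("PROVISIONING",),
               n_attempts: int = 10,
               sleep_seconds: float = 100.0,
               sleep: Callable[[float], None] = time.sleep) -> str:
    """Call get_state() until it reports a ready state (hypothetical helper).

    Raises RuntimeError on a state that is neither ready nor pending, and
    TimeoutError when every attempt returned a pending state.
    """
    ready = {s.upper() for s in ready_states}
    pending = {s.upper() for s in pending_states}
    state = "UNKNOWN"
    for attempt in range(n_attempts):
        state = get_state().upper()
        if state in ready:
            return state
        if state not in pending:
            raise RuntimeError(f"Unexpected state: {state}")
        # Skip the sleep after the final attempt; nothing is left to wait for.
        if attempt < n_attempts - 1:
            sleep(sleep_seconds)
    raise TimeoutError(f"Still {state} after {n_attempts} attempts")


# Simulated service that becomes ready on the third poll (no real API calls).
states = iter(["PROVISIONING", "PROVISIONING", "ONLINE"])
result = poll_until(lambda: next(states), sleep_seconds=0)
print(result)  # ONLINE
```

With the defaults used in this notebook (10 attempts, 100 seconds apart), a loop of this shape gives up after roughly 15 to 17 minutes, so `n_attempts` may need to be raised for large tables.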

In [0]:
import time

def endpoint_exists(vsc, vs_endpoint_name):
    """
    Check if a Vector Search endpoint already exists.
    
    This prevents attempting to recreate an existing endpoint, which would fail.
    
    Args:
        vsc: VectorSearchClient instance
        vs_endpoint_name: Name of the endpoint to check
        
    Returns:
        bool: True if endpoint exists, False otherwise
    """
    try:
        return vs_endpoint_name in [e['name'] for e in vsc.list_endpoints().get('endpoints', [])]
    except Exception as e:
        # Handle temporary rate limiting issues
        if "REQUEST_LIMIT_EXCEEDED" in str(e):
            print("WARN: couldn't get endpoint status due to REQUEST_LIMIT_EXCEEDED error. The demo will assume it exists")
            return True
        else:
            raise e

def wait_for_vs_endpoint_to_be_ready(vsc, vs_endpoint_name, n_attempts=10):
    """
    Wait for a Vector Search endpoint to reach ONLINE status.
    
    Endpoint creation is asynchronous and can take several minutes.
    This function polls the endpoint status until it's ready or times out.
    
    Args:
        vsc: VectorSearchClient instance
        vs_endpoint_name: Name of the endpoint
        n_attempts: Maximum number of polling attempts (default: 10)
        
    Returns:
        dict: Endpoint details when ready (None if the status check was skipped because of rate limiting)
        
    Raises:
        Exception: If endpoint fails to become ready within timeout
    """
    sleep_time = 100  # Wait 100 seconds between checks

    for i in range(n_attempts):
        try:
            endpoint = vsc.get_endpoint(vs_endpoint_name)
        except Exception as e:
            # Handle temporary rate limiting issues
            if "REQUEST_LIMIT_EXCEEDED" in str(e):
                print("WARN: couldn't get endpoint status due to REQUEST_LIMIT_EXCEEDED error. Please manually check your endpoint status")
                return
            else:
                raise e
        
        # Get the current status of the endpoint
        status = endpoint.get("endpoint_status", endpoint.get("status"))["state"].upper()
        
        if "ONLINE" in status:
            return endpoint
        elif "PROVISIONING" in status or i < 6:
            # Print status updates periodically
            if i % 20 == 0: 
                print(f"Waiting for endpoint to be ready, this can take a few min... {endpoint}")
            time.sleep(sleep_time)
        else:
            raise Exception(f'''Error with the endpoint {vs_endpoint_name}. - this shouldn't happen: {endpoint}.\n Please delete it and re-run the previous cell: vsc.delete_endpoint("{vs_endpoint_name}")''')

    raise Exception(f"Timeout, your endpoint isn't ready yet: {vsc.get_endpoint(vs_endpoint_name)}")

In [0]:
def index_exists(vsc, vs_endpoint_name, vs_index_full_name):
    """
    Check if a Vector Search index already exists.
    
    Args:
        vsc: VectorSearchClient instance
        vs_endpoint_name: Name of the endpoint hosting the index
        vs_index_full_name: Full name of the index (catalog.schema.index_name)
        
    Returns:
        bool: True if index exists, False otherwise
    """
    try:
        vsc.get_index(vs_endpoint_name, vs_index_full_name).describe()
        return True
    except Exception as e:
        if 'RESOURCE_DOES_NOT_EXIST' not in str(e):
            print('Unexpected error describing the index. This could be a permission issue.')
            raise e
    return False
    
def wait_for_index_to_be_ready(vsc, vs_endpoint_name, vs_index_name, n_attempts=10):
    """
    Wait for a Vector Search index to reach ONLINE status.
    
    Index creation involves:
    - Setting up the DLT pipeline
    - Processing all source documents
    - Generating embeddings for each chunk
    - Building the vector index
    
    This can take several minutes for large document collections.
    
    Args:
        vsc: VectorSearchClient instance
        vs_endpoint_name: Name of the endpoint
        vs_index_name: Full name of the index
        n_attempts: Maximum number of polling attempts (default: 10)
        
    Raises:
        Exception: If index fails to become ready within timeout
    """
    for i in range(n_attempts):
        idx = vsc.get_index(vs_endpoint_name, vs_index_name).describe()
        index_status = idx.get('status', idx.get('index_status', {}))
        status = index_status.get('detailed_state', index_status.get('status', 'UNKNOWN')).upper()
        url = index_status.get('index_url', index_status.get('url', 'UNKNOWN'))
        
        if "ONLINE" in status:
            return
        if "UNKNOWN" in status:
            print(f"Can't get the status - will assume index is ready {idx} - url: {url}")
            return
        elif "PROVISIONING" in status:
            # Print status updates periodically
            if i % 40 == 0: 
                print(f"Waiting for index to be ready, this can take a few min... {index_status} - pipeline url:{url}")
            time.sleep(100)
        else:
            raise Exception(f'''Error with the index - this shouldn't happen. DLT pipeline might have been killed.\n Please delete it and re-run the previous cell: vsc.delete_index(endpoint_name="{vs_endpoint_name}", index_name="{vs_index_name}") \nIndex details: {idx}''')
    
    raise Exception(f"Timeout, your index isn't ready yet: {vsc.get_index(vs_endpoint_name, vs_index_name).describe()}")

def wait_for_model_serving_endpoint_to_be_ready(ep_name):
    """
    Wait for a Model Serving endpoint to be ready.
    
    This function is used when deploying models to Databricks Model Serving.
    It polls the endpoint status until deployment is complete.
    
    Args:
        ep_name: Name of the serving endpoint
        
    Raises:
        Exception: If endpoint fails to become ready within timeout
    """
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.serving import EndpointStateReady, EndpointStateConfigUpdate
    import time

    # Initialize Workspace Client
    w = WorkspaceClient()
    state = ""
    
    for i in range(200):
        state = w.serving_endpoints.get(ep_name).state
        if state.config_update == EndpointStateConfigUpdate.IN_PROGRESS:
            if i % 40 == 0:
                print(f"Waiting for endpoint to deploy {ep_name}. Current state: {state}")
            time.sleep(10)
        elif state.ready == EndpointStateReady.READY:
            print('Endpoint ready.')
            return
        else:
            break
    
    raise Exception(f"Couldn't start the endpoint, timeout, please check your endpoint for more details: {state}")

## 2- Initialize Vector Search Client and Configuration

Define the key configuration parameters:
- **VECTOR_SEARCH_ENDPOINT_NAME**: Name of the endpoint that will host our indexes
- **DOCUMENT_TABLE_NAME**: Source Delta table containing our document chunks

The VectorSearchClient handles all interactions with the Vector Search service.

In [0]:
from databricks.vector_search.client import VectorSearchClient

# Configuration
VECTOR_SEARCH_ENDPOINT_NAME = "pdf_document_vs_endpoint"
DOCUMENT_TABLE_NAME = "pdf_document_raw"  # Source Delta table containing the document chunks

# Initialize Vector Search Client
vsc = VectorSearchClient()


## 3- Create Vector Search Endpoint

A Vector Search endpoint provides the compute infrastructure for:
- Hosting vector indexes
- Processing search queries
- Managing embeddings

**Endpoint Types:**
- **STANDARD**: Suitable for most production workloads (used here)
- **STORAGE_OPTIMIZED**: For very large indexes, where lower cost per vector matters more than the lowest query latency

The endpoint is created only if it doesn't already exist, then we wait for it to be ONLINE.

In [0]:
# Create endpoint if it doesn't exist
if not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME):
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD")

    # Wait for endpoint to be ready (can take several minutes)
    wait_for_vs_endpoint_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME)
    print(f"Endpoint named {VECTOR_SEARCH_ENDPOINT_NAME} is ready.")
else:
    print(f"Endpoint named {VECTOR_SEARCH_ENDPOINT_NAME} already exists.")

## 4- Enable Change Data Feed on Source Table

**Change Data Feed (CDF)** is a Delta Lake feature that tracks row-level changes to a table.

For Vector Search Delta Sync indexes, CDF is **required** because:
- It enables incremental updates to the index
- Only new/modified documents are re-embedded
- Deleted documents are removed from the index
- This makes index synchronization efficient

Without CDF, the entire table would need to be reprocessed on every sync.
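
As a rough sketch, the property can be set and then verified before creating the index. The two helpers below are hypothetical names introduced for illustration; they assume `spark` is the session available in a Databricks notebook and that `SHOW TBLPROPERTIES` returns rows with `key` and `value` columns. The `main.default` prefix in the usage line is a placeholder catalog and schema, not a value from this project.

```python
def enable_cdf_sql(table_fullname: str) -> str:
    """Build the ALTER TABLE statement that turns on Change Data Feed."""
    return (
        f"ALTER TABLE {table_fullname} "
        "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    )


def cdf_enabled(spark, table_fullname: str) -> bool:
    """Return True if delta.enableChangeDataFeed is set to true on the table."""
    rows = spark.sql(f"SHOW TBLPROPERTIES {table_fullname}").collect()
    props = {row["key"]: row["value"] for row in rows}
    return str(props.get("delta.enableChangeDataFeed", "false")).lower() == "true"


# Placeholder table name, used only to show the generated statement.
print(enable_cdf_sql("main.default.pdf_document_raw"))
```

In the notebook, `spark.sql(enable_cdf_sql(name))` followed by `cdf_enabled(spark, name)` would confirm the setting took effect before the index is created.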


In [0]:
# Enable Change Data Feed on the source table
spark.sql(
    f"ALTER TABLE {catalog}.{schema}.{DOCUMENT_TABLE_NAME} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

## 5- Create Delta Sync Vector Search Index

This is the core step where we create the vector search index.

### What happens during index creation:
1. A DLT (Delta Live Tables) pipeline is automatically created
2. The pipeline reads data from the source Delta table
3. Each document chunk is sent to the embedding model endpoint
4. Embeddings are generated and stored in the vector index
5. The index is optimized for similarity search
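
To make "similarity search" concrete, here is a toy, brute-force illustration of ranking chunks by cosine similarity between embedding vectors. It is only a conceptual sketch: the three-dimensional vectors and chunk IDs are made up, and a managed index uses an approximate nearest-neighbor structure rather than a full scan like this.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query_vec, chunk_vecs, k=2):
    """Return the IDs of the k chunks most similar to the query vector."""
    scored = [
        (cosine_similarity(query_vec, vec), chunk_id)
        for chunk_id, vec in chunk_vecs.items()
    ]
    scored.sort(reverse=True)  # highest similarity first
    return [chunk_id for _, chunk_id in scored[:k]]


# Made-up embeddings; real embedding models produce far more dimensions.
toy_index = {
    "chunk-a": [1.0, 0.0, 0.0],
    "chunk-b": [0.9, 0.1, 0.0],
    "chunk-c": [0.0, 0.0, 1.0],
}
print(top_k([1.0, 0.0, 0.0], toy_index, k=2))  # ['chunk-a', 'chunk-b']
```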

### Key Parameters:
- **endpoint_name**: The Vector Search endpoint hosting this index
- **index_name**: Full name of the index (catalog.schema.index_name)
- **source_table_name**: Delta table containing the documents
- **pipeline_type**: "TRIGGERED" updates the index only when a sync is explicitly requested; "CONTINUOUS" keeps the index updated as the source table changes
- **primary_key**: Unique identifier column (typically "id")
- **embedding_source_column**: Column containing the text to embed
- **embedding_model_endpoint_name**: Databricks Foundation Model endpoint for embeddings

### Sync Behavior:
- If index exists: Triggers a sync to update with new data
- If index doesn't exist: Creates it and performs initial embedding

**Note**: Initial index creation can take 10-30 minutes depending on document volume.

In [0]:
from databricks.sdk import WorkspaceClient
import databricks.sdk.service.catalog as c

# Construct full table names
source_table_fullname = f"{catalog}.{schema}.{DOCUMENT_TABLE_NAME}"
vs_index_fullname = f"{catalog}.{schema}.{DOCUMENT_TABLE_NAME}_vs_index"

if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname):
    print(f"Creating index {vs_index_fullname} on endpoint {VECTOR_SEARCH_ENDPOINT_NAME}...")
    try:
        vsc.create_delta_sync_index(
            endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
            index_name=vs_index_fullname,
            source_table_name=source_table_fullname,
            pipeline_type="TRIGGERED",  # Manual sync (use "CONTINUOUS" for auto-sync)
            primary_key="id",  # Unique identifier for each document chunk
            embedding_source_column='content',  # Column containing the text to embed
            embedding_model_endpoint_name='databricks-gte-large-en',  # Embedding model endpoint
            columns_to_sync=["content", "source_name"] 
        )
    except Exception as e:
        print(f"Error creating index: {e}")
        raise e
    
    # Wait for index to be ready and all embeddings to be created
    wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
else:
    # Index already exists - trigger a sync to update with any new data
    print(f"Index {vs_index_fullname} already exists. Triggering sync...")
    wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
    vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname).sync()

print(f"Index {vs_index_fullname} on table {source_table_fullname} is ready")

### Next Steps:
- Use this index in your RAG chain for document retrieval
- Test semantic search queries
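
A quick way to test the index is a small wrapper around `similarity_search`. The sketch below makes several assumptions: `search_chunks` is a hypothetical helper, and the response is taken to be a dictionary with column names under `manifest.columns` and rows under `result.data_array`. Verify this shape against the installed client version before relying on it.

```python
def search_chunks(index, question, num_results=3):
    """Query a Vector Search index and return each hit as a dictionary.

    `index` is expected to expose similarity_search(query_text=..., columns=...,
    num_results=...), as the object returned by vsc.get_index(...) does.
    """
    response = index.similarity_search(
        query_text=question,
        columns=["content", "source_name"],  # columns synced to the index
        num_results=num_results,
    )
    # Assumed response shape: column names in the manifest, rows in data_array.
    column_names = [
        col["name"] for col in response.get("manifest", {}).get("columns", [])
    ]
    rows = response.get("result", {}).get("data_array", [])
    return [dict(zip(column_names, row)) for row in rows]


# Usage in this notebook (requires the live index created above):
# index = vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
# for hit in search_chunks(index, "your question here"):
#     print(hit)
```

Returning dictionaries keyed by column name keeps the calling code readable and independent of column order in the response.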

### Important Resources:
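- **Index Name**: `<catalog>.<schema>.pdf_document_raw_vs_index` (held in `vs_index_fullname`)
- **Endpoint Name**: `pdf_document_vs_endpoint` (held in `VECTOR_SEARCH_ENDPOINT_NAME`)
- **Source Table**: `<catalog>.<schema>.pdf_document_raw` (held in `source_table_fullname`)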

These values will be used in the RAG chain configuration.