# PDF to Databricks Vector Search Pipeline

This notebook demonstrates how to:
1. Read a PDF document
2. Parse and chunk the content
3. Generate embeddings
4. Index into Databricks Vector Search


## 1. Setup and Imports


In [None]:
# Install required packages if needed (should be available in DBR 17.3 LTS)
%pip install pypdf langchain databricks_langchain databricks-vectorsearch mlflow
%restart_python


In [None]:
import os
from pathlib import Path
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType, StructType, StructField, ArrayType

# Langchain imports
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from databricks_langchain import DatabricksEmbeddings

# Databricks imports
from databricks.vector_search.client import VectorSearchClient
from databricks.sdk import WorkspaceClient

# MLflow imports
import mlflow

# Print the installed MLflow version so it is visible in the notebook output.
print(f"MLflow version: {mlflow.__version__}")


## 2. Configuration


In [None]:
# File configuration
# Path handed to PyPDFLoader; it is relative, so it resolves against the current working directory.
PDF_PATH = "data/2025-annual-report.pdf"

# Chunking configuration
# Both values are counted in characters, because the text splitter is built with length_function=len.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# Databricks configuration
# Catalog, schema and table/index names are joined into three-part names
# ("<catalog>.<schema>.<name>") when the Delta table and the vector index are created.
CATALOG_NAME = "brian_ml_dev"  # Update with your catalog name
SCHEMA_NAME = "eval_testing"  # Update with your schema name
TABLE_NAME = "annual_report_chunks"  # Table to store chunks
VECTOR_SEARCH_ENDPOINT = "one-env-shared-endpoint-13"  # Update with your endpoint name
VECTOR_INDEX_NAME = "annual_report_index"  # Name for the vector index

# Embedding model configuration
# Passed as embedding_model_endpoint_name when the Delta Sync index is created.
EMBEDDING_MODEL = "databricks-gte-large-en"  # Databricks Foundation Model for embeddings


## 3. Load and Parse PDF


In [None]:
# Load the PDF document; loader.load() returns a list of documents (reported below as pages).
print(f"Loading PDF from: {PDF_PATH}")
loader = PyPDFLoader(PDF_PATH)
documents = loader.load()

# Fail with an explicit message rather than an opaque IndexError on documents[0] below.
if not documents:
    raise ValueError(f"No pages could be loaded from PDF: {PDF_PATH}")

print(f"Loaded {len(documents)} pages from PDF")
print(f"\nFirst page preview (first 500 chars):\n{documents[0].page_content[:500]}...")


## 4. Chunk Documents


In [None]:
# Initialize the text splitter. Sizes are measured with len(), i.e. in characters,
# and the separators are tried in the order listed.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=CHUNK_SIZE,
    chunk_overlap=CHUNK_OVERLAP,
    length_function=len,
    separators=["\n\n", "\n", ". ", " ", ""]
)

# Split documents into chunks
chunks = text_splitter.split_documents(documents)

# A PDF whose pages yield no extractable text produces zero chunks; stop here with a
# clear message instead of an IndexError on chunks[0] below.
if not chunks:
    raise ValueError(
        "No text chunks were produced from the loaded pages; "
        "the PDF may contain no extractable text."
    )

print(f"Created {len(chunks)} chunks from the document")
print(f"\nFirst chunk preview:\n{chunks[0].page_content[:300]}...")
print(f"\nChunk metadata: {chunks[0].metadata}")


## 5. Prepare Data for Vector Search


In [None]:
# Flatten every chunk into a plain record: a positional id, the chunk text, and the
# page/source taken from the chunk metadata (with -1 / PDF_PATH as fallbacks).
chunk_data = [
    {
        "chunk_id": f"chunk_{position}",
        "text": piece.page_content,
        "page": piece.metadata.get("page", -1),
        "source": piece.metadata.get("source", PDF_PATH),
    }
    for position, piece in enumerate(chunks)
]

# Build the pandas DataFrame from the records in one go
chunks_df = pd.DataFrame(chunk_data)
print(f"Created DataFrame with {len(chunks_df)} chunks")
chunks_df.head()


## 6. Create Delta Table with Chunks

**Important:** The source Delta table must have Change Data Feed (CDF) enabled before a Delta Sync index can be created on it in Databricks Vector Search. CDF is what allows the vector index to pick up changes when the source table is updated.


In [None]:
# Obtain the active Spark session (or create one if none exists)
spark = SparkSession.builder.getOrCreate()

# Three-part name of the target table: <catalog>.<schema>.<table>
full_table_name = ".".join([CATALOG_NAME, SCHEMA_NAME, TABLE_NAME])

# Turn the pandas chunks into a Spark DataFrame
spark_df = spark.createDataFrame(chunks_df)

# Overwrite the Delta table, requesting Change Data Feed on the write
print(f"Writing chunks to Delta table: {full_table_name}")
(
    spark_df.write
    .format("delta")
    .mode("overwrite")
    .option("delta.enableChangeDataFeed", "true")
    .saveAsTable(full_table_name)
)

written_rows = spark_df.count()
print(f"Successfully wrote {written_rows} chunks to Delta table")

# Set the table property explicitly as well, so CDF is on regardless of the write option
print("Enabling Change Data Feed on the table...")
enable_cdf_statement = f"ALTER TABLE {full_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
spark.sql(enable_cdf_statement)
print("✓ Change Data Feed enabled")


In [None]:
# Sanity check: render the first five rows stored in the table
preview_query = f"SELECT * FROM {full_table_name} LIMIT 5"
display(spark.sql(preview_query))


In [None]:
# Read back the table properties and look for delta.enableChangeDataFeed = 'true'
table_properties = spark.sql(f"SHOW TBLPROPERTIES {full_table_name}").collect()

cdf_enabled = False
for prop in table_properties:
    if prop['key'] == 'delta.enableChangeDataFeed' and prop['value'] == 'true':
        cdf_enabled = True
        break

print(f"Change Data Feed enabled: {cdf_enabled}")

if not cdf_enabled:
    # The second line is printed literally: it is a command for the reader to copy and run.
    for warning_line in (
        "⚠ Warning: Change Data Feed is not enabled. Vector Search will not work!",
        "Run: spark.sql(f'ALTER TABLE {full_table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)')",
    ):
        print(warning_line)


## 7. Create Vector Search Index


In [None]:
# Initialize Vector Search client
vsc = VectorSearchClient()

print(f"Vector Search Client initialized")
print(f"\nAvailable endpoints:")
# Listing endpoints is informational only, so any failure is reported and the notebook continues.
try:
    endpoints_response = vsc.list_endpoints()

    # Normalise the response into a list, whatever shape the client returned.
    if hasattr(endpoints_response, 'get'):
        # Dict-like response: a missing/empty 'endpoints' key means "no endpoints",
        # not "the response itself is an endpoint".
        endpoints_list = endpoints_response.get('endpoints') or []
    elif hasattr(endpoints_response, '__iter__'):
        endpoints_list = list(endpoints_response)
    else:
        endpoints_list = [endpoints_response]

    if not endpoints_list:
        print("  (none found)")

    for endpoint in endpoints_list:
        # Endpoints may be dicts or attribute-style objects; read name and status from either.
        if isinstance(endpoint, dict):
            endpoint_name = endpoint.get('name', 'Unknown')
            status_obj = endpoint.get('endpoint_status') or {}
        else:
            endpoint_name = getattr(endpoint, 'name', 'Unknown')
            status_obj = getattr(endpoint, 'endpoint_status', None)

        if isinstance(status_obj, dict):
            endpoint_status = status_obj.get('state', 'Unknown')
        else:
            # getattr on None (no status available) falls back to 'Unknown'.
            endpoint_status = getattr(status_obj, 'state', 'Unknown')

        print(f"  - {endpoint_name} (Status: {endpoint_status})")
except Exception as e:
    print(f"Note: Could not list endpoints - {e}")


In [None]:
# Create the vector search index, or re-sync it if it already exists.
# Full index name: <catalog>.<schema>.<index>
full_index_name = f"{CATALOG_NAME}.{SCHEMA_NAME}.{VECTOR_INDEX_NAME}"

print(f"Creating vector search index: {full_index_name}")
print(f"Source table: {full_table_name}")
print(f"Endpoint: {VECTOR_SEARCH_ENDPOINT}")
print(f"Embedding model: {EMBEDDING_MODEL}")

try:
    # Create Delta Sync Index (automatically managed embeddings)
    index = vsc.create_delta_sync_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT,
        index_name=full_index_name,
        source_table_name=full_table_name,
        pipeline_type="TRIGGERED",  # or "CONTINUOUS" for real-time updates
        primary_key="chunk_id",
        embedding_source_column="text",  # Column to generate embeddings from
        embedding_model_endpoint_name=EMBEDDING_MODEL
    )
    print(f"\n✓ Vector search index created successfully!")
    print(f"Index details: {index}")
except Exception as e:
    print(f"\nError creating index: {e}")
    print(f"\nIf the index already exists, you can update it or sync it manually.")

    # On a re-run the index usually exists already. The source table was just overwritten
    # and the pipeline is TRIGGERED, so try to fetch the existing index and start a sync.
    # This stays best-effort: any failure is reported, never raised.
    try:
        index = vsc.get_index(
            endpoint_name=VECTOR_SEARCH_ENDPOINT,
            index_name=full_index_name
        )
        index.sync()
        print(f"\n✓ Existing index found; sync triggered for: {full_index_name}")
    except Exception as sync_error:
        print(f"\nCould not fetch or sync an existing index: {sync_error}")


## 8. Wait for Index to be Ready

The vector index needs time to compute embeddings for all chunks. This can take several minutes depending on the size of your document.


In [None]:
import time

print("Waiting for index to be ready...")
max_wait_time = 600  # 10 minutes
wait_interval = 10  # Check every 10 seconds
elapsed_time = 0

# detailed_state values that mean the index can be queried.
ready_states = ("ONLINE", "ONLINE_NO_PENDING_UPDATE")
# detailed_state values that mean waiting any longer is pointless.
failed_states = ("FAILED", "ERROR")

# Poll the index until it is ready, has failed, or the time budget is used up.
while elapsed_time < max_wait_time:
    try:
        # Get the index object to check its status
        index_obj = vsc.get_index(
            endpoint_name=VECTOR_SEARCH_ENDPOINT,
            index_name=full_index_name
        )

        # The status may be exposed in different ways depending on the client version.
        status = None
        if hasattr(index_obj, 'describe'):
            # describe() method available: status lives under status.detailed_state
            index_info = index_obj.describe()
            if isinstance(index_info, dict):
                status = index_info.get('status', {}).get('detailed_state', 'UNKNOWN')
            else:
                status = getattr(getattr(index_info, 'status', {}), 'detailed_state', 'UNKNOWN')
        elif hasattr(index_obj, 'status'):
            # Direct status attribute
            status_obj = index_obj.status
            if isinstance(status_obj, dict):
                status = status_obj.get('detailed_state', 'UNKNOWN')
            else:
                status = getattr(status_obj, 'detailed_state', 'UNKNOWN')
        elif isinstance(index_obj, dict):
            # Plain dictionary response
            status = index_obj.get('status', {}).get('detailed_state', 'UNKNOWN')

        if status is None:
            status = 'UNKNOWN'

        print(f"[{elapsed_time}s] Index status: {status}")

        # Membership test: the previous `status == "ONLINE" or "ONLINE_NO_PENDING_UPDATE"`
        # was always truthy, so the loop reported success on the very first poll.
        if status in ready_states:
            print(f"\n✓ Index is ONLINE and ready to use!")
            break
        # NOTE(review): the service appears to report failures with a "_FAILED" suffix
        # (e.g. OFFLINE_FAILED) rather than bare FAILED/ERROR — confirm against the
        # Vector Search documentation.
        elif status in failed_states or str(status).endswith("_FAILED"):
            print(f"\n✗ Index creation failed with status: {status}")
            print(f"Details: {index_obj}")
            break

    except Exception as e:
        print(f"[{elapsed_time}s] Error checking index status: {e}")
        # Continue trying - index might not be available yet

    time.sleep(wait_interval)
    elapsed_time += wait_interval

# Only reached with elapsed_time >= max_wait_time when the loop ran out of time without a break.
if elapsed_time >= max_wait_time:
    print(f"\n⚠ Timeout: Index did not become ready within {max_wait_time} seconds")


### Alternative: Check Index Status Manually

If the automatic waiting fails, you can check the index status manually:


In [None]:
# One-off status check: print describe() when the index object offers it,
# otherwise dump the object itself for inspection.
try:
    index_obj = vsc.get_index(endpoint_name=VECTOR_SEARCH_ENDPOINT, index_name=full_index_name)

    if not hasattr(index_obj, 'describe'):
        # No describe(): show the raw object, its type and its attributes instead
        print(f"Index object: {index_obj}")
        print(f"\nIndex object type: {type(index_obj)}")
        print(f"\nIndex object attributes: {dir(index_obj)}")
    else:
        print(index_obj.describe())
except Exception as e:
    import traceback

    print(f"Error: {e}")
    traceback.print_exc()


## 9. Test Vector Search


In [None]:
# Smoke-test the index with one natural-language query
test_query = "What are the key financial highlights from the annual report?"

print(f"Testing vector search with query: '{test_query}'")
print("\n" + "="*80)

try:
    # Fetch the index handle and run the query against it
    index = vsc.get_index(endpoint_name=VECTOR_SEARCH_ENDPOINT, index_name=full_index_name)

    results = index.similarity_search(
        query_text=test_query,
        columns=["chunk_id", "text", "page", "source"],
        num_results=3,
    )

    # Rows sit under result.data_array for dict responses, or on a data_array attribute otherwise
    if not isinstance(results, dict):
        data_array = getattr(results, 'data_array', [])
    else:
        data_array = results.get('result', {}).get('data_array', [])

    print(f"\nTop {len(data_array)} relevant chunks:\n")

    # Each row follows the requested column order: chunk_id, text, page, source
    for rank, row in enumerate(data_array, start=1):
        print(f"Result {rank}:")
        print(f"  Chunk ID: {row[0]}")
        print(f"  Page: {row[2]}")
        print(f"  Text preview: {row[1][:200]}...")
        # A row longer than the four requested columns carries an extra trailing value,
        # which is printed as the score
        has_score = len(row) > 4
        if has_score:
            print(f"  Score: {row[-1]}")
        print()

except Exception as e:
    import traceback

    print(f"Error during similarity search: {e}")
    print("\nMake sure the index is ONLINE before running similarity search.")
    traceback.print_exc()


## 10. Summary and Next Steps


In [None]:
# Horizontal rule reused between the sections of the summary
separator = "=" * 80

# Recap of what the notebook produced
print(separator)
print("PIPELINE SUMMARY")
print(separator)
print(f"\n✓ PDF loaded: {PDF_PATH}")
print(f"✓ Number of chunks created: {len(chunks)}")
print(f"✓ Delta table: {full_table_name}")
print(f"✓ Vector search index: {full_index_name}")
print(f"✓ Embedding model: {EMBEDDING_MODEL}")
print("\n" + separator)

# Suggested follow-up work
print("\nNEXT STEPS:")
for step in (
    "1. Use the vector search index for RAG applications",
    "2. Build LangChain/LangGraph agents with this knowledge base",
    "3. Log models with MLflow 3.4+ using Model As Code",
    "4. Monitor and evaluate your RAG pipeline",
):
    print(step)
print(separator)
