# Unstructured data pipeline for the Agent's Retriever

By the end of this notebook, you will have transformed your unstructured documents into a vector index that can be queried by your Agent.

This means:
- Documents are loaded into a Delta table.
- Documents are chunked.
- Chunks are embedded with an embedding model and stored in a vector index.

The important resulting artifact of this notebook is the chunked vector index. This will be used in the next notebook to power our Retriever.

### 👉 START HERE: How to Use This Notebook

Follow these steps to build and refine your data pipeline's quality:

1. **Build a v0 index with default settings**
    - Configure the data source and destination tables in the `1️⃣ 📂 Data source & destination configuration` cells
    - Press `Run All` to create the vector index.

    *Note: While you can adjust the other settings and modify the parsing/chunking code, we suggest doing so only after evaluating your Agent's quality so you can make improvements that specifically address root causes of quality issues.*

2. **Use later notebooks to integrate the retriever into the agent and evaluate the agent/retriever's quality.**

3. **If the evaluation results show retrieval issues as a root cause, use this notebook to iterate on your data pipeline's code & config.** Below are some potential fixes you can try; see the AI Cookbook's [debugging retrieval issues](https://ai-cookbook.io/nbs/5-hands-on-improve-quality-step-1-retrieval.html) section for details.
    - Add missing but relevant source documents to the index.
    - Resolve any conflicting information in source documents.
    - Adjust the data pipeline configuration:
      - Modify chunk size or overlap.
      - Experiment with different embedding models.
    - Adjust the data pipeline code:
      - Create a custom parser or use different parsing libraries.
      - Develop a custom chunker or use different chunking techniques.
      - Extract additional metadata for each document.
    - Adjust the Agent's code/config in subsequent notebooks:
      - Change the number of documents retrieved (K).
      - Try a re-ranker.
      - Use hybrid search.
      - Apply extracted metadata as filters.
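To make the hybrid search idea concrete, here is a minimal sketch of Reciprocal Rank Fusion (RRF), one common way to merge a keyword ranking and a vector ranking into a single list. It is an illustration only, not a description of how Databricks Vector Search implements hybrid search; `reciprocal_rank_fusion` is a hypothetical helper, the chunk IDs are made up, and `k = 60` is simply a commonly used default.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse several ranked lists of chunk IDs into one list using RRF."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):
            # Each list contributes 1 / (k + rank); chunks ranked high in several lists win.
            scores[chunk_id] += 1.0 / (k + rank)
    # Highest fused score first; ties broken by ID so the output is deterministic.
    return sorted(scores, key=lambda cid: (-scores[cid], cid))


vector_ranking = ["c1", "c2", "c3"]   # hypothetical results from vector search
keyword_ranking = ["c3", "c1", "c4"]  # hypothetical results from keyword search
print(reciprocal_rank_fusion([vector_ranking, keyword_ranking]))
```

Here `c1` and `c3` rise to the top because both searches return them, while chunks found by only one search fall behind.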



**Important note:** Throughout this notebook, we indicate which cells you:
- ✅✏️ *should* customize - these cells contain code & config with business logic that you should edit to meet your requirements & tune quality
- 🚫✏️ *typically will not* customize - these cells contain boilerplate code required to execute the pipeline

*Cells that don't require customization still need to be run!  You CAN change these cells, but if this is the first time using this notebook, we suggest not doing so.*

### Install Python libraries

🚫✏️ Only modify if you need additional packages in your code changes to the document parsing or chunking logic.

Versions of Databricks code are not locked since Databricks ensures changes are backward compatible.
Versions of open source packages are locked since package authors often make backward-incompatible changes.

In [1]:
# %pip install -qqqq -U -r requirements.txt
# %pip install -qqqq -U -r requirements_datapipeline.txt
# dbutils.library.restartPython()

If running from an IDE with [`databricks-connect`](https://docs.databricks.com/en/dev-tools/databricks-connect/python/index.html), connect to a Spark session.

In [2]:
from cookbook.databricks_utils import get_cluster_url
from cookbook.databricks_utils import get_active_cluster_id
from cookbook.databricks_utils.install_cluster_library import install_requirements
# cluster_id = get_active_cluster_id()
# print(f"Installing packages on the active cluster: {get_cluster_url(cluster_id)}")

# # TODO: build the utils wheel and install it 
# install_requirements(cluster_id, "requirements.txt")
# install_requirements(cluster_id, "requirements_datapipeline.txt")

# Get Spark session if using Databricks Connect from an IDE
from mlflow.utils import databricks_utils as du

if not du.is_in_databricks_notebook():
    from databricks.connect import DatabricksSession

    spark = DatabricksSession.builder.getOrCreate()


## 1️⃣ 📂 Data source & destination configuration

#### ✅✏️ Configure the data pipeline's source location.

Choose a [Unity Catalog Volume](https://docs.databricks.com/en/volumes/index.html) containing PDF, HTML, or other documents to be parsed, chunked, and embedded.

- `uc_catalog_name`: Name of the Unity Catalog.
- `uc_schema_name`: Name of the Unity Catalog schema.
- `uc_volume_name`: Name of the Unity Catalog volume.

Running this cell will validate that the UC Volume exists, trying to create it if it does not.
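Files in a Unity Catalog volume are addressed as `/Volumes/<catalog>/<schema>/<volume>/...`. As a rough sketch, assuming `source_config.volume_path` follows that standard layout, the three settings combine as shown below. The helper `uc_volume_path` is hypothetical (not part of the cookbook package), and it only performs basic sanity checks, including a guard against an unedited `REPLACE_ME` placeholder; it does not implement Unity Catalog's full naming rules.

```python
def uc_volume_path(catalog: str, schema: str, volume: str) -> str:
    """Build the /Volumes/<catalog>/<schema>/<volume> path for a UC volume."""
    for label, value in (("catalog", catalog), ("schema", schema), ("volume", volume)):
        # Basic sanity checks only: empty names, path separators, unedited placeholders.
        if not value or "/" in value or value == "REPLACE_ME":
            raise ValueError(f"Invalid Unity Catalog {label} name: {value!r}")
    return f"/Volumes/{catalog}/{schema}/{volume}"


print(uc_volume_path("ep", "cookbook_local_test", "product_docs"))
```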


In [None]:
from cookbook.config.data_pipeline.uc_volume_source import UCVolumeSourceConfig

# Configure the UC Volume that contains the source documents
source_config = UCVolumeSourceConfig(
    # uc_catalog_name="REPLACE_ME", # REPLACE_ME
    # uc_schema_name="REPLACE_ME", # REPLACE_ME
    # uc_volume_name="REPLACE_ME", # REPLACE_ME
    uc_catalog_name="ep", # REPLACE_ME
    uc_schema_name="cookbook_local_test", # REPLACE_ME
    uc_volume_name="product_docs", # REPLACE_ME
)

# Check if volume exists, create otherwise
is_valid, msg = source_config.create_or_validate_volume()
if not is_valid:
    raise Exception(msg)

#### ✅✏️ Configure the data pipeline's output location.
 
Choose where the data pipeline outputs the parsed, chunked, and embedded documents.

Required parameters:
* `uc_catalog_name`: Unity Catalog name where tables will be created
* `uc_schema_name`: Schema name within the catalog 
* `base_table_name`: Core name used as prefix for all generated tables
* `vector_search_endpoint`: Vector Search endpoint to store the index

Optional parameters:
* `docs_table_postfix`: Suffix for the parsed documents table (default: "docs")
* `chunked_table_postfix`: Suffix for the chunked documents table (default: "docs_chunked") 
* `vector_index_postfix`: Suffix for the vector index (default: "docs_chunked_index")
* `version_suffix`: Version identifier (e.g. 'v1', 'test') to maintain multiple versions

The generated tables follow this naming convention:
* Parsed docs: `{uc_catalog_name}.{uc_schema_name}.{base_table_name}_{docs_table_postfix}__{version_suffix}`
* Chunked docs: `{uc_catalog_name}.{uc_schema_name}.{base_table_name}_{chunked_table_postfix}__{version_suffix}`
* Vector index: `{uc_catalog_name}.{uc_schema_name}.{base_table_name}_{vector_index_postfix}__{version_suffix}`
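To see what this convention produces, here is a sketch that applies it to the values used in the output configuration cell. The function `output_names` is hypothetical; `DataPipelineOuputConfig` computes the real names, and omitting the `__{version_suffix}` part when `version_suffix` is `None` is an assumption made here.

```python
from typing import Dict, Optional


def output_names(
    uc_catalog_name: str,
    uc_schema_name: str,
    base_table_name: str,
    docs_table_postfix: str = "docs",
    chunked_table_postfix: str = "docs_chunked",
    vector_index_postfix: str = "docs_chunked_index",
    version_suffix: Optional[str] = None,
) -> Dict[str, str]:
    """Apply the naming convention to build fully qualified UC names."""

    def fqn(postfix: str) -> str:
        name = f"{base_table_name}_{postfix}"
        # Assumption: no "__{version_suffix}" part when version_suffix is None.
        if version_suffix:
            name += f"__{version_suffix}"
        return f"{uc_catalog_name}.{uc_schema_name}.{name}"

    return {
        "parsed_docs_table": fqn(docs_table_postfix),
        "chunked_docs_table": fqn(chunked_table_postfix),
        "vector_index": fqn(vector_index_postfix),
    }


for key, value in output_names("ep", "cookbook_local_test", "abc_123", version_suffix="v2").items():
    print(f"{key}: {value}")
```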

*Note: If you are comparing different chunking/parsing/embedding strategies, set the `version_suffix` parameter to maintain multiple versions of the pipeline output with the same base_table_name.*

*Databricks suggests sharing a Vector Search endpoint across multiple agents.*

In [None]:
from cookbook.config.data_pipeline.data_pipeline_output import DataPipelineOuputConfig

# Output configuration
output_config = DataPipelineOuputConfig(
    # Required parameters
    uc_catalog_name=source_config.uc_catalog_name, # usually the same as the source volume's catalog (the default)
    uc_schema_name=source_config.uc_schema_name, # usually the same as the source volume's schema (the default)
    base_table_name="abc_123", # usually similar or identical to the source volume name; defaults to the volume name
    # vector_search_endpoint="REPLACE_ME", # Vector Search endpoint to store the index
    vector_search_endpoint="ericpeter_vector_search", # Vector Search endpoint to store the index

    # Optional parameters, showing defaults
    docs_table_postfix="docs",              # default value is `docs`
    chunked_table_postfix="docs_chunked",   # default value is `docs_chunked`
    vector_index_postfix="docs_chunked_index", # default value is `docs_chunked_index`
    version_suffix="v2"                     # default is None

    # Output tables / indexes follow this naming convention:
    # {uc_catalog_name}.{uc_schema_name}.{base_table_name}_{docs_table_postfix}__{version_suffix}
    # {uc_catalog_name}.{uc_schema_name}.{base_table_name}_{chunked_table_postfix}__{version_suffix}
    # {uc_catalog_name}.{uc_schema_name}.{base_table_name}_{vector_index_postfix}__{version_suffix}
)

# Alternatively, you can directly pass in the UC locations of the tables / indexes
# output_config = DataPipelineOuputConfig(
#     chunked_docs_table="catalog.schema.docs_chunked",
#     parsed_docs_table="catalog.schema.parsed_docs",
#     vector_index="catalog.schema.docs_chunked_index",
#     vector_search_endpoint="REPLACE_ME",
# )

# Check UC locations exist
is_valid, msg = output_config.validate_catalog_and_schema()
if not is_valid:
    raise Exception(msg)

# Check Vector Search endpoint exists
is_valid, msg = output_config.validate_vector_search_endpoint()
if not is_valid:
    raise Exception(msg)

#### ✅✏️ Configure chunk size and the embedding model.

**Chunk size and overlap** control how a larger document is turned into smaller chunks that can be processed by an embedding model.  See the AI Cookbook [chunking deep dive](https://ai-cookbook.io/nbs/3-deep-dive-data-pipeline.html#chunking) for more details.
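As a rough illustration of how the two settings interact: with a fixed-size window, each new chunk starts `chunk_size - overlap` tokens after the previous one, so the values in the configuration cell below (1024 and 256) advance 768 tokens per chunk. This sketch uses a plain list of strings as stand-in tokens and ignores the separator-aware splitting that the notebook's recursive splitter performs; `window_chunks` is a hypothetical helper.

```python
from typing import List


def window_chunks(tokens: List[str], chunk_size: int, overlap: int) -> List[List[str]]:
    """Split a token list into fixed-size windows that share `overlap` tokens."""
    if chunk_size <= 0 or not 0 <= overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= overlap < chunk_size")
    stride = chunk_size - overlap
    chunks: List[List[str]] = []
    for start in range(0, len(tokens), stride):
        chunks.append(tokens[start:start + chunk_size])
        # Stop once this window reaches the end of the document.
        if start + chunk_size >= len(tokens):
            break
    return chunks


doc_tokens = [f"tok{i}" for i in range(2000)]
for chunk in window_chunks(doc_tokens, chunk_size=1024, overlap=256):
    print(chunk[0], "...", chunk[-1], f"({len(chunk)} tokens)")
```

A 2,000-token document therefore yields three chunks starting at tokens 0, 768, and 1536, and each consecutive pair shares 256 tokens.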

**The embedding model** is an AI model that is used to identify the most similar documents to a given user's query.  See the AI Cookbook [embedding model deep dive](https://ai-cookbook.io/nbs/3-deep-dive-data-pipeline.html#embedding-model) for more details.
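Conceptually, the embedding model maps the query and every chunk to vectors, and retrieval returns the chunks whose vectors are closest to the query's. The sketch below ranks three made-up 3-dimensional vectors by cosine similarity with a brute-force scan. Real embeddings have far more dimensions, a vector index typically uses approximate nearest-neighbor search instead of scanning everything, and the similarity metric a given index uses may differ; `cosine_similarity` and `top_k` are hypothetical helpers.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cannot compare a zero vector")
    return sum(x * y for x, y in zip(a, b)) / (norm_a * norm_b)


def top_k(query: Sequence[float], chunks: Dict[str, Sequence[float]], k: int) -> List[Tuple[str, float]]:
    """Return the k chunk IDs most similar to the query, best first."""
    scored = [(chunk_id, cosine_similarity(query, vec)) for chunk_id, vec in chunks.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy "embeddings", made up for illustration.
chunk_vectors = {
    "chunk_a": [0.9, 0.1, 0.0],
    "chunk_b": [0.0, 1.0, 0.0],
    "chunk_c": [0.5, 0.5, 0.7],
}
print(top_k([1.0, 0.0, 0.0], chunk_vectors, k=2))
```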

This notebook supports the following [Foundation Models](https://docs.databricks.com/en/machine-learning/foundation-models/index.html) or [External Models](https://docs.databricks.com/en/generative-ai/external-models/index.html) of type `/llm/v1/embeddings`. To try another model, modify `get_recursive_character_text_splitter()` in `cookbook/data_pipeline/recursive_character_text_splitter.py` to add support.
- `databricks-gte-large-en` or `databricks-bge-large-en`
- Azure OpenAI or OpenAI External Model of type `text-embedding-ada-002`, `text-embedding-3-small` or `text-embedding-3-large`

In [None]:
from cookbook.config.data_pipeline.recursive_text_splitter import RecursiveTextSplitterChunkingConfig

chunking_config = RecursiveTextSplitterChunkingConfig(
    embedding_model_endpoint="databricks-gte-large-en",  # A Model Serving endpoint supporting the /llm/v1/embeddings task
    chunk_size_tokens=1024,
    chunk_overlap_tokens=256,
)

# Validate the embedding endpoint & chunking config
is_valid, msg = chunking_config.validate_embedding_endpoint()
if not is_valid:
    raise Exception(msg)

is_valid, msg = chunking_config.validate_chunk_size_and_overlap()
if not is_valid:
    raise Exception(msg)

#### 🚫✏️ Write the data pipeline configuration to a YAML file

This allows the configuration to be loaded and referenced by the Agent's notebook.

In [6]:
from cookbook.config.data_pipeline import DataPipelineConfig
from cookbook.config import serializable_config_to_yaml_file

data_pipeline_config = DataPipelineConfig(
    source=source_config,
    output=output_config,
    chunking_config=chunking_config,
)

serializable_config_to_yaml_file(data_pipeline_config, "./configs/data_pipeline_config.yaml")

#### 🛑 If you are running your initial data pipeline, you do not need to configure anything else; just `Run All` the notebook cells. You can modify these cells later to tune the quality of your data pipeline by changing the parsing logic.

## 3️⃣ ⌨️ Data pipeline code

The code below executes the data pipeline. You can modify it as indicated to implement different parsing or chunking strategies or to extract additional metadata fields.

#### Pipeline step 1: Load & parse documents into a Delta Table

In this step, we load files from the UC Volume defined in `source_config` into the Delta table `output_config.parsed_docs_table`. The contents of each file become a separate row in the Delta table.

The path to the source document is used as the `doc_uri`, which is displayed to your end users in the Agent Evaluation web application.

After you test your POC with stakeholders, you can return here to change the parsing logic or extract additional metadata about the documents to help improve the quality of your retriever.
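The debugging cell further down reads each file's `path`, `content`, `modificationTime`, and `length`, which are the columns produced by Spark's `binaryFile` data source; whether `load_files_to_df` actually uses that source is an assumption. For local experimentation without Spark, a rough stand-in that yields one record per file with the same keys might look like this. `load_files_to_records` is hypothetical, and its `path` values are local filesystem paths rather than the URIs Spark returns.

```python
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List


def load_files_to_records(source_path: str) -> List[Dict[str, Any]]:
    """Return one dict per file under source_path, mirroring binaryFile's columns."""
    records: List[Dict[str, Any]] = []
    for file_path in sorted(Path(source_path).rglob("*")):
        if not file_path.is_file():
            continue
        stat = file_path.stat()
        records.append(
            {
                "path": str(file_path),  # used as doc_uri downstream
                "modificationTime": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
                "length": stat.st_size,
                "content": file_path.read_bytes(),
            }
        )
    return records
```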

##### ✅✏️ Customize the parsing function

This default implementation parses PDF, HTML, and DOCX files using open source libraries. Adjust `file_parser(...)` and `ParserReturnValue` in `cookbook/data_pipeline/default_parser.py` to change the parsing logic, add support for more file types, or extract additional metadata about each document.
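For example, to add support for plain-text and Markdown files, you could write a parser that accepts the same keyword arguments the debugging cell passes to `file_parser` (`raw_doc_contents_bytes`, `doc_path`, `modification_time`, `doc_bytes_length`). This is a minimal sketch: `text_file_parser` and `TextParserReturnValue` are hypothetical, the fields other than `doc_uri`, `content`, and `parser_status` are assumptions (compare with the real `ParserReturnValue`), and the `"ERROR: ..."` status format is made up. The pipeline itself only filters on `parser_status == "SUCCESS"`.

```python
from datetime import datetime
from typing import TypedDict


class TextParserReturnValue(TypedDict):
    """Hypothetical schema; mirror the real ParserReturnValue's fields in practice."""
    doc_uri: str
    content: str
    parser_status: str
    last_modified: datetime  # hypothetical extra metadata field
    doc_bytes_length: int    # hypothetical extra metadata field


SUPPORTED_SUFFIXES = (".txt", ".md")


def text_file_parser(
    raw_doc_contents_bytes: bytes,
    doc_path: str,
    modification_time: datetime,
    doc_bytes_length: int,
) -> TextParserReturnValue:
    """Decode UTF-8 text/Markdown files; report failures in parser_status instead of raising."""
    try:
        if not doc_path.lower().endswith(SUPPORTED_SUFFIXES):
            raise ValueError(f"unsupported file type: {doc_path}")
        content = raw_doc_contents_bytes.decode("utf-8")
        status = "SUCCESS"
    except Exception as exc:  # keep one bad file from failing the whole job
        content, status = "", f"ERROR: {exc}"
    return {
        "doc_uri": doc_path,
        "content": content,
        "parser_status": status,
        "last_modified": modification_time,
        "doc_bytes_length": doc_bytes_length,
    }
```

Recording failures in a status column, rather than raising, means a single malformed file shows up as an error row instead of aborting the Spark job that applies the parser.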

In [None]:
from cookbook.data_pipeline.default_parser import file_parser, ParserReturnValue

# Print the source code of ParserReturnValue and file_parser for inspection
import inspect
print(inspect.getsource(ParserReturnValue))
print(inspect.getsource(file_parser))


The cell below is debugging code that tests your parsing function on a single file.

In [None]:
from cookbook.data_pipeline.parse_docs import load_files_to_df
from pyspark.sql import functions as F


raw_files_df = load_files_to_df(
    spark=spark,
    source_path=source_config.volume_path,
)

print(f"Loaded {raw_files_df.count()} files from {source_config.volume_path}.  Files: {source_config.list_files()}")

# Only pull a few files to the driver; converting every file to pandas can exhaust driver memory.
NUM_TEST_FILES = 1  # increase to test more files
test_records_dict = raw_files_df.limit(NUM_TEST_FILES).toPandas().to_dict(orient="records")

for record in test_records_dict:
    print()
    print("Testing parsing for file: ", record["path"])
    print()
    test_result = file_parser(
        raw_doc_contents_bytes=record["content"],
        doc_path=record["path"],
        modification_time=record["modificationTime"],
        doc_bytes_length=record["length"],
    )
    print(test_result)


🚫✏️ The below cell is boilerplate code to apply the parsing function using Spark.

In [None]:
from cookbook.data_pipeline.parse_docs import (
    load_files_to_df,
    apply_parsing_fn,
    check_parsed_df_for_errors,
    check_parsed_df_for_empty_parsed_files
)
from cookbook.data_pipeline.utils.typed_dicts_to_spark_schema import typed_dicts_to_spark_schema
from cookbook.databricks_utils import get_table_url

# Tune this parameter to optimize performance.  More partitions will improve performance, but may cause out of memory errors if your cluster is too small.
NUM_PARTITIONS = 50

# Load the UC Volume files into a Spark DataFrame
raw_files_df = load_files_to_df(
    spark=spark,
    source_path=source_config.volume_path,
).repartition(NUM_PARTITIONS)

# Apply the parsing UDF to the Spark DataFrame
parsed_files_df = apply_parsing_fn(
    raw_files_df=raw_files_df,
    # Modify this function to change the parser, extract additional metadata, etc
    parse_file_fn=file_parser,
    # The schema of the resulting Delta Table will follow the schema defined in ParserReturnValue
    parsed_df_schema=typed_dicts_to_spark_schema(ParserReturnValue),
)

# Write to a Delta Table
parsed_files_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(
    output_config.parsed_docs_table
)

# Get resulting table
parsed_files_df = spark.table(output_config.parsed_docs_table)
parsed_files_no_errors_df = parsed_files_df.filter(
    parsed_files_df.parser_status == "SUCCESS"
)

# Show successfully parsed documents
print(f"Parsed {parsed_files_no_errors_df.count()} / {parsed_files_df.count()} documents successfully.  Inspect `parsed_files_df` or visit {get_table_url(output_config.parsed_docs_table)} to see all parsed documents, including any errors.")
display(parsed_files_no_errors_df.toPandas())

Show any parsing failures or successfully parsed files that resulted in an empty document.
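Conceptually, the two checks split the parsed rows into outright failures and "successful" parses that produced no text. Here is a pure-Python sketch over a list of row dicts, assuming the `parser_status` and `content` columns used elsewhere in this notebook. The helper names are hypothetical; the real `check_parsed_df_for_errors` and `check_parsed_df_for_empty_parsed_files` operate on Spark DataFrames and may apply different rules.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def find_failed_docs(rows: List[Row]) -> Tuple[bool, str, List[Row]]:
    """Rows whose parser did not report SUCCESS."""
    failed = [r for r in rows if r.get("parser_status") != "SUCCESS"]
    msg = f"{len(failed)} document(s) failed to parse." if failed else "No parsing errors."
    return bool(failed), msg, failed


def find_empty_docs(rows: List[Row]) -> Tuple[bool, str, List[Row]]:
    """Rows that parsed successfully but contain no non-whitespace text."""
    empty = [
        r for r in rows
        if r.get("parser_status") == "SUCCESS" and not (r.get("content") or "").strip()
    ]
    msg = f"{len(empty)} document(s) parsed to empty content." if empty else "No empty documents."
    return bool(empty), msg, empty
```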

In [None]:

# Any documents that failed to parse
is_error, msg, failed_docs_df = check_parsed_df_for_errors(parsed_files_df)
if is_error:
    display(failed_docs_df.toPandas())
    raise Exception(msg)
    
# Any documents that returned empty parsing results
is_error, msg, empty_docs_df = check_parsed_df_for_empty_parsed_files(parsed_files_df)
if is_error:
    display(empty_docs_df.toPandas())
    raise Exception(msg)

#### Pipeline step 2: Compute chunks of documents

In this step, we will split our documents into smaller chunks so they can be indexed in our vector database.


##### ✅✏️ Chunking logic.

We provide a default implementation of a recursive text splitter. To create your own chunking logic, adapt the `get_recursive_character_text_splitter()` function in `cookbook/data_pipeline/recursive_character_text_splitter.py`.
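The idea behind a recursive splitter: try the coarsest separator first (paragraphs), pack the pieces into chunks up to the size limit, and fall back to finer separators (lines, then words, then characters) only for pieces that are still too long. The sketch below is a simplified, hypothetical `recursive_split` that measures length in characters and omits overlap. The notebook's splitter sizes chunks in tokens (`chunk_size_tokens`) and applies `chunk_overlap_tokens`, so treat this only as a model of the algorithm.

```python
from typing import List, Optional

DEFAULT_SEPARATORS = ["\n\n", "\n", " ", ""]


def recursive_split(text: str, chunk_size: int, separators: Optional[List[str]] = None) -> List[str]:
    """Split text into chunks of at most chunk_size characters, preferring coarse separators."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")
    seps = DEFAULT_SEPARATORS if separators is None else separators
    if len(text) <= chunk_size:
        return [text] if text.strip() else []

    # Use the coarsest separator that occurs in the text; "" (or no separators left)
    # means splitting character by character.
    sep, finer = "", []
    for i, s in enumerate(seps):
        if s == "" or s in text:
            sep, finer = s, seps[i + 1:]
            break
    pieces = list(text) if sep == "" else text.split(sep)

    chunks: List[str] = []
    current = ""
    for piece in pieces:
        if len(piece) > chunk_size:
            # Too big on its own: flush the current chunk, then recurse with finer separators.
            if current:
                chunks.append(current)
                current = ""
            chunks.extend(recursive_split(piece, chunk_size, finer))
            continue
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate
        else:
            chunks.append(current)
            current = piece
    if current:
        chunks.append(current)
    return [c for c in chunks if c.strip()]


print(recursive_split("para one.\n\npara two is longer.\n\nthree", 20))
```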

In [None]:
from cookbook.data_pipeline.recursive_character_text_splitter import (
    get_recursive_character_text_splitter,
)

# Get the chunking function
recursive_character_text_splitter_fn = get_recursive_character_text_splitter(
    model_serving_endpoint=chunking_config.embedding_model_endpoint,
    chunk_size_tokens=chunking_config.chunk_size_tokens,
    chunk_overlap_tokens=chunking_config.chunk_overlap_tokens,
)

# Determine which columns to propagate from the docs table to the chunks table.

# Get the columns from the parser except for the content
# You can modify this to adjust which fields are propagated from the docs table to the chunks table.
propagate_columns = [
    field.name
    for field in typed_dicts_to_spark_schema(ParserReturnValue).fields
    if field.name != "content"
]

# If you want to implement retrieval strategies such as presenting the entire document rather than the chunk to the LLM, include `content`, which contains the doc's full parsed text. By default it is excluded because `content` can be quite large and cause performance issues.
# propagate_columns = [
#     field.name
#     for field in typed_dicts_to_spark_schema(ParserReturnValue).fields
# ]

🚫✏️ Run the chunking function within Spark
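Conceptually, applying the chunking function turns each parsed document into several chunk rows, copies the `propagate_columns` onto each row, and assigns a `chunk_id`. The sketch below models that in plain Python. Deriving `chunk_id` from a hash of the document URI and chunk text is an assumption, one way to keep IDs stable across runs so an index can skip re-embedding unchanged chunks; `explode_into_chunks` and the `chunk_text` column name are hypothetical and not necessarily what `apply_chunking_fn` does.

```python
import hashlib
from typing import Any, Callable, Dict, List


def explode_into_chunks(
    docs: List[Dict[str, Any]],
    chunking_fn: Callable[[str], List[str]],
    propagate_columns: List[str],
) -> List[Dict[str, Any]]:
    """Return one row per chunk, carrying selected document columns along."""
    rows: List[Dict[str, Any]] = []
    for doc in docs:
        for chunk in chunking_fn(doc["content"]):
            # Same doc_uri + same chunk text gives the same ID on every run
            # (identical chunks within one document would share an ID).
            key = f"{doc['doc_uri']}\x00{chunk}".encode("utf-8")
            row = {col: doc[col] for col in propagate_columns}
            row["chunk_id"] = hashlib.md5(key).hexdigest()
            row["chunk_text"] = chunk  # hypothetical column name
            rows.append(row)
    return rows
```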

In [None]:
from cookbook.data_pipeline.chunk_docs import apply_chunking_fn
from cookbook.databricks_utils import get_table_url

# Tune this parameter to optimize performance.  More partitions will improve performance, but may cause out of memory errors if your cluster is too small.
NUM_PARTITIONS = 50

# Load parsed docs
parsed_files_df = spark.table(output_config.parsed_docs_table).repartition(NUM_PARTITIONS)

chunked_docs_df = apply_chunking_fn(
    # The source documents table.
    parsed_docs_df=parsed_files_df,
    # The chunking function that takes a string (document) and returns a list of strings (chunks).
    chunking_fn=recursive_character_text_splitter_fn,
    # Choose which columns to propagate from the docs table to chunks table. The `doc_uri` column is required so we can propagate the original document URL to the Agent's web app.
    propagate_columns=propagate_columns,
)

# Write to Delta Table
chunked_docs_df.write.mode("overwrite").option(
    "overwriteSchema", "true"
).saveAsTable(output_config.chunked_docs_table)

# Get resulting table
chunked_docs_df = spark.table(output_config.chunked_docs_table)

# Show number of chunks created
print(f"Created {chunked_docs_df.count()} chunks.  Inspect `chunked_docs_df` or visit {get_table_url(output_config.chunked_docs_table)} to see the results.")

# Enable Change Data Feed on the chunks table; Vector Search Delta Sync indexes require it to sync changes.
spark.sql(f"ALTER TABLE {output_config.chunked_docs_table} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Show chunks
display(chunked_docs_df.toPandas())

#### 🚫✏️ Pipeline step 3: Create the vector index

In this step, we embed the chunks and build the vector index that the Retriever queries to find chunks relevant to the user's question. Databricks Vector Search computes the embeddings and keeps the index in sync with the chunks table using [Delta Sync](https://docs.databricks.com/en/generative-ai/create-query-vector-search.html#create-a-vector-search-index).
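As a mental model of why stable `chunk_id`s matter when `force_delete_index_before_create=False`: an incremental sync only needs to embed chunks whose IDs the index has not seen and drop IDs that disappeared from the source table. The sketch below models that with a dict. It is not how Databricks Vector Search is implemented; `incremental_sync`, `embed_fn`, and the `chunk_text` field are hypothetical.

```python
from typing import Callable, Dict, List, Sequence, Tuple


def incremental_sync(
    index: Dict[str, Sequence[float]],
    rows: List[Dict[str, str]],
    embed_fn: Callable[[str], Sequence[float]],
) -> Tuple[int, int]:
    """Update index in place; return (number embedded, number removed)."""
    current_ids = {row["chunk_id"] for row in rows}
    # Remove vectors for chunks that no longer exist in the source table.
    stale_ids = [chunk_id for chunk_id in index if chunk_id not in current_ids]
    for chunk_id in stale_ids:
        del index[chunk_id]
    # Embed only chunks the index has not seen before.
    embedded = 0
    for row in rows:
        if row["chunk_id"] not in index:
            index[row["chunk_id"]] = embed_fn(row["chunk_text"])
            embedded += 1
    return embedded, len(stale_ids)
```

On a re-run where one chunk changed, only that chunk is embedded again; every unchanged `chunk_id` keeps its existing vector.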

In [None]:
from cookbook.data_pipeline.build_retriever_index import build_retriever_index
from cookbook.databricks_utils import get_table_url

is_error, msg = build_retriever_index(
    # Spark requires backticks (`) to escape names with special characters; the VS client does not.
    chunked_docs_table_name=output_config.chunked_docs_table.replace("`", ""),
    vector_search_endpoint=output_config.vector_search_endpoint,
    vector_search_index_name=output_config.vector_index,

    # Must match the embedding endpoint you used to chunk your documents
    embedding_endpoint_name=chunking_config.embedding_model_endpoint,

    # Set to True to delete and re-create the vector search index when re-running the data pipeline, e.g. after changing the schema of chunked_docs_table_name, which can break syncing of the existing index. Keeping this as False lets Vector Search avoid recomputing embeddings for any row whose chunk_id was previously computed.
    force_delete_index_before_create=False,
)
if is_error:
    raise Exception(msg)
else:
    print("NOTE: This cell will complete before the vector index has finished syncing/embedding your chunks & is ready for queries!")
    print(f"View sync status here: {get_table_url(output_config.vector_index)}")


#### 🚫✏️ Print links to view the resulting tables/index

In [None]:
from cookbook.databricks_utils import get_table_url

print()
print(f"Parsed docs table: {get_table_url(output_config.parsed_docs_table)}\n")
print(f"Chunked docs table: {get_table_url(output_config.chunked_docs_table)}\n")
print(f"Vector search index: {get_table_url(output_config.vector_index)}\n")