
# 1/ Ingesting and preparing PDFs for LLM and Self-Managed Vector Search Embeddings

## In this example, we will focus on ingesting PDF documents as the source for our retrieval process.

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-0.png?raw=true" style="float: right; width: 600px; margin-left: 10px">


For this example, we will add Databricks ebook PDFs from the [Databricks resources page](https://www.databricks.com/resources) to our knowledge base.

**Note: This demo is advanced content; we strongly recommend going through the simple version first to learn the basics.**

Here are all the detailed steps:

- Use Auto Loader to load the binary PDFs into our first table.
- Use the `pypdf` library to parse the text content of the PDFs.
- Use `llama_index` (or `LangChain`) to split the texts into chunks.
- Compute embeddings for the chunks.
- Save our text chunks + embeddings in a Delta Lake table, ready for Vector Search indexing.


Lakehouse AI provides state-of-the-art solutions not only to accelerate your AI and LLM projects, but also to accelerate data ingestion and preparation at scale, including unstructured data like PDFs.

<div style="background-color: #d4f8d4; border-radius: 15px; padding: 20px; text-align: center;">
        Note: Looking for a full, production-grade guide? Make sure you check out <a target="_blank" href="https://ai-cookbook.io">Databricks ai-cookbook.io</a>!
    </div>


In [0]:
%pip install --quiet -U transformers==4.49.0 pypdf==4.1.0 langchain-text-splitters==0.2.0 databricks-vectorsearch==0.55 mlflow[databricks] tiktoken==0.7.0 torch==2.3.0 llama-index==0.10.43 markdownify==0.14.1
dbutils.library.restartPython()

In [0]:
%run ../_resources/00-init-advanced $reset_all_data=false

## Ingesting Databricks ebook PDFs and extracting their pages

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-1.png?raw=true" style="float: right" width="500px">

First, let's ingest our PDFs as a Delta Lake table with path URLs and content in binary format.

We'll use [Databricks Auto Loader](https://docs.databricks.com/en/ingestion/auto-loader/index.html) to incrementally ingest new files, making it easy to incrementally consume billions of files from the data lake in various data formats. Auto Loader ingests our unstructured PDF data in binary format.
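To make "incremental" concrete, here is a toy, stdlib-only sketch of the idea behind a checkpointed file source: only files not yet recorded in the checkpoint are returned, and the `pathGlobFilter` option is mimicked with `fnmatch`. This is an illustration under simplifying assumptions, not how Auto Loader is implemented; `discover_new_files` and the in-memory `seen` set are hypothetical stand-ins for the stream and its checkpoint.

```python
import fnmatch
from typing import Iterable, List, Set

def discover_new_files(listing: Iterable[str], seen: Set[str], glob: str = "*.pdf") -> List[str]:
    """Return files matching `glob` that are not in `seen`, and record them.

    `seen` plays the role of the stream checkpoint (hypothetical stand-in).
    """
    new_files = []
    for path in sorted(listing):
        name = path.rsplit("/", 1)[-1]  # match the glob against the file name only
        if fnmatch.fnmatch(name, glob) and path not in seen:
            new_files.append(path)
            seen.add(path)
    return new_files

checkpoint: Set[str] = set()
first = discover_new_files(["/v/a.pdf", "/v/notes.txt"], checkpoint)
second = discover_new_files(["/v/a.pdf", "/v/b.pdf"], checkpoint)
print(first, second)  # ['/v/a.pdf'] ['/v/b.pdf']
```

The second run skips `a.pdf` because it is already in the checkpoint, which is the behaviour that lets `trigger(availableNow=True)` runs pick up only new PDFs.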


In [0]:
%sql
CREATE VOLUME IF NOT EXISTS volume_databricks_documentation;

In [0]:
# Upload some example PDF files to our volume. Replace this with your own PDFs / docs.
volume_folder = f"/Volumes/{catalog}/{db}/volume_databricks_documentation"
upload_pdfs_to_volume(volume_folder+"/databricks-pdf")

# List our raw PDF docs
display(dbutils.fs.ls(volume_folder+"/databricks-pdf"))

In [0]:
df = (spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'BINARYFILE')
        .option("pathGlobFilter", "*.pdf")
        .load('dbfs:'+volume_folder+"/databricks-pdf"))

# Write the data as a Delta table
(df.writeStream
  .trigger(availableNow=True)
  .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/raw_docs')
  .table('pdf_raw').awaitTermination())

In [0]:
%sql SELECT * FROM pdf_raw LIMIT 2

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-2.png?raw=true" style="float: right" width="500px">

## Extracting our PDF content as text chunks

We need to convert the bytes of the PDF documents to text and extract chunks from their content.

This part can be tricky: PDFs are hard to work with and may contain pages saved as images, in which case we'd need OCR to extract the text.

Wrapping a parsing library in a Spark UDF makes it easy to extract the text at scale. This demo uses `pypdf`; the `unstructured` library is a richer alternative that can also run OCR.

*Note: Your cluster will need a few extra libraries that you would typically install with a cluster init script.*

<br style="clear: both">
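A cheap way to spot pages that may need OCR is to look for pages whose extracted text is empty or nearly empty, since image-only pages usually yield little or no text from a text-based extractor. The sketch below is a heuristic under that assumption; `pages_needing_ocr` and the 20-character threshold are hypothetical choices, not part of this demo.

```python
from typing import List

def pages_needing_ocr(page_texts: List[str], min_chars: int = 20) -> List[int]:
    """Return indices of pages whose extracted text is (almost) empty.

    Such pages are often scanned images; `min_chars` is an arbitrary threshold.
    """
    return [i for i, text in enumerate(page_texts)
            if len((text or "").strip()) < min_chars]

pages = ["Databricks Lakehouse overview " * 3, "", "   \n  ", "Short"]
print(pages_needing_ocr(pages))  # [1, 2, 3]
```

Pages flagged this way could be routed to an OCR-capable parser instead of being silently indexed as empty chunks.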

### Splitting our large documentation pages into smaller chunks

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/chunk-window-size.png?raw=true" style="float: right" width="700px">

In this demo, some PDFs are very large, with a lot of text.

We'll extract the content and then use the llama_index `SentenceSplitter` to ensure that each chunk isn't bigger than 500 tokens.


The right chunk size and chunk overlap depend on your use case and your PDF files.
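To see what the two parameters do, here is a simplified fixed-window splitter: each chunk holds at most `chunk_size` tokens, and consecutive chunks share `chunk_overlap` tokens so that context spanning a boundary is not lost. This is a sketch only: it treats pre-split tokens as given and ignores sentence boundaries, whereas `SentenceSplitter` also tries to keep sentences intact. `window_chunks` is a hypothetical helper.

```python
from typing import List

def window_chunks(tokens: List[str], chunk_size: int, chunk_overlap: int) -> List[List[str]]:
    """Split tokens into fixed-size windows that share `chunk_overlap` tokens."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # the last window already reaches the end
    return chunks

tokens = [f"t{i}" for i in range(12)]
for chunk in window_chunks(tokens, chunk_size=5, chunk_overlap=2):
    print(chunk)
```

With `chunk_size=5` and `chunk_overlap=2`, each window starts 3 tokens after the previous one, giving four chunks for 12 tokens; a larger overlap improves continuity at the cost of more (and more redundant) chunks to embed.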

Remember that your prompt + answer must fit within your model's maximum context window (4,096 tokens for Llama 2).
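A quick back-of-the-envelope check helps pick the number of retrieved chunks. The sketch below assumes, purely for illustration, 300 tokens for the instructions and question and 1,000 tokens reserved for the answer; with 500-token chunks and a 4,096-token window, that leaves room for five chunks. `max_chunks_in_context` is a hypothetical helper.

```python
def max_chunks_in_context(context_window: int, chunk_size: int,
                          prompt_overhead: int, answer_budget: int) -> int:
    """How many retrieved chunks of `chunk_size` tokens fit in the window."""
    available = context_window - prompt_overhead - answer_budget
    return max(0, available // chunk_size)

# Assumed budget: 300 tokens of instructions + question, 1,000 tokens for the answer.
# (4096 - 300 - 1000) // 500 = 2796 // 500 = 5
print(max_chunks_in_context(4096, 500, prompt_overhead=300, answer_budget=1000))  # 5
```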

For more details, review the previous [../01-Data-Preparation](01-Data-Preparation) notebook. 

<br/>
<br style="clear: both">
<div style="background-color: #def2ff; padding: 15px;  border-radius: 30px; ">
  <strong>Information</strong><br/>
  Remember that the following steps are specific to your dataset. This is a critical part of building a successful RAG assistant.
  <br/> Always take time to review the chunks created and ensure they make sense and contain relevant information.
</div>
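One lightweight way to review chunks is to summarise their lengths and list the suspiciously short ones (page headers, tables of contents, parsing leftovers) for manual inspection. The sketch below works on plain strings; `chunk_report` and the 50-character threshold are hypothetical, and on the real table you would first collect a sample of the `content` column.

```python
from statistics import mean
from typing import Dict, List

def chunk_report(chunks: List[str], min_chars: int = 50) -> Dict[str, object]:
    """Summarise chunk lengths (in characters) and flag suspiciously short ones."""
    lengths = [len(c) for c in chunks]
    return {
        "count": len(chunks),
        "min_len": min(lengths, default=0),
        "max_len": max(lengths, default=0),
        "mean_len": mean(lengths) if lengths else 0.0,
        "too_short": [i for i, n in enumerate(lengths) if n < min_chars],
    }

sample = ["Table of contents",
          "Databricks unifies data and AI on a single lakehouse platform. " * 2]
print(chunk_report(sample))
```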

In [0]:
import io
from pypdf import PdfReader

def parse_bytes_pypdf(raw_doc_contents_bytes: bytes) -> str:
    # Note: in a production setting, you may want to catch errors here to handle corrupt or non-PDF files
    pdf = io.BytesIO(raw_doc_contents_bytes)
    reader = PdfReader(pdf)
    # Extract the text of every page and join the pages with newlines
    parsed_content = [page_content.extract_text() for page_content in reader.pages]
    return "\n".join(parsed_content)

Let's start by extracting text from our PDF.

In [0]:
import io
import requests

# Download one example ebook and print its extracted text
with requests.get('https://dbdemos-dataset.s3.amazonaws.com/llm/databricks-pdf-documentation/Databricks-Customer-360-ebook-Final.pdf') as pdf:
  pdf.raise_for_status()
  doc = parse_bytes_pypdf(pdf.content)
  print(doc)

This looks great. We'll now wrap it with a text splitter to avoid overly large chunks, and create a pandas UDF to scale the processing across multiple nodes.
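The iterator form of a pandas UDF (`Iterator[pd.Series] -> Iterator[pd.Series]`) lets expensive setup, such as loading a tokenizer and building a splitter, run once per iterator instead of once per batch. The stdlib-only sketch below mimics that shape, with plain lists standing in for `pd.Series`; `make_splitter` is a hypothetical placeholder for the tokenizer and `SentenceSplitter` setup, and it splits on words rather than tokens.

```python
from typing import Callable, Iterator, List

setup_calls = 0  # counts how often the "expensive" setup runs

def make_splitter(max_words: int) -> Callable[[str], List[str]]:
    """Hypothetical stand-in for loading a tokenizer and building a splitter."""
    global setup_calls
    setup_calls += 1
    def split(text: str) -> List[str]:
        words = text.split()
        return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)]
    return split

def chunk_batches(batch_iter: Iterator[List[str]]) -> Iterator[List[List[str]]]:
    splitter = make_splitter(max_words=3)  # runs once, before the first batch
    for batch in batch_iter:
        yield [splitter(doc) for doc in batch]

batches = iter([["a b c d", "e f"], ["g h i j k"]])
results = list(chunk_batches(batches))
print(results)
print(setup_calls)  # 1
```

Two batches are processed, but the setup runs only once, which is why the UDF places the tokenizer and splitter creation before its `for` loop.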

*Note that our PDF text isn't clean. To improve it, we could add a few LLM-based pre-processing steps, asking the model to remove irrelevant content like the list of chapters and keep only the core text.*
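Before reaching for an LLM, a rule-based first pass can remove some of this noise cheaply. The sketch below is a swapped-in heuristic, not what this demo does: it drops lines that look like table-of-contents entries (text, dot leaders, then a page number). The regex is an assumption tuned to that single pattern and would need adjusting for your documents.

```python
import re

# Matches lines such as "Introduction ........ 3" (dot leaders + trailing page number).
TOC_LINE = re.compile(r"^.*\.{3,}\s*\d+\s*$")

def drop_toc_lines(text: str) -> str:
    """Remove lines that look like table-of-contents entries."""
    return "\n".join(line for line in text.splitlines() if not TOC_LINE.match(line))

raw = "Contents\nIntroduction ........ 3\nUse cases ..... 12\nCustomer 360 starts with data."
print(drop_toc_lines(raw))
```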

In [0]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import Document, set_global_tokenizer
from transformers import AutoTokenizer
from typing import Iterator

# Reduce the Arrow batch size as our PDFs can be big in memory (classic compute only)
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10)

@pandas_udf("array<string>")
def read_as_chunk(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Use the Llama tokenizer to count tokens, so chunk_size below is measured in tokens
    set_global_tokenizer(
      AutoTokenizer.from_pretrained("hf-internal-testing/llama-tokenizer", cache_dir="/tmp/hf_cache")
    )
    # Sentence splitter from llama_index: splits on sentences, at most 500 tokens per chunk
    splitter = SentenceSplitter(chunk_size=500, chunk_overlap=10)
    def extract_and_split(b):
      try:
        txt = parse_bytes_pypdf(b)
      except Exception as e:
        # Emit a marker chunk so failed files can be filtered out (or flagged) downstream
        txt = f'__PDF_PARSING_ERROR__ file: {e}'
        print(txt)
      if txt is None:
        return []
      nodes = splitter.get_nodes_from_documents([Document(text=txt)])
      return [n.text for n in nodes]

    # The setup above runs once per iterator; each Arrow batch is processed here
    for x in batch_iter:
        yield x.apply(extract_and_split)

## What's required for our Vector Search Index

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/databricks-vector-search-type.png?raw=true" style="float: right" width="800px">

Databricks provides multiple types of vector search indexes:

- **Managed embeddings**: you provide a text column and an embedding endpoint name, and Databricks computes the embeddings and synchronizes the index with your Delta table.
- **Self-managed embeddings**: you compute the embeddings and save them as a field of your Delta table; Databricks then synchronizes the index.
- **Direct index**: when you want to use and update the index without having a Delta table.

In this demo, we will show you how to set up a **Self-managed embeddings** index.

To do so, we first have to compute the embeddings of our chunks and save them in a Delta Lake table field as `array<float>`.
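A self-managed index only works if every stored vector has exactly the dimension declared when the index is created (1,024 for the GTE model used here). The sketch below mirrors one table row as a plain Python dataclass and validates that invariant; `ChunkRow` is hypothetical, and the `id` column is omitted because the table generates it as an identity column.

```python
from dataclasses import dataclass
from typing import List

EMBEDDING_DIM = 1024  # must match `embedding_dimension` used when creating the index

@dataclass
class ChunkRow:
    """Plain-Python mirror of one row in the self-managed embeddings table."""
    url: str
    content: str
    embedding: List[float]

    def __post_init__(self) -> None:
        if len(self.embedding) != EMBEDDING_DIM:
            raise ValueError(f"expected {EMBEDDING_DIM} floats, got {len(self.embedding)}")
        # Normalise to floats, as the ARRAY<FLOAT> column would store them
        self.embedding = [float(x) for x in self.embedding]

row = ChunkRow(url="example.pdf", content="Some chunk", embedding=[0] * EMBEDDING_DIM)
print(len(row.embedding), type(row.embedding[0]).__name__)  # 1024 float
```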

## Introducing Databricks GTE Embeddings Foundation Model endpoints

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-4.png?raw=true" style="float: right; width: 600px; margin-left: 10px">

Foundation Models are provided by Databricks and can be used out of the box.

Databricks supports several endpoint types to compute embeddings or evaluate a model:
- A **foundation model endpoint** served by Databricks (e.g., DBRX Instruct, Llama 2 70B, MPT)
- An **external endpoint**, acting as a gateway to an external model (e.g., Azure OpenAI)
- A **custom**, fine-tuned model hosted on Databricks Model Serving

Open the [Model Serving Endpoint page](/ml/endpoints) to explore and try the foundation models.

For this demo, we will use the foundation models `GTE` (embeddings) and `DBRX` (chat). <br/><br/>

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/databricks-foundation-models.png?raw=true" width="600px" >

In [0]:
from pprint import pprint
from mlflow.deployments import get_deploy_client

# The gte-large-en foundation model is available through the /serving-endpoints/databricks-gte-large-en/invocations API.
deploy_client = get_deploy_client("databricks")

## NOTE: if you change your embedding model here, make sure you change it in the query step too
embeddings = deploy_client.predict(endpoint="databricks-gte-large-en", inputs={"input": ["What is Apache Spark?"]})
pprint(embeddings)
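The UDF later in this notebook reads the vectors as `[e['embedding'] for e in response.data]`, which suggests an OpenAI-style payload: a `data` list with one entry per input. The sketch below assumes that shape for a plain dict and re-orders entries by their `index` field defensively; the payload is hand-written with 3-dimensional vectors for readability (the GTE vectors used here have 1,024 values), and `extract_embeddings` is hypothetical.

```python
from typing import Any, Dict, List

def extract_embeddings(response: Dict[str, Any]) -> List[List[float]]:
    """Pull vectors out of an OpenAI-style embeddings payload, in input order."""
    items = sorted(response["data"], key=lambda item: item.get("index", 0))
    return [item["embedding"] for item in items]

# Hand-written payload (assumed shape), not real endpoint output.
fake_response = {
    "object": "list",
    "data": [
        {"object": "embedding", "index": 1, "embedding": [0.4, 0.5, 0.6]},
        {"object": "embedding", "index": 0, "embedding": [0.1, 0.2, 0.3]},
    ],
}
print(extract_embeddings(fake_response))  # [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
```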

In [0]:
%sql
--Note that we need to enable Change Data Feed on the table to create the index
CREATE TABLE IF NOT EXISTS databricks_pdf_documentation (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  content STRING,
  embedding ARRAY <FLOAT>
) TBLPROPERTIES (delta.enableChangeDataFeed = true); 

### Computing the chunk embeddings and saving them to our Delta Table

The last step is to compute an embedding for each of our documentation chunks. Let's create a UDF that computes the embeddings using the foundation model endpoint.

*Note that this part would typically be set up as a production-grade job, running as soon as a new documentation page is updated. <br/> This could be set up as a Delta Live Tables pipeline to incrementally consume updates.*
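In such a job, the remote embedding call is the most likely point of transient failure. A minimal retry wrapper with exponential backoff, using only the standard library, could look like the sketch below; `call_with_retries`, `flaky_embed`, and the delay values are hypothetical, and the injectable `sleep` argument exists only so the behaviour can be checked without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_retries(fn: Callable[[], T], max_attempts: int = 3, base_delay: float = 1.0,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `fn`, retrying on any exception with exponentially growing delays."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")

# Usage with a fake call that fails twice before succeeding.
attempts = {"n": 0}
def flaky_embed():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient failure")
    return [0.1, 0.2]

delays = []
print(call_with_retries(flaky_embed, sleep=delays.append), delays)  # [0.1, 0.2] [1.0, 2.0]
```

In a real job you would typically retry only on errors known to be transient (timeouts, rate limits) rather than on every exception.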

In [0]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("array<float>")
def get_embedding(contents: pd.Series) -> pd.Series:
    import mlflow.deployments
    deploy_client = mlflow.deployments.get_deploy_client("databricks")
    def get_embeddings(batch):
        #Note: this will fail if an exception is thrown during embedding creation (add try/except if needed) 
        response = deploy_client.predict(endpoint="databricks-gte-large-en", inputs={"input": batch})
        return [e['embedding'] for e in response.data]

    # Splitting the contents into batches of 150 items each, since the embedding model takes at most 150 inputs per request.
    max_batch_size = 150
    batches = [contents.iloc[i:i + max_batch_size] for i in range(0, len(contents), max_batch_size)]

    # Process each batch and collect the results
    all_embeddings = []
    for batch in batches:
        all_embeddings += get_embeddings(batch.tolist())

    return pd.Series(all_embeddings)
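The batching step in the UDF above must preserve both order and length: a Series-to-Series pandas UDF has to return exactly one output per input row, in the same order. The stdlib-only sketch below isolates that slicing logic so the invariant can be checked; `batched` is a hypothetical helper using the same list comprehension as the UDF.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")

def batched(items: Sequence[T], max_batch_size: int) -> List[Sequence[T]]:
    """Split `items` into consecutive slices of at most `max_batch_size` elements."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be >= 1")
    return [items[i:i + max_batch_size] for i in range(0, len(items), max_batch_size)]

texts = [f"chunk {i}" for i in range(320)]
sizes = [len(b) for b in batched(texts, 150)]
print(sizes)  # [150, 150, 20]
```

Concatenating the per-batch results in order gives back one embedding per chunk, which is what `pd.Series(all_embeddings)` relies on.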

In [0]:
import pyspark.sql.functions as F

(spark.readStream.table('pdf_raw')
      .withColumn("content", F.explode(read_as_chunk("content")))
      .filter(~F.col("content").startswith("__PDF_PARSING_ERROR__")) # Drop PDFs with parsing errors (in production, raise or flag them instead to avoid silent failures)
      .withColumn("embedding", get_embedding("content"))
      .selectExpr('path as url', 'content', 'embedding')
  .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/pdf_chunk')
    .table('databricks_pdf_documentation').awaitTermination())

#Let's also add our documentation web page from the simple demo (make sure you run the quickstart demo first)
if spark.catalog.tableExists(f'{catalog}.{db}.databricks_documentation'):
  (spark.readStream.option("skipChangeCommits", "true").table('databricks_documentation') #skip changes for more stable demo
      .withColumn('embedding', get_embedding("content"))
      .select('url', 'content', 'embedding')
  .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/docs_chunks')
    .table('databricks_pdf_documentation').awaitTermination())

In [0]:
%sql
SELECT * FROM databricks_pdf_documentation WHERE url like '%.pdf' limit 10


### Our dataset is now ready! Let's create our Self-Managed Vector Search Index.

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-3.png?raw=true" style="float: right; width: 600px; margin-left: 10px">

Our dataset is now ready. We chunked the documentation pages into small sections, computed the embeddings, and saved them as a Delta Lake table.

Next, we'll configure Databricks Vector Search to ingest data from this table.

A Vector Search index uses a Vector Search endpoint to serve the embeddings (you can think of it as your Vector Search API endpoint). <br/>
Multiple indexes can share the same endpoint. Let's start by creating one.

In [0]:
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient()

if not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME):
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD")

wait_for_vs_endpoint_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME)
print(f"Endpoint named {VECTOR_SEARCH_ENDPOINT_NAME} is ready.")


You can view your endpoint on the [Vector Search Endpoints UI](#/setting/clusters/vector-search). Click on the endpoint name to see all indexes that are served by the endpoint.

In [0]:
from databricks.sdk import WorkspaceClient
import databricks.sdk.service.catalog as c

#The table we'd like to index
source_table_fullname = f"{catalog}.{db}.databricks_pdf_documentation"
# Where we want to store our index
vs_index_fullname = f"{catalog}.{db}.databricks_pdf_documentation_self_managed_vs_index"

if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname):
  print(f"Creating index {vs_index_fullname} on endpoint {VECTOR_SEARCH_ENDPOINT_NAME}...")
  try:
    vsc.create_delta_sync_index(
      endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
      index_name=vs_index_fullname,
      source_table_name=source_table_fullname,
      pipeline_type="TRIGGERED", #Sync needs to be manually triggered
      primary_key="id",
      embedding_dimension=1024, #Match your model embedding size (gte)
      embedding_vector_column="embedding"
    )
  except Exception as e:
    display_quota_error(e, VECTOR_SEARCH_ENDPOINT_NAME)
    raise e
  #Let's wait for the index to be ready and all our embeddings to be created and indexed
  wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
else:
  #Trigger a sync to update our vs content with the new data saved in the table
  wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
  vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname).sync()

## Searching for similar content

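That's all we have to do. Because the index uses a `TRIGGERED` pipeline, calling `sync()` picks up new entries from your Delta Lake table; with a `CONTINUOUS` pipeline, Databricks would keep the index synchronized automatically.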

Note that depending on your dataset size and model size, index creation can take a few seconds to start and index your embeddings.

Let's give it a try and search for similar content.

*Note: `similarity_search` also supports a `filters` parameter. This is useful to add a security layer to your RAG system: you can filter out sensitive content based on who is making the call (for example, filter on a specific department based on the user's profile).*
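Conceptually, a similarity search ranks stored vectors by how close they are to the query vector, and a filter restricts the candidate rows before ranking. The exact, brute-force sketch below shows that idea in plain Python with cosine similarity. It is not how the Vector Search service is implemented, and the `department` column, the equality-only filter, and the toy 3-dimensional vectors are all hypothetical.

```python
import math
from typing import Dict, List, Optional, Tuple

def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def search(query: List[float], rows: List[Dict], num_results: int = 1,
           filters: Optional[Dict[str, str]] = None) -> List[Tuple[str, float]]:
    """Exact top-k search by cosine similarity, after an equality filter on metadata."""
    filters = filters or {}
    candidates = [r for r in rows if all(r.get(k) == v for k, v in filters.items())]
    scored = [(r["url"], cosine(query, r["embedding"])) for r in candidates]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:num_results]

rows = [
    {"url": "billing.pdf", "department": "finance", "embedding": [0.9, 0.1, 0.0]},
    {"url": "spark.pdf", "department": "engineering", "embedding": [0.0, 1.0, 0.0]},
    {"url": "hr.pdf", "department": "hr", "embedding": [0.8, 0.0, 0.2]},
]
print(search([1.0, 0.0, 0.0], rows, num_results=2))
print(search([1.0, 0.0, 0.0], rows, filters={"department": "engineering"}))
```

Without a filter, the two closest documents are `billing.pdf` and `hr.pdf`; with the `department` filter, only `spark.pdf` is eligible even though it is the least similar, which is exactly how a filter can hide content from users who should not see it.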

In [0]:
question = "How can I track billing usage on my workspaces?"

response = deploy_client.predict(endpoint="databricks-gte-large-en", inputs={"input": [question]})
embeddings = [e['embedding'] for e in response.data]

results = vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname).similarity_search(
  query_vector=embeddings[0],
  columns=["url", "content"],
  num_results=1)
docs = results.get('result', {}).get('data_array', [])
pprint(docs)

## Next step: Deploy our chatbot model with RAG

We've seen how Databricks Lakehouse AI makes it easy to ingest and prepare your documents, and deploy a Self Managed Vector Search index on top of it with just a few lines of code and configuration.

This simplifies and accelerates your data projects so that you can focus on the next step: creating your real-time chatbot endpoint with well-crafted prompt augmentation.

Open the [02-Advanced-Chatbot-Chain]($./02-Advanced-Chatbot-Chain) notebook to create and deploy a chatbot endpoint.