# 2a - PDF to docs_text
* Notebook by Adam Lang
* Notebook was adopted from the Databricks webinar in June 2024 that streamed on the Databricks YouTube channel.
* This is the 2nd notebook and the first step after creating the tables within the unity catalog. 
* Date: 4/28/2025

## 1. Install dependencies

### Note about PDF Extraction Library - `PdfPlumbr`
* `PdfPlumbr` is a library that allows you to extract text, tables, and metadata from PDFs. 
* It provides a simple API to work with and is particularly good at handling complex PDF layouts.
* Overall it provides a simple and intuitive interface for extracting text, images, and layout information from PDF documents. 
* It also supports advanced features like table detection and text extraction from complex layouts.
* There are numerous other extraction libraries available such as the newer `PyMuPDF4LLM` which uses an LLM under the hood. 
* Here is a full list of options in python for [PDF extraction options](https://medium.com/@bojjasharanya/automating-pdf-data-extraction-your-ultimate-guide-for-choosing-the-suitable-library-d87a3dcf27e5)

In [0]:
%pip install Pdfplumber langchain

## then restart kernel
dbutils.library.restartPython()

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


## 2. Create Dataframe for file storage
* This will show us what files we currently have in the catalog.

In [0]:
import os 
from pyspark.sql.functions import substring_index 

# Set directory path
dir_path = "/Volumes/workspace/llm_rag_demos/pdf_rag_dbrx"

# list the files in the directory
file_paths = [file.path for file in dbutils.fs.ls(dir_path)]

# Extract file names from paths --> spark df
df = spark.createDataFrame(file_paths, "string").select(substring_index("value", "/", -1).alias("file_name"))

# Show df
df.toPandas()

Unnamed: 0,file_name
0,Washer_Manual.pdf


Summary
* We can see there is only 1 PDF document currently in the file store. 

## 3. Process new PDF Files -- Split & Chunk Text
* Important Distinctions about RecursiveCharacterTextSplitter and TokenTextSplitter
* The [ChromaDB research paper](https://research.trychroma.com/evaluating-chunking#langchain_recursive_text_splitter) took a critical look at the 2 most popular methods for chunking:

1. RecursiveCharacterTextSplitter
2. TokenTextSplitter

These 2 methods are some of the most popular chunking methods, and the default used by many RAG systems.

However, these chunking methods are insensitive to the **semantic content of the corpus**, relying instead on the position of character sequences to divide documents into chunks, up to a maximum specified length.

ChromaDB utilized the very popular implementation of the Langchain library.

Their findings from the paper above were very interesting:
  * They found that it was necessary to alter some defaults to achieve fair results
  * By default, the `RecursiveCharacterTextSplitter` uses the following separators: ["\n\n", "\n", " ", ""].
  * They found this would commonly result in very short chunks, which performed poorly in comparison to TokenTextSplitter which produces chunks of a fixed length by default.
  * Therefore, they used these additional sentence based separators: ["\n\n", "\n", ".", "?", "!", " ", ""] to improve chunk quality and size.

In [0]:
import os 
import pdfplumber
from langchain.text_splitter import RecursiveCharacterTextSplitter  

## set path of PDF volume store
pdf_volume_path = "/Volumes/workspace/llm_rag_demos/pdf_rag_dbrx"

## List PDF files already processed in the Delta Table
processed_files = spark.sql(f"SELECT DISTINCT file_name FROM workspace.llm_rag_demos.docs_track").collect()
processed_files = set(row["file_name"] for row in processed_files)

## Process ONLY the NEW PDF files
new_files = [file for file in os.listdir(pdf_volume_path) if file not in processed_files]

## init empty string to store all text extracted from NEW PDF files
all_text = ''
## loop through and extract text from each PDF file
for file_name in new_files:
  # Extract text from PDF file
  pdf_path = os.path.join(pdf_volume_path, file_name)

  with pdfplumber.open(pdf_path) as pdf:
    for pdf_page in pdf.pages:
      single_page_text = pdf_page.extract_text()
      ## separate each page's text with newline
      all_text = all_text + '\n' + single_page_text

# Split combined text into chunks using RecursiveCharacterTextSplitter
from langchain.text_splitter import RecursiveCharacterTextSplitter

## init text splitter
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, # character length
                                               chunk_overlap=200,  # Overlap
                                               length_function=len,  # Character length with len()
                                               separators=["\n\n", "\n", ".", "?", "!", " ", ""] # adding ChromaDB separators
)

## create chunks
chunks = text_splitter.split_text(all_text)




## 4. Pandas UDF to chunk text data for insertion
* We can then use a Pandas UDF when inserting our text into the docs_track table. 
* A pandas user-defined function (UDF)—also known as vectorized UDF—is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. 
* pandas UDFs allows vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.

In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

# create pandas udf function
@pandas_udf('array<string>')
def split_chunks_udf(dummy):
    return pd.Series([chunks])
  
# Register the UDF 
spark.udf.register('split_chunks_udf', split_chunks_udf)

<function __main__.split_chunks_udf(dummy)>

In [0]:
%sql

insert into workspace.llm_rag_demos.docs_text (text)
select explode(split_chunks_udf('dummy')) as text;

num_affected_rows,num_inserted_rows
295,295


Summary
* We have now created 295 rows of text chunks from the original PDF document as we can see above. 

Finally, we create a temporary table from the Dataframe.

In [0]:
## create temporary table from df
df.createOrReplaceTempView("temp_table")

# Insert only rows that do not already exist in target table
spark.sql("""
    INSERT INTO workspace.llm_rag_demos.docs_track
    SELECT * FROM temp_table
    WHERE NOT EXISTS (
      SELECT 1 FROM workspace.llm_rag_demos.docs_track
      WHERE temp_table.file_name = workspace.llm_rag_demos.docs_track.file_name
    )
          
          
          """)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

Summary
* Updated the docs_track table with the chunks we created. 
* Notice we used IF NOT EXISTS so that if the chunks do already exist we won't insert them again. 