STEP 1: Read PDFs from Unity Catalog Volumes

In [0]:
%pip install langchain
%restart_python

In [0]:

import fitz  # PyMuPDF
from pyspark.sql.functions import input_file_name
import os

# Unity Catalog volume holding the source PDFs.
volume_path = "dbfs:/Volumes/genai/default/pdf_store/"

# List the PDFs directly under the volume (dbutils.fs.ls is not recursive).
# The extension check is case-insensitive so files like "scan.PDF" are not skipped.
pdf_files = [f.path for f in dbutils.fs.ls(volume_path) if f.path.lower().endswith(".pdf")]

print(pdf_files)


STEP 2: Parse and Chunk PDFs

In [0]:
import fitz  # PyMuPDF
from io import BytesIO
import uuid
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Schema of the chunk table: one row per text chunk of a PDF.
schema = StructType([
    StructField("chunk_id", StringType(), False),
    StructField("source", StringType(), False),
    StructField("content", StringType(), True),
])

# (Re)create genai.default.pdf_chunks as an EMPTY Delta table on every run.
# This is not a one-time setup step: the ingestion loop in this cell appends,
# so overwriting here is what prevents reruns from duplicating chunks.
spark.createDataFrame([], schema).write.mode("overwrite").format("delta").saveAsTable("genai.default.pdf_chunks")

# Splitter producing chunks of at most 500 characters, with 50 characters of
# overlap between neighbouring chunks.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
)

# Function to process a PDF from Unity Catalog volume
def chunk_pdf_to_rows(dbfs_path):
    """Extract the text of one PDF and split it into chunk rows.

    Args:
        dbfs_path: Path of the PDF as listed by ``dbutils.fs.ls``
            (e.g. ``dbfs:/Volumes/genai/default/pdf_store/file.pdf``).

    Returns:
        A list of ``Row(chunk_id, source, content)``, one per chunk. Returns an
        empty list if the file cannot be read or parsed; the error is printed so
        that one bad PDF does not abort the whole ingestion.

    ``chunk_id`` is a UUIDv5 derived from the source path, the chunk's position
    and its text. Re-ingesting an unchanged PDF therefore yields the same ids,
    which lets the embedding step recognise chunks it has already embedded.
    Random ids would change on every run, because the chunk table is rebuilt
    each time.
    """
    try:
        # Read the whole file as bytes through Spark's binaryFile source.
        file_bytes = (
            spark.read.format("binaryFile")
            .load(dbfs_path)
            .select("content")
            .first()["content"]
        )

        # Use the Document as a context manager so it is closed after extraction.
        with fitz.open(stream=bytes(file_bytes), filetype="pdf") as doc:
            text = "\n".join(page.get_text() for page in doc)

        rows = []
        for index, content in enumerate(text_splitter.split_text(text)):
            chunk_key = f"{dbfs_path}#{index}:{content}"
            rows.append(
                Row(
                    chunk_id=str(uuid.uuid5(uuid.NAMESPACE_URL, chunk_key)),
                    source=dbfs_path,
                    content=content,
                )
            )
        return rows

    except Exception as e:
        # Deliberate best-effort: report and skip this file.
        print(f"⚠️ Failed to process {dbfs_path}: {e}")
        return []

# Define volume path
volume_path = "dbfs:/Volumes/genai/default/pdf_store/"

# List all PDFs (case-insensitive extension match so ".PDF" files are included).
pdf_files = [f.path for f in dbutils.fs.ls(volume_path) if f.path.lower().endswith(".pdf")]

# Chunk every PDF; files that fail to parse contribute no rows.
all_rows = [row for pdf_path in pdf_files for row in chunk_pdf_to_rows(pdf_path)]

# Write to Delta only if there is data
if all_rows:
    df = spark.createDataFrame(all_rows, schema=schema)
    df.write.mode("append").format("delta").saveAsTable("genai.default.pdf_chunks")
    print(f"✅ Successfully ingested {len(all_rows)} chunks into genai.default.pdf_chunks")
else:
    print("⚠️ No valid PDF chunks found to write.")


STEP 3: Define the Chunk Embedding Function

In [0]:
from mlflow.deployments import get_deploy_client
from pyspark.sql.functions import pandas_udf
import pandas as pd

client = get_deploy_client("databricks")
EMBEDDING_ENDPOINT_NAME = "databricks-bge-large-en"

# Define a batch embedding function for a pandas Series
def get_embeddings_batch(text_series: pd.Series) -> pd.Series:
    """Embed a batch of texts with a single call to the embedding endpoint.

    Args:
        text_series: Texts to embed, one per element.

    Returns:
        A Series of embedding vectors with the same length and index as
        ``text_series``, so it can be assigned back to the source DataFrame.

    Raises:
        ValueError: If the response has no ``data`` key, or contains a different
            number of embeddings than texts were sent.
    """
    response = client.predict(endpoint=EMBEDDING_ENDPOINT_NAME, inputs={"input": text_series.tolist()})

    # Vectors live under response["data"][i]["embedding"]; this is the same
    # shape the other embedding cells read. There is no top-level "embeddings" key.
    if "data" not in response:
        raise ValueError(f"'data' key not found in response: {response}")
    embeddings = [item["embedding"] for item in response["data"]]

    if len(embeddings) != len(text_series):
        raise ValueError(
            f"Expected {len(text_series)} embeddings, got {len(embeddings)}"
        )
    # Reuse the input index so assignment back onto a filtered frame aligns.
    return pd.Series(embeddings, index=text_series.index)


STEP 4: Add Embeddings to Delta Table

# DEBUGGING snippet: embed a couple of chunks one at a time to inspect the result.
# Pull a small batch
sample_df = spark.table("genai.default.pdf_chunks").limit(2).toPandas()

# Define a local (non-UDF) embedding function
def local_embed(text):
    """Embed a single text and return its vector.

    Prints the first 50 characters of ``text`` as a preview, sends a one-item
    request to the embedding endpoint, and returns
    ``response["data"][0]["embedding"]``.
    """
    print(f"Embedding: {text[:50]}...")  # Short preview
    response = client.predict(endpoint=EMBEDDING_ENDPOINT_NAME, inputs={"input": [text]})
    return response["data"][0]["embedding"]

sample_df["embedding"] = sample_df["content"].apply(local_embed)
print(sample_df)



In [0]:
from mlflow.deployments import get_deploy_client
import pandas as pd

client = get_deploy_client("databricks")
endpoint = "databricks-bge-large-en"

def get_embeddings_batch(texts):
    """Embed every string in ``texts`` with one request to ``endpoint``.

    Returns a list holding one embedding vector per entry of the response's
    ``data`` list, in response order.

    Raises:
        ValueError: if the response carries no ``data`` key.
    """
    result = client.predict(endpoint=endpoint, inputs={"input": texts})

    if "data" in result:
        vectors = []
        for entry in result["data"]:
            vectors.append(entry["embedding"])
        return vectors

    raise ValueError(f"'data' key not found in response: {result}")

# Read original chunks table in batches
batch_size = 100
chunks_table = "genai.default.pdf_chunks"
embedded_table = "genai.default.pdf_chunks_embedded"

# Find the chunks that still need an embedding. A left-anti join keeps only the
# rows whose chunk_id is not yet in the embedded table, so already-embedded rows
# are never sent to the driver. The explicit existence check replaces a bare
# `except:`. That would also have hidden real failures (permissions, transient
# errors) and silently caused every chunk to be re-embedded and duplicated.
pending_sdf = spark.table(chunks_table)
if spark.catalog.tableExists(embedded_table):
    embedded_ids_sdf = spark.table(embedded_table).select("chunk_id")
    pending_sdf = pending_sdf.join(embedded_ids_sdf, on="chunk_id", how="left_anti")

# Collect the pending rows once and slice them locally, instead of running an
# ORDER BY ... LIMIT ... OFFSET query (a full re-sort) for every batch.
# NOTE(review): assumes the pending chunk text fits in driver memory.
pending_df = pending_sdf.orderBy("chunk_id").toPandas()

if pending_df.empty:
    print(f"Nothing to embed: all chunks are already in {embedded_table}")

for offset in range(0, len(pending_df), batch_size):
    # .copy() yields an independent frame, so adding a column below does not
    # raise pandas' SettingWithCopyWarning.
    batch_df = pending_df.iloc[offset:offset + batch_size].copy()

    # Generate embeddings
    batch_df["embedding"] = get_embeddings_batch(batch_df["content"].tolist())

    # Persist each batch immediately so an interrupted run resumes where it stopped.
    spark.createDataFrame(batch_df).write.mode("append").format("delta").saveAsTable(embedded_table)

    print(f"✅ Embedded and stored batch offset {offset} ({len(batch_df)} rows)")
