📂 Option 1: Download to /tmp/ (local storage)
### 1. Download sample PDFs from GitHub

In [0]:
%sh
# Fetch the sample PDF into local storage under /tmp/pdfs.
# The URL is quoted so the shell never treats "?raw=true" as a glob pattern.
mkdir -p /tmp/pdfs
wget -O /tmp/pdfs/sample1.pdf "https://github.com/mozilla/pdf.js-sample-files/blob/master/tracemonkey.pdf?raw=true"
#wget -O /tmp/pdfs/sample2.pdf tracemonkey.pdf


### 2. Verify Files Exist

In [0]:
import os

# Confirm the download landed: list everything under the PDF folder.
os.listdir(path="/tmp/pdfs")

In [0]:
%sh
# Long listing of the download folder with human-readable file sizes.
ls -lh /tmp/pdfs

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.

In [0]:
%pip install --quiet PyPDF2 sentence-transformers faiss-cpu pandas databricks-vectorsearch


In [0]:
%restart_python

#### ⚡ Rule of Thumb:
If Spark can’t guess the schema → you must tell it explicitly.

### 1. Re-create the page-level DataFrame (your working patch)

In [0]:
# %python
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
import PyPDF2

# Location of the PDF to ingest -- point this at another file if needed.
pdf_path = "/tmp/pdfs/sample1.pdf"
reader = PyPDF2.PdfReader(pdf_path)

# One (filename, page label, text) tuple per page that yields any text;
# pages whose extraction comes back empty/None are skipped.
# Page labels are 1-based: "page_1", "page_2", ...
page_chunks = [
    (os.path.basename(pdf_path), f"page_{page_no}", page_text)
    for page_no, page_text in (
        (number, pdf_page.extract_text())
        for number, pdf_page in enumerate(reader.pages, start=1)
    )
    if page_text
]

# Explicit schema: three nullable string columns.
schema = StructType(
    [StructField(column, StringType(), True) for column in ("filename", "page", "content")]
)

# Build the page-level Spark DataFrame and preview the first rows.
page_df = spark.createDataFrame(page_chunks, schema=schema)
display(page_df.limit(5))


### 2. Persist page-level DataFrame into a Delta (Bronze) table

In [0]:
# %python
# Keep the demo tables together in their own database (optional).
spark.sql("CREATE DATABASE IF NOT EXISTS demo_docs")

# Bronze layer: overwrite the page-level Delta table on every run.
(
    page_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("demo_docs.pages_bronze")
)
print("Saved demo_docs.pages_bronze")

#spark.sql("DESCRIBE DETAIL demo_docs").select("location").show()

### 3. Chunk text strings into smaller pieces (sliding window / overlap)
We'll chunk by approximate word count. You can change chunk_size and overlap to tune retrieval granularity.

> [!NOTE]
> **What is RDD `flatMap`?**<br>
> An RDD `flatMap` in Apache Spark is a transformation that applies a function to each element of a Resilient Distributed Dataset (RDD) and then flattens the results into a single RDD, effectively performing a one-to-many mapping. Unlike `map`, which returns exactly one output for each input, `flatMap` can return zero, one, or more elements per input, making it ideal for scenarios like splitting a line of text into individual words. (Source: DataFlair, "Apache Spark Map vs FlatMap Operation".)

In [0]:
# %python
from pyspark.sql import Row
import math 
import pandas as pd

# 1. Collect Spark DF -> Pandas.
#    toPandas() materialises the whole page table as one pandas DataFrame,
#    so this step is only appropriate for modest data sizes.
page_pdf = page_df.toPandas()

# 2. Define chunking function (same as before)
def chunk_text_words(text, chunk_size=200, overlap=50):
    """Split text into overlapping, word-based chunks (sliding window).

    Args:
        text: Input string. ``None``, empty, or whitespace-only text yields no chunks.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared between adjacent chunks; must satisfy
            ``0 <= overlap < chunk_size`` so the window always moves forward.

    Returns:
        A list of chunk strings, each made of whitespace-normalised words
        joined by single spaces. The list is empty when the text has no words.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` would prevent the window
            from advancing (previously this looped forever).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split() if text else []
    if not words:
        # Whitespace-only input used to produce a single empty chunk ("").
        return []

    n = len(words)
    step = chunk_size - overlap  # how far the window start moves each time
    chunks = []
    for start in range(0, n, step):
        end = start + chunk_size
        chunks.append(" ".join(words[start:end]))
        if end >= n:
            # The window already reached the last word; a further chunk would
            # only repeat the tail.
            break
    return chunks

# 3. Apply chunking: one record per (page, chunk); chunk ids restart at 1 on every page
chunk_records = [
    {
        "filename": page_row["filename"],
        "page": page_row["page"],
        "chunk_id": position,
        "content": piece,
    }
    for _, page_row in page_pdf.iterrows()
    for position, piece in enumerate(
        chunk_text_words(page_row["content"], chunk_size=200, overlap=50),
        start=1,
    )
]

# 4. Convert back to Spark (schema is inferred from the record dicts)
chunks_df = spark.createDataFrame(chunk_records)

# 5. Save into Delta, replacing any previous run
(
    chunks_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("demo_docs.chunks_bronze")
)

display(chunks_df.limit(10))



# # Chunking function (word-based, sliding-window)
# def chunk_text_words(text, chunk_size=200, overlap=50):
#     """
#     chunk_size = number of words per chunk
#     overlap = number of words to overlap between adjacent chunks
#     """
#     if not text:
#         return []
#     words = text.split()
#     n = len(words)
#     if n <= chunk_size:
#         return [" ".join(words)]
#     chunks = []
#     start = 0
#     while start < n:
#         end = start + chunk_size
#         chunk = " ".join(words[start:min(end, n)])
#         chunks.append(chunk)
#         if end >= n:
#             break
#         start = end - overlap
#     return chunks


# # Schema for the output
# chunk_schema = StructType([
#     StructField("filename", StringType(), True),
#     StructField("page", StringType(), True),
#     StructField("chunk_id", IntegerType(), True),
#     StructField("content", StringType(), True)
# ])

# # Function to apply per batch (pandas df -> pandas df)
# def chunk_pages(pdf: pd.DataFrame) -> pd.DataFrame:
#     records = []
#     for _, row in pdf.iterrows():
#         chunks = chunk_text_words(row["content"], chunk_size=200, overlap=50)
#         for idx, c in enumerate(chunks):
#             records.append({
#                 "filename": row["filename"],
#                 "page": row["page"],
#                 "chunk_id": idx + 1,
#                 "content": c
#             })
#     return pd.DataFrame(records)

# # Apply mapInPandas
# chunks_df = page_df.mapInPandas(chunk_pages, schema=chunk_schema)

# # Save to Delta
# chunks_df.write.format("delta").mode("overwrite").saveAsTable("demo_docs.chunks_bronze")

# display(chunks_df.limit(10))

# # Convert page_df -> chunk rows using RDD flatMap
# # def page_to_chunks(row):
# #     # row: (filename, page, content)
# #     chunks = chunk_text_words(row['content'], chunk_size=200, overlap=50)
# #     out = []
# #     for idx, c in enumerate(chunks):
# #         out.append(Row(filename=row['filename'],
# #                        page=row['page'],
# #                        chunk_id=idx+1,
# #                        content=c))
# #     return out

# # rdd_chunks = page_df.rdd.flatMap(page_to_chunks)
# # chunks_df = spark.createDataFrame(rdd_chunks)

# # # Persist chunk table (bronze -> silver in a real pipeline)
# # chunks_df.write.format("delta").mode("overwrite").saveAsTable("demo_docs.chunks_bronze")
# # print("Saved demo_docs.chunks_bronze")
# # display(chunks_df.limit(10))



### 4. Create embeddings for each chunk (using Sentence-Transformers)

For Free Edition we’ll use an in-session model (all-MiniLM-L6-v2) from sentence-transformers. This is small and fast. We will convert the chunk DataFrame to pandas (for small datasets) and compute embeddings in batches.

In [0]:
# %python
from sentence_transformers import SentenceTransformer
import numpy as np
import pandas as pd
import math
from pyspark.sql import SparkSession

# Small embedding model that runs inside the notebook session
model = SentenceTransformer("all-MiniLM-L6-v2")

# Bring the chunk table into pandas (ONLY for modest data sizes).
# For larger datasets, use mapInPandas or handle in streaming/batches.
chunks_pd = chunks_df.select("filename","page","chunk_id","content").toPandas()
print("Number of chunks:", len(chunks_pd))

# Encode batch by batch to keep memory bounded, then stack into one matrix
batch_size = 64
chunk_texts = chunks_pd["content"]
batch_embeddings = [
    model.encode(
        chunk_texts.iloc[offset:offset + batch_size].tolist(),
        show_progress_bar=False,
        convert_to_numpy=True,
    )
    for offset in range(0, len(chunks_pd), batch_size)
]
embeddings = np.vstack(batch_embeddings)  # shape: (num_chunks, dim)
print("Embeddings shape:", embeddings.shape)

# vector_id is the row position, so it lines up with the row order of `embeddings`
chunks_pd["vector_id"] = np.arange(len(chunks_pd))
chunks_pd["embedding"] = embeddings.tolist()

# Back to Spark (the embedding column becomes an array column) and persist
emb_spark = spark.createDataFrame(chunks_pd)
(
    emb_spark.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("demo_docs.chunks_with_embeddings")
)
print("Saved demo_docs.chunks_with_embeddings")
display(emb_spark.limit(5))
# spark.sql("DESCRIBE DETAIL demo_docs").select("location").show()


### 5. Build a local FAISS index (for similarity search in Free Edition)
We save the FAISS index file to /tmp/ and keep a mapping table (vector_id → chunk metadata).

In [0]:
# %python
import faiss
import numpy as np
from pathlib import Path

# Reuse the in-memory embeddings from the previous step, cast to float32 for FAISS
# (assumes the whole matrix is small enough to index at once)
vecs = embeddings.astype(np.float32)
d = vecs.shape[1]

# Flat L2 index = exact search over every vector; for large data use IndexIVFFlat or HNSW
index = faiss.IndexFlatL2(d)
index.add(vecs)
print("FAISS index size:", index.ntotal)

# Write the index under /tmp (ephemeral storage)
faiss_file = "/tmp/faiss_chunks.index"
faiss.write_index(index, faiss_file)
print("FAISS index saved to", faiss_file)

# Persist the lookup table vector_id -> (filename, page, chunk_id, content)
mapping_columns = ["vector_id","filename","page","chunk_id","content"]
map_pd = chunks_pd[mapping_columns]
map_spark = spark.createDataFrame(map_pd)
(
    map_spark.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("demo_docs.vectorid_to_chunk")
)
print("Saved demo_docs.vectorid_to_chunk")


### 6. Example: Run similarity search (query → embeddings → FAISS lookup → get chunk content)

In [0]:
# %python
# Query text
query_text = "Explain Trace Trees"   # replace with user question

# Embed the query with the same model used for the chunks; cast to float32 for FAISS
q_emb = model.encode([query_text]).astype(np.float32)

# Load index from file (just to demonstrate persist/load)
index = faiss.read_index("/tmp/faiss_chunks.index")

# Search top_k
top_k = 5
D, I = index.search(q_emb, top_k)   # I holds positions in the indexed vectors array
print("Distances:", D)
print("Indices:", I)

# Positions equal vector_id by construction. FAISS fills the result with -1
# when the index holds fewer than top_k vectors, so drop those placeholders.
vector_ids = [int(v) for v in I[0] if v >= 0]

# Fetch metadata + content for the matched vector_ids from the Spark table.
# "1 = 0" keeps the statement valid (and returns no rows) when nothing matched.
vector_ids_str = ",".join(str(v) for v in vector_ids)
id_filter = f"vector_id IN ({vector_ids_str})" if vector_ids else "1 = 0"
sql = f"SELECT * FROM demo_docs.vectorid_to_chunk WHERE {id_filter}"
hits = spark.sql(sql).toPandas()

# SQL returns rows in arbitrary order; restore the FAISS ranking (best match first)
rank_of = {vid: rank for rank, vid in enumerate(vector_ids)}
hits = (
    hits.assign(_rank=hits["vector_id"].map(rank_of))
    .sort_values("_rank")
    .drop(columns="_rank")
    .reset_index(drop=True)
)
print(hits[["vector_id","filename","page","chunk_id","content"]].to_dict(orient="records"))


### 7. (Optional) Persist the FAISS index file outside the cluster

Because /tmp is **ephemeral**, you may want to download the index to your laptop, or upload to a GitHub release / cloud storage via curl/aws s3 cp etc. For Free Edition without cloud credentials, download via browser: expose the file via a tiny HTTP server (not recommended for security) or copy file content and save locally. Simpler: re-create indexing steps whenever you restart.

## ✅ Good Practices & Notes

- **Chunk size / overlap**  
  Typical chunk sizes: **150–500 words** with overlap **20–100 words**.  
  Tune these depending on your LLM’s context window and retrieval quality.

- **Batch embeddings**  
  Generate embeddings in batches to avoid memory issues.  
  Example: `batch_size = 64`.

- **Large data**  
  For large corpora:  
  - Compute embeddings in worker nodes (e.g., `mapInPandas` or distributed UDFs).  
  - Write embeddings incrementally to Delta for scalability.

- **Vector index type**  
  - For small demos: use **`IndexFlatL2`** (exact search).  
  - For larger corpora: use **IVF** or **HNSW** indexes to reduce memory usage and improve retrieval speed.

- **Storage**  
  Save embeddings and mapping as **Delta tables** so they can be queried with SQL.  
  This ensures a **single source of truth**.

- **Free Edition constraints**  
  - Everything in `/tmp` is **ephemeral** → re-run steps after cluster restarts.  
  - **Databricks Vector Search** (managed indexes) and **Unity Catalog** may not be available.  
    → Use **FAISS** as a local alternative.

- **Downstream usage**  
  After retrieving candidate chunks:  
  - Pass top-k chunks (or a concatenated/summary version) into your **LLM / QA model** (RAG pipeline).  
  - For higher precision, consider **reranking** using a cross-encoder.


In [0]:
from dataclasses import dataclass
from typing import List, Optional
from databricks.vector_search.client import VectorSearchClient

# ---- Structured return type ----
@dataclass
class SearchResult:
    """Structured record for a single vector-search hit."""
    id: str  # identifier of the matched entry
    score: float  # similarity score reported for the match
    content: str  # text content of the matched entry
    metadata: Optional[dict] = None  # additional fields for the hit, if any

# ---- Search wrapper ----
def run_search(
    client: VectorSearchClient,
    index_name: str,
    query: str,
    k: int = 5,
    columns: Optional[List[str]] = None,
    endpoint_name: Optional[str] = None,
    id_column: str = "id",
    content_column: str = "content",
) -> List[SearchResult]:
    """
    Run semantic vector search against a Databricks Vector Search Index.

    The query is sent as text (``query_text``), so the index must be one that
    computes embeddings itself; NOTE(review): an index with self-managed
    embeddings needs a numeric ``query_vector`` instead -- confirm which kind
    of index this is used with.

    Args:
        client: Databricks VectorSearchClient instance.
        index_name: Name of the vector search index.
        query: Natural language query string.
        k: Top-k results to return.
        columns: Index columns to return for each hit. Defaults to
            ``[id_column, content_column]``.
        endpoint_name: Vector search endpoint hosting the index, if the
            installed client version requires it.
        id_column: Name of the column used to fill ``SearchResult.id``.
        content_column: Name of the column used to fill ``SearchResult.content``.

    Returns:
        List of SearchResult objects with id, score, content, and metadata
        (metadata holds the returned columns for the row, without the score).
    """
    # Pass the names by keyword: the first positional slot of get_index is the
    # endpoint name, not the index name.
    index = client.get_index(endpoint_name=endpoint_name, index_name=index_name)

    requested_columns = list(columns) if columns else [id_column, content_column]
    response = index.similarity_search(
        query_text=query,
        columns=requested_columns,
        num_results=k,
    )

    # The response carries column names in the manifest and the rows as plain
    # lists in result.data_array; the similarity score is returned as an extra
    # "score" column.
    manifest_columns = (response.get("manifest") or {}).get("columns") or []
    column_names = [column["name"] for column in manifest_columns]
    rows = (response.get("result") or {}).get("data_array") or []

    hits = []
    for row in rows:
        record = dict(zip(column_names, row))
        score = record.pop("score", None)
        hits.append(
            SearchResult(
                id=str(record.get(id_column, "")),
                score=float(score) if score is not None else float("nan"),
                content=record.get(content_column) or "",
                metadata=record,
            )
        )
    return hits


In [0]:
# After FAISS search: ids in ranking order (best match first).
# FAISS fills the result with -1 when fewer vectors than requested exist; skip those.
vector_ids = [int(v) for v in I[0] if v >= 0]

# Build the id list in this cell instead of relying on a variable left over
# from an earlier cell.
vector_ids_str = ",".join(str(v) for v in vector_ids)

if vector_ids:
    id_filter = f"vector_id IN ({vector_ids_str})"
    # Preserve the FAISS ranking: map each id to its rank with a CASE
    # expression (used here in place of a MySQL-style FIELD() ordering).
    order_expr = "CASE"
    for rank, vid in enumerate(vector_ids):
        order_expr += f" WHEN vector_id = {vid} THEN {rank}"
    order_expr += " END"
else:
    # Nothing matched: keep the statement valid and return no rows.
    id_filter = "1 = 0"
    order_expr = "vector_id"

sql = f"""
SELECT vector_id, filename, page, chunk_id, content
FROM demo_docs.vectorid_to_chunk
WHERE {id_filter}
ORDER BY {order_expr}
"""
hits = spark.sql(sql).toPandas()

#############################################################

from dataclasses import dataclass

@dataclass
class SearchHit:
    """One retrieved chunk, as shown to the user in the final answer."""
    filename: str  # source PDF file name
    page: str  # page label such as "page_1" (a string, not a page number)
    chunk_id: int  # 1-based position of the chunk within its page
    content: str  # chunk text

# Wrap each row of the hits frame in a SearchHit, keeping the frame's row order
hit_fields = ("filename", "page", "chunk_id", "content")
hit_objects = [
    SearchHit(**{name: hit_row[name] for name in hit_fields})
    for _, hit_row in hits.iterrows()
]

#######################################################
def build_answer(hits):
    """Format retrieved hits as a numbered, plain-text answer.

    Args:
        hits: Iterable of objects exposing ``filename``, ``page``,
            ``chunk_id`` and ``content`` attributes.

    Returns:
        A string made of a fixed header line followed by one numbered entry
        (starting at 1) per hit, each entry ending with a blank line.
    """
    header = "Here are the relevant details from the documents:\n\n"
    entries = [
        f"{number}. File: {item.filename}, Page: {item.page}, Chunk: {item.chunk_id}\n"
        f"Content: {item.content}\n\n"
        for number, item in enumerate(hits, start=1)
    ]
    return header + "".join(entries)

# Render the retrieved hits as a single plain-text answer and print it.
nl_answer = build_answer(hit_objects)

#print(type(nl_answer))
print(nl_answer)


Here are the relevant details from the documents:

1. File: recipes.pdf, Page: 12, Chunk: 3
Content: To make a spicy chicken curry, start with...

2. File: cooking_tips.pdf, Page: 7, Chunk: 1
Content: The best way to balance heat and flavor in your curry is...