In [0]:
%pip install pypdf databricks-vectorsearch
%pip install sentence-transformers
dbutils.library.restartPython()  # Restart Python to apply changes


In [0]:
import os
import torch
import pandas as pd
from pypdf import PdfReader
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from databricks.vector_search.client import VectorSearchClient
from transformers import AutoModel, AutoTokenizer, pipeline


In [0]:
# Location of the source PDF that the rest of the notebook extracts, chunks and embeds.
# NOTE(review): this is a user-specific absolute workspace path; point it at your own copy before running.
pdf_path = "/Workspace/Users/shantanu@meteoros.in/Mantis_shrimp_facts.pdf"  # path to your Mantis Shrimp facts PDF

def extract_text_from_pdf(pdf_path):
    """Return the text of every page of a PDF, joined with newlines.

    Pages whose extraction yields nothing (``None`` or an empty string) are
    skipped, so the result never contains blank entries for them.

    Args:
        pdf_path: Path of the PDF file, passed straight to ``PdfReader``.

    Returns:
        A single string with the text of the non-empty pages separated by
        ``"\\n"``; an empty string if no page produced any text.
    """
    reader = PdfReader(pdf_path)
    # Extract each page exactly once. Calling extract_text() both in the
    # filter and for the value would parse every page twice.
    page_texts = (page.extract_text() for page in reader.pages)
    return "\n".join(page_text for page_text in page_texts if page_text)

# Pull the raw text out of the configured PDF.
document_text = extract_text_from_pdf(pdf_path)

# Sanity check: show only the leading characters rather than the whole document.
preview_length = 1000
print(document_text[:preview_length])


In [0]:
def chunk_text(text, chunk_size=512):
    """Split text into chunks of whole sentences of roughly ``chunk_size`` characters.

    The text is split on ``". "``. Sentences are accumulated into the current
    chunk while ``len(chunk) + len(sentence) < chunk_size``; otherwise the
    current chunk is closed and the sentence starts a new one. A single
    sentence that is itself longer than ``chunk_size`` is kept whole as its
    own (oversized) chunk.

    Args:
        text: The document text to split.
        chunk_size: Target upper bound, in characters, for a chunk.

    Returns:
        A list of non-empty, stripped chunk strings. Empty or whitespace-only
        input yields an empty list.
    """
    sentences = text.split(". ")
    last_index = len(sentences) - 1
    chunks = []
    chunk = ""

    for index, sentence in enumerate(sentences):
        # split() consumed the ". " after every piece except the last one, so
        # only those pieces get it back. Re-adding it to the final piece would
        # produce a doubled period ("B..") or a dangling " ." at the end.
        piece = sentence if index == last_index else sentence + ". "

        if len(chunk) + len(sentence) < chunk_size:
            chunk += piece
        else:
            # Skip the still-empty accumulator: this happens when the very
            # first sentence is already too long, and used to emit "".
            if chunk.strip():
                chunks.append(chunk.strip())
            chunk = piece

    if chunk.strip():
        chunks.append(chunk.strip())

    return chunks

# Apply chunking
chunks = chunk_text(document_text, chunk_size=512)

# Display chunk information: the first three chunks plus the last one.
# Slicing (instead of indexing chunks[1], chunks[2]) keeps this cell from
# raising IndexError on short documents, and the last chunk is only shown
# separately when it is not already among the first three.
print(f"Total Chunks: {len(chunks)}")
sample_chunks = chunks[:3]
if len(chunks) > 3:
    sample_chunks = sample_chunks + [chunks[-1]]

for position, sample in enumerate(sample_chunks):
    if position:
        print()  # blank line between samples
    print("Sample Chunk:\n", sample)


In [0]:
# Drop empty / whitespace-only chunks (e.g. an empty first chunk produced by the chunker).
# Filtering instead of slicing off chunks[0] makes this cell safe to leave enabled
# and safe to re-run: it never discards a chunk that actually contains text.
chunks = [chunk for chunk in chunks if chunk.strip()]

In [0]:
# Switch the session to hive_metastore before writing (the original note calls this the default catalog).
spark.sql("USE hive_metastore")

# One row per chunk: (position of the chunk in `chunks`, chunk text).
chunk_rows = list(enumerate(chunks))
df = spark.createDataFrame(chunk_rows, ["id", "text"])

# Persist as the Delta table `rag_chunks`, replacing the contents of any previous run.
chunk_writer = df.write.format("delta").mode("overwrite")
chunk_writer.saveAsTable("rag_chunks")

print("Document chunks stored in Delta Table (hive_metastore.rag_chunks)")


In [0]:
# Also persist the chunks as a Delta table written to a storage path
# (a plain path-based Delta write, not a Delta Live Tables pipeline).

# Get the active Spark session, or create one named "RAG".
session_builder = SparkSession.builder.appName("RAG")
spark = session_builder.getOrCreate()

# One row per chunk: (position of the chunk in `chunks`, chunk text).
chunk_rows = list(enumerate(chunks))
df = spark.createDataFrame(chunk_rows, ["id", "text"])

# Destination path of the Delta table.
delta_table_path = "/mnt/rag_delta_table"

# Overwrite whatever an earlier run left at that path.
path_writer = df.write.format("delta").mode("overwrite")
path_writer.save(delta_table_path)
print("Text chunks stored in Delta Table.")


In [0]:
from sentence_transformers import SentenceTransformer

# Load Hugging Face embedding model
# `embedding_model` holds only the model identifier string; `model` is the loaded
# SentenceTransformer object that the embedding and retrieval cells call `.encode()` on.
embedding_model = "BAAI/bge-large-en"
model = SentenceTransformer(embedding_model)

def generate_embedding(text):
    """Encode ``text`` with the notebook-level ``model`` and return the vector as a plain list.

    Args:
        text: The input handed to ``model.encode``.

    Returns:
        The result of ``model.encode(text)`` converted with ``.tolist()``.
    """
    vector = model.encode(text)
    return vector.tolist()

# Generate embeddings for all chunks in one batched call. Passing the whole list
# to `model.encode` lets the library batch the work instead of running one
# forward pass per chunk; `.tolist()` turns the result into plain Python lists
# so Spark can infer an array column from them.
chunk_vectors = model.encode(chunks).tolist()

# Pair every vector with the position of its chunk; this is the same `id`
# that the chunk table uses, so the two tables can be joined on it.
embeddings = list(enumerate(chunk_vectors))

# Convert to Spark DataFrame
embeddings_df = spark.createDataFrame(embeddings, ["id", "embedding"])

# Save embeddings to Delta Table in hive_metastore
embeddings_df.write.format("delta").mode("overwrite").saveAsTable("rag_embeddings")

print("Embeddings stored in Delta Table (hive_metastore.rag_embeddings)")


In [0]:
from pyspark.sql.functions import col
import numpy as np

def cosine_similarity(a, b):
    """Return the cosine similarity of two equal-length numeric vectors.

    Args:
        a: First vector (any sequence or array of numbers).
        b: Second vector, same length as ``a``.

    Returns:
        ``dot(a, b) / (||a|| * ||b||)`` as a Python float. If either vector
        has zero norm the similarity is undefined; ``0.0`` is returned instead
        of NaN so that such a row sorts as "not similar" rather than
        corrupting a ranking.
    """
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return float(np.dot(a, b) / denominator)

# Define the query and embed it with the same model used for the chunks.
query_text = "What does mantis shrimp eat?"
query_embedding = model.encode(query_text).tolist()

# Load embeddings from Delta Table
df_embeddings = spark.sql("SELECT * FROM rag_embeddings").toPandas()

# Compute similarity scores between the query and every stored chunk embedding.
df_embeddings["similarity"] = df_embeddings["embedding"].apply(
    lambda x: cosine_similarity(query_embedding, x)
)

# Get top 3 most relevant chunks, best match first.
top_chunks = df_embeddings.sort_values(by="similarity", ascending=False).head(3)

# Retrieve the corresponding text chunks. An inner merge with `top_chunks` on the
# left keeps the left key order, so the texts come back ranked by similarity.
# (Filtering the chunk table with `isin` would return them in table order and
# silently drop the ranking.)
retrieved_chunks = spark.sql("SELECT * FROM rag_chunks").toPandas()
ranked_chunks = top_chunks[["id"]].merge(
    retrieved_chunks[["id", "text"]], on="id", how="inner"
)
retrieved_texts = ranked_chunks["text"].tolist()

print("🔎 Retrieved Chunks:")
for text in retrieved_texts:
    print('******')
    print(text)


In [0]:
from transformers import pipeline

# Load Hugging Face LLM
llm_pipeline = pipeline("text-generation", model="facebook/opt-1.3b")
# very small model-> facebook/opt-6.7b is better but may crash the kernel

# Build the prompt from the retrieved context followed by the question.
context = "\n".join(retrieved_texts)
prompt = f"Context: {context}\n\nQuestion: {query_text}\nAnswer:"

# `max_new_tokens` limits only the generated continuation. `max_length` would also
# count the prompt tokens, and a prompt holding several retrieved chunks can use
# up that budget on its own, leaving no room for an answer.
response = llm_pipeline(prompt, max_new_tokens=150, num_return_sequences=1)
print("💡 AI Response:\n", response[0]["generated_text"])


In [0]:
# Re-generate a response from the current `retrieved_texts` / `query_text`
# without reloading the model.
context = "\n".join(retrieved_texts)
prompt = f"Context: {context}\n\nQuestion: {query_text}\nAnswer:"

# `max_new_tokens` limits only the generated continuation. `max_length` would also
# count the prompt tokens, and a prompt holding several retrieved chunks can use
# up that budget on its own, leaving no room for an answer.
response = llm_pipeline(prompt, max_new_tokens=150, num_return_sequences=1)
print("💡 AI Response:\n", response[0]["generated_text"])

The code below is for high-level understanding only and WILL NOT EXECUTE as written.

In [0]:
# WARNING: FOLLOWING CODE WILL NOT WORK AND THROW ERROR DUE TO UNITY CATALOG/PERMISSIONS 

# Part 1- building embeddings and vectors

import mlflow
from databricks.vector_search.client import VectorSearchClient
from pyspark.sql.functions import col

# Initialize Databricks Vector Search Client
# NOTE(review): constructed with no arguments; presumably it picks up workspace
# credentials from the notebook context — confirm.
vsc = VectorSearchClient()

# Define Unity Catalog schema and table
# These three values are placeholders and must be replaced with real names.
catalog_name = "your_catalog"
schema_name = "your_schema"
vector_index_name = "your_vector_index"

# Load Databricks Model for Embeddings
# The URI is a placeholder for a registered MLflow model.
# Note: this rebinds `embedding_model`, which earlier in the notebook held the
# Hugging Face model identifier string.
embedding_model_uri = "models:/your_embedding_model/production"
embedding_model = mlflow.pyfunc.load_model(embedding_model_uri)

def generate_embeddings(texts):
    """Embed each text with the notebook-level MLflow ``embedding_model``.

    Every text is sent to ``embedding_model.predict`` as a one-element list
    and the first element of the prediction is kept.

    Args:
        texts: Iterable of input texts.

    Returns:
        A list with one embedding per input text, in input order.
    """
    vectors = []
    for text in texts:
        prediction = embedding_model.predict([text])
        vectors.append(prediction[0])
    return vectors

# Reuse the Vector Search index if it can be fetched; on any failure, try to create it.
# NOTE(review): the get_index / create_index arguments are illustrative —
# confirm them against the VectorSearchClient API before adapting this code.
try:
    vsc.get_index(catalog_name, schema_name, vector_index_name)
except Exception:
    vsc.create_index(catalog_name, schema_name, vector_index_name, dimension=768)

# Read the source texts from Unity Catalog.
source_table = f"{catalog_name}.{schema_name}.your_text_table"
df = spark.read.table(source_table)

# Bring the text column to the driver and embed every row.
text_data = [row[0] for row in df.select("text_column").collect()]
embeddings = generate_embeddings(text_data)

# Assemble (id, text, embedding) rows, where id is the row's position in `text_data`.
vector_rows = [
    (row_id, text, embedding)
    for row_id, (text, embedding) in enumerate(zip(text_data, embeddings))
]
vector_df = spark.createDataFrame(vector_rows, ["id", "text", "embedding"])

# Overwrite the Delta table that carries the same name as the index.
target_table = f"{catalog_name}.{schema_name}.{vector_index_name}"
vector_df.write.format("delta").mode("overwrite").saveAsTable(target_table)

print("Embeddings stored in Unity Catalog and Databricks Vector Search.")


In [0]:
# WARNING: FOLLOWING CODE WILL NOT WORK AND THROW ERROR DUE TO UNITY CATALOG/PERMISSIONS

# Part 2- RAG 

# Load a Databricks-hosted LLM model for RAG
# The URI is a placeholder for a registered MLflow model; `llm_model.predict`
# is what `generate_response` calls with the assembled prompt.
llm_model_uri = "models:/your_llm_model/production"
llm_model = mlflow.pyfunc.load_model(llm_model_uri)

def retrieve_relevant_chunks(query, top_k=5):
    """Look up the text chunks closest to ``query`` through the vector search client.

    The query is embedded with the notebook-level ``embedding_model`` and the
    resulting vector is passed to ``vsc.search`` together with the configured
    catalog, schema and index names.

    Args:
        query: The question to search for.
        top_k: Number of results requested from the search call.

    Returns:
        A list with the ``"text"`` field of each search result, in the order
        the search returned them.
    """
    prediction = embedding_model.predict([query])
    query_embedding = prediction[0]

    results = vsc.search(
        catalog_name,
        schema_name,
        vector_index_name,
        query_embedding,
        top_k=top_k,
    )

    texts = []
    for row in results:
        texts.append(row["text"])
    return texts

def generate_response(query):
    """Answer ``query`` with the LLM, grounded in the retrieved chunks.

    The chunks returned by ``retrieve_relevant_chunks`` are joined with
    newlines into a context section, the question is appended, and the prompt
    is sent to ``llm_model.predict`` as a one-element list.

    Args:
        query: The user's question.

    Returns:
        The first element of the model's prediction.
    """
    context = "\n".join(retrieve_relevant_chunks(query))
    prompt = f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"
    predictions = llm_model.predict([prompt])
    return predictions[0]

# Example usage: run the retrieve-then-generate flow for one question and print the answer.
# Note: this rebinds the notebook-level `response` variable used by the earlier generation cells.
query = "What is a Mantis Shrimp?"
response = generate_response(query)
print("Generated Response:", response)
