In [0]:
import os
from PyPDF2 import PdfReader

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Obtain the Spark session; getOrCreate() reuses an already-running session
# when one exists instead of building a second one.
spark = SparkSession.builder.getOrCreate()

# Select catalog `main` and schema `default` for this session. The table
# written later in this notebook is saved under an unqualified name, so these
# two statements presumably decide where it lands — verify before changing.
spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA default")

# Path of the single document this notebook ingests (read with plain open(),
# so it must be a local filesystem path).
# NOTE(review): "/dbfs/..." looks like the DBFS FUSE mount — confirm.
file_path =  "/dbfs/FileStore/final_report.txt"

# Helper function for reading different file formats with error handling
def read_file(file_path):
    """Read a document from disk and return its text content.

    The reader is picked from the file extension, matched case-insensitively:
    ``.txt`` is read as UTF-8 text, ``.pdf`` is read page by page through
    ``PdfReader``, and ``.docx`` is read paragraph by paragraph through
    ``Document``.

    Args:
        file_path: Path (str) of the file to read.

    Returns:
        str: The extracted text. PDF pages that yield no text contribute an
        empty string; blank DOCX paragraphs are skipped and the remaining
        ones are joined with single spaces.

    Raises:
        IOError: If the extension is unsupported or anything goes wrong while
            reading. The original exception is attached as ``__cause__``.
    """
    try:
        print(f"Reading file: {file_path}")
        # Match on a lower-cased copy so "REPORT.PDF" is treated like "report.pdf".
        lowered_path = file_path.lower()
        if lowered_path.endswith(".txt"):
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            print(f"Read {len(content)} characters from TXT file.")
            return content
        elif lowered_path.endswith(".pdf"):
            reader = PdfReader(file_path)
            # extract_text() may return None for a page; treat that as empty.
            text = "".join(page.extract_text() or "" for page in reader.pages)
            print(f"Extracted {len(text)} characters from PDF file.")
            return text
        elif lowered_path.endswith(".docx"):
            # NOTE(review): `Document` is not imported in any visible cell;
            # presumably python-docx's `docx.Document` — confirm and add the
            # import to the import cell. Until then this branch fails with a
            # NameError, which surfaces as the IOError below.
            doc = Document(file_path)
            text = " ".join(para.text.strip() for para in doc.paragraphs if para.text.strip())
            print(f"Extracted {len(text)} characters from DOCX file.")
            return text
        else:
            raise ValueError(f"Unsupported file format: {file_path}")
    except Exception as e:
        # Chain explicitly so the root cause stays visible in the traceback.
        raise IOError(f"Error reading file {file_path}: {e}") from e

# Character-based chunking with error handling
def character_based_chunking(text, chunk_size=2000, overlap=100):
    """Split ``text`` into fixed-size character windows that overlap.

    Each chunk holds up to ``chunk_size`` characters and starts
    ``chunk_size - overlap`` characters after the previous one, so consecutive
    chunks share ``overlap`` characters. Chunking stops as soon as a chunk
    reaches the end of the text, so no trailing chunk made only of
    already-emitted overlap characters is produced.

    Args:
        text: The string to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Number of characters shared by neighbouring chunks; must
            satisfy ``0 <= overlap < chunk_size`` (otherwise the window could
            never advance).

    Returns:
        list[str]: The chunks in order; an empty list for empty ``text``.

    Raises:
        ValueError: If the arguments are invalid or chunking fails for any
            other reason (the original error is attached as ``__cause__``).
    """
    try:
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if not 0 <= overlap < chunk_size:
            raise ValueError(
                f"overlap must satisfy 0 <= overlap < chunk_size, got overlap={overlap}, chunk_size={chunk_size}"
            )

        chunks = []
        text_length = len(text)
        step = chunk_size - overlap  # strictly positive thanks to the checks above
        start = 0

        while start < text_length:
            end = start + chunk_size
            chunks.append(text[start:end])
            if end >= text_length:
                # This chunk already covers the tail; another window would
                # only repeat the overlap region.
                break
            start += step

        print(f"Created {len(chunks)} chunks.")
        return chunks
    except Exception as e:
        raise ValueError(f"Error in chunking the text: {e}") from e

# Process the single file and save chunks into a table with error handling
def process_single_file(file_path, chunk_size=2000, overlap=100, table_name="character_chunks_group1"):
    """Read one document, chunk it, and store the chunks in a Delta table.

    The text is read with ``read_file`` and split with
    ``character_based_chunking``. The chunks are written to ``table_name``
    (overwriting any existing table and its schema), and Change Data Feed is
    then enabled on that same table.

    Args:
        file_path: Path of the document to ingest.
        chunk_size: Maximum characters per chunk, forwarded to the chunker.
        overlap: Characters shared between neighbouring chunks, forwarded to
            the chunker.
        table_name: Target table identifier. The same identifier is used for
            the write and for the ALTER TABLE statement, so it may be
            unqualified (resolved against the session's current catalog and
            schema) or fully qualified.

    Returns:
        None. Progress and the final status are printed.

    Note:
        Any exception is caught and reported with ``print`` rather than
        re-raised, so callers never see an error from this function.
    """
    try:
        text = read_file(file_path)

        if not text.strip():
            print(f"The file '{file_path}' is empty or contains only whitespace.")
            return

        chunks = character_based_chunking(text, chunk_size=chunk_size, overlap=overlap)

        if not chunks:
            print("No chunks were created from the file.")
            return

        # One row per chunk; chunk ids are 1-based.
        chunk_data = [
            {'chunk_id': chunk_id, 'chunk_text': chunk}
            for chunk_id, chunk in enumerate(chunks, start=1)
        ]
        num_chunks = len(chunk_data)

        # Explicit schema: both columns are declared non-nullable.
        schema = StructType([
            StructField("chunk_id", IntegerType(), False),
            StructField("chunk_text", StringType(), False)
        ])

        df_chunks = spark.createDataFrame(chunk_data, schema=schema)

        # overwriteSchema lets the overwrite succeed even when an older table
        # with a different schema already exists under this name.
        df_chunks.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name)

        # Address the table exactly as saveAsTable did. Hard-coding the
        # catalog/schema and back-quoting the name here would target a
        # different table whenever table_name is qualified or the session's
        # current catalog/schema is not main.default.
        spark.sql(f"ALTER TABLE {table_name} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

        print(f"Total chunks created: {num_chunks}")
        print(f"Delta table '{table_name}' for file '{file_path}' created successfully with {num_chunks} chunks.")

    except Exception as e:
        print(f"Failed to process file '{file_path}': {e}")

# Example usage: Process the single file
if __name__ == "__main__":
    try:
        # Run the pipeline once; chunk_size, overlap and table_name keep
        # their default values.
        process_single_file(file_path)
    except ValueError as input_error:
        print(f"Input Error: {input_error}")
    except Exception as unexpected_error:
        print(f"An unexpected error occurred: {unexpected_error}")


Reading file: /dbfs/FileStore/final_report.txt
Read 3576 characters from TXT file.
Created 2 chunks.
Total chunks created: 2
Delta table 'character_chunks_group1' for file '/dbfs/FileStore/final_report.txt' created successfully with 2 chunks.


In [0]:
import os

# Workspace base URL, built from the cluster's Spark configuration.
host = "https://" + spark.conf.get("spark.databricks.workspaceUrl")

# SECURITY: never hardcode a personal access token in the notebook. The token
# must already be present in the environment, e.g. via a cluster environment
# variable, or by assigning the result of dbutils.secrets.get(...) to
# os.environ["DATABRICKS_TOKEN"] before this cell runs. Any token that was
# ever committed in notebook source should be revoked.
if not os.environ.get("DATABRICKS_TOKEN"):
    raise RuntimeError(
        "DATABRICKS_TOKEN is not set. Provide it through a cluster environment "
        "variable or a Databricks secret instead of hardcoding it in the notebook."
    )

# Fully qualified name of the vector search index and the endpoint serving it.
index_name = "main.default.vsi_char_group123"
VECTOR_SEARCH_ENDPOINT_NAME = "pavankumar"

In [0]:
from databricks.vector_search.client import VectorSearchClient
from langchain_community.vectorstores import DatabricksVectorSearch
from langchain_community.embeddings import DatabricksEmbeddings

# Embedding client handed to the vector store in get_retriever(); it targets
# the serving endpoint named below.
# NOTE(review): the endpoint name looks truncated — the Databricks-hosted BGE
# model is usually exposed as "databricks-bge-large-en". Confirm whether
# "databricks-bge-large-e" is a custom endpoint or a typo.
embedding_model = DatabricksEmbeddings(endpoint="databricks-bge-large-e")

def get_retriever():
    """Build a LangChain retriever on top of the configured vector search index.

    Exports the workspace URL as ``DATABRICKS_HOST``, connects to Vector
    Search with the token found in ``DATABRICKS_TOKEN``, looks up
    ``index_name`` on ``VECTOR_SEARCH_ENDPOINT_NAME`` and wraps the index in a
    ``DatabricksVectorSearch`` store that reads the ``chunk_text`` column.

    Returns:
        The object returned by the store's ``as_retriever()``.

    Raises:
        KeyError: If ``DATABRICKS_TOKEN`` is missing from the environment.
    """
    os.environ["DATABRICKS_HOST"] = host

    client = VectorSearchClient(
        workspace_url=host,
        personal_access_token=os.environ["DATABRICKS_TOKEN"],
    )
    index = client.get_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
        index_name=index_name,
    )

    return DatabricksVectorSearch(
        index,
        text_column="chunk_text",
        embedding=embedding_model,
    ).as_retriever()

In [0]:
from langchain.chains import RetrievalQA
from langchain_community.vectorstores import DatabricksVectorSearch
from langchain_community.embeddings import DatabricksEmbeddings
import os

# Workspace base URL, built from the cluster's Spark configuration.
host = "https://" + spark.conf.get("spark.databricks.workspaceUrl")

# SECURITY: never hardcode a personal access token in the notebook. The token
# must already be present in the environment, e.g. via a cluster environment
# variable, or by assigning the result of dbutils.secrets.get(...) to
# os.environ["DATABRICKS_TOKEN"] before this cell runs. Any token that was
# ever committed in notebook source should be revoked.
if not os.environ.get("DATABRICKS_TOKEN"):
    raise RuntimeError(
        "DATABRICKS_TOKEN is not set. Provide it through a cluster environment "
        "variable or a Databricks secret instead of hardcoding it in the notebook."
    )

# Fully qualified name of the vector search index and the endpoint serving it.
index_name = "main.default.vsi_char_group123"
VECTOR_SEARCH_ENDPOINT_NAME = "pavankumar"

def get_retriever():
    """Return a retriever backed by the configured vector search index.

    This redefines the ``get_retriever`` declared in an earlier cell with the
    same steps. ``VectorSearchClient`` and ``embedding_model`` are not defined
    in this cell, so that earlier cell must have been executed first.
    """
    # Export the workspace URL through the environment before connecting.
    os.environ["DATABRICKS_HOST"] = host
    search_index = VectorSearchClient(
        workspace_url=host,
        personal_access_token=os.environ["DATABRICKS_TOKEN"],
    ).get_index(
        endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
        index_name=index_name,
    )
    store = DatabricksVectorSearch(
        search_index,
        text_column="chunk_text",
        embedding=embedding_model,
    )
    return store.as_retriever()
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatDatabricks

# Chat model client for the "databricks-dbrx-instruct" serving endpoint;
# max_tokens=200 limits the length of each generated answer.
chat_model = ChatDatabricks(endpoint="databricks-dbrx-instruct", max_tokens = 200)

# Prompt sent to the chat model. {context} and {question} are the two
# placeholders declared as input variables on the PromptTemplate below.
# NOTE(review): the prompt refers to "PDF documents" while the ingestion cell
# in this notebook loads a .txt file — confirm the wording is intended.
TEMPLATE = """You are an assistant specialized in police investigations. You are answering questions related to police reports, criminal activities, suspects, and investigations based on the information provided in the PDF documents. 
If the question is not related to these topics, kindly decline to answer. 
If you don't know the answer, just say that you don't know, don't try to make up an answer. 
If the question appears to be for a report or document you don't have data on, say so. 
Provide all answers only in English, and ensure the responses are concise, relevant, and factual.
Use the following pieces of context to answer the question at the end:
{context}
Question: {question}
Answer:
"""
prompt = PromptTemplate(template=TEMPLATE, input_variables=["context", "question"])

# Retrieval-augmented QA chain. get_retriever() is called once, here, so the
# vector search connection is established when this cell runs.
# NOTE(review): chain_type="stuff" presumably places all retrieved chunks
# into the single {context} slot — confirm against the LangChain docs.
chain = RetrievalQA.from_chain_type(
    llm=chat_model,
    chain_type="stuff",
    retriever=get_retriever(),
    chain_type_kwargs={"prompt": prompt}
)

[NOTICE] Using a Personal Authentication Token (PAT). Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True to VectorSearchClient().




In [0]:

# Read the user's question from the notebook widget named "query".
question = dbutils.widgets.get("query")

# Chain.run is deprecated in LangChain; invoke() takes the input under
# RetrievalQA's "query" key and returns a dict whose "result" entry holds the
# answer text that run() used to return directly.
answer = chain.invoke({"query": question})["result"]

# End the notebook run and hand the answer back to whoever invoked it.
dbutils.notebook.exit(answer)