In [None]:
import os

import chromadb
import fitz  # PyMuPDF
import google.generativeai as genai
import nltk
from IPython.display import Markdown
from nltk.tokenize import sent_tokenize

nltk.download('punkt')

# Step 1: Extract text from the PDF
def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page in a PDF.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF.

    Returns:
        A single string containing the text of all pages, in page order.
    """
    # Open the document as a context manager so the file handle is released
    # even if text extraction fails part-way through.
    with fitz.open(pdf_path) as doc:
        # Join once at the end instead of growing a string page by page.
        return "".join(page.get_text() for page in doc)

# Load the full text of the source PDF; every later step works on this string.
pdf_text = extract_text_from_pdf('dataset/best 55 places.pdf')

# Step 2: Improved text splitting
def split_text(text, max_chunk_size=1000):
    """Split text into chunks made of whole sentences.

    Sentences are packed greedily: a sentence joins the current chunk as long
    as the chunk (sentences joined by single spaces) stays within
    ``max_chunk_size`` characters.

    Args:
        text: The text to split.
        max_chunk_size: Upper bound on the length of a chunk, in characters.
            A single sentence longer than this bound is kept intact and
            becomes a chunk of its own.

    Returns:
        A list of non-empty, whitespace-stripped chunks in document order.
        Empty input yields an empty list.
    """
    chunks = []
    pending = []      # sentences collected for the chunk being built
    pending_len = 0   # length of " ".join(pending)

    for sentence in sent_tokenize(text):
        # A separating space is only needed when the chunk already has content.
        separator = 1 if pending else 0
        if pending_len + separator + len(sentence) <= max_chunk_size:
            pending.append(sentence)
            pending_len += separator + len(sentence)
            continue

        # The sentence does not fit: flush what we have. The guard prevents an
        # empty chunk from being emitted when an oversized sentence arrives
        # while nothing is pending (e.g. the very first sentence).
        if pending:
            chunks.append(" ".join(pending).strip())
        pending = [sentence]
        pending_len = len(sentence)

    if pending:
        chunks.append(" ".join(pending).strip())

    # Drop anything that became empty after stripping.
    return [chunk for chunk in chunks if chunk]

# Chunk the extracted PDF text using split_text's default max_chunk_size.
text_chunks = split_text(pdf_text)

# Step 3: Data Cleaning and Validation
def clean_text(text):
    """Replace every newline and carriage return with a space, then trim.

    Each line-break character becomes exactly one space (runs are not
    collapsed); leading and trailing whitespace is removed afterwards.
    """
    line_breaks_to_spaces = str.maketrans("\n\r", "  ")
    return text.translate(line_breaks_to_spaces).strip()

# Normalise line breaks in every chunk before it is embedded and stored.
cleaned_chunks = list(map(clean_text, text_chunks))

# Step 4: Configure Google Gemini API
# Read the key from the environment so the secret never lives in the source.
# NOTE(review): a key was previously hard-coded on this line; treat it as
# exposed and revoke it.
API_KEY = os.environ.get('GOOGLE_API_KEY')
if not API_KEY:
    raise RuntimeError(
        "Set the GOOGLE_API_KEY environment variable to your Gemini API key."
    )
genai.configure(api_key=API_KEY)

class GeminiEmbeddingFunction:
    """Callable that embeds content with the Gemini embedding model.

    Instances are handed to ChromaDB as the collection's embedding function.
    """

    def __call__(self, input):
        """Embed ``input`` and return the ``'embedding'`` entry of the response.

        Raises:
            KeyError: If the API response has no ``'embedding'`` key.
        """
        response = genai.embed_content(
            model='models/text-embedding-004',
            content=input,
            task_type="retrieval_document",
        )
        if 'embedding' not in response:
            raise KeyError(f"'embedding' key not found in response: {response}")
        return response['embedding']

# Step 5: Store embeddings in ChromaDB
def create_chroma_db(documents, name):
    """Build a fresh ChromaDB collection holding ``documents``.

    Any existing collection called ``name`` is deleted first, so the returned
    collection contains only the documents passed in. Each document is stored
    under its position in ``documents``, converted to a string, as its id.

    Args:
        documents: Iterable of text chunks to store.
        name: Name of the collection to (re)create.

    Returns:
        The newly created collection.
    """
    client = chromadb.Client()

    # Drop a stale collection of the same name; a ValueError here is taken to
    # mean there was nothing to drop.
    try:
        client.get_collection(name=name)
        client.delete_collection(name=name)
    except ValueError:
        pass

    collection = client.create_collection(
        name=name,
        embedding_function=GeminiEmbeddingFunction(),
    )

    # Documents are added one at a time, each with its index as the id.
    for position, document in enumerate(documents):
        collection.add(documents=[document], ids=[str(position)])

    return collection

# Embed and store the cleaned chunks; `db` is the collection queried below.
db = create_chroma_db(cleaned_chunks, "egypt_places_chromadb")

# Step 6: Prompt Refinement
def test_prompt(prompt, function, *args):
    try:
        response = function(*args)
        return response
    except Exception as e:
        print(f"Error testing prompt: {e}")
        return None

# Helper that requests an embedding for a piece of text from the Gemini API
def generate_embedding(text):
    """Request an embedding for ``text`` and return it.

    Raises:
        KeyError: If the API response has no ``'embedding'`` key.
    """
    request = {
        "model": 'models/text-embedding-004',
        "content": text,
        "task_type": "retrieval_document",
    }
    response = genai.embed_content(**request)

    has_embedding = 'embedding' in response
    if not has_embedding:
        raise KeyError(f"'embedding' key not found in response: {response}")
    return response['embedding']

# Querying and response generation functions
def get_relevant_passage(query, db):
    """Return the top-ranked stored document for ``query``.

    Asks ``db`` for a single result and returns the first document listed for
    the first (and only) query text.
    """
    documents_per_query = db.query(query_texts=[query], n_results=1)['documents']
    hits_for_query = documents_per_query[0]
    return hits_for_query[0]

# Continuous Testing and Refinement
def continuous_testing_and_refinement(db):
    """Exercise the embedding and retrieval helpers once per prompt type.

    For each prompt type the matching helper is run through ``test_prompt``
    with a fixed sample subject. If the result is falsy the call is repeated
    once under a "refined" prompt label. Progress and results are printed.

    Args:
        db: Collection passed to ``get_relevant_passage`` for the non-embedding
            prompt types.
    """
    subject = "Pyramids in Egypt"

    # Prompt templates, keyed by the kind of operation they label.
    prompts = {
        "embedding": "Generate an embedding for the following text: {text}",
        "querying": "Find the most relevant information about {query}.",
        "response": "Provide a detailed response for the query: {query}."
    }

    for prompt_type, prompt in prompts.items():
        print(f"Testing {prompt_type} prompt...")

        # Choose the helper and its arguments once; the retry reuses them.
        if prompt_type == "embedding":
            target, call_args = generate_embedding, (subject,)
        else:
            target, call_args = get_relevant_passage, (subject, db)

        result = test_prompt(prompt, target, *call_args)

        if not result:
            # Single retry under a refined prompt label.
            refined_prompt = f"Refined {prompt_type} prompt text here: {subject}"
            result = test_prompt(refined_prompt, target, *call_args)

        print(f"Result for {prompt_type} prompt: {result}")

# Run the testing-and-refinement pass against the collection built above;
# it prints one result line per prompt type.
continuous_testing_and_refinement(db)

# Step 7: Query the Database
# Fetch the single best-matching chunk for a sample query and wrap it in a
# Markdown object (presumably displayed by the notebook as the cell's final
# expression).
query = "Pyramids in Egypt"
passage = get_relevant_passage(query, db)
Markdown(passage)


[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\MN\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
