# Loading PDF file

In [1]:
import streamlit as st
import os
# Copy the key from Streamlit's secrets store into the process environment;
# later cells look it up with os.getenv("GEMINI_API_KEY").
os.environ["GEMINI_API_KEY"] = st.secrets["GEMINI_API_KEY"]

In [2]:
from pypdf import PdfReader

def load_pdf(file_path):
    """
    Reads the text content from a PDF file and returns it as a single string.

    Parameters:
    - file_path (str): The file path to the PDF file.

    Returns:
    - str: The text of every page, concatenated in page order with no
      separator added between pages. A page that yields no text contributes
      an empty string.

    Raises:
    - FileNotFoundError: If the specified file_path does not exist.
    - pypdf.errors.PdfReadError (or a subclass): If the PDF cannot be read,
      e.g. because it is malformed or encrypted.

    Example:
    >>> pdf_text = load_pdf("example.pdf")
    >>> print(pdf_text)
    "This is the text content extracted from the PDF file."
    """
    reader = PdfReader(file_path)

    # Build the result with a single join instead of repeated `+=`, and fall
    # back to "" so a page without extractable text cannot break the join.
    return "".join(page.extract_text() or "" for page in reader.pages)

# Extract the full text of the source document once, right after defining the loader.
text = load_pdf(
    "/home/shuaib/Desktop/School/AI_programming/Gemini_RAG/data/state_of_the_union.pdf"
)

In [3]:
# The previous cell already parsed this exact PDF into `text`; reuse that
# result instead of reading and parsing the same file a second time.
pdf_text = text

# Splitting the text

In [4]:
import re

# Function to split the text into chunks
def _trailing_overlap(sentences, overlap, room):
    """Return the longest run of trailing sentences whose space-joined length fits in min(overlap, room)."""
    budget = min(overlap, room)
    tail = []
    used = 0
    for sentence in reversed(sentences):
        # Every sentence after the first one in the tail also costs one joining space.
        cost = len(sentence) + (1 if tail else 0)
        if used + cost > budget:
            break
        tail.append(sentence)
        used += cost
    tail.reverse()
    return tail


def split_text(text: str, chunk_size: int = 10000, chunk_overlap: int = 500) -> list:
    """
    Splits a text string into chunks along sentence boundaries.

    Sentences are detected by whitespace that follows '.', '!' or '?'. They are
    packed greedily, joined by a single space, into chunks of at most
    `chunk_size` characters. When a chunk is closed, the next chunk starts with
    as many of its trailing sentences as fit into `chunk_overlap` characters
    (and still leave room for the next sentence), so context carries over.

    Parameters:
    - text (str): The input text to be split.
    - chunk_size (int): The maximum size of each chunk in characters (default is 10000).
    - chunk_overlap (int): The maximum number of characters repeated from the end of
      one chunk at the start of the next (default is 500). Overlap is made of whole
      sentences only; use 0 to disable it.

    Returns:
    - List[str]: The chunks in document order. Empty or whitespace-only input gives
      an empty list. A single sentence longer than `chunk_size` is kept whole as its
      own chunk rather than being cut mid-sentence.

    Raises:
    - ValueError: If `chunk_size` is not positive or `chunk_overlap` is negative.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if chunk_overlap < 0:
        raise ValueError("chunk_overlap must not be negative")

    # Split on whitespace that follows sentence-ending punctuation and drop empty pieces.
    sentences = [s for s in re.split(r'(?<=[.!?])\s+', text.strip()) if s]

    chunks = []
    current = []      # sentences of the chunk being built
    current_len = 0   # len(" ".join(current))

    for sentence in sentences:
        # Only close a chunk that actually holds something; this avoids emitting
        # an empty chunk when the very first sentence is already too long.
        if current and current_len + 1 + len(sentence) > chunk_size:
            chunks.append(" ".join(current))
            # Seed the next chunk with trailing context, but never so much that
            # the incoming sentence would no longer fit.
            current = _trailing_overlap(current, chunk_overlap, chunk_size - len(sentence) - 1)
            current_len = len(" ".join(current))
        current_len += len(sentence) + (1 if current else 0)
        current.append(sentence)

    # Flush whatever is left as the final chunk.
    if current:
        chunks.append(" ".join(current))

    return chunks


In [5]:
# Break the extracted PDF text into sentence-aligned chunks using the default sizes.
chunked_text = split_text(text=pdf_text)

# Embedding the text

In [6]:
import google.generativeai as genai
from chromadb import Documents, EmbeddingFunction, Embeddings
import os

class GeminiEmbeddingFunction(EmbeddingFunction):
    """
    Chroma embedding function that delegates to the Gemini embedding API.

    Instances are callable: Chroma hands them a batch of documents and gets
    the corresponding embeddings back.

    Parameters (of the call):
    - input (Documents): The documents to embed.

    Returns:
    - Embeddings: The "embedding" entry of the API response for the given documents.

    Raises:
    - ValueError: If the GEMINI_API_KEY environment variable is unset or empty.

    Example:
    >>> embed = GeminiEmbeddingFunction()
    >>> vectors = embed(Documents(["Document 1", "Document 2", "Document 3"]))
    """
    def __call__(self, input: Documents) -> Embeddings:
        # The key is read from the environment on every call.
        api_key = os.getenv("GEMINI_API_KEY")
        if api_key:
            genai.configure(api_key=api_key)
            response = genai.embed_content(
                model="models/embedding-001",
                content=input,
                task_type="retrieval_document",
                title="Custom query",
            )
            return response["embedding"]
        raise ValueError("Gemini API Key not provided. Please provide GEMINI_API_KEY as an environment variable")


# Storing vectors into DB

In [7]:
import chromadb
from typing import List
def create_chroma_db(documents:List, path:str, name:str):
    """
    Creates a new Chroma collection and stores the provided documents in it.

    Parameters:
    - documents: An iterable of document strings. A single bare string is treated
      as one document (it is not iterated character by character).
    - path (str): The path where the Chroma database will be stored.
    - name (str): The name of the collection within the Chroma database.

    Returns:
    - Tuple[chromadb.Collection, str]: A tuple containing the created Chroma Collection and its name.

    Raises:
    - Whatever `create_collection` raises when `name` already exists at `path`;
      the recorded notebook output shows a UniqueConstraintError for that case.
    """
    # Iterating a str yields characters, which would store (and embed) one
    # "document" per character; wrap it so it is stored as a single document.
    if isinstance(documents, str):
        documents = [documents]

    chroma_client = chromadb.PersistentClient(path=path)
    db = chroma_client.create_collection(name=name, embedding_function=GeminiEmbeddingFunction())

    # Documents are added one at a time; the id is the document's position as a string.
    for i, d in enumerate(documents):
        db.add(documents=d, ids=str(i))

    return db, name

# NOTE: this call only succeeds while "rag_experiment" does not yet exist at this
# path; the recorded output below shows it failing with UniqueConstraintError.
db, name = create_chroma_db(
    documents=chunked_text,
    path="/home/shuaib/Desktop/School/AI_programming/RAG/content",
    name="rag_experiment",
)

UniqueConstraintError: Collection rag_experiment already exists

In [22]:
import chromadb

# Function to create or load ChromaDB and store vectors
def create_or_load_chroma_db(documents, path: str, name: str):
    """
    Creates or loads a Chroma database collection using the provided documents, path, and collection name.
    If the collection already exists, it is loaded as-is and `documents` is ignored;
    otherwise a new collection is created and populated.

    Parameters:
    - documents: An iterable of document strings, used only when a new collection is
      created. A single bare string is treated as one document.
    - path (str): The path where the Chroma database will be stored or accessed.
    - name (str): The name of the collection within the Chroma database.

    Returns:
    - Tuple[chromadb.Collection, str]: A tuple containing the Chroma Collection and its name.
    """
    # Initialize ChromaDB client with the specified path
    chroma_client = chromadb.PersistentClient(path=path)

    # Depending on the installed chromadb release, list_collections() yields either
    # Collection objects or plain collection names; accept both.
    collection_names = [
        getattr(collection, "name", collection)
        for collection in chroma_client.list_collections()
    ]

    if name in collection_names:
        st.write(f"Loading existing collection: {name}")
        db = chroma_client.get_collection(name=name, embedding_function=GeminiEmbeddingFunction())
    else:
        st.write(f"Creating new collection: {name}")
        db = chroma_client.create_collection(name=name, embedding_function=GeminiEmbeddingFunction())

        # Iterating a str yields characters; wrap it so the whole string is
        # stored as one document instead of one document per character.
        if isinstance(documents, str):
            documents = [documents]

        # Documents are added one at a time; the id is the position as a string.
        for i, d in enumerate(documents):
            db.add(documents=d, ids=str(i))

    return db, name


In [23]:
# Pass the list of chunks, not the raw `text` string: the function iterates over
# `documents`, and iterating a string would store one entry per character.
db, name = create_or_load_chroma_db(
    documents=chunked_text,
    path="/home/shuaib/Desktop/School/AI_programming/RAG/content",
    name="rag_experiment",
)

2024-09-04 10:57:03.519 
  command:

    streamlit run /home/shuaib/.local/lib/python3.10/site-packages/ipykernel_launcher.py [ARGUMENTS]


# Retrieval

In [24]:
def get_relevant_passage(query: str, db, n_results: int) -> list:
    """
    Retrieves the passages most relevant to the query from a Chroma collection.

    Parameters:
    - query (str): The search query to find the relevant passages.
    - db: The Chroma database collection to query.
    - n_results (int): The number of results to request from the query.

    Returns:
    - list: The documents returned for `query` (the first entry of the result's
      "documents" field, since a single query text is sent). An empty list is
      returned when the result holds no documents, so the return type is always
      a list.
    """
    # One query text is submitted, so only the first result group is relevant.
    results = db.query(query_texts=[query], n_results=n_results)

    groups = results.get("documents") or []
    if not groups or not groups[0]:
        return []

    return groups[0]


In [25]:
# Try the retriever on a sample query, asking for the three closest passages.
relevant_text = get_relevant_passage(query="Sanctions on Russia", db=db, n_results=3)

Number of requested results 3 is greater than number of elements in index 1, updating n_results = 1


# Generation

In [26]:
import google.generativeai as genai
import os

def generate_response(query: str, relevant_passages: list) -> str:
    """
    Generates a detailed response to a query based on provided relevant passages using the Gemini AI model.

    Parameters:
    - query (str): The question or query that needs to be answered.
    - relevant_passages (list): Passages to use as context. A single string is
      accepted as well and is treated as one passage.

    Returns:
    - str: The text of the response generated by the Gemini AI model.

    Raises:
    - ValueError: If the GEMINI_API_KEY environment variable is unset or empty.
    """
    # Retrieve the Gemini API key from environment variables
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        raise ValueError("Gemini API Key not provided. Please provide GEMINI_API_KEY as an environment variable")

    # Configure the generative AI model with the provided API key
    genai.configure(api_key=gemini_api_key)
    model = genai.GenerativeModel('gemini-pro')

    # str.join over a bare string would interleave the separator between its
    # characters, so wrap a single passage in a list first.
    if isinstance(relevant_passages, str):
        relevant_passages = [relevant_passages] if relevant_passages else []

    # Combine relevant passages into a single context block
    context = "\n\n".join(relevant_passages)

    # Create a prompt template that includes the context and the query
    prompt_template = f"""
    Answer the question as detailed as possible from the provided context,
    making sure to provide all the details. If the answer is not in the provided context, 
    just say, "The answer is not available in the context."
    
    Context:
    {context}
    
    Question:
    {query}
    
    Answer:
    """

    # Generate the content using the AI model based on the prompt template
    answer = model.generate_content(prompt_template)

    return answer.text


# Bringing it all together

In [28]:

def generate_answer(db, query, n_results: int = 3):
    """
    Answers a query end to end: retrieve relevant passages, then generate a response.

    Parameters:
    - db: The Chroma database collection to search.
    - query (str): The question to answer.
    - n_results (int): How many passages to retrieve as context (default is 3).

    Returns:
    - The value returned by generate_response for the query and the retrieved passages.
    """
    # Retrieve the top `n_results` relevant text chunks
    relevant_text = get_relevant_passage(query, db, n_results=n_results)
    answer = generate_response(query, relevant_text)

    return answer

In [29]:
# Pass the list of chunks, not the raw `text` string: the function iterates over
# `documents`, and iterating a string would store one entry per character.
db, _ = create_or_load_chroma_db(
    documents=chunked_text,
    path="/home/shuaib/Desktop/School/AI_programming/Gemini_RAG/content",
    name="rag_experiment",
)
answer = generate_answer(db, query="Sanctions on Russia")
print(answer)
