<a href="https://colab.research.google.com/github/Vinit-source/Python-RAG-Pipeline-for-PDF-Analysis/blob/main/code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -q pypdf langchain langchain_community sentence-transformers faiss-cpu requests

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/313.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.2/313.2 kB[0m [31m11.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m70.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.3/31.3 MB[0m [31m69.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.2/45.2 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m139.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m85.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import os
from getpass import getpass

# SECURITY: never hard-code an API key in a notebook. A key that has been
# committed or shared must be treated as leaked and revoked/rotated.
# Use the key already present in the environment; otherwise prompt for it
# without echoing, so it does not end up in the saved cell output.
if not os.environ.get("GEMINI_API_KEY"):
    os.environ["GEMINI_API_KEY"] = getpass("Enter your Gemini API key: ").strip()

In [None]:
import os
import requests
import sys

# --- Dependency Check ---
# Import the third-party packages up front so that a missing dependency yields
# one clear, actionable message instead of a traceback later in the run.
try:
    from pypdf import PdfReader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    # Import the integrations from `langchain_community` (installed by the
    # notebook's pip cell) rather than the deprecated `langchain.embeddings` /
    # `langchain.vectorstores` re-exports.
    from langchain_community.embeddings import HuggingFaceEmbeddings
    from langchain_community.vectorstores import FAISS
    from sentence_transformers import SentenceTransformer
except ImportError as e:
    print("--- DEPENDENCY ERROR ---")
    # `e.name` can be None for some import failures; fall back to the message.
    print(f"A required library is missing: {e.name or e}")
    print("Please install all required packages by running:")
    print("pip install pypdf langchain langchain_community sentence-transformers faiss-cpu requests")
    print("------------------------\n")
    sys.exit(1)


# --- Configuration ---
# 1. Set your PDF path.
# IMPORTANT: Replace this with the actual path to your PDF file.
PDF_PATH = "Knowledge is Strength.pdf"
# Folder used to save/load the local FAISS vector store between runs.
VECTOR_STORE_PATH = "faiss_index"
# Folder used to save/load a local copy of the embedding model.
EMBEDDING_MODEL_PATH = "local_embedding_model"
# 2. Set your Gemini API key.
# IMPORTANT: You can get a free API key from Google AI Studio.
# The key is read from the GEMINI_API_KEY environment variable; main() prompts
# for it when the variable is not set.
API_KEY = os.environ.get("GEMINI_API_KEY")

# --- Constants for the Generative Model ---
# generateContent endpoint for the gemini-2.0-flash model. A plain string: the
# URL contains no placeholders, so no f-string is needed.
API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"

# --- Helper Functions ---

def get_pdf_text(pdf_path):
    """
    Extracts text from a PDF file.

    Pages that yield no text are skipped. The remaining page texts are joined
    with a newline so that the last word of one page is not fused with the
    first word of the next.

    Args:
        pdf_path (str): The path to the PDF file.

    Returns:
        str: The text of all pages, separated by newlines. This is an empty
             string when no page yields any text.
             Returns None if the file is not found or cannot be read.
    """
    if not os.path.exists(pdf_path):
        print(f"Error: PDF file not found at '{pdf_path}'")
        return None

    print("Extracting text from PDF...")
    try:
        pdf_reader = PdfReader(pdf_path)
        extracted = (page.extract_text() for page in pdf_reader.pages)
        # Collect the pieces and join once instead of repeated `+=`.
        page_texts = [page_text for page_text in extracted if page_text]
    except Exception as e:
        # Best-effort boundary: any parsing problem is reported, not raised.
        print(f"Failed to read PDF. Error: {e}")
        return None
    print("Text extraction complete.")
    return "\n".join(page_texts)

def get_text_chunks(text, chunk_size=1000, chunk_overlap=200):
    """
    Splits a long text into smaller, overlapping chunks.

    Args:
        text (str): The input text.
        chunk_size (int): The size of each chunk in characters. Defaults to
            1000, the value previously hard-coded here.
        chunk_overlap (int): The overlap between consecutive chunks, in
            characters. Defaults to 200, the value previously hard-coded here.

    Returns:
        list: A list of text chunks.
    """
    print("Splitting text into chunks...")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    chunks = text_splitter.split_text(text)
    print(f"Created {len(chunks)} text chunks.")
    return chunks

def get_vector_store(text_chunks, embeddings):
    """
    Builds a FAISS vector store by embedding the given text chunks.

    Args:
        text_chunks (list): The text chunks to embed and index.
        embeddings: The embedding model passed to FAISS.from_texts.

    Returns:
        FAISS: The populated vector store, or None if creation failed
               (the error is printed rather than raised).
    """
    print("Creating new vector store...")
    try:
        store = FAISS.from_texts(texts=text_chunks, embedding=embeddings)
    except Exception as err:
        print(f"Failed to create vector store. Error: {err}")
        # Give an installation hint for the one failure with a known cause.
        if "Could not import faiss" in str(err):
            hint_lines = (
                "\n--- HINT ---",
                "This error often means the 'faiss' library is not correctly installed.",
                "Please ensure you have run 'pip install faiss-cpu' or 'pip install faiss-gpu'.",
                "--------------\n",
            )
            for hint in hint_lines:
                print(hint)
        return None
    else:
        print("Vector store created successfully.")
        return store

def generate_response(context, question, api_key):
    """
    Generates a response using the Gemini API based on the provided context and question.

    The API key is sent in the `x-goog-api-key` request header rather than in
    the URL query string. `requests` exception messages include the request
    URL, so a key placed in the URL would be exposed by the "API request
    failed" message this function returns.

    Args:
        context (str): The relevant text retrieved from the document.
        question (str): The user's question.
        api_key (str): The Gemini API key.

    Returns:
        str: The generated answer from the language model, or a human-readable
             error message if the request fails or the response is empty or
             malformed. This function does not raise.
    """
    prompt = f"""
    Based on the following context from a document, please provide a clear and concise answer to the question.
    If the context does not contain the answer, state that the information is not available in the document.

    Context:
    ---
    {context}
    ---

    Question: {question}

    Answer:
    """

    headers = {
        'Content-Type': 'application/json',
        'x-goog-api-key': api_key,
    }
    payload = {
        "contents": [{
            "parts": [{"text": prompt}]
        }]
    }

    try:
        response = requests.post(API_URL, headers=headers, json=payload, timeout=60)
        response.raise_for_status()  # Raises an HTTPError for bad responses (4xx or 5xx)

        result = response.json()

        # Walk candidates[0].content.parts[0].text defensively: any missing
        # level yields the "empty or malformed" message instead of a KeyError.
        candidates = result.get('candidates') or []
        content = candidates[0].get('content') if candidates else None
        parts = content.get('parts') if content else None
        text = parts[0].get('text') if parts else None
        if text:
            return text.strip()
        return "Could not generate an answer. The response from the model was empty or malformed."

    except requests.exceptions.RequestException as e:
        return f"API request failed: {e}"
    except Exception as e:
        return f"An unexpected error occurred during generation: {e}"


def handle_user_query(vector_store, question, api_key):
    """
    Answers one question: retrieves matching chunks, then asks the model.

    Results and errors are printed; nothing is returned.

    Args:
        vector_store (FAISS): The vector store for the document.
        question (str): The user's question. An empty value only prints a
            reminder to enter a question.
        api_key (str): The Gemini API key.
    """
    if question:
        print("Searching for relevant context...")
        try:
            # Retrieve the 5 most similar chunks and merge them into one context.
            matches = vector_store.similarity_search(question, k=5)
            merged_context = "\n\n".join(match.page_content for match in matches)

            print("Generating answer...")
            reply = generate_response(merged_context, question, api_key)

            print("\n--- Answer ---", reply, "--------------\n", sep="\n")
        except Exception as err:
            print(f"An error occurred while handling the query: {err}")
    else:
        print("Please enter a question.")


def _load_embeddings():
    """Return the embedding model, preferring the local copy in EMBEDDING_MODEL_PATH."""
    if os.path.exists(EMBEDDING_MODEL_PATH):
        print(f"Loading embedding model from '{EMBEDDING_MODEL_PATH}'...")
        try:
            # Point the wrapper at the saved directory. `model_kwargs` is
            # forwarded to the SentenceTransformer constructor, so passing a
            # pre-built model through it is not supported and made this
            # branch fail, forcing a fresh download.
            embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_PATH)
            print("Embedding model loaded successfully.")
            return embeddings
        except Exception as e:
            print(f"Failed to load embedding model. Error: {e}. Will download/create a new one.")

    print("Initializing/downloading embedding model (this may take a moment)...")
    model_name = "sentence-transformers/all-MiniLM-L6-v2"
    embeddings = HuggingFaceEmbeddings(model_name=model_name)

    # Saving is best-effort: a failure only means the model is fetched again
    # on the next run.
    try:
        print(f"Saving embedding model to '{EMBEDDING_MODEL_PATH}'...")
        # `client` holds the underlying SentenceTransformer model.
        embeddings.client.save_pretrained(EMBEDDING_MODEL_PATH)
        print("Embedding model saved.")
    except Exception as e:
        print(f"Warning: Could not save embedding model to '{EMBEDDING_MODEL_PATH}'. Error: {e}")
    return embeddings


def _load_or_build_vector_store(embeddings):
    """Return the FAISS store cached at VECTOR_STORE_PATH, else build it from PDF_PATH; None on failure."""
    if os.path.exists(VECTOR_STORE_PATH):
        print(f"Loading existing vector store from '{VECTOR_STORE_PATH}'...")
        try:
            # NOTE(security): loading requires 'allow_dangerous_deserialization'
            # because the saved store is unpickled. Only load an index that
            # this script wrote itself, never one from an untrusted source.
            vector_store = FAISS.load_local(VECTOR_STORE_PATH, embeddings, allow_dangerous_deserialization=True)
            print("Vector store loaded successfully.")
            return vector_store
        except Exception as e:
            print(f"Failed to load vector store. Error: {e}. Will create a new one.")

    raw_text = get_pdf_text(PDF_PATH)
    if not raw_text:
        # get_pdf_text reports its own failures when it returns None; an empty
        # string means the PDF was read but contained no extractable text.
        if raw_text is not None:
            print(f"No extractable text was found in '{PDF_PATH}'.")
        return None

    text_chunks = get_text_chunks(raw_text)
    vector_store = get_vector_store(text_chunks, embeddings)

    if vector_store is not None:
        print(f"Saving vector store to '{VECTOR_STORE_PATH}'...")
        vector_store.save_local(VECTOR_STORE_PATH)
        print("Vector store saved.")
    return vector_store


def main():
    """
    Main function to run the RAG pipeline.

    Verifies the PDF exists, obtains the Gemini API key (prompting if it is
    not configured), prepares the embedding model and the vector store, and
    then runs an interactive question loop until the user types 'exit' or
    interrupts the program. Exits with status 1 when setup fails.
    """
    # Imported here so the no-echo prompt needs no change to the file's imports.
    from getpass import getpass

    print("--- RAG PDF Analysis Pipeline ---")

    # Check for PDF file
    if not os.path.exists(PDF_PATH):
        print(f"\nFATAL ERROR: The file '{PDF_PATH}' was not found.")
        print("Please update the 'PDF_PATH' variable in the script with the correct file path.")
        sys.exit(1)

    # Get API Key
    global API_KEY
    if not API_KEY:
        # getpass avoids echoing the secret into the terminal/notebook output.
        API_KEY = getpass("Please enter your Gemini API key: ").strip()
        if not API_KEY:
            print("\nFATAL ERROR: API key is required.")
            sys.exit(1)

    embeddings = _load_embeddings()
    vector_store = _load_or_build_vector_store(embeddings)

    if vector_store is None:
        print("\nFailed to initialize the RAG pipeline. Exiting.")
        sys.exit(1)

    print("\nSetup complete. You can now ask questions about your document.")

    # --- Interactive Q&A Loop ---
    try:
        while True:
            # Strip so that input such as "exit " still ends the session.
            question = input("Ask a question (or type 'exit' to quit): ").strip()
            if question.lower() == 'exit':
                break
            handle_user_query(vector_store, question, API_KEY)
    except (KeyboardInterrupt, EOFError):
        # EOFError covers a closed or exhausted stdin.
        print("\nExiting...")


if __name__ == '__main__':
    main()


--- RAG PDF Analysis Pipeline ---
Initializing/downloading embedding model (this may take a moment)...


  embeddings = HuggingFaceEmbeddings(model_name=model_name)
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Saving embedding model to 'local_embedding_model'...
Embedding model saved.
Extracting text from PDF...
Text extraction complete.
Splitting text into chunks...
Created 236 text chunks.
Creating new vector store...


  return forward_call(*args, **kwargs)


Vector store created successfully.
Saving vector store to 'faiss_index'...
Vector store saved.

Setup complete. You can now ask questions about your document.
Searching for relevant context...
Generating answer...

--- Answer ---
According to the dualists, souls attain omniscience and omnipotence when they reach Brahmaloka, the sphere of Brahmâ.
--------------

Searching for relevant context...
Generating answer...

--- Answer ---
Those that are very spiritual, when they die, follow the solar rays and reach what is called the solar sphere, through which they reach what is called the lunar sphere, and through that they reach what is called the sphere of lightning, where they meet another soul who guides them to the Brahmaloka, the sphere of Brahmâ.
--------------

Searching for relevant context...
Generating answer...

--- Answer ---
The next class of persons, who have been doing good work with selfish motives, are carried by the results of their good works, when they die, to what is ca

In [None]:
from google.colab import files
import os
import shutil

folder_path = '/content/local_embedding_model'
zip_path = '/content/local_embedding_model.zip'

# Zip the saved embedding model folder and download the archive.
try:
    # Check explicitly so a missing folder always reaches the
    # FileNotFoundError branch below.
    if not os.path.isdir(folder_path):
        raise FileNotFoundError(folder_path)

    # make_archive appends the '.zip' extension itself, so pass the path
    # without it. splitext removes only the final extension, whereas
    # str.replace would remove every '.zip' occurrence in the path.
    archive_base, _ = os.path.splitext(zip_path)
    shutil.make_archive(archive_base, 'zip', folder_path)
    print(f"Folder '{folder_path}' zipped to '{zip_path}'")

    # Announce the download before triggering it.
    print(f"Downloading '{zip_path}'...")
    files.download(zip_path)

except FileNotFoundError:
    print(f"Error: Folder not found at '{folder_path}'")
except Exception as e:
    print(f"An error occurred: {e}")

Folder '/content/local_embedding_model' zipped to '/content/local_embedding_model.zip'


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Downloading '/content/local_embedding_model.zip'...
