In [None]:
!pip install --upgrade faiss-cpu PyMuPDF



In [None]:
import os
import time
import json
import fitz
import faiss
import pickle
import numpy as np
from typing import List
# from dotenv import load_dotenv

import google.generativeai as genai
from google.api_core.exceptions import InternalServerError

from sentence_transformers import SentenceTransformer

In [None]:
# Colab-only: read the API key from the Colab user-data entry named 'my_api'
# instead of hard-coding it in the notebook.
from google.colab import userdata
api_key = userdata.get('my_api')

In [None]:
# Colab-only: mount Google Drive at /content/drive. The default vector-store
# paths of HybridRAG point below /content/drive/MyDrive/gemini_rag.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Check supported models

In [None]:
# Configure the google.generativeai client with the API key read above.
genai.configure(api_key=api_key)

In [None]:
# Print the name and supported generation methods of every model returned by
# genai.list_models(), followed by a 20-dash separator line.
for model_info in genai.list_models():
    print(
        f"Model name: {model_info.name}",
        f"Supported methods: {model_info.supported_generation_methods}",
        "-" * 20,
        sep="\n",
    )

Model name: models/embedding-gecko-001
Supported methods: ['embedText', 'countTextTokens']
--------------------
Model name: models/gemini-1.0-pro-vision-latest
Supported methods: ['generateContent', 'countTokens']
--------------------
Model name: models/gemini-pro-vision
Supported methods: ['generateContent', 'countTokens']
--------------------
Model name: models/gemini-1.5-pro-latest
Supported methods: ['generateContent', 'countTokens']
--------------------
Model name: models/gemini-1.5-pro-001
Supported methods: ['generateContent', 'countTokens', 'createCachedContent']
--------------------
Model name: models/gemini-1.5-pro-002
Supported methods: ['generateContent', 'countTokens', 'createCachedContent']
--------------------
Model name: models/gemini-1.5-pro
Supported methods: ['generateContent', 'countTokens']
--------------------
Model name: models/gemini-1.5-flash-latest
Supported methods: ['generateContent', 'countTokens']
--------------------
Model name: models/gemini-1.5-flash-00

In [None]:
# Instruction text that HybridRAG uses as its default `instruction_prompt` and
# inserts at the top of every prompt. Currently whitespace only.
Instruction_prompt= """  """

In [None]:
from typing import List
import os
import time
import pickle
import fitz  # PyMuPDF
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from langchain.chat_models import ChatOpenAI
from langchain.llms.base import LLM

# Optional: wrapper for Gemini
class GeminiLLM(LLM):
    """LangChain ``LLM`` adapter around a ``google.generativeai`` model.

    Args:
        api_key: Key passed to ``genai.configure``.
        model_name: Name passed to ``genai.GenerativeModel``.
        base_url: Stored only for interface parity with the OpenAI path; it is
            not used when talking to Gemini.
    """

    # LangChain's LLM base class is a pydantic model: attributes must be
    # declared as fields, otherwise assigning them in __init__ is rejected.
    model: object = None
    base_url: str = ""

    def __init__(self, api_key: str, model_name: str, base_url: str):
        import google.generativeai as genai

        genai.configure(api_key=api_key)
        # The base initialiser has to run so pydantic sets up the instance;
        # the field values are handed over through it.
        super().__init__(
            model=genai.GenerativeModel(model_name=model_name),
            base_url=base_url,
        )

    @property
    def _llm_type(self) -> str:
        """Provider identifier reported to LangChain."""
        return "gemini"

    def _call(self, prompt: str, stop=None, run_manager=None, **kwargs) -> str:
        """Send ``prompt`` to Gemini and return the text of the response.

        ``stop``, ``run_manager`` and any extra keyword arguments supplied by
        LangChain are accepted for interface compatibility and ignored.
        """
        return self.model.generate_content(prompt).text


class NormalRAG:
    """Dense-retrieval RAG pipeline.

    Chunks of PDF text are embedded with a sentence-transformers model and
    stored in a FAISS ``IndexFlatL2``; questions are answered by an LLM that is
    given the nearest chunks as context.

    Args:
        llm_name: LLM provider, ``"openai"`` or ``"gemini"`` (case-insensitive).
        api_key: API key for the chosen provider.
        model_name: Model identifier for the chosen provider.
        base_url: Optional custom endpoint for the OpenAI client.
        embed_model_name: sentence-transformers model used for embeddings.
        instruction_prompt: Text placed at the top of every prompt.
        vectorstore_path: Path prefix; ``.index`` and ``.pkl`` are appended.

    Raises:
        ValueError: If ``api_key`` is empty or the provider is unsupported.
    """

    def __init__(self,
                 llm_name: str = "openai",  # "openai" or "gemini"
                 api_key: str = "",
                 model_name: str = "gpt-3.5-turbo",  # or "models/gemini-2.0-pro"
                 base_url: str = "",
                 embed_model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
                 instruction_prompt: str = "",
                 vectorstore_path: str = "./faiss_index"):

        if not api_key:
            raise ValueError("API key is missing.")

        self.instruction_prompt = instruction_prompt
        self.vectorstore_path = vectorstore_path
        self.llm_name = llm_name.lower()

        # Load embedder; a probe sentence gives the embedding width for the index.
        self.embedder = SentenceTransformer(embed_model_name)
        embedding_dim = self.embedder.encode(['test']).shape[1]
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.documents = []

        # Dispatch on the normalised name so "OpenAI" / "Gemini" are accepted too.
        self.llm = self._load_llm(self.llm_name, api_key, model_name, base_url)

    def _load_llm(self, llm_name: str, api_key: str, model_name: str, base_url: str) -> "LLM":
        """Build the LLM client for ``llm_name``.

        Raises:
            ValueError: If ``llm_name`` is neither ``"openai"`` nor ``"gemini"``.
        """
        provider = llm_name.lower()
        if provider == "openai":
            # An empty base_url means "use the client's default endpoint".
            return ChatOpenAI(openai_api_key=api_key, model_name=model_name, base_url=base_url or None)
        if provider == "gemini":
            return GeminiLLM(api_key=api_key, model_name=model_name, base_url=base_url)
        raise ValueError(f"Unsupported LLM provider: {llm_name}")

    def load_document(self, pdf_path: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        """Extract the text of a PDF and split it into overlapping character chunks.

        Args:
            pdf_path: Path of the PDF to read.
            chunk_size: Maximum number of characters per chunk.
            overlap: Number of characters shared by consecutive chunks.

        Returns:
            The chunks in document order (empty if the PDF has no text).

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
                smaller than ``chunk_size`` (the window would never advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer.")
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size.")

        # Context manager closes the PDF handle even if text extraction fails.
        with fitz.open(pdf_path) as doc:
            all_text = "".join(page.get_text() for page in doc)

        step = chunk_size - overlap
        return [all_text[start:start + chunk_size] for start in range(0, len(all_text), step)]

    def add_document(self, chunks: List[str]):
        """Embed ``chunks`` and append them to the index and the document list.

        An empty list is a no-op.
        """
        if not chunks:
            return
        embeddings = self.embedder.encode(chunks, batch_size=32, convert_to_numpy=True)
        # FAISS only accepts float32 matrices.
        self.index.add(np.asarray(embeddings, dtype=np.float32))
        self.documents.extend(chunks)

    def ask_question(self, query: str, top_k: int = 5) -> str:
        """Answer ``query`` using the ``top_k`` nearest chunks as context.

        Returns:
            The text generated by the LLM.

        Raises:
            Exception: If all three generation attempts fail; the last error is
                attached as ``__cause__``.
        """
        query_emb = self.embedder.encode([query], convert_to_numpy=True)
        D, I = self.index.search(np.asarray(query_emb, dtype=np.float32), top_k)

        # FAISS fills missing neighbours with -1 when the index holds fewer than
        # top_k vectors; without the lower bound -1 would select the last chunk.
        context = "\n".join([self.documents[i] for i in I[0] if 0 <= i < len(self.documents)])
        full_prompt = f"""
            ### Instruction:
            {self.instruction_prompt}

            Use the following context to answer the question.
            Context:
            {context}

            Question: {query}
            Answer:"""

        max_attempts = 3
        last_error = None
        for attempt in range(max_attempts):
            try:
                # invoke() accepts a plain string for both chat models and LLMs;
                # chat models return a message object, plain LLMs return a str.
                response = self.llm.invoke(full_prompt)
                return getattr(response, "content", response)
            except Exception as e:
                last_error = e
                if attempt < max_attempts - 1:
                    print(f"Error: {e}. Retrying in 5 seconds...")
                    time.sleep(5)

        raise Exception("Failed to generate response after 3 retries.") from last_error

    def load_vectorstore(self):
        """Load index and documents from disk.

        Returns:
            True if both files existed and were loaded, False otherwise.
        """
        index_file = self.vectorstore_path + ".index"
        docs_file = self.vectorstore_path + ".pkl"
        if os.path.exists(index_file) and os.path.exists(docs_file):
            self.index = faiss.read_index(index_file)
            # NOTE(security): pickle.load can execute arbitrary code; only load
            # files written by save_vectorstore from a trusted location.
            with open(docs_file, "rb") as f:
                self.documents = pickle.load(f)
            return True
        return False

    def save_vectorstore(self):
        """Write the index (``.index``) and the documents (``.pkl``) to disk."""
        target_dir = os.path.dirname(self.vectorstore_path)
        if target_dir:
            # write_index/open do not create missing parent directories.
            os.makedirs(target_dir, exist_ok=True)
        faiss.write_index(self.index, self.vectorstore_path + ".index")
        with open(self.vectorstore_path + ".pkl", "wb") as f:
            pickle.dump(self.documents, f)


In [None]:
from rank_bm25 import BM25Okapi
import numpy as np
import faiss
import fitz
import pickle
import os
from typing import List, Tuple
from sentence_transformers import SentenceTransformer
import google.generativeai as genai
from google.api_core.exceptions import InternalServerError
import time

class HybridRAG:
    """RAG pipeline that fuses dense (FAISS) and lexical (BM25) retrieval.

    Chunks are embedded into a FAISS ``IndexFlatL2`` and, in parallel,
    whitespace-tokenised for ``BM25Okapi``. Answers are generated with a
    ``google.generativeai`` model.

    Args:
        api_key: Gemini API key.
        model_name: Gemini model used for generation.
        embed_model_name: sentence-transformers model used for embeddings.
        instruction_prompt: Text placed at the top of every prompt.
        vectorstore_path: Path prefix; ``.index`` and ``.pkl`` are appended.
        bm25_corpus_path: Pickle file holding the tokenised corpus.

    Raises:
        ValueError: If ``api_key`` is empty.
    """

    def __init__(self, api_key: str, model_name: str = "models/gemini-2.0-flash",
                 embed_model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
                 instruction_prompt: str = Instruction_prompt,
                 vectorstore_path: str = "/content/drive/MyDrive/gemini_rag/hybrid_faiss_index",
                 bm25_corpus_path: str = "/content/drive/MyDrive/gemini_rag/bm25_corpus.pkl"):

        if not api_key:
            raise ValueError("API key is missing.")

        self.instruction_prompt = instruction_prompt
        self.vectorstore_path = vectorstore_path
        self.bm25_corpus_path = bm25_corpus_path
        self.documents = []
        self.tokenized_corpus = []
        self.bm25 = None  # built once at least one chunk has been added

        # Setup Gemini
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model_name=model_name)

        # Setup Embedder
        self.embedder = SentenceTransformer(embed_model_name)

        # Setup FAISS index; a probe sentence gives the embedding width.
        embedding_dim = self.embedder.encode(['test']).shape[1]
        self.index = faiss.IndexFlatL2(embedding_dim)

    def load_document(self, pdf_path: str, chunk_size: int = 500, overlap: int = 50) -> List[str]:
        """Extract the text of a PDF and split it into overlapping character chunks.

        Args:
            pdf_path: Path of the PDF to read.
            chunk_size: Maximum number of characters per chunk.
            overlap: Number of characters shared by consecutive chunks.

        Returns:
            The chunks in document order (empty if the PDF has no text).

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
                smaller than ``chunk_size`` (the window would never advance).
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer.")
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size.")

        # Context manager closes the PDF handle even if text extraction fails.
        with fitz.open(pdf_path) as doc:
            all_text = "".join(page.get_text() for page in doc)

        step = chunk_size - overlap
        return [all_text[start:start + chunk_size] for start in range(0, len(all_text), step)]

    def add_document(self, chunks: List[str]):
        """Add ``chunks`` to the FAISS index, the document list and the BM25 corpus.

        An empty list is a no-op (BM25Okapi cannot be built from an empty corpus).
        """
        if not chunks:
            return

        # Add to FAISS index (FAISS only accepts float32 matrices).
        embeddings = self.embedder.encode(chunks, batch_size=32, convert_to_numpy=True)
        self.index.add(np.asarray(embeddings, dtype=np.float32))

        # Add to documents list and rebuild BM25 over the whole corpus.
        self.documents.extend(chunks)
        self.tokenized_corpus.extend(chunk.split() for chunk in chunks)
        self.bm25 = BM25Okapi(self.tokenized_corpus)

    def semantic_search(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]:
        """Return up to ``top_k`` ``(chunk, L2 distance)`` pairs, nearest first."""
        if not self.documents:
            return []
        query_embedding = self.embedder.encode([query], convert_to_numpy=True)
        distances, indices = self.index.search(np.asarray(query_embedding, dtype=np.float32), top_k)
        # FAISS pads with -1 when fewer than top_k vectors exist; the lower
        # bound keeps -1 from silently selecting the last chunk.
        return [
            (self.documents[i], float(dist))
            for dist, i in zip(distances[0], indices[0])
            if 0 <= i < len(self.documents)
        ]

    def bm25_search(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]:
        """Return up to ``top_k`` ``(chunk, BM25 score)`` pairs, best first."""
        if self.bm25 is None:
            return []
        scores = self.bm25.get_scores(query.split())
        top_indices = np.argsort(scores)[::-1][:top_k]
        return [(self.documents[i], float(scores[i])) for i in top_indices]

    def hybrid_search(self, query: str, top_k: int = 5, alpha: float = 0.5) -> List[str]:
        """Rank chunks by a weighted sum of semantic and BM25 similarity.

        Both retrievers fetch ``2 * top_k`` candidates. L2 distances are mapped
        to ``1 / (1 + distance)`` and BM25 scores are divided by the best BM25
        score, so that higher is better for both. The final score is
        ``(1 - alpha) * semantic + alpha * bm25``.

        Args:
            query: Search text.
            top_k: Number of chunks to return.
            alpha: Weight of the BM25 component.

        Returns:
            Up to ``top_k`` chunk texts, best first.
        """
        candidate_k = top_k * 2
        sem_results = self.semantic_search(query, top_k=candidate_k)
        bm25_results = self.bm25_search(query, top_k=candidate_k)

        sem_similarity = {doc: 1 / (1 + score) for doc, score in sem_results}
        max_bm25_score = max((score for _, score in bm25_results), default=0)
        bm25_similarity = {
            doc: (score / max_bm25_score if max_bm25_score > 0 else 0)
            for doc, score in bm25_results
        }

        combined_scores = {}
        for doc, score in sem_similarity.items():
            combined_scores[doc] = combined_scores.get(doc, 0) + (1 - alpha) * score
        for doc, score in bm25_similarity.items():
            combined_scores[doc] = combined_scores.get(doc, 0) + alpha * score

        # Sort by combined score (descending) and keep the top_k documents.
        ranked = sorted(combined_scores.items(), key=lambda item: item[1], reverse=True)[:top_k]
        return [doc for doc, _ in ranked]

    def ask_question(self, query: str, challange: str = "", top_k: int = 5) -> str:
        """Answer ``query`` using hybrid-retrieved context.

        Args:
            query: The user's question.
            challange: Accepted for caller compatibility; currently not used
                when building the prompt.
            top_k: Number of context chunks to retrieve.

        Returns:
            The text generated by the model.

        Raises:
            Exception: If the model returned ``InternalServerError`` on all three
                attempts; the last error is attached as ``__cause__``.
        """
        # Use hybrid search to get relevant context
        context_docs = self.hybrid_search(query, top_k=top_k)
        context = "\n".join(context_docs)

        prompt = f"""
            ### instruction prompt : (explanation : this text is your guideline don't mention it on response)
            {self.instruction_prompt}

            Use the following context to answer the question.
            Context:
            {context}

            Question: {query}
            Answer:"""

        max_attempts = 3
        last_error = None
        for attempt in range(max_attempts):
            try:
                response = self.model.generate_content(prompt)
                return response.text
            except InternalServerError as e:
                last_error = e
                # No point in waiting after the final attempt.
                if attempt < max_attempts - 1:
                    print(f"Error: {e}. Retrying in 5 seconds...")
                    time.sleep(5)

        raise Exception("Failed to generate after 3 retries.") from last_error

    def load_vectorstore(self):
        """Load index, documents and BM25 corpus from disk.

        Returns:
            True if all three files existed and were loaded, False otherwise.
        """
        index_file = self.vectorstore_path + ".index"
        docs_file = self.vectorstore_path + ".pkl"
        if not (os.path.exists(index_file)
                and os.path.exists(docs_file)
                and os.path.exists(self.bm25_corpus_path)):
            return False

        self.index = faiss.read_index(index_file)
        # NOTE(security): pickle.load can execute arbitrary code; only load
        # files written by save_vectorstore from a trusted location.
        with open(docs_file, "rb") as f:
            self.documents = pickle.load(f)
        with open(self.bm25_corpus_path, "rb") as f:
            self.tokenized_corpus = pickle.load(f)
        # BM25Okapi cannot be built from an empty corpus.
        self.bm25 = BM25Okapi(self.tokenized_corpus) if self.tokenized_corpus else None
        return True

    def save_vectorstore(self):
        """Write the index, the documents and the tokenised BM25 corpus to disk."""
        for path in (self.vectorstore_path, self.bm25_corpus_path):
            target_dir = os.path.dirname(path)
            if target_dir:
                # write_index/open do not create missing parent directories.
                os.makedirs(target_dir, exist_ok=True)

        faiss.write_index(self.index, self.vectorstore_path + ".index")
        with open(self.vectorstore_path + ".pkl", "wb") as f:
            pickle.dump(self.documents, f)
        with open(self.bm25_corpus_path, "wb") as f:
            pickle.dump(self.tokenized_corpus, f)

In [None]:
def load_and_add_documents(rag, pdf_files: list):
    """Chunk every PDF in ``pdf_files`` with ``rag.load_document`` and feed the
    result to ``rag.add_document``, then print how many files were processed."""
    for pdf_path in pdf_files:
        rag.add_document(rag.load_document(pdf_path))

    total = len(pdf_files)
    print(f"✅ Loaded and embedded {total} document(s).")

def chat_loop(rag):
    """Interactive question/answer loop on stdin/stdout.

    Reads a line, sends it to ``rag.ask_question`` and prints the answer until
    the user types ``exit`` (case-insensitive) or stdin is closed. Errors raised
    while answering are printed and the loop continues.

    Args:
        rag: Object exposing ``ask_question(query)``. If it also exposes a
            callable ``generate_challenges(query)``, that hook is invoked first.
    """
    print("type exit")
    exit_keywords = {'exit'}

    while True:
        try:
            user_input = input('\nUser: ').strip()
        except EOFError:
            # stdin is closed: retrying would loop forever.
            print('\n')
            break

        if user_input.lower() in exit_keywords:
            print('\n')
            break
        if not user_input:
            continue

        try:
            # The challenge step is optional: not every RAG class implements it,
            # and an unconditional call made every turn fail before answering.
            generate_challenges = getattr(rag, "generate_challenges", None)
            if callable(generate_challenges):
                # TODO: feed the chosen challenge into ask_question.
                generate_challenges(user_input)

            # Only the query is passed so that classes whose ask_question has
            # no `challange` parameter work as well.
            response = rag.ask_question(user_input)
            print('Bot:', response)
            print('-' * 50)
        except Exception as e:
            print(f"❌ error: {e}")


In [None]:
import argparse

def main(argv=None):
    """Build the selected RAG pipeline, load or create its vector store, and chat.

    Args:
        argv: Optional list of command-line arguments. ``None`` means
            ``sys.argv[1:]``. Unknown arguments are ignored, because a
            Jupyter/Colab kernel is started with its own ``-f <connection file>``
            argument that would otherwise make argparse abort.
    """
    parser = argparse.ArgumentParser(description="Run RAG with specified parameters.")
    # NOTE: --data-name is parsed but not used yet.
    parser.add_argument('--data-name', type=str, default='all', help='Name of the data to use (e.g., all).')
    parser.add_argument('--emb-model', type=str, default="sentence-transformers/all-MiniLM-L6-v2", help='Name of the embedding model to use.')
    parser.add_argument('--RAG-type', type=str, default='Normal', help='Type of RAG to use (e.g., Normal, Hybrid).')
    args, _unknown = parser.parse_known_args(argv)

    rag_type = args.RAG_type.lower()
    if rag_type == 'normal':
        # NOTE(review): NormalRAG defaults to the "openai" provider, while
        # `api_key` is the key this notebook configures for Gemini - confirm
        # that llm_name/model_name should not be passed explicitly here.
        rag = NormalRAG(api_key=api_key, embed_model_name=args.emb_model)
    elif rag_type == 'hybrid':
        rag = HybridRAG(api_key=api_key, embed_model_name=args.emb_model)
    else:
        print(f"Unknown RAG type: {args.RAG_type}. Please choose 'Normal' or 'Hybrid'.")
        return

    if rag.load_vectorstore():
        print(f"📦 Loaded {args.RAG_type} vectorstore from Google Drive.")
    else:
        print(f"ℹ️ No saved {args.RAG_type} vectorstore found. Embedding from scratch.")
        # This part might also depend on data-name, you'll need to add logic here.
        # Absolute path, matching the Drive mount point, so the working
        # directory does not matter.
        pdf_files = ["/content/drive/MyDrive/gemini_rag/rag_file1.pdf"]
        load_and_add_documents(rag, pdf_files)
        rag.save_vectorstore()
        print(f"✅ {args.RAG_type} Vectorstore saved to Google Drive.")

    chat_loop(rag)

if __name__ == "__main__":
    main()