In [4]:
# Import necessary libraries
from langchain.docstore.document import Document as LangchainDocument
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.document_loaders import PyPDFLoader,PyPDFDirectoryLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter,TokenTextSplitter
from langchain.prompts import PromptTemplate
from transformers import AutoTokenizer, pipeline
from sentence_transformers import CrossEncoder

import os
from tqdm import tqdm

In [2]:
# Initialize Hugging Face embeddings
EMBEDDING_MODEL_NAME ='sentence-transformers/all-MiniLM-L6-v2'
embedding_model = HuggingFaceEmbeddings(
    model_name=EMBEDDING_MODEL_NAME,
    multi_process=True,
    encode_kwargs={"normalize_embeddings": True}  # For cosine similarity
)

In [5]:
# Initialize lightweight reranker
RERANKER_MODEL_NAME = "cross-encoder/ms-marco-MiniLM-L-6-v2"
reranker = CrossEncoder(RERANKER_MODEL_NAME)

In [7]:
import os
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from transformers import AutoTokenizer

# Configuration
PDF_PATH = os.path.abspath("data")  # Absolute path
FAISS_INDEX_PATH = os.path.abspath("faiss_index")  # Absolute path
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# Initialize embedding model and tokenizer
try:
    # Use HuggingFaceEmbeddings instead of SentenceTransformer
    embedding_model = HuggingFaceEmbeddings(
        model_name=EMBEDDING_MODEL_NAME,
        model_kwargs={"device": "cpu"},  # Adjust to "cuda" if GPU is available
        encode_kwargs={"normalize_embeddings": True}
    )
    tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL_NAME)
    print("Initialized embedding model and tokenizer.")
except Exception as e:
    raise Exception(f"Failed to load embedding model or tokenizer: {str(e)}")

# Load or create FAISS vector database
if os.path.exists(os.path.join(FAISS_INDEX_PATH, "index.faiss")):
    try:
        KNOWLEDGE_VECTOR_DATABASE = FAISS.load_local(
            FAISS_INDEX_PATH, embedding_model, allow_dangerous_deserialization=True
        )
        print("Loaded existing FAISS index.")
    except Exception as e:
        print(f"Failed to load FAISS index: {str(e)}")
        raise
else:
    print("FAISS index not found. Creating a new one...")
    # Verify PDF exists
    if not os.path.exists(PDF_PATH):
        raise FileNotFoundError(f"PDF file not found at {PDF_PATH}")

    # Load PDF
    try:
        loader = PyPDFDirectoryLoader(PDF_PATH)
        RAW_KNOWLEDGE_BASE = loader.load()
        print(f"Loaded PDF with {len(RAW_KNOWLEDGE_BASE)} pages.")
    except Exception as e:
        raise Exception(f"Failed to load PDF: {str(e)}")

    # Initialize text splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        length_function=lambda x: len(tokenizer.encode(x, add_special_tokens=False)),
        separators=[
        "\n#{1,6} ",
        "```\n",
        "\n\\*\\*\\*+\n",
        "\n---+\n",
        "\n___+\n",
        "\n\n",
        "\n",
        " ",
        ".",
    ]
    )

    # Split documents
    try:
        docs_processed = text_splitter.split_documents(RAW_KNOWLEDGE_BASE)
        print(f"Split into {len(docs_processed)} document chunks.")
    except Exception as e:
        raise Exception(f"Failed to split documents: {str(e)}")

    # Create and save FAISS index
    try:
        KNOWLEDGE_VECTOR_DATABASE = FAISS.from_documents(
            docs_processed, embedding_model, distance_strategy="COSINE"
        )
        os.makedirs(FAISS_INDEX_PATH, exist_ok=True)  # Ensure directory exists
        KNOWLEDGE_VECTOR_DATABASE.save_local(FAISS_INDEX_PATH)
        print(f"Created and saved new FAISS index at {FAISS_INDEX_PATH}.")
    except Exception as e:
        raise Exception(f"Failed to create/save FAISS index: {str(e)}")

Initialized embedding model and tokenizer.
FAISS index not found. Creating a new one...
Loaded PDF with 10 pages.
Split into 10 document chunks.
Created and saved new FAISS index at /home/cs/Desktop/Project/Astu_RAG_Chat/faiss_index.


In [None]:
# Initialize DistilGPT-2
READER_MODEL_NAME = "distilgpt2"
READER_LLM = pipeline(
    task="text-generation",
    model=READER_MODEL_NAME,
    tokenizer=AutoTokenizer.from_pretrained(READER_MODEL_NAME),
    do_sample=True,
    temperature=0.7,
    repetition_penalty=1.1,
    max_new_tokens=100
)

Device set to use cpu


: 

In [None]:
from langchain.vectorstores import FAISS
from sentence_transformers import CrossEncoder
from transformers import pipeline
import numpy as np

# Define RAG prompt template
# Define optimized RAG prompt
prompt_in_chat_format = [
    {
        "role": "system",
        "content": """You are an expert on Adama Science and Technology University (ASTU) policies. Using only the provided context from ASTU policy documents, provide a concise and accurate answer to the question. Include the source document number if relevant. If the answer cannot be found in the context, respond with "I don't know" and nothing else. Do not generate information beyond the context."""
    },
    {
        "role": "user",
        "content": """Context:
{context}
---
Question: {question}"""
    },
]
RAG_PROMPT_TEMPLATE = PromptTemplate.from_template(
    """{system}\n\n{user}""".format(
        system=prompt_in_chat_format[0]["content"],
        user=prompt_in_chat_format[1]["content"]
    )
)

def answer_with_rag(
    question: str,
    llm: pipeline,
    knowledge_index: FAISS,
    reranker: CrossEncoder,
    num_retrieved_docs: int = 5,
    num_docs_final: int = 2
):
   
    try:
        # Validate inputs
        if not isinstance(knowledge_index, FAISS):
            raise ValueError("knowledge_index must be a FAISS vector store")
        if not isinstance(reranker, CrossEncoder):
            raise ValueError("reranker must be a CrossEncoder model")
        if not question.strip():
            raise ValueError("Question cannot be empty")

        # Retrieve documents
        print(f"=> Retrieving top {num_retrieved_docs} documents...")
        relevant_docs = knowledge_index.similarity_search_with_score(query=question, k=num_retrieved_docs)
        
        # Extract documents and scores (convert cosine distance to similarity)
        docs_with_scores = [(doc.page_content, 1 - score) for doc, score in relevant_docs]  # Convert distance to similarity
        doc_texts = [doc for doc, _ in docs_with_scores]
        doc_scores = [score for _, score in docs_with_scores]
        
        # Rerank documents
        print("=> Reranking documents...")
        pairs = [[question, doc] for doc in doc_texts]
        rerank_scores = reranker.predict(pairs)
        
        # Normalize rerank scores (if not already normalized)
        rerank_scores = (rerank_scores - np.min(rerank_scores)) / (np.max(rerank_scores) - np.min(rerank_scores) + 1e-10)
        
        # Combine scores (higher is better)
        combined_scores = [0.7* rerank_score + 0.3 * doc_score for rerank_score, doc_score in zip(rerank_scores, doc_scores)]
        sorted_docs = [doc for _, doc in sorted(zip(combined_scores, doc_texts), reverse=True)]
        
        # Select top documents
        selected_docs = sorted_docs[:min(num_docs_final, len(sorted_docs))]
        print(f"=> Selected {len(selected_docs)} documents for context.")
        
        # Build context
        context = "\nExtracted documents:\n"
        context += "".join([f"Document {str(i)}:::\n{doc}\n" for i, doc in enumerate(selected_docs)])
        
        # Generate answer
        print("=> Generating answer...")
        final_prompt = RAG_PROMPT_TEMPLATE.format(question=question, context=context)
        answer = llm(final_prompt, max_length=1000, num_return_sequences=10, temperature=0.5)[0]["generated_text"]
        
        return answer.strip()
    
    except Exception as e:
        print(f"Error in RAG pipaeline: {str(e)}")
        raise

query = "tell me about addmission"
answer = answer_with_rag(query, READER_LLM, KNOWLEDGE_VECTOR_DATABASE, reranker)
print(f"Answer: {answer}")

=> Retrieving top 5 documents...
=> Reranking documents...


Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.
Both `max_new_tokens` (=100) and `max_length`(=1000) seem to have been set. `max_new_tokens` will take precedence. Please refer to the documentation for more information. (https://huggingface.co/docs/transformers/main/en/main_classes/text_generation)


=> Selected 2 documents for context.
=> Generating answer...
