In [6]:
# -*- coding: utf-8 -*-
"""
Cybersecurity RAG System with Gemini 2.5 + Gradio Custom UI + Re-ranking

This updated version integrates a re-ranking step using a Cross-Encoder
model to improve the relevance of retrieved document chunks before they are
sent to the Large Language Model (LLM).
"""
!pip install PyPDF2 faiss-cpu transformers torch gradio sentence-transformers

import PyPDF2
from transformers import AutoTokenizer, AutoModel
import torch
import faiss
import numpy as np
import gradio as gr
import os
import requests
import json
from sentence_transformers import CrossEncoder
# --- Configuration ---------------------------------------------------------
# Label -> path of every PDF that gets its own FAISS index.
PDF_FILES = {
    "PDF 1": "CYBER SECURITY (R18A0521).pdf",
    "PDF 2": "Introduction to Cybersecurity.pdf",
    "PDF 3": "cybersecuirty_sb_factsheets_all.pdf"
}
# Bi-encoder used to embed chunks and queries for the first retrieval pass.
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
# Cross-encoder used to re-score the candidates returned by the first pass.
RERANKER_MODEL_NAME = 'cross-encoder/ms-marco-MiniLM-L-6-v2'
# Prefer the GEMINI_API_KEY environment variable so the secret never has to be
# committed with the notebook; the empty-string fallback keeps the previous
# default when the variable is not set.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
GEMINI_MODEL_NAME = "gemini-2.5-pro"

# Number of candidates fetched from FAISS, and how many survive re-ranking.
TOP_K_INITIAL_RETRIEVAL = 10
TOP_K_AFTER_RERANK = 3

# --- Mutable module state, populated by the initialize_* functions ----------
vector_indexes = {}    # PDF label -> FAISS index, or None when indexing failed
text_chunks_map = {}   # PDF label -> list of text chunks (empty on failure)
tokenizer = None
model = None
reranker_model = None
def extract_text_from_pdf(path):
    """Return the text of every page of the PDF at ``path``.

    Each page that yields text contributes that text followed by a newline;
    pages with no extractable text are skipped. Any error while opening or
    parsing the file is printed and an empty string is returned instead.
    """
    try:
        with open(path, 'rb') as pdf_handle:
            pdf_reader = PyPDF2.PdfReader(pdf_handle)
            extracted_pages = (pdf_page.extract_text() for pdf_page in pdf_reader.pages)
            return "".join(
                content + "\n" for content in extracted_pages if content
            )
    except Exception as e:
        print(f"Error reading {path}: {e}")
        return ""

def chunk_text(text, chunk_size=500, overlap=50):
    """Split ``text`` into overlapping chunks of whitespace-separated words.

    Args:
        text: The text to split.
        chunk_size: Maximum number of words per chunk; must be positive.
        overlap: Number of words shared by consecutive chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        A list of chunk strings (words re-joined with single spaces), or an
        empty list when ``text`` contains no words.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. Without
            this check a non-positive step either fails with an opaque
            ``range()`` error or silently yields no chunks, and a negative
            overlap silently skips words between chunks.
    """
    if not text.strip():
        return []
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, got {overlap}"
        )

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        # Once a chunk reaches the last word, any further window would only
        # repeat a suffix of this chunk, so stop instead of emitting it.
        if start + chunk_size >= len(words):
            break
    return chunks
def initialize_embedding_model():
    """Load the embedding tokenizer and model into the module globals.

    On success ``tokenizer`` and ``model`` hold the objects loaded for
    ``EMBEDDING_MODEL_NAME``; on any failure the error is printed and both
    globals are reset to ``None``.
    """
    global tokenizer, model
    try:
        loaded_tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL_NAME)
        loaded_model = AutoModel.from_pretrained(EMBEDDING_MODEL_NAME)
    except Exception as e:
        print(f"Error loading embedding model: {e}")
        tokenizer, model = None, None
    else:
        # Publish both objects together, only after both loads succeeded.
        tokenizer, model = loaded_tokenizer, loaded_model
        print(f"Embedding model '{EMBEDDING_MODEL_NAME}' loaded successfully.")

def mean_pooling(model_output, attention_mask):
    """Average the token embeddings of each sequence, ignoring masked tokens.

    ``model_output[0]`` is taken as the token embeddings; ``attention_mask``
    is broadcast to the same size and used as per-token weights. The divisor
    is clamped to at least 1e-9 so a fully masked row does not divide by zero.
    """
    token_embeddings = model_output[0]
    weights = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    weighted_sum = torch.sum(token_embeddings * weights, 1)
    token_counts = torch.clamp(weights.sum(1), min=1e-9)
    return weighted_sum / token_counts

def get_embeddings(texts, batch_size=32):
    """Embed ``texts`` with the loaded embedding model.

    Texts are encoded in batches of ``batch_size`` so that memory use is
    bounded by the batch rather than by the size of the whole document; the
    original single forward pass over every chunk grows with the PDF.

    Args:
        texts: A list of strings (a single string is treated as a one-item
            list).
        batch_size: Number of texts per forward pass; must be at least 1.

    Returns:
        A 2-D NumPy array with one L2-normalized embedding per input text, or
        an empty array when there is nothing to embed or the embedding model
        has not been initialized.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    texts = [texts] if isinstance(texts, str) else list(texts)
    if not texts:
        # Nothing to embed; callers already treat ``.size == 0`` as "no result".
        return np.array([])
    if tokenizer is None or model is None:
        print("Embedding model not initialized. Cannot generate embeddings.")
        return np.array([])
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    batches = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        encoded = tokenizer(batch, padding=True, truncation=True, return_tensors='pt')
        with torch.no_grad():
            model_out = model(**encoded)
        pooled = mean_pooling(model_out, encoded['attention_mask'])
        batches.append(
            torch.nn.functional.normalize(pooled, p=2, dim=1).cpu().numpy()
        )
    return np.vstack(batches)

def build_index(chunks):
    """Embed ``chunks`` and store the vectors in a new ``faiss.IndexFlatL2``.

    Returns the populated index, or ``None`` (after printing the reason) when
    there are no chunks or no embeddings could be produced for them.
    """
    if not chunks:
        print("No chunks to build FAISS index.")
        return None

    vectors = get_embeddings(chunks)
    if vectors.size == 0:
        print("No embeddings generated for FAISS index.")
        return None

    # The index dimensionality is the embedding width (second array axis).
    flat_index = faiss.IndexFlatL2(vectors.shape[1])
    flat_index.add(vectors)
    print(f"FAISS index built with {flat_index.ntotal} vectors.")
    return flat_index

def retrieve_chunks(index, chunks, query_embed, top_k=TOP_K_INITIAL_RETRIEVAL):
    """Return up to ``top_k`` chunks nearest to the query (first retrieval pass).

    Args:
        index: FAISS index built over ``chunks``, or ``None``.
        chunks: Text chunks, positionally aligned with the index vectors.
        query_embed: Query embedding; reshaped to a single row before search.
        top_k: Maximum number of chunks to return.

    Returns:
        The retrieved chunks in the order FAISS returned them; an empty list
        when no index or no chunks are available.
    """
    if index is None:
        print("FAISS index is not available for retrieval.")
        return []

    # Never ask for more neighbours than there are chunks: FAISS pads missing
    # results with -1, and a non-positive k is not a valid search size.
    k = min(top_k, len(chunks))
    if k <= 0:
        print("Initial retrieval from FAISS returned 0 chunks.")
        return []

    _, neighbor_ids = index.search(query_embed.reshape(1, -1), k)
    # Reject the -1 padding explicitly; a plain `i < len(chunks)` check lets
    # -1 through and Python would then return chunks[-1] (the last chunk).
    valid_indices = [int(i) for i in neighbor_ids[0] if 0 <= i < len(chunks)]
    retrieved_content = [chunks[i] for i in valid_indices]
    print(f"Initial retrieval from FAISS returned {len(retrieved_content)} chunks.")
    return retrieved_content

def initialize_reranker_model():
    """Load the cross-encoder re-ranker into the ``reranker_model`` global.

    On failure the error is printed and ``reranker_model`` is set to ``None``.
    """
    global reranker_model
    try:
        loaded_reranker = CrossEncoder(RERANKER_MODEL_NAME)
    except Exception as e:
        print(f"Error loading re-ranker model: {e}")
        reranker_model = None
    else:
        reranker_model = loaded_reranker
        print(f"Re-ranker model '{RERANKER_MODEL_NAME}' loaded successfully.")

def rerank_chunks(query, chunks, top_k_rerank=TOP_K_AFTER_RERANK):
    """Keep the ``top_k_rerank`` chunks the cross-encoder scores highest.

    Every ``[query, chunk]`` pair is scored by ``reranker_model`` and the
    chunks are returned in descending score order. When the re-ranker is not
    loaded, the first ``top_k_rerank`` chunks are returned in their incoming
    order instead. An empty ``chunks`` list yields an empty list.
    """
    if reranker_model is None:
        print("Re-ranker model not initialized. Skipping re-ranking.")
        # No scores available: fall back to the first-pass ordering.
        return chunks[:top_k_rerank]

    if not chunks:
        return []

    pairs = [[query, candidate] for candidate in chunks]
    relevance = reranker_model.predict(pairs)

    # Order chunk positions by score; the sort is stable, so equally scored
    # chunks keep their first-pass order.
    order = sorted(range(len(chunks)), key=lambda pos: relevance[pos], reverse=True)
    reranked_content = [chunks[pos] for pos in order[:top_k_rerank]]
    print(f"Re-ranking reduced to {len(reranked_content)} chunks.")
    return reranked_content
def call_gemini(prompt):
    """Send ``prompt`` to the Gemini ``generateContent`` endpoint.

    Args:
        prompt: The full prompt text, sent as a single user turn.

    Returns:
        The text of the first part of the first candidate, or a
        human-readable "⚠️ ..." message describing what went wrong. This
        function never raises; every failure is printed and returned as text
        because the result is shown directly in the UI.
    """
    if not GEMINI_API_KEY:
        print("⚠️ Gemini API key is not configured.")
        return "⚠️ Gemini API key is not configured. Set GEMINI_API_KEY and try again."

    # The key travels in a header rather than the query string: request errors
    # embed the URL in their message, and that message is returned to the UI,
    # so a key in the URL would be disclosed to whoever sees the error.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_MODEL_NAME}:generateContent"
    headers = {
        "Content-Type": "application/json",
        "x-goog-api-key": GEMINI_API_KEY,
    }
    payload = {
        "contents": [{"role": "user", "parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": 0.7,
            # NOTE(review): confirm this budget is large enough for the
            # configured model; an exhausted budget can produce a response
            # with no text parts, reported below as "unexpected structure".
            "maxOutputTokens": 500
        }
    }
    res = None
    try:
        # (connect, read) timeouts so a stalled connection cannot hang the UI.
        res = requests.post(url, headers=headers, json=payload, timeout=(10, 120))
        res.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)
        data = res.json()
        try:
            return data['candidates'][0]['content']['parts'][0]['text']
        except (KeyError, IndexError, TypeError):
            # Covers missing/empty candidates, content or parts, and a first
            # part that carries no 'text'.
            print(f"Unexpected Gemini API response structure: {data}")
            return "⚠️ Gemini API: Unexpected response structure."
    except requests.exceptions.RequestException as e:
        print(f"⚠️ Gemini API network error: {e}")
        return f"⚠️ Gemini API network error: {e}"
    except json.JSONDecodeError as e:
        print(f"⚠️ Gemini API JSON decode error: {e}. Response text: {res.text if res is not None else 'N/A'}")
        return f"⚠️ Gemini API JSON decode error: {e}"
    except Exception as e:
        print(f"⚠️ An unexpected error occurred during Gemini call: {e}")
        return f"⚠️ An unexpected error occurred: {e}"
def rag_answer_question(user_question, mode):
    """Answer ``user_question`` separately against every indexed PDF.

    For each PDF the question embedding is used to retrieve candidate chunks,
    which are then re-ranked. In "Gemini Answer" mode the surviving chunks are
    sent to Gemini as context (with general-knowledge fallback prompts when no
    usable context exists); in "Raw PDF" mode the chunks themselves are
    listed. Any other ``mode`` adds nothing for PDFs that have an index.

    Returns:
        One text block per PDF separated by "---" rules, or a short warning
        message when the question is blank, no index is available, or the
        question could not be embedded.
    """
    if not user_question.strip():
        return "❗ Please enter a valid question."

    # Load the models on demand in case initialization was skipped or failed.
    if tokenizer is None or model is None:
        initialize_embedding_model()
    if reranker_model is None:
        initialize_reranker_model()

    if not vector_indexes or all(v is None for v in vector_indexes.values()):
        return "⚠️ PDF indexes not initialized or all failed. Please ensure PDFs are correctly processed."

    query_embed = get_embeddings([user_question])
    if query_embed.size == 0:
        return "⚠️ Could not generate embedding for the question."

    # Output fragments are collected here and joined once at the end.
    sections = []

    for pdf_label, index in vector_indexes.items():
        chunks = text_chunks_map.get(pdf_label)
        if not chunks or index is None:
            sections.append(f"📄 **{pdf_label}**: No valid text chunks or index available.\n\n---\n\n")
            continue

        candidates = retrieve_chunks(index, chunks, query_embed)
        top_chunks = rerank_chunks(user_question, candidates)

        if mode == "Gemini Answer":
            if not top_chunks:
                # Nothing retrieved: ask for a general-knowledge answer.
                prompt_no_context = f"""
You are a cybersecurity expert.
The user asked: "{user_question}".
I could not find specific, relevant information related to this question within the document titled "{pdf_label}" even after thorough search.
Please answer the question using your general cybersecurity knowledge, or state if you cannot answer it at all.
"""
                response = call_gemini(prompt_no_context)
                sections.append(f"📄 **{pdf_label}** (General Knowledge / No Specific Context Found):\n{response.strip()}\n\n---\n\n")
                continue

            context = "\n\n".join(top_chunks)
            if context.strip():
                prompt_with_context = f"""
You are a cybersecurity expert.
Use the below context from the document "{pdf_label}" to answer the user question. If the answer isn't available in this context, clearly state that you cannot answer based on this specific document's context and then attempt to answer using your general cybersecurity knowledge.

Context from {pdf_label}:
{context}

Question: {user_question}
Answer:
"""
                response = call_gemini(prompt_with_context)
                sections.append(f"📄 **{pdf_label}**:\n{response.strip()}\n\n---\n\n")
            else:
                # Chunks were retrieved but contain only whitespace.
                prompt_no_context_from_retrieval = f"""
You are a cybersecurity expert.
The user asked: "{user_question}".
While some document parts were retrieved from "{pdf_label}", they did not contain useful information.
Please answer the question using your general cybersecurity knowledge, or state if you cannot answer it at all.
"""
                response = call_gemini(prompt_no_context_from_retrieval)
                sections.append(f"📄 **{pdf_label}** (General Knowledge / Retrieved Context Empty):\n{response.strip()}\n\n---\n\n")

        elif mode == "Raw PDF":
            if not top_chunks:
                sections.append(f"📄 **{pdf_label}** (Raw PDF Chunks): No highly relevant info found for '{user_question}' in this PDF.\n\n---\n\n")
                continue

            sections.append(f"📄 **{pdf_label}** (Re-ranked PDF Chunks):\n")
            sections.extend(
                f"🔹 Chunk {i}:\n{chunk.strip()}\n\n"
                for i, chunk in enumerate(top_chunks, 1)
            )
            sections.append("---\n\n")

    results_output = "".join(sections)
    if not results_output.strip():
        results_output = "I couldn't find any relevant information across the provided PDFs. Please try a different question or ensure the documents contain the information you're looking for."

    return results_output

def initialize_all_pdfs():
    """Load both models, then extract, chunk and index every PDF in ``PDF_FILES``.

    For each label, ``text_chunks_map`` receives the chunk list and
    ``vector_indexes`` the FAISS index. A PDF that is missing, yields no text
    or yields no chunks is recorded as an empty chunk list with a ``None``
    index so later lookups can skip it.
    """
    global text_chunks_map, vector_indexes

    def mark_unusable(pdf_label):
        # Record the PDF as present-but-unusable for the query loop.
        text_chunks_map[pdf_label] = []
        vector_indexes[pdf_label] = None

    initialize_embedding_model()
    initialize_reranker_model()  # The re-ranker is loaded up front as well.

    for label, file_path in PDF_FILES.items():
        print(f"🔄 Processing {file_path} (labeled as '{label}')...")

        if not os.path.exists(file_path):
            print(f"🔴 Error: PDF file '{file_path}' not found. Please upload it to your Colab environment.")
            mark_unusable(label)
            continue

        text = extract_text_from_pdf(file_path)
        if not text.strip():
            print(f"⚠️ Warning: No readable text extracted from {file_path}. This PDF might be scanned or image-based.")
            mark_unusable(label)
            continue

        chunks = chunk_text(text)
        if not chunks:
            print(f"⚠️ Warning: No chunks created from {file_path}. Text might be too short or extraction failed.")
            mark_unusable(label)
            continue

        text_chunks_map[label] = chunks
        vector_indexes[label] = build_index(chunks)
        print(f"✅ Finished processing {file_path}. {len(chunks)} chunks created.")

    print("✅ Overall Initialization complete for all configured PDFs.")

def create_gradio_ui():
    """Build the Gradio Blocks interface and launch it.

    The page has a question box, an answer-mode selector and a button wired
    to ``rag_answer_question``, plus a list of the PDF files the app expects.
    The app is launched with ``share=True`` and ``debug=True``.
    """
    custom_css = """
        .gradio-container {background-color: #1a1a2e; color: white;}
        textarea, input, button, select {font-size: 16px !important;}
        .output {font-weight: bold;}
    """

    with gr.Blocks(theme=gr.themes.Default(), css=custom_css) as app:
        gr.Markdown("## 🔐 Cybersecurity Q&A from Multiple PDFs (Gemini RAG with Re-ranking)")
        gr.Markdown("**Important:** Please ensure the specified PDF files are uploaded to your Colab environment. Each PDF will be processed individually. Using re-ranking for improved relevance.")

        # Inputs share one row: question, answer mode, submit button.
        with gr.Row():
            question_box = gr.Textbox(lines=2, placeholder="Ask your question here", label="💬 Your Question")
            mode_selector = gr.Radio(["Gemini Answer", "Raw PDF"], label="Answer Mode", value="Gemini Answer")
            submit_button = gr.Button("🔍 Get Answers")

        answer_box = gr.Textbox(lines=20, label="📘 Result (PDF-wise)")

        # Tell the user which files must be present.
        gr.Markdown("#### To make this work, you need to upload these PDF files to your Colab environment:")
        for pdf_label, file_name in PDF_FILES.items():
            gr.Markdown(f"- `{file_name}` (for {pdf_label})")
        gr.Markdown("Make sure they are in the same directory as your notebook, or provide the full path.")

        submit_button.click(
            fn=rag_answer_question,
            inputs=[question_box, mode_selector],
            outputs=answer_box,
        )

        gr.Markdown("#### ⚡ Powered by Gemini 2.5 Pro + FAISS + Sentence Transformers (Embedding & Re-ranker)")

    app.launch(share=True, debug=True)

# Entry point when run as a script or notebook cell: load the models and
# build the per-PDF indexes first, then start the Gradio UI.
if __name__ == "__main__":
    initialize_all_pdfs()
    create_gradio_ui()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

Embedding model 'sentence-transformers/all-MiniLM-L6-v2' loaded successfully.


config.json:   0%|          | 0.00/794 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/132 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

Re-ranker model 'cross-encoder/ms-marco-MiniLM-L-6-v2' loaded successfully.
🔄 Processing CYBER SECURITY (R18A0521).pdf (labeled as 'PDF 1')...


  return forward_call(*args, **kwargs)


FAISS index built with 30 vectors.
✅ Finished processing CYBER SECURITY (R18A0521).pdf. 30 chunks created.
🔄 Processing Introduction to Cybersecurity.pdf (labeled as 'PDF 2')...
FAISS index built with 47 vectors.
✅ Finished processing Introduction to Cybersecurity.pdf. 47 chunks created.
🔄 Processing cybersecuirty_sb_factsheets_all.pdf (labeled as 'PDF 3')...
FAISS index built with 14 vectors.
✅ Finished processing cybersecuirty_sb_factsheets_all.pdf. 14 chunks created.
✅ Overall Initialization complete for all configured PDFs.
Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://dc45f521863dbdf785.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


  return forward_call(*args, **kwargs)


Initial retrieval from FAISS returned 10 chunks.
Re-ranking reduced to 3 chunks.
Initial retrieval from FAISS returned 10 chunks.
Re-ranking reduced to 3 chunks.
Initial retrieval from FAISS returned 10 chunks.
Re-ranking reduced to 3 chunks.
Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://dc45f521863dbdf785.gradio.live
