In [None]:
!pip install -U google-genai faiss-cpu beautifulsoup4 requests numpy

In [None]:
import getpass
import os
import re

import faiss
import numpy as np
import requests
from bs4 import BeautifulSoup
from google import genai

#  Set your Gemini API key
# Reuse a key that is already exported in the environment; otherwise prompt for
# it interactively so the secret is never hardcoded in (or saved with) the
# notebook and a real key is never overwritten by a placeholder.
if not os.environ.get("GOOGLE_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = getpass.getpass("Enter your Gemini API key: ")

# NOTE(review): the client is constructed without arguments, so it is presumed
# to pick the key up from GOOGLE_API_KEY — confirm against the SDK docs.
client = genai.Client()

In [None]:
#step1::Load Website Content
def load_website(url, timeout=15):
    """Download a web page and return the text of its paragraphs.

    Args:
        url: Address of the page to fetch.
        timeout: Seconds to wait for the server before giving up, so a stalled
            connection cannot hang the notebook indefinitely.

    Returns:
        A single string containing the text of every <p> element, joined by
        spaces, with all runs of whitespace collapsed to one space.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeout.
    """
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail loudly instead of indexing the body of an error page
    response.raise_for_status()
    soup = BeautifulSoup(response.text, "html.parser")

    # Remove unwanted elements
    for tag in soup(["script", "style", "nav", "footer"]):
        tag.decompose()

    paragraphs = soup.find_all("p")
    text = " ".join(p.get_text() for p in paragraphs)
    # Collapse newlines, tabs and repeated spaces into single spaces
    text = " ".join(text.split())

    return text

In [None]:
#step2::Text Chunking
def chunk_text(text, chunk_size=400, overlap=80):
    """Split text into overlapping chunks of whitespace-separated words.

    Args:
        text: The text to split.
        chunk_size: Maximum number of words per chunk (must be positive).
        overlap: Number of words shared between consecutive chunks
            (must satisfy 0 <= overlap < chunk_size).

    Returns:
        A list of chunk strings. Each chunk starts ``chunk_size - overlap``
        words after the previous one. Chunking stops as soon as a chunk
        reaches the end of the text, so no chunk is a pure suffix of the
        previous one. Empty or whitespace-only text yields ``[]``.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range; a
            non-positive step would otherwise never terminate.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")

    words = text.split()
    step = chunk_size - overlap
    chunks = []

    for start in range(0, len(words), step):
        end = start + chunk_size
        chunks.append(" ".join(words[start:end]))
        # This chunk already covers the tail; a further chunk would only
        # repeat words that are fully contained in it.
        if end >= len(words):
            break

    return chunks

In [None]:
def embed_text(text):
    """Embed `text` with the Gemini embedding model.

    Returns the values of the first embedding in the response as a
    float32 NumPy array.
    """
    result = client.models.embed_content(
        contents=text,
        model="models/gemini-embedding-001",
    )
    first_embedding = result.embeddings[0]
    return np.array(first_embedding.values, dtype="float32")

In [None]:
#step3::Create Documents with Cached Embeddings
url = "https://en.wikipedia.org/wiki/Artificial_intelligence"

# Download the page once and cut it into overlapping word chunks
website_text = load_website(url)
chunks = chunk_text(website_text)

# Each record keeps the chunk text, a human-readable source label and the
# embedding, which is computed exactly once here and reused later.
documents = [
    {
        "text": chunk,
        "source": f"{url} | Chunk {i+1}",
        "embedding": embed_text(chunk),
    }
    for i, chunk in enumerate(chunks)
]

print("Total Chunks:", len(documents))

In [None]:
#step5:: Build FAISS Vector Index
# Fail with a clear message instead of an IndexError on embeddings[0] when the
# page produced no chunks (e.g. it contained no <p> text).
if not documents:
    raise ValueError("No document chunks available; cannot build the FAISS index.")

embeddings = [doc["embedding"] for doc in documents]
dimension = embeddings[0].shape[0]

# Exact (brute-force) L2 index; the cached vectors are stacked into one
# 2-D array with one row per chunk.
index = faiss.IndexFlatL2(dimension)
index.add(np.array(embeddings))

print("FAISS index built successfully.")

In [None]:
# ============================================
#  Helper Functions for Advanced RAG
# --------------------------------------------
# Contains:
# - Embedding generation
# - Retrieval logic
# - Guardrail filtering
# - Cosine re-ranking
# - Multi-query expansion
# - Verification
# - Confidence estimation
# - Final answer generation
# ============================================

In [None]:
def retrieve(query, top_k=8):
    """Return the chunks nearest to `query` in the FAISS index.

    Args:
        query: Natural-language query; it is embedded with `embed_text`.
        top_k: Maximum number of neighbours to return.

    Returns:
        A list of dicts with keys "text", "source", "score" (the distance
        reported by the index, lower is closer) and "embedding", in the order
        returned by the index. Returns ``[]`` when the index is empty or
        `top_k` is not positive.
    """
    # Never ask for more neighbours than the index holds: FAISS pads missing
    # results with index -1, which would silently select documents[-1].
    k = min(top_k, index.ntotal)
    if k <= 0:
        return []

    query_vector = embed_text(query)
    distances, indices = index.search(np.array([query_vector]), k)

    results = []
    for idx, dist in zip(indices[0], distances[0]):
        if idx < 0:  # defensive: skip any padding entry
            continue
        doc = documents[idx]
        results.append({
            "text": doc["text"],
            "source": doc["source"],
            "score": float(dist),
            "embedding": doc["embedding"]
        })
    return results

In [None]:
# Results whose "score" is at or above this value are discarded.
SIMILARITY_THRESHOLD = 1.2

def guardrail_filter(results):
    """Keep only the results whose "score" is strictly below SIMILARITY_THRESHOLD.

    The relative order of the surviving results is preserved.
    """
    kept = []
    for result in results:
        if result["score"] < SIMILARITY_THRESHOLD:
            kept.append(result)
    return kept

In [None]:
def rerank(query, retrieved_chunks):
    """Ask the LLM to pick the most relevant chunks for `query`.

    Args:
        query: The user question.
        retrieved_chunks: List of chunk dicts, each with a "text" key.

    Returns:
        The chunks named by the model, in the order the model listed them.
        Numbers outside ``1..len(retrieved_chunks)`` and repeated numbers are
        ignored. Returns ``[]`` when there are no chunks or the reply contains
        no usable number.
    """
    if not retrieved_chunks:
        return []

    chunk_texts = "\n\n".join(
        [f"Chunk {i+1}: {chunk['text']}" for i, chunk in enumerate(retrieved_chunks)]
    )

    prompt = f"""
    Query: {query}

    Rank the top 3 most relevant chunks by number.

    {chunk_texts}

    Return only numbers separated by commas.
    """

    response = client.models.generate_content(
        model="models/gemini-2.5-flash",
        contents=prompt
    )

    # Guard against a missing text payload in the response
    ranked_numbers = (response.text or "").strip()

    top_indices = []
    # Pull out every integer so replies like "1, 3." or "Chunk 2" still parse
    for number in re.findall(r"\d+", ranked_numbers):
        position = int(number) - 1
        # Reject 0 (would become -1 and pick the last chunk), out-of-range
        # numbers and duplicates.
        if 0 <= position < len(retrieved_chunks) and position not in top_indices:
            top_indices.append(position)

    return [retrieved_chunks[i] for i in top_indices]

In [None]:
def cosine_rerank(query, retrieved_chunks, top_k=3):
    """Order chunks by cosine similarity to the query and keep the best `top_k`.

    Every chunk dict is annotated in place with a "cosine_score" float
    computed from its cached "embedding"; the returned list holds the same
    dict objects, highest score first.
    """
    query_vec = embed_text(query)
    query_norm = np.linalg.norm(query_vec)

    scored = []
    for chunk in retrieved_chunks:
        chunk_vec = chunk["embedding"]
        denominator = query_norm * np.linalg.norm(chunk_vec)
        chunk["cosine_score"] = float(np.dot(query_vec, chunk_vec) / denominator)
        scored.append(chunk)

    # Stable sort, best match first
    scored.sort(key=lambda item: item["cosine_score"], reverse=True)

    return scored[:top_k]

In [None]:
def extract_sources(chunks):
    """Return the unique "source" labels of `chunks`, in first-seen order.

    A dict is used for de-duplication instead of a set so that the result is
    deterministic and keeps the ranking order of the input chunks.
    """
    return list(dict.fromkeys(chunk["source"] for chunk in chunks))

In [None]:
def expand_query(query, max_variations=3):
    """Ask the LLM for rephrasings of `query`.

    Args:
        query: The original user question.
        max_variations: Upper bound on the number of rephrasings kept, so an
            overly chatty reply cannot multiply the retrieval calls.

    Returns:
        A list whose first element is the original `query`, followed by up to
        `max_variations` distinct rephrasings with list markers removed.
        If the model returns no text, only ``[query]`` is returned.
    """
    prompt = f"""
    Generate 3 different rephrasings of the following query.

    Query: {query}

    Return each variation on a new line.
    """

    response = client.models.generate_content(
        model="models/gemini-2.5-flash",
        contents=prompt
    )

    variations = []
    seen = {query.strip().lower()}

    for line in (response.text or "").splitlines():
        # Drop leading bullets ("-", "*", bullet dot) or numbering ("1.", "2)")
        candidate = re.sub(r"^\s*(?:[-*\u2022]+|\d+[.)])\s*", "", line).strip()

        # Skip blank lines and preamble such as "Here are 3 rephrasings:"
        if not candidate or candidate.endswith(":"):
            continue

        # Skip repeats of the original query or of an earlier variation
        key = candidate.lower()
        if key in seen:
            continue
        seen.add(key)

        variations.append(candidate)
        if len(variations) >= max_variations:
            break

    return [query] + variations

In [None]:
def multi_query_retrieve(query, top_k=5):
    """Retrieve chunks for the query and all of its rephrasings.

    Args:
        query: The original user question.
        top_k: Number of neighbours requested per query variation.

    Returns:
        One result dict per distinct "source", ordered by first appearance.
        When several variations hit the same chunk, the hit with the lowest
        "score" (closest distance) is kept, so a chunk is not judged by its
        worst-matching rephrasing in later score-based filtering.
    """
    best_by_source = {}

    for q in expand_query(query):
        for result in retrieve(q, top_k):
            source = result["source"]
            current = best_by_source.get(source)
            if current is None or result["score"] < current["score"]:
                best_by_source[source] = result

    return list(best_by_source.values())

In [None]:
def verify_answer(answer, context):
    """Ask the LLM whether `answer` is fully supported by `context`.

    Args:
        answer: The generated answer to check.
        context: The retrieved text the answer must be grounded in.

    Returns:
        "YES" if the model's reply begins with the word YES (ignoring case
        and leading punctuation such as quotes or markdown), otherwise "NO".
        A missing reply counts as "NO".
    """
    prompt = f"""
    Based only on the context below, verify whether the answer is fully supported.

    Context:
    {context}

    Answer:
    {answer}

    Reply with only YES or NO.
    """

    response = client.models.generate_content(
        model="models/gemini-2.5-flash",
        contents=prompt
    )

    verdict = (response.text or "").strip().upper()
    # Normalise replies like "YES.", "**YES**" or "Yes, it is" to a strict
    # YES/NO so that callers comparing against "YES" are not fooled.
    return "YES" if re.match(r"\W*YES\b", verdict) else "NO"

In [None]:
def calculate_confidence(reranked_chunks, verification="YES"):
    """Estimate a confidence score from the reranked chunks.

    The score is the mean "cosine_score" clamped to [0, 1], scaled by how
    many of the expected 3 chunks are present, multiplied by 0.6 when
    `verification` is anything other than "YES", and rounded to 3 decimals.
    An empty chunk list gives 0.0.
    """
    if not reranked_chunks:
        return 0.0

    mean_cosine = np.mean([item["cosine_score"] for item in reranked_chunks])

    # Clamp the mean similarity into the 0..1 range
    retrieval_conf = max(0, min(1, mean_cosine))

    # Fewer than 3 supporting chunks lowers the score proportionally
    coverage_factor = min(1, len(reranked_chunks) / 3)

    score = retrieval_conf * coverage_factor

    # Apply the penalty for an answer that failed verification
    if verification != "YES":
        score *= 0.6

    return round(score, 3)

In [None]:
def generate_answer(query, conversation_history):
    """Answer `query` from the indexed website content.

    Pipeline: multi-query retrieval -> guardrail filter -> cosine re-rank ->
    answer generation -> verification -> confidence estimation.

    Args:
        query: The user's current question.
        conversation_history: List of {"user": ..., "assistant": ...} dicts.
            The last 3 turns are included in the prompt, and the new turn is
            appended to this list in place when an answer is generated.

    Returns:
        A dict with keys "answer", "sources", "confidence" and "verified".
        When no retrieved chunk passes the guardrail filter, a fixed
        "not available" answer is returned with no sources, confidence 0.0
        and "verified" set to "N/A"; the history is left untouched.
    """

    # Step 1: Retrieve
    retrieved = multi_query_retrieve(query)

    # Step 2: Guardrail Filter
    filtered = guardrail_filter(retrieved)

    if not filtered:
        # Same keys as the normal result so callers can read "verified" safely
        return {
            "answer": "The answer is not available in the provided website content.",
            "sources": [],
            "confidence": 0.0,
            "verified": "N/A"
        }

    # Step 3: Cosine Re-rank
    reranked = cosine_rerank(query, filtered)

    # Step 4: Prepare Context
    context = "\n\n".join([chunk["text"] for chunk in reranked])

    # Step 5: Source Attribution
    sources = extract_sources(reranked)

    # Step 6: Final Answer Generation
    history_text = "\n".join(
        [f"User: {h['user']}\nAssistant: {h['assistant']}" for h in conversation_history[-3:]]
    )

    final_prompt = f"""
    You are a helpful assistant.

    Conversation History:
    {history_text}

    Use the context below to answer the question.
    If the answer is not found, say it is not available.

    Context:
    {context}

    Current Question:
    {query}
    """

    response = client.models.generate_content(
      model="models/gemini-2.5-flash",
      contents=final_prompt
    )

    # Guard against a missing text payload in the response
    answer = response.text or "The answer is not available in the provided website content."

    # Step 7: Verification
    verification = verify_answer(answer, context)

    # Step 8: Confidence — calculate_confidence applies the penalty for a
    # failed verification, so it is not repeated here.
    confidence = calculate_confidence(reranked, verification)

    conversation_history.append({
        "user": query,
        "assistant": answer
    })

    return {
      "answer": answer,
      "sources": sources,
      "confidence": float(confidence),
      "verified": verification
    }

In [None]:
# ============================================
#  Interactive Chat Loop with Memory
# --------------------------------------------
# - Maintain short-term conversation history
# - Allow user to ask multiple questions
# - Support follow-up queries
# - Exit when user types 'exit'
# ============================================

In [None]:
# Short-term memory shared across turns; generate_answer appends to it
conversation_history = []

while True:
    query = input("\nAsk a question (type 'exit' to stop): ").strip()

    # Ignore blank input instead of spending API calls on an empty query
    if not query:
        continue

    if query.lower() == "exit":
        print("Exiting chatbot.")
        break

    result = generate_answer(query, conversation_history)

    print("\nAnswer:\n")
    print(result["answer"])

    print("\nSources:")
    for s in result["sources"]:
        print("-", s)

    print("\nConfidence Score:", result["confidence"])
    # The "no relevant content" result may not carry a verification verdict
    print("Verified:", result.get("verified", "N/A"))