In [None]:
# =============================================
# 💫 RAG-Gemini: Next-Gen Colab Knowledge Base
# =============================================
!pip install -q numba==0.56.4 numpy==1.23.5
!pip install -q langchain langchain-community faiss-cpu sentence-transformers transformers torch pypdf gradio openai-whisper

import os, tempfile, io, json, traceback, shutil
from datetime import datetime
from typing import List, Dict
import torch
import gradio as gr
import whisper

# ---------------------
# LangChain imports
# ---------------------
try:
    from langchain_community.document_loaders import PyPDFLoader, TextLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores import FAISS
    from langchain_community.embeddings import HuggingFaceEmbeddings
except:
    from langchain.document_loaders import PyPDFLoader, TextLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain.vectorstores import FAISS
    from langchain.embeddings import HuggingFaceEmbeddings


from transformers import pipeline

# ---------------------
# CONFIGURATION
# ---------------------
CONFIG = {
    "chunk_size": 400,
    "chunk_overlap": 80,
    "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
    "llm_model": "gpt2",
    "retrieval_k": 4,
    "max_tokens": 200,
    "device": 0 if torch.cuda.is_available() else -1,
    "cache_dir": "./rag_gemini_cache",
    "history_file": "./rag_gemini_history.json",
}

os.makedirs(CONFIG["cache_dir"], exist_ok=True)

# ---------------------
# UTILS
# ---------------------
def save_uploaded_file(uploaded_file):
    temp_dir = tempfile.gettempdir()
    file_path = os.path.join(temp_dir, uploaded_file.name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.read())
    return file_path

def load_history():
    if os.path.exists(CONFIG["history_file"]):
        with open(CONFIG["history_file"], "r") as f:
            return json.load(f)
    return []

def save_history(hist):
    with open(CONFIG["history_file"], "w") as f:
        json.dump(hist, f, indent=2)

# ---------------------
# KNOWLEDGE BASE
# ---------------------
class KnowledgeBase:
    def __init__(self):
        self.docs = []
        self.chunks = []
        self.vectorstore = None
        self.embeddings = HuggingFaceEmbeddings(model_name=CONFIG["embedding_model"])

    def load_docs(self, uploaded_files):
        self.docs = []
        for file in uploaded_files:
            path = save_uploaded_file(file)
            if path.endswith(".pdf"):
                docs = PyPDFLoader(path).load()
            elif path.endswith(".txt"):
                docs = TextLoader(path, encoding="utf-8").load()
            else:
                continue
            self.docs.extend(docs)
        return len(self.docs)

    def embed(self):
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=CONFIG["chunk_size"], chunk_overlap=CONFIG["chunk_overlap"]
        )
        self.chunks = splitter.split_documents(self.docs)
        self.vectorstore = FAISS.from_documents(self.chunks, self.embeddings)
        return len(self.chunks)

    def retriever(self):
        return self.vectorstore.as_retriever(search_kwargs={"k": CONFIG["retrieval_k"]})

# ---------------------
# SYNTHESIS ENGINE
# ---------------------
class SynthesisEngine:
    def __init__(self):
        self.pipe = None
        self.voice_model = None

    def init(self):
        self.pipe = pipeline(
            "text-generation",
            model=CONFIG["llm_model"],
            device=CONFIG["device"],
            max_new_tokens=CONFIG["max_tokens"]
        )
        return True

    def generate(self, query, docs, concise=False):
        if not docs:
            return "No relevant context found."

        context = "\n".join([d.page_content[:600] for d in docs])
        style = "Give a brief, focused answer." if concise else "Give a detailed, reasoned explanation."
        prompt = f"Use these DOCUMENTS to answer. If not found, say so.\n{style}\n\nDOCUMENTS:\n{context}\n\nQUESTION: {query}\n\nANSWER:"
        out = self.pipe(prompt, max_new_tokens=CONFIG["max_tokens"], do_sample=False)
        text = out[0]["generated_text"]
        return text.split("ANSWER:")[-1].strip()

    def transcribe_voice(self, audio_path):
        if self.voice_model is None:
            self.voice_model = whisper.load_model("base")
        result = self.voice_model.transcribe(audio_path)
        return result["text"]

# ---------------------
# RAG SYSTEM
# ---------------------
class RAGGemini:
    def __init__(self):
        self.kb = KnowledgeBase()
        self.llm = SynthesisEngine()
        self.retriever = None
        self.history = load_history()

    def setup(self, files):
        n = self.kb.load_docs(files)
        c = self.kb.embed()
        self.llm.init()
        self.retriever = self.kb.retriever()
        return f"✅ {n} docs loaded & {c} chunks embedded."

    def query(self, q, concise=False):
        docs = self.retriever.get_relevant_documents(q)
        answer = self.llm.generate(q, docs, concise)
        sources = "\n\n".join([f"📘 {d.page_content[:200]}..." for d in docs[:3]])
        record = {"time": datetime.now().isoformat(), "query": q, "answer": answer}
        self.history.append(record)
        save_history(self.history)
        return answer, sources

rag = RAGGemini()

# ---------------------
# UI CALLBACKS
# ---------------------
def build_system(files):
    return rag.setup(files)

def ask(q, concise):
    return rag.query(q, concise)

def show_history():
    hist = rag.history[-10:][::-1]
    return "\n\n".join([f"🕒 {h['time']}\n**Q:** {h['query']}\n**A:** {h['answer']}" for h in hist])

def voice_to_text(audio):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    audio.export(temp.name, format="wav")
    text = rag.llm.transcribe_voice(temp.name)
    return text

# ---------------------
# GRADIO UI
# ---------------------
with gr.Blocks(css="""
body {background: #0e0e11; color: #e3e3e3; font-family: 'Inter', sans-serif;}
.gr-button {background: linear-gradient(90deg, #1f2937, #374151); color: white; border-radius: 8px;}
.gr-textbox, .gr-markdown {background-color: #111827; border-radius: 8px;}
""") as demo:
    gr.Markdown("## 🌌 **RAG-Gemini** — Intelligent Knowledge Base with Local Models")
    gr.Markdown("> Upload. Ask. Discover insights. (Runs 100% offline)")

    with gr.Row():
        file_uploader = gr.File(file_count="multiple", file_types=[".pdf", ".txt"], label="📂 Upload PDFs or Texts")
        build_btn = gr.Button("🚀 Build Knowledge Base")
    status_box = gr.Textbox(label="System Status")

    build_btn.click(build_system, inputs=[file_uploader], outputs=[status_box])

    with gr.Tab("💬 Ask"):
        query_box = gr.Textbox(label="Ask a Question")
        concise = gr.Checkbox(label="Concise Mode", value=False)
        ask_btn = gr.Button("🔍 Query")
        answer_box = gr.Textbox(label="Answer", lines=5)
        sources_box = gr.Textbox(label="Cited Sources", lines=5)
        ask_btn.click(ask, inputs=[query_box, concise], outputs=[answer_box, sources_box])

    with gr.Tab("🎙️ Voice Query"):
        audio_input = gr.Audio(sources=["microphone"], type="filepath")
        voice_btn = gr.Button("🎧 Transcribe")
        voice_text = gr.Textbox(label="Recognized Text")
        voice_btn.click(rag.llm.transcribe_voice, inputs=[audio_input], outputs=[voice_text])

    with gr.Tab("🕘 History"):
        hist_btn = gr.Button("📜 Show Recent")
        hist_box = gr.Markdown()
        hist_btn.click(show_history, outputs=[hist_box])

    with gr.Tab("⚙️ System Info"):
        gr.Markdown(f"**Embedding model:** `{CONFIG['embedding_model']}`")
        gr.Markdown(f"**LLM model:** `{CONFIG['llm_model']}`")
        gr.Markdown(f"**Device:** `{CONFIG['device']}`")
        gr.Markdown("Cache and embeddings are stored locally.")

demo.launch(share=True)

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.4 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.4/2.4 MB[0m [31m81.7 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m46.0 MB/s[0m eta [36m0:00:00[0m
[?25h  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mpython setup.py egg_info[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m See above for output.
  
  [1;35mnote[0m: This error originates from a subprocess, and is likely not a problem with pip.
  Preparing metadata (setup.py) ... [?25l[?25herror
[1;31merror[0m: [1mmetadata-generation-failed[0m

[31m×[0m Encountered error while generating package metadata.
[31m╰─>[0m See above for output.

[1;35mnote[0m: This is an issue with the package mentioned above, not pip.
[1;36mhint[0m: See above for details.


In [2]:
# =============================================
# 💫 RAG-Gemini: Next-Gen Colab Knowledge Base
# =============================================
!pip install -q langchain langchain-community faiss-cpu sentence-transformers transformers torch pypdf gradio openai-whisper

import os, tempfile, io, json, traceback, shutil
from datetime import datetime
from typing import List, Dict
import torch
import gradio as gr
import whisper

# ---------------------
# LangChain imports
# ---------------------
try:
    from langchain_community.document_loaders import PyPDFLoader, TextLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores import FAISS
    from langchain_community.embeddings import HuggingFaceEmbeddings
except:
    from langchain.document_loaders import PyPDFLoader, TextLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain.vectorstores import FAISS
    from langchain.embeddings import HuggingFaceEmbeddings

from transformers import pipeline

# ---------------------
# CONFIGURATION
# ---------------------
CONFIG = {
    "chunk_size": 400,
    "chunk_overlap": 80,
    "embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
    "llm_model": "gpt2",
    "retrieval_k": 4,
    "max_tokens": 200,
    "device": 0 if torch.cuda.is_available() else -1,
    "cache_dir": "./rag_gemini_cache",
    "history_file": "./rag_gemini_history.json",
}

os.makedirs(CONFIG["cache_dir"], exist_ok=True)

# ---------------------
# UTILS
# ---------------------
def save_uploaded_file(uploaded_file):
    temp_dir = tempfile.gettempdir()
    file_path = os.path.join(temp_dir, uploaded_file.name)
    with open(file_path, "wb") as f:
        f.write(uploaded_file.read())
    return file_path

def load_history():
    if os.path.exists(CONFIG["history_file"]):
        with open(CONFIG["history_file"], "r") as f:
            return json.load(f)
    return []

def save_history(hist):
    with open(CONFIG["history_file"], "w") as f:
        json.dump(hist, f, indent=2)

# ---------------------
# KNOWLEDGE BASE
# ---------------------
class KnowledgeBase:
    def __init__(self):
        self.docs = []
        self.chunks = []
        self.vectorstore = None
        self.embeddings = HuggingFaceEmbeddings(model_name=CONFIG["embedding_model"])

    def load_docs(self, uploaded_files):
        self.docs = []
        for file in uploaded_files:
            path = save_uploaded_file(file)
            if path.endswith(".pdf"):
                docs = PyPDFLoader(path).load()
            elif path.endswith(".txt"):
                docs = TextLoader(path, encoding="utf-8").load()
            else:
                continue
            self.docs.extend(docs)
        return len(self.docs)

    def embed(self):
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=CONFIG["chunk_size"], chunk_overlap=CONFIG["chunk_overlap"]
        )
        self.chunks = splitter.split_documents(self.docs)
        self.vectorstore = FAISS.from_documents(self.chunks, self.embeddings)
        return len(self.chunks)

    def retriever(self):
        return self.vectorstore.as_retriever(search_kwargs={"k": CONFIG["retrieval_k"]})

# ---------------------
# SYNTHESIS ENGINE
# ---------------------
class SynthesisEngine:
    def __init__(self):
        self.pipe = None
        self.voice_model = None

    def init(self):
        self.pipe = pipeline(
            "text-generation",
            model=CONFIG["llm_model"],
            device=CONFIG["device"],
            max_new_tokens=CONFIG["max_tokens"]
        )
        return True

    def generate(self, query, docs, concise=False):
        if not docs:
            return "No relevant context found."

        context = "\n".join([d.page_content[:600] for d in docs])
        style = "Give a brief, focused answer." if concise else "Give a detailed, reasoned explanation."
        prompt = f"Use these DOCUMENTS to answer. If not found, say so.\n{style}\n\nDOCUMENTS:\n{context}\n\nQUESTION: {query}\n\nANSWER:"
        out = self.pipe(prompt, max_new_tokens=CONFIG["max_tokens"], do_sample=False)
        text = out[0]["generated_text"]
        return text.split("ANSWER:")[-1].strip()

    def transcribe_voice(self, audio_path):
        if self.voice_model is None:
            self.voice_model = whisper.load_model("base")
        result = self.voice_model.transcribe(audio_path)
        return result["text"]

# ---------------------
# RAG SYSTEM
# ---------------------
class RAGGemini:
    def __init__(self):
        self.kb = KnowledgeBase()
        self.llm = SynthesisEngine()
        self.retriever = None
        self.history = load_history()

    def setup(self, files):
        n = self.kb.load_docs(files)
        c = self.kb.embed()
        self.llm.init()
        self.retriever = self.kb.retriever()
        return f"✅ {n} docs loaded & {c} chunks embedded."

    def query(self, q, concise=False):
        docs = self.retriever.get_relevant_documents(q)
        answer = self.llm.generate(q, docs, concise)
        sources = "\n\n".join([f"📘 {d.page_content[:200]}..." for d in docs[:3]])
        record = {"time": datetime.now().isoformat(), "query": q, "answer": answer}
        self.history.append(record)
        save_history(self.history)
        return answer, sources

rag = RAGGemini()

# ---------------------
# UI CALLBACKS
# ---------------------
def build_system(files):
    return rag.setup(files)

def ask(q, concise):
    return rag.query(q, concise)

def show_history():
    hist = rag.history[-10:][::-1]
    return "\n\n".join([f"🕒 {h['time']}\n**Q:** {h['query']}\n**A:** {h['answer']}" for h in hist])

def voice_to_text(audio):
    temp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
    audio.export(temp.name, format="wav")
    text = rag.llm.transcribe_voice(temp.name)
    return text

# ---------------------
# GRADIO UI
# ---------------------
with gr.Blocks(css="""
body {background: #0e0e11; color: #e3e3e3; font-family: 'Inter', sans-serif;}
.gr-button {background: linear-gradient(90deg, #1f2937, #374151); color: white; border-radius: 8px;}
.gr-textbox, .gr-markdown {background-color: #111827; border-radius: 8px;}
""") as demo:
    gr.Markdown("## 🌌 **RAG-Gemini** — Intelligent Knowledge Base with Local Models")
    gr.Markdown("> Upload. Ask. Discover insights. (Runs 100% offline)")

    with gr.Row():
        file_uploader = gr.File(file_count="multiple", file_types=[".pdf", ".txt"], label="📂 Upload PDFs or Texts")
        build_btn = gr.Button("🚀 Build Knowledge Base")
    status_box = gr.Textbox(label="System Status")

    build_btn.click(build_system, inputs=[file_uploader], outputs=[status_box])

    with gr.Tab("💬 Ask"):
        query_box = gr.Textbox(label="Ask a Question")
        concise = gr.Checkbox(label="Concise Mode", value=False)
        ask_btn = gr.Button("🔍 Query")
        answer_box = gr.Textbox(label="Answer", lines=5)
        sources_box = gr.Textbox(label="Cited Sources", lines=5)
        ask_btn.click(ask, inputs=[query_box, concise], outputs=[answer_box, sources_box])

    with gr.Tab("🎙️ Voice Query"):
        audio_input = gr.Audio(sources=["microphone"], type="filepath")
        voice_btn = gr.Button("🎧 Transcribe")
        voice_text = gr.Textbox(label="Recognized Text")
        voice_btn.click(rag.llm.transcribe_voice, inputs=[audio_input], outputs=[voice_text])

    with gr.Tab("🕘 History"):
        hist_btn = gr.Button("📜 Show Recent")
        hist_box = gr.Markdown()
        hist_btn.click(show_history, outputs=[hist_box])

    with gr.Tab("⚙️ System Info"):
        gr.Markdown(f"**Embedding model:** `{CONFIG['embedding_model']}`")
        gr.Markdown(f"**LLM model:** `{CONFIG['llm_model']}`")
        gr.Markdown(f"**Device:** `{CONFIG['device']}`")
        gr.Markdown("Cache and embeddings are stored locally.")

demo.launch(share=True)


  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mpython setup.py egg_info[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m See above for output.
  
  [1;35mnote[0m: This error originates from a subprocess, and is likely not a problem with pip.
  Preparing metadata (setup.py) ... [?25l[?25herror
[1;31merror[0m: [1mmetadata-generation-failed[0m

[31m×[0m Encountered error while generating package metadata.
[31m╰─>[0m See above for output.

[1;35mnote[0m: This is an issue with the package mentioned above, not pip.
[1;36mhint[0m: See above for details.
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m798.6/798.6 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m779.1/7

