In [2]:
import sys
import os

# Put the project root on the import path; presumably this is what lets the
# later `from rag...` imports resolve (confirm against the project layout).
# NOTE(review): this absolute path is machine-specific; the notebook will not
# find the project on another machine unless the path is adjusted.
PROJECT_ROOT = os.path.abspath("D:\\My Projects\\genai_telegram_rag_bot")
# Guard against stacking a duplicate entry every time the cell is re-run.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)


In [3]:
import os
import sqlite3
import numpy as np
from sentence_transformers import SentenceTransformer

# SQLite database file that stores one row per embedded chunk.
# Relative path: it is resolved against the current working directory.
DB_PATH = "Data/vectors.db"
# Directory whose files are read and embedded by ingest().
DOC_PATH = "Data/Documents"
# Name handed to SentenceTransformer to load the embedding model.
MODEL_NAME = "all-MiniLM-L6-v2"

def chunk_text(text, chunk_size=400):
    """Split ``text`` into chunks of roughly ``chunk_size`` characters.

    Words (whitespace-separated tokens) are added to the current chunk until
    its space-joined length reaches ``chunk_size``; the chunk is then closed
    and a new one is started. Words are never split, so a chunk can exceed
    ``chunk_size`` by up to the length of its last word, and any run of
    whitespace in the input collapses to a single space in the output.

    Args:
        text: The text to split.
        chunk_size: Length, in characters, at which a chunk is closed.

    Returns:
        A list of chunk strings in their original order. The list is empty
        when ``text`` contains no words.
    """
    chunks, current = [], []
    # Length of " ".join(current), maintained incrementally so the chunk does
    # not have to be re-joined for every word just to measure it.
    current_len = 0
    for word in text.split():
        # Every word after the first costs one extra separating space.
        current_len += len(word) + (1 if current else 0)
        current.append(word)
        if current_len >= chunk_size:
            chunks.append(" ".join(current))
            current, current_len = [], 0
    if current:
        # Flush the trailing partial chunk.
        chunks.append(" ".join(current))
    return chunks

def ingest():
    """Embed every document under ``DOC_PATH`` and store the chunks in SQLite.

    Each regular file in ``DOC_PATH`` is read as UTF-8, split with
    ``chunk_text`` and encoded with the ``MODEL_NAME`` model. One row per
    chunk (document name, chunk text, float32 vector bytes) is written to the
    ``embeddings`` table of ``DB_PATH``; the table is created if missing.

    Rows stored earlier for the same document name are replaced, so running
    the function again does not duplicate chunks. The connection is always
    closed, and rows are committed only if every document was processed.

    Raises:
        OSError: If ``DOC_PATH`` or one of its files cannot be read.
        UnicodeDecodeError: If a file is not valid UTF-8.
        sqlite3.Error: If the database cannot be opened or written.
    """
    model = SentenceTransformer(MODEL_NAME)
    conn = sqlite3.connect(DB_PATH)
    try:
        cur = conn.cursor()

        cur.execute("""
    CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        doc_name TEXT,
        content TEXT,
        vector BLOB
    )
    """)

        # Sorted so rows are inserted in the same order on every run.
        for file in sorted(os.listdir(DOC_PATH)):
            path = os.path.join(DOC_PATH, file)
            if not os.path.isfile(path):
                # Sub-directories cannot be opened as documents.
                continue

            with open(path, "r", encoding="utf-8") as f:
                chunks = chunk_text(f.read())

            # Replace anything stored for this document by an earlier run;
            # otherwise each re-run would add a second copy of every chunk.
            cur.execute("DELETE FROM embeddings WHERE doc_name = ?", (file,))
            if not chunks:
                continue

            # Encode all chunks of the document in one batched call instead
            # of invoking the model once per chunk.
            vectors = np.asarray(model.encode(chunks), dtype=np.float32)
            cur.executemany(
                "INSERT INTO embeddings (doc_name, content, vector) VALUES (?, ?, ?)",
                [
                    (file, chunk, vector.tobytes())
                    for chunk, vector in zip(chunks, vectors)
                ],
            )

        conn.commit()
    finally:
        # Closing without a commit discards the pending row changes, so a
        # failure part-way through leaves the stored rows untouched.
        conn.close()
    print("Documents ingested successfully.")

# Run the ingestion only when this code is executed directly (as a script or
# as a notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    ingest()


Documents ingested successfully.


In [4]:
import sqlite3
import numpy as np
from sentence_transformers import SentenceTransformer

# Same database file and embedding model name as the ingestion code, so query
# vectors are produced by the model that produced the stored vectors.
DB_PATH = "Data/vectors.db"
MODEL_NAME = "all-MiniLM-L6-v2"

# Loaded once at module level; retrieve() reuses this instance for each query.
model = SentenceTransformer(MODEL_NAME)

def cosine_sim(a, b):
    """Return the cosine similarity of two vectors.

    Args:
        a: First vector (array-like accepted by NumPy).
        b: Second vector, with the same length as ``a``.

    Returns:
        ``dot(a, b) / (|a| * |b|)``. If either vector has zero norm the
        similarity is undefined; ``0.0`` is returned in that case rather than
        NaN, so the score still sorts sensibly against other scores.
    """
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        # Avoid a division by zero that would yield NaN (and a RuntimeWarning).
        return 0.0
    return np.dot(a, b) / denom

def retrieve(query, top_k=3):
    """Return the stored chunks most similar to ``query``.

    The query is embedded with the module-level ``model`` and compared, by
    cosine similarity, with every vector in the ``embeddings`` table of
    ``DB_PATH``.

    Args:
        query: The question text to embed.
        top_k: Maximum number of results. Zero or a negative value yields an
            empty list; ``None`` returns every stored chunk.

    Returns:
        A list of ``(score, doc_name, content)`` tuples ordered by descending
        score, at most ``top_k`` long.

    Raises:
        sqlite3.Error: If the database or the ``embeddings`` table is missing.
    """
    if top_k is not None and top_k <= 0:
        # A negative slice bound would silently drop results from the end
        # instead of returning nothing.
        return []

    q_emb = model.encode(query)

    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            "SELECT doc_name, content, vector FROM embeddings"
        ).fetchall()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    scores = [
        # Vectors are stored as raw float32 bytes; decode before comparing.
        (cosine_sim(q_emb, np.frombuffer(vec, dtype=np.float32)), doc, content)
        for doc, content, vec in rows
    ]
    # Sort on the score only, so ties never fall back to comparing the text.
    scores.sort(reverse=True, key=lambda x: x[0])
    return scores[:top_k]


In [5]:
def build_prompt(context, question):
    """Assemble the LLM prompt from retrieved context and the user question.

    The prompt opens with a blank line, instructs the model to answer only
    from the supplied context (and to say "I don't know" otherwise), then
    lists the context and the question, and ends with an ``Answer:`` cue
    followed by a newline.

    Args:
        context: Text of the retrieved chunks.
        question: The user's question.

    Returns:
        The complete prompt string.
    """
    sections = (
        "",  # the prompt starts with a newline
        "You are a helpful assistant.",
        "Answer ONLY using the context below.",
        'If the answer is not present, say "I don\'t know".',
        "",
        "Context:",
        f"{context}",
        "",
        "Question:",
        f"{question}",
        "",
        "Answer:",
        "",  # ...and ends with one
    )
    return "\n".join(sections)


In [None]:
import os
import requests
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes

from rag.retriever import retrieve
from rag.prompt import build_prompt

# ===============================
# Configuration
# ===============================

# The bot token is read from the environment so it is never hard-coded here.
TELEGRAM_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Generate endpoint of an Ollama server on the local machine.
OLLAMA_URL = "http://localhost:11434/api/generate"
# Model name sent to Ollama in the request payload.
# NOTE(review): if this runs in the same notebook kernel as the ingestion
# code, it rebinds the MODEL_NAME that ingest() passes to SentenceTransformer;
# confirm this code is only ever run as a separate script.
MODEL_NAME = "llama3"

# Fail fast with a clear message rather than starting a bot without a token.
if not TELEGRAM_TOKEN:
    raise RuntimeError(
        "TELEGRAM_BOT_TOKEN environment variable is not set. "
        "Please set it before running the bot."
    )

# ===============================
# Command Handlers
# ===============================

async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to ``/help`` with the list of supported commands.

    Args:
        update: The incoming Telegram update.
        context: Handler context (unused).
    """
    # effective_message also covers edited messages, for which
    # update.message is None.
    message = update.effective_message
    if message is None:
        return

    # Non-ASCII characters are written as escapes so the text survives the
    # file being saved or read with the wrong encoding (the previous literals
    # had degraded into mojibake).
    await message.reply_text(
        "\U0001F916 Mini-RAG Telegram Bot\n\n"
        "/ask <question> \u2013 Ask a question from the knowledge base\n"
        "/help \u2013 Show this help message"
    )

async def ask_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer ``/ask <question>`` using retrieved context and the Ollama model.

    The question is built from the command arguments, the most similar chunks
    are fetched with ``retrieve``, a prompt is assembled with ``build_prompt``
    and posted to ``OLLAMA_URL``. The model's answer is sent back together
    with the names of the source documents.

    Args:
        update: The incoming Telegram update.
        context: Handler context; ``context.args`` holds the question words.
    """
    import asyncio
    import functools

    # effective_message also covers edited messages, for which
    # update.message is None.
    message = update.effective_message
    if message is None:
        return

    if not context.args:
        await message.reply_text(
            "Please provide a question.\n\nExample:\n/ask What is the leave policy?"
        )
        return

    query = " ".join(context.args)
    loop = asyncio.get_running_loop()

    # Retrieve relevant chunks. Embedding and the SQLite scan are blocking, so
    # they run in a worker thread to keep the bot responsive to other chats.
    results = await loop.run_in_executor(None, retrieve, query)

    if not results:
        await message.reply_text("I couldn't find relevant information.")
        return

    context_text = "\n".join([r[2] for r in results])
    prompt = build_prompt(context_text, query)

    # Call Ollama. requests is synchronous and may take up to the full
    # timeout, so it must not run on the event loop either.
    try:
        response = await loop.run_in_executor(
            None,
            functools.partial(
                requests.post,
                OLLAMA_URL,
                json={
                    "model": MODEL_NAME,
                    "prompt": prompt,
                    "stream": False
                },
                timeout=60,
            ),
        )
        response.raise_for_status()
        answer = response.json().get("response", "No response generated.")
    except Exception as e:
        await message.reply_text(f"Error calling LLM: {e}")
        return

    sources = ", ".join(sorted(set([r[1] for r in results])))

    # The emoji is written as an escape so it survives encoding mishaps.
    reply = f"{answer}\n\n\U0001F4C4 Sources: {sources}"

    # Telegram rejects messages longer than 4096 characters; send long
    # answers in several parts, with some headroom below that limit.
    max_len = 4000
    for start in range(0, len(reply), max_len):
        await message.reply_text(reply[start:start + max_len])

# ===============================
# App Entry Point
# ===============================

def main():
    """Build the Telegram application, register the handlers and start polling.

    ``run_polling()`` is expected to block until the bot is stopped, so this
    function normally does not return while the bot is running.
    """
    app = ApplicationBuilder().token(TELEGRAM_TOKEN).build()

    app.add_handler(CommandHandler("help", help_cmd))
    app.add_handler(CommandHandler("ask", ask_cmd))

    # The check mark is written as an escape so it survives encoding mishaps
    # (the previous literal had degraded into mojibake).
    print("\u2705 Telegram bot started. Waiting for messages...")
    app.run_polling()

# Start the bot only when this code is executed directly (as a script or as a
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()


In [1]:
import os
# Display the kernel's working directory; the relative "Data/..." paths used
# in this notebook are resolved against it.
os.getcwd()


'D:\\My Projects\\genai_telegram_rag_bot'