<a href="https://colab.research.google.com/github/Amisha1019/Gen-AI-Customer-Service-Bot-Internship-Task/blob/main/task_1_chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install sentence-transformers
!pip install faiss-cpu
!pip install numpy
!pip install pandas
!pip install requests
!pip install beautifulsoup4
!pip install pdfplumber
!pip install python-magic
!pip install python-dotenv
!pip install APScheduler
!pip install tqdm
!pip install pydantic
!pip install PyYAML
!pip install sqlalchemy

Collecting faiss-cpu
  Downloading faiss_cpu-1.12.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Downloading faiss_cpu-1.12.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (31.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m43.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faiss-cpu
Successfully installed faiss-cpu-1.12.0
Collecting pdfplumber
  Downloading pdfplumber-0.11.7-py3-none-any.whl.metadata (42 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 kB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pdfminer.six==20250506 (from pdfplumber)
  Downloading pdfminer_six-20250506-py3-none-any.whl.metadata (4.2 kB)
Collecting pypdfium2>=4.18.0 (from pdfplumber)
  Downloading pypdfium2-4.30.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (48 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.5/48.5 kB

In [6]:
config = {
    "index_path": "./data/faiss.index",  # persisted FAISS file
    "meta_db": "./data/meta.db",  # sqlite metadata
    "embedder": {
        "model_name": "all-MiniLM-L6-v2",  # sentence-transformers model
        "chunk_size": 500,
        "chunk_overlap": 50
    },
    "sources": [
        {
            "type": "local_folder",
            "path": "./sources/docs",
            "include_extensions": [".txt", ".md", ".csv", ".pdf"]
        },
        {
            "type": "urls",
            "urls": [
                "https://example.com/customer-faq"
                # add other customer docs or knowledgebase pages here
            ]
        },
        {
            "type": "file",
            "path": "./sources/special_notice.txt"
        }
    ],
    "scheduler": {
        "enabled": True,
        "interval_minutes": 10  # ingest every 10 minutes
    }
}

In [7]:
import os
import hashlib
import mimetypes
import requests
from bs4 import BeautifulSoup
from typing import Tuple, Optional

def file_hash(path: str, block_size: int = 65536) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            h.update(block)
    return h.hexdigest()

def text_from_pdf(path: str) -> str:
    import pdfplumber
    text = []
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text.append(page_text)
    return "\n".join(text)

def text_from_csv(path: str) -> str:
    import pandas as pd
    df = pd.read_csv(path)
    return df.to_csv(index=False)

def fetch_url_text(url: str, timeout: int = 10) -> Tuple[str, dict]:
    headers = {"User-Agent": "GenAI-KB-Updater/1.0"}
    resp = requests.get(url, headers=headers, timeout=timeout)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    # Remove scripts/styles and extract visible text
    for s in soup(["script", "style", "noscript"]):
        s.decompose()
    texts = soup.stripped_strings
    return " ".join(texts), {"status_code": resp.status_code, "headers": dict(resp.headers)}

def guess_mime_type(path: str) -> Optional[str]:
    m, _ = mimetypes.guess_type(path)
    return m


In [8]:
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List

class Embedder:
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def embed_texts(self, texts: List[str]) -> np.ndarray:
        # returns numpy array shape (n, dim)
        return self.model.encode(texts, convert_to_numpy=True, show_progress_bar=False)


In [10]:
import os
import time
import json
import sqlite3
from pathlib import Path
from typing import List, Dict, Tuple
import numpy as np
from tqdm import tqdm

# from utils import file_hash, text_from_pdf, text_from_csv, fetch_url_text, guess_mime_type
# from embedder.py import Embedder
import faiss
DEFAULT_DIM = 384  # matches all-MiniLM-L6-v2

class MetaDB:
    def __init__(self, db_path: str):
        os.makedirs(os.path.dirname(db_path) or ".", exist_ok=True)
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._init_table()

    def _init_table(self):
        cur = self.conn.cursor()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS chunks (
                id TEXT PRIMARY KEY,
                source TEXT,
                source_type TEXT,
                content TEXT,
                chunk_hash TEXT,
                created_at REAL
            )
        """)
        self.conn.commit()

In [11]:
    def upsert_chunk(self, chunk_id: str, source: str, source_type: str, content: str, chunk_hash: str):
        cur = self.conn.cursor()
        cur.execute("""
            INSERT INTO chunks (id, source, source_type, content, chunk_hash, created_at)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                content=excluded.content,
                chunk_hash=excluded.chunk_hash,
                created_at=excluded.created_at
        """, (chunk_id, source, source_type, content, chunk_hash, time.time()))
        self.conn.commit()


In [14]:
    def get_chunk(self, chunk_id: str):
        cur = self.conn.cursor()
        cur.execute("SELECT * FROM chunks WHERE id = ?", (chunk_id,))
        return cur.fetchone()

    def all_chunks(self):
        cur = self.conn.cursor()
        cur.execute("SELECT id, source, source_type, chunk_hash, created_at FROM chunks")
        return cur.fetchall()

In [15]:
class FaissStore:
    def __init__(self, index_path: str, dim: int):
        self.index_path = index_path
        self.dim = dim
        os.makedirs(os.path.dirname(index_path) or ".", exist_ok=True)
        self.id_map = {}  # mapping from sequential internal id to chunk id
        self._load_or_create_index()

In [64]:
    def _load_or_create_index(self):
        if os.path.exists(self.index_path):
            self.index = faiss.read_index(self.index_path)
            # load mapping file if exists
            map_path = self.index_path + ".meta.json"
            if os.path.exists(map_path):
                with open(map_path, "r") as f:
                    self.id_map = json.load(f)
            else:
                self.id_map = {}
        else:
            self.index = faiss.IndexFlatIP(self.dim)  # Inner product; use normalized vectors for cosine
            self.id_map = {}
            self._persist()

In [65]:
    def add_vectors(self, ids: List[str], vectors: np.ndarray):
        # assign sequential integer ids
        current_count = int(self.index.ntotal)
        n = vectors.shape[0]
        # store mapping
        for i, cid in enumerate(ids):
            self.id_map[str(current_count + i)] = cid
        # ensure float32
        vectors = vectors.astype("float32")
        # normalize for cosine similarity
        faiss.normalize_L2(vectors)
        self.index.add(vectors)
        self._persist()

In [66]:
    def search(self, vector: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
        vec = vector.astype("float32")
        faiss.normalize_L2(vec)
        D, I = self.index.search(vec, top_k)
        results = []
        for dist_row, idx_row in zip(D, I):
            for dist, iid in zip(dist_row, idx_row):
                if iid == -1:
                    continue
                chunk_id = self.id_map.get(str(iid))
                results.append((chunk_id, float(dist)))
        return results

In [67]:
    def _persist(self):
        faiss.write_index(self.index, self.index_path)
        map_path = self.index_path + ".meta.json"
        with open(map_path, "w") as f:
            json.dump(self.id_map, f)

In [20]:
def chunk_text(text: str, chunk_size: int, overlap: int) -> List[str]:
    tokens = text.split()
    if not tokens:
        return []
    chunks = []
    i = 0
    while i < len(tokens):
        chunk_tokens = tokens[i:i+chunk_size]
        chunks.append(" ".join(chunk_tokens))
        i += chunk_size - overlap
    return chunks

In [51]:
class Ingestor:
    def __init__(self, embedder: Embedder, faiss_store: FaissStore, meta_db: MetaDB, chunk_size=500, overlap=50):
        self.embedder = embedder
        self.faiss_store = faiss_store
        self.meta_db = meta_db
        self.chunk_size = chunk_size
        self.overlap = overlap

    def process_local_file(self, path: str, source_tag: str):
        mime = guess_mime_type(path) or ""
        if path.lower().endswith(".pdf") or "pdf" in mime:
            text = text_from_pdf(path)
        elif path.lower().endswith(".csv"):
            text = text_from_csv(path)
        else:
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                text = f.read()
        return self._process_text_source(text, f"file:{path}", "file")

    def process_url(self, url: str):
        text, meta = fetch_url_text(url)
        # you can store meta headers like Last-Modified or ETag for change detection
        return self._process_text_source(text, f"url:{url}", "url")

    def _process_text_source(self, text: str, source_identifier: str, source_type: str):
        # split into chunks, compute chunk hashes, embed, upsert
        chunks = chunk_text(text, self.chunk_size, self.overlap)
        if not chunks:
            return 0
        embeddings = self.embedder.embed_texts(chunks)
        # create chunk ids deterministically: hash of source + chunk index
        ids = []
        for i, chunk in enumerate(chunks):
            chunk_id = hashlib.sha256(f"{source_identifier}:{i}".encode()).hexdigest()
            chunk_hash = hashlib.sha256(chunk.encode("utf-8")).hexdigest()
            ids.append((chunk_id, chunk, chunk_hash))
        # Now check meta_db whether chunk exists and whether hash changed
        to_add_ids = []
        to_add_vecs = []
        for idx, (cid, content, chash) in enumerate(ids):
            row = self.meta_db.get_chunk(cid)
            if row is None or row[3] != chash:  # row fields: (id, source, source_type, content, chunk_hash, created_at)
                to_add_ids.append(cid)
                to_add_vecs.append(embeddings[idx])
                self.meta_db.upsert_chunk(cid, source_identifier, source_type, content, chash)
        if to_add_ids:
            vecs = np.vstack(to_add_vecs)
            self.faiss_store.add_vectors(to_add_ids, vecs)
        return len(to_add_ids)


In [54]:
"""
This is a placeholder for special notices or urgent updates.
You can update this file dynamically when new announcements are available.
"""

'\nThis is a placeholder for special notices or urgent updates.\nYou can update this file dynamically when new announcements are available.\n'

In [58]:
!mkdir -p ./sources
!echo "This is a placeholder notice." > ./sources/special_notice.txt

In [68]:
import os
import faiss
import json

class FaissStore:
    def __init__(self, index_path: str, dim: int):
        self.index_path = index_path
        self.dim = dim
        os.makedirs(os.path.dirname(index_path) or ".", exist_ok=True)
        self.id_map = {}  # mapping from internal ID to chunk ID
        self._load_or_create_index()  # <-- this must exist

    def _load_or_create_index(self):
        # If index exists, load it; else create a new one
        if os.path.exists(self.index_path):
            self.index = faiss.read_index(self.index_path)
            # Load mapping if exists
            map_path = self.index_path + ".meta.json"
            if os.path.exists(map_path):
                with open(map_path, "r") as f:
                    self.id_map = json.load(f)
            else:
                self.id_map = {}
        else:
            self.index = faiss.IndexFlatIP(self.dim)
            self.id_map = {}
            self._persist()

    def _persist(self):
        faiss.write_index(self.index, self.index_path)
        map_path = self.index_path + ".meta.json"
        with open(map_path, "w") as f:
            json.dump(self.id_map, f)

    # Add other methods: add_vectors, search


In [70]:
import faiss
import os
import json
import numpy as np
from typing import List, Tuple

class FaissStore:
    def __init__(self, index_path: str, dim: int):
        self.index_path = index_path
        self.dim = dim
        os.makedirs(os.path.dirname(index_path) or ".", exist_ok=True)
        self.id_map = {}  # mapping from sequential internal id to chunk id
        self._load_or_create_index()

    def _load_or_create_index(self):
        if os.path.exists(self.index_path):
            self.index = faiss.read_index(self.index_path)
            # load mapping file if exists
            map_path = self.index_path + ".meta.json"
            if os.path.exists(map_path):
                with open(map_path, "r") as f:
                    self.id_map = json.load(f)
            else:
                self.id_map = {}
        else:
            self.index = faiss.IndexFlatIP(self.dim)  # Inner product; use normalized vectors for cosine
            self.id_map = {}
            self._persist()

    def add_vectors(self, ids: List[str], vectors: np.ndarray):
        # assign sequential integer ids
        current_count = int(self.index.ntotal)
        n = vectors.shape[0]
        # store mapping
        for i, cid in enumerate(ids):
            self.id_map[str(current_count + i)] = cid
        # ensure float32
        vectors = vectors.astype("float32")
        # normalize for cosine similarity
        faiss.normalize_L2(vectors)
        self.index.add(vectors)
        self._persist()

    def search(self, vector: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
        vec = vector.astype("float32")
        faiss.normalize_L2(vec)
        # Reshape vector for Faiss
        if vec.ndim == 1:
            vec = vec.reshape(1, -1)
        D, I = self.index.search(vec, top_k)
        results = []
        for dist_row, idx_row in zip(D, I):
            for dist, iid in zip(dist_row, idx_row):
                if iid == -1:
                    continue
                chunk_id = self.id_map.get(str(iid))
                results.append((chunk_id, float(dist)))
        # If the input vector was 1D, the output should also represent a single search result
        if len(results) > 0 and vector.ndim == 1:
            return results
        return results

    def _persist(self):
        faiss.write_index(self.index, self.index_path)
        map_path = self.index_path + ".meta.json"
        with open(map_path, "w") as f:
            json.dump(self.id_map, f)

In [71]:
import os
import faiss
import json
import numpy as np
from typing import List, Tuple

class FaissStore:
    def __init__(self, index_path: str, dim: int):
        """
        Initializes the FAISS store. Loads existing index if present, else creates a new one.
        Args:
            index_path: Path to store FAISS index.
            dim: Dimension of embeddings.
        """
        self.index_path = index_path
        self.dim = dim
        os.makedirs(os.path.dirname(index_path) or ".", exist_ok=True)
        self.id_map = {}  # maps internal integer IDs to chunk IDs
        self._load_or_create_index()  # Load or create index

    def _load_or_create_index(self):
        """
        Loads the FAISS index from disk if exists, else creates a new IndexFlatIP.
        """
        if os.path.exists(self.index_path):
            self.index = faiss.read_index(self.index_path)
            map_path = self.index_path + ".meta.json"
            if os.path.exists(map_path):
                with open(map_path, "r") as f:
                    self.id_map = json.load(f)
            else:
                self.id_map = {}
        else:
            # Use Inner Product (cosine similarity if vectors are normalized)
            self.index = faiss.IndexFlatIP(self.dim)
            self.id_map = {}
            self._persist()

    def add_vectors(self, ids: List[str], vectors: np.ndarray):
        """
        Adds new vectors to the index and updates mapping.
        Args:
            ids: List of chunk IDs (strings).
            vectors: numpy array of shape (n, dim)
        """
        if len(ids) != vectors.shape[0]:
            raise ValueError("Number of IDs must match number of vectors.")

        # Normalize vectors for cosine similarity
        vectors = vectors.astype("float32")
        faiss.normalize_L2(vectors)

        # Assign sequential integer IDs
        start_idx = int(self.index.ntotal)
        for i, cid in enumerate(ids):
            self.id_map[str(start_idx + i)] = cid

        self.index.add(vectors)
        self._persist()

    def search(self, vector: np.ndarray, top_k: int = 5) -> List[Tuple[str, float]]:
        """
        Searches for top_k nearest neighbors in the vector store.
        Args:
            vector: numpy array of shape (1, dim)
            top_k: number of nearest neighbors to retrieve
        Returns:
            List of tuples (chunk_id, similarity_score)
        """
        vec = vector.astype("float32")
        faiss.normalize_L2(vec)
        D, I = self.index.search(vec, top_k)
        results = []
        for dist_row, idx_row in zip(D, I):
            for dist, iid in zip(dist_row, idx_row):
                if iid == -1:
                    continue
                chunk_id = self.id_map.get(str(iid))
                results.append((chunk_id, float(dist)))
        return results

    def _persist(self):
        """
        Persists the FAISS index and mapping to disk.
        """
        faiss.write_index(self.index, self.index_path)
        map_path = self.index_path + ".meta.json"
        with open(map_path, "w") as f:
            json.dump(self.id_map, f)


In [73]:
# from ingest import FaissStore # Removed import
import numpy as np

# Initialize FAISS store
store = FaissStore(index_path="./data/customer_service_index.faiss", dim=384)

# Example: add vectors
vectors = np.random.rand(3, 384).astype("float32")
store.add_vectors(["chunk1", "chunk2", "chunk3"], vectors)

# Example: search
query_vec = np.random.rand(1, 384).astype("float32")
results = store.search(query_vec, top_k=2)
print(results)

[('chunk3', 0.7678486108779907), ('chunk2', 0.7536894679069519)]


In [None]:
import yaml
import time
from apscheduler.schedulers.background import BackgroundScheduler
# from ingest import Embedder, FaissStore, MetaDB, Ingestor # Removed import

def job(ingestor, cfg):
    try:
        n = ingestor.ingest_sources(cfg["sources"])
        print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Scheduler: Added {n} new chunks.")
    except Exception as e:
        print("Scheduler job failed:", e)

def start_scheduler(cfg_path="config.yaml"):
    # cfg = yaml.safe_load(open(cfg_path)) # Removed file loading
    cfg = config # Use the config dictionary from the notebook
    emb = Embedder(cfg["embedder"]["model_name"])
    dim = emb.model.get_sentence_embedding_dimension()
    faiss_store = FaissStore(cfg["index_path"], dim) # Corrected key
    meta_db = MetaDB(cfg["meta_db"]) # Corrected key
    ing = Ingestor(emb, faiss_store, meta_db, chunk_size=cfg["embedder"]["chunk_size"], overlap=cfg["embedder"]["chunk_overlap"])

    sched = BackgroundScheduler()
    interval = cfg.get("scheduler", {}).get("interval_minutes", 10)
    sched.add_job(job, "interval", minutes=interval, args=[ing, cfg])
    sched.start()
    print(f"Scheduler started; polling every {interval} minutes.")
    try:
        while True:
            time.sleep(60)
    except (KeyboardInterrupt, SystemExit):
        sched.shutdown()

if __name__ == "__main__":
    start_scheduler()

Scheduler started; polling every 10 minutes.
Scheduler job failed: 'Ingestor' object has no attribute 'ingest_sources'


In [None]:
import yaml
import numpy as np
from ingest import Embedder, FaissStore, MetaDB
# If your LLM is OpenAI or other, add code to call it. Here we show a structured flow.

class ChatBot:
    def __init__(self, cfg_path="config.yaml"):
        cfg = yaml.safe_load(open(cfg_path))
        self.cfg = cfg
        self.embedder = Embedder(cfg["embedder"]["model_name"])
        self.dim = self.embedder.model.get_sentence_embedding_dimension()
        self.store = FaissStore(cfg["vector_store"]["index_path"], self.dim)
        self.meta_db = MetaDB(cfg["vector_store"]["meta_db"])

    def _get_chunk_content(self, chunk_id):
        row = self.meta_db.get_chunk(chunk_id)
        if row:
            # table columns: id, source, source_type, content, chunk_hash, created_at
            return row[3]
        return None

    def answer(self, query: str, top_k: int = 5):
        qvec = self.embedder.embed_texts([query])
        results = self.store.search(qvec, top_k=top_k)
        contexts = []
        for cid, score in results:
            content = self._get_chunk_content(cid)
            if content:
                contexts.append((content, score))
        # Build prompt for LLM: combine top contexts + question
        prompt = self._build_prompt(query, contexts)
        # Here you would call your LLM (OpenAI, local LLM, etc.) with prompt and return response.
        # For now we'll return the prompt and contexts for demonstration.
        return {"prompt": prompt, "contexts": contexts}

    def _build_prompt(self, query, contexts):
        pieces = ["You are a helpful customer support assistant. Use the following knowledge snippets to answer the question.\n"]
        for i, (ctx, score) in enumerate(contexts):
            pieces.append(f"=== snippet {i+1} (score {score:.4f}) ===\n{ctx}\n")
        pieces.append(f"Question: {query}\nAnswer (concise):")
        return "\n".join(pieces)

# Example usage
if __name__ == "__main__":
    bot = ChatBot()
    q = input("Ask: ")
    out = bot.answer(q)
    print("\n--- Prompt sent to LLM ---\n")
    print(out["prompt"][:4000])  # trim long output in example
    print("\n--- Retrieved contexts (ids omitted) ---")
    for c, s in out["contexts"]:
        print(f"len {len(c)} chars, score {s:.4f}")
