# 1. Introduction
### This section sets up the main imports and logging for the RAG pipeline demo.

In [None]:
# --- Rag Eu Lab Project: Part 1/5 ---
# Imports and basic setup
from __future__ import annotations

import argparse
import dataclasses
import hashlib
import json
import logging
import random
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Sequence, Tuple

# Optional heavy deps
try:
    import pandas as pd
except Exception:
    pd = None  # type: ignore

try:
    import numpy as np
except Exception:
    np = None  # type: ignore

try:
    from sentence_transformers import SentenceTransformer
except Exception:
    SentenceTransformer = None

try:
    import faiss
except Exception:
    faiss = None

try:
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.neighbors import NearestNeighbors
except Exception:
    TfidfVectorizer = None
    NearestNeighbors = None

# Root logging configuration: INFO and above, each record prefixed with
# its timestamp and severity level.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Logger named after the current module, used by the code in this file.
logger = logging.getLogger(__name__)

# 2. Data Classes
### TableMeta describes metadata for each table.
### DocChunk is a textual chunk derived from tables, ready for embedding.

In [None]:
# --- Rag Eu Lab Project: Part 2/5 ---
# Data classes
@dataclass
class TableMeta:
    """Metadata describing a single table.

    The decorator is required: without it the class has no generated
    ``__init__`` and cannot be built with keyword arguments.

    Attributes:
        name: Table name.
        schema: Mapping of column name to a type label (e.g. ``'int'``).
        nrows: Number of rows the table is declared to have.
        digest: Short identifier for the table, used as a prefix for the
            ids of chunks derived from it.
    """

    name: str
    schema: Dict[str, str]
    nrows: int
    digest: str

@dataclass
class DocChunk:
    """A piece of text derived from a table, ready to be embedded and indexed."""
    # Unique chunk identifier; used as the lookup key when search hits are
    # mapped back to chunks.
    id: str
    # Name of the table this chunk was derived from.
    table: str
    # The text that is embedded and later placed into the prompt context.
    chunk_text: str
    # Free-form string metadata; chunks produced in this file carry a 'role'
    # key ('schema', 'sample_row' or 'stats').
    metadata: Dict[str, str]

# Utility function
def short_hash(s: str, length: int = 10) -> str:
    """Return a short hexadecimal fingerprint of a string.

    Args:
        s: Text to fingerprint; it is UTF-8 encoded before hashing.
        length: Number of leading hex characters of the SHA-1 digest to keep.

    Returns:
        The first ``length`` characters of the SHA-1 hex digest of ``s``.
    """
    hasher = hashlib.sha1()
    hasher.update(s.encode('utf-8'))
    hex_digest = hasher.hexdigest()
    return hex_digest[:length]

# 3. Table Simulation and Schema Normalizer
### TableLoader simulates multiple tables; SchemaNormalizer converts schema and sample rows into text chunks.

In [None]:
# --- Rag Eu Lab Project: Part 3/5 ---
# Table loader and schema normalizer
class TableLoader:
    """Generate a reproducible stream of synthetic tables.

    Randomness comes from a private ``random.Random`` seeded with ``seed``,
    so building a loader never reseeds the process-global ``random`` module.
    Two loaders therefore cannot disturb each other, and unrelated
    ``random`` calls made while the generator is being consumed do not
    change the tables it yields.

    Args:
        n_tables: Number of tables yielded by ``simulate_tables``.
        seed: Seed for the private random generator.
    """

    def __init__(self, n_tables: int = 600, seed: int = 42):
        self.n_tables = n_tables
        self.seed = seed

    def simulate_tables(self) -> Iterable[Tuple[TableMeta, Optional[pd.DataFrame]]]:
        """Yield ``(meta, sample_frame)`` pairs for ``n_tables`` synthetic tables.

        Every call starts from a generator freshly seeded with ``self.seed``,
        so repeated calls yield the same sequence of tables.

        Yields:
            A ``TableMeta`` and a DataFrame holding at most 200 sample rows,
            or ``None`` in place of the DataFrame when pandas is unavailable.
        """
        # Fresh generator per call: same stream a newly seeded global RNG
        # would produce, but isolated from every other user of ``random``.
        rng = random.Random(self.seed)
        dtype_choices = ['int', 'float', 'str', 'date', 'category', 'json']
        for i in range(self.n_tables):
            name = f"eu_lab_table_{i:04d}"
            ncols = rng.randint(5, 25)
            nrows = rng.randint(50, 5000)
            schema = {f"col_{c:02d}": rng.choice(dtype_choices) for c in range(ncols)}
            # The digest ties chunk ids to both the table name and its schema.
            digest = short_hash(name + json.dumps(schema))
            meta = TableMeta(name=name, schema=schema, nrows=nrows, digest=digest)
            df = None
            if pd is not None:
                # Only a capped sample is materialised, not all ``nrows`` rows.
                n_sample_rows = min(200, nrows)
                data = {}
                for col, dtype in schema.items():
                    if dtype == 'int':
                        data[col] = [rng.randint(0, 1000) for _ in range(n_sample_rows)]
                    else:
                        # Every non-int type label is simulated with
                        # placeholder strings.
                        data[col] = [f'str_{rng.randint(0, 9999)}' for _ in range(n_sample_rows)]
                df = pd.DataFrame(data)
            yield meta, df

class SchemaNormalizer:
    """Turn table metadata and sample rows into text chunks for embedding."""

    def canonicalize_schema(self, schema: Dict[str, str]) -> Dict[str, str]:
        """Return a copy of ``schema`` with normalised column names and types.

        Column names are stripped, lower-cased and have spaces replaced by
        underscores. Type labels outside the known set become ``'str'``.
        Entries are inserted in the sorted order of the original names.
        """
        known_types = ['int', 'float', 'str', 'date', 'category', 'json']
        canonical: Dict[str, str] = {}
        for raw_name, raw_type in sorted(schema.items()):
            clean_name = raw_name.strip().lower().replace(' ', '_')
            if raw_type in known_types:
                canonical[clean_name] = raw_type
            else:
                canonical[clean_name] = 'str'
        return canonical

    def tabular_to_text(self, meta: TableMeta, df: Optional[pd.DataFrame] = None, n_samples: int = 3) -> List[DocChunk]:
        """Build the text chunks that describe one table.

        Args:
            meta: Metadata of the table being described.
            df: Optional sample rows; when absent or empty no row chunks
                are produced.
            n_samples: Maximum number of rows sampled from ``df``.

        Returns:
            One schema chunk, then one chunk per sampled row, then one
            stats chunk, in that order.
        """
        schema = self.canonicalize_schema(meta.schema)

        # Schema chunk: table header followed by one line per column.
        column_lines = [f" - {c}: {t}" for c, t in schema.items()]
        schema_text = f"Table: {meta.name}\nRows: {meta.nrows}\nSchema:\n" + '\n'.join(column_lines)
        schema_chunk = DocChunk(
            id=f"{meta.digest}_schema",
            table=meta.name,
            chunk_text=schema_text,
            metadata={'role': 'schema', 'nrows': str(meta.nrows)},
        )
        chunks = [schema_chunk]

        # Row chunks: a fixed random_state keeps the sampled rows stable.
        if df is not None and len(df) > 0:
            sample_size = min(n_samples, len(df))
            picked = df.sample(n=sample_size, random_state=0)
            for ridx, row in picked.iterrows():
                cells = ', '.join(f"{col}={row[col]!r}" for col in df.columns)
                row_chunk = DocChunk(
                    id=f"{meta.digest}_row_{ridx}",
                    table=meta.name,
                    chunk_text=f"Row sample (idx={ridx}): " + cells,
                    metadata={'role': 'sample_row'},
                )
                chunks.append(row_chunk)

        # Stats chunk: currently lists only the type of each column.
        stat_lines = [f" - {col}: type={t}" for col, t in schema.items()]
        stats_text = f"Statistics for {meta.name}:\n" + '\n'.join(stat_lines)
        stats_chunk = DocChunk(
            id=f"{meta.digest}_stats",
            table=meta.name,
            chunk_text=stats_text,
            metadata={'role': 'stats'},
        )
        chunks.append(stats_chunk)
        return chunks

# 4. Embedding and Vector Store
### EmbeddingModel abstracts embedding backend; VectorStore abstracts similarity search.

In [None]:
# --- Rag Eu Lab Project: Part 4/5 ---
# Embedding and Vector Store
class EmbeddingModel:
    """Embed text with sentence-transformers, falling back to TF-IDF.

    When ``SentenceTransformer`` is importable the named model is loaded.
    Otherwise a scikit-learn ``TfidfVectorizer`` capped at 16384 features is
    used. Exactly one of ``self.model`` / ``self.tfidf`` is non-None.

    Args:
        model_name: Name passed to ``SentenceTransformer``; ignored by the
            TF-IDF fallback.

    Raises:
        ImportError: If neither sentence-transformers nor scikit-learn is
            available.
    """

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        self.model = SentenceTransformer(model_name) if SentenceTransformer is not None else None
        self.tfidf = None
        if self.model is None:
            if TfidfVectorizer is None:
                # Fail with an actionable message instead of
                # "'NoneType' object is not callable".
                raise ImportError(
                    "EmbeddingModel requires either sentence-transformers or scikit-learn to be installed"
                )
            self.tfidf = TfidfVectorizer(max_features=16384)

    def fit_transform(self, docs: Sequence[str]):
        """Embed the corpus ``docs``, fitting the TF-IDF vocabulary if used.

        Returns:
            One embedding per document: whatever the model's ``encode``
            returns, or a dense TF-IDF array in the fallback path.
        """
        # Explicit None check: a backend object that defines __len__ or
        # __bool__ must never be mistaken for "no model".
        if self.model is not None:
            return self.model.encode(list(docs), show_progress_bar=False)
        return self.tfidf.fit_transform(docs).toarray()

    def transform(self, docs: Sequence[str]):
        """Embed ``docs`` without refitting.

        In the TF-IDF path this uses the vocabulary learned by a previous
        ``fit_transform`` call, so that call must come first.
        """
        if self.model is not None:
            return self.model.encode(list(docs), show_progress_bar=False)
        return self.tfidf.transform(docs).toarray()

class VectorStore:
    """Cosine-similarity search over a fixed set of embedding vectors.

    Uses a FAISS inner-product index over L2-normalised vectors when
    ``faiss`` is available, otherwise scikit-learn ``NearestNeighbors``
    with the cosine metric. Scores returned by ``search`` are cosine
    similarities in both cases (higher is more similar).

    Args:
        embedding_dim: Accepted for interface compatibility but unused; the
            dimension is taken from the vectors passed to ``build``.
    """

    def __init__(self, embedding_dim: Optional[int] = None):
        self.embeddings = None
        self.ids: List[str] = []
        self.metadatas: List[Dict[str, str]] = []
        self._index = None

    def build(self, vectors, ids: List[str], metadatas: List[Dict[str, str]]):
        """Index ``vectors`` and remember the ids/metadata that go with them.

        Args:
            vectors: 2-D array-like with one embedding per row. An empty
                input produces an empty store whose searches return no hits.
            ids: Identifier for each row of ``vectors``, in the same order.
            metadatas: Metadata for each row of ``vectors``.

        Raises:
            ValueError: If ``vectors`` is non-empty but not 2-D.
            ImportError: If neither faiss nor scikit-learn is available.
        """
        import numpy as _np
        # np.array (not asarray) always copies, so the in-place FAISS
        # normalisation below never modifies the caller's data.
        self.embeddings = _np.array(vectors, dtype=_np.float32)
        self.ids = ids
        self.metadatas = metadatas
        self._index = None
        if self.embeddings.size == 0:
            # Nothing to index; search() treats this as an empty store.
            return
        if self.embeddings.ndim != 2:
            raise ValueError(
                f"vectors must be 2-D (n_vectors, dim), got shape {self.embeddings.shape}"
            )
        if faiss is not None:
            # Inner product on unit vectors equals cosine similarity.
            faiss.normalize_L2(self.embeddings)
            index = faiss.IndexFlatIP(self.embeddings.shape[1])
            index.add(self.embeddings)
        elif NearestNeighbors is not None:
            index = NearestNeighbors(n_neighbors=10, metric='cosine')
            index.fit(self.embeddings)
        else:
            raise ImportError("VectorStore requires either faiss or scikit-learn to be installed")
        self._index = index

    def search(self, vectors, k: int = 5) -> List[List[Tuple[str, float]]]:
        """Return the ``k`` most similar indexed ids for each query vector.

        Args:
            vectors: 2-D array-like with one query embedding per row.
            k: Maximum number of hits per query; clamped to the number of
                indexed vectors.

        Returns:
            One list per query of ``(id, cosine_similarity)`` tuples.

        Raises:
            RuntimeError: If called before ``build``.
        """
        import numpy as _np
        if self.embeddings is None:
            raise RuntimeError("VectorStore.search() called before build()")
        q = _np.array(vectors, dtype=_np.float32)
        n_indexed = len(self.embeddings) if self._index is not None else 0
        # scikit-learn raises when asked for more neighbours than it has
        # samples, so never request more than are indexed.
        k = min(k, n_indexed)
        if k <= 0:
            return [[] for _ in q]
        results = []
        if faiss is not None:
            faiss.normalize_L2(q)
            distances, indices = self._index.search(q, k)
            for drow, irow in zip(distances, indices):
                # FAISS pads missing neighbours with index -1.
                results.append([(self.ids[idx], float(dist)) for dist, idx in zip(drow, irow) if idx >= 0])
        else:
            distances, indices = self._index.kneighbors(q, n_neighbors=k)
            for drow, irow in zip(distances.tolist(), indices.tolist()):
                # Cosine distance -> cosine similarity.
                results.append([(self.ids[idx], float(1.0 - dist)) for dist, idx in zip(drow, irow)])
        return results

# 5. Retriever, LLM, RAG Pipeline, Demo
### Combines the embedding model, the vector store, and a stub LLM to answer queries with provenance.

In [None]:
# --- Rag Eu Lab Project: Part 5/5 ---
# Retriever + LLM + Pipeline
class Retriever:
    """Index document chunks and fetch the ones most similar to a query."""

    def __init__(self, embedder: EmbeddingModel):
        self.embedder = embedder
        self.vstore = VectorStore()
        self.id_to_chunk: Dict[str, DocChunk] = {}

    def index_documents(self, chunks: Sequence[DocChunk]):
        """Embed ``chunks`` and (re)build the vector store from them.

        The id-to-chunk lookup is replaced only after the store has been
        built.
        """
        texts, ids, metas = [], [], []
        lookup = {}
        for chunk in chunks:
            texts.append(chunk.chunk_text)
            ids.append(chunk.id)
            metas.append(chunk.metadata)
            lookup[chunk.id] = chunk
        vectors = self.embedder.fit_transform(texts)
        self.vstore.build(vectors, ids, metas)
        self.id_to_chunk = lookup

    def retrieve(self, query: str, k:int=5) -> List[Tuple[DocChunk,float]]:
        """Return up to ``k`` ``(chunk, score)`` pairs for ``query``.

        Hits whose id is not in the id-to-chunk lookup are dropped.
        """
        query_vectors = self.embedder.transform([query])
        # A single query was submitted, so only the first result row exists.
        hits = self.vstore.search(query_vectors, k=k)[0]
        matched = []
        for chunk_id, score in hits:
            if chunk_id not in self.id_to_chunk:
                continue
            matched.append((self.id_to_chunk[chunk_id], score))
        return matched

class LLMClient:
    """Stand-in LLM client that echoes salient prompt lines; calls no model."""

    def __init__(self, provider='auto'):
        # ``provider`` is accepted but not used; no remote backend is wired in.
        self.have_openai = False

    def answer(self, prompt: str, max_tokens: int = 256) -> str:
        """Return a canned answer assembled from the prompt itself.

        Keeps the first six non-empty, stripped prompt lines that contain a
        colon, joins them with spaces and truncates the result to 800
        characters. ``max_tokens`` is accepted but not used.
        """
        salient = []
        for raw_line in prompt.splitlines():
            line = raw_line.strip()
            # Any line containing a colon qualifies (this covers 'Table:').
            if ':' in line:
                salient.append(line)
        excerpt = ' '.join(salient[:6])[:800]
        return f"(stub-answer) Based on documents: {excerpt}"

class RAGPipeline:
    """Retrieve context for a query, prompt the LLM and report provenance."""

    def __init__(self, retriever: Retriever, llm: LLMClient):
        self.retriever = retriever
        self.llm = llm

    def _assemble_prompt(self, query: str, hits: List[Tuple[DocChunk,float]]) -> str:
        """Build the prompt: the query, each hit's source line and text, then the instruction."""
        parts = [f"Query: {query}", "---Context---"]
        # Sources are numbered from 1 in the order the hits were returned.
        for rank, (chunk, score) in enumerate(hits, start=1):
            parts.extend((
                f"[Source {rank}] table={chunk.table} id={chunk.id} score={score:.4f}",
                chunk.chunk_text,
                '---',
            ))
        parts.append("Answer concisely using ONLY context.")
        return '\n'.join(parts)

    def answer(self, query: str, k:int=5) -> Dict[str,object]:
        """Answer ``query`` from the top-``k`` retrieved chunks.

        Returns:
            A dict with the LLM response under ``'answer'``, one provenance
            record per hit under ``'provenance'`` and the original query
            under ``'query'``.
        """
        hits = self.retriever.retrieve(query, k=k)
        response = self.llm.answer(self._assemble_prompt(query, hits))
        provenance = []
        for chunk, score in hits:
            provenance.append({
                'id': chunk.id,
                'table': chunk.table,
                'score': score,
                'role': chunk.metadata.get('role', ''),
            })
        return {'answer': response, 'provenance': provenance, 'query': query}

# --- Demo functions for notebook usage ---
def build_demo_pipeline(n_tables:int=50) -> RAGPipeline:
    """Assemble a ready-to-query pipeline over ``n_tables`` simulated tables.

    Each table is converted to text chunks with one sampled row, all chunks
    are indexed, and the retriever is paired with the stub LLM client.
    """
    table_source = TableLoader(n_tables=n_tables)
    to_text = SchemaNormalizer()
    retriever = Retriever(EmbeddingModel())
    # Flatten the per-table chunk lists into one corpus for indexing.
    corpus = [
        chunk
        for meta, df in table_source.simulate_tables()
        for chunk in to_text.tabular_to_text(meta, df=df, n_samples=1)
    ]
    retriever.index_documents(corpus)
    return RAGPipeline(retriever, LLMClient())

def demo_run(pipeline:RAGPipeline, queries:Sequence[str]):
    """Run each query through ``pipeline`` (top 5 hits) and print the outcome.

    For every query this prints the query, the answer and one line per
    provenance record showing its id, table, score and role.
    """
    for query in queries:
        result = pipeline.answer(query, k=5)
        print(
            f"\nQuery: {query}",
            f"Answer: {result['answer']}",
            "Provenance:",
            sep='\n',
        )
        for source in result['provenance']:
            print(f" - {source['id']} (table={source['table']}) score={source['score']:.4f} role={source['role']}")