# 📘 RAG Biography (Colab Edition)

**One-click end-to-end:** install deps → mount Drive → set project path → generate sample PDFs → load → split → embed → build Chroma collections `(username)_(character_name)` → quick retrieval.

> Default embeddings: **HuggingFace BGE small zh v1.5** (no API needed). You can switch to OpenAI/DashScope by editing the *Embeddings* cell.


In [1]:
# # # ===== Clean + Pin =====
# import sys, subprocess, IPython

# def sh(cmd):
#     print(">>", cmd)
#     subprocess.run(cmd, shell=True, check=False)

# # 1) 清 spacy 家族（会拉 weasel/srsly/wasabi，常搅版本）
# sh("pip -q uninstall -y spacy thinc catalogue srsly cymem preshed murmurhash wasabi blis typer langcodes || true")

# # 2) 固定 requests（避免和系统包冲突）
# sh("pip -q install -U requests==2.32.2")

# # 3) 关键：强制把 numpy 降到 1.26.4（并避免再次被别的包升回去）
# sh('pip -q install --force-reinstall --no-build-isolation "numpy==1.26.4"')

# # 4) 你的依赖
# sh("pip -q install -U chromadb==0.4.24 "
#    "langchain==0.2.11 langchain-core==0.2.26 langchain-community==0.2.10 "
#    "langchain-openai==0.1.17 pypdf tiktoken")

# # 5) 校验依赖冲突（可选）
# sh("pip -q check || true")

# sh("pip -q install -U reportlab python-dotenv")

# print("\n✅ 安装完成：即将重启内核以切换到 NumPy 1.26.4 ...")
# IPython.get_ipython().kernel.do_shutdown(restart=True)  # 自动重启


In [2]:
from google.colab import drive
from pathlib import Path
drive.mount('/content/drive')

PROJ = Path('/content/drive/MyDrive/rag_bio_project').resolve()
print('Project path:', PROJ)

for d in [PROJ, PROJ/'src', PROJ/'data_pdfs', PROJ/'data_txt', PROJ/'index', PROJ/'notebooks']:
    d.mkdir(parents=True, exist_ok=True)
print('✅ Folders ready.')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Project path: /content/drive/MyDrive/rag_bio_project
✅ Folders ready.


In [3]:
from pathlib import Path
import textwrap

SRC = PROJ/'src'

def ensure_file(path: Path, content: str):
    if not path.exists():
        path.write_text(textwrap.dedent(content), encoding='utf-8')
        print('[WRITE]', path)
    else:
        print('[SKIP ]', path)

# --- loader.py ---
ensure_file(SRC/'loader.py', '''
from pathlib import Path
from typing import List, Optional, Iterable, Dict
import hashlib, re, time
from urllib.parse import urlsplit, urlunsplit
from langchain_core.documents import Document
from langchain_community.document_loaders import PyPDFLoader, TextLoader, WebBaseLoader
def _normalize_text(s: str) -> str:
    if not s: return ''
    s = s.replace('\ufeff', '').replace('\xa0', ' ')
    s = re.sub(r'[ \t]+', ' ', s)
    s = re.sub(r'\n{3,}', '\n\n', s)
    return s.strip()
def _clean_url(u: str) -> str:
    u = u.strip().replace(' ', '')
    parts = list(urlsplit(u))
    if not parts[0]: parts[0] = 'https'
    return urlunsplit(parts)
def _doc_hash(content: str) -> str:
    return hashlib.md5(content.encode('utf-8', errors='ignore')).hexdigest()[:16]
def _filter_content(text: str, min_chars: int, max_chars: Optional[int]) -> bool:
    n = len(text)
    if n < min_chars: return False
    if max_chars is not None and n > max_chars: return False
    return True
def load_sources(pdf_dir: str='data_pdfs', txt_dir: Optional[str]='data_txt', urls: Optional[List[str]] = None,
                 recursive: bool=True, txt_extensions: Iterable[str]=( '.txt', '.md'), txt_encoding: str='utf-8',
                 pages: Optional[str]=None, min_chars: int=50, max_chars: Optional[int]=None,
                 headers: Optional[Dict[str,str]] = None, timeout: int=15, max_retries: int=2) -> List[Document]:
    docs: List[Document] = []
    total_raw = 0
    pdir = Path(pdf_dir)
    if pdir.exists():
        for p in (pdir.rglob('*.pdf') if recursive else pdir.glob('*.pdf')):
            try:
                loader = PyPDFLoader(str(p))
                loaded = loader.load()
                if pages:
                    parts = [None if x in ('', 'None', None) else int(x) for x in pages.split(':')]
                    start = parts[0] if len(parts)>0 else None
                    stop  = parts[1] if len(parts)>1 else None
                    step  = parts[2] if len(parts)>2 else None
                    loaded = [d for d in loaded[slice(start, stop, step)]]
                for d in loaded:
                    d.page_content = _normalize_text(d.page_content)
                    d.metadata['source'] = str(p)
                    d.metadata['source_type'] = 'pdf'
                    d.metadata['loader_info'] = {'type':'PyPDFLoader','pages':pages}
                    if _filter_content(d.page_content, min_chars, max_chars): docs.append(d)
                total_raw += len(loaded)
            except Exception as e:
                print(f'[WARN] Skip PDF {p}: {e}')
    if txt_dir:
        tdir = Path(txt_dir)
        if tdir.exists():
            exts = {e.lower() for e in txt_extensions}
            for p in (tdir.rglob('*') if recursive else tdir.glob('*')):
                if p.is_file() and p.suffix.lower() in exts:
                    try:
                        loader = TextLoader(str(p), encoding=txt_encoding)
                        loaded = loader.load()
                        for d in loaded:
                            d.page_content = _normalize_text(d.page_content)
                            d.metadata['source'] = str(p)
                            d.metadata['source_type'] = 'txt'
                            d.metadata['loader_info'] = {'type':'TextLoader','encoding':txt_encoding}
                            if _filter_content(d.page_content, min_chars, max_chars): docs.append(d)
                        total_raw += len(loaded)
                    except Exception as e:
                        print(f'[WARN] Skip TXT {p}: {e}')
    if urls:
        urls = [u for u in (urls or []) if isinstance(u, str) and u.strip()]
        for u in urls:
            url = _clean_url(u)
            tries = 0
            while True:
                try:
                    loader = WebBaseLoader(url, requests_kwargs={'headers': headers or {'User-Agent':'Mozilla/5.0 (RAG-Loader/1.0)'}, 'timeout': timeout})
                    loaded = loader.load()
                    for d in loaded:
                        d.page_content = _normalize_text(d.page_content)
                        d.metadata['source'] = url
                        d.metadata['source_type'] = 'web'
                        d.metadata['loader_info'] = {'type':'WebBaseLoader','timeout':timeout,'headers':bool(headers)}
                        if _filter_content(d.page_content, min_chars, max_chars): docs.append(d)
                    total_raw += len(loaded)
                    break
                except Exception as e:
                    tries += 1
                    if tries > max_retries:
                        print(f'[WARN] Skip URL {url} after {max_retries} retries: {e}'); break
                    print(f'[INFO] Retry {tries}/{max_retries} for URL {url} due to: {e}')
    seen = set(); uniq: List[Document] = []
    for d in docs:
        h = _doc_hash(d.page_content)
        key = (d.metadata.get('source'), h)
        if key not in seen:
            seen.add(key); uniq.append(d)
    print(f'[INFO] Loader complete. raw={total_raw}, kept={len(uniq)}')
    return uniq
''')

# --- splitter.py ---
ensure_file(SRC/'splitter.py', '''
from typing import List, Dict
from dataclasses import dataclass
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
@dataclass
class SplitterProfile:
    chunk_size: int; chunk_overlap: int; separators: List[str]
PDF_PROFILE = SplitterProfile(1200, 200, ['\\n\\n','\\n','。','！','？','.', '!', '?', ' ', ''])
TXT_PROFILE = SplitterProfile(1000, 150, ['\\n\\n','\\n','.', '?', '!', '。','？','！',' ', ''])
WEB_PROFILE = SplitterProfile(900, 150,  ['\\n\\n','\\n','。','！','？','.', '!', '?', ' ', ''])
PROFILE_MAP: Dict[str, SplitterProfile] = {'pdf':PDF_PROFILE,'txt':TXT_PROFILE,'web':WEB_PROFILE}
def _build_splitter(p: SplitterProfile) -> RecursiveCharacterTextSplitter:
    overlap = p.chunk_overlap if p.chunk_overlap < p.chunk_size else max(0, min(p.chunk_size//5, 200))
    if overlap != p.chunk_overlap: print(f'[WARN] overlap >= size; fallback to {overlap}')
    return RecursiveCharacterTextSplitter(chunk_size=p.chunk_size, chunk_overlap=overlap, separators=p.separators)
def split_documents_type_aware(docs: List[Document], default_type: str='pdf', verbose: bool=True) -> List[Document]:
    if verbose: print(f'[INFO] Type-aware splitting started. total_docs={len(docs)}, default_type={default_type}')
    splitter_cache: Dict[str, RecursiveCharacterTextSplitter] = {}
    def get_splitter(kind: str):
        typ = (kind or '').lower(); typ = typ if typ in PROFILE_MAP else default_type
        if typ not in splitter_cache:
            splitter_cache[typ] = _build_splitter(PROFILE_MAP[typ])
            if verbose:
                p = PROFILE_MAP[typ]; print(f"[INFO] Splitter ready for type='{typ}' (size={p.chunk_size}, overlap={p.chunk_overlap})")
        return splitter_cache[typ]
    out: List[Document] = []; per_source_index: Dict[str,int] = {}
    for d in docs:
        stype = (d.metadata.get('source_type') or default_type).lower()
        splitter = get_splitter(stype)
        chunks = splitter.split_documents([d])
        source_key = str(d.metadata.get('source', 'unknown'))
        start_idx = per_source_index.get(source_key, 0)
        for i, c in enumerate(chunks):
            c.metadata = dict(d.metadata) | {'chunk_index': start_idx + i, 'splitter_profile': stype}
            out.append(c)
        per_source_index[source_key] = start_idx + len(chunks)
        if verbose: print(f"[INFO] Split {stype} source={source_key} -> {len(chunks)} chunks (acc={len(out)})")
    if verbose: print(f'[INFO] Type-aware splitting done. total_chunks={len(out)}')
    return out
''')

# --- embeddings.py ---
ensure_file(SRC/'embeddings.py', '''
import os, re, time, hashlib
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import DashScopeEmbeddings
from chromadb import PersistentClient
@dataclass
class EmbeddingConfig:
    provider: str = 'hf'
    model: str = 'BAAI/bge-small-zh-v1.5'
    normalize: bool = True
def _sanitize_name(s: str, fallback_prefix: str='user') -> str:
    s2 = re.sub(r'[^a-zA-Z0-9_]+', '_', str(s or '')).strip('_')
    return s2 or f"{fallback_prefix}_{int(time.time())}"
def _doc_id(meta: Dict, idx: int) -> str:
    base = f"{meta.get('source','unknown')}|{meta.get('chunk_index', idx)}"
    return hashlib.md5(base.encode('utf-8', errors='ignore')).hexdigest()[:12] + f'_{idx}'
def build_embeddings(cfg: EmbeddingConfig):
    prov = cfg.provider.lower()
    if prov == 'hf':
        print(f"[INFO] Using HF embeddings: {cfg.model} (normalize={cfg.normalize})")
        return HuggingFaceBgeEmbeddings(model_name=cfg.model, encode_kwargs={'normalize_embeddings': cfg.normalize})
    elif prov == 'openai':
        if not os.getenv('OPENAI_API_KEY'): raise RuntimeError('OPENAI_API_KEY is not set.')
        model_name = cfg.model or 'text-embedding-3-small'
        print(f"[INFO] Using OpenAI embeddings: {model_name}")
        return OpenAIEmbeddings(model=model_name)
    elif prov == 'dashscope':
        if not os.getenv('DASHSCOPE_API_KEY'): raise RuntimeError('DASHSCOPE_API_KEY is not set.')
        model_name = cfg.model or 'text-embedding-v1'
        print(f"[INFO] Using DashScope embeddings: {model_name}")
        return DashScopeEmbeddings(model=model_name)
    else:
        raise ValueError(f'Unknown provider: {cfg.provider}')
def compute_vectors_once(chunks: List[Document], embedder):
    texts = [c.page_content for c in chunks]
    metas = [dict(c.metadata or {}) for c in chunks]
    ids = [_doc_id(m, i) for i, m in enumerate(metas)]
    print(f"[INFO] Computing embeddings for {len(texts)} chunks...")
    vectors = embedder.embed_documents(texts)
    if not vectors or not vectors[0]: raise RuntimeError('Empty embeddings returned.')
    dim = len(vectors[0])
    emb_info = {'provider': type(embedder).__name__, 'dim': dim}
    print(f"[INFO] Embedding dim={dim}")
    return ids, texts, metas, vectors, emb_info
def persist_vectorstores_for_characters(ids, texts, metas, vectors, persist_dir, username, character_names, emb_meta, collection_prefix: Optional[str]=None):
    user_tag = _sanitize_name(username, 'user')
    created = []
    client: PersistentClient = PersistentClient(path=persist_dir)
    for cname in character_names:
        char_tag = _sanitize_name(cname, 'char')
        base_name = f"{user_tag}_{char_tag}"
        coll_name = base_name if not collection_prefix else _sanitize_name(f"{collection_prefix}_{base_name}", 'coll')
        print(f"[INFO] Creating/updating collection: {coll_name}")
        coll = client.get_or_create_collection(name=coll_name, metadata={'embedding': emb_meta})
        B = 256
        for i in range(0, len(ids), B):
            coll.add(ids=ids[i:i+B], documents=texts[i:i+B], metadatas=metas[i:i+B], embeddings=vectors[i:i+B])
        print(f"[INFO] Collection '{coll_name}' upserted with {len(ids)} items.")
        created.append(coll_name)
    print(f"[INFO] Done. Created/updated {len(created)} collections.")
    return created
def build_embeddings_and_vectorstores(chunks: List[Document], username: str, character_names: List[str], persist_dir: str='index', emb_cfg: EmbeddingConfig = EmbeddingConfig()):
    if not character_names: raise ValueError('character_names must not be empty.')
    embedder = build_embeddings(emb_cfg)
    ids, texts, metas, vectors, emb_info = compute_vectors_once(chunks, embedder)
    return persist_vectorstores_for_characters(ids, texts, metas, vectors, persist_dir, username, character_names, {'provider':emb_cfg.provider,'model':emb_cfg.model,'dim':emb_info['dim'],'normalize':emb_cfg.normalize})
''')

# --- vectorstore.py ---
ensure_file(SRC/'vectorstore.py', '''
from chromadb import PersistentClient
def get_collection(persist_dir: str, collection_name: str):
    client = PersistentClient(path=persist_dir)
    return client.get_collection(collection_name)
def quick_query(persist_dir: str, collection_name: str, query_text: str, n_results: int=5):
    coll = get_collection(persist_dir, collection_name)
    return coll.query(query_texts=[query_text], n_results=n_results, include=['documents','metadatas','distances'])
''')

print('✅ src/ files ensured.')

[SKIP ] /content/drive/MyDrive/rag_bio_project/src/loader.py
[SKIP ] /content/drive/MyDrive/rag_bio_project/src/splitter.py
[SKIP ] /content/drive/MyDrive/rag_bio_project/src/embeddings.py
[SKIP ] /content/drive/MyDrive/rag_bio_project/src/vectorstore.py
✅ src/ files ensured.


In [4]:
import sys
from pathlib import Path
CANDIDATES = [PROJ, Path.cwd(), Path('/content/drive/MyDrive/rag_bio_project')]
project_src=None
for base in CANDIDATES:
    src = base/'src'
    if src.exists() and (src/'loader.py').exists():
        project_src=src.resolve(); break
assert project_src, 'src/ not found. Check PROJ path.'
sys.path.insert(0, str(project_src))
print('[INFO] Added to sys.path:', project_src)

from loader import load_sources
from splitter import split_documents_type_aware
from embeddings import EmbeddingConfig, build_embeddings_and_vectorstores
from vectorstore import quick_query
print('✅ Imports OK')

[INFO] Added to sys.path: /content/drive/MyDrive/rag_bio_project/src




✅ Imports OK


In [5]:
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from dotenv import load_dotenv
import os

DATA_PDFS = PROJ/'data_pdfs'
DATA_TXT  = PROJ/'data_txt'
INDEX_DIR = PROJ/'index'
ENV_PATH  = PROJ/' .env'
load_dotenv(ENV_PATH)
print('[INFO] ENV loaded. OPENAI_API_KEY exists:', bool(os.getenv('OPENAI_API_KEY')))

def make_pdf(path, title, lines):
    c = canvas.Canvas(str(path), pagesize=letter)
    width, height = letter
    y = height - 72
    c.setFont('Times-Roman', 12)
    c.drawString(72, y, title); y -= 24
    for ln in lines:
        for seg in ln.split('\n'):
            c.drawString(72, y, seg); y -= 18
            if y < 72:
                c.showPage(); y = height - 72; c.setFont('Times-Roman', 12)
    c.save()

make_pdf(DATA_PDFS/'liqing.pdf', 'Li Qing Biography', [
    'Born in Suzhou in 1990; studied Economics and became a product manager.',
    'Finance: ~350k CNY income; index funds; medium risk preference.',
    'Marriage: married in 2018; one daughter in 2022.',
    'Experience: 2020-2022 SEA market localization projects.'
])
make_pdf(DATA_PDFS/'wangmu.pdf', 'Wang Mu Biography', [
    'Born in Chengdu in 1985; LL.B then MFin; shifted to NGO operations.',
    'Finance: ~200k CNY income; house + money-market funds; conservative.',
    'Marriage: single; passionate about education equity.',
    'Experience: 2015-2019 broker risk control; compliance expertise.'
])
print('[INFO] PDFs generated at', DATA_PDFS)

from pathlib import Path

DATA_TXT = Path("data_txt")
DATA_TXT.mkdir(parents=True, exist_ok=True)

liqing = "\n".join([
    "Born in Suzhou in 1990; studied Economics and became a product manager.",
    "Finance: ~350k CNY income; index funds; medium risk preference.",
    "Marriage: married in 2018; one daughter in 2022.",
    "Experience: 2020-2022 SEA market localization projects.",
])
(DATA_TXT / "liqing.txt").write_text(liqing, encoding="utf-8")

wangmu = "\n".join([
    "Born in Chengdu in 1985; LL.B then MFin; shifted to NGO operations.",
    "Finance: ~200k CNY income; house + money-market funds; conservative.",
    "Marriage: single; passionate about education equity.",
    "Experience: 2015-2019 broker risk control; compliance expertise.",
])
(DATA_TXT / "wangmu.txt").write_text(wangmu, encoding="utf-8")

print('[INFO] TXT samples ensured at', DATA_TXT)

[INFO] ENV loaded. OPENAI_API_KEY exists: False
[INFO] PDFs generated at /content/drive/MyDrive/rag_bio_project/data_pdfs
[INFO] TXT samples ensured at data_txt


In [6]:
from pathlib import Path
import importlib, re, json, sys

PROJ = Path('/content/drive/MyDrive/rag_bio_project').resolve()
SRC = PROJ/'src'
assert SRC.exists(), f"src not found: {SRC}"

# 1) patch loader.py: make loader_info a JSON string (not a dict)
lp = SRC/'loader.py'
ls = lp.read_text(encoding='utf-8')

if 'import json' not in ls:
    ls = ls.replace('from urllib.parse import urlsplit, urlunsplit',
                    'from urllib.parse import urlsplit, urlunsplit\nimport json')

# PyPDFLoader
ls = re.sub(
    r"d\.metadata\['loader_info'\]\s*=\s*\{[^}]+\}",
    "d.metadata['loader_info'] = json.dumps({'type':'PyPDFLoader','pages':pages}, ensure_ascii=False)",
    ls
)
# TextLoader
ls = re.sub(
    r"d\.metadata\['loader_info'\]\s*=\s*\{[^}]+\}",
    "d.metadata['loader_info'] = json.dumps({'type':'TextLoader','encoding':txt_encoding}, ensure_ascii=False)",
    ls,
    count=1  # only replace the TXT occurrence once
)
# WebBaseLoader
ls = re.sub(
    r"d\.metadata\['loader_info'\]\s*=\s*\{[^}]+\}",
    "d.metadata['loader_info'] = json.dumps({'type':'WebBaseLoader','timeout':timeout,'headers':bool(headers)}, ensure_ascii=False)",
    ls,
    count=1  # only replace the Web occurrence once
)

lp.write_text(ls, encoding='utf-8')
print("✔ loader.py patched")

# 2) patch embeddings.py: (a) collection metadata -> JSON string (b) sanitize per-doc metadatas before add()
ep = SRC/'embeddings.py'
es = ep.read_text(encoding='utf-8')

if 'import json' not in es:
    es = es.replace('from chromadb import PersistentClient',
                    'from chromadb import PersistentClient\nimport json')

# add a sanitizer
if '_sanitize_meta_for_chroma' not in es:
    es = es.replace(
        'def compute_vectors_once(chunks: List[Document], embedder):',
        """def _sanitize_meta_for_chroma(m: dict) -> dict:
    out = {}
    for k, v in (m or {}).items():
        if isinstance(v, (str, int, float, bool)):
            out[k] = v
        else:
            out[k] = json.dumps(v, ensure_ascii=False)
    return out

def compute_vectors_once(chunks: List[Document], embedder):"""
    )

# ensure collection metadata uses JSON string
es = es.replace(
    "metadata={'embedding': emb_meta}",
    "metadata={'embedding': json.dumps(emb_meta, ensure_ascii=False)}"
)

# sanitize metas list before coll.add
es = re.sub(
    r"for i in range\(0, len\(ids\), B\):\s*"
    r"coll\.add\(ids=ids\[i:i\+B\], documents=texts\[i:i\+B\], metadatas=metas\[i:i\+B\], embeddings=vectors\[i:i\+B\]\)",
    "for i in range(0, len(ids), B):\n"
    "            batch_metas = [ _sanitize_meta_for_chroma(m) for m in metas[i:i+B] ]\n"
    "            coll.add(ids=ids[i:i+B], documents=texts[i:i+B], metadatas=batch_metas, embeddings=vectors[i:i+B])",
    es
)

ep.write_text(es, encoding='utf-8')
print("✔ embeddings.py patched")

# 3) reload modules
sys.path.insert(0, str(SRC))
import loader, embeddings
import importlib
importlib.reload(loader)
importlib.reload(embeddings)
from embeddings import EmbeddingConfig, build_embeddings_and_vectorstores
print("✅ Patch applied & modules reloaded.")


✔ loader.py patched
✔ embeddings.py patched
✅ Patch applied & modules reloaded.


In [7]:
docs = load_sources(pdf_dir=str(DATA_PDFS), txt_dir=str(DATA_TXT), urls=None)
print('[INFO] loaded docs:', len(docs))
chunks = split_documents_type_aware(docs, default_type='pdf', verbose=True)
print('[INFO] chunks:', len(chunks))

username = 'demo_user'
character_names = ['LiQing', 'WangMu']

emb_cfg = EmbeddingConfig(provider='hf', model='BAAI/bge-small-zh-v1.5', normalize=True)
# Optional:
# emb_cfg = EmbeddingConfig(provider='openai', model='text-embedding-3-small')
# emb_cfg = EmbeddingConfig(provider='dashscope', model='text-embedding-v1')

created = build_embeddings_and_vectorstores(
    chunks, username=username, character_names=character_names, persist_dir=str(INDEX_DIR), emb_cfg=emb_cfg
)
print('[INFO] Collections created:', created)

[INFO] Loader complete. raw=4, kept=4, pdf_dir=/content/drive/MyDrive/rag_bio_project/data_pdfs, txt_dir=data_txt, urls=0
[INFO] loaded docs: 4
[INFO] Type-aware splitting started. total_docs=4, default_type=pdf
[INFO] Splitter ready for type='pdf' (size=1200, overlap=200)
[INFO] Split pdf source=/content/drive/MyDrive/rag_bio_project/data_pdfs/liqing.pdf -> 1 chunks (acc=1)
[INFO] Split pdf source=/content/drive/MyDrive/rag_bio_project/data_pdfs/wangmu.pdf -> 1 chunks (acc=2)
[INFO] Splitter ready for type='txt' (size=1000, overlap=150)
[INFO] Split txt source=data_txt/wangmu.txt -> 1 chunks (acc=3)
[INFO] Split txt source=data_txt/liqing.txt -> 1 chunks (acc=4)
[INFO] Type-aware splitting done. total_chunks=4
[INFO] chunks: 4
[INFO] Using HF embeddings: BAAI/bge-small-zh-v1.5 (normalize=True)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


[INFO] Computing embeddings for 4 chunks...
[INFO] Embedding dim=512


ERROR:chromadb.telemetry.product.posthog:Failed to send telemetry event ClientStartEvent: capture() takes 1 positional argument but 3 were given
ERROR:chromadb.telemetry.product.posthog:Failed to send telemetry event ClientCreateCollectionEvent: capture() takes 1 positional argument but 3 were given


[INFO] Creating/updating collection: demo_user_LiQing




InvalidDimensionException: Embedding dimension 512 does not match collection dimensionality 1024

In [None]:
from pathlib import Path
import textwrap, importlib, sys, json

PROJ = Path('/content/drive/MyDrive/rag_bio_project').resolve()
SRC = PROJ/'src'
assert SRC.exists(), f"src not found: {SRC}"

vf = SRC/'vectorstore.py'
vf.write_text(textwrap.dedent(r'''
from chromadb import PersistentClient
import json

# We reuse the same embedder used for indexing
from embeddings import EmbeddingConfig, build_embeddings

def _embedder_from_coll_meta(meta: dict):
    info = meta.get("embedding")
    if isinstance(info, str):
        try:
            info = json.loads(info)
        except Exception:
            info = {}
    info = info or {}
    provider  = (info.get("provider") or "hf")
    model     = (info.get("model") or "BAAI/bge-small-zh-v1.5")
    normalize = bool(info.get("normalize", True))
    return build_embeddings(EmbeddingConfig(provider=provider, model=model, normalize=normalize))

def get_collection(persist_dir: str, collection_name: str):
    client = PersistentClient(path=persist_dir)
    return client.get_collection(collection_name)

def quick_query(persist_dir: str, collection_name: str, query_text: str, n_results: int = 5):
    client = PersistentClient(path=persist_dir)
    coll = client.get_collection(collection_name)
    embedder = _embedder_from_coll_meta(coll.metadata or {})
    # Compute query embedding with the SAME model/dim as the collection
    if hasattr(embedder, "embed_query"):
        qvec = embedder.embed_query(query_text)
    else:
        qvec = embedder.embed_documents([query_text])[0]
    return coll.query(query_embeddings=[qvec], n_results=n_results,
                      include=["documents","metadatas","distances"])
'''), encoding='utf-8')

# 热重载
sys.path.insert(0, str(SRC))
import vectorstore
import importlib
importlib.reload(vectorstore)
from vectorstore import quick_query
print("✅ vectorstore.py patched & reloaded.")


In [None]:
print('\n[TEST] Query LiQing collection:')
res = quick_query(str(INDEX_DIR), 'demo_user_LiQing', 'income and marriage', n_results=3)
for i,(doc,meta,dist) in enumerate(zip(res.get('documents',[['']])[0], res.get('metadatas',[['']])[0], res.get('distances',[['']])[0])):
    print('-'*60)
    print('rank', i+1, 'dist', dist)
    print('source', meta.get('source'))
    print(doc[:200].replace('\n',' '))

print('\n[TEST] Query WangMu collection:')
res = quick_query(str(INDEX_DIR), 'demo_user_WangMu', 'risk control and marriage', n_results=3)
for i,(doc,meta,dist) in enumerate(zip(res.get('documents',[['']])[0], res.get('metadatas',[['']])[0], res.get('distances',[['']])[0])):
    print('-'*60)
    print('rank', i+1, 'dist', dist)
    print('source', meta.get('source'))
    print(doc[:200].replace('\n',' '))

# 🔧 RAG Pipeline — Retriever → Middleware → Prompt → LLM (Appended)
以下单元格基于你现有工程，补充完整的检索中间层与流水线测试，并可一键跑通 Demo。

In [None]:
from chromadb import PersistentClient
client = PersistentClient(path="/content/drive/MyDrive/rag_bio_project/index")

for col in client.list_collections():
    coll = client.get_collection(col.name)
    print("name:", coll.name, "| id:", coll.id, "| count:", coll.count())
    print("meta:", coll.metadata)
    print("-"*60)


In [None]:
%pip -q install python-dotenv

from pathlib import Path
from dotenv import load_dotenv, dotenv_values
import os

PROJ = Path("/content/drive/MyDrive/rag_bio_project")  # 按你的真实路径
ENV_PATH = PROJ / ".env"

# 读取看一下是否真的拿到 key
cfg = dotenv_values(ENV_PATH)      # 只读取，不写环境
print("[.env keys]", list(cfg.keys()))
print("[OPENAI_API_KEY startswith sk?]", str(cfg.get("OPENAI_API_KEY",""))[:7])

# 真正写入当前进程的环境变量（override=True 覆盖已有值）
load_dotenv(ENV_PATH, override=True)

print("[verify] OPENAI_API_KEY set:", bool(os.getenv("OPENAI_API_KEY")))


In [None]:

# 0) 安装依赖（如已安装可跳过）
%pip -q install -U numpy==1.26.4 chromadb==0.4.24 langchain==0.2.11 langchain-core==0.2.26         langchain-community==0.2.10 langchain-openai==0.1.17 pypdf tiktoken
# %pip -q install -U langchain-ollama  # 如果要用本地 Ollama 模型
print("Deps OK")


In [None]:

# 1) 路径设置：指向我们刚刚生成的工程目录
from pathlib import Path
PROJ = Path("/content/drive/MyDrive/rag_bio_project")
SRC = PROJ/"src"
INDEX_DIR = PROJ/"index"     # 指向你的 Chroma 索引目录（Drive 中同名也可改这里）
import sys
sys.path.append(str(SRC))
print("Project:", PROJ)
print("Index:", INDEX_DIR)


In [None]:

# 2) 列出向量库 collections 以确认可见
from chromadb import PersistentClient
client = PersistentClient(path=str(INDEX_DIR))
cols = [c.name for c in client.list_collections()]
print("Collections:", cols)


## ✅ Retriever 冒烟测试（高阈值 + MMR + 分级）

In [None]:
from pathlib import Path
from chromadb import PersistentClient
import json

print("INDEX_DIR =", INDEX_DIR)
client = PersistentClient(path=str(INDEX_DIR))
names = [c.name for c in client.list_collections()]
print("Collections:", names)

for n in names:
    coll = client.get_collection(n)
    meta = coll.metadata or {}
    emb = meta.get("embedding")
    if isinstance(emb, str):
        try: emb = json.loads(emb)
        except: pass
    print(f" - {n:<32} count={coll.count()}  embedding={emb}")


In [None]:
from retriever import retrieve

tmp = retrieve(
    persist_dir=str(INDEX_DIR),
    query_text="李青的年收入是多少？",
    k=10, strategy="similarity",
    fetch_k=50,
    score_threshold=0.0,     # 关闭阈值
)

scores = [round(x["score"], 4) for x in tmp.get("items", [])]
print("raw scores (top 10):", scores[:10])


In [None]:
from chromadb import PersistentClient
from retriever import _embedder_from_coll_meta  # 我们代码里已有

client = PersistentClient(path=str(INDEX_DIR))
# 挑一个你确认有“李青”内容的集合名
cname = [n for n in names if "LiQing" in n or "liqing" in n.lower()][0]
coll = client.get_collection(cname)

embedder = _embedder_from_coll_meta(coll.metadata or {})
q = "李青 年 收入"
qvec = (embedder.embed_query(q)
        if hasattr(embedder, "embed_query")
        else embedder.embed_documents([q])[0])

qr = coll.query(query_embeddings=[qvec], n_results=5,
                include=["documents","distances","metadatas"])
print("docs:", qr["documents"][0][:2])
print("dists:", qr["distances"][0][:2])


In [None]:

from retriever import retrieve
question = "李青的年收入是多少？"
res = retrieve(persist_dir=str(INDEX_DIR), query_text=question,
               k=5, strategy="mmr", strictness="strict")
print("Route:", res.get("route"))
for it in res.get("items", [])[:3]:
    print(it["grade"], f"{it['score']:.3f}", "src:", (it["metadata"] or {}).get("source"))


## 🧩 Middleware：角色识别 / 怀疑度（占位） / 人格（占位） / 复查（占位）

In [None]:

# from middleware import detect_characters_from_question, VerificationConfig, verify_answer_against_context
# det = detect_characters_from_question("请比较LiQing与WangMu的收入", persist_dir=str(INDEX_DIR))
# print("Role detection:", det)
# # 复查占位演示（默认不启用）
# vres = verify_answer_against_context("dummy answer", res.get("items", []), VerificationConfig(enabled=False))
# print("Verification (disabled):", vres)


## 🧠 Prompt 自动选择 + 预览

In [None]:

from prompting import build_prompt_messages_auto
msgs, info = build_prompt_messages_auto("What is LiQing's annual income?", res)
print("Mode:", info["mode"])
print("System msg preview:", msgs[0].content[:160])


## 🚀 一键流水线 Demo（需要 API Key）

In [None]:
from pipeline import PipelineConfig, run_pipeline
# from middleware import DeceptionConfig

cfg = PipelineConfig(
    persist_dir=str(INDEX_DIR),
    strictness="medium",              # 建议先用 medium，确认召回后再调高
    provider="openai", model="gpt-4o-mini", temperature=0.2,
    do_role_detection=True,
    # 可选：在这里开启/调整中间层
    # suspicion=SuspicionConfig(enabled=True, level=0.0),
    # verification=VerificationConfig(enabled=True, mode="presence", min_hits=1),
    # deception=DeceptionConfig(enabled=False),
)

# print("deception enabled?", cfg.deception.enabled)  # False
out = run_pipeline("李青的年收入是多少？", cfg)
print("Answer:", out["answer"][:500])
print("\nReferences:\n", out["references"])
print("\nPrompt mode:", out["prompt_mode"])
print("Route:", out["route"])


In [None]:
from pipeline import PipelineConfig, run_pipeline
# from middleware import DeceptionConfig

cfg = PipelineConfig(
    persist_dir=str(INDEX_DIR),
    strictness="medium",              # 建议先用 medium，确认召回后再调高
    provider="openai", model="gpt-4o-mini", temperature=0.2,
    do_role_detection=True,
    # 可选：在这里开启/调整中间层
    # suspicion=SuspicionConfig(enabled=True, level=0.0),
    # verification=VerificationConfig(enabled=True, mode="presence", min_hits=1),
    # deception=DeceptionConfig(enabled=False),
)

# print("deception enabled?", cfg.deception.enabled)  # False
out = run_pipeline("李青, 你结婚了吗?以李青的口吻回答", cfg)
print("Answer:", out["answer"][:500])
print("\nReferences:\n", out["references"])
print("\nPrompt mode:", out["prompt_mode"])
print("Route:", out["route"])


In [None]:
import os
print("OPENAI_API_KEY exists:", bool(os.getenv("OPENAI_API_KEY")))
print("OPENAI_ORG:", os.getenv("OPENAI_ORG"))
