In [1]:
import hashlib
import json
import os
import time

import faiss
import ipywidgets as widgets
import numpy as np
import requests
from IPython.display import display, Markdown, clear_output
from pypdf import PdfReader
from tqdm import tqdm

In [2]:
# Settings
import os

# The API key is read from the environment so that it is never stored in the
# notebook (or in its version history). Set it before starting Jupyter, e.g.
#   export SILICONFLOW_API_KEY=sk-...
# NOTE: any key that was previously committed in this cell should be treated as
# leaked and rotated in the provider console.
SILICONFLOW_API_KEY = os.environ.get("SILICONFLOW_API_KEY", "").strip()
BASE_URL = "https://api.siliconflow.cn/v1"

# Model identifiers sent in the embedding and chat request payloads.
EMBEDDING_MODEL = "BAAI/bge-m3"
CHAT_MODEL = "Qwen/Qwen3-8B"

# Source document to index and the directory that holds the persisted index.
DOC_PATH = "Kindle_User's_Guide_English.pdf"
INDEX_DIR = "./rag_index"

# Fail early when the key is missing or malformed (an unset variable yields "").
assert SILICONFLOW_API_KEY.startswith("sk-"), "❌ API Key 格式不对（需以 sk- 开头）"
os.makedirs(INDEX_DIR, exist_ok=True)

print("✅ 配置已载入")

✅ 配置已载入


In [3]:
REQUEST_TIMEOUT = 25            # seconds, passed to requests as `timeout`
RETRY = 1                       # number of extra same-domain attempts in _post_json
SLEEP_BETWEEN = 0.8             # seconds to wait before each retry

# On corporate/campus networks where a proxy interferes with traffic,
# bypassing the system proxy is usually the first thing to try.
USE_SYSTEM_PROXY = False        # False = do not use the system/environment proxy
VERIFY_SSL = True               # Set to False only as a temporary workaround for SSLEOFError (at your own risk)

# Keyword arguments shared by every requests call in this notebook.
_REQ_KW = {
    "timeout": REQUEST_TIMEOUT,
    "verify": VERIFY_SSL,
}
if not USE_SYSTEM_PROXY:
    # An empty dict does NOT bypass proxies: requests merges it with the proxies
    # found in the environment. Mapping each scheme to None overrides them.
    _REQ_KW["proxies"] = {"http": None, "https": None, "all": None}

if not VERIFY_SSL:
    # Silence the warnings emitted for unverified HTTPS requests.
    requests.packages.urllib3.disable_warnings()

def _headers():
    """Return the HTTP headers for an API call: bearer authorization plus a JSON content type."""
    bearer = f"Bearer {SILICONFLOW_API_KEY}"
    return {
        "Authorization": bearer,
        "Content-Type": "application/json",
    }

def _post_json(url, payload):
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON response.

    Failure handling, in order:
      1. A 4xx response other than 408/429 is raised immediately; repeating the
         same request cannot fix a bad key or a bad payload.
      2. A transport-level failure (SSL EOF, proxy error, timeout, ...) is retried
         once against the alternate SiliconFlow domain.
      3. Anything else is retried ``RETRY`` times against the same URL, sleeping
         ``SLEEP_BETWEEN`` seconds before each attempt.

    Args:
        url: Full endpoint URL, expected to start with ``BASE_URL``.
        payload: JSON-serialisable request body.

    Returns:
        The parsed JSON body of the first successful response.

    Raises:
        RuntimeError: When every attempt failed; the original error is chained.
    """
    def once(u):
        resp = requests.post(u, headers=_headers(), json=payload, **_REQ_KW)
        if resp.status_code >= 400:
            try:
                detail = resp.json()
            except Exception:
                detail = resp.text
            err = RuntimeError(f"{resp.status_code} {resp.reason} @ {u}\n{detail}")
            # Tag the error so the retry logic can tell HTTP failures from transport failures.
            err.status_code = resp.status_code
            raise err
        return resp.json()

    def is_client_error(exc):
        # 408 (request timeout) and 429 (rate limit) are worth another attempt.
        status = getattr(exc, "status_code", None)
        return status is not None and 400 <= status < 500 and status not in (408, 429)

    try:
        return once(url)
    except Exception as e1:
        if is_client_error(e1):
            raise RuntimeError(f"[请求失败] {e1}") from e1

        # Typical transport errors: SSL EOF, ProxyError, timeouts. Only errors that
        # did not come from an HTTP response are inspected, so text inside a
        # server error body can never be mistaken for a network failure.
        net_err_signals = ("SSLEOFError", "SSL:", "ProxyError", "Max retries", "timed out", "Connection aborted", "EOF")
        is_http_error = getattr(e1, "status_code", None) is not None
        if not is_http_error and any(sig in str(e1) for sig in net_err_signals):
            # Match the full domain rather than the bare substring "ai".
            alt = "https://api.siliconflow.cn/v1" if "siliconflow.ai" in BASE_URL else "https://api.siliconflow.ai/v1"
            if alt not in url:
                time.sleep(SLEEP_BETWEEN)
                try:
                    return once(url.replace(BASE_URL, alt))
                except Exception as e2:
                    raise RuntimeError(f"[切域失败] {e2}") from e2

        # Same-domain retries.
        last = e1
        for _ in range(RETRY):
            time.sleep(SLEEP_BETWEEN)
            try:
                return once(url)
            except Exception as e3:
                last = e3
                if is_client_error(e3):
                    break
        raise RuntimeError(f"[请求失败] {last}") from last

def embeddings(texts):
    """Embed ``texts`` with ``EMBEDDING_MODEL`` through the ``/embeddings`` endpoint.

    Args:
        texts: List of strings to embed. A falsy value (empty list, ``None``)
            short-circuits without any network call.

    Returns:
        A list with the ``embedding`` field of every item in the response's
        ``data`` array, in response order; ``[]`` for falsy input or when the
        response has no ``data`` key.
    """
    if not texts:
        return []

    request_body = {"model": EMBEDDING_MODEL, "input": texts}
    response = _post_json(f"{BASE_URL}/embeddings", request_body)

    vectors = []
    for item in response.get("data", []):
        vectors.append(item["embedding"])
    return vectors

def chat(messages, temperature=0.3, max_tokens=None):
    """Send a non-streaming chat completion request and return the reply text.

    Args:
        messages: List of message dicts forwarded as the ``messages`` field.
        temperature: Sampling temperature forwarded in the payload.
        max_tokens: Optional completion limit; the field is left out of the
            payload entirely when this is ``None``.

    Returns:
        The ``content`` of the first choice's message in the response.
    """
    optional_fields = {} if max_tokens is None else {"max_tokens": max_tokens}
    request_body = {
        "model": CHAT_MODEL,
        "messages": messages,
        "temperature": temperature,
        "stream": False,
        **optional_fields,
    }
    reply = _post_json(f"{BASE_URL}/chat/completions", request_body)
    first_choice = reply["choices"][0]
    return first_choice["message"]["content"]

# Quick self-check: probe both domains and report the first one that answers.
def quick_self_test():
    """Probe ``GET /models`` on ``BASE_URL`` and then on the alternate domain.

    Returns:
        ``(True, message)`` naming the first base URL that responds without an
        HTTP error, or ``(False, message)`` carrying the last failure when
        neither candidate works.
    """
    fallback = "https://api.siliconflow.cn/v1" if "ai" in BASE_URL else "https://api.siliconflow.ai/v1"
    for base in (BASE_URL, fallback):
        try:
            response = requests.get(f"{base}/models", headers=_headers(), **_REQ_KW)
            response.raise_for_status()
        except Exception as exc:
            # Remember only the most recent failure for the final report.
            failure = f"{base} -> {exc}"
        else:
            return True, f"API可用：{base}"
    return False, f"API异常：{failure}"

# Run the connectivity check once when this cell executes and print its message.
# `ok` is the boolean result; `msg` names the reachable base URL or the last failure.
ok, msg = quick_self_test()
print(f"[check] {msg}")

[check] API可用：https://api.siliconflow.cn/v1


In [4]:
def split_text(text, chunk=900, overlap=150):
    """Split ``text`` into overlapping, whitespace-stripped character windows.

    Carriage returns are removed first. Each window spans up to ``chunk``
    characters and the next window starts ``overlap`` characters before the end
    of the previous one. Windows that are empty after stripping are dropped.

    Args:
        text: Text to split; ``None`` is treated as an empty string.
        chunk: Maximum window length in characters; must be positive.
        overlap: Characters shared by consecutive windows; must be smaller
            than ``chunk``, otherwise the window start would never advance.

    Returns:
        List of non-empty chunk strings in document order.

    Raises:
        ValueError: If ``chunk <= 0`` or ``overlap >= chunk``.
    """
    if chunk <= 0:
        raise ValueError(f"chunk must be positive, got {chunk}")
    if overlap >= chunk:
        # Without this guard the start index stops advancing and the loop never ends.
        raise ValueError(f"overlap ({overlap}) must be smaller than chunk ({chunk})")

    text = (text or "").replace("\r", "")
    out, n, i = [], len(text), 0
    while i < n:
        j = min(i + chunk, n)
        part = text[i:j].strip()
        if part:
            out.append(part)
        if j == n:
            break
        # Step back by `overlap` so neighbouring chunks share context.
        i = max(0, j - overlap)
    return out

def build_index():
    """Build the FAISS index for ``DOC_PATH`` and persist it under ``INDEX_DIR``.

    Steps: extract the text of every PDF page, split it into chunks, embed the
    chunks in batches, L2-normalise the vectors, add them to an inner-product
    index, then write ``meta.jsonl`` (one JSON object per chunk with ``id``,
    ``text`` and 1-based ``page``) and ``index.faiss``.

    Raises:
        RuntimeError: If the PDF yields no text chunks, or if the embedding
            service returns a different number of vectors than texts sent.
    """
    batch_size = 64

    print("[ingest] loading pdf...")
    reader = PdfReader(DOC_PATH)
    chunks = []
    for i, page in enumerate(reader.pages):
        text = page.extract_text() or ""
        for ci, c in enumerate(split_text(text)):
            # The id is derived from page index, chunk index and the chunk's first 20 characters.
            chunks.append((hashlib.sha1(f"{i}:{ci}:{c[:20]}".encode()).hexdigest(), c, i+1))

    print(f"[ingest] total chunks: {len(chunks)}")
    if not chunks:
        # An empty vector list would otherwise fail later inside the normalisation step.
        raise RuntimeError(f"No text could be extracted from {DOC_PATH}; nothing to index")

    metas, vecs = [], []
    for i in tqdm(range(0, len(chunks), batch_size)):
        batch = chunks[i:i+batch_size]
        texts = [x[1] for x in batch]

        embs = embeddings(texts)
        if len(embs) != len(batch):
            # A short response would silently misalign vectors and metadata rows.
            raise RuntimeError(
                f"Embedding API returned {len(embs)} vectors for {len(batch)} texts"
            )
        vecs.extend(embs)
        metas.extend({"id": cid, "text": t, "page": p} for cid, t, p in batch)

    vecs = np.array(vecs, dtype=np.float32)
    # Normalise rows so that the inner product equals cosine similarity.
    vecs /= (np.linalg.norm(vecs, axis=1, keepdims=True) + 1e-12)

    # The dimension comes from the data itself, so no extra probe request is needed.
    index = faiss.IndexFlatIP(vecs.shape[1])
    index.add(vecs)

    # Write the metadata first and the index last: a failure in between then
    # never leaves an index.faiss on disk without its matching meta.jsonl.
    with open(f"{INDEX_DIR}/meta.jsonl", "w", encoding="utf-8") as f:
        for m in metas:
            f.write(json.dumps(m, ensure_ascii=False) + "\n")
    faiss.write_index(index, f"{INDEX_DIR}/index.faiss")

    print("✅ 索引构建完成")

# Rebuild unless BOTH artifacts exist: answering a query needs the vectors
# (index.faiss) as well as the chunk metadata (meta.jsonl).
if not (os.path.exists(f"{INDEX_DIR}/index.faiss") and os.path.exists(f"{INDEX_DIR}/meta.jsonl")):
    build_index()
else:
    print("✅ 索引已存在")


✅ 索引已存在


In [5]:
def rag_ask(query, top_k=5):
    """Answer ``query`` using the chunks retrieved from the persisted index.

    Loads ``index.faiss`` and ``meta.jsonl`` from ``INDEX_DIR``, embeds and
    normalises the query, retrieves up to ``top_k`` chunks, and asks the chat
    model to answer from those chunks only.

    Args:
        query: The user's question.
        top_k: Number of neighbours requested from the index.

    Returns:
        ``(answer, hits)``: the model's reply text and the list of metadata
        dicts (``id``, ``text``, ``page``) of the retrieved chunks.
    """
    index = faiss.read_index(f"{INDEX_DIR}/index.faiss")
    # Use a context manager so the file handle is closed deterministically.
    with open(f"{INDEX_DIR}/meta.jsonl", encoding="utf-8") as f:
        metas = [json.loads(line) for line in f if line.strip()]

    q = embeddings([query])[0]
    q = np.array([q], dtype=np.float32)
    # Same normalisation as at build time, so inner product is cosine similarity.
    q /= (np.linalg.norm(q, axis=1, keepdims=True) + 1e-12)

    _, I = index.search(q, top_k)
    # FAISS pads missing neighbours with -1 when the index holds fewer than
    # top_k vectors; without this filter, -1 would silently select metas[-1].
    hits = [metas[i] for i in I[0] if 0 <= i < len(metas)]

    ctx = "\n\n---\n\n".join([f"[p.{h['page']}]\n{h['text']}" for h in hits])

    messages = [
        {"role": "system", "content": "你是 Kindle 智能客服，只能根据参考内容回答；不确定时回答“我不知道”。"},
        {"role": "user", "content": f"问题：{query}\n\n参考内容：\n{ctx}"}
    ]
    ans = chat(messages)

    return ans, hits


In [6]:
# Question box; pressing Enter submits (the handler is attached further down).
q_input = widgets.Text(
    description="问题",
    value="示例：如何在 Kindle 调整字体大小？",
    placeholder="在这里输入你的问题（回车 = 提交）",
    layout={"width": "100%"},
    disabled=False,
)

# Controls: submit button, number of retrieved chunks, toggle for the source list.
btn = widgets.Button(button_style="primary", description="提问")
topk_slider = widgets.IntSlider(description="TopK", min=1, max=10, step=1, value=5)
show_src = widgets.Checkbox(description="显示来源", value=True)

# Bordered area that receives the rendered question, answer and sources.
out = widgets.Output(layout={"border": "1px solid #444", "padding": "6px"})

def _submit_query(_=None):
    """Run the RAG pipeline for the current question and render the result in ``out``.

    The single positional argument is whatever the widget callback passes (the
    button or the text box) and is ignored. A blank question only shows a
    warning. Any exception raised while answering or rendering is shown inline
    instead of propagating.
    """
    with out:
        clear_output()
        question = (q_input.value or "").strip()
        if not question:
            display(Markdown("> ⚠️ 请输入问题"))
            return

        question_md = f"**Q：{question}**"
        display(Markdown(question_md))
        display(Markdown("> 正在检索与生成回答，请稍候…"))
        try:
            ans, sources = rag_ask(question, top_k=topk_slider.value)

            # Replace the "please wait" notice with the final question/answer pair.
            clear_output()
            display(Markdown(question_md))
            display(Markdown(f"\n**A：**\n\n{ans}"))

            if show_src.value:
                doc_name = os.path.basename(DOC_PATH)
                lines = ["\n---\n**参考来源**"]
                for rank, src in enumerate(sources, start=1):
                    body = src["text"]
                    # Show at most 300 characters, with an ellipsis when truncated.
                    snippet = body[:300]
                    if len(body) > 300:
                        snippet += "..."
                    lines.append(f"\n{rank}. [{doc_name}] p.{src['page']}\n\n> {snippet}")
                display(Markdown("\n".join(lines)))
        except Exception as e:
            display(Markdown(f"> ❌ 出错：`{e}`"))

# Clicking the button and pressing Enter in the text box trigger the same handler.
btn.on_click(_submit_query)
# NOTE(review): the captured cell output shows a warning pointing at this line;
# recent ipywidgets releases flag `on_submit` as deprecated. It is kept because
# an observe-based replacement would also fire when the box merely loses focus.
q_input.on_submit(_submit_query)

# UI layout: question box on top, the control row in the middle, output below.
ui = widgets.VBox(
    children=[
        q_input,
        widgets.HBox(children=[btn, topk_slider, show_src]),
        out,
    ]
)
display(ui)

  q_input.on_submit(_submit_query)


VBox(children=(Text(value='示例：如何在 Kindle 调整字体大小？', description='问题', layout=Layout(width='100%'), placeholder=…