# 階段 1：資料載入與前處理

In [1]:
from datasets import load_dataset

# 直接載入 SQuAD 2.0 官方版本
dataset = load_dataset("squad_v2")

# 檢查資料結構
print(dataset)

# 查看一筆樣本
print(dataset["train"][0])


DatasetDict({
    train: Dataset({
        features: ['id', 'title', 'context', 'question', 'answers'],
        num_rows: 130319
    })
    validation: Dataset({
        features: ['id', 'title', 'context', 'question', 'answers'],
        num_rows: 11873
    })
})
{'id': '56be85543aeaaa14008c9063', 'title': 'Beyoncé', 'context': 'Beyoncé Giselle Knowles-Carter (/biːˈjɒnseɪ/ bee-YON-say) (born September 4, 1981) is an American singer, songwriter, record producer and actress. Born and raised in Houston, Texas, she performed in various singing and dancing competitions as a child, and rose to fame in the late 1990s as lead singer of R&B girl-group Destiny\'s Child. Managed by her father, Mathew Knowles, the group became one of the world\'s best-selling girl groups of all time. Their hiatus saw the release of Beyoncé\'s debut album, Dangerously in Love (2003), which established her as a solo artist worldwide, earned five Grammy Awards and featured the Billboard Hot 100 number-one singles "C

In [2]:
from pydantic import BaseModel, Field
from typing import Optional
from langchain_core.documents import Document

# 定義資料模型
class testdata(BaseModel):
    id: str
    title: str
    context: str
    question: str
    answers: Optional[dict] = Field(default_factory=dict)


# 先抽樣 4000 筆作為初步測試
sample_data = dataset["train"].shuffle(seed=42).select(range(4000))

# 將資料轉換為 LangChain Document 格式
train_docs = []
qa_pairs = []
for item in sample_data:
    data = testdata(**item)
    # 建立檢索文檔
    doc = Document(
        page_content=f"Title: {data.title}\nContext: {data.context}", 
        metadata={"id": data.id}
    )
    train_docs.append(doc)

    # 建立 QA 配對資料（供評估使用）
    qa_pairs.append(
            {
                "id": data.id,
                "question": data.question,
                "gold label": data.answers
    })

# 階段 2：文本切分與嵌入 (FAISS index 構建)

In [None]:
# 使用 text splitter 切分 context
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain.text_splitter import RecursiveCharacterTextSplitter

# 創建文本切分器，設定 chunk_size 和 chunk_overlap
splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=100)
docs = splitter.split_documents(train_docs)

# 創建 FAISS 向量庫
embeddings_model = HuggingFaceEmbeddings(
            model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        )

# 建立FAISS索引 傳入測試字串，取得模型的向量維度參數
index = faiss.IndexFlatL2(len(embeddings_model.embed_query("test dim len")))

# 創建向量庫
vector_store = FAISS(embedding_function=embeddings_model,
                     index=index,
                     docstore=InMemoryDocstore(),
                     index_to_docstore_id={},)

# 將切分後的文件加入向量庫
vector_store.add_documents(docs)

# 階段 3：Retriever 創建

In [None]:
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever
from nltk.tokenize import word_tokenize
import jieba
import nltk
# 下載 punkt_tab 資料包以支援 word_tokenize
nltk.download('punkt')
nltk.download('punkt_tab')

# 稀疏檢索器：BM25

# 定義中文 tokenizer（輸入資料如果是中文時可用）（英文使用nltk-word_tokenize）
def chinese_tokenizer(text: str):
    return list(jieba.cut(text))

# 使用 tokenizer 創建 BM25 retriever，注意必須正確傳入分詞函數
bm25_retriever = BM25Retriever.from_documents(docs, preprocess_func=word_tokenize, k = 10)

# 稠密檢索器：FAISS
faiss_retriever = vector_store.as_retriever(search_kwargs={"k":10},search_type="similarity")

# 混合檢索：Ensemble Retriever
hybrid_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, faiss_retriever],
    weights=[0.2,0.8]  # 可調整權重比例
)

### 載入llm 模型 與 創建 輸入改寫器、交叉比對重排序器

In [None]:
# LLM Model name:
model_name = "meta-llama/Llama-3.1-8B-Instruct"
# huggingface_token:
token = "Your_HuggingFace_Token"

In [None]:
import torch
from transformers import BitsAndBytesConfig
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from langchain_huggingface import HuggingFacePipeline

tokenizer = AutoTokenizer.from_pretrained(
    model_name,
    trust_remote_code=False,
    token=token,
)

device = "cuda" if torch.cuda.is_available() else "cpu"

#gpu 量化設定
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,  # 或 load_in_8bit=True
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
)

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    quantization_config=bnb_config,
    trust_remote_code=False,
    token=token,
)

generation_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    return_full_text=False,
    max_new_tokens=128,
    temperature=0.1,
    repetition_penalty=1.1,
    pad_token_id = tokenizer.eos_token_id,
    eos_token_id = tokenizer.eos_token_id,
)

# 將 HuggingFace pipeline 封裝為 LangChain LLM
llm = HuggingFacePipeline(pipeline=generation_pipeline)


In [6]:
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.retrievers import ContextualCompressionRetriever

# 交叉比對重排序器
reranker = HuggingFaceCrossEncoder(model_name="cross-encoder/ms-marco-MiniLM-L-6-v2")

# 交叉比對重排序器，取前3名
compressor = CrossEncoderReranker(model=reranker, top_n=3)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor, base_retriever=hybrid_retriever
)

In [7]:
from langchain.chains import RetrievalQA
# 建立問答鏈
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=compression_retriever,
    chain_type="stuff"  # or "map_reduce", "refine"
)

# 階段 4：測試 Recall、Precision、MRR

In [None]:
from typing import List
# 建立 evaluator
def evaluate_retriever(
    retriever,
    qa_pairs: List[dict],
    k: int = 3
):
    """Evaluate retriever with metrics: Recall@k, Precision@k, MRR."""
    total = 0
    sum_recall = 0.0
    sum_precision = 0.0
    sum_mrr = 0.0

    for qa in qa_pairs:
        q = qa["question"]
        ground_truths = qa["gold label"]["text"]  # list[str]
        if not ground_truths:
            continue  

        
        docs = retriever.invoke(q)
        
        hits = []
        for idx, doc in enumerate(docs):
            text = doc.page_content.lower()
            
            for gt in ground_truths:
                if gt.lower() in text:
                    hits.append(idx + 1) 
                    break

        total += 1
        if hits:
            # Recall@k
            sum_recall += 1
            # Precision@k
            sum_precision += len(hits) / k
            # MRR
            sum_mrr += 1.0 / hits[0]
        else:
            # no hit
            sum_recall += 0
            sum_precision += 0
            sum_mrr += 0

    # aggregate
    recall_at_k = sum_recall / total if total > 0 else 0
    precision_at_k = sum_precision / total if total > 0 else 0
    mrr = sum_mrr / total if total > 0 else 0

    return {
        "recall@k": recall_at_k,
        "precision@k": precision_at_k,
        "mrr": mrr,
        "evaluated_count": total
    }


# 進行評估
results = evaluate_retriever(compression_retriever, qa_pairs, k=3)
print("Retriever Eval:", results)

Retriever Eval: {'recall@k': 0.9101727447216891, 'precision@k': 0.3869481765834993, 'mrr': 0.8611644273832373, 'evaluated_count': 2605}


# 階段 5：Langchain evaluation 測試 BERT Score

In [19]:
from datasets import Dataset
import pandas as pd
from langchain.evaluation.qa import QAEvalChain

eval_chain = QAEvalChain.from_llm(llm)

# 將 QA pair 轉成 HuggingFace Dataset
hf_dataset = Dataset.from_list([
    {"id": qa["id"], "question": qa["question"], "answers": qa["gold label"]["text"]}
    for qa in qa_pairs[:150]  # 先用前150筆測試
])

# 批量生成 + 評估
def generate_and_eval(batch):
    # 將 question 欄位轉成 chain 預期的 key
    inputs = [{"query": q} for q in batch["question"]]
    # batch["question"] 是 list[str]
    
    # 嘗試使用 batch（支援批量的 chain）
    try:
        results = qa_chain.batch(inputs)
        batch["prediction"] = [r["result"] for r in results]
    except AttributeError:
        # 如果 chain 不支援 batch ，就回退單筆 invoke
        batch["prediction"] = [qa_chain.invoke(q) for q in batch["question"]]

    # 準備 evaluate 的輸入格式
    examples = [
        {"query": q, "answer": a}
        for q, a in zip(batch["question"], batch["answers"])
    ]
    predictions = [{"result": p} for p in batch["prediction"]]

    # 執行官方的 evaluate（支援批次）
    eval_results = eval_chain.evaluate(examples, predictions)

    batch["eval"] = eval_results
    return batch

# 使用 map 批量處理，batch_size 可調整
hf_dataset = hf_dataset.map(generate_and_eval, batched=True, batch_size=2)

# 轉成 DataFrame
eval_col = hf_dataset["eval"]
all_results = []
for e in eval_col:
    if isinstance(e, list):
        all_results.extend(e)
    else:
        all_results.append(e)

df = pd.DataFrame(all_results)

# 解析每筆是否正確，建立新欄位
df['is_correct'] = df['results'].apply(lambda x: 1 if 'CORRECT' in x else 0)

# 計算 Accuracy
accuracy = df['is_correct'].mean()
print(f"Accuracy: {accuracy:.4f}")

# 若想看其他統計
print(df.describe())

Map:   0%|          | 0/150 [00:00<?, ? examples/s]

Accuracy: 0.9800
       is_correct
count  150.000000
mean     0.980000
std      0.140469
min      0.000000
25%      1.000000
50%      1.000000
75%      1.000000
max      1.000000


In [None]:
preds_raw = hf_dataset["prediction"]
refs = hf_dataset["answers"]
print("Preds:", preds_raw[:2])
print("Refs:", refs[:2])

In [20]:
import evaluate
# 取得生成與答案
preds_raw = hf_dataset["prediction"]
refs = hf_dataset["answers"]

# preds_raw 是 list[dict]，我們只取出其中的 result 欄位
preds = [p["result"] if isinstance(p, dict) and "result" in p else str(p) for p in preds_raw]

# squad 2.0 包含無答案情況

# 攤平 references（取每筆中的第一個答案）
flatten_refs = [r[0] if isinstance(r, list) and len(r) > 0 else "" for r in refs]

# 判斷是否有答案
has_answer = [r.strip() != "" for r in flatten_refs]

# 分離有答案與無答案的預測與參考
answerable_preds = [p for p, ha in zip(preds, has_answer) if ha]
answerable_refs = [r for r, ha in zip(refs, has_answer) if ha]

# BERTScore（需要 bert-score 套件）
bertscore = evaluate.load("bertscore")
bert_result = bertscore.compute(predictions=answerable_preds, references=answerable_refs, lang="en")
bert_f1_mean = sum(bert_result["f1"]) / len(bert_result["f1"])
print(f"BERTScore (F1 mean): {bert_f1_mean:.4f}")


Some weights of RobertaModel were not initialized from the model checkpoint at roberta-large and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BERTScore (F1 mean): 0.8220
