# 載入必要套件

In [None]:
!pip install sentence-transformers faiss-cpu tqdm
!pip install openai

In [None]:
pip install scann

# Step 1：資料清洗＋段落切分＋向量化＋向量索引建構（FAISS／ScaNN）

第一版本:FAISS

In [12]:
import json
import os
import re
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

# 1. 載入 JSON 資料
def load_jsonl(path):
    """Read the UTF-8 file at ``path`` and return the parsed JSON document.

    Despite the name, the file is parsed with a single ``json.load`` call
    (one JSON document), not line by line.
    """
    with open(path, mode='r', encoding='utf-8') as handle:
        parsed = json.load(handle)
    return parsed

# 2. Cleaning rules: lowercase prefixes; is_informative() rejects any paragraph
#    whose lowercased text starts with one of them.
EXCLUDED_HEADERS = [
    "abstract", "references", "acknowledgement", "acknowledgments", 
    "appendix", "author contributions", "supplementary materials"
]

def clean_paragraph(p):
    """Trim ``p`` and collapse every run of whitespace into a single space."""
    trimmed = p.strip()
    return re.sub(r'\s+', ' ', trimmed)

def is_informative(p, min_len=30):
    """Decide whether paragraph ``p`` is worth indexing.

    Returns False for an empty value, for text shorter than ``min_len``
    characters, and for text whose lowercased form starts with any entry of
    ``EXCLUDED_HEADERS``; True otherwise.
    """
    if not p:
        return False
    if len(p) < min_len:
        return False
    lowered = p.lower()
    for header in EXCLUDED_HEADERS:
        if lowered.startswith(header):
            return False
    return True

# 3. 分段+清洗
def chunk_papers(data, section_splitter=True):
    """Split every paper's ``full_text`` into cleaned chunks with parallel metadata.

    Args:
        data: iterable of dicts that provide the "title" and "full_text" keys.
        section_splitter: when true, a section containing " ::: " is split
            further on that marker.

    Returns:
        ``(all_chunks, metadata)`` — two lists of equal length; ``metadata[i]``
        holds the title, section index, chunk index and a short tag for
        ``all_chunks[i]``.
    """
    all_chunks, metadata = [], []
    for item in tqdm(data, desc="分段&結構化清洗"):
        title = item["title"]
        # Level 1: a triple newline separates the top-level sections.
        for section_id, raw_section in enumerate(item["full_text"].split("\n\n\n")):
            # Level 2: " ::: " separates sub-sections inside one section.
            if section_splitter and " ::: " in raw_section:
                pieces = [piece.strip() for piece in raw_section.split(" ::: ")]
            else:
                pieces = [raw_section.strip()]

            for chunk_id, piece in enumerate(pieces):
                cleaned = clean_paragraph(piece)
                if not is_informative(cleaned):
                    continue
                all_chunks.append(cleaned)
                metadata.append({
                    "title": title,
                    "section_id": section_id,
                    "chunk_id": chunk_id,
                    # First space-delimited token (at most 50 chars) serves as a section tag.
                    "section_name": cleaned.split(" ")[0][:50],
                })
    return all_chunks, metadata

# 4. 編碼
def encode_chunks(chunks, model_name="BAAI/bge-base-en-v1.5", batch_size=32):
    """Embed ``chunks`` with a SentenceTransformer model, ``batch_size`` texts at a time.

    Embeddings are requested with ``normalize_embeddings=True`` and the
    per-batch results are stacked into one float32 array, which is returned.
    """
    encoder = SentenceTransformer(model_name)
    batch_starts = range(0, len(chunks), batch_size)
    per_batch = [
        encoder.encode(
            chunks[start:start + batch_size],
            show_progress_bar=False,
            normalize_embeddings=True,
        )
        for start in tqdm(batch_starts, desc="向量化")
    ]
    return np.vstack(per_batch).astype('float32')

# 5. 建立 FAISS 向量索引
def build_faiss(embeddings, dim=None, save_path="faiss_index"):
    """Build a flat inner-product FAISS index and write it to ``{save_path}.bin``.

    Args:
        embeddings: 2-D array of vectors to add to the index.
        dim: vector dimensionality; taken from ``embeddings.shape[1]`` when None.
        save_path: output path without the ".bin" suffix.

    Returns:
        The populated index.
    """
    vector_dim = embeddings.shape[1] if dim is None else dim
    index = faiss.IndexFlatIP(vector_dim)  # inner-product similarity
    index.add(embeddings)
    faiss.write_index(index, f"{save_path}.bin")
    print(f"FAISS index 儲存至: {save_path}.bin")
    return index

def save_metadata(metadata, path):
    """Write ``metadata`` to ``path`` as indented UTF-8 JSON and print a confirmation.

    Non-ASCII characters are written as-is (``ensure_ascii=False``).
    """
    with open(path, mode="w", encoding="utf-8") as out_file:
        json.dump(metadata, out_file, indent=2, ensure_ascii=False)
    print(f"metadata 儲存至: {path}")

def split_into_sentences(text):
    """Split ``text`` at whitespace that follows sentence-ending punctuation.

    The recognised terminators are 。 ！ ？ ! ? and the ASCII period. Pieces
    whose stripped length is 10 characters or less are discarded.
    """
    pieces = re.split(r'(?<=[。！？!?\.])\s+', text.strip())
    kept = []
    for piece in pieces:
        trimmed = piece.strip()
        if len(trimmed) > 10:
            kept.append(trimmed)
    return kept


if __name__ == "__main__":
    # NOTE(review): the input is the *private* dataset, yet the artifacts written below are
    # named "faiss_index_public*" — confirm which split this index is meant to cover.
    # 1. Load the dataset
    data = load_jsonl("datasets/private_dataset.json")
    # 2. Split into chunks and clean them
    chunks, meta = chunk_papers(data)
    print(f"共保留有效段落數: {len(chunks)}")
    # NOTE(review): the ScaNN cell of this notebook writes "chunks.json" as well, so the file
    # reflects whichever cell ran last — make sure it matches the index that is queried later.
    with open("chunks.json", "w", encoding="utf-8") as f:
        json.dump(chunks, f, ensure_ascii=False, indent=2)
    
    # 3. Embed the chunks
    emb = encode_chunks(chunks, model_name="BAAI/bge-large-en-v1.5")
    # 4. Build the FAISS vector index
    build_faiss(emb, dim=emb.shape[1], save_path="faiss_index_public")
    # 5. Save the metadata (so retrieval hits can be mapped back to their source)
    save_metadata(meta, "faiss_index_public_metadata.json")


分段&結構化清洗: 100%|██████████| 100/100 [00:00<00:00, 1087.84it/s]


共保留有效段落數: 1364


向量化: 100%|██████████| 43/43 [00:29<00:00,  1.46it/s]

FAISS index 儲存至: faiss_index_public.bin
metadata 儲存至: faiss_index_public_metadata.json





第二版本:ScaNN <-先跑這版

In [None]:
import json
import re
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
import scann
import numpy as np


# 1. 載入 JSON 資料
def load_jsonl(path):
    """Read the UTF-8 file at ``path`` and return the parsed JSON document.

    Despite the name, the file is parsed with a single ``json.load`` call
    (one JSON document), not line by line.
    """
    with open(path, mode='r', encoding='utf-8') as handle:
        parsed = json.load(handle)
    return parsed

# 2. Cleaning rules: lowercase prefixes; is_informative() rejects any paragraph
#    whose lowercased text starts with one of them.
EXCLUDED_HEADERS = [
    "abstract", "references", "acknowledgement", "acknowledgments", 
    "appendix", "author contributions", "supplementary materials"
]

def clean_paragraph(p):
    """Trim ``p`` and collapse every run of whitespace into a single space."""
    trimmed = p.strip()
    return re.sub(r'\s+', ' ', trimmed)

def is_informative(p, min_len=30):
    """Decide whether paragraph ``p`` is worth indexing.

    Returns False for an empty value, for text shorter than ``min_len``
    characters, and for text whose lowercased form starts with any entry of
    ``EXCLUDED_HEADERS``; True otherwise.
    """
    if not p:
        return False
    if len(p) < min_len:
        return False
    lowered = p.lower()
    for header in EXCLUDED_HEADERS:
        if lowered.startswith(header):
            return False
    return True

# 3. 分段+清洗
def split_into_sentences(text):
    """Split ``text`` at whitespace that follows sentence-ending punctuation.

    The recognised terminators are 。 ！ ？ ! ? and the ASCII period. Pieces
    whose stripped length is 10 characters or less are discarded.
    """
    pieces = re.split(r'(?<=[。！？!?\.])\s+', text.strip())
    kept = []
    for piece in pieces:
        trimmed = piece.strip()
        if len(trimmed) > 10:
            kept.append(trimmed)
    return kept

def sentence_overlap_chunking(sentences, window_size=5, stride=3):
    """Group ``sentences`` into overlapping windows joined by single spaces.

    A window of ``window_size`` sentences starts every ``stride`` sentences.
    Generation stops as soon as a window reaches the last sentence, so no
    trailing window that merely repeats already-covered sentences is emitted.
    Windows whose joined text is 30 characters or shorter are dropped.

    Args:
        sentences: list of sentence strings.
        window_size: number of sentences per window (>= 1).
        stride: step between window starts (>= 1).

    Returns:
        List of chunk strings, in document order.

    Raises:
        ValueError: if ``window_size`` or ``stride`` is smaller than 1.
    """
    if window_size < 1 or stride < 1:
        raise ValueError("window_size and stride must be >= 1")

    chunks = []
    total = len(sentences)
    for start in range(0, total, stride):
        chunk = " ".join(sentences[start:start + window_size]).strip()
        if len(chunk) > 30:
            chunks.append(chunk)
        # This window already contains the final sentence; a later window would
        # only be a suffix of it and add duplicate content to the index.
        if start + window_size >= total:
            break
    return chunks

# 分段技巧
def chunk_papers(data, section_splitter=True, use_overlap=True, window_size=5, stride=3):
    """Split every paper's ``full_text`` into cleaned chunks with parallel metadata.

    Args:
        data: iterable of dicts that provide the "title" and "full_text" keys.
        section_splitter: when true, a section containing " ::: " is split
            further on that marker.
        use_overlap: when true, each cleaned piece is re-cut into overlapping
            sentence windows; otherwise the cleaned piece is stored whole.
        window_size: sentences per window (overlap mode only).
        stride: step between window starts (overlap mode only).

    Returns:
        ``(all_chunks, metadata)`` — two lists of equal length. In overlap mode
        ``chunk_id`` is the string ``"<piece index>_<window index>"``;
        otherwise it is the integer piece index.
    """
    all_chunks, metadata = [], []

    def _record(text, title, section_id, chunk_id):
        # Store one chunk together with its provenance; the first space-delimited
        # token (at most 50 chars) is kept as a section tag.
        all_chunks.append(text)
        metadata.append({
            "title": title,
            "section_id": section_id,
            "chunk_id": chunk_id,
            "section_name": text.split(" ")[0][:50]
        })

    for item in tqdm(data, desc="分段&結構化清洗"):
        title = item["title"]
        for section_id, raw_section in enumerate(item["full_text"].split("\n\n\n")):
            # Second level: split on the " ::: " sub-section marker.
            if section_splitter and " ::: " in raw_section:
                pieces = [piece.strip() for piece in raw_section.split(" ::: ")]
            else:
                pieces = [raw_section.strip()]

            for chunk_id, piece in enumerate(pieces):
                cleaned = clean_paragraph(piece)
                if not is_informative(cleaned):
                    continue

                if not use_overlap:
                    _record(cleaned, title, section_id, chunk_id)
                    continue

                # Overlapping sentence windows.
                sentences = split_into_sentences(cleaned)
                windows = sentence_overlap_chunking(sentences, window_size, stride)
                for window_id, window_text in enumerate(windows):
                    _record(window_text, title, section_id, f"{chunk_id}_{window_id}")

    return all_chunks, metadata

# 4. 編碼
def encode_chunks(chunks, model_name="BAAI/bge-large-en-v1.5", batch_size=32):
    """Embed ``chunks`` with a SentenceTransformer model, ``batch_size`` texts at a time.

    Embeddings are requested with ``normalize_embeddings=True`` and the
    per-batch results are stacked into one float32 array, which is returned.
    """
    encoder = SentenceTransformer(model_name)
    batch_starts = range(0, len(chunks), batch_size)
    per_batch = [
        encoder.encode(
            chunks[start:start + batch_size],
            show_progress_bar=False,
            normalize_embeddings=True,
        )
        for start in tqdm(batch_starts, desc="向量化")
    ]
    return np.vstack(per_batch).astype('float32')

# 5. 建立 ScaNN 向量索引
def build_scann(embeddings, save_path="scann_index", num_leaves=150, num_leaves_to_search=60,
                reordering_k=100, num_neighbors=10):
    """Build a ScaNN searcher (tree + asymmetric hashing + reordering) and persist it.

    Args:
        embeddings: 2-D float array of vectors to index (dot-product similarity).
        save_path: directory the searcher is serialized into; pass a falsy
            value (``None`` / ``""``) to skip saving.
        num_leaves: number of partitions of the tree.
        num_leaves_to_search: partitions probed per query.
        reordering_k: number of candidates re-scored exactly.
        num_neighbors: default number of neighbours returned per query.

    Returns:
        The built searcher.
    """
    import os  # local import: the import list of this cell does not include os

    searcher = (
        scann.scann_ops_pybind.builder(embeddings, num_neighbors, "dot_product")
        .tree(num_leaves=num_leaves, num_leaves_to_search=num_leaves_to_search)
        .score_ah(2, anisotropic_quantization_threshold=0.2)
        .reorder(reordering_k)
        .build()
    )
    print("ScaNN index 建立完成")

    if save_path:
        # serialize() is given a directory; create it first so the call does not
        # depend on the directory already existing.
        os.makedirs(save_path, exist_ok=True)
        searcher.serialize(save_path)
        print(f"ScaNN index 儲存至: {save_path}")
    return searcher

def save_metadata(metadata, path):
    """Write ``metadata`` to ``path`` as indented UTF-8 JSON and print a confirmation.

    Non-ASCII characters are written as-is (``ensure_ascii=False``).
    """
    with open(path, mode="w", encoding="utf-8") as out_file:
        json.dump(metadata, out_file, indent=2, ensure_ascii=False)
    print(f"metadata 儲存至: {path}")


if __name__ == "__main__":
    # 1. Load the dataset
    data = load_jsonl("datasets/public_dataset.json")
    
    # 2. Split into chunks and clean them
    chunks, meta = chunk_papers(data)
    print(f"共保留有效段落數: {len(chunks)}")
    
    # NOTE(review): the FAISS cell of this notebook writes "chunks.json" as well, so the file
    # reflects whichever cell ran last — make sure it matches the index that is queried later.
    with open("chunks.json", "w", encoding="utf-8") as f:
        json.dump(chunks, f, ensure_ascii=False, indent=2)

    # 3. Embed the chunks
    emb = encode_chunks(chunks, model_name="BAAI/bge-large-en-v1.5")
    
    # 4. Build the ScaNN index; the raw embeddings are saved too, because
    #    ScaNNRetriever rebuilds its searcher from "scann_embeddings.npy".
    np.save("scann_embeddings.npy", emb)
    searcher = build_scann(emb, save_path="scann_index")
    
    # 5. Save the metadata
    save_metadata(meta, "scann_index_metadata.json")


In [None]:
def search_scann(query, chunks, metadata, searcher, encoder, top_k):
    """Retrieve the ``top_k`` chunks most similar to ``query`` from a ScaNN searcher.

    Args:
        query: question text.
        chunks: list of chunk strings, aligned with the indexed vectors.
        metadata: list of per-chunk metadata dicts, aligned with ``chunks``.
        searcher: ScaNN searcher exposing ``search_batched``.
        encoder: object exposing ``encode(list_of_texts, normalize_embeddings=...)``.
        top_k: number of neighbours to return.

    Returns:
        List of dicts with the keys "score", "index", "chunk" and "metadata",
        in the order returned by the searcher.
    """
    # Encode the query as a one-row float32 matrix.
    query_vec = encoder.encode([query], normalize_embeddings=True).astype('float32')

    # Request exactly top_k neighbours; without this argument the searcher falls
    # back to the neighbour count it was built with and top_k has no effect.
    neighbors, scores = searcher.search_batched(query_vec, final_num_neighbors=top_k)

    results = []
    for idx, score in zip(neighbors[0], scores[0]):
        idx = int(idx)
        results.append({
            "score": float(score),
            "index": idx,
            "chunk": chunks[idx],           # original chunk text
            "metadata": metadata[idx]       # provenance of the chunk (title, section, ...)
        })
    return results

# Step 2：建構向量檢索器（Retriever）

FAISS

In [13]:
import json
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer

class VectorRetriever:
    """Dense retriever backed by a FAISS index stored on disk."""

    def __init__(self, index_path, metadata_path, chunk_path=None,
                 model_name="BAAI/bge-large-en-v1.5"):
        """Load the index, its metadata, optionally the chunk texts, and the query encoder.

        Args:
            index_path: path of the FAISS index file.
            metadata_path: path of the JSON list of per-chunk metadata.
            chunk_path: optional path of the JSON list of chunk texts.
            model_name: SentenceTransformer checkpoint used to encode queries.
        """
        self.index = faiss.read_index(index_path)
        # Per-chunk metadata, aligned with the index rows.
        with open(metadata_path, "r", encoding="utf-8") as f:
            self.metadata = json.load(f)
        # Original chunk texts (optional).
        if chunk_path:
            with open(chunk_path, "r", encoding="utf-8") as f:
                self.chunks = json.load(f)
        else:
            self.chunks = None

        self.encoder = SentenceTransformer(model_name)

    def retrieve(self, query, top_k=5):
        """Return up to ``top_k`` hits for ``query``.

        Each hit is a dict with the keys "score", "index", "metadata" and
        "content" (``None`` when no chunk texts were loaded).
        """
        # Encode the query as a one-row float32 matrix.
        q_vec = self.encoder.encode([query], normalize_embeddings=True).astype('float32')
        scores, indices = self.index.search(q_vec, top_k)
        results = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)
            # FAISS fills missing results with -1 when the index holds fewer than
            # top_k vectors; a negative value would silently select the last list entry.
            if idx < 0:
                continue
            results.append({
                "score": float(score),
                "index": idx,
                "metadata": self.metadata[idx],
                "content": self.chunks[idx] if self.chunks else None
            })
        return results

ScaNN

In [None]:
import json
import numpy as np
from sentence_transformers import SentenceTransformer
import scann

class ScaNNRetriever:
    """Dense retriever that rebuilds a ScaNN searcher from saved embeddings."""

    def __init__(self, embedding_path, metadata_path, chunk_path=None,
                 model_name="BAAI/bge-large-en-v1.5"):
        """Load embeddings, metadata and optional chunk texts, then build the searcher.

        Args:
            embedding_path: path of the ``.npy`` file holding the chunk embeddings.
            metadata_path: path of the JSON list of per-chunk metadata.
            chunk_path: optional path of the JSON list of chunk texts.
            model_name: SentenceTransformer checkpoint used to encode queries.
        """
        self.embeddings = np.load(embedding_path)

        with open(metadata_path, "r", encoding="utf-8") as f:
            self.metadata = json.load(f)

        if chunk_path:
            with open(chunk_path, "r", encoding="utf-8") as f:
                self.chunks = json.load(f)
        else:
            self.chunks = None

        # Build the ScaNN searcher (10 neighbours by default, dot-product similarity).
        self.searcher = scann.scann_ops_pybind.builder(self.embeddings, 10, "dot_product") \
            .tree(num_leaves=150, num_leaves_to_search=60) \
            .score_ah(2, anisotropic_quantization_threshold=0.2) \
            .reorder(100).build()

        self.encoder = SentenceTransformer(model_name)

    def retrieve(self, query, top_k=5):
        """Return the ``top_k`` hits for ``query``.

        Each hit is a dict with the keys "score", "index", "metadata" and
        "content" (``None`` when no chunk texts were loaded).
        """
        q_vec = self.encoder.encode([query], normalize_embeddings=True).astype('float32')
        # Pass top_k through; otherwise the searcher always returns the
        # neighbour count it was built with, regardless of the argument.
        neighbors, scores = self.searcher.search_batched(q_vec, final_num_neighbors=top_k)

        results = []
        for idx, score in zip(neighbors[0], scores[0]):
            idx = int(idx)
            results.append({
                "score": float(score),
                "index": idx,
                "metadata": self.metadata[idx],
                "content": self.chunks[idx] if self.chunks else None
            })
        return results

# Step 3：設計 Prompt 模板 & RAG 回答模組

In [14]:
# Prompt sent to the LLM; filled in with str.format(context=..., question=...).
PROMPT_TEMPLATE = """
You are a research assistant. Answer the question strictly based on the provided context and any clues or evidence.
Carefully analyze the context and extract only the most relevant information.
Provide a concise, direct answer to the question in a single sentence. 
If the context provides no useful information, try your best to infer an answer from what you have.
Only respond with "I don't know" if absolutely no relevant clues can be found in the context.
Avoid explanations, introductory phrases, or unnecessary details. No extra steps or reasoning, just the final answer.
Do NOT copy full sentences verbatim from the context.
If the answer is too cryptic and unclear, just help me answer based on known clues to minimize the chances of answering “I don't know”.

Context:
{context}

Question:
{question}

Answer (no explanation):
"""

In [15]:
from sentence_transformers import SentenceTransformer, util

# Module-level sentence encoder; select_top_sentences() uses it to score sentences against the query.
bert_model = SentenceTransformer("BAAI/bge-large-en-v1.5")

def select_top_sentences(query, candidate_chunks, max_sentences):
    """Pick the sentences from ``candidate_chunks`` most similar to ``query``.

    Every chunk is split with ``split_into_sentences`` (defined in an earlier
    cell), all sentences are embedded with ``bert_model`` and ranked by cosine
    similarity to the query.

    Args:
        query: question text.
        candidate_chunks: iterable of chunk strings.
        max_sentences: maximum number of sentences to return.

    Returns:
        Up to ``max_sentences`` sentences, most similar first; an empty list
        when the chunks yield no sentence at all.
    """
    all_sentences = []
    for chunk in candidate_chunks:
        all_sentences.extend(split_into_sentences(chunk))

    # Nothing to rank: return early instead of encoding and scoring an empty batch.
    if not all_sentences:
        return []

    query_emb = bert_model.encode(query, convert_to_tensor=True)
    sent_embs = bert_model.encode(all_sentences, convert_to_tensor=True)
    scores = util.pytorch_cos_sim(query_emb, sent_embs)[0]
    top_idx = scores.argsort(descending=True)[:max_sentences]
    # tolist() yields plain ints for indexing the Python list.
    return [all_sentences[i] for i in top_idx.tolist()]


def build_context(sentences, max_char=4500):
    """Concatenate ``sentences`` with spaces until the character budget is hit.

    Sentences are taken in order; the first one that would push the running
    length (each sentence counted with one trailing space) past ``max_char``
    stops the process, and later sentences are not considered. The result is
    returned stripped.
    """
    picked = []
    used = 0  # characters consumed so far, one separator space per sentence included
    for sentence in sentences:
        cost = len(sentence) + 1
        if used + cost > max_char:
            break
        picked.append(sentence)
        used += cost
    return " ".join(picked).strip()

In [16]:
# 特例:計數題型
def is_count_question(query):
    """Return True when the lowercased ``query`` starts with a quantity opener.

    The openers are "how many", "how much", "how long" and "how often".
    """
    count_openers = ("how many", "how much", "how long", "how often")
    return query.lower().startswith(count_openers)

def extract_numeric_evidence(sentences):
    """Keep the sentences that contain a standalone number of 1 to 6 digits."""
    standalone_number = re.compile(r"\b\d{1,6}\b")
    return list(filter(standalone_number.search, sentences))

def extract_best_count_answer(sentences, query=None):
    """Choose the sentence most likely to answer a counting question.

    Only sentences containing a standalone 1-6 digit number are considered.
    Each whitelist noun found in the (lowercased) sentence adds 2 points, plus
    3 more when that noun is also a word of ``query``; every number found adds
    1 point. The highest-scoring sentence is returned (ties are resolved by
    comparing the sentence text), or ``None`` when no sentence has a number.
    """
    keyword_whitelist = {"articles", "documents", "samples", "questions", "entries", "sentences", "reviews", "papers"}
    query_keywords = set(re.findall(r'\b\w+\b', query.lower())) if query else set()

    candidates = []
    for sentence in sentences:
        lowered = sentence.lower()
        numbers = re.findall(r"\b\d{1,6}\b", lowered)
        if not numbers:
            continue

        present = [word for word in keyword_whitelist if word in lowered]
        in_query = sum(1 for word in present if word in query_keywords)
        # The number count gives a small bonus to sentences richer in figures.
        score = 2 * len(present) + 3 * in_query + len(numbers)
        candidates.append((score, sentence.strip()))

    if not candidates:
        return None
    return max(candidates)[1]

In [17]:
from openai import OpenAI

# The Groq API key is read from the environment so that no credential is stored in the notebook.
# NOTE(review): a literal key used to be written here — treat it as leaked and revoke it.
import os

_groq_api_key = os.environ.get("GROQ_API_KEY")
if not _groq_api_key:
    raise RuntimeError("Set the GROQ_API_KEY environment variable before running this cell.")

client = OpenAI(
    api_key=_groq_api_key,
    base_url="https://api.groq.com/openai/v1"
)

def query_llm(prompt, model="gemma2-9b-it"):
    """Send ``prompt`` as a single user message and return the reply text.

    Args:
        prompt: full prompt text.
        model: model identifier passed to the chat-completions endpoint.

    Returns:
        The stripped content of the first choice, or "" when the reply
        carries no text content.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "user", "content": prompt}
        ],
        temperature=0.5,
        max_tokens=600,
    )
    # The message content is optional in the SDK's response type; a missing
    # body is mapped to "" instead of failing on None.strip().
    content = response.choices[0].message.content
    return (content or "").strip()

def extract_list_items(text):
    """Break a free-form answer into a de-duplicated list of short items.

    A list input is returned with each string stripped and its trailing period
    removed. A string input is split on newlines and "•" bullets, then on
    commas, semicolons, " and " and " or ". A dash only counts as a separator
    when it opens the text or stands alone between whitespace, so hyphenated
    terms such as "state-of-the-art" stay in one piece. Leading list numbering
    ("1. ", "2) ") is removed, but digits that belong to an item ("GPT-2") are
    kept. Items longer than 8 words or shorter than 2 characters are dropped.

    Returns:
        The items in first-seen order, or ``[text]`` when nothing qualifies.
    """
    if isinstance(text, list):
        return [s.strip().rstrip(".") for s in text if isinstance(s, str)]

    text = text.strip()
    items = []

    # Newlines and bullet glyphs always separate entries; a dash does so only as
    # a leading bullet or when it is surrounded by whitespace.
    lines = re.split(r"[•\n]|(?:^|\s)-\s", text)
    for line in lines:
        parts = re.split(r",|，|、| and | or |;", line)
        for part in parts:
            # Drop "1. " / "2) " style numbering at the start of an entry.
            clean = re.sub(r"^\s*\d+[.)]\s+", "", part)
            clean = clean.strip(" -•.\"\n").rstrip(".")
            if len(clean.split()) <= 8 and len(clean) >= 2:
                items.append(clean)

    # dict.fromkeys removes duplicates while preserving order.
    items = list(dict.fromkeys(items))

    if not items:
        return [text]
    return items

def extract_final_sentence(text):
    """Return the last substantial line of an LLM reply.

    Lines are scanned from the bottom; blank lines and lines whose lowercased
    text starts with "step" are ignored. The first remaining line with more
    than 4 words is returned with a leading "Answer:" label removed. When no
    line qualifies, the whole stripped text is returned unchanged.
    """
    stripped = text.strip()
    for raw_line in reversed(stripped.splitlines()):
        line = raw_line.strip()
        if not line or raw_line.lower().startswith("step"):
            continue
        if len(line.split()) > 4:
            return re.sub(r'^[Aa]nswer\s*:\s*', '', line)

    # Fallback: nothing long enough was found, hand back the full text.
    return stripped

def answer_question(query, chunks, metadata, index, encoder, model="gemma2-9b-it", title=None, top_k=40):
    """Answer ``query`` with retrieval-augmented generation.

    Steps: retrieve ``top_k`` chunks from ``index``, keep the 18 sentences most
    similar to the query, shortcut counting questions with a numeric evidence
    sentence when one exists, otherwise build a prompt from the de-duplicated
    sentences and ask the LLM.

    Args:
        query: question text.
        chunks: list of chunk strings aligned with the index rows.
        metadata: per-chunk metadata (accepted for interface symmetry; not read here).
        index: vector index exposing ``search(vectors, k) -> (scores, indices)``.
        encoder: object exposing ``encode(list_of_texts, normalize_embeddings=...)``.
        model: LLM identifier forwarded to ``query_llm``.
        title: value copied into the result under "title".
        top_k: number of chunks to retrieve.

    Returns:
        Dict with the keys "title", "answer" and "evidence" (list of sentences).
    """
    q_vec = encoder.encode([query], normalize_embeddings=True).astype('float32')
    scores, indices = index.search(q_vec, top_k)

    # FAISS pads with -1 when fewer than top_k vectors exist; skip those slots
    # rather than letting them select the last chunk.
    candidate_chunks = [chunks[i] for i in indices[0] if i >= 0]

    best_sentences = select_top_sentences(query, candidate_chunks, max_sentences=18)

    # Counting questions: answer directly with the best number-bearing sentence.
    if is_count_question(query):
        numeric_evidence = extract_numeric_evidence(best_sentences)
        if numeric_evidence:
            # Forward the query so that nouns shared with the question are rewarded.
            count_answer = extract_best_count_answer(numeric_evidence, query=query)
            if count_answer:
                return {
                    "title": title,
                    "answer": count_answer,
                    "evidence": numeric_evidence
                }

    # Drop duplicated sentences and those with 4 words or fewer.
    seen = set()
    unique_evidence = []
    for s in best_sentences:
        if s not in seen and len(s.split()) > 4:
            seen.add(s)
            unique_evidence.append(s)

    # Assemble the context and the prompt.
    context = build_context(unique_evidence, max_char=5000)
    prompt = PROMPT_TEMPLATE.format(context=context, question=query)

    # Generate with the LLM and keep only the final answer line.
    reasoning_output = query_llm(prompt, model=model)
    final_answer = extract_final_sentence(reasoning_output)

    return {
        "title": title,
        "answer": final_answer,
        "evidence": unique_evidence
    }

# Step 4：處理 QA 任務

In [None]:
import json
import numpy as np
from tqdm import tqdm
from sentence_transformers import SentenceTransformer

import faiss  # used below; imported here so the cell does not rely on an earlier cell's import


def _read_json(path):
    """Parse the UTF-8 JSON file at ``path`` and close the handle afterwards."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


# Artifacts produced by the FAISS indexing cell (written as UTF-8 JSON).
chunks = _read_json("chunks.json")
metadata = _read_json("faiss_index_public_metadata.json")
index = faiss.read_index("faiss_index_public.bin")
encoder = SentenceTransformer("BAAI/bge-large-en-v1.5")

private_data = _read_json("datasets/private_dataset.json")
outputs = []

for item in tqdm(private_data, desc="處理 QA 問題"):
    title = item["title"]
    query = item["question"]

    result = answer_question(
        query=query,
        chunks=chunks,
        metadata=metadata,
        index=index,
        encoder=encoder,
        model="gemma2-9b-it",
        title=title
    )

    outputs.append(result)

with open("sample_submission_private.json", "w", encoding="utf-8") as f:
    json.dump(outputs, f, ensure_ascii=False, indent=2)

print("處理完成，結果已儲存於 sample_submission_private.json")

處理 QA 問題: 100%|██████████| 100/100 [04:01<00:00,  2.42s/it]

處理完成，結果已儲存於 sample_submission_public.json





# Evaluation

In [26]:
from rouge_score import rouge_scorer

scorer = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=True)

# NOTE(review): `demo_id` and `demo_evidence` are not defined in any visible cell of this
# notebook; they must be set (index into `outputs`, list of reference evidence strings)
# before this cell is run — confirm where they are supposed to come from.
result = outputs[demo_id]
unique_evidence = result["evidence"]

# ROUGE-L F-measure of every retrieved sentence against the references
# (score_multi keeps the best-matching reference).
fmeasure_scores = [
    scorer.score_multi(targets=demo_evidence, prediction=chunk)["rougeL"].fmeasure
    for chunk in unique_evidence
]

# An answer with no evidence scores 0.0 instead of raising ZeroDivisionError.
final_evidence_score = sum(fmeasure_scores) / len(fmeasure_scores) if fmeasure_scores else 0.0
print(f"{final_evidence_score = :.4f}")

final_evidence_score = 0.1438
