In [None]:

# 初始化參數
import os
file_path = "nstc_jobs_full.csv"  # path of the CSV produced by the crawler
model_name = "all-MiniLM-L6-v2"   # sentence-embedding model name
chunk_size = 300                   # maximum characters per document chunk
chunk_overlap = 30                 # characters shared between consecutive chunks
encoding_batch_size = 32           # batch size used when encoding chunks
top_k = 5                          # number of results returned per query
# NOTE(review): the prefix says "recursive" although the main flow below chunks
# with build_documents_hybrid; an index cached under this prefix is reused
# regardless of which chunking produced it -- confirm this is intended.
save_path_prefix = "faiss_recursive"  # filename prefix for the saved FAISS index
log_dir = "logs"                   # directory that receives the log files
# Query-result logs (CSV and Excel) live inside log_dir.
output_log_csv = f"{log_dir}/rag_results_log.csv"
output_log_excel = f"{log_dir}/rag_results_log.xlsx"
# Create the log directory up front; exist_ok makes re-running this cell harmless.
os.makedirs(log_dir, exist_ok=True)


In [111]:
import pandas as pd

def load_and_prepare_data(file_path):
    """Read the crawled job CSV and build one text blob plus metadata per row.

    Rows with a missing value in any of the four required columns
    (職缺名稱, 發佈日期, 連結, 詳細內容) are dropped before anything is built.

    Args:
        file_path (str): Path to the CSV file.

    Returns:
        tuple[list[str], list[dict]]: ``texts`` holds one formatted string per
        remaining row (title, publish date, detail body); ``metadatas`` holds a
        dict with the row's 職缺名稱 and 連結, in the same order as ``texts``.

    Example:
        texts, metas = load_and_prepare_data('nstc_jobs_full.csv')
    """
    required_columns = ["職缺名稱", "發佈日期", "連結", "詳細內容"]
    jobs = pd.read_csv(file_path).dropna(subset=required_columns)

    texts = []
    metadatas = []
    # One pass over the rows fills both parallel lists.
    for _, job in jobs.iterrows():
        texts.append(
            f"職缺名稱：{job['職缺名稱']}\n發佈日期：{job['發佈日期']}\n詳細內容：\n{job['詳細內容']}"
        )
        metadatas.append({"職缺名稱": job["職缺名稱"], "連結": job["連結"]})

    return texts, metadatas


In [112]:
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter

def build_documents(texts, metadatas, chunk_size, chunk_overlap):
    """Wrap each text in a Document and split them into chunks.

    Args:
        texts: List of raw text strings.
        metadatas: List of metadata dicts, paired with ``texts`` by position.
        chunk_size: Value passed to the splitter as ``chunk_size``.
        chunk_overlap: Value passed to the splitter as ``chunk_overlap``.

    Returns:
        The result of ``RecursiveCharacterTextSplitter.split_documents`` on the
        wrapped documents.

    Example:
        docs = build_documents(texts, metadatas, 300, 30)
    """
    source_docs = []
    for content, meta in zip(texts, metadatas):
        source_docs.append(Document(page_content=content, metadata=meta))

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return text_splitter.split_documents(source_docs)


In [113]:
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document

def hybrid_chunking(text, metadata, chunk_size=300, chunk_overlap=30):
    """Split one text on structural boundaries, then re-split each segment.

    Args:
        text: A single document's text.
        metadata: Metadata dict attached to every Document built from ``text``.
        chunk_size: Value passed to the splitter as ``chunk_size``.
        chunk_overlap: Value passed to the splitter as ``chunk_overlap``.

    Returns:
        list: All chunks produced by the splitter, in segment order.

    Example:
        chunks = hybrid_chunking(texts[0], metadatas[0])
    """
    # Stage 1 boundaries:
    #   - just before a "【...】" heading (zero-width, so the heading stays with
    #     the segment that follows it),
    #   - a "<digits>. " list marker directly after a newline (the marker itself
    #     is consumed by the split),
    #   - a run of two or more newlines (also consumed).
    boundary_pattern = r"(?=【[^】]+】)|(?<=\n)\d+\.\s+|\n{2,}"
    raw_segments = re.split(boundary_pattern, text)

    # Stage 2: hand every non-blank segment to the recursive splitter.
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = []
    for raw_segment in raw_segments:
        segment = raw_segment.strip()
        if not segment:
            continue
        segment_doc = Document(page_content=segment, metadata=metadata)
        chunks.extend(splitter.split_documents([segment_doc]))

    return chunks
def build_documents_hybrid(texts, metadatas, chunk_size=300, chunk_overlap=30):
    """Apply ``hybrid_chunking`` to every (text, metadata) pair.

    Args:
        texts: List of raw text strings.
        metadatas: List of metadata dicts, paired with ``texts`` by position.
        chunk_size: Forwarded to ``hybrid_chunking``.
        chunk_overlap: Forwarded to ``hybrid_chunking``.

    Returns:
        list: Chunks of all texts concatenated in input order.
    """
    return [
        chunk
        for text, meta in zip(texts, metadatas)
        for chunk in hybrid_chunking(text, meta, chunk_size, chunk_overlap)
    ]


In [114]:

def build_documents_sentence(texts, metadatas):
    """Chunk every text with ``sentence_chunking`` and flatten the results.

    NOTE(review): ``sentence_chunking`` is not defined in the notebook cells
    visible here; it must already exist in the kernel or this raises NameError
    on the first pair -- confirm where it comes from.

    Args:
        texts: List of raw text strings.
        metadatas: List of metadata dicts, paired with ``texts`` by position.

    Returns:
        list: Chunks of all texts concatenated in input order.
    """
    return [
        chunk
        for text, meta in zip(texts, metadatas)
        for chunk in sentence_chunking(text, meta)
    ]

def build_documents_paragraph(texts, metadatas):
    """Chunk every text with ``paragraph_chunking`` and flatten the results.

    NOTE(review): ``paragraph_chunking`` is not defined in the notebook cells
    visible here; it must already exist in the kernel or this raises NameError
    on the first pair -- confirm where it comes from.

    Args:
        texts: List of raw text strings.
        metadatas: List of metadata dicts, paired with ``texts`` by position.

    Returns:
        list: Chunks of all texts concatenated in input order.
    """
    return [
        chunk
        for text, meta in zip(texts, metadatas)
        for chunk in paragraph_chunking(text, meta)
    ]


In [115]:

from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import os

def build_faiss_index(split_docs, model_name, batch_size=32):
    """Encode document chunks and put their vectors into a flat L2 FAISS index.

    Args:
        split_docs: Non-empty sequence of objects exposing ``page_content``.
        model_name: Name passed to ``SentenceTransformer``.
        batch_size: Batch size passed to ``model.encode``.

    Returns:
        tuple: ``(index, model, split_docs)`` -- the populated
        ``faiss.IndexFlatL2``, the loaded embedding model, and the input
        documents unchanged (vector ``i`` in the index belongs to
        ``split_docs[i]``).

    Raises:
        ValueError: If ``split_docs`` is empty. Checked before the model is
            loaded so the failure is immediate and names the actual problem.
    """
    if not split_docs:
        raise ValueError("split_docs is empty; cannot build a FAISS index without documents")

    model = SentenceTransformer(model_name)
    contents = [doc.page_content for doc in split_docs]
    embeddings = model.encode(contents, show_progress_bar=True, batch_size=batch_size)

    # FAISS only accepts C-contiguous float32 matrices; make that explicit
    # instead of relying on whatever dtype/layout the encoder returned.
    vectors = np.ascontiguousarray(embeddings, dtype=np.float32)

    index = faiss.IndexFlatL2(vectors.shape[1])
    index.add(vectors)
    return index, model, split_docs

def load_or_build_index(docs, model_name, save_path_prefix, batch_size=32):
    """Load a saved FAISS index, or build and save one when none is usable.

    A saved index is used when both ``<prefix>.index`` and ``<prefix>_docs.pkl``
    exist. The cache is not keyed on the chunking settings or the model, so two
    sanity checks are applied after loading:

    * If the number of vectors in the index differs from the number of cached
      documents and ``docs`` is non-empty, the cache is treated as corrupt and
      the index is rebuilt from ``docs`` (overwriting the saved files).
    * If the cache is internally consistent but its document count differs from
      ``len(docs)``, a warning is printed and the cached data is still returned.

    Args:
        docs: Document chunks to index when a (re)build is needed.
        model_name: Name passed to ``SentenceTransformer``.
        save_path_prefix: Filename prefix of the saved index / documents.
        batch_size: Encoding batch size used when building.

    Returns:
        tuple: ``(index, embedding_model, indexed_docs)``. ``indexed_docs`` are
        the cached documents when the cache is used, otherwise ``docs``.
    """
    index_file = f"{save_path_prefix}.index"
    docs_file = f"{save_path_prefix}_docs.pkl"

    if os.path.exists(index_file) and os.path.exists(docs_file):
        print(f"📦 載入已存在的 index：{save_path_prefix}")
        index, loaded_docs = load_faiss_index(save_path_prefix)

        # Without fresh docs there is nothing to rebuild from, so the cache is
        # returned as-is in that case (the original behaviour).
        if index.ntotal == len(loaded_docs) or not docs:
            if docs and len(docs) != len(loaded_docs):
                print(
                    f"⚠️ 已載入的文件數（{len(loaded_docs)}）與傳入的 docs 數量（{len(docs)}）不同，"
                    f"index 可能已過期；如需重建請刪除 {index_file} 與 {docs_file}"
                )
            embedding_model = SentenceTransformer(model_name)
            return index, embedding_model, loaded_docs

        print(f"⚠️ index 向量數（{index.ntotal}）與文件數（{len(loaded_docs)}）不一致，將重新建立 index")

    print(f"🛠️ 建立新 index 並儲存至：{save_path_prefix}")
    index, embedding_model, split_docs = build_faiss_index(docs, model_name, batch_size)
    save_faiss_index(index, split_docs, save_path_prefix)
    return index, embedding_model, split_docs


In [116]:

import pickle
import faiss

def save_faiss_index(index, docs, save_path_prefix='faiss_index'):
    """Persist the FAISS index and its documents side by side.

    Writes ``<save_path_prefix>.index`` via ``faiss.write_index`` and pickles
    ``docs`` to ``<save_path_prefix>_docs.pkl``.

    Args:
        index: FAISS index to write.
        docs: Documents to pickle alongside the index.
        save_path_prefix: Filename prefix shared by both output files.
    """
    index_path = f"{save_path_prefix}.index"
    docs_path = f"{save_path_prefix}_docs.pkl"

    faiss.write_index(index, index_path)
    with open(docs_path, 'wb') as docs_file:
        pickle.dump(docs, docs_file)

    print(f"✅ 已儲存 FAISS index 和文件到：{save_path_prefix}.index / _docs.pkl")

def load_faiss_index(save_path_prefix='faiss_index'):
    """Read back an index and documents written by ``save_faiss_index``.

    Args:
        save_path_prefix: Filename prefix shared by ``<prefix>.index`` and
            ``<prefix>_docs.pkl``.

    Returns:
        tuple: ``(index, docs)``.
    """
    index_path = f"{save_path_prefix}.index"
    docs_path = f"{save_path_prefix}_docs.pkl"

    index = faiss.read_index(index_path)
    # SECURITY: unpickling can execute arbitrary code. Only load *_docs.pkl
    # files that this notebook wrote itself; never one from an untrusted source.
    with open(docs_path, 'rb') as docs_file:
        docs = pickle.load(docs_file)

    print(f"📂 成功載入 index 與 documents，筆數：{len(docs)}")
    return index, docs


In [117]:
# 舊版 rag_query，已被新版取代


In [118]:
import csv
import os
from datetime import datetime

def save_rag_log(results, model_name, chunk_size, chunk_overlap, save_path="rag_results_log.csv"):
    """Append query results to a CSV log, one row per result.

    The header row is written when the file does not exist yet or exists but is
    empty, so a pre-created empty file no longer yields a header-less CSV.

    Args:
        results: Iterable of dicts with the keys ``query``, ``rank``,
            ``職缺名稱``, ``連結`` and ``摘要`` (the shape ``rag_query`` returns).
        model_name: Embedding model name recorded on every row.
        chunk_size: Chunk size setting recorded on every row.
        chunk_overlap: Chunk overlap setting recorded on every row.
        save_path: CSV file to append to.

    Raises:
        KeyError: If a result dict lacks one of the required keys.

    Example:
        save_rag_log(results, model_name, chunk_size, chunk_overlap)
    """
    fieldnames = [
        "timestamp", "query", "rank", "職缺名稱", "連結", "摘要",
        "model_name", "chunk_size", "chunk_overlap",
    ]
    # One timestamp per call, so all rows of the same query share it.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    write_header = not os.path.exists(save_path) or os.path.getsize(save_path) == 0

    # utf-8-sig puts a BOM at the start of a file that begins empty; presumably
    # chosen so spreadsheet tools detect the encoding -- confirm with the author.
    with open(save_path, "a", newline='', encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if write_header:
            writer.writeheader()
        writer.writerows(
            {
                "timestamp": timestamp,
                "query": row["query"],
                "rank": row["rank"],
                "職缺名稱": row["職缺名稱"],
                "連結": row["連結"],
                "摘要": row["摘要"],
                "model_name": model_name,
                "chunk_size": chunk_size,
                "chunk_overlap": chunk_overlap,
            }
            for row in results
        )


In [119]:
import pandas as pd
from datetime import datetime
import os

def save_rag_log_to_excel(results, model_name, chunk_size, chunk_overlap, chunking_strategy, save_path="rag_results_log.xlsx"):
    """Append query results to an Excel log by rewriting the whole workbook.

    If ``save_path`` already exists it is read, the new rows are concatenated
    below the old ones, and the combined table is written back.

    Args:
        results: Iterable of dicts with the keys ``query``, ``rank``,
            ``職缺名稱``, ``連結`` and ``摘要``.
        model_name: Embedding model name recorded on every new row.
        chunk_size: Chunk size setting recorded on every new row.
        chunk_overlap: Chunk overlap setting recorded on every new row.
        chunking_strategy: Chunking strategy label recorded on every new row.
        save_path: Excel file to create or extend.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Build this call's rows first; every row shares the same timestamp.
    records = []
    for row in results:
        records.append({
            "timestamp": timestamp,
            "query": row["query"],
            "rank": row["rank"],
            "職缺名稱": row["職缺名稱"],
            "連結": row["連結"],
            "摘要": row["摘要"],
            "model_name": model_name,
            "chunk_size": chunk_size,
            "chunk_overlap": chunk_overlap,
            "chunking_strategy": chunking_strategy,
        })
    new_df = pd.DataFrame(records)

    # Merge with the existing log when there is one.
    if not os.path.exists(save_path):
        combined_df = new_df
    else:
        previous_df = pd.read_excel(save_path)
        combined_df = pd.concat([previous_df, new_df], ignore_index=True)

    combined_df.to_excel(save_path, index=False)
    print(f"✅ 已寫入 {len(new_df)} 筆紀錄，累計：{len(combined_df)} 筆 ➜ {save_path}")


In [120]:

        def rag_query(query, top_k, embedding_model, index, indexed_docs):
            """Retrieve the chunks closest to ``query`` from the index.

            Args:
                query: Query string.
                top_k: Number of neighbours requested from the index.
                embedding_model: Object whose ``encode`` turns a list of strings
                    into vectors.
                index: Object whose ``search(vectors, k)`` returns
                    ``(distances, ids)``.
                indexed_docs: Documents aligned with the index; position ``i``
                    belongs to vector id ``i``. Each needs ``page_content`` and a
                    ``metadata`` dict containing 職缺名稱 and 連結.

            Returns:
                list[dict]: One dict per valid hit with the keys ``query``,
                ``rank`` (1-based position in the search output), ``職缺名稱``,
                ``連結`` and ``摘要`` (content with newlines replaced by spaces,
                cut to 200 characters plus ``...`` when longer).
            """
            query_vectors = np.asarray(embedding_model.encode([query]), dtype=np.float32)
            _distances, neighbor_ids = index.search(query_vectors, top_k)

            results = []
            for i, idx in enumerate(neighbor_ids[0]):
                idx = int(idx)
                # FAISS pads with -1 when it finds fewer than top_k neighbours.
                # A negative id would otherwise index from the END of the list
                # and silently return an unrelated document.
                if idx < 0 or idx >= len(indexed_docs):
                    print(f"⚠️ 無效索引：{idx} 超出 indexed_docs 範圍（{len(indexed_docs)}）")
                    continue
                doc = indexed_docs[idx]
                # Flatten to one line so the summary fits a single log cell.
                snippet = doc.page_content.strip().replace('\n', ' ')
                if len(snippet) > 200:
                    snippet = snippet[:200] + '...'
                results.append({'query': query,
                                'rank': i + 1,
                                '職缺名稱': doc.metadata['職缺名稱'],
                                '連結': doc.metadata['連結'],
                                '摘要': snippet})
            return results


In [121]:
# 參數設定已在最上方定義


In [127]:
# 此區塊已整合至主流程


In [124]:
# 此區塊已整合至主流程


In [125]:
# 此區塊已整合至主流程


In [None]:
# 此區塊已整合至主流程


In [None]:
# 此區塊已整合至主流程


In [134]:
# 此區塊已整合至主流程


In [135]:
# 此區塊已整合至主流程


In [137]:
# 此區塊已整合至主流程


In [None]:

# === Main pipeline ===
# 1. Load the crawled CSV and build one text + metadata pair per job posting.
texts, metadatas = load_and_prepare_data(file_path)
# 2. Chunk every text with the hybrid strategy.
docs = build_documents_hybrid(texts, metadatas, chunk_size, chunk_overlap)
# 3. Reuse the saved index if its files exist, otherwise build and save one.
#    When the saved index is reused, indexed_docs are the documents stored with
#    it, which are not necessarily the docs chunked just above.
index, embedding_model, indexed_docs = load_or_build_index(
    docs,
    model_name=model_name,
    save_path_prefix=save_path_prefix,
    batch_size=encoding_batch_size,
)
# Reports the freshly chunked docs, not indexed_docs.
print(f"Hybrid Chunking 完成，共產生 {len(docs)} 個 chunks")

# 4. Run one example query against the index.
query = "生物相關的職缺有哪些"
results = rag_query(query, top_k, embedding_model, index, indexed_docs)

# 5. Log the results to both the Excel and the CSV log files.
save_rag_log_to_excel(
    results,
    model_name=model_name,
    chunk_size=chunk_size,
    chunk_overlap=chunk_overlap,
    chunking_strategy='hybrid_chunking',
    save_path=output_log_excel,
)
save_rag_log(
    results,
    model_name=model_name,
    chunk_size=chunk_size,
    chunk_overlap=chunk_overlap,
    save_path=output_log_csv,
)
