# Install

In [3]:
# TWCC-specific setup cell (environment-specific dependency install).
# NOTE(review): none of these packages are version-pinned; consider pinning them for reproducible runs.
%pip install sentence-transformers datasets pytrec_eval accelerate pandas pyarrow

Note: you may need to restart the kernel to use updated packages.


In [4]:
# TWCC專用

# 1. 卸載目前不相容的 PyTorch
%pip uninstall -y torch torchvision torchaudio

# 2. 安裝官方穩定版 (支援 Tesla V100)
# CUDA 12.1 或 12.4 的版本，目前最通用的指令
%pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

%pip install ipywidgets

Found existing installation: torch 2.5.1+cu121
Uninstalling torch-2.5.1+cu121:
  Successfully uninstalled torch-2.5.1+cu121
Found existing installation: torchvision 0.20.1+cu121
Uninstalling torchvision-0.20.1+cu121:
  Successfully uninstalled torchvision-0.20.1+cu121
Found existing installation: torchaudio 2.5.1+cu121
Uninstalling torchaudio-2.5.1+cu121:
  Successfully uninstalled torchaudio-2.5.1+cu121
Note: you may need to restart the kernel to use updated packages.
Looking in indexes: https://download.pytorch.org/whl/cu121
Collecting torchaudio
  Using cached https://download.pytorch.org/whl/cu121/torchaudio-2.5.1%2Bcu121-cp312-cp312-linux_x86_64.whl (3.4 MB)
Collecting sympy>=1.13.3 (from torch)
  Using cached https://download.pytorch.org/whl/sympy-1.14.0-py3-none-any.whl.metadata (12 kB)
INFO: pip is looking at multiple versions of torchaudio to determine which version is compatible with other requirements. This could take a while.
Collecting torchaudio
  Using cached https://dow

In [5]:
import os
import logging
import torch

# Logging setup: INFO level on the root logger, plus a logger named after this module.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)


In [6]:
# Locate the FinanceRAG parquet files and choose the compute device.
data_dir = "./data"
os.makedirs(data_dir, exist_ok=True)

corpus_filename = "FinanceRAG_corpus.parquet"
queries_filename = "FinanceRAG_queries.parquet"

corpus_path = os.path.join(data_dir, corpus_filename)
queries_path = os.path.join(data_dir, queries_filename)

missing_paths = [p for p in (corpus_path, queries_path) if not os.path.exists(p)]
if not missing_paths:
    print(f"  - Corpus: {corpus_path}")
    print(f"  - Queries: {queries_path}")
else:
    # Report the problem here instead of letting the later parquet loader fail less obviously.
    print(f"WARNING: missing data file(s): {', '.join(missing_paths)}")

# Use the GPU when PyTorch can see one.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using Device: {device}")

  - Corpus: ./data/FinanceRAG_corpus.parquet
  - Queries: ./data/FinanceRAG_queries.parquet
Using Device: cuda


# Define Core Classes (Loader, Encoder, Retriever, Reranker)

In [7]:
import abc
import hashlib
import heapq
import json
import logging
import csv
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union, cast

import numpy as np
from datasets import Dataset, Value, load_dataset
from sentence_transformers import SentenceTransformer, CrossEncoder
from torch import Tensor


# Same logging setup as the earlier cell; keeps this cell usable on its own.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)



# 1. DataLoader ---
class HFDataLoader:
    """Placeholder loader holding an optional (corpus, queries) pair.

    Nothing in this class populates ``corpus`` or ``queries``; ``load`` returns
    whatever is currently stored (``None`` unless assigned from outside).
    Extra keyword arguments are accepted and ignored.
    """

    def __init__(self, hf_repo: Optional[str] = None, subset: Optional[str] = None, keep_in_memory: bool = False, **kwargs):
        # Data slots; empty until something assigns them.
        self.corpus: Optional[Dataset] = None
        self.queries: Optional[Dataset] = None
        # Source configuration (stored only; not read anywhere in this class).
        self.hf_repo, self.subset = hf_repo, subset
        self.keep_in_memory = keep_in_memory
        self.streaming = False

    def load(self) -> Tuple[Dataset, Dataset]:
        """Return the stored ``(corpus, queries)`` tuple."""
        loaded = (self.corpus, self.queries)
        return loaded

# --- 2. Abstract Base Classes ---
class Encoder(abc.ABC):
    """Interface for models that turn queries and corpus documents into embeddings."""

    @abc.abstractmethod
    def encode_queries(self, queries: List[str], **kwargs) -> Union[torch.Tensor, np.ndarray]:
        """Embed a list of query strings."""
        raise NotImplementedError

    @abc.abstractmethod
    def encode_corpus(self, corpus: Union[List[Dict], Dict], **kwargs) -> Union[torch.Tensor, np.ndarray]:
        """Embed corpus documents, given either as a list of dicts or as a single dict."""
        raise NotImplementedError

class Retrieval(abc.ABC):
    """Interface for first-stage retrievers returning ``{query_id: {doc_id: score}}``."""

    @abc.abstractmethod
    def retrieve(self, corpus, queries, top_k, **kwargs) -> Dict[str, Dict[str, float]]:
        """Return up to ``top_k`` scored documents per query."""
        raise NotImplementedError

class Reranker(abc.ABC):
    """Interface for second-stage rerankers that rescore existing retrieval results."""

    @abc.abstractmethod
    def rerank(self, corpus, queries, results, top_k, **kwargs) -> Dict[str, Dict[str, float]]:
        """Rescore the top ``top_k`` candidates in ``results`` for each query."""
        raise NotImplementedError

# --- 3. Concrete Implementations ---

# SentenceTransformer Encoder
class SentenceTransformerEncoder(Encoder):
    """Encoder backed by a single SentenceTransformer model shared by queries and documents.

    Optional ``query_prompt`` / ``doc_prompt`` strings are prepended to every query /
    document before encoding. Extra keyword arguments go to the ``SentenceTransformer``
    constructor.
    """

    def __init__(self, model_name_or_path: str, query_prompt: Optional[str] = None, doc_prompt: Optional[str] = None, **kwargs):
        self.q_model = SentenceTransformer(model_name_or_path, **kwargs)
        # Queries and documents are encoded with the same model instance.
        self.doc_model = self.q_model
        self.query_prompt = query_prompt
        self.doc_prompt = doc_prompt

    @staticmethod
    def _join_title_text(title: Optional[str], text: Optional[str]) -> str:
        """Join title and text with a space, treating None as "" and stripping the result."""
        return f"{title or ''} {text or ''}".strip()

    def encode_queries(self, queries: List[str], batch_size: int = 16, **kwargs) -> Union[np.ndarray, Tensor]:
        """Encode query strings (with ``query_prompt`` prepended when set).

        ``batch_size`` and ``**kwargs`` are forwarded to ``SentenceTransformer.encode``.
        """
        if self.query_prompt:
            queries = [self.query_prompt + query for query in queries]
        return self.q_model.encode(queries, batch_size=batch_size, **kwargs)

    def encode_corpus(self, corpus: Union[List[Dict], Dict], batch_size: int = 8, **kwargs) -> Union[np.ndarray, Tensor]:
        """Encode documents as ``"<title> <text>"`` (with ``doc_prompt`` prepended when set).

        ``corpus`` is either a list of ``{"title"?, "text"}`` dicts or a dict of parallel
        ``"text"`` (and optional ``"title"``) sequences. Missing or ``None`` titles/texts
        are treated as empty strings instead of raising TypeError.
        ``batch_size`` and ``**kwargs`` are forwarded to ``SentenceTransformer.encode``.
        """
        if isinstance(corpus, dict):
            texts = corpus["text"]
            # The title column is optional; check once rather than per row.
            titles = corpus["title"] if "title" in corpus else None
            sentences = [
                self._join_title_text(titles[i] if titles is not None else None, texts[i])
                for i in range(len(texts))
            ]
        else:
            sentences = [self._join_title_text(doc.get("title"), doc["text"]) for doc in corpus]
        if self.doc_prompt:
            sentences = [self.doc_prompt + s for s in sentences]
        return self.doc_model.encode(sentences, batch_size=batch_size, **kwargs)

# Dense Retrieval
class DenseRetrieval(Retrieval):
    """Exhaustive dense retrieval that keeps the top-k corpus documents per query.

    The corpus is encoded in chunks of ``corpus_chunk_size`` documents. Only one chunk's
    embeddings and its (num_queries x chunk) score matrix exist at a time.
    """

    def __init__(self, model: Encoder, batch_size: int = 64, corpus_chunk_size: int = 50000):
        self.model = model
        self.batch_size = batch_size
        self.corpus_chunk_size = corpus_chunk_size

    @staticmethod
    def _doc_length(doc: Dict) -> int:
        """Character length of title + text, treating missing or None fields as ""."""
        return len((doc.get("title") or "") + (doc.get("text") or ""))

    @staticmethod
    def _prepare_embeddings(embeddings, score_function):
        """Return encoder output as a tensor (on GPU if available), L2-normalised for cos_sim."""
        if isinstance(embeddings, np.ndarray):
            embeddings = torch.from_numpy(embeddings)
        if torch.cuda.is_available():
            embeddings = embeddings.cuda()
        if score_function == "cos_sim":
            embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
        return embeddings

    def retrieve(self, corpus, queries, top_k=100, score_function="cos_sim", **kwargs):
        """Return the ``top_k`` highest-scoring corpus documents for every query.

        Args:
            corpus: mapping ``doc_id -> {"title": ..., "text": ...}``.
            queries: mapping ``query_id -> query text``.
            top_k: number of documents kept per query.
            score_function: ``"cos_sim"`` (default; L2-normalised dot product) or
                ``"dot"`` (raw dot product).
            **kwargs: forwarded to the encoder's ``encode_queries`` and ``encode_corpus``.

        Returns:
            ``{query_id: {doc_id: score}}``, also stored on ``self.results``. A document
            whose id equals the query id is never returned.

        Raises:
            ValueError: if ``score_function`` is not ``"cos_sim"`` or ``"dot"``.
        """
        if score_function not in ("cos_sim", "dot"):
            raise ValueError(f"Unsupported score_function {score_function!r}; expected 'cos_sim' or 'dot'")

        query_ids = list(queries.keys())
        self.results = {qid: {} for qid in query_ids}
        if not query_ids:
            # Nothing to score; also avoids normalising an empty (rank-1) embedding array.
            return self.results

        logger.info("Encoding queries...")
        query_texts = [queries[qid] for qid in query_ids]
        # Converted, moved and normalised once instead of once per corpus chunk.
        query_embeddings = self._prepare_embeddings(
            self.model.encode_queries(query_texts, batch_size=self.batch_size, **kwargs),
            score_function,
        )

        logger.info("Encoding corpus and searching...")
        # Longest documents first (presumably so similar-length texts share encoder batches).
        sorted_corpus_ids = sorted(corpus, key=lambda cid: self._doc_length(corpus[cid]), reverse=True)
        corpus_list = [corpus[cid] for cid in sorted_corpus_ids]
        # Per-query min-heaps of (score, doc_id); the root is the weakest kept hit.
        result_heaps = {qid: [] for qid in query_ids}

        for start_idx in range(0, len(corpus_list), self.corpus_chunk_size):
            end_idx = min(start_idx + self.corpus_chunk_size, len(corpus_list))
            sub_corpus_embeddings = self._prepare_embeddings(
                self.model.encode_corpus(corpus_list[start_idx:end_idx], batch_size=self.batch_size, **kwargs),
                score_function,
            )

            scores = torch.mm(query_embeddings, sub_corpus_embeddings.transpose(0, 1))
            scores[torch.isnan(scores)] = -1

            # Fetch one extra candidate so a skipped self-match still leaves top_k results.
            values, indices = torch.topk(scores, min(top_k + 1, scores.size(1)), dim=1)
            values, indices = values.cpu().tolist(), indices.cpu().tolist()

            for qid, row_scores, row_indices in zip(query_ids, values, indices):
                heap = result_heaps[qid]
                for score, idx in zip(row_scores, row_indices):
                    doc_id = sorted_corpus_ids[start_idx + idx]
                    if doc_id == qid:  # avoid self-retrieval when ids coincide
                        continue
                    if len(heap) < top_k:
                        heapq.heappush(heap, (score, doc_id))
                    else:
                        heapq.heappushpop(heap, (score, doc_id))

        for qid, heap in result_heaps.items():
            for score, doc_id in heap:
                self.results[qid][doc_id] = score
        return self.results

# Cross Encoder Reranker
class CrossEncoderReranker(Reranker):
    """Rescore first-stage retrieval candidates with a sentence-transformers ``CrossEncoder``."""

    def __init__(self, model: CrossEncoder):
        self.model = model

    def rerank(self, corpus, queries, results, top_k, batch_size=32, **kwargs):
        """Rerank the ``top_k`` best first-stage candidates of each query.

        Args:
            corpus: ``doc_id -> {"title": ..., "text": ...}``; missing or None fields count as "".
            queries: ``query_id -> query text``.
            results: ``query_id -> {doc_id: first-stage score}``.
            top_k: number of first-stage candidates per query to rescore.
            batch_size: forwarded to ``CrossEncoder.predict``.
            **kwargs: forwarded to ``CrossEncoder.predict``.

        Returns:
            ``query_id -> {doc_id: cross-encoder score}``. Only rescored candidates appear,
            and every query in ``results`` gets an entry (possibly empty).
        """
        sentence_pairs, pair_ids = [], []
        for query_id, doc_scores in results.items():
            # Only the highest first-stage candidates are rescored.
            top_docs = sorted(doc_scores.items(), key=lambda item: item[1], reverse=True)[:top_k]
            for doc_id, _ in top_docs:
                doc = corpus[doc_id]
                corpus_text = f"{doc.get('title') or ''} {doc.get('text') or ''}".strip()
                pair_ids.append((query_id, doc_id))
                sentence_pairs.append([queries[query_id], corpus_text])

        reranked_results = {qid: {} for qid in results}
        if not sentence_pairs:
            # Nothing to score; don't hand an empty batch to predict().
            return reranked_results

        logger.info(f"Starting Reranking for {len(sentence_pairs)} pairs...")
        scores = self.model.predict(sentence_pairs, batch_size=batch_size, **kwargs)

        for (qid, doc_id), score in zip(pair_ids, scores):
            reranked_results[qid][doc_id] = float(score)
        return reranked_results

In [8]:
# Cell 3: Task Definitions (FinDER)
from pydantic import BaseModel
from typing_extensions import Literal
from datasets import load_dataset
import csv
import os
import logging

class TaskMetadata(BaseModel):
    """Descriptive metadata for a retrieval / RAG task (pydantic model)."""

    # Task identifier, e.g. "FinDER".
    name: str
    # Free-form description of the data source (FinDER passes "path" and "subset" keys).
    dataset: dict
    # Human-readable description.
    description: str = ""
    # Task type label.
    type: str = "RAG"
    # Domain tags, e.g. ["Report"].
    # NOTE(review): mutable default; presumably safe because pydantic copies field defaults per instance — confirm for the installed pydantic version.
    domains: list = []

class BaseTask:
    """Minimal retrieve → rerank → save pipeline around a task's corpus and queries.

    Subclasses fill ``self.corpus`` and ``self.queries`` in ``load_data``, which is
    called at the end of ``__init__``.
    """

    def __init__(self, metadata: TaskMetadata):
        self.metadata = metadata
        self.queries = None
        self.corpus = None
        self.retrieve_results = None
        self.rerank_results = None
        # Subclass hook that populates self.corpus / self.queries.
        self.load_data()

    def load_data(self):
        """Hook for subclasses to populate ``self.corpus`` and ``self.queries``; no-op here."""
        pass

    def retrieve(self, retriever, top_k=100, **kwargs):
        """Run ``retriever`` over this task's data, then store and return its results."""
        self.retrieve_results = retriever.retrieve(self.corpus, self.queries, top_k=top_k, **kwargs)
        return self.retrieve_results

    def rerank(self, reranker, results=None, top_k=100, batch_size=32, **kwargs):
        """Rerank ``results`` (default: the stored retrieval results); store and return the output."""
        if results is None:
            results = self.retrieve_results
        self.rerank_results = reranker.rerank(self.corpus, self.queries, results, top_k, batch_size, **kwargs)
        return self.rerank_results

    def save_results(self, output_dir: str, top_k: int = 10):
        """Write the ``top_k`` best documents per query to ``<output_dir>/results.csv``.

        Non-empty rerank results are preferred; otherwise the retrieval results are used.
        Each row is ``query_id,corpus_id``; rows for a query are in descending score order.

        Raises:
            RuntimeError: if neither retrieval nor reranking has produced results yet.
        """
        final_results = self.rerank_results if self.rerank_results else self.retrieve_results
        if final_results is None:
            # Check before opening the file so an existing results.csv is not truncated.
            raise RuntimeError("No results to save: call retrieve() (and optionally rerank()) first.")

        os.makedirs(output_dir, exist_ok=True)
        csv_path = os.path.join(output_dir, "results.csv")

        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["query_id", "corpus_id"])
            # Write the CSV rows.
            for qid, docs in final_results.items():
                sorted_docs = sorted(docs.items(), key=lambda x: x[1], reverse=True)[:top_k]
                for doc_id, _ in sorted_docs:
                    writer.writerow([qid, doc_id])
        logger.info(f"Saved results to {csv_path}")


# Module logger (the same object any earlier getLogger(__name__) call returned).
logger = logging.getLogger(name=__name__)

class FinDER(BaseTask):
    """FinDER retrieval task backed by local FinanceRAG parquet files.

    ``load_data`` (called from ``BaseTask.__init__``) reads
    ``<data_folder>/FinanceRAG_corpus.parquet`` into ``self.corpus``
    (``doc_id -> {"title": ..., "text": ...}``) and
    ``<data_folder>/FinanceRAG_queries.parquet`` into ``self.queries``
    (``query_id -> text``).
    """

    def __init__(self, data_folder="./data"):
        self.metadata = TaskMetadata(
            name="FinDER",
            description="Local Parquet Data",
            dataset={
                "path": "Local_Files",
                "subset": "default",
            },
            type="RAG",
            domains=["Report"],
        )
        # Must be set before BaseTask.__init__, because that calls load_data().
        self.data_folder = data_folder
        super().__init__(self.metadata)

    @staticmethod
    def _load_parquet(path):
        """Load one parquet file as a single Dataset and require an ``_id`` column.

        Raises:
            KeyError: if the file has no ``_id`` column. Without this check every row
                would be keyed as the string "None" and silently overwrite the previous one.
        """
        # split="train" returns a Dataset directly instead of a DatasetDict.
        ds = load_dataset("parquet", data_files=path, split="train")
        if "_id" not in ds.column_names:
            raise KeyError(f"{path} has no '_id' column; found columns: {ds.column_names}")
        return ds

    def load_data(self):
        """Populate ``self.corpus`` and ``self.queries`` from the parquet files in ``self.data_folder``.

        Missing or null ``title``/``text`` values are stored as empty strings, so downstream
        string concatenation never receives ``None``.
        """
        corpus_file = os.path.join(self.data_folder, "FinanceRAG_corpus.parquet")
        queries_file = os.path.join(self.data_folder, "FinanceRAG_queries.parquet")

        print(f"Loading data from Parquet files in: {self.data_folder} ...")

        # 1. Corpus (columns: _id, title, text)
        print("Loading Corpus (Parquet)...")
        corpus_ds = self._load_parquet(corpus_file)
        self.corpus = {}
        for row in corpus_ds:
            self.corpus[str(row["_id"])] = {
                "title": row.get("title") or "",
                "text": row.get("text") or "",
            }

        # 2. Queries (columns: _id, text)
        print("Loading Queries (Parquet)...")
        queries_ds = self._load_parquet(queries_file)
        self.queries = {str(row["_id"]): row.get("text") or "" for row in queries_ds}

        print(f"Successfully loaded {len(self.corpus)} docs and {len(self.queries)} queries.")

        # Print one corpus id so its format can be checked by eye.
        if self.corpus:
            first_id = next(iter(self.corpus))
            print(f"Example Corpus ID check: {first_id}")

In [None]:
# Cell 4: Execution pipeline. Dense retrieval (bi-encoder), then cross-encoder reranking, then save a CSV.
import torch

# Run settings, gathered here so they can be tuned in one place.
DATA_FOLDER = "./data"
RETRIEVER_MODEL = "intfloat/e5-large-v2"
RERANKER_MODEL = "cross-encoder/ms-marco-MiniLM-L-12-v2"
RETRIEVAL_TOP_K = 100       # candidates kept per query by dense retrieval
RERANK_TOP_K = 100          # candidates per query passed to the cross-encoder
RETRIEVAL_BATCH_SIZE = 128
RERANK_BATCH_SIZE = 64
N_PREVIEW = 5               # documents shown in the per-query preview
output_dir = './results'

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")


def _show_top_docs(results, qid, n=N_PREVIEW):
    """Print and return the ``n`` highest-scoring ``(doc_id, score)`` pairs for query ``qid``."""
    ranked = sorted(results[qid].items(), key=lambda item: item[1], reverse=True)[:n]
    for rank, (doc_id, score) in enumerate(ranked, start=1):
        print(f"  {rank}. {doc_id} (Score: {score:.4f})")
    return ranked


# 1. Initialize task & data
finder_task = FinDER(data_folder=DATA_FOLDER)

# 2. Initialize the retrieval model (bi-encoder): e5-large-v2 with "query: " / "passage: " prefixes
encoder_model = SentenceTransformerEncoder(
    model_name_or_path=RETRIEVER_MODEL,
    query_prompt='query: ',
    doc_prompt='passage: ',
    device=device,
)
retrieval_model = DenseRetrieval(model=encoder_model, batch_size=RETRIEVAL_BATCH_SIZE)

# 3. Perform retrieval
print("\n--- Starting Retrieval ---")
retrieval_result = finder_task.retrieve(retriever=retrieval_model, top_k=RETRIEVAL_TOP_K)

first_qid = next(iter(retrieval_result))
print(f"Retrieval done. Top {N_PREVIEW} docs for query {first_qid}:")
top_docs = _show_top_docs(retrieval_result, first_qid)

# 4. Initialize the reranker (cross-encoder)
print("\n--- Starting Reranking ---")
reranker_model = CrossEncoderReranker(model=CrossEncoder(RERANKER_MODEL, device=device))

# 5. Perform reranking
reranking_result = finder_task.rerank(
    reranker=reranker_model,
    results=retrieval_result,
    top_k=RERANK_TOP_K,
    batch_size=RERANK_BATCH_SIZE,
)

print(f"Reranking done. Top {N_PREVIEW} docs for query {first_qid}:")
top_rerank_docs = _show_top_docs(reranking_result, first_qid)

# 6. Save results
finder_task.save_results(output_dir=output_dir)
print(f"\nResults saved to {output_dir}/results.csv")

Using device: cuda
Loading data from Parquet files in: ./data ...
Loading Corpus (Parquet)...
Loading Queries (Parquet)...


INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: intfloat/e5-large-v2


Successfully loaded 13863 docs and 216 queries.
Example Corpus ID check: ADBE20230004


INFO:__main__:Encoding queries...



--- Starting Retrieval ---


Batches:   0%|          | 0/2 [00:00<?, ?it/s]

INFO:__main__:Encoding corpus and searching...


Batches:   0%|          | 0/109 [00:00<?, ?it/s]

Retrieval done. Top 5 docs for query q00001:
  1. MSFT20230966 (Score: 0.8740)
  2. MSFT20230216 (Score: 0.8646)
  3. MSFT20230015 (Score: 0.8594)
  4. MSFT20230254 (Score: 0.8580)
  5. MSFT20230155 (Score: 0.8534)

--- Starting Reranking ---


INFO:__main__:Starting Reranking for 21600 pairs...


Batches:   0%|          | 0/338 [00:00<?, ?it/s]