# Sparse Embedding - TF-IDF
- 업스테이지 베이스라인 기반으로 작성되었습니다.

### 0. 준비

In [1]:
import json 
import os
import pickle
import time
from contextlib import contextmanager
from typing import List, NoReturn, Optional, Tuple, Union
import faiss
import numpy as np
import pandas as pd
from datasets import Dataset, concatenate_datasets, load_from_disk
from transformers import AutoTokenizer
from sklearn.feature_extraction.text import TfidfVectorizer
from tqdm.auto import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
@contextmanager
def timer(name):
    t0 = time.time()
    yield
    print(f"[{name}] done in {time.time() - t0:.3f} s")

### 1. 데이터 불러오기
- `wikipedia_documents.json`

In [3]:
def load_data(data_path):
    """
    Load JSON data.

    Parameters:
    - data_path: str, path to the JSON file
    """
    with open(data_path, 'r', encoding='utf-8') as f:
        wiki = json.load(f)

    ### unique text 추출
    wiki_df = pd.DataFrame(wiki.values())
    wiki_unique_df = wiki_df.drop_duplicates(subset=['text'], keep='first')
    ids = wiki_unique_df['document_id'].tolist()
    contexts = wiki_unique_df['text'].tolist()
    return ids, contexts

In [4]:
retrieval_data_path = '../../data/wikipedia_documents.json'
ids, contexts = load_data(retrieval_data_path)
print(f"Length of unique context: {len(contexts)}")
# Q. 문서 제목을 활용할 수는 없을까?

Length of unique context: 56737


### 2. 임베딩 생성
- sparse embedding: TF-IDF, BM25
- learned sparse embedding: SPLADE, BGE-M3

In [5]:
def get_sparse_embedding(embedding_method, tokenize_fn):
    """
    Summary:
        Passage Embedding을 만들고
        TFIDF와 Embedding을 pickle로 저장합니다.
        만약 미리 저장된 파일이 있으면 저장된 pickle을 불러옵니다.
    """
    pickle_name = f"{embedding_method}_sparse_embedding.bin"
    vectorizer_name = f"{embedding_method}.bin"
    emb_path = os.path.join(pickle_name)
    vectorizer_path = os.path.join(vectorizer_name)

    if os.path.isfile(emb_path) and os.path.isfile(vectorizer_path):
        with open(emb_path, "rb") as file:
            p_embedding = pickle.load(file)
        with open(vectorizer_path, "rb") as file:
            vectorizer = pickle.load(file)
        print("Embedding pickle load.")
    else:
        print("Build passage embedding")
        # vectorizer 정의
        if embedding_method == 'tfidf':
            vectorizer = TfidfVectorizer(tokenizer=tokenize_fn.tokenize, ngram_range=(1, 2), max_features=50000)
            p_embedding = vectorizer.fit_transform(tqdm(contexts, desc="TF-IDF Vectorization"))
        elif embedding_method == 'bm25':
            pass ###TODO: BM25 구현
        elif embedding_method == 'splade':
            pass ###TODO: splade 구현
        elif embedding_method == 'bge-m3':
            pass ###TODO: bge-m3 구현
        else:
            raise ValueError("Unsupported embedding method.")

        # p_embedding, vectorizer 저장
        with open(emb_path, "wb") as file:
            pickle.dump(p_embedding, file)
        with open(vectorizer_path, "wb") as file:
            pickle.dump(vectorizer, file)
        print("Embedding pickle saved.")
    return p_embedding, vectorizer

In [6]:
embedding_method = 'tfidf'
tokenize_fn = AutoTokenizer.from_pretrained('klue/bert-base')
p_embedding, vectorizer = get_sparse_embedding(embedding_method, tokenize_fn)

Build passage embedding


Token indices sequence length is longer than the specified maximum sequence length for this model (1133 > 512). Running this sequence through the model will result in indexing errors
TF-IDF Vectorization: 100%|██████████| 56737/56737 [01:19<00:00, 709.46it/s]


Embedding pickle saved.


### 3. Vector Database
- FAISS

In [7]:
def build_faiss(num_clusters=64) -> NoReturn:
    """
    Summary:
        속성으로 저장되어 있는 Passage Embedding을
        Faiss indexer에 fitting 시켜놓습니다.
        이렇게 저장된 indexer는 `get_relevant_doc`에서 유사도를 계산하는데 사용됩니다.

    Note:
        Faiss는 Build하는데 시간이 오래 걸리기 때문에,
        매번 새롭게 build하는 것은 비효율적입니다.
        그렇기 때문에 build된 index 파일을 저정하고 다음에 사용할 때 불러옵니다.
        다만 이 index 파일은 용량이 1.4Gb+ 이기 때문에 여러 num_clusters로 시험해보고
        제일 적절한 것을 제외하고 모두 삭제하는 것을 권장합니다.
    """

    indexer_name = f"{embedding_method}_faiss_clusters{num_clusters}.index"
    indexer_path = os.path.join(indexer_name)
    if os.path.isfile(indexer_path):
        print("Load Saved Faiss Indexer.")
        indexer = faiss.read_index(indexer_path)

    else:
        print(f"Creating FAISS indexer from embeddings with num_clusters {num_clusters}.")
        p_emb = p_embedding.astype(np.float32).toarray()
        emb_dim = p_emb.shape[-1]

        num_clusters = num_clusters
        quantizer = faiss.IndexFlatL2(emb_dim)  ###TODO: L2 외의 다른 Metric 적용
        indexer = faiss.IndexIVFScalarQuantizer(
            quantizer, quantizer.d, num_clusters, faiss.METRIC_L2
        )
        indexer.train(p_emb)  ###TODO: 소요시간 표시 (timer 함수 이용)
        indexer.add(p_emb)
        faiss.write_index(indexer, indexer_path)
        print("Faiss Indexer Saved.")
    return indexer 

In [8]:
indexer = build_faiss(num_clusters=64)

Creating FAISS indexer from embeddings with num_clusters 64.
Faiss Indexer Saved.


### 4. retrieve

In [9]:
def get_relevant_doc(query: str, k: Optional[int] = 1, use_faiss: Optional[bool] = False) -> Tuple[List, List]:

    """
    Arguments:
        query (str):
            하나의 Query를 받습니다.
        k (Optional[int]): 1
            상위 몇 개의 Passage를 반환할지 정합니다.
    Note:
        vocab 에 없는 이상한 단어로 query 하는 경우 assertion 발생 (예) 뙣뙇?
    """

    with timer("transform"):
        query_vec = vectorizer.transform([query])
    assert (
        np.sum(query_vec) != 0
    ), "오류가 발생했습니다. 이 오류는 보통 query에 vectorizer의 vocab에 없는 단어만 존재하는 경우 발생합니다."

    if use_faiss:
        q_emb = query_vec.toarray().astype(np.float32)
        with timer("query faiss search"):
            D, I = indexer.search(q_emb, k)
        return D.tolist()[0], I.tolist()[0]
    else:
        with timer("query ex search"):
            result = query_vec * p_embedding.T
        if not isinstance(result, np.ndarray):
            result = result.toarray()

        sorted_result = np.argsort(result.squeeze())[::-1]
        doc_score = result.squeeze()[sorted_result].tolist()[:k]
        doc_indices = sorted_result.tolist()[:k]
        return doc_score, doc_indices

In [10]:
def get_relevant_doc_bulk(queries: List, k: Optional[int] = 1, use_faiss: Optional[bool] = False) -> Tuple[List, List]:
    """
    Arguments:
        queries (List):
            쿼리 리스트를 받습니다.
        k (Optional[int]): 1
            상위 몇 개의 Passage를 반환할지 정합니다.
    Note:
        vocab 에 없는 이상한 단어로 query 하는 경우 assertion 발생 (예) 뙣뙇?
    """

    query_vecs = vectorizer.transform(queries)
    assert (
        np.sum(query_vecs) != 0
    ), "오류가 발생했습니다. 이 오류는 보통 query에 vectorizer의 vocab에 없는 단어만 존재하는 경우 발생합니다."

    if use_faiss:
        q_emb = query_vecs.toarray().astype(np.float32)
        with timer("query faiss search (bulk)"):
            D, I = indexer.search(q_emb, k)
        return D.tolist(), I.tolist()
    else:
        with timer("query ex search (bulk)"):
            result = query_vecs * p_embedding.T
        if not isinstance(result, np.ndarray):
            result = result.toarray()
        doc_scores = []
        doc_indices = []
        for i in range(result.shape[0]):
            sorted_result = np.argsort(result[i, :])[::-1]
            doc_scores.append(result[i, :][sorted_result].tolist()[:k])
            doc_indices.append(sorted_result.tolist()[:k])
        return doc_scores, doc_indices

In [11]:
def retrieve(
    query_or_dataset: Union[str, Dataset], topk: Optional[int] = 1, use_faiss: Optional[bool] = False
) -> Union[Tuple[List, List], pd.DataFrame]:

    """
    Arguments:
        query_or_dataset (Union[str, Dataset]):
            str이나 Dataset으로 이루어진 Query를 받습니다.
            str 형태인 하나의 query만 받으면 `get_relevant_doc`을 통해 유사도를 구합니다.
            Dataset 형태는 query를 포함한 HF.Dataset을 받습니다.
            이 경우 `get_relevant_doc_bulk`를 통해 유사도를 구합니다.
        topk (Optional[int], optional): Defaults to 1.
            상위 몇 개의 passage를 사용할 것인지 지정합니다.

    Returns:
        1개의 Query를 받는 경우  -> Tuple(List, List)
        다수의 Query를 받는 경우 -> pd.DataFrame: [description]

    Note:
        다수의 Query를 받는 경우,
            Ground Truth가 있는 Query (train/valid) -> 기존 Ground Truth Passage를 같이 반환합니다.
            Ground Truth가 없는 Query (test) -> Retrieval한 Passage만 반환합니다.
    """
    
    if use_faiss:
        assert indexer is not None, "build_faiss() 메소드를 먼저 수행해주세요."
    else:
        assert p_embedding is not None, "get_sparse_embedding() 메소드를 먼저 수행해주세요."

    if isinstance(query_or_dataset, str):
        doc_scores, doc_indices = get_relevant_doc(query_or_dataset, k=topk, use_faiss=use_faiss)  ###TODO: get_relevant_doc_faiss
        print("[Search query]\n", query_or_dataset, "\n")

        for i in range(topk):
            print(f"Top-{i+1} passage with score {doc_scores[i]:4f}")
            print(contexts[doc_indices[i]])

        return (doc_scores, [contexts[doc_indices[i]] for i in range(topk)])

    elif isinstance(query_or_dataset, Dataset):

        # Retrieve한 Passage를 pd.DataFrame으로 반환합니다.
        queries = query_or_dataset["question"]
        total = []

        with timer("query exhaustive search"):
            doc_scores, doc_indices = get_relevant_doc_bulk(queries, k=topk, use_faiss=use_faiss)  ###TODO: get_relevant_doc_bulk_faiss
        
        for idx, example in enumerate(
            tqdm(query_or_dataset, desc="Sparse retrieval: ")
        ):
            tmp = {
                # Query와 해당 id를 반환합니다.
                "question": example["question"],
                "id": example["id"],
                # Retrieve한 Passage의 id, context를 반환합니다.
                "context": " ".join(
                    [contexts[pid] for pid in doc_indices[idx]]
                ),
            }
            if "context" in example.keys() and "answers" in example.keys():
                # validation 데이터를 사용하면 ground_truth context와 answer도 반환합니다.
                tmp["original_context"] = example["context"]
                tmp["answers"] = example["answers"]
            total.append(tmp)

        cqas = pd.DataFrame(total)
        return cqas

### 5. test

In [12]:
query = "대통령을 포함한 미국의 행정부 견제권을 갖는 국가 기관은?"

org_dataset_path = "../../data/train_dataset"
org_dataset = load_from_disk(org_dataset_path)

full_ds = concatenate_datasets(
    [
        org_dataset["train"].flatten_indices(),
        org_dataset["validation"].flatten_indices(),
    ]
)  # train dev 를 합친 4192 개 질문에 대해 모두 테스트
print("*" * 40, "query dataset", "*" * 40)
print(full_ds)

**************************************** query dataset ****************************************
Dataset({
    features: ['title', 'context', 'question', 'id', 'answers', 'document_id', '__index_level_0__'],
    num_rows: 4192
})


In [13]:
use_faiss = True 

with timer("single query by faiss"):
    scores, indices = retrieve(query, use_faiss=use_faiss)

print("-"*100)

with timer("bulk query by exhaustive search"):
    df = retrieve(full_ds, use_faiss=use_faiss)
    df["correct"] = df["original_context"] == df["context"]

    print("correct retrieval result by faiss", df["correct"].sum() / len(df))

[transform] done in 0.003 s
[query faiss search] done in 0.516 s
[Search query]
 대통령을 포함한 미국의 행정부 견제권을 갖는 국가 기관은? 

Top-1 passage with score 3.964760
대통령 지시(Presidential directive)는 미국 대통령의 국가안보 관련 행정명령이다. NSC의 조언과 동의(Advice and consent)가 필요하다. 국가안보 대통령 지시가 보다 뜻이 명확하다.

역대 대통령들은 다양한 용어를 사용했다. 케네디 대통령은 국가안보실행메모(NSAM, National Security Action Memorandums), 닉슨과 포드 대통령은 국가안보결정메모(NSDM, National Security Decision Memorandums), 클린턴 대통령은 대통령결정지시, 조지 부시 대통령은 국가안보대통령지시, 오바마 대통령은 대통령정책지시라고 부른다.

미국의 대통령 지시는 비밀명령으로 내려지기도 하는데, 이는 수십년이 지나면 비밀해제되어 일반에 공개된다. 미국 육군 정보와 보안 사령부와 같이, 군대에 특정 부대를 창설하는 경우, 대통령 지시만으로 창설되곤 한다.
[single query by faiss] done in 0.520 s
----------------------------------------------------------------------------------------------------
[query faiss search (bulk)] done in 329.465 s
[query exhaustive search] done in 330.981 s


Sparse retrieval: 100%|██████████| 4192/4192 [00:00<00:00, 9913.42it/s]

correct retrieval result by faiss 0.03482824427480916
[bulk query by exhaustive search] done in 331.418 s





In [14]:
use_faiss = False 

with timer("single query by exhaustive search"):
    scores, indices = retrieve(query)

print("-"*100)

with timer("bulk query by exhaustive search"):
    df = retrieve(full_ds)
    df["correct"] = df["original_context"] == df["context"]
    print(
        "correct retrieval result by exhaustive search",
        df["correct"].sum() / len(df),
    )

[transform] done in 0.002 s
[query ex search] done in 0.482 s
[Search query]
 대통령을 포함한 미국의 행정부 견제권을 갖는 국가 기관은? 

Top-1 passage with score 0.198994
국회에 관해 규정하는 헌법 제4장의 첫 조문이다.

본조에서 말하는 "국권"이란 국가가 갖는 지배권을 포괄적으로 나타내는 국가 권력, 곧 국가의 통치권을 의미한다. 국권은 일반적으로 입법권·행정권·사법권의 3권으로 분류되지만, 그 중에서도 주권자인 국민의 의사를 직접 반영하는 기관으로서 국회를 "최고 기관"으로 규정한 것이다. 다만, 최고 기관이라 해서 타 기관의 감시와 통제를 받지 않는 것은 아니며 권력 분립 원칙에 따라 국회에 대한 행정권, 사법권의 견제를 받는다.

또한 일본 전체 국민을 대표하는 기관을 국회로 규정함으로써, 국회는 일본의 유일한 입법 기관의 지위를 가지고 있다. 일본 제국 헌법 하에서 입법권은 천황의 권한에 속했으며, 제국의회는 천황의 입법 행위를 보좌하는 기관에 불과했다.

여기서 "유일한 입법 기관"의 의미로는 다음과 같은 해석이 있다.
* 국회 중심 입법 원칙 : 국회가 국가의 입법권을 독점한다는 원칙
* 국회 단독 입법 원칙 : 국회의 입법은 다른 기관의 간섭 없이 이루어진다는 원칙

또한 국회의 입법에 벗어나지 않는 범위 내에서 행정 기관은 정령 등의 규칙 제정권을 가지며(헌법 제73조 제6호), 최고재판소는 소송에 관한 절차, 변호사 및 재판소에 관한 내부 규율 및 사법 사무 처리에 관한 사항에 대한 규칙 제정권(헌법 제77조 제1항)을 가진다.
[single query by exhaustive search] done in 0.490 s
----------------------------------------------------------------------------------------------------
[query ex search (bulk)] done 

Sparse retrieval: 100%|██████████| 4192/4192 [00:00<00:00, 5495.93it/s]

correct retrieval result by exhaustive search 0.25166984732824427
[bulk query by exhaustive search] done in 27.602 s



