## Library

In [28]:
import os
import torch
import time
import pandas as pd
import re

from sklearn.model_selection import train_test_split
from dotenv import load_dotenv

from sentence_transformers import SentenceTransformer
from sentence_transformers import CrossEncoder

from tqdm import tqdm
from pydantic import BaseModel, Field

from langchain.prompts import PromptTemplate
from langchain.vectorstores import FAISS
from langchain.callbacks import get_openai_callback
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from itertools import combinations


from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    classification_report,
)

load_dotenv()

True

In [29]:
# Experiment configuration used by the cells below.
# Selects which `../seed{seed}/` directory the CSV splits and the FAISS index are read from.
seed = 1
# Number of reference sentences requested from the reranker for each input sentence.
rerank_k = 5
# Number of candidates the FAISS retriever is configured to return per query (search_kwargs "k").
retrieve_k = 10

## Data

In [None]:
# All three CSV splits share the same directory and file-name prefix for the chosen seed.
split_prefix = f'../seed{seed}/seed{seed}_'

train = pd.read_csv(split_prefix + 'train.csv')
validation = pd.read_csv(split_prefix + 'validation.csv')
test = pd.read_csv(split_prefix + 'test.csv')

## Vectorstore

In [31]:
# Torch device the SentenceTransformer models are moved to; fixed to CPU in this notebook.
device = torch.device("cpu")

class KUREEmbedding:
    """Thin wrapper exposing `embed_documents` / `embed_query` on top of a SentenceTransformer model."""

    def __init__(self, model_name="nlpai-lab/KURE-v1"):
        """Load `model_name` (remote code allowed) and move it to the notebook-level `device`."""
        encoder = SentenceTransformer(model_name, trust_remote_code=True)
        self.model = encoder.to(device)

    def embed_documents(self, texts):
        """Encode a list of texts and return the encoder output (requested as a numpy array)."""
        return self.model.encode(texts, convert_to_numpy=True)

    def embed_query(self, text):
        """Encode a single text by embedding it as a one-element batch and returning the first row."""
        batch = self.embed_documents([text])
        return batch[0]

class KoE5Embedding(KUREEmbedding):
    """Same wrapper as KUREEmbedding, with "nlpai-lab/KoE5" as the default checkpoint."""

    def __init__(self, model_name="nlpai-lab/KoE5"):
        # Loading and device placement are handled entirely by the parent initializer.
        super(KoE5Embedding, self).__init__(model_name)

In [None]:
vectorstore_path = f"../seed{seed}/faiss_index_seed{seed}_koe5"

# Fail fast when the index is missing. Silently skipping the load would leave
# `retriever` undefined (or, worse, pointing at an index loaded earlier in the
# same kernel for a different seed), and every later cell would then misbehave.
if not os.path.exists(vectorstore_path):
    raise FileNotFoundError(
        f"FAISS index not found at '{vectorstore_path}'. "
        "Build the index for this seed before running the notebook."
    )

embeddings = KoE5Embedding()

# NOTE(security): `allow_dangerous_deserialization=True` lets FAISS.load_local
# unpickle the stored docstore; only load index directories you created yourself.
vectorstore = FAISS.load_local(
    vectorstore_path,
    embeddings.embed_query,
    allow_dangerous_deserialization=True,
)

# The retriever returns the `retrieve_k` nearest documents for each query.
retriever = vectorstore.as_retriever(search_kwargs={"k": retrieve_k})

## Classifier

In [None]:
# Structured-output schema for the classifier; it is handed to `llm.with_structured_output`.
# NOTE(review): documented with `#` comments on purpose - a class docstring would likely
# become part of the schema sent to the model; confirm before adding one.
class SentenceClassification(BaseModel):
    # Predicted label as a string; the field description asks for "True" or "False".
    prediction: str = Field(description="Classification of the sentence: True, False")

# Chat model (temperature 0) used by the classifier and by the LLM-based rerankers.
llm = ChatOpenAI(
    temperature=0,
    model="gpt-3.5-turbo",
)

# Same model, wrapped so that its answer is parsed into a SentenceClassification.
structured_llm_grader = llm.with_structured_output(SentenceClassification)

## Prompt & Chain

In [34]:
# System turn: the task definition plus the JSON shape the model is asked to return.
system_message = (
    "system",
    (
        "Classify the sentence into one of : True , False. "
        "Judge as True if it is unethical.\n"
        "Return in JSON: {{\"prediction\": \"True\" or \"False\"}}"
    ),
)

# Human turn: the retrieved reference sentences followed by the sentence to classify.
human_message = (
    "human",
    "Reference sentence: \n\n {sentences} \n\n Input sentence: {question}",
)

classification_prompt = ChatPromptTemplate.from_messages([system_message, human_message])

# Prompt piped into the structured-output model.
classification_grader = classification_prompt | structured_llm_grader

## Rerank

In [35]:
def retrieve_and_rerank_no_reranker(question, k):
    """
    Baseline without a reranker.

    Queries the module-level `retriever` and returns the page contents of the
    first `k` documents, in the order the retriever produced them.
    """
    top_docs = retriever.invoke(question)[:k]
    return [doc.page_content for doc in top_docs]

# Hugging Face Cross-Encoder Reranker (cross-encoder/ms-marco-MiniLM-L-6-v2)
# The model is loaded lazily on first use and then reused, so the weights are
# not re-read from disk for every query in the prediction loop.
_HF_CROSSENCODER_RERANKER = None


def retrieve_and_rerank_hf_crossencoder(question, k):
    """
    Retrieve candidates with the FAISS retriever and rerank them with a
    Hugging Face Cross-Encoder.

    Args:
        question: Query text passed to the retriever and paired with each candidate.
        k: Maximum number of reranked texts to return.

    Returns:
        Up to `k` candidate texts ordered by descending cross-encoder score
        (ties keep retrieval order). An empty list when nothing was retrieved.

    NOTE(review): this checkpoint appears to be trained on English MS MARCO
    data; confirm it is appropriate for the language of the corpus.
    """
    global _HF_CROSSENCODER_RERANKER

    similar_docs = retriever.invoke(question)
    retrieved_texts = [doc.page_content for doc in similar_docs]
    if not retrieved_texts:
        # Nothing to score; avoid loading or calling the model at all.
        return []

    if _HF_CROSSENCODER_RERANKER is None:
        _HF_CROSSENCODER_RERANKER = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

    query_pairs = [[question, text] for text in retrieved_texts]
    scores = _HF_CROSSENCODER_RERANKER.predict(query_pairs)

    # sorted() is stable, so equally scored candidates keep their retrieval order.
    ranked_indices = sorted(
        range(len(retrieved_texts)), key=lambda i: scores[i], reverse=True
    )
    return [retrieved_texts[i] for i in ranked_indices][:k]

# Hugging Face Reranker (BAAI/bge-reranker-v2-m3)
# The model is loaded lazily on first use and then reused, so the weights are
# not re-read from disk for every query in the prediction loop.
_HF_BGE_RERANKER = None


def retrieve_and_rerank_hf_bge(question, k):
    """
    Retrieve candidates with the FAISS retriever and rerank them with the
    Hugging Face BGE reranker.

    Args:
        question: Query text passed to the retriever and paired with each candidate.
        k: Maximum number of reranked texts to return.

    Returns:
        Up to `k` candidate texts ordered by descending reranker score
        (ties keep retrieval order). An empty list when nothing was retrieved.
    """
    global _HF_BGE_RERANKER

    similar_docs = retriever.invoke(question)
    retrieved_texts = [doc.page_content for doc in similar_docs]
    if not retrieved_texts:
        # Nothing to score; avoid loading or calling the model at all.
        return []

    if _HF_BGE_RERANKER is None:
        # Positional model argument, matching the other CrossEncoder call in this notebook.
        _HF_BGE_RERANKER = CrossEncoder("BAAI/bge-reranker-v2-m3")

    query_pairs = [[question, text] for text in retrieved_texts]
    scores = _HF_BGE_RERANKER.predict(query_pairs)

    # sorted() is stable, so equally scored candidates keep their retrieval order.
    ranked_indices = sorted(
        range(len(retrieved_texts)), key=lambda i: scores[i], reverse=True
    )
    return [retrieved_texts[i] for i in ranked_indices][:k]

# LLM Listwise Reranker
def retrieve_and_rerank_llm_listwise(question, k):
    """
    Retrieve candidates with the FAISS retriever and let the LLM rank the whole
    list at once (listwise).

    The LLM reply is parsed for numbered entries ending in "(Label: True)" or
    "(Label: False)". When no entry can be parsed, the function falls back to
    the retrieval order instead of returning an empty list, so the caller still
    receives reference sentences.

    Args:
        question: Query text passed to the retriever and shown to the LLM.
        k: Maximum number of texts to return.

    Returns:
        Up to `k` texts: the entries parsed from the LLM ranking, or the first
        `k` retrieved texts when parsing yields nothing. An empty list when
        nothing was retrieved (the LLM is not called in that case).
    """
    similar_docs = retriever.invoke(question)
    retrieved_texts = [doc.page_content for doc in similar_docs]
    if not retrieved_texts:
        return []

    prompt_template = PromptTemplate(
        input_variables=["question", "documents"],
        template="""
        질문: {question}
        Question: {question}
        Rank the following documents based on their relevance to the question:
        {documents}
    
        List the documents starting from the most relevant.
        """
    )

    # Call the chat model directly, as the pointwise and pairwise rerankers do,
    # instead of going through the deprecated LLMChain.run helper.
    response = llm.invoke(
        prompt_template.format(question=question, documents="\n".join(retrieved_texts))
    )
    ranked_output = response.content if hasattr(response, "content") else str(response)

    pattern = r"\d+\.\s(.+?\(Label: (True|False)\))"
    matches = re.findall(pattern, ranked_output)

    if not matches:
        # The reply did not follow the expected numbered format; keep retrieval order.
        return retrieved_texts[:k]

    # Each match is (full entry text, label); only the entry text is returned.
    return [entry_text for entry_text, _label in matches][:k]

# LLM Pointwise Reranker
def retrieve_and_rerank_llm_pointwise(question, k):
    """
    Retrieve candidates with the FAISS retriever and rerank them by a relevance
    score the LLM assigns to each document individually (pointwise).

    One LLM call is made per retrieved document. The first integer found in
    the reply is used as the score; a reply without any integer scores 0.

    Args:
        question: Query text passed to the retriever and shown to the LLM.
        k: Maximum number of reranked texts to return.

    Returns:
        Up to `k` texts ordered by descending score (ties keep retrieval order).
    """
    similar_docs = retriever.invoke(question)
    retrieved_texts = [doc.page_content for doc in similar_docs]

    # Built once per call: the template does not depend on the document being
    # scored. The indentation inside the literal is part of the prompt text and
    # is kept exactly as it was.
    prompt_template = PromptTemplate(
        input_variables=["question", "document"],
        template="""
            Question: {question}
            Evaluate how relevant the document is to the given question on a scale from 1 to 10.

            Document:
            {document}

            Score (1-10):
            """
    )

    scored_docs = []
    for doc in retrieved_texts:
        score_output = llm.invoke(prompt_template.format(question=question, document=doc))

        score_text = score_output.content if hasattr(score_output, "content") else str(score_output)

        score_match = re.search(r"\d+", score_text)
        score = int(score_match.group()) if score_match else 0

        scored_docs.append((doc, score))

    # list.sort() is stable, so equally scored documents keep their retrieval order.
    scored_docs.sort(key=lambda pair: pair[1], reverse=True)
    return [text for text, _score in scored_docs][:k]

# LLM Pairwise Reranker
def retrieve_and_rerank_llm_pairwise(question, k):
    """
    Retrieve candidates with the FAISS retriever and rerank them by pairwise
    LLM comparisons.

    Every pair of distinct retrieved texts is shown to the LLM, which is asked
    to answer "1" or "2"; the chosen text gains one point. Texts are returned
    by descending number of wins. Replies containing neither a standalone 1
    nor 2 award no point.

    Args:
        question: Query text passed to the retriever and shown to the LLM.
        k: Maximum number of reranked texts to return.

    Returns:
        Up to `k` distinct texts ordered by descending win count (ties keep
        retrieval order).
    """
    similar_docs = retriever.invoke(question)
    retrieved_texts = [doc.page_content for doc in similar_docs]

    # Scores are keyed by text, so identical texts share one entry. De-duplicate
    # up front (order preserved) so a text is never compared against itself,
    # which would otherwise hand it a free point and cost an extra LLM call.
    unique_texts = list(dict.fromkeys(retrieved_texts))
    doc_scores = dict.fromkeys(unique_texts, 0)

    # Built once per call: the template does not depend on the pair being
    # compared. The indentation inside the literal is part of the prompt text
    # and is kept exactly as it was.
    prompt_template = PromptTemplate(
        input_variables=["question", "doc1", "doc2"],
        template="""
            Question: {question}
            Select the document that is more relevant to the question below.

            Document 1:
            {doc1}

            Document 2:
            {doc2}

            Output only the number of the more relevant document: 1 or 2.
            """
    )

    for doc1, doc2 in combinations(unique_texts, 2):
        comparison_output = llm.invoke(prompt_template.format(question=question, doc1=doc1, doc2=doc2))

        comparison_text = comparison_output.content if hasattr(comparison_output, "content") else str(comparison_output)

        selected_match = re.search(r"\b[12]\b", comparison_text)
        if selected_match:
            selected_doc = doc1 if selected_match.group() == "1" else doc2
            doc_scores[selected_doc] += 1

    # sorted() is stable, so texts with equal wins keep their retrieval order.
    reranked_texts = sorted(doc_scores, key=doc_scores.get, reverse=True)[:k]

    return reranked_texts

## Prediction

In [None]:
results = []

# One retrieval + rerank + classification round trip per test sentence.
row_iterator = tqdm(test.iterrows(), total=len(test), desc="Processing")
for _, row in row_iterator:
    question = row["문장"]
    answer = row["비도덕여부"]

    # Reference sentences for this input: retrieved, then reranked down to `rerank_k`.
    reference_sentences = retrieve_and_rerank_hf_crossencoder(question, rerank_k)

    grader_inputs = {"question": question, "sentences": reference_sentences}
    graded = classification_grader.invoke(grader_inputs)

    # Keep the gold label next to the predicted one for the evaluation cell.
    record = {
        "question": question,
        "answer": answer,
        "prediction": graded.prediction,
    }
    results.append(record)

df_results = pd.DataFrame(results)

## Evaluation

In [None]:
# Gold labels: boolean-like values mapped to 1 (unethical) / 0.
y_true = df_results["answer"].map({True: 1, False: 0})

# Predicted labels arrive as free-form strings from the LLM; tolerate casing and
# surrounding whitespace ("true", " False ") before mapping to 1 / 0.
y_pred = (
    df_results["prediction"]
    .astype(str)
    .str.strip()
    .str.lower()
    .map({"true": 1, "false": 0})
)

# Anything that still failed to map would reach sklearn as NaN and raise an
# opaque error there; report the offending rows explicitly instead.
unmapped = y_true.isna() | y_pred.isna()
if unmapped.any():
    examples = df_results.loc[unmapped, ["answer", "prediction"]].head().to_dict("records")
    raise ValueError(
        f"{int(unmapped.sum())} row(s) have labels that are not True/False; examples: {examples}"
    )

y_true = y_true.astype(int)
y_pred = y_pred.astype(int)

accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1_final = f1_score(y_true, y_pred)
f1_macro = f1_score(y_true, y_pred, average="macro")
f1_weighted = f1_score(y_true, y_pred, average="weighted")

# Fix the label order so the matrix is always 2x2, even if one class is absent.
conf_matrix = confusion_matrix(y_true, y_pred, labels=[0, 1])

print("\n===== Classification Performance Results =====")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_final:.4f}")
print(f"F1-score (Macro): {f1_macro:.4f}")
print(f"F1-score (Weighted): {f1_weighted:.4f}")

print("\n===== Classification Confusion Matrix =====")
print(conf_matrix)

print("\n===== Detailed Classification Report =====")
print(classification_report(y_true, y_pred))