In [None]:
!pip install langchain
!pip install langchain_community
!pip install langchain_core
!pip install langchain_huggingface
!pip install kiwipiepy
!pip install konlpy
!pip install rank-bm25
!pip install chromadb
!pip install python-dotenv
!pip install pypdf
!pip install pymupdf

In [None]:
import os

# 1. Configuration for running on Google Colab

## Mount Google Drive; all project paths below are built from base_path
from google.colab import drive
drive.mount('/content/drive')
base_path = '/content/drive/MyDrive/Projects'

## Path of the PDF document to load
doc_path = f'{base_path}/rag최적화실험/documents/상해보험약관.pdf'
## Path of the question + evaluation JSON file
eval_json_path = f'{base_path}/rag최적화실험/eval_data/상해보험약관_eval.json'
## Path of the .env file handed to load_dotenv
env = f'{base_path}/comm/.env'

In [None]:
# Local-run configuration. It is wrapped in a string literal, so as written this
# cell assigns nothing; the triple quotes must be removed for it to take effect.
'''
# 2. local에서 실행할 경우

base_path = './'

## 문서경로
doc_path = f'{base_path}/documents/상해보험약관.pdf'
## 질문+평가 json
eval_json_path = f'{base_path}/eval_data/상해보험약관_eval.json'
env = f'{base_path}/../../comm/.env'
'''

In [None]:
# Load environment variables from the .env file located at `env`
from dotenv import load_dotenv
load_dotenv(env)

In [None]:
# Load the PDF at doc_path into `docs` using PyMuPDFLoader
from langchain_community.document_loaders import PyMuPDFLoader
loader = PyMuPDFLoader(doc_path)
docs = loader.load()

In [None]:
# Load the evaluation set (UTF-8 JSON). Its entries are later read for their
# "question", "type" and "answer_pages" keys.
import json
with open(eval_json_path, "r", encoding="utf-8") as f:
  eval_data = json.load(f)

In [None]:
# 토크나이저
from kiwipiepy import Kiwi
kiwi = Kiwi()

def kiwi_tokenize(text):
  """Tokenize `text` with the module-level `kiwi` and join the token forms with spaces."""
  forms = (tok.form for tok in kiwi.tokenize(text))
  return " ".join(forms)

In [None]:
# 임베딩 모델
from langchain_huggingface import HuggingFaceEmbeddings
import torch

model_name = "BAAI/bge-m3"
# Use the GPU when one is available and fall back to CPU otherwise, so the
# notebook also runs on machines (or Colab runtimes) without CUDA.
model_kwargs = {"device": "cuda" if torch.cuda.is_available() else "cpu"}
# Normalize the embedding vectors produced by the model
encode_kwargs = {"normalize_embeddings": True}

embeddings = HuggingFaceEmbeddings(
    model_name=model_name,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs,
)

In [None]:
# 리트리버 클래스 로드
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.retrievers import TFIDFRetriever
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_community.vectorstores import Chroma

In [None]:
# Retriever definitions

## Number of candidates every base retriever returns per query.
## BM25Retriever, TFIDFRetriever and the Chroma retriever all default to k=4,
## which is fewer than the top-k cut-off applied at evaluation time and leaves a
## reranker nothing extra to choose from. Keep this >= the evaluation top_k.
CANDIDATE_K = 10

## Keyword-based retrievers
bm25 = BM25Retriever.from_documents(docs, k=CANDIDATE_K)
tfIdf = TFIDFRetriever.from_documents(docs, k=CANDIDATE_K)

## Keyword ensembles (suffix = BM25 weight / TF-IDF weight).
## EnsembleRetriever only fuses the ranked lists of its members by weight; it has
## no `search_type` option (that is a vector-store retriever setting), so none is passed.
ensemble_b7_t3 = EnsembleRetriever(retrievers=[bm25, tfIdf], weights=[0.7, 0.3])
ensemble_b5_t5 = EnsembleRetriever(retrievers=[bm25, tfIdf], weights=[0.5, 0.5])
ensemble_b3_t7 = EnsembleRetriever(retrievers=[bm25, tfIdf], weights=[0.3, 0.7])

## Embedding-based retriever
chroma = Chroma.from_documents(documents=docs, embedding=embeddings).as_retriever(
    search_kwargs={"k": CANDIDATE_K}
)

## Hybrid retrievers (suffix = BM25 weight / Chroma weight)
ensemble_b7_c3 = EnsembleRetriever(retrievers=[bm25, chroma], weights=[0.7, 0.3])
ensemble_b5_c5 = EnsembleRetriever(retrievers=[bm25, chroma], weights=[0.5, 0.5])
ensemble_b3_c7 = EnsembleRetriever(retrievers=[bm25, chroma], weights=[0.3, 0.7])

## All retrievers under evaluation, keyed by the name used in the result tables
retriever_dict = {
    'bm25':bm25,
    'tfIdf':tfIdf,
    'ensemble_b7_t3':ensemble_b7_t3,
    'ensemble_b5_t5':ensemble_b5_t5,
    'ensemble_b3_t7':ensemble_b3_t7,
    'chroma':chroma,
    'ensemble_b7_c3':ensemble_b7_c3,
    'ensemble_b5_c5':ensemble_b5_c5,
    'ensemble_b3_c7':ensemble_b3_c7,
}

In [None]:
# Number of top-ranked documents inspected per question when checking for a hit
top_k = 5

In [None]:
# retriver 평가하기
import time
def eval_retriever(retriever, eval_list):
  """Count, per question type, how many evaluation questions the retriever hits.

  Every entry is queried twice through check_invoke_retriever: once with the
  raw question and once with the kiwi_tokenize()-processed question.

  Args:
    retriever: retriever object forwarded to check_invoke_retriever.
    eval_list: sequence of dicts with "question", "type" and "answer_pages" keys.

  Returns:
    (avg_time, results) where avg_time is the mean wall-clock seconds per
    invoke rounded to 3 decimals (0.0 for an empty eval_list), and results maps
    each question type and its "<type>_kiwi" variant to the number of hits.

  Raises:
    KeyError: if an entry's "type" is not one of the predefined result keys.
  """
  start = time.perf_counter()
  results = {"keyword":0, "keyword_kiwi":0, "embedding":0, "embedding_kiwi":0, "hybrid":0, "hybrid_kiwi":0}

  # `item` rather than `eval`, so the builtin eval() is not shadowed
  for item in eval_list:
    ## Pages that count as a correct retrieval for this question
    answer_pages = set(item["answer_pages"])

    ## (result key, query text) for the raw and the kiwi-processed question
    variants = (
        (item["type"], item["question"]),
        (f'{item["type"]}_kiwi', kiwi_tokenize(item["question"])),
    )
    for result_key, question in variants:
      is_success = check_invoke_retriever(
          retriever=retriever,
          question=question,
          answer_pages=answer_pages)
      if is_success:
        results[result_key] += 1

  # Two invokes per entry; measured once after the loop so that an empty
  # eval_list yields 0.0 instead of an unbound avg_time.
  invoke_count = len(eval_list) * 2
  if invoke_count == 0:
    return 0.0, results

  exec_time = time.perf_counter() - start
  avg_time = round(exec_time / invoke_count, 3)
  return avg_time, results

# Invoke the retriever and report whether any answer page was retrieved
def check_invoke_retriever(retriever, question, answer_pages):
  """Return True when the first `top_k` retrieved documents cover an answer page.

  The "page" metadata value of each document is shifted by +1 before it is
  compared with `answer_pages`.
  """
  top_docs = retriever.invoke(question)[:top_k]
  hit_pages = {found.metadata["page"] + 1 for found in top_docs}
  overlap = answer_pages & hit_pages
  return len(overlap) > 0

In [None]:
# Evaluate every base retriever, keeping hit counts and average invoke time by name
results, times = {}, {}

for name, retriever in retriever_dict.items():
  avg_time, result = eval_retriever(retriever, eval_data)
  results[name], times[name] = result, avg_time

In [None]:
# Model name used to build the cross-encoder reranker
reranker_name = "BAAI/bge-reranker-v2-m3"

In [None]:
# reranker 설정
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain.retrievers import ContextualCompressionRetriever

# Cross-encoder model loaded by name
reranker_model = HuggingFaceCrossEncoder(model_name=reranker_name)
# Reranker configured to keep top_k*2 documents
compressor = CrossEncoderReranker(model=reranker_model, top_n=top_k*2)

In [None]:
# Evaluate every base retriever again, this time wrapped with the reranker
for name, retriever in retriever_dict.items():
  reranker = ContextualCompressionRetriever(
      base_retriever=retriever,
      base_compressor=compressor,
  )
  avg_time, result = eval_retriever(reranker, eval_data)
  results[f'{name}_rerank'], times[f'{name}_rerank'] = result, avg_time

In [None]:
# Reranker Custom Class
from langchain.retrievers.document_compressors import CrossEncoderReranker
import operator
from typing import Optional, Sequence

from langchain_core.callbacks import Callbacks
from langchain_core.documents import Document
from langchain_community.document_transformers.embeddings_redundant_filter import _DocumentWithState

class CustomCrossEncoderReranker(CrossEncoderReranker):
    """CrossEncoderReranker variant that exposes each document's rerank score.

    The score is stored under metadata["score"] of the returned documents. The
    returned documents are copies, so the input documents (which may be shared
    with the underlying retriever) are never modified.
    """

    def compress_documents(
        self,
        documents: Sequence[Document],
        query: str,
        callbacks: Optional[Callbacks] = None,
    ) -> Sequence[Document]:
        """Rerank `documents` against `query` and keep the best `top_n`.

        Args:
            documents: candidate documents to score.
            query: query text paired with every document's page_content.
            callbacks: accepted for interface compatibility; not used here.

        Returns:
            At most `top_n` plain Document objects, highest score first, each
            with the model score added as metadata["score"]. An empty list is
            returned when `documents` is empty.
        """
        if not documents:
            return []

        scores = self.model.score([(query, doc.page_content) for doc in documents])
        ranked = sorted(zip(documents, scores), key=operator.itemgetter(1), reverse=True)

        top_n_result = []
        # Slicing already copes with fewer than top_n candidates.
        for doc, score in ranked[: self.top_n]:
            # Always build a fresh Document with a copied metadata dict: this both
            # unwraps _DocumentWithState wrappers and avoids writing the score
            # into the caller's (possibly shared) document objects.
            top_n_result.append(
                Document(
                    id=getattr(doc, "id", None),
                    page_content=doc.page_content,
                    metadata={**doc.metadata, "score": score},
                )
            )

        return top_n_result

In [None]:
# reranker - filter 추가
from langchain.retrievers.document_compressors.embeddings_filter import EmbeddingsFilter
from langchain.retrievers.document_compressors.base import DocumentCompressorPipeline

# Compression pipeline: embedding-similarity filter first, then the reranker.
# The filter is named `embeddings_filter` (not `filter`) so the built-in
# filter() stays usable in the rest of the notebook.
embeddings_filter = EmbeddingsFilter(embeddings=embeddings, similarity_threshold=0.3)
compressor_pipeline = DocumentCompressorPipeline(transformers=[embeddings_filter, compressor])

In [None]:
# Evaluate every base retriever wrapped with the filter + reranker pipeline
for name, retriever in retriever_dict.items():
  reranker = ContextualCompressionRetriever(
      base_retriever=retriever,
      base_compressor=compressor_pipeline,
  )
  avg_time, result = eval_retriever(reranker, eval_data)
  results[f'{name}_rerank_filter'], times[f'{name}_rerank_filter'] = result, avg_time

In [None]:
# 결과값 -> 그래프 생성

import pandas as pd

import os

# Flatten the nested {retriever: {question_type: score}} dict into one row per pair
flat_results = [
    {"retriever": r_name, "question_type": q_type, "score": score}
    for r_name, q_dict in results.items()
    for q_type, score in q_dict.items()
]

df = pd.DataFrame(flat_results)

# to_csv does not create missing folders, so make sure the output directory exists
results_dir = f"{base_path}/rag최적화실험/results"
os.makedirs(results_dir, exist_ok=True)
df.to_csv(f"{results_dir}/rag_eval_results.csv", index=False)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Grouped bar chart of the per-retriever scores, one bar colour per question type.
# NOTE(review): `score` holds the hit counts accumulated during evaluation, not a ratio.
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(data=df, x="retriever", y="score", hue="question_type", ax=ax)
for tick_label in ax.get_xticklabels():
  tick_label.set_rotation(45)
ax.set_title("RAG Retriever Accuracy by Type")
plt.show()

In [None]:
import os

# One row per retriever with its average time per invoke
time_results = [
    {"retriever": r_name, "avg_time": avg_time}
    for r_name, avg_time in times.items()
]

time_df = pd.DataFrame(time_results)

# to_csv does not create missing folders, so make sure the output directory exists
times_dir = f"{base_path}/rag최적화실험/results"
os.makedirs(times_dir, exist_ok=True)
time_df.to_csv(f"{times_dir}/rag_eval_times.csv", index=False)

In [None]:
# Bar chart of the average invoke time of every evaluated retriever
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(data=time_df, x="retriever", y="avg_time", ax=ax)
for tick_label in ax.get_xticklabels():
  tick_label.set_rotation(45)
ax.set_title("RAG Retriever Average Execution Time by Type")
plt.show()