# Reranking 평가

- Cross-encoder 기반 reranker 모듈을 도입하여 성능 향상 평가

In [None]:
import os, sys, builtins, logging

# repo 루트 기준으로 utils 임포트 가능하도록 경로 추가
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), '..')))

from utils.logging_setup import setup_logging

# UTF-8 파일 핸들러로 로그 초기화 (logs/{timestamp}.log)
# print를 직접 로거로 보낼 것이므로 redirect_prints는 False
setup_logging(force=True, redirect_prints=False)

# print를 로거로 보내되, 기존 콘솔 출력도 유지
_original_print = builtins.print

def print(*args, **kwargs):  # noqa: A001 (shadow builtins)
    message = " ".join(str(a) for a in args)
    logging.info(message)
    try:
        _original_print(*args, **kwargs)
    except Exception:
        # 콘솔 출력 실패 시에도 로깅은 유지
        pass

In [None]:
# OpenAI API 설정 및 라이브러리 import
from openai import OpenAI
import os, sys
from typing import List, Dict
import dotenv
import sys

# 프로젝트 루트 디렉토리를 Python 경로에 추가
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.append(project_root)


dotenv.load_dotenv()

In [None]:
# openai 설정

# API 키 설정 (환경변수에서 가져오기)
client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY")  # 환경변수에 API 키를 설정해주세요
)


response_format = {
    "type": "json_schema",          # JSON 스키마 강제 모드
    "json_schema": {
        "name": "translate_result",
        "schema": {
            "type": "object",
            "properties": {
                "translated": { "type": "string" },
                "mongo_query": {
                    "type": "array",
                    "minItems": 1,
                    # 각 stage는 자유형 객체로 허용(예: {"$search": {...}}, {"$project": {...}})
                    "items": {
                        "type": "object",
                        "additionalProperties": True
                    }
                }
            },
            "required": ["translated", "mongo_query"],
            "additionalProperties": False
        }
    },
    # 스키마를 더 엄격히 따르게 함(모델이 스키마 밖 형식을 내보내지 않도록)
    "strict": True
}

def chat(
    messages: List[Dict[str, str]], 
    model: str = "gpt-4o-mini",
    response_format: dict = None,
    **kwargs
) -> str:
    """
    OpenAI GPT-4o-mini API를 사용하여 채팅 완성을 수행합니다.
    
    Args:
        messages: 대화 메시지 리스트 [{"role": "user", "content": "메시지"}]
        model: 사용할 모델명 (기본값: gpt-4o-mini)
        **kwargs: OpenAI API 매개변수들
            - temperature: 창의성 조절 (0.0-2.0, 기본값: 0.7)
            - max_tokens: 최대 토큰 수 (기본값: 1000)
            - top_p: 확률 임계값 (기본값: 0.95)
            - frequency_penalty: 빈도 페널티 (기본값: 0.0)
            - presence_penalty: 존재 페널티 (기본값: 0.0)
            - stream: 스트리밍 여부 (기본값: False)
            - 기타 OpenAI API가 지원하는 모든 매개변수
        
    Returns:
        GPT 응답 텍스트
    """
    # 기본값 설정
    default_params = {
        "temperature": 0.1,
        "max_tokens": 2048,
        "top_p": 0.95,
        "frequency_penalty": 0.0,
        "presence_penalty": 0.0
    }
    
    # 기본값과 사용자 입력 병합
    params = {**default_params, **kwargs}
    
    try:
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            response_format=response_format,
            **params
        )
        
        return response.choices[0].message.content
        
    except Exception as e:
        print(f"API 호출 중 오류 발생: {e}")
        return None


In [None]:
# MongoDB 연결 테스트
import json
import os
import pymongo
from pymongo import MongoClient
import dotenv

dotenv.load_dotenv()

print("=" * 50)
print("MongoDB 연결 테스트 시작")
print("=" * 50)

# 설정 파일 로드
with open('../configs/config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

# MongoDB 클라이언트 연결
mongodb_client = MongoClient(os.getenv("MONGODB_URI"))
print("🔗 MongoDB 클라이언트 연결 완료")

try:
    mongodb_client.admin.command('ping')
    print("✅ MongoDB 연결 성공!")
    
    current_db = mongodb_client[config['path']['db_name']]
    target_collection = current_db[config['path']['collection_name']]
    
    print(f"🎯 현재 데이터베이스: {config['path']['db_name']}")
    print(f"🎯 타겟 컬렉션: {config['path']['collection_name']}")
    
    # 컬렉션 통계 정보
    stats = current_db.command("collStats", config['path']['collection_name'])
    print(f"📈 문서 개수: {stats['count']:,}")
    
    print("=" * 50)
    print("✅ MongoDB 연결 테스트 완료!")
    print("=" * 50)
    
except Exception as e:
    print(f"❌ 오류: {e}")

## 실험 2 파이프라인 기반

In [None]:
import os, sys
import pandas as pd

# 프로젝트 루트 디렉토리를 Python 경로에 추가
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
if project_root not in sys.path:
    sys.path.append(project_root)

query_df = pd.read_csv("../data/helloworld_test_query_with_translation_query_20250930_191253.csv")
query_df.head()

In [None]:
# Reranking 모델 설정
from Azure.reranking_model import ChatModel
from dotenv import load_dotenv

load_dotenv()
with open('../configs/config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)
    
# Reranking 모델 인스턴스 생성
reranking_chat_model = ChatModel(config)

print("Reranking 기반 하이브리드 검색 모델 설정 완료")
print(f"Reranker 모델: {config['reranker_config']['model']}")
print(f"후보 문서 수: {config['reranker_config']['numCandidates']}")
print(f"최종 반환 문서 수: {config['chat_config']['top_k']}")

print("쿼리 기반 하이브리드 검색 모델 설정 완료")

# 키워드 기반 검색 함수
def get_query_model_response_with_docs(query_text, mongo_query):
    """
    키워드 기반 하이브리드 검색 모델로부터 답변을 생성하고 검색된 문서들의 인덱스를 반환
    """
    try:
        # 빈 대화 히스토리로 시작
        conversation_history = []
        
        # 키워드 기반 모델 답변 생성
        response = reranking_chat_model.generate_ai_response(
            conversation_history, 
            query_text, 
            target_collection, 
            mongo_query=mongo_query
        )
        
        return {
            "answer": response["answer"],
            "retrieved_doc_ids": response["retrieved_doc_ids"],
            "retrieved_docs": response["retrieved_docs"]
        }
        
    except Exception as e:
        print(f"오류 발생: {e}")
        return {
            "answer": "",
            "retrieved_doc_ids": [],
            "retrieved_docs": []
        }

print("쿼리 기반 검색 함수 정의 완료")


In [None]:
# Retrieval Correctness 계산 함수
def calculate_retrieval_correctness(retrieved_doc_ids, ground_truth_ids):
    """
    검색된 문서 ID들과 ground truth ID들을 비교하여 correctness 계산
    """
    if not retrieved_doc_ids or not ground_truth_ids:
        return 0
    
    # ground_truth_ids가 문자열 리스트인 경우 처리
    if isinstance(ground_truth_ids, str):
        try:
            # 문자열을 리스트로 변환 (예: "['id1', 'id2']" -> ['id1', 'id2'])
            import ast
            ground_truth_list = ast.literal_eval(ground_truth_ids)
        except:
            ground_truth_list = [ground_truth_ids]
    else:
        ground_truth_list = ground_truth_ids
    
    # 검색된 문서 중 하나라도 ground truth에 있으면 1, 아니면 0
    for doc_id in retrieved_doc_ids:
        if doc_id in ground_truth_list:
            return 1
    
    return 0

print("Retrieval Correctness 계산 함수 정의 완료")

# 확장된 Retrieval 메트릭 계산 함수들
def calculate_recall_at_k(retrieved_doc_ids, ground_truth_ids, k=None):
    """
    Recall@k 계산: 검색된 문서 중 관련 문서의 비율
    """
    if not retrieved_doc_ids or not ground_truth_ids:
        return 0.0
    
    # ground_truth_ids가 문자열 리스트인 경우 처리
    if isinstance(ground_truth_ids, str):
        try:
            import ast
            ground_truth_list = ast.literal_eval(ground_truth_ids)
        except:
            ground_truth_list = [ground_truth_ids]
    else:
        ground_truth_list = ground_truth_ids
    
    # k가 지정되지 않으면 검색된 문서 수만큼 사용
    if k is None:
        k = len(retrieved_doc_ids)
    
    # 상위 k개 문서만 고려
    top_k_retrieved = retrieved_doc_ids[:k]
    
    # 관련 문서 수 계산
    relevant_retrieved = sum(1 for doc_id in top_k_retrieved if doc_id in ground_truth_list)
    
    # Recall = 관련 문서 수 / 전체 관련 문서 수
    if len(ground_truth_list) == 0:
        return 0.0
    
    return relevant_retrieved / len(ground_truth_list)


def calculate_all_metrics(retrieved_doc_ids, ground_truth_ids, k=None):
    """
    모든 메트릭을 한 번에 계산
    """
    return {
        "correctness": calculate_retrieval_correctness(retrieved_doc_ids, ground_truth_ids),
        "recall_at_k": calculate_recall_at_k(retrieved_doc_ids, ground_truth_ids, k)
    }

print("확장된 Retrieval 메트릭 계산 함수들 정의 완료")


In [None]:
# 안전한 mongo_query 파싱 및 평가 루프 대체
import json, ast, time
from tqdm import tqdm

def parse_mongo_query(query_raw):
    # 이미 리스트[dict]
    if isinstance(query_raw, list):
        return query_raw
    # 문자열이면 ast 우선 → json → 마지막으로 단순치환 후 json
    if isinstance(query_raw, str):
        for parser in (ast.literal_eval, json.loads):
            try:
                return parser(query_raw)
            except Exception:
                pass
        # 단순 따옴표 치환 시도 (가능한 경우에만)
        try:
            sanitized = query_raw.replace("'", '"')
            return json.loads(sanitized)
        except Exception as e:
            raise e
    raise ValueError("Unsupported mongo_query type: {}".format(type(query_raw)))

# 평가 실행 (기존 변수들 재사용)
baseline_results = []
retrieved_doc_ids_list = []
correctness_scores = []
recall_at_k_scores = []

print(f"총 {len(query_df)}개의 쿼리에 대해 쿼리 기반 하이브리드 검색 평가를 시작합니다...")

for idx, row in tqdm(query_df.iterrows(), total=len(query_df), desc="쿼리 하이브리드 검색 평가"): 
    query_text = row['translated_4o_mini']
    ground_truth_ids = row['ground_truth_id']

    try:
        mongo_query = parse_mongo_query(row['mongo_query_4o_mini'])
    except Exception as e:
        print(f"파싱 실패로 해당 샘플 건너뜀: {e}")
        baseline_results.append("")
        retrieved_doc_ids_list.append([])
        correctness_scores.append(0)
        recall_at_k_scores.append(0.0)
        continue

    print(f"\n[{idx+1}/{len(query_df)}] 처리 중: {query_text[:50]}...")
    print(f"사용할 쿼리: {mongo_query}")

    result = get_query_model_response_with_docs(query_text, mongo_query)

    baseline_results.append(result['answer'])
    retrieved_doc_ids_list.append(result['retrieved_doc_ids'])

    metrics = calculate_all_metrics(result['retrieved_doc_ids'], ground_truth_ids)
    correctness_scores.append(metrics['correctness'])
    recall_at_k_scores.append(metrics['recall_at_k'])

    print(f"검색된 문서 수: {len(result['retrieved_doc_ids'])}")
    print(f"Correctness: {metrics['correctness']}")
    print(f"Recall@k: {metrics['recall_at_k']:.3f}")

    time.sleep(1)

print(f"\n=== 키워드 기반 하이브리드 검색 평가 완료! ===")
print(f"평균 Correctness: {sum(correctness_scores) / len(correctness_scores):.3f}")
print(f"평균 Recall@k: {sum(recall_at_k_scores) / len(recall_at_k_scores):.3f}")


In [None]:
# 키워드 기반 검색 결과 저장 및 비교 분석
import datetime

# 데이터프레임에 키워드 기반 검색 결과 추가
query_df['answer'] = baseline_results
query_df['retrieved_doc_ids'] = retrieved_doc_ids_list
query_df['correctness'] = correctness_scores
query_df['recall_at_k'] = recall_at_k_scores

# 결과 저장
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
evaluation_csv_filename = f"../data/evaluation_results_{timestamp}.csv"
query_df.to_csv(evaluation_csv_filename, index=False, encoding='utf-8')

print(f"\n키워드 기반 검색 결과 저장 완료:")
print(f"- answer: {len(baseline_results)}개")
print(f"- retrieved_doc_ids: {len(retrieved_doc_ids_list)}개") 
print(f"- correctness: {len(correctness_scores)}개")
print(f"- recall_at_k: {len(recall_at_k_scores)}개")
print(f"- CSV 파일: {evaluation_csv_filename}")

# 평가 결과 상세 요약
total_queries = len(query_df)
correct_retrievals = sum(correctness_scores)
avg_correctness = correct_retrievals / total_queries
avg_recall = sum(recall_at_k_scores) / len(recall_at_k_scores)

print(f"\n=== 키워드 기반 하이브리드 검색 평가 결과 요약 ===")
print(f"총 쿼리 수: {total_queries}")
print(f"정확한 검색 수: {correct_retrievals}")
print(f"평균 Correctness: {avg_correctness:.3f} ({avg_correctness*100:.1f}%)")
print(f"평균 Recall@k: {avg_recall:.3f} ({avg_recall*100:.1f}%)")


In [None]:
# 결과 저장 및 비교 분석
import datetime

# 결과 저장
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
reranking_csv_filename = f"../data/reranking_evaluation_results_{timestamp}.csv"
query_df.to_csv(reranking_csv_filename, index=False, encoding='utf-8')

print(f"\nReranking 평가 결과 저장 완료: {reranking_csv_filename}")