In [None]:
import os
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.prompts import PromptTemplate
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from dotenv import load_dotenv
from langchain.retrievers import EnsembleRetriever

# load_dotenv() .env있으면 사용

connection_string = "postgresql+psycopg2://user:password@ip:host/database"
embedding = OpenAIEmbeddings("open_api_key = Your-KEY")

# 1. 보험사 리스트 (collection_names) 정의
collection_names = ['DB', 'samsung', 'hanwha', "현대해상", "test", "meritzfire"] 

# 2. 각 컬렉션별로 retriever 생성
retrievers = []
for name in collection_names:
    vectorstore = PGVector(
        embeddings=embedding,
        collection_name=name,
        connection=connection_string,
        use_jsonb=True)
    retrievers.append(vectorstore.as_retriever(search_kwargs={'k': 4,'fetch_k': 20}))

# 3. 모든 retriever를 하나로 묶기
combined_retriever = EnsembleRetriever(retrievers=retrievers)

# 4. LLM 모델 설정
llm = ChatOpenAI(
    model="gpt-4o-mini",  # 또는 "gpt-3.5-turbo"
    temperature=0.1,
    max_tokens=1500
)

# 6. 문서 포맷팅 함수 (출처 포함)
def format_docs(docs):
    formatted_docs = []
    # 소스 파일 경로를 기반으로 고유한 문서 ID를 할당하기 위한 딕셔너리
    source_map = {}
    doc_counter = 1

    for doc in docs:
        # 메타데이터에서 원본 파일 경로를 가져옴
        source = doc.metadata.get("source", "알 수 없는 출처")
        # 메타데이터에서 페이지 번호를 가져옴 (보통 0부터 시작하므로 +1)
        page_number = doc.metadata.get("page", "알 수 없는 페이지")
        if isinstance(page_number, int):
            page_number += 1
        
        # 새로운 소스 파일이면 고유한 문서 ID를 할당
        if source not in source_map:
            source_map[source] = doc_counter
            doc_counter += 1
            
        doc_id = source_map[source]
        
        # 형식에 맞게 문서 정보와 내용을 포맷팅
        formatted_docs.append(f"[문서 {doc_id} - {os.path.basename(source)}, 페이지 {page_number}]\n{doc.page_content}\n")
    
    return "\n".join(formatted_docs)

In [6]:
# prompt
template_dw =  """당신은 여러 보험사의 상품을 비교 분석하여 사용자에게 가장 쉽고 친절하게 정보를 전달하는 보험 전문가 챗봇입니다.

다음 지시사항을 철저히 따라 사용자의 질문에 한국어로 답변하세요.

답변의 서식:
- 마크다운(Markdown)을 사용하여 답변을 구조화하세요.
- 주요 정보는 **볼드체**로 강조하세요.
- 여러 회사를 비교하는 경우, 반드시 마크다운 표를 활용하여 항목별로 정리하세요.
- 서류나 항목을 나열할 때는 체크리스트(- 또는 * 또는 숫자 목록)를 사용하세요.

답변의 내용:
- 전문 용어는 피하고, 누구나 이해할 수 있도록 쉽고 자세하게 설명하세요.
- 답변은 질문에 대한 직접적인 내용만을 포함하며, 불필요한 서론이나 결론은 제외하세요.
- 제시된 '컨텍스트' 내의 정보만 사용하세요. 컨텍스트에 없는 내용은 절대로 지어내지 마세요. 
- 컨텍스트에 없는 내용을 질문할 시, 모른다고 말하지말고 질문한 회사의 전화번호나 링크를 줘서 직접 내용을 탐색하도록 유도하세요.
- 사용자의 상황(사고 유형, 과실 비율, 연령 등)을 고려해서 답변해주세요.
- 비교할때는 차이점을 명확하게 구체적으로 설명하세요.
- 추천 시에는 그 이유(보장 범위, 특화 옵션, 지급 조건 등)를 구체적으로 설명하세요.

출처 표기:
- 답변에 사용된 모든 정보는 마지막에 출처(파일명/페이지)를 명확히 명시하세요.
- 출처는 항상 괄호(()) 안에 (출처: 파일명, 페이지 p.N) 형식으로 표시하세요.

마지막에는 항상 더 물어볼 내용이 있는지 물어보면서 대화를 더 길게하도록 유도해

[질문]
{question}

[컨텍스트]
{context}
"""

prompt_dowon = ChatPromptTemplate.from_template(template_dw)

rag_chain_dw = (
    {
        "context": combined_retriever | format_docs,
        "question": RunnablePassthrough()
    }
    | prompt_dowon
    | llm
    | StrOutputParser()
)

In [7]:
import os, json, uuid, datetime as dt
from typing import List, Optional, Dict, Iterable

# ==== 사용자 설정 ====
EVAL_JSONL_PATH = "./filename.jsonl"
CONTEXT_MAX_CHARS = 1200     # 각 컨텍스트 길이 제한 (너무 길면 RAGAS에 노이즈)
TOPK_CONTEXTS = 5            # 평가 저장용 최대 컨텍스트 수 (retriever k보다 넉넉히)
# =====================

def _clean_text(s: str) -> str:
    if not isinstance(s, str):
        s = str(s)
    return " ".join(s.split())

def _doc_to_str(doc) -> str:
    """컨텍스트 저장 시, source/page 정보도 앞에 명시해줌 (추적용)."""
    src = doc.metadata.get("source", "unknown")
    page = doc.metadata.get("page")
    if isinstance(page, int):
        page += 1  # 보통 0 기반이라 가독성 위해 +1
    head = f"[{os.path.basename(src)}" + (f" p.{page}]" if page is not None else "]")
    body = _clean_text(doc.page_content)[:CONTEXT_MAX_CHARS]
    return f"{head} {body}"

def save_jsonl_line(path: str, obj: Dict):
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(obj, ensure_ascii=False) + "\n")

def run_rag_and_log(
    query: str,
    retriever,              # e.g., combined_retriever
    llm,                    # e.g., ChatOpenAI(...)
    prompt,                 # ChatPromptTemplate or Runnable prompt
    format_docs_fn,         # 사용자가 만든 format_docs(docs)
    ground_truth: Optional[str] = None,
    extra_meta: Optional[Dict] = None,
) -> str:
    """
    1) retriever로 문서 가져오기
    2) 포맷팅 컨텍스트 만들기 → LLM 호출
    3) JSONL로 (question/answer/contexts/ground_truth) 저장
    4) 모델 답변 반환
    """
    # 1) 검색
    docs = retriever.get_relevant_documents(query)
    # 평가 저장용 contexts (list[str])
    contexts = [_doc_to_str(d) for d in docs[:TOPK_CONTEXTS]]

    # 2) LLM 호출 (기존 파이프라인 호환)
    formatted_context = format_docs_fn(docs)
    chain_input = {"context": formatted_context, "question": query}
    msg = prompt.invoke(chain_input)
    result = llm.invoke(msg).content if hasattr(llm.invoke(msg), "content") else llm.invoke(msg)
    answer_text = result if isinstance(result, str) else str(result)

    # 3) JSONL 저장
    row = {
        "id": str(uuid.uuid4()),
        "timestamp": dt.datetime.now().isoformat(timespec="seconds"),
        "question": _clean_text(query),
        "answer": _clean_text(answer_text),
        "contexts": contexts,
        "ground_truth": _clean_text(ground_truth) if ground_truth else "",
    }
    if extra_meta:
        row["meta"] = extra_meta

    save_jsonl_line(EVAL_JSONL_PATH, row)
    return answer_text

In [8]:
def run_batch_and_log(
    queries: Iterable[str],
    retriever,
    llm,
    prompt,
    format_docs_fn,
    ground_truths: Optional[Dict[str, str]] = None,  # {질문: 정답요약}
):
    outputs = []
    for q in queries:
        gt = ground_truths.get(q, "") if ground_truths else ""
        ans = run_rag_and_log(
            query=q,
            retriever=retriever,
            llm=llm,
            prompt=prompt,
            format_docs_fn=format_docs_fn,
            ground_truth=gt,
        )
        outputs.append({"question": q, "answer": ans})
    return outputs

In [None]:
# 당신이 이미 만들었던 객체들 재사용
# combined_retriever, llm, prompt_dowon (= ChatPromptTemplate), format_docs

queries = [
    # 1. 단순 정보 조회
    "DB손해보험 운전자보험의 해약환급금이 있나요?",
    "현대해상 운전자보험에서 음주운전 사고도 보장되나요?",
    "삼성화재 운전자보험은 형사합의금 지원이 있나요?",
    "메리츠 운전자보험의 가입 최소 연령은 몇 살인가요?",
    "한화손해보험 운전자보험의 보장 기간은 얼마나 되나요?",

    # 2. 조건부 질문
    "제가 무면허 운전 중 사고를 냈을 경우 DB손해보험에서 보장되나요?",
    "메리츠 운전자보험은 뺑소니 사고에 대해서도 보장하나요?",
    "삼성화재 운전자보험에서 신호위반 사고 시 어떤 보장을 받을 수 있나요?",
    "한화손해보험 운전자보험은 교통사고로 2주 이상 입원할 경우 어떤 지원이 있나요?",
    "현대해상 운전자보험에서 자전거 사고도 보장되나요?",

    # 3. 비교형 질문
    "DB손해보험과 메리츠 운전자보험의 해약환급금 조건 차이를 알려주세요.",
    "삼성화재와 한화손해보험 운전자보험의 형사합의금 지원 차이는 무엇인가요?",
    "현대해상과 메리츠 운전자보험은 벌금 보장 한도가 어떻게 다른가요?",
    "DB손해보험과 삼성화재 운전자보험의 보장기간 차이를 비교해주세요.",
    "한화손해보험과 현대해상 운전자보험 중 입원비 보장이 더 좋은 곳은 어디인가요?",

    # 4. 추천형 질문
    "제가 26세 여성이고, 신호위반 뺑소니 사고로 1주일 입원했을 때, 어떤 보험사가 가장 유리한가요?",
    "40대 남성이 음주운전 사고를 냈을 때 보장받기 좋은 운전자보험은 어디인가요?",
    "출퇴근길 자전거 사고에 대비하려면 어느 보험사가 적합한가요?",
    "초보 운전자가 가입하기 좋은 운전자보험은 어디인가요?",
    "해약환급금이 없는 상품과 있는 상품 중 장기적으로 어떤 선택이 더 유리한가요?",

    # 5. 오해 유발 질문
    "운전자보험은 자동차 수리비도 보장해주나요?",
    "운전자보험에 가입하면 건강보험 혜택도 늘어나나요?",
    "운전자보험은 해외에서 발생한 교통사고도 보장하나요?",
    "운전자보험은 자동차 종합보험과 동일한 보장을 해주나요?",
    "운전자보험을 들면 자동차 보험료가 할인되나요?",

    # 6. 애매한 표현 포함 질문
    "보험을 깨면 얼마 돌려받을 수 있어요?",
    "교통사고 났을 때 변호사 선임비도 나와요?",
    "운전자보험에서 합의금은 최대 얼마까지 받을 수 있어요?",
    "큰 사고 말고 가벼운 접촉사고도 보장이 되나요?",
    "경찰 조사 받을 때 필요한 비용도 보장되나요?"
]


# 선택: 사람이 미리 정리한 참조 정답(있는 경우)
# ground_truths = {
#     "DB손해보험 운전자보험의 해약환급금이 있나요?":
#         "중도 해지 시 해약환급금은 상품유형/납입기간에 따라 없거나 적을 수 있음.",
# }

batch_outputs = run_batch_and_log(
    queries=queries,
    retriever=combined_retriever,
    llm=llm,
    prompt=prompt_dowon,
    format_docs_fn=format_docs,
    # ground_truths=ground_truths
)

print(f"JSONL 저장 완료 → {EVAL_JSONL_PATH}")

In [6]:
import pandas as pd
from ragas import evaluate
from ragas.metrics import (
    Faithfulness,        # 답변 충실도 (hallucination 방지)
    AnswerRelevancy,    # 답변의 질문 관련성 답변이 원래 질문에 얼마나 직접적으로 관련되어 있는지를 측정
    ContextRelevance,  # 검색된 컨텍스트의 질문 관련성 검색한 문서들이 실제로 질문에 답변하는 데 얼마나 유용한 정보를 담고 있는지 측정
)
from datasets import Dataset

def load_jsonl(path: str):
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f]

records = load_jsonl(EVAL_JSONL_PATH)

df = {
    "question": [r["question"] for r in records],
    "answer": [r["answer"] for r in records],
    "contexts": [r["contexts"] for r in records],  # list[str]
    "ground_truth": [r.get("ground_truth", "") for r in records],
}
eval_dataset = Dataset.from_dict(df)

# 5. RAGAS 평가 실행
result = evaluate(
    dataset=eval_dataset,
    metrics=[
        Faithfulness(),
        AnswerRelevancy(),
        ContextRelevance()
        ]
)

# 6. 결과 분석 출력
def analyze_evaluation_results(result):
    print("=== RAG 성능 평가 결과 ===")
    df = result.to_pandas()
    print("평가 결과:\n", df)

    print("\n=== 메트릭별 평균 점수 ===")
    numeric_columns = df.select_dtypes(include=['float64', 'int64']).columns
    for col in numeric_columns:
        print(f"{col}: {df[col].mean():.3f}")

analyze_evaluation_results(result)