In [4]:
import os
import re
import time
import pickle
import pandas as pd
from tqdm import tqdm
from collections import defaultdict
from IPython.display import clear_output

from dotenv import load_dotenv
from langchain.docstore.document import Document
from langchain.chains import RetrievalQA, LLMChain
from langchain.prompts import PromptTemplate
from langchain_community.embeddings import ClovaXEmbeddings
from langchain_community.chat_models import ChatClovaX
from pymilvus import connections, utility
from langchain_community.vectorstores.milvus import Milvus

### Milvus 연결 & CLOVA_X SETTING

In [5]:
# Milvus 연결 및 env 환경 설정
load_dotenv(dotenv_path=r"C:\Kill_the_RAG\Project\Aiffel_final_project\.env")

os.environ["NCP_CLOVASTUDIO_API_KEY"] = os.getenv("NCP_CLOVASTUDIO_API_KEY")
os.environ["NCP_CLOVASTUDIO_API_URL"] = os.getenv(
    "NCP_CLOVASTUDIO_API_URL", "https://clovastudio.stream.ntruss.com/"
)

connections.connect(
    alias="default",
    host=os.getenv("MILVUS_HOST", "localhost"),
    port=os.getenv("MILVUS_PORT", "19530"),
)

# 임베딩 모델 및 LLM 초기화
ncp_embeddings = ClovaXEmbeddings(model="bge-m3")
llm_clova = ChatClovaX(model="HCX-003", max_tokens=2048)

In [6]:
# pkl 임베딩 파일에서 데이터 불러오기
embedding_file = r"C:\Kill_the_RAG\Project\Aiffel_final_project\Code\Data\semifinal_embedding\embedding_semifinal.pkl"
if os.path.exists(embedding_file):
    with open(embedding_file, "rb") as f:
        saved_data = pickle.load(f)
    all_text_embedding_pairs = saved_data["embeddings"]
    all_metadata_list = saved_data["metadata"]
    print("임베딩 데이터 불러오기")
else:
    raise FileNotFoundError(f"임베딩 파일을 찾을 수 없습니다: {embedding_file}")

# 메타데이터 키 매핑
metadata_mapping = {
    "ISBN": "ISBN",
    "페이지": "page",
    "가격": "price",
    "제목": "title",
    "저자": "author",
    "분류": "category",
    "저자소개": "author_intro",
    "책소개": "book_intro",
    "목차": "table_of_contents",
    "출판사리뷰": "publisher_review",
    "추천사": "recommendation",
}

all_metadata_list_mapped = []
for meta in all_metadata_list:
    mapped_meta = {metadata_mapping.get(key, key): value for key, value in meta.items()}
    all_metadata_list_mapped.append(mapped_meta)

# Document 리스트 구성
documents = [
    Document(page_content=pair[0], metadata=meta)
    for pair, meta in zip(all_text_embedding_pairs, all_metadata_list_mapped)
]

임베딩 데이터 불러오기


In [7]:
# Milvus 벡터 DB 구축
collection_name = "book_rag_db"
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)

vectorstore = Milvus(
    embedding_function=ncp_embeddings,
    collection_name=collection_name,
    connection_args={"host": "localhost", "port": "19530"},
    auto_id=True,
)

texts = [pair[0] for pair in all_text_embedding_pairs]
embeds = [pair[1] for pair in all_text_embedding_pairs]


def precomputed_embed_documents(cls, input_texts):
    if input_texts != texts:
        raise ValueError(
            "ERROR : 입력 텍스트 순서가 사전 계산된 임베딩과 일치하지 않음"
        )
    return embeds


ClovaXEmbeddings.embed_documents = classmethod(precomputed_embed_documents)

vectorstore.add_texts(
    texts=texts, metadatas=all_metadata_list_mapped, embeddings=embeds
)

  vectorstore = Milvus(


[456881635511137694,
 456881635511137695,
 456881635511137696,
 456881635511137697,
 456881635511137698,
 456881635511137699,
 456881635511137700,
 456881635511137701,
 456881635511137702,
 456881635511137703,
 456881635511137704,
 456881635511137705,
 456881635511137706,
 456881635511137707,
 456881635511137708,
 456881635511137709,
 456881635511137710,
 456881635511137711,
 456881635511137712,
 456881635511137713,
 456881635511137714,
 456881635511137715,
 456881635511137716,
 456881635511137717,
 456881635511137718,
 456881635511137719,
 456881635511137720,
 456881635511137721,
 456881635511137722,
 456881635511137723,
 456881635511137724,
 456881635511137725,
 456881635511137726,
 456881635511137727,
 456881635511137728,
 456881635511137729,
 456881635511137730,
 456881635511137731,
 456881635511137732,
 456881635511137733,
 456881635511137734,
 456881635511137735,
 456881635511137736,
 456881635511137737,
 456881635511137738,
 456881635511137739,
 456881635511137740,
 456881635511

In [8]:
# Dense Retriever 및 RetrievalQA 체인 구축
dense_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
dpr_qa_chain = RetrievalQA.from_chain_type(
    llm=llm_clova, retriever=dense_retriever, return_source_documents=True
)


def extract_field(text, field_name):
    pattern = rf"{re.escape(field_name)}\s*:\s*(.*)"
    match = re.search(pattern, text)
    return match.group(1).strip() if match else ""


MIN_INFO_LENGTH = 10

In [9]:
# 결정 프롬프트 (Decision Prompt)
decision_prompt = """
[대화 맥락]
사용자 대화 내역:
{{ history }}
사용자의 최신 질문: "{{ query }}"

[역할 및 목표]
{{ role_instructions }}
현재 대화 상황과 질문의 맥락을 분석하여 아래 중 하나의 행동을 출력해라:
- "추천": 사용자가 책 추천을 명확히 요청한 경우, 추가 정보 없이 바로 추천을 진행.
- "추가 질문": 책 추천을 위해 더 세부적인 선호도 정보(예: 작가, 시대, 장르 등)가 필요하면, 구체적인 질문을 던져서 정보를 요청.
- "대화 유지": 책 추천이 필요하지 않거나, 단순 일상 대화로 이어지길 원하면, 추천 검색 없이 일반 대화 응답을 생성.

[최종 출력 형식 - 반드시 아래 내용만 출력]
{% raw %}
행동: "{{행동 (추천/추가 질문/대화 유지)}}"
추천 책 정보: "{{책 제목 및 상세 정보, 정보 충분 시; 정보 부족 시 빈 문자열}}"
추가 질문: "{{추가 질문, 정보 보완 필요 시; 충분하면 빈 문자열}}"
{% endraw %}
"""

# 페르소나별 역할
literature_role = "너는 감성적이고 문학적인 도서 추천 챗봇이다. 사용자의 감정과 취향을 섬세하게 파악하여, 감성적인 언어로 책을 추천해라."
science_role = "너는 정확하고 논리적인 과학/기술 도서 추천 챗봇이다. 사용자의 관심 분야와 요구를 분석하여, 명확하고 구체적인 정보로 책을 추천해라."

In [63]:
class BaseRAGPipeline:
    def __init__(self, config, llm, retriever, qa_chain, documents):
        self.config = config
        self.llm = llm
        self.retriever = retriever
        self.qa_chain = qa_chain
        self.documents = documents
        
        # 멀티턴 대화 관리
        self.query_history = []
        self.user_preferences = defaultdict(list)
        self.preferences_text = ""

        self.decision_chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=decision_prompt,
                input_variables=["history", "query", "role_instructions"],
                template_format="jinja2",
            ),
        )

    def robust_parse_decision_response(self, response_text):
        action_match = re.search(r"행동\s*[:：]\s*\"?([^\"\n]+)\"?", response_text)
        action = action_match.group(1).strip() if action_match else None

        book_info_match = re.search(r"추천\s*책\s*정보\s*[:：]\s*\"?([^\"\n]+)\"?", response_text)
        book_info = book_info_match.group(1).strip() if book_info_match else ""

        follow_match = re.search(r"추가\s*질문\s*[:：]\s*\"?([^\"\n]+)\"?", response_text)
        additional_question = follow_match.group(1).strip() if follow_match else ""
        return action, book_info, additional_question

    def generate_answer(self, query):
        # RAG 검색 후 결과 조합
        try:
            result = self.qa_chain.invoke(query)
        except Exception as e:
            print("Error invoking QA chain:", e)
            fallback_response = self.llm.invoke(
                "죄송합니다. 추천 정보를 가져오는데 실패했습니다. 추가 정보를 제공해 주실 수 있나요?"
            )
            return fallback_response.text().strip(), None

        source_docs = result["source_documents"]
        retrieved_isbns = set()
        for doc in source_docs:
            isbn = doc.metadata.get("ISBN")
            if isbn:
                retrieved_isbns.add(isbn)

        aggregated_docs = []
        for isbn in retrieved_isbns:
            book_docs = [doc for doc in self.documents if doc.metadata.get("ISBN") == isbn]
            if not book_docs:
                continue
            aggregated_text = "\n".join([doc.page_content for doc in book_docs])
            aggregated_docs.append(Document(page_content=aggregated_text, metadata=book_docs[0].metadata))

        formatted_answers = []
        for doc in aggregated_docs:
            metadata = doc.metadata
            title = metadata.get("title") or extract_field(doc.page_content, "제목")
            author = metadata.get("author") or extract_field(doc.page_content, "저자")
            aggregated_text = doc.page_content
            book_intro = extract_field(aggregated_text, "책소개")
            publisher_review = extract_field(aggregated_text, "출판사리뷰")
            recommendation_field = extract_field(aggregated_text, "추천사")

            # 책소개 > 출판사리뷰 > 추천사 순
            if book_intro and len(book_intro.strip()) >= MIN_INFO_LENGTH:
                selected_info = book_intro
            elif publisher_review and len(publisher_review.strip()) >= MIN_INFO_LENGTH:
                selected_info = publisher_review
            elif recommendation_field and len(recommendation_field.strip()) >= MIN_INFO_LENGTH:
                selected_info = recommendation_field
            else:
                selected_info = ""

            if not selected_info:
                reason = "추천 정보 생성 불가"
            else:
                reason_prompt = (
                    f"다음 정보를 참고하여, 이 책이 추천되는 이유를 간결하고 명확하게 요약해라. "
                    f"책의 주요 특징과 강점을 중심으로 설명해라.\n\n정보:\n{selected_info}"
                )
                reason_response = self.llm.invoke(reason_prompt)
                generated_reason = reason_response.text().strip()
                if not generated_reason or len(generated_reason) < 10 or "추천 정보 생성 불가" in generated_reason:
                    reason = "추천 정보 생성 불가"
                else:
                    reason = generated_reason

            formatted = f"책 제목: {title}\n저자: {author}\n추천 이유: {reason}"
            formatted_answers.append(formatted)

        answer = "\n\n".join(formatted_answers)
        refined_answer = self.refine_answer_with_persona(answer)
        return refined_answer, None

    def refine_answer_with_persona(self, raw_answer):
        prompt = (
            "아래 원본 추천 결과를 읽고, 각 책의 정보를 다음 형식에 맞춰 재작성해라.\n\n"
            "형식:\n"
            "책 제목: <책 제목>\n"
            "저자: <저자>\n"
            "추천 이유: <추천 이유>\n\n"
            "원본 추천 결과:\n"
            f"{raw_answer}\n\n"
            "출력 시, 반드시 위 형식만을 사용하고 불필요한 안내 문구는 포함하지 말아라."
        )
        refined = self.llm.invoke(prompt).text().strip()
        return refined

    def print_chat_history(self):
        """최근 4줄의 대화만 출력 (중복 인삿말 제거)"""
        print("-" * 50)
        for line in self.query_history[-4:]:
            if line.startswith("사용자:"):
                print(f"[사용자] {line.split(':',1)[1].strip()}")
            elif line.startswith("챗봇:"):
                print(f"[챗봇] {line.split(':',1)[1].strip()}")
        print("-" * 50)

    def search_and_generate_answer(self, user_query):
        self.preferences_text = " ".join(self.user_preferences["preferences"])
        query_summary = "\n".join(self.query_history[-5:])
        final_query = f"{user_query} {self.preferences_text}".strip()

        prompt_vars = {
            "history": query_summary,
            "query": final_query,
            "role_instructions": self.config["role_instructions"],
        }
        decision_response = self.decision_chain.invoke(prompt_vars)
        response_text = decision_response["text"].strip()

        # 디버그 출력: 최종 검색 쿼리와 행동을 한 번만 출력
        print(f"\n[디버그] 최종 검색 쿼리: {final_query}")
        action, book_info, additional_question = self.robust_parse_decision_response(response_text)
        print(f"[디버그] 행동: {action}")

        if action == "추가 질문" and additional_question:
            self.query_history.append(f"챗봇: {additional_question}")
            self.print_chat_history()
            user_response = input("[사용자] ")
            self.query_history.append(f"사용자: {user_response}")
            if user_response not in self.user_preferences["preferences"]:
                self.user_preferences["preferences"].append(user_response)
            self.preferences_text = " ".join(self.user_preferences["preferences"])
            final_query = f"{user_query} {self.preferences_text}".strip()
            prompt_vars["query"] = final_query
            decision_response = self.decision_chain.invoke(prompt_vars)
            response_text = decision_response["text"].strip()
            print(f"\n[디버그] 최종 검색 쿼리: {final_query}")
            action, book_info, additional_question = self.robust_parse_decision_response(response_text)
            print(f"[디버그] 행동: {action}")

        if action == "추천":
            final_query = f"{user_query} {self.preferences_text}".strip()
            answer, _ = self.generate_answer(final_query)
            # 디버그 출력은 제거하여 중복 출력을 방지
            return answer

        if action == "대화 유지":
            response = self.llm.invoke(f"대화 응답: {user_query}\n대화 내역: {query_summary}")
            return response.text().strip()

        response = self.llm.invoke(f"대화 응답: {user_query}\n대화 내역: {query_summary}")
        return response.text().strip()

    def interactive_multi_turn_qa(self):
        # 최초 인사말은 이미 query_history에 저장되어 있음.
        self.print_chat_history()
        while True:
            user_query = input("[사용자] ")
            if user_query.lower() == "quit":
                print("\n[대화 저장 중...]")
                print("대화 저장 완료")
                import sys
                sys.exit()

            self.query_history.append(f"사용자: {user_query}")
            answer = self.search_and_generate_answer(user_query)
            if answer is not None:
                self.query_history.append(f"챗봇: {answer}")
                # 이번 턴의 사용자 메시지와 챗봇 응답만 출력
                print("-" * 50)
                print(f"[사용자] {user_query}")
                print(f"[챗봇] {answer}")
                print("-" * 50)

In [64]:
# ---------------- 페르소나별 RAG 파이프라인 ----------------
class LiteratureRAGPipeline(BaseRAGPipeline):
    def __init__(self, llm, retriever, qa_chain, documents):
        config = {"persona": "Literature", "role_instructions": literature_role}
        super().__init__(config, llm, retriever, qa_chain, documents)

class ScienceRAGPipeline(BaseRAGPipeline):
    def __init__(self, llm, retriever, qa_chain, documents):
        config = {"persona": "Science", "role_instructions": science_role}
        super().__init__(config, llm, retriever, qa_chain, documents)

In [66]:
def main():
    print("페르소나 선택:")
    print("1. 예술/문학")
    print("2. 과학/기술")
    choice = input("원하는 페르소나 번호를 입력하세요 (1 또는 2): ").strip()

    if choice == "1":
        pipeline = LiteratureRAGPipeline(llm_clova, dense_retriever, dpr_qa_chain, documents)
        greeting = "안녕하세요! 감성적이고 문학적인 도서 추천 챗봇입니다. 어떤 책을 읽고 싶으신가요?"
    elif choice == "2":
        pipeline = ScienceRAGPipeline(llm_clova, dense_retriever, dpr_qa_chain, documents)
        greeting = "안녕하십니까. 정확하고 논리적인 과학/기술 도서 추천 챗봇입니다. 관심 있는 분야에 대해 편하게 이야기해 주세요."
    else:
        print("잘못된 선택입니다. 기본 예술/문학 페르소나로 실행합니다.")
        pipeline = LiteratureRAGPipeline(llm_clova, dense_retriever, dpr_qa_chain, documents)
        greeting = "안녕하세요! 감성적이고 문학적인 도서 추천 챗봇입니다. 어떤 책을 읽고 싶으신가요?"

    # 히스토리에 인사말 저장
    pipeline.query_history.append(f"챗봇: {greeting}")

    # 시작
    pipeline.interactive_multi_turn_qa()

if __name__ == "__main__":
    main()

페르소나 선택:
1. 예술/문학
2. 과학/기술
--------------------------------------------------
[챗봇] 안녕하세요! 감성적이고 문학적인 도서 추천 챗봇입니다. 어떤 책을 읽고 싶으신가요?
--------------------------------------------------

[디버그] 최종 검색 쿼리: 심심해 죽겠어
[디버그] 행동: 추가 질문
--------------------------------------------------
[챗봇] 안녕하세요! 감성적이고 문학적인 도서 추천 챗봇입니다. 어떤 책을 읽고 싶으신가요?
[사용자] 심심해 죽겠어
[챗봇] 심심하실 때 주로 어떤 장르의 책을 읽으시나요?
--------------------------------------------------

[디버그] 최종 검색 쿼리: 심심해 죽겠어 흥미 진진한 추리 소설
[디버그] 행동: 추가 질문
--------------------------------------------------
[사용자] 심심해 죽겠어
[챗봇] 챗봇: 심심하시군요! 그렇다면 마음을 편안하게 해주는 힐링 소설이나 가볍게 읽을 수 있는 에세이를 읽어보시는 건 어떠세요? 제가 몇 가지 추천해드릴게요.  

1. 달러구트 꿈 백화점 (이미예) - 잠들어야만 입장할 수 있는 꿈 백화점에서 일어나는 이야기를 담은 판타지 소설이에요. 

2. 지구 끝의 온실 (김초엽) - 멸망한 지구에서 살아남은 사람들의 이야기를 그린 SF 소설로, 아름다운 문장과 감동적인 이야기로 인기를 끌고 있어요.

3. 하얼빈 (김훈) - 안중근 의사의 마지막 순간을 그린 역사소설로, 김훈 작가의 섬세한 문체와 강렬한 이야기가 인상적이에요.

4. 튜브 (손원평) - 실패와 좌절을 겪은 주인공이 다시 일어서는 과정을 그린 성장소설로, 유쾌하면서도 따뜻한 이야기가 담겨 있어요.

5. 햇빛은 찬란하고 인생은 귀하니까요 (장명숙) - 이탈리아 유학파 패션 디자이너이자 유튜버인 밀라논

SystemExit: 

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
result = dpr_qa_chain.invoke({"query": "..."})
print(len(result["source_documents"]))

3


In [26]:
# pd.set_option('display.max_rows', None)      # 모든 행 보기
# pd.set_option('display.max_columns', None)   # 모든 열 보기
pd.set_option("display.max_colwidth", None)  # 열 내용 길이 제한 없음

In [49]:
# df.loc[
#     df["제목"].str.contains("죄와 벌", na=False),
#     ["제목", "책소개", "출판사리뷰", "저자", "저자소개", "분류"],
# ]

In [20]:
pd.reset_option("all")

  pd.reset_option("all")
  pd.reset_option("all")
