<a href="https://colab.research.google.com/github/imjongho/AndroidStudio/blob/main/Q%26A_ChatBot2_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 인공지능 PDF Q&A 챗봇 프로젝트

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install python-dotenv
# 일단 종호의 openai API 키를 .env 파일에 넣어 놓음 => 나중에 바꾸기

In [None]:
!pip install langchain_openai==0.3.7

In [None]:
!pip install langchain-huggingface==0.1.2

In [None]:
!pip install langchain_community==0.3.18

In [None]:
!pip install faiss-cpu==1.10.0

In [None]:
!pip install pypdf

In [None]:
#!pip install pydantic==2.10.6
#!pip uninstall -y gradio
#!pip install --upgrade gradio gradio-client

In [None]:
#!pip uninstall numpy -y
#!pip install --no-cache-dir numpy==1.26.4
# colab에 numpy 2.버전이 설치되어 있어서 버전 충돌남

In [2]:
import numpy
print(numpy.__version__)

1.26.4


In [1]:
import gradio
print(gradio.__version__)

5.29.0


In [None]:
import gradio as gr
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_text_splitters import CharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_core.runnables import RunnablePassthrough
from langchain_core.chat_history import BaseChatMessageHistory, InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, trim_messages
from langchain.retrievers.multi_query import MultiQueryRetriever

# 환경 변수 불러오기(openai API 키)
load_dotenv('/content/drive/MyDrive/Colab Notebooks/.env')

# LLM 설정
llm = ChatOpenAI(model="gpt-4o-mini")

# 텍스트 분리
text_splitter = CharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=100
)

# 임베딩 모델
hf_embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-m3")

# 프롬프트 템플릿
system_message = """
당신은 사용자의 질문에 답변을 하는 친절한 AI 어시스턴트입니다.
당신의 임무는 주어진 문맥을 토대로 사용자 질문에 답하는 것입니다.
만약, 문맥에서 답변을 위한 정보를 찾을 수 없다면 '주어진 정보에서 질문에 대한 정보를 찾을 수 없습니다' 라고 답하세요.
정보를 찾을 수 있다면 한글로 답변해 주세요.
"""

human_message = """
## 과거 대화 내역:
{memory}

## 검색된 문서:
{context}

## 최신 사용자 질문:
{input}
"""
prompt_template = ChatPromptTemplate.from_messages(
    [
        ("human", human_message)
    ]
)

# 출력 파서
parser = StrOutputParser()

# 트리머 설정
trimmer = trim_messages(
    max_tokens=200,
    token_counter=llm,
    strategy="last",
    include_system=False,
    start_on="human"
)

# 전역 변수
db = None
retriever = None
rag_chain = None
faiss_path = "/content/drive/MyDrive/FaissDB/knu_faiss_db"
history_store: dict[str, InMemoryChatMessageHistory] = {}
session_counter = 1

# 세션 초기화 함수
def init_session(session_id: str):
    if session_id in history_store:
        return
    messages = InMemoryChatMessageHistory()
    messages.add_message(SystemMessage(content=system_message))
    history_store[session_id] = messages

# 세션 히스토리 함수, 최근 3쌍의 Q&A 메시지만 남기고 나머지 삭제(속도 증가)
def get_session_history(session_id: str, limit: int = 6) -> BaseChatMessageHistory:
    messages = history_store[session_id]
    while len(messages.messages) > limit + 1:
        del messages.messages[1]
    return messages

# RAG 체인 로드
def load_chain():
    global retriever, rag_chain

    # 검색기 context
    base_retriever = db.as_retriever(
        search_type="mmr",
        search_kwargs={"k": 3, "fetch_k": 10, "lambda_mult": 0.5}
    )
    # LLM을 이용해 여러 개의 서브 쿼리를 만들어 base_retriever에 넘겨 더 풍부한 결과를 가져옴(기본: 3)
    retriever = MultiQueryRetriever.from_llm(base_retriever, llm = llm)

    # 대화 내역 불러와서 trimmer 적용
    history = RunnableWithMessageHistory(RunnablePassthrough(), get_session_history)
    trimmed = history | trimmer

    # 최종 RAG 체인
    rag_chain = {
        "memory" : trimmed,
        "context": retriever,
        "input": RunnablePassthrough()
    } | prompt_template | llm | parser

# FAISS DB 로드
def load_faiss_db():
    global db
    db = FAISS.load_local(
        folder_path=faiss_path,
        embeddings=hf_embeddings,
        allow_dangerous_deserialization=True
    )

# PDF 업로드 및 DB 저장
def add_pdf_to_db(file):
    global db

    loader = PyPDFLoader(file.name)
    docs = loader.load_and_split(text_splitter=text_splitter)

    # 각 청크에 파일명 metadata 추가
    for doc in docs:
        doc.metadata["file_name"] = os.path.basename(file.name)
        # 메타데이터 추가 가능

    if db is None:
        db = FAISS.from_documents(docs, hf_embeddings)
    else:
      db.add_documents(docs)

    db.save_local(faiss_path)
    load_chain()    # DB가 바뀌었으니 retriever/chain을 업데이트
    return f"{os.path.basename(file.name)} 문서를 처리하여 FAISS DB에 저장했습니다."

# 질문 처리
def answer_question(question: str, session_id: str) -> str:
    init_session(session_id)
    # RAG 체인 호출 → 답변 생성(자동으로 history에 기록됨)
    answer = rag_chain.invoke(
        question,
        config={"configurable": {"session_id": session_id}}
    )
    # AI 메시지 수동 저장
    get_session_history(session_id).add_message(AIMessage(content=answer))
    return answer

# FAISS DB에 저장된 문서 목록 보여주는 함수
def show_stored_documents():
    if db is None:
        return "DB 로드 문제"
    docs = list(db.docstore._dict.values())  # 저장된 모든 청크들을 가져와 리스트로 변환
    file_names = {doc.metadata.get("file_name", "Unknown") for doc in docs}
    return "📚 저장된 문서 목록:\n" + "\n".join(f"• {f}" for f in sorted(file_names))

# 세션별로 저장된 대화 내역 보여주는 함수(Human/AI)
def show_history(session_id: str) -> str:
    init_session(session_id)
    msgs = get_session_history(session_id).messages

    lines = []
    if msgs and msgs[0].type == "system":
        lines.append(f"System: {msgs[0].content.strip()}")

    seq = [(m.type, m.content.strip()) for m in msgs if m.type in ("human", "ai")]
    pairs = [
        f"Human: {usr}\nAI: {ai}"
        for (t1, usr), (t2, ai) in zip(seq, seq[1:])
        if t1 == "human" and t2 == "ai"
    ]
    lines.extend(pairs)
    return "\n\n".join(lines)

# Gradio UI 설정
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # ── 사이드바: 세션 관리 ──
    with gr.Sidebar():
        session_dropdown = gr.Dropdown(
            label="채팅 세션 선택",
            choices=[],
            value=None,
            interactive=True
        )
        new_session_btn = gr.Button("➕ 새 채팅")
        del_session_btn = gr.Button("🗑 세션 삭제")

    gr.Markdown("""
    # 📄 인공지능 PDF Q&A 챗봇
    **여러 PDF 파일을 업로드하고 질문을 입력하면 AI가 답변을 제공합니다!**
    """)

    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(label="PDF 파일 선택")
            upload_button = gr.Button("📤 벡터 DB에 저장")
            show_files_button = gr.Button("📚 저장된 문서 보기")
            status_output = gr.Textbox(label="📢 상태 메시지")

        with gr.Column(scale=2):
            question_input = gr.Textbox(label="❓ 질문 입력", placeholder="궁금한 내용을 적어주세요.")
            submit_button = gr.Button("🤖 답변 받기")
            answer_output = gr.Textbox(label="📝 AI 답변")

        with gr.Column(scale=3):
            show_history_button = gr.Button("🕘 히스토리 보기")
            history_output = gr.Textbox(label="🗒 전체 대화 내역 | System/Human/AI Message", lines=15, interactive=False)

    # ── 사이드바 이벤트 바인딩 ──
    def create_session():
        global session_counter
        sid = f"session_{session_counter}"
        session_counter += 1
        init_session(sid)
        # Dropdown choices 갱신 & 새로 만든 세션으로 선택
        return gr.update(choices=list(history_store.keys()), value=sid)

    # 세션 삭제 함수
    def delete_session(session_id: str):
        history_store.pop(session_id, None)

    new_session_btn.click(
        fn=create_session,
        inputs=None,
        outputs=[session_dropdown]
    )

    def remove_session(sid):
        delete_session(sid)
        keys = list(history_store.keys())
        new_val = keys[0] if keys else None
        return gr.update(choices=keys, value=new_val)

    del_session_btn.click(
        fn=remove_session,
        inputs=[session_dropdown],
        outputs=[session_dropdown]
    )

    # ── 파일 업로드 / 질문 / 히스토리 이벤트 ──
    upload_button.click(add_pdf_to_db, inputs=file_input, outputs=status_output)
    submit_button.click(answer_question, inputs=[question_input, session_dropdown], outputs=answer_output)
    show_files_button.click(show_stored_documents, outputs=status_output)
    show_history_button.click(show_history, inputs=[session_dropdown], outputs=history_output)

# 벡터 DB 로드 후 실행
load_faiss_db()
init_session("session_1")
load_chain()
demo.launch(debug=True)