In [1]:
from app.core.config import settings
from app.database.mysql import get_mysql_connection
connection = get_mysql_connection()

2024-12-22 01:51:49,911 - fastapi_project - INFO - Retrieved parameter: /tailorlink/mysql/MYSQL_URI
2024-12-22 01:51:50,099 - fastapi_project - INFO - Retrieved parameter: /tailorlink/mysql/MYSQL_USER
2024-12-22 01:51:50,286 - fastapi_project - INFO - Retrieved parameter: /tailorlink/mysql/MYSQL_PASSWORD
2024-12-22 01:51:50,469 - fastapi_project - INFO - Retrieved parameter: /tailorlink/mysql/MYSQL_DB_NAME
2024-12-22 01:51:50,654 - fastapi_project - INFO - Retrieved parameter: /tailorlink/openai/OPENAI_API_KEY
2024-12-22 01:51:50,850 - fastapi_project - INFO - Retrieved parameter: /tailorlink/milvus/MILVUS_URI
2024-12-22 01:51:51,028 - fastapi_project - INFO - Retrieved parameter: /tailorlink/milvus/MILVUS_TOKEN
2024-12-22 01:51:51,217 - fastapi_project - INFO - Retrieved parameter: /tailorlink/langchain/LANGCHAIN_API_KEY
2024-12-22 01:51:51,403 - fastapi_project - INFO - Retrieved parameter: /tailorlink/langchain/LANGCHAIN_ENDPOINT
2024-12-22 01:51:51,481 - fastapi_project - INFO - Co

# 커스텀 MessageConverter 클래스 만들기

In [2]:
from typing import Any, Dict
from sqlalchemy import Column, Integer, String, Text, DateTime, func
from sqlalchemy.orm import Session, declarative_base
from langchain_community.chat_message_histories.sql import BaseMessageConverter
from langchain.schema.messages import BaseMessage

from langchain.schema import HumanMessage, AIMessage, SystemMessage

# 1) SQLAlchemy Base 선언
Base = declarative_base()


class MessageModel(Base):
    __tablename__ = "message_store_with_user"  # 테이블명 고정
    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(255), index=True, nullable=False)
    message_type = Column(String(255), nullable=False)
    role = Column(String(255), nullable=True)
    content = Column(Text, nullable=True)
    # 새로 추가한 user_id
    user_id = Column(String(255), nullable=True)
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
        index=True
    )

class MyMessageConverter(BaseMessageConverter):
    """user_id 열이 포함된 커스텀 컨버터"""

    def __init__(self, table_name: str = "message_store_with_user"):
        # 부모 클래스의 기본 초기화 (인자 없이)
        super().__init__()  
        # 여기서 우리가 원하는 table_name 값을 자식 클래스에서 저장
        self.table_name = table_name

    def get_sql_model_class(self):
        """
        SQLAlchemy 모델 정의
        """
        return MessageModel

    def to_dict(self, message: BaseMessage) -> Dict[str, Any]:
        """
        BaseMessage -> Dict
        (message_type, role, content, user_id 등 직접 구성)
        """
        row_dict = {}
        # 예: message_type (HumanMessage, AIMessage 등)
        row_dict["message_type"] = (
            "human" if message.__class__.__name__ == "HumanMessage"
            else "ai" if message.__class__.__name__ == "AIMessage"
            else "system"
        )
      
        row_dict["role"] = getattr(message, "role", None)
        row_dict["content"] = message.content

        # user_id가 message.additional_kwargs 안에 있을 수 있다고 가정
        row_dict["user_id"] = message.additional_kwargs.get("user_id")

        return row_dict

    def from_dict(self, row_dict: Dict[str, Any]) -> BaseMessage:
        """
        Dict -> LangChain BaseMessage 역변환
        (기본적으로 super().from_dict()로 message_type, role, content 등을 복원)
        """
        # message_type 구분
        msg_type = row_dict.get("message_type", "human")

        # content
        content = row_dict.get("content", "")
        # user_id를 additional_kwargs에 넣어두기
        additional_kwargs = {}
        if row_dict.get("user_id"):
            additional_kwargs["user_id"] = row_dict["user_id"]

        # message_type에 따라 적절한 Message 클래스로 생성
        if msg_type == "human":
            return HumanMessage(content=content, additional_kwargs=additional_kwargs)
        elif msg_type == "ai":
            return AIMessage(content=content, additional_kwargs=additional_kwargs)
        else:
            return SystemMessage(content=content, additional_kwargs=additional_kwargs)

    def to_sql_model(self, message: BaseMessage, session_id: str):
        """
        (필수) BaseMessage를 SQLAlchemy 모델 인스턴스로 변환
        """
        row_dict = self.to_dict(message)          # 우선 dict로 변환
        row_dict["session_id"] = session_id
        model_class = self.get_sql_model_class()  # 우리가 정의한 Model 클래스
        model_instance = model_class(**row_dict)  # 언패킹해서 생성
        return model_instance

    def from_sql_model(self, model_instance) -> BaseMessage:
        """
        (필수) SQLAlchemy 모델 -> BaseMessage 역변환
        """
        # model_instance의 컬럼을 dict로 만들기
        row_dict = {
            col.name: getattr(model_instance, col.name)
            for col in model_instance.__table__.columns
        }
        # 이를 다시 메시지 객체로 역직렬화
        message = self.from_dict(row_dict)
        return message


In [3]:
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import SQLChatMessageHistory

# 먼저, 아무것도 저장하지 않는 "No-Op" 히스토리 클래스를 하나 정의
class NoOpChatMessageHistory(BaseChatMessageHistory):
    """add_message(), get_messages() 모두 아무것도 안 하는 히스토리"""

    def add_message(self, message):
        pass  # 아무 작업도 안 함

    def get_messages(self):
        return []  # 항상 빈 리스트

    def clear(self):
        pass



In [4]:
# 1) 커스텀 컨버터 인스턴스 생성
my_converter = MyMessageConverter(table_name="message_store_with_user")


In [5]:
from langchain_community.chat_message_histories import SQLChatMessageHistory
from sqlalchemy import create_engine
# SQLChatMessageHistory 객체를 생성하고 세션 ID와 데이터베이스 연결 파일을 설정

# MySQL 접속을 위한 connection string 설정
connection_string = f"mysql+pymysql://{settings.DATABASE_USER}:{settings.DATABASE_PASSWORD}@{settings.DATABASE_URL}:3306/{settings.DATABASE_NAME}"

chat_message_history = SQLChatMessageHistory(
    session_id="sql_history",
    connection=connection_string,
    custom_message_converter=my_converter,
)


In [6]:
# 사용자 메시지를 추가합니다.
chat_message_history.add_user_message(
    "안녕? 만나서 반가워. 내 이름은 테디야. 나는 랭체인 개발자야. 앞으로 잘 부탁해!"
)
# AI 메시지를 추가합니다.
chat_message_history.add_ai_message("안녕 테디, 만나서 반가워. 나도 잘 부탁해!")


In [7]:
from langchain_core.prompts import (
    ChatPromptTemplate,
    MessagesPlaceholder,
)
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser


In [8]:
prompt = ChatPromptTemplate.from_messages(
    [
        # 시스템 메시지
        ("system", "You are a helpful assistant."),
        # 대화 기록을 위한 Placeholder
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{question}"),  # 질문
    ]
)

# chain 을 생성합니다.
chain = prompt | ChatOpenAI(model_name="gpt-4o-mini") | StrOutputParser()


In [9]:
def get_chat_history(user_id, session_id):
    return SQLChatMessageHistory(
        table_name="message_store_with_user",
        session_id=session_id,
        connection=connection_string,
        custom_message_converter=my_converter,
    )


In [10]:
from langchain_core.runnables.utils import ConfigurableFieldSpec

config_fields = [
    ConfigurableFieldSpec(
        id="user_id",
        annotation=str,
        name="User ID",
        description="Unique identifier for a user.",
        default="",
        is_shared=True,
    ),
    ConfigurableFieldSpec(
        id="session_id",
        annotation=str,
        name="Conversation ID",
        description="Unique identifier for a conversation.",
        default="",
        is_shared=True,
    )
]


In [11]:
chain_with_history = RunnableWithMessageHistory(
    chain,
    get_chat_history,  # 대화 기록을 가져오는 함수를 설정합니다.
    input_messages_key="question",  # 입력 메시지의 키를 "question"으로 설정
    history_messages_key="chat_history",  # 대화 기록 메시지의 키를 "history"로 설정
    history_factory_config=config_fields,  # 대화 기록 조회시 참고할 파라미터를 설정합니다.
)


In [12]:
# config 설정
config = {"configurable": {"session_id": "user1", "user_id": "mabasa"}}


In [13]:
# 질문과 config 를 전달하여 실행합니다.
chain_with_history.invoke({"question": "안녕 반가워, 내 이름은 테디야"}, config)


'안녕, 테디! 다시 만나서 반가워. 오늘은 어떤 이야기를 나눌까?'

In [14]:
# 후속 질문을 실해합니다.
chain_with_history.invoke({"question": "내 이름이 뭐라고?"}, config)


'너의 이름은 테디야! 맞지?'

In [15]:
# config 설정
config = {"configurable": {"user_id": "user3", "conversation_id": "conversation1"}}

# 질문과 config 를 전달하여 실행합니다.
chain_with_history.invoke({"question": "내 이름이 뭐라고?"}, config)


ValueError: Missing keys ['session_id'] in config['configurable'] Expected keys are ['session_id', 'user_id'].When using via .invoke() or .stream(), pass in a config; e.g., chain.invoke({'question': 'foo'}, {'configurable': {'session_id': '[your-value-here]'}})