In [1]:
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from pydantic import SecretStr
from langchain_openai import ChatOpenAI

from schema import ResultSchema, LangchainLLMExpenseAnalyzeResult, LangchainLLMExpenseAnalyzerConfig
from prompt import __PROMPT_TEMPLATE__
from pendulum import DateTime
from uuid import uuid4

if TYPE_CHECKING:
    from langchain_core.runnables import RunnableSerializable


@dataclass
class LangchainLLMExpenseAnalyzePipeline:
    """
    Pipeline that extracts structured expenses from free-form text via a Langchain chat model.

    The chain built in ``__post_init__`` is
    ``__PROMPT_TEMPLATE__ | ChatOpenAI (structured output) | _process_output``.

    Attributes:
        config: Model settings, either a ``LangchainLLMExpenseAnalyzerConfig`` instance or a
            plain ``dict`` of its fields (converted to the config class in ``__post_init__``).
        client: The structured-output runnable derived from ``ChatOpenAI``; set in
            ``__post_init__``, not accepted by the constructor.
        pipeline: The composed runnable invoked by ``analyze`` / ``aanalyze``.
    """

    config: LangchainLLMExpenseAnalyzerConfig | dict = field(default_factory=LangchainLLMExpenseAnalyzerConfig)
    client: ChatOpenAI = field(init=False, repr=False)
    pipeline: "RunnableSerializable" = field(init=False, repr=False)

    def __post_init__(self):
        """Normalize ``config`` and assemble the prompt -> model -> result chain."""
        if isinstance(self.config, dict):
            self.config = LangchainLLMExpenseAnalyzerConfig(**self.config)
        llm = ChatOpenAI(**self.config.model_dump())
        # include_raw=True makes the model step return a dict that carries both the raw
        # message ("raw") and the schema instance ("parsed"); _process_output reads both.
        self.client = llm.with_structured_output(schema=ResultSchema, include_raw=True, strict=True)  # type: ignore
        self.pipeline = __PROMPT_TEMPLATE__ | self.client | self._process_output

    def _process_output(self, response: dict) -> LangchainLLMExpenseAnalyzeResult:
        """
        Map the structured-output dict onto a ``LangchainLLMExpenseAnalyzeResult``.

        Args:
            response: Dict produced by the structured-output model step, read via its
                ``"raw"`` and ``"parsed"`` keys (and ``"parsing_error"`` on failure).

        Raises:
            ValueError: If ``response["parsed"]`` is ``None``, i.e. the model output could
                not be turned into ``ResultSchema``. The original parsing error, when
                present, is attached as the cause.
        """
        parsed = response["parsed"]
        if parsed is None:
            # Fail with an explicit message instead of an AttributeError on ``None.expenses``.
            raise ValueError(
                "LLM response could not be parsed into ResultSchema"
            ) from response.get("parsing_error")
        return LangchainLLMExpenseAnalyzeResult(
            id=response["raw"].id,  # type: ignore
            message=response["raw"].content,  # type: ignore
            expenses=parsed.expenses,  # type: ignore
        )

    @staticmethod
    def _now_iso() -> str:
        """Return the current Asia/Seoul time as an ISO 8601 string."""
        return DateTime.now("Asia/Seoul").to_iso8601_string()

    def _build_inputs(self, message: str, tags: list[str], current_datetime: str | None) -> dict:
        """Build the prompt variables, resolving a missing timestamp at call time."""
        return dict(
            current_datetime=self._now_iso() if current_datetime is None else current_datetime,
            message=message,
            tags=tags,
        )

    async def aanalyze(
        self,
        message: str,
        tags: list[str],
        current_datetime: str | None = None,
        config: dict | None = None,
    ) -> LangchainLLMExpenseAnalyzeResult:
        """
        Asynchronously analyze ``message`` and return the extracted expenses.

        Args:
            message: Free-form text describing expenses.
            tags: Candidate tags passed to the prompt.
            current_datetime: ISO 8601 timestamp passed to the prompt. When ``None``
                (default) the current Asia/Seoul time is computed for this call.
            config: Optional runnable config forwarded to ``pipeline.ainvoke``.

        Returns:
            The pipeline result with ``message`` replaced by the stripped input text.
        """
        response = await self.pipeline.ainvoke(
            self._build_inputs(message, tags, current_datetime),
            config=config,
        )
        # Overwrites the raw model content stored by _process_output with the user's input.
        response.message = message.strip()
        return response

    def analyze(
        self,
        message: str,
        tags: list[str],
        current_datetime: str | None = None,
        config: dict | None = None,
    ) -> LangchainLLMExpenseAnalyzeResult:
        """
        Synchronously analyze ``message`` and return the extracted expenses.

        Args:
            message: Free-form text describing expenses.
            tags: Candidate tags passed to the prompt.
            current_datetime: ISO 8601 timestamp passed to the prompt. When ``None``
                (default) the current Asia/Seoul time is computed for this call.
            config: Optional runnable config forwarded to ``pipeline.invoke``.

        Returns:
            The pipeline result with ``message`` replaced by the stripped input text.
        """
        response = self.pipeline.invoke(
            self._build_inputs(message, tags, current_datetime),
            config=config,
        )
        # Overwrites the raw model content stored by _process_output with the user's input.
        response.message = message.strip()
        return response


In [2]:
# Build the pipeline against an OpenAI-compatible endpoint served on localhost.
pipeline = LangchainLLMExpenseAnalyzePipeline(
    config={
        "base_url": "http://localhost:1234/v1",
        "api_key": "lm-studio",
        "model": "qwen2.5-vl-7b-instruct",
        "temperature": 0.0,
        "max_tokens": 2 * 1024,
    }
)

In [3]:
# Sample input: a free-form expense message.
message = "담배값 2만원, 점심비용 1.5만원, 저녁 비용 2.3만원"

# Candidate tags handed to the analyzer alongside the message.
tags = [
    "담배",
    "커피",
    "편의점",
    "식비",
    "데이트",
    "교육",
]

In [4]:
# Run the synchronous analysis on the sample message with the candidate tags.
results = pipeline.analyze(message, tags)

In [5]:
# Print each extracted expense on its own line.
for expense in results.expenses:
    print(expense)

Expense-담배값-20000-[['담배']]
Expense-점심비용-15000-[['식비']]
Expense-저녁 비용-23000-[['식비']]
