In [1]:
!pip install tag

Collecting tag
  Downloading tag-0.5.tar.gz (758 kB)
     ---------------------------------------- 0.0/758.7 kB ? eta -:--:--
     ------------------------------------- 758.7/758.7 kB 31.0 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting intervaltree>=3.0 (from tag)
  Downloading intervaltree-3.1.0.tar.gz (32 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting sortedcontainers<3.0,>=2.0 (from intervaltree>=3.0->tag)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl (29 kB)
Building wheels for collected packages: tag, intervaltree
  Building wheel for tag (setup.py): started
  Building wheel for tag (setup.py): finished with status 'done'
  Created wheel for tag: filename=tag-0.5-py3-none-any.whl size=777731 sha256=320195d73ac53feacea8e0973ef08495a18d414c8934e7d12

In [2]:
from langchain.chains import ConversationChain
from langchain.prompts import PromptTemplate
from langchain.vectorstores import FAISS
from langchain_openai import ChatOpenAI
from langchain.sql_database import SQLDatabase
from langchain.chains.question_answering import load_qa_chain
from langchain_experimental.sql import SQLDatabaseChain

In [3]:
from dotenv import load_dotenv
load_dotenv()

True

In [4]:
# Step 1: LLM 초기화
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.5)

In [5]:
# Step 2: 필터링 (부동산 관련 여부 판단)
filter_prompt = PromptTemplate(
    input_variables=["question"],
    template="'{question}'이 부동산과 관련된 질문인지 여부를 'Yes' 또는 'No'로 답변하세요."
)

In [6]:
def is_real_estate_related(question: str) -> bool:
    response = llm(filter_prompt.format(question=question))
    return "Yes" in response

In [7]:
# Step 3: Entity 추출
entity_extraction_prompt = PromptTemplate(
    input_variables=["question"],
    template="'{question}'에서 필요한 부동산 관련 엔티티(예: 도시, 가격, 평수 등)를 추출하세요."
)

In [8]:

def extract_entities(question: str) -> list:
    response = llm(entity_extraction_prompt.format(question=question))
    return response.split(", ")  # 간단히 리스트로 변환

###
# Step 4: SQL 설정
db = SQLDatabase.from_uri("sqlite:///./data/real_estate_transactions.db")  # SQLite 예시
sql_chain = SQLDatabaseChain(llm=llm, database=db, verbose=True)

def query_sql(entities: dict) -> str:
    query = f"""
    SELECT * FROM real_estate_table
    WHERE city='{entities.get("city")}' 
      AND price <= {entities.get("price", 999999999)}
      AND size >= {entities.get("size", 0)};
    """
    return sql_chain.run(query)

# Step 5: 추가 질문 생성
missing_entity_prompt = PromptTemplate(
    input_variables=["missing_entities"],
    template="질문을 완성하기 위해 다음 엔티티를 포함한 추가 질문을 생성하세요: {missing_entities}"
)

def generate_followup_question(missing_entities: list) -> str:
    response = llm(missing_entity_prompt.format(missing_entities=", ".join(missing_entities)))
    return response

# Step 6: 전체 흐름 구현
def handle_question(question: str):
    # 1. 질문 필터링
    if not is_real_estate_related(question):
        return "부동산 관련 질문이 아닙니다. 다른 질문을 해주세요."

    # 2. Entity 추출
    entities = extract_entities(question)
    required_entities = {"city", "price", "size"}
    missing_entities = required_entities - set(entities)

    # 3. 부족한 정보가 있는 경우 추가 질문 요청
    if missing_entities:
        return generate_followup_question(missing_entities)

    # 4. SQL 실행 및 결과 반환
    result = query_sql(entities)
    return result or "검색된 결과가 없습니다."

# # Step 7: 벡터DB (추가적인 응답을 위한 설정)
# vector_store = FAISS.load_local("vector_db_index")

def query_vector_db(question: str):
    docs = vector_store.similarity_search(question, k=1)
    return docs[0].page_content if docs else "관련 정보를 찾을 수 없습니다."

# Step 8: 통합 핸들러
def main_handler(question: str):
    sql_response = handle_question(question)
    if "검색된 결과가 없습니다" in sql_response:
        vector_response = query_vector_db(question)
        return vector_response
    return sql_response


# 테스트
if __name__ == "__main__":
    question = "강남에서 전세 10억 이하 아파트를 찾고 싶어요."
    print(main_handler(question))


  response = llm(filter_prompt.format(question=question))


부동산 관련 질문이 아닙니다. 다른 질문을 해주세요.


In [13]:
!pip install graphviz

Collecting graphviz
  Using cached graphviz-0.20.3-py3-none-any.whl.metadata (12 kB)
Using cached graphviz-0.20.3-py3-none-any.whl (47 kB)
Installing collected packages: graphviz
Successfully installed graphviz-0.20.3


In [11]:
from langgraph.graph import StateGraph, END, START
from typing import TypedDict, Annotated, Sequence
import json

# 상태 타입 정의
class AgentState(TypedDict):
    messages: Sequence[tuple[str, str]]
    is_real_estate: bool
    entities: dict
    sql_result: str

# 엔티티 추출 프롬프트 수정
entity_extraction_prompt = PromptTemplate(
    input_variables=["question"],
    template="""다음 부동산 관련 질문에서 필요한 정보를 추출하여 정확히 JSON 형식으로만 응답하세요.

질문: '{question}'

반드시 다음 형식으로 JSON을 반환해주세요:
{{
    "city": "추출된 지역명 (예: 강남구)",
    "price": 추출된 금액(만원 단위 숫자),
    "size": 추출된 면적(없으면 0)
}}

예시:
질문: "강남구에서 전세 10억 이하 아파트 찾아줘"
응답: {{"city": "강남구", "price": 100000, "size": 0}}

참고사항:
- 금액은 반드시 만원 단위 숫자로 변환 (예: 10억 -> 100000)
- 지역명이 없으면 빈 문자열 ("")
- 면적이 없으면 0
- 다른 설명 없이 JSON만 반환

응답:"""
)

def check_real_estate(state: AgentState) -> AgentState:
    """부동산 관련 질문인지 확인"""
    question = state["messages"][-1][1]
    response = llm(filter_prompt.format(question=question))
    # AIMessage의 content 속성을 사용
    content = response.content if hasattr(response, 'content') else str(response)
    is_real_estate = "yes" in content.lower()
    return {"is_real_estate": is_real_estate}

def query_database(state: AgentState) -> AgentState:
    """데이터베이스 쿼리 실행"""
    entities = state["entities"]
    try:
        result = db.run(f"""
            SELECT 시군구, 단지명, "보증금(만원)", "전용면적(㎡)"
            FROM apartment_rent
            WHERE 시군구 LIKE '%{entities["city"]}%'
            AND CAST(REPLACE("보증금(만원)", ',', '') AS INTEGER) <= {entities["price"]}
            AND 전월세구분 = '전세'
            ORDER BY CAST(REPLACE("보증금(만원)", ',', '') AS INTEGER) ASC
            LIMIT 5
        """)
        return {"sql_result": result}
    except Exception as e:
        print(f"SQL 쿼리 실행 중 오류 발생: {str(e)}")
        return {"sql_result": "데이터를 찾을 수 없습니다."}

def extract_entities_node(state: AgentState) -> AgentState:
    """엔티티 추출"""
    question = state["messages"][-1][1]
    entities = extract_entities(question)  # 기존 extract_entities 함수 사용
    return {"entities": entities}

# 그래프 생성
workflow = StateGraph(AgentState)

# 노드 추가
workflow.add_node("check_real_estate", check_real_estate)
workflow.add_node("extract_entities", extract_entities_node)
workflow.add_node("query_database", query_database)

# 엣지 추가 (START 노드 포함)
workflow.add_edge(START, "check_real_estate")  # START에서 시작하는 엣지 추가
workflow.add_edge("check_real_estate", "extract_entities")
workflow.add_conditional_edges(
    "check_real_estate",
    lambda x: "extract_entities" if x["is_real_estate"] else END
)
workflow.add_edge("extract_entities", "query_database")
workflow.add_edge("query_database", END)  # 마지막 노드에서 END로 가는 엣지 추가

# 그래프 컴파일
app = workflow.compile()

# 실행 및 시각화
from graphviz import Digraph

def visualize_graph(workflow):
    dot = Digraph(comment='Real Estate Query Workflow')
    dot.attr(rankdir='LR')
    
    # 노드 추가
    dot.node('start', 'Start', shape='circle')
    dot.node('check', 'Check Real Estate', shape='box')
    dot.node('extract', 'Extract Entities', shape='box')
    dot.node('query', 'Query Database', shape='box')
    dot.node('end', 'End', shape='circle')
    
    # 엣지 추가
    dot.edge('start', 'check')
    dot.edge('check', 'extract', 'is real estate')
    dot.edge('check', 'end', 'not real estate')
    dot.edge('extract', 'query')
    dot.edge('query', 'end')
    
    return dot

# 그래프 시각화
graph = visualize_graph(workflow)
graph.render('real_estate_workflow', format='png', cleanup=True)

# 테스트
example_query = "강남구에서 전세 10억 이하 아파트를 찾고 싶어요."
result = app.invoke({
    "messages": [("user", example_query)],
    "is_real_estate": False,
    "entities": {},
    "sql_result": ""
})