In [1]:
import psycopg2

def get_objects_and_columns():
    conn = psycopg2.connect(
        host='localhost',
        port=5432,
        dbname='postgres',
        user='postgres',
        password='keti1234!'
      )

    cur = conn.cursor()

    # 1) 테이블/뷰/머뷰 기본 정보
    cur.execute("""
    WITH rels AS (
      SELECT
        n.nspname        AS schema,
        c.relname        AS name,
        CASE c.relkind
          WHEN 'r' THEN 'TABLE'
          WHEN 'v' THEN 'VIEW'
          WHEN 'm' THEN 'MATERIALIZED VIEW'
          ELSE c.relkind::text
        END AS kind,
        pg_total_relation_size(c.oid) AS bytes,
        obj_description(c.oid, 'pg_class') AS comment,
        c.oid AS oid
      FROM pg_class c
      JOIN pg_namespace n ON n.oid = c.relnamespace
      WHERE n.nspname = 'public'
        AND c.relkind IN ('r','v','m')
    )
    SELECT schema, name, kind, pg_size_pretty(bytes), COALESCE(comment,''), oid
    FROM rels
    ORDER BY kind, name;
    """)
    objs = cur.fetchall()

    result = {}
    for schema, name, kind, size, comment, oid in objs:
        # 2) 컬럼 정보
        cur.execute("""
        SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod),
               col_description(a.attrelid, a.attnum)
        FROM pg_attribute a
        WHERE a.attrelid = %s AND a.attnum > 0 AND NOT a.attisdropped
        ORDER BY a.attnum;
        """, (oid,))
        cols = [
            {
                "name": cname,
                "type": ctype,
                "comment": ccomment if ccomment else ""
            }
            for cname, ctype, ccomment in cur.fetchall()
        ]

        result[name] = {
            "kind": kind,
            "size": size,
            "comment": comment,
            "columns": cols
        }

    cur.close()
    conn.close()
    return result

In [2]:
schema_info = get_objects_and_columns()

In [3]:
schema_info

{'battery_performance_ranking': {'kind': 'MATERIALIZED VIEW',
  'size': '144 kB',
  'comment': '배터리 종합 성능 랭킹(100점 만점).\n구성: SOH(20) + 셀밸런싱(15) + 주행효율(20) + 충전효율(적응형,15) + 온도안정성(15) + 충전습관(15).\n세부 지표와 데이터 품질(표본 수) 포함. 동일 점수는 동일 순위, 차종 Unknown은 뒤로 정렬.',
  'columns': [{'name': 'clientid',
    'type': 'character varying(50)',
    'comment': '차량 식별자(가명/해시).'},
   {'name': 'car_type',
    'type': 'character varying',
    'comment': "차종(car_type 매핑). 없으면 'Unknown'."},
   {'name': 'model_year',
    'type': 'integer',
    'comment': '연식(년). car_type 매핑 원본 값(없으면 0).'},
   {'name': 'soh_total_score',
    'type': 'integer',
    'comment': 'SOH 점수(0~20): 평균 SOH 절대값(12) + SOH 안정성(8).'},
   {'name': 'cell_total_score',
    'type': 'integer',
    'comment': '셀 밸런싱 점수(0~15): 전압 편차 평균(10) + 편차 안정성(5).'},
   {'name': 'driving_total_score',
    'type': 'integer',
    'comment': '주행 효율 점수(0~20): SOC/km 효율(15) + 일관성(5). 주행 세그먼트 기반.'},
   {'name': 'charging_total_score',
    'type': 'numeric',
    'comment'

In [17]:
from dotenv import load_dotenv
load_dotenv()
from langchain_teddynote import logging
logging.langsmith("EV Performance")

LangSmith 추적을 시작합니다.
[프로젝트명]
EV Performance


In [2]:
import psycopg2

conn = psycopg2.connect(
    host='localhost',
    port=5432,
    dbname='postgres',
    user='postgres',
    password='keti1234!'
)


cur = conn.cursor()


# cur.execute(
#     """
#     SELECT table_name 
#     FROM information_schema.tables 
#     WHERE table_schema = 'public'
#     ORDER BY table_name
#     """
# )

cur.execute(
    """
SELECT c.relname AS object_name,
       d.description
FROM pg_class c
LEFT JOIN pg_description d ON d.objoid = c.oid
WHERE c.relkind IN ('r','m','v')  -- r=table, m=matview, v=view
  AND c.relnamespace = 'public'::regnamespace;
    """
)

tables = cur.fetchall()

table_list = [t[0] for t in tables]
print("Public 스키마 테이블 목록:", table_list)
    
cur.close()
conn.close()


Public 스키마 테이블 목록: ['bw_segment_states', 'battery_performance_ranking', 'bw_segments', 'bw_data', 'bw_dashboard', 'car_type', 'bw_vehicle_status', 'battery_performance_scores_per_segment', 'mv_dashboard_stats', 'mv_dashboard_car_type_stats', 'mv_bw_monthly', 'ev_battery_capacity', 'bw_charging_type']


In [3]:
print(response)


NameError: name 'response' is not defined

In [20]:
from typing_extensions import TypedDict, Annotated
from langgraph.graph.message import add_messages

from langgraph.graph import START, END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.runnables import RunnableConfig
from langchain_teddynote.messages import invoke_graph, stream_graph, random_uuid
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate

select_model = "gpt-oss:20b" #  FRONTEND 에서 넘어온 모델 정보

class EvState(TypedDict):
    messages: Annotated[list, add_messages]
    db_info: Annotated[list, "DB Info"]  # DB 정보
    user_question: Annotated[str, "Question"]  # 사용자 질문
    db_query:  Annotated[str, "DB Query"]  # DB 쿼리 생성
    db_result: Annotated[str, "DB Answer"]  # DB 쿼리 답변
    
    

class RouteQuery(BaseModel):

    # 데이터 소스 선택을 위한 리터럴 타입 필드
    binary_score: Literal["yes", "no"] = Field(
        ...,
        description="Given a user question, 전기차, 배터리등과 같은 질문이면 'yes'를 반환하고 그렇지 않으면 'no'를 반환하세요.",
    )


# router
def router_question_node(state: EvState) -> EvState:
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    structured_llm_router = llm.with_structured_output(RouteQuery)
    
    # 시스템 메시지와 사용자 질문을 포함한 프롬프트 템플릿 생성
    system = """You are an expert at routing a user question. 전기차, 배터리등과 같은 질문이면 'yes'를 반환하고 그렇지 않으면 'no'를 반환하세요."""

    # Routing 을 위한 프롬프트 템플릿 생성
    route_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("human", "{question}"),
        ]
    )

    # 프롬프트 템플릿과 구조화된 LLM 라우터를 결합하여 질문 라우터 생성
    question_router = route_prompt | structured_llm_router
    response = question_router.invoke(state["user_question"])
    
    if response.binary_score == "yes":
        return "db_search"
    else:
        return "general_answer"
# 노드

# 일반 답변 노드
def general_answer(state: EvState) -> EvState:
    
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "당신은 친절한 상담사 KETI 입니다. 사용자의 질문에 대해 친절하게 답변해주세요."),
            ("user", "다음 질문에 대한 답변을 생성해주세요: {question}"),
        ]
    )
    chain = prompt | ChatOllama(model=select_model)
    
    response = chain.invoke({"question": state["user_question"]})
    return {"messages": response}


# DB 테이블 조회 노드

def db_search(state: EvState) -> EvState:
    cur = conn.cursor()

    cur.execute(
    """
    SELECT c.relname AS object_name,
        d.description
    FROM pg_class c
    LEFT JOIN pg_description d ON d.objoid = c.oid
    WHERE c.relkind IN ('r','m','v')  -- r=table, m=matview, v=view
    AND c.relnamespace = 'public'::regnamespace;
    """
    )

    tables = cur.fetchall()

    table_list = [t[0] for t in tables]
    print("Public 스키마 테이블 목록:", table_list)

    cur.close()
    conn.close()
    
    return {"db_info": table_list}




# DB 쿼리 생성 노드
def generate_query(state: EvState) -> EvState:
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a helpful assistant that generates 포스트그래 SQL queries. 다른 설명은 하지말고 쿼리문만 생성해"),
            ("user", "Generate a SQL query to answer the following question: {question}"),
        ]
    )
    chain = prompt | ChatOllama(model=select_model)
    response = chain.invoke({"question": state["user_question"]})
    return {"db_query": response.content}


# DB 조회 노드
def db_query(state: EvState) -> EvState:
    conn = psycopg2.connect(
        host='localhost',
        port=5432,
        dbname='postgres',
        user='postgres',
        password='keti1234!'
    )
    cur = conn.cursor()
    
    query = state["db_query"].replace("```sql", "").replace("```", "").strip()
    cur.execute(query)
    result = cur.fetchall()

    cur.close()
    conn.close()
    return {"db_result": result}

# 분석 답변 노드
def analyze_answer(state: EvState) -> EvState:
    
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "DB에서 조회된 다음 내용을 보고 사용자가 질문을 분석한 내용을 답변하세요.  db_result:\n{db_result}"),
            ("user", "Generate a SQL query to answer the following question: {question}"),
        ]
        )
    chain = prompt | ChatOllama(model=select_model)
    response = chain.invoke({"question": state["user_question"],"db_result": state["db_result"] })

    
    return {"messages": response}






# langgraph.graph에서 StateGraph와 END를 가져옵니다.
workflow = StateGraph(EvState)

# 노드를 추가합니다.
workflow.add_node("router_question_node", router_question_node)
workflow.add_node("general_answer", general_answer)
workflow.add_node("db_gen_query", db_query)
workflow.add_node("generate_query", generate_query)
workflow.add_node("analyze_answer", analyze_answer)


workflow.add_conditional_edges(
    START,
    router_question_node,
    {
        "db_search": "generate_query",
        "general_answer": "general_answer"
    }
)
workflow.add_edge("generate_query", "db_gen_query")
workflow.add_edge("db_gen_query", "analyze_answer")
workflow.add_edge("analyze_answer", END)
workflow.add_edge("general_answer", END)


# 기록을 위한 메모리 저장소를 설정합니다.
memory = MemorySaver()

# 그래프를 컴파일합니다.
app= workflow.compile(checkpointer=memory)


# config 설정(재귀 최대 횟수, thread_id)
config = RunnableConfig(recursion_limit=20, configurable={"thread_id": random_uuid()})


# 질문 입력
inputs = EvState(user_question="bw_data 상위 열개 조회")

# 그래프 실행
invoke_graph(app, inputs, config)


🔄 Node: [1;36mgeneral_answer[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 

**bw_data 상위 10개 행 조회하기**

아래 예시는 **SQL**과 **Python (pandas)** 두 가지 방법을 함께 안내해 드립니다.  
필요한 환경에 맞게 한 가지 방법을 선택해서 사용하시면 됩니다.

---

## 1️⃣ SQL 으로 조회 (MySQL / PostgreSQL / SQLite 등)

```sql
-- 가장 상위 10개의 행을 가져옵니다
SELECT *
FROM bw_data
LIMIT 10;          -- MySQL / PostgreSQL / SQLite
-- 또는
SELECT TOP 10 *
FROM bw_data;      -- SQL Server
```

- `LIMIT 10` : 첫 번째 10개 행을 가져옵니다.  
- `TOP 10` : SQL Server 전용 구문입니다.

필요하다면 정렬 기준을 지정해서 가장 최근 기록이 나올 수 있도록 할 수도 있습니다.

```sql
SELECT *
FROM bw_data
ORDER BY created_at DESC
LIMIT 10;
```

---

## 2️⃣ Python (pandas) 으로 조회

```python
import pandas as pd

# 예시: CSV 파일이 있는 경우
# df = pd.read_csv('bw_data.csv')

# 혹은 데이터베이스에서 바로 읽어올 때
# conn = ...  # DB 연결
# df = pd.read_sql('SELECT * FROM bw_data', conn)

# 상위 10개 행 확인
top10 = df.head(10)
print(top10)
```

- `df.head(10)` : DataFrame의 첫 10개 행을 반환합니다.  
- 데이터베이스에서 직접 읽어올 때는 `pd.read_sql()`을 사용하면 편리합니다.

---

## 3️⃣ Ju

In [22]:
response = invoke_graph(app, inputs, config)


🔄 Node: [1;36mgeneral_answer[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 

저도 만나서 반가워요! 언제든 궁금한 점이나 도움이 필요하시면 편하게 말씀해 주세요. 😊


In [26]:
response = app.invoke(inputs, config)

response["messages"][-1].content

'안녕하세요! 만나서 반갑습니다 😊  \n저는 KETI 상담사입니다. 무엇을 도와드릴까요? 궁금한 점이나 필요하신 정보가 있으면 언제든 말씀해 주세요!'

In [21]:
# 질문 입력
inputs = EvState(user_question="만나서 반가워")

# 그래프 실행
invoke_graph(app, inputs, config)


🔄 Node: [1;36mgeneral_answer[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 

안녕하세요! 만나서 반가워요 😊  
무엇을 도와드릴까요? 궁금한 점이나 필요한 내용이 있으면 언제든 말씀해 주세요!



🔄 Node: [1;36mgenerate_query[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 
[1;32mdb_query[0m:
SELECT * FROM bw_data LIMIT 10;

🔄 Node: [1;36mdb_gen_query[0m 🔄
- - - - - - - - - - - - - - - - - - - - - - - - - 
('V012BE0021', datetime.datetime(2025, 2, 9, 4, 57, 23, 465000), 54784.0, 79.5, 100.0, 363.4, 17.6, 46682.0, 0.0, Decimal('1'), Decimal('0'), 4.04, 4.02, 4.025111111111107, 4.02, 7.0, 5.0, 6.2, 6.0, None, None, None, None, None, None, None, None, None)
('V012BE0021', datetime.datetime(2025, 2, 9, 4, 57, 26, 465000), 54784.0, 79.5, 100.0, 363.5, 17.6, 46682.0, 0.0, Decimal('1'), Decimal('0'), 4.04, 4.02, 4.024888888888885, 4.02, 7.0, 5.0, 6.2, 6.0, None, None, None, None, None, None, None, None, None)
('V012BE0021', datetime.datetime(2025, 2, 9, 4, 57, 29, 465000), 54784.0, 79.5, 100.0, 363.5, 17.6, 46682.0, 0.0, Decimal('1'), Decimal('0'), 4.04, 4.02, 4.025999999999995, 4.02, 7.0, 5.0, 6.2, 6.0, None, None, None, None, None, None, None, None, None)
('V012BE0021',