In [None]:
from ml_collections import ConfigDict
from langchain_openai import ChatOpenAI
from langchain_community.utilities import SQLDatabase
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import sqlite3
import pandas as pd
from transformers import AutoTokenizer
from langchain_core.runnables import RunnableLambda
from loguru import logger
import subprocess
import re
from langchain.llms.huggingface_endpoint import HuggingFaceEndpoint
from langchain_community.chat_models.huggingface import ChatHuggingFace

csv를 db로 저장

In [None]:
csv_file_path = 'data.csv'
sqlite_file_path = 'data.db'
# 이부분 한번에 줄 수 있는지 생각
table_name = 'test_table'
df = pd.read_csv(csv_file_path)
conn = sqlite3.connect(sqlite_file_path)
cursor = conn.cursor()
df.to_sql(table_name, conn, if_exists='replace', index=False)
conn.close()

기본 설정

In [None]:
db_path = "data.db"
db = SQLDatabase.from_uri(f"sqlite:///{db_path}")
schema = db.get_table_info()

# print(schema)

llm = ChatOpenAI(temperature=0, model_name='gpt-4')
TEMPLATE_MODEL = 'gpt'

In [None]:
def run_query(query):
    return db.run(query)


# execute_sql_query 함수는 주어진 데이터베이스 경로(db_path)와 SQL 쿼리(query)를 사용하여 데이터베이스에 접속하고, 쿼리를 실행한 후 결과를 반환
def execute_sql_query(db_path, query):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(query)
    results = cursor.fetchall()
    columns = [description[0] for description in cursor.description]
    conn.close()
    return columns, results

def get_input_gpt(user_message):
    return user_message

def get_template(template,var="custom"):
    if var == "gpt":
        return get_input_gpt(template)

TemplateConfig = ConfigDict()

TemplateConfig.make_query_split = """
사용자 질문 : {query}
사용자의 질문을 두가지로 분리하십시오.
1. 데이터베이스에서 데이터를 가져오라는 요청문
2. 사용자의 요구사항

예시 : 1월12일자 데이터의 컬럼별 평균값을 구해줘
요청1 : 1월12일자 데이터를 가져오세요.
요청2 : 컬럼별 평균값을 구하세요.

요청1 : ...
요청2 : ...
"""

TemplateConfig.make_sql_query = """
사용자의 질문에 답변하기 위해 데이터베이스에서 데이터를 가져와야합니다.
테이블 스키마를 바탕으로 데이터를 가져오도록 SQL문을 생성하세요.
무조건 SQL문 만을 생성하세요.

table 이름은 test_table 입니다.
{schema}
 
Question: {query}
출력 양식 : ```sql\n<sql code>\n```"""



TemplateConfig.make_code = """
당신은 python을 이용한 데이터분석 전문가입니다.
분석에 사용될 데이터(data.csv)는 가지고있습니다.
사용자 질문에 답하기 위해, 분석에 필요한 python code를 작성해주세요.
무조건 마지막에 사용자 질문에 대한 답변을 (result.csv)로 저장하는 python code를 작성하세요.
무조건 python code만 생성하세요.
Table Schema : {schema}
Question: {query}
Column Information : {columns}
Data Row 1 : {sample}
출력 양식 : ```python\n<python code>\n```"""


TemplateConfig.select_visualization= """
아래 데이터는 '{query}'에 따른 데이터를 데이터베이스에서 가져온결과입니다.
추출한 데이터:
{context}

주어진 데이터의 특성을 가장 잘 표현할 수 있는 시각화 방법을 아래에서 한가지 선택하세요.
시각화 방법만 말하세요.

- 시각화 방법
1. TABLE : 범주형 데이터, 텍스트 데이터, 요약 통계 ex) 특정 시간대의 데이터 상태, 데이터 값
2. HEATMAP, BUBBLE, BAR : 빈도형 데이터, 연속형 데이터 간의 관계 ex) 온도, 압력, 출력 간의 상관관계 분석, 특정 이벤트 발생 빈도
3. SCATTER, TIMELINE, HISTOGRAM : 연속형 데이터, 시간 데이터  ex) 발전소의 일별 출력량 분포

답변: """

TemplateConfig.make_visualization_code = """
당신은 python을 이용한 데이터 시각화 전문가입니다.

plotly의 {query}을 사용하여 주어진 데이터프레임을 시각화하는 수준 높은 python code를 작성해주세요. 
무조건 python code만 생성하세요. 

다음 사항들을 반영하세요 :
1. 시각화의 유형에 관계없이 컬럼 넓이와 행 높이를 조절할 수 있도록 합니다.
2. 시각화에 제목을 추가합니다.
3. 시각화의 컬러 테마를 설정합니다.
4. 시각화의 요소 정렬 및 폰트 설정을 포함합니다.
5. 데이터프레임의 컬럼을 적절히 나누어 여러 개의 시각화로 표시합니다.
6. 시각화에 사용될 df는 가지고 있습니다.

데이터 프레임 :
{context}

출력 양식 : ```python\n<python code>\n```"""


TemplateConfig.data_analysis= """
아래 데이터는 '{query}'에 따라 데이터를 데이터베이스에서 가져온결과입니다.
추출한 데이터:
{context}

주어진 데이터의 특성을 가장 잘 표현할 수 있는 시각화 방법을 아래에서 한가지 선택하세요.
시각화 방법만 말하세요.

- 시각화 방법
1. TABLE : 범주형 데이터, 텍스트 데이터, 요약 통계 ex) 특정 시간대의 데이터 상태, 데이터 값
2. HEATMAP, BUBBLE, BAR : 빈도형 데이터, 연속형 데이터 간의 관계 ex) 온도, 압력, 출력 간의 상관관계 분석, 특정 이벤트 발생 빈도
3. SCATTER, TIMELINE, HISTOGRAM : 연속형 데이터, 시간 데이터  ex) 발전소의 일별 출력량 분포

답변: """


In [None]:
# 질의 분석 chain
def make_query_split_prompt(state)->ChatPromptTemplate:
    prompt = TemplateConfig.make_query_split.format_map({"query":state["query"]})
    prompt = get_template(prompt,TEMPLATE_MODEL)
    return prompt

def make_sql_query_prompt(state)->ChatPromptTemplate:
    prompt = TemplateConfig.make_sql_query.format_map({"query":state["query"],
                                                   "schema":state["schema"]})
    prompt = get_template(prompt,TEMPLATE_MODEL)
    return prompt

def make_code_prompt(state)->ChatPromptTemplate:
    prompt = TemplateConfig.make_code.format_map({"query":state["query"],
                                                  "schema":state["schema"],
                                                  "columns":state["columns"],
                                                  "sample":state["sample"]})
    prompt = get_template(prompt,TEMPLATE_MODEL)
    return prompt

def make_generate_prompt(state)->ChatPromptTemplate:
    prompt = TemplateConfig.generate.format_map({"query":state["query"],
                                                 "context":state["context"]})
    prompt = get_template(prompt,TEMPLATE_MODEL)
    return prompt

def select_visualization_prompt(state)->ChatPromptTemplate:
    prompt = TemplateConfig.select_visualization.format_map({"query":state["query"],
                                                 "context":state["context"]})
    prompt = get_template(prompt,TEMPLATE_MODEL)
    return prompt

def make_visualization_code_prompt(state)->ChatPromptTemplate:
    prompt = TemplateConfig.make_visualization_code.format_map({"query":state["query"],
                                                 "context":state["context"]})
    prompt = get_template(prompt,TEMPLATE_MODEL)
    return prompt


<h1> 체인생성

In [None]:
Chain__split_query = (RunnableLambda(make_query_split_prompt)| llm | StrOutputParser())
Chain__make_sql_query = (RunnableLambda(make_sql_query_prompt)| llm | StrOutputParser())
Chain__make_code = (RunnableLambda(make_code_prompt)| llm | StrOutputParser())
Chain__generate = (RunnableLambda(make_generate_prompt)| llm | StrOutputParser())


Chain__select_visualization = (RunnableLambda(select_visualization_prompt)| llm | StrOutputParser())
Chain__make_visualization_code = (RunnableLambda(make_visualization_code_prompt)| llm | StrOutputParser())

# 얘는 없고
# Chain__make_image = (RunnableLambda(make_image_prompt)| llm | StrOutputParser())

<h3> 사용자 요청문 분리 chain

In [None]:
QUERY = "3월 24일 데이터에 대해 분 단위로 변환하여 기초 통계정보를 알려주세요."
logger.debug("체인 실행 중..")

split_query = Chain__split_query.invoke({"query":QUERY}).strip()
# requests = re.split(r'요청\d+ : ', split_query)
requests = re.split(r'요청\s*\d+\s*:\s*', split_query)
requests = [req for req in requests if req]

DB_QUERY = requests[0]
REQ_QUERY = requests[1]
logger.info(f"DB 요청문 :{DB_QUERY}")
logger.info(f"사용자 요청문 :{REQ_QUERY}")


In [231]:
aaa = "3월 24일 데이터에 대해 분 단위로 변환하여 기초 통계정보를 알려주세요."

Chain__make_sql_query = (RunnableLambda(make_sql_query_prompt)| llm | StrOutputParser())

logger.debug("SQL문을 생성합니다...")
sql_query = Chain__make_sql_query.invoke({"query": aaa,
                                            "schema": schema}).strip()

2024-05-31 04:56:07,363 - root - DEBUG - SQL문을 생성합니다...
2024-05-31 04:56:07,363 - root - DEBUG - SQL문을 생성합니다...
2024-05-31 04:56:07,363 - root - DEBUG - SQL문을 생성합니다...
2024-05-31 04:56:07,363 - root - DEBUG - SQL문을 생성합니다...


In [232]:
print(sql_query)

```sql
SELECT 
    strftime('%Y-%m-%d %H:%M', datetime) as minute,
    AVG("GT GEN#1 KV") as avg_kv,
    AVG("GT GEN#1 KVOLT") as avg_kvolt,
    AVG("GT GEN#1 MVAR") as avg_mvar,
    AVG("GT GEN#1 MEGAWATT TARGET") as avg_megawatt_target,
    AVG("GT GEN#1 MEGAWATT TARGET.1") as avg_megawatt_target_1,
    AVG("GT#1 Generator Efficiency") as avg_generator_efficiency,
    AVG("GT#1 Generator Electronic Loss") as avg_generator_electronic_loss,
    AVG("GT#1 Generator Mechenical Loss") as avg_generator_mechenical_loss,
    AVG("GT#1 Generator Loss by Power Factor Curve") as avg_generator_loss_by_power_factor_curve,
    AVG("GT#1 Generator Loss by Power Factor Curve at Design Power Factor and Actual MW") as avg_generator_loss_by_power_factor_curve_at_design_power_factor_and_actual_mw,
    AVG("GT#1 Generator Exciter Loss by Curve") as avg_generator_exciter_loss_by_curve,
    AVG("GT#1 Generator Power Factor Actual") as avg_generator_power_factor_actual,
    AVG("GT#1 ppm NOx in Exhaust Gas"

<h3> 쿼리를 통해 데이터베이스를 조회하여 해당되는 데이터를 가져옴

In [None]:
import subprocess
import pandas as pd
import logging

# '''
# id	name
# 1	Alice
# 2	Bob
# 3	Charlie

# columns = ['id', 'name']
# result = [(1, 'Alice'), (2, 'Bob'), (3, 'Charlie')]
# '''
# logger.debug("SQL문을 생성합니다...")
# sql_query = Chain__make_sql_query.invoke({"query":DB_QUERY,
#                                           "schema":schema}).strip()
# logger.info(f"생성된 SQL문 :{sql_query}")
# sql_query = sql_query.strip('```sql').strip('```').strip()

# columns, result = execute_sql_query(db_path, sql_query)

# 로거 설정
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# Suppress detailed logs from `httpx` and `openai` libraries
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('httpcore').setLevel(logging.WARNING)
logging.getLogger('openai').setLevel(logging.WARNING)

# SQL 쿼리 생성 및 실행 로직
def generate_and_execute_sql():
    logger.debug("SQL문을 생성합니다...")
    sql_query = Chain__make_sql_query.invoke({"query": DB_QUERY,
                                              "schema": schema}).strip()
    sql_query = sql_query.strip('```sql').strip('```').strip()
    logger.info(f"생성된 SQL문 : {sql_query}")
    
    columns, result = execute_sql_query(db_path, sql_query)
    return columns, result

# 파이썬 코드 생성 로직
def generate_python_code():
    logger.debug("결과 데이터 생성을 위한 파이썬 코드를 생성합니다...")
    python_code = Chain__make_code.invoke({"query": REQ_QUERY,
                                           "columns": columns,
                                           "schema": schema,
                                           "sample": result[0]}).strip()
    python_code = python_code.strip('```python').strip('```').strip()

    logger.info(f"생성된 파이썬코드 : {python_code}")
    with open('process_data.py', 'w') as file:
        file.write(python_code)

# SQL 쿼리 생성 및 실행 시도
try:
    columns, result = generate_and_execute_sql()
except Exception as e:
    logger.error("SQL 쿼리 생성 또는 실행 오류가 발생했습니다. 다시 시도합니다.", exc_info=True)
    columns, result = generate_and_execute_sql()

# 파이썬 코드 생성 시도
generate_python_code()

# subprocess 실행 및 예외 처리
try:
    result = subprocess.run(['python', 'process_data.py'], capture_output=True, text=True)
    result.check_returncode()  # subprocess가 실패하면 CalledProcessError가 발생합니다.
except subprocess.CalledProcessError as e:
    logger.error("코드 실행 오류가 발생했습니다. 파이썬 코드 생성을 다시 시도합니다.")
    generate_python_code()
    try:
        result = subprocess.run(['python', 'process_data.py'], capture_output=True, text=True)
        result.check_returncode()
    except subprocess.CalledProcessError as e:
        logger.critical("재시도 후에도 코드 실행 오류가 발생했습니다.", exc_info=True)
        raise

# 결과 데이터 로드
df = pd.read_csv("result.csv")

context = df.to_html(index=False)
logger.debug("HTML 컨텍스트 생성 완료")

<h1> 시각화 관련 체인

In [None]:

# context = df.to_string(index=False)  

# logger.debug(f"결과 데이터를 분석합니다...")
# analysis = Chain__analysis_data.invoke({"query":REQ_QUERY,
#                                         "context":context})
# logger.info(f"분석 결과 :{analysis}")

# logger.debug("시각화를 위한 파이썬 코드를 생성합니다...")
# python_code = Chain__make_image.invoke({"analysis":analysis,
#                                         "context":context}).strip()
# python_code = python_code.strip('```python').strip('```').strip()
# logger.info(f"생성된 파이썬코드 :{python_code}")
# with open('process_data.py', 'w') as file:
#     file.write(python_code)
# subprocess.run(['python', 'process_data.py'], capture_output=True, text=True)

# logger.info("답변을 생성합니다...")
# response = Chain__generate.invoke({"query":QUERY,
#                                    "context":context}).strip()

<h3> df -> 테이블 변환

In [None]:
# async for text in Chain__generate.astream({"query":QUERY,"context":context}):
# # async for text in Chain__generate.astream({"context":context}):
#     print(text, end="", flush=True)

In [None]:
logger.debug("데이터 시각화 선택 체인...")
test = Chain__select_visualization.invoke({"query":REQ_QUERY,
                                        "context":context}).strip()

print(test)

tool = test

from io import StringIO

df = pd.read_csv("result.csv")
logger.debug("데이터 시각화 선택 체인...")
aa = Chain__make_visualization_code.invoke({"query":tool,
                                        "context":df}).strip()


print(aa)

<h3> 데이터 시각화 코드 테스트

In [None]:
import plotly.graph_objects as go

# 데이터프레임을 분할하여 여러 개의 시각화를 생성합니다.
df1 = df.iloc[:, :10]
df2 = df.iloc[:, 10:20]
df3 = df.iloc[:, 20:]

# 시각화를 생성하는 함수를 정의합니다.
def create_table(df, title):
    fig = go.Figure(data=[go.Table(
        header=dict(values=list(df.columns),
                    fill_color='paleturquoise',
                    align='left',
                    font=dict(color='black', size=12),
                    line_color='darkslategray'),
        cells=dict(values=[df[col] for col in df.columns],
                   fill_color='lavender',
                   align='left',
                   height=30,
                   font=dict(color='black', size=12),
                   line_color='darkslategray'))
    ])

    # 시각화에 제목을 추가합니다.
    fig.update_layout(title=title, autosize=True)
    fig.show()

# 각 데이터프레임에 대해 시각화를 생성합니다.
create_table(df1, 'Table 1')
create_table(df2, 'Table 2')
create_table(df3, 'Table 3')

In [None]:
# print(df.head())
# 테이블 생성
def create_table(df, cols, title, width, height, columnwidth, rowheight):
    headerColor = '#D95204'
    rowEvenColor = '#F2BC57'
    rowOddColor = '#F2E0C9'

    if 'Unnamed: 0' not in cols:
        cols = ['Unnamed: 0'] + list(cols)
    
    fig = go.Figure(data=[go.Table(
        columnwidth=[columnwidth] * len(cols),
        header=dict(
            values=['<b>' + col + '</b>' for col in cols],
            line_color='white',
            fill_color=headerColor,
            # align=['left', 'center'],
            align='center',  
            font=dict(color='black', size=14)
        ),
        cells=dict(
            height=rowheight,
            values=[df[col].tolist() for col in cols],
            line_color='white',
            fill_color=[[rowOddColor, rowEvenColor] * (len(df) // 2 + 1)],
            # align=['left', 'center'],
            align='center',
            font=dict(color='black', size=12)
        )
    )])
    
    fig.update_layout(
        title_text=title,
        width=width,
        height=height
    )
    return fig


num_cols = len(df.columns)
cols_per_table = 4 
num_tables = (num_cols + cols_per_table - 1) // cols_per_table

for i in range(num_tables):
    start_col = i * cols_per_table
    end_col = min(start_col + cols_per_table, num_cols)
    cols = df.columns[start_col:end_col]
    title = f'Table {i+1}'
    fig = create_table(df, cols, title, width=1500, height=480, columnwidth=200, rowheight=30) 
    fig.show()

In [None]:
import plotly.express as px

fig = px.histogram(df, x="GT GEN#1 KV")
fig.show()


In [None]:
import plotly.express as px
import pandas as pd

# 예시 DataFrame 생성
data = {
    'country': ['한국', '일본', '중국'],
    'continent': ['Asia', 'Asia', 'Asia'],
    'year': [2007, 2007, 2007],
    '만족도': [43.828, 72.39, 72.961],
    'pop': [31889923, 190010647, 1318683096],
    'gdpPercap': [974.5803384, 9065.800825, 4959.114854]
}

df = pd.DataFrame(data)

# Create a scatter plot using Plotly
fig = px.scatter(
    df.query("year==2007"),
    x="gdpPercap",
    y="만족도",
    size="pop",
    color="continent",
    hover_name="country",
    log_x=True,
    size_max=60,
)
# Update the layout to change the font
fig.update_layout(
    font=dict(
        family="Nanum Gothic, sans-serif",  # 원하는 글꼴을 설정하세요. 예: "Arial, sans-serif", "Courier New, monospace", "Nanum Gothic, sans-serif"
        size=14,                     # 글씨 크기 설정
        color="black"                # 글씨 색상 설정
    )
)
# Display the plot in Jupyter Notebook
fig.show()

In [None]:
import plotly.express as px
import plotly.graph_objects as go
import ipywidgets as widgets
from IPython.display import display

# 데이터 로드
df = px.data.iris()  # 데이터를 자신의 데이터 소스로 변경 가능

# 초기 그래프 생성
fig = go.Figure(px.scatter(
    df, x="sepal_length", y="sepal_width", height=350,
    color="species", title="Playing with Fonts"))

# 색상 선택기 생성
font_color_picker = widgets.ColorPicker(
    concise=False,
    description='Font Color',
    value='#119DFF',
    disabled=False
)

title_color_picker = widgets.ColorPicker(
    concise=False,
    description='Title Color',
    value='#F71016',
    disabled=False
)

# 그래프 업데이트 함수
def update_graph(change):
    font_color = font_color_picker.value
    title_color = title_color_picker.value
    fig.update_layout(
        font_color=font_color,
        title_font_color=title_color
    )
    fig.show()

# 색상 선택기 이벤트 핸들러 연결
font_color_picker.observe(update_graph, names='value')
title_color_picker.observe(update_graph, names='value')

# 위젯과 그래프 표시
display(font_color_picker, title_color_picker)
fig.show()


In [None]:
import plotly.graph_objects as go

fig = go.Figure(data=[go.Table(
    header=dict(values=list(df.columns),
                fill_color='paleturquoise',
                align='left'),
    cells=dict(values=[df['Unnamed: 0'], df['GT GEN#1 KV'], df['GT GEN#1 KVOLT'], df['GT GEN#1 MVAR'], df['GT GEN#1 MEGAWATT TARGET'], df['GT GEN#1 MEGAWATT TARGET.1'], df['GT#1 Generator Efficiency'], df['GT#1 Generator Electronic Loss'], df['GT#1 Generator Mechenical Loss'], df['GT#1 Generator Loss by Power Factor Curve'], df['GT#1 Fuel Mass Flow'], df['GT#1 Generator Hydrogen Pressure'], df['GT#1 Compressor Discharge Pressure'], df['GT#1 Inlet Pressure Loss'], df['GT#1 Turbine Exhaust Draft Loss'], df['GT#1 NG WTR Pressure'], df['GT#1 Generator Cold Gas Temperature'], df['GT#1 Fuel Temperature'], df['GT#1 Fuel Inlet Temperature'], df['S-TBQC-035 In air tot press tr']],
               fill_color='lavender',
               align='left'))
])

fig.show()