## Streamlit Conversational App 만들기 (Cortex Analyst)

이 단계에서는 Streamlit을 활용하여 자연어로 Snowflake 데이터를 분석하는 **Cortex Analyst** 앱을 생성하는 방법을 안내합니다.

---

### 📌 Streamlit 앱 생성 방법

아래 단계를 따라 Snowflake 내에 Streamlit 앱을 생성합니다:

1. Snowflake의 **Snowsight** 화면으로 이동합니다.
2. **Streamlit 탭**을 선택 후 **+ Streamlit App** 버튼을 클릭합니다.
3. 앱 이름을 입력하고, 기본 설정을 완료한 후 **Create** 버튼 클릭합니다.

![APP생성](https://quickstarts.snowflake.com/guide/getting_started_with_cortex_analyst/img/6d0a0ac354e05ba9.png)

---

### 📌 코드 붙여넣기 및 실행하기

1. Git 리포지토리에서 `Streamlit in Snowflake (SiS)` 앱 코드(`.py` 파일)를 확인합니다.
2. 로컬 코드 편집기로 열어서 전체 코드를 복사합니다.
3. Snowflake의 Streamlit 앱 에디터에 해당 코드를 붙여넣습니다.
![APP코드입력](https://quickstarts.snowflake.com/guide/getting_started_with_cortex_analyst/img/d9ad739cd45ed025.png)
4. 붙여넣은 후 **Run** 버튼을 클릭하여 앱을 실행합니다.

이제 간편하게 채팅 형태로 질문하여 데이터 분석을 수행할 수 있습니다.

예시 질문:
- **"지난 달 매출이 가장 높은 상품은 무엇인가요?"**
- **"특정 지역별 매출 추세는 어떠한가요?"**

### 📌 Cortex Analyst API 호출 함수 이해하기

Streamlit 앱의 핵심은 다음과 같은 Python 함수입니다:

### 📌 `get_analyst_response` 함수 이해하기

Streamlit Conversational 앱에서 정의된 **`get_analyst_response`** 함수는 사용자의 입력(자연어 질문)을 받아 Cortex Analyst API를 호출하는 핵심 역할을 합니다.


#### ⚙️ 함수가 하는 일:

1. **자연어 질문**과 대화 기록을 JSON 형태로 패키징합니다.
2. 미리 생성한 Semantic Model (`revenue_timeseries.yaml`)을 참조하여, 정확한 SQL 쿼리 생성을 요청합니다.
3. Cortex Analyst API로 요청을 전송한 후, 응답을 받아 분석 결과를 반환합니다.


#### ⚙️ 주요 역할:
1. 사용자 입력을 기반으로 정확한 분석 결과 생성
2. 자연어에서 SQL로 정확한 변환을 수행하도록 Cortex Analyst API와 소통
3. Semantic Model(.yaml 파일)을 기반으로 정확하고 효율적인 분석 수행 지원

In [None]:
def get_analyst_response(messages: List[Dict]) -> Tuple[Dict, Optional[str]]:
    """
    Send chat history to the Cortex Analyst API and return the response.

    Args:
        messages (List[Dict]): The conversation history.

    Returns:
        Optional[Dict]: The response from the Cortex Analyst API.
    """
    # Prepare the request body with the user's prompt
    request_body = {
        "messages": messages,
        "semantic_model_file": f"@{st.session_state.selected_semantic_model_path}",
    }

    # Send a POST request to the Cortex Analyst API endpoint
    # Adjusted to use positional arguments as per the API's requirement
    resp = _snowflake.send_snow_api_request(
        "POST",  # method
        API_ENDPOINT,  # path
        {},  # headers
        {},  # params
        request_body,  # body
        None,  # request_guid
        API_TIMEOUT,  # timeout in milliseconds
    )

    # Content is a string with serialized JSON object
    parsed_content = json.loads(resp["content"])

    # Check if the response is successful
    if resp["status"] < 400:
        # Return the content of the response as a JSON object
        return parsed_content, None
    else:
        # Craft readable error message
        error_msg = f"""
🚨 An Analyst API error has occurred 🚨

* response code: `{resp['status']}`
* request-id: `{parsed_content['request_id']}`
* error code: `{parsed_content['error_code']}`

Message: ```{parsed_content['message']}```
        """
        return parsed_content, error_msg

---

### 📌 Cortex Analyst API 호출 함수정리

In [None]:
"""
Cortex Analyst App
====================
이 앱은 사용자가 자연어를 사용하여 데이터를 인터랙티브하게 조회할 수 있도록 합니다.
"""

import json  # JSON 데이터 직렬화/역직렬화에 사용
import time   # 간단한 지연(딜레이) 구현에 사용
from datetime import datetime  # 타임스탬프 형식 변환에 사용
from typing import Dict, List, Optional, Tuple, Union  # 타입 힌트 제공

import _snowflake  # Snowflake API와 상호작용하기 위한 모듈
import pandas as pd  # 데이터 처리 및 DataFrame 구조 사용
import streamlit as st  # Streamlit 라이브러리로 웹 앱 구축
from snowflake.snowpark.context import get_active_session  # Snowflake 세션 획득
from snowflake.snowpark.exceptions import SnowparkSQLException  # SQL 실행 에러 처리

# 사용 가능한 시맨틱 모델 경로의 리스트
# 각 경로는 <DATABASE>.<SCHEMA>.<STAGE>/<FILE-NAME> 형식의 YAML 파일을 가리킵니다.
AVAILABLE_SEMANTIC_MODELS_PATHS = [
    "CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.RAW_DATA/revenue_timeseries.yaml"
]
# API 엔드포인트와 타임아웃 등의 상수 정의
API_ENDPOINT = "/api/v2/cortex/analyst/message"
FEEDBACK_API_ENDPOINT = "/api/v2/cortex/analyst/feedback"
API_TIMEOUT = 50000  # 밀리초 단위 타임아웃

# 전역 세션 객체 - Snowflake와의 연결을 위한 세션을 생성합니다.
session = get_active_session()


def main():
    """
    앱의 메인 실행 함수.
    - 세션 상태를 초기화하고, 헤더/사이드바를 표시하며,
      사용자의 입력을 받아 대화 내역을 업데이트합니다.
    """
    # 세션 상태 초기화 (대화 메시지, 추천, 경고 등)
    if "messages" not in st.session_state:
        reset_session_state()
    show_header_and_sidebar()
    # 첫 화면에서는 기본 질문을 보내어 앱 사용법 안내
    if len(st.session_state.messages) == 0:
        process_user_input("What questions can I ask?")
    display_conversation()
    handle_user_inputs()
    handle_error_notifications()
    display_warnings()


def reset_session_state():
    """
    세션 상태를 초기화하는 함수.
    자료구조:
      - st.session_state: Python의 dict 형태로 대화 메시지(리스트), 현재 추천값, 경고 메시지(리스트), 피드백 제출 내역(딕셔너리) 등을 저장합니다.
    """
    st.session_state.messages = []  # 대화 메시지를 저장할 리스트
    st.session_state.active_suggestion = None  # 사용자가 선택한 추천 문구
    st.session_state.warnings = []  # 경고 메시지를 저장할 리스트
    st.session_state.form_submitted = {}  # 요청별 피드백 제출 상태를 저장하는 딕셔너리


def show_header_and_sidebar():
    """
    앱의 헤더와 사이드바 UI를 구성합니다.
    - 헤더에서는 제목과 간단한 설명을 보여주며,
    - 사이드바에서는 시맨틱 모델 선택 및 채팅 기록 초기화 버튼을 제공합니다.
    """
    st.title("Cortex Analyst")
    st.markdown("Welcome to Cortex Analyst! Type your questions below to interact with your data. ")

    with st.sidebar:
        # 시맨틱 모델 경로를 선택하는 selectbox (리스트에서 파일 이름만 표시)
        st.selectbox(
            "Selected semantic model:",
            AVAILABLE_SEMANTIC_MODELS_PATHS,
            format_func=lambda s: s.split("/")[-1],
            key="selected_semantic_model_path",
            on_change=reset_session_state,  # 선택 변경 시 대화 기록 초기화
        )
        st.divider()
        # 중앙 정렬을 위한 컬럼 레이아웃 (Streamlit의 레이아웃 활용)
        _, btn_container, _ = st.columns([2, 6, 2])
        if btn_container.button("Clear Chat History", use_container_width=True):
            reset_session_state()


def handle_user_inputs():
    """
    사용자의 입력을 처리하는 함수.
    - 사용자가 채팅 입력란에 질문을 입력하면 process_user_input 호출
    - 추천 문구가 선택된 경우 해당 문구를 질문으로 처리합니다.
    """
    user_input = st.chat_input("What is your question?")
    if user_input:
        process_user_input(user_input)
    elif st.session_state.active_suggestion is not None:
        suggestion = st.session_state.active_suggestion
        st.session_state.active_suggestion = None
        process_user_input(suggestion)


def handle_error_notifications():
    """
    API 호출 중 발생한 오류를 사용자에게 토스트 메시지로 알립니다.
    """
    if st.session_state.get("fire_API_error_notify"):
        st.toast("An API error has occured!", icon="🚨")
        st.session_state["fire_API_error_notify"] = False


def process_user_input(prompt: str):
    """
    사용자의 입력(prompt)을 받아 대화 내역에 추가하고, Cortex Analyst API에 요청을 보냅니다.
    
    자료구조:
      - 새 메시지는 딕셔너리로 구성되며, "role"과 "content" 키를 가집니다.
      - st.session_state.messages는 대화 메시지 리스트로, 순서대로 저장됩니다.
      
    알고리즘:
      1. 입력 받은 prompt를 메시지 딕셔너리로 생성 후 대화 리스트에 추가
      2. 사용자 메시지를 즉시 화면에 출력
      3. API 요청을 위한 로딩 스피너를 표시하면서 get_analyst_response 함수 호출
      4. API 응답을 받아 대화 내역에 추가 후 화면 갱신(st.rerun)
    """
    st.session_state.warnings = []  # 새 요청 시작 시 이전 경고 초기화

    new_user_message = {
        "role": "user",
        "content": [{"type": "text", "text": prompt}],
    }
    st.session_state.messages.append(new_user_message)
    with st.chat_message("user"):
        user_msg_index = len(st.session_state.messages) - 1
        display_message(new_user_message["content"], user_msg_index)

    # 분석가(Assistant) 메시지 영역: API 호출 중 로딩 스피너 표시
    with st.chat_message("analyst"):
        with st.spinner("Waiting for Analyst's response..."):
            time.sleep(1)  # 인위적인 딜레이 (실제 API 호출 전 로딩 효과)
            response, error_msg = get_analyst_response(st.session_state.messages)
            if error_msg is None:
                analyst_message = {
                    "role": "analyst",
                    "content": response["message"]["content"],
                    "request_id": response["request_id"],
                }
            else:
                analyst_message = {
                    "role": "analyst",
                    "content": [{"type": "text", "text": error_msg}],
                    "request_id": response["request_id"],
                }
                st.session_state["fire_API_error_notify"] = True

            if "warnings" in response:
                st.session_state.warnings = response["warnings"]

            st.session_state.messages.append(analyst_message)
            st.rerun()  # 화면 전체를 갱신하여 최신 상태를 반영


def display_warnings():
    """
    저장된 경고 메시지를 화면에 출력합니다.
    자료구조: st.session_state.warnings는 각 경고를 딕셔너리 형태로 저장합니다.
    """
    warnings = st.session_state.warnings
    for warning in warnings:
        st.warning(warning["message"], icon="⚠️")


def get_analyst_response(messages: List[Dict]) -> Tuple[Dict, Optional[str]]:
    """
    현재까지의 대화 내역을 Cortex Analyst API에 전송하고 응답을 받아옵니다.
    
    자료구조:
      - messages: 대화 내역을 담고 있는 리스트 (각 항목은 딕셔너리)
      - request_body: API 요청 본문은 딕셔너리 형태로 구성
    알고리즘:
      1. 요청 본문 구성 (대화 메시지 + 시맨틱 모델 파일 위치)
      2. _snowflake.send_snow_api_request() 함수를 사용하여 POST 요청 전송
      3. 응답 상태에 따라 성공/오류 분기 처리 후 결과 리턴
    """
    request_body = {
        "messages": messages,
        "semantic_model_file": f"@{st.session_state.selected_semantic_model_path}",
    }

    resp = _snowflake.send_snow_api_request(
        "POST",  # HTTP 메소드
        API_ENDPOINT,  # API 경로
        {},  # 헤더
        {},  # URL 파라미터
        request_body,  # 요청 본문
        None,  # 요청 고유 ID (옵션)
        API_TIMEOUT,  # 타임아웃 (밀리초)
    )

    parsed_content = json.loads(resp["content"])

    if resp["status"] < 400:
        return parsed_content, None
    else:
        # 오류 메시지를 사람이 읽기 쉽게 구성
        error_msg = f"""
🚨 An Analyst API error has occurred 🚨

* response code: `{resp['status']}`
* request-id: `{parsed_content['request_id']}`
* error code: `{parsed_content['error_code']}`

Message:
