# Configuration of session and API endpoints

In [1]:
import os
import snowflake.connector
from dotenv import load_dotenv
import json
import requests
import pandas as pd
from typing import Dict, List, Optional, Tuple, Union

load_dotenv()
# Panda settings
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_colwidth', None)  # or a specific number like 1000
pd.set_option('display.expand_frame_repr', False)

def get_snowflake_connection():  
    conn = snowflake.connector.connect(
        user=os.getenv("SNOWFLAKE_USER"),
        password=os.getenv("SNOWFLAKE_PASSWORD"),
        account=os.getenv("SNOWFLAKE_ACCOUNT"),
        warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
        database=os.getenv("SNOWFLAKE_DATABASE"),
        schema=os.getenv("SNOWFLAKE_SCHEMA"),
        host=os.getenv("SNOWFLAKE_HOST"),
        role=os.getenv("SNOWFLAKE_ROLE"),
        )
    return conn

# code for the analyst app 
# configurate the used API
AVAILABLE_SEMANTIC_MODELS_PATHS = [
    "DAYUSE_ANALYTICS.STREAMLIT_APPS.STREAMLIT_STAGE/customer_360.yaml"
]
API_ENDPOINT = "/api/v2/cortex/analyst/message"
FEEDBACK_API_ENDPOINT = "/api/v2/cortex/analyst/feedback"
API_TIMEOUT = 50000  # in milliseconds

conn = get_snowflake_connection()
cur = conn.cursor()
#test connection
print(cur.execute("select 1"))


<snowflake.connector.cursor.SnowflakeCursor object at 0x111e875c0>


# 🧪 Tests with connector functions

### Is cortex analytics app available trough this connection : YES

In [14]:
#construire le mesage en fonction du role et du contenu
def get_message(role, content:str):    
    m: dict[str, Any] = {}
    if role == "user":
        m["role"] = "user"
    else:
        m["role"] = "analyst"
    text_content = "\n".join([c for c in content if isinstance(c, str)])
    m["content"] = [{"type": "text", "text": text_content}]
    return m

# construire conversation en fonction du message
def get_conversation(messages: List[Dict]):
    # itère sur la liste de messages et construit la conversation
    return [get_message(m["role"], m["content"]) for m in messages]


# envoyer la requete à l'API
# only one messsage for the demo
def send_request(messages: List[Dict]):
    request_body = {
        "message": {"role": "user", "content": [{"type": "text", "text": "Hello, how are you?"}]},
        "semantic_model_file": f"@{AVAILABLE_SEMANTIC_MODELS_PATHS[0]}",
        "stream": True, 
    }
    print(request_body)
    resp = requests.post(
        url=f"https://{conn.host}/api/v2/cortex/analyst/message",
        json=request_body,
        headers={
            "Authorization": f'Snowflake Token="{conn.rest.token}"',
            "Content-Type": "application/json",
        },
        stream=True,
    )
    if resp.status_code < 400:
        return resp  # type: ignore
    else:
        raise Exception(f"Failed request with status {resp.status_code}: {resp.text}")
    

# create a test conversation
messages = [
    {"role": "user", "content": [{"type": "text", "text": "Hello, how are you?"}]},
    {"role": "analyst", "content": [{"type": "text", "text": "I'm fine, thank you!"}]},
    {"role": "user", "content": [{"type": "text", "text": "What is the capital of France ?"}]},
]

# get the response from the API
response = send_request()

{'message': {'role': 'user', 'content': [{'type': 'text', 'text': 'Hello, how are you?'}]}, 'semantic_model_file': '@DAYUSE_ANALYTICS.STREAMLIT_APPS.STREAMLIT_STAGE/customer_360.yaml', 'stream': True}


### Tests to do sql queries from the external env``


In [15]:
#test the sql query
sql_query = """ 
WITH __dim_order AS (  
  SELECT  
    customer_id,  
    hotel_id,  
    checkin_date  
  FROM dayuse_analytics.analytics.dim_order  
), __dim_hotel AS (  
  SELECT  
    hotel_id,  
    hotel_country,  
    city  
  FROM dayuse_analytics.analytics.dim_hotel  
), paris_stays AS (  
  SELECT DISTINCT  
    o.customer_id,  
    MAX(o.checkin_date) AS latest_checkin  
  FROM __dim_order AS o  
  LEFT OUTER JOIN __dim_hotel AS h  
    ON o.hotel_id = h.hotel_id  
  WHERE  
    h.city = 'Paris' AND h.hotel_country = 'France'  
  GROUP BY  
    o.customer_id  
)  
SELECT  
  ps.customer_id  
FROM paris_stays AS ps  
ORDER BY  
  ps.latest_checkin DESC NULLS LAST  
LIMIT 3
"""
print(cur.execute(sql_query).fetchall())


[(7816052,), (7799287,), (7820036,)]


### Tests to call the snowflake LLMs (LLMs running in Snowflake)

In [16]:
# call LLM via complete request via SQL request 
def call_cortex_complete(cursor, prompt: str, model: str = 'mistral-7b'):
    """Calls the Snowflake Cortex COMPLETE function with the given prompt and model."""
    try:
        # Using %s placeholders for model and prompt for security and correctness
        query = "SELECT SNOWFLAKE.CORTEX.COMPLETE(%s, %s) AS response;"
        # print(f"Executing Cortex query with model='{model}', prompt='{prompt[:100]}...'") # For debugging
        cursor.execute(query, (model, prompt))
        result = cursor.fetchone()
        if result and result[0]:
            return result[0]  # The response from COMPLETE is in the first column
        else:
            print("Cortex returned no result or an empty result.")
            return None
    except Exception as e:
        print(f"Error calling Cortex COMPLETE: {e}")
        if hasattr(e, 'sfqid'): # Snowflake query ID can be helpful for debugging
            print(f"Snowflake Query ID: {e.sfqid}")
        return None
    
call_cortex_complete(cur, 
                    """[
            {{'role': 'system', 'content': 'You are a helpful AI assistant that is writing some emails.' }},
            {{'role': 'user', 'content': '{placeholder}' }}""",
                    'mistral-7b'
                    )

KeyboardInterrupt: 

### Open AI LLM Use


In [63]:
import openai
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

def openai_llm_call(prompt: str):
    response = openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content


response = openai_llm_call("Hello, how are you?")
print(response)


Hello! I'm just a computer program, so I don't have feelings, but I'm here and ready to help you. How can I assist you today?


# This is for when we're on the snowflake environment.

In [5]:
# -------on snowflake environment-------
# import _snowflake  # For interacting with Snowflake-specific APIs
# from snowflake.snowpark.context import get_active_session
# from snowflake.snowpark.exceptions import SnowparkSQLException
# from snowflake.cortex import Complete, ExtractAnswer, Sentiment, Summarize, Translate, ClassifyText
# session = get_active_session()

# LLM part

In [19]:
def complete_prompts(prompt_template, sql_template, placeholder_value):
    # une fonction qui retourne une sql query constitué du prompt template complété par le placeholder et du system prompt
    formated_prompt = prompt_template.format(placeholder = placeholder_value)
    formated_prompt_sql = sql_template.format(placeholder = formated_prompt)
    
    return formated_prompt, formated_prompt_sql

In [20]:
def llm_call(prompt_generate_mail : str):
    
    generated_response = Complete("llama2-70b-chat", )
    return generated_response

# Get SQL query and the DATAFRAME from its execussion

In [17]:
# def get_analyst_response(messages: List[Dict]):
def get_analyst_response(conv: List[Dict]):

    # test payload
    message_test = [{
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "which company had the most revenue?"
                }
            ]
        }]

    request_body = {
        "messages": conv,
        "semantic_model_file": f"@{AVAILABLE_SEMANTIC_MODELS_PATHS[0]}",
        "stream": False, 
    }
    print(request_body['messages'])

    resp = requests.post(
        url=f"https://{conn.host}/api/v2/cortex/analyst/message",
        json=request_body,
        headers={
            "Authorization": f'Snowflake Token="{conn.rest.token}"',
            "Content-Type": "application/json",
        }
    )

    parsed_response = json.loads(resp.text)  
    
    print(parsed_response)

    return parsed_response


In [18]:
analyst_response = get_analyst_response(create_user_payload("Give me 3 customer_ids of a customer that went in a hotel in Paris, France yesterday."))

[{'role': 'user', 'content': [{'type': 'text', 'text': 'Give me 3 customer_ids of a customer that went in a hotel in Paris, France yesterday.'}]}]
Response text: {
  "message" : {
    "role" : "analyst",
    "content" : [ {
      "type" : "text",
      "text" : "This is our interpretation of your question:\n\nGive me 3 customer_ids of customers who had a hotel booking in Paris, France with a check-in date of 2025-05-18 (yesterday)."
    }, {
      "type" : "sql",
      "statement" : "WITH __dim_order AS (\n  SELECT\n    customer_id,\n    hotel_id,\n    checkin_datetime\n  FROM dayuse_analytics.analytics.dim_order\n), __dim_hotel AS (\n  SELECT\n    hotel_id,\n    country,\n    city\n  FROM dayuse_analytics.analytics.dim_hotel\n)\nSELECT DISTINCT\n  o.customer_id\nFROM __dim_order AS o\nLEFT OUTER JOIN __dim_hotel AS h\n  ON o.hotel_id = h.hotel_id\nWHERE\n  h.city = 'Paris'\n  AND h.country = 'France'\n  AND TO_DATE(o.checkin_datetime) = DATEADD(DAY, -1, CURRENT_DATE)\nLIMIT 3\n -- Gen

In [19]:
def get_query_exec_result(analyst_response_json : dict):
    # puts the sql, the thought, the sql confidency, the dataframe at the right place
    # query_exec_result = [thought, sql_query, sql_confidence, df]
    
    query_exec_result=[]
    sql_query = ""
    sql_confidence = ""
    thought = ""
    content = analyst_response_json['message']['content']
    for item in content :
        if item['type'] == "sql":
            sql_query = item["statement"]
            sql_confidence = item["confidence"]
        elif item['type'] == "text":
            thought = item["text"]
    
    global session
    try:
        df = cur.execute(sql_query).fetchall()
        query_exec_result = [thought, sql_query, sql_confidence, df]
        return query_exec_result
    except SnowparkSQLException as e:
        return str(e)
    
get_query_exec_result(analyst_response)

['This is our interpretation of your question:\n\nGive me 3 customer_ids of customers who had a hotel booking in Paris, France with a check-in date of 2025-05-18 (yesterday).',
 "WITH __dim_order AS (\n  SELECT\n    customer_id,\n    hotel_id,\n    checkin_datetime\n  FROM dayuse_analytics.analytics.dim_order\n), __dim_hotel AS (\n  SELECT\n    hotel_id,\n    country,\n    city\n  FROM dayuse_analytics.analytics.dim_hotel\n)\nSELECT DISTINCT\n  o.customer_id\nFROM __dim_order AS o\nLEFT OUTER JOIN __dim_hotel AS h\n  ON o.hotel_id = h.hotel_id\nWHERE\n  h.city = 'Paris'\n  AND h.country = 'France'\n  AND TO_DATE(o.checkin_datetime) = DATEADD(DAY, -1, CURRENT_DATE)\nLIMIT 3\n -- Generated by Cortex Analyst\n;",
 {'verified_query_used': None},
 [(7802035,), (7803803,), (7613910,)]]

# Utils

In [20]:
def create_user_payload(user_text:str):    
    # Create the payload structure
    payload = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": user_text
                }
            ]
        }
    ]
    
    # Return the payload as a Python object
    return payload

In [30]:
def create_sql_llm_query(sql_template, placeholder, system_prompt):
    return sql_template.format(placeholder=placeholder, system_prompt=system_prompt)

## Utils - Iteration functions

In [31]:
def process_df_ids(df, prompt_template_reco):
    hotel_reco = []
    for index, row in df.iterrows():
        customer_id = row['CUSTOMER_ID']
        print(f"Processing row {index}: {customer_id}")
        
        # create the payload for a customer id
        placeholder_reco = prompt_template_reco.format(customer_id = customer_id)
        payload_reco = create_user_payload(placeholder_reco)
        print(payload_reco)
        
        # get response : hotel to recommand
        analyst_response_reco = get_analyst_response(payload_reco)  # inside first key "content"
        print(f"Analyst response for hotel recommendation for row {index}: {customer_id} : \n {analyst_response_reco}")
        query_exec_result_hotel_reco = get_query_exec_result(analyst_response_reco)
        
        # query_exec_result = [thought, sql_query, sql_confidence, df] for hotel reco
        print("---------- hotel recommendation res ----------")
        print(f"thought:\n {query_exec_result_hotel_reco[0]}\n")
        print(f"sql_query:\n {query_exec_result_hotel_reco[1]}\n")
        print(f"sql_confidence:\n {query_exec_result_hotel_reco[2]}\n")
        print(f"df_hotel_recommendation:\n{query_exec_result_hotel_reco[3]}\n")
        print("----------------------------------------------")

        hotel_reco.append({
            'customer_id': customer_id,
            'hotel_recommendation': query_exec_result_hotel_reco[3]  # get the right row of the df
        })

    return hotel_reco


In [32]:
def generate_batch_mail(list_customer_hotel, prompt_template_mail, sql_template_mail, system_prompt):
    # sql template mail is the sql query calling llm where we have to put the prompt 
    # prompt template mail is the template of the prompt sent to the llm where we  have to add, customer info and hotel info
    # list_customer_hotel contains the customer info and the hotel info of hotels to recommand to them
    
    generated_mails = []
    for index in list_customer_hotel:
        # get what I need to generate the mail
        customer_id = index["customer_id"]
        hotel_rec = index["hotel_recommendation"]

        #prepare the payload for the llm
        completed_mail_prompt = prompt_template_mail.format(hotel_info=hotel_rec, customer_info=customer_id)
        completed_sql_llm_query = create_sql_llm_query(sql_template_mail, completed_mail_prompt, system_prompt=system_prompt)
        
        # generate the mail for this customer
        result = session.sql(completed_sql_llm_query).collect()

        response_str = result[0]['RESPONSE']

        # Parse the JSON inside the string
        response_json = json.loads(response_str)
        
        # Extract the email content
        email_content = response_json["choices"][0]["messages"]
        
        generated_mails.append({
                    'customer_id': customer_id,
                    'mail': email_content
                })
        
    return generated_mails



# Prompt templates

In [51]:
# prompt templates
prompt_template_mail = """
Write an e-mail, recommanding these hotels:
HOTEL INFORMATION :
{hotel_info}

to the person whose information are precised here:
CUSTOMER INFORMATION :
{customer_info}.

Additionnal instructioin : 
You should only rely on the informations that you explicitelly know from the HOTEL INFORMATION. 
And only rely on the information present in CUSTOMER INFORMATION. Do not hallucinate data that is not in those sections.
I want you to call the customer by its first name.  
"""

prompt_target_customer = """Give me 3 customer_ids of customers that went in a hotel in Paris, France during the past 7 days. 
For each customer ID, also give the first name of the customer.
"""

# May use a ML model for the recommandatioin later instead of this SQL query
prompt_template_recommand = """Give me only 2 hotels to recommand to this customer : {customer_id}. 
The 2 hôtels have to be available during the subsequent 7 days and for a timeslot during which the client has already historically booked an offer.
For each hotel, give the subsequent informations : hotel name, time slot, type of room, number of stars.
"""

prompt_template_SQL = """
    SELECT SNOWFLAKE.CORTEX.COMPLETE(
        'llama2-70b-chat',
        [
            {{'role': 'system', 'content': 'You are a helpful AI assistant that is writing some emails.' }},
            {{'role': 'user', 'content': '{placeholder}' }}
        ], {{}}
    ) as response;
"""

# Process main

## Get customer targets : customer_ids list

In [13]:
# get the customer targets 
payload_customer_target = create_user_payload(prompt_target_customer)
analyst_response_customer_target = get_analyst_response(payload_customer_target) #inside first key "content"
print(f"Analyst response for targeted customers {analyst_response_customer_target}")
query_exec_result_customer_target = get_query_exec_result(analyst_response_customer_target)

# query_exec_result = [thought, sql_query, sql_confidence, df] for customer targets
print("---------- customer targets res ----------")
print(f"thougt :\n {query_exec_result_customer_target[0]}\n")
print(f"sql_query :\n {query_exec_result_customer_target[1]}\n")
print(f"sql_confidence :\n {query_exec_result_customer_target[2]}\n")
print(f"df customer targers: \n{query_exec_result_customer_target[3]}\n")
print("--------------------------------------------")

df_customer_target = query_exec_result_customer_target[3]


NameError: name '_snowflake' is not defined

## Iterate over customer_id list to get hotels to recommand

In [None]:
list_ids_rec = process_df_ids(df_customer_target, prompt_template_recommand)

## Generate email for each customer

In [53]:
system_prompt = "You are a helpful AI assistant that is writing some emails."
generated_mails = generate_batch_mail(list_ids_rec, prompt_template_mail, prompt_template_SQL)
print(generated_mails)

NameError: name 'generate_batch_mail' is not defined

In [None]:
generated_mails