# Build Text to SQL with Cortex Analyst


## Create the required roles to use Cortex Analyst

In [None]:
-- Select the warehouse used for the statements in this cell.
USE WAREHOUSE COMPUTE_WH;

-- Role creation and the grants below are issued as SECURITYADMIN.
USE ROLE SECURITYADMIN;

-- Create (or replace) the role used throughout this notebook and give it the
-- SNOWFLAKE.CORTEX_USER database role.
CREATE OR REPLACE ROLE cortex_user_role;
GRANT DATABASE ROLE SNOWFLAKE.CORTEX_USER TO ROLE cortex_user_role;

-- NOTE(review): SECURITYADMIN is already the active role at this point; this
-- second USE ROLE looks redundant.
USE ROLE SECURITYADMIN;

-- NOTE(review): BUIVH is a hard-coded user name; replace it with the user who
-- will run the notebook.
GRANT ROLE cortex_user_role TO USER BUIVH;

## Create the required databases, schemas and warehouse

In [None]:
-- Database, schema and warehouse are created as ACCOUNTADMIN and then handed
-- over to cortex_user_role.
USE ROLE ACCOUNTADMIN;

-- CREATE OR REPLACE: any existing object with the same name is replaced.
CREATE OR REPLACE DATABASE cortex_analyst_demo;

CREATE OR REPLACE SCHEMA cortex_analyst_demo.revenue_timeseries;

-- Dedicated warehouse for the demo; it starts suspended and resumes automatically.
CREATE OR REPLACE WAREHOUSE cortex_analyst_wh
    WAREHOUSE_SIZE = 'large'
    WAREHOUSE_TYPE = 'standard'
    AUTO_SUSPEND = 60
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE
COMMENT = 'Warehouse for Cortex Analyst demo';

-- Let the demo role use and start/stop the warehouse.
GRANT USAGE ON WAREHOUSE cortex_analyst_wh TO ROLE cortex_user_role;
GRANT OPERATE ON WAREHOUSE cortex_analyst_wh TO ROLE cortex_user_role;

-- Transfer ownership of the schema and the database to the demo role.
GRANT OWNERSHIP ON SCHEMA cortex_analyst_demo.revenue_timeseries TO ROLE cortex_user_role;
GRANT OWNERSHIP ON DATABASE cortex_analyst_demo TO ROLE cortex_user_role;

-- From here on, work as the demo role inside the demo warehouse/database/schema.
USE ROLE cortex_user_role;

USE WAREHOUSE cortex_analyst_wh;

USE DATABASE cortex_analyst_demo;
USE SCHEMA cortex_analyst_demo.revenue_timeseries;

-- Internal stage (directory table enabled) that the COPY INTO statements later
-- in this notebook read the CSV files from.
CREATE OR REPLACE STAGE raw_data DIRECTORY = (ENABLE = TRUE);

/*--
• Fact and Dimension Table Creation
--*/

-- Fact table: daily_revenue
-- Rows carry revenue, COGS and forecasted revenue per date, product_id and region_id.
CREATE OR REPLACE TABLE cortex_analyst_demo.revenue_timeseries.daily_revenue (
    date DATE,
    revenue FLOAT,
    cogs FLOAT,
    forecasted_revenue FLOAT,
    product_id INT,
    region_id INT
);

-- Dimension table: product_dim
-- Maps product_id to its product_line.
CREATE OR REPLACE TABLE cortex_analyst_demo.revenue_timeseries.product_dim (
    product_id INT,
    product_line VARCHAR(16777216)
);

-- Dimension table: region_dim
-- Maps region_id to its sales_region and state.
CREATE OR REPLACE TABLE cortex_analyst_demo.revenue_timeseries.region_dim (
    region_id INT,
    sales_region VARCHAR(16777216),
    state VARCHAR(16777216)
);

## Copy data from stage into tables

In [None]:
-- Run the loads as the demo role, in the demo database/schema/warehouse.
USE ROLE CORTEX_USER_ROLE;
USE DATABASE CORTEX_ANALYST_DEMO;
USE SCHEMA CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES;
USE WAREHOUSE CORTEX_ANALYST_WH;

-- Load daily_revenue.csv from the raw_data stage into the fact table.
-- The first line of the file is skipped as a header (SKIP_HEADER=1).
-- NOTE(review): ON_ERROR=CONTINUE together with
-- error_on_column_count_mismatch=false presumably lets malformed rows be
-- skipped or loaded silently, and FORCE = TRUE presumably reloads the file on
-- every run (duplicating rows) -- confirm against the COPY INTO documentation.
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.DAILY_REVENUE
FROM @raw_data
FILES = ('daily_revenue.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)

ON_ERROR=CONTINUE
FORCE = TRUE ;



-- Load product.csv from the raw_data stage into the product dimension table,
-- using the same file format options as above.
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.PRODUCT_DIM
FROM @raw_data
FILES = ('product.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)

ON_ERROR=CONTINUE
FORCE = TRUE ;



-- Load region.csv from the raw_data stage into the region dimension table,
-- using the same file format options as above.
COPY INTO CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.REGION_DIM
FROM @raw_data
FILES = ('region.csv')
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY=NONE,
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT=AUTO,
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT=AUTO
    EMPTY_FIELD_AS_NULL = FALSE
    error_on_column_count_mismatch=false
)

ON_ERROR=CONTINUE
FORCE = TRUE ;

In [None]:
from snowflake.cortex import complete

user_query = "What is the highest daily revenue recorded in a single day in each sales region?"

# Naive baseline: describe the schema to the LLM in free text and ask for SQL.
# Every fragment ends with a newline because adjacent string literals are
# concatenated with nothing in between; without the separator the end of one
# table description fuses with the start of the next one.
# Only columns that exist in the tables created by this notebook's DDL are listed.
naive_text_to_sql_messages = [
    {
        'role': 'system',
        'content': 'You are a helpful assistant that writes SQL to answer natural language questions. Respond with only SQL'
    },
    {
        'role': 'user',
        'content': f'The user has posed a question, which is captured in: {user_query}.\n'
                   'All tables are in the fully qualified snowflake schema: CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.\n'
                   'The tables available that can be used by the SQL are:\n'
                   'table: daily_revenue. columns: date, product_id, region_id, revenue, cogs, forecasted_revenue\n'
                   'table: product_dim. columns: product_id, product_line\n'
                   'table: region_dim. columns: region_id, sales_region, state\n'
                   'Please write valid snowflake SQL to answer the user question.'
    }
]

# Ask the model for SQL; the reply is kept in `sql` for the following cells.
sql = complete("claude-3-5-sonnet", naive_text_to_sql_messages)

In [None]:
# Bare expression: shows the model's reply from the previous cell as the cell output.
sql

In [None]:
# Imported in this cell because it is the first place the session is needed;
# the only other import of this name sits in a later cell, so running the
# notebook top to bottom would otherwise fail here with a NameError.
from snowflake.snowpark.context import get_active_session

# Session of the notebook that is currently running; reused by later cells.
session = get_active_session()

# Hand the LLM-generated SQL text to Snowflake.
sql_output = session.sql(sql)

# Bare expression: shown as the cell output.
sql_output

## Using semantic models to share context with LLMs


In [None]:
# The semantic model is a YAML document assembled piece by piece in the
# module-level string `semantic_model`; later cells append to it or splice
# extra sections into it.
semantic_model = """"""

# Top-level name of the semantic model.
semantic_model_name = """
name: Revenue
"""

semantic_model += semantic_model_name

# Logical table `daily_revenue` over the daily_revenue fact table: key columns,
# the date time-dimension, and the measures (including the derived
# `daily_profit` and `daily_forecast_abs_error` expressions).
semantic_model_revenue = """
tables:
  - name: daily_revenue
    description: Daily total revenue, aligned with daily "Cost of Goods Sold" (COGS), and forecasted revenue.
    base_table:
      database: cortex_analyst_demo
      schema: revenue_timeseries
      table: daily_revenue
    primary_key:
      columns:
        - date
        - product_id
        - region_id
    dimensions:
      - name: product_id
        expr: product_id
        data_type: number
      - name: region_id
        expr: region_id
        data_type: number
    time_dimensions:
      - name: date
        expr: date
        description: date with measures of revenue, COGS, and forecasted revenue.
        unique: true
        data_type: date
    measures:
      - name: daily_revenue
        expr: revenue
        description: total revenue for the given day
        synonyms: ["sales", "income"]
        default_aggregation: sum
        data_type: number
      - name: daily_cogs
        expr: cogs
        description: total cost of goods sold for the given day
        synonyms: ["cost", "expenditures"]
        default_aggregation: sum
        data_type: number
      - name: daily_forecasted_revenue
        expr: forecasted_revenue
        description: total forecasted revenue for a given day
        synonyms: ["forecasted sales", "forecasted income"]
        default_aggregation: sum
        data_type: number
      - name: daily_profit
        description: profit is the difference between revenue and expenses.
        expr: revenue - cogs
        data_type: number
      - name: daily_forecast_abs_error
        synonyms:
          - absolute error
          - L1
        description: absolute error between forecasted and actual revenue
        expr: abs(forecasted_revenue - revenue)
        data_type: number
        default_aggregation: avg
"""

# NOTE(review): re-running this cell starts again from an empty model because
# `semantic_model` is reset at the top of the cell.
semantic_model += semantic_model_revenue

In [None]:
from snowflake.cortex import complete

user_query = "What is the highest daily revenue recorded in a single day in each sales region?"

# Same question as the naive attempt, but the prompt now embeds the YAML
# semantic model built so far instead of a hand-written schema description.
naive_text_to_sql_messages = [
    {
        "role": "system",
        "content": (
            "You are a helpful assistant that writes snowflake SQL to answer "
            "natural language questions. Respond with only SQL"
        ),
    },
    {
        "role": "user",
        "content": "".join(
            (
                f"The user has posed a question, which is captured in: {user_query}. ",
                "All tables are in the fully qualified snowflake schema: CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.",
                semantic_model,
                "Please write valid snowflake SQL to answer the user question, and do not include fences.",
            )
        ),
    },
]

# The model's reply is kept in `sql` for the following cells.
sql = complete("claude-3-5-sonnet", naive_text_to_sql_messages)

In [None]:
# Bare expression: shows the SQL generated with the revenue-only semantic model.
sql

In [None]:

# Hand the generated SQL to Snowflake through the notebook session created in
# an earlier cell.
sql_output = session.sql(sql)

# Bare expression: shown as the cell output.
sql_output

In [None]:
# Second logical table, `region`, over region_dim. The fragment has no
# `tables:` header of its own: it is appended directly after the existing
# `tables:` list, so it must keep the same list indentation.
semantic_model_region = """
  - name: region
    description: Region dimension table with unique region identifiers and geographic attributes.
    base_table:
      database: cortex_analyst_demo
      schema: revenue_timeseries
      table: region_dim
    primary_key:
      columns:
        - region_id
    dimensions:
      - name: region_id
        expr: region_id
        data_type: number
      - name: sales_region
        expr: sales_region
        description: Region associated with revenue
        data_type: varchar
        sample_values:
          - North America
          - Europe
          - Asia
          - South America
          - Africa
"""

# NOTE(review): not idempotent -- running this cell twice appends the region
# table twice.
semantic_model += semantic_model_region

In [None]:
# Declares how the `daily_revenue` logical table joins to `region`:
# many-to-one on region_id, as a left outer join.
semantic_model_region_relationships = """
relationships:
  - name: revenue_to_region
    left_table: daily_revenue
    right_table: region
    relationship_columns:
      - left_column: region_id
        right_column: region_id
    join_type: left_outer
    relationship_type: many_to_one
"""

# Appended after the tables section, starting the top-level `relationships:` list.
semantic_model += semantic_model_region_relationships

In [None]:
# Bare expression: shows the semantic model YAML assembled so far.
semantic_model

In [None]:
from snowflake.cortex import complete

user_query = "What is the highest daily revenue recorded in a single day in each sales region?"

# Third attempt at the same question: the embedded semantic model now also
# contains the region table and its relationship to daily_revenue.
naive_text_to_sql_messages = [
    dict(
        role="system",
        content=(
            "You are a helpful assistant that writes snowflake SQL to answer natural language questions. "
            "Respond with only SQL"
        ),
    ),
    dict(
        role="user",
        content=(
            "The user has posed a question, which is captured in: {question}. "
            "All tables are in the fully qualified snowflake schema: CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES."
            "{model}"
            "Please write valid snowflake SQL to answer the user question, and do not include fences."
        ).format(question=user_query, model=semantic_model),
    ),
]

sql = complete("claude-3-5-sonnet", naive_text_to_sql_messages)

# Bare expression: shows the generated SQL as the cell output.
sql

In [None]:
# Hand the SQL generated with the region-aware semantic model to Snowflake.
sql_output = session.sql(sql)

# Bare expression: shown as the cell output.
sql_output

## Call Cortex Analyst


In [None]:
import json
from typing import List
import _snowflake
from snowflake.snowpark.context import get_active_session

# Path of the Cortex Analyst message endpoint, passed to
# _snowflake.send_snow_api_request by get_sql_from_cortex_analyst.
API_ENDPOINT = "/api/v2/cortex/analyst/message"
# Request timeout handed to the same call.
API_TIMEOUT = 50000  # in milliseconds

def get_sql_from_cortex_analyst(query: str) -> dict:
    """
    Call Cortex Analyst with the user's query and return the generated SQL item.

    The request carries the module-level ``semantic_model`` YAML string inline,
    so whatever has been appended to it at call time is what the Analyst sees.

    Args:
        query: Natural-language question, sent as a single user message.

    Returns:
        The first content item of type "sql" in the Analyst's message. This is
        a dict, not a list of strings; callers in this notebook read its
        "statement" key.

    Raises:
        IndexError: If the response contains no content item of type "sql".
    """
    # Build the message list (only one user message for simplicity).
    messages = [
        {
            "role": "user",
            "content": [{"type": "text", "text": query}],
        }
    ]

    # Build the request body
    request_body = {
        "messages": messages,
        "semantic_model": semantic_model,
    }

    # Make the request to the Analyst API
    resp = _snowflake.send_snow_api_request(
        "POST",  # method
        API_ENDPOINT,  # path
        {},  # headers
        {},  # params
        request_body,  # body
        None,  # request_guid
        API_TIMEOUT,  # timeout in milliseconds
    )

    # Parse the response content
    parsed_content = json.loads(resp["content"])
    content_list = parsed_content["message"]["content"]

    # Extract only the first SQL-type item.
    sql_item = next(
        (item for item in content_list if item.get("type") == "sql"),
        None,
    )
    if sql_item is None:
        # Indexing an empty list with [0] would raise a bare "list index out of
        # range"; keep the IndexError type but say what actually went wrong and
        # include any text the Analyst returned instead of SQL.
        explanation = " ".join(
            str(item.get("text", ""))
            for item in content_list
            if item.get("type") == "text"
        )
        raise IndexError(
            f"Cortex Analyst returned no SQL for query {query!r}. {explanation}".strip()
        )
    return sql_item

# Ask Cortex Analyst the same question that was posed to the plain LLM above.
user_query = "What is the highest daily revenue recorded in a single day in each sales region?"
analyst_output = get_sql_from_cortex_analyst(user_query)

In [None]:
# Bare expression: shows the item returned by Cortex Analyst as the cell output.
analyst_output

## Run the SQL


In [None]:
# The Analyst item carries the generated SQL text under its "statement" key;
# hand that text to Snowflake.
sql_output = session.sql(analyst_output["statement"])

# Bare expression: shown as the cell output.
sql_output

## Get to a natural language response


In [None]:
# Pull the result into pandas and render it as a Markdown table without the
# index column, so it can be pasted into an LLM prompt as plain text.
markdown_sql_output = sql_output.to_pandas().to_markdown(index=False)

In [None]:
# Bare expression: shows the Markdown rendering of the query result.
markdown_sql_output

## Send the query and SQL output to the LLM

In [None]:
from snowflake.cortex import complete
import pandas as pd

# Re-create the result set from the SQL statement that Cortex Analyst produced.
sql_output = session.sql(analyst_output["statement"])

# Prompt = the original question + the query result rendered as a Markdown table.
messages = [
    {
        "role": "system",
        "content": "You are a helpful assistant that sql output to answer natural language questions.",
    },
    {
        "role": "user",
        "content": (
            "The user has posed a question, which is captured in: {question}. "
            "The question has been translated into a SQL statement, executed, and the results are found in:"
            "\n{table}"
            "Please answer the user question."
        ).format(
            question=user_query,
            table=sql_output.to_pandas().to_markdown(index=False),
        ),
    },
]

# Extra options forwarded to the completion call.
options = dict(guardrails=True)

# Bare expression: the model's answer is shown as the cell output.
complete("claude-3-5-sonnet", messages, options=options)

## Put it together

Now we have a single method that can answer questions from tabular data!

In [None]:
def answer_question_using_analyst(query: str):
    """
    Answer a natural-language question about the demo data end to end.

    Steps: ask Cortex Analyst for SQL, run that SQL in the active session,
    render the result as a Markdown table, and ask the LLM to phrase an answer
    from the question plus that table.

    Args:
        query: The natural-language question to answer.

    Returns:
        Whatever ``complete`` returns for the final prompt.
    """
    # use cortex analyst to generate sql for the query
    analyst_output = get_sql_from_cortex_analyst(query)
    # execute sql
    sql_output = session.sql(analyst_output['statement'])
    # make the output LLM-readable
    markdown_sql_output = sql_output.to_pandas().to_markdown(index=False)
    # send query and sql results to LLM
    messages = [
        {
            'role': 'system',
            'content': 'You are a helpful assistant that sql output to answer natural language questions.'
        },
        {
            'role': 'user',
            # Interpolate the `query` argument, not the notebook-level
            # `user_query`: otherwise the LLM is told about whatever question
            # was last assigned globally rather than the one being answered.
            # The newline after the table keeps its last row from running into
            # the closing instruction.
            'content': f'The user has posed a question, which is captured in: {query}. '
                       f'The question has been translated into a SQL statement, executed, and the results are found in:'
                       f'\n{markdown_sql_output}\n'
                       f'Please answer the user question.'
        }
    ]

    options = {
        'guardrails': True,
    }

    response = complete("claude-3-5-sonnet", messages, options=options)

    return response

In [None]:
# End-to-end run for the per-region question; the answer is shown as the cell output.
answer_question_using_analyst("What is the highest daily revenue recorded in a single day in each sales region?")

## Expanding Scope

Cortex Analyst can now answer questions about revenue and break the results down by region. Next, we extend the semantic model so that it can also break results down by product.

In [None]:
# Logical table `product` over product_dim. Unlike the region fragment, this
# one starts with its own `tables:` header because it is spliced in via
# str.replace below rather than appended.
semantic_model_product = """
tables:
  - name: product
    description: Product dimension table with unique product identifiers and attributes.
    base_table:
      database: cortex_analyst_demo
      schema: revenue_timeseries
      table: product_dim
    primary_key:
      columns:
        - product_id
    dimensions:
      - name: product_id
        expr: product_id
        data_type: number
      - name: product_line
        expr: product_line
        description: Product line associated with revenue
        data_type: varchar
        sample_values:
          - Electronics
          - Clothing
          - Home Appliances
          - Toys
          - Books
"""


# Replace the text "tables:" with "tables:" plus the product entry, which puts
# the new table at the head of the existing list.
# NOTE(review): str.replace touches every occurrence of "tables:" and the
# inserted text contains "tables:" again, so re-running this cell inserts the
# product table a second time.
semantic_model = semantic_model.replace("tables:", semantic_model_product)

In [None]:
# Declares how `daily_revenue` joins to `product`: many-to-one on product_id,
# as a left outer join.
semantic_model_product_relationships = """
relationships:
  - name: revenue_to_product
    left_table: daily_revenue
    right_table: product
    relationship_columns:
      - left_column: product_id
        right_column: product_id
    join_type: left_outer
    relationship_type: many_to_one
"""

# Same splice trick as for the product table: the existing "relationships:"
# header is replaced by the header plus the new entry, so the new relationship
# ends up first in the list.
# NOTE(review): not idempotent -- re-running inserts the relationship again.
semantic_model = semantic_model.replace("relationships:", semantic_model_product_relationships)

## Create a Cortex Search service to help Analyst



In [None]:
-- Work in the demo database/schema as the demo role.
USE DATABASE cortex_analyst_demo;
USE SCHEMA revenue_timeseries;
use ROLE cortex_user_role;

-- Cortex Search service over the distinct product_line values of product_dim.
-- The source query exposes them under the alias product_dimension, which is
-- the column named in the ON clause. The service runs on cortex_analyst_wh
-- with a target lag of '1 hour'.
CREATE OR REPLACE CORTEX SEARCH SERVICE product_line_search_service
  ON product_dimension
  WAREHOUSE = cortex_analyst_wh
  TARGET_LAG = '1 hour'
  AS (
      SELECT DISTINCT product_line AS product_dimension FROM product_dim
  );

In [None]:
# Logical table `product_dimension` over product_dim whose product_line
# dimension points at the product_line_search_service Cortex Search service.
# NOTE(review): the semantic model already has a `product` logical table over
# the same base table; confirm that having both is intended.
semantic_model_product_search_service = """
tables:
  - name: product_dimension
    base_table:
      database: cortex_analyst_demo
      schema: revenue_timeseries
      table: product_dim

    dimensions:
      - name: product_line
        expr: product_line
        cortex_search_service_name: product_line_search_service
        data_type: varchar

"""

# Splice the new table in at the head of the `tables:` list by replacing the
# header text with header + entry.
# NOTE(review): not idempotent -- re-running inserts the entry again.
semantic_model = semantic_model.replace("tables:", semantic_model_product_search_service)

In [None]:
# End-to-end run for a per-product-line question, now that the semantic model
# includes the product tables; the answer is shown as the cell output.
answer_question_using_analyst("What is the highest daily revenue recorded in a single day for each product line?")

In [None]:
# Top-level `custom_instructions` entry of the semantic model: free-text
# guidance asking for results to be broken down by year.
semantic_model_custom_instructions = """
custom_instructions: "Always break results down by year"
"""

# Appended at the end of the YAML document.
semantic_model += semantic_model_custom_instructions

In [None]:
# Same question as before, repeated after the custom instruction was added to
# the semantic model so the two answers can be compared.
answer_question_using_analyst("What is the highest daily revenue recorded in a single day for each product line?")

## Verified Queries

In [None]:
# Verified query: a question paired with SQL that is known to answer it.
# The question text must describe what the SQL computes. This SQL aggregates
# revenue per sales region (see the entry's name and the region_dim join), so
# the question is the per-region one, not the per-product-line one.
semantic_model_verified_queries = """
verified_queries:
  - name: "highest daily revenue by region"
    question: "What is the highest daily revenue recorded in a single day in each sales region?"
    verified_at: 1738020395
    verified_by: josh
    sql: "
WITH __daily_revenue AS (
  SELECT
    region_id,
    date,
    revenue AS daily_revenue
  FROM cortex_analyst_demo.revenue_timeseries.daily_revenue
), __region AS (
  SELECT
    region_id,
    sales_region
  FROM cortex_analyst_demo.revenue_timeseries.region_dim
), daily_revenue_by_region AS (
  SELECT
    r.sales_region,
    d.date,
    SUM(d.daily_revenue) AS total_daily_revenue
  FROM __daily_revenue AS d
  LEFT OUTER JOIN __region AS r
    ON d.region_id = r.region_id
  GROUP BY
    r.sales_region,
    d.date
)
SELECT
  sales_region,
  MAX(total_daily_revenue) AS highest_daily_revenue,
  MIN(date) AS data_start_date,
  MAX(date) AS data_end_date
FROM daily_revenue_by_region
GROUP BY
  sales_region
ORDER BY
  highest_daily_revenue DESC NULLS LAST
 -- Generated by Cortex Analyst
;
"
"""

# Appended at the end of the YAML document.
semantic_model += semantic_model_verified_queries

In [None]:
# Query Cortex Analyst again now that the semantic model carries a verified
# query; the returned item is shown as the cell output.
get_sql_from_cortex_analyst("What is the highest daily revenue recorded in a single day for each product line?")

## Using Semantic Model from stage


In [None]:
# Bare expression: shows the full semantic model YAML assembled in this notebook.
semantic_model

In [None]:
def get_sql_from_cortex_analyst(
    query: str,
    semantic_model_file: str = '@"CORTEX_ANALYST_DEMO"."REVENUE_TIMESERIES"."RAW_DATA"/revenue_timeseries.yaml',
) -> dict:
    """
    Call Cortex Analyst with a semantic model stored on a stage.

    Instead of sending the semantic model inline, the request points at a YAML
    file on a stage through the ``semantic_model_file`` field.

    NOTE: this definition replaces the earlier function of the same name and
    returns the whole parsed response rather than a single SQL item, so code
    that reads ``["statement"]`` directly from the result no longer works once
    this cell has run.

    Args:
        query: Natural-language question, sent as a single user message.
        semantic_model_file: Stage path of the semantic model YAML. Defaults to
            revenue_timeseries.yaml on the demo's RAW_DATA stage.

    Returns:
        The response body decoded from JSON.
    """
    # Build the message list (only one user message for simplicity).
    messages = [
        {
            "role": "user",
            "content": [{"type": "text", "text": query}],
        }
    ]

    # Build the request body
    request_body = {
        "messages": messages,
        "semantic_model_file": semantic_model_file,
    }

    # Make the request to the Analyst API
    resp = _snowflake.send_snow_api_request(
        "POST",  # method
        API_ENDPOINT,  # path
        {},  # headers
        {},  # params
        request_body,  # body
        None,  # request_guid
        API_TIMEOUT,  # timeout in milliseconds
    )

    # resp["content"] holds the response body as JSON text; decode it.
    parsed_content = json.loads(resp["content"])

    return parsed_content

# Ask the per-region question again, this time against the staged semantic model.
user_query = "What is the highest daily revenue recorded in a single day in each sales region?"
analyst_output = get_sql_from_cortex_analyst(user_query)

In [None]:
# Bare expression: shows the full parsed Cortex Analyst response.
analyst_output