# Getting Started with ❄️ Anthropic on Snowflake Cortex

Build an intelligent question-answering system using Anthropic's Claude and Snowflake's AI capabilities. ⚡️

This notebook demonstrates how to build an end-to-end application that:
1. Processes PDF documents using Cortex `PARSE_DOCUMENT`
2. Creates a Cortex Search Service for keyword and vector search
3. Implements a chat interface in Streamlit using Snowflake Cortex and Anthropic's Claude

Check out the [Quickstart](https://quickstarts.snowflake.com/guide/getting_started_on_anthropic_with_snowflake_cortex) for instructions on getting set up for this notebook.

## Setting Up Your Environment 🎒

First, we'll import the required packages and set up our Snowflake session. The notebook relies on two key packages:
- `streamlit`: Creates the interactive chat interface
- `snowflake-ml-python`: Provides the Snowflake Cortex embedding and LLM functions


In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import json

from snowflake.snowpark.context import get_active_session
from snowflake.cortex import complete, EmbedText768
from snowflake.snowpark.types import VectorType, FloatType
from snowflake.core.table import Table, TableColumn
from snowflake.core import CreateMode, Root
from snowflake.snowpark.functions import cast, col


session = get_active_session()
current_warehouse = session.get_current_warehouse()
database_name = session.get_current_database()
schema_name = session.get_current_schema()
role_name = session.get_current_role()
service_name = 'document_search_service'
root = Root(session)
database = root.databases[database_name]
schema = database.schemas[schema_name]


## Setting Up Stage Variables 📁

We'll define our stage name and retrieve the list of files to process. This stage should contain the PDF documents we want to analyze.

In [None]:
stage_name = "@Documents"
# List every file in the stage (note the space between LIST and the stage name)
files = session.sql(f"LIST {stage_name}").collect()
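
Each row returned by `LIST` carries a `name` that includes the stage prefix, while the parsing step needs the path relative to the stage. A minimal sketch of that conversion follows. The helper `relative_stage_paths` and the sample names are hypothetical, and the sketch assumes names of the form `<stage>/<path>`.

```python
def relative_stage_paths(listed_names, stage="@Documents"):
    """Strip the leading stage prefix from names returned by LIST.

    Assumes each name looks like '<stage>/<path>' (for example
    'documents/reports/q1.pdf'); names without that prefix are kept as-is.
    """
    prefix = stage.lower().lstrip("@") + "/"
    paths = []
    for name in listed_names:
        if name.lower().startswith(prefix):
            paths.append(name[len(prefix):])
        else:
            paths.append(name)
    return paths


# Hypothetical LIST output, for illustration only
sample_names = ["documents/guide.pdf", "documents/reports/q1.pdf"]
print(relative_stage_paths(sample_names))
```

Keeping the full relative path (rather than only the last segment) means files in subfolders of the stage can still be located.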

## Document Processing Functions 📄

We'll define a function that extracts the text of each PDF file using `PARSE_DOCUMENT` in OCR mode.


In [None]:
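def process(file_name: str):
    """Extract the text of one staged file with PARSE_DOCUMENT (OCR mode)."""
    query = """
        SELECT TO_VARCHAR(
            SNOWFLAKE.CORTEX.PARSE_DOCUMENT(
                ?,
                ?,
                {'mode': 'OCR'}):content
        ) AS OCR;
    """

    # Bind the stage name and the file's path within the stage
    resp = session.sql(query, params=[stage_name, file_name]).collect()
    text = resp[0]['OCR']

    # One-row DataFrame so results can be concatenated across files
    df = pd.DataFrame({
        'TEXT': [text],
        'FILE_NAME': [file_name]
    })

    return df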
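
The `:content` path in the query pulls one field out of the object that `PARSE_DOCUMENT` returns. The sketch below mimics that extraction in plain Python. The sample response is hypothetical and only assumes that the result is a JSON object with a `content` field alongside some metadata; the helper `extract_content` is likewise illustrative.

```python
import json

# Hypothetical response, for illustration only; real output depends on the document
sample_response = '{"content": "Example text from page 1", "metadata": {"pageCount": 1}}'


def extract_content(raw_json: str) -> str:
    """Return the 'content' field, mirroring the `:content` path in the SQL."""
    parsed = json.loads(raw_json)
    # Fall back to an empty string when the field is missing
    return str(parsed.get("content", ""))


print(extract_content(sample_response))
```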

## Processing Documents

Now we'll:
1. Process all documents in our stage
2. Store the results in the `docs_text_table` table

In [None]:
# Extract each file's path relative to the stage (names look like "documents/<path>")
file_names = [file['name'].split('/', 1)[1] for file in files]

# Parse each file and combine the results into one DataFrame
final_dataframe = pd.concat([
    process(file_name)
    for file_name in file_names
], ignore_index=True)

snowpark_df = session.create_dataframe(final_dataframe).select(
    col("file_name"),
    col("text")
)

# Write the transformed data directly to the target table
snowpark_df.write.mode("overwrite").save_as_table("docs_text_table")

## Create Cortex Search Service 

### Key Components Explained 📚

#### Parameters

- `ON`: Specifies the column containing the text to be indexed
- `ATTRIBUTES`: Additional columns that can be used to filter search queries (e.g., `file_name`)
- `WAREHOUSE`: Compute warehouse used to build and refresh the index
- `TARGET_LAG`: Maximum allowed lag between the source data and the index
- `EMBEDDING_MODEL` (optional): Model used to generate text embeddings
- Source query: The SELECT statement defining the data to index

#### Configuration Options 🔧

1. Target Lag Settings:

   - Shorter lag times mean more frequent index refreshes
   - Common values: '1 hour', '1 day', '1 week'
   - Balance freshness needs against compute costs

2. Embedding Model Options:

   - 'snowflake-arctic-embed-l-v2.0': Snowflake's Arctic Embed L v2.0 model
   - Multilingual
   - 1024-dimensional embeddings

3. Warehouse Considerations:

   - Choose size based on data volume
   - Weigh compute costs against update frequency
   - Monitor warehouse utilization

In [None]:
CREATE OR REPLACE CORTEX SEARCH SERVICE {{service_name}}
  ON text
  ATTRIBUTES file_name
  WAREHOUSE = {{current_warehouse}}
  TARGET_LAG = '1 day'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
  AS (
    SELECT
        text,
        file_name
    FROM docs_text_table
);
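
The `{{service_name}}` and `{{current_warehouse}}` placeholders in the SQL cell are filled from the Python variables defined earlier. The sketch below imitates that substitution in plain Python to show what the final statement looks like. It is a simplified stand-in, not the notebook's actual templating engine, and `render_placeholders` and the warehouse name `MY_WH` are hypothetical.

```python
import re


def render_placeholders(template: str, variables: dict) -> str:
    """Replace {{name}} placeholders with values from `variables`.

    A simplified stand-in for the notebook's templating; an unknown
    placeholder name raises KeyError.
    """
    def substitute(match):
        return str(variables[match.group(1)])

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


template = "CREATE OR REPLACE CORTEX SEARCH SERVICE {{service_name}} ON text WAREHOUSE = {{current_warehouse}}"
values = {"service_name": "document_search_service", "current_warehouse": "MY_WH"}
print(render_placeholders(template, values))
```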

## Building the Chat Interface 💬

Finally, we'll create a chat interface that combines:
- The Cortex Search Service for finding relevant context
- Chat history management for conversation continuity
- Anthropic's Claude model for generating responses
- Streamlit for the user interface

Key parameters:
- `num_results`: Number of search results passed to the model as context (default: 3)
- `model_name`: Language model used (default: "claude-haiku-4-5")
- `history_length`: Number of recent chat messages considered for history (default: 5)
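
To make the effect of `history_length` concrete, here is a small sketch of the windowing that `get_chat_history` performs in the cell below, written without Streamlit so it can run on its own. The function name `recent_history` and the sample messages are hypothetical.

```python
def recent_history(messages, history_length=5):
    """Return the messages inside the window, excluding the latest one.

    The window covers the last `history_length` messages; the newest
    message (the question being answered) is left out of the history.
    """
    start = max(0, len(messages) - history_length)
    return messages[start:len(messages) - 1]


# Hypothetical conversation of seven messages
messages = [{"role": "user", "content": f"m{i}"} for i in range(7)]
print([m["content"] for m in recent_history(messages)])
```

With the default of 5, at most four earlier messages are returned, and the prompt builder then keeps only the last three of those.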

In [None]:
import streamlit as st
import json
import time
import re
import requests
import pandas as pd
from snowflake.core import Root
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col
from snowflake.cortex import complete

# Snowflake session setup
session = get_active_session()
root = Root(session)
database_name = session.get_current_database()
schema_name = session.get_current_schema()
service_name = 'document_search_service'

# Configuration
model_name = "claude-haiku-4-5"
num_results = 3
history_length = 5
DEFAULT_CARBON_PRICE = 80.0
MIN_REQUEST_INTERVAL = 2.0
RETRY_DELAY = 5

DEFAULT_EMISSIONS = {
    "steel": 2.3,
    "aluminum": 8.6,
    "cement": 0.9,
    "fertilizer": 1.5,
    "electricity": 0.4
}

def fetch_live_carbon_price():
    """Fetch EU carbon price using regex from Trading Economics HTML."""
    try:
        url = "https://tradingeconomics.com/commodity/carbon"
        response = requests.get(url, timeout=5)
        match = re.search(r'<td[^>]*>EU Carbon Permits<\/td>\s*<td[^>]*>(\d+\.\d+)', response.text)
        if match:
            return float(match.group(1))
    except Exception as e:
        st.warning(f"⚠️ Could not fetch live carbon price: {e}")
    return DEFAULT_CARBON_PRICE

def write_live_price_to_stage(carbon_price):
    """Write live price to a .txt file and upload to @Documents stage."""
    content = f"""Live EU ETS Carbon Price: €{carbon_price}/tonne CO₂ as of today.
Source: Trading Economics (https://tradingeconomics.com/commodity/carbon)
This price should be used for CBAM cost calculations unless overridden by official EU guidance."""
    with open("/tmp/live_price.txt", "w") as f:
        f.write(content)
    session.file.put("/tmp/live_price.txt", "@Documents", overwrite=True)


def process(file_name: str):
    query = """
        SELECT TO_VARCHAR(SNOWFLAKE.CORTEX.PARSE_DOCUMENT(?, ?, {'mode': 'OCR'}):content) AS OCR;
    """
    resp = session.sql(query, params=["@Documents", file_name]).collect()
    text = resp[0]['OCR']
    return pd.DataFrame({'TEXT': [text], 'FILE_NAME': file_name})

def reindex_documents():
    files = session.sql("LIST @Documents").collect()
    file_names = [file['name'].split('/')[-1] for file in files]
    final_dataframe = pd.concat([process(name) for name in file_names], ignore_index=True)
    snowpark_df = session.create_dataframe(final_dataframe).select(col("FILE_NAME"), col("TEXT"))
    snowpark_df.write.mode("overwrite").save_as_table("docs_text_table")

    
def init_messages():
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "last_request_time" not in st.session_state:
        st.session_state.last_request_time = 0
    if "carbon_price" not in st.session_state:
        st.session_state.carbon_price = fetch_live_carbon_price()
        write_live_price_to_stage(st.session_state.carbon_price)
        reindex_documents()

def init_config_options():
    st.session_state.num_chat_messages = history_length
    col1, col2 = st.columns([3, 1])
    with col1:
        if st.button("Clear conversation"):
            st.session_state.messages = []
            st.rerun()
    with col2:
        st.caption(f"💶 Live EU ETS price: €{st.session_state.carbon_price:.2f}/tCO₂e")

    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

def get_chat_history():
    start_index = max(0, len(st.session_state.messages) - st.session_state.num_chat_messages)
    return st.session_state.messages[start_index : len(st.session_state.messages) - 1]

def extract_cbam_request(question):
    match = re.search(r"(\d+)\s+tons?\s+of\s+(\w+)", question, re.IGNORECASE)
    emissions_match = re.search(r"(\d+(\.\d+)?)\s*tCO2e", question, re.IGNORECASE)
    origin_price_match = re.search(r"(?:origin|paid).*?€?(\d+(\.\d+)?)", question, re.IGNORECASE)

    if match:
        quantity = int(match.group(1))
        product = match.group(2).lower()
    else:
        quantity = None
        product = None

    emissions = float(emissions_match.group(1)) if emissions_match else None
    origin_price = float(origin_price_match.group(1)) if origin_price_match else 0.0
    return product, quantity, emissions, origin_price

def calculate_cbam_cost(embedded_emissions, origin_carbon_price=0, eu_carbon_price=None):
    if eu_carbon_price is None:
        eu_carbon_price = st.session_state.carbon_price
    cbam_cost = embedded_emissions * (eu_carbon_price - origin_carbon_price)
    return max(0, cbam_cost)

def cortex_search(my_question):
    search_service = (root
        .databases[database_name]
        .schemas[schema_name]
        .cortex_search_services[service_name]
    )
    resp = search_service.search(
        query=my_question,
        columns=["text", "file_name"],
        limit=num_results
    )
    results = json.loads(resp.to_json())["results"]
    prompt_context = "\n\n".join([r["text"] for r in results]).replace("'", "")
    file_name = results[0]['file_name'] if results else "No source"
    return prompt_context[:8000], file_name

def format_chat_history(chat_history):
    return "\n".join([f"{m['role']}: {m['content']}" for m in chat_history[-3:]])

def create_prompt(user_question):
    chat_history = get_chat_history()
    history_str = format_chat_history(chat_history)
    prompt_context, file_name = cortex_search(user_question)

    prompt = f"""You are a CBAM (Carbon Border Adjustment Mechanism) specialist. Provide direct, concise answers using the provided documentation.

<context>
{prompt_context}
</context>

<chat_history>
{history_str}
</chat_history>

<question>
{user_question}
</question>

<instructions>
1. Answer directly from the provided documentation, including the indexed carbon price file.
2. Cite specific values, formulas, and guidance from the documents.
3. For CBAM calculations:
   - Use default emission values from context if actual emissions not provided
   - Formula: CBAM Cost = (Embedded Emissions tonnes CO2e) × (EU ETS price - Carbon Price Paid in Origin)
4. Keep responses under 150 words unless calculations require detail
5. Structure: Direct answer → Key requirements → Limitations (if any)
6. If info missing: State what's needed clearly
</instructions>

Response:"""
    return prompt, file_name


def complete_with_retry(model, prompt, retries=2):
    now = time.time()
    if now - st.session_state.last_request_time < MIN_REQUEST_INTERVAL:
        time.sleep(MIN_REQUEST_INTERVAL - (now - st.session_state.last_request_time))

    for attempt in range(retries):
        try:
            response = complete(model, prompt)
            st.session_state.last_request_time = time.time()
            return response
        except Exception as e:
            if attempt < retries - 1:
                time.sleep(RETRY_DELAY * (attempt + 1))
            else:
                st.error(f"❌ Request failed: {str(e)}")
                return None


def main():
    st.title("🌍 CBAM Calculator & Documentation Assistant")
    init_messages()
    init_config_options()
    icons = {"assistant": "❄️", "user": "👤"}

    if question := st.chat_input("Ask about CBAM calculations, emissions, or requirements..."):
        st.session_state.messages.append({"role": "user", "content": question})

        with st.chat_message("user", avatar=icons["user"]):
            st.markdown(question)

        product, quantity, emissions, origin_price = extract_cbam_request(question)
        if product and quantity:
            if emissions is None:
                emissions = DEFAULT_EMISSIONS.get(product)
            if emissions is not None:
                total_emissions = emissions * quantity
                cbam_cost = calculate_cbam_cost(total_emissions, origin_price)
                response_text = f"💶 Estimated CBAM cost: €{cbam_cost:.2f} for {quantity} tons of {product} (Emissions: {emissions} tCO₂e/ton, Origin price: €{origin_price}/t)"
                st.session_state.messages.append({"role": "assistant", "content": response_text})
                with st.chat_message("assistant", avatar=icons["assistant"]):
                    st.markdown(response_text)
                return

        with st.chat_message("assistant", avatar=icons["assistant"]):
            with st.spinner("Analyzing documentation..."):
                prompt, file_name = create_prompt(question)
                response = complete_with_retry(model_name, prompt)
                if response:
                    st.markdown(response)
                    st.caption(f"📄 Source: {file_name}")
                    st.session_state.messages.append({"role": "assistant", "content": response})

if __name__ == "__main__":
    main()
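
As a worked example of the calculator path in the app above, the sketch below runs the same quantity/product pattern and the same cost formula outside Streamlit. It assumes the fallback EU price of 80.0 from `DEFAULT_CARBON_PRICE` rather than a live value. The question text and the standalone helper `cbam_cost` are hypothetical.

```python
import re

# Default emission factor for steel, in tCO2e per ton, as in the app above
DEFAULT_EMISSIONS = {"steel": 2.3}


def cbam_cost(total_emissions, eu_price, origin_price=0.0):
    """CBAM cost = emissions x (EU price - origin price), floored at zero."""
    return max(0.0, total_emissions * (eu_price - origin_price))


# Hypothetical user question
question = "What is the CBAM cost for 100 tons of steel?"
match = re.search(r"(\d+)\s+tons?\s+of\s+(\w+)", question, re.IGNORECASE)
quantity = int(match.group(1))
product = match.group(2).lower()

# 100 tons at 2.3 tCO2e per ton is about 230 tCO2e in total
total_emissions = DEFAULT_EMISSIONS[product] * quantity
cost = cbam_cost(total_emissions, eu_price=80.0)
print(f"Estimated CBAM cost: {cost:.2f} EUR")
```

When the carbon price already paid in the country of origin is at or above the EU price, the floor at zero means no CBAM cost is charged.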
