# ❄️ Anthropic on Snowflake Cortex – End‑to‑End RAG Notebook

This single notebook walks through:

1. Setting up a Snowflake database/schema for RAG  
2. Ingesting PDF **and** CSV source documents  
3. Building a Cortex Search Service  
4. Creating a Standards Library table  
5. Auto‑classifying each standard as **Yes / No / Needs Review**  
6. (Optional) Launching a Streamlit review dashboard  

Cells marked **SQL** are meant to be executed in Snowflake Notebooks with the *SQL* language selector;  
cells marked **Python** run in the built‑in Snowpark kernel.


In [None]:
-- SECTION 0 – Prerequisites  (SQL)
-- Creates (if needed) and selects the corp_rag database and corp_rag schema
-- that every later cell runs in.
USE ROLE ACCOUNTADMIN;            -- or another role with CREATE DB privs
CREATE DATABASE IF NOT EXISTS corp_rag;
-- Select the database *before* creating the schema. When corp_rag already
-- exists, the CREATE above may leave the session pointing at a different
-- database, and an unqualified CREATE SCHEMA would then land there instead.
USE DATABASE corp_rag;
CREATE SCHEMA IF NOT EXISTS corp_rag.corp_rag;
USE SCHEMA corp_rag.corp_rag;


In [None]:
# SECTION 1 – Python environment  (Python)
# Make sure the notebook has the packages below via the *Packages* pane:
#   snowflake-ml-python>=1.8, snowflake, streamlit

import pandas as pd, json, uuid, streamlit as st
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col
from snowflake.cortex import complete
from snowflake.core import Root

# Bind to the session this notebook is already running in and wrap it in a
# snowflake.core Root for object-style access to databases and schemas.
session = get_active_session()
root = Root(session)

# Capture the current warehouse / database / schema once so later cells can
# refer to them by name.
warehouse = session.get_current_warehouse()
database_name, schema_name = (
    session.get_current_database(),
    session.get_current_schema(),
)

# Name of the Cortex Search service looked up by the classification cell.
service_name = "document_search_service"


In [None]:
-- SECTION 2 – Stage & list source files  (SQL)
-- Internal stage holding the PDF / CSV source documents.
-- SNOWFLAKE.CORTEX.PARSE_DOCUMENT only reads files from stages that use
-- server-side encryption, so the stage is created with SNOWFLAKE_SSE.
-- NOTE: the encryption type is fixed at creation time. If docs_stage already
-- exists without SSE, drop and recreate it (then re-upload the files).
CREATE STAGE IF NOT EXISTS docs_stage
  DIRECTORY  = (ENABLE = TRUE)
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE');
-- Drag‑and‑drop PDFs / CSVs into: Databases ▸ corp_rag ▸ docs_stage ▸ Upload

LIST @docs_stage;


In [None]:
# SECTION 3 – Parse PDFs (Python)
from snowflake.snowpark.types import StringType

# Stage-relative paths of every PDF on docs_stage.
# LIST filters with a regex via PATTERN; a "*.pdf" path segment would be taken
# as a literal prefix and match nothing. "[.]" avoids backslash escaping.
# Each returned name is "<stage>/<relative path>", so split only on the first
# "/" to keep files that live in sub-folders intact.
pdf_files = [row["name"].split("/", 1)[1]
             for row in session.sql("LIST @docs_stage PATTERN = '.*[.]pdf'").collect()]

def parse_pdf(file_name: str) -> str:
    """Extract the text of one staged PDF with Cortex PARSE_DOCUMENT (OCR mode).

    Args:
        file_name: Path of the PDF relative to ``@docs_stage``; passed as a
            bind parameter, so it is never spliced into the SQL text.

    Returns:
        The document's extracted text, or an empty string when the query
        yields no row or NULL content.
    """
    query = """
        SELECT TO_VARCHAR(
            SNOWFLAKE.CORTEX.PARSE_DOCUMENT(@docs_stage, ?, {'mode':'OCR'}):content
        ) AS text
    """
    rows = session.sql(query, params=[file_name]).collect()
    text = rows[0]["TEXT"] if rows else None
    # The text is returned unmodified. Stripping apostrophes is unnecessary
    # because it is later written through a DataFrame rather than string-built
    # SQL, and removing them would corrupt the content that gets indexed.
    return text or ""

# Parse every listed PDF, then pair each file name with its extracted text
# in a two-column pandas frame (file_name, text).
pdf_texts = list(map(parse_pdf, pdf_files))
pdf_df = pd.DataFrame({"file_name": pdf_files, "text": pdf_texts})


In [None]:
# SECTION 4 – Load CSVs (Python)
# Named file format used to read the staged CSVs: optional double-quote
# enclosure, first (header) line skipped.
session.sql("""
    CREATE OR REPLACE FILE FORMAT csv_fmt
    TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1
""").collect()

# Query the staged files directly: first CSV column as text, plus the
# stage-relative file name each row came from. PATTERN is a regex (stage paths
# do not support "*.csv" globs); "[.]" avoids backslash escaping.
csv_snow = session.sql("""
    SELECT t.$1::STRING      AS text,
           METADATA$FILENAME AS file_name
    FROM @docs_stage (FILE_FORMAT => 'csv_fmt', PATTERN => '.*[.]csv') t
""")

# to_pandas() returns upper-case column names; lower-case them so they line up
# with pdf_df's columns instead of creating extra, half-empty columns on concat.
csv_df = (csv_snow.to_pandas()
                  .rename(columns=str.lower)
                  .reindex(columns=["file_name", "text"]))

# One frame with every document, PDFs first, re-indexed 0..n-1.
full_df = pd.concat([pdf_df, csv_df], ignore_index=True)


In [None]:
# SECTION 5 – Persist docs_text_table (Python)
# Snowpark quotes pandas column names verbatim, so lower-case names would
# become case-sensitive identifiers ("file_name") that neither col("file_name")
# nor the unquoted SQL in the search-service cell can resolve. Normalise to
# upper case first; if the frame carries the same logical column in more than
# one case, merge them row-wise (first non-null wins).
docs_columns = {}
for source_name in full_df.columns:
    key = str(source_name).upper()
    series = full_df[source_name]
    docs_columns[key] = (docs_columns[key].combine_first(series)
                         if key in docs_columns else series)

docs_pdf = pd.DataFrame({"FILE_NAME": docs_columns["FILE_NAME"],
                         "TEXT":      docs_columns["TEXT"]})

# Replace docs_text_table with the current document set.
(session.create_dataframe(docs_pdf)
        .select(col("FILE_NAME"), col("TEXT"))
        .write.mode("overwrite")
        .save_as_table("docs_text_table"))


In [None]:
-- SECTION 6 – Create / refresh Cortex Search Service  (SQL)
-- Indexes docs_text_table.text for search, exposing file_name as an attribute.
-- {{warehouse}} is filled in from the Python variable `warehouse` set in the
-- Python environment cell (Snowflake Notebooks substitute Python variables in
-- SQL cells with Jinja-style double braces), so run that cell first.
CREATE OR REPLACE CORTEX SEARCH SERVICE document_search_service
  ON text
  ATTRIBUTES file_name
  WAREHOUSE = {{warehouse}}
  TARGET_LAG = '1 day'
  EMBEDDING_MODEL = 'snowflake-arctic-embed-l-v2.0'
  AS (
    SELECT text, file_name FROM docs_text_table
  );


In [None]:
-- SECTION 7 – Standards Library skeleton  (SQL)
-- (Re)creates the standards table and seeds it with three sample requirements.
-- Only `standard` is populated here; `answer` and `rationale` start out NULL.
CREATE OR REPLACE TABLE standards (
    id          INT AUTOINCREMENT,   -- generated key
    standard    STRING,              -- requirement text
    answer      STRING,
    rationale   STRING
);

INSERT INTO standards (standard)
VALUES
    ('Does the report include an ISO‑27001 certificate?'),
    ('Is PII encrypted at rest?'),
    ('Is SOC 2 Type II coverage current?');


In [None]:
# SECTION 8 – Classify each standard (Python)
# Model name handed to the completion call in classify().
MODEL = "claude-3-5-sonnet"

# Walk database -> schema -> search service to obtain the handle that
# classify() queries for supporting context.
_schema_ref = root.databases[database_name].schemas[schema_name]
svc = _schema_ref.cortex_search_services[service_name]

def classify(row, *, search_service=None, llm=None, model=None):
    """Decide whether one standard is met, using retrieved context and an LLM.

    Args:
        row: Object exposing ``ID`` and ``STANDARD`` attributes (one row of
            the standards table).
        search_service: Search-service handle to query. Defaults to the
            notebook-level ``svc``.
        llm: Callable invoked as ``llm(model, prompt, options=...)`` that
            returns the reply text. Defaults to the notebook's ``complete``.
        model: Model name. Defaults to the notebook-level ``MODEL``.

    Returns:
        ``(row.ID, answer, rationale)``. ``answer`` is always one of "Yes",
        "No" or "Needs Review"; anything the model returns outside that set,
        or a reply that cannot be parsed, is reported as "Needs Review".
    """
    allowed = {"yes": "Yes", "no": "No", "needs review": "Needs Review"}
    # Defaults are resolved at call time so the notebook globals are only
    # required when the caller does not supply an override.
    search_service = svc if search_service is None else search_service
    llm = complete if llm is None else llm
    model = MODEL if model is None else model

    req = row.STANDARD

    # search() returns a response object whose .results is a list of dicts
    # keyed by the requested columns (accept either key case).
    response = search_service.search(query=req, columns=["text"], limit=6)
    hits = response.results or []
    ctx = "\n\n".join(str(hit.get("text", hit.get("TEXT", ""))) for hit in hits)

    prompt = f"""You are an auditor. Decide if <context> shows <requirement> is met.
Return JSON {{\"answer\":\"Yes|No|Needs Review\",\"rationale\":"one line"}}.
<requirement>{req}</requirement>
<context>{ctx}</context>"""

    # Sampling settings go in `options`; complete() has no `temperature` kwarg.
    raw = str(llm(model, prompt, options={"temperature": 0}))

    # Models often wrap the JSON in code fences or prose: parse only the
    # outermost {...} span, and fall back to manual review if that fails.
    start, end = raw.find("{"), raw.rfind("}")
    try:
        verdict = json.loads(raw[start:end + 1])
        if not isinstance(verdict, dict):
            raise ValueError("model reply is not a JSON object")
    except (ValueError, TypeError):
        return (row.ID, "Needs Review",
                "Model reply was not valid JSON; manual review required.")

    answer = allowed.get(str(verdict.get("answer", "")).strip().lower(),
                         "Needs Review")
    rationale = str(verdict.get("rationale", "")).strip()
    return (row.ID, answer, rationale)

# Pull the standards into pandas and classify them one by one.
std_pdf = session.table("standards").to_pandas()
updates = pd.DataFrame([classify(r) for r in std_pdf.itertuples(index=False)],
                       columns=["ID","ANSWER","RATIONALE"])

if not updates.empty:
    # write_pandas quotes identifiers by default, so the staging table name
    # must be upper case for the unquoted reference in the MERGE to resolve.
    # overwrite=True keeps re-runs from appending duplicate IDs, which would
    # make the MERGE match several source rows per target row.
    session.write_pandas(updates, "TMP_UPDATES",
                         auto_create_table=True, overwrite=True)

    # Copy each verdict onto its standard, matched by ID.
    session.sql("""MERGE INTO standards t USING TMP_UPDATES s ON s.ID=t.ID
              WHEN MATCHED THEN UPDATE SET answer=s.ANSWER, rationale=s.RATIONALE""").collect()

In [None]:
# SECTION 9 – Streamlit review app (Python)
# Save the following as streamlit/review_app.py if you want a dashboard:
# Source of the optional Streamlit review app, kept as a string so it can be
# printed and saved to a file. Fixes relative to a naive version:
#   * the column type is st.column_config.SelectboxColumn (there is no
#     st.column_config.Selectbox);
#   * the column_config key is 'ANSWER', matching the upper-case column name
#     that to_pandas() returns;
#   * the staging table name is upper case because write_pandas quotes
#     identifiers, and overwrite=True stops repeated saves from piling up
#     duplicate IDs ahead of the MERGE.
review_code = """import streamlit as st
from snowflake.snowpark.context import get_active_session
session = get_active_session()
df = session.table('standards').filter("answer='Needs Review'").to_pandas()
st.title('🕵️ Needs‑Review Dashboard')
edited = st.data_editor(df, num_rows='dynamic',
     column_config={'ANSWER': st.column_config.SelectboxColumn('answer', options=['Yes','No','Needs Review'])})
if st.button('Save changes'):
    session.write_pandas(edited, 'TMP_REVIEW', auto_create_table=True, overwrite=True)
    session.sql('MERGE INTO standards t USING TMP_REVIEW s ON s.ID=t.ID WHEN MATCHED THEN UPDATE SET answer=s.ANSWER, rationale=s.RATIONALE').collect()
    st.success('Updates saved!')"""
print(review_code)

In [None]:
# Closing marker cell: it only prints a fixed confirmation line and does not
# write or save anything.
ready_message = "Notebook cells populated. Ready to run!"
print(ready_message)