# Create Snowflake Objects

Create a database and a warehouse, then set them as the context for the rest of the notebook.

In [None]:
CREATE OR REPLACE DATABASE ICEBERG_DEMO;
CREATE OR REPLACE WAREHOUSE demo_wh;

-- Use the database created above (ICEBERG_DEMO) for the rest of the notebook
USE DATABASE ICEBERG_DEMO;
USE WAREHOUSE demo_wh;

USE ROLE ACCOUNTADMIN;


# Set Up Snowflake
## Create an External Volume
To create an external volume, complete the instructions for your cloud storage service:
- [Accessing Amazon S3 using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-s3)
- [Accessing Microsoft Azure Storage using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-azure)

As noted in the Overview section, your Snowflake account must be in the same region as your external volume location. To use the Sentiment LLM function, the account must also be in one of the [supported regions](https://docs.snowflake.com/en/user-guide/snowflake-cortex/llm-functions#availability), which currently include:
- AWS US West 2 (Oregon)
- AWS US East 1 (N. Virginia)
- AWS Europe Central 1 (Frankfurt)
- Azure East US 2 (Virginia)
- Azure West Europe (Netherlands)
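
The region requirement above can be checked before continuing. The sketch below is a minimal illustration, not an official check: `is_sentiment_region` is a hypothetical helper, and the identifiers in `SENTIMENT_REGIONS` are an assumption about how `CURRENT_REGION()` reports the regions listed above. Confirm them against your own account.

```python
# Assumed CURRENT_REGION() identifiers for the regions listed above.
SENTIMENT_REGIONS = {
    "AWS_US_WEST_2",
    "AWS_US_EAST_1",
    "AWS_EU_CENTRAL_1",
    "AZURE_EASTUS2",
    "AZURE_WESTEUROPE",
}


def is_sentiment_region(current_region: str) -> bool:
    """Return True if the region string is in the assumed supported set.

    Some accounts report '<region_group>.<region>', so only the part
    after the last dot is compared.
    """
    region = current_region.strip().upper().rsplit(".", 1)[-1]
    return region in SENTIMENT_REGIONS


# In a Snowflake Notebook, the input could come from a Snowpark session:
#   session.sql("SELECT CURRENT_REGION()").collect()[0][0]
```

If the function returns `False`, either the account is outside the listed regions or the assumed identifiers do not match what your account reports.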

In [None]:
-- Use accountadmin role to create an external volume
USE ROLE accountadmin;

-- Create an external volume
CREATE OR REPLACE EXTERNAL VOLUME iceberg_cortex_vol
   STORAGE_LOCATIONS =
      (
         (
            NAME = 'my-s3-us-west-2'
            STORAGE_PROVIDER = 'S3'
            STORAGE_BASE_URL = 's3://iceberg-sentiment/'
            STORAGE_AWS_ROLE_ARN = '<your_aws_role_arn>'      -- e.g. 'arn:aws:iam::719851637562:role/Snf-iceberg'
            STORAGE_AWS_EXTERNAL_ID = '<your_external_id>'    -- e.g. 'my_external_id'
         )
      );
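
Once the external volume exists and the cloud-side permissions from the linked instructions are in place, it can help to confirm that Snowflake can reach the storage location. The sketch below wraps the `SYSTEM$VERIFY_EXTERNAL_VOLUME` system function. The helper name `verify_external_volume` is hypothetical, and `session` is assumed to be an active Snowpark session, such as the one a Snowflake Notebook provides.

```python
def verify_external_volume(session, volume_name: str = "iceberg_cortex_vol") -> str:
    """Run SYSTEM$VERIFY_EXTERNAL_VOLUME and return its report as a string.

    `session` is assumed to be an active Snowpark session, for example the
    result of get_active_session() inside a Snowflake Notebook.
    """
    query = f"SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('{volume_name}')"
    rows = session.sql(query).collect()
    # The function returns a single row with a single column.
    return rows[0][0]


# Usage inside the notebook (not executed here):
#   from snowflake.snowpark.context import get_active_session
#   print(verify_external_volume(get_active_session()))
```

If the report indicates a failure, revisit the role ARN, external ID, and trust policy before creating the Iceberg Table.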

## Create an Iceberg Table
Iceberg Tables can currently use Snowflake, AWS Glue, or object storage as the catalog. Catalog integration with an Iceberg REST endpoint is expected to enter public preview soon. In this quickstart, use Snowflake as the catalog so that the table supports both read and write operations. For more information about integrating catalogs, see the [catalog integration documentation](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-catalog-integration).

Create an Iceberg Table that references the external volume you just created. You can specify `BASE_LOCATION` to tell Snowflake where to write table data and metadata, or leave it empty to write them to the location specified in the external volume definition.

In [None]:
CREATE OR REPLACE ICEBERG TABLE iceberg_demo.public.product_reviews (
    id STRING,
    product_name STRING,
    product_id STRING,
    reviewer_name STRING,
    review_date DATE,
    review STRING,
    sentiment FLOAT
)
    CATALOG = 'SNOWFLAKE'
    EXTERNAL_VOLUME = 'iceberg_cortex_vol'
    BASE_LOCATION = 'demo/product_reviews/'
;

# Load CSV files into Iceberg via Snowpark Python
There are multiple ways to load new data into Snowflake-managed Iceberg Tables including INSERT, [COPY INTO](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table), and [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto).

For this quickstart, we will use Snowpark to read CSV files into dataframes and write those dataframes to the Iceberg Table. Snowflake will write Parquet files and Iceberg metadata to your external volume.

First, create an external stage and file format.

In [None]:
-- Create a file format
CREATE OR REPLACE FILE FORMAT iceberg_demo.public.csv_ff
    TYPE = 'CSV'
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1;

-- Create an external stage to read CSV files from an S3 bucket in-place
CREATE OR REPLACE STAGE iceberg_demo.public.files
    URL = 's3://sfquickstarts/iceberg_cortex/'
    FILE_FORMAT = iceberg_demo.public.csv_ff
    DIRECTORY = (ENABLE = TRUE);
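
Before reading from the stage, it can be useful to confirm which CSV files it exposes. The sketch below runs the `LIST` command through a Snowpark session and keeps the file names that end in `.csv`. The helper `list_stage_csv_files` is hypothetical, and `session` is assumed to be an active Snowpark session.

```python
def list_stage_csv_files(session, stage: str = "@iceberg_demo.public.files") -> list:
    """Return the names of CSV files visible in the given stage.

    `session` is assumed to be an active Snowpark session. The first
    column of LIST output is the file name (a full path or URL).
    """
    rows = session.sql(f"LIST {stage}").collect()
    names = [row[0] for row in rows]
    return [name for name in names if name.lower().endswith(".csv")]


# Usage inside the notebook (not executed here):
#   for name in list_stage_csv_files(session):
#       print(name)
```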

In [None]:
# Import necessary modules and get the notebook's active session
from snowflake.snowpark.context import get_active_session
import snowflake.snowpark.types as T

session = get_active_session()

In [None]:
# Define a Snowpark schema matching the columns of the CSV files
reviews_schema = T.StructType([T.StructField("ID", T.StringType()),
                               T.StructField("PRODUCT_NAME", T.StringType()),
                               T.StructField("PRODUCT_ID", T.StringType()),
                               T.StructField("REVIEWER_NAME", T.StringType()),
                               T.StructField("REVIEW_DATE", T.DateType()),
                               T.StructField("REVIEW", T.StringType()),
                               T.StructField("SENTIMENT", T.FloatType())])

In [None]:
# Read the January product reviews into a dataframe
jan_df = session.read \
    .schema(reviews_schema) \
    .option("skip_header", 1) \
    .option("field_optionally_enclosed_by", '"') \
    .csv("@iceberg_demo.public.files/product_reviews_jan_24.csv")

# View the dataframe
jan_df.show()

In [None]:
# Write the dataframe to the Iceberg Table
jan_df.write.mode("append").save_as_table("iceberg_demo.public.product_reviews")

In [None]:
SELECT * FROM iceberg_demo.public.product_reviews;

You should now see metadata files and Parquet data files in your object storage, whether you're using Amazon S3 or Azure storage.

![iceberg_files](https://github.com/Snowflake-Labs/sfquickstarts/blob/master/site/sfguides/src/cortex_ai_sentiment_iceberg/assets/iceberg_files.png)

# Snowflake Cortex LLM Functions
Now you can query the Iceberg Table using LLM functions from Snowflake Cortex AI. Run the query below to calculate sentiment scores for product reviews.

In [None]:
SELECT
    id,
    product_name,
    review_date,
    snowflake.cortex.sentiment(review) as review_sentiment
FROM iceberg_demo.public.product_reviews

In [None]:
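-- Write the sentiment scores back to the Iceberg Table.
-- {{sql_reviews_select}} refers to the result of the previous cell, which is assumed to be named sql_reviews_select.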
UPDATE iceberg_demo.public.product_reviews as pr
   SET sentiment = jan.review_sentiment
  FROM {{sql_reviews_select}} AS jan
 WHERE jan.id = pr.id;

# Create a CDC Pipeline
Suppose new product reviews continue to arrive as new CSV files, and you'd like Snowflake to compute sentiment scores for them automatically.

[Streams on Directory Tables](https://docs.snowflake.com/en/user-guide/data-load-dirtables-pipeline) can detect new files in stages, perform computation, and store results. LLM functions from Snowflake Cortex can be called in these pipelines, writing results to Iceberg Tables.

To simulate this, create a Stream on the Iceberg Table to detect new product reviews loaded to the table. On a schedule, a Serverless Task will call the SENTIMENT function to incrementally process the new records.

In [None]:
-- Create a Stream to detect new product review records in the Iceberg Table
CREATE STREAM iceberg_demo.public.product_reviews_stream ON TABLE iceberg_demo.public.product_reviews;

-- Create a Serverless Task to add sentiment for new records from the Stream
CREATE OR REPLACE TASK iceberg_demo.public.cortex_sentiment_score
    SCHEDULE = 'USING CRON 0 0 * * * America/Los_Angeles'
    USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
AS
UPDATE iceberg_demo.public.product_reviews AS pr
   SET sentiment = snowflake.cortex.sentiment(prs.review)
  FROM iceberg_demo.public.product_reviews_stream AS prs
 WHERE prs.id = pr.id;
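
Tasks are created in a suspended state, so the schedule above only takes effect after the Task is resumed. The sketch below shows one way to check whether the Stream has captured changes and to resume the Task from Python. Both helper names are hypothetical, and `session` is assumed to be an active Snowpark session.

```python
def stream_has_data(
    session, stream_name: str = "iceberg_demo.public.product_reviews_stream"
) -> bool:
    """Return True if the Stream currently holds unconsumed change records."""
    query = f"SELECT SYSTEM$STREAM_HAS_DATA('{stream_name}')"
    return bool(session.sql(query).collect()[0][0])


def resume_task(
    session, task_name: str = "iceberg_demo.public.cortex_sentiment_score"
) -> None:
    """Resume the Task so that its CRON schedule starts firing."""
    session.sql(f"ALTER TASK {task_name} RESUME").collect()


# Usage inside the notebook (not executed here):
#   if stream_has_data(session):
#       print("New reviews are waiting to be scored")
#   resume_task(session)
```

Resuming is optional for this quickstart, because a Task can also be triggered manually while it is suspended.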

Now see the incremental processing pipeline in action. Create a dataframe for February product reviews and write it to the Iceberg Table.

In [None]:
feb_df = session.read \
    .schema(reviews_schema) \
    .option("skip_header", 1) \
    .option("field_optionally_enclosed_by", '"') \
    .csv("@iceberg_demo.public.files/product_reviews_feb_24.csv")

feb_df.write.mode("append").save_as_table("iceberg_demo.public.product_reviews")

The Task runs on the specified schedule once it has been resumed, since Tasks are created in a suspended state. For now, manually trigger the Task to calculate sentiment scores for the February product reviews and write the results back to the Iceberg Table. You should then see the February product reviews along with their sentiment scores.

With both months scored, you can compare them. For example, for each product, what was the change in sentiment from January to February? Run the cells below.

In [None]:
-- Manually trigger Task
EXECUTE TASK iceberg_demo.public.cortex_sentiment_score;

In [None]:
-- Sentiment change from January to February
WITH jan AS (
    SELECT
        product_name,
        AVG(sentiment) AS avg_sentiment
    FROM iceberg_demo.public.product_reviews
    WHERE MONTHNAME(review_date) = 'Jan'
    GROUP BY 1
)
, feb AS (
    SELECT
        product_name,
        AVG(sentiment) AS avg_sentiment
    FROM iceberg_demo.public.product_reviews
    WHERE MONTHNAME(review_date) = 'Feb'
    GROUP BY 1
)
SELECT
    COALESCE(j.product_name, f.product_name) AS product_name,
    j.avg_sentiment AS jan_sentiment,
    f.avg_sentiment AS feb_sentiment,
    feb_sentiment - jan_sentiment AS sentiment_diff
FROM jan j
FULL OUTER JOIN feb f
    ON j.product_name = f.product_name
ORDER BY sentiment_diff DESC;
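
To make the logic of the query above concrete, the sketch below reproduces it in plain Python: average the scores per product and month, combine both months the way a full outer join does, and compute the difference only when both averages exist. The function `sentiment_change` and the sample rows are hypothetical and are not taken from the quickstart data.

```python
from collections import defaultdict
from statistics import mean


def sentiment_change(reviews):
    """Map each product to (jan_avg, feb_avg, diff).

    `reviews` is an iterable of (product_name, month, sentiment) tuples,
    where month is 'Jan' or 'Feb'. As with AVG in SQL, None scores are
    ignored. A product missing from one month gets None for that month
    and for the difference, like the unmatched side of a full outer join.
    """
    scores = {"Jan": defaultdict(list), "Feb": defaultdict(list)}
    for product, month, score in reviews:
        if month not in scores:
            continue
        bucket = scores[month][product]  # registers the product for that month
        if score is not None:
            bucket.append(score)

    def avg(month, product):
        values = scores[month].get(product)
        return mean(values) if values else None

    result = {}
    for product in set(scores["Jan"]) | set(scores["Feb"]):
        jan, feb = avg("Jan", product), avg("Feb", product)
        diff = feb - jan if jan is not None and feb is not None else None
        result[product] = (jan, feb, diff)
    return result


# Hypothetical sample rows, for illustration only.
sample = [
    ("Widget", "Jan", 0.2),
    ("Widget", "Feb", 0.6),
    ("Gadget", "Jan", 0.5),
    ("Gizmo", "Feb", -0.1),
]
changes = sentiment_change(sample)
```

In this sample, only `Widget` has reviews in both months, so it is the only product with a numeric difference. The other two products keep `None`, which corresponds to the NULL values the SQL query returns for products reviewed in only one month.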

In [None]:
import streamlit as st

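# sql_reviews is assumed to be the name of the previous SQL cell; to_df() returns its result as a Snowpark dataframe
st.bar_chart(sql_reviews.to_df(), x='PRODUCT_NAME', y='SENTIMENT_DIFF')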