In [None]:
CREATE WAREHOUSE iceberg_lab;
CREATE ROLE iceberg_lab;
CREATE DATABASE iceberg_lab;
CREATE SCHEMA iceberg_lab;
GRANT ALL ON DATABASE iceberg_lab TO ROLE iceberg_lab WITH GRANT OPTION;
GRANT ALL ON SCHEMA iceberg_lab.iceberg_lab TO ROLE iceberg_lab WITH GRANT OPTION;
GRANT ALL ON WAREHOUSE iceberg_lab TO ROLE iceberg_lab WITH GRANT OPTION;

In [None]:
CREATE USER iceberg_lab
    PASSWORD='<your desired password>',
    LOGIN_NAME='ICEBERG_LAB',
    MUST_CHANGE_PASSWORD=FALSE,
    DISABLED=FALSE,
    DEFAULT_WAREHOUSE='ICEBERG_LAB',
    DEFAULT_NAMESPACE='ICEBERG_LAB.ICEBERG_LAB',
    DEFAULT_ROLE='ICEBERG_LAB';

In [None]:
GRANT ROLE iceberg_lab TO USER iceberg_lab;
SET USERNAME=CURRENT_USER();
GRANT ROLE iceberg_lab TO USER IDENTIFIER($USERNAME);
GRANT ROLE accountadmin TO USER iceberg_lab;

## Create an Iceberg Table

### Create External Volume
Before you create an Iceberg table, you must have an external volume. An external volume is a Snowflake object that stores information about your cloud storage locations and identity and access management (IAM) entities (for example, IAM roles). Snowflake uses an external volume to establish a connection with your cloud storage in order to access Iceberg metadata and Parquet data.

To create an external volume, complete the instructions for your cloud storage service:

- [Accessing Amazon S3 using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-s3)
- [Accessing Google Cloud Storage using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-gcs)
- [Accessing Microsoft Azure Storage using external volumes](https://docs.snowflake.com/en/user-guide/tables-iceberg-configure-external-volume-azure)

In [None]:
USE ROLE accountadmin;

GRANT ALL ON EXTERNAL VOLUME iceberg_lab_vol TO ROLE iceberg_lab WITH GRANT OPTION;

In [None]:
USE ROLE iceberg_lab;
USE DATABASE iceberg_lab;
USE SCHEMA iceberg_lab;
CREATE OR REPLACE ICEBERG TABLE customer_iceberg (
    c_custkey INTEGER,
    c_name STRING,
    c_address STRING,
    c_nationkey INTEGER,
    c_phone STRING,
    c_acctbal INTEGER,
    c_mktsegment STRING,
    c_comment STRING
)  
    CATALOG='SNOWFLAKE'
    EXTERNAL_VOLUME='iceberg_lab_vol'
    BASE_LOCATION='iceberg_lab/iceberg_lab/customer_iceberg';

In [None]:
INSERT INTO customer_iceberg
  SELECT * FROM snowflake_sample_data.tpch_sf1.customer;

In [None]:
SELECT
    *
FROM customer_iceberg c
INNER JOIN snowflake_sample_data.tpch_sf1.nation n
    ON c.c_nationkey = n.n_nationkey;

In [None]:
INSERT INTO customer_iceberg
    SELECT
        *
    FROM snowflake_sample_data.tpch_sf1.customer
    LIMIT 5;

In [None]:
SELECT
    count(*) AS after_row_count,
    before_row_count
FROM customer_iceberg
JOIN (
        SELECT count(*) AS before_row_count
        FROM customer_iceberg AT(OFFSET => -60)
    )
    ON 1=1
GROUP BY 2;

## Governance on Iceberg Tables

In [None]:
USE ROLE accountadmin;
CREATE OR REPLACE ROLE tpch_us;
SET USERNAME=CURRENT_USER();
GRANT ROLE tpch_us TO USER IDENTIFIER($USERNAME);
CREATE OR REPLACE ROLE tpch_intl;
GRANT ROLE tpch_intl TO USER IDENTIFIER($USERNAME);

USE ROLE iceberg_lab;
USE DATABASE iceberg_lab;
USE SCHEMA iceberg_lab;

CREATE OR REPLACE ROW ACCESS POLICY rap_nation
AS (nation_key number) RETURNS BOOLEAN ->
  ('TPCH_US' = current_role() and nation_key = 24) OR
  ('TPCH_INTL' = current_role() and nation_key != 24)
;

ALTER ICEBERG TABLE customer_iceberg
ADD ROW ACCESS POLICY rap_nation ON (c_nationkey);

GRANT ALL ON DATABASE iceberg_lab TO ROLE tpch_intl;
GRANT ALL ON SCHEMA iceberg_lab.iceberg_lab TO ROLE tpch_intl;
GRANT ALL ON ICEBERG TABLE iceberg_lab.iceberg_lab.customer_iceberg TO ROLE tpch_intl;
GRANT ALL ON DATABASE iceberg_lab TO ROLE tpch_us;
GRANT ALL ON SCHEMA iceberg_lab.iceberg_lab TO ROLE tpch_us;
GRANT ALL ON ICEBERG TABLE iceberg_lab.iceberg_lab.customer_iceberg TO ROLE tpch_us;
GRANT USAGE ON EXTERNAL VOLUME iceberg_lab_vol TO ROLE tpch_intl;
GRANT USAGE ON EXTERNAL VOLUME iceberg_lab_vol TO ROLE tpch_us;
GRANT USAGE ON WAREHOUSE iceberg_lab TO ROLE tpch_us;
GRANT USAGE ON WAREHOUSE iceberg_lab TO ROLE tpch_intl;

In [None]:
USE ROLE tpch_intl;
USE WAREHOUSE iceberg_lab;
SELECT
    count(*)
FROM iceberg_lab.iceberg_lab.customer_iceberg;

In [None]:
USE ROLE tpch_us;
USE WAREHOUSE iceberg_lab;
SELECT
    count(*)
FROM iceberg_lab.iceberg_lab.customer_iceberg;

In [None]:
USE ROLE accountadmin;
CREATE OR REPLACE ROLE tpch_analyst;
SET USERNAME=CURRENT_USER();
GRANT ROLE tpch_analyst TO USER IDENTIFIER($USERNAME);

USE ROLE iceberg_lab;
ALTER ROW ACCESS POLICY rap_nation
SET body ->
  ('TPCH_US' = current_role() and nation_key = 24) or
  ('TPCH_INTL' = current_role() and nation_key != 24) or
  ('TPCH_ANALYST' = current_role()) or 
  ('ICEBERG_LAB' = current_role())
;

GRANT ALL ON DATABASE iceberg_lab TO ROLE tpch_analyst;
GRANT ALL ON SCHEMA iceberg_lab.iceberg_lab TO ROLE tpch_analyst;
GRANT ALL ON TABLE iceberg_lab.iceberg_lab.customer_iceberg TO ROLE tpch_analyst;
GRANT USAGE ON WAREHOUSE iceberg_lab TO ROLE tpch_analyst;
GRANT USAGE ON EXTERNAL VOLUME iceberg_lab_vol TO ROLE tpch_analyst;
USE ROLE iceberg_lab;

CREATE OR REPLACE MASKING POLICY pii_mask AS (val string) RETURNS string ->
    CASE
        WHEN 'TPCH_ANALYST' = current_role() THEN '*********'
        ELSE val
    END;

ALTER ICEBERG TABLE customer_iceberg MODIFY COLUMN c_name SET MASKING POLICY pii_mask;
ALTER ICEBERG TABLE customer_iceberg MODIFY COLUMN c_address SET MASKING POLICY pii_mask;
ALTER ICEBERG TABLE customer_iceberg MODIFY COLUMN c_phone SET MASKING POLICY pii_mask;

In [None]:
USE ROLE tpch_analyst;
SELECT
    *
FROM customer_iceberg;

## Transform Iceberg Tables

In [None]:
USE ROLE iceberg_lab;
USE DATABASE iceberg_lab;
USE SCHEMA iceberg_lab;

CREATE OR REPLACE ICEBERG TABLE orders_iceberg 
    CATALOG = 'SNOWFLAKE'
    EXTERNAL_VOLUME = 'iceberg_lab_vol'
    BASE_LOCATION = 'iceberg_lab/iceberg_lab/orders_iceberg'
    AS
    SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS;

CREATE OR REPLACE ICEBERG TABLE nation_iceberg 
    CATALOG = 'SNOWFLAKE'
    EXTERNAL_VOLUME = 'iceberg_lab_vol'
    BASE_LOCATION = 'iceberg_lab/iceberg_lab/nation_iceberg'
    AS
    SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.NATION;

In [None]:
CREATE OR REPLACE DYNAMIC ICEBERG TABLE nation_orders_iceberg
    TARGET_LAG = '1 minute'
    WAREHOUSE = ICEBERG_LAB
    CATALOG = 'SNOWFLAKE'
    EXTERNAL_VOLUME = 'iceberg_lab_vol'
    BASE_LOCATION = 'iceberg_lab/iceberg_lab/nation_orders_iceberg'
    AS
    SELECT
        n.n_regionkey AS regionkey,
        n.n_nationkey AS nationkey,
        n.n_name AS nation,
        c.c_custkey AS custkey,
        COUNT(o.o_orderkey) AS order_count,
        SUM(o.o_totalprice) AS total_price
    FROM ICEBERG_LAB.ICEBERG_LAB.ORDERS_ICEBERG o
    JOIN ICEBERG_LAB.ICEBERG_LAB.CUSTOMER_ICEBERG c
        ON o.o_custkey = c.c_custkey
    JOIN ICEBERG_LAB.ICEBERG_LAB.NATION_ICEBERG n
        ON c.c_nationkey = n.n_nationkey
    GROUP BY
        n.n_regionkey,
        n.n_nationkey,
        n.n_name,
        c.c_custkey
    ;

In [None]:
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, rank
from snowflake.snowpark.window import Window

session = get_active_session()

db = "iceberg_lab"
schema = "iceberg_lab"

# Load the input table
df = session.table("ICEBERG_LAB.ICEBERG_LAB.NATION_ORDERS_ICEBERG")

# Define a window partitioned by nation, ordered by total_price descending
nation_window = Window.partition_by("nation").order_by(col("total_price").desc())

# Rank customers within each nation
df_ranked = df.with_column("nation_rank", rank().over(nation_window))

# Flag top 3 customers per nation as VIPs
df_vips = df_ranked.with_column("is_vip", (col("nation_rank") <= 3))

# Show the results
df_vips = df_vips.select("nationkey", "custkey", "total_price", "nation_rank", "is_vip")

output_table = "customer_vips_iceberg"

iceberg_config = {
    "external_volume": "iceberg_lab_vol",
    "catalog": "snowflake",
    "base_location": f"{db}/{schema}/{output_table}",
    "storage_serialization_policy": "COMPATIBLE",
}

df_vips.show()
df_vips.write.mode("overwrite").save_as_table(f"{output_table}", iceberg_config=iceberg_config)

## Sharing Iceberg Tables

In [None]:
USE ROLE accountadmin;
GRANT CREATE ACCOUNT ON ACCOUNT TO ROLE iceberg_lab;
USE ROLE ICEBERG_LAB;

In [None]:
USE ROLE iceberg_lab;
CREATE OR REPLACE SECURE VIEW nation_orders_v AS
SELECT
    nation,
    SUM(order_count) as order_count,
    SUM(total_price) as total_price
FROM nation_orders_iceberg
GROUP BY nation;

In [None]:
USE ROLE accountadmin;
GRANT CREATE SHARE ON ACCOUNT TO ROLE iceberg_lab;
USE ROLE iceberg_lab;

In [None]:
USE ROLE accountadmin;

CREATE OR REPLACE WAREHOUSE iceberg_lab_reader 
    WAREHOUSE_SIZE = XSMALL
    AUTO_SUSPEND = 1
    AUTO_RESUME = TRUE
    INITIALLY_SUSPENDED = TRUE;

In [None]:
SELECT *
FROM reader_iceberg_lab_nation_orders_shared_data.iceberg_lab.nation_orders_v
ORDER BY order_count DESC;

## Cleanup

In [None]:
USE ROLE iceberg_lab;
DROP SHARE iceberg_lab_nation_orders_shared_data;
DROP DATABASE iceberg_lab;
USE ROLE accountadmin;
DROP EXTERNAL VOLUME iceberg_lab_vol;
DROP USER iceberg_lab;
DROP ROLE iceberg_lab;
DROP ROLE tpch_us;
DROP ROLE tpch_intl;
DROP ROLE tpch_analyst;
DROP WAREHOUSE iceberg_lab;