# 0. Connect to EMR Cluster with Engineer Runtime Role

<div class="alert alert-block alert-success">
In this section, we connect to the EMR cluster and create a Spark session with the *data engineer* EMR runtime role, which acts as the creator of the Lake Formation database and tables. 
</div>

## 0.1 Install and load the SageMaker Studio analytics extension

In [None]:
# %pip uninstall sagemaker-studio-analytics-extension -y
# Optional: uncomment the line above to remove an already-installed copy of the extension before the pinned install in the next cell.

In [None]:
%pip install sagemaker-studio-analytics-extension==0.0.17
# Pinned version of the extension whose magics (loaded in the next cell) provide %sm_analytics.

In [None]:
%load_ext sagemaker_studio_analytics_extension.magics
# Registers the extension's magics (including %sm_analytics, used in section 0.3) in this kernel.

## 0.2 Get EMR cluster ID and EMR runtime role

In [None]:
%%sh

# `.` is the POSIX spelling of `source`; %%sh runs /bin/sh, which is not necessarily bash.
# ~/.bash_profile presumably exports ENGINEER_ROLE (the data-engineer runtime role ARN).
if [ -f ~/.bash_profile ]; then
    . ~/.bash_profile
else
    echo "WARNING: ~/.bash_profile not found" >&2
fi

ACCOUNT_ID=$(aws sts get-caller-identity --output text --query Account)
# IDs of active clusters whose name contains emr-bootcamp-runtime-role-lf.
EMR_CLUSTER_ID=$(aws emr list-clusters --active --query 'Clusters[?contains(Name,`emr-bootcamp-runtime-role-lf`)].Id' --output text)

echo "ACCOUNT_ID:   $ACCOUNT_ID"
echo "CLUSTER_ID:   $EMR_CLUSTER_ID"
echo "IAM_ARN:      $ENGINEER_ROLE"

# Flag missing values here instead of letting them surface as a confusing connect failure.
if [ -z "$EMR_CLUSTER_ID" ] || [ "$EMR_CLUSTER_ID" = "None" ]; then
    echo "WARNING: no active EMR cluster whose name contains 'emr-bootcamp-runtime-role-lf' was found" >&2
fi
if [ -z "$ENGINEER_ROLE" ]; then
    echo "WARNING: ENGINEER_ROLE is empty; check ~/.bash_profile" >&2
fi

## 0.3 Connect to EMR cluster with runtime role and create Spark Session



<div class="alert alert-block alert-warning">
<b>Note:</b> If the following `%sm_analytics emr connect` cell fails, you can ignore the error and continue with the `%%configure -f` cell.
</div>

* Replace `<CLUSTER_ID>` and `<ENGINEER_ROLE_ARN>` with the `CLUSTER_ID` and `IAM_ARN` values printed by the cell above. 

In [None]:
%sm_analytics emr connect \
--cluster-id <CLUSTER_ID> \
--auth-type Basic_Access \
--emr-execution-role-arn <ENGINEER_ROLE_ARN>
# Connects this notebook to the EMR cluster using the data-engineer runtime role passed via --emr-execution-role-arn.

<div class="alert alert-block alert-warning">
<b>Note:</b> If the following `%%configure -f` cell fails, you can ignore the error and continue with the next cell.
</div>

* Replace `<ACCOUNT_ID>` with your AWS account ID
* Replace `<REGION>` with your region, e.g. `us-east-1`

In [None]:
%%configure -f
{
    "conf": {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
        "spark.sql.catalog.iceberg_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.iceberg_catalog.warehouse": "s3://lf-datalake-<ACCOUNT_ID>-<REGION>/",
        "spark.sql.catalog.iceberg_catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.sql.catalog.iceberg_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "spark.sql.catalog.iceberg_catalog.glue.account-id": "<ACCOUNT_ID>",
        "spark.sql.catalog.iceberg_catalog.glue.id": "<ACCOUNT_ID>",
        "spark.sql.catalog.iceberg_catalog.client.assume-role.region": "<REGION>",
        "spark.sql.catalog.iceberg_catalog.lf.managed": "true",
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "3",
        "spark.dynamicAllocation.maxExecutors": "5"
    }
}

In [None]:
# Imports for the PySpark cells below (these cells run in the remote Spark session).
# NOTE: importing `min` from pyspark.sql.functions shadows Python's built-in min()
# for every later cell in this session.
from datetime import datetime
from pyspark.sql.functions import (
    col,
    current_timestamp,
    desc,
    lit,
    min,
    split,
    unix_timestamp,
    when,
)

# 1. Config Parameters for Iceberg Data Lake 

<div class="alert alert-block alert-success">
In this section, we configure the parameters for the source data and for the Iceberg database and table that will be created.
</div>

<div class="alert alert-block alert-warning">
<b>Note:</b> Replace the following parameters
</div>

* Replace `<CODEBUCKET>` below with the `CODEBUCKET` value (e.g. `emr-roadshow-appcodeXXXX-XXXXXXXX`) from the **Outputs** tab of the CloudFormation stack **emr-roadshow**. 
* Replace `<ACCOUNT-ID>` with your AWS account ID

In [None]:
# Bucket holding the source Parquet data: the CODEBUCKET output of the emr-roadshow stack.
SRC_S3_BUCKET_NAME = "<CODEBUCKET>"

# Lake Formation data-lake bucket, named lf-datalake-<account id>-<suffix>.
# The suffix is presumably the region the stack was deployed in (us-east-1 by default);
# change REGION if your bucket carries a different suffix.
ACCOUNT_ID = "<ACCOUNT-ID>"
REGION = "us-east-1"
LF_S3_BUCKET_NAME = f"lf-datalake-{ACCOUNT_ID}-{REGION}"

In [None]:
# Every temp-view, database and table name below is suffixed with VERSION,
# so bumping it produces a fresh set of names.
VERSION = 1

# ---- Source data (Parquet files under the code bucket) ----
SRC_DB_NAME = "tpcparquet"
SRC_TABLE_NAME = "dl_tpc_customer"
SRC_DATA_TEMP_VIEW = "{}_view_{}".format(SRC_TABLE_NAME, VERSION)
SRC_DATA_S3 = "s3://{}/data/{}".format(SRC_S3_BUCKET_NAME, SRC_TABLE_NAME)

# Column the Iceberg table is partitioned by.
PARTITION_FIELD = "c_birth_country"

# ---- Iceberg target ----
# Must match the catalog name configured as spark.sql.catalog.iceberg_catalog in %%configure.
ICEBERG_CATALOG = "iceberg_catalog"
ICEBERG_DATABASE = "emr_bootcamp_iceberg_db_{}".format(VERSION)
ICEBERG_DATABASE_LOCATION = "s3://{}/{}".format(LF_S3_BUCKET_NAME, ICEBERG_DATABASE)
ICEBERG_TABLE_NAME = "emr_bootcamp_iceberg_sql_{}_{}".format(SRC_TABLE_NAME, VERSION)
ICEBERG_TABLE_LOCATION = "{}/{}".format(ICEBERG_DATABASE_LOCATION, ICEBERG_TABLE_NAME)

In [None]:
# Publish the names used by the %%sql cells: their ${key} placeholders are
# resolved from these Spark conf entries.
_sql_vars = {
    'src_df_view': SRC_DATA_TEMP_VIEW,
    'iceberg_catalog': ICEBERG_CATALOG,
    'iceberg_db': ICEBERG_DATABASE,
    'iceberg_db_location': ICEBERG_DATABASE_LOCATION,
    'iceberg_table_name': ICEBERG_TABLE_NAME,
    'iceberg_table_location': ICEBERG_TABLE_LOCATION,
    'iceberg_partition_field': PARTITION_FIELD,
    'merge_table': 'merge_table',
}
for _key, _value in _sql_vars.items():
    spark.conf.set(_key, _value)

# Echo the resolved settings for a quick sanity check.
for _label, _value in [
    ("SRC_DATA_S3:                  ", SRC_DATA_S3),
    ("SRC_DATA_TEMP_VIEW:           ", SRC_DATA_TEMP_VIEW),
    ("src_df_view:                  ", SRC_DATA_TEMP_VIEW),
    ("iceberg_catalog:              ", ICEBERG_CATALOG),
    ("iceberg_db:                   ", ICEBERG_DATABASE),
    ("iceberg_table_name:           ", ICEBERG_TABLE_NAME),
    ("iceberg_db_location:          ", ICEBERG_DATABASE_LOCATION),
    ("iceberg_table_location:       ", ICEBERG_TABLE_LOCATION),
]:
    print(_label + _value)

# 2. Create Iceberg DB and Table

<div class="alert alert-block alert-success">
In this section, we create the Iceberg database and table.
</div>

In [None]:
# Load the customer source data from S3 and register it as a temp view,
# so the %%sql cells can query it as ${src_df_view}.
src_df = spark.read.format("parquet").load(SRC_DATA_S3)
src_df.createOrReplaceTempView(SRC_DATA_TEMP_VIEW)

In [None]:
%%sql

-- Peek at a few rows of the source view
SELECT * FROM ${src_df_view} LIMIT 5

In [None]:
%%sql

-- Create the Iceberg database (no-op if it already exists) at the configured S3 location
CREATE DATABASE IF NOT EXISTS
    ${iceberg_catalog}.${iceberg_db}
    LOCATION '${iceberg_db_location}';

In [None]:
# %%sql
# -- Optional reset: uncomment every line of this cell to drop the Iceberg table and start over.
# DROP TABLE IF EXISTS ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name};

In [None]:
%%sql

-- Partitioned Iceberg table; ts holds the Unix-seconds load time written by the INSERT cells below
CREATE TABLE IF NOT EXISTS ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name} (
    c_customer_id   STRING,
    c_birth_country STRING,
    c_customer_sk   INT,
    c_email_address STRING,
    c_first_name    STRING,
    c_last_name     STRING,
    ts              BIGINT
)
USING iceberg
LOCATION '${iceberg_table_location}'
PARTITIONED BY (${iceberg_partition_field});

In [None]:
%%sql

-- Inspect the schema of the newly created table
DESCRIBE TABLE ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}

# 3. Insert the First 1000 Records into the Iceberg Table

<div class="alert alert-block alert-success">
In this section, we insert the first batch of records into the newly created Iceberg table.
</div>

In [None]:
# Source row counts for the two countries loaded in the first batch.
(
    src_df
    .where(col("c_birth_country").isin('CHINA', 'HONG KONG'))
    .groupBy("c_birth_country")
    .count()
    .show(10, truncate=False)
)

In [None]:
%%sql

-- First batch: up to 1000 HONG KONG / CHINA customers, all stamped with this query's time in ts
INSERT OVERWRITE ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}
SELECT
    c_customer_id,
    c_birth_country,
    CAST(c_customer_sk AS INT),
    c_email_address,
    c_first_name,
    c_last_name,
    unix_timestamp(current_timestamp()) AS ts
FROM ${src_df_view}
WHERE c_birth_country = 'HONG KONG'
   OR c_birth_country = 'CHINA'
LIMIT 1000

In [None]:
%%sql

-- Spot-check the rows that were just written
SELECT *
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}
LIMIT 5

In [None]:
%%sql
-- the per-country counts should add up to 1000 (the LIMIT of the first insert)
SELECT
    c_birth_country,
    count(*)
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}
GROUP BY c_birth_country;

# 4. Insert 6 Extra Records into the Iceberg Table

<div class="alert alert-block alert-success">
In this section, we insert a second batch of records into the Iceberg table.
</div>

In [None]:
# Refresh the source DataFrame and its temp view (same as in section 2).
src_df = spark.read.format("parquet").load(SRC_DATA_S3)
src_df.createOrReplaceTempView(SRC_DATA_TEMP_VIEW)

# Temp view that stages the second batch; the %%sql cells refer to it as ${insert_into_view}.
insert_into_view = "insert_into_view"
spark.conf.set('insert_into_view', insert_into_view)

In [None]:
%%sql

-- Stage the second batch: 3 new INDIA customers from the source, plus 3 CHINA customers
-- already in the Iceberg table, re-inserted with c_first_name masked.
-- CREATE OR REPLACE keeps the cell re-runnable. ORDER BY makes each LIMIT deterministic
-- (assuming c_customer_id is unique), so the preview and the INSERT below, which each
-- evaluate this view, see the same rows.
-- c_customer_sk is cast to INT as in the first batch; the alias keeps the UNION column name.
CREATE OR REPLACE TEMPORARY VIEW ${insert_into_view} AS (
    (SELECT
        c_customer_id,
        c_birth_country,
        CAST(c_customer_sk AS INT) AS c_customer_sk,
        c_email_address,
        c_first_name,
        c_last_name,
        unix_timestamp(current_timestamp()) AS ts
    FROM ${src_df_view}
    WHERE c_birth_country = 'INDIA'
    ORDER BY c_customer_id
    LIMIT 3
    )

    UNION

    (SELECT
        c_customer_id,
        c_birth_country,
        c_customer_sk,
        c_email_address,
        'MASKED' AS c_first_name,
        c_last_name,
        unix_timestamp(current_timestamp()) AS ts
    FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}
    WHERE c_birth_country = 'CHINA'
    ORDER BY c_customer_id
    LIMIT 3
    )
);

In [None]:
%%sql

-- Preview the staged second batch
SELECT *
FROM ${insert_into_view}

In [None]:
%%sql
-- Append the staged rows; ts is re-stamped with the time of this insert
INSERT INTO ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}
SELECT
    c_customer_id,
    c_birth_country,
    c_customer_sk,
    c_email_address,
    c_first_name,
    c_last_name,
    unix_timestamp(current_timestamp()) AS ts
FROM ${insert_into_view}

In [None]:
%%sql
-- expect two rows with different timestamps, one per insert batch
-- if not 2 rows, execute the query again

SELECT ts, count(*)
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}
GROUP BY ts;

In [None]:
%%sql
-- INDIA rows come only from the 2nd batch; CHINA rows come from both batches

SELECT
    c_birth_country,
    count(*)
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}
WHERE c_birth_country IN ('INDIA', 'CHINA')
GROUP BY c_birth_country;

In [None]:
%%sql
-- the 3 INDIA rows added by the 2nd batch

SELECT *
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}
WHERE c_birth_country = 'INDIA'

# 5. Time travel

<div class="alert alert-block alert-success">
In this section, we try the time-travel feature of Iceberg.
</div>

In [None]:
%%sql

-- Iceberg history metadata table for this table
SELECT *
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}.history

In [None]:
%%sql

-- Iceberg snapshots metadata table; note snapshot_id and parent_id for the queries below
SELECT *
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name}.snapshots

<div class="alert alert-block alert-warning">
<b>Note:</b> Replace the snapshot IDs in the next two cells with `snapshot_id` values returned by the `snapshots` query above
</div>

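* Replace `<FIRST_SNAPSHOT_ID>` with the `snapshot_id` whose `parent_id` is empty (null), i.e. the first insert

* Replace `<SECOND_SNAPSHOT_ID>` with the `snapshot_id` whose `parent_id` is set (it points to the first snapshot), i.e. the second insert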

In [None]:
%%sql

-- Per-country counts as of the first snapshot (before the second batch)
SELECT
    c_birth_country,
    count(*)
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name} VERSION AS OF <FIRST_SNAPSHOT_ID>
GROUP BY c_birth_country;

In [None]:
%%sql

-- Per-country counts as of the second snapshot (after the second batch)
SELECT
    c_birth_country,
    count(*)
FROM ${iceberg_catalog}.${iceberg_db}.${iceberg_table_name} VERSION AS OF <SECOND_SNAPSHOT_ID>
GROUP BY c_birth_country;