# 0. Notebook Setup with the Hudi DB Creator EMR Runtime Role


In [None]:
%%configure -f
{
  "conf": {
    "spark.jars": "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension,com.amazonaws.emr.recordserver.connector.spark.sql.RecordServerSQLExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.catalog.spark_catalog.lf.managed": "true"
  }
}


In [None]:
# Import libraries

import os
from datetime import datetime

from pyspark.sql.functions import col,lit, current_timestamp,unix_timestamp, min, when, desc, split

## 0.1 Global variables setup

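Go to **CloudFormation**, select the blog stack, and open the **Outputs** tab. Copy the `S3BucketName` value and use it to replace the `<STACK-OUTPUTS-S3-BUCKET-NAME>` placeholder in the following cell, keeping the value a quoted Python string (bucket name only, without the `s3://` prefix).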

In [None]:
# Replace the <STACK-OUTPUTS-S3-BUCKET-NAME> placeholder (inside the quotes) with the
# 'S3BucketName' CloudFormation stack output. Use the bare bucket name: the next cell
# adds the "s3://" prefix itself.
S3_BUCKET_NAME = "<STACK-OUTPUTS-S3-BUCKET-NAME>"

In [None]:
# Notebook-wide constants. Changing VERSION yields a fresh set of database,
# table and temp-view names, so a new run does not collide with an old one.
VERSION = 1

# Stop early with a readable message if the bucket placeholder from the previous
# cell was never replaced; otherwise every S3 path below would embed the placeholder.
if not S3_BUCKET_NAME or S3_BUCKET_NAME.startswith("<"):
    raise ValueError(
        "S3_BUCKET_NAME still holds the <STACK-OUTPUTS-S3-BUCKET-NAME> placeholder: "
        "set it to the 'S3BucketName' value from the CloudFormation stack outputs."
    )

# NOTE(review): os.path.join builds the S3 URIs below; it only produces '/'-separated
# URIs on a POSIX interpreter (presumably the remote Spark driver) - confirm if run locally.

# source data variables
SRC_DB_NAME = "tpcparquet"
SRC_TABLE_NAME = "dl_tpc_customer"
# Name of the temp view registered over the source DataFrame (the variable name is
# kept as-is because later cells reference it).
SRC_DATA_TEMP_VIEWT = f"{SRC_TABLE_NAME}_view_{VERSION}"

# s3://<bucket>/shared_datasets/<db>/<table>
SRC_DATA_S3 = os.path.join(
    "s3://",
    S3_BUCKET_NAME,
    "shared_datasets",
    SRC_DB_NAME,
    SRC_TABLE_NAME
)

# Hudi key settings shared by the CoW and MoR tables; they are handed to the DDL as
# primaryKey, the PARTITIONED BY column and preCombineField respectively.
RECORD_KEY = "c_customer_id"
PARTITION_FIELD = "c_birth_country"
PRECOMBINE_FIELD = "ts"

# target Hudi data variables
HUDI_CATALOG = "spark_catalog"
HUDI_DATABASE = f"rsv2_blog_hudi_db_{VERSION}"
# s3://<bucket>/<database>
HUDI_DATABASE_LOCATION = os.path.join(
    "s3://",
    S3_BUCKET_NAME,
    HUDI_DATABASE
)

# Each table gets its own prefix under the database location.
COW_TABLE_NAME_SQL = f"rsv2_blog_hudi_cow_sql_{SRC_TABLE_NAME}_{VERSION}"
COW_TABLE_LOCATION_SQL = os.path.join(
    HUDI_DATABASE_LOCATION,
    COW_TABLE_NAME_SQL
)

MOR_TABLE_NAME_SQL = f"rsv2_blog_hudi_mor_sql_{SRC_TABLE_NAME}_{VERSION}"
MOR_TABLE_LOCATION_SQL = os.path.join(
    HUDI_DATABASE_LOCATION,
    MOR_TABLE_NAME_SQL
)

## 0.2 Spark variables setup

In [None]:
# Publish the notebook settings as Spark conf entries so the %%sql cells can
# reference each key through ${key} substitution.
_sparkmagic_sql_conf = {
    # source data
    'src_df_view': SRC_DATA_TEMP_VIEWT,
    # Hudi catalog / database
    'hudi_catalog': HUDI_CATALOG,
    'hudi_db': HUDI_DATABASE,
    'hudi_db_location': HUDI_DATABASE_LOCATION,
    # Hudi tables
    'cow_table_name_sql': COW_TABLE_NAME_SQL,
    'cow_table_location_sql': COW_TABLE_LOCATION_SQL,
    # NOTE(review): merge_table is not referenced by the SQL cells shown here - confirm before removing.
    'merge_table': 'merge_table',
    'mor_table_name_sql': MOR_TABLE_NAME_SQL,
    'mor_table_location_sql': MOR_TABLE_LOCATION_SQL,
    # Hudi key settings (the misspelled "partitioin" key is the one the DDL cells use)
    'hudi_primary_key': RECORD_KEY,
    'hudi_pre_combined_field': PRECOMBINE_FIELD,
    'hudi_partitioin_field': PARTITION_FIELD,
}
for _conf_key, _conf_value in _sparkmagic_sql_conf.items():
    spark.conf.set(_conf_key, _conf_value)

# 1. Create Hudi DB via Lake Formation

***Before running the following cells, complete the steps in the blog's "Create Hudi DB and Tables In Lake Formation" section.***

In [None]:
%%sql

-- Register the Hudi database in the configured catalog with its data rooted at hudi_db_location.
-- IF NOT EXISTS makes the cell safe to re-run.
CREATE DATABASE IF NOT EXISTS ${hudi_catalog}.${hudi_db}
LOCATION '${hudi_db_location}';

# 2. Hudi DB Creator CoW DDL & DML with Spark SQL

## 2.1 Create Hudi CoW table using Spark SQL

We can drop the table using the following SQL statement 
```
%%sql

DROP TABLE IF EXISTS ${hudi_catalog}.${hudi_db}.${cow_table_name_sql};
```

In [None]:
%%sql 

-- Create the Copy-on-Write Hudi table (type = cow) at its own S3 prefix.
-- primaryKey is the record key (c_customer_id) and preCombineField is ts, both taken from the
-- Spark conf set earlier. The partition column comes from the conf key hudi_partitioin_field (sic),
-- which holds c_birth_country.
-- NOTE(review): Spark places partition columns last in the table schema, which appears to be why the
-- INSERT statements select c_birth_country last - confirm for the Hudi version in use.
-- IF NOT EXISTS leaves an existing definition untouched, so drop the table first to change its schema.
CREATE TABLE IF NOT EXISTS ${hudi_catalog}.${hudi_db}.${cow_table_name_sql}(
    c_customer_id string,
    c_birth_country string,
    c_customer_sk integer,
    c_email_address string,
    c_first_name string,
    c_last_name string,
    ts bigint
) USING hudi
LOCATION '${cow_table_location_sql}'
OPTIONS (
  type = 'cow',
  primaryKey = '${hudi_primary_key}',
  preCombineField = '${hudi_pre_combined_field}'
 ) 
PARTITIONED BY (${hudi_partitioin_field});

## 2.2 Insert into the Hudi CoW table using Spark SQL

In [None]:
# Load the raw customer data and expose it to %%sql cells under the configured view name.
src_df = (
    spark.read
    .format("parquet")
    .load(SRC_DATA_S3)
)
src_df.createOrReplaceTempView(name=SRC_DATA_TEMP_VIEWT)


In [None]:
%%sql

-- Peek at a few raw source rows to confirm the temp view is readable.
SELECT * FROM ${src_df_view} LIMIT 5


In [None]:
# Source row counts for the two countries that are loaded into the Hudi tables below.
(
    src_df
    .where(col("c_birth_country").isin("CHINA", "HONG KONG"))
    .groupBy("c_birth_country")
    .count()
    .show(n=10, truncate=False)
)

In [None]:
%%sql

-- Initial CoW load: up to 1000 HONG KONG and CHINA customers from the source view.
-- ORDER BY on the record key makes the LIMIT select the same customers on every run,
-- so re-running the notebook reproduces the same table contents.
-- ts is the load time in epoch seconds and is the table preCombineField.
-- The partition column c_birth_country is selected last, as in every INSERT in this notebook.
INSERT OVERWRITE ${hudi_catalog}.${hudi_db}.${cow_table_name_sql}
SELECT
    c_customer_id,
    c_customer_sk,
    c_email_address,
    c_first_name,
    c_last_name,
    unix_timestamp(current_timestamp()) AS ts,
    c_birth_country
FROM ${src_df_view}
WHERE c_birth_country IN ('HONG KONG', 'CHINA')
ORDER BY c_customer_id
LIMIT 1000

In [None]:
%%sql

-- Preview a handful of rows from the CoW table after the initial load.
SELECT *
FROM ${hudi_catalog}.${hudi_db}.${cow_table_name_sql}
LIMIT 5

In [None]:
%%sql

-- Row count per partition value (c_birth_country) in the CoW table.
SELECT c_birth_country, count(*)
FROM ${hudi_catalog}.${hudi_db}.${cow_table_name_sql}
GROUP BY c_birth_country;

## 2.3 Insert into the Hudi CoW table again using Spark SQL

In [None]:
# Re-read the source so its temp view is fresh before staging the second CoW batch.
src_df = (
    spark.read
    .format("parquet")
    .load(SRC_DATA_S3)
)
src_df.createOrReplaceTempView(name=SRC_DATA_TEMP_VIEWT)

# Temporary view that will hold the staged batch; %%sql cells look it up by this name.
insert_into_view = "insert_into_view"
spark.conf.set('insert_into_view', insert_into_view)


If you need to redefine the temporary view, you can drop it first:

```
%%sql 

DROP VIEW IF EXISTS ${insert_into_view};
```

In [None]:
%%sql

-- Stage the second CoW batch in a temporary view:
--   * 3 INDIA customers from the source, which are record keys not yet in the table, and
--   * 3 CHINA customers already in the CoW table, re-emitted with c_first_name set to MASKED
--     so they target existing record keys.
-- OR REPLACE lets this cell be re-run without dropping the view first. ORDER BY on the record
-- key makes each LIMIT pick the same customers every time the view is evaluated, so the
-- preview and the later INSERT see the same rows.
-- ts is the current epoch second, evaluated whenever the view is queried.
CREATE OR REPLACE TEMPORARY VIEW ${insert_into_view} AS (
    (SELECT
        c_customer_id,
        c_customer_sk,
        c_email_address,
        c_first_name,
        c_last_name,
        unix_timestamp(current_timestamp()) AS ts,
        c_birth_country
    FROM ${src_df_view}
    WHERE c_birth_country = 'INDIA'
    ORDER BY c_customer_id
    LIMIT 3
    )

    UNION

    (SELECT
        c_customer_id,
        c_customer_sk,
        c_email_address,
        'MASKED' AS c_first_name,
        c_last_name,
        unix_timestamp(current_timestamp()) AS ts,
        c_birth_country
    FROM ${hudi_catalog}.${hudi_db}.${cow_table_name_sql}
    WHERE c_birth_country = 'CHINA'
    ORDER BY c_customer_id
    LIMIT 3
    )
);

In [None]:
%%sql

-- Inspect the staged batch before it is written to the CoW table.
SELECT *
FROM ${insert_into_view}

In [None]:
%%sql

-- Write the staged batch into the CoW table. The INDIA rows carry new record keys and the
-- MASKED CHINA rows reuse existing ones.
-- NOTE(review): whether INSERT INTO upserts the existing keys or appends them depends on the
-- Hudi version and hoodie.spark.sql.insert.into.operation - confirm for the EMR release in use.
-- ts is recomputed here, so the batch carries the write time as its precombine value.
INSERT INTO ${hudi_catalog}.${hudi_db}.${cow_table_name_sql}
SELECT
    c_customer_id, c_customer_sk, c_email_address, c_first_name, c_last_name,
    unix_timestamp(current_timestamp()) AS ts,
    c_birth_country
FROM ${insert_into_view}

Check the Hudi CoW table after the INSERT INTO operation

In [None]:
%%sql

-- Rows touched by the second batch: the masked CHINA customers plus the new INDIA customers.
SELECT *
FROM ${hudi_catalog}.${hudi_db}.${cow_table_name_sql}
WHERE c_first_name = 'MASKED' OR c_birth_country = 'INDIA'

# 3. Hudi DB Creator MoR DDL & DML with Spark SQL 

## 3.1 Create Hudi MoR table using Spark SQL

We can drop the table using the following SQL statement 
```
%%sql

DROP TABLE IF EXISTS ${hudi_catalog}.${hudi_db}.${mor_table_name_sql};
```

```
%%sql

DROP TABLE IF EXISTS ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}_ro;
```

```
%%sql

DROP TABLE IF EXISTS ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}_rt;
```

In [None]:
%%sql 

-- Create the Merge-on-Read Hudi table (type = mor) at its own S3 prefix, with the same
-- columns, record key (c_customer_id), preCombineField (ts) and partition column
-- (conf key hudi_partitioin_field, sic) as the CoW table.
-- NOTE(review): the drop instructions above also list _ro and _rt tables, presumably
-- read-optimized and real-time views registered for MoR tables - confirm for this setup.
-- IF NOT EXISTS leaves an existing definition untouched, so drop the table first to change its schema.
CREATE TABLE IF NOT EXISTS ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}(
    c_customer_id string,
    c_birth_country string,
    c_customer_sk integer,
    c_email_address string,
    c_first_name string,
    c_last_name string,
    ts bigint
) USING hudi
LOCATION '${mor_table_location_sql}'
OPTIONS (
  type = 'mor',
  primaryKey = '${hudi_primary_key}',
  preCombineField = '${hudi_pre_combined_field}'
 ) 
PARTITIONED BY (${hudi_partitioin_field});

## 3.2 Insert into the Hudi MoR table using Spark SQL

In [None]:
# Re-read the source so its temp view is fresh before loading the MoR table.
src_df = (
    spark.read
    .format("parquet")
    .load(SRC_DATA_S3)
)
src_df.createOrReplaceTempView(name=SRC_DATA_TEMP_VIEWT)


In [None]:
%%sql

-- Initial MoR load: up to 1000 HONG KONG and CHINA customers from the source view.
-- ORDER BY on the record key makes the LIMIT select the same customers on every run,
-- so re-running the notebook reproduces the same table contents.
-- ts is the load time in epoch seconds and is the table preCombineField.
-- The partition column c_birth_country is selected last, as in every INSERT in this notebook.
INSERT OVERWRITE ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}
SELECT
    c_customer_id,
    c_customer_sk,
    c_email_address,
    c_first_name,
    c_last_name,
    unix_timestamp(current_timestamp()) AS ts,
    c_birth_country
FROM ${src_df_view}
WHERE c_birth_country IN ('HONG KONG', 'CHINA')
ORDER BY c_customer_id
LIMIT 1000

In [None]:
%%sql

-- Row count per partition value (c_birth_country) in the MoR table.
SELECT c_birth_country, count(*)
FROM ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}
GROUP BY c_birth_country;

## 3.3 Insert into Hudi MoR table using Spark SQL

In [None]:
%%sql

-- Write 50 HONG KONG customers into the MoR table with c_email_address replaced by UNKNOWN.
-- ORDER BY on the record key makes the LIMIT select the same 50 customers on every run,
-- so the UNKNOWN count in the next cell is reproducible.
-- NOTE(review): keys already in the table are expected to be updated and the rest inserted,
-- but INSERT INTO semantics depend on the Hudi version - confirm for the EMR release in use.
INSERT INTO ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}
SELECT
    c_customer_id,
    c_customer_sk,
    'UNKNOWN' AS c_email_address,
    c_first_name,
    c_last_name,
    unix_timestamp(current_timestamp()) AS ts,
    c_birth_country
FROM ${src_df_view}
WHERE c_birth_country = 'HONG KONG'
ORDER BY c_customer_id
LIMIT 50

In [None]:
%%sql

-- Count HONG KONG rows in the MoR table by whether their email is the UNKNOWN marker
-- written by the previous cell.
WITH labelled AS (
    SELECT
        CASE WHEN c_email_address = 'UNKNOWN' THEN 'UNKNOWN' ELSE 'NOT_UNKNOWN' END AS email_label
    FROM ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}
    WHERE c_birth_country = 'HONG KONG'
)
SELECT email_label, count(*)
FROM labelled
GROUP BY email_label;

## 3.4 Insert into the Hudi MoR table again using Spark SQL

In [None]:
# Re-read the source so its temp view is fresh before staging the second MoR batch.
src_df = (
    spark.read
    .format("parquet")
    .load(SRC_DATA_S3)
)
src_df.createOrReplaceTempView(name=SRC_DATA_TEMP_VIEWT)

# Temporary view that will hold the staged MoR batch; %%sql cells look it up by this name.
mor_insert_into_view = "mor_insert_into_view"
spark.conf.set('mor_insert_into_view', mor_insert_into_view)

In [None]:
%%sql

-- Stage the second MoR batch in a temporary view:
--   * 3 INDIA customers from the source, which are record keys not yet in the table, and
--   * 3 CHINA customers read from the MoR table itself, re-emitted with c_first_name set to
--     MASKED, so they target record keys that already exist in the table being updated.
-- OR REPLACE lets this cell be re-run without dropping the view first. ORDER BY on the record
-- key makes each LIMIT pick the same customers every time the view is evaluated.
-- ts is the current epoch second, evaluated whenever the view is queried.
CREATE OR REPLACE TEMPORARY VIEW ${mor_insert_into_view} AS (
    (SELECT
        c_customer_id,
        c_customer_sk,
        c_email_address,
        c_first_name,
        c_last_name,
        unix_timestamp(current_timestamp()) AS ts,
        c_birth_country
    FROM ${src_df_view}
    WHERE c_birth_country = 'INDIA'
    ORDER BY c_customer_id
    LIMIT 3
    )

    UNION

    (SELECT
        c_customer_id,
        c_customer_sk,
        c_email_address,
        'MASKED' AS c_first_name,
        c_last_name,
        unix_timestamp(current_timestamp()) AS ts,
        c_birth_country
    FROM ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}
    WHERE c_birth_country = 'CHINA'
    ORDER BY c_customer_id
    LIMIT 3
    )
);

In [None]:
%%sql

-- Write the staged batch into the MoR table. The INDIA rows carry new record keys and the
-- MASKED CHINA rows reuse existing ones.
-- NOTE(review): whether INSERT INTO upserts the existing keys or appends them depends on the
-- Hudi version and hoodie.spark.sql.insert.into.operation - confirm for the EMR release in use.
-- ts is recomputed here, so the batch carries the write time as its precombine value.
INSERT INTO ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}
SELECT
    c_customer_id, c_customer_sk, c_email_address, c_first_name, c_last_name,
    unix_timestamp(current_timestamp()) AS ts,
    c_birth_country
FROM ${mor_insert_into_view}

In [None]:
%%sql

-- Rows touched by the second MoR batch: the masked CHINA customers plus the new INDIA customers.
SELECT *
FROM ${hudi_catalog}.${hudi_db}.${mor_table_name_sql}
WHERE c_first_name = 'MASKED' OR c_birth_country = 'INDIA'