# Spark with Iceberg on S3 

## Prerequisites

#### 1. Make sure Maven is configured with Java 11
1. Hadoop 3.x requires Java 8 or Java 11, so make sure `PATH` points to a Java 11 installation
2. Check with `mvn --version`; the output also reports the Java version Maven uses
3. If Maven is not configured, set it up: https://phoenixnap.com/kb/install-maven-windows
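
The Java check can also be scripted from Python. This is a minimal sketch, assuming `java` is on `PATH` and prints its version banner in the usual `version "..."` form; `parse_java_major` and `installed_java_major` are illustrative names, not part of any tool.

```python
import re
import shutil
import subprocess


def parse_java_major(banner: str) -> int:
    """Extract the major Java version from a `java -version` banner.

    Handles the legacy scheme ("1.8.0_381" -> 8) and the
    modern scheme ("11.0.20" -> 11).
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', banner)
    if match is None:
        raise ValueError(f"no version string found in: {banner!r}")
    first, second = int(match.group(1)), match.group(2)
    # Java 8 and earlier report themselves as 1.x
    if first == 1 and second is not None:
        return int(second)
    return first


def installed_java_major() -> int:
    """Run `java -version` and return the major version found on PATH."""
    if shutil.which("java") is None:
        raise RuntimeError("java not found on PATH")
    result = subprocess.run(
        ["java", "-version"], capture_output=True, text=True, check=True
    )
    # The banner is normally written to stderr; fall back to stdout.
    return parse_java_major(result.stderr or result.stdout)
```

Calling `installed_java_major()` before starting Spark and comparing the result against 8 and 11 gives an early failure instead of a JVM error later.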

#### 2. Add `winutils.exe` to Hadoop home
1. Create a directory for Hadoop home (e.g. `C:\hadoop`)
2. Download `winutils.exe`. A popular repository that hosts it is https://github.com/steveloughran/winutils   
   File link: https://github.com/steveloughran/winutils/blob/master/hadoop-3.0.0/bin/winutils.exe
3. Create a folder named `bin` in Hadoop home and add `winutils.exe` to it
4. Add the environment variable `HADOOP_HOME` (`C:\hadoop`)
5. Add `HADOOP_HOME\bin` to `PATH` (`%HADOOP_HOME%\bin`)
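
The result of these steps can be verified from Python before starting Spark. The sketch below only assumes the layout described above (`HADOOP_HOME` set, `bin\winutils.exe` inside it, and that `bin` folder on `PATH`); `hadoop_home_problems` is a hypothetical helper name.

```python
import os
from pathlib import Path


def hadoop_home_problems(env=None) -> list:
    """Return a list of human-readable problems with the winutils setup."""
    env = os.environ if env is None else env
    home = env.get("HADOOP_HOME")
    if not home:
        return ["HADOOP_HOME is not set"]

    problems = []
    bin_dir = Path(home) / "bin"
    if not (bin_dir / "winutils.exe").is_file():
        problems.append(f"winutils.exe not found in {bin_dir}")

    # Compare PATH entries after normalising case and separators.
    def norm(p):
        return os.path.normcase(os.path.normpath(p))

    path_entries = [norm(p) for p in env.get("PATH", "").split(os.pathsep) if p]
    if norm(str(bin_dir)) not in path_entries:
        problems.append(f"{bin_dir} is not on PATH")
    return problems
```

An empty list means the three conditions hold; anything else names the step that still needs attention.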

#### 3. Add the following AWS values as environment variables

These can be taken from https://sysco-sso.awsapps.com/start/#

1. `AWS_REGION`
2. `AWS_ACCESS_KEY_ID`
3. `AWS_SECRET_ACCESS_KEY`
4. `AWS_SESSION_TOKEN`
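
A quick way to confirm that all four variables are visible to the Python process is sketched below. `REQUIRED_AWS_VARS` and `missing_aws_vars` are illustrative names; the check only tests that each variable is set and non-empty, not that the credentials are still valid.

```python
import os

# The four variables listed above.
REQUIRED_AWS_VARS = (
    "AWS_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_SESSION_TOKEN",
)


def missing_aws_vars(env=None) -> list:
    """Return the required AWS variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]
```

Running `missing_aws_vars()` at the top of the notebook and stopping if the list is non-empty avoids a less obvious authentication error when the first table operation runs.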

## Note
> A database named `test_uph_iceberg` has been created in AWS Glue. It will be used in this example.   
Link: https://us-east-1.console.aws.amazon.com/glue/home?region=us-east-1#/v2/data-catalog/databases/view/test_uph_iceberg?catalogId=909082630066

> The catalog name used here is `uph_rnd`. It can be any name.

## Create Spark Session

In [1]:
ICEBERG_S3_WAREHOUSE = "s3://cx-unique-purchase-data-non-prod/dev/test_data"
AWS_REGION = "us-east-1"

In [3]:
from pyspark.sql import SparkSession
import time

start_time = time.time()

spark = (
    SparkSession.builder 
    .appName('Spark Iceberg Example')
    
    .config("spark.network.timeout", '10000s')
    .config('spark.sql.autoBroadcastJoinThreshold', -1)
    .config('spark.shuffle.consolidateFiles', True)
    .config('spark.dynamicAllocation.enabled', False)
    .config("spark.serializer", 'org.apache.spark.serializer.KryoSerializer')
    .config('spark.shuffle.service.enabled', False)
    .config('spark.hadoop.fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

    .config("spark.hadoop.fs.s3a.endpoint", f"s3.{AWS_REGION}.amazonaws.com")

    .config('spark.jars.packages', 
            'org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.1,'
            'org.apache.iceberg:iceberg-aws:1.4.1,'
            'org.apache.iceberg:iceberg-aws-bundle:1.4.1')

    .config('spark.partial-progress.enabled', True)
    
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    
    .config("spark.sql.catalog.uph_rnd", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.uph_rnd.glue-database", "test_uph_iceberg")
    .config("spark.sql.catalog.uph_rnd.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.uph_rnd.warehouse", ICEBERG_S3_WAREHOUSE)
    .config("spark.sql.catalog.uph_rnd.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)

end_time = time.time()

print(f"Spark session created in {end_time - start_time} seconds")

Spark session created in 0.7298665046691895 seconds
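
The catalog name appears only as a prefix in the `spark.sql.catalog.*` keys, which is why any name works. The sketch below makes that explicit by generating the keys from a name. It covers only the `SparkCatalog`, `catalog-impl`, `warehouse`, and `io-impl` settings from the session above; `iceberg_glue_catalog_conf` and `apply_conf` are hypothetical helpers.

```python
def iceberg_glue_catalog_conf(catalog_name: str, warehouse: str) -> dict:
    """Build the Spark settings that register an Iceberg catalog backed by Glue."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    }


def apply_conf(builder, conf: dict):
    """Apply every key/value pair to a SparkSession builder and return it."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

For example, `apply_conf(SparkSession.builder, iceberg_glue_catalog_conf("uph_rnd", ICEBERG_S3_WAREHOUSE))` would set the same four catalog keys as the cell above, and changing the first argument renames the catalog everywhere at once.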


## Create Iceberg Table

In [3]:
# Create an Iceberg table using DDL (if not already created)
spark.sql("""
    CREATE TABLE IF NOT EXISTS uph_rnd.test_uph_iceberg.users (
        UserID INT,
        Username STRING,
        Email STRING,
        SignupDate DATE,
        LastLogin DATE
    )
    USING iceberg
    PARTITIONED BY (LastLogin)
""")

DataFrame[]

## Check Tables in database

In [4]:
# Check tables in test_uph_iceberg database
spark.sql("SHOW TABLES IN uph_rnd.test_uph_iceberg").show()

+----------------+---------+-----------+
|       namespace|tableName|isTemporary|
+----------------+---------+-----------+
|test_uph_iceberg|    users|      false|
+----------------+---------+-----------+



## Load Example Data

In [41]:
# Load example data from a CSV file
csv_file_path = "example_user_list.csv"
df_users = spark.read.csv(csv_file_path, header=True, inferSchema=True)
df_users.show()

+------+------------+--------------------+----------+----------+
|UserID|    Username|               Email|SignupDate| LastLogin|
+------+------------+--------------------+----------+----------+
|     1|     johndoe| johndoe@example.com|2024-01-01|2024-10-01|
|     2|     janedoe| janedoe@example.com|2024-02-15|2024-10-12|
|     3|    bobsmith|bobsmith@example.com|2024-03-20|2024-09-28|
|     4|  alicejones|alicejones@exampl...|2024-04-10|2024-10-05|
|     5|charliebrown|charliebrown@exam...|2024-05-05|2024-10-10|
+------+------------+--------------------+----------+----------+
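
`inferSchema=True` makes Spark scan the file and guess the column types, which may not match the table definition. A sketch of an alternative with a fixed schema follows, assuming the CSV has the five columns shown above and dates in `yyyy-MM-dd` form. `USERS_COLUMNS`, `USERS_SCHEMA_DDL`, and `read_users_csv` are illustrative names, not part of the notebook.

```python
# Column names and types copied from the CREATE TABLE statement.
USERS_COLUMNS = [
    ("UserID", "INT"),
    ("Username", "STRING"),
    ("Email", "STRING"),
    ("SignupDate", "DATE"),
    ("LastLogin", "DATE"),
]

# DataFrameReader accepts a DDL-formatted string as its schema.
USERS_SCHEMA_DDL = ", ".join(f"{name} {dtype}" for name, dtype in USERS_COLUMNS)


def read_users_csv(spark, path):
    """Read the users CSV with a fixed schema instead of inferring one."""
    return spark.read.csv(
        path,
        header=True,
        schema=USERS_SCHEMA_DDL,
        dateFormat="yyyy-MM-dd",  # assumed date layout of the file
    )
```

`df_users = read_users_csv(spark, csv_file_path)` would then replace the `inferSchema` call, and `df_users.printSchema()` can confirm the types.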



## Insert Example Data

In [42]:
# Register the DataFrame as a temporary view
df_users.createOrReplaceTempView("temp_users")

In [50]:
# Insert data from the temporary view into the Iceberg table
spark.sql("""
    INSERT INTO uph_rnd.test_uph_iceberg.users
    SELECT * FROM temp_users
""")

DataFrame[]
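
`INSERT INTO ... SELECT *` matches columns by position, so it relies on the view and the table listing their columns in the same order. A sketch that names the columns explicitly is shown below; `build_insert_sql` is a hypothetical helper, and the column names passed to it should be copied from the table definition.

```python
def build_insert_sql(target_table: str, source_view: str, columns) -> str:
    """Build an INSERT ... SELECT that lists every selected column by name."""
    if not columns:
        raise ValueError("at least one column is required")
    column_list = ", ".join(columns)
    return (
        f"INSERT INTO {target_table}\n"
        f"SELECT {column_list} FROM {source_view}"
    )
```

Passing the result to `spark.sql(...)` with the columns in the table's order (`UserID`, `Username`, `Email`, `SignupDate`, `LastLogin`) keeps the insert correct even if the CSV columns are later reordered.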

## Read Data in Iceberg Table

In [4]:
df = spark.sql("SELECT * FROM uph_rnd.test_uph_iceberg.users")
df.show()

+------+------------+--------------------+----------+----------+
|UserID|    Username|               Email|SignupDate| LastLogin|
+------+------------+--------------------+----------+----------+
|     3|    bobsmith|bobsmith@example.com|2024-03-20|2024-09-28|
|     1|     johndoe| johndoe@example.com|2024-01-01|2024-10-01|
|     4|  alicejones|alicejones@exampl...|2024-04-10|2024-10-05|
|     5|charliebrown|charliebrown@exam...|2024-05-05|2024-10-10|
|     2|     janedoe| janedoe@example.com|2024-02-15|2024-10-12|
+------+------------+--------------------+----------+----------+
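
Because the table is partitioned by `LastLogin`, a filter on that column lets Iceberg skip partitions that cannot match. The sketch below builds such a query; `users_since_sql` is a hypothetical helper that validates the date before placing it in the SQL text.

```python
from datetime import date


def users_since_sql(table: str, since: str) -> str:
    """Build a query for users whose LastLogin is on or after `since`."""
    # fromisoformat raises ValueError for strings that are not valid ISO
    # dates, which keeps arbitrary text out of the SQL string.
    parsed = date.fromisoformat(since)
    return (
        f"SELECT * FROM {table} "
        f"WHERE LastLogin >= DATE '{parsed.isoformat()}'"
    )
```

With the sample rows above, `spark.sql(users_since_sql("uph_rnd.test_uph_iceberg.users", "2024-10-05")).show()` would be expected to return alicejones, charliebrown, and janedoe.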



## Truncate Data in Iceberg Table

In [49]:
# Truncate table
spark.sql("TRUNCATE TABLE uph_rnd.test_uph_iceberg.users")

DataFrame[]

## Read Parquet Data from S3

In [6]:
s3_parquet_path = "s3://cx-staging-glue-catalog/dev/sysco_src_restrictions_product_attributes_staging.parquet"
parquet_df = spark.read.parquet(s3_parquet_path)
parquet_df.show()

+-----------+-----------+---------+-------------+---------+--------------------+--------------------+-------+
|customer_id|cust_id_int|seller_id|sub_seller_id|list_type|             list_id|          updated_at|opco_id|
+-----------+-----------+---------+-------------+---------+--------------------+--------------------+-------+
|     000000|          0|     USBL|         USBL|    Block|056_common_cannotbuy|2024-03-12 12:47:...|     56|
|     000000|          0|     USBL|         USBL|  Can Buy|          056_canbuy|2024-03-12 12:47:...|     56|
|     000006|          6|     USBL|         USBL|    Block|056_common_cannotbuy|2024-09-24 23:35:...|     56|
|     000008|          8|     USBL|         USBL|  Can Buy|          056_canbuy|2024-09-24 23:35:...|     56|
|     000009|          9|     USBL|         USBL|    Block|056_common_cannotbuy|2024-03-12 12:47:...|     56|
|     000009|          9|     USBL|         USBL|  Can Buy|          056_canbuy|2024-03-12 12:47:...|     56|
|     0000