**Iceberg AWS Event-Based Table Management for an EMR 6.11 Spark Cluster**

**Step1.** Configure Iceberg on the Spark session

Configure your Spark session using the %%configure magic command. We will use the AWS Glue Data Catalog (GlueCatalog) as the catalog for the Iceberg tables. Before you run the following step, create an S3 bucket in your AWS account; the Iceberg warehouse will live under its /iceberg/ prefix.

Replace <your-iceberg-storage-blog> in the configuration below with the name of the bucket you created for this example.

In [1]:
%%configure -f
{
"conf":{
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/"
    }
}
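If you rerun this notebook against different buckets, hand-editing the JSON is easy to get wrong. The sketch below builds the same %%configure body from a bucket name in plain Python (no Spark needed). The helper name `build_iceberg_conf` is hypothetical, and the bucket-name check covers only a few basic S3 naming rules (3 to 63 characters; lowercase letters, digits, dots, and hyphens; starting and ending with a letter or digit).

```python
import json
import re

# Basic S3 bucket-name rules only (length, allowed characters, first/last char);
# this is not a complete validator.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")


def build_iceberg_conf(bucket: str, catalog: str = "dev") -> dict:
    """Return the Step1 %%configure payload for the given bucket (hypothetical helper)."""
    if not _BUCKET_RE.match(bucket):
        raise ValueError(f"not a valid S3 bucket name: {bucket!r}")
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "conf": {
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            prefix: "org.apache.iceberg.spark.SparkCatalog",
            f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
            f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
            f"{prefix}.warehouse": f"s3://{bucket}/iceberg/",
        }
    }


if __name__ == "__main__":
    # Paste the printed JSON below a "%%configure -f" line in the notebook.
    print(json.dumps(build_iceberg_conf("my-iceberg-test-bucket"), indent=4))
```

The values are copied verbatim from the cell above; only the bucket and the catalog name (`dev`) are parameters.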

In [3]:
spark.sql(""" DROP TABLE IF EXISTS iceberg_db.sensor_data_parquet_table """)


DataFrame[]

**Step2.** In the AWS Glue Data Catalog, create a database named db for the Iceberg table and set its location to s3://<your-iceberg-storage-blog>/iceberg/db, under the warehouse path specified in the Spark configuration. Also create a second Glue database named iceberg_db for the Parquet tables.
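You can create both databases in the Glue console. As an alternative, here is a minimal boto3 sketch, assuming AWS credentials with `glue:CreateDatabase` permission; the helper names are hypothetical. `create_database` and its `DatabaseInput` fields (`Name`, `LocationUri`) are part of the boto3 Glue client.

```python
from typing import List, Optional


def glue_database_inputs(bucket: str) -> List[dict]:
    """Build DatabaseInput payloads for the two Glue databases used in this walkthrough."""
    return [
        # Iceberg tables: located under the warehouse path from Step1.
        {"Name": "db", "LocationUri": f"s3://{bucket}/iceberg/db"},
        # Spark/Parquet tables: the table itself sets its own S3 location.
        {"Name": "iceberg_db"},
    ]


def create_glue_databases(bucket: str, region: Optional[str] = None) -> None:
    """Create both databases, skipping any that already exist (needs boto3 and credentials)."""
    import boto3  # imported lazily so glue_database_inputs works without boto3 installed

    glue = boto3.client("glue", region_name=region)
    for database_input in glue_database_inputs(bucket):
        try:
            glue.create_database(DatabaseInput=database_input)
        except glue.exceptions.AlreadyExistsException:
            print(f"database {database_input['Name']} already exists, skipping")
```

Skipping `AlreadyExistsException` keeps the sketch safe to rerun along with the rest of the notebook.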


Create a new Spark table in Parquet format that points to the S3 prefix containing the small objects.

In [4]:
spark.sql(""" CREATE TABLE  iceberg_db.sensor_data_parquet_table (
    sensorid int, 
    currenttemperature int, 
    status string, 
    date_ts timestamp)
USING parquet 
location 's3://<your-iceberg-storage-blog>/sensor_data_smallfiles_parquet/'
""")


DataFrame[]

**Step3.** Now run an aggregate SQL query to measure Spark SQL performance on the Parquet table, which consists of 58,176 small objects.

In [5]:
spark.sql(""" select maxtemp, mintemp, avgtemp from 
(select
 max(currenttemperature) as maxtemp, 
 min(currenttemperature) as mintemp, 
 avg(currenttemperature) as avgtemp 
 from iceberg_db.sensor_data_parquet_table
 where month(date_ts) between 2 and 10
 order by  maxtemp, mintemp, avgtemp)""").show()


+-------+-------+-----------------+
|maxtemp|mintemp|          avgtemp|
+-------+-------+-----------------+
|    150|     10|79.99732039114782|
+-------+-------+-----------------+

**Result:** The aggregation query above took **8m 47.12s** to run.
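To measure run times like this in code instead of reading them off the notebook, you can wrap the query in a small timer. This is a minimal sketch; the helper names are hypothetical, and wall-clock time measured on the driver may differ slightly from the times reported in this walkthrough.

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def timed(run: Callable[[], T]) -> Tuple[float, T]:
    """Call run() and return (elapsed_seconds, result), using a monotonic clock."""
    start = time.perf_counter()
    result = run()
    return time.perf_counter() - start, result


def format_duration(seconds: float) -> str:
    """Format seconds like the timings in this notebook, e.g. 527.12 -> '8m 47.12s'."""
    minutes, secs = divmod(seconds, 60)
    if minutes >= 1:
        return f"{int(minutes)}m {secs:.2f}s"
    return f"{secs:.2f}s"


# Usage in the notebook; collect() is the action that actually runs the query:
# elapsed, rows = timed(lambda: spark.sql(query).collect())
# print(format_duration(elapsed))
```

Note that `spark.sql(...)` on a SELECT only builds a lazy DataFrame. The timer has to wrap an action such as `collect()` or `show()`, or it measures planning only.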

**In the following steps, we create a new Iceberg table from the Spark/Parquet table using CTAS, then show how the automated compaction job improves query performance.**

**Step4.** Create a new Iceberg table using CTAS (Create Table As Select) from the earlier Spark/Glue table that holds the small files.

In [6]:
spark.sql(""" DROP TABLE IF EXISTS dev.db.sensor_data_iceberg_format """)


DataFrame[]

In [7]:
spark.sql(""" CREATE TABLE dev.db.sensor_data_iceberg_format USING iceberg AS (SELECT * FROM iceberg_db.sensor_data_parquet_table)""")


DataFrame[]

**Step5.** Verify that a new Iceberg snapshot was created for the new table.

In [9]:
spark.sql(""" Select * from dev.db.sensor_data_iceberg_format.snapshots limit 5""").show()


+--------------------+-------------------+---------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2023-07-26 17:08:...|5276645498530515109|     null|   append|s3://iceberg-file...|{spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+

**Validation:** Check the S3 data/ folder of the newly created Iceberg table. The CTAS statement above wrote 1,879 objects, 1.3GB in total, into the new data/ folder, so the rewrite consolidated the 58,176 small source objects into far fewer, larger files.
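You can get the object count and total size from a script instead of the S3 console. A minimal sketch using the boto3 `list_objects_v2` paginator, assuming AWS credentials with `s3:ListBucket`. The helper names are hypothetical, and the table prefix in the usage comment is an assumption derived from the db location set in Step2, so confirm it in the S3 console.

```python
from typing import Iterable, Iterator, Tuple


def summarize_objects(objects: Iterable[dict]) -> Tuple[int, int]:
    """Return (object_count, total_bytes) for S3 object records that carry a 'Size' key."""
    count = 0
    total_bytes = 0
    for obj in objects:
        count += 1
        total_bytes += obj["Size"]
    return count, total_bytes


def list_prefix(bucket: str, prefix: str) -> Iterator[dict]:
    """Yield every object record under s3://bucket/prefix (needs boto3 and credentials)."""
    import boto3  # imported lazily so summarize_objects works without boto3 installed

    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page with no matching keys has no "Contents" entry at all.
        yield from page.get("Contents", [])


# Usage (assumed prefix for the Iceberg table's data files):
# count, size = summarize_objects(
#     list_prefix("<your-iceberg-storage-blog>", "iceberg/db/sensor_data_iceberg_format/data/"))
# print(f"{count} objects, {size / 1e9:.2f} GB")
```

An S3 listing counts every object under the prefix, including files that only older snapshots still reference. To count just the files in the current snapshot, query Iceberg's `files` metadata table instead, for example `SELECT count(*), sum(file_size_in_bytes) FROM dev.db.sensor_data_iceberg_format.files`.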

**Step6.** Now that the Iceberg table contains data, run the same aggregation query again and check its execution time.

In [8]:
spark.sql(""" select maxtemp, mintemp, avgtemp from 
(select
 max(currenttemperature) as maxtemp, 
 min(currenttemperature) as mintemp, 
 avg(currenttemperature) as avgtemp 
 from dev.db.sensor_data_iceberg_format
 where month(date_ts) between 2 and 10
 order by  maxtemp, mintemp, avgtemp)""").show()


+-------+-------+-----------------+
|maxtemp|mintemp|          avgtemp|
+-------+-------+-----------------+
|    150|     10|79.99732039114782|
+-------+-------+-----------------+

**Result:** On the Iceberg table (1,879 objects), the same aggregation query ran in **1m 39.56s**, compared with **8m 47.12s** on the Parquet table. Converting the external Parquet table to Iceberg alone cut the run time by about 81% (roughly 5.3x faster).

**Step7.** Now add the configurations needed for automatic compaction of small files in Iceberg tables. Note the last four configurations in the following statement, which are new. The parameter "optimize-data.commit-threshold" sets how many successful commits trigger a compaction: with a value of 1, compaction runs after the first successful commit. The default is 10 successful commits; for this test we use 1 so that a single commit triggers compaction. Replace j-xxxxxxxxxxxx with the ID of your EMR cluster.
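To adapt the next cell to your own cluster and threshold, you can generate the four extra keys instead of editing them by hand. This is a sketch: the helper name is hypothetical, the class names are copied verbatim from the cell below, and the cluster-ID pattern (`j-` followed by uppercase letters and digits) is a loose sanity check, not an official format definition. You can find your cluster ID on the cluster's summary page in the EMR console.

```python
import re

# Loose check: EMR cluster IDs look like "j-" followed by uppercase letters and digits.
_CLUSTER_ID_RE = re.compile(r"^j-[A-Z0-9]+$")


def table_management_conf(cluster_id: str, commit_threshold: int = 1, catalog: str = "dev") -> dict:
    """Return the four Step7 table-management keys for a cluster (hypothetical helper)."""
    if not _CLUSTER_ID_RE.match(cluster_id):
        raise ValueError(f"unexpected EMR cluster id: {cluster_id!r}")
    if commit_threshold < 1:
        raise ValueError("commit_threshold must be a positive integer")
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        f"{prefix}.metrics-reporter-impl": "org.apache.iceberg.aws.manage.AwsTableManagementMetricsEvaluator",
        f"{prefix}.optimize-data.impl": "org.apache.iceberg.aws.manage.EmrOnEc2OptimizeDataExecutor",
        f"{prefix}.optimize-data.emr.cluster-id": cluster_id,
        # Spark conf values are strings, so the threshold is written as "1", "10", ...
        f"{prefix}.optimize-data.commit-threshold": str(commit_threshold),
    }
```

Merge the returned dictionary into the "conf" section from Step1 to reproduce the full configuration. The placeholder j-xxxxxxxxxxxx is rejected on purpose so that it cannot slip into a real run.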

In [10]:
%%configure -f
{
"conf":{
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.dev.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.dev.warehouse":"s3://<your-iceberg-storage-blog>/iceberg/",
    "spark.sql.catalog.dev.metrics-reporter-impl":"org.apache.iceberg.aws.manage.AwsTableManagementMetricsEvaluator",
    "spark.sql.catalog.dev.optimize-data.impl":"org.apache.iceberg.aws.manage.EmrOnEc2OptimizeDataExecutor",
    "spark.sql.catalog.dev.optimize-data.emr.cluster-id":"j-xxxxxxxxxxxx",
    "spark.sql.catalog.dev.optimize-data.commit-threshold":"1"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1690346859076_0002,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.




**Step8.** Run a quick sanity check to confirm that Spark SQL works with the new configuration.

In [11]:
spark.sql(""" select * from dev.db.sensor_data_iceberg_format limit 1 """).show()


+--------+------------------+------+-------------------+
|sensorid|currenttemperature|status|            date_ts|
+--------+------------------+------+-------------------+
|    2811|               116|  FAIL|2023-06-07 14:50:25|
+--------+------------------+------+-------------------+

**Step9.** To trigger the automatic compaction process, add a new record to the existing Iceberg table with a Spark SQL INSERT. Because commit-threshold is set to 1, the commit from this single insert is enough to trigger compaction.

In [12]:
spark.sql(""" Insert into dev.db.sensor_data_iceberg_format values(999123, 86, 'PASS', timestamp'2023-07-26 12:50:25') """) 


DataFrame[]

**Step10.** In the EMR console, open the cluster's **Steps** tab. You should see a new step that moves from Pending to Running and finally to Completed.
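Instead of refreshing the console, you can poll the cluster's steps with boto3. A sketch assuming AWS credentials with `elasticmapreduce:ListSteps`. The helper names are hypothetical, and because this walkthrough does not show the name of the step that the compaction executor submits, the sketch simply watches the newest step on the cluster.

```python
import time
from typing import Optional, Tuple

# Step states after which an EMR step will not change any more.
TERMINAL_STATES = {"COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED"}


def latest_step_state(list_steps_response: dict) -> Optional[Tuple[str, str]]:
    """Return (name, state) of the newest step in an EMR ListSteps response, or None."""
    steps = list_steps_response.get("Steps", [])
    if not steps:
        return None
    # Without StepIds/StepStates filters, ListSteps returns steps newest first.
    newest = steps[0]
    return newest["Name"], newest["Status"]["State"]


def wait_for_latest_step(cluster_id: str, poll_seconds: int = 30) -> Optional[Tuple[str, str]]:
    """Poll until the newest step on the cluster reaches a terminal state (needs boto3)."""
    import boto3  # imported lazily so latest_step_state works without boto3 installed

    emr = boto3.client("emr")
    while True:
        info = latest_step_state(emr.list_steps(ClusterId=cluster_id))
        if info is None or info[1] in TERMINAL_STATES:
            return info
        print(f"{info[0]}: {info[1]}")
        time.sleep(poll_seconds)
```

Splitting the response parsing from the polling loop keeps the parsing logic testable without a live cluster.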


**Step11.** Verify that the record was inserted successfully.

In [37]:
spark.sql(""" select * from dev.db.sensor_data_iceberg_format where sensorid = 999123 and date_ts = timestamp'2023-07-26 12:50:25' """).show()


+--------+------------------+------+-------------------+
|sensorid|currenttemperature|status|            date_ts|
+--------+------------------+------+-------------------+
|  999123|                86|  PASS|2023-07-26 12:50:25|
+--------+------------------+------+-------------------+

**Step12.** Query the snapshots metadata table to confirm that the compaction job created a new snapshot with operation **replace**.

In [14]:
spark.sql(""" Select * from dev.db.sensor_data_iceberg_format.snapshots limit 5""").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2023-07-26 17:08:...|5276645498530515109|               null|   append|s3://iceberg-file...|{spark.app.id -> ...|
|2023-07-26 17:21:...|3706857927051738345|5276645498530515109|   append|s3://iceberg-file...|{spark.app.id -> ...|
|2023-07-26 17:28:...|  27167933715576041|3706857927051738345|  replace|s3://iceberg-file...|{added-data-files...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
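The summary column, truncated in the output above, records how many data files each snapshot added and removed. The small sketch below reads those counts for the replace snapshot. The helper name is hypothetical; `added-data-files` and `deleted-data-files` are standard Iceberg snapshot summary keys. PySpark returns a map column as a Python dict, so the helper works on the collected value.

```python
def describe_rewrite(summary: dict) -> str:
    """Summarize the data-file counts recorded in an Iceberg snapshot summary map."""
    # Iceberg stores summary values as strings; absent keys are treated as zero here.
    added = int(summary.get("added-data-files", "0"))
    deleted = int(summary.get("deleted-data-files", "0"))
    return f"replaced {deleted} data files with {added}"


# Usage in the notebook:
# row = spark.sql("""
#     SELECT summary FROM dev.db.sensor_data_iceberg_format.snapshots
#     WHERE operation = 'replace' ORDER BY committed_at DESC LIMIT 1
# """).first()
# print(describe_rewrite(row["summary"]))
```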

**Step13.** Open the Iceberg table's data/ folder in the S3 bucket and confirm that the data files were compacted from the earlier small files into files of about 437MB. The data/ folder still contains the earlier small files, which older snapshots need for time travel, until you expire those snapshots.
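To remove the pre-compaction files, you can expire the old snapshots with Iceberg's `expire_snapshots` Spark procedure. This requires the Iceberg SQL extensions, which are already configured in this session. The sketch below only builds the CALL statement; the helper name is hypothetical. Expiring snapshots is irreversible: you can no longer time travel to the expired snapshots, and data files that only they referenced are deleted.

```python
from datetime import datetime


def expire_snapshots_sql(catalog: str, table: str, older_than: datetime, retain_last: int = 1) -> str:
    """Build (but do not run) a CALL to Iceberg's expire_snapshots Spark procedure."""
    # Spark interprets this timestamp literal in the session time zone.
    ts = older_than.strftime("%Y-%m-%d %H:%M:%S")
    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', "
        f"older_than => TIMESTAMP '{ts}', "
        f"retain_last => {retain_last})"
    )


# Usage in the notebook (keeps only the current snapshot):
# spark.sql(expire_snapshots_sql("dev", "db.sensor_data_iceberg_format", datetime.now()))
```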

**Step14.** Run the same aggregation query again and record its execution time after compaction.

In [15]:
spark.sql(""" select maxtemp, mintemp, avgtemp from 
(select
 max(currenttemperature) as maxtemp, 
 min(currenttemperature) as mintemp, 
 avg(currenttemperature) as avgtemp 
 from dev.db.sensor_data_iceberg_format
 where month(date_ts) between 2 and 10
 order by  maxtemp, mintemp, avgtemp)""").show()


+-------+-------+-----------------+
|maxtemp|mintemp|          avgtemp|
+-------+-------+-----------------+
|    150|     10|79.99732040101847|
+-------+-------+-----------------+

**Summary of EMR Testing:** On the compacted Iceberg table, the same aggregation query ran in **59.43s**, down from **1m 39.56s** before compaction. That is about **40%** less time. The more small files your source bucket contains, the larger the performance gain you can expect from this post-commit compaction.
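The arithmetic behind these comparisons is easy to check. The sketch below parses the three run times reported in this walkthrough and computes the reductions. The helper names are hypothetical, and the percentages describe this single test run only.

```python
def parse_duration(text: str) -> float:
    """Parse durations written like '8m 47.12s' or '59.43s' into seconds."""
    total = 0.0
    for part in text.split():
        if part.endswith("m"):
            total += 60 * float(part[:-1])
        elif part.endswith("s"):
            total += float(part[:-1])
        else:
            raise ValueError(f"unrecognized duration part: {part!r}")
    return total


def reduction_pct(before_s: float, after_s: float) -> float:
    """Percentage of run time saved going from before_s to after_s."""
    return 100 * (before_s - after_s) / before_s


runs = {"parquet": "8m 47.12s", "iceberg": "1m 39.56s", "iceberg_compacted": "59.43s"}
seconds = {name: parse_duration(t) for name, t in runs.items()}

print(f"parquet -> iceberg: {reduction_pct(seconds['parquet'], seconds['iceberg']):.1f}% less time")
print(f"iceberg -> compacted: {reduction_pct(seconds['iceberg'], seconds['iceberg_compacted']):.1f}% less time")
print(f"parquet -> compacted: {seconds['parquet'] / seconds['iceberg_compacted']:.1f}x faster")
# parquet -> iceberg: 81.1% less time
# iceberg -> compacted: 40.3% less time
# parquet -> compacted: 8.9x faster
```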