# EMR Import to Iceberg Demo

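This notebook shows how to import a public CSV dataset into an Iceberg table and then optimize that table.

Before reading the data from S3, we set up the Spark session with several Iceberg-related parameters: the Iceberg runtime JAR, the Iceberg SQL extensions, and a catalog named `dev` that is backed by AWS Glue and stores its warehouse in S3.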

In [1]:
%%configure -f
{
    "conf": {
        "spark.jars": "local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
        "spark.sql.catalog.dev.warehouse": "s3://odapocinfra-awspocdatabucket5eb99186-1nswrrisawh0t/output/poc/iceberg/",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.catalog.dev": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.dev.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
}

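| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 1 | application_1695280706788_0002 | pyspark | busy | Link | Link | | |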
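Every key under `spark.sql.catalog.<name>` in the session settings configures one named catalog, so the same payload can be generated for a different catalog name or warehouse. The following is a minimal sketch of that pattern; the helper `iceberg_session_conf` and the bucket `s3://example-bucket/iceberg/` are hypothetical and not part of the original notebook.

```python
import json


def iceberg_session_conf(catalog: str, warehouse: str) -> dict:
    """Build a %%configure payload for one Glue-backed Iceberg catalog.

    Hypothetical helper: it only mirrors the keys used in the cell above.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "conf": {
            # Iceberg runtime JAR as referenced in the cell above.
            "spark.jars": "local:///usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
            # Enables Iceberg SQL extensions such as CALL procedures.
            "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
            # Registers the catalog and points it at Glue and the S3 warehouse.
            prefix: "org.apache.iceberg.spark.SparkCatalog",
            f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
            f"{prefix}.warehouse": warehouse,
            "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
        }
    }


# Placeholder bucket name, used for illustration only.
payload = iceberg_session_conf("dev", "s3://example-bucket/iceberg/")
print(json.dumps(payload, indent=4))
```

The printed JSON can be pasted under `%%configure -f` in the first cell.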


In [12]:
df = spark.read.format("csv").load("s3://noaa-gsod-pds/20*/72793*.csv", inferSchema="true", header="true")
df.show()

+-----------+----------+--------+---------+---------+--------------------+----+---------------+----+---------------+------+--------------+-----+--------------+-----+----------------+----+---------------+-----+-----+----+--------------+----+--------------+----+---------------+-----+------+
|    STATION|      DATE|LATITUDE|LONGITUDE|ELEVATION|                NAME|TEMP|TEMP_ATTRIBUTES|DEWP|DEWP_ATTRIBUTES|   SLP|SLP_ATTRIBUTES|  STP|STP_ATTRIBUTES|VISIB|VISIB_ATTRIBUTES|WDSP|WDSP_ATTRIBUTES|MXSPD| GUST| MAX|MAX_ATTRIBUTES| MIN|MIN_ATTRIBUTES|PRCP|PRCP_ATTRIBUTES| SNDP|FRSHTT|
+-----------+----------+--------+---------+---------+--------------------+----+---------------+----+---------------+------+--------------+-----+--------------+-----+----------------+----+---------------+-----+-----+----+--------------+----+--------------+----+---------------+-----+------+
|72793024233|2000-01-01| 47.4444|-122.3138|    112.8|SEATTLE TACOMA IN...|41.7|           24.0|38.7|           24.0|1010.7|       

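We want to use Iceberg's [hidden partitioning](https://iceberg.apache.org/docs/latest/partitioning/), which derives partition values from a transform on a column (here, the year of `DATE`). To write to a partitioned table this way, we first sort the rows within each Spark partition with `sortWithinPartitions`, so that rows belonging to the same Iceberg partition sit next to each other.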

In [13]:
df = df.sortWithinPartitions("DATE")
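To see why the sort helps, the sketch below imitates a single Spark task in plain Python (no Spark involved). It assumes the table is partitioned by year, and it counts how often the partition value changes while walking through the task's rows. The function names and sample dates are made up for illustration.

```python
from datetime import date
from itertools import groupby


def year_partition(d: date) -> int:
    """Mimic Iceberg's `years` transform, which stores years elapsed since 1970."""
    return d.year - 1970


def partition_runs(rows) -> int:
    """Count contiguous runs of the same partition value in one task's rows."""
    return sum(1 for _ in groupby(rows, key=year_partition))


# Hypothetical rows handled by one task, arriving in arbitrary order.
task_rows = [date(2001, 5, 1), date(2000, 1, 1), date(2001, 1, 2), date(2000, 6, 3)]

unsorted_runs = partition_runs(task_rows)        # partitions interleave
sorted_runs = partition_runs(sorted(task_rows))  # one run per partition
print(unsorted_runs, sorted_runs)  # prints "4 2"
```

With sorted input, each partition's rows form a single run, so a writer can finish one partition's file before starting the next.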

Next, we register the DataFrame as a temporary view so that we can create the Iceberg table from it with SQL.

In [14]:
df.createOrReplaceTempView("input_table")

Now create the Iceberg table, which is registered in the Glue catalog and partitioned by the year of `DATE`.

In [15]:
partitions = ["years(DATE)"]
query = f"""
        CREATE OR REPLACE TABLE dev.poc_default.noaa_poc_demo
        USING iceberg
        PARTITIONED BY ({','.join(partitions)})
        AS SELECT * FROM input_table
    """
spark.sql(query)

DataFrame[]
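The `partitions` list makes it easy to swap in other partition transforms. Below is a sketch of a small query builder, assuming the same catalog and view names as above. The helper `build_ctas` is hypothetical. The `bucket(16, STATION)` transform is one of Iceberg's documented Spark transforms, but it is shown only as an example and was not used in this notebook.

```python
from typing import Sequence


def build_ctas(table: str, source_view: str, partitions: Sequence[str]) -> str:
    """Return a CREATE OR REPLACE TABLE ... AS SELECT statement for an Iceberg table.

    Hypothetical helper; it only formats SQL and does not talk to Spark.
    """
    partition_clause = ", ".join(partitions)
    return (
        f"CREATE OR REPLACE TABLE {table}\n"
        f"USING iceberg\n"
        f"PARTITIONED BY ({partition_clause})\n"
        f"AS SELECT * FROM {source_view}"
    )


# Same statement as the cell above, with an extra illustrative bucket transform.
query = build_ctas(
    "dev.poc_default.noaa_poc_demo",
    "input_table",
    ["years(DATE)", "bucket(16, STATION)"],
)
print(query)
```

In the notebook session, the resulting string would be passed to `spark.sql(query)`.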

Now we can query the table!

In [16]:
count = spark.sql("SELECT count(*) FROM dev.poc_default.noaa_poc_demo").first()[0]
print(f"Number of records in iceberg: {count}")

Number of records in iceberg: 43305

In [17]:
%%sh
aws s3 ls s3://odapocinfra-awspocdatabucket5eb99186-1nswrrisawh0t/output/poc/iceberg/poc_default.db/noaa_poc_demo/data/DATE_year=2023/ --summarize --human | tail

2023-09-21 07:59:27   15.0 KiB 00003-9-d2b1e785-5ecb-4827-a171-2ba89629d21e-00005.parquet
2023-09-21 08:03:35   29.2 KiB 00023-302-58362fc3-a108-493c-95bf-18d5f9efe73f-00001.parquet

Total Objects: 2
   Total Size: 44.3 KiB


## Iceberg Maintenance

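Iceberg ships with sensible defaults, but tables still need occasional maintenance, particularly when the source data is poorly shaped.

In this case, the import read many small CSV files, which left the table with many small data files. We can either change the write settings before importing or compact the files afterwards. Here we compact after the fact with the `rewrite_data_files` procedure, targeting 256 MiB files.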

In [18]:
target_file_size = 256
compact_query = f"CALL dev.system.rewrite_data_files(table => 'poc_default.noaa_poc_demo', options => map('target-file-size-bytes', {target_file_size * 1024 * 1024}))"
out = spark.sql(compact_query)
out.show()

+--------------------------+----------------------+---------------------+-----------------------+
|rewritten_data_files_count|added_data_files_count|rewritten_bytes_count|failed_data_files_count|
+--------------------------+----------------------+---------------------+-----------------------+
|                        50|                    10|               845541|                      0|
+--------------------------+----------------------+---------------------+-----------------------+
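The compaction statement can be wrapped in a helper so that the target size is stated in MiB and converted in one place. This is a sketch: `build_compaction_call` is a hypothetical name, and it quotes the option value as a string, which is the form used in the Iceberg procedure documentation, whereas the cell above passed an integer. The last lines do some arithmetic on the numbers reported in the output table.

```python
MIB = 1024 * 1024


def build_compaction_call(catalog: str, table: str, target_mib: int) -> str:
    """Return a CALL statement for Iceberg's rewrite_data_files procedure.

    Hypothetical helper; it only formats SQL and does not run it.
    """
    target_bytes = target_mib * MIB
    return (
        f"CALL {catalog}.system.rewrite_data_files("
        f"table => '{table}', "
        f"options => map('target-file-size-bytes', '{target_bytes}'))"
    )


compact_sql = build_compaction_call("dev", "poc_default.noaa_poc_demo", 256)
print(compact_sql)

# Figures copied from the procedure output above.
rewritten_files = 50
rewritten_bytes = 845541
avg_input_bytes = rewritten_bytes / rewritten_files
print(f"average rewritten file: {avg_input_bytes / 1024:.1f} KiB")
```

The average rewritten file is well under the 256 MiB target, which is why compaction is worth running on this table.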

In [19]:
%%sh
aws s3 ls s3://odapocinfra-awspocdatabucket5eb99186-1nswrrisawh0t/output/poc/iceberg/poc_default.db/noaa_poc_demo/data/DATE_year=2023/ --human | sort | tail

2023-09-21 07:59:27   15.0 KiB 00003-9-d2b1e785-5ecb-4827-a171-2ba89629d21e-00005.parquet
2023-09-21 08:03:35   29.2 KiB 00023-302-58362fc3-a108-493c-95bf-18d5f9efe73f-00001.parquet
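An S3 listing shows every object under the prefix, including data files that only older snapshots reference. `rewrite_data_files` writes new files and leaves the replaced ones in place until snapshots are expired. To inspect only the files in the table's current snapshot, one option is Iceberg's `files` metadata table. The sketch below builds such a query. The helper name is hypothetical, and the query has not been run against this table.

```python
def build_files_summary_query(table: str) -> str:
    """Return a query that summarizes current data files per partition.

    Uses the `partition` and `file_size_in_bytes` columns of Iceberg's
    `files` metadata table. Hypothetical helper; it only formats SQL.
    """
    return (
        "SELECT partition, "
        "count(*) AS file_count, "
        "sum(file_size_in_bytes) AS total_bytes "
        f"FROM {table}.files "
        "GROUP BY partition"
    )


files_sql = build_files_summary_query("dev.poc_default.noaa_poc_demo")
print(files_sql)
```

In the notebook session, `spark.sql(files_sql).show()` would display the per-partition file counts and sizes.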
