# Iceberg Hands-on Lab Notebook

This notebook creates an Iceberg table in the Glue Catalog database from the parquet files loaded into S3.
It also provides a temaplate of how existing Glue Catalog Data Lake parquet tables can be converted to Iceberg format to be used directly with Snowflake.

---

### Glue session configurations. 

Configure version and compute resources

In [None]:
%session_id_prefix iceberg-upgrade-add-files-
%glue_version 4.0
%idle_timeout 300
%number_of_workers 2
%worker_type G.1X
%%configure 
{
  "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
  "--datalake-formats": "iceberg"
}

### Variables for S3 bucket name, path, database and table names

Enter the S3 bucket name that is used for this HOL - replace your-bucket-name with the S3 bucket name in your AWS account that is used for this HOL

In [None]:
bucket_name = "your-bucket-name" # your Amazon S3 bucket name for this HOL
bucket_prefix = "iceberg/quotes" # path to parquet data
catalog_name = "glue_catalog"
database_name = "iceberg_devday"
table_name = "quotes"
file_path = f"s3://{bucket_name}/{bucket_prefix}"

### Configure S3 path and copy data files

In [None]:
import boto3
s3 = boto3.client('s3')

s3.put_object(Bucket=bucket_name, Key=(bucket_prefix+'/'))

s3.copy_object(
    Bucket=bucket_name, 
    CopySource='/snowflake-corp-se-workshop/VHOL_Iceberg_SNOW_AWS/data/quotes/snow_T3u4rF4p-cY_AGgidpZHvhc_0_2_002.parquet',
    Key=(bucket_prefix+'/snow_T3u4rF4p-cY_AGgidpZHvhc_0_2_002.parquet')
)

s3.copy_object(
    Bucket=bucket_name, 
    CopySource='/snowflake-corp-se-workshop/VHOL_Iceberg_SNOW_AWS/data/quotes/snow_T3u4rF4p-cY_AGgidpZHvhc_0_2_004.parquet',
    Key=(bucket_prefix+'/snow_T3u4rF4p-cY_AGgidpZHvhc_0_2_004.parquet')
)

s3.copy_object(
    Bucket=bucket_name, 
    CopySource='/snowflake-corp-se-workshop/VHOL_Iceberg_SNOW_AWS/data/quotes/snow_T3u4rF4p-cY_AGgidpZHvhc_0_2_006.parquet',
    Key=(bucket_prefix+'/snow_T3u4rF4p-cY_AGgidpZHvhc_0_2_006.parquet')
)



### Create Glue Database

In [None]:
glue = boto3.client(service_name='glue')

glue.create_database(
        
    DatabaseInput={
        'Name': database_name,
        'Description': 'iceberg database'
    }    
)

### Initialize Spark Session

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", file_path) \
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .getOrCreate()

### Check existing tables in the catalog/database

In [None]:
%%sql

USE iceberg_devday

In [None]:
%%sql

SHOW TABLES

### Create an Iceberg table from temporary view

The view is used to workaround Spark DDL operations. 
This is not required if the tables already exist in the Glue Catalog, as you can simply reuse the DDL from the existing table with a CTAS LIMIT 0

In [None]:
# Create temp view to generate DDL for Iceberg table
# This step is not needed if using existing Glue Catalog tables or Glue Crawler - simplified for lab

query = f"""
create or replace temporary view temp_view_quotes as 
select
    CAST ( 'a' AS VARCHAR(100)) AS UUID ,
    CAST ( 'a' AS VARCHAR(32)) AS BATCHID ,
    CAST ( 'a' AS VARCHAR(100)) AS QUOTE_PRODUCT ,
    CAST ( '2024-01-01' AS DATE) AS QUOTEDATE,
    CAST ( 1 AS DECIMAL(38,0)) AS QUOTEHOUR ,
    CAST ( 'a' AS VARCHAR(50)) AS POLICYNO ,
    CAST ( '2024-01-01' AS DATE) AS INCEPTIONDATE ,
    CAST ( '2024-01-01' AS DATE) AS EXPIRYDATE ,
    CAST ( '2024-01-01' AS DATE) AS EFFECTIVESTARTDATE ,
    CAST ( '2024-01-01' AS DATE) AS EFFECTIVEENDDATE ,
    CAST ( 'a' AS VARCHAR(20)) AS PREVINSR ,
    CAST ( 'a' AS VARCHAR(20)) AS CREDITCHECKSCONSENTIND ,
    CAST ( 1 AS DECIMAL(38,0)) AS CREDITSCORE ,
    CAST ( 'a' AS VARCHAR(100)) AS DATEOFBIRTH ,
    CAST ( 'a' AS VARCHAR(20)) AS HOMEOWNERIND ,
    CAST ( 'a' AS VARCHAR(20)) AS MARITALSTATUS ,
    CAST ( 1 AS DECIMAL(38,0)) AS VEHICLESAVAILABLE ,
    CAST ( 'a' AS VARCHAR(20)) AS PRN ,
    CAST ( 'a' AS VARCHAR(20)) AS SEX ,
    CAST ( 'a' AS VARCHAR(100)) AS POSTCODEDISTRICT ,
    CAST ( 'a' AS VARCHAR(100)) AS POSTCODEFULL ,
    CAST ( 'a' AS VARCHAR(100)) AS POSTCODESECTOR ,
    CAST ( 'a' AS VARCHAR(100)) AS SURNAME ,
    CAST ( 1 AS DECIMAL(28,2)) AS IPTAMOUNT ,
    CAST ( 1 AS DECIMAL(28,2)) AS NEWRISKPREMIUM ,
    CAST ( 1 AS DECIMAL(28,2)) AS OLDRISKPREMIUM ,
    CAST ( 1 AS DECIMAL(28,2)) AS ORIGINALPREMIUM ,
    CAST ( 1 AS DECIMAL(28,2)) AS PREMIUMINCLIPT ,
    CAST ( 1 AS DECIMAL(28,2)) AS PREMIUMEXCLIPT ,
    CAST ( 1 AS DECIMAL(28,2)) AS TOTALPREMIUMPAYABLE ,
    CAST ( 'a' AS VARCHAR(25)) AS AGENCYREF ,
    CAST ( 'a' AS VARCHAR(25)) AS BUSINESSSOURCECODE ,
    CAST ( 'a' AS VARCHAR(25)) AS INTERMEDIARY_CODE ,
    CAST ( 'a' AS VARCHAR(25)) AS INSRPMTTYPE ,
    CAST ( 1 AS DECIMAL(38,0)) AS DEBITFRQCY ,
    CAST ( 'a' AS VARCHAR(250)) AS ADDRESS ,
    CAST ( 'a' AS VARCHAR(250)) AS FULLNAME ,
    CAST ( 'a' AS VARCHAR(250)) AS POSTCODE ,
    CAST ( 'a' AS VARCHAR(250)) AS PHONENUMBER ,
    CAST ( 'a' AS VARCHAR(511)) AS EMAIL 

"""
spark.sql(query)

In [None]:
# Create empty Iceberg table usign DDL from parquet table with temp view

query = f"""
CREATE OR REPLACE TABLE {catalog_name}.{database_name}.{table_name} 
USING iceberg
AS SELECT * FROM temp_view_quotes LIMIT 0


"""
spark.sql(query)

In [None]:
%%sql

SHOW TABLES

### Execute add_files procedure

Use add_files to add the parquest files to the created Iceberg table. This allows you to leverage Iceberg format without the need to rewrite the underlying data files. Can be very handy to convert existing Data Lake tables to Iceberg format.

In [None]:
query = f"""
CALL {catalog_name}.system.add_files(table => '{database_name}.{table_name}', source_table => '`parquet`.`{file_path}`')
"""

spark.sql(query).show(truncate=False)

### Check the data files which belong to the Iceberg table

Notice that the data file path is still pointing to the original path where Parquet files are residing. Therefore, no change in the path

In [None]:
query = f"""
SELECT file_path FROM {catalog_name}.{database_name}.{table_name}.files
"""

spark.sql(query).show(10, truncate=False)

### Check new metadata files

These files will be available in the Glue Catalog directory set above. This path is pointing to a separate folder under the S3 bucket, where only the metedata files resides

In [None]:
query = f"""
SELECT snapshot_id, manifest_list FROM {catalog_name}.{database_name}.{table_name}.snapshots
"""

spark.sql(query).show(10, truncate=False)

### Stop session

In [None]:
%stop_session