### Preliminary Step

In [None]:
# Define Dependencies (Packages To Be Downloaded From Maven)
# DEPENDENCIES is handed to `spark.jars.packages` further down, so it has to be
# a single comma-separated string; the coordinates are listed one per line
# here and joined at the end.
_MAVEN_COORDINATES = [
    # 1. Required for Spark to interact with an Iceberg WH
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1",
    "software.amazon.awssdk:bundle:2.20.18",
    "com.amazonaws:aws-java-sdk-bundle:1.11.901",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    # 2. Required only when catalog is stored in Postgres DB
    "org.postgresql:postgresql:42.6.0",
]

DEPENDENCIES = ",".join(_MAVEN_COORDINATES)

### 1. Local to MinIO - Using JDBC Catalog (Postgres DB)

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
#from pyspark.sql.types import *

def spark_local_to_minio(icb_catalog_name,
                         iceberg_warehouse,
                         storage_type,
                         pg_user,
                         pg_password,
                         minio_bucket,
                         minio_access_key,
                         minio_secret_key,
                         minio_end_point,
                         pg_db=None,
                         pg_host='localhost',
                         pg_port=5439):
    """Return a Spark session configured with an Iceberg JDBC catalog on MinIO.

    The catalog is registered under ``icb_catalog_name``, made the default
    catalog, backed by a PostgreSQL database through Iceberg's ``JdbcCatalog``
    and pointed at the warehouse
    ``s3a://{minio_bucket}/{iceberg_warehouse}/{storage_type}/``.

    Args:
        icb_catalog_name: Name under which the catalog is registered in Spark.
        iceberg_warehouse: Warehouse folder inside the bucket.
        storage_type: Sub-folder of the warehouse folder.
        pg_user: User name for the catalog database.
        pg_password: Password for the catalog database.
        minio_bucket: Bucket that holds the warehouse.
        minio_access_key: Access key passed to the s3a connector.
        minio_secret_key: Secret key passed to the s3a connector.
        minio_end_point: Endpoint URL passed to the s3a connector.
        pg_db: Name of the catalog database. When omitted, the module-level
            variable ``pg_db`` is used, which is how this function originally
            obtained the value.
        pg_host: Host of the catalog database.
        pg_port: Port of the catalog database.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate()``.

    Raises:
        ValueError: If no database name is passed and no module-level
            ``pg_db`` is defined.
    """
    if pg_db is None:
        # Backward compatibility: earlier versions read the database name from
        # a notebook-level variable instead of taking it as an argument.
        pg_db = globals().get('pg_db')
    if not pg_db:
        raise ValueError("pg_db must be passed or defined at module level "
                         "before calling spark_local_to_minio")

    catalog_key = f'spark.sql.catalog.{icb_catalog_name}'
    jdbc_uri = f'jdbc:postgresql://{pg_host}:{pg_port}/{pg_db}'
    warehouse_uri = f's3a://{minio_bucket}/{iceberg_warehouse}/{storage_type}/'

    conf = (
            SparkConf()
            .setAppName('spark_local_to_minio')
            #Dependencies
            .set('spark.jars.packages', DEPENDENCIES)
            #SQL Extensions
            .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
            #Catalog Configuration
            .set(catalog_key, 'org.apache.iceberg.spark.SparkCatalog')
            .set(f'{catalog_key}.catalog-impl', 'org.apache.iceberg.jdbc.JdbcCatalog')
            .set(f'{catalog_key}.uri', jdbc_uri)
            .set(f'{catalog_key}.jdbc.user', pg_user)
            .set(f'{catalog_key}.jdbc.password', pg_password)
            .set(f'{catalog_key}.jdbc.verifyServerCertificate', 'true')
            .set(f'{catalog_key}.jdbc.useSSL', 'true')
            .set('spark.sql.defaultCatalog', icb_catalog_name)
            .set(f'{catalog_key}.warehouse', warehouse_uri)
            # MinIO Configuration
            .set('spark.hadoop.fs.s3a.access.key', minio_access_key)
            .set('spark.hadoop.fs.s3a.secret.key', minio_secret_key)
            .set("spark.hadoop.fs.s3a.endpoint", minio_end_point)
            # Address buckets as <endpoint>/<bucket> so that no per-bucket DNS
            # name is needed for the custom endpoint.
            .set('spark.hadoop.fs.s3a.path.style.access', 'true')
    )

    ## Start Spark Session
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    print("Spark Session Running")

    return spark

############################################
# Iceberg catalog name and warehouse location inside the bucket
icb_catalog_name = 'pg_catalog'
iceberg_warehouse = 'iceberg-warehouse-pg'
storage_type = 'data-archives'

# Postgres database holding the catalog.
# pg_db is not passed in the call below; spark_local_to_minio picks it up
# from this notebook-level variable.
pg_db = 'iceberg_warehouse_pg'
pg_user = 'postgres'
pg_password = 'postgres'

# MinIO bucket, credentials and endpoint
minio_bucket = 'iceberg-bucket'
minio_access_key = 'admin'
minio_secret_key = 'password'
minio_end_point = 'http://127.0.0.1:9000'

spark = spark_local_to_minio(
    icb_catalog_name=icb_catalog_name,
    iceberg_warehouse=iceberg_warehouse,
    storage_type=storage_type,
    pg_user=pg_user,
    pg_password=pg_password,
    minio_bucket=minio_bucket,
    minio_access_key=minio_access_key,
    minio_secret_key=minio_secret_key,
    minio_end_point=minio_end_point,
)


In [None]:
# Creates a test table (within MinIO) in the iceberg-warehouse-pg warehouse.
# *NOTE*: Until this cell has been run, the ICB WH will not show up in the UI.
create_table_sql = f"""CREATE OR REPLACE TABLE {icb_catalog_name}.TEST_SCHEMA.TEST_TABLE_MINIO_PG (
             FIELD_1 BIGINT,
             FIELD_2 varchar(50),
             FIELD_3 DATE,
             FIELD_4 DOUBLE,
             FIELD_5 TIMESTAMP
             )
             USING iceberg
             """
spark.sql(create_table_sql)

# Display the tables created in the PG catalog (TEST_SCHEMA)
tables_df = spark.sql(f'SHOW TABLES IN {icb_catalog_name}.TEST_SCHEMA')
tables_df.show(truncate=False)

# SQL to run against the Postgres catalog DB to inspect the registered tables:
# select * from iceberg_warehouse_pg.public.iceberg_tables;

### 2. Local to S3 - Using Hadoop Catalog

In [None]:
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
#from pyspark.sql.types import *

def spark_local_to_s3(icb_catalog_name,
                      iceberg_warehouse,
                      storage_type,
                      s3_bucket,
                      s3_access_key,
                      s3_secret_key,
                      s3_session_token=None):
    """Return a Spark session configured with an Iceberg Hadoop catalog on S3.

    The catalog is registered under ``icb_catalog_name`` with type ``hadoop``,
    warehouse ``s3a://{s3_bucket}/{iceberg_warehouse}/{storage_type}/`` and
    ``S3FileIO`` as its IO implementation.

    Side effect: the credentials are written into ``os.environ`` of the
    current process. No credentials are set on the Spark conf itself, so the
    AWS clients are presumably expected to read them from the environment.

    Args:
        icb_catalog_name: Name under which the catalog is registered in Spark.
        iceberg_warehouse: Warehouse folder inside the bucket.
        storage_type: Sub-folder of the warehouse folder.
        s3_bucket: Bucket that holds the warehouse.
        s3_access_key: Value exported as ``AWS_ACCESS_KEY_ID``.
        s3_secret_key: Value exported as ``AWS_SECRET_ACCESS_KEY``.
        s3_session_token: Optional value exported as ``AWS_SESSION_TOKEN``
            (needed for temporary credentials). When omitted, the variable
            is left exactly as it currently is in the environment.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate()``.
    """
    aws_credentials = {'AWS_ACCESS_KEY_ID': s3_access_key,
                       'AWS_SECRET_ACCESS_KEY': s3_secret_key}
    if s3_session_token:
        aws_credentials['AWS_SESSION_TOKEN'] = s3_session_token
    os.environ.update(aws_credentials)

    catalog_key = f'spark.sql.catalog.{icb_catalog_name}'
    warehouse_uri = f's3a://{s3_bucket}/{iceberg_warehouse}/{storage_type}/'

    conf = (
            SparkConf()
            .setAppName('spark_local_to_s3')
            #packages
            .set('spark.jars.packages', DEPENDENCIES)
            #SQL Extensions
            .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
            #Configuring Catalog
            .set(catalog_key, 'org.apache.iceberg.spark.SparkCatalog')
            .set(f'{catalog_key}.type', 'hadoop')
            .set(f'{catalog_key}.warehouse', warehouse_uri)
            .set(f'{catalog_key}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
    )

    ## Start Spark Session
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    print("Spark Session Running")

    return spark

############################################
# Iceberg catalog name and warehouse location inside the bucket
icb_catalog_name = 'hadoop_catalog'
iceberg_warehouse = 'iceberg-warehouse-dev-hdp'
storage_type = 'data-archives'

s3_bucket = 'iceberg-bucket-9004'

# Prefer credentials that are already exported in the environment, so real
# keys never have to be pasted into the notebook. spark_local_to_s3 writes
# whatever it receives back into os.environ, so passing the bare 'XXXXX'
# placeholders would overwrite valid credentials; they are now only the
# fallback when the variables are unset.
s3_access_key = os.environ.get('AWS_ACCESS_KEY_ID', 'XXXXX')
s3_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'XXXXX')

spark = spark_local_to_s3(
    icb_catalog_name=icb_catalog_name,
    iceberg_warehouse=iceberg_warehouse,
    storage_type=storage_type,
    s3_bucket=s3_bucket,
    s3_access_key=s3_access_key,
    s3_secret_key=s3_secret_key,
)

In [None]:
# Same as for method 1: until this cell has been run, the ICB WH will not be
# displayed within the S3 bucket.
create_table_sql = f"""CREATE OR REPLACE TABLE {icb_catalog_name}.TEST_SCHEMA.TEST_TABLE_EMR_S3_HDP (
             FIELD_1 BIGINT,
             FIELD_2 varchar(50),
             FIELD_3 DATE,
             FIELD_4 DOUBLE,
             FIELD_5 TIMESTAMP
             )
             USING iceberg
             """
spark.sql(create_table_sql)

# Display the tables created in the Hadoop catalog (TEST_SCHEMA)
tables_df = spark.sql(f'SHOW TABLES IN {icb_catalog_name}.TEST_SCHEMA')
tables_df.show(truncate=False)

### 3. Local to S3 - Deploy Spark App To EMR via CLI

In [None]:
# Shell commands (run in a terminal, not as Python).
# Set up credentials for the aws_personal profile by editing the AWS
# credentials file.
nano ~/.aws/credentials
# Expected structure of the profile entry:
# [aws_personal]
# aws_access_key_id=xxxxxx
# aws_secret_access_key=xxxxxx
# region=xxxxxx

# Confirm that the EMR cluster exists and is in the WAITING state
# (the listing also shows the cluster ID needed for the next command).
aws emr list-clusters --profile aws_personal

# Add a step to the AWS EMR cluster via the CLI.
# Replace j-xxxxxxxxxxx with the cluster ID.
# NOTE: the backslash must be the very last character on its line; any
# whitespace after it ends the command there and --steps is never passed.
aws emr add-steps --profile aws_personal --cluster-id j-xxxxxxxxxxx \
--steps '[{
  "Args":["spark-submit",
    "--deploy-mode","client",
    "--jars","s3://iceberg-bucket-9004/jars/hadoop-aws-3.3.4.jar,s3://iceberg-bucket-9004/jars/bundle-2.20.18.jar,s3://iceberg-bucket-9004/jars/aws-java-sdk-bundle-1.11.901.jar,s3://iceberg-bucket-9004/jars/iceberg-spark-runtime-3.3_2.12-1.3.1.jar",
    "--conf","spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog",
    "--conf","spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "--conf","spark.sql.catalog.hadoop_catalog.type=hadoop",
    "--conf","spark.sql.catalog.hadoop_catalog.warehouse=s3://iceberg-bucket-9004/iceberg-warehouse-emr/data-archives/",
    "--conf","spark.sql.catalog.hadoop_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO",
    "s3://iceberg-bucket-9004/scripts/create_iceberg_wh_app.py"],
  "Type":"CUSTOM_JAR",
  "ActionOnFailure":"CONTINUE",
  "Jar":"command-runner.jar",
  "Properties":"",
  "Name":"create_iceberg_wh_via_emr_cli"
}]'

### 4. Prod to S3 - Deploy Spark App To EMR via Airflow

In [None]:
# The Spark job, deployed to EMR via Airflow, creates an Iceberg warehouse (ICB WH) based on the JSON config below.
{
    "local_conf":{
       "local_sub_folder":"/assets/",
       "files_to_upload":["create_iceberg_wh_app.py"]
    },
    "s3_conf":{
       "bucket_name":"iceberg-bucket-9004",
       "s3_scripts_path":"scripts/"
    },
    "spark_submit_cmd":{
       "cmd":"[\"spark-submit\", \"--deploy-mode\", \"client\"]",
       "pyspark_exec":"scripts/create_iceberg_wh_app.py"
    },
    "spark_conf":{
         "spark.sql.catalog.hadoop_catalog": "org.apache.iceberg.spark.SparkCatalog",
         "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
         "spark.sql.catalog.hadoop_catalog.type": "hadoop",
         "spark.sql.catalog.hadoop_catalog.warehouse": "s3a://iceberg-bucket-9004/iceberg-warehouse-prod-hdp/data-archives/",
         "spark.sql.catalog.hadoop_catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
    },
    "spark_jars_conf":{
       "bucket_prefix":"s3://iceberg-bucket-9004/",
       "bucket_subfolder":"jars/"
    },
    "spark_jars_conf_value":[
         "hadoop-aws-3.3.4.jar",
         "bundle-2.20.18.jar",
         "aws-java-sdk-bundle-1.11.901.jar",
         "iceberg-spark-runtime-3.3_2.12-1.3.1.jar"
    ]
 }

 