In [1]:
import os, sys
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

logger.info("Application started successfully.")

2025-06-30 06:51:53,100 - INFO - Application started successfully.


In [2]:
from pyspark.sql.functions import lit, current_timestamp
from datetime import datetime, timedelta

In [3]:
# load environment variables

NESSIE_URI = os.environ['NESSIE_URI']
REF = "etl"
FULL_PATH_TO_WAREHOUSE = os.environ['WAREHOUSE']
AWS_S3_ENDPOINT = os.environ["AWS_S3_ENDPOINT"]
AWS_ACCESS_KEY = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_KEY  = os.environ['AWS_SECRET_ACCESS_KEY']

In [4]:
# set pyspark configuration
from pyspark import SparkConf

conf = SparkConf() \
    .setAppName("process_dim_adventure_works") \
    .setMaster("local[*]") \
    .set("spark.executor.memory", "4g") \
    .set("spark.driver.memory", "2g") \
    .set("spark.sql.catalog.defaultCatalog", "nessie") \
    .set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog") \
    .set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog") \
    .set("spark.sql.catalog.nessie.uri", NESSIE_URI) \
    .set("spark.sql.catalog.nessie.ref", REF) \
    .set("spark.sql.catalog.nessie.authentication.type", "NONE") \
    .set("spark.sql.catalog.nessie.s3.endpoint", AWS_S3_ENDPOINT) \
    .set("spark.sql.catalog.nessie.s3.path-style-access", "true") \
    .set("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .set("spark.sql.catalog.nessie.warehouse", FULL_PATH_TO_WAREHOUSE)

# set MinIO config
conf.set("fs.s3a.access.key", AWS_ACCESS_KEY) 
conf.set("fs.s3a.secret.key", AWS_SECRET_KEY) 
conf.set("fs.s3a.endpoint", AWS_S3_ENDPOINT) 
conf.set("fs.s3a.connection.ssl.enabled", "false") 
conf.set("fs.s3a.path.style.access", "true") 

<pyspark.conf.SparkConf at 0x7f4a02b690c0>

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=conf).getOrCreate()
logger.info("Spark session started with Nessie and Iceberg configuration.")

25/06/30 06:51:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
2025-06-30 06:51:54,791 - INFO - Spark session started with Nessie and Iceberg configuration.


In [6]:
# create bronze in nessie catalog
spark.sql(f"USE REFERENCE main IN nessie")
namespace = "bronze"
df = spark.sql("SHOW NAMESPACES in nessie")
namespace_exists = df.filter(df.namespace == namespace).count() > 0

logger.info(f"namespace {namespace} exists: {namespace_exists}")
if not namespace_exists:
    spark.sql(f"CREATE NAMESPACE nessie.{namespace}")

spark.sql("SHOW NAMESPACES in nessie").show()


25/06/30 06:51:57 WARN S3FileIO: Unclosed S3FileIO instance created by:
	org.apache.iceberg.aws.s3.S3FileIO.initialize(S3FileIO.java:444)
	org.apache.iceberg.CatalogUtil.loadFileIO(CatalogUtil.java:402)
	org.apache.iceberg.CatalogUtil.loadFileIO(CatalogUtil.java:349)
	org.apache.iceberg.nessie.NessieCatalog.initialize(NessieCatalog.java:132)
	org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:277)
	org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:331)
	org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:153)
	org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:752)
	org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
	org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:54)
	scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:54)
	org.apache.spark.sql.connector.

+---------+
|namespace|
+---------+
|   bronze|
+---------+



In [7]:
# create etl branch
spark.sql(f"DROP BRANCH IF EXISTS {REF}")
spark.sql(f"CREATE BRANCH {REF} IN nessie FROM main")
spark.sql(f"LIST REFERENCES IN nessie")


DataFrame[refType: string, name: string, hash: string]

In [8]:
spark.sql(f"USE nessie")
spark.sql(f"USE REFERENCE {REF} IN nessie")

DataFrame[refType: string, name: string, hash: string]

In [9]:
import toml

config_file = os.path.join(os.getcwd(), "notebooks/lib/bronze_adventure_works.toml")
print(config_file)
with open(config_file, "r") as f:
    config = toml.load(f)

/home/iceberg/notebooks/notebooks/lib/bronze_adventure_works.toml


In [10]:
for item in [item for item in config if config[item]["ingestion"]["enabled"]]:
    logger.info(f"Processing {item}...")
    source = os.path.join(FULL_PATH_TO_WAREHOUSE, config[item]["ingestion"]["source"])
    target_table = namespace + "." + config[item]["ingestion"]["target_table"].upper()
    column_mapping = config[item]["column_mapping"]
    logger.info(f"Processing Bronze | Source: {source}, Target Table: {target_table}")

    # Read the data from the source
    df = spark.read \
        .option("header", True) \
        .csv(source)
    logger.info(f"Data read from {source} with schema: {df.schema}")
    
    # Rename columns based on the mapping, handling comma-separated source columns
    for target_col, source_cols in column_mapping.items():
        source_col_list = [col.strip() for col in source_cols.split(",")]
        for src in source_col_list:
            if src in df.columns:
                df = df.withColumnRenamed(src, target_col)
                break  # Stop after the first match

    # Check if the target table exists
    if not spark.catalog.tableExists(f"nessie.{target_table}"):
        logger.info(f"Table {target_table} does not exist. Creating it.")
        # Create the table with the specified schema and partitioning
        df.writeTo(f"nessie.{target_table}") \
            .using("iceberg") \
            .partitionedBy("ingest_date") \
            .createOrReplace()
        logger.info(f"Table {target_table} created successfully.")
    else:
        logger.info(f"Table {target_table} already exists.")
        # If the table exists, check for new columns and add them if necessary
        existing_cols = set(spark.table(f"nessie.{target_table}").columns)
        df_cols = set(df.columns)
        
        # reorder df columns to match the target table schema
        missing_cols = existing_cols - df_cols
        for col in missing_cols:
            df = df.withColumn(col, lit(None).cast(dict(spark.table(f"nessie.{target_table}").dtypes)[col]))
            logger.info(f"Added missing column {col} with null values to DataFrame")
        
        # Overwrite only the partitions present in the DataFrame (by ingest_date)
        df.writeTo(f"nessie.{target_table}") \
            .using("iceberg") \
            .overwritePartitions()
        
    # enable iceberg garbage collection
    spark.sql(f"""
        ALTER TABLE nessie.{target_table} SET TBLPROPERTIES ('gc.enabled'='true')
    """)
    
    # Data retention: expire old snapshots and remove orphan files
    expire_ts = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%dT%H:%M:%S")
    orphan_ts = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%S")
    
    logger.info(f'expiring old snapshots for {target_table}')
    spark.sql(f"""
        CALL nessie.system.expire_snapshots(
            table => '{target_table}',
            older_than => TIMESTAMP '{expire_ts}',
            retain_last => 1
        )
    """)
    logger.info(f'removing orphan files for {target_table}')
    spark.sql(f"""
        CALL nessie.system.remove_orphan_files(
            table => '{target_table}',
            older_than => TIMESTAMP '{orphan_ts}'
        )
    """)
        
    # disable iceberg garbage collection
    spark.sql(f"""
        ALTER TABLE nessie.{target_table} SET TBLPROPERTIES ('gc.enabled'='false')
    """)
    



2025-06-30 06:52:02,456 - INFO - Processing bronze_product...
2025-06-30 06:52:02,458 - INFO - Processing Bronze | Source: s3a://warehouse/staging/adventure_works/product.csv, Target Table: bronze.PRODUCT
25/06/30 06:52:02 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/06/30 06:52:03 WARN S3FileIO: Unclosed S3FileIO instance created by:
	org.apache.iceberg.aws.s3.S3FileIO.initialize(S3FileIO.java:444)
	org.apache.iceberg.CatalogUtil.loadFileIO(CatalogUtil.java:402)
	org.apache.iceberg.CatalogUtil.loadFileIO(CatalogUtil.java:349)
	org.apache.iceberg.nessie.NessieCatalog.initialize(NessieCatalog.java:132)
	org.apache.iceberg.CatalogUtil.loadCatalog(CatalogUtil.java:277)
	org.apache.iceberg.CatalogUtil.buildIcebergCatalog(CatalogUtil.java:331)
	org.apache.iceberg.spark.SparkCatalog.buildIcebergCatalog(SparkCatalog.java:153)
	org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:752)
	org.apache.s

In [11]:
spark.sql("LIST REFERENCES IN nessie").show()

+-------+----+--------------------+
|refType|name|                hash|
+-------+----+--------------------+
| Branch| etl|bf7e24b9020fb1c5f...|
| Branch|main|0b2b7d5a8140efd80...|
+-------+----+--------------------+



In [12]:
spark.sql(f"MERGE BRANCH {REF} INTO main IN nessie").show()
spark.sql(f"DROP BRANCH {REF} IN nessie").show()

+----+--------------------+
|name|                hash|
+----+--------------------+
|main|8c17b6e462babdc50...|
+----+--------------------+

+------+
|status|
+------+
|    OK|
+------+



In [13]:
spark.sql("USE REFERENCE main IN nessie")
spark.sql("LIST REFERENCES").show()

+-------+----+--------------------+
|refType|name|                hash|
+-------+----+--------------------+
| Branch|main|8c17b6e462babdc50...|
+-------+----+--------------------+



In [14]:
spark.stop()
logger.info("Spark session stopped.")

2025-06-30 06:53:19,398 - INFO - Spark session stopped.
