In [9]:
import logging
import os
import sys
import time
import traceback
from urllib.parse import quote_plus

from pyspark.sql import SparkSession

In [10]:
# Send INFO-and-above records to stdout using a pipe-separated layout
# (timestamp | level | logger name | message).
logging.basicConfig(
    stream=sys.stdout,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    level=logging.INFO,
)

# Named logger shared by every function in this notebook.
logger = logging.getLogger("mongo_to_iceberg_nb")

In [11]:
def build_mongo_uri(host, user, password):
    """Assemble the MongoDB connection URI for the ``airflow_db`` database.

    Args:
        host: MongoDB host name; port 27017 is appended.
        user: User name; percent-encoded before being embedded.
        password: Password; percent-encoded before being embedded.

    Returns:
        A ``mongodb://`` URI string with ``authSource=admin``.
    """
    # Reserved characters (":", "/", "@", ...) inside credentials would
    # otherwise corrupt the URI, so both parts are percent-encoded.
    return (
        f"mongodb://{quote_plus(user)}:{quote_plus(password)}@{host}:27017/"
        "airflow_db?authSource=admin"
    )


MONGO_HOST = os.getenv("MONGO_HOST", "mongo")

# Credentials come from the environment so real secrets never have to be
# committed with the notebook. The fallbacks are the values that used to be
# embedded in the URI, kept so existing setups keep working; override them
# with MONGO_USER / MONGO_PASSWORD.
MONGO_USER = os.getenv("MONGO_USER", "mongo_user")
MONGO_PASSWORD = os.getenv("MONGO_PASSWORD", "mongo_pass")
MONGO_URI = build_mongo_uri(MONGO_HOST, MONGO_USER, MONGO_PASSWORD)

ICEBERG_WAREHOUSE = os.getenv(
    "ICEBERG_WAREHOUSE",
    "s3a://promotionengine-search/warehouse"
)

# ⚠️ Iceberg + Spark must use v1
NESSIE_URI = os.getenv("NESSIE_URI", "http://nessie:19120/api/v1")
NESSIE_REF = os.getenv("NESSIE_REF", "main")

# Directory that holds the JAR files passed to Spark.
JAR_DIR = os.getenv("JAR_DIR", "/home/jovyan/jars")

In [12]:
# Absolute paths of every JAR required by the job, resolved against JAR_DIR.
# NOTE(review): nessie-client is pinned at 0.99.0 while nessie-spark-extensions
# is 0.105.7 — confirm these two versions are meant to be used together.
JARS = [
    os.path.join(JAR_DIR, jar_name)
    for jar_name in (
        "iceberg-spark-runtime-3.4_2.12-1.5.2.jar",
        "iceberg-nessie-1.5.2.jar",
        "nessie-client-0.99.0.jar",
        "nessie-spark-extensions-3.4_2.12-0.105.7.jar",
        "hadoop-aws-3.3.4.jar",
        "aws-java-sdk-bundle-1.12.772.jar",
        "mongo-spark-connector_2.12-10.1.1.jar",
        "mongodb-driver-core-4.11.2.jar",
        "mongodb-driver-sync-4.11.2.jar",
        "bson-4.11.2.jar",
        "iceberg-aws-bundle-1.5.2.jar",
    )
]

# Target Iceberg table, addressed as <catalog>.<namespace>.<table>.
NAMESPACE = "sales"
TABLE = "mongo_orders"
TABLE_IDENT = f"nessie.{NAMESPACE}.{TABLE}"

In [13]:
def now():
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    utc_struct = time.gmtime()
    return time.strftime("%Y-%m-%d %H:%M:%S", utc_struct)

In [14]:
def check_jars():
    """Log whether each path in ``JARS`` exists and collect the absent ones.

    Returns:
        A list of the paths from ``JARS`` that do not exist on disk, in the
        same order as ``JARS``; empty when everything is present.
    """
    logger.info("Checking Spark JARs...")
    absent = []
    for jar_path in JARS:
        present = os.path.exists(jar_path)
        logger.info("JAR=%s exists=%s", jar_path, present)
        if present:
            continue
        absent.append(jar_path)
    return absent

In [15]:
def create_spark():
    """Build the SparkSession configured for the Nessie Iceberg catalog and MongoDB.

    Returns:
        The session object produced by ``SparkSession.builder.getOrCreate()``.
    """
    logger.info("Creating SparkSession")

    settings = [
        ("spark.jars", ",".join(JARS)),

        # Iceberg + Nessie
        (
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        ),
        ("spark.sql.defaultCatalog", "nessie"),
        ("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog"),
        (
            "spark.sql.catalog.nessie.catalog-impl",
            "org.apache.iceberg.nessie.NessieCatalog",
        ),
        ("spark.sql.catalog.nessie.uri", NESSIE_URI),
        ("spark.sql.catalog.nessie.ref", NESSIE_REF),
        ("spark.sql.catalog.nessie.warehouse", ICEBERG_WAREHOUSE),
        ("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),

        # AWS creds picked automatically from env
        (
            "spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
        ),

        # MongoDB: the same URI serves both reads and writes
        ("spark.mongodb.read.connection.uri", MONGO_URI),
        ("spark.mongodb.write.connection.uri", MONGO_URI),
    ]

    builder = SparkSession.builder.appName("MONGO_TO_ICEBERG_NOTEBOOK")
    for option_key, option_value in settings:
        builder = builder.config(option_key, option_value)
    session = builder.getOrCreate()

    logger.info("Spark version=%s", session.version)
    return session

In [16]:
def _copy_orders_to_iceberg(spark):
    """Load the ``airflow_db.orders`` MongoDB collection and write it to ``TABLE_IDENT``.

    Args:
        spark: The active SparkSession used for the read, the write and the
            verification query.

    Raises:
        RuntimeError: If the MongoDB collection contains no rows.
    """
    logger.info("Reading from MongoDB...")
    df = (
        spark.read
        .format("mongodb")
        .option("database", "airflow_db")
        .option("collection", "orders")
        .load()
    )

    row_count = df.count()
    logger.info("MongoDB row count=%d", row_count)
    df.printSchema()

    # createOrReplace() below replaces the table, so refuse to run with an
    # empty source rather than replacing it with nothing.
    if row_count == 0:
        raise RuntimeError("MongoDB collection is EMPTY")

    logger.info("Ensuring namespace exists: %s", NAMESPACE)
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS nessie.{NAMESPACE}")

    logger.info("Writing to Iceberg table: %s", TABLE_IDENT)
    df.writeTo(TABLE_IDENT).createOrReplace()

    logger.info("Verifying Iceberg read...")
    result = spark.sql(f"SELECT COUNT(*) AS cnt FROM {TABLE_IDENT}").collect()
    logger.info("Verification COUNT=%s", result)


def main():
    """Run the MongoDB -> Iceberg copy job end to end.

    Checks that every required JAR exists, creates the SparkSession, copies
    the collection and always stops the session afterwards.

    Raises:
        RuntimeError: If a required JAR is missing or the MongoDB collection
            is empty.
        Exception: Any error raised while the job runs is logged with its
            traceback and re-raised.
    """
    logger.info("========== JOB STARTED at %s ==========", now())

    missing = check_jars()
    if missing:
        raise RuntimeError(f"Missing JARs: {missing}")

    spark = create_spark()

    try:
        try:
            _copy_orders_to_iceberg(spark)
        finally:
            # Stop the session on failure too, so a failed run does not
            # leave a live session behind in the kernel.
            logger.info("Stopping Spark")
            spark.stop()

        logger.info("========== JOB SUCCESS at %s ==========", now())

    except Exception as e:
        # logger.exception records the message at ERROR level together with
        # the active traceback.
        logger.exception("JOB FAILED: %s", e)
        raise

# Run the job as soon as this cell executes; main() re-raises failures, so the cell errors out visibly.
main()


2025-12-23 10:16:19,794 | INFO | mongo_to_iceberg_nb | Checking Spark JARs...
2025-12-23 10:16:19,799 | INFO | mongo_to_iceberg_nb | JAR=/home/jovyan/jars/iceberg-spark-runtime-3.4_2.12-1.5.2.jar exists=True
2025-12-23 10:16:19,803 | INFO | mongo_to_iceberg_nb | JAR=/home/jovyan/jars/iceberg-nessie-1.5.2.jar exists=True
2025-12-23 10:16:19,809 | INFO | mongo_to_iceberg_nb | JAR=/home/jovyan/jars/nessie-client-0.99.0.jar exists=True
2025-12-23 10:16:19,814 | INFO | mongo_to_iceberg_nb | JAR=/home/jovyan/jars/nessie-spark-extensions-3.4_2.12-0.105.7.jar exists=True
2025-12-23 10:16:19,818 | INFO | mongo_to_iceberg_nb | JAR=/home/jovyan/jars/hadoop-aws-3.3.4.jar exists=True
2025-12-23 10:16:19,823 | INFO | mongo_to_iceberg_nb | JAR=/home/jovyan/jars/aws-java-sdk-bundle-1.12.772.jar exists=True
2025-12-23 10:16:19,826 | INFO | mongo_to_iceberg_nb | JAR=/home/jovyan/jars/mongo-spark-connector_2.12-10.1.1.jar exists=True
2025-12-23 10:16:19,829 | INFO | mongo_to_iceberg_nb | JAR=/home/jovyan