In [1]:
import os
import datetime
import pytz
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Unset SPARK_HOME so pyspark launches the Spark runtime bundled with the pip package
# instead of a globally installed Spark whose version may not match the jars requested
# below. Pyspark only consults SPARK_HOME when it launches the JVM, so on a fresh kernel
# this takes effect immediately. If a SparkSession was already started in this kernel,
# getOrCreate() keeps reusing that session — restart the kernel in that case.
os.environ.pop('SPARK_HOME', None)

## DEFINE VARIABLES
# Service endpoints use fixed hostnames; presumably these resolve to containers on the
# same network (nessie, postgres, mysql, minio) — confirm against the deployment config.
CATALOG_URI = "http://nessie:19120/api/v2"  # Nessie API used as the Iceberg catalog
WAREHOUSE = "s3://lakehouse/"  # root location for Iceberg table files

# Source-database settings come from the environment; os.environ.get returns None for
# any variable that is not set (and that None is interpolated into the JDBC URLs).
POSTGRES_DB = os.environ.get("POSTGRES_DB")
POSTGRES_JDBC_URL = f"jdbc:postgresql://postgres:5432/{POSTGRES_DB}"
POSTGRES_USER = os.environ.get("POSTGRES_USER")
POSTGRES_PASSWORD = os.environ.get("POSTGRES_PASSWORD")

MYSQL_DB = os.environ.get("MYSQL_DB")
MYSQL_JDBC_URL = f"jdbc:mysql://mysql:3306/{MYSQL_DB}"
MYSQL_USER = os.environ.get("MYSQL_USER")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD")

# S3-compatible object storage that holds the warehouse.
STORAGE_URI = "http://minio:9000"
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")

# The storage credentials are always handed to SparkConf, whose set() stores str(value):
# an unset variable would silently become the literal credential "None" and only surface
# later as an opaque authentication failure. Stop here with a clear message instead.
_missing_storage_env = [
    env_name
    for env_name, env_value in (
        ("AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY),
        ("AWS_SECRET_ACCESS_KEY", AWS_SECRET_KEY),
    )
    if not env_value
]
if _missing_storage_env:
    raise RuntimeError(
        "Missing required environment variable(s) for object storage: "
        + ", ".join(_missing_storage_env)
    )

## CONFIGURE SPARK SESSION
# Maven coordinates Spark resolves when it launches the JVM: JDBC drivers for both
# source databases, the Iceberg Spark runtime, Nessie's Spark SQL extensions, and the
# AWS SDK v2 artifacts used by Iceberg's S3FileIO.
_spark_packages = ",".join((
    'org.postgresql:postgresql:42.7.3',
    'mysql:mysql-connector-java:8.0.33',
    'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0',
    'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1',
    'software.amazon.awssdk:bundle:2.24.8',
    'software.amazon.awssdk:url-connection-client:2.24.8',
))

# Session extensions: Iceberg's SQL extensions plus Nessie's SQL extensions.
_sql_extensions = ",".join((
    'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
    'org.projectnessie.spark.extensions.NessieSparkSessionExtensions',
))

# Every option of the Spark catalog named "iceberg" lives under this key prefix.
_catalog = 'spark.sql.catalog.iceberg'

# Applied in order by SparkConf.setAll (each pair goes through SparkConf.set).
_settings = [
    ('spark.jars.packages', _spark_packages),
    ('spark.sql.extensions', _sql_extensions),
    # Catalog "iceberg": Iceberg's SparkCatalog backed by NessieCatalog, on ref "main".
    (_catalog, 'org.apache.iceberg.spark.SparkCatalog'),
    (f'{_catalog}.uri', CATALOG_URI),
    (f'{_catalog}.ref', 'main'),
    (f'{_catalog}.authentication.type', 'NONE'),
    (f'{_catalog}.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog'),
    (f'{_catalog}.warehouse', WAREHOUSE),
    # Table files are read/written through Iceberg's S3FileIO against STORAGE_URI.
    (f'{_catalog}.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO'),
    (f'{_catalog}.client.region', AWS_REGION),
    (f'{_catalog}.s3.endpoint', STORAGE_URI),
    (f'{_catalog}.s3.access-key-id', AWS_ACCESS_KEY),
    (f'{_catalog}.s3.secret-access-key', AWS_SECRET_KEY),
    (f'{_catalog}.s3.path-style-access', 'true'),
    # Hadoop S3A settings for s3a:// paths. NOTE(review): S3AFileSystem lives in the
    # hadoop-aws jar, which is not listed in spark.jars.packages — confirm these are used.
    ("spark.hadoop.fs.s3a.endpoint", STORAGE_URI),
    ("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY),
    ("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY),
    ("spark.hadoop.fs.s3a.path.style.access", "true"),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
]

conf = pyspark.SparkConf().setAppName('Iceberg Ingestion').setAll(_settings)

## START SPARK SESSION
# getOrCreate() returns this kernel's existing session if one is already running.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Report when the session came up, in US/Eastern local time.
_eastern_now = datetime.datetime.now(pytz.timezone('US/Eastern'))
print(f"Spark Running @ {_eastern_now:%Y-%m-%d %H:%M:%S}")


Spark Running @ 2026-01-11 23:02:17


In [3]:
 # Define the JDBC connection properties for the MySQL source.
# To read from Postgres instead, use POSTGRES_JDBC_URL with POSTGRES_USER,
# POSTGRES_PASSWORD and the "org.postgresql.Driver" driver class.

# spark.read.jdbc copies every property into a java.util.Properties, which rejects
# None (null) values with an opaque JVM error, and an unset MYSQL_DB ends up in
# MYSQL_JDBC_URL as the database name "None". Fail fast with a clear message instead.
_missing_mysql_env = [
    env_name
    for env_name, env_value in (
        ("MYSQL_DB", MYSQL_DB),
        ("MYSQL_USER", MYSQL_USER),
        ("MYSQL_PASSWORD", MYSQL_PASSWORD),
    )
    if env_value is None
]
if _missing_mysql_env:
    raise RuntimeError(
        "Missing required environment variable(s) for the MySQL source: "
        + ", ".join(_missing_mysql_env)
    )

properties = {
    "user": MYSQL_USER,
    "password": MYSQL_PASSWORD,
    "driver": "com.mysql.cj.jdbc.Driver"
}

# Read the poc.transactions table from MySQL into a Spark DataFrame
transactions_df = spark.read.jdbc(url=MYSQL_JDBC_URL, table="poc.transactions", properties=properties)

# Show the first rows of the dataset (show() prints at most 20 rows by default)
transactions_df.show()

+-------+----------------+----------------+-----------+----------------+------+
|     id|transaction_date|transaction_type|posted_date|     description|amount|
+-------+----------------+----------------+-----------+----------------+------+
|txn_001|      2024-01-01|          credit| 2024-01-01| Initial Deposit| 500.0|
|txn_002|      2024-01-02|           debit| 2024-01-03|     Coffee Shop|   4.0|
|txn_003|      2024-01-05|           debit| 2024-01-06|Online Bookstore| 20.99|
+-------+----------------+----------------+-----------+----------------+------+



In [4]:
# Manipulate the data: multiply each amount by AMOUNT_MULTIPLIER.
# The result gets its own name instead of overwriting transactions_df, so re-running
# this cell does not compound the multiplication (x2, then x4, x8, ...).
AMOUNT_MULTIPLIER = 2
transactions_scaled_df = transactions_df.withColumn("amount", col("amount") * AMOUNT_MULTIPLIER)

# Create the target namespace in the Iceberg catalog (no-op if it already exists)
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.poc;")

# Write the DataFrame to an Iceberg table in the Nessie catalog; createOrReplace()
# replaces any existing table, so re-running with the same input gives the same table.
transactions_scaled_df.writeTo("iceberg.poc.transactions").createOrReplace()

# Verify that the data was written to Iceberg by reading the table back
spark.read.table("iceberg.poc.transactions").show()

+-------+----------------+----------------+-----------+----------------+------+
|     id|transaction_date|transaction_type|posted_date|     description|amount|
+-------+----------------+----------------+-----------+----------------+------+
|txn_001|      2024-01-01|          credit| 2024-01-01| Initial Deposit|1000.0|
|txn_002|      2024-01-02|           debit| 2024-01-03|     Coffee Shop|   8.0|
|txn_003|      2024-01-05|           debit| 2024-01-06|Online Bookstore| 41.98|
+-------+----------------+----------------+-----------+----------------+------+

