In [1]:
import logging
import sys
from datetime import datetime

from pyspark.sql import SparkSession

# Logging configuration: send INFO-and-above records from the root logger to
# stdout so they appear inline in the notebook output.
# The handler is looked up by name before being created, so re-running this
# cell does not attach a second handler (which would print every log line
# once per execution of the cell).
_LOG_HANDLER_NAME = "employee_s3table_stdout"
formatter = logging.Formatter('[%(asctime)s] %(levelname)s @ line %(lineno)d: %(message)s')
logger = logging.getLogger()
logger.setLevel(logging.INFO)

handler = next((h for h in logger.handlers if h.get_name() == _LOG_HANDLER_NAME), None)
if handler is None:
    handler = logging.StreamHandler(sys.stdout)
    handler.set_name(_LOG_HANDLER_NAME)
    logger.addHandler(handler)
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)


# ---- Application-specific settings ------------------------------------------
# Job name; combined with the run timestamp below to give each Spark
# application run a distinct name.
AppName = "EmployeeDataS3TableJob"
dt_string = f"{datetime.now():%Y_%m_%d_%H_%M_%S}"

# ---- Target table in the `s3tablesbucket` catalog ---------------------------
namespace = "doeks_namespace"
table_name = "employee_s3_table"
# Fully qualified identifier: <catalog>.<namespace>.<table>
full_table_name = f"s3tablesbucket.{namespace}.{table_name}"

# ---- Locations: replace S3_BUCKET and ACCOUNT_NUMBER with your own values ----
input_csv_path = "s3a://<S3_BUCKET>/s3table-example/input/"
s3table_arn = "arn:aws:s3tables:us-west-2:<ACCOUNT_NUMBER>:bucket/doeks-spark-s3-tables"


In [2]:

# Spark log level for this session. "DEBUG" floods the driver/notebook logs
# with framework internals; set it back to "DEBUG" only while troubleshooting.
SPARK_LOG_LEVEL = "WARN"

# Build (or, via getOrCreate(), reuse) the Spark session used by this job:
# - Iceberg SQL extensions plus an Iceberg SparkCatalog named `s3tablesbucket`
#   that uses the S3 Tables catalog implementation, with `s3table_arn` as its
#   warehouse; `s3tablesbucket` is also made the default catalog.
# - S3A filesystem settings and web-identity credentials for S3 access
#   (the CSV input is read through an s3a:// path).
spark = (
    SparkSession.builder
    .appName(f"{AppName}_{dt_string}")
    # Iceberg + S3 Tables catalog
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog")
    .config("spark.sql.catalog.s3tablesbucket.warehouse", s3table_arn)
    .config("spark.sql.defaultCatalog", "s3tablesbucket")
    # Serve s3:// URIs with the S3A filesystem implementation as well.
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # S3A connection and I/O tuning
    .config("spark.hadoop.fs.s3a.connection.timeout", "1200000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.maximum", "200")
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.hadoop.fs.s3a.readahead.range", "256K")
    .config("spark.hadoop.fs.s3a.input.fadvise", "random")
    # Authenticate S3A with a web-identity token file. The mapping translates the
    # AWS SDK v1 provider class name (com.amazonaws...) to its SDK v2 equivalent.
    .config("spark.hadoop.fs.s3a.aws.credentials.provider.mapping", "com.amazonaws.auth.WebIdentityTokenCredentialsProvider=software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider")
    .getOrCreate()
)

spark.sparkContext.setLogLevel(SPARK_LOG_LEVEL)
logger.info("Spark session initialized successfully")


[2025-01-13 19:47:18,917] INFO @ line 22: Spark session initialized successfully


In [3]:
# `namespace`, `table_name`, `full_table_name` and `input_csv_path` come from
# the configuration cell at the top of the notebook. They are intentionally
# not redefined here, so the two definitions cannot drift apart.

# Step 1: Create namespace if not exists
logger.info(f"Creating namespace: {namespace}")
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}")

# Step 2: Read input CSV data (first row is the header; Spark infers column types)
logger.info(f"Reading employee data from input CSV: {input_csv_path}")
employee_df = spark.read.csv(input_csv_path, header=True, inferSchema=True)

logger.info("Previewing employee data schema")
employee_df.printSchema()

logger.info("Previewing first 10 records from the input data")
employee_df.show(10, truncate=False)

# Log the count so it is recorded when this runs as a job, not only shown as
# the notebook cell's display value.
source_count = employee_df.count()
logger.info(f"Source data count: {source_count}")
if source_count == 0:
    # The write step uses createOrReplace(), which would replace the table
    # with one that has no rows.
    logger.warning(f"No records read from {input_csv_path}; writing would produce an empty table")

source_count



[2025-01-13 19:47:18,923] INFO @ line 6: Creating namespace: doeks_namespace
[2025-01-13 19:47:21,093] INFO @ line 10: Reading employee data from input CSV: s3a://<S3_BUCKET>/s3table-example/input/
[2025-01-13 19:47:24,560] INFO @ line 13: Previewing employee data schema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- level: string (nullable = true)
 |-- salary: double (nullable = true)

[2025-01-13 19:47:24,564] INFO @ line 16: Previewing first 10 records from the input data
+---+-----------+------+--------+
|id |name       |level |salary  |
+---+-----------+------+--------+
|1  |Employee_1 |Exec  |101000.0|
|2  |Employee_2 |Exec  |149000.0|
|3  |Employee_3 |Junior|86000.0 |
|4  |Employee_4 |Exec  |147500.0|
|5  |Employee_5 |Exec  |74000.0 |
|6  |Employee_6 |Exec  |66500.0 |
|7  |Employee_7 |Junior|69500.0 |
|8  |Employee_8 |Exec  |116000.0|
|9  |Employee_9 |Mid   |56000.0 |
|10 |Employee_10|Exec  |186500.0|
+---+-----------+------+--------+
only showing top 10 rows

100

In [4]:
# Step 3: Create or replace the Iceberg table and write the input data in one operation
logger.info(f"Creating/Replacing and writing data to table: {full_table_name}")
(employee_df.writeTo(full_table_name)
            .using("iceberg")
            .createOrReplace())

# Step 4: Read data back from the Iceberg table
logger.info(f"Reading data back from Iceberg table: {full_table_name}")
iceberg_data_df = spark.read.format("iceberg").load(full_table_name)

logger.info("Previewing first 10 records from the Iceberg table")
iceberg_data_df.show(10, truncate=False)

# Count records using both the DataFrame API and SQL
logger.info("Total records in Iceberg table (DataFrame API):")
dataframe_count = iceberg_data_df.count()
print(f"DataFrame count: {dataframe_count}")

logger.info("Total records in Iceberg table (SQL):")
sql_count = spark.sql(f"SELECT COUNT(*) AS record_count FROM {full_table_name}").first()["record_count"]
print(f"SQL count: {sql_count}")

# Round-trip check: the written table should hold exactly the rows read from
# the CSV. This re-evaluates employee_df (it is not cached), i.e. re-reads the input.
expected_count = employee_df.count()
if not (dataframe_count == sql_count == expected_count):
    logger.warning(
        f"Record count mismatch for {full_table_name}: "
        f"source={expected_count}, DataFrame={dataframe_count}, SQL={sql_count}"
    )

# List the table snapshots via the table's `history` metadata table
logger.info("List the s3table snapshot versions:")
spark.sql(f"SELECT * FROM {full_table_name}.history LIMIT 10").show()

# Stop Spark session. NOTE: after this, any cell that uses `spark` (including
# a re-run of this one) needs the session cell to be executed again first.
logger.info("Stopping Spark Session")
spark.stop()

[2025-01-13 19:47:25,207] INFO @ line 2: Creating/Replacing and writing data to table: s3tablesbucket.doeks_namespace.employee_s3_table
[2025-01-13 19:47:27,861] INFO @ line 8: Reading data back from Iceberg table: s3tablesbucket.doeks_namespace.employee_s3_table
[2025-01-13 19:47:28,079] INFO @ line 11: Previewing first 10 records from the Iceberg table
+---+-----------+------+--------+
|id |name       |level |salary  |
+---+-----------+------+--------+
|1  |Employee_1 |Exec  |101000.0|
|2  |Employee_2 |Exec  |149000.0|
|3  |Employee_3 |Junior|86000.0 |
|4  |Employee_4 |Exec  |147500.0|
|5  |Employee_5 |Exec  |74000.0 |
|6  |Employee_6 |Exec  |66500.0 |
|7  |Employee_7 |Junior|69500.0 |
|8  |Employee_8 |Exec  |116000.0|
|9  |Employee_9 |Mid   |56000.0 |
|10 |Employee_10|Exec  |186500.0|
+---+-----------+------+--------+
only showing top 10 rows

[2025-01-13 19:47:28,794] INFO @ line 15: Total records in Iceberg table (DataFrame API):
DataFrame count: 100
[2025-01-13 19:47:29,153] INFO