# bronze.ipynb
Ingests raw `page_load` (v1) JSON events from the raw bucket into a bronze Iceberg table.

In [1]:
from pyspark.sql import SparkSession
from spark_config import spark_config_bronze, spark_config_minio

In [2]:
# Stop any Spark session left over from a previous run of this notebook.
# getActiveSession() returns None when no session is running, so unlike
# SparkSession.builder.getOrCreate() it does not start a throwaway local
# session on a fresh kernel only to stop it again immediately.
previous_spark = SparkSession.getActiveSession()
if previous_spark is not None:
    previous_spark.stop()

# Create a new spark session against the standalone cluster master
builder = SparkSession.builder
builder.appName("bronze")
builder.master("spark://spark-master:7077")

# Apply common spark configs, for the bronze catalog and access to minio
spark_config_minio(builder)
spark_config_bronze(builder)

spark = builder.getOrCreate()

In [3]:
# Create the schema (namespace) in the bronze catalog, pinning its storage
# location to the bronze bucket. IF NOT EXISTS makes the cell safe to re-run.
# The trailing ';' suppresses the uninformative `DataFrame[]` cell output.
spark.sql("""
    CREATE SCHEMA IF NOT EXISTS bronze.data_platform_example
    LOCATION 's3a://bronze/data_platform_example'
""");

DataFrame[]

In [4]:
# Explicit schema for raw page_load v1 events. It mirrors the schema Spark
# inferred from the raw files (all leaf fields are nullable strings), but
# pinning it keeps the bronze table's columns stable from run to run and skips
# the extra pass over the files that JSON schema inference requires.
# JSON fields not listed here are ignored, so update this when the raw format
# changes (a new format version should get its own prefix/table anyway).
page_load_schema = (
    "metadata STRUCT<name: STRING, `timestamp`: STRING, version: STRING>, "
    "payload STRUCT<browser: STRING, page: STRING, user_name: STRING>"
)

# recursiveFileLookup picks up JSON files in any sub-folder under the prefix.
df = (
    spark.read
    .schema(page_load_schema)
    .option("recursiveFileLookup", "true")
    .json("s3a://raw/page_load/v1/")
)

df.printSchema()
df.show()

25/02/27 07:09:28 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

root
 |-- metadata: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- browser: string (nullable = true)
 |    |-- page: string (nullable = true)
 |    |-- user_name: string (nullable = true)

+--------------------+--------------------+
|            metadata|             payload|
+--------------------+--------------------+
|{page_load, 2025-...|{Chrome, /contact...|
|{page_load, 2025-...|{NULL, /cart, Tyr...|
|{page_load, 2025-...|{Safari, /home, A...|
|{page_load, 2025-...|{Safari, /contact...|
|{page_load, 2025-...|{Firefox, /home, ...|
|{page_load, 2025-...|{Safari, /product...|
|{page_load, 2025-...|{Edge, /home, Joh...|
|{page_load, 2025-...|{Safari, /home, D...|
|{page_load, 2025-...|{Safari, /home, C...|
|{page_load, 2025-...|{Edge, /contact, ...|
|{page_load, 2025-...|{Edge, /cart, Den...|
|{page_load, 2025-...|{NULL, /home, C

In [6]:
# Fully-qualified Iceberg table name, in three parts:
#   "bronze"                - the catalog, as configured by spark_config_bronze()
#                             when the session was built
#   "data_platform_example" - the database / schema, used for domain separation
#   "page_load_v1"          - the name of the iceberg table itself
table_name = "bronze.data_platform_example.page_load_v1"

table_exists = spark.catalog.tableExists(table_name)
if not table_exists:
    # If this is our first run ever, the table won't exist, and we need to create it
    print("Table does not exist, creating new table")
    df.write.format('iceberg').saveAsTable(table_name)
else:
    # On further runs, we don't want to overwrite the whole table, just add to it
    # because this job is meant to be run incrementally.
    # NOTE(review): the raw read reloads *every* file under s3a://raw/page_load/v1/
    # on each run, so this append re-inserts rows ingested by earlier runs.
    # Confirm that processed files are removed from raw, or track which files
    # have already been ingested.
    print("Table exists, appending to existing table")
    df.write.format('iceberg').mode('append').saveAsTable(table_name)


Table exists, appending to existing table


                                                                                

In [6]:
# Shut down the session and its SparkContext so this application stops holding
# resources on the spark-master cluster once ingestion is finished.
spark.stop()