# Raw to Bronze Demo Notebook

## 1. Configure Env

In [None]:
%idle_timeout 60
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2
%additional_python_modules 
%extra_jars 
%%configure
{
    "--datalake-formats": "iceberg",
    "--conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=s3://instacart-data-eng-project/iceberg-warehouse/"
}

In [None]:
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Reuse the running SparkContext if one exists, otherwise start a new one.
sc = SparkContext.getOrCreate()
# Wrap the context in a GlueContext and take its SparkSession for the rest of the notebook.
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Quick sanity output confirming the session object is usable.
print("Spark version:", spark.version)
print("Initialization complete!")

In [None]:
# S3 bucket used for the raw files and for the database/table locations below.
BUCKET_NAME = "instacart-data-eng-project"
# Spark catalog name; matches the `spark.sql.catalog.glue_catalog.*` settings in the %%configure cell.
CATALOG_NAME = "glue_catalog"
# Database (namespace) and table that the step-by-step cells operate on.
DATABASE_NAME = "bronze"
TABLE_NAME = "orders"
# Raw-layer folder for TABLE_NAME; note that it already ends with a trailing slash.
raw_path = f"s3://{BUCKET_NAME}/raw/{TABLE_NAME}/"

## 2. Create Glue database (Bronze)

In [None]:
# Create the database in the catalog if it is not there yet, rooted at its own
# prefix in the bucket, then list the catalog's databases to confirm.
spark.sql(
    f"CREATE DATABASE IF NOT EXISTS {CATALOG_NAME}.{DATABASE_NAME} "
    f"LOCATION 's3://{BUCKET_NAME}/{DATABASE_NAME}/'"
)
spark.sql(f"SHOW DATABASES IN {CATALOG_NAME}").show()

## 3. Read from raw data

In [None]:
# Load the CSV files under the raw folder: the first line is treated as the
# header and column types are inferred from the data.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(raw_path)
)
# Preview a few rows and the inferred schema.
df.show(5)
df.printSchema()

In [None]:
# Count the rows that were read (this triggers a Spark job over the raw files).
record_count = df.count()
# Report against TABLE_NAME so the message stays correct if the table is changed.
print(f"Read {record_count} records from {TABLE_NAME} table")

## 4. Add audit columns

In [None]:
# Tag every row with the load time and the file it was read from.
df = (
    df
    .withColumn("ingest_timestamp", F.current_timestamp())
    .withColumn("source_file", F.input_file_name())
)
df.show(5)

## 5. Write the raw data to bronze iceberg table (full refresh)

In [None]:
# Fully qualified table identifier and the S3 prefix the table is stored under.
full_table_name = f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"
table_location = f"s3://{BUCKET_NAME}/{DATABASE_NAME}/{TABLE_NAME}"

# Full refresh: create the table, or replace it if it already exists.
(
    df.writeTo(full_table_name)
    .option("check-ordering", "false")
    .tableProperty("format-version", "2")
    .tableProperty("location", table_location)
    .createOrReplace()
)

print(f"Data written to Iceberg table {full_table_name} at location {table_location}")

In [None]:
# Preview the rows now stored in the bronze table.
spark.table(
    f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"
).show()

In [None]:
# Print the table's extended description (up to 100 rows, values not truncated).
spark.sql(
    f"DESCRIBE EXTENDED {CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"
).show(100, truncate=False)

## 6. Write the raw data to bronze iceberg table (incremental load)

In [None]:
# Two hand-made order rows used to simulate a new batch arriving in the raw layer.
new_orders = [
    (100, 11111, "prior", 1, 3, 18, 2.0),
    (101, 22222, "prior", 2, 4, 19, 5.0),
]

# Explicit schema for the demo rows; only order_id is declared non-nullable.
orders_schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("user_id", IntegerType(), True),
    StructField("eval_set", StringType(), True),
    StructField("order_number", IntegerType(), True),
    StructField("order_dow", IntegerType(), True),
    StructField("order_hour_of_day", IntegerType(), True),
    StructField("days_since_prior_order", FloatType(), True),
])

new_orders_df = spark.createDataFrame(new_orders, orders_schema)

# Write the batch as a single CSV file (with header) into a `demo/` sub-folder,
# replacing whatever a previous run left there.
# raw_path already ends with "/", so append "demo/" directly to avoid a "//"
# segment in the S3 path.
new_orders_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(f"{raw_path}demo/")

In [None]:
# Read the demo batch back from the raw layer, with header and inferred types.
# raw_path already ends with "/", so append "demo/" directly to avoid a "//"
# segment in the S3 path.
df_new = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{raw_path}demo/")
)
df_new.show(5)
df_new.printSchema()

In [None]:
# Row count of the bronze table before the incremental append.
spark.sql(
    f"SELECT COUNT(*) AS total_records FROM {full_table_name}"
).show()

In [None]:
full_table_name = f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"

# Incremental load: add the same audit columns as the full refresh, then
# append the new rows to the existing table.
(
    df_new
    .withColumn("ingest_timestamp", F.current_timestamp())
    .withColumn("source_file", F.input_file_name())
    .writeTo(full_table_name)
    .option("check-ordering", "false")
    .append()
)

In [None]:
# Row count of the bronze table after the incremental append.
spark.sql(
    f"SELECT COUNT(*) AS total_records FROM {full_table_name}"
).show()

In [None]:
# Preview the bronze table after the append.
spark.table(
    f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"
).show()

## 7. Clean up table and database

In [None]:
# Remove the demo table first, then its database.
# NOTE(review): the time-travel cells further down query this same table, so
# running this cell before them leaves nothing to query. Run it last.
spark.sql(f"DROP TABLE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}")
spark.sql(f"DROP DATABASE IF EXISTS {CATALOG_NAME}.{DATABASE_NAME}")

## 8. Iceberg Time Travel (run before the section 7 clean-up, which drops the table)

In [None]:
# Query the table's `history` metadata table and print it without truncation.
history_df = spark.sql(f"""
    SELECT * FROM {CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}.history
""")
history_df.show(truncate=False)

In [None]:
# Two hand-made order rows appended to create an extra snapshot for the
# time-travel queries that follow.
new_orders = [
    (100, 11111, "prior", 1, 3, 18, 2.0),
    (101, 22222, "prior", 2, 4, 19, 5.0),
]

# `orders_schema` is defined in the incremental-load section of this notebook.
new_orders_df = spark.createDataFrame(new_orders, orders_schema)

# Append to the table the time-travel queries read (TABLE_NAME), not to a
# separate `orders_bronze` table that is never created in this notebook.
# The target table was written with the two audit columns, so add them here as
# well to keep the appended rows compatible with its schema.
# input_file_name() has no backing file for in-memory rows.
(
    new_orders_df
    .withColumn("ingest_timestamp", F.current_timestamp())
    .withColumn("source_file", F.input_file_name())
    .writeTo(f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}")
    .option("check-ordering", "false")
    .append()
)

print("新数据已追加！")

In [None]:
# Row count of the table as of a fixed point in time.
# NOTE(review): the timestamp is hard-coded, presumably copied from an earlier
# run's history output. Adjust it to your own run before executing.
spark.sql(
    f"SELECT COUNT(1) FROM {full_table_name} TIMESTAMP AS OF '2026-01-28 08:49:00'"
).show()

In [None]:
# Row count of the table as of a later fixed point in time.
# NOTE(review): the timestamp is hard-coded, presumably copied from an earlier
# run's history output. Adjust it to your own run before executing.
spark.sql(
    f"SELECT COUNT(1) FROM {full_table_name} TIMESTAMP AS OF '2026-01-28 09:27:00'"
).show()

## Create a function for each step

In [None]:
def read_csv_from_raw(table_name):
    """
    Load a table's CSV files from the S3 raw layer and add audit columns.

    The files are read with a header row and inferred column types, then
    `ingest_timestamp` and `source_file` columns are added. The row count is
    printed, which runs a Spark job over the data.

    param table_name: table name (the folder name under `raw/` in the bucket)
    return: DataFrame with the CSV columns plus the two audit columns
    """
    source_path = f"s3://{BUCKET_NAME}/raw/{table_name}/"

    csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
    raw_df = csv_reader.csv(source_path)
    audited_df = (
        raw_df
        .withColumn("ingest_timestamp", F.current_timestamp())
        .withColumn("source_file", F.input_file_name())
    )

    print(f"Read {audited_df.count()} records from {source_path}")
    return audited_df

In [None]:
def write_to_bronze_iceberg_full(df, table_name):
    """
    Write a DataFrame to an Iceberg table as a full refresh.

    The table is created, or replaced if it already exists, in
    CATALOG_NAME.DATABASE_NAME with format-version 2.

    param df: DataFrame to write
    param table_name: table name (also used as the folder name of the table location)
    """
    full_table_name = f"{CATALOG_NAME}.{DATABASE_NAME}.{table_name}"
    # Derive the storage prefix from DATABASE_NAME rather than a literal
    # "bronze", so the location always matches the database the table is
    # registered in (same convention as the step-by-step cell above).
    table_location = f"s3://{BUCKET_NAME}/{DATABASE_NAME}/{table_name}"

    (
        df.writeTo(full_table_name)
        .tableProperty("format-version", "2")
        .tableProperty("location", table_location)
        .createOrReplace()
    )

    print(f"✅ Successfully wrote to {full_table_name}")
    print(f"   Location: {table_location}")

In [None]:
def write_to_bronze_iceberg_incremental(spark, df, table_name):
    """
    Append a DataFrame to an existing Iceberg table (incremental load).

    param spark: accepted but not used by this function
    param df: DataFrame whose rows are appended
    param table_name: table name inside CATALOG_NAME.DATABASE_NAME
    """
    target = f"{CATALOG_NAME}.{DATABASE_NAME}.{table_name}"

    appender = df.writeTo(target).option("check-ordering", "false")
    appender.append()

    print(f"✅ Successfully appended to {target}")

In [None]:
# Raw-layer tables to load; each name is both the raw folder and the table name.
tables = [
    "orders",
    "products",
    "aisles",
    "departments",
    "order_products",
]

# Full refresh of every table: read its raw CSVs, then create or replace the table.
for table in tables:
    df = read_csv_from_raw(table)
    write_to_bronze_iceberg_full(df, table)