In [1]:
import os 
from pathlib import Path
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json
import sys

In [None]:
# Run mode for the ingestion cell:
#   "true"  -> create/replace each bronze table with the CSV schema but no rows
#   "false" -> load the full CSV contents
# Read from the DRY_RUN environment variable so the mode can be switched
# without editing the notebook. When the variable is unset the notebook keeps
# its previous hard-coded behaviour ("false").
DRY_RUN = os.getenv("DRY_RUN", "false").strip().lower()

# Fail fast on anything other than the two supported values: the ingestion
# loop only acts on "true" / "false" and would otherwise write nothing.
if DRY_RUN not in ("true", "false"):
    print("❌ DRY_RUN environment variable must be set to 'true' or 'false'.")
    sys.exit(1)

# Absolute path of the Iceberg warehouse directory (resolved against the
# current working directory).
warehouse_path = os.path.abspath("data/warehouse")

# Spark session with a Hadoop-type Iceberg catalog named `local` rooted at
# `warehouse_path`.
# NOTE: getOrCreate() returns an already-running session if there is one; to
# apply changed settings, restart the kernel (or call spark.stop() first).
spark = (
    SparkSession.builder
    .appName("bronze_ingestion")
    .config("spark.hadoop.hadoop.native.lib", "false")
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.0")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", warehouse_path)
    .getOrCreate()
)

# List of Iceberg tables in format 'catalog.namespace.table'.
# These are the tables whose columns get renamed by the rename step.
# NOTE(review): the recorded run also ingests "Expense IIGF.csv" into
# local.bronze.expense_iigf, which is not listed here, so its columns are
# never renamed — confirm this omission is intentional.
tables = [
    "local.bronze.amazon_sale_report",
    "local.bronze.cloud_warehouse_compersion_chart",
    "local.bronze.international_sale_report",
    "local.bronze.may22",
    "local.bronze.p__l_march_2021",
    "local.bronze.sale_report"
]

In [4]:
# Ingest raw CSVs into Iceberg tables: one `local.bronze.<name>` table per file.
csv_dir = Path(r"data/Raw")
catalog_dir = Path(r"data/Bronze")  # not used by the ingestion below

# Refuse to run with an unrecognised mode. Previously any value other than
# "true" / "false" made the loop read every CSV and silently write nothing.
if DRY_RUN not in ("true", "false"):
    raise ValueError(f"DRY_RUN must be 'true' or 'false', got {DRY_RUN!r}")

# Ingest each CSV (sorted so the processing order is the same on every run).
for csv_file in sorted(csv_dir.glob("*.csv")):
    # Table name = file stem, lower-cased, with spaces turned into underscores.
    table_name = csv_file.stem.lower().replace(" ", "_")
    print(f"Ingesting {csv_file.name} into table {table_name}")
    print(f"CSV file: {str(csv_file)}")

    df = spark.read.csv(str(csv_file), header=True, inferSchema=True)
    if table_name in ["amazon_sale_report", "international_sale_report"]:
        # Convert "Date" to a date using the MM-dd-yy pattern. The result is
        # written under the name "date"; the recorded table listings show a
        # single `date` column of type date afterwards.
        df = df.withColumn("date", F.to_date(F.col("Date"), "MM-dd-yy"))

    if DRY_RUN == "true":
        # Dry run: still creates/replaces the table, but with zero rows.
        print(f"[DRY RUN] Skipping ingestion for {csv_file.name}")
        empty_df = df.limit(0)
        empty_df.writeTo(f"local.bronze.{table_name}").using("iceberg").createOrReplace()
    else:
        # Full run: create/replace the table with all rows from the CSV.
        print(f"Ingesting {csv_file.name} into Iceberg table {table_name}")
        df.writeTo(f"local.bronze.{table_name}").using("iceberg").createOrReplace()
    

# Normalise the column names of every table listed in `tables`:
# spaces and hyphens become underscores, colons and dots are removed.
_COLUMN_NAME_FIXES = str.maketrans({" ": "_", "-": "_", ":": None, ".": None})

for table in tables:
    # Column names are read once per table, before any rename is issued.
    for old_name in spark.table(table).columns:
        new_name = old_name.translate(_COLUMN_NAME_FIXES)
        if new_name == old_name:
            # Already in the target form; nothing to rename.
            continue
        print(f"Renaming {table}: {old_name} -> {new_name}")
        spark.sql(f"""
                ALTER TABLE {table}
                RENAME COLUMN `{old_name}` TO `{new_name}`
            """)

print("✅ Done ingesting all CSVs into Iceberg.")

# Add table comments from the metadata file.
# table_metadata.json maps a table name to an object; its "description" value
# becomes the comment of local.bronze.<table name, lower-cased>.

# Read explicitly as UTF-8 so the result does not depend on the platform's
# default text encoding.
with open("table_metadata.json", "r", encoding="utf-8") as f:
    metadata = json.load(f)

# Loop through the tables
for table_name, table_info in metadata.items():
    full_table_name = f"local.bronze.{table_name.lower()}"

    # A missing, null or empty description means "no comment to set".
    table_comment = table_info.get("description") or ""
    if table_comment:
        # Escape for a single-quoted Spark SQL string literal. Spark documents
        # backslash as the escape character, so backslashes are escaped first
        # and then quotes. Doubling the quote ('') is not a reliable escape on
        # Spark 3.x.
        escaped_comment = table_comment.replace("\\", "\\\\").replace("'", "\\'")
        spark.sql(f"COMMENT ON TABLE {full_table_name} IS '{escaped_comment}'")

# Add column comments from the metadata file.
# For each table entry, "columns" maps a column name to its comment text.

# Read explicitly as UTF-8 so the result does not depend on the platform's
# default text encoding.
with open("table_metadata.json", "r", encoding="utf-8") as f:
    metadata = json.load(f)

# Loop through the tables
for table_name, table_info in metadata.items():
    full_table_name = f"local.bronze.{table_name.lower()}"

    # Add column-level comments; a missing or null "columns" entry is skipped.
    columns = table_info.get("columns") or {}
    for column_name, column_comment in columns.items():
        # Skip null, empty or whitespace-only comments.
        if not (column_comment and column_comment.strip()):
            continue

        # Escape for a single-quoted Spark SQL string literal (backslash is the
        # documented escape character; '' is not a reliable escape on Spark 3.x).
        safe_comment = column_comment.replace("\\", "\\\\").replace("'", "\\'")

        # Backtick-quote the identifier so names that are keywords or contain
        # special characters still parse; embedded backticks are doubled.
        # NOTE(review): assumes flat (non-nested) column names — confirm.
        quoted_column = "`" + column_name.lower().replace("`", "``") + "`"

        spark.sql(f"ALTER TABLE {full_table_name} ALTER COLUMN {quoted_column} COMMENT '{safe_comment}'")

Ingesting Amazon Sale Report.csv into table amazon_sale_report
CSV file: data\Raw\Amazon Sale Report.csv
Ingesting Amazon Sale Report.csv into Iceberg table amazon_sale_report
Ingesting Cloud Warehouse Compersion Chart.csv into table cloud_warehouse_compersion_chart
CSV file: data\Raw\Cloud Warehouse Compersion Chart.csv
Ingesting Cloud Warehouse Compersion Chart.csv into Iceberg table cloud_warehouse_compersion_chart
Ingesting Expense IIGF.csv into table expense_iigf
CSV file: data\Raw\Expense IIGF.csv
Ingesting Expense IIGF.csv into Iceberg table expense_iigf
Ingesting International sale Report.csv into table international_sale_report
CSV file: data\Raw\International sale Report.csv
Ingesting International sale Report.csv into Iceberg table international_sale_report
Ingesting May22.csv into table may22
CSV file: data\Raw\May22.csv
Ingesting May22.csv into Iceberg table may22
Ingesting P  L March 2021.csv into table p__l_march_2021
CSV file: data\Raw\P  L March 2021.csv
Ingesting P  L

In [None]:
# Rename columns in Iceberg tables so that names contain no spaces, hyphens,
# colons or dots.
def _normalised(name):
    """Return `name` with spaces/hyphens as underscores and colons/dots removed."""
    for target, replacement in ((" ", "_"), ("-", "_"), (":", ""), (".", "")):
        name = name.replace(target, replacement)
    return name


for table in tables:
    # Snapshot the current column names of the table before renaming anything.
    current_names = [field.name for field in spark.table(table).schema.fields]

    for old_name in current_names:
        new_name = _normalised(old_name)
        print(old_name)
        if old_name == new_name:
            # Name is already in the target form.
            continue
        print(f"Renaming {table}: {old_name} -> {new_name}")
        spark.sql(f"""
                ALTER TABLE {table}
                RENAME COLUMN `{old_name}` TO `{new_name}`
            """)

print("✅ Done ingesting all CSVs into Iceberg.")

In [7]:
# Add table comments from the metadata file, then display each table's
# extended description so the result can be checked.

# Read explicitly as UTF-8 so the result does not depend on the platform's
# default text encoding.
with open("table_metadata.json", "r", encoding="utf-8") as f:
    metadata = json.load(f)

# Loop through the tables
for table_name, table_info in metadata.items():
    full_table_name = f"local.bronze.{table_name.lower()}"

    # A missing, null or empty description means "no comment to set".
    table_comment = table_info.get("description") or ""
    if table_comment:
        print("commenting table:", full_table_name)
        # Print the comment as it will be stored, not its SQL-escaped form.
        print("comment:", table_comment)

        # Escape for a single-quoted Spark SQL string literal. Spark documents
        # backslash as the escape character, so backslashes are escaped first
        # and then quotes. Doubling the quote ('') is not a reliable escape on
        # Spark 3.x.
        escaped_comment = table_comment.replace("\\", "\\\\").replace("'", "\\'")
        spark.sql(f"COMMENT ON TABLE {full_table_name} IS '{escaped_comment}'")

        # show() prints the rows itself and returns None, so it is not wrapped
        # in print() (that printed a stray "None" after every table).
        description_df = spark.sql(f"DESCRIBE TABLE EXTENDED {full_table_name}")
        description_df.show(50)

commenting table: local.bronze.amazon_sale_report
comment: This table provides detailed insights into Amazon sales data, including SKU Code, Design Number, Stock, Category, Size and Color, to help optimize product profitability. One row represents one sale.
+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|               index|                 int|   NULL|
|            Order_ID|              string|   NULL|
|                date|                date|   NULL|
|              Status|              string|   NULL|
|          Fulfilment|              string|   NULL|
|      Sales_Channel_|              string|   NULL|
|  ship_service_level|              string|   NULL|
|               Style|              string|   NULL|
|                 SKU|              string|   NULL|
|            Category|              string|   NULL|
|                Size|              string|   NULL|
|             

In [8]:
# Add column comments from the metadata file.
# Each table entry's "columns" object maps a column name to its comment text;
# the comment is applied to that column of local.bronze.<table name, lower-cased>.

# Read explicitly as UTF-8 so the result does not depend on the platform's
# default text encoding.
with open("table_metadata.json", "r", encoding="utf-8") as f:
    metadata = json.load(f)

# Loop through the tables
for table_name, table_info in metadata.items():
    full_table_name = f"local.bronze.{table_name.lower()}"

    # Add column-level comments; a missing or null "columns" entry is skipped.
    columns = table_info.get("columns") or {}
    for column_name, column_comment in columns.items():
        # Skip null, empty or whitespace-only comments.
        if not (column_comment and column_comment.strip()):
            continue

        # Escape for a single-quoted Spark SQL string literal (backslash is the
        # documented escape character; '' is not a reliable escape on Spark 3.x).
        safe_comment = column_comment.replace("\\", "\\\\").replace("'", "\\'")

        # Backtick-quote the identifier so names that are keywords or contain
        # special characters still parse; embedded backticks are doubled.
        # NOTE(review): assumes flat (non-nested) column names — confirm.
        quoted_column = "`" + column_name.lower().replace("`", "``") + "`"

        spark.sql(f"ALTER TABLE {full_table_name} ALTER COLUMN {quoted_column} COMMENT '{safe_comment}'")

In [3]:
# Preview the bronze Amazon sales table (show() prints the first rows).
preview_query = "select * from local.bronze.amazon_sale_report"
spark.sql(preview_query).show()

+-------------------+----------+--------------------+----------+--------------+------------------+--------+-------------------+-------------+----+----------+--------------+---+--------+------+-----------+--------------+----------------+------------+--------------------+-----+------------+----------+
|           Order_ID|      date|              Status|Fulfilment|Sales_Channel_|ship_service_level|   Style|                SKU|     Category|Size|      ASIN|Courier_Status|Qty|currency|Amount|  ship_city|    ship_state|ship_postal_code|ship_country|       promotion_ids|  B2B|fulfilled_by|Unnamed_22|
+-------------------+----------+--------------------+----------+--------------+------------------+--------+-------------------+-------------+----+----------+--------------+---+--------+------+-----------+--------------+----------------+------------+--------------------+-----+------------+----------+
|405-8078784-5731545|2022-04-30|           Cancelled|  Merchant|     Amazon.in|          Standard

In [4]:
from py4j.java_gateway import java_import

# Print the fully-qualified location of the `local` catalog's warehouse.

# Make the Hadoop Path class reachable as `spark._jvm.Path`.
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')

# Wrap the configured warehouse location in a Hadoop Path object.
warehouse_setting = spark.conf.get("spark.sql.catalog.local.warehouse")
path = spark._jvm.Path(warehouse_setting)

# Ask the path for its filesystem under Spark's Hadoop configuration, then
# qualify the path against that filesystem's URI and working directory.
hadoop_conf = spark._jsc.hadoopConfiguration()
fs = path.getFileSystem(hadoop_conf)
qualified_path = path.makeQualified(fs.getUri(), fs.getWorkingDirectory())

print("Resolved warehouse path:", qualified_path)


Resolved warehouse path: file:/c:/Users/joonas.syrjanen/Documents/Data rag/data/warehouse
