In [4]:
!pip install faker polars spark

Collecting polars
  Downloading polars-1.32.3-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (15 kB)
Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
  Preparing metadata (setup.py) ... [?25ldone
Downloading polars-1.32.3-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (38.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m38.4/38.4 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hBuilding wheels for collected packages: spark
[33m  DEPRECATION: Building 'spark' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'spark'. Discussion can be found at https://github.com/pypa/pip/issues/6334[0m[33m
[0m  Building wheel for s

In [10]:
# --- Persiapan Lingkungan dan Direktori ---
import pandas as pd
import os
import random # Untuk injeksi "raw" data
from datetime import datetime, timedelta
from faker import Faker # Untuk injeksi "raw" data
import polars as pl # Jika Anda masih ingin generate data dengan Polars
from pyspark.sql.functions import col, to_timestamp

# Inisialisasi Faker jika Anda menghasilkan data di notebook ini
fake = Faker("id_ID")
Faker.seed(42)

base_output_dir = 'Data_Medalion_Architecture'
bronze_dir = os.path.join(base_output_dir, 'bronze')
silver_dir = os.path.join(base_output_dir, 'silver') # Mungkin tidak digunakan jika langsung ke Spark DB
gold_dir = os.path.join(base_output_dir, 'gold')     # Mungkin tidak digunakan jika langsung ke Spark DB

# Create local directories (for temporary CSVs if generated here)
os.makedirs(bronze_dir, exist_ok=True)
os.makedirs(silver_dir, exist_ok=True)
os.makedirs(gold_dir, exist_ok=True)

# --- Pastikan Database Spark Tersedia ---
# Asumsi SparkSession sudah aktif

# Bronze Layer Database
spark.sql("CREATE DATABASE IF NOT EXISTS brz_coffeeshop_db;")

# Silver Layer Database
spark.sql("CREATE DATABASE IF NOT EXISTS slv_coffeeshop_db;")

# Gold Layer Database
spark.sql("CREATE DATABASE IF NOT EXISTS gld_coffeeshop_db;")

print("Directories and Spark Databases ensured.")

Directories and Spark Databases ensured.


In [11]:
# --- Lapisan Bronze: Load CSV ke Spark Iceberg Tables ---

# Definisikan path ke CSV yang sudah ada
transactions_csv_path = 'transactions.csv'
products_csv_path = 'products.csv'
stores_csv_path = 'stores.csv'
categories_csv_path = 'categories.csv'

# --- Load dan Simpan ke BRONZE SPARK DATABASE ---
spark.sql("USE brz_coffeeshop_db;")
print("Using Spark database: brz_coffeeshop_db")

# Fungsi untuk memuat CSV ke Spark dan menyimpannya sebagai tabel Iceberg
def load_csv_to_bronze_iceberg(file_path: str, table_name: str, partition_by_col: str = None):
    print(f"\n--- Loading {file_path} into brz_coffeeshop_db.{table_name} (Bronze Layer) ---")
    
    # Baca CSV ke Spark DataFrame
    df_bronze = spark.read.csv(file_path, header=True, inferSchema=True)
    
    print(f"Schema for {table_name}:")
    df_bronze.printSchema()
    print(f"First 5 rows from {table_name}:")
    df_bronze.show(5)

    full_table_name = f"brz_coffeeshop_db.{table_name}"

    if table_name == "stores":
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {full_table_name} (
                store_id        STRING,
                store_name      STRING,
                city_name       STRING
            )
            USING iceberg;
        """)
    elif table_name == "categories":
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {full_table_name} (
                category_id     STRING,
                category_name   STRING
            )
            USING iceberg;
        """)
    elif table_name == "products":
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {full_table_name} (
                product_id      STRING,
                product_name    STRING,
                category_id     STRING,
                unit_price      STRING,
                base_price      STRING
            )
            USING iceberg;
        """)
    elif table_name == "transactions":
        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS {full_table_name} (
                transaction_id  STRING,
                datetime        TIMESTAMP,
                store_id        STRING,
                customer_id     STRING,
                product_id      STRING,
                quantity        STRING,
                payment_method  STRING,
                price           STRING
            )
            USING iceberg
            PARTITIONED BY (days(datetime));
        """)
        # This is the line that caused the NameError
        # Make sure 'col' is imported from pyspark.sql.functions
        df_bronze = df_bronze.withColumn("datetime", col("datetime").cast("timestamp"))
    else:
        print(f"Warning: Using inferred schema for {table_name}. Consider defining a raw schema.")
        
    df_bronze.write.format("iceberg").mode("overwrite").saveAsTable(full_table_name)
    
    print(f"Data from {file_path} loaded to {full_table_name}.")

# Panggil fungsi untuk setiap file
load_csv_to_bronze_iceberg(stores_csv_path, "stores")
load_csv_to_bronze_iceberg(categories_csv_path, "categories")
load_csv_to_bronze_iceberg(products_csv_path, "products")
load_csv_to_bronze_iceberg(transactions_csv_path, "transactions", "datetime")

# Verifikasi
spark.sql("SHOW TABLES IN brz_coffeeshop_db;").show()

Using Spark database: brz_coffeeshop_db

--- Loading stores.csv into brz_coffeeshop_db.stores (Bronze Layer) ---
Schema for stores:
root
 |-- store_id: integer (nullable = true)
 |-- store_name: string (nullable = true)
 |-- city_name: string (nullable = true)

First 5 rows from stores:
+--------+--------------------+--------------------+
|store_id|          store_name|           city_name|
+--------+--------------------+--------------------+
|       1|Jue Coffee Kuning...|Kota Jakarta Selatan|
|       2|Jue Coffee Grand ...|  Kota Jakarta Pusat|
|       3|Jue Coffee Senaya...|  Kota Jakarta Pusat|
|       4|Jue Coffee Pondok...|Kota Jakarta Selatan|
|       5|Jue Coffee Gandar...|Kota Jakarta Selatan|
+--------+--------------------+--------------------+
only showing top 5 rows



25/08/21 10:40:33 ERROR Utils: Aborting task
org.apache.iceberg.exceptions.CommitStateUnknownException: Service failed: 500: Unknown failure
Cannot determine whether the commit was successful or not, the underlying data files may or may not be needed. Manual intervention via the Remove Orphan Files Action can remove these files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.
Please check to see whether or not your commit was successful before retrying this commit. Retrying an already successful operation will result in duplicate records or unintentional modifications.
At this time no files will be deleted including possibly unused manifest lists.
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:99)
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:85)
	at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:215)
	at org.apache.iceberg.rest.HTTPClient.exe

Py4JJavaError: An error occurred while calling o175.saveAsTable.
: org.apache.iceberg.exceptions.CommitStateUnknownException: Service failed: 500: Unknown failure
Cannot determine whether the commit was successful or not, the underlying data files may or may not be needed. Manual intervention via the Remove Orphan Files Action can remove these files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.
Please check to see whether or not your commit was successful before retrying this commit. Retrying an already successful operation will result in duplicate records or unintentional modifications.
At this time no files will be deleted including possibly unused manifest lists.
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:99)
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:85)
	at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:215)
	at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:299)
	at org.apache.iceberg.rest.BaseHTTPClient.post(BaseHTTPClient.java:88)
	at org.apache.iceberg.rest.RESTClient.post(RESTClient.java:113)
	at org.apache.iceberg.rest.RESTTableOperations.commit(RESTTableOperations.java:159)
	at org.apache.iceberg.BaseTransaction.lambda$commitReplaceTransaction$1(BaseTransaction.java:382)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.BaseTransaction.commitReplaceTransaction(BaseTransaction.java:366)
	at org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:311)
	at org.apache.iceberg.CommitCallbackTransaction.commitTransaction(CommitCallbackTransaction.java:126)
	at org.apache.iceberg.spark.source.StagedSparkTable.commitStagedChanges(StagedSparkTable.java:34)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:589)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable(WriteToDataSourceV2Exec.scala:579)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable$(WriteToDataSourceV2Exec.scala:572)
	at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:186)
	at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:221)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:645)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:575)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.iceberg.exceptions.ServiceFailureException: Service failed: 500: Unknown failure
	... 61 more


In [7]:
# --- Lapisan Silver: Transformasi dan Load ke Silver Spark Database ---
spark.sql("CREATE DATABASE IF NOT EXISTS slv_coffeeshop_db;") # Pastikan database Silver ada
spark.sql("USE slv_coffeeshop_db;")
print("\nUsing Spark database: slv_coffeeshop_db")

from pyspark.sql.functions import col, to_timestamp, regexp_replace, trim, upper, when, lit, coalesce, row_number
from pyspark.sql.types import IntegerType, DecimalType # Untuk casting tipe data di Silver
from pyspark.sql.window import Window # Untuk deduplikasi


# --- 1. Transformasi slv_stores (dimensi toko) ---
print("\n--- Transforming stores data into slv_stores ---")
df_bronze_stores = spark.table("brz_coffeeshop_db.stores")

df_slv_stores = df_bronze_stores.select(
    col("store_id").cast(IntegerType()).alias("store_id"), # Cast ke INTEGER
    trim(col("store_name")).alias("store_name"), # Hapus spasi ekstra
    trim(col("city_name")).alias("city_name") # Upper case dan hapus spasi
)

df_slv_stores.write.format("iceberg").mode("overwrite").saveAsTable("slv_stores")
print("slv_stores created.")


# --- 2. Transformasi slv_categories (dimensi kategori) ---
print("\n--- Transforming categories data into slv_categories ---")
df_bronze_categories = spark.table("brz_coffeeshop_db.categories")

df_slv_categories = df_bronze_categories.select(
    col("category_id").cast(IntegerType()).alias("category_id"),
    trim(col("category_name")).alias("category_name")
)

df_slv_categories.write.format("iceberg").mode("overwrite").saveAsTable("slv_categories")
print("slv_categories created.")


# --- 3. Transformasi slv_products (dimensi produk) ---
print("\n--- Transforming products data into slv_products ---")
df_bronze_products = spark.table("brz_coffeeshop_db.products")

df_slv_products = df_bronze_products.select(
    col("product_id").cast(IntegerType()).alias("product_id"),
    trim(col("product_name")).alias("product_name"),
    col("category_id").cast(IntegerType()).alias("category_id"),
    # Konversi harga yang mungkin STRING ke INTEGER, hapus karakter non-angka
    regexp_replace(col("unit_price"), '[^0-9]', '').cast(IntegerType()).alias("unit_price"),
    regexp_replace(col("base_price"), '[^0-9]', '').cast(IntegerType()).alias("base_price")
)

df_slv_products.write.format("iceberg").mode("overwrite").saveAsTable("slv_products")
print("slv_products created.")


# --- 4. Transformasi slv_transactions (fakta transaksi) ---
print("\n--- Transforming transactions data into slv_transactions ---")
df_bronze_transactions = spark.table("brz_coffeeshop_db.transactions")

# 1. Deduplicate Transactions
# Definisikan window untuk deduplikasi, berdasarkan transaction_id dan ambil yang terbaru jika ada duplikat waktu
window_spec_transactions = Window.partitionBy("transaction_id").orderBy(col("datetime").desc())
df_transactions_dedup = df_bronze_transactions.withColumn("row_num", row_number().over(window_spec_transactions)).filter(col("row_num") == 1).drop("row_num")

# 3. Recalculate/Clean Price and Cast other columns
# Load clean products data from Silver layer (sudah dibuat di langkah sebelumnya)
df_slv_products_clean = spark.table("slv_coffeeshop_db.slv_products")

df_slv_transactions_final = df_transactions_dedup.alias("t").join(
    df_slv_products_clean.alias("p"),
    col("t.product_id").cast(IntegerType()) == col("p.product_id"), # Join on cleaned product_id
    "left"
).select(
    col("t.transaction_id"),
    col("t.datetime").alias("transaction_timestamp"),
    col("t.store_id").cast(IntegerType()).alias("store_id"),
    col("t.customer_id"),
    col("t.product_id").cast(IntegerType()).alias("product_id"),
    col("t.quantity").cast(IntegerType()).alias("quantity"),
    trim(col("t.payment_method")).alias("payment_method"), # Bersihkan spasi metode pembayaran
    # **Recalculate price using cleaned unit_price from slv_products**
    # Ini akan mengganti 'price discrepancies' dari Bronze
    (col("t.quantity").cast(IntegerType()) * col("p.unit_price").cast(IntegerType())).alias("total_item_price")
)


# Menulis final cleaned transactions data ke Silver table
df_slv_transactions_final.write.format("iceberg").mode("overwrite").saveAsTable("slv_transactions")
print("slv_transactions created.")

spark.sql("SHOW TABLES IN slv_coffeeshop_db;").show()


Using Spark database: slv_coffeeshop_db

--- Transforming stores data into slv_stores ---


25/08/21 10:38:30 ERROR Utils: Aborting task
org.apache.iceberg.exceptions.CommitStateUnknownException: Service failed: 500: Unknown failure
Cannot determine whether the commit was successful or not, the underlying data files may or may not be needed. Manual intervention via the Remove Orphan Files Action can remove these files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.
Please check to see whether or not your commit was successful before retrying this commit. Retrying an already successful operation will result in duplicate records or unintentional modifications.
At this time no files will be deleted including possibly unused manifest lists.
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:99)
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:85)
	at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:215)
	at org.apache.iceberg.rest.HTTPClient.exe

Py4JJavaError: An error occurred while calling o112.saveAsTable.
: org.apache.iceberg.exceptions.CommitStateUnknownException: Service failed: 500: Unknown failure
Cannot determine whether the commit was successful or not, the underlying data files may or may not be needed. Manual intervention via the Remove Orphan Files Action can remove these files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.
Please check to see whether or not your commit was successful before retrying this commit. Retrying an already successful operation will result in duplicate records or unintentional modifications.
At this time no files will be deleted including possibly unused manifest lists.
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:99)
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:85)
	at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:215)
	at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:299)
	at org.apache.iceberg.rest.BaseHTTPClient.post(BaseHTTPClient.java:88)
	at org.apache.iceberg.rest.RESTClient.post(RESTClient.java:113)
	at org.apache.iceberg.rest.RESTTableOperations.commit(RESTTableOperations.java:159)
	at org.apache.iceberg.BaseTransaction.lambda$commitReplaceTransaction$1(BaseTransaction.java:382)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.BaseTransaction.commitReplaceTransaction(BaseTransaction.java:366)
	at org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:311)
	at org.apache.iceberg.CommitCallbackTransaction.commitTransaction(CommitCallbackTransaction.java:126)
	at org.apache.iceberg.spark.source.StagedSparkTable.commitStagedChanges(StagedSparkTable.java:34)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:589)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable(WriteToDataSourceV2Exec.scala:579)
	at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable$(WriteToDataSourceV2Exec.scala:572)
	at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:186)
	at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:221)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:645)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:575)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.iceberg.exceptions.ServiceFailureException: Service failed: 500: Unknown failure
	... 61 more


In [13]:
from pyspark.sql.functions import col, current_date, lit, sum, countDistinct, avg, to_date, year, month, dayofmonth, expr, sha2, concat, lit
from pyspark.sql.types import DateType
from datetime import datetime

# --- Lapisan Gold: Transformasi dan Load ke Gold Spark Database ---
spark.sql("USE gld_coffeeshop_db;")
print("\nUsing Spark database: gld_coffeeshop_db")

# --- 1. dim_store (dimensi toko) ---
print("\n--- Transforming slv_stores into dim_store (Gold Layer) ---")
df_slv_stores = spark.table("slv_coffeeshop_db.slv_stores")

df_gld_dim_store = df_slv_stores.select(
    col("store_id"),
    col("store_name"),
    col("city_name"),
    current_date().alias("loaded_date") # Metadata loaded date
)

# --- FIX HERE: Use expr() for dim_store_key calculation ---
# Build the SQL string for the surrogate key calculation
surrogate_key_sql_expression_store = f"SHA2(CONCAT(CAST(store_id AS STRING), CAST(loaded_date AS STRING)), 256)"

# Add the surrogate key column using expr
df_gld_dim_store = df_gld_dim_store.withColumn(
    "dim_store_key", expr(surrogate_key_sql_expression_store)
)
# --- END FIX ---

df_gld_dim_store.write.format("iceberg").mode("overwrite").saveAsTable("dim_store")
print("dim_store created.")

# --- 2. dim_category (dimensi kategori) ---
print("\n--- Transforming slv_categories into dim_category (Gold Layer) ---")
df_slv_categories = spark.table("slv_coffeeshop_db.slv_categories")

df_gld_dim_category = df_slv_categories.select(
    col("category_id"),
    col("category_name"),
    current_date().alias("loaded_date")
)
# Build the SQL string for the surrogate key calculation
surrogate_key_sql_expression_category = f"SHA2(CONCAT(CAST(category_id AS STRING), CAST(loaded_date AS STRING)), 256)"
df_gld_dim_category = df_gld_dim_category.withColumn(
    "dim_category_key", expr(surrogate_key_sql_expression_category)
)
df_gld_dim_category.write.format("iceberg").mode("overwrite").saveAsTable("dim_category")
print("dim_category created.")

# --- 3. dim_product (dimensi produk) ---
print("\n--- Transforming slv_products into dim_product (Gold Layer) ---")
df_slv_products = spark.table("slv_coffeeshop_db.slv_products")

df_gld_dim_product = df_slv_products.select(
    col("product_id"),
    col("product_name"),
    col("category_id"),
    col("unit_price"),
    col("base_price"),
    current_date().alias("loaded_date")
)
# Build the SQL string for the surrogate key calculation
surrogate_key_sql_expression_product = f"SHA2(CONCAT(CAST(product_id AS STRING), CAST(loaded_date AS STRING)), 256)"
df_gld_dim_product = df_gld_dim_product.withColumn(
    "dim_product_key", expr(surrogate_key_sql_expression_product)
)
df_gld_dim_product.write.format("iceberg").mode("overwrite").saveAsTable("dim_product")
print("dim_product created.")


# --- 4. fact_sales (fakta penjualan) ---
print("\n--- Transforming slv_transactions into fact_sales (Gold Layer) ---")
df_slv_transactions = spark.table("slv_coffeeshop_db.slv_transactions")

# Gabungkan dengan dimensi untuk mendapatkan kunci surrogate
df_gld_fact_sales = df_slv_transactions.alias("t").join(
    df_gld_dim_store.alias("ds"), col("t.store_id") == col("ds.store_id"), "left"
).join(
    df_gld_dim_product.alias("dp"), col("t.product_id") == col("dp.product_id"), "left"
).join(
    df_gld_dim_category.alias("dc"), col("dp.category_id") == col("dc.category_id"), "left"
).select(
    col("t.transaction_id"),
    col("t.transaction_timestamp"),
    col("ds.dim_store_key"), # Foreign key ke dim_store
    col("t.customer_id"),
    col("dp.dim_product_key"), # Foreign key ke dim_product
    col("dc.dim_category_key"), # Foreign key ke dim_category (melalui produk)
    col("t.quantity"),
    col("t.total_item_price"),
    col("t.payment_method"),
    current_date().alias("loaded_date")
)

df_gld_fact_sales.write.format("iceberg").mode("overwrite").saveAsTable("fact_sales")
print("fact_sales created.")


# --- 5. gold_daily_sales_summary (fakta agregat harian) ---
print("\n--- Aggregating fact_sales into gold_daily_sales_summary (Gold Layer) ---")

df_gold_daily_sales_summary = df_gld_fact_sales.groupBy(
    to_date(col("transaction_timestamp")).alias("sale_date")
).agg(
    sum("total_item_price").alias("total_revenue"),
    countDistinct("transaction_id").alias("total_transactions"),
    sum("quantity").alias("total_quantity_sold"),
    current_date().alias("loaded_date")
)

df_gold_daily_sales_summary.write.format("iceberg").mode("overwrite").saveAsTable("gold_daily_sales_summary")
print("gold_daily_sales_summary created.")

# --- 6. gold_product_performance (fakta agregat produk) ---
print("\n--- Aggregating fact_sales into gold_product_performance (Gold Layer) ---")

# Ambil tabel Gold yang sudah ada
df_gld_fact_sales = spark.table("gld_coffeeshop_db.fact_sales")
df_gld_dim_product = spark.table("gld_coffeeshop_db.dim_product")
df_gld_dim_category = spark.table("gld_coffeeshop_db.dim_category")


df_gold_product_performance = df_gld_fact_sales.alias("fs").join(
    df_gld_dim_product.alias("dp"), col("fs.dim_product_key") == col("dp.dim_product_key"), "left"
).join(
    df_gld_dim_category.alias("dc"),
    # --- FIX HERE: Join dim_product (dp) with dim_category (dc) on category_id ---
    col("dp.category_id") == col("dc.category_id"),
    # --- END FIX ---
    "left"
).groupBy(
    col("dp.product_id"),
    col("dp.product_name"),
    col("dc.category_name") # Use category_name from the joined dim_category
).agg(
    sum("fs.quantity").alias("total_quantity_sold"),
    sum("fs.total_item_price").alias("total_revenue_from_product"),
    avg("dp.unit_price").alias("average_unit_price")
)

df_gold_product_performance.write.format("iceberg").mode("overwrite").saveAsTable("gold_product_performance")
print("gold_product_performance created.")


spark.sql("SHOW TABLES IN gld_coffeeshop_db;").show()


Using Spark database: gld_coffeeshop_db

--- Transforming slv_stores into dim_store (Gold Layer) ---
dim_store created.

--- Transforming slv_categories into dim_category (Gold Layer) ---
dim_category created.

--- Transforming slv_products into dim_product (Gold Layer) ---
dim_product created.

--- Transforming slv_transactions into fact_sales (Gold Layer) ---


                                                                                

fact_sales created.

--- Aggregating fact_sales into gold_daily_sales_summary (Gold Layer) ---


                                                                                

gold_daily_sales_summary created.

--- Aggregating fact_sales into gold_product_performance (Gold Layer) ---
gold_product_performance created.
+-----------------+--------------------+-----------+
|        namespace|           tableName|isTemporary|
+-----------------+--------------------+-----------+
|gld_coffeeshop_db|        dim_category|      false|
|gld_coffeeshop_db|         dim_product|      false|
|gld_coffeeshop_db|           dim_store|      false|
|gld_coffeeshop_db|          fact_sales|      false|
|gld_coffeeshop_db|gold_daily_sales_...|      false|
|gld_coffeeshop_db|gold_product_perf...|      false|
+-----------------+--------------------+-----------+



In [14]:
%%sql

show databases

25/08/15 00:33:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


namespace
brz_coffeeshop_db
brz_hospital_db
coffeeshop
gld_coffeeshop_db
gld_hospital_db
slv_coffeeshop_db
slv_hospital_db
coffeeshop_medalion


In [15]:
%%sql

use gld_coffeeshop_db


In [16]:
%%sql

show tables

namespace,tableName,isTemporary
gld_coffeeshop_db,dim_category,False
gld_coffeeshop_db,dim_product,False
gld_coffeeshop_db,dim_store,False
gld_coffeeshop_db,fact_sales,False
gld_coffeeshop_db,gold_daily_sales_summary,False
gld_coffeeshop_db,gold_product_performance,False


In [17]:
%%sql

select * from dim_category

category_id,category_name,loaded_date,dim_category_key
1,Coffee,2025-08-15,a10d89ac9805c74f97040822a8c07b87f4821a881661f672fc28730b682a458a
2,Non-Coffee,2025-08-15,80b528c911364b4050c30f7bf723e430e79f62e406b344fe54844a04ecb8779d
3,Snacks,2025-08-15,27daa7189ba69114348e90347e85f7a284a7d0abac68f83749dceaa562dc5edc
4,Pastries & Cakes,2025-08-15,9c0fab0a365086a0ae119722f83f36ad097455f0e84a40aa79ce91514ede8649
5,Breakfast Menu,2025-08-15,e6440b7fd272be7bae0b2bdfc8fd5d9e8c06c440c6b971f8335987215d3e56fd
6,Lunch & Dinner,2025-08-15,2bab3416a69a1fe97cb1811cb624be7f97c91621d88eadf6475cc91fd232e37e
7,Desserts,2025-08-15,ee03dfcf24c4c26551402fdd0765e2d435692c58ced9ac0359c7df3d3e7d00db
8,Merchandise,2025-08-15,db4b484ac1f666fc4df57634b5dd3ce2f2eb89a21a2c395ebcd093d179f1bcb7
9,Brewing Equipment,2025-08-15,4f30959587af37e8c0e0875d73fba2887372da211907e5c5c16d5233dcffe058
10,Packaged Beans,2025-08-15,0f2532fa0c55dff59d6bfc56c03e70a880a0b3ea0930b98e2cc38506934800f1


In [21]:
%%sql

select * from dim_product

product_id,product_name,category_id,unit_price,base_price,loaded_date,dim_product_key
101,Kopi Telur Tradisional,1,18000,13320,2025-08-15,a75c8ed697d4012ae5c245d2efa00bb3dd4292a8f9dddf15d4a19113bbbb4723
102,Kopi Kelapa Khas Vietnam,1,22000,14960,2025-08-15,2777376b58918aadce0d8610683076e8a48d90fd582a7f35a75d1fe1801e5e94
103,Kopi Vietnam Drip Original,1,20000,13600,2025-08-15,0f03671d4664ace6206891b105e2a4c942f23295f4fc2244da3d1e92330d2e34
104,Kopi Butter Gurih,1,19000,12350,2025-08-15,b7aa5d6a3bd30888c95fb8c3297671f659fa338c6931f788b485fa6615a207d4
105,Kopi Susu Kampung Kental Manis,1,15000,10800,2025-08-15,29c7e9afc4b26dda35a0c27b8e29793984b73bf7cf34561015bbe66924f62e7a
106,Kopi Coklat Spesial,1,21000,15750,2025-08-15,96d2be23c4f130ed4282829b60633fcf47013ac832688110556ee76b1f2882a9
107,Es Kopi Susu Aren,1,23000,16100,2025-08-15,e31579bbe8df4dbe9f204d34110f423583e5cf6a377d39b53ced448497956d6c
108,Es Kopi Hitam Dingin,1,16000,11040,2025-08-15,9d002f05df49f82a9c5f24f21c1e57b14ffb947ad76b0e282fa9443cbbb38d7e
109,Es Kopi Hitam Lemon Segar,1,18000,12600,2025-08-15,144bdcb958f32f698feb9b710fa0851687ea4574a6bb7bb68bc23665d71d6545
110,Drip Bag Coffee Lokal Blend,1,25000,17250,2025-08-15,5444bae2713c572c5a3f32598e2b28b04c8cbae285da452e2799a7d40fd9216a


In [30]:
%%sql

select * from dim_store

store_id,store_name,city_name,loaded_date,dim_store_key
1,Jue Coffee Kuningan City,KOTA JAKARTA SELATAN,2025-08-15,a10d89ac9805c74f97040822a8c07b87f4821a881661f672fc28730b682a458a
2,Jue Coffee Grand Indonesia,KOTA JAKARTA PUSAT,2025-08-15,80b528c911364b4050c30f7bf723e430e79f62e406b344fe54844a04ecb8779d
3,Jue Coffee Senayan City,KOTA JAKARTA PUSAT,2025-08-15,27daa7189ba69114348e90347e85f7a284a7d0abac68f83749dceaa562dc5edc
4,Jue Coffee Pondok Indah Mall,KOTA JAKARTA SELATAN,2025-08-15,9c0fab0a365086a0ae119722f83f36ad097455f0e84a40aa79ce91514ede8649
5,Jue Coffee Gandaria City,KOTA JAKARTA SELATAN,2025-08-15,e6440b7fd272be7bae0b2bdfc8fd5d9e8c06c440c6b971f8335987215d3e56fd
6,Jue Coffee Pacific Place,KOTA JAKARTA SELATAN,2025-08-15,2bab3416a69a1fe97cb1811cb624be7f97c91621d88eadf6475cc91fd232e37e
7,Jue Coffee Kota Kasablanka,KOTA JAKARTA SELATAN,2025-08-15,ee03dfcf24c4c26551402fdd0765e2d435692c58ced9ac0359c7df3d3e7d00db
8,Jue Coffee Lotte Avenue,KOTA JAKARTA SELATAN,2025-08-15,db4b484ac1f666fc4df57634b5dd3ce2f2eb89a21a2c395ebcd093d179f1bcb7
9,Jue Coffee Plaza Senayan,KOTA JAKARTA PUSAT,2025-08-15,4f30959587af37e8c0e0875d73fba2887372da211907e5c5c16d5233dcffe058
10,Jue Coffee Sarinah Thamrin,KOTA JAKARTA PUSAT,2025-08-15,0f2532fa0c55dff59d6bfc56c03e70a880a0b3ea0930b98e2cc38506934800f1


In [31]:
%%sql

select * from fact_sales

transaction_id,transaction_timestamp,dim_store_key,customer_id,dim_product_key,dim_category_key,quantity,total_item_price,payment_method,loaded_date
000132fe-19c4-464d-82c9-455957ee8604,2023-07-24 11:37:35,cc32ac4e402559c17950372ee8d03eaa2ab771a800424c40ec8ad1f34526b0d6,,3970491cdd2658ae1b1e9028125f3916ad7bb1d1cf41f8384e1de7a79e3213dc,80b528c911364b4050c30f7bf723e430e79f62e406b344fe54844a04ecb8779d,2,70000,ShopeePay,2025-08-15
00021ebc-071a-4716-aa89-24e9fa3b1576,2023-07-30 15:36:33,9d002f05df49f82a9c5f24f21c1e57b14ffb947ad76b0e282fa9443cbbb38d7e,,84da10702e1915c0133c6df14090ff51716346f26daf7ac50908ce550fe530dd,2bab3416a69a1fe97cb1811cb624be7f97c91621d88eadf6475cc91fd232e37e,2,150000,Credit card,2025-08-15
0002c61e-0674-434d-be49-c75ee0e50918,2023-07-24 13:31:42,96d2be23c4f130ed4282829b60633fcf47013ac832688110556ee76b1f2882a9,,3aa8768161c2e2538664893b4bf26907346f87d8a613f68fbf1d99064be12f62,27daa7189ba69114348e90347e85f7a284a7d0abac68f83749dceaa562dc5edc,2,44000,QRIS,2025-08-15
0003c1f8-40c4-41c5-8d1f-a66246c6e3a2,2023-07-03 12:08:42,c95e0ec0e5ee4bfd3263b60a3439f76f3c07d09ec75f4f3347a41df13b707a6c,,01cb523f980c09e2b950d9997c2847a39247f1cf4d4b6ea2b3f5d5a0781bd506,4f30959587af37e8c0e0875d73fba2887372da211907e5c5c16d5233dcffe058,2,500000,Gopay,2025-08-15
0004475d-8fc6-45e3-80d6-4608b00f65eb,2023-07-04 18:30:12,0f2532fa0c55dff59d6bfc56c03e70a880a0b3ea0930b98e2cc38506934800f1,,775fe57d4e5d1d4d58199b713a4a04ba42889434dec87a2773e7eedc154da649,e6440b7fd272be7bae0b2bdfc8fd5d9e8c06c440c6b971f8335987215d3e56fd,1,45000,Cash,2025-08-15
0005936e-8f75-4792-ac43-97408526f170,2023-07-25 18:27:47,bfea7a19589dcae4512bf6f3c7ffd2435e712bffc8210cf67398263b6ce16504,,3358353460149125421ec4365b540e144128675a21b191cc657f7edae5927a3b,e6440b7fd272be7bae0b2bdfc8fd5d9e8c06c440c6b971f8335987215d3e56fd,3,150000,ShopeePay,2025-08-15
000617a9-9a38-431b-883f-d662cf070841,2023-07-25 09:34:19,8292b4c840040f7c2585b93fd0b2c6c95ab59a57626f93bf2d9c17bcfcab6452,,ae3870b0a08d7e20f016c52187fdd13a5c4559cb3f12c9e506b77ce43e2dda16,80b528c911364b4050c30f7bf723e430e79f62e406b344fe54844a04ecb8779d,1,34000,QRIS,2025-08-15
0006366b-81f3-4d40-afa7-1f726c78c97c,2023-07-19 13:29:23,5734c720d31e2d2c6a4e8b51a23351bc1ff487f07d8280ae80ad240e2aa33693,,7c9845547cc12fbc5639f1841439f33a497f39edf36cc1d456cbb73db2b60fed,0f2532fa0c55dff59d6bfc56c03e70a880a0b3ea0930b98e2cc38506934800f1,1,90000,Credit card,2025-08-15
000677a4-e07a-456e-9f60-5b01da8e13b5,2023-07-09 09:34:05,dc4c676c916b50280869bd482ef38c8e9a44e0f9aa9a986be60a751d80aad61f,,2777376b58918aadce0d8610683076e8a48d90fd582a7f35a75d1fe1801e5e94,a10d89ac9805c74f97040822a8c07b87f4821a881661f672fc28730b682a458a,1,22000,Debit card,2025-08-15
00075787-f38a-4228-8465-5c87618630d2,2023-07-11 09:08:41,d40ac281a1cf19520a3898fef20bb05bdcba1dea67117da26eb8115316bd8aa0,,983f143d610994aeb45504c56db27f171ee30eab6ea2a5fc143b203b4902308f,ee03dfcf24c4c26551402fdd0765e2d435692c58ced9ac0359c7df3d3e7d00db,1,20000,Credit card,2025-08-15


In [32]:
%%sql

select * from gold_daily_sales_summary

sale_date,total_revenue,total_transactions,total_quantity_sold,loaded_date
2023-07-15,2823131000,33320,55836,2025-08-15
2023-07-29,2765060000,33318,55460,2025-08-15
2023-07-18,2783104000,33204,55282,2025-08-15
2023-07-04,2787672000,33422,55748,2025-08-15
2023-07-19,2754635000,33235,55484,2025-08-15
2023-07-07,2749297000,33301,55265,2025-08-15
2023-07-22,2764189000,33204,55088,2025-08-15
2023-07-12,2850202000,33498,55744,2025-08-15
2023-07-10,2760840000,33336,55448,2025-08-15
2023-07-02,2819422000,33375,55958,2025-08-15


In [33]:
%%sql

select * from gold_product_performance	

product_id,product_name,category_name,total_quantity_sold,total_revenue_from_product,average_unit_price
1004,Decaf Blend (250g),Packaged Beans,14425,1298250000,90000.0
802,Jue Coffee Tumbler (Large),Merchandise,14294,1715280000,120000.0
411,Scones with Jam & Cream,Pastries & Cakes,14658,483714000,33000.0
403,Almond Croissant,Pastries & Cakes,14108,493780000,35000.0
307,Onion Rings,Snacks,14529,392283000,27000.0
113,Latte Panas,Coffee,14340,401520000,28000.0
1003,Single Origin Robusta Lampung (250g),Packaged Beans,14143,1060725000,75000.0
120,Cold Brew Black,Coffee,14544,407232000,28000.0
123,Manual Brew V60,Coffee,14442,505470000,35000.0
414,Donat Gula,Pastries & Cakes,14298,257364000,18000.0
