In [0]:
# Print a fixed greeting string.
greeting = "Hello DataX"
print(greeting)


# Screenshot Output
![Hello DataX Screenshot](/Volumes/workspace/default/first_Program_By_Akash/first_Program_By_Akash.png)



In [0]:
# Load the raw e-commerce orders CSV from the volume into `df`.
# The first row is used as column names and column types are inferred
# from the data.
orders_csv_path = "/Volumes/workspace/default/first_Program_By_Akash/ecommerce_orders.csv"
csv_reader = spark.read.format("csv").options(header="true", inferSchema="true")
df = csv_reader.load(orders_csv_path)


In [0]:
from pyspark.sql.functions import col, trim, to_date

# 1️⃣ Bronze input
# The raw orders CSV is loaded into `df` by the preceding cell. Bind it to
# `bronze_df` here so this cell does not depend on a variable that is never
# assigned (the original read was commented out, which raises NameError on a
# fresh kernel), and so the file is not read a second time.
bronze_df = df

# 2️⃣ Transform → Silver
# Normalise column types, then keep only valid, delivered orders.
silver_df = (
    bronze_df
    .withColumn("price", col("price").cast("double"))
    .withColumn("quantity", col("quantity").cast("int"))
    # Source dates are parsed as day-month-year (pattern dd-MM-yyyy).
    .withColumn("order_date", to_date(col("order_date"), "dd-MM-yyyy"))
    # Strip surrounding whitespace so the status comparison below is exact.
    .withColumn("status", trim(col("status")))
    .filter(col("order_id").isNotNull())
    # Null price/quantity values also fail these comparisons and are dropped.
    .filter(col("price") > 0)
    .filter(col("quantity") > 0)
    # Case-sensitive match: only rows whose status is exactly "delivered".
    .filter(col("status") == "delivered")
)

print("====> Silver Layer Preview")
silver_df.show()

# 3️⃣ Write Silver → Delta Table (a folder at silver_path, overwritten on each run)
silver_path = "/Volumes/workspace/default/first_Program_By_Akash/ecommerce_orders_silver"

silver_df.write.format("delta").mode("overwrite").save(silver_path)

print(f"✅ Silver layer created successfully at {silver_path}")


In [0]:
from pyspark.sql.functions import col, sum as _sum, avg, count, to_date, month, year

# Locations of the Silver input and the Gold output Delta tables.
silver_path = "/Volumes/workspace/default/first_Program_By_Akash/ecommerce_orders_silver"
gold_path = "/Volumes/workspace/default/first_Program_By_Akash/ecommerce_orders_gold"

# 1️⃣ Load the Silver Delta table from its path.
silver_df = spark.read.format("delta").load(silver_path)

# 2️⃣ Build Gold: monthly KPIs per product.
# Derive the calendar year and month of each order to use as grouping keys.
orders_by_period = (
    silver_df
    .withColumn("order_year", year(col("order_date")))
    .withColumn("order_month", month(col("order_date")))
)

# One output column per KPI, in the order they appear in the Gold table.
kpi_columns = [
    _sum(col("price") * col("quantity")).alias("total_revenue"),  # price × quantity, summed
    _sum("quantity").alias("total_quantity"),
    avg(col("price")).alias("avg_price"),  # plain (unweighted) mean of price
    count("order_id").alias("total_orders"),  # non-null order_id values
]

gold_df = (
    orders_by_period
    .groupBy("product_id", "order_year", "order_month")
    .agg(*kpi_columns)
)

print("====> Gold Layer Preview")
gold_df.show(10, truncate=False)

# 3️⃣ Persist Gold as a Delta table, replacing any previous contents.
gold_writer = gold_df.write.format("delta").mode("overwrite")
gold_writer.save(gold_path)

print(f"✅ Gold layer created successfully at {gold_path}")
