#### **Gold Layer Transformation & Aggregation for <mark>Orders</mark>**

In [1]:
from pyspark.sql.functions import *

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 3, Finished, Available, Finished)

In [2]:
# Load the Silver-layer orders data.
# Read the whole Silver output folder rather than one hard-coded part file:
# the part-file name embeds a generated UUID, so pinning it breaks this cell
# as soon as the Silver output is rewritten.
orders_df = spark.read.parquet("Files/customer_data_silver/customers_orders/")
# display(orders_df)

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 4, Finished, Available, Finished)

In [3]:
from pyspark.sql.functions import col, to_timestamp, current_date

# Cast the measure columns to numeric types, add the year/month parts of
# OrderDate, and cache the result because the Orders KPI cells all read `orders`.
numeric_casts = {"Quantity": "int", "TotalAmount": "double", "UnitPrice": "double"}

typed_orders = orders_df
for column_name, spark_type in numeric_casts.items():
    typed_orders = typed_orders.withColumn(column_name, col(column_name).cast(spark_type))

orders = (
    typed_orders
    .withColumn("OrderYear", year("OrderDate"))
    .withColumn("OrderMonth", month("OrderDate"))
    .cache()
)

# orders.createOrReplaceTempView("orders_silver")

# Persist the typed order rows both as parquet files and as a table.
# NOTE(review): this folder is "customers_orders" while the KPI cells write
# under "customer_orders" — confirm the difference is intentional.
orders.write.parquet("Files/customer_data_gold/customers_orders/customer_order_kpis", mode="overwrite")
orders.write.saveAsTable("ordertable_kpis", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 5, Finished, Available, Finished)

###### **Total revenue, total orders, avg order value, unique customers, avg items per order.**

In [4]:
from pyspark.sql.functions import sum as _sum, countDistinct, avg

# Headline order KPIs: one global aggregation over `orders`, yielding one row.
# NOTE(review): AvgOrderValue / AvgItemsPerOrder are averages over input rows;
# they are per-order averages only if `orders` holds one row per OrderID — confirm.
headline_metrics = [
    _sum("TotalAmount").alias("TotalRevenue"),
    _sum("Quantity").alias("TotalItemsSold"),
    countDistinct("OrderID").alias("TotalOrders"),
    countDistinct("CustomerID").alias("DistinctCustomers"),
    avg("TotalAmount").alias("AvgOrderValue"),
    avg("Quantity").alias("AvgItemsPerOrder"),
]
kpis = orders.agg(*headline_metrics).limit(1)

# Exploration
# kpis.show(truncate=False)
# kpis.createOrReplaceTempView("gold_revenue")  # temp view for ad-hoc queries

# Persist Gold KPIs as parquet files and as a table.
kpis.write.parquet("Files/customer_data_gold/customer_orders/orders_gold_revenue_kpi", mode="overwrite")
kpis.write.saveAsTable("gold_revenue_kpis", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 6, Finished, Available, Finished)

In [11]:
# spark.sql("SELECT * FROM gold_revenue_kpis").show()

StatementMeta(, fe94027e-fcd2-4795-97ab-4b61bb65c714, 15, Finished, Available, Finished)

+-----------------+--------------+-----------+-----------------+-----------------+------------------+
|     TotalRevenue|TotalItemsSold|TotalOrders|DistinctCustomers|    AvgOrderValue|  AvgItemsPerOrder|
+-----------------+--------------+-----------+-----------------+-----------------+------------------+
|783914.8599999975|          7385|       3000|              100|261.3049533333325|2.4616666666666664|
+-----------------+--------------+-----------+-----------------+-----------------+------------------+



###### **Revenue / Orders by Year-Month (time-series)**

In [7]:
from pyspark.sql.functions import sum as _sum, countDistinct

# Monthly time series: revenue, units and distinct orders for each
# (OrderYear, OrderMonth) pair, sorted chronologically.
month_keys = ["OrderYear", "OrderMonth"]

monthly_trend = (
    orders
    .groupBy(*month_keys)
    .agg(
        _sum("TotalAmount").alias("MonthlyRevenue"),
        _sum("Quantity").alias("MonthlyItems"),
        countDistinct("OrderID").alias("MonthlyOrders"),
    )
    .sort(*month_keys)
)

# Exploration
# monthly_trend.show(20)
# monthly_trend.createOrReplaceTempView("gold_monthly_trend")

# Persist Gold KPI as parquet files and as a table.
monthly_trend.write.parquet("Files/customer_data_gold/customer_orders/orders_monthly_trend_kpi/", mode="overwrite")
monthly_trend.write.saveAsTable("gold_monthly_trend", mode="overwrite")


StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 10, Finished, Available, Finished)

###### **Revenue & Orders by Weekday and Hour (peak times)**

In [8]:
# Revenue and distinct order counts per weekday, using the OrderWeekday
# column already present on `orders`.
by_weekday = (
    orders
    .groupBy("OrderWeekday")
    .agg(
        _sum("TotalAmount").alias("Revenue"),
        countDistinct("OrderID").alias("Orders"),
    )
    .orderBy("OrderWeekday")
)

# Same metrics per hour of day; the hour is derived from OrderDate.
by_hour = (
    orders
    .groupBy(hour("OrderDate").alias("hour"))
    .agg(
        _sum("TotalAmount").alias("Revenue"),
        countDistinct("OrderID").alias("Orders"),
    )
    .orderBy("hour")
)

# Create views for dashboarding
# by_weekday.createOrReplaceTempView("gold_by_weekday")
# by_hour.createOrReplaceTempView("gold_by_hour")

# Persist Gold KPIs.
# Fix: the hourly folder previously received `by_weekday`, so the parquet
# output under gold_orders_by_hour_kpi/ held weekday rows. Write `by_hour`.
by_weekday.write.mode("overwrite").parquet("Files/customer_data_gold/customer_orders/gold_orders_by_weekday_kpi/")
by_hour.write.mode("overwrite").parquet("Files/customer_data_gold/customer_orders/gold_orders_by_hour_kpi/")

by_weekday.write.mode("overwrite").saveAsTable("gold_orders_by_weekday")
by_hour.write.mode("overwrite").saveAsTable("gold_orders_by_hour")


StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 11, Finished, Available, Finished)

###### **Top N Customers by Lifetime Value (CLV)**

In [9]:
# Per-customer lifetime value: total spend, distinct order count and the mean
# TotalAmount (rounded to 2 decimals), with the highest spenders first.
# `round` here is pyspark.sql.functions.round from the wildcard import, not
# the Python builtin.
customer_keys = ["CustomerID", "CustomerName", "Email", "Location"]

customer_ltv = (
    orders
    .groupBy(*customer_keys)
    .agg(
        _sum("TotalAmount").alias("TotalSpend"),
        countDistinct("OrderID").alias("TotalOrders"),
        round(avg("TotalAmount"), 2).alias("AvgOrderValue"),
    )
    .sort("TotalSpend", ascending=False)
)

# Exploration
# customer_ltv.createOrReplaceTempView("gold_customer_ltv")
# customer_ltv.show(10)

# Persist Gold KPI as parquet files and as a table.
customer_ltv.write.parquet("Files/customer_data_gold/customer_orders/gold_customer_ltv_kpi/", mode="overwrite")
customer_ltv.write.saveAsTable("gold_customer_ltv", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 12, Finished, Available, Finished)

###### **Repeat Purchase / Retention metrics**

In [15]:
# Step 1: number of distinct orders placed by each customer.
customer_orders = (
    orders
    .groupBy("CustomerID")
    .agg(countDistinct("OrderID").alias("orders_count"))
)

# Step 2: flag customers with more than one order, then collapse everything
# into a single summary row (repeat share in percent, order and customer totals).
repeat_flag = when(col("orders_count") > 1, 1).otherwise(0)
customers_total = countDistinct("CustomerID")

repeat_rate = (
    customer_orders
    .withColumn("is_repeat", repeat_flag)
    .agg(
        (100 * _sum("is_repeat") / customers_total).alias("RepeatCustomerPercent"),
        _sum("orders_count").alias("TotalOrders"),
        customers_total.alias("TotalCustomers"),
    )
)

# Exploration
# repeat_rate.show(truncate=False)
# repeat_rate.createOrReplaceTempView("gold_repeat_rate")

StatementMeta(, fe94027e-fcd2-4795-97ab-4b61bb65c714, 19, Finished, Available, Finished)

+---------------------+-----------+--------------+
|RepeatCustomerPercent|TotalOrders|TotalCustomers|
+---------------------+-----------+--------------+
|100.0                |3000       |100           |
+---------------------+-----------+--------------+



###### **Top Products by Revenue & Quantity**

In [10]:
# Product performance for merchandising: revenue, units sold, distinct orders
# and the mean of AvgItemPrice (2 decimals) per product, best sellers first.
product_keys = ["ProductID", "ProductName", "Category"]

product_perf = (
    orders
    .groupBy(*product_keys)
    .agg(
        _sum("TotalAmount").alias("Revenue"),
        _sum("Quantity").alias("UnitsSold"),
        countDistinct("OrderID").alias("OrderCount"),
        round(avg("AvgItemPrice"), 2).alias("AvgItemPrice"),
    )
    .sort("Revenue", ascending=False)
)

# Exploration
# product_perf.createOrReplaceTempView("gold_product_perf")
# product_perf.show(10)

# Persist Gold KPI as parquet files and as a table.
product_perf.write.parquet("Files/customer_data_gold/customer_orders/gold_product_perf_kpi/", mode="overwrite")
product_perf.write.saveAsTable("gold_product_performance", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 13, Finished, Available, Finished)

###### **Revenue by Category & Price Bucket (segmentation)**

In [11]:
# Revenue, units sold and distinct orders per (Category, PriceBucket) segment,
# highest-revenue segments first.
category_perf = (
    orders
    .groupBy("Category", "PriceBucket")
    .agg(
        _sum("TotalAmount").alias("Revenue"),
        _sum("Quantity").alias("UnitsSold"),
        countDistinct("OrderID").alias("OrderCount"),
    )
    .orderBy(col("Revenue").desc())
)

# Exploration
# category_perf.createOrReplaceTempView("gold_category_perf")
# category_perf.show(10)

# Persist Gold KPI.
# Fix: the table was previously saved from `product_perf`, so
# gold_category_performance held product-level rows. Save `category_perf`.
category_perf.write.mode("overwrite").parquet("Files/customer_data_gold/customer_orders/gold_category_perf_kpi/")
category_perf.write.mode("overwrite").saveAsTable("gold_category_performance")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 14, Finished, Available, Finished)

###### **Payment Method Analysis**

In [12]:
# Revenue, distinct orders and share of total revenue per payment method.
# The grand total is taken from the `kpis` DataFrame built earlier; collect()
# runs a Spark job to bring that value to the driver.
total_revenue = kpis.collect()[0]["TotalRevenue"]

payment_perf = (
    orders
    .groupBy("PaymentMethod")
    .agg(
        _sum("TotalAmount").alias("Revenue"),
        countDistinct("OrderID").alias("Orders"),
        (100 * _sum("TotalAmount") / total_revenue).alias("RevenuePct"),
    )
    .sort("Revenue", ascending=False)
)

# Exploration
# payment_perf.createOrReplaceTempView("gold_payment_perf")
# payment_perf.show()

# Persist Gold KPI as parquet files and as a table.
payment_perf.write.parquet("Files/customer_data_gold/customer_orders/gold_payment_perf_kpi/", mode="overwrite")
payment_perf.write.saveAsTable("gold_payment_performance", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 15, Finished, Available, Finished)

###### **Order Value Distribution (Low/Medium/High)**

In [13]:
# Revenue and distinct order counts for each OrderValueCategory value,
# sorted by the category label.
order_value_dist = (
    orders
    .groupBy("OrderValueCategory")
    .agg(
        _sum("TotalAmount").alias("Revenue"),
        countDistinct("OrderID").alias("Orders"),
    )
    .sort("OrderValueCategory")
)

# Exploration
# order_value_dist.createOrReplaceTempView("gold_order_value_dist")
# order_value_dist.show()

# Persist Gold KPI as parquet files and as a table.
order_value_dist.write.parquet("Files/customer_data_gold/customer_orders/gold_order_value_dist_kpi/", mode="overwrite")
order_value_dist.write.saveAsTable("gold_order_value_dist", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 16, Finished, Available, Finished)

###### **RFM Segmentation (Recency, Frequency, Monetary)**

In [14]:
from pyspark.sql.functions import datediff, max as _max

# Reference date = latest OrderDate in the dataset (instead of current_date),
# so recency values stay stable when historical data is reprocessed.
ref_date = orders.select(_max("OrderDate").alias("max_dt")).collect()[0]["max_dt"]

# Per-customer Recency (days between the reference date and the customer's
# last order), Frequency (distinct orders) and Monetary (total spend, 2 dp).
rfm = (
    orders
    .groupBy("CustomerID")
    .agg(
        datediff(lit(ref_date), _max("OrderDate")).alias("RecencyDays"),
        countDistinct("OrderID").alias("Frequency"),
        round(_sum("TotalAmount"), 2).alias("Monetary"),
    )
)

# Simple bucketing (example thresholds, tunable).
rfm = (
    rfm
    .withColumn(
        "RecencyBucket",
        when(col("RecencyDays") <= 30, "0-30")
        .when(col("RecencyDays") <= 90, "31-90")
        .otherwise("90+"),
    )
    .withColumn(
        "FrequencyBucket",
        when(col("Frequency") == 1, "1")
        .when(col("Frequency") <= 3, "2-3")
        .otherwise("4+"),
    )
    .withColumn(
        "MonetaryBucket",
        when(col("Monetary") < 100, "Low")
        .when(col("Monetary") < 500, "Medium")
        .otherwise("High"),
    )
)

# Exploration
# rfm.createOrReplaceTempView("gold_rfm")
# rfm.show(10)

# Persist Gold KPI.
# Fix: both writes previously persisted `payment_perf`, so the RFM outputs
# contained payment-method rows. Persist `rfm` instead.
# NOTE(review): the table name "gold_rmf" looks like a typo for "gold_rfm";
# it is kept unchanged so existing consumers do not break — confirm and rename.
rfm.write.mode("overwrite").parquet("Files/customer_data_gold/customer_orders/gold_rfm_kpi/")
rfm.write.mode("overwrite").saveAsTable("gold_rmf")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 17, Finished, Available, Finished)

###### **Orders with missing customer/product info (data quality view)**

In [21]:
# Data-quality view for ETL monitoring: orders whose customer_missing or
# product_missing flag is true, reduced to the identifying columns.
customer_gap = col("customer_missing") == True
product_gap = col("product_missing") == True

missing_info = (
    orders
    .where(customer_gap | product_gap)
    .select(
        "OrderID",
        "OrderDate",
        "CustomerID",
        "ProductID",
        "customer_missing",
        "product_missing",
    )
)

# Exposed as a temp view only; this cell does not persist the result.
missing_info.createOrReplaceTempView("gold_missing_info")
# missing_info.show(10, truncate=False)

StatementMeta(, fe94027e-fcd2-4795-97ab-4b61bb65c714, 25, Finished, Available, Finished)

+-------+--------------------------+----------+---------+----------------+---------------+
|OrderID|OrderDate                 |CustomerID|ProductID|customer_missing|product_missing|
+-------+--------------------------+----------+---------+----------------+---------------+
|ORD1173|2022-03-05 03:59:07.182394|CUST105   |PROD250  |false           |true           |
|ORD1878|2022-11-17 13:50:40.613537|CUST105   |PROD250  |false           |true           |
|ORD1246|2022-03-31 19:40:42.814271|CUST106   |PROD250  |false           |true           |
|ORD1346|2022-05-07 07:58:14.364788|CUST110   |PROD250  |false           |true           |
|ORD1114|2022-02-11 14:58:22.767589|CUST111   |PROD250  |false           |true           |
|ORD3376|2024-05-17 12:42:00.84028 |CUST113   |PROD250  |false           |true           |
|ORD2320|2023-04-27 23:03:20.466822|CUST117   |PROD250  |false           |true           |
|ORD1757|2022-10-04 09:31:52.237412|CUST118   |PROD250  |false           |true           |

### **Gold Layer Transformation & Aggregation for <mark>WEB LOGS</mark>**

In [15]:
# Load the Silver-layer web logs.
# Read the whole Silver output folder rather than one hard-coded part file:
# the part-file name embeds a generated UUID, so pinning it breaks this cell
# as soon as the Silver output is rewritten.
weblogs_df = spark.read.parquet("Files/customer_data_silver/customer_web_logs/")
# display(weblogs_df)

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 18, Finished, Available, Finished)

###### **User Engagement by Action Type**

In [16]:
# Event volume and distinct users per action_group, busiest groups first.
engagement_by_action = (
    weblogs_df
    .groupBy("action_group")
    .agg(
        count("*").alias("TotalEvents"),
        countDistinct("user_id").alias("UniqueUsers"),
    )
    .sort("TotalEvents", ascending=False)
)

# Exploration
# engagement_by_action.createOrReplaceTempView("gold_engagement_by_action")
# engagement_by_action.show(10)

# Persist Gold KPI as parquet files and as a table.
engagement_by_action.write.parquet("Files/customer_data_gold/web_logs/gold_User_engagement_by_action_kpi/", mode="overwrite")
engagement_by_action.write.saveAsTable("gold_user_engagement_by_action", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 19, Finished, Available, Finished)

###### **Activity by Hour of Day**

In [17]:
# Event volume and distinct users for each event_hour value, in hour order.
activity_by_hour = (
    weblogs_df
    .groupBy("event_hour")
    .agg(
        count("*").alias("TotalEvents"),
        countDistinct("user_id").alias("ActiveUsers"),
    )
    .sort("event_hour")
)

# Exploration
# activity_by_hour.createOrReplaceTempView("gold_activity_by_hour")
# activity_by_hour.show(10)

# Persist Gold KPI as parquet files and as a table.
activity_by_hour.write.parquet("Files/customer_data_gold/web_logs/gold_User_activity_by_hour_kpi/", mode="overwrite")
activity_by_hour.write.saveAsTable("gold_User_activity_by_hour_kpi", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 20, Finished, Available, Finished)

###### **Daily Traffic Trend**

In [18]:
# Daily traffic: event count per calendar day (event_weekday is carried along
# as a grouping column), sorted by year, month and day.
day_sort_keys = ["event_year", "event_month", "event_day"]

traffic_by_day = (
    weblogs_df
    .groupBy(*day_sort_keys, "event_weekday")
    .agg(count("*").alias("TotalEvents"))
    .sort(*day_sort_keys)
)

traffic_by_day.createOrReplaceTempView("gold_traffic_by_day")
# traffic_by_day.show(10)

# Persist Gold KPI as parquet files and as a table.
traffic_by_day.write.parquet("Files/customer_data_gold/web_logs/gold_traffic_by_day_kpi/", mode="overwrite")
traffic_by_day.write.saveAsTable("gold_traffic_by_day", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 21, Finished, Available, Finished)

### **Gold Layer Transformation & Aggregation for <mark>Social Media</mark>**

In [19]:
# Load the Silver-layer social media data.
# Read the whole Silver output folder rather than one hard-coded part file:
# the part-file name embeds a generated UUID, so pinning it breaks this cell
# as soon as the Silver output is rewritten.
social_media_df = spark.read.parquet("Files/customer_data_silver/customers_social_media/")
# display(social_media_df)

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 22, Finished, Available, Finished)

###### **Posts per Platform**

In [20]:
# Number of posts on each platform, sorted by platform name.
posts_per_platform = (
    social_media_df
    .groupBy("platform")
    .agg(count("*").alias("TotalPosts"))
    .sort("platform")
)

# Exploration
# posts_per_platform.createOrReplaceTempView("gold_posts_per_platform")
# posts_per_platform.show(10)

# Persist Gold KPI as parquet files and as a table.
posts_per_platform.write.parquet("Files/customer_data_gold/Social_Media/gold_posts_per_platform_kpi/", mode="overwrite")
posts_per_platform.write.saveAsTable("gold_posts_per_platform", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 23, Finished, Available, Finished)

###### **Sentiment Distribution per Platform**

In [21]:
# Post count for every (platform, sentiment) combination, sorted by both keys.
sentiment_keys = ["platform", "sentiment"]

sentiment_distribution = (
    social_media_df
    .groupBy(*sentiment_keys)
    .agg(count("*").alias("SentimentCount"))
    .sort(*sentiment_keys)
)

# Exploration
# sentiment_distribution.createOrReplaceTempView("gold_sentiment_distribution")
# sentiment_distribution.show(10)

# Persist Gold KPI as parquet files and as a table.
sentiment_distribution.write.parquet("Files/customer_data_gold/Social_Media/gold_sentiment_distribution_kpi/", mode="overwrite")
sentiment_distribution.write.saveAsTable("gold_sentiment_distribution", mode="overwrite")


StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 24, Finished, Available, Finished)

###### **Average Sentiment Score by Month**

In [22]:
# Mean sentiment_score for each (post_year, post_month), in chronological order.
post_month_keys = ["post_year", "post_month"]

avg_sentiment_by_month = (
    social_media_df
    .groupBy(*post_month_keys)
    .agg(avg("sentiment_score").alias("AvgSentimentScore"))
    .sort(*post_month_keys)
)

avg_sentiment_by_month.createOrReplaceTempView("gold_avg_sentiment_by_month")
# avg_sentiment_by_month.show(10)

# Persist Gold KPI as parquet files and as a table.
avg_sentiment_by_month.write.parquet("Files/customer_data_gold/Social_Media/gold_avg_sentiment_by_month_kpi/", mode="overwrite")
avg_sentiment_by_month.write.saveAsTable("gold_avg_sentiment_by_month", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 25, Finished, Available, Finished)

###### **Content Length Analysis**

In [23]:
# Average, longest and shortest content_length per platform.
# `max` / `min` here are the pyspark.sql.functions aggregates brought in by the
# wildcard import, not the Python builtins.
length_stats = [
    avg("content_length").alias("AvgContentLength"),
    max("content_length").alias("MaxContentLength"),
    min("content_length").alias("MinContentLength"),
]

content_length_analysis = (
    social_media_df
    .groupBy("platform")
    .agg(*length_stats)
    .sort("platform")
)

content_length_analysis.createOrReplaceTempView("gold_content_length_analysis")
# content_length_analysis.show(10)

# Persist Gold KPI as parquet files and as a table.
content_length_analysis.write.parquet("Files/customer_data_gold/Social_Media/gold_content_length_analysis_kpi/", mode="overwrite")
content_length_analysis.write.saveAsTable("gold_content_length_analysis", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 26, Finished, Available, Finished)

###### **Weekday Posting Behavior**

In [24]:
# Number of posts for each post_weekday value, sorted by that value.
weekday_posts = (
    social_media_df
    .groupBy("post_weekday")
    .agg(count("*").alias("TotalPosts"))
    .sort("post_weekday")
)

# Exploration
# weekday_posts.createOrReplaceTempView("gold_weekday_posts")
# weekday_posts.show(10)

# Persist Gold KPI as parquet files and as a table.
weekday_posts.write.parquet("Files/customer_data_gold/Social_Media/gold_weekday_posts_kpi/", mode="overwrite")
weekday_posts.write.saveAsTable("gold_weekday_posts", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 27, Finished, Available, Finished)

###### **Quarterly Platform Activity**

In [25]:
# Post count per platform for each (post_year, post_quarter), sorted by
# year, quarter and platform.
quarter_keys = ["post_year", "post_quarter", "platform"]

quarterly_activity = (
    social_media_df
    .groupBy(*quarter_keys)
    .agg(count("*").alias("TotalPosts"))
    .sort(*quarter_keys)
)

# Exploration
# quarterly_activity.createOrReplaceTempView("gold_quarterly_activity")
# quarterly_activity.show(10)

# Persist Gold KPI as parquet files and as a table.
quarterly_activity.write.parquet("Files/customer_data_gold/Social_Media/gold_quarterly_activity_kpi/", mode="overwrite")
quarterly_activity.write.saveAsTable("gold_quarterly_activity", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 28, Finished, Available, Finished)

### **Gold Layer Transformation & Aggregation for <mark>Reviews</mark>**

In [26]:
# Load the Silver-layer customer reviews.
# Read the whole Silver output folder rather than one hard-coded part file:
# the part-file name embeds a generated UUID, so pinning it breaks this cell
# as soon as the Silver output is rewritten.
reviews_df = spark.read.parquet("Files/customer_data_silver/customers_reviews/")
# display(reviews_df)

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 29, Finished, Available, Finished)

###### **Average Rating per Product**

In [27]:
from pyspark.sql.functions import avg, count

# Mean rating and review count per product_id, best-rated products first.
avg_rating_per_product = (
    reviews_df
    .groupBy("product_id")
    .agg(
        avg("rating").alias("AvgRating"),
        count("*").alias("TotalReviews"),
    )
    .sort(col("AvgRating").desc())
)

# Exploration
# avg_rating_per_product.createOrReplaceTempView("gold_avg_rating_per_product")
# avg_rating_per_product.show(10)

# Persist Gold KPI as parquet files and as a table.
avg_rating_per_product.write.parquet("Files/customer_data_gold/Reviews/gold_avg_rating_per_product_kpi/", mode="overwrite")
avg_rating_per_product.write.saveAsTable("gold_avg_rating_per_product", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 30, Finished, Available, Finished)

###### **Sentiment Distribution per Product**

In [43]:
# Review count for every (product_id, rating_category) pair, sorted by both keys.
# NOTE(review): this rebinds `sentiment_distribution`, the same name the Social
# Media section uses for its platform-level DataFrame; this cell does not
# persist the result.
review_sentiment_keys = ["product_id", "rating_category"]

sentiment_distribution = (
    reviews_df
    .groupBy(*review_sentiment_keys)
    .agg(count("*").alias("ReviewCount"))
    .sort(*review_sentiment_keys)
)

# Exploration
# sentiment_distribution.createOrReplaceTempView("gold_sentiment_distribution")
# sentiment_distribution.show(10)

StatementMeta(, fe94027e-fcd2-4795-97ab-4b61bb65c714, 55, Finished, Available, Finished)

+----------+---------------+-----------+
|product_id|rating_category|ReviewCount|
+----------+---------------+-----------+
|   PROD200|       Negative|         16|
|   PROD200|        Neutral|          6|
|   PROD200|       Positive|         22|
|   PROD201|       Negative|         25|
|   PROD201|        Neutral|         16|
|   PROD201|       Positive|         14|
|   PROD202|       Negative|         28|
|   PROD202|        Neutral|         11|
|   PROD202|       Positive|         24|
|   PROD203|       Negative|         26|
+----------+---------------+-----------+
only showing top 10 rows



###### **Review Trends Over Time**

In [28]:
# Review trend: mean rating and review count for each
# (review_year, review_month), in chronological order.
review_month_keys = ["review_year", "review_month"]

avg_rating_by_month = (
    reviews_df
    .groupBy(*review_month_keys)
    .agg(
        avg("rating").alias("AvgMonthlyRating"),
        count("*").alias("TotalMonthlyReviews"),
    )
    .sort(*review_month_keys)
)

# Exploration
# avg_rating_by_month.createOrReplaceTempView("gold_avg_rating_by_month")
# avg_rating_by_month.show(10)

# Persist Gold KPI as parquet files and as a table.
avg_rating_by_month.write.parquet("Files/customer_data_gold/Reviews/gold_avg_rating_by_month_kpi/", mode="overwrite")
avg_rating_by_month.write.saveAsTable("gold_avg_rating_by_month", mode="overwrite")

StatementMeta(, e98f137b-d095-416c-b3cb-3e5d637771f1, 31, Finished, Available, Finished)