In [0]:
from pyspark.sql.functions import *

In [0]:
# Load the cleaned silver-layer retail table; every gold aggregate below
# is derived from this DataFrame.
gold_df = spark.read.table(
    "workspace.default.silver_retail_table_clean"
)

# Render the loaded rows for a quick visual check.
gold_df.display()

Add a **Revenue** column: the product of the `UnitPrice` and `Quantity` columns, rounded to 2 decimal places.

In [0]:
# Line-level revenue = UnitPrice * Quantity, rounded to 2 decimal places.
# `round` and `col` are the pyspark.sql.functions versions brought in by the
# star import, so they operate on columns, not Python numbers.
gold_df = gold_df.withColumn(
    "Revenue",
    round(col("UnitPrice") * col("Quantity"), 2),
)

In [0]:
# Show gold_df, which now carries the Revenue column added in the previous cell.
gold_df.display()

### Gold Table One - Total Revenue per Country per Year

In [0]:
# One row per Country; pivoting on InvoiceYear spreads the yearly revenue
# totals (rounded to 2 decimals) across one column per year.
# NOTE(review): with a single aggregate, Spark appears to name the pivoted
# columns after the pivot values, so this alias likely has no effect - confirm.
Total_Revenue_country_year = (
    gold_df
    .groupBy("Country")
    .pivot("InvoiceYear")
    .agg(round(sum("Revenue"), 2).alias("Total Revenue By Country"))
)

In [0]:
# Persist the per-country / per-year revenue pivot as a Delta table.
# The pivot produces one column per distinct InvoiceYear, so the schema changes
# whenever a new year appears in the data. overwriteSchema lets the overwrite
# replace the stored schema as well, instead of failing on a schema mismatch
# (same option the gold_top_customers write uses).
(
    Total_Revenue_country_year.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", True)
    .saveAsTable("gold_total_revenue_country_year")
)

### Monthly Sales Trends

In [0]:
# Total revenue per (InvoiceYear, InvoiceMonth), in chronological order.
# The sum is rounded to 2 decimals so that Monthly_Revenue matches the
# precision of the other revenue aggregates in this notebook and does not
# carry summation noise into the gold table.
monthly_sales = (
    gold_df
    .groupBy("InvoiceYear", "InvoiceMonth")
    .agg(round(sum("Revenue"), 2).alias("Monthly_Revenue"))
    .orderBy("InvoiceYear", "InvoiceMonth")
)

In [0]:
# Persist the monthly revenue trend as a Delta table, replacing any
# previous contents.
(
    monthly_sales.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("gold_monthly_sales")
)

### Top Customers By Revenue

Rows with a NULL `CustomerID` turned out to be the top result, so we relabel them as "UNKNOWN". Because `CustomerID` was an integer column, it also has to be cast to STRING so that it can hold that label.

In [0]:
# Rank customers by total revenue, highest first.
# CustomerID is cast to string and NULLs are replaced with "UNKNOWN" so that
# all rows without a customer id fall into one labelled group.
top_customers = (
    gold_df
    .withColumn(
        "CustomerID_clean",
        coalesce(col("CustomerID").cast("string"), lit("UNKNOWN")),
    )
    .groupBy("CustomerID_clean")
    .agg(round(sum("Revenue"), 2).alias("Customer_Revenue"))
    .orderBy(col("Customer_Revenue").desc())
)

In [0]:
# Show the per-customer revenue ranking (still keyed by CustomerID_clean here).
top_customers.display()

In [0]:
# Give the cleaned id column the CustomerID name before it is written out.
top_customers = top_customers.withColumnRenamed(
    "CustomerID_clean",
    "CustomerID",
)

In [0]:
# Persist the customer revenue ranking as a Delta table. overwriteSchema is
# set so the overwrite may also replace the table's existing schema.
(
    top_customers.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", True)
    .saveAsTable("gold_top_customers")
)