In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Spark entry point for this notebook, registered under the app name "AggregateTables".
spark = (
    SparkSession.builder
    .appName("AggregateTables")
    .getOrCreate()
)

# Location of the master table that every aggregate is computed from.
source_schema = "_gold"
source_table = "master_dso"

# Schema that receives the aggregate tables written by the cells below.
target_schema = "_gold"

In [None]:
# Read every column of the configured source table; the aggregates in the
# following cells are all derived from this DataFrame.
load_query = f"SELECT * FROM {source_schema}.{source_table}"
master_dso_df = spark.sql(load_query)

#### Aggregate Table Profits By Year


In [None]:
# Tag each row with the calendar year extracted from Order_Date. The result is
# rebound to master_dso_df, so later cells see the extra "Year" column as well.
master_dso_df = master_dso_df.withColumn("Year", F.year("Order_Date"))

# One row per year holding the summed Profit, sorted by year ascending.
profit_by_year = (
    master_dso_df
    .groupBy("Year")
    .agg(F.sum("Profit").alias("Total_Profit"))
    .orderBy("Year")
)

# Preview the yearly totals in the notebook output.
profit_by_year.show()

# Persist the yearly totals, replacing any existing table with the same name.
(
    profit_by_year.write
    .mode("overwrite")
    .saveAsTable(f"{target_schema}.profit_by_year")
)

#### Aggregate Table Profits By Product Category

In [None]:
# One row per product Category holding the summed Profit, highest total first.
profit_by_product_category = (
    master_dso_df
    .groupBy("Category")
    .agg(F.sum("Profit").alias("Total_Profit"))
    .orderBy(F.desc("Total_Profit"))
)

# Preview the per-category totals in the notebook output.
profit_by_product_category.show()

# Persist the per-category totals, replacing any existing table with the same name.
(
    profit_by_product_category.write
    .mode("overwrite")
    .saveAsTable(f"{target_schema}.profit_by_product_category")
)

#### Aggregate Table Profits By Product Sub Category

In [None]:
# One row per product "Sub-Category" holding the summed Profit, highest total first.
profit_by_sub_category = (
    master_dso_df
    .groupBy("Sub-Category")
    .agg(F.sum("Profit").alias("Total_Profit"))
    .orderBy(F.desc("Total_Profit"))
)

# Preview the per-sub-category totals in the notebook output.
profit_by_sub_category.show()

# Persist the per-sub-category totals, replacing any existing table with the same name.
(
    profit_by_sub_category.write
    .mode("overwrite")
    .saveAsTable(f"{target_schema}.profit_by_sub_category")
)

#### Aggregate Table Profits By Customer

In [None]:
# One row per (Customer_ID, Customer_Name) pair holding the summed Profit.
# No ordering is applied to this aggregate.
profit_by_customer = (
    master_dso_df
    .groupBy("Customer_ID", "Customer_Name")
    .agg(F.sum("Profit").alias("Total_Profit"))
)

# Preview the per-customer totals in the notebook output.
profit_by_customer.show()

# Persist the per-customer totals, replacing any existing table with the same name.
(
    profit_by_customer.write
    .mode("overwrite")
    .saveAsTable(f"{target_schema}.profit_by_customer")
)