# Imports

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

# Widgets

In [0]:
# Notebook parameters: each text widget is declared with its default value so
# the catalog and the source/sink schemas can be overridden per run.
for _widget_name, _widget_default in (
    ("catalog", "project_smartdata"),
    ("schema_source", "bronze"),
    ("schema_sink", "silver"),
):
    dbutils.widgets.text(_widget_name, _widget_default)

# Constants

In [0]:
# Values resolved from the notebook widgets.
catalog = dbutils.widgets.get("catalog")
schema_source = dbutils.widgets.get("schema_source")
schema_sink = dbutils.widgets.get("schema_sink")

# Names of the tables read from the source schema.
table_category = "category"
table_products = "products"
table_stores = "stores"
table_sales = "sales"

# Names of the tables written to the sink schema.
table_store_sales = "store_sales"
table_product_sales = "product_sales"
table_category_sales = "category_sales"

# Read and cache lookup tables (stores, products, category)

In [0]:
# Load the three lookup tables from the source schema and mark each one for
# caching; they are joined to the sales table later in the notebook.
# cache() is lazy: nothing is materialized until an action runs.
df_stores = spark.table(f"{catalog}.{schema_source}.{table_stores}").cache()
df_products = spark.table(f"{catalog}.{schema_source}.{table_products}").cache()
df_category = spark.table(f"{catalog}.{schema_source}.{table_category}").cache()


# UDF

In [0]:
def warning_score_store_sale(amount_total_sale):
    """Classify a total sales amount into a performance label.

    Args:
        amount_total_sale: Total sales amount (any value comparable with an
            ``int``, such as ``int``, ``float`` or ``Decimal``), or ``None``.

    Returns:
        ``"EXCEPCIONAL"`` for amounts >= 30,000,000, ``"SOBRE PROMEDIO"`` for
        amounts >= 25,000,000, ``"PROMEDIO"`` for amounts >= 15,000,000 and
        ``"CRITICO"`` for anything lower. ``None`` when the amount is ``None``.
    """
    # A null value reaches a Python UDF as None, and `None >= int` raises
    # TypeError, which would fail the whole Spark task. Propagate the null.
    if amount_total_sale is None:
        return None

    # Deliberately no per-row print: inside a UDF it runs once per row on the
    # executors and only floods their stdout logs.
    if amount_total_sale >= 30000000:
        return "EXCEPCIONAL"
    if amount_total_sale >= 25000000:
        return "SOBRE PROMEDIO"
    if amount_total_sale >= 15000000:
        return "PROMEDIO"
    return "CRITICO"

In [0]:
# Wrap the Python classifier as a Spark UDF that returns a string column.
warning_score_store_sale_udf = F.udf(
    f=warning_score_store_sale,
    returnType=StringType(),
)

# KPIs

In [0]:
# Sales rows from the source schema.
df_sales = spark.table(f"{catalog}.{schema_source}.{table_sales}")

# Enrich each sale with its store, product and category. The lookup tables are
# given a broadcast hint, and the joins use the default (inner) join type, so
# sales without a matching store/product/category are dropped.
df_join_sales = (
    df_sales.alias("sale")
    .join(
        F.broadcast(df_stores.alias("store")),
        F.col("sale.store_id") == F.col("store.store_id"),
    )
    .join(
        F.broadcast(df_products.alias("product")),
        F.col("sale.product_id") == F.col("product.product_id"),
    )
    .join(
        F.broadcast(df_category.alias("category")),
        F.col("product.category_id") == F.col("category.category_id"),
    )
)

In [0]:
# Candidate patterns for the "sale_date" column. coalesce() keeps the first
# non-null result, so the order decides how ambiguous values are read
# (for example "dd/MM/yyyy" is tried before "MM/dd/yyyy").
# NOTE(review): this relies on to_date() yielding null for a non-matching
# pattern; with ANSI mode enabled it may raise instead - confirm the setting.
_sale_date_patterns = [
    "dd/MM/yyyy",
    "yyyy-MM-dd",
    "dd-MMM-yyyy",
    "dd-MM-yyyy",
    "MM/dd/yyyy",
    "yyyyMMdd",
]
_parsed_sale_date = F.coalesce(
    *[F.to_date("sale_date", pattern) for pattern in _sale_date_patterns]
)

# Add the parsed date, its year, and the line total (price * quantity).
df_store_sales_year_total = (
    df_join_sales
    .withColumn("date", _parsed_sale_date)
    .withColumn("year", F.year("date"))
    .withColumn("sale_total", F.col("price") * F.col("quantity"))
)
 

In [0]:
# Total amount and number of sale rows per (year, store).
_store_totals_by_year = df_store_sales_year_total.groupBy("year", "store_name").agg(
    F.sum("sale_total").alias("amount_total_sales"),
    F.count("*").alias("total_sales"),
)

# Sort by year, then by amount (highest first), and label each row with the
# score computed by the UDF from its total amount.
df_store_total_sales = (
    _store_totals_by_year
    .orderBy("year", F.desc("amount_total_sales"))
    .withColumn("score", warning_score_store_sale_udf("amount_total_sales"))
)

In [0]:
# Overwrite the store-sales table in the sink schema.
_store_sales_target = f"{catalog}.{schema_sink}.{table_store_sales}"
(
    df_store_total_sales.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(_store_sales_target)
)

In [0]:
# Total amount and number of sale rows per (year, product).
_product_totals_by_year = df_store_sales_year_total.groupBy("year", "product_name").agg(
    F.sum("sale_total").alias("amount_total_sales"),
    F.count("*").alias("total_sales"),
)

# Sort by year, then by amount (highest first).
df_products_total_sales = _product_totals_by_year.orderBy(
    "year", F.desc("amount_total_sales")
)


In [0]:
# Overwrite the product-sales table in the sink schema.
_product_sales_target = f"{catalog}.{schema_sink}.{table_product_sales}"
(
    df_products_total_sales.write
    .mode("overwrite")
    .saveAsTable(_product_sales_target)
)

In [0]:
# Total amount and number of sale rows per (year, category).
_category_totals_by_year = df_store_sales_year_total.groupBy("year", "category_name").agg(
    F.sum("sale_total").alias("amount_total_sales"),
    F.count("*").alias("total_sales"),
)

# Sort by year, then by amount (highest first).
df_category_total_sales = _category_totals_by_year.orderBy(
    "year", F.desc("amount_total_sales")
)


In [0]:
# Overwrite the category-sales table in the sink schema.
_category_sales_target = f"{catalog}.{schema_sink}.{table_category_sales}"
(
    df_category_total_sales.write
    .mode("overwrite")
    .saveAsTable(_category_sales_target)
)