# Imports

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

# Widgets

In [0]:
dbutils.widgets.removeAll()

In [0]:
dbutils.widgets.text("catalogo", "chocolate_sales")
dbutils.widgets.text("esquema_source", "silver")
dbutils.widgets.text("esquema_sink", "golden")

# Parameters

In [0]:
catalogo = dbutils.widgets.get("catalogo")
esquema_source = dbutils.widgets.get("esquema_source")
esquema_sink = dbutils.widgets.get("esquema_sink")

# Read source table

In [0]:
df_sales_final = spark.table(f"{catalogo}.{esquema_source}.sales_final")

# Aggregations

In [0]:
df_sabana_datos = df_sales_final

df_salesman_KPI = df_sabana_datos.groupBy("salesman_id","salesman_name").agg(sum("amount").alias("total_sales"),
                                                             count(col("salesman_id")).alias("conteo_ventas"),
                                                             sum("boxes_shipped").alias("total_boxes_shipped")).orderBy(col("total_sales").desc())

df_country_KPI = df_sabana_datos.groupBy("country_id","country").agg(sum("amount").alias("total_sales"),
                                                           count(col("country_id")).alias("conteo_ventas"),
                                                           sum("boxes_shipped").alias("total_boxes_shipped")).orderBy(col("total_sales").desc())

df_product_KPI = df_sabana_datos.groupBy("upc_code","product_desc").agg(sum("amount").alias("total_sales"),
                                                           count(col("upc_code")).alias("conteo_ventas"),
                                                           sum("boxes_shipped").alias("total_boxes_shipped")).orderBy(col("total_sales").desc())

# Save in table

In [0]:
df_sabana_datos.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_sink}.sales_team_data")
df_salesman_KPI.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_sink}.salesman_kpi")
df_country_KPI.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_sink}.country_kpi")
df_product_KPI.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_sink}.product_kpi")