In [0]:
# Declare the notebook's input parameters as text widgets, each with an empty
# default value.
for widget_name in ("silver_schema", "gold_schema", "storage_account_name"):
    dbutils.widgets.text(widget_name, "")

In [0]:
# Read the run parameters supplied through the notebook widgets.
silver_schema = dbutils.widgets.get("silver_schema")
# NOTE(review): gold_schema is read here but not referenced by any cell visible
# in this notebook; the gold outputs are written by storage path instead.
gold_schema = dbutils.widgets.get("gold_schema")
storage_account_name = dbutils.widgets.get("storage_account_name")

# The widgets default to "", which would otherwise only surface further down as
# an SQL parse error (`FROM .orders`) or a malformed abfss:// URI. Fail fast
# with a message that names the missing parameter(s) instead.
_missing_params = [
    param_name
    for param_name, param_value in (
        ("silver_schema", silver_schema),
        ("storage_account_name", storage_account_name),
    )
    if not param_value.strip()
]
if _missing_params:
    raise ValueError(
        "Required notebook parameter(s) not set: " + ", ".join(_missing_params)
    )

##### Who are the top customers by revenue?


###### PySpark query for top customers by revenue
# Import the functions module under a namespace rather than `import *`, so the
# Spark column functions used below (round, sum, count, desc) do not shadow
# Python's built-ins of the same name for the rest of the notebook session.
from pyspark.sql import functions as F

# Source tables from the silver layer; the schema name comes from the
# `silver_schema` notebook parameter.
df_orders = spark.sql(
    f"""
    SELECT * FROM {silver_schema}.orders
          """
)

df_accounts = spark.sql(
    f"""
    SELECT * FROM {silver_schema}.accounts
          """
)

# Per customer: total revenue (rounded to 2 decimals) and number of orders,
# sorted with the highest revenue first.
df_top_customers_by_revenue = (
    df_orders.join(df_accounts, df_orders.account_id == df_accounts.id, "inner")
    .groupBy(df_orders.account_id, df_accounts.name.alias("customer_name"))
    .agg(
        F.round(F.sum(df_orders.total_amt_usd), 2).alias("total_revenue"),
        F.count("*").alias("total_orders"),
    )
    .orderBy(F.desc("total_revenue"))
)

In [0]:
# Spark SQL formulation of the top-customers-by-revenue result: joins orders to
# accounts, aggregates revenue and order count per customer, and sorts by
# revenue descending. Rebinds `df_top_customers_by_revenue`.
top_customers_by_revenue_sql = f"""
    SELECT o.account_id,a.name AS customer_name,ROUND(SUM(o.total_amt_usd),2) AS total_revenue ,COUNT(*) AS total_orders 
    FROM {silver_schema}.ORDERS o JOIN {silver_schema}.ACCOUNTS a ON o.account_id = a.id
    GROUP BY o.account_id, a.name
    ORDER BY total_revenue DESC
    """
df_top_customers_by_revenue = spark.sql(top_customers_by_revenue_sql)

In [0]:
# Persist df_top_customers_by_revenue as a denormalized table in the gold
# container. Parquet is used for ADF compatibility; "overwrite" mode replaces
# any output left at this path by a previous run.
top_customers_output_path = (
    f"abfss://gold@{storage_account_name}.dfs.core.windows.net/top_customers_by_revenue"
)
top_customers_writer = df_top_customers_by_revenue.write.mode("overwrite")
top_customers_writer.parquet(top_customers_output_path)


##### Who is the highest purchaser of glossy paper?


###### PySpark query for highest purchaser of glossy paper
# Import the functions module under a namespace rather than `import *`, so the
# Spark column functions used below (col, sum, round) do not shadow Python's
# built-ins of the same name for the rest of the notebook session.
from pyspark.sql import functions as F

# Source tables from the silver layer; the schema name comes from the
# `silver_schema` notebook parameter.
df_orders = spark.sql(
    f"""
    SELECT * FROM {silver_schema}.orders
          """
)

df_accounts = spark.sql(
    f"""
    SELECT * FROM {silver_schema}.accounts
          """
)

# Per customer: total glossy-paper quantity and total glossy-paper spend
# (rounded to 2 decimals), sorted with the largest quantity first.
df_gloss_qty_by_customer = (
    df_orders.alias("o")
    .join(df_accounts.alias("a"), F.col("o.account_id") == F.col("a.id"), "inner")
    .groupBy(F.col("o.account_id"), F.col("a.name"))
    .agg(
        F.sum(F.col("gloss_qty")).alias("total_gloss_qty_purchased"),
        F.round(F.sum(F.col("gloss_amt_usd")), 2).alias("total_gloss_amount_usd"),
    )
    .orderBy("total_gloss_qty_purchased", ascending=False)
)

In [0]:
# Spark SQL formulation of the glossy-paper result: joins orders to accounts,
# totals gloss quantity and gloss spend per customer, and sorts by quantity
# descending. Rebinds `df_gloss_qty_by_customer`.
gloss_qty_by_customer_sql = f"""
    SELECT o.account_id,a.name,SUM(o.gloss_qty) total_gloss_qty_purchased,ROUND(sum(gloss_amt_usd),2) total_gloss_amount_usd 
    FROM {silver_schema}.ORDERS o JOIN {silver_schema}.ACCOUNTS a ON o.account_id = a.id
    GROUP BY o.account_id, a.name
    ORDER BY total_gloss_qty_purchased DESC
    """
df_gloss_qty_by_customer = spark.sql(gloss_qty_by_customer_sql)


In [0]:
# Persist df_gloss_qty_by_customer as a denormalized table in the gold
# container. Parquet is used for ADF compatibility; "overwrite" mode replaces
# any output left at this path by a previous run.
gloss_qty_output_path = (
    f"abfss://gold@{storage_account_name}.dfs.core.windows.net/gloss_qty_by_customer"
)
gloss_qty_writer = df_gloss_qty_by_customer.write.mode("overwrite")
gloss_qty_writer.parquet(gloss_qty_output_path)


In [0]:
# End the notebook run and hand the names of the gold outputs back to the caller.
# NOTE(review): the Databricks docs describe notebook.exit as taking a string,
# but a Python list is passed here. Confirm how the calling pipeline receives
# it; JSON-serializing is the documented route for returning multiple values.
gold_outputs = ["gold.top_customers_by_revenue", "gold.gloss_qty_by_customer"]
dbutils.notebook.exit(gold_outputs)