In [0]:
# Authenticate Spark to the ADLS Gen2 account with a service principal
# (OAuth 2.0 client-credentials flow).
#
# The credentials are read from a Databricks secret scope instead of being
# typed into the notebook, so they are never saved with the notebook source.
# TODO(review): replace the "<...>" placeholders with the secret scope and key
# names provisioned in this workspace.
secret_scope = "<secret-scope>"
client_id = dbutils.secrets.get(scope=secret_scope, key="<client-id-key>")
tenant_id = dbutils.secrets.get(scope=secret_scope, key="<tenant-id-key>")
client_secret = dbutils.secrets.get(scope=secret_scope, key="<client-secret-key>")

# Every setting below is scoped to this one storage account endpoint.
storage_host = "storagedatafactory2605.dfs.core.windows.net"

adls_oauth_conf = {
    f"fs.azure.account.auth.type.{storage_host}": "OAuth",
    f"fs.azure.account.oauth.provider.type.{storage_host}":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{storage_host}": client_id,
    f"fs.azure.account.oauth2.client.secret.{storage_host}": client_secret,
    # Token endpoint of the tenant that owns the service principal.
    f"fs.azure.account.oauth2.client.endpoint.{storage_host}":
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}

for conf_key, conf_value in adls_oauth_conf.items():
    spark.conf.set(conf_key, conf_value)


In [0]:
# List the entries at the root of the `fsilver` container.
display(
    dbutils.fs.ls(
        "abfss://fsilver@storagedatafactory2605.dfs.core.windows.net/"
    )
)


path,name,size,modificationTime
abfss://fsilver@storagedatafactory2605.dfs.core.windows.net/api_docs_delta/,api_docs_delta/,0,1763209641000
abfss://fsilver@storagedatafactory2605.dfs.core.windows.net/cleaned_logs_delta/,cleaned_logs_delta/,0,1763208857000
abfss://fsilver@storagedatafactory2605.dfs.core.windows.net/supply_chain_delta/,supply_chain_delta/,0,1763209201000


In [0]:
# Load the supply-chain Delta table from the silver container and print its
# schema.
df_supply = (
    spark.read
    .format("delta")
    .load("abfss://fsilver@storagedatafactory2605.dfs.core.windows.net/supply_chain_delta/")
)

df_supply.printSchema()


root
 |-- order_id: integer (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- product_category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- order_ts: timestamp (nullable = true)
 |-- ship_ts: timestamp (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_ym: string (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- sales: double (nullable = true)
 |-- order_item_total: double (nullable = true)
 |-- order_profit_per_order: double (nullable = true)
 |-- benefit_per_order: double (nullable = true)
 |-- market: string (nullable = true)
 |-- order_region: string (nullable = true)
 |-- order_state: string (nullable = true)



In [0]:
import pyspark.sql.functions as F

# Collapse order lines to one row per product / category / calendar month,
# summing sales and item quantity, then sort by product and month.
df_monthly = df_supply.groupBy(
    "product_name",
    "category_name",
    "order_year",
    "order_month",
    "order_ym",
).agg(
    F.sum(F.col("sales")).alias("monthly_sales"),
    F.sum(F.col("order_item_quantity")).alias("monthly_quantity"),
).orderBy("product_name", "order_year", "order_month")


In [0]:
from pyspark.sql.window import Window

# Per-product window ordered chronologically by (year, month).
# All offsets below count ROWS inside that window; they equal calendar months
# only when a product has a row for every month.
w = (
    Window
    .partitionBy("product_name")
    .orderBy("order_year", "order_month")
)

# Sales of the previous row of the same product.
df_monthly = df_monthly.withColumn(
    "lag_1_sales", F.lag("monthly_sales", 1).over(w)
)

# Sales three rows back.
df_monthly = df_monthly.withColumn(
    "lag_3_sales", F.lag("monthly_sales", 3).over(w)
)

# Rolling mean over the current row and the two rows before it.
df_monthly = df_monthly.withColumn(
    "rmean_3_sales", F.avg("monthly_sales").over(w.rowsBetween(-2, 0))
)


In [0]:
# Target column: monthly_sales of the NEXT row in the product's ordered window
# `w` (defined in the lag-feature cell). The last row of each product has no
# following row, so its target is NULL.
next_row_sales = F.lead(F.col("monthly_sales"), 1).over(w)
df_monthly = df_monthly.withColumn("target_next_month_sales", next_row_sales)


In [0]:
# Preview up to 20 rows of the monthly feature frame.
display(df_monthly.limit(20))


product_name,category_name,order_year,order_month,order_ym,monthly_sales,monthly_quantity,lag_1_sales,lag_3_sales,rmean_3_sales,target_next_month_sales
adidas brazuca 2014 official match ball,baseball & softball,2017,4,2017-04,799.9500275,5,,,799.9500275,2399.8500825
adidas brazuca 2014 official match ball,baseball & softball,2017,5,2017-05,2399.8500825,15,799.9500275,,1599.900055,1919.880066
adidas brazuca 2014 official match ball,baseball & softball,2017,6,2017-06,1919.880066,12,2399.8500825,,1706.5600586666667,2239.8600770000003
adidas brazuca 2014 official match ball,baseball & softball,2017,7,2017-07,2239.8600770000003,14,1919.880066,799.9500275,2186.530075166667,2239.8600770000003
adidas brazuca 2014 official match ball,baseball & softball,2017,8,2017-08,2239.8600770000003,14,2239.8600770000003,2399.8500825,2133.2000733333334,799.9500275
adidas brazuca 2014 official match ball,baseball & softball,2017,9,2017-09,799.9500275,5,2239.8600770000003,1919.880066,1759.8900605,
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,1,2015-01,629.82001486,18,,,629.82001486,699.80002208
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,2,2015-02,699.80002208,20,629.82001486,,664.8100184699999,454.87001414
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,3,2015-03,454.87001414,13,699.80002208,,594.8300170266666,1119.68001538
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,4,2015-04,1119.68001538,32,454.87001414,629.82001486,758.1166838666667,874.75002664


In [0]:
# Persist the monthly feature frame as a Delta table in the gold container,
# replacing whatever the table held before.
gold_monthly_path = "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_delta/"

(
    df_monthly.write
    .format("delta")
    .mode("overwrite")
    .save(gold_monthly_path)
)


In [0]:
# List the files written under the gold monthly-sales path.
display(dbutils.fs.ls(gold_monthly_path))


path,name,size,modificationTime
abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_delta/_delta_log/,_delta_log/,0,1763223098000
abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_delta/part-00000-3f21887b-6444-480e-bf2b-1ec502eacc48.c000.snappy.parquet,part-00000-3f21887b-6444-480e-bf2b-1ec502eacc48.c000.snappy.parquet,72933,1763223099000
abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_delta/part-00000-ba025aca-6326-481f-b1d8-5d361af85bf7.c000.snappy.parquet,part-00000-ba025aca-6326-481f-b1d8-5d361af85bf7.c000.snappy.parquet,72933,1763224086000


In [0]:
# Re-read the monthly sales table from the gold container. From here on
# `df_monthly` is backed by the stored Delta table rather than by the
# in-memory aggregation built above.
gold_monthly_path = "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_delta/"

monthly_reader = spark.read.format("delta")
df_monthly = monthly_reader.load(gold_monthly_path)

# Show the data, then the column types.
display(df_monthly)
df_monthly.printSchema()


product_name,category_name,order_year,order_month,order_ym,monthly_sales,monthly_quantity,lag_1_sales,lag_3_sales,rmean_3_sales,target_next_month_sales
adidas brazuca 2014 official match ball,baseball & softball,2017,4,2017-04,799.9500275,5,,,799.9500275,2399.8500825
adidas brazuca 2014 official match ball,baseball & softball,2017,5,2017-05,2399.8500825,15,799.9500275,,1599.900055,1919.880066
adidas brazuca 2014 official match ball,baseball & softball,2017,6,2017-06,1919.880066,12,2399.8500825,,1706.5600586666667,2239.8600770000003
adidas brazuca 2014 official match ball,baseball & softball,2017,7,2017-07,2239.8600770000003,14,1919.880066,799.9500275,2186.530075166667,2239.8600770000003
adidas brazuca 2014 official match ball,baseball & softball,2017,8,2017-08,2239.8600770000003,14,2239.8600770000003,2399.8500825,2133.2000733333334,799.9500275
adidas brazuca 2014 official match ball,baseball & softball,2017,9,2017-09,799.9500275,5,2239.8600770000003,1919.880066,1759.8900605,
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,1,2015-01,629.82001486,18,,,629.82001486,699.80002208
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,2,2015-02,699.80002208,20,629.82001486,,664.8100184699999,454.87001414
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,3,2015-03,454.87001414,13,699.80002208,,594.8300170266666,1119.68001538
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,4,2015-04,1119.68001538,32,454.87001414,629.82001486,758.1166838666667,874.75002664


root
 |-- product_name: string (nullable = true)
 |-- category_name: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)
 |-- order_ym: string (nullable = true)
 |-- monthly_sales: double (nullable = true)
 |-- monthly_quantity: long (nullable = true)
 |-- lag_1_sales: double (nullable = true)
 |-- lag_3_sales: double (nullable = true)
 |-- rmean_3_sales: double (nullable = true)
 |-- target_next_month_sales: double (nullable = true)



In [0]:
# Preview up to 10 rows of the reloaded monthly table.
display(df_monthly.limit(10))

product_name,category_name,order_year,order_month,order_ym,monthly_sales,monthly_quantity,lag_1_sales,lag_3_sales,rmean_3_sales,target_next_month_sales
adidas brazuca 2014 official match ball,baseball & softball,2017,4,2017-04,799.9500275,5,,,799.9500275,2399.8500825
adidas brazuca 2014 official match ball,baseball & softball,2017,5,2017-05,2399.8500825,15,799.9500275,,1599.900055,1919.880066
adidas brazuca 2014 official match ball,baseball & softball,2017,6,2017-06,1919.880066,12,2399.8500825,,1706.5600586666667,2239.8600770000003
adidas brazuca 2014 official match ball,baseball & softball,2017,7,2017-07,2239.8600770000003,14,1919.880066,799.9500275,2186.530075166667,2239.8600770000003
adidas brazuca 2014 official match ball,baseball & softball,2017,8,2017-08,2239.8600770000003,14,2239.8600770000003,2399.8500825,2133.2000733333334,799.9500275
adidas brazuca 2014 official match ball,baseball & softball,2017,9,2017-09,799.9500275,5,2239.8600770000003,1919.880066,1759.8900605,
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,1,2015-01,629.82001486,18,,,629.82001486,699.80002208
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,2,2015-02,699.80002208,20,629.82001486,,664.8100184699999,454.87001414
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,3,2015-03,454.87001414,13,699.80002208,,594.8300170266666,1119.68001538
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,4,2015-04,1119.68001538,32,454.87001414,629.82001486,758.1166838666667,874.75002664


In [0]:
from pyspark.sql import functions as F

# Month-over-month growth: (monthly_sales - lag_1_sales) / lag_1_sales.
#
# lag_1_sales is NULL on the first row of each product, so mom_growth is NULL
# there as well. The explicit non-zero guard makes a zero baseline yield NULL
# instead of raising DIVIDE_BY_ZERO when spark.sql.ansi.enabled is true; with
# ANSI mode off the plain division already returned NULL, so results are
# unchanged in that configuration.
df_monthly = df_monthly.withColumn(
    "mom_growth",
    F.when(
        F.col("lag_1_sales") != 0,
        (F.col("monthly_sales") - F.col("lag_1_sales")) / F.col("lag_1_sales"),
    ),
)


In [0]:
from pyspark.sql.window import Window

# Per-product window ordered chronologically by (year, month).
w = Window.partitionBy("product_name").orderBy("order_year", "order_month")

# lag_12_sales: monthly_sales 12 ROWS earlier for the same product.
# NOTE(review): this equals "same month last year" only when the product has a
# row for every month in between - confirm against the data.
#
# yoy_growth: (monthly_sales - lag_12_sales) / lag_12_sales. It is NULL when
# there is no row 12 positions back. The explicit non-zero guard makes a zero
# baseline yield NULL instead of raising DIVIDE_BY_ZERO when
# spark.sql.ansi.enabled is true; with ANSI mode off the plain division already
# returned NULL, so results are unchanged in that configuration.
df_monthly = (
    df_monthly
    .withColumn("lag_12_sales", F.lag("monthly_sales", 12).over(w))
    .withColumn(
        "yoy_growth",
        F.when(
            F.col("lag_12_sales") != 0,
            (F.col("monthly_sales") - F.col("lag_12_sales")) / F.col("lag_12_sales"),
        ),
    )
)


In [0]:
# Rolling mean of monthly_sales over the current row and the five rows before
# it, using the per-product window `w` defined in the year-over-year cell.
df_monthly = df_monthly.withColumn(
    colName="rmean_6_sales",
    col=F.avg(F.col("monthly_sales")).over(w.rowsBetween(-5, 0)),
)


In [0]:
# Spot-check the derived growth and rolling-mean columns on the first 10 rows.
df_monthly.select(
    "product_name",
    "order_ym",
    "monthly_sales",
    "mom_growth",
    "yoy_growth",
    "rmean_6_sales",
).show(10)


+--------------------+--------+------------------+--------------------+----------+------------------+
|        product_name|order_ym|     monthly_sales|          mom_growth|yoy_growth|     rmean_6_sales|
+--------------------+--------+------------------+--------------------+----------+------------------+
|adidas brazuca 20...| 2017-04|       799.9500275|                NULL|      NULL|       799.9500275|
|adidas brazuca 20...| 2017-05|      2399.8500825|                 2.0|      NULL|       1599.900055|
|adidas brazuca 20...| 2017-06|       1919.880066|-0.20000000000000007|      NULL|1706.5600586666667|
|adidas brazuca 20...| 2017-07|2239.8600770000003| 0.16666666666666685|      NULL|     1839.88506325|
|adidas brazuca 20...| 2017-08|2239.8600770000003|                 0.0|      NULL|1919.8800660000002|
|adidas brazuca 20...| 2017-09|       799.9500275| -0.6428571428571429|      NULL|1733.2250595833332|
|adidas kids' f5 m...| 2015-01|      629.82001486|                NULL|      NULL|

In [0]:
# Persist the enriched monthly frame (with the growth and rolling columns) as
# its own Delta table in the gold container, replacing previous contents.
(
    df_monthly.write
    .format("delta")
    .mode("overwrite")
    .save("abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_enriched_delta/")
)


In [0]:
# List the files under the enriched monthly-sales Delta path.
display(
    dbutils.fs.ls(
        "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_enriched_delta/"
    )
)


path,name,size,modificationTime
abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_enriched_delta/_delta_log/,_delta_log/,0,1763223574000
abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_enriched_delta/part-00000-b401a2ec-0913-4668-b9db-91762b298c82.c000.snappy.parquet,part-00000-b401a2ec-0913-4668-b9db-91762b298c82.c000.snappy.parquet,114408,1763223574000
abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_enriched_delta/part-00000-e8583eb5-b2cb-40e7-b01b-945b86377730.c000.snappy.parquet,part-00000-e8583eb5-b2cb-40e7-b01b-945b86377730.c000.snappy.parquet,114408,1763224092000


In [0]:
# Load the cleaned web-log Delta table from the silver container, then show
# its rows and its schema.
logs_silver_path = "abfss://fsilver@storagedatafactory2605.dfs.core.windows.net/cleaned_logs_delta/"

logs_reader = spark.read.format("delta")
df_logs = logs_reader.load(logs_silver_path)

display(df_logs)
df_logs.printSchema()


product,category,DATE,MONTH,HOUR,department,ip,url,event_ts
perfect fitness perfect rip deck,cleats,9/1/2017 18:25,Sep,18,apparel,19.239.13.199,/department/apparel/category/cleats/product/Perfect%20Fitness%20Perfect%20Rip%20Deck,2017-09-01T18:25:00Z
yakima doubledown ace hitch mount 4-bike rack,strength training,9/1/2017 18:25,Sep,18,footwear,61.230.25.199,/department/footwear/category/strength%20training/product/Yakima%20DoubleDown%20Ace%20Hitch%20Mount%204-Bike%20Rack,2017-09-01T18:25:00Z
top flite women's 2017 xl hybrid,golf apparel,9/1/2017 18:26,Sep,18,outdoors,16.231.175.0,/department/outdoors/category/golf%20apparel/product/Top%20Flite%20Women's%202017%20XL%20Hybrid/add_to_cart,2017-09-01T18:26:00Z
o'brien men's neoprene life vest,indoor outdoor games,9/1/2017 18:26,Sep,18,fan shop,62.206.64.9,/department/fan%20shop/category/indoor/outdoor%20games/product/O'Brien%20Men's%20Neoprene%20Life%20Vest,2017-09-01T18:26:00Z
nike men's comfort 2 slide,tennis & racquet,9/1/2017 18:26,Sep,18,fitness,42.220.36.253,/department/fitness/category/tennis%20&%20racquet/product/Nike%20Men's%20Comfort%202%20Slide,2017-09-01T18:26:00Z
adidas kids' rg iii mid football cleat,featured shops,9/1/2017 18:26,Sep,18,apparel,212.11.137.222,/department/apparel/category/featured%20shops/product/adidas%20Kids'%20RG%20III%20Mid%20Football%20Cleat,2017-09-01T18:26:00Z
diamondback women's serene classic comfort bi,camping & hiking,9/1/2017 18:27,Sep,18,fan shop,7.218.119.117,/department/fan%20shop/category/camping%20&%20hiking/product/Diamondback%20Women's%20Serene%20Classic%20Comfort%20Bi,2017-09-01T18:27:00Z
adidas kids' rg iii mid football cleat,featured shops,9/1/2017 18:27,Sep,18,apparel,88.169.187.201,/department/apparel/category/featured%20shops/product/adidas%20Kids'%20RG%20III%20Mid%20Football%20Cleat/add_to_cart,2017-09-01T18:27:00Z
bridgestone e6 straight distance nfl tennesse,electronics,9/1/2017 18:28,Sep,18,outdoors,25.45.74.1,/department/outdoors/category/electronics/product/Bridgestone%20e6%20Straight%20Distance%20NFL%20Tennesse,2017-09-01T18:28:00Z
pelican sunstream 100 kayak,water sports,9/1/2017 18:28,Sep,18,fan shop,113.89.175.180,/department/fan%20shop/category/water%20sports/product/Pelican%20Sunstream%20100%20Kayak,2017-09-01T18:28:00Z


root
 |-- product: string (nullable = true)
 |-- category: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- HOUR: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- url: string (nullable = true)
 |-- event_ts: timestamp (nullable = true)



In [0]:
# Preview up to 10 rows of the silver web logs.
display(df_logs.limit(10))

product,category,DATE,MONTH,HOUR,department,ip,url,event_ts
perfect fitness perfect rip deck,cleats,9/1/2017 18:25,Sep,18,apparel,19.239.13.199,/department/apparel/category/cleats/product/Perfect%20Fitness%20Perfect%20Rip%20Deck,2017-09-01T18:25:00Z
yakima doubledown ace hitch mount 4-bike rack,strength training,9/1/2017 18:25,Sep,18,footwear,61.230.25.199,/department/footwear/category/strength%20training/product/Yakima%20DoubleDown%20Ace%20Hitch%20Mount%204-Bike%20Rack,2017-09-01T18:25:00Z
top flite women's 2017 xl hybrid,golf apparel,9/1/2017 18:26,Sep,18,outdoors,16.231.175.0,/department/outdoors/category/golf%20apparel/product/Top%20Flite%20Women's%202017%20XL%20Hybrid/add_to_cart,2017-09-01T18:26:00Z
o'brien men's neoprene life vest,indoor outdoor games,9/1/2017 18:26,Sep,18,fan shop,62.206.64.9,/department/fan%20shop/category/indoor/outdoor%20games/product/O'Brien%20Men's%20Neoprene%20Life%20Vest,2017-09-01T18:26:00Z
nike men's comfort 2 slide,tennis & racquet,9/1/2017 18:26,Sep,18,fitness,42.220.36.253,/department/fitness/category/tennis%20&%20racquet/product/Nike%20Men's%20Comfort%202%20Slide,2017-09-01T18:26:00Z
adidas kids' rg iii mid football cleat,featured shops,9/1/2017 18:26,Sep,18,apparel,212.11.137.222,/department/apparel/category/featured%20shops/product/adidas%20Kids'%20RG%20III%20Mid%20Football%20Cleat,2017-09-01T18:26:00Z
diamondback women's serene classic comfort bi,camping & hiking,9/1/2017 18:27,Sep,18,fan shop,7.218.119.117,/department/fan%20shop/category/camping%20&%20hiking/product/Diamondback%20Women's%20Serene%20Classic%20Comfort%20Bi,2017-09-01T18:27:00Z
adidas kids' rg iii mid football cleat,featured shops,9/1/2017 18:27,Sep,18,apparel,88.169.187.201,/department/apparel/category/featured%20shops/product/adidas%20Kids'%20RG%20III%20Mid%20Football%20Cleat/add_to_cart,2017-09-01T18:27:00Z
bridgestone e6 straight distance nfl tennesse,electronics,9/1/2017 18:28,Sep,18,outdoors,25.45.74.1,/department/outdoors/category/electronics/product/Bridgestone%20e6%20Straight%20Distance%20NFL%20Tennesse,2017-09-01T18:28:00Z
pelican sunstream 100 kayak,water sports,9/1/2017 18:28,Sep,18,fan shop,113.89.175.180,/department/fan%20shop/category/water%20sports/product/Pelican%20Sunstream%20100%20Kayak,2017-09-01T18:28:00Z


In [0]:
from pyspark.sql import functions as F

# Derive join/aggregation helpers on each log row:
#   product_clean - lower-cased product name
#   log_year / log_month / log_ym - calendar parts of event_ts ("yyyy-MM" for log_ym)
#   url_length - character length of the requested URL
event_ts = F.col("event_ts")

df_logs_g = df_logs.withColumn("product_clean", F.lower(F.col("product")))
df_logs_g = df_logs_g.withColumn("log_year", F.year(event_ts))
df_logs_g = df_logs_g.withColumn("log_month", F.month(event_ts))
df_logs_g = df_logs_g.withColumn("log_ym", F.date_format(event_ts, "yyyy-MM"))
df_logs_g = df_logs_g.withColumn("url_length", F.length(F.col("url")))


In [0]:
# Print the first 5 rows to check the derived log columns.
df_logs_g.show(5)

+--------------------+--------------------+--------------+-----+----+----------+-------------+--------------------+-------------------+--------------------+--------+---------+-------+----------+
|             product|            category|          DATE|MONTH|HOUR|department|           ip|                 url|           event_ts|       product_clean|log_year|log_month| log_ym|url_length|
+--------------------+--------------------+--------------+-----+----+----------+-------------+--------------------+-------------------+--------------------+--------+---------+-------+----------+
|perfect fitness p...|              cleats|9/1/2017 18:25|  Sep|  18|   apparel|19.239.13.199|/department/appar...|2017-09-01 18:25:00|perfect fitness p...|    2017|        9|2017-09|        84|
|yakima doubledown...|   strength training|9/1/2017 18:25|  Sep|  18|  footwear|61.230.25.199|/department/footw...|2017-09-01 18:25:00|yakima doubledown...|    2017|        9|2017-09|       115|
|top flite women's...|   

In [0]:
def _hour_window_flag(start_hour, end_hour):
    """Return a 0/1 column: 1 when start_hour <= HOUR < end_hour, otherwise 0."""
    hour = F.col("HOUR")
    return F.when((hour >= start_hour) & (hour < end_hour), 1).otherwise(0)


# Flag each log row, then aggregate to one row per product and month.
# Hours outside 05:00-21:59 fall into none of the three day-part buckets.
df_logs_monthly = (
    df_logs_g
    # 1 when the URL contains "add_to_cart", else 0.
    .withColumn("is_add_to_cart", F.col("url").contains("add_to_cart").cast("int"))
    .withColumn("is_morning", _hour_window_flag(5, 12))
    .withColumn("is_afternoon", _hour_window_flag(12, 17))
    .withColumn("is_evening", _hour_window_flag(17, 22))
    .groupBy("product_clean", "log_ym")
    .agg(
        F.count("*").alias("web_views_month"),
        F.countDistinct("ip").alias("web_unique_ips_month"),
        F.sum("is_add_to_cart").alias("web_add_to_cart_month"),
        F.avg("url_length").alias("web_avg_url_length"),
        F.sum("is_morning").alias("web_morning_views"),
        F.sum("is_afternoon").alias("web_afternoon_views"),
        F.sum("is_evening").alias("web_evening_views"),
    )
)


In [0]:
# Share of a product's monthly log rows that were add-to-cart hits.
# web_views_month is a count(*) over a non-empty group, so it is never 0.
df_logs_monthly = df_logs_monthly.withColumn(
    colName="conversion_rate",
    col=F.col("web_add_to_cart_month") / F.col("web_views_month"),
)


In [0]:
from pyspark.sql.window import Window

# Per-product window ordered by the "yyyy-MM" month string (which sorts
# chronologically). Frames below count ROWS, i.e. months that have log data.
w = (
    Window
    .partitionBy("product_clean")
    .orderBy("log_ym")
)

df_logs_monthly = (
    df_logs_monthly
    # Views over the current row and the 2 rows before it.
    .withColumn("views_3m", F.sum("web_views_month").over(w.rowsBetween(-2, 0)))
    # Views over the current row and the 5 rows before it.
    .withColumn("views_6m", F.sum("web_views_month").over(w.rowsBetween(-5, 0)))
    # Ratio of the short window to the long one. For a product's first three
    # rows both frames cover the same rows, so the score is 1.0.
    .withColumn("momentum_score", F.col("views_3m") / F.col("views_6m"))
)


In [0]:
# Print the first 10 rows of the monthly web-log aggregates.
df_logs_monthly.show(10)


+--------------------+-------+---------------+--------------------+---------------------+------------------+-----------------+-------------------+-----------------+-------------------+--------+--------+------------------+
|       product_clean| log_ym|web_views_month|web_unique_ips_month|web_add_to_cart_month|web_avg_url_length|web_morning_views|web_afternoon_views|web_evening_views|    conversion_rate|views_3m|views_6m|    momentum_score|
+--------------------+-------+---------------+--------------------+---------------------+------------------+-----------------+-------------------+-----------------+-------------------+--------+--------+------------------+
|adidas brazuca 20...|2017-09|           1371|                 810|                  364|113.18599562363238|              151|                318|              632|0.26549963530269877|    1371|    1371|               1.0|
|adidas brazuca 20...|2017-10|            815|                 635|                  193|112.84171779141104|    

In [0]:
# Persist the monthly web-log aggregates as a Delta table in the gold
# container, replacing previous contents.
logs_monthly_writer = df_logs_monthly.write.format("delta").mode("overwrite")
logs_monthly_writer.save(
    "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/logs_monthly_delta/"
)


In [0]:
# List the files under the monthly web-log Delta path.
display(
    dbutils.fs.ls(
        "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/logs_monthly_delta/"
    )
)


path,name,size,modificationTime
abfss://fgold@storagedatafactory2605.dfs.core.windows.net/logs_monthly_delta/_delta_log/,_delta_log/,0,1763224164000
abfss://fgold@storagedatafactory2605.dfs.core.windows.net/logs_monthly_delta/part-00000-707bb543-4d95-46fe-92a6-9885cff73de5.c000.snappy.parquet,part-00000-707bb543-4d95-46fe-92a6-9885cff73de5.c000.snappy.parquet,26045,1763224165000


In [0]:
# Load the API-docs Delta table (field_name / description pairs) from the
# silver container, then show its rows and its schema.
api_silver_path = "abfss://fsilver@storagedatafactory2605.dfs.core.windows.net/api_docs_delta/"

api_reader = spark.read.format("delta")
df_api = api_reader.load(api_silver_path)

display(df_api)
df_api.printSchema()


description,field_name
Days of scheduled delivery of the purchased product,days_for_shipment_scheduled_
Link of visit and purchase of the product,product_image
Country where the customer made the purchase,customer_country
Customer Zipcode,customer_zipcode
"The following shipping modes are presented : Standard Class , First Class , Second Class , Same Day",shipping_mode
Total amount per order,order_item_total
Order item code,order_item_id
Customer name,customer_fname
Order code,order_id
Order Profit Per Order,order_profit_per_order


root
 |-- description: string (nullable = true)
 |-- field_name: string (nullable = true)



In [0]:
# Build the master table: start from the two gold Delta tables written above.
# Note that `df_logs` is rebound here - it now holds the monthly aggregates,
# not the row-level silver logs loaded earlier.
sales_path = "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/monthly_sales_enriched_delta/"
logs_path  = "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/logs_monthly_delta/"

gold_reader = spark.read.format("delta")
df_sales = gold_reader.load(sales_path)
df_logs = gold_reader.load(logs_path)

display(df_sales)
display(df_logs)


product_name,category_name,order_year,order_month,order_ym,monthly_sales,monthly_quantity,lag_1_sales,lag_3_sales,rmean_3_sales,target_next_month_sales,mom_growth,lag_12_sales,yoy_growth,rmean_6_sales
adidas brazuca 2014 official match ball,baseball & softball,2017,4,2017-04,799.9500275,5,,,799.9500275,2399.8500825,,,,799.9500275
adidas brazuca 2014 official match ball,baseball & softball,2017,5,2017-05,2399.8500825,15,799.9500275,,1599.900055,1919.880066,2.0,,,1599.900055
adidas brazuca 2014 official match ball,baseball & softball,2017,6,2017-06,1919.880066,12,2399.8500825,,1706.5600586666667,2239.8600770000003,-0.2,,,1706.5600586666667
adidas brazuca 2014 official match ball,baseball & softball,2017,7,2017-07,2239.8600770000003,14,1919.880066,799.9500275,2186.530075166667,2239.8600770000003,0.1666666666666668,,,1839.88506325
adidas brazuca 2014 official match ball,baseball & softball,2017,8,2017-08,2239.8600770000003,14,2239.8600770000003,2399.8500825,2133.2000733333334,799.9500275,0.0,,,1919.880066
adidas brazuca 2014 official match ball,baseball & softball,2017,9,2017-09,799.9500275,5,2239.8600770000003,1919.880066,1759.8900605,,-0.6428571428571429,,,1733.2250595833332
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,1,2015-01,629.82001486,18,,,629.82001486,699.80002208,,,,629.82001486
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,2,2015-02,699.80002208,20,629.82001486,,664.8100184699999,454.87001414,0.1111111199531432,,,664.8100184699999
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,3,2015-03,454.87001414,13,699.80002208,,594.8300170266666,1119.68001538,-0.3500000003029436,,,594.8300170266666
adidas kids' f5 messi fg soccer cleat,baseball & softball,2015,4,2015-04,1119.68001538,32,454.87001414,629.82001486,758.1166838666667,874.75002664,1.461538418831417,,,726.042516615


product_clean,log_ym,web_views_month,web_unique_ips_month,web_add_to_cart_month,web_avg_url_length,web_morning_views,web_afternoon_views,web_evening_views,conversion_rate,views_3m,views_6m,momentum_score
adidas brazuca 2017 official match ball,2017-09,1371,810,364,113.18599562363238,151,318,632,0.2654996353026987,1371,1371,1.0
adidas brazuca 2017 official match ball,2017-10,815,635,193,112.84171779141104,103,313,304,0.2368098159509202,2186,2186,1.0
adidas brazuca 2017 official match ball,2017-11,828,626,219,113.17391304347828,127,293,282,0.2644927536231884,3014,3014,1.0
adidas brazuca 2017 official match ball,2017-12,831,644,202,112.91696750902528,108,311,314,0.2430806257521059,2474,3845,0.6434330299089727
adidas brazuca 2017 official match ball,2018-01,857,649,230,113.22053675612602,130,300,298,0.2683780630105017,2516,4702,0.5350914504466184
adidas kids' f5 messi fg soccer cleat,2017-09,1434,826,387,113.23849372384936,153,368,636,0.2698744769874477,1434,1434,1.0
adidas kids' f5 messi fg soccer cleat,2017-10,821,630,213,113.11327649208285,136,303,269,0.2594397076735688,2255,2255,1.0
adidas kids' f5 messi fg soccer cleat,2017-11,849,632,225,113.18021201413428,116,332,288,0.2650176678445229,3104,3104,1.0
adidas kids' f5 messi fg soccer cleat,2017-12,836,640,218,113.12918660287082,139,311,272,0.2607655502392344,2506,3940,0.6360406091370558
adidas kids' f5 messi fg soccer cleat,2018-01,878,653,232,113.17084282460137,118,344,301,0.2642369020501139,2563,4818,0.5319634703196348


In [0]:
import pyspark.sql.functions as F

# Give both frames the same normalised join key: the product name trimmed of
# surrounding whitespace and lower-cased.
df_sales_clean = df_sales.withColumn("product_key", F.lower(F.trim("product_name")))
df_logs_clean = df_logs.withColumn("product_key", F.lower(F.trim("product_clean")))


In [0]:
# Left-join monthly sales ("s") to monthly web-log aggregates ("l") on the
# normalised product key and the year-month string. Every sales row is kept;
# the log columns are NULL where no log row matches.
# NOTE(review): the previews shown earlier in this notebook list
# "adidas brazuca 2014 official match ball" on the sales side and
# "adidas brazuca 2017 official match ball" on the logs side, so some products
# may never match by name - verify the match rate.
df_master = df_sales_clean.alias("s").join(
    df_logs_clean.alias("l"),
    on=(
        (F.col("s.product_key") == F.col("l.product_key"))
        & (F.col("s.order_ym") == F.col("l.log_ym"))
    ),
    how="left",
)


In [0]:
# Final gold projection: the sales-side features followed by the web-log
# features. `product_key`, `lag_12_sales` and the log-side key columns are not
# carried over.
df_gold_final = df_master.select(
    # sales
    *[
        f"s.{field}"
        for field in (
            "product_name",
            "category_name",
            "order_year",
            "order_month",
            "order_ym",
            "monthly_sales",
            "monthly_quantity",
            "lag_1_sales",
            "lag_3_sales",
            "rmean_3_sales",
            "target_next_month_sales",
            "mom_growth",
            "yoy_growth",
            "rmean_6_sales",
        )
    ],
    # logs
    *[
        f"l.{field}"
        for field in (
            "web_views_month",
            "web_unique_ips_month",
            "web_add_to_cart_month",
            "web_avg_url_length",
            "web_morning_views",
            "web_afternoon_views",
            "web_evening_views",
            "conversion_rate",
            "views_3m",
            "views_6m",
            "momentum_score",
        )
    ],
)


In [0]:
# Persist the joined sales + web-log table as a Delta table in the gold
# container, replacing previous contents.
gold_final_writer = df_gold_final.write.format("delta").mode("overwrite")
gold_final_writer.save(
    "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/df_gold_final/"
)

In [0]:
# GOLD → PARQUET EXPORT PATHS
# Each gold table is exported to <gold container>/export/<table name>/.
gold_base_path = "abfss://fgold@storagedatafactory2605.dfs.core.windows.net/"

export_paths = {
    table_name: gold_base_path + "export/" + table_name + "/"
    for table_name in ("monthly_sales_enriched", "logs_monthly", "master_gold_final")
}

# LOAD GOLD TABLES
# Re-read from Delta so the export reflects what was actually stored.
gold_reader = spark.read.format("delta")
df_sales = gold_reader.load(gold_base_path + "monthly_sales_enriched_delta/")
df_logs = gold_reader.load(gold_base_path + "logs_monthly_delta/")
df_master = gold_reader.load(gold_base_path + "df_gold_final/")

# SAVE AS PARQUET
# Written in the same order as before: sales, logs, master.
for export_name, gold_df in (
    ("monthly_sales_enriched", df_sales),
    ("logs_monthly", df_logs),
    ("master_gold_final", df_master),
):
    gold_df.write.mode("overwrite").parquet(export_paths[export_name])

print("🎉 Gold Layer Export Completed")
for name, path in export_paths.items():
    print(f"{name} → {path}")


🎉 Gold Layer Export Completed
monthly_sales_enriched → abfss://fgold@storagedatafactory2605.dfs.core.windows.net/export/monthly_sales_enriched/
logs_monthly → abfss://fgold@storagedatafactory2605.dfs.core.windows.net/export/logs_monthly/
master_gold_final → abfss://fgold@storagedatafactory2605.dfs.core.windows.net/export/master_gold_final/
