In [0]:
dbutils.fs.ls("/mnt/raw")


[FileInfo(path='dbfs:/mnt/raw/bronze/', name='bronze/', size=0, modificationTime=1763725478000),
 FileInfo(path='dbfs:/mnt/raw/demand_data/', name='demand_data/', size=0, modificationTime=1763708940000),
 FileInfo(path='dbfs:/mnt/raw/external_factors/', name='external_factors/', size=0, modificationTime=1763715906000),
 FileInfo(path='dbfs:/mnt/raw/feature_engineering/', name='feature_engineering/', size=0, modificationTime=1763708878000),
 FileInfo(path='dbfs:/mnt/raw/silver/', name='silver/', size=0, modificationTime=1763731722000)]

In [0]:
# ====================================================
# 1. IMPORTS + PATHS
# ====================================================
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import *

silver_path = "/mnt/raw/silver/master"
gold_path = "/mnt/raw/gold/demand_forecasting_final"

dbutils.fs.mkdirs("/mnt/raw/gold")

# ====================================================
# 2. LOAD SILVER DATA
# ====================================================
df = spark.read.format("delta").load(silver_path)

print("Silver data loaded:", df.count(), "rows")

# ====================================================
# 3. SORT DATA FOR WINDOW FUNCTIONS
# ====================================================
w = Window.partitionBy("Region", "Service").orderBy("Date")

# ====================================================
# 4. LAG FEATURES (Existing rows only)
# ====================================================
df = (
    df.withColumn("Lag_1", lag("Daily_Usage_Units", 1).over(w))
      .withColumn("Lag_7", lag("Daily_Usage_Units", 7).over(w))
      .withColumn("Lag_14", lag("Daily_Usage_Units", 14).over(w))
      .withColumn("Lag_30", lag("Daily_Usage_Units", 30).over(w))
)

# ====================================================
# 5. MOVING AVERAGES
# ====================================================
df = (
    df.withColumn("MA_7", avg("Daily_Usage_Units").over(w.rowsBetween(-7, 0)))
      .withColumn("MA_14", avg("Daily_Usage_Units").over(w.rowsBetween(-14, 0)))
      .withColumn("MA_30", avg("Daily_Usage_Units").over(w.rowsBetween(-30, 0)))
)

# ====================================================
# 6. WEEK-OVER-WEEK % CHANGE
# ====================================================
df = df.withColumn(
    "WoW_Change",
    (col("Daily_Usage_Units") - col("Lag_7")) / col("Lag_7")
)

# ====================================================
# 7. SEASONALITY INDICATORS
# ====================================================
df = (
    df.withColumn("DayOfWeek", dayofweek("Date"))
      .withColumn("WeekOfYear", weekofyear("Date"))
      .withColumn("Month", month("Date"))
      .withColumn("Quarter", quarter("Date"))
)

# ====================================================
# 8. STRING INDEX ENCODING (Region / Service)
# ====================================================
region_indexer = StringIndexer(inputCol="Region", outputCol="Region_Index")
service_indexer = StringIndexer(inputCol="Service", outputCol="Service_Index")

df = region_indexer.fit(df).transform(df)
df = service_indexer.fit(df).transform(df)

# ====================================================
# 9. FINAL CLEANING (REMOVE rows where NaN caused by lag)
# ====================================================
df_final = df.dropna(subset=["Daily_Usage_Units"])


print("Final Gold dataset rows:", df_final.count())

# ====================================================
# 10. WRITE GOLD TABLE
# ====================================================
df_final.write.format("delta") \
    .mode("overwrite") \
    .save(gold_path)

print("GOLD LAYER CREATED SUCCESSFULLY!")

# ====================================================
# 11. TEST READ
# ====================================================
display(spark.read.format("delta").load(gold_path).limit(5000))


Silver data loaded: 10962 rows
🏃 View run mercurial-shrimp-782 at: https://adb-2252190068396335.15.azuredatabricks.net/ml/experiments/2532680752963574/runs/857b187b280e4e119d681587441dfe68
🧪 View experiment at: https://adb-2252190068396335.15.azuredatabricks.net/ml/experiments/2532680752963574
🏃 View run thundering-shad-907 at: https://adb-2252190068396335.15.azuredatabricks.net/ml/experiments/2532680752963574/runs/c24211a3f46d45b9b9ad96d5476b15c4
🧪 View experiment at: https://adb-2252190068396335.15.azuredatabricks.net/ml/experiments/2532680752963574
Final Gold dataset rows: 10962
GOLD LAYER CREATED SUCCESSFULLY!


Date,Region,Service,Daily_Usage_Units,Peak_Usage_Units,VM_Count,Storage_TB,Season,Econ_Index,Downtime_Min,Cloud_Demand_Index,GDP_Growth,Inflation,Competitor_Price_Index,Usage_Lag_1,Usage_Lag_7,Week_Over_Week_Growth,Seasonality_Factor,Lag_1,Lag_7,Lag_14,Lag_30,MA_7,MA_14,MA_30,WoW_Change,DayOfWeek,WeekOfYear,Month,Quarter,Region_Index,Service_Index
2020-01-01,CENTRAL INDIA,COMPUTE,61531.0,74826,9029,0,Winter,91,0,67,3.727301838650201,4.200878600524534,120,0,0,0.0,1.0837165987040462,,,,,61531.0,61531.0,61531.0,,4,1,1,1,0.0,0.0
2020-01-02,CENTRAL INDIA,COMPUTE,93746.0,112998,5086,0,Winter,95,3,77,4.657382372114083,3.6821822305514,129,61531,0,0.0,1.1395085227337844,61531.0,,,,77638.5,77638.5,77638.5,,5,1,1,1,0.0,0.0
2020-01-03,CENTRAL INDIA,COMPUTE,68070.0,80850,12145,0,Winter,85,1,89,2.910710224528352,5.363531613888448,93,93746,0,0.0,1.1353953811009108,93746.0,,,,74449.0,74449.0,74449.0,,6,1,1,1,0.0,0.0
2020-01-04,CENTRAL INDIA,COMPUTE,110669.0,123190,4297,0,Winter,95,0,93,2.692698703750492,3.798314907624418,115,68070,0,0.0,1.1903332865183245,68070.0,,,,83504.0,83504.0,83504.0,,7,1,1,1,0.0,0.0
2020-01-05,CENTRAL INDIA,COMPUTE,162718.0,185163,7741,0,Winter,108,0,88,4.488592563558635,7.516976088529391,98,110669,0,0.0,1.1931447410319511,110669.0,,,,99346.8,99346.8,99346.8,,1,1,1,1,0.0,0.0
2020-01-06,CENTRAL INDIA,COMPUTE,102619.0,116231,5267,0,Winter,101,2,84,5.022146010034563,3.0655951605785035,114,162718,0,0.0,1.080941472448527,162718.0,,,,99892.16666666669,99892.16666666669,99892.16666666669,,2,2,1,1,0.0,0.0
2020-01-07,CENTRAL INDIA,COMPUTE,155858.0,192130,11271,0,Winter,101,0,96,3.852970938517362,4.290569935528852,124,102619,0,0.0,1.1107788045470044,102619.0,,,,107887.28571428572,107887.28571428572,107887.28571428572,,3,2,1,1,0.0,0.0
2020-01-08,CENTRAL INDIA,COMPUTE,68632.0,75442,3439,0,Winter,85,0,101,2.44401672312273,4.970925480972137,104,155858,61531,11.540524288569989,1.172468895555551,155858.0,61531.0,,,102980.375,102980.375,102980.375,0.1154052428856999,4,2,1,1,0.0,0.0
2020-01-09,CENTRAL INDIA,COMPUTE,88279.0,95690,11152,0,Winter,91,0,82,5.648551957397496,5.276924836157606,107,68632,93746,-5.831715486527425,1.0501843938978788,68632.0,93746.0,,,106323.875,101346.88888888888,101346.88888888888,-0.0583171548652742,5,2,1,1,0.0,0.0
2020-01-10,CENTRAL INDIA,COMPUTE,91340.0,105483,12001,0,Winter,98,1,115,5.248756131504405,6.6494943110654745,121,88279,68070,34.18539738504481,1.162053106776431,88279.0,68070.0,,,106023.125,100346.2,100346.2,0.3418539738504481,6,2,1,1,0.0,0.0


In [0]:
gold_df = spark.read.format("delta").load(gold_path)
gold_df.groupBy("Service").count().show()

+-------+-----+
|Service|count|
+-------+-----+
|STORAGE| 5481|
|COMPUTE| 5481|
+-------+-----+



In [0]:
%sql
CREATE TABLE IF NOT EXISTS default.gold_final
USING DELTA
LOCATION '/mnt/raw/gold/demand_forecasting_final';


In [0]:
spark.read.table("default.gold_final").columns


['date',
 'region',
 'service',
 'daily_usage_units',
 'peak_usage_units',
 'vm_count',
 'storage_tb',
 'season',
 'econ_index',
 'downtime_min',
 'cloud_demand_index',
 'gdp_growth',
 'inflation',
 'competitor_price_index',
 'usage_lag_1',
 'usage_lag_7',
 'week_over_week_growth',
 'seasonality_factor']