In [0]:
%pip install lightgbm

In [0]:
# Storage account that holds the raw and curated ADLS containers.
storage_account = "stgm5forecastdev"

# Give Spark the account key, pulled from the Databricks secret scope so the key
# itself never appears in the notebook or its outputs.
account_key_conf = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
account_key = dbutils.secrets.get(scope="m5-scope", key="adls-key")
spark.conf.set(account_key_conf, account_key)

# Container roots used by the cells below.
adls_host = f"{storage_account}.dfs.core.windows.net"
raw_path = f"abfss://raw@{adls_host}"
curated_path = f"abfss://curated@{adls_host}"

# Quick connectivity check: list what is in the raw container.
display(dbutils.fs.ls(raw_path))


# ETL steps

## Smoke test ETL step

In [0]:
from pyspark.sql.functions import col

# Smoke test: read the raw CSV and write a tiny typed slice to the curated
# container, proving the raw -> curated round trip works before the full ETL.
file_path = f"{raw_path}/sales_train_evaluation.csv"
df = spark.read.csv(file_path, header="true")

id_cols = ["id", "item_id", "dept_id", "store_id", "cat_id", "state_id"]

# Only the first seven day columns; the CSV is read without a schema, so they
# arrive as strings and are cast to int here.
day_cols = [name for name in df.columns if name.startswith("d_")]
value_cols = day_cols[:7]
typed_days = [col(name).cast("int").alias(name) for name in value_cols]

df_small = df.select(*id_cols, *typed_days)

output_path = f"{curated_path}/sales_small/"
df_small.write.mode("overwrite").parquet(output_path)

display(dbutils.fs.ls(output_path))


## Creating a long-format (one row per item per day) table for store CA_1

In [0]:
from pyspark.sql.functions import expr, col

# Paths are re-derived here so this cell can run on its own.
storage_account = "stgm5forecastdev"
raw_path = f"abfss://raw@{storage_account}.dfs.core.windows.net"
curated_path = f"abfss://curated@{storage_account}.dfs.core.windows.net"

id_cols = ["id", "item_id", "dept_id", "store_id", "cat_id", "state_id"]

# 1) Read full sales_train_evaluation (no schema inference: every column is a string)
df = (spark.read
      .option("header", "true")
      .csv(f"{raw_path}/sales_train_evaluation.csv"))

# 2) Filter to a single store to keep it light, e.g. CA_1
df_store = df.filter(col("store_id") == "CA_1")

# 3) Unpivot every d_<n> day column present in the file into (d, sales) rows.
#    stack(k, 'd_1', d_1, 'd_2', d_2, ...) emits k rows per input row; the id
#    columns are repeated on each of them.
value_cols = [c for c in df_store.columns if c.startswith("d_")]
exprs = ", ".join([f"'{c}', {c}" for c in value_cols])

df_long = df_store.selectExpr(
    *id_cols,
    f"stack({len(value_cols)}, {exprs}) as (d, sales)",
)

# 4) Sales came from the CSV as strings; cast to int. `d` is left as the raw
#    day label ("d_1", "d_2", ...) -- no mapping to calendar dates happens here.
df_long = df_long.withColumn("sales", col("sales").cast("int"))

# 5) Write to curated as Parquet, one directory per (store_id, item_id).
#    NOTE(review): that is one directory per item in the store; consider a
#    coarser partitioning (e.g. dept_id) if the file count becomes a problem.
output_path = f"{curated_path}/m5_daily_ca1/"
(df_long
 .write
 .mode("overwrite")
 .partitionBy("store_id", "item_id")
 .parquet(output_path))

display(dbutils.fs.ls(output_path))


Sanity-check the long-format table for store CA_1: schema, a few rows, and the row count.

In [0]:
from pyspark.sql.functions import regexp_replace, col

# Re-derive the curated location so this cell does not depend on earlier cells.
storage_account = "stgm5forecastdev"
curated_path = "abfss://curated@" + storage_account + ".dfs.core.windows.net"
daily_path = curated_path + "/m5_daily_ca1/"

# Load the long-format CA_1 table written by the ETL step and inspect it:
# schema, a five-row preview, then the total row count as the cell's output.
df_daily = spark.read.parquet(daily_path)
df_daily.printSchema()
df_daily.show(n=5)
df_daily.count()


In [0]:
# Turn the day label "d_<n>" into the integer n so days sort numerically
# (sorting the string label would put "d_10" before "d_2").
day_number = regexp_replace("d", "d_", "").cast("int")
df_daily = df_daily.withColumn("day_num", day_number)

preview_cols = ["id", "store_id", "item_id", "d", "day_num", "sales"]
df_daily.select(*preview_cols).show(5)


In [0]:
from pyspark.sql.functions import col

# Pick one item to model. Sort before taking the first row: without an explicit
# ordering `.first()` returns whichever row Spark happens to read first, so the
# chosen item -- and every number computed below -- could change between runs.
first_row = df_daily.select("item_id").orderBy("item_id").first()
if first_row is None:
    raise ValueError("df_daily is empty; re-run the ETL cells that write m5_daily_ca1")
item_example = first_row["item_id"]
print("Using item:", item_example)

df_item = (df_daily
           .filter(col("item_id") == item_example)
           .orderBy("day_num"))

pdf = df_item.select("day_num", "sales").toPandas()
pdf.head()

In [0]:
import numpy as np
import lightgbm as lgb

def smape(y_true, y_pred):
    """Symmetric mean absolute percentage error, in percent (0 to at most 200).

    Each point contributes 2*|pred - true| / (|true| + |pred|). A tiny epsilon in
    the denominator makes a point where both values are 0 contribute 0 rather
    than NaN.

    Args:
        y_true: array-like of actual values.
        y_pred: array-like of predictions with the same number of elements.

    Returns:
        float: 100 times the mean of the per-point terms.

    Raises:
        ValueError: if the inputs are empty or differ in number of elements.
    """
    # Flatten both sides so e.g. a (n,) target and an (n, 1) prediction are
    # compared element-wise instead of silently broadcasting to (n, n).
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()
    if y_true.size != y_pred.size:
        raise ValueError(
            f"y_true and y_pred differ in length: {y_true.size} vs {y_pred.size}"
        )
    if y_true.size == 0:
        raise ValueError("smape is undefined for empty inputs")
    ratio = 2 * np.abs(y_pred - y_true) / (np.abs(y_true) + np.abs(y_pred) + 1e-8)
    return float(100 * np.mean(ratio))

# Baseline: LightGBM on the day index alone. day_num is the only feature and every
# validation day lies after the last training day, so each tree routes all
# validation rows to the same leaf -- expect a constant forecast over the horizon.
pdf = pdf.dropna().sort_values("day_num")

horizon = 28  # last 28 days as validation
# One horizon is scored, one more picks the number of boosting rounds, and the
# rest is needed to fit on.
if len(pdf) <= 2 * horizon:
    raise ValueError(f"need more than {2 * horizon} days of history, got {len(pdf)}")

train = pdf.iloc[:-horizon]
val = pdf.iloc[-horizon:]

X_train = train[["day_num"]]
y_train = train["sales"]
X_val = val[["day_num"]]
y_val = val["sales"]

params = {
    "objective": "regression",
    "metric": "mae",
    "verbosity": -1,
    "seed": 42,
}

# Choose the number of rounds on the last `horizon` days of the TRAINING window.
# Early-stopping on `val` and then scoring on `val` would let the evaluation data
# select the model, making the reported SMAPE optimistic.
fit_part = train.iloc[:-horizon]
stop_part = train.iloc[-horizon:]
fit_data = lgb.Dataset(fit_part[["day_num"]], label=fit_part["sales"])
stop_data = lgb.Dataset(stop_part[["day_num"]], label=stop_part["sales"], reference=fit_data)

selector = lgb.train(
    params,
    fit_data,
    num_boost_round=200,
    valid_sets=[stop_data],
    callbacks=[lgb.early_stopping(stopping_rounds=20, verbose=False)],
)
best_rounds = max(selector.best_iteration, 1)

# Refit on the whole training window with the selected number of rounds.
train_data = lgb.Dataset(X_train, label=y_train)
model = lgb.train(params, train_data, num_boost_round=best_rounds)

y_pred = model.predict(X_val)
print("SMAPE:", smape(y_val, y_pred))

