In [7]:
# Sales Forecasting (Univariate Tabular-TS) - Spark 2.4.4
# Pipeline: 
# load -> log1p -> features(lag/MA/seasonality) -> TS split -> Model Search (GBT vs RF, small TS-CV) 
# -> Train Final -> Evaluate (log & original scale) -> Plots

# Notes: gw pake limiter biar friendly di VM 6GB, out of memory error kalo kg 
# (shuffle kecil, grid kecil, toPandas hanya untuk plot)

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import DoubleType
import math

# --- non-interactive matplotlib (biar bisa save gambar di VM/headless)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

In [2]:
# inisialisasi, spark session n configs
spark = (SparkSession.builder
         .appName("DM_GBT_RF_TabularTS_2.4.4")
         .config("spark.sql.shuffle.partitions", "8")  # kecilkan shuffle untuk VM 6GB
         .getOrCreate())

In [None]:
# ngeload data, di path sama jadi tinggal load nama aja. 
# sesuain dir klo beda e.g. "C:\Users\user\Desktop\AOL_Data Mining\attempt1\data.csv"
df0 = (spark.read.format("csv")
       .option("header", "true")
       .option("inferSchema", "true")
       .load("sales_data.csv"))

# normalisasiin nama kolom jdi lowercase
df0 = df0.select([F.col(c).alias(c.lower()) for c in df0.columns])

required = ["week_start", "total_sales"]
missing = [c for c in required if c not in df0.columns]
if missing:
    raise ValueError("Kolom wajib tidak ditemukan: %s" % ", ".join(missing))

# parse tanggal, agregasi per minggu (jaga2 kalo ad duplikat baris per mingguny juga)
df = (df0
      .withColumn("week_start_dt", F.to_date(F.col("week_start")))
      .withColumn("total_sales", F.col("total_sales").cast(DoubleType()))
      .withColumn("total_sales", F.when(F.col("total_sales") < 0, 0.0).otherwise(F.col("total_sales")))
      .groupBy("week_start_dt").agg(F.sum("total_sales").alias("total_sales"))
      .orderBy("week_start_dt"))

In [None]:
# target transform (log)
# log1p ngejaga nilai 0, jadi label dasar.
df = df.withColumn("y_log", F.log1p(F.col("total_sales")))

# gw pake winsorize (cap) di log scale biar bs nekenin effect si outlier waktu training
# ambil perkiraan quantile 0.99 (robust, cepat)
cap_q = df.select("y_log").na.drop().stat.approxQuantile("y_log", [0.99], 0.01)[0]
df = df.withColumn("y_log_train", F.when(F.col("y_log") > cap_q, cap_q).otherwise(F.col("y_log")))

In [None]:
# feature engineering, e.g :
# - lags pendek & musiman
# - moving averages (smoothing)
# - seasonality encoding via sin/cos (mingguan ~ 52)
# note lagi: semua fitur berbasis y_log (bukan original) biar stabil
w = Window.orderBy("week_start_dt")

# Lags: tambahkan beberapa yg dekat + musiman
lags = [1, 2, 3, 4, 8, 12, 26, 52]
for k in lags:
    df = df.withColumn(f"lag_{k}", F.lag("y_log", k).over(w))

# Moving average (window: baris, bukan kalender)
df = df.withColumn("ma_4",  F.avg("y_log").over(Window.orderBy("week_start_dt").rowsBetween(-3, 0)))
df = df.withColumn("ma_8",  F.avg("y_log").over(Window.orderBy("week_start_dt").rowsBetween(-7, 0)))
df = df.withColumn("ma_12", F.avg("y_log").over(Window.orderBy("week_start_dt").rowsBetween(-11, 0)))

# differencing (menangkap perubahan)
df = df.withColumn("diff_1", F.col("y_log") - F.col("lag_1"))
df = df.withColumn("diff_52", F.col("y_log") - F.col("lag_52"))

# seasonality features
df = df.withColumn("woy", F.weekofyear("week_start_dt").cast("double"))
angle = F.col("woy") * (2.0 * math.pi / 52.0)
df = df.withColumn("sin52", F.sin(angle)).withColumn("cos52", F.cos(angle))

# drop baris awal yg g punya lag/MA
feature_cols = [f"lag_{k}" for k in lags] + ["ma_4", "ma_8", "ma_12", "diff_1", "diff_52", "sin52", "cos52"]
df_feat = df.dropna(subset=feature_cols + ["y_log", "y_log_train"])

In [None]:
# TIME-BASED SPLIT 80:20
# Train = 80% pertama waktu; Test = 20% terakhir
w_all = Window.orderBy("week_start_dt")
df_feat = df_feat.withColumn("rn", F.row_number().over(w_all))
n = df_feat.count()
train_n = int(n * 0.8)
train = df_feat.filter(F.col("rn") <= train_n).cache()
test  = df_feat.filter(F.col("rn") >  train_n).cache()

In [None]:
# SMALL TIME-SERIES cross validator (grid ringan)
# gw bikin manual, cv bawaan spark anomaly, ngelakuin shuffle k-fold (ga pas di case ini utk ts)
# bikin 3 fold berurutan: train<=cut1 validate=cut1->cut2, dst.
def make_ts_folds(df_in, folds=3):
    """bagi data training jadi beberapa blok waktu, untuk validasi berurutan."""
    total = df_in.count()
    idxs = [int(total*(i+1)/(folds+1)) for i in range(folds)]  # ex: 25%, 50%, 75% dari train
    cut_dates = [df_in.orderBy("week_start_dt").limit(k).agg(F.max("week_start_dt").alias("d")).collect()[0]["d"] for k in idxs]
    # Bentuk pasangan (train<=prev_cut, valid di (prev_cut, cut])
    prev = None
    splits = []
    for d in cut_dates:
        if prev is None:
            tr = df_in.filter(F.col("week_start_dt") <= d)
        else:
            tr = df_in.filter(F.col("week_start_dt") <= prev)
        va = df_in.filter((F.col("week_start_dt") > prev) if prev is not None else (F.col("week_start_dt") > F.lit("0001-01-01"))) \
                  .filter(F.col("week_start_dt") <= d)
        splits.append((tr.cache(), va.cache()))
        prev = d
    return splits

ts_splits = make_ts_folds(train, folds=3)

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Dua kandidat model: GBT & RF
def make_gbt(**kw):
    return GBTRegressor(featuresCol="features", labelCol="y_log_train",
                        maxIter=kw.get("maxIter",150),
                        maxDepth=kw.get("maxDepth",8),
                        stepSize=kw.get("stepSize",0.1),
                        subsamplingRate=kw.get("subsamplingRate",0.8),
                        seed=42)

def make_rf(**kw):
    return RandomForestRegressor(featuresCol="features", labelCol="y_log_train",
                                 numTrees=kw.get("numTrees",300),
                                 maxDepth=kw.get("maxDepth",12),
                                 maxBins=kw.get("maxBins",64),
                                 subsamplingRate=kw.get("subsamplingRate",0.8),
                                 seed=42)

# Grid kecil (biar hemat, gw di vm)
gbt_grid = [
    {"model":"gbt", "maxIter":100, "maxDepth":6, "stepSize":0.10},
    {"model":"gbt", "maxIter":150, "maxDepth":8, "stepSize":0.05},
]
rf_grid = [
    {"model":"rf", "numTrees":300, "maxDepth":10},
    {"model":"rf", "numTrees":500, "maxDepth":12},
]
param_grid = gbt_grid + rf_grid

def rmse_log(df_pred):
    ev = RegressionEvaluator(labelCol="y_log", predictionCol="prediction", metricName="rmse")
    return ev.evaluate(df_pred)

best_cfg, best_score = None, float("inf")
for cfg in param_grid:
    # build model
    if cfg["model"] == "gbt":
        reg = make_gbt(**cfg)
    else:
        reg = make_rf(**cfg)
    pipe = Pipeline(stages=[assembler, reg])

    # avg RMSE(log) across TS folds
    rmses = []
    for tr, va in ts_splits:
        m = pipe.fit(tr)
        pred_va = m.transform(va)
        score = rmse_log(pred_va)  # di log scale (stabil vs outlier)
        rmses.append(score)
    avg_rmse = sum(rmses) / len(rmses)
    print("CFG %s -> avg RMSE(log)=%.4f" % (cfg, avg_rmse))

    if avg_rmse < best_score:
        best_score, best_cfg = avg_rmse, cfg

print("\n>> best config (TimeSeries CrossValid, RMSE log):", best_cfg, "score=", round(best_score,4))

In [None]:
# final training dataset (pake best config)
reg_final = make_gbt(**best_cfg) if best_cfg["model"]=="gbt" else make_rf(**best_cfg)
pipe_final = Pipeline(stages=[assembler, reg_final])
model = pipe_final.fit(train)

In [None]:
# prediksi + invers transform
pred = (model.transform(test)
        .withColumn("pred_log", F.col("prediction"))
        .withColumn("y_pred", F.expm1(F.col("pred_log")))   # balik ke skala asli
        .withColumn("y_true", F.col("total_sales")))

In [None]:
# evaluasi, original & log scale

# a) Original scale
pred_eval = pred.select("y_true","y_pred").dropna()

rmse = (pred_eval
        .withColumn("se", (F.col("y_true")-F.col("y_pred"))**2)
        .agg(F.sqrt(F.avg("se")).alias("rmse"))
        .collect()[0]["rmse"])

mae = (pred_eval
       .withColumn("ae", F.abs(F.col("y_true")-F.col("y_pred")))
       .agg(F.avg("ae").alias("mae"))
       .collect()[0]["mae"])

# MAPE ny terfilter (ignore minggu dengan true terlalu kecil, biar ga eksplosif)
mape_floor = 1000.0
mape = (pred_eval
        .withColumn("ape", F.when(F.col("y_true") >= mape_floor,
                                  F.abs((F.col("y_true")-F.col("y_pred"))/F.col("y_true"))))
        .agg(F.avg("ape").alias("mape"))
        .collect()[0]["mape"])

# sMAPE (more stabil)
smape = (pred_eval
         .withColumn("sm", (F.abs(F.col("y_pred")-F.col("y_true")) /
                            ((F.abs(F.col("y_true"))+F.abs(F.col("y_pred")))/2.0)))
         .agg(F.avg("sm").alias("smape")).collect()[0]["smape"])

mean_true = pred_eval.agg(F.avg("y_true").alias("m")).collect()[0]["m"]
ss_tot = pred_eval.withColumn("d", (F.col("y_true")-mean_true)**2).agg(F.sum("d")).collect()[0][0]
ss_res = pred_eval.withColumn("r", (F.col("y_true")-F.col("y_pred"))**2).agg(F.sum("r")).collect()[0][0]
r2 = float(1.0 - (ss_res/ss_tot)) if (ss_tot is not None and ss_tot != 0) else None

print("\n=== TEST SET METRICS (ORIGINAL SCALE) ===")
print("RMSE: %.4f | MAE: %.4f | MAPE(>=%.0f): %s | sMAPE: %s | R^2: %s" %
      (rmse, mae, mape_floor,
       "NA" if mape is None else "{:.2f}%".format(mape*100.0),
       "NA" if smape is None else "{:.2f}%".format(smape*100.0),
       "NA" if r2 is None else "{:.4f}".format(r2)))

# b) Log scale (fair untuk data skewed)
pred_log_eval = (model.transform(test)
                 .select(F.col("y_log").alias("y_true_log"),
                         F.col("prediction").alias("y_pred_log"))
                 .dropna())

ev_rmse_log = RegressionEvaluator(labelCol="y_true_log", predictionCol="y_pred_log", metricName="rmse")
ev_mae_log  = RegressionEvaluator(labelCol="y_true_log", predictionCol="y_pred_log", metricName="mae")
rmse_log = ev_rmse_log.evaluate(pred_log_eval)
mae_log  = ev_mae_log.evaluate(pred_log_eval)

# R2 log (manual)
mean_log = pred_log_eval.agg(F.avg("y_true_log").alias("m")).collect()[0]["m"]
ss_tot_log = pred_log_eval.withColumn("d", (F.col("y_true_log")-mean_log)**2).agg(F.sum("d")).collect()[0][0]
ss_res_log = pred_log_eval.withColumn("r", (F.col("y_true_log")-F.col("y_pred_log"))**2).agg(F.sum("r")).collect()[0][0]
r2_log = float(1.0 - (ss_res_log/ss_tot_log)) if (ss_tot_log is not None and ss_tot_log != 0) else None

print("\n=== TEST SET METRICS (LOG SCALE) ===")
print("RMSE_log: %.4f | MAE_log: %.4f | R^2_log: %s" %
      (rmse_log, mae_log, "NA" if r2_log is None else "{:.4f}".format(r2_log)))

In [None]:
# display important feature/insight biar bs diambil (top 12)
last_stage = model.stages[-1]
importances = last_stage.featureImportances.toArray()
fi = sorted(list(zip(feature_cols, importances)), key=lambda x: x[1], reverse=True)
print("\nTop feature importances:")
for name, score in fi[:12]:
    print("{:<10s} : {:.4f}".format(name, score))

In [None]:
# plot gw save di dir home cloudera, biar gambar bs gw copy n save di real laptop
# distribusi & boxplot (original vs log)
pdf_all = df.select("total_sales","y_log").dropna().toPandas()

plt.figure()
plt.hist(pdf_all["total_sales"], bins=50)
plt.title("Distribution of total_sales (original)"); plt.xlabel("total_sales"); plt.ylabel("count")
plt.tight_layout(); plt.savefig("dist_original.png", dpi=150); plt.close()

plt.figure()
plt.boxplot(pdf_all["total_sales"].values, vert=True)
plt.title("Boxplot of total_sales (original)")
plt.tight_layout(); plt.savefig("box_original.png", dpi=150); plt.close()

plt.figure()
plt.hist(pdf_all["y_log"], bins=50)
plt.title("Distribution of log1p(total_sales)"); plt.xlabel("log1p(total_sales)"); plt.ylabel("count")
plt.tight_layout(); plt.savefig("dist_log.png", dpi=150); plt.close()

plt.figure()
plt.boxplot(pdf_all["y_log"].values, vert=True)
plt.title("Boxplot of log1p(total_sales)")
plt.tight_layout(); plt.savefig("box_log.png", dpi=150); plt.close()

# actual vs predicted (line) + scatter (test set)
pdf_test = (pred.select("week_start_dt","y_true","y_pred").orderBy("week_start_dt").toPandas())
plt.figure()
plt.plot(pdf_test["week_start_dt"], pdf_test["y_true"], label="Actual")
plt.plot(pdf_test["week_start_dt"], pdf_test["y_pred"], label="Predicted")
plt.title("Actual vs Predicted (Test, original scale)")
plt.xlabel("week_start"); plt.ylabel("total_sales"); plt.legend(); plt.xticks(rotation=45)
plt.tight_layout(); plt.savefig("actual_vs_pred.png", dpi=150); plt.close()

plt.figure()
plt.scatter(pdf_test["y_true"], pdf_test["y_pred"])
maxv = max(pdf_test["y_true"].max(), pdf_test["y_pred"].max())
plt.plot([0, maxv],[0, maxv])
plt.title("Actual vs Predicted (Scatter, Test)")
plt.xlabel("Actual total_sales"); plt.ylabel("Predicted total_sales")
plt.tight_layout(); plt.savefig("scatter_ap.png", dpi=150); plt.close()

print("\nSaved plots: dist_original.png, box_original.png, dist_log.png, box_log.png, actual_vs_pred.png, scatter_ap.png")

In [None]:
# head of predictions, biar bs sanity check di log & original
pred.select("week_start_dt","y_true","y_pred","pred_log").orderBy("week_start_dt").show(12, truncate=False)

# (optional yg ini) unpersist
train.unpersist(); test.unpersist()