In [None]:
# 00_generate_synthetic_data.py
# Builds a synthetic daily sales panel (one row per date × category × channel) whose demand
# reacts to discount depth, weekday/month seasonality and three exogenous signals, then saves
# it to data/adidas_daily.csv for the Spark stages that follow.
import numpy as np, pandas as pd, os

# Legacy global seed. Every np.random.* draw below consumes this one stream in a fixed order,
# so reordering the draws changes the generated data.
np.random.seed(42)

# ----- Parameters (1990–2024 daily = 12,784 days × 3 categories × 3 channels ≈ 115K rows) -----
OUT_DIR  = "data"
OUT_PATH = "data/adidas_daily.csv"
start = pd.Timestamp("1990-01-01")
end   = pd.Timestamp("2024-12-31")
categories = ["Footwear","Apparel","Accessories"]
channels   = ["Online","Store","Outlet"]
base_price = {"Footwear":120.0,"Apparel":60.0,"Accessories":30.0}
base_demand= {"Footwear":200,   "Apparel":300,  "Accessories":400}
# Relative channel adjustments; a channel missing here gets no adjustment.
channel_price_delta  = {"Online": 0.05, "Outlet": -0.05}
channel_demand_delta = {"Online": 0.10, "Outlet": -0.05}
# Liquidity → discount bands: (liquidity lower bound incl., upper bound excl., discount low, discount high)
discount_bands = [
    (-np.inf, 0.4,    0.17, 0.19),   # low liquidity
    (0.4,     0.7,    0.19, 0.22),   # medium liquidity
    (0.7,     np.inf, 0.22, 0.24),   # high liquidity
]
# Demand response to discount d is 1 + a·d − b·d²: concave, peaking at d = a/(2b) ≈ 22.7 %.
a, b = 1.0, 2.2

# ----- Calendar & global signals (shared by every category × channel) -----
cal = pd.date_range(start, end, freq="D")
cdf = pd.DataFrame({"date":cal})
n = len(cdf); t = np.arange(n)
cdf["dow"]    = cdf["date"].dt.dayofweek    # pandas convention: 0 = Monday … 6 = Sunday
cdf["month"]  = cdf["date"].dt.month
cdf["year"]   = cdf["date"].dt.year
# Yearly sinusoid plus noise
cdf["market_index"] = 1.0 + 0.15*np.sin(2*np.pi*t/365.25) + 0.05*np.random.randn(n)
# Gaussian random walk, min-max scaled to [0, 1]
rw = np.cumsum(np.random.normal(0,0.02,n))
cdf["liquidity"] = (rw - rw.min())/(rw.max()-rw.min())
# 90-day cycle plus noise
cdf["turnover"]  = 1.0 + 0.10*np.cos(2*np.pi*t/90) + 0.05*np.random.randn(n)

# ----- Expand over category × channel and simulate -----
rows = []
for cat in categories:
    for ch in channels:
        df = cdf.copy()
        df["base_price"] = base_price[cat]*(1 + channel_price_delta.get(ch, 0.0))

        # Discount depth rises with liquidity (17–24 %), plus small noise, clipped to [0, 50 %].
        # Bands are drawn low → medium → high, matching the seeded stream order.
        liq = df["liquidity"].to_numpy()
        disc = np.zeros(n)
        for lo_liq, hi_liq, lo_d, hi_d in discount_bands:
            band = (liq >= lo_liq) & (liq < hi_liq)
            disc[band] = np.random.uniform(lo_d, hi_d, band.sum())
        disc += np.random.normal(0,0.003,n)
        disc = np.clip(disc,0,0.50)
        df["discount_rate"] = disc
        df["price"] = df["base_price"]*(1-df["discount_rate"])

        # Expected demand: discount response × seasonality × market signals × channel base level
        disc_mult  = 1 + a*df["discount_rate"] - b*(df["discount_rate"]**2)
        weekend    = 1 + 0.08*(df["dow"]>=5)  # Sat/Sun under the pandas 0=Mon encoding
        peak_month = 1 + 0.10*(df["month"].isin([8,9,11,12]))
        exog_mult  = df["market_index"]*(1 + 0.05*(df["turnover"]-df["turnover"].mean()))
        base_d     = base_demand[cat]*(1.0 + channel_demand_delta.get(ch, 0.0))
        lam = base_d * disc_mult * weekend * peak_month * exog_mult

        # Lognormal multiplicative noise whose median is lam (σ = 0.25 on the log scale)
        qty = np.maximum(0, np.round(np.random.lognormal(np.log(lam/100.0), 0.25)*100.0)).astype(int)

        df["category"] = cat; df["channel"] = ch
        df["quantity_sold"] = qty
        df["revenue"] = df["price"]*df["quantity_sold"]
        rows.append(df)

full = pd.concat(rows, ignore_index=True)

# Sanity checks before anything downstream consumes the file
assert len(full) == n * len(categories) * len(channels)
assert full["discount_rate"].between(0, 0.50).all()
assert full["quantity_sold"].ge(0).all()

os.makedirs(OUT_DIR, exist_ok=True)
full.to_csv(OUT_PATH, index=False)
print("Saved", OUT_PATH, "with", len(full), "rows")

Saved data/adidas_daily.csv with 115056 rows


In [None]:
# 10_ingest_clean_to_curated_local.py
# Loads the generated CSV, enforces the column types the later stages rely on, drops rows with
# an implausible discount or non-positive price, and writes a year/category-partitioned
# Parquet copy to data/curated/.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("AdidasIngestCleanLocal").getOrCreate()

SOURCE_CSV     = "data/adidas_daily.csv"
CURATED_DIR    = "data/curated/"
PARTITION_COLS = ["year", "category"]

# Explicit target type per column; other columns keep whatever inferSchema produced.
TYPE_CASTS = {
    "date": "date",
    "quantity_sold": "int",
    "revenue": "double",
    "discount_rate": "double",
    "base_price": "double",
    "price": "double",
    "year": "int",
}

raw = (spark.read
       .option("header", True)
       .option("inferSchema", True)
       .csv(SOURCE_CSV))

typed = raw
for column_name, spark_type in TYPE_CASTS.items():
    typed = typed.withColumn(column_name, col(column_name).cast(spark_type))

# Keep discounts in [0, 0.9] (inclusive) and strictly positive prices.
clean = (typed
         .filter(col("discount_rate").between(0, 0.9))
         .filter(col("price") > 0))

# Write LOCAL Parquet (curated), one directory per year/category
writer = clean.repartition(*PARTITION_COLS).write.mode("overwrite")
writer.partitionBy(*PARTITION_COLS).parquet(CURATED_DIR)

print("Wrote curated Parquet to data/curated/")

Wrote curated Parquet to data/curated/


In [None]:
# 20_feature_engineering_to_features_local.py
# Creates ML features (lags, trailing moving averages, calendar) and writes LOCAL Parquet to data/features/

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, avg, month, dayofweek, col

spark = SparkSession.builder.appName("AdidasFeaturesLocal").getOrCreate()

base = spark.read.parquet("data/curated/")

# One time series per (category, channel). The curated table holds one row per date for each
# category×channel pair, so partitioning by category alone would interleave three channels on
# the same date: lag(1) would mostly return another channel's same-day row, chosen arbitrarily
# because the ordering key ties.
# NOTE(review): lags count rows, not days. They equal calendar lags only while no dates were
# dropped by the curated-stage filters (none are for the synthetic data); confirm for real data.
w = Window.partitionBy("category", "channel").orderBy("date")

LAGS = [1, 7, 30]
MA_WINDOWS = [7, 30]

feat = base
for k in LAGS:
    feat = feat.withColumn(f"qty_lag_{k}", lag("quantity_sold", k).over(w))
for k in MA_WINDOWS:
    # Trailing window of the k PREVIOUS rows (ends at -1) so the mean never includes the current
    # row's quantity_sold, which is the prediction label downstream (avoids target leakage).
    feat = feat.withColumn(f"qty_ma_{k}", avg("quantity_sold").over(w.rowsBetween(-k, -1)))

feat = (feat
  .withColumn("month",      month(col("date")))
  .withColumn("dow",        dayofweek(col("date")))    # 1..7 (Sun..Sat); replaces the curated 0=Mon encoding
  # Drop each series' warm-up rows, where a lag or trailing window has no history yet
  .na.drop(subset=[f"qty_lag_{k}" for k in LAGS] + [f"qty_ma_{k}" for k in MA_WINDOWS])
)

(feat
 .repartition("year","category")
 .write.mode("overwrite")
 .partitionBy("year","category")
 .parquet("data/features/"))

print("Wrote features to data/features/")


Wrote features to data/features/


In [None]:
# 30_train_models_local.py
# Trains two demand models on data/features/ and saves them under models/:
#   A) "level"  – quantity vs. discount depth and context (ElasticNet + RandomForest, random 80/20 split)
#   B) "timing" – quantity from lagged / trailing history (RandomForest, chronological split)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

SEED = 42
# Timing rows from this year onward form the test set (≈ the last 20 % of 1990–2024). The timing
# inputs are lags / trailing means of the label, so a random row split would train on days that
# come after, and overlap with, the test days.
TIMING_TEST_START_YEAR = 2018
# NOTE(review): the previous timing forest (200 trees, depth 14) died mid-fit with
# "SparkContext was shut down" – presumably the local driver ran out of resources. It now uses
# the size of the level forest, which completed; tune once the cause is confirmed.
RF_TIMING_TREES, RF_TIMING_DEPTH = 150, 12

spark = SparkSession.builder.appName("AdidasTrainLocal").getOrCreate()
data = spark.read.parquet("data/features/").cache()

ev_rmse = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")
ev_r2   = RegressionEvaluator(metricName="r2",   labelCol="label", predictionCol="prediction")

def _to_xy(frame, input_cols):
    """Assemble `input_cols` into a `features` vector and expose `quantity_sold` as `label`."""
    return (VectorAssembler(inputCols=input_cols, outputCol="features")
            .transform(frame)
            .select("features", col("quantity_sold").alias("label")))

# -------- A) Discount level (HOW MUCH?) --------
cols_level = ["discount_rate","liquidity","turnover","month","dow","base_price"]
trainA, testA = _to_xy(data, cols_level).randomSplit([0.8,0.2], seed=SEED)
trainA = trainA.cache()   # both estimators make several passes over the training rows

en = LinearRegression(regParam=5e-4, elasticNetParam=0.2)
rf = RandomForestRegressor(numTrees=150, maxDepth=12, seed=SEED)

enm = en.fit(trainA); pred_en = enm.transform(testA)
rfm = rf.fit(trainA); pred_rf = rfm.transform(testA)

print("[LEVEL] ElasticNet  RMSE:", ev_rmse.evaluate(pred_en), " R2:", ev_r2.evaluate(pred_en))
print("[LEVEL] RandomForest RMSE:", ev_rmse.evaluate(pred_rf), " R2:", ev_r2.evaluate(pred_rf))

# Save models locally
enm.write().overwrite().save("models/level_elasticnet")
rfm.write().overwrite().save("models/level_rf")
trainA.unpersist()

# -------- B) Timing (WHEN?) --------
cols_time = ["qty_lag_1","qty_lag_7","qty_lag_30","qty_ma_7","qty_ma_30","month","dow"]
is_test = col("year") >= TIMING_TEST_START_YEAR
trainB = _to_xy(data.filter(~is_test), cols_time).cache()
testB  = _to_xy(data.filter(is_test), cols_time)

rf_t = RandomForestRegressor(numTrees=RF_TIMING_TREES, maxDepth=RF_TIMING_DEPTH, seed=SEED)
mod_t = rf_t.fit(trainB)
pred_t = mod_t.transform(testB)

print("[TIMING] RF RMSE:", ev_rmse.evaluate(pred_t), " R2:", ev_r2.evaluate(pred_t))

mod_t.write().overwrite().save("models/timing_rf")
trainB.unpersist()

[LEVEL] ElasticNet  RMSE: 111.65269021767224  R2: 0.4417837810153328
[LEVEL] RandomForest RMSE: 102.12478610309496  R2: 0.5329897772007931


Py4JJavaError: An error occurred while calling o3374.fit.
: org.apache.spark.SparkException: Job 54 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1301)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1299)
	at scala.collection.mutable.HashSet$Node.foreach(HashSet.scala:450)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:376)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1299)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:3234)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:85)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$stop$3(DAGScheduler.scala:3120)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1300)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:3120)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2346)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1300)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2346)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2297)
	at org.apache.spark.SparkContext.$anonfun$new$36(SparkContext.scala:704)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:231)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:205)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1937)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:205)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:205)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:184)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:740)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:739)
	at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:665)
	at org.apache.spark.ml.tree.impl.RandomForest$.runBagged(RandomForest.scala:210)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:304)
	at org.apache.spark.ml.regression.RandomForestRegressor.$anonfun$train$1(RandomForestRegressor.scala:159)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:226)
	at scala.util.Try$.apply(Try.scala:217)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:226)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForestRegressor.scala:137)
	at org.apache.spark.ml.regression.RandomForestRegressor.train(RandomForestRegressor.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:79)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)


In [None]:
# 40_visualize_dashboards.py
# Renders two PNG dashboards into outputs/:
#   A) the liquidity → discount bands used by the synthetic generator
#   B) recent total daily demand next to a naive linear-trend forecast
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as ssum

CURATED_PATH  = "data/curated/"   # local Parquet written by 10_ingest_clean_to_curated_local.py
OUT_DIR       = "outputs"
FIT_WINDOW    = 30   # most recent actual days used to fit the trend
SHOW_ACTUAL   = 10   # actual days drawn on the chart
HORIZON       = 20   # days forecast past the last actual
FORECAST_SEED = 42   # makes the forecast noise reproducible

os.makedirs(OUT_DIR, exist_ok=True)   # savefig does not create missing directories

spark = SparkSession.builder.appName("AdidasDashboards").getOrCreate()

# ---- A) Liquidity-based discount policy (visual guide) ----
# Bands mirror the generator: low < 0.4 ≤ medium < 0.7 ≤ high, liquidity scaled to [0, 1].
fig, ax = plt.subplots(figsize=(9,4.8))
ax.add_patch(patches.Rectangle((0.00,17),0.40,2, color='#8da0cb', alpha=0.45, label='Low liquidity: 17–19%'))
ax.add_patch(patches.Rectangle((0.40,19),0.30,3, color='#66c2a5', alpha=0.45, label='Medium liquidity: 19–22%'))
ax.add_patch(patches.Rectangle((0.70,22),0.30,2, color='#fc8d62', alpha=0.45, label='High liquidity: 22–24%'))
ax.axhline(17, color='gray', ls='--'); ax.axhline(24, color='gray', ls='--')
ax.text(0.82, 24.2, 'Overall optimal range', color='gray')
ax.set_xlim(0,1); ax.set_ylim(16,25)
ax.set_title('Liquidity-based discount policy')
ax.set_xlabel('Liquidity (0 = low, 1 = high)'); ax.set_ylabel('Recommended Discount (%)')
ax.legend(loc='upper left'); ax.grid(alpha=0.2)
fig.tight_layout(); fig.savefig(os.path.join(OUT_DIR, "liquidity_discount_strategy.png"), dpi=160); plt.close(fig)

# ---- B) Actual (last SHOW_ACTUAL days) vs Forecast (next HORIZON days) ----
# Total demand per date, summed over every category × channel
daily = (spark.read.parquet(CURATED_PATH)
         .groupBy("date").agg(ssum("quantity_sold").alias("qty"))
         .orderBy("date"))
pdf = daily.toPandas()

# Fit a straight line to the last FIT_WINDOW actuals and extrapolate HORIZON days with ±2 % noise
recent = pdf.tail(FIT_WINDOW).reset_index(drop=True)
assert len(recent) >= max(SHOW_ACTUAL, 2), f"need at least {max(SHOW_ACTUAL, 2)} days of data, got {len(recent)}"
x = np.arange(len(recent))
trend = np.poly1d(np.polyfit(x, recent["qty"].to_numpy(dtype=float), 1))
future_x = np.arange(len(recent), len(recent)+HORIZON)
rng = np.random.default_rng(FORECAST_SEED)
forecast_vals = np.maximum(0, trend(future_x)*(1+rng.normal(0,0.02,HORIZON)))

fig, ax = plt.subplots(figsize=(9,4.8))
ax.plot(np.arange(1, SHOW_ACTUAL+1), recent["qty"].to_numpy()[-SHOW_ACTUAL:], label="Actual Demand", color="#1f77b4")
ax.plot(np.arange(SHOW_ACTUAL+1, SHOW_ACTUAL+HORIZON+1), forecast_vals, label="Forecasted Demand", color="#ff7f0e")
ax.set_xticks(range(1, SHOW_ACTUAL+HORIZON+1, 2))
ax.set_title("Daily demand: recent actuals vs linear-trend forecast")
ax.set_xlabel("Day"); ax.set_ylabel("Demand (units)")
ax.legend(); ax.grid(alpha=0.2); fig.tight_layout()
fig.savefig(os.path.join(OUT_DIR, "forecast_dashboard.png"), dpi=160); plt.close(fig)

print("Saved dashboards to outputs/*.png")

ConnectionRefusedError: [Errno 111] Connection refused