In [None]:
import pyspark.sql.functions as F
from pyspark.sql import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
import pandas as pd

from feature_engineering.engineering import engineerFeatures
from utils.utils import gapfilling, serialize
from modelling.model_utils import splitData, prepare_data, train_model, MLFlow_train_model
import mlflow

In [None]:
spark = SparkSession\
            .builder\
            .appName("test-app")\
            .getOrCreate()

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [None]:
# [TODO]: move config dictionnaries to a json config file
features_config = {

    "discount_rate":{},
    "promoted_percent":{"promoted_hierarchy": "sku", "group_key":"subclass"},
    "week_of_year":{},
    "promo_category":{},
}

model_config={
    "model": "xgboost",
    "params": {},
    "hierarchy_columns": ["sku", "subclass", "store_id", "region_id"],
    "target": "units",
    "train_startDate": "2018-01-01",
    "train_endDate": "2020-01-01",
    "inference_startDate": "2019-11-01",
    "inference_endDate": "2020-12-21",
}

path = "data/"

In [None]:
# Union transcational data

schema = StructType(
    [StructField("customer_id", StringType(), True),
    StructField("week_index", StringType(), True),
    StructField("sku", StringType(), True),
    StructField("promo_cat", StringType(), True),
    StructField("discount", FloatType(), True),
    StructField("store_id", StringType(), True)],
)

transactions = spark.read.csv(
    "data/transactions_*.csv", 
    schema=schema,
    header=False
)

In [None]:
# Reading data
customers = spark.read.csv(
    "data/customers.csv", 
    header="true", 
    inferSchema="true")

calendar = spark.read.csv(
    "data/calendar.csv", 
    header="true", 
    inferSchema="true")

products = spark.read.csv(
    "data/products.csv", 
    header="true", 
    inferSchema="true")

stores = spark.read.csv(
    "data/stores.csv", 
    header="true", 
    inferSchema="true")

In [None]:
# Adding location hierarchy for customers

customers = customers.select(
    F.col("customer_id").cast("string"), 
    F.col("store_pref").cast("string").alias("store_id")
)

stores = stores.select(
    F.col("store_id").cast("string"), 
    F.col("store_region").cast("string").alias("region_id")
).dropDuplicates()

In [None]:
products = products.select(
    F.col("prod_id").cast("string").alias("sku"),
    F.col("prod_subclass").cast("string").alias("subclass"),
    F.col("prod_class").cast("string").alias("class"),
    F.col("prod_dept").cast("string").alias("dept"),
    F.col("prod_base_price").cast("float").alias("base_price"),
).dropDuplicates()

products.show(5)

In [None]:
# daily calendar -> weekly calendar
weekly_calendar = calendar.where(
    F.col("day_of_week")=="0"
).select(
    F.to_date(F.col("calendar_day"),"MM-dd-yy").alias("date")
).distinct(
).sort(
    F.col("date").asc()
).withColumn(
    "week_index", F.monotonically_increasing_id()
).select(
    F.col("week_index").cast("string"),
    F.col("date")
)
weekly_calendar.show(5)

In [None]:
# add hierarchies
demand_data = transactions.groupby(
    "sku", "store_id", "week_index"
).agg(
    F.count("*").alias("units"),
    F.first("promo_cat").alias("promo_cat"),
    F.max("discount").alias("discount"),
).join(
    weekly_calendar, on=["week_index"], how="inner"
).drop("week_index")

serialize(spark, demand_data, path + "demand_data.parquet").show(5)


In [None]:
demand_data = spark.read.parquet(path + "demand_data.parquet", header="true", inferSchema="true")

sales_filled_data = gapfilling(demand_data, date_column="date", product_column="sku", location_column="store_id")

serialize(spark, sales_filled_data, path + "sales_filled_data.parquet").show(5)


In [None]:
# Adding product and location hierarchies to demand data
demand_data = spark.read.parquet(path + "sales_filled_data.parquet", header="true", inferSchema="true")

sales_data = demand_data.join(
    stores, on="store_id", how="inner"
).join(
    products, on="sku", how="inner"
)

serialize(spark, sales_data, path + "sales_data.parquet").show(5)

## Feature Engineering

In [None]:
sales_data = spark.read.parquet(path + "sales_data.parquet", header="true", inferSchema="true")

engineered_data = engineerFeatures(
    data=sales_data,
    config=features_config
)

serialize(spark, engineered_data, path + "engineered_data.parquet").show(5)

## Modelling

In [None]:
engineered_data = spark.read.parquet(path + "engineered_data.parquet", header="true", inferSchema="true")
engineered_data = engineered_data.where(F.col("sku")<400) # Just to reduce the size of the data for less memory consumption

train_data, test_data = splitData(
        data=engineered_data,
        model_config=model_config,
        features_config=features_config
    )

serialize(spark, train_data, path + "train_data.parquet").show(5)
serialize(spark, test_data, path + "test_data.parquet").show(5)

In [None]:
train_data = pd.read_parquet(f"data/train_data.parquet", "pyarrow")
test_data = pd.read_parquet(f"data/test_data.parquet", "pyarrow")

# preparing the training data
X_train_ohe_sparse, y_train = prepare_data(
    train_data,
    model_config,
    features_config,
    prefix="train",
)

# preparing the inferencing data
X_test_ohe_sparse, y_test = prepare_data(
    test_data,
    model_config,
    features_config,
    prefix="test",
)

In [None]:
# Normal run
train_model(X_train_ohe_sparse, X_test_ohe_sparse, y_train, y_test, model_config)

In [None]:
# MLFlow run

max_depth_list = [3,4]
learning_rate_list = [0.1, 0.001]
n_estimators_list = [20, 25]

for max_depth, learning_rate, n_estimators in zip(max_depth_list, learning_rate_list, n_estimators_list):
    model_params = {"max_depth":max_depth, "learning_rate":learning_rate, "n_estimators":n_estimators}
    MLFlow_train_model(X_train_ohe_sparse, X_test_ohe_sparse, y_train, y_test, model_config, model_params)

In [None]:
! mlflow ui