In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions
import os
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col
from pyspark.ml.feature import StringIndexer

In [20]:
# Build (or reuse) the notebook's Spark session: local master with 2 worker
# threads, 32 shuffle partitions, 6g requested for driver and executor.
# NOTE(review): with a local[...] master the executor-memory setting is
# presumably not what bounds memory (driver memory is) — confirm before tuning.
_session_builder = (
    SparkSession.builder
    .master("local[2]")
    .appName("M5-Feature-Engineering")
)
for _conf_key, _conf_value in (
    ("spark.driver.memory", "6g"),
    ("spark.executor.memory", "6g"),
    ("spark.sql.shuffle.partitions", "32"),
):
    _session_builder = _session_builder.config(_conf_key, _conf_value)

spark = _session_builder.getOrCreate()

In [21]:
# Load the stage-1 CSV output with an explicit schema instead of
# inferSchema=True. Inference needs an extra pass over every input file and
# lets column types drift with the data; the DDL string below pins the same
# schema the notebook previously inferred (see the printSchema output).
# Columns are matched by position, so the order must follow the CSV header.
# NOTE(review): with the default parse mode, a value that does not fit its
# declared type is read as NULL rather than widening the column — confirm this
# is acceptable for this dataset.
base_df = spark.read.csv(
    "../dataset/cooked/1/",
    header=True,
    schema=(
        "date DATE, item_id STRING, store_id STRING, sales INT, "
        "price DOUBLE, event STRING, weekday STRING"
    ),
)


In [22]:
# Sanity check of the loaded frame: column types first, then the first 5 rows.
base_df.printSchema()
base_df.show(n=5)


root
 |-- date: date (nullable = true)
 |-- item_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- sales: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- event: string (nullable = true)
 |-- weekday: string (nullable = true)

+----------+-----------+--------+-----+-----+------------+---------+
|      date|    item_id|store_id|sales|price|       event|  weekday|
+----------+-----------+--------+-----+-----+------------+---------+
|2011-06-18|FOODS_1_001|    CA_1|    3|  2.0|        NULL| Saturday|
|2011-06-19|FOODS_1_001|    CA_1|    2|  2.0|Father's day|   Sunday|
|2011-06-20|FOODS_1_001|    CA_1|    3|  2.0|        NULL|   Monday|
|2011-06-21|FOODS_1_001|    CA_1|    1|  2.0|        NULL|  Tuesday|
|2011-06-22|FOODS_1_001|    CA_1|    1|  2.0|        NULL|Wednesday|
+----------+-----------+--------+-----+-----+------------+---------+
only showing top 5 rows



# Time Series

### Convert to date

In [23]:
# Normalise `date` to a date column so the time-series window orders
# chronologically. The schema printed earlier already reports `date` as a
# date type, so this acts as a defensive cast.
date_as_date = functions.to_date(functions.col("date"))
base_df = base_df.withColumn("date", date_as_date)

### Window Time Series

In [24]:
# One ordered window per (item_id, store_id) pair, sorted by date ascending.
# Every lag and rolling feature below is evaluated over this window.
ts_window = (
    Window
    .partitionBy("item_id", "store_id")
    .orderBy("date")
)

# Feature Engineering

### Lag Features

In [25]:
# Add lag_7 / lag_14 / lag_28: the `sales` value 7, 14 and 28 rows earlier in
# the same item/store series. The first rows of each series have no such
# predecessor and get NULL.
# NOTE(review): offsets count rows, not calendar days; they equal days only if
# each series has exactly one row per day — confirm upstream.
feat_df = base_df
for lag_days in (7, 14, 28):
    feat_df = feat_df.withColumn(
        f"lag_{lag_days}",
        functions.lag("sales", lag_days).over(ts_window),
    )

### Rolling Mean

In [26]:
# Trailing means of `sales` over the last 7 and last 28 rows of each
# item/store series. Both frames end at the current row, so the current row's
# own `sales` value is part of each mean.
# NOTE(review): if `sales` is the prediction target downstream, including the
# current row leaks it into these features — confirm whether the frames should
# end at the previous row instead.
last_7_rows = ts_window.rowsBetween(-6, Window.currentRow)
last_28_rows = ts_window.rowsBetween(-27, Window.currentRow)

feat_df = (
    feat_df
    .withColumn("rolling_mean_7", functions.avg("sales").over(last_7_rows))
    .withColumn("rolling_mean_28", functions.avg("sales").over(last_28_rows))
)

### Price Change Feature

In [27]:
# price_change = current price minus the price on the previous row of the same
# item/store series. It is NULL on the first row of a series or when either
# price is NULL.
previous_price = functions.lag("price", 1).over(ts_window)
feat_df = feat_df.withColumn(
    "price_change",
    functions.col("price") - previous_price,
)

### Date Features

In [28]:
# Calendar features derived from `date`.
# Spark's dayofweek numbers days 1 (Sunday) to 7 (Saturday), so the weekend
# flag is 1 for values 1 and 7 and 0 otherwise.
day_of_week = functions.dayofweek("date")
weekend_flag = functions.when(day_of_week.isin(1, 7), 1).otherwise(0)

feat_df = (
    feat_df
    .withColumn("dayofweek", day_of_week)
    .withColumn("weekofyear", functions.weekofyear("date"))
    .withColumn("month", functions.month("date"))
    .withColumn("is_weekend", weekend_flag)
)

# Encoding Kategori

In [29]:
# Encode the string `event` column as a numeric index in `event_idx`.
# handleInvalid="keep" keeps rows the indexer treats as invalid instead of
# failing or dropping them. The sample output shows NULL events on ordinary
# days; per the Spark ML docs these go to an extra bucket after the real labels.
indexer = StringIndexer(inputCol="event", outputCol="event_idx", handleInvalid="keep")

# Fit learns the label-to-index mapping from the data; transform then applies it.
indexer_model = indexer.fit(feat_df)
feat_df = indexer_model.transform(feat_df)

# Membersihkan Data

In [30]:
# Drop rows that lack any of the three lag features, i.e. rows too early in
# their series to have a value 7, 14 or 28 rows back.
feat_df = feat_df.na.drop(subset=["lag_7", "lag_14", "lag_28"])


In [31]:
# Replace remaining NULLs in the listed numeric feature columns with 0.
# This is done in one projection that leaves every other column, and the
# column order, unchanged.
numeric_cols = ["price_change", "rolling_mean_7", "rolling_mean_28"]
feat_df = feat_df.select(
    *[
        functions.coalesce(functions.col(name), functions.lit(0)).alias(name)
        if name in numeric_cols
        else functions.col(name)
        for name in feat_df.columns
    ]
)

In [32]:
# Persist the engineered features as Parquet, replacing any previous output
# at the target path. The frame is shuffled into 4 partitions before writing.
feat_df.repartition(4).write.parquet("../dataset/cooked/2", mode="overwrite")
