<a href="https://colab.research.google.com/github/MohamedAhmedGalal/Spark-Apache-favorita-grocery-sal-es-forecasting-/blob/main/Feature_engineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Kaggle library installation**
Installs the Kaggle API client, which is needed to download the competition dataset using API credentials.

In [None]:
!pip install kaggle



# **API credentials uploading**
The required API token file (kaggle.json) was downloaded from my Kaggle account so that Kaggle datasets can be accessed from this notebook.

In [None]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"mawkley","key":"<redacted>"}'}

# **Setting up Kaggle API access in Colab**
First, `!mkdir -p ~/.kaggle` creates the hidden `.kaggle` directory where the Kaggle CLI looks for the API key. Then, `!mv kaggle.json ~/.kaggle/` moves the downloaded credentials file (kaggle.json) into that directory. Finally, `!chmod 600 ~/.kaggle/kaggle.json` restricts the file so that only its owner can read and write it, preventing other users from reading the private key. Together, these steps let the Kaggle CLI authenticate safely.

In [None]:
!mkdir -p ~/.kaggle

!mv kaggle.json ~/.kaggle/

!chmod 600 ~/.kaggle/kaggle.json
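For reference, the same three steps can be reproduced in plain Python with the standard library. This is a minimal sketch, not part of the original notebook: the helper name `install_kaggle_credentials` is hypothetical, and the `home` argument exists only so the function can be tried in a temporary directory instead of the real home directory.

```python
import os
import shutil
import stat
from pathlib import Path


def install_kaggle_credentials(src: Path, home: Path) -> Path:
    """Hypothetical helper mirroring `mkdir -p`, `mv` and `chmod 600` for kaggle.json."""
    kaggle_dir = home / ".kaggle"
    # Equivalent of: mkdir -p ~/.kaggle
    kaggle_dir.mkdir(parents=True, exist_ok=True)
    # Equivalent of: mv kaggle.json ~/.kaggle/
    dest = kaggle_dir / "kaggle.json"
    shutil.move(str(src), str(dest))
    # Equivalent of: chmod 600 (read and write for the owner only)
    os.chmod(dest, stat.S_IRUSR | stat.S_IWUSR)
    return dest
```

In Colab it would be called as `install_kaggle_credentials(Path("kaggle.json"), Path.home())`.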

# **Downloading the favorita-grocery-sales-forecasting dataset from Kaggle**

In [None]:
!kaggle competitions download -c favorita-grocery-sales-forecasting

Downloading favorita-grocery-sales-forecasting.zip to /content
 97% 446M/458M [00:00<00:00, 386MB/s]
100% 458M/458M [00:01<00:00, 477MB/s]


# **Dataset file unzipping**

In [None]:
!unzip /content/favorita-grocery-sales-forecasting.zip

Archive:  /content/favorita-grocery-sales-forecasting.zip
  inflating: holidays_events.csv.7z  
  inflating: items.csv.7z            
  inflating: oil.csv.7z              
  inflating: sample_submission.csv.7z  
  inflating: stores.csv.7z           
  inflating: test.csv.7z             
  inflating: train.csv.7z            
  inflating: transactions.csv.7z     


# **Extracting the required CSV files (train, stores, and items)**

In [None]:
!sudo apt-get install -y p7zip-full
!7z x /content/train.csv.7z
!7z x /content/stores.csv.7z
!7z x /content/items.csv.7z

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
p7zip-full is already the newest version (16.02+dfsg-8).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.

7-Zip [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,64 bits,2 CPUs Intel(R) Xeon(R) CPU @ 2.20GHz (406F0),ASM,AES-NI)

Scanning the drive for archives:
1 file, 474092593 bytes (453 MiB)

Extracting archive: /content/train.csv.7z
--
Path = /content/train.csv.7z
Type = 7z
Physical Size = 474092593
Headers Size = 122
Method = LZMA2:24
Solid = -
Blocks = 1


# **PySpark installation**

In [None]:
!pip install pyspark



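# **Reading CSV files within the dataset using PySpark**
Start a Spark session, then read the train, items, and stores CSV files. `header=True` takes the column names from the first line, and `inferSchema=True` lets Spark infer the column types, which requires an extra pass over each file.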

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FavoritaForecast").getOrCreate()
train = spark.read.csv("/content/train.csv", header=True, inferSchema=True)
items = spark.read.csv("/content/items.csv", header=True, inferSchema=True)
stores = spark.read.csv("/content/stores.csv", header=True, inferSchema=True)
train.show(5)

+---+----------+---------+--------+----------+-----------+
| id|      date|store_nbr|item_nbr|unit_sales|onpromotion|
+---+----------+---------+--------+----------+-----------+
|  0|2013-01-01|       25|  103665|       7.0|       NULL|
|  1|2013-01-01|       25|  105574|       1.0|       NULL|
|  2|2013-01-01|       25|  105575|       2.0|       NULL|
|  3|2013-01-01|       25|  108079|       1.0|       NULL|
|  4|2013-01-01|       25|  108701|       1.0|       NULL|
+---+----------+---------+--------+----------+-----------+
only showing top 5 rows



# **Ensuring data type consistency**
As the task requires, the `date` column in the train DataFrame is converted to a proper `DateType`, `unit_sales` is cast to `FloatType` for numerical operations, and `onpromotion` is cast to a Boolean flag (NULL values stay NULL). `store_nbr` and `item_nbr` are cast to `IntegerType` in every DataFrame where they appear (train, items, and stores) so that joins and aggregations do not run into type mismatches. This preprocessing step is essential for clean, reliable downstream analysis and modeling.

In [None]:
from pyspark.sql.types import DateType, FloatType, IntegerType
from pyspark.sql.functions import col

train = (
    train
    .withColumn("date", col("date").cast(DateType()))
    .withColumn("unit_sales", col("unit_sales").cast(FloatType()))
    .withColumn("onpromotion", col("onpromotion").cast("boolean"))
    .withColumn("store_nbr", col("store_nbr").cast(IntegerType()))
    .withColumn("item_nbr", col("item_nbr").cast(IntegerType()))
)

items = (
    items
    .withColumn("item_nbr", col("item_nbr").cast(IntegerType()))
)

stores = (
    stores
    .withColumn("store_nbr", col("store_nbr").cast(IntegerType()))
)


# **Dataset files (tables) joining**
The train table is joined with items on the item_nbr column, and the result is then joined with the stores table on the store_nbr column. Left joins keep every training row even if an item or store has no match. After joining, the schema and the first rows of the resulting DataFrame are checked to confirm that the joins worked as expected.

In [None]:
# train + items on item_nbr (left join keeps every training row)
df = train.join(items, on="item_nbr", how="left")

# df + stores on store_nbr
df = df.join(stores, on="store_nbr", how="left")

# Quick check of the joined schema and first rows
df.printSchema()
df.show(5)

root
 |-- store_nbr: integer (nullable = true)
 |-- item_nbr: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- unit_sales: float (nullable = true)
 |-- onpromotion: boolean (nullable = true)
 |-- family: string (nullable = true)
 |-- class: integer (nullable = true)
 |-- perishable: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- type: string (nullable = true)
 |-- cluster: integer (nullable = true)

+---------+--------+---+----------+----------+-----------+------------+-----+----------+-------+-----------+----+-------+
|store_nbr|item_nbr| id|      date|unit_sales|onpromotion|      family|class|perishable|   city|      state|type|cluster|
+---------+--------+---+----------+----------+-----------+------------+-----+----------+-------+-----------+----+-------+
|       25|  103665|  0|2013-01-01|       7.0|       NULL|BREAD/BAKERY| 2712|         1|Salinas|Santa Elena|   D|      1|

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Define the base window (partition by store/item, order by date)
w = Window.partitionBy("store_nbr", "item_nbr").orderBy("date")

# --- Step 1: Lag features (safe: use past rows only) ---
# Note: lag counts rows, not calendar days (days with zero sales have no row in train.csv)
df = df.withColumn("lag_1", F.lag("unit_sales", 1).over(w))
df = df.withColumn("lag_7", F.lag("unit_sales", 7).over(w))

# --- Step 2: Rolling features (row-based, exclude current row) ---
# Rolling mean over the previous 7 rows, excluding the current row
df = df.withColumn(
    "rolling_mean_7",
    F.avg("unit_sales").over(w.rowsBetween(-7, -1))
)

# Rolling sample std over the previous 14 rows, excluding the current row
df = df.withColumn(
    "rolling_std_14",
    F.stddev("unit_sales").over(w.rowsBetween(-14, -1))
)

# --- Step 3: Calendar-based features ---
df = (
    df.withColumn("day_of_week", F.dayofweek("date") - 2)  # Spark: Sunday=1, so shift → Monday=0
      .withColumn("day_of_week", F.when(F.col("day_of_week") < 0, 6).otherwise(F.col("day_of_week")))
      .withColumn("is_weekend", F.when(F.col("day_of_week").isin([5, 6]), 1).otherwise(0))
)

df.show(10)


+---------+--------+--------+----------+----------+-----------+---------+-----+----------+-----+---------+----+-------+-----+-----+------------------+------------------+-----------+----------+
|store_nbr|item_nbr|      id|      date|unit_sales|onpromotion|   family|class|perishable| city|    state|type|cluster|lag_1|lag_7|    rolling_mean_7|    rolling_std_14|day_of_week|is_weekend|
+---------+--------+--------+----------+----------+-----------+---------+-----+----------+-----+---------+----+-------+-----+-----+------------------+------------------+-----------+----------+
|        1|  108634|11974039|2013-10-03|       1.0|       NULL|GROCERY I| 1075|         0|Quito|Pichincha|   D|     13| NULL| NULL|              NULL|              NULL|          3|         0|
|        1|  108634|12019808|2013-10-04|       1.0|       NULL|GROCERY I| 1075|         0|Quito|Pichincha|   D|     13|  1.0| NULL|               1.0|              NULL|          4|         0|
|        1|  108634|12066191|2013-1

In [None]:
# Drop rows lacking history (e.g. the first 7 rows of each store-item series, where lag_7 is NULL)
df = df.dropna(subset=["lag_1", "lag_7", "rolling_mean_7", "rolling_std_14"])

Checking that day names and day numbers within the week are mapped correctly: scanning this large dataset with Spark takes a long time, so it is better to check a sample of the data instead.
The numbering is correct, and so is is_weekend: I manually checked that the sampled day_of_week values are consistent with their dates, and that the days labeled is_weekend are actually Saturdays and Sundays.
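Because the mapping depends only on the date, it can also be checked exhaustively without touching the dataset. The sketch below reproduces the notebook's arithmetic in plain Python: Spark's `dayofweek` numbers days from Sunday=1 to Saturday=7, which is emulated here from Python's `isoweekday()` (Monday=1 to Sunday=7), and the result is compared with `date.weekday()` (Monday=0 to Sunday=6). The helper names are hypothetical and not part of the notebook.

```python
from datetime import date, timedelta


def spark_dayofweek(d: date) -> int:
    """Emulate Spark's dayofweek(): Sunday=1, Monday=2, ..., Saturday=7."""
    return d.isoweekday() % 7 + 1


def notebook_day_of_week(d: date) -> int:
    """The notebook's mapping: dayofweek - 2, with -1 (Sunday) wrapped to 6."""
    dow = spark_dayofweek(d) - 2
    return 6 if dow < 0 else dow


def notebook_is_weekend(d: date) -> int:
    """The notebook's is_weekend flag: 1 for day_of_week 5 or 6, else 0."""
    return 1 if notebook_day_of_week(d) in (5, 6) else 0


# Check two full weeks starting on the first date in train.csv (2013-01-01)
start = date(2013, 1, 1)
for offset in range(14):
    d = start + timedelta(days=offset)
    assert notebook_day_of_week(d) == d.weekday(), d
    assert notebook_is_weekend(d) == (1 if d.weekday() >= 5 else 0), d
print("day_of_week and is_weekend agree with Python's weekday()")
```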



### Data Leakage Defense

**What is data leakage in time-series features?**  
Data leakage occurs when information that is not available at prediction time, typically future values that the model is supposed to forecast, "leaks" into the features used for training. In other words, future values should be predicted, not trained on, but a leaky feature design lets the model see them anyway.
For time-series problems, this happens when lag or rolling features, by their definition, use values from the same day or from future dates.
The result is a misleading estimate of performance: the model looks very good during training but fails on genuinely new data (test data or production), where those values are not available.
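A small, self-contained illustration in plain Python (made-up numbers, not taken from the Favorita data) shows why this is misleading. A naive baseline predicts each day's sales with a rolling mean; if the window holds the 6 previous values plus the current one, the error is exactly 6/7 of the error obtained from the 6 previous values alone, whatever the data. The names `WINDOW`, `safe_pairs` and `leaky_pairs` are illustrative only.

```python
from statistics import mean

# Made-up daily sales for a single store-item series (illustrative only)
sales = [3.0, 5.0, 2.0, 8.0, 4.0, 6.0, 7.0, 1.0, 9.0, 5.0, 4.0, 6.0]
WINDOW = 6  # number of previous days used by the safe feature


def mae(pairs):
    """Mean absolute error over (actual, predicted) pairs."""
    return mean(abs(actual - predicted) for actual, predicted in pairs)


safe_pairs = []   # prediction uses only the WINDOW previous values
leaky_pairs = []  # prediction also includes the value being predicted
for t in range(WINDOW, len(sales)):
    past = sales[t - WINDOW:t]
    safe_pairs.append((sales[t], mean(past)))
    leaky_pairs.append((sales[t], mean(past + [sales[t]])))

safe_mae = mae(safe_pairs)
leaky_mae = mae(leaky_pairs)
print(f"safe MAE = {safe_mae:.3f}, leaky MAE = {leaky_mae:.3f}")
```

The exact 6/7 factor is specific to this toy construction; the general point is that a feature containing the target makes training error look better than anything achievable at prediction time.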
# **Design-Time Leakage Prevention Guarantees**
**How we prevented leakage in this implementation:**  
- All lag features (lag_1, lag_7) were created using only past rows, as follows:


*   The window is defined as `w = Window.partitionBy("store_nbr", "item_nbr").orderBy("date")`. The `orderBy("date")` clause sorts the rows inside each window chronologically, so "previous row" always means an earlier date, regardless of how Spark shuffles the data between partitions.
*   `partitionBy` makes each store-item pair its own separate time series, so data from other stores or other products never enters the calculation.
*   At any row t, `lag("unit_sales", 1)` uses only the value at row t-1 (the previous recorded date for that store-item pair). It never uses row t or t+1, so future values cannot leak in.



- Rolling features (7-row mean, 14-row standard deviation) were restricted to strictly **previous rows only**, never including the current or a future row, by using the same window `w` with `rowsBetween(-7, -1)` and `rowsBetween(-14, -1)`.
- Calendar features (day_of_week, is_weekend) come only from the `date` column, which is always known at prediction time, so they carry no leakage risk.
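One way to check these guarantees independently of Spark is a perturbation test: change the sales value on day t and confirm that no feature on day t (or earlier) moves. The sketch below is a plain-Python reference model of the four features for a single store-item series, following the same row-based definitions as the Spark window (`lag` over rows, `rowsBetween(-7, -1)`, `rowsBetween(-14, -1)`). The helper names `build_features` and `leaks_at` are hypothetical; this is not the notebook's Spark code.

```python
from statistics import mean, stdev
from typing import Callable, List


def build_features(sales: List[float]) -> List[dict]:
    """Row-based features for one store-item series (reference model of the Spark code)."""
    rows = []
    for t in range(len(sales)):
        prev7 = sales[max(0, t - 7):t]    # like rowsBetween(-7, -1)
        prev14 = sales[max(0, t - 14):t]  # like rowsBetween(-14, -1)
        rows.append({
            "lag_1": sales[t - 1] if t >= 1 else None,  # like F.lag(..., 1)
            "lag_7": sales[t - 7] if t >= 7 else None,  # like F.lag(..., 7)
            # avg over an empty window is NULL in Spark
            "rolling_mean_7": mean(prev7) if prev7 else None,
            # sample std needs at least 2 values (Spark also returns NULL, as in the output above)
            "rolling_std_14": stdev(prev14) if len(prev14) >= 2 else None,
        })
    return rows


def leaks_at(sales: List[float], t: int,
             builder: Callable[[List[float]], List[dict]] = build_features) -> bool:
    """True if changing sales[t] alters any feature on rows 0..t, i.e. the features leak."""
    perturbed_sales = list(sales)
    perturbed_sales[t] += 1000.0
    return builder(sales)[: t + 1] != builder(perturbed_sales)[: t + 1]


example_sales = [3.0, 5.0, 2.0, 8.0, 4.0, 6.0, 7.0, 1.0,
                 9.0, 5.0, 4.0, 6.0, 2.0, 3.0, 8.0, 7.0]
# prints False: no feature on day t depends on the value of day t
print(any(leaks_at(example_sales, t) for t in range(len(example_sales))))
```

Passing a `builder` whose window includes the current row (for example `mean(s[i - 6:i + 1])`) makes `leaks_at` return True, which is what the test is designed to catch.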


# **Assertions for Correctness of Generated Features**

# **Lag features assertion**
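This assertion checks that the lag features are consistent by recomputing them and comparing. For each row, lag_1 must exactly match the unit_sales of the previous row in the same store-item series, and lag_7 must exactly match the value seven rows earlier (which is seven calendar days earlier only when no dates are missing). Because the first rows of each series were already dropped from `df`, rows whose recomputed lag is NULL cannot be compared and are skipped. If any mismatch is found, the assertion fails. Since the check reuses the same window expression, it confirms that the stored columns are correct; the guarantee that they use only past data comes from the window definition itself.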

In [None]:
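# For lag_1: rows whose recomputed lag is NULL (first remaining row of each series
# after the dropna above) cannot be compared, so they are skipped explicitly
check_lag1 = (
    df.withColumn("prev_sales", F.lag("unit_sales", 1).over(w))
      .filter(F.col("prev_sales").isNotNull() & (F.col("lag_1") != F.col("prev_sales")))
)
assert check_lag1.count() == 0, "lag_1 mismatch with actual previous row"

# For lag_7 (seven rows back, not necessarily seven calendar days)
check_lag7 = (
    df.withColumn("sales_7ago", F.lag("unit_sales", 7).over(w))
      .filter(F.col("sales_7ago").isNotNull() & (F.col("lag_7") != F.col("sales_7ago")))
)
assert check_lag7.count() == 0, "lag_7 mismatch with actual 7th previous row"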
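The lag features are row-based: `lag("unit_sales", 7)` returns the value 7 rows earlier in the store-item series, which equals the value 7 calendar days earlier only when every day has a row. The Favorita training data leaves out store-item-date combinations with zero sales, so the two can differ. The plain-Python sketch below uses made-up dates to show the difference; `row_lag` and `calendar_lag` are hypothetical helpers, and treating a missing day as zero sales is an assumption (the competition data does not say whether the item was in stock). A calendar-based feature in Spark would need a different construction, for example a range-based window over a numeric day column, which this notebook does not implement.

```python
from datetime import date, timedelta
from typing import Optional

# Made-up store-item series; days with zero sales have no row, as in train.csv
series = {
    date(2013, 1, 1): 4.0,
    date(2013, 1, 2): 2.0,
    date(2013, 1, 4): 5.0,   # 2013-01-03 has no row
    date(2013, 1, 5): 3.0,
    date(2013, 1, 6): 6.0,
    date(2013, 1, 8): 1.0,   # 2013-01-07 has no row
    date(2013, 1, 9): 7.0,
    date(2013, 1, 10): 2.0,
    date(2013, 1, 11): 8.0,
}
dates = sorted(series)


def row_lag(i: int, k: int) -> Optional[float]:
    """Value k rows back, which is what F.lag("unit_sales", k) over the window returns."""
    return series[dates[i - k]] if i >= k else None


def calendar_lag(i: int, k: int, missing_value: float = 0.0) -> float:
    """Value exactly k calendar days back; a day without a row is assumed to be zero sales."""
    return series.get(dates[i] - timedelta(days=k), missing_value)


last = len(dates) - 1  # 2013-01-11
print("7 rows back:", dates[last - 7], row_lag(last, 7))
print("7 days back:", dates[last] - timedelta(days=7), calendar_lag(last, 7))
```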


# **Rolling features assertion**
This assertion validates the rolling features by comparing Spark's computed values against a Pandas ground truth on a small sample (30 rows of one store-item series) to keep the computation cheap. For each row with enough history in the sample, it checks that rolling_mean_7 equals the mean of the previous 7 rows and that rolling_std_14 equals the sample standard deviation of the previous 14 rows, both excluding the current row. Pandas is used because it makes these statistics easy to compute and inspect row by row, giving an independent reference for Spark's results. If any mismatch is detected, the assertion fails, confirming that the rolling windows exclude the current and future rows and are therefore leakage-free.

In [None]:
import math

# Take a small sample for manual validation
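sample = (
    df.filter((F.col("store_nbr") == 1) & (F.col("item_nbr") == 96995))
      .orderBy("date")
      .limit(30)   # small sample for checking
      .toPandas()
)
# Fail loudly if the chosen series is too short for the checks below to run
assert len(sample) > 14, "sample too small to validate rolling_std_14"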

# Validate rolling features against Pandas ground truth
for i in range(len(sample)):
    # --- rolling mean (7 rows before current, exclude current row) ---
    if i >= 7:
        # Spark rowsBetween(-7, -1) → exactly the 7 rows before i
        mean7 = sample.loc[i-7:i-1, "unit_sales"].mean()
        assert math.isclose(
            sample.loc[i, "rolling_mean_7"], mean7, rel_tol=1e-6
        ), f"Mismatch in rolling_mean_7 at row {i}"

    # --- rolling std (14 rows before current, exclude current row) ---
    if i >= 14:
        std14 = sample.loc[i-14:i-1, "unit_sales"].std(ddof=1)  # sample std (same as Spark)
        assert math.isclose(
            sample.loc[i, "rolling_std_14"], std14, rel_tol=1e-6
        ), f"Mismatch in rolling_std_14 at row {i}"

print("✅ Verified: rolling_mean_7 and rolling_std_14 match Spark's definition (no leakage).")


✅ Verified: rolling_mean_7 and rolling_std_14 match Spark's definition (no leakage).


# **Calendar-based features assertion**
This assertion validates calendar-based features to ensure they were generated correctly. The first check confirms that day_of_week values are always between 0 and 6 (Monday=0 to Sunday=6). The second check ensures that is_weekend is only set to 1 for Saturday and Sunday, and never for weekdays. These assertions guarantee that the date-derived features are consistent, logically correct, and free from errors.

In [None]:
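assert df.filter(~F.col("day_of_week").between(0, 6)).count() == 0, "Invalid day_of_week"
assert df.filter((F.col("day_of_week") < 5) & (F.col("is_weekend") == 1)).count() == 0, "is_weekend wrong"
# Converse check: Saturday (5) and Sunday (6) must always be flagged as weekend
assert df.filter((F.col("day_of_week") >= 5) & (F.col("is_weekend") == 0)).count() == 0, "weekend day not flagged"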


In [None]:
# ✅ Drop rows with nulls in any lag/rolling feature (already done earlier; kept as a safeguard)
final_df = df.na.drop(
    subset=["lag_1", "lag_7", "rolling_mean_7", "rolling_std_14"]
)

# ✅ Save as partitioned Parquet by store
(
    final_df
    .write
    .mode("overwrite")
    .partitionBy("store_nbr")
    .parquet("final_dataset_partitioned")
)

