# 🛠️ M5 Forecasting Data Preprocessing and Feature Engineering

This notebook focuses on efficiently preparing the **M5 Forecasting** dataset for further analysis and modeling, primarily using **Dask** for handling large data.

---

#### 1. 📦 Import Required Libraries
- Imported Dask (`dask.dataframe`), NumPy, `os`, and `json`, plus helper functions from the local `utils` module.
- Note: All necessary libraries should be installed from the provided `requirements.txt`.

---

#### 2. 📂 Create Output Directory
- Created a `../data` folder (relative to this notebook) if it doesn't already exist to store processed datasets.

---


#### 3. 🛒 Sales Data Preprocessing
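- Read `sales_train_validation.csv` and `sales_train_evaluation.csv` using Dask.
- **Truncated** the dataset to five `HOBBIES_1` items (`HOBBIES_1_001` to `HOBBIES_1_005`) due to limited compute resources; this filter is applied when the processed data is loaded.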
- **Repartitioned** the dataset based on `item_id` for better parallelism.
- **Melted** the data:
  - Converted it from wide format (columns `d_1`, `d_2`, ...) to long format (one row per item-day pair).
- Extracted day numbers (e.g., `d_1` → `1`) from the `day` column.
- **Sorted** data by `id` and `day` within each partition.
- Saved the processed sales data in **Parquet** format for faster loading.
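The repartitioning step relies on Dask's divisions convention: for `n` partitions there are `n + 1` boundary values, partition `i` holds index values in `[divisions[i], divisions[i+1])`, and the last partition also includes its upper bound. The pure-Python sketch below illustrates why repeating the last key gives one partition per `item_id`. The function names `build_divisions` and `partition_of` are hypothetical and exist only for this illustration; they are not part of Dask or of this notebook.

```python
from bisect import bisect_right

def build_divisions(keys):
    """Sorted unique keys with the last one repeated (hypothetical helper)."""
    groups = sorted(set(keys))
    groups.append(groups[-1])  # closing bound of the last partition
    return groups

def partition_of(key, divisions):
    """Index of the partition that would hold `key` (hypothetical helper)."""
    n_partitions = len(divisions) - 1
    # Partition i covers [divisions[i], divisions[i+1]);
    # the last partition also includes its upper bound.
    return min(bisect_right(divisions, key) - 1, n_partitions - 1)

divisions = build_divisions(
    ["HOBBIES_1_002", "HOBBIES_1_001", "HOBBIES_1_003", "HOBBIES_1_001"]
)
for item in ["HOBBIES_1_001", "HOBBIES_1_002", "HOBBIES_1_003"]:
    print(item, "-> partition", partition_of(item, divisions))
```

With three distinct items the list has four boundaries, so each item lands in its own partition.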

---

#### 4. 📈 Load Datasets
- Loaded the following datasets:
  - **Sell prices**: `sell_prices.csv`
  - **Calendar**: `calendar.csv` (event columns read as `object`, with `assume_missing=True` so that unspecified integer columns load as floats and can hold missing values)
  - **Sales**: Loaded preprocessed sales data from the saved parquet files.

---

#### 5. 🧹 Handling Missing Values
- Created a utility function `handle_missing_values(df)`:
  - Generated a missing value report (percentage of nulls per column).
  - Imputed missing values using **forward fill** (`ffill`) followed by **backward fill** (`bfill`).
- Applied this function across **sales**, **prices**, and **calendar** datasets.
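The actual `handle_missing_values` lives in `utils` and is not shown in this notebook. As a rough illustration of the behaviour described above, here is a minimal pandas sketch. The name `handle_missing_values_sketch`, the toy `sell_price` values, and the choice to return the report as a Series are assumptions; the notebook itself applies the real helper to Dask DataFrames.

```python
import pandas as pd

def handle_missing_values_sketch(df: pd.DataFrame):
    """Hypothetical stand-in for utils.handle_missing_values (not the author's code)."""
    # Percentage of nulls per column, measured before any imputation
    missing_report = df.isna().mean() * 100
    # Forward fill first, then backward fill to cover nulls at the start
    filled = df.ffill().bfill()
    return filled, missing_report

# Made-up prices with a leading and an interior gap
prices = pd.DataFrame({"sell_price": [float("nan"), 2.0, float("nan"), 4.0]})
filled, report = handle_missing_values_sketch(prices)
print(filled)
print(report.to_dict())
```

One caveat with this approach: a plain forward fill on a long table can carry values across series boundaries, so filling per `id` group may be preferable when that matters.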

---

#### 6. 🛠️ Memory Management
- Repartitioned the sales dataset for better memory management and processing efficiency.

---

#### 7. 🧪 Feature Engineering
- **Sorted** sales data by `id` and `day` again to ensure correct sequencing.
- **Set `id` as the index** with `sorted=True`, which tells Dask the data is already ordered by `id` and avoids a shuffle.
- Created **lag features**:
  - Added lagged sales columns for **1**, **7**, and **28** days.
  - These features will help capture sales patterns and temporal dependencies during modeling.
- Added **rolling-window features** with `create_rolling_features`.
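`create_lag_features` is defined in `utils` and is not shown here. The idea behind lag features can be sketched in pandas as a per-series shift. The function name `add_lag_features_sketch`, the `sales_lag_<n>` column naming, and the toy data are assumptions made for illustration only.

```python
import pandas as pd

def add_lag_features_sketch(df, lags, group_col="id", value_col="sales"):
    """Hypothetical stand-in for utils.create_lag_features."""
    # Lags are only meaningful when each series is in chronological order
    out = df.sort_values([group_col, "day"]).copy()
    for lag in lags:
        # Shift within each series so values never leak between ids
        out[f"{value_col}_lag_{lag}"] = out.groupby(group_col)[value_col].shift(lag)
    return out

# Two made-up series of four days each
sales = pd.DataFrame({
    "id": ["A"] * 4 + ["B"] * 4,
    "day": [1, 2, 3, 4] * 2,
    "sales": [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0, 40.0],
})
lagged = add_lag_features_sketch(sales, lags=[1, 2])
print(lagged)
```

The first `lag` rows of every series are left as missing values, since no earlier observation exists for them.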

---

### ✅ Output:
- Preprocessed sales data with lag features ready for modeling.
- Missing values handled across all datasets.
- All transformations run on Dask DataFrames, so they are evaluated lazily and partition by partition.

---


In [None]:
# Import required libraries
# Note: install all libraries from the requirements.txt file
import dask.dataframe as dd
import numpy as np
import os
import json

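# Helpers used in this notebook, defined in the local utils module
from utils import (
    DaskLabelEncoderManager,
    create_lag_features,
    create_rolling_features,
    handle_missing_values,
)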

⚡ Note: The datasets are too large to upload directly to GitHub, so they must be downloaded and placed manually.

# 📂 Dataset Placement Instructions

Please download the following datasets from the [M5 Forecasting Accuracy Kaggle competition](https://www.kaggle.com/competitions/m5-forecasting-accuracy/data):

1. `sales_train_evaluation.csv`
2. `sales_train_validation.csv`
3. `sell_prices.csv`
4. `calendar.csv`

After downloading, **place them in `../data/raw/`** relative to this notebook, which is where the code cells read them from.

#### 🛠️ Sales Data Reshaping: Wide to Long Format

The **sales dataset** provided is originally in a **wide format**, where each day's sales are represented as separate columns.

For effective **time-series analysis** and **modeling**, it is essential to reshape this data into a **long format** — where each row represents a single product's sales on a specific day.

This preprocessing step **transforms** the dataset and **stores** the reshaped version, enabling easier feature engineering, model training, and forecasting tasks.
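To make the reshaping concrete, here is a small pandas sketch of the same wide-to-long transformation on a toy frame. The two items and their sales values are made up, and only two identifier columns are kept for brevity; the notebook applies the equivalent steps to the full dataset with Dask.

```python
import pandas as pd

# Toy wide-format frame: one row per item, one column per day (values are made up)
wide = pd.DataFrame({
    "id": ["A_validation", "B_validation"],
    "item_id": ["A", "B"],
    "d_1": [0, 2],
    "d_2": [1, 0],
    "d_3": [3, 5],
})

# Wide -> long: every d_* column becomes one (day, sales) row per item
long_df = wide.melt(id_vars=["id", "item_id"], var_name="day", value_name="sales")

# Turn the "d_<n>" label into an integer day number
long_df["day"] = long_df["day"].str.extract(r"d_(\d+)", expand=False).astype("int64")

# Order rows chronologically within each series
long_df = long_df.sort_values(["id", "day"]).reset_index(drop=True)
print(long_df)
```

Two items with three day columns produce six rows, one per item-day pair.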


In [None]:
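# The sales dataset is in wide format; the functions below reshape it to long format and store the result.
def data_repartition_dask(df, key_col):
    """Repartition `df` so that each distinct value of `key_col` gets its own partition."""
    # Sorted unique keys become the partition boundaries (divisions)
    groups = df[key_col].unique().compute().sort_values().values.tolist()
    # Dask expects npartitions + 1 divisions, so repeat the last key as the closing bound
    groups.append(groups[-1])
    df = df.set_index(key_col, divisions=groups).reset_index()
    return df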

def data_dimentionality_prep(file_read_path, file_write_path):
    sales_data_prep = dd.read_csv(file_read_path)
    print("1 : Partitions : ", sales_data_prep.npartitions)
    sales_data_prep = data_repartition_dask(sales_data_prep, 'item_id')
    print("2 : re-Partitions : ", sales_data_prep.npartitions)

    # Melt the data: Convert wide format to long format
    sales_data_prep = sales_data_prep.melt(
        id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
        var_name="day",
        value_name="sales"
    )
    # Extract the day number from labels such as "d_1" and align dtypes with `meta` below
    def extract_day(df):
        return df.assign(
            day=df["day"].str.extract(r"d_(\d+)", expand=False).astype("int64"),
            sales=df["sales"].astype("float64"),
        )

    meta = {
        "id": "object",
        "item_id": "object",
        "dept_id": "object",
        "cat_id": "object",
        "store_id": "object",
        "state_id": "object",
        "day": "int64",
        "sales": "float64"
    }

    # Apply transformation using map_partitions
    sales_data_prep = sales_data_prep.map_partitions(extract_day, meta=meta)
    # Sort rows by series and day within each partition
    sales_data_prep = sales_data_prep.map_partitions(lambda df: df.sort_values(["id", "day"]))

    print("3 : final Partitions : ", sales_data_prep.npartitions)
    # Save processed sales data
    sales_data_prep.to_parquet(file_write_path)


# Create output folder if it doesn't exist
os.makedirs("../data", exist_ok=True)

# Use forward slashes so the check works on every OS and avoids invalid escape sequences
if not os.path.isdir('../data/processed/sales_train_validation'):
    data_dimentionality_prep(
        file_read_path='../data/raw/sales_train_validation.csv',
        file_write_path='../data/processed/sales_train_validation'
    )

if not os.path.isdir('../data/processed/sales_train_evaluation'):
    data_dimentionality_prep(
        file_read_path='../data/raw/sales_train_evaluation.csv',
        file_write_path='../data/processed/sales_train_evaluation'
    )



In [None]:
# Step 1: Load the datasets lazily with Dask

prices_data = dd.read_csv('../data/raw/sell_prices.csv')
calendar_data = dd.read_csv(
    '../data/raw/calendar.csv',
    dtype={
        'event_name_1': 'object',
        'event_type_1': 'object',
        'event_name_2': 'object',
        'event_type_2': 'object'
    },
    assume_missing=True  # Read unspecified integer columns as floats so they can hold missing values
)
sales_data_val = dd.read_parquet('../data/processed/sales_train_validation')
sales_data_eval = dd.read_parquet('../data/processed/sales_train_evaluation')

# Have mercy on my personal compute: keep only a handful of items
item_ids = [
    'HOBBIES_1_001',
    'HOBBIES_1_002',
    'HOBBIES_1_003',
    'HOBBIES_1_004',
    'HOBBIES_1_005'
]
sales_data_val = data_repartition_dask(sales_data_val[sales_data_val['item_id'].isin(item_ids)], 'item_id')
sales_data_eval = data_repartition_dask(sales_data_eval[sales_data_eval['item_id'].isin(item_ids)], 'item_id')


In [None]:
def data_engineering(data_df):
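    # Step 2: Handle missing values
    data_df, data_df_missing = handle_missing_values(data_df)

    # Sort each partition by series and day so that lags follow chronological order
    data_df = data_df.map_partitions(
        lambda df: df.sort_values(["id", "day"])
    )
    # sorted=True tells Dask the data is already ordered by `id`, which avoids a shuffle
    data_df = data_df.set_index("id", sorted=True)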

    # Step 3: Feature Engineering
    # Creating lag features for sales
    data_df = create_lag_features(data_df, lags=[1, 7, 28])

    # Rolling window features
    data_df = create_rolling_features(data_df)
    data_df = data_df.reset_index()

    return data_df, data_df_missing
    


In [None]:
prices_data, prices_missing = handle_missing_values(prices_data)
calendar_data, calendar_missing = handle_missing_values(calendar_data)


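# Run the data engineering pipeline on both sales splits
# (the missing-value report is kept for the validation split only)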
sales_val, sales_missing = data_engineering(sales_data_val)
sales_eval, _ = data_engineering(sales_data_eval)

In [None]:
# Label categorical data

manager = DaskLabelEncoderManager(columns=['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id'])

# Train phase
sales_val_encoded = manager.fit_transform(sales_val)
# sales_val_encoded = dd.from_delayed([sales_val_encoded.compute()])
manager.save('../data/encoders.pkl')

# Test phase: reuse the encoders fitted on the validation split
sales_eval_encoded = manager.transform(sales_eval)

# Export the cleaned, engineered, and encoded datasets
sales_val_encoded.to_parquet("../data/processed/final_sales_data_val")
sales_eval_encoded.to_parquet("../data/processed/final_sales_data_eval")

In [None]:
# Step 4: Generate the data quality report
# TODO: Extend this very vanilla version with profiling, validation rules, anomaly detection, and report generation
quality_report = {
    'sales_missing': sales_missing.to_dict(),
    'prices_missing': prices_missing.to_dict(),
    'calendar_missing': calendar_missing.to_dict()
}

with open('../data/data_quality_report.json', 'w') as f:
    json.dump(quality_report, f, indent=4)


print("Data engineering completed. Processed data saved.")