# FMCG Forecasting: Weekly Prediction Job

This notebook simulates a **weekly prediction job** for FMCG demand forecasting.  
It runs batch inference with a previously trained model and writes the predictions back to the Feature Store.

## Objectives

1. **Load a new batch** of daily sales data for SKU `MI-006`
2. Retrieve the **previous 7 days of history** from the Feature Store (needed to compute lag and rolling features)
3. Apply the same feature transformations as in training, using `engineer_features_daily`
4. Generate predictions with the trained MLflow model
5. Save batch-level predictions to:
   - Parquet (for archiving)
   - CSV (for dashboards)
   - Feature Store (for traceability and future retraining)

> This notebook represents a **production-like scoring job**, typically triggered weekly by an orchestration tool (e.g., Airflow, the Databricks Jobs API `run-now` endpoint, or Databricks Workflows).

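As an illustration of the trigger side, the following sketch builds (but does not send) a POST request to the Databricks Jobs API 2.1 `run-now` endpoint using only the Python standard library. The workspace URL, job ID, token, and the `batch_date` notebook parameter are hypothetical placeholders: this notebook does not read any parameters, and a real token should come from a secret store rather than source code.

```python
import json
import urllib.request

# HYPOTHETICAL placeholders: not taken from this notebook
WORKSPACE_URL = "https://my-workspace.cloud.databricks.com"
JOB_ID = 123
TOKEN = "dapi-example-token"


def build_run_now_request(workspace_url: str, job_id: int, token: str,
                          notebook_params: dict) -> urllib.request.Request:
    """Build (but do not send) a POST to the Jobs API 2.1 run-now endpoint."""
    body = json.dumps({"job_id": job_id, "notebook_params": notebook_params})
    return urllib.request.Request(
        url=f"{workspace_url}/api/2.1/jobs/run-now",
        data=body.encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_run_now_request(WORKSPACE_URL, JOB_ID, TOKEN, {"batch_date": "2025-01-06"})
# To actually trigger the run: urllib.request.urlopen(req)
```

An orchestrator such as Airflow would issue the same call on a weekly schedule. Databricks Workflows can instead schedule the job directly, with no external request.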

## Imports

In [0]:
from utils.forecasting_utils import engineer_features_daily
from utils.weekly_helpers import load_model_from_registry, generate_batch_id, format_output_paths
from databricks.feature_store import FeatureStoreClient
from databricks.feature_store.entities.feature_lookup import FeatureLookup
import mlflow
from mlflow.tracking import MlflowClient
from pyspark.sql.functions import col, min as min_
from datetime import timedelta
from pyspark.sql.types import DateType, IntegerType


## Load new batch

In [0]:
df_batch = spark.read.parquet("dbfs:/FileStore/fmcg/batch_MI_006_2025_01_06.parquet")
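
The batch path above is hard-coded for one SKU and one date. In a recurring job, it would usually be derived from job inputs. The helper below is hypothetical (it is not the `format_output_paths` utility imported earlier, whose signature is not shown). It rebuilds the same path from a SKU code and a date, assuming the file-naming pattern seen here stays fixed.

```python
from datetime import date


def batch_path_for(sku: str, batch_date: date,
                   base_dir: str = "dbfs:/FileStore/fmcg") -> str:
    """HYPOTHETICAL helper: rebuild the batch file path from a SKU and a date."""
    # SKU codes use '-' but the file names use '_' (MI-006 -> MI_006)
    sku_part = sku.replace("-", "_")
    return f"{base_dir}/batch_{sku_part}_{batch_date:%Y_%m_%d}.parquet"


path = batch_path_for("MI-006", date(2025, 1, 6))
```

With such a helper, the load becomes `spark.read.parquet(batch_path_for("MI-006", run_date))`, where `run_date` would come from the job's parameters.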


## Set Cut-Off Date & Load 7 Days of History

In [0]:
cutoff_batch_date = df_batch.select(min_("date")).first()[0]
cutoff_start_date = cutoff_batch_date - timedelta(days=7)

fs = FeatureStoreClient()

df_history = fs.read_table("fmcg_features_daily") \
    .filter((col("date") >= cutoff_start_date) & (col("date") < cutoff_batch_date))

display(df_history)
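
The window is half-open: it includes `cutoff - 7 days` but excludes the cut-off date itself, so history and batch never overlap. With one row per day, the window contains exactly 7 rows. The pandas sketch below reproduces the same filter on toy dates so the boundaries can be checked without a cluster. The data is made up for illustration.

```python
import pandas as pd


def history_window(history: pd.DataFrame, batch: pd.DataFrame, days: int = 7) -> pd.DataFrame:
    """Return history rows with date in [cutoff - days, cutoff)."""
    cutoff = batch["date"].min()              # earliest date in the new batch
    start = cutoff - pd.Timedelta(days=days)  # inclusive lower bound
    mask = (history["date"] >= start) & (history["date"] < cutoff)
    return history.loc[mask]


# Toy data: 17 days of history followed by a 7-day batch starting 2025-01-06
history = pd.DataFrame({"date": pd.date_range("2024-12-20", "2025-01-05", freq="D")})
batch = pd.DataFrame({"date": pd.date_range("2025-01-06", periods=7, freq="D")})
window = history_window(history, batch)
```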


## Align the Batch Schema with the Feature Table

In [0]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import DateType, IntegerType

# Cast the raw batch columns to the types used in the feature table
df_batch_prepared = df_batch \
    .withColumn("date", col("date").cast(DateType())) \
    .withColumn("promotion_flag", col("promotion_flag").cast(IntegerType())) \
    .withColumn("delivery_days", col("delivery_days").cast(IntegerType())) \
    .withColumn("stock_available", col("stock_available").cast(IntegerType())) \
    .withColumn("delivered_qty", col("delivered_qty").cast(IntegerType())) \
    .withColumn("units_sold", col("units_sold").cast(IntegerType()))

# The raw batch has no engineered features yet: add them as null doubles so the
# batch can be unioned with the history (they are recomputed after the union)
for col_name in ["lag_1", "lag_2", "rolling_mean_4", "rolling_std_4", "momentum", "avg_by_channel_region"]:
    df_batch_prepared = df_batch_prepared.withColumn(col_name, lit(None).cast("double"))

df_batch_prepared = df_batch_prepared.select(df_history.columns)
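
`unionByName` matches columns by name, and by default both sides must have the same set of columns. That is why the placeholders are added before the final `select`. Since Spark 3.1, `unionByName(..., allowMissingColumns=True)` can fill missing columns with nulls instead. The pandas sketch below mirrors the same two steps on one toy row: add float placeholders, then reorder to a target column list. The column list is a hypothetical subset of the real table's schema.

```python
import numpy as np
import pandas as pd


def align_to_schema(batch: pd.DataFrame, target_columns: list,
                    placeholder_cols: list) -> pd.DataFrame:
    """Add null float placeholders, then reorder columns to match target_columns."""
    out = batch.copy()
    for name in placeholder_cols:
        out[name] = np.nan  # float64 NaN, analogous to lit(None).cast("double")
    # Like Spark's select(df_history.columns), this fails (KeyError) if a column is missing
    return out[target_columns]


raw = pd.DataFrame({"units_sold": [12], "sku": ["MI-006"], "date": [pd.Timestamp("2025-01-06")]})
aligned = align_to_schema(raw, ["sku", "date", "units_sold", "lag_1", "lag_2"], ["lag_1", "lag_2"])
```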


## Union & create features

In [0]:
df_combined = df_history.unionByName(df_batch_prepared)


In [0]:
df_combined_fe = engineer_features_daily(df_combined)


In [0]:
df_combined_fe.show()
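
The union is what makes the lag features computable. A batch row's `lag_1` or `rolling_mean_4` depends on earlier days that exist only in the history. The real definitions inside `engineer_features_daily` are not shown in this notebook. The function below is therefore a hypothetical stand-in with assumed definitions (for example, `momentum = lag_1 - lag_2`, and `avg_by_channel_region` omitted). It assumes a single SKU with one row per day. It compares the first batch day with and without history.

```python
import pandas as pd


def add_lag_features(df: pd.DataFrame) -> pd.DataFrame:
    """HYPOTHETICAL stand-in for engineer_features_daily; definitions are assumed.
    Assumes a single SKU with one row per day."""
    out = df.sort_values("date").reset_index(drop=True)
    prev = out["units_sold"].shift(1)                # previous day's sales
    out["lag_1"] = prev
    out["lag_2"] = out["units_sold"].shift(2)
    out["rolling_mean_4"] = prev.rolling(4).mean()   # mean of the 4 previous days
    out["rolling_std_4"] = prev.rolling(4).std()     # sample std of the 4 previous days
    out["momentum"] = out["lag_1"] - out["lag_2"]    # assumed definition
    return out


history = pd.DataFrame({"date": pd.date_range("2024-12-30", periods=7, freq="D"),
                        "units_sold": [1, 2, 3, 4, 5, 6, 7]})
batch = pd.DataFrame({"date": [pd.Timestamp("2025-01-06")], "units_sold": [100]})

with_history = add_lag_features(pd.concat([history, batch], ignore_index=True))
batch_only = add_lag_features(batch)
```

With history, the batch day gets `lag_1 = 7` and `rolling_mean_4 = 5.5`. Without history, every lag is NaN. This is why the 7-day window is loaded even though only the batch rows are scored.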

## Prepare & Predict

In [0]:
features = [
    "lag_1", "lag_2", "rolling_mean_4", "rolling_std_4", 
    "momentum", "avg_by_channel_region"
]


In [0]:
import mlflow
from pyspark.sql.functions import col

# 1. Keep only the new batch rows (the history rows were needed only to compute lags)
df_new_only = df_combined_fe.filter(col("date") >= cutoff_batch_date)

# 2. Collect the feature columns to pandas for scoring
df_pandas = df_new_only.select(features).toPandas()

# 3. Load the model logged in this MLflow run
model_uri = 'runs:/139e23d4770d4356bc025f3035b9576b/model'
loaded_model = mlflow.pyfunc.load_model(model_uri)

# 4. Predict
df_pandas["units_sold_pred"] = loaded_model.predict(df_pandas)
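
An MLflow pyfunc model's `predict` takes a pandas DataFrame and returns one prediction per row, in input order. Predictions therefore stay aligned only if they are attached to the same frame that was scored. Spark does not guarantee the row order of two separate `toPandas()` calls on the same DataFrame. The sketch below uses a hypothetical stand-in model (the mean of two lags) in place of the real MLflow model. It shows the safe pattern: score a column subset of one frame and write the result back into that frame.

```python
import pandas as pd


class MeanOfLagsModel:
    """HYPOTHETICAL stand-in exposing predict(DataFrame) like an MLflow pyfunc model."""

    def predict(self, X: pd.DataFrame):
        return X[["lag_1", "lag_2"]].mean(axis=1).to_numpy()


def score(frame: pd.DataFrame, model, features: list) -> pd.DataFrame:
    out = frame.copy()
    # Predict on a column subset of the SAME frame, so output row i belongs to
    # input row i; no second collect, so no reordering risk
    out["units_sold_pred"] = model.predict(out[features])
    return out


frame = pd.DataFrame({"sku": ["MI-006", "MI-006"],
                      "date": ["2025-01-06", "2025-01-07"],
                      "lag_1": [3.0, 10.0],
                      "lag_2": [5.0, 8.0]})
scored = score(frame, MeanOfLagsModel(), ["lag_1", "lag_2"])
```

For larger batches, `mlflow.pyfunc.spark_udf` can wrap the same model as a Spark UDF and score inside Spark, which avoids the collect to pandas altogether.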

## Save Predictions

In [0]:
# Use the batch date in the output file names
formatted_date = cutoff_batch_date.strftime("%Y-%m-%d")

# Final predictions combined with the input data. Score this same pandas frame:
# separate toPandas() calls on a Spark DataFrame are not guaranteed to return rows
# in the same order, so copying predictions across frames by index is unsafe.
df_pred_full = df_new_only.toPandas()
df_pred_full["units_sold_pred"] = loaded_model.predict(df_pred_full[features])

# Output paths: Spark writes to the dbfs:/ URI, pandas to the /dbfs/ local mount
output_parquet_path = f"dbfs:/FileStore/fmcg/predictions/final_daily_preds_{formatted_date}.parquet"
output_csv_path = f"/dbfs/FileStore/fmcg/predictions/final_daily_preds_{formatted_date}.csv"

# Write to Parquet (overwrite replaces any existing predictions for this date)
spark.createDataFrame(df_pred_full).write.mode("overwrite").parquet(output_parquet_path)
print(f"✅ Saved predictions to Parquet: {output_parquet_path}")

# Write to CSV (optional; useful e.g. for dashboards)
df_pred_full.to_csv(output_csv_path, index=False)
print(f"✅ Also saved as CSV: {output_csv_path}")
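
The two output paths point to the same DBFS folder through different interfaces. Spark uses the `dbfs:/` URI, while pandas writes through the `/dbfs/` local mount. The hypothetical helpers below derive both paths from the batch date so the two cannot drift apart. They are illustrative only and are not the `format_output_paths` utility imported at the top, whose behavior is not shown.

```python
from datetime import date


def dbfs_uri_to_fuse(path: str) -> str:
    """Map a Spark-style dbfs:/ URI to the /dbfs/ local mount path used by pandas."""
    prefix = "dbfs:/"
    if not path.startswith(prefix):
        raise ValueError(f"expected a dbfs:/ URI, got {path!r}")
    return "/dbfs/" + path[len(prefix):]


def prediction_paths(batch_date: date,
                     base_uri: str = "dbfs:/FileStore/fmcg/predictions") -> tuple:
    """HYPOTHETICAL helper: (Spark Parquet URI, pandas CSV path) for one batch date."""
    stem = f"{base_uri}/final_daily_preds_{batch_date:%Y-%m-%d}"
    return f"{stem}.parquet", dbfs_uri_to_fuse(f"{stem}.csv")


parquet_path, csv_path = prediction_paths(date(2025, 1, 6))
```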


## Merge Features and Predictions into the Feature Store

In [0]:
from databricks.feature_store import FeatureStoreClient

fs = FeatureStoreClient()

# Write to the Feature Store in merge mode, upserting on the table's primary keys (e.g., sku + date)
fs.write_table(
    name="fmcg_features_daily",
    df=spark.createDataFrame(df_pred_full),
    mode="merge"
)
print("✅ Merged new predictions into Feature Store (fmcg_features_daily)")
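
In `merge` mode, rows whose primary keys already exist in the feature table are updated, and rows with new keys are inserted. Rerunning the job for the same batch therefore does not duplicate rows. The pandas sketch below illustrates only those upsert semantics, simplified so that an update replaces the whole row. It is not how the Feature Store implements the merge. The keys `sku` and `date` follow the example in the comment above; the real keys are whatever the table was created with.

```python
import pandas as pd


def upsert(existing: pd.DataFrame, updates: pd.DataFrame, keys: list) -> pd.DataFrame:
    """Simplified upsert on `keys`: updated keys take the new row, new keys are appended.
    Assumes `existing` has unique keys."""
    combined = pd.concat([existing, updates], ignore_index=True)
    # keep="last": for duplicate keys, the row from `updates` (concatenated last) wins
    merged = combined.drop_duplicates(subset=keys, keep="last")
    return merged.sort_values(keys).reset_index(drop=True)


existing = pd.DataFrame({"sku": ["MI-006", "MI-006"],
                         "date": ["2025-01-04", "2025-01-05"],
                         "units_sold": [10, 11]})
updates = pd.DataFrame({"sku": ["MI-006", "MI-006"],
                        "date": ["2025-01-05", "2025-01-06"],
                        "units_sold": [12, 13]})
merged = upsert(existing, updates, keys=["sku", "date"])
```

Running `upsert` a second time with the same `updates` returns the same three rows. This idempotence is what makes `merge` safe for a job that may be retried.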
