In [1]:
import os
import shutil
import yaml
import tempfile
from pathlib import Path

In [2]:
# The project root is the parent of the directory the kernel was started in.
project_root = Path.cwd().parent

In [3]:
# Switch the working directory to the project root so that relative paths used
# later (e.g. "conf/dev_config.yml") resolve from there.
# NOTE(review): re-running the cell that computes `project_root` after this
# chdir yields the parent of the *new* working directory.
os.chdir(project_root)

In [4]:
import mlflow
import pandas as pd
from pyspark.sql import SparkSession, DataFrame
from delta import configure_spark_with_delta_pip
from meli_forecast.params import Params
from meli_forecast.tasks.create_database import CreateDataBaseTask
from meli_forecast.tasks.create_tables import CreateTablesTask
from meli_forecast.tasks.ingestion import IngestionTask
from meli_forecast.tasks.input import InputTask
from meli_forecast.tasks.split import SplitTask
from meli_forecast.tasks.model import ModelTask
from meli_forecast.tasks.evaluation import EvaluationTask
from meli_forecast.tasks.output import OutputTask
from meli_forecast.utils import write_delta_table
from meli_forecast.schemas import ForecastSchema

Importing plotly failed. Interactive plots will not work.


In [5]:
# Parse the development YAML configuration and build the Params object from it.
conf_file = "conf/dev_config.yml"
with open(conf_file) as config_stream:
    config = yaml.safe_load(config_stream)
params = Params(**config)

# Setting Spark Session

In [6]:
# Create a real temporary directory for the warehouse. `mkdtemp` leaves the
# directory on disk until it is removed explicitly (see the clean-up cell at
# the end of the notebook), whereas `TemporaryDirectory().name` throws away the
# owning object immediately, so the directory is deleted again as soon as that
# object is finalized and only a dangling path is left.
warehouse_dir = tempfile.mkdtemp()
_builder = (
    SparkSession.builder.master("local[*]")
    # Point the metastore warehouse at the temporary directory.
    .config("spark.hive.metastore.warehouse.dir", Path(warehouse_dir).as_uri())
    # Enable the Delta Lake SQL extension and catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    # Use Arrow for Spark <-> pandas conversion, with fallback enabled.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
)
spark: SparkSession = configure_spark_with_delta_pip(_builder).getOrCreate()

24/08/14 16:00:23 WARN Utils: Your hostname, dgarridoa-MS-7C75 resolves to a loopback address: 127.0.1.1; using 192.168.100.11 instead (on interface enp3s0)
24/08/14 16:00:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/dgarridoa/.ivy2/cache
The jars for the packages stored in: /home/dgarridoa/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fbd8e5ca-aa7c-48c9-8eb9-0e16fb01532b;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/dgarridoa/.cache/pypoetry/virtualenvs/meli-forecast-x6SuGLy_-py3.11/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found io.delta#delta-spark_2.12;3.2.0 in central
	found io.delta#delta-storage;3.2.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 93ms :: artifacts dl 4ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.2.0 from central in [default]
	io.delta#delta-storage;3.2.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-fbd8e5ca-aa7c-48c9-8eb9-0e16fb01532b
	confs: [default]
	0 artifacts copied, 3 already retrieved (0kB/3ms)
24/08/14 16:00:23 

In [39]:
# Set the Spark log level to ERROR to cut down on console output.
# NOTE(review): the recorded execution count of this cell (39) is out of
# sequence with its position in the notebook.
spark.sparkContext.setLogLevel("ERROR")

# Setting MLflow tracking server

In [7]:
# Use real temporary locations for the MLflow stores. `mkdtemp` creates a
# directory that persists until removed explicitly; `TemporaryDirectory().name`
# would discard the owning object at once and leave only a dangling path.
tracking_uri = tempfile.mkdtemp()
# The registry is a SQLite database, so it needs a *file* path: keep it inside
# its own temporary directory.
registry_db = Path(tempfile.mkdtemp()) / "mlflow_registry.db"
registry_uri = f"sqlite:///{registry_db}"

mlflow.set_tracking_uri(Path(tracking_uri).as_uri())
mlflow.set_registry_uri(registry_uri)
# Experiment name is passed through the environment.
os.environ["MLFLOW_EXPERIMENT_NAME"] = "/Shared/meli_forecast/dev_meli_forecast"

# Create Database Task

In [15]:
# Run the database-creation task with the common parameters. The variable is
# named after the task (like `ingestion_task`, `input_task`, `split_task`)
# instead of the generic `task`, which other cells rebind to unrelated tasks.
create_database_task = CreateDataBaseTask(params.common)
create_database_task.launch(spark)

# Create Tables Task

In [16]:
# Run the table-creation task with the common parameters. The variable was
# previously called `create_database_task` although it holds a CreateTablesTask.
create_tables_task = CreateTablesTask(params.common)
create_tables_task.launch(spark)

# Ingestion Task

In [17]:
# Build the ingestion task from the `ingestion` parameter section and run it
# on the local Spark session.
ingestion_task = IngestionTask(params.ingestion)
ingestion_task.launch(spark)

24/08/14 16:02:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

# Input Task

In [18]:
# Build the input task from the `common` parameter section and run it.
input_task = InputTask(params.common)
input_task.launch(spark)

                                                                                

# Split Task

In [19]:
# Build the split task from the `split` parameter section and run it.
# `split_task.params.group_columns` is reused by the model cells below.
split_task = SplitTask(params.split)
split_task.launch(spark)

                                                                                

# Model Task

In [20]:
# Load the split table and repartition it by the grouping columns.
df_split = spark.read.table("dev.split")
group_columns = split_task.params.group_columns
# Use as many partitions as there are distinct groups. The distinct frame is
# consumed once by `count()`, so it is not cached (caching it would pin data
# with no handle left to unpersist). `max(..., 1)` keeps the partition count
# positive when the table is empty, since `repartition` rejects 0.
num_partitions = max(df_split.select(*group_columns).distinct().count(), 1)
# Cache the repartitioned frame: every model below is fitted on it.
df_split = df_split.repartition(num_partitions, *group_columns).cache()

In [21]:
def get_forecast(task, df_split: DataFrame) -> tuple[DataFrame, DataFrame]:
    """Fit `task` twice and return the test-window and the future forecast.

    NOTE: a later cell defines `get_forecast` again, and its caller invokes
    `.toPandas()` on both results. This definition therefore returns whatever
    `task.fit_predict` produces, unchanged, instead of converting to pandas
    itself (a pandas frame has no `.toPandas()`), so both definitions agree.

    Args:
        task: object exposing `fit_predict(df, n)` and a `params` attribute
            with `test_size` and `steps`.
        df_split: frame with a "split" column; only rows where it equals
            "train" are used for the first fit.

    Returns:
        A pair `(forecast_on_test, forecast)`: the result of
        `fit_predict` on the train rows with `params.test_size`, and the
        result of `fit_predict` on all rows with `params.steps`.
    """
    df_train = df_split.filter(df_split["split"] == "train")
    df_forecast_on_test = task.fit_predict(df_train, task.params.test_size)
    df_forecast = task.fit_predict(df_split, task.params.steps)
    return df_forecast_on_test, df_forecast

In [None]:
def get_forecast(task, df_split: DataFrame) -> tuple[DataFrame, DataFrame]:
    """Return `(forecast_on_test, forecast)` for one model task.

    The first element comes from `task.fit_predict` on the rows whose "split"
    column equals "train", called with `task.params.test_size`; the second
    comes from `task.fit_predict` on the full frame, called with
    `task.params.steps`. Both results are returned as produced by the task.
    """
    is_train = df_split["split"] == "train"
    backtest = task.fit_predict(df_split.filter(is_train), task.params.test_size)
    future = task.fit_predict(df_split, task.params.steps)
    return backtest, future


# Names of the model configurations to run; each one is looked up in
# `params.models`.
models = [
    "ExponentialSmoothing",
    "Prophet",
    "XGBModel",
    "RandomForest",
    "Croston",
    "NaiveMean",
    "NaiveMovingAverage",
]
# Per-model pandas frames, collected in the same order as `models`.
forecast_on_test, all_models_forecast = [], []

for model in models:
    # `model_task` stays bound to the last task after the loop; the write
    # cells below read `model_task.params.database` from it.
    model_task = ModelTask(params.models[model])
    df_forecast_on_test, df_forecast = get_forecast(model_task, df_split)
    forecast_on_test.append(df_forecast_on_test.toPandas())
    all_models_forecast.append(df_forecast.toPandas())

In [25]:
# Stack the per-model pandas frames into a single frame per forecast type.
# Note: `df_forecast_on_test` held the last model's result inside the loop
# above; from here on it refers to the concatenated pandas DataFrame.
df_forecast_on_test = pd.concat(forecast_on_test)
df_all_models_forecast = pd.concat(all_models_forecast)

In [37]:
# Convert the concatenated test-window forecasts back to a Spark DataFrame with
# the forecast schema and hand it to `write_delta_table` for the
# "forecast_on_test" table. The database name is taken from `model_task`, i.e.
# the last task created in the model loop.
sdf_forecast_on_test = spark.createDataFrame(
    df_forecast_on_test, schema=ForecastSchema
)
write_delta_table(
    spark,
    sdf_forecast_on_test,
    ForecastSchema,
    model_task.params.database,
    "forecast_on_test",
)

In [38]:
# Convert the concatenated future forecasts of every model back to a Spark
# DataFrame with the forecast schema and hand it to `write_delta_table` for the
# "all_models_forecast" table. The database name is taken from `model_task`,
# i.e. the last task created in the model loop.
sdf_all_models_forecast = spark.createDataFrame(
    df_all_models_forecast, schema=ForecastSchema
)
write_delta_table(
    spark,
    sdf_all_models_forecast,
    ForecastSchema,
    model_task.params.database,
    "all_models_forecast",
)

# Evaluation Task

In [40]:
# Run the evaluation task with the `evaluation` parameter section. Named after
# the task, consistent with `ingestion_task` / `input_task` / `split_task`,
# rather than rebinding a generic `task` variable.
evaluation_task = EvaluationTask(params.evaluation)
evaluation_task.launch(spark)

                                                                                

# Output Task

In [41]:
# Run the output task with the `output` parameter section. Named after the
# task, consistent with `ingestion_task` / `input_task` / `split_task`, rather
# than rebinding a generic `task` variable.
output_task = OutputTask(params.output)
output_task.launch(spark)

# Results

In [42]:
# List the tables that now exist in the `dev` database.
spark.sql("show tables from dev").show()

+---------+-------------------+-----------+
|namespace|          tableName|isTemporary|
+---------+-------------------+-----------+
|      dev|all_models_forecast|      false|
|      dev|        best_models|      false|
|      dev|           forecast|      false|
|      dev|   forecast_on_test|      false|
|      dev|                geo|      false|
|      dev|              input|      false|
|      dev|            metrics|      false|
|      dev|             output|      false|
|      dev|              sales|      false|
|      dev|              split|      false|
+---------+-------------------+-----------+



In [43]:
# Collect the MAE rows of every model from dev.metrics and append the rows of
# dev.best_models relabelled as model "Champion", then bring the result to
# pandas.
# `union` matches columns by position, so `select *` relies on dev.metrics
# having its columns in the order (model, city, product_id, metric, value).
# NOTE(review): the dev.best_models branch is not filtered on metric = 'mae';
# this assumes that table only holds MAE rows — confirm.
df_mae = spark.sql("""
    select
      *
    from
      dev.metrics
    where
      metric = 'mae'
    union
    select
      "Champion" as model,
      city,
      product_id,
      metric,
      value
    from
      dev.best_models
    """
).toPandas()

In [44]:
# Preview the first rows of the MAE frame.
df_mae.head()

Unnamed: 0,model,city,product_id,metric,value
0,Croston,B2,543de64b-7c34-4283-8c8c-7592057af4f8,mae,0.334764
1,Croston,B3,b1660dad-f9c4-4cfe-9d63-5c97e820afbb,mae,4292.539411
2,Croston,M3,6b6a2f69-a0d7-4d7a-b191-570f267f315e,mae,2598.129639
3,Croston,M4,c37c63fa-c946-4c83-b5f6-cafaf7371038,mae,788.396973
4,Croston,M5,9de8c74d-219a-41a3-943f-05780f46d629,mae,2.85


In [45]:
# Median, mean and number of MAE values per model (including "Champion").
df_mae.groupby("model").agg({"value": ["median", "mean", "count"]})

Unnamed: 0_level_0,value,value,value
Unnamed: 0_level_1,median,mean,count
model,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2
Champion,0.177197,1542.055706,2907
Croston,1122.675903,3182.308402,2874
ExponentialSmoothing,1114.871584,3052.748432,2849
NaiveMean,654.747073,2458.947756,2907
NaiveMovingAverage,0.285714,2060.563026,2890
Prophet,1134.614526,2832.468756,2907
RandomForest,1.012904,2624.797254,2885
XGBModel,0.728441,2469.812136,2885


In [46]:
# Build one long frame for the test window with three sources stacked by
# `union` (columns matched by position: model, city, product_id, date, sales):
#   1. "Champion": rows of dev.forecast_on_test whose (city, product_id, model)
#      matches a row in dev.best_models, relabelled as model 'Champion';
#   2. every row of dev.forecast_on_test (all individual models);
#   3. "Sales": rows of dev.split with split = 'test', labelled as model 'Sales'.
# NOTE(review): the CTE is written without the `as` keyword
# (`with name ( ... )`); the recorded output shows Spark accepted it, but it is
# not standard SQL.
df_pred_vs_sales_on_test = spark.sql("""
    with champion_forecast_on_test (
      select
        'Champion' as model,
        ft.city,
        ft.product_id,
        ft.date,
        ft.sales
      from
        dev.forecast_on_test ft
        inner join (
          select
            city,
            product_id,
            model
          from
            dev.best_models
        ) bm on ft.city = bm.city
        and ft.product_id = bm.product_id
        and ft.model = bm.model
    )
    select
      *
    from
      champion_forecast_on_test
    union
    select
      *
    from
      dev.forecast_on_test
    union
    select
      'Sales' as model,
      city,
      product_id,
      date,
      sales
    from
      dev.split
    where
      split = 'test'
    """
).toPandas()

In [47]:
# Preview the first rows of the stacked forecast / sales frame.
df_pred_vs_sales_on_test.head()

Unnamed: 0,model,city,product_id,date,sales
0,Champion,B2,defecb65-93a5-43fb-845c-d42201cedcec,2024-08-02,163.647619
1,Champion,B3,1c9641b7-ba05-4b66-92f6-c04d046ab618,2024-08-03,10015.681246
2,Champion,M4,d5d73b79-48aa-4a36-8eed-86c311ec3bc9,2024-08-01,5207.832981
3,Champion,B2,f5799df6-9026-479a-b350-a3ae95d3f373,2024-08-07,0.51407
4,Champion,M3,16d20241-60b0-4c6b-b7e3-b8b5e7f3f9e6,2024-08-04,10776.777889


In [48]:
# Total of the `sales` column per model label over the test window
# ("Sales" rows come from dev.split, the others are forecasts).
df_pred_vs_sales_on_test.groupby(["model"]).agg({"sales": "sum"})

Unnamed: 0_level_0,sales
model,Unnamed: 1_level_1
Champion,24740190.0
Croston,53862350.0
ExponentialSmoothing,29716090.0
NaiveMean,42251660.0
NaiveMovingAverage,30403140.0
Prophet,26168310.0
RandomForest,47510050.0
Sales,34958230.0
XGBModel,40790960.0


In [49]:
# Same total broken down by model label and date; only the first 10 rows are shown.
df_pred_vs_sales_on_test.groupby(["model", "date"]).agg({"sales": "sum"}).head(10)

Unnamed: 0_level_0,Unnamed: 1_level_0,sales
model,date,Unnamed: 2_level_1
Champion,2024-08-01,3330759.0
Champion,2024-08-02,3204615.0
Champion,2024-08-03,3142960.0
Champion,2024-08-04,3548915.0
Champion,2024-08-05,3833428.0
Champion,2024-08-06,3983461.0
Champion,2024-08-07,3696048.0
Croston,2024-08-01,7694622.0
Croston,2024-08-02,7694622.0
Croston,2024-08-03,7694622.0


In [50]:
# Load the table written to dev.output into pandas for inspection.
df_output = spark.read.table("dev.output").toPandas()

In [51]:
# Preview the first rows of the output frame.
df_output.head()

Unnamed: 0,product_id,date,city,sales
0,00afde38-77a7-410e-8f94-9d1110a44693,2024-08-10,B1,0.0
1,00afde38-77a7-410e-8f94-9d1110a44693,2024-08-09,B1,0.0
2,00afde38-77a7-410e-8f94-9d1110a44693,2024-08-08,B1,0.0
3,00dac0d3-3b8f-4d7f-919a-bfb852e11148,2024-08-10,B1,0.0
4,00dac0d3-3b8f-4d7f-919a-bfb852e11148,2024-08-09,B1,0.0


In [52]:
# Total of the `sales` column in the output table per date.
df_output.groupby("date").agg({"sales": "sum"})

Unnamed: 0_level_0,sales
date,Unnamed: 1_level_1
2024-08-08,40665110.0
2024-08-09,40534680.0
2024-08-10,40327930.0


# Delete temporary data

In [55]:
# Shut down the Spark session, then remove the temporary warehouse directory
# if it is still on disk.
spark.stop()
warehouse_path = Path(warehouse_dir)
if warehouse_path.exists():
    shutil.rmtree(warehouse_path)

In [56]:
# End the currently active MLflow run.
mlflow.end_run()