In [None]:
import ray
import ray.data
import pandas as pd
from prophet import Prophet


import mlflow
import mlflow.pyfunc

from mlflow.client import MlflowClient

import os

In [None]:
# MLflow connection settings. Honor the standard MLFLOW_TRACKING_URI environment
# variable so the notebook can target another server without code edits; fall
# back to the local development server otherwise.
tracking_uri = os.environ.get("MLFLOW_TRACKING_URI", "http://127.0.0.1:8080")

mlflow.set_tracking_uri(tracking_uri)

# Registry client used below for version lookups and stage transitions.
client = MlflowClient(tracking_uri=tracking_uri)

mlflow.set_experiment("Prophet Forecasting")

In [None]:
def prep_store_data(
    df: pd.DataFrame,
    store_id: int = 4,
    store_open: int = 1,
) -> pd.DataFrame:
    """Select one store's rows and reshape them into Prophet's ``ds``/``y`` format.

    Args:
        df: Raw sales frame with at least ``Store``, ``Open``, ``Date`` and
            ``Sales`` columns. It is not modified.
        store_id: Value of ``Store`` to keep.
        store_open: Value of ``Open`` to keep (``1`` keeps only open days).

    Returns:
        A new frame with ``Date`` parsed to datetimes and renamed to ``ds``, and
        ``Sales`` renamed to ``y``. Rows are sorted by ``ds`` ascending and carry
        a fresh 0..n-1 index, so index labels match chronological positions.
    """
    keep = (df['Store'] == store_id) & (df['Open'] == store_open)
    return (
        df.loc[keep]
        .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
        .rename(columns={'Date': 'ds', 'Sales': 'y'})
        .sort_values('ds', ascending=True)
        # Reset *after* sorting so labels follow date order, not file order.
        .reset_index(drop=True)
    )


def train_predict(
    df: pd.DataFrame,
    train_fraction: float,
    seasonality: dict,
    interval_width: float = 0.95,
) -> tuple[Prophet, pd.DataFrame, pd.DataFrame, pd.DataFrame, int]:
    """Fit Prophet on the leading slice of ``df`` and forecast the remaining rows.

    Args:
        df: Frame with Prophet's ``ds``/``y`` columns. The split is positional,
            so ``df`` is assumed sorted by ``ds`` (as ``prep_store_data``
            returns it); otherwise this is not a time-based split.
        train_fraction: Share of rows used for training, in (0, 1].
        seasonality: Mapping with ``'yearly'``, ``'weekly'`` and ``'daily'``
            entries forwarded to the matching ``Prophet`` arguments.
        interval_width: Width of Prophet's uncertainty interval.

    Returns:
        ``(model, predicted, df_train, df_test, train_index)``, where
        ``predicted`` is ``model.predict(df_test)`` and ``train_index`` is the
        number of rows in ``df_train``.

    Raises:
        ValueError: If ``train_fraction`` is not in (0, 1].
    """
    # Reject values that would otherwise silently produce odd splits
    # (e.g. a negative fraction turns into a negative iloc bound).
    if not 0 < train_fraction <= 1:
        raise ValueError(f"train_fraction must be in (0, 1], got {train_fraction!r}")

    train_index = int(train_fraction * df.shape[0])
    # Copy only the slices, so each split is independent of ``df`` without
    # duplicating the full frame.
    df_train = df.iloc[:train_index].copy()
    df_test = df.iloc[train_index:].copy()

    model = Prophet(
        yearly_seasonality=seasonality['yearly'],
        weekly_seasonality=seasonality['weekly'],
        daily_seasonality=seasonality['daily'],
        interval_width=interval_width,
    )

    model.fit(df_train)

    predicted = model.predict(df_test)

    return model, predicted, df_train, df_test, train_index

In [None]:
@ray.remote(num_returns=5)
def prep_train_predict(
    df: pd.DataFrame,
    store_id: int,
    store_open: int = 1,
    train_fraction: float = 0.8,
    seasonality: "dict | None" = None,
) -> tuple[Prophet, pd.DataFrame, pd.DataFrame, pd.DataFrame, int]:
    """Ray task: filter ``df`` to one store, then fit and forecast with Prophet.

    Declared with ``num_returns=5``, so ``.remote(...)`` yields one ObjectRef per
    element of the tuple returned by ``train_predict``.

    Args:
        df: Full raw sales frame (typically passed in as a Ray ObjectRef).
        store_id: Store to model.
        store_open: ``Open`` value to keep (``1`` keeps only open days).
        train_fraction: Share of the store's rows used for training.
        seasonality: Prophet seasonality flags. ``None`` means yearly and weekly
            seasonality on and daily seasonality off.

    Returns:
        ``(model, predicted, df_train, df_test, train_index)`` from
        ``train_predict``.
    """
    if seasonality is None:
        # Build the default per call rather than sharing one mutable dict
        # across every invocation.
        seasonality = {'yearly': True, 'weekly': True, 'daily': False}
    store_df = prep_store_data(df, store_id=store_id, store_open=store_open)
    return train_predict(store_df, train_fraction, seasonality)

In [None]:
# Raw daily sales data; downstream code relies on its Store, Open, Date and
# Sales columns.
train_path = "./rossman_store_data/train.csv"
df = pd.read_csv(filepath_or_buffer=train_path)

In [None]:
# One model is trained per distinct store.
store_ids = df.loc[:, 'Store'].unique()

# Prophet seasonality flags: yearly and weekly components on, daily off.
seasonality = dict(yearly=True, weekly=True, daily=False)

In [None]:
# ignore_reinit_error lets this cell be re-run in the same kernel without Ray
# raising because it is already initialized.
ray.init(ignore_reinit_error=True)

In [None]:
# Put the full sales frame into Ray's object store once. The tasks launched
# below receive this ObjectRef, so the frame is not re-serialized per task.
df_id = ray.put(df)

In [None]:
# Launch one remote prep/fit/predict task per store. Because the task is declared
# with num_returns=5, each call yields 5 ObjectRefs:
# (model, predictions, train frame, test frame, train index).
# The notebook-level `seasonality` dict is forwarded, so editing it takes effect.
task_refs = [
    prep_train_predict.remote(df_id, store_id, seasonality=seasonality)
    for store_id in store_ids
]

# Transpose the per-store ref tuples into one list per output kind. The explicit
# fallback keeps the 5-way unpacking valid when there are no stores.
model_obj_refs, pred_obj_refs, train_obj_refs, test_obj_refs, train_index_obj_refs = (
    [list(refs) for refs in zip(*task_refs)] if task_refs else [[], [], [], [], []]
)

In [None]:
# Block on the remote tasks and materialize their non-model outputs locally.
# The fitted models themselves are fetched later, during MLflow registration.
ray_results = {
    key: ray.get(refs)
    for key, refs in (
        ('predictions', pred_obj_refs),
        ('train_data', train_obj_refs),
        ('test_data', test_obj_refs),
        ('train_indices', train_index_obj_refs),
    )
}

In [None]:
def register_models_to_mlflow(
    store_ids,
    model_obj_refs,
    stage: str = "Production",
    archive_existing_versions: bool = True,
):
    """Log, register and promote one Prophet model per store.

    For each store, this starts a new MLflow run and logs the fitted model with
    the Prophet flavor. The model is registered as
    ``prophet-retail-forecaster-store-<store_id>``, and the new model version is
    transitioned to ``stage``.

    Args:
        store_ids: Store identifiers, in the same order as ``model_obj_refs``.
        model_obj_refs: Ray ObjectRefs resolving to fitted ``Prophet`` models.
        stage: Registry stage to move each new version into.
        archive_existing_versions: If true, versions already in ``stage`` are
            archived, so re-running the notebook does not pile up several
            versions in the same stage.

    Raises:
        ValueError: If the number of store ids and models differ.
    """
    models = ray.get(model_obj_refs)
    # zip() would silently drop unmatched stores/models; fail loudly instead.
    if len(models) != len(store_ids):
        raise ValueError(
            f"got {len(store_ids)} store ids but {len(models)} models"
        )

    for store_id, model in zip(store_ids, models):
        model_name = f"prophet-retail-forecaster-store-{store_id}"
        with mlflow.start_run():
            model_info = mlflow.prophet.log_model(
                pr_model=model,
                artifact_path="prophet_model",
                registered_model_name=model_name,
            )

            # Prefer the version reported by log_model itself. Fall back to a
            # registry lookup on MLflow releases whose ModelInfo lacks it.
            version = getattr(model_info, "registered_model_version", None)
            if version is None:
                version = client.get_latest_versions(
                    name=model_name,
                    stages=["None"],
                )[0].version

            client.transition_model_version_stage(
                name=model_name,
                version=version,
                stage=stage,
                archive_existing_versions=archive_existing_versions,
            )

In [None]:
# Fetch the fitted models from Ray; log, register and promote one MLflow model per store.
register_models_to_mlflow(store_ids, model_obj_refs)

In [None]:
def get_production_model(store_id: int):
    """Load a store's Production-stage model along with registry version metadata.

    Args:
        store_id: Store whose registered model
            (``prophet-retail-forecaster-store-<store_id>``) should be loaded.

    Returns:
        ``(model, latest_versions_metadata)``:
        - ``model`` is loaded via ``mlflow.pyfunc`` from
          ``models:/<name>/Production``.
        - ``latest_versions_metadata`` is the result of
          ``client.get_latest_versions`` for the same name. No ``stages`` filter
          is passed, so this is the latest version in every stage, not only
          the Production one.
    """
    registered_name = f"prophet-retail-forecaster-store-{store_id}"
    production_uri = f"models:/{registered_name}/Production"

    pyfunc_model = mlflow.pyfunc.load_model(model_uri=production_uri)
    versions = client.get_latest_versions(name=registered_name)
    return pyfunc_model, versions

In [None]:
# Store whose Production model is pulled back from the registry as an example.
STORE_ID_TO_LOAD = 100
loaded_model, metadata = get_production_model(STORE_ID_TO_LOAD)