In [0]:
# Install pystan fbprophet mlflow
# %pip uninstall pystan -y
%pip install pystan~=2.14
%pip install fbprophet
%pip install mlflow


In [0]:
# Import Modules
import mlflow.sklearn
from fbprophet import Prophet
import numpy as np
import pandas as pd
import shutil
import os
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import LongType, IntegerType, FloatType, StringType
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

In [0]:
def noise(size: int):
    """Return a 1-D array of ``size`` uniform samples in [0, 1) from NumPy's global random state."""
    samples = np.random.random_sample(size=(size,))
    return samples

def date_from(origin: datetime, days: int):
    """Return the date ``days`` days after ``origin`` as a 'YYYY-MM-DD' string."""
    shifted = origin + timedelta(days=days)
    return shifted.strftime('%Y-%m-%d')

def prepare_data(begin_year: int, begin_month: int, begin_date: int, days: int):
    """Build a synthetic daily time series made of noisy seasonal components.

    Each component is a sine wave whose period (in days) matches its name
    (quarter 90, week 7, month 30, year 366) plus scaled uniform noise from
    ``noise``; the target ``y`` combines the components non-linearly.

    Args:
        begin_year, begin_month, begin_date: calendar date of the first row
            (``begin_date`` is the day of the month).
        days: number of consecutive daily rows to generate.

    Returns:
        A pandas DataFrame with columns ['ds', 'quarter', 'week', 'month', 'year', 'y'];
        ``ds`` is a 'YYYY-MM-DD' string, the other columns are floats.
    """
    size = days
    offsets = np.arange(0, size, 1)

    def seasonal(period: float, amplitude: float, noise_scale: float):
        # sin(2*pi*t/P) repeats every P days (sin(pi*t/P) would repeat every 2P).
        return np.sin(2 * np.pi * offsets / period) * amplitude + noise(size) * noise_scale

    # Components are generated in a fixed order so the RNG draws are reproducible under a seed.
    quarter = seasonal(90., 2.5, 0.02)
    week = seasonal(7, 1, 0.05)
    month = seasonal(30, 2, 0.2)
    year = seasonal(366, 4, 0.3)
    y = quarter * 0.2 + np.cos(week) + np.exp(month) + year * 2.2

    origin = datetime(year=begin_year, month=begin_month, day=begin_date)
    # .tolist() yields Python ints; timedelta rejects numpy.int64.
    ds = [date_from(origin, offset) for offset in offsets.tolist()]
    return pd.DataFrame({
        'ds': ds,
        'quarter': quarter,
        'week': week,
        'month': month,
        'year': year,
        'y': y,
    })

# Seed NumPy's global RNG (used by noise()) so the synthetic series, and every model
# trained on it, is identical across kernel restarts.
np.random.seed(42)
# About 15 years of daily rows starting 2001-01-01.
data = prepare_data(begin_year=2001, begin_month=1, begin_date=1, days=366 * 15)

def mlflow_prophet(interval_width: float, from_date: str, to_date: str,
                   base_dir: str = '/dbfs/mlflow-phrophet'):
    """Fit a Prophet model on a date window of the global ``data`` and save it with MLflow.

    This function is registered as a Spark SQL UDF and called once per parameter row.

    Args:
        interval_width: passed through to ``Prophet(interval_width=...)``.
        from_date: inclusive lower bound on ``ds`` ('YYYY-MM-DD').
        to_date: exclusive upper bound on ``ds`` ('YYYY-MM-DD').
        base_dir: directory under which each model directory is written.

    Returns:
        The path of the saved model directory, as a string.
    """
    model = Prophet(
        interval_width=interval_width,
        growth='linear',
        yearly_seasonality=True,
        weekly_seasonality=True,
        # NOTE(review): the input has one row per day, so a daily (sub-day) seasonality
        # is presumably not identifiable from it — confirm whether this should be False.
        daily_seasonality=True)
    # ds holds zero-padded 'YYYY-MM-DD' strings, so string comparison is chronological.
    range_data = data.loc[(data['ds'] >= from_date) & (data['ds'] < to_date)]
    model.fit(range_data)

    interval_str = "%.2f" % round(interval_width, 2)
    filepath = os.path.join(base_dir, f'{interval_str}-{from_date}-{to_date}')
    print('Write to Path:', filepath)
    # MLflow's save_model refuses to write into an existing model directory, so remove a
    # previous run's output to keep the notebook (and the UDF) re-runnable.
    if os.path.isdir(filepath):
        shutil.rmtree(filepath)
    mlflow.sklearn.save_model(model, filepath)
    return filepath

# Uncomment the following line to test mlflow_prophet locally before registering it as a UDF
# mlflow_prophet(0.82, '2003-01-01', '2006-01-01')

# This function produces the Spark DataFrame of training parameters (interval width x date window)
def prepare_parameters(interval_width_from: float, interval_width_to: float, interval_width_step: float, year_from: int, year_to: int, window_years: int = 3):
    """Build the Spark DataFrame of training parameters.

    The result is the cross product of the interval widths
    ``arange(interval_width_from, interval_width_to, interval_width_step)`` and the start
    years ``arange(year_from, year_to)``. Both upper bounds are exclusive, as in ``np.arange``.

    Args:
        interval_width_from, interval_width_to, interval_width_step: grid of interval widths.
        year_from, year_to: range of start years (``year_to`` exclusive).
        window_years: length of each training window in years (default 3).

    Returns:
        A Spark DataFrame with columns ``interval_width``, ``year_from`` ('YYYY-01-01') and
        ``year_to`` ('YYYY-01-01', ``window_years`` later). It is also registered as temp view
        'sdf_parameters'; the inputs are registered as 'sdf_interval' and 'sdf_year'.
    """
    # A float-step arange accumulates rounding error (e.g. 0.30000000000000004);
    # round so the grid values are the intended decimals.
    interval_widths = np.round(np.arange(interval_width_from, interval_width_to, interval_width_step), 10)
    start_years = np.arange(year_from, year_to, 1)
    sdf_interval = spark.createDataFrame(pd.DataFrame({'interval_width': interval_widths}))
    sdf_year = spark.createDataFrame(pd.DataFrame({'year': start_years}))
    sdf_interval.createOrReplaceTempView('sdf_interval')
    sdf_year.createOrReplaceTempView('sdf_year')
    # int() guards the value interpolated into SQL.
    sdf_parameters = spark.sql(f'''
select
    interval_width as interval_width,
    concat(cast(year as string), '-01-01') as year_from,
    concat(cast((year+{int(window_years)}) as string), '-01-01') as year_to
From sdf_interval, sdf_year
''')
    sdf_parameters.createOrReplaceTempView('sdf_parameters')
    return sdf_parameters
    
# Parameter grid: interval widths 0.20, 0.25, ..., 0.90 (15 values; 0.91 is an exclusive
# bound) x start years 2001, 2002, 2003 (2004 is exclusive), each with a 3-year window.
sdf_parameters = prepare_parameters(0.2, 0.91, 0.05, 2001, 2004)

# Display the table of 45 parameter combinations (15 interval widths x 3 start years)
sdf_parameters.display()

# Register the Python function "mlflow_prophet" as a SQL UDF returning the saved model path
spark.udf.register("mlflow_prophet", mlflow_prophet, StringType())

# Run one training per parameter row (45 in total). Spark evaluates the UDF lazily and
# potentially in parallel across partitions. The models are trained when df_model_paths
# is materialised (e.g. by display()). The `tree` column holds each saved model's path.
df_model_paths = spark.sql('''
Select
    mlflow_prophet(interval_width, year_from, year_to) as tree
From sdf_parameters
''')

df_model_paths.display()

interval_width,year_from,year_to
0.2,2001-01-01,2004-01-01
0.25,2001-01-01,2004-01-01
0.3,2001-01-01,2004-01-01
0.2,2002-01-01,2005-01-01
0.25,2002-01-01,2005-01-01
0.3,2002-01-01,2005-01-01
0.2,2003-01-01,2006-01-01
0.25,2003-01-01,2006-01-01
0.3,2003-01-01,2006-01-01
0.35,2001-01-01,2004-01-01


tree
/dbfs/mlflow-phrophet/0.20-2001-01-01-2004-01-01
/dbfs/mlflow-phrophet/0.25-2001-01-01-2004-01-01
/dbfs/mlflow-phrophet/0.30-2001-01-01-2004-01-01
/dbfs/mlflow-phrophet/0.20-2002-01-01-2005-01-01
/dbfs/mlflow-phrophet/0.25-2002-01-01-2005-01-01
/dbfs/mlflow-phrophet/0.30-2002-01-01-2005-01-01
/dbfs/mlflow-phrophet/0.20-2003-01-01-2006-01-01
/dbfs/mlflow-phrophet/0.25-2003-01-01-2006-01-01
/dbfs/mlflow-phrophet/0.30-2003-01-01-2006-01-01
/dbfs/mlflow-phrophet/0.35-2001-01-01-2004-01-01
