## Introduction

This notebook is a follow up to DS 1 - Stock Analysis. In this notebook, models stored in MLflow will be downloaded and future predictions will be stored in the lakehouse, which can then be leveraged in Power BI.

In [1]:
!pip install prophet
import pyspark.sql.functions as F
import mlflow
import pandas as pd
import datetime
from datetime import timedelta
from pyspark.sql.functions import concat, col, lit, when, substring 
from pyspark.sql.types import *
from mlflow import MlflowClient
from mlflow.entities import ViewType

StatementMeta(, b616ac0e-95ab-4072-a519-9f46296626bf, 3, Finished, Available)

Collecting prophet
  Downloading prophet-1.1.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.4/14.4 MB[0m [31m118.4 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hCollecting cmdstanpy>=1.0.4 (from prophet)
  Downloading cmdstanpy-1.2.1-py3-none-any.whl (93 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m93.7/93.7 kB[0m [31m30.4 MB/s[0m eta [36m0:00:00[0m
Collecting stanio~=0.3.0 (from cmdstanpy>=1.0.4->prophet)
  Downloading stanio-0.3.0-py3-none-any.whl (6.2 kB)
Installing collected packages: stanio, cmdstanpy, prophet
Successfully installed cmdstanpy-1.2.1 prophet-1.1.5 stanio-0.3.0


In [2]:
def create_prediction_table():
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS stocks_prediction (
            Predict_time TIMESTAMP
            ,Symbol VARCHAR(5)
            ,yhat DOUBLE
            ,yhat_lower DOUBLE
            ,yhat_upper DOUBLE
            ,Generated TIMESTAMP)
        USING DELTA
        """)
    
# create the stocks prediction table if needed
create_prediction_table()

StatementMeta(, b616ac0e-95ab-4072-a519-9f46296626bf, 4, Finished, Available)

In [3]:
class StockPrediction:
    def __init__(self, model_uri, symbol) -> None:
        self.model_uri = model_uri
        self.symbol = symbol
        self.generated = datetime.datetime.utcnow()

StatementMeta(, b616ac0e-95ab-4072-a519-9f46296626bf, 5, Finished, Available)

In [5]:
def get_symbols():

    # get the symbols from the dim_symbol table if lakehouse module is completed
    #symbol_df = spark.sql("SELECT Symbol FROM StocksLakehouse.dim_symbol")

    # create symbols manually if dim_symbol (from lakehouse module) does not exist
    symbol_df = spark.createDataFrame( \
        [['CSCO', 'ADSK', 'INTC', 'ADBE', 'WMT', 'NFLX', 'DIS' , 'KO', 'AMZN', 'META']])

    symbols = symbol_df.collect()
    return symbols

# get a list of all stock symbols
symbol_list = get_symbols()
print(symbol_list)

StatementMeta(, b616ac0e-95ab-4072-a519-9f46296626bf, 7, Finished, Available)

[Row(_1='CSCO', _2='ADSK', _3='INTC', _4='ADBE', _5='WMT', _6='NFLX', _7='DIS', _8='KO', _9='AMZN', _10='META')]


In [None]:
# queries mlflow for matching models for each stock, using the most recent model
def create_prediction_list(symbols):

    prediction_list = []

    for row in symbols:
        symbol = row['Symbol']

        runs_df = mlflow.search_runs(experiment_names=[f"{symbol}-stock-prediction"],
            run_view_type=ViewType.ACTIVE_ONLY,
            filter_string="attributes.status = 'Finished'",
            order_by=["attributes.start_time DESC"])

        if not runs_df.empty:
            run_id = runs_df.iloc[0].run_id
            model_uri = f"runs:/{run_id}/{symbol}-stock-prediction-model"
            print(model_uri)
            prediction_list.append(StockPrediction(model_uri, symbol))
            
    return prediction_list

# search for available models in mlflow
prediction_list = create_prediction_list(symbol_list)

StatementMeta(, , , Cancelled, )

## Functions for filtering, building, and merging data

In [None]:
# merge the predictions with the table in the lakehouse

from delta.tables import *

def write_predictions(predictions_pd, symbol, generated):

    predictions_df = spark.createDataFrame(predictions_pd) 
    predictions_df = predictions_df.withColumn("symbol", lit(symbol))
    predictions_df = predictions_df.withColumn("generated", lit(generated))
   
    stock_predictions_table = DeltaTable.forName(spark, "stocks_prediction")

    stock_predictions_table.alias('table') \
    .merge(
        predictions_df.alias('predictions'),
        f'table.Predict_time = predictions.ds and table.Symbol = "{symbol}"'
    ) \
    .whenMatchedUpdate(set =
        {
            "yhat": "predictions.yhat"
            ,"yhat_lower": "predictions.yhat_lower"
            ,"yhat_upper": "predictions.yhat_upper"
            ,"Generated": f"'{str(generated)}'"
        }
    ) \
    .whenNotMatchedInsert(values =
        {
            "Predict_time": "predictions.ds"
            ,"Symbol": f"'{symbol}'"
            ,"yhat": "predictions.yhat"
            ,"yhat_lower": "predictions.yhat_lower"
            ,"yhat_upper": "predictions.yhat_upper"
            ,"Generated": f"'{str(generated)}'"
        }
    ) \
    .execute()

StatementMeta(, , , Cancelled, )

In [None]:
# establish begin/end dates for prediction
# returns an empty dataframe

def make_prediction_dataframe(fromdate = datetime.datetime.utcnow()):

    enddate = fromdate + datetime.timedelta(days=7)

    print(f'Beginning of forecast: {fromdate}')
    print(f'End of forecast: {enddate}')

    future = pd.DataFrame({'ds': pd.date_range(start=fromdate, end=enddate, freq='T')})
    return future

StatementMeta(, , , Cancelled, )

In [None]:
import mlflow

def load_and_predict(prediction):

    loaded_model = mlflow.prophet.load_model(prediction.model_uri)

    prediction_start = datetime.datetime.utcnow()
    prediction_start = prediction_start.replace(second=0, microsecond=0)

    # predict_df = loaded_model.make_future_dataframe(periods=60*24*7, freq='min', include_history = False)
    predict_df = make_prediction_dataframe(prediction_start)

    forecast = loaded_model.predict(predict_df)
    return forecast

StatementMeta(, , , Cancelled, )

## Main loop to build predicitions for each symbol

In [None]:
# build the predictions for each model, store in lakehouse
for prediction in prediction_list:
    print(f"{prediction.symbol} {prediction.model_uri}")
    forecast = load_and_predict(prediction)
    write_predictions(forecast, prediction.symbol, prediction.generated)

StatementMeta(, , , Cancelled, )

In [None]:
# spark.sql("DELETE FROM stocks_prediction")

StatementMeta(, , , Cancelled, )

In [None]:
df = spark.sql("SELECT * FROM stocks_prediction ORDER BY predict_time DESC LIMIT 1000")
display(df)

StatementMeta(, , , Cancelled, )