In [0]:
# Install the fbprophet library, if required.
%pip install fbprophet

In [0]:
# Required packages for this lab
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn import metrics
import time
from fbprophet import Prophet 

# Suppress uninformative warnings and library log output.
import logging
import warnings

warnings.filterwarnings('ignore')

# Only py4j errors and fbprophet warnings (or worse) are emitted.
logging.getLogger("py4j").setLevel(logging.ERROR)
logging.getLogger('fbprophet').setLevel(logging.WARNING)

# Handle to the JVM-side log4j package exposed by the Spark session.
logger = spark._jvm.org.apache.log4j

In [0]:
# Load the per-disk free-space history from DBFS, parsing `date` as datetimes.
df = pd.read_csv(
    '/dbfs/FileStore/ConcurrentModelTraining/data.csv',
    header='infer',
    parse_dates=['date'],
)
df.head(n=5)

In [0]:
# Label for the cluster layout these timings are recorded under.
# Use 'single node' instead when running without worker nodes.
mode = 'four workers'

# Number of distinct disks (a missing id would count once, like Series.unique()).
number_disks = df['disk_id'].nunique(dropna=False)

In [0]:
# Create Spark DataFrame for storing results: one timing row per
# (method, algorithm) run, unioned in by the "Save results" cells below.
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType

schema_log_time = StructType([
    StructField("method", StringType(), True),
    StructField("algorithm", StringType(), True),
    StructField("mode", StringType(), True),
    StructField("number_disks", IntegerType(), True),
    StructField("duration", FloatType(), True),
])

# Start from an empty DataFrame with the log schema.
df_log_time = spark.createDataFrame(sc.emptyRDD(), schema_log_time)

In [0]:
# FBProphet algorithm for time series forecasting
# https://facebook.github.io/prophet/docs/quick_start.html#python-api
def prophet_predict(df: pd.DataFrame) -> list:
    """Fit Prophet on the first 80% of one disk's rows and forecast the rest.

    Args:
        df: rows for a single disk with ``date`` and ``free_space_gb`` columns.
            Its index is reset IN PLACE to 0..n-1; ``concurrent_full`` reads
            ``df.disk_id[0]`` afterwards, so keep this side effect.

    Returns:
        ``[df_full, rmse]`` -- a copy of ``df`` with a ``prediction`` column
        (None on the training rows, Prophet ``yhat`` on the held-out rows) and
        the RMSE of those predictions against the held-out ``free_space_gb``.
    """
    df.reset_index(drop=True, inplace=True)

    # Chronological 80/20 split: the most recent 20% of rows is held out.
    cutoff = int(len(df) * 0.8)
    train_part = df[0:cutoff]
    test_part = df[cutoff:]

    # Prophet expects the timestamp column as `ds` and the target as `y`.
    history = train_part[['date', 'free_space_gb']].copy()
    history.columns = ['ds', 'y']

    forecaster = Prophet(interval_width=0.95, growth='linear')
    forecaster.fit(history)

    # One daily step per held-out row, starting after the last training date.
    # NOTE(review): assumes the dates are contiguous daily observations; with
    # gaps the forecast dates will not line up with the held-out rows.
    horizon = forecaster.make_future_dataframe(
        periods=len(test_part),
        freq='d',
        include_history=False,
    )
    forecast_values = forecaster.predict(horizon).yhat

    df_full = df.copy()
    df_full['prediction'] = [None] * test_part.index.min() + list(forecast_values)

    rmse = np.sqrt(metrics.mean_squared_error(test_part.free_space_gb, forecast_values))
    return [df_full, rmse]


# Create our Linear Regression function
def regression(df: pd.DataFrame) -> list:
    """Fit a linear trend on the first 80% of one disk's rows and predict the rest.

    The row position (0..n-1) is the single feature and ``free_space_gb`` is
    the target.

    Args:
        df: rows for a single disk with a ``free_space_gb`` column. Its index
            is reset IN PLACE to 0..n-1; ``concurrent_full`` reads
            ``df.disk_id[0]`` afterwards, so keep this side effect.

    Returns:
        ``[df_full, rmse]`` -- a copy of ``df`` with a ``prediction`` column
        (None on the training rows, model output on the held-out rows) and
        the RMSE of those predictions.
    """
    df.reset_index(drop=True, inplace=True)
    X = df.index.values.reshape(-1, 1)
    # Select the target by name, not by position, so a reordered CSV cannot
    # silently train on the wrong column (and to match prophet_predict).
    y = df['free_space_gb'].values

    # shuffle=False keeps the split chronological: the last 20% is the test set.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=0, shuffle=False)

    # Train and fit the model
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)

    # Predict the held-out positions.
    y_pred = regressor.predict(X_test)

    # Training rows get no prediction; test rows start at position len(X_train).
    df_full = df.copy()
    df_full['prediction'] = [None] * len(X_train) + list(y_pred)

    # Calculate RMSE metric
    rmse = np.sqrt(metrics.mean_squared_error(y_test, y_pred))
    return [df_full, rmse]

In [0]:
# Sequential predictions fbprophet: one Prophet model per disk, in a plain loop.
start_time = time.perf_counter()

# Collect per-disk results in lists and build the output frames once:
# DataFrame.append copies the whole frame on each call (quadratic) and was
# removed in pandas 2.0.
prediction_parts = []
rmse_records = []

for c in df['disk_id'].unique():
    df_each = df.loc[df['disk_id'] == c]
    df_each_prediction, rmse = prophet_predict(df_each)
    prediction_parts.append(df_each_prediction)
    rmse_records.append({'disk_id': c, 'rmse': rmse})

# Create output DataFrames (fresh 0..n-1 index across all disks).
df_fbprophet_sequential_prediction = (
    pd.concat(prediction_parts, ignore_index=True)
    if prediction_parts
    else pd.DataFrame(columns=['date', 'free_space_gb', 'disk_id', 'prediction'])
)
df_fbprophet_sequential_rmse = pd.DataFrame(rmse_records, columns=['disk_id', 'rmse'])

duration_fbprophet_sequential = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_fbprophet_sequential))

In [0]:
# Save results: record the sequential fbprophet timing.
rows = [['sequential', 'fbprophet', mode, number_disks, duration_fbprophet_sequential]]
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
df_sequential_time = spark.createDataFrame(rows, columns)

# Add this to our DataFrame
df_log_time = df_log_time.union(df_sequential_time)

In [0]:
# Sequential predictions linear regression: one model per disk, in a plain loop.
start_time = time.perf_counter()

# Collect per-disk results in lists and build the output frames once:
# DataFrame.append copies the whole frame on each call (quadratic) and was
# removed in pandas 2.0.
prediction_parts = []
rmse_records = []

for c in df['disk_id'].unique():
    df_each = df.loc[df['disk_id'] == c]
    df_each_prediction, rmse = regression(df_each)
    prediction_parts.append(df_each_prediction)
    rmse_records.append({'disk_id': c, 'rmse': rmse})

# Create output DataFrames (fresh 0..n-1 index across all disks).
df_regression_sequential_prediction = (
    pd.concat(prediction_parts, ignore_index=True)
    if prediction_parts
    else pd.DataFrame(columns=['date', 'free_space_gb', 'disk_id', 'prediction'])
)
df_regression_sequential_rmse = pd.DataFrame(rmse_records, columns=['disk_id', 'rmse'])

duration_regression_sequential = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_regression_sequential))

In [0]:
# Save results: log the sequential linear-regression timing.
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
rows = [['sequential', 'linear regression', mode, number_disks, duration_regression_sequential]]
df_sequential_time = spark.createDataFrame(data=rows, schema=columns)

# Append the new row to the running timing log.
df_log_time = df_log_time.union(df_sequential_time)

In [0]:
# Parallel processing with concurrent.futures for fbprophet.
# Model fitting is CPU-bound, so use worker processes (ProcessPoolExecutor)
# rather than threads.
import concurrent.futures

# One independent copy of each disk's rows; each becomes a separate task.
disks = df['disk_id'].unique()
disk_dfs = [df[df.disk_id == i].copy() for i in disks]

start_time = time.perf_counter()

# Gather per-disk results in lists and build the output frames once:
# repeated DataFrame.append is quadratic and was removed in pandas 2.0.
prediction_parts = []
rmse_records = []

with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(prophet_predict, disk_df) for disk_df in disk_dfs]
    # Walk the futures in submission order (not the unordered set returned by
    # wait()) so output row order is deterministic. result() blocks until the
    # task is done and re-raises any exception raised in the worker.
    for fut in futures:
        try:
            df_prediction, rmse = fut.result()
        except Exception as e:
            # Report the failed disk and keep collecting the others.
            print(e, type(e))
            continue
        prediction_parts.append(df_prediction)
        # Positional lookup: independent of the index labels the worker returns.
        rmse_records.append({'disk_id': df_prediction['disk_id'].iloc[0], 'rmse': rmse})

# ignore_index=True gives a clean 0..n-1 index, matching the sequential cells.
df_fbprophet_concurrent_prediction = (
    pd.concat(prediction_parts, ignore_index=True)
    if prediction_parts
    else pd.DataFrame(columns=['date', 'free_space_gb', 'disk_id', 'prediction'])
)
df_fbprophet_concurrent_rmse = pd.DataFrame(rmse_records, columns=['disk_id', 'rmse'])

duration_fbprophet_concurrent = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_fbprophet_concurrent))

In [0]:
# Save results: log the concurrent.futures fbprophet timing.
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
rows = [['concurrent.futures', 'fbprophet', mode, number_disks, duration_fbprophet_concurrent]]
df_concurrent_time = spark.createDataFrame(data=rows, schema=columns)

# Append the new row to the running timing log.
df_log_time = df_log_time.union(df_concurrent_time)

In [0]:
# Parallel processing with concurrent.futures for linear regression.
# Use worker processes (ProcessPoolExecutor) so model fitting runs outside the
# notebook's own process.
import concurrent.futures

# One independent copy of each disk's rows; each becomes a separate task.
disks = df['disk_id'].unique()
disk_dfs = [df[df.disk_id == i].copy() for i in disks]

start_time = time.perf_counter()

# Gather per-disk results in lists and build the output frames once:
# repeated DataFrame.append is quadratic and was removed in pandas 2.0.
prediction_parts = []
rmse_records = []

with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(regression, disk_df) for disk_df in disk_dfs]
    # Walk the futures in submission order (not the unordered set returned by
    # wait()) so output row order is deterministic. result() blocks until the
    # task is done and re-raises any exception raised in the worker.
    for fut in futures:
        try:
            df_prediction, rmse = fut.result()
        except Exception as e:
            # Report the failed disk and keep collecting the others.
            print(e, type(e))
            continue
        prediction_parts.append(df_prediction)
        # Positional lookup: independent of the index labels the worker returns.
        rmse_records.append({'disk_id': df_prediction['disk_id'].iloc[0], 'rmse': rmse})

# ignore_index=True gives a clean 0..n-1 index, matching the sequential cells.
df_regression_concurrent_prediction = (
    pd.concat(prediction_parts, ignore_index=True)
    if prediction_parts
    else pd.DataFrame(columns=['date', 'free_space_gb', 'disk_id', 'prediction'])
)
df_regression_concurrent_rmse = pd.DataFrame(rmse_records, columns=['disk_id', 'rmse'])

duration_regression_concurrent = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_regression_concurrent))

In [0]:
# Save results: log the concurrent.futures linear-regression timing.
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
rows = [['concurrent.futures', 'linear regression', mode, number_disks, duration_regression_concurrent]]
df_concurrent_time = spark.createDataFrame(data=rows, schema=columns)

# Append the new row to the running timing log.
df_log_time = df_log_time.union(df_concurrent_time)

In [0]:
from pyspark.sql.types import DateType, FloatType, IntegerType, StructField, StructType

# Define schema of the input CSV.
# NOTE(review): assumes the CSV columns appear in this order — confirm.
input_schema = StructType([
    StructField('date', DateType()),
    StructField('free_space_gb', FloatType()),
    StructField('disk_id', IntegerType()),
])

# Create Spark DataFrame of csv file (first line is a header row).
spark_df = spark.read \
    .option("header", True) \
    .schema(input_schema) \
    .csv("dbfs:/FileStore/ConcurrentModelTraining/data.csv")

In [0]:
# Schema of the per-row prediction frames returned by the *_udf functions.
result_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ('date', DateType()),
        ('free_space_gb', FloatType()),
        ('disk_id', IntegerType()),
        ('prediction', FloatType()),
    ]
])

# Schema of the one-row-per-disk RMSE frames returned by evaluation_rmse.
evaluation_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ('disk_id', IntegerType()),
        ('rmse', FloatType()),
    ]
])


In [0]:
# Create new fbprophet function for Pandas UDFs
# No longer need to specify the @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP) decorator since Apache Spark 3.0:
# https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html
def prophet_predict_udf(df: pd.DataFrame) -> pd.DataFrame:
    """Pandas-UDF variant of ``prophet_predict`` for ``groupBy(...).applyInPandas``.

    Receives the rows of one ``disk_id`` group, fits Prophet on the first 80%
    and forecasts the remaining rows. The RMSE is computed separately by
    ``evaluation_rmse``.

    Returns:
        A copy of ``df`` (index reset in place first) with a ``prediction``
        column: None on the training rows, Prophet ``yhat`` on held-out rows.
    """
    df.reset_index(drop=True, inplace=True)

    # Chronological 80/20 split: the most recent 20% of rows is held out.
    cutoff = int(len(df) * 0.8)
    train_part = df[0:cutoff]
    test_part = df[cutoff:]

    # Prophet expects the timestamp column as `ds` and the target as `y`.
    history = train_part[['date', 'free_space_gb']].copy()
    history.columns = ['ds', 'y']

    forecaster = Prophet(interval_width=0.95, growth='linear')
    forecaster.fit(history)

    # Make the future prediction: one daily step per held-out row.
    # NOTE(review): assumes contiguous daily dates; gaps would misalign rows.
    horizon = forecaster.make_future_dataframe(
        periods=len(test_part),
        freq='d',
        include_history=False,
    )
    forecast_values = forecaster.predict(horizon).yhat

    df_full = df.copy()
    df_full['prediction'] = [None] * test_part.index.min() + list(forecast_values)
    return df_full

# Create new regression function for Pandas UDFs
def regression_udf(df: pd.DataFrame) -> pd.DataFrame:
    """Pandas-UDF variant of ``regression`` for ``groupBy(...).applyInPandas``.

    Fits a linear trend (row position -> ``free_space_gb``) on the first 80%
    of one ``disk_id`` group and predicts the remaining rows. The RMSE is
    computed separately by ``evaluation_rmse``.

    Returns:
        A copy of ``df`` (index reset in place first) with a ``prediction``
        column: None on the training rows, model output on held-out rows.
    """
    df.reset_index(drop=True, inplace=True)
    X = df.index.values.reshape(-1, 1)
    # Select the target by name rather than by position (matches prophet_predict_udf).
    y = df['free_space_gb'].values

    # shuffle=False keeps the split chronological: the last 20% is the test set.
    X_train, X_test, y_train, _ = train_test_split(
        X, y, test_size=0.2, random_state=0, shuffle=False)

    # Train and fit the model
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)

    # Predict the held-out positions.
    y_pred = regressor.predict(X_test)

    # Training rows get no prediction; test rows start at position len(X_train).
    df_full = df.copy()
    df_full['prediction'] = [None] * len(X_train) + list(y_pred)
    return df_full


# Separate out RMSE function
def evaluation_rmse(df: pd.DataFrame) -> pd.DataFrame:
    """Score one disk's predictions, returning a one-row ``disk_id``/``rmse`` frame.

    Rows with a missing value in any column (including the training rows,
    whose ``prediction`` was left empty) are excluded before scoring.
    """
    disk = df.disk_id.iloc[0]
    scored = df.dropna()
    rmse = np.sqrt(metrics.mean_squared_error(scored.free_space_gb, scored.prediction))
    return pd.DataFrame.from_dict({'disk_id': [disk], 'rmse': [rmse]})

In [0]:
# Parallel executions with Pandas UDFs for fbprophet: Spark runs
# prophet_predict_udf per disk_id group, then evaluation_rmse per group.
start_time = time.perf_counter()

df_fbprophet_udfs_prediction = (
    spark_df
    .groupBy('disk_id')
    .applyInPandas(prophet_predict_udf, result_schema)
    .cache()
)
df_fbprophet_udfs_rmse = (
    df_fbprophet_udfs_prediction
    .groupBy('disk_id')
    .applyInPandas(evaluation_rmse, evaluation_schema)
    .cache()
)

# Spark transformations are lazy; show() is the action that runs the work being timed.
# NOTE(review): .cache() keeps these results; a later cell rebuilding the same
# plan may reuse them and report a shorter time — confirm.
df_fbprophet_udfs_rmse.show()

duration_fbprophet_udfs = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_fbprophet_udfs))

In [0]:
# Save results: log the Pandas-UDF fbprophet timing.
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
rows = [['pandas UDFs', 'fbprophet', mode, number_disks, duration_fbprophet_udfs]]
df_udfs_time = spark.createDataFrame(data=rows, schema=columns)

# Append the new row to the running timing log.
df_log_time = df_log_time.union(df_udfs_time)

In [0]:
# Parallel executions with Pandas UDFs for linear regression: Spark runs
# regression_udf per disk_id group, then evaluation_rmse per group.
start_time = time.perf_counter()

df_regression_udfs_prediction = (
    spark_df
    .groupBy('disk_id')
    .applyInPandas(regression_udf, result_schema)
    .cache()
)
df_regression_udfs_rmse = (
    df_regression_udfs_prediction
    .groupBy('disk_id')
    .applyInPandas(evaluation_rmse, evaluation_schema)
    .cache()
)

# Spark transformations are lazy; show() is the action that runs the work being timed.
# NOTE(review): .cache() keeps these results; a later cell rebuilding the same
# plan may reuse them and report a shorter time — confirm.
df_regression_udfs_rmse.show()

duration_regression_udfs = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_regression_udfs))

In [0]:
# Save results: log the Pandas-UDF linear-regression timing.
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
rows = [['pandas UDFs', 'linear regression', mode, number_disks, duration_regression_udfs]]
df_udfs_time = spark.createDataFrame(data=rows, schema=columns)

# Append the new row to the running timing log.
df_log_time = df_log_time.union(df_udfs_time)

In [0]:
# Sequential predictions with both algorithms in a single loop over disks.
start_time = time.perf_counter()

# Collect per-disk results in lists and build the output frames once:
# DataFrame.append copies the whole frame on each call (quadratic) and was
# removed in pandas 2.0.
fbprophet_parts, fbprophet_rmse_records = [], []
regression_parts, regression_rmse_records = [], []

for c in df['disk_id'].unique():
    df_each = df.loc[df['disk_id'] == c]

    # FBProphet predictions
    df_each_fbprophet_prediction, fbprophet_rmse = prophet_predict(df_each)
    fbprophet_parts.append(df_each_fbprophet_prediction)
    fbprophet_rmse_records.append({'disk_id': c, 'rmse': fbprophet_rmse})

    # Linear regression predictions
    df_each_regression_prediction, regression_rmse = regression(df_each)
    regression_parts.append(df_each_regression_prediction)
    regression_rmse_records.append({'disk_id': c, 'rmse': regression_rmse})

prediction_columns = ['date', 'free_space_gb', 'disk_id', 'prediction']
df_fbprophet_sequential_prediction = (
    pd.concat(fbprophet_parts, ignore_index=True)
    if fbprophet_parts
    else pd.DataFrame(columns=prediction_columns)
)
df_regression_sequential_prediction = (
    pd.concat(regression_parts, ignore_index=True)
    if regression_parts
    else pd.DataFrame(columns=prediction_columns)
)
df_fbprophet_sequential_rmse = pd.DataFrame(fbprophet_rmse_records, columns=['disk_id', 'rmse'])
df_regression_sequential_rmse = pd.DataFrame(regression_rmse_records, columns=['disk_id', 'rmse'])

# Combine RMSE DataFrames once, after every disk has been scored.
df_rmse_sequential = pd.merge(
    df_fbprophet_sequential_rmse,
    df_regression_sequential_rmse,
    how="inner",
    on='disk_id',
    suffixes=('_fbprophet', '_regression'),
)

duration_combined_sequential = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_combined_sequential))

In [0]:
# Save results: log the sequential timing for both algorithms combined.
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
rows = [['sequential', 'combined', mode, number_disks, duration_combined_sequential]]
df_sequential_combined_time = spark.createDataFrame(data=rows, schema=columns)

# Append the new row to the running timing log.
df_log_time = df_log_time.union(df_sequential_combined_time)

In [0]:
def concurrent_full(df: pd.DataFrame) -> list:
    """Run both models on one disk's rows; submitted as a single pool task.

    Args:
        df: rows for a single disk (``date``, ``free_space_gb``, ``disk_id``).

    Returns:
        ``[df_fbprophet_prediction, df_regression_prediction, df_rmse]`` where
        ``df_rmse`` is a one-row DataFrame with columns ``disk_id``,
        ``rmse_fbprophet`` and ``rmse_regression``.
    """
    # FBProphet predictions
    df_fbprophet_prediction, fbprophet_rmse = prophet_predict(df)

    # Linear regression predictions
    df_regression_prediction, regression_rmse = regression(df)

    # Positional lookup: does not depend on the helpers having reset the
    # index to start at label 0.
    disk_id = df['disk_id'].iloc[0]
    df_rmse = pd.DataFrame({
        'disk_id': [disk_id],
        'rmse_fbprophet': [fbprophet_rmse],
        'rmse_regression': [regression_rmse],
    })

    return [df_fbprophet_prediction, df_regression_prediction, df_rmse]

In [0]:
# Parallel processing with concurrent.futures: both algorithms per disk in one task.
# Model fitting is CPU-bound, so use worker processes (ProcessPoolExecutor)
# rather than threads.
import concurrent.futures

# One independent copy of each disk's rows; each becomes a separate task.
disks = df['disk_id'].unique()
disk_dfs = [df[df.disk_id == i].copy() for i in disks]

start_time = time.perf_counter()

# Gather per-disk results in lists and build the output frames once:
# repeated DataFrame.append is quadratic and was removed in pandas 2.0.
fbprophet_parts, regression_parts, rmse_parts = [], [], []

with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(concurrent_full, disk_df) for disk_df in disk_dfs]
    # Walk the futures in submission order (not the unordered set returned by
    # wait()) so output row order is deterministic. result() blocks until the
    # task is done and re-raises any exception raised in the worker.
    for fut in futures:
        try:
            df_fbprophet_disk, df_regression_disk, df_rmse_disk = fut.result()
        except Exception as e:
            # Report the failed disk and keep collecting the others.
            print(e, type(e))
            continue
        fbprophet_parts.append(df_fbprophet_disk)
        regression_parts.append(df_regression_disk)
        rmse_parts.append(df_rmse_disk)

# ignore_index=True avoids the duplicate per-disk 0..n-1 labels the workers return.
prediction_columns = ['date', 'free_space_gb', 'disk_id', 'prediction']
df_fbprophet_concurrent_prediction = (
    pd.concat(fbprophet_parts, ignore_index=True)
    if fbprophet_parts
    else pd.DataFrame(columns=prediction_columns)
)
df_regression_concurrent_prediction = (
    pd.concat(regression_parts, ignore_index=True)
    if regression_parts
    else pd.DataFrame(columns=prediction_columns)
)
df_rmse_concurrent = (
    pd.concat(rmse_parts, ignore_index=True)
    if rmse_parts
    else pd.DataFrame(columns=['disk_id', 'rmse_fbprophet', 'rmse_regression'])
)

duration_combined_concurrent = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_combined_concurrent))

In [0]:
# Save results: log the concurrent.futures timing for both algorithms combined.
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
rows = [['concurrent.futures', 'combined', mode, number_disks, duration_combined_concurrent]]
df_concurrent_combined_time = spark.createDataFrame(data=rows, schema=columns)

# Append the new row to the running timing log.
df_log_time = df_log_time.union(df_concurrent_combined_time)

In [0]:
# Parallel executions with Pandas UDFs: both algorithms, each grouped by disk_id.
start_time = time.perf_counter()

# Per-disk predictions and RMSE for each algorithm.
# NOTE(review): these plans match ones cached in earlier cells; Spark may serve
# them from that cache, which would understate this timing — confirm.
df_fbprophet_udfs_prediction = spark_df.groupBy('disk_id').applyInPandas(prophet_predict_udf, result_schema).cache()
df_fbprophet_udfs_rmse = df_fbprophet_udfs_prediction.groupBy('disk_id').applyInPandas(evaluation_rmse, evaluation_schema).cache()
df_regression_udfs_prediction = spark_df.groupBy('disk_id').applyInPandas(regression_udf, result_schema).cache()
df_regression_udfs_rmse = df_regression_udfs_prediction.groupBy('disk_id').applyInPandas(evaluation_rmse, evaluation_schema).cache()

# Suffix both RMSE tables' columns so they can be joined without name clashes.
df_fbprophet_udfs_rmse = (
    df_fbprophet_udfs_rmse
    .withColumnRenamed('rmse', 'rmse_fbprophet')
    .withColumnRenamed('disk_id', 'disk_id_fbprophet')
)
df_regression_udfs_rmse = (
    df_regression_udfs_rmse
    .withColumnRenamed('rmse', 'rmse_regression')
    .withColumnRenamed('disk_id', 'disk_id_regression')
)
same_disk = df_fbprophet_udfs_rmse.disk_id_fbprophet == df_regression_udfs_rmse.disk_id_regression
df_rmse_udfs = df_fbprophet_udfs_rmse.join(df_regression_udfs_rmse, same_disk, "inner")

# Spark transformations are lazy; show() is the action that runs the work being timed.
df_rmse_udfs.select("disk_id_fbprophet", "rmse_fbprophet", "rmse_regression").show()

duration_combined_udfs = time.perf_counter() - start_time
print("--- {} seconds ---".format(duration_combined_udfs))

In [0]:
# Save results: log the Pandas-UDF timing for both algorithms combined.
columns = ['method', 'algorithm', 'mode', 'number_disks', 'duration']
rows = [['pandas UDFs', 'combined', mode, number_disks, duration_combined_udfs]]
df_udfs_combined_time = spark.createDataFrame(data=rows, schema=columns)

# Append the new row to the running timing log.
df_log_time = df_log_time.union(df_udfs_combined_time)

In [0]:
# Display the accumulated timing log (first 20 rows, Spark's default).
df_log_time.show(20)

In [0]:
# Final step: append this run's timings to the `methods` SQL table used for the visuals.
df_log_time.write.saveAsTable("methods", mode="append")