***GENERATED CODE FOR multivariateprophet PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class DBFSConnector:

    def fetch(spark, config):
        df = spark.read.\
            options(header='true' if eval(config)["is_header"] == "Use Header Line" else 'false',
                    inferschema='true',
                    delimiter=eval(config)["delimiter"])\
            .csv(eval(config)['url'])
        display(df.limit(2).toPandas())
        return df

    def put(df, path):
        df.to_csv("/dbfs" + eval(config)['url'], header=True, index=False)
        return True


***TRANSFORMATIONS FUNCTIONS THAT WILL BE APPLIED ON DATA***

In [None]:
import json
import pyspark
from pyspark.sql.functions import col, when
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    # def __init__(self,df):
    #     #print()

    def replaceByMean(self, feature, df, mean_=-1):

        meanValue = df.select(mean(col(feature.name)).alias(
            'mean')).collect()[0]["mean"]
        df.fillna(meanValue, subset=[feature.name])
        df.withColumn(feature.name, when(col(feature.name) == " ",
                                         meanValue).otherwise(col(feature.name).cast("Integer")))
        return df

    def replaceByMax(self, feature, df, max_=-1):
        maxValue = df.select(max(col(feature.name)).alias('max')).collect()[
            0]["max"]
        df.fillna(maxValue, subset=[feature.name])
        df = df.withColumn(feature.name,
                           when(col(feature.name) == " ", maxValue).otherwise(col(feature.name)))
        return df

    def replaceByMin(self, feature, df, min_=-1):
        minValue = df.select(min(col(feature.name)).alias('min')).collect()[
            0]["min"]
        df.fillna(minValue, subset=[feature.name])
        df = df.withColumn(feature.name,
                           when(col(feature.name) == " ", minValue).otherwise(col(feature.name)))
        return df

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        stddevValue = df.select(stddev(col(feature.name)).alias(
            'stddev')).collect()[0]["stddev"]
        df.fillna(stddevValue, subset=[feature.name])
        df = df.withColumn(feature.name,
                           when(col(feature.name) == " ", stddevValue).otherwise(col(feature.name)))
        return df

    def replaceDateRandomly(self, feature, df):
        fillValue = df.where(col(feature.name).isNotNull()
                             ).head(1)[0][feature.name]
        df.fillna(str(fillValue), subset=[feature.name])
        df = df.withColumn(feature.name,
                           when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))
        # print("CleanseData:replaceDateRandomly Schema : ", df.#printSchema())
        return df

    def replaceNullValues(self, fList, df):
        featuresList = df.schema.fields
        for featureObj in fList:
            for feat in featuresList:
                if featureObj["feature"] in feat.name:
                    featureName = feat
                    if "mean" in featureObj["replaceby"]:
                        df = self.replaceByMean(featureName, df)
                    elif "max" in featureObj["replaceby"]:
                        df = self.replaceByMax(featureName, df)
                    elif "min" in featureObj["replaceby"]:
                        df = self.replaceByMin(featureName, df)
                    elif "stddev" in featureObj["replaceby"]:
                        df = self.replaceByStandardDeviation(featureName, df)
                    elif "random" in featureObj["replaceby"]:
                        df = self.replaceDateRandomly(featureName, df)
        return df


class TransformationTimeSeriesForecastingMain:
    # TODO: change df argument in run with following
    def run(transformationDF, config):
        configObj = json.loads(config)
        featureData = configObj["FE"]['featureList']
        ForecastFE = configObj["FE"]
        featuresSelectedList = [ForecastFE['features']
                                ['timecolumn'], ForecastFE['features']['tocompare']]
        transformedDF = transformationDF.select(
            [c for c in transformationDF.columns if c in featuresSelectedList])
        transformedDF = CleanseData().replaceNullValues(featureData, transformedDF)

        transformedDF.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
        df = transformedDF.toPandas()
        return df


***AUTOML FUNCTIONS***

In [None]:
from prophet import Prophet
import pandas as pd


def driverManualProphet(df, timeColumn, tocompare):
    df.rename(columns={timeColumn: "ds", tocompare: "y"}, inplace=True)
    df['ds'] = df['ds'].astype(str).str[:-6]
    df['ds'] = pd.to_datetime(df['ds'])
    m = Prophet()
    m.fit(df)
    future = m.make_future_dataframe(periods=30)
    forecast = m.predict(future)
    forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail()
    m.plot(forecast)
    return m


***READING DATAFRAME***

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

#%run multivariateprophetHooks.ipynb
try:
	#sourcePreExecutionHook()

	tamakidrivetrain = DBFSConnector.fetch(spark, "{'url': '/FileStore/platform/testdata/1643836134043_TamakiDrive_Train.csv', 'filename': 'TamakiDrive_Train.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'FilePath': '/Retail and Marketing Models/Multivariate Time series/TamakiDrive_Train.csv', 'viewFileName': 'TamakiDrive_Train.csv', 'is_header': 'Use Header Line', 'baseType': 'dbfs', 'server_url': '/numtraPlatform/NumtraPlatformV2/uploads/platform/', 'results_url': 'http://52.142.62.122:4040/api/read/hdfs'}")
	#sourcePostExecutionHook(tamakidrivetrain)

except Exception as ex: 
	logging.error(ex)


***TRANSFORMING DATAFRAME***

In [None]:
#%run multivariateprophetHooks.ipynb
try:
	#transformationPreExecutionHook()

	timeseriesfe = TransformationTimeSeriesForecastingMain.run(tamakidrivetrain, json.dumps( {"FE": {"functionList": [{"function": "log"}, {"function": "Original"}], "stage_attributes": {}, "featureList": [{"transformationsData": [{}], "feature": "date", "type": "date", "selected": "True", "replaceby": "random", "stats": {"count": "", "mean": "", "stddev": "", "min": "", "max": "", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "date"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "cycling_counter", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "76", "mean": "1077.33", "stddev": "344.72", "min": "314", "max": "1954", "missing": "0"}, "updatedLabel": "cycling_counter"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "temp", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "76", "mean": "15.39", "stddev": "3.59", "min": "7.7", "max": "23.98571429", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "temp"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "rain", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "76", "mean": "0.08", "stddev": "0.2", "min": "0.0", "max": "1.128571429", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "rain"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "sun", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "76", "mean": "0.45", "stddev": "0.25", "min": "0.0", "max": "0.9714285709999999", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "sun"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "wind", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "76", "mean": "4.75", "stddev": "2.36", "min": "1.2357142859999999", "max": "11.26428571", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "wind"}], "features": {"timecolumn": "date", "tocompare": "cycling_counter"}, "dataPercentage": "50", "originalfile": "/FileStore/platform/testdata/1643836134043_TamakiDrive_Train.csv", "statFunction": {"function": "Original", "parameter": ""}}}))

	#transformationPostExecutionHook(timeseriesfe)

except Exception as ex: 
	logging.error(ex)


***TRAIN MODEL***

In [None]:
#%run multivariateprophetHooks.ipynb
try:
	#mlPreExecutionHook()

	modelData = driverManualProphet(timeseriesfe, 'date', 'cycling_counter')

	#mlPostExecutionHook(model)

except Exception as ex: 
	logging.error(ex)
