***GENERATED CODE FOR tsbinbuck PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import os
import datetime
import logging
import warnings
# Suppress every Python warning for the rest of the session (global side effect).
warnings.filterwarnings('ignore')
# Configure the root logger: INFO and above, rendered as "LEVEL:message".
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """CSV reader/writer driven by a connector config passed as a string.

    The config is the text of a Python dict literal (single-quoted keys and
    values). Both entry points are static: they are called on the class
    itself (``HDFSConnector.fetch(spark, config)``) and also work when called
    on an instance.
    """

    @staticmethod
    def _parse_config(config):
        """Convert the config string into a dict without executing code.

        The config used to be passed to ``eval``, which runs any expression
        embedded in the string. ``ast.literal_eval`` accepts literals only and
        raises ``ValueError``/``SyntaxError`` for anything else.
        """
        import ast  # local import: keeps this block self-contained

        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Read the CSV named by ``config['url']`` from HDFS into a DataFrame.

        Args:
            spark: Spark session used for reading.
            config: Dict-literal string; only the ``'url'`` key is read here.

        Returns:
            The DataFrame produced by ``spark.read...csv(...)``.

        Raises:
            KeyError: If ``HDFS_SERVER``/``HDFS_PORT`` are not set in the
                environment or ``'url'`` is missing from the config.
            ValueError, SyntaxError: If ``config`` is not a plain literal.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        settings = HDFSConnector._parse_config(config)
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{settings['url']}", header='true')
        # `display` is not defined in this file; presumably it is supplied by
        # the notebook runtime. Shows the first two rows as a preview.
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV to a timestamp-prefixed target.

        Args:
            df: DataFrame to write.
            spark: Unused; kept for signature compatibility.
            config: Dict-literal string providing ``'is_header'``,
                ``'delimiter'`` and ``'url'``.

        Returns:
            Whatever ``DataFrameWriter.save`` returns.

        Raises:
            KeyError: If a required key is missing from the config.
            ValueError, SyntaxError: If ``config`` is not a plain literal.
        """
        settings = HDFSConnector._parse_config(config)
        # The header is written only when the config carries this exact label.
        use_header = 'true' if settings["is_header"] == "Use Header Line" else 'false'
        delimiter = settings["delimiter"]
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): the target is "<timestamp>_ <url>" - there is a space
        # between the prefix and the url. Kept as-is; confirm it is intended.
        target = ("%s %s") % (timestamp + "_", settings['url'])
        return df.write.format('csv').options(header=use_header,
                                              delimiter=delimiter).save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
import pyspark
from pyspark.ml.feature import Bucketizer
import json
from pyspark.sql.functions import round
from pyspark.ml.feature import Binarizer
from pyspark.sql.functions import col, when
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Null/blank imputation helpers for single Spark DataFrame columns.

    In every ``replaceBy*`` method, ``feature`` is a schema field (any object
    whose ``.name`` is the column name) and ``df`` is the DataFrame to clean.
    Each method returns a new DataFrame; the trailing ``*_`` parameters are
    unused and kept only for signature compatibility.
    """

    def cleanValueForFE(self, value):
        """Normalize a computed fill value.

        Returns ``""`` for ``None``, the string ``"nan"`` when ``str(value)``
        is ``'nan'``, and ``value`` unchanged otherwise.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _replaceByAggregate(self, feature, df, aggregate, alias):
        """Fill nulls and single-space values in a column with an aggregate.

        The aggregate is computed on ``df.dropna()``, i.e. only over rows
        that have no null in any column.
        """
        completeRows = df.dropna()
        fillValue = self.cleanValueForFE(
            completeRows.select(aggregate(col(feature.name)).alias(alias))
            .collect()[0][alias])
        df = df.fillna(fillValue, subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Replace nulls and ``" "`` values in the column with its mean.

        The blank-value replacement used to be built and then thrown away
        (the ``withColumn`` result was never assigned); it is now applied the
        same way as in the max/min/stddev variants. The discarded expression
        also cast the column to Integer; that cast is intentionally not
        applied because it would truncate real-valued columns.
        """
        return self._replaceByAggregate(feature, df, mean, 'mean')

    def replaceByMax(self, feature, df, max_=-1):
        """Replace nulls and ``" "`` values in the column with its maximum."""
        return self._replaceByAggregate(feature, df, max, 'max')

    def replaceByMin(self, feature, df, min_=-1):
        """Replace nulls and ``" "`` values in the column with its minimum."""
        return self._replaceByAggregate(feature, df, min, 'min')

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Replace nulls and ``" "`` values with the column's standard deviation."""
        return self._replaceByAggregate(feature, df, stddev, 'stddev')

    def replaceDateRandomly(self, feature, df):
        """Replace nulls and ``" "`` values with the first non-null value.

        Despite the name, no random choice is made: the value comes from the
        first row returned by ``head(1)`` after filtering out nulls. When the
        column holds no non-null value there is nothing to fill with, so the
        DataFrame is returned unchanged (this used to raise ``IndexError``).
        """
        firstKnown = df.where(col(feature.name).isNotNull()).head(1)
        if not firstKnown:
            return df
        fillValue = self.cleanValueForFE(firstKnown[0][feature.name])
        # Nulls are filled with the string form; blanks with the raw value.
        df = df.fillna(str(fillValue), subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceNullValues(self, fList, df):
        """Apply each feature's configured replacement strategy to ``df``.

        Args:
            fList: Iterable of dicts with a ``"feature"`` name and a
                ``"replaceby"`` strategy string.
            df: DataFrame whose schema fields are matched against ``fList``.

        Returns:
            The DataFrame after all matching replacements.
        """
        # Checked in this order; the first keyword contained in "replaceby" wins.
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        schemaFields = df.schema.fields
        for featureObj in fList:
            for field in schemaFields:
                # NOTE(review): substring match, so "Sales" also selects a
                # column such as "Sales_Total" - confirm this is intended.
                if featureObj["feature"] not in field.name:
                    continue
                for keyword, replace in strategies:
                    if keyword in featureObj["replaceby"]:
                        df = replace(field, df)
                        break
        return df


def BinarizerTransform(df, params, transformationData={}):
    """Add a ``<feature>_binarizer`` column thresholded from ``params['feature']``.

    Args:
        df: Input Spark DataFrame.
        params: Feature description; only ``params['feature']`` is read.
        transformationData: Must provide ``'threshold'`` (converted with
            ``float``).

    Returns:
        A DataFrame in which the feature column is a double (nulls filled
        with 0.0, rounded to 2 decimals) and the binarized column is added.
    """
    column = params['feature']
    binarizedColumn = column + "_binarizer"

    # Rebuild the feature as a double column: cast into a temporary column,
    # drop the original, then give the temporary column the original name.
    asDouble = df.withColumn("feature_cast", df[column].cast("double"))
    asDouble = asDouble.drop(column)
    asDouble = asDouble.withColumnRenamed("feature_cast", column)

    # Binarizer input must not contain nulls here, so they become 0.0.
    filled = asDouble.fillna({column: 0.0})

    threshold = float(transformationData['threshold'])
    binarized = Binarizer(threshold=threshold, inputCol=column,
                          outputCol=binarizedColumn).transform(filled)

    # The source feature is kept next to the new column, rounded to 2 places.
    return binarized.withColumn(column, round(binarized[column], 2))


def bucketizerTransform(df, params, transformationData=None):
    """Add a ``<feature>_bucketizer`` column bucketed from ``params['feature']``.

    Args:
        df: Input Spark DataFrame.
        params: Feature description; only ``params['feature']`` is read.
        transformationData: Must provide ``'buckets'``, a list of dicts with
            ``'lower_value'`` and ``'upper_value'`` (each converted with
            ``float``, so ``'-Infinity'``/``'+Infinity'`` are accepted).

    Returns:
        A DataFrame with the bucket-index column added and the feature column
        rounded to 2 decimals.

    Raises:
        KeyError: If ``'buckets'`` or a bucket bound is missing.
    """
    # A None default replaces the shared mutable `{}` default; omitting the
    # argument still fails on the missing 'buckets' key as before.
    transformationData = {} if transformationData is None else transformationData
    feature = params['feature']

    filled = df.fillna({feature: '0.0'})

    # Bucketizer needs strictly increasing split points. Collect every bound
    # once and sort, so the result no longer depends on the order in which
    # the buckets are listed.
    boundaries = set()
    for bucket in transformationData["buckets"]:
        boundaries.add(float(bucket["lower_value"]))
        boundaries.add(float(bucket["upper_value"]))
    splits = sorted(boundaries)

    outcol = feature + "_bucketizer"
    bucketized = Bucketizer(splits=splits, inputCol=feature,
                            outputCol=outcol).transform(filled)

    return bucketized.withColumn(feature, round(bucketized[feature], 2))


class TransformationTimeSeriesForecastingMain:
    """Entry point of the generated transformation stage."""

    # TODO: change df argument in run with following
    @staticmethod
    def run(transformationDF, config):
        """Select the forecast columns, impute missing values, return pandas.

        Args:
            transformationDF: Source Spark DataFrame.
            config: JSON string; ``FE.featureList`` drives the null
                replacement and ``FE.features`` names the time column and the
                column to forecast.

        Returns:
            A pandas DataFrame holding only the ``timecolumn`` and
            ``tocompare`` columns after null replacement.
        """
        configObj = json.loads(config)
        featureData = configObj["FE"]['featureList']
        ForecastFE = configObj["FE"]
        featuresSelectedList = [ForecastFE['features']['timecolumn'],
                                ForecastFE['features']['tocompare']]
        transformedDF = transformationDF.select(
            [c for c in transformationDF.columns if c in featuresSelectedList])
        transformedDF = CleanseData().replaceNullValues(featureData, transformedDF)

        # NOTE(review): the two transformations below are applied to
        # `transformationDF`, which is not what this method returns, and the
        # transformed source columns are dropped right after. They are kept
        # because they still fail when a column is missing - confirm whether
        # their output should reach the result.
        binarizerData = {'feature_label': 'Ship Mode_Same Day', 'threshold': '1.19',
                         'transformation_label': 'Binarizer'}
        transformationDF = BinarizerTransform(
            transformationDF,
            {'transformationsData': [{'feature_label': 'Ship Mode_Same Day', 'threshold': '1.19', 'transformation_label': 'Binarizer'}],
             'feature': 'Ship Mode_Same Day', 'type': 'real', 'selected': 'True', 'replaceby': 'mean',
             'stats': {'count': '145', 'mean': '0.15', 'stddev': '0.57', 'min': '0.0', 'max': '4.0', 'missing': '0'},
             'transformation': [{'transformation': 'Binarizer', 'selectedAsDefault': 1}],
             'updatedLabel': 'Ship Mode_Same Day'},
            binarizerData)
        transformationDF = transformationDF.drop('Ship Mode_Same Day')

        bucketizerData = {'feature_label': 'Segment_Home Office',
                          'buckets': [{'lower_value': '-Infinity', 'upper_value': '3'},
                                      {'lower_value': '3', 'upper_value': '6'},
                                      {'lower_value': '6', 'upper_value': '9'},
                                      {'lower_value': '9', 'upper_value': '+Infinity'}],
                          'transformation_label': 'Bucketizer'}
        transformationDF = bucketizerTransform(
            transformationDF,
            {'transformationsData': [{'feature_label': 'Segment_Home Office',
                                      'buckets': [{'lower_value': '-Infinity', 'upper_value': '3'},
                                                  {'lower_value': '3', 'upper_value': '6'},
                                                  {'lower_value': '6', 'upper_value': '9'},
                                                  {'lower_value': '9', 'upper_value': '+Infinity'}],
                                      'transformation_label': 'Bucketizer'}],
             'feature': 'Segment_Home Office', 'type': 'real', 'selected': 'True', 'replaceby': 'mean',
             'stats': {'count': '145', 'mean': '0.83', 'stddev': '1.63', 'min': '0.0', 'max': '11.0', 'missing': '0'},
             'transformation': [{'transformation': 'Bucketizer', 'selectedAsDefault': 1}],
             'updatedLabel': 'Segment_Home Office'},
            bucketizerData)
        transformationDF = transformationDF.drop('Segment_Home Office')

        transformedDF.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
        try:
            return transformedDF.toPandas()
        finally:
            # The pandas copy is all the caller gets, so release the cached
            # Spark data instead of leaving it persisted for the session.
            transformedDF.unpersist()


***AUTOML FUNCTIONS***

In [None]:
from prophet import Prophet
import pandas as pd


def driverProphet(df):
    """Fit a Prophet model on a two-column (time, value) pandas DataFrame.

    Args:
        df: pandas DataFrame with exactly two columns, time first and the
            value to forecast second. It is no longer modified; the column
            renaming and date parsing happen on a copy.

    Returns:
        The fitted ``Prophet`` model.

    Raises:
        ValueError: If ``df`` does not have exactly two columns.
    """
    # Work on a copy so the caller's frame keeps its column names and values.
    history = df.copy()
    history.columns = ['ds', 'y']
    # NOTE(review): the last 6 characters of the stringified timestamp are
    # dropped before parsing - presumably a "+HH:MM" offset or ":MM:SS"
    # suffix. Confirm the input format; a plain "YYYY-MM-DD" string would be
    # cut down to its year.
    history['ds'] = pd.to_datetime(history['ds'].astype(str).str[:-6])

    m = Prophet()
    m.fit(history)

    # One-year look-ahead, drawn as a visual check of the fit.
    future = m.make_future_dataframe(periods=365)
    forecast = m.predict(future)
    m.plot(forecast)
    return m


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run tsbinbuckHooks.ipynb
try:
    #sourcePreExecutionHook()

    # The source description is handed to the connector as a dict literal in a string.
    sssalehitoryyecu = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/testdata/1716367374465_SSSalesHistory.csv', 'filename': 'SSSalesHistory.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'FilePath': '/TimeSeries/SuperStore/dataFiles/SSSalesHistory.csv', 'viewFileName': 'SSSalesHistory.csv', 'is_header': 'Use Header Line', 'baseType': 'hdfs', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://dnm.bfirst.ai:44040/api/read/hdfs'}")

except Exception as ex:
    # Best effort: keep the notebook running, but log the traceback as well
    # as the message so the failing call can be located.
    logging.exception(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run tsbinbuckHooks.ipynb
try:
    #transformationPreExecutionHook()

    # Feature-engineering config. It was previously hard-wrapped in the middle
    # of the "Category_Office Supplies" string literal, which is a syntax
    # error; the literal is laid out here with one feature per line.
    tsbinbuckfeatureforecast = TransformationTimeSeriesForecastingMain.run(sssalehitoryyecu, json.dumps({"FE": {
        "functionList": [{"function": "Original"}],
        "stage_attributes": {},
        "featureList": [
            {"transformationsData": [{}], "feature": "Order_Date", "type": "date", "selected": "True", "replaceby": "random", "stats": {"count": "", "mean": "", "stddev": "", "min": "", "max": "", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Order_Date"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Sales", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "1146.65", "stddev": "2582.4", "min": "3.0", "max": "28106.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Sales"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Ship Mode_First Class", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "0.68", "stddev": "1.4", "min": "0.0", "max": "7.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Ship Mode_First Class"},
            {"transformationsData": [{"feature_label": "Ship Mode_Same Day", "threshold": "1.19", "transformation_label": "Binarizer"}], "feature": "Ship Mode_Same Day", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "0.15", "stddev": "0.57", "min": "0.0", "max": "4.0", "missing": "0"}, "transformation": [{"transformation": "Binarizer", "selectedAsDefault": 1}], "updatedLabel": "Ship Mode_Same Day"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Ship Mode_Second Class", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "0.9", "stddev": "1.59", "min": "0.0", "max": "9.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Ship Mode_Second Class"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Ship Mode_Standard Class", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "3.18", "stddev": "3.6", "min": "0.0", "max": "17.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Ship Mode_Standard Class"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Segment_Consumer", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "2.62", "stddev": "2.75", "min": "0.0", "max": "16.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Segment_Consumer"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Segment_Corporate", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "1.46", "stddev": "2.35", "min": "0.0", "max": "11.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Segment_Corporate"},
            {"transformationsData": [{"feature_label": "Segment_Home Office", "buckets": [{"lower_value": "-Infinity", "upper_value": "3"}, {"lower_value": "3", "upper_value": "6"}, {"lower_value": "6", "upper_value": "9"}, {"lower_value": "9", "upper_value": "+Infinity"}], "transformation_label": "Bucketizer"}], "feature": "Segment_Home Office", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "0.83", "stddev": "1.63", "min": "0.0", "max": "11.0", "missing": "0"}, "transformation": [{"transformation": "Bucketizer", "selectedAsDefault": 1}], "updatedLabel": "Segment_Home Office"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Region_Central", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "1.07", "stddev": "1.73", "min": "0.0", "max": "8.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Region_Central"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Region_East", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "1.28", "stddev": "1.98", "min": "0.0", "max": "8.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Region_East"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Region_South", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "1.23", "stddev": "2.03", "min": "0.0", "max": "9.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Region_South"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Region_West", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "1.26", "stddev": "1.71", "min": "0.0", "max": "7.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Region_West"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Category_Furniture", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "0.9", "stddev": "1.25", "min": "0.0", "max": "7.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Category_Furniture"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Category_Office Supplies", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "3.2", "stddev": "2.88", "min": "0.0", "max": "16.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Category_Office Supplies"},
            {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Category_Technology", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "145", "mean": "0.74", "stddev": "1.09", "min": "0.0", "max": "7.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Category_Technology"},
        ],
        "features": {"timecolumn": "Order_Date", "tocompare": "Sales"},
        "dataPercentage": 100,
        "originalfile": "/FileStore/platform/testdata/1716367374465_SSSalesHistory.csv",
        "statFunction": {"function": "Original", "parameter": ""},
    }}))

    #transformationPostExecutionHook(tsbinbuckfeatureforecast)

except Exception as ex:
    # Best effort: keep the notebook running, but log the traceback as well
    # as the message so the failing step can be located.
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run tsbinbuckHooks.ipynb
try:
    #mlPreExecutionHook()

    # Fit the forecasting model on the pandas frame produced by the
    # transformation step.
    model = driverProphet(tsbinbuckfeatureforecast)

    #mlPostExecutionHook(model)

except Exception as ex:
    # Best effort: keep the notebook running, but log the traceback as well
    # as the message so the failing step can be located.
    logging.exception(ex)
#spark.stop()


***PREDICT ON TRAINED MODEL***

In [None]:
try:
    # Forecast 30 month-start periods beyond the training data (history excluded).
    future = model.make_future_dataframe(periods=30, freq='MS', include_history=False)
    predictedDataFrame = model.predict(future)
    display(model.plot(predictedDataFrame))
    display(model.plot_components(predictedDataFrame))
    display(predictedDataFrame)
except Exception as ex:
    # Best effort: keep the notebook running, but log the traceback as well
    # as the message so the failing step can be located.
    logging.exception(ex)

# The session is stopped whether or not the prediction succeeded.
spark.stop()


