***GENERATED CODE FOR stockexchangeprediction PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

***TRANSFORMATION FUNCTIONS APPLIED TO THE DATA***

In [None]:
from pyspark.sql.functions import dayofmonth, month, year, col
from pyspark.sql.functions import col, udf, round
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, udf, unix_timestamp
import json
from pyspark.sql.functions import col, round
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Fill missing values in DataFrame columns according to a feature config.

    Each ``replaceBy*`` helper computes one value for the column, then
    replaces both nulls and single-space (``" "``) placeholder cells with it
    and returns the new DataFrame. The trailing ``mean_``/``max_``/``min_``/
    ``stddev_`` parameters are accepted for interface compatibility but are
    not used.
    """

    def _fillPlaceholders(self, df, name, value, nullValue=None):
        """Return *df* with nulls and ``" "`` cells of column *name* replaced.

        *nullValue* is what ``fillna`` uses (defaults to *value*). When *value*
        is ``None`` (e.g. the aggregate of an all-null column) *df* is returned
        unchanged, because ``fillna`` does not accept ``None``.
        """
        if value is None:
            return df
        # DataFrames are immutable: the results must be reassigned.
        df = df.fillna(value if nullValue is None else nullValue, subset=[name])
        return df.withColumn(
            name, when(col(name) == " ", value).otherwise(col(name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Replace missing values in column ``feature.name`` with its mean."""
        meanValue = df.select(mean(col(feature.name)).alias(
            'mean')).collect()[0]["mean"]
        # Non-placeholder values are kept as-is (no Integer cast), so
        # real-valued columns are not truncated.
        return self._fillPlaceholders(df, feature.name, meanValue)

    def replaceByMax(self, feature, df, max_=-1):
        """Replace missing values in column ``feature.name`` with its maximum."""
        maxValue = df.select(max(col(feature.name)).alias('max')).collect()[
            0]["max"]
        return self._fillPlaceholders(df, feature.name, maxValue)

    def replaceByMin(self, feature, df, min_=-1):
        """Replace missing values in column ``feature.name`` with its minimum."""
        minValue = df.select(min(col(feature.name)).alias('min')).collect()[
            0]["min"]
        return self._fillPlaceholders(df, feature.name, minValue)

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Replace missing values in column ``feature.name`` with its stddev."""
        stddevValue = df.select(stddev(col(feature.name)).alias(
            'stddev')).collect()[0]["stddev"]
        return self._fillPlaceholders(df, feature.name, stddevValue)

    def replaceDateRandomly(self, feature, df):
        """Replace missing values in ``feature.name`` with a non-null value.

        Despite the name, the value is not random: it is the first row that
        ``head(1)`` returns from the non-null rows (no ordering is applied).
        Nulls are filled with its string form; ``" "`` cells with the value.
        """
        rows = df.where(col(feature.name).isNotNull()).head(1)
        if not rows:
            # Entirely null column: there is no value to copy.
            return df
        fillValue = rows[0][feature.name]
        return self._fillPlaceholders(
            df, feature.name, fillValue, nullValue=str(fillValue))

    def replaceNullValues(self, fList, df):
        """Apply the configured replacement strategy to every matching column.

        Args:
            fList: feature configs. Each dict has a ``"feature"`` name and may
                have a ``"replaceby"`` string containing ``"mean"``, ``"max"``,
                ``"min"``, ``"stddev"`` or ``"random"`` (checked in that
                order). Configs without ``"replaceby"`` are skipped.
            df: the DataFrame to clean.

        Returns:
            The cleaned DataFrame.
        """
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        fields = df.schema.fields
        for featureObj in fList:
            replaceBy = featureObj.get("replaceby") or ""
            for field in fields:
                # Substring match: config "open" also matches e.g. "open_adj".
                if featureObj["feature"] not in field.name:
                    continue
                for key, handler in strategies:
                    if key in replaceBy:
                        df = handler(field, df)
                        break
        return df


def ConvertToDate(df, params, transformationData={}):
    """Parse a string column into a timestamp and add day/month/year parts.

    The column named ``params["feature"]`` is overwritten in place with the
    timestamp parsed via ``unix_timestamp`` (strings that do not match the
    pattern generally yield null). Three integer columns are appended:
    ``<feature>_dayofmonth``, ``<feature>_month`` and ``<feature>_year``.

    Args:
        df: input Spark DataFrame.
        params: feature config; only ``"feature"`` is read.
        transformationData: must contain ``"dateFormat"``, a Spark datetime
            pattern such as ``"MM/dd/yyyy"``.

    Returns:
        The DataFrame with the parsed column and the three date-part columns.
    """
    name = params["feature"]
    pattern = transformationData["dateFormat"]
    parsed = df.withColumn(
        name, unix_timestamp(name, pattern).cast("double").cast("timestamp"))
    for suffix, extract in (("_dayofmonth", dayofmonth),
                            ("_month", month),
                            ("_year", year)):
        parsed = parsed.withColumn(name + suffix, extract(col(name)))
    return parsed


def StringIndexerTransform(df, params, transformationData=None):
    """Label-encode a string column with Spark's ``StringIndexer``.

    Nulls in ``params["feature"]`` are first replaced with ``''`` so they get
    their own index. The encoded column is ``<feature>_stringindexer``. When
    it has at most four distinct values, it is cast to ``IntegerType``.
    Otherwise it keeps ``StringIndexer``'s numeric output.

    Args:
        df: input Spark DataFrame.
        params: feature config; only ``"feature"`` is read.
        transformationData: unused; accepted for a uniform transform signature.

    Returns:
        ``df`` with the encoded column appended.
    """
    feature = params["feature"]
    outcol = feature + "_stringindexer"

    filled = df.fillna({feature: ''})
    indexer = StringIndexer(
        inputCol=feature, outputCol=outcol, handleInvalid="skip")
    indexed = indexer.fit(filled).transform(filled)

    # Count distinct codes on the cluster instead of collecting every
    # distinct value to the driver just to take its length.
    if indexed.select(outcol).distinct().count() <= 4:
        return indexed.withColumn(outcol, indexed[outcol].cast(IntegerType()))
    return indexed


def vectorAssemblerTransform(df, param):
    """Assemble one or more columns into a Spark ML vector column.

    Args:
        df: input Spark DataFrame.
        param: a single column name, assembled into ``<param>_vector``; or a
            list/tuple of column names, assembled into ``"features"``.

    Returns:
        ``df`` with the vector column appended.

    Raises:
        TypeError: if *param* is neither a string nor a list/tuple of names.
    """
    if isinstance(param, str):
        assembler = VectorAssembler(inputCols=[param], outputCol=param + "_vector")
    elif isinstance(param, (list, tuple)):
        assembler = VectorAssembler(inputCols=list(param), outputCol="features")
    else:
        # Fail fast rather than returning None, which would only surface
        # later as an unrelated AttributeError in the caller.
        raise TypeError(
            "param must be a column name or a list of column names, got %s"
            % type(param).__name__)
    return assembler.transform(df)


def to_array(col):
    """Convert a Spark ML vector column into an ``array<double>`` column.

    A Python UDF calls ``toArray().tolist()`` on each vector value.
    """
    def to_array_(vector):
        return vector.toArray().tolist()

    element_type = ArrayType(DoubleType())
    converter = udf(to_array_, element_type)
    return converter(col)


def minMaxScalarTransform(df, params, transformationData={}):
    """Rescale ``params['feature']`` linearly into a configured [min, max] range.

    Nulls are filled with 0.0 first. The scaled value is stored as a plain
    double in ``<feature>_minmaxscalar``. ``<feature>`` itself is overwritten
    with that value rounded to 2 decimals. The temporary ``<feature>_vector``
    column is dropped.

    Args:
        df: input Spark DataFrame.
        params: feature config; only ``'feature'`` is read.
        transformationData: must contain ``"min_max_scalar"``, a dict whose
            optional ``"min"``/``"max"`` entries (defaults 0.0 and 1.0) are
            converted with ``float``.

    Returns:
        The DataFrame with the scaled and rounded columns.
    """
    name = params['feature']
    filled = df.fillna({name: '0.0'})

    rangeCfg = transformationData["min_max_scalar"]
    low = float(rangeCfg.get("min", 0.0))
    high = float(rangeCfg.get("max", 1.0))

    scaledCol = name + "_minmaxscalar"
    vectorCol = name + "_vector"
    assembled = vectorAssemblerTransform(filled, name)

    scaler = MinMaxScaler(
        inputCol=vectorCol, outputCol=scaledCol, min=low, max=high)
    scaled = scaler.fit(assembled).transform(assembled)

    # Flatten the one-element vector: expose element 0 as "final_col[0]",
    # drop the vector columns, then rename it back to scaledCol.
    keepCols = scaled.schema.names + [col("final_col")[0]]
    flat = scaled.withColumn("final_col", to_array(scaled[scaledCol])).select(keepCols)
    flat = flat.drop(scaledCol).drop(vectorCol)
    flat = flat.withColumnRenamed("final_col[0]", scaledCol)
    return flat.withColumn(name, round(flat[scaledCol], 2))


def _parseFlag(value):
    """Interpret a config flag that may be a bool or a string like 'True'/'False'."""
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return bool(value)


def standardScalarTransform(df, params, transformationData=None):
    """Standardize ``params['feature']`` with Spark's ``StandardScaler``.

    Nulls are filled with 0.0 first. Exactly one mode is used: mean-centering
    when ``mean_flag`` is true, otherwise scaling to unit std when
    ``std_flag`` is true. If neither flag is set, StandardScaler's defaults
    apply (withStd=True, withMean=False). The result is stored as a plain
    double in ``<feature>_standardscalar``. ``<feature>`` is overwritten with
    it rounded to 2 decimals.

    Args:
        df: input Spark DataFrame.
        params: feature config; only ``'feature'`` is read.
        transformationData: must contain ``"std_scalar"``, a dict with
            ``"mean_flag"``/``"std_flag"`` given as bools or 'True'/'False'.

    Returns:
        The DataFrame with the standardized and rounded columns.
    """
    if transformationData is None:
        transformationData = {}
    feature = params['feature']
    dfReturn = df.fillna({feature: '0.0'})
    scalarFlags = transformationData["std_scalar"]
    # Flags arrive as strings; a bare truth test would treat 'False' as set.
    # mean_flag keeps precedence over std_flag.
    if _parseFlag(scalarFlags.get("mean_flag")):
        meanflag, stdflag = True, False
    elif _parseFlag(scalarFlags.get("std_flag")):
        meanflag, stdflag = False, True
    else:
        meanflag, stdflag = False, True
    outcol = feature + "_standardscalar"
    featureVector = feature + "_vector"
    dfReturn = vectorAssemblerTransform(dfReturn, feature)

    standardscale = StandardScaler(inputCol=featureVector, outputCol=outcol,
                                   withStd=stdflag, withMean=meanflag)
    scaledata = standardscale.fit(dfReturn).transform(dfReturn)
    # Flatten the one-element vector into a plain double column.
    dfReturn = scaledata.withColumn("final_col", to_array(scaledata[outcol]))\
        .select(scaledata.schema.names + [col("final_col")[0]])

    dfReturn = dfReturn.drop(outcol).drop(featureVector)\
        .withColumnRenamed("final_col[0]", outcol)
    return dfReturn.withColumn(feature, round(dfReturn[outcol], 2))


class TransformationMain:
    """Entry point for the generated stockexchangeprediction feature pipeline."""

    @staticmethod
    def run(transformationDF, config):
        """Cleanse *transformationDF* and apply the fixed transformation steps.

        Args:
            transformationDF: Spark DataFrame expected to contain the columns
                date, symbol, open, close, low, high and volume.
            config: JSON string whose ``"FE"`` list drives null replacement
                (see ``CleanseData.replaceNullValues``).

        Returns:
            The transformed DataFrame. Each step's source column is dropped
            once its derived columns have been added.
        """
        configObj = json.loads(config)
        featureData = configObj["FE"]
        transformationDF = CleanseData().replaceNullValues(featureData, transformationDF)

        # (transform, source column, transformationData) in execution order.
        # The transforms only read params["feature"], so params is reduced to it.
        # NOTE(review): the config stats list a max date of '31/12/2015',
        # which does not match 'MM/dd/yyyy'; such rows will not parse.
        # Confirm the source date format.
        steps = [
            (ConvertToDate, 'date',
             {'feature_label': 'date', 'dateFormat': 'MM/dd/yyyy',
              'transformation_label': 'Convert to Date'}),
            (StringIndexerTransform, 'symbol',
             {'feature_label': 'symbol', 'transformation_label': 'String Indexer'}),
        ]
        steps += [
            (minMaxScalarTransform, price,
             {'feature_label': price, 'min_max_scalar': {'min': '0', 'max': '100'},
              'transformation_label': 'Min Max Scalar'})
            for price in ('open', 'close', 'low', 'high')
        ]
        steps.append(
            (standardScalarTransform, 'volume',
             {'feature_label': 'volume',
              'std_scalar': {'mean_flag': 'True', 'std_flag': 'False'},
              'transformation_label': 'Standard Scalar'}))

        for transform, feature, transformationData in steps:
            transformationDF = transform(
                transformationDF, {'feature': feature}, transformationData)
            transformationDF = transformationDF.drop(feature)

        display(transformationDF.limit(2).toPandas())
        return transformationDF


***AUTOML FUNCTIONS***

In [None]:
from sklearn.model_selection import train_test_split
from tpot import TPOTRegressor
import pyspark


def functionRegression(sparkDF, listOfFeatures, label):
    """Train a TPOT regression pipeline on a Spark DataFrame.

    The DataFrame is collected to the driver with ``toPandas()``. The data is
    split 90/10 into train/test (``random_state=1``) and searched with TPOT
    under a 5-minute budget.

    Args:
        sparkDF: Spark DataFrame containing the feature and label columns.
        listOfFeatures: names of the feature columns; *label* is ignored if
            it appears here.
        label: name of the target column.

    Returns:
        dict with the fitted ``model``, held-out ``X_test``/``y_test``, the
        ``label``, and ``columnNames``. ``columnNames`` is a new list of the
        feature columns in ``X`` order.
    """
    # Never feed the target back in as an input; the former drop-then-select
    # raised KeyError whenever the label was listed as a feature.
    featureColumns = [name for name in listOfFeatures if name != label]
    # No persist(): the frame is read by a single toPandas() action, and
    # caching the caller's DataFrame here was never released.
    df = sparkDF.toPandas()
    X = df[featureColumns].values
    y = df[label].values
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=1, test_size=0.1)
    tpotModel = TPOTRegressor(verbosity=3, generations=10, max_time_mins=5,
                              n_jobs=-1, random_state=25, population_size=15)
    tpotModel.fit(X_train, y_train)
    display(" Error rate of Model : %s" % tpotModel.score(X_test, y_test))
    return {'model': tpotModel,
            'X_test': X_test,
            'y_test': y_test,
            'label': label,
            'columnNames': featureColumns}


***READING DATAFRAME***

In [None]:
#%run stockexchangepredictionHooks.ipynb
# Read the uploaded NYC stock-exchange CSV into a Spark DataFrame.
# `logging` is imported here because no visible cell imports it; without it
# the except handler itself would raise NameError.
import logging

try:
    #sourcePreExecutionHook()

    nycstockexchangeprices = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/uploadedSourceFiles/1613113878615_NYCStockExChangePrices.csv', 'file_type': 'Delimeted', 'delimiter': ',', 'is_header': 'Use Header Line', 'FilePath': '/Finance Models/New York Stock Exchange - Regression /NYCStockExChangePrices.csv', 'filename': '1613113878615_NYCStockExChangePrices.csv', 'viewFileName': 'NYCStockExChangePrices.csv'}")

except Exception as ex:
    # logging.exception also records the traceback.
    logging.exception(ex)


***TRANSFORMING DATAFRAME***

In [None]:
#%run stockexchangepredictionHooks.ipynb
# Run the feature-engineering pipeline; the "FE" list drives null replacement.
# `logging` is imported here because no visible cell imports it.
import logging

try:
    #transformationPreExecutionHook()

    autofe = TransformationMain.run(nycstockexchangeprices,json.dumps( {"FE": [{"transformationsData": [{"feature_label": "date", "dateFormat": "MM/dd/yyyy", "transformation_label": "Convert to Date"}], "feature": "date", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "84918", "mean": "", "stddev": "", "min": "01/02/2010", "max": "31/12/2015", "missing": "0"}, "transformation": [{"transformation": "Convert to Date", "selectedAsDefault": 1}], "updatedLabel": "date"}, {"transformationsData": [{"feature_label": "symbol", "transformation_label": "String Indexer"}], "feature": "symbol", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "84918", "mean": "", "stddev": "", "min": "A", "max": "ZTS", "missing": "0"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "symbol"}, {"transformationsData": [{"feature_label": "open", "min_max_scalar": {"min": "0", "max": "100"}, "transformation_label": "Min Max Scalar"}], "feature": "open", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "84918", "mean": "64.83", "stddev": "73.82", "min": "1.66", "max": "1498.73999", "missing": "0"}, "transformation": [{"transformation": "Min Max Scalar", "selectedAsDefault": 1}], "updatedLabel": "open"}, {"transformationsData": [{"feature_label": "close", "min_max_scalar": {"min": "0", "max": "100"}, "transformation_label": "Min Max Scalar"}], "feature": "close", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "84918", "mean": "64.85", "stddev": "73.76", "min": "1.59", "max": "1479.819946", "missing": "0"}, "transformation": [{"transformation": "Min Max Scalar", "selectedAsDefault": 1}], "updatedLabel": "close"}, {"transformationsData": [{"feature_label": "low", "min_max_scalar": {"min": "0", "max": "100"}, "transformation_label": "Min Max Scalar"}], "feature": "low", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": 
"84918", "mean": "64.17", "stddev": "73.05", "min": "1.53", "max": "1478.01001", "missing": "0"}, "transformation": [{"transformation": "Min Max Scalar", "selectedAsDefault": 1}], "updatedLabel": "low"}, {"transformationsData": [{"feature_label": "high", "min_max_scalar": {"min": "0", "max": "100"}, "transformation_label": "Min Max Scalar"}], "feature": "high", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "84918", "mean": "65.48", "stddev": "74.5", "min": "1.81", "max": "1498.73999", "missing": "0"}, "transformation": [{"transformation": "Min Max Scalar", "selectedAsDefault": 1}], "updatedLabel": "high"}, {"transformationsData": [{"feature_label": "volume", "std_scalar": {"mean_flag": "True", "std_flag": "False"}, "transformation_label": "Standard Scalar"}], "feature": "volume", "transformation": [{"transformation": "Standard Scalar", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "84918", "mean": "5301615.65", "stddev": "11911851.06", "min": "0", "max": "582294100", "missing": "0"}, "updatedLabel": "volume"}, {"feature": "symbol_stringindexer_transform", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "transformationsData": [{"transformation_label": "novalue"}], "type": "real", "selected": "True", "stats": {"count": "84918", "mean": "232.0", "stddev": "139.96", "min": "0.0", "max": "500.0", "missing": "0"}, "updatedLabel": "symbol_stringindexer_tran..."}, {"feature": "date_converttodate_transform", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", "transformationsData": [{"transformation_label": "novalue"}], "type": "date", "stats": {"count": "", "mean": "", "stddev": "", "min": "", "max": "", "missing": "51317"}, "updatedLabel": "date_converttodate_transf..."}, {"feature": "date_dayofmonth", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", 
"transformationsData": [{"transformation_label": "novalue"}], "type": "numeric", "stats": {"count": "33601", "mean": "6.52", "stddev": "3.46", "min": "1", "max": "12", "missing": "51317"}, "updatedLabel": "date_dayofmonth"}, {"feature": "date_month", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", "transformationsData": [{"transformation_label": "novalue"}], "type": "numeric", "stats": {"count": "33601", "mean": "6.59", "stddev": "3.45", "min": "1", "max": "12", "missing": "51317"}, "updatedLabel": "date_month"}, {"feature": "date_year", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", "transformationsData": [{"transformation_label": "novalue"}], "type": "numeric", "stats": {"count": "33601", "mean": "2013.05", "stddev": "2.0", "min": "2010", "max": "2016", "missing": "51317"}, "updatedLabel": "date_year"}, {"feature": "open_minmaxscalar_transform", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", "transformationsData": [{"transformation_label": "novalue"}], "type": "real", "stats": {"count": "84918", "mean": "4.22", "stddev": "4.93", "min": "0.0", "max": "100.0", "missing": "0"}, "updatedLabel": "open_minmaxscalar_transfo..."}, {"feature": "close_minmaxscalar_transform", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", "transformationsData": [{"transformation_label": "novalue"}], "type": "real", "stats": {"count": "84918", "mean": "4.28", "stddev": "4.99", "min": "0.0", "max": "100.0", "missing": "0"}, "updatedLabel": "close_minmaxscalar_transf..."}, {"feature": "low_minmaxscalar_transform", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", "transformationsData": [{"transformation_label": "novalue"}], "type": "real", "stats": {"count": "84918", "mean": "4.24", "stddev": "4.95", "min": "0.0", "max": "100.0", "missing": "0"}, 
"updatedLabel": "low_minmaxscalar_transfor..."}, {"feature": "high_minmaxscalar_transform", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", "transformationsData": [{"transformation_label": "novalue"}], "type": "real", "stats": {"count": "84918", "mean": "4.25", "stddev": "4.98", "min": "0.0", "max": "100.0", "missing": "0"}, "updatedLabel": "high_minmaxscalar_transfo..."}, {"feature": "volume_standardscalar_transform", "transformation": [{"transformation": "novalue", "selectedAsDefault": 0}], "selected": "True", "transformationsData": [{"transformation_label": "novalue"}], "type": "real", "stats": {"count": "84918", "mean": "0.0", "stddev": "11911851.06", "min": "-5301615.646859347", "max": "5.769924843531407E8", "missing": "0"}, "updatedLabel": "volume_standardscalar_tra..."}]}))

    #transformationPostExecutionHook(autofe)

except Exception as ex:
    # logging.exception also records the traceback.
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run stockexchangepredictionHooks.ipynb
# Train the AutoML regressor predicting the scaled close price.
# `logging` is imported here because no visible cell imports it.
import logging

try:
    #mlPreExecutionHook()

    # "date_converttodate" is not requested: ConvertToDate overwrites "date"
    # in place (and the pipeline then drops it), adding only the
    # _dayofmonth/_month/_year columns. Listing it would make the pandas
    # column selection in functionRegression raise KeyError.
    dataAutoML=functionRegression(autofe, ["symbol_stringindexer", "open_minmaxscalar", "low_minmaxscalar", "high_minmaxscalar", "volume_standardscalar", "date_dayofmonth", "date_month", "date_year"], "close_minmaxscalar")

    #mlPostExecutionHook(dataAutoML)

except Exception as ex:
    # logging.exception also records the traceback.
    logging.exception(ex)


***PREDICT ON TRAINED MODEL***

In [None]:
import pandas as pd
import numpy as np
import sklearn.metrics

# Score the trained model on the held-out split and show metrics and a sample.
# `logging` is imported here because no visible cell imports it.
import logging

try:
    model = dataAutoML['model']
    X_test = dataAutoML['X_test']
    y_test = dataAutoML['y_test']
    label = dataAutoML['label']
    # Build a new list instead of mutating dataAutoML['columnNames'];
    # mutating it made a re-run of this cell build a frame with mismatched
    # columns.
    columnNames = [name for name in dataAutoML['columnNames'] if name != label]
    predicted = label + "_predicted"
    y_predicted = model.predict(X_test)
    df = pd.DataFrame(X_test, columns=columnNames)
    df[label] = y_test
    df[predicted] = y_predicted
    # Show the target and prediction first, then the inputs.
    df = df[[label, predicted] + columnNames]
    R2 = np.round(sklearn.metrics.r2_score(y_test, y_predicted), 1)
    Mean_Squared_Error = np.round(sklearn.metrics.mean_squared_error(y_test, y_predicted), 1)
    Mean_Absolute_Error = np.round(sklearn.metrics.mean_absolute_error(y_test, y_predicted), 1)
    display(" R2 score of Prediction on test data    : %s"%R2)
    display(" Mean Squared Error of Prediction on test data    : %s"%Mean_Squared_Error)
    display(" Mean Absolute Error of Prediction on test data   : %s"%Mean_Absolute_Error)
    display(df.head())
except Exception as ex:
    # logging.exception also records the traceback.
    logging.exception(ex)

