***GENERATED CODE FOR customercomplaint3 PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import os
import datetime
import logging
import warnings
# Install a blanket "ignore" filter so Python warnings are not shown.
warnings.filterwarnings(action='ignore')
# Configure the root logger: INFO and above, rendered as "LEVEL:message".
logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(message)s')


class HDFSConnector:
    """CSV source/sink helpers for Spark.

    Both public methods are invoked on the class itself
    (``HDFSConnector.fetch(spark, config)``), so they are static methods.
    """

    @staticmethod
    def _parse_config(config):
        """Return the connector configuration as a dict.

        ``config`` is the text of a Python dict literal (single-quoted, so
        it is not valid JSON).  A dict is also accepted and returned as is.

        SECURITY: the original code called ``eval(config)``, which executes
        arbitrary expressions.  ``ast.literal_eval`` only accepts literals and
        raises ``ValueError``/``SyntaxError`` for anything else.
        """
        import ast  # local import keeps this block self-contained

        if isinstance(config, dict):
            return config
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Read the CSV at ``config['url']`` from HDFS into a DataFrame.

        The HDFS host and port are taken from the ``HDFS_SERVER`` and
        ``HDFS_PORT`` environment variables (``KeyError`` if unset).  The file
        is read with a header row and schema inference, the first two rows are
        shown with ``display`` and the DataFrame is returned.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        settings = HDFSConnector._parse_config(config)
        source_path = f"hdfs://{hdfs_server}:{hdfs_port}{settings['url']}"
        df = spark.read.options(header='true', inferschema='true').csv(
            source_path, header='true')
        # `display` is provided by the notebook environment.
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV and return the result of ``save``.

        Reads ``is_header``, ``delimiter`` and ``url`` from ``config``.  A
        header row is written only when ``is_header`` equals
        ``"Use Header Line"``.  ``spark`` is accepted for interface symmetry
        with ``fetch`` and is not used.
        """
        # Parse once, before touching the DataFrame, so a bad config fails early.
        settings = HDFSConnector._parse_config(config)
        header = 'true' if settings["is_header"] == "Use Header Line" else 'false'
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): the target is "<timestamp>_ <url>" including the space
        # produced by the "%s %s" format; kept exactly as before - confirm
        # this is the intended output location.
        target = ("%s %s") % (timestamp + "_", settings['url'])
        writer = df.write.format('csv').options(
            header=header, delimiter=settings["delimiter"])
        return writer.save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
from pyspark.sql.functions import dayofmonth, month, year, col
import json
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Fill null and single-space (" ") cells of Spark DataFrame columns.

    Each ``replaceBy*`` method takes ``feature`` (an object with a ``.name``
    attribute; ``replaceNullValues`` passes entries of ``df.schema.fields``)
    and the DataFrame, and returns a new DataFrame.  The trailing ``mean_`` /
    ``max_`` / ``min_`` / ``stddev_`` parameters are unused and kept only for
    signature compatibility.
    """

    def cleanValueForFE(self, value):
        """Normalise an aggregate result before it is used as a fill value.

        ``None`` becomes ``""``, a value whose ``str()`` is ``'nan'`` becomes
        the string ``"nan"``, anything else is returned unchanged.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _fillColumn(self, feature, df, fillValue, nullFill):
        """Fill nulls with ``nullFill`` and " " cells with ``fillValue``."""
        df = df.fillna(nullFill, subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def _replaceByAggregate(self, feature, df, aggregate, alias):
        """Fill ``feature`` with ``aggregate`` computed over fully non-null rows."""
        # dropna() without arguments removes rows having a null in ANY column,
        # so the statistic is computed over complete rows only.
        completeRows = df.dropna()
        row = completeRows.select(
            aggregate(col(feature.name)).alias(alias)).collect()[0]
        fillValue = self.cleanValueForFE(row[alias])
        return self._fillColumn(feature, df, fillValue, fillValue)

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill nulls and " " cells of ``feature`` with the column mean.

        The original built the " " replacement with ``withColumn`` but
        discarded the result, so only nulls were filled; the replacement is
        now applied, as in the max/min/stddev variants.
        """
        return self._replaceByAggregate(feature, df, mean, 'mean')

    def replaceByMax(self, feature, df, max_=-1):
        """Fill nulls and " " cells of ``feature`` with the column maximum."""
        return self._replaceByAggregate(feature, df, max, 'max')

    def replaceByMin(self, feature, df, min_=-1):
        """Fill nulls and " " cells of ``feature`` with the column minimum."""
        return self._replaceByAggregate(feature, df, min, 'min')

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill nulls and " " cells of ``feature`` with the column stddev."""
        return self._replaceByAggregate(feature, df, stddev, 'stddev')

    def replaceDateRandomly(self, feature, df):
        """Fill nulls and " " cells of ``feature`` with an existing value.

        Despite the name, the value used is the first non-null value returned
        by ``head(1)``, not a random sample.  When the column has no non-null
        value at all the DataFrame is returned unchanged (the original raised
        ``IndexError`` in that case).
        """
        sample = df.where(col(feature.name).isNotNull()).head(1)
        if not sample:
            return df
        fillValue = self.cleanValueForFE(sample[0][feature.name])
        # Nulls are filled with the string form, " " cells with the raw value,
        # exactly as before.
        return self._fillColumn(feature, df, fillValue, str(fillValue))

    def replaceNullValues(self, fList, df):
        """Apply the configured replacement strategy to every matching column.

        ``fList`` is a list of dicts with ``"feature"`` and ``"replaceby"``
        keys.  The first keyword among mean, max, min, stddev, random that
        occurs in ``replaceby`` selects the strategy; entries with no
        recognised keyword are skipped.
        """
        # Order matters: it reproduces the precedence of the if/elif chain.
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        featuresList = df.schema.fields
        for featureObj in fList:
            for feat in featuresList:
                # NOTE(review): this is a substring test, so a feature named
                # "Date" would also match "Date received"; kept as is because
                # callers may rely on it - confirm whether equality is meant.
                if featureObj["feature"] not in feat.name:
                    continue
                for keyword, handler in strategies:
                    if keyword in featureObj["replaceby"]:
                        df = handler(feat, df)
                        break
        return df


def StringIndexerTransform(df, params, transformationData=None):
    """Add a ``<feature>_stringindexer`` column produced by ``StringIndexer``.

    Args:
        df: input Spark DataFrame.
        params: dict; only ``params["feature"]`` (the column to index) is read.
        transformationData: unused, accepted for call compatibility.  The
            default is ``None`` rather than a shared mutable ``{}``.

    Returns:
        The DataFrame with the index column appended.  When the index column
        has at most four distinct values it is cast to ``IntegerType``;
        otherwise it keeps the type produced by ``StringIndexer``.
    """
    feature = params["feature"]
    outcol = feature + "_stringindexer"

    # Nulls in the source column are replaced with '' before fitting.
    prepared = df.fillna({feature: ''})
    indexer = StringIndexer(
        inputCol=feature, outputCol=outcol, handleInvalid="skip")
    indexed = indexer.fit(prepared).transform(prepared)

    # Count distinct indices on the cluster instead of collecting every
    # distinct value to the driver just to take its length.
    distinct_count = indexed.select(outcol).distinct().count()
    if distinct_count <= 4:
        return indexed.withColumn(outcol, indexed[outcol].cast(IntegerType()))
    return indexed


def ExtractDateTransform(df, params, transformationData=None):
    """Derive day-of-month, month and year columns from a date column.

    Args:
        df: input Spark DataFrame.
        params: dict; only ``params['feature']`` (the date column) is read.
        transformationData: unused, accepted for call compatibility.  The
            default is ``None`` rather than a shared mutable ``{}``.

    Returns:
        The DataFrame with ``<feature>_dayofmonth``, ``<feature>_month`` and
        ``<feature>_year`` appended.  The original omitted the underscore
        (``<feature>dayofmonth`` ...), which did not match the column names
        requested by the training step in this file nor the
        ``_stringindexer`` suffix convention.
    """
    feature = params['feature']
    dfReturn = df.fillna({feature: ''})
    source = col(feature)
    extractors = (
        ('_dayofmonth', dayofmonth),
        ('_month', month),
        ('_year', year),
    )
    for suffix, extract in extractors:
        dfReturn = dfReturn.withColumn(feature + suffix, extract(source))
    return dfReturn


class TransformationMain:
    """Entry point that applies the generated feature-engineering steps."""

    # TODO: change df argument in run with following
    @staticmethod
    def run(transformationDF, config):
        """Cleanse nulls, then index/extract each configured column.

        ``run`` takes no ``self`` and is called on the class, hence the
        ``@staticmethod`` (calling it on the class behaves as before).

        Args:
            transformationDF: input Spark DataFrame.
            config: JSON string whose ``"FE"`` entry is the feature list
                handed to ``CleanseData.replaceNullValues``.

        Returns:
            The transformed DataFrame.  Every source column is dropped right
            after its derived column(s) have been added.  The first two rows
            are shown with ``display``.
        """
        configObj = json.loads(config)
        featureData = configObj["FE"]
        transformationDF = CleanseData().replaceNullValues(featureData, transformationDF)
        # The dictionaries below are generated metadata; the transform
        # functions in this file read only their 'feature' key.
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'Submitted via', 'transformation_label': 'String Indexer'}], 'feature': 'Submitted via', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {
            'count': '500', 'mean': '', 'stddev': '', 'min': 'Phone', 'max': 'Web Referral', 'missing': '0', 'distinct': '6'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Submitted via'}, {'feature_label': 'Submitted via', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop('Submitted via')
        transformationDF = ExtractDateTransform(transformationDF, {'transformationsData': [{'feature_label': 'Date submitted', 'transformation_label': 'Extract Date'}], 'feature': 'Date submitted', 'type': 'date', 'selected': 'True', 'replaceby': 'random', 'stats': {
            'count': '', 'mean': '', 'stddev': '', 'min': '', 'max': '', 'missing': '0'}, 'transformation': [{'transformation': 'Extract Date', 'selectedAsDefault': 1}], 'generated': 'False', 'updatedLabel': 'Date submitted'}, {'feature_label': 'Date submitted', 'transformation_label': 'Extract Date'})
        transformationDF = transformationDF.drop('Date submitted')
        transformationDF = ExtractDateTransform(transformationDF, {'transformationsData': [{'feature_label': 'Date received', 'transformation_label': 'Extract Date'}], 'feature': 'Date received', 'type': 'date', 'selected': 'True', 'replaceby': 'random', 'stats': {
            'count': '', 'mean': '', 'stddev': '', 'min': '', 'max': '', 'missing': '0'}, 'transformation': [{'transformation': 'Extract Date', 'selectedAsDefault': 1}], 'generated': 'False', 'updatedLabel': 'Date received'}, {'feature_label': 'Date received', 'transformation_label': 'Extract Date'})
        transformationDF = transformationDF.drop('Date received')
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'State', 'transformation_label': 'String Indexer'}], 'feature': 'State', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {
            'count': '500', 'mean': '', 'stddev': '', 'min': 'AK', 'max': 'WV', 'missing': '0', 'distinct': '46'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'State'}, {'feature_label': 'State', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop('State')
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'Product', 'transformation_label': 'String Indexer'}], 'feature': 'Product', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {
            'count': '500', 'mean': '', 'stddev': '', 'min': 'Checking or savings account', 'max': 'Vehicle loan or lease', 'missing': '0', 'distinct': '8'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Product'}, {'feature_label': 'Product', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop('Product')
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'Sub-product', 'transformation_label': 'String Indexer'}], 'feature': 'Sub-product', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {
            'count': '500', 'mean': '', 'stddev': '', 'min': 'CD (Certificate of Deposit)', 'max': 'Virtual currency', 'missing': '0', 'distinct': '27'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Sub-product'}, {'feature_label': 'Sub-product', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop('Sub-product')
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'Issue', 'transformation_label': 'String Indexer'}], 'feature': 'Issue', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {
            'count': '500', 'mean': '', 'stddev': '', 'min': 'Advertising and marketing, including promotional offers', 'max': 'Written notification about debt', 'missing': '0', 'distinct': '44'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Issue'}, {'feature_label': 'Issue', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop('Issue')
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'Sub-issue', 'transformation_label': 'String Indexer'}], 'feature': 'Sub-issue', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {'count': '428', 'mean': '', 'stddev': '', 'min': 'Account information incorrect',
            'max': 'You never received your bill or did not know a payment was due', 'missing': '83', 'distinct': '76'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Sub-issue'}, {'feature_label': 'Sub-issue', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop('Sub-issue')
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'Company public response', 'transformation_label': 'String Indexer'}], 'feature': 'Company public response', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {'count': '484', 'mean': '', 'stddev': '', 'min': 'Company believes the complaint is the result of a misunderstanding',
            'max': 'Company has responded to the consumer and the CFPB and chooses not to provide a public response', 'missing': '16', 'distinct': '2'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Company public response'}, {'feature_label': 'Company public response', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop('Company public response')
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'Company response to consumer', 'transformation_label': 'String Indexer'}], 'feature': 'Company response to consumer', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {
            'count': '500', 'mean': '', 'stddev': '', 'min': 'Closed with explanation', 'max': 'In progress', 'missing': '0', 'distinct': '4'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Company response to consu...'}, {'feature_label': 'Company response to consumer', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop(
            'Company response to consumer')
        transformationDF = StringIndexerTransform(transformationDF, {'transformationsData': [{'feature_label': 'Timely response?', 'transformation_label': 'String Indexer'}], 'feature': 'Timely response?', 'type': 'string', 'selected': 'True', 'replaceby': 'max', 'stats': {
            'count': '495', 'mean': '', 'stddev': '', 'min': 'No', 'max': 'Yes', 'missing': '12', 'distinct': '2'}, 'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}], 'updatedLabel': 'Timely response?'}, {'feature_label': 'Timely response?', 'transformation_label': 'String Indexer'})
        transformationDF = transformationDF.drop('Timely response?')
        # `display` is provided by the notebook environment.
        display(transformationDF.limit(2).toPandas())
        return transformationDF


***AUTOML FUNCTIONS***

In [None]:
from sklearn.model_selection import train_test_split
from tpot import TPOTRegressor
import pyspark


def functionRegression(sparkDF, listOfFeatures, label):
    """Fit a TPOT regressor on ``sparkDF`` and return it with the hold-out set.

    The Spark DataFrame is persisted (memory and disk), converted to pandas,
    and split 90/10 into train and test sets.  A ``TPOTRegressor`` is fitted
    on the training part and its ``score`` on the test part is displayed.

    Args:
        sparkDF: Spark DataFrame holding the feature columns and the label.
        listOfFeatures: names of the columns used as model inputs.
        label: name of the target column.

    Returns:
        dict with keys ``model``, ``X_test``, ``y_test``, ``label`` and
        ``columnNames`` (the very list passed in as ``listOfFeatures``).
    """
    sparkDF.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
    pandasDF = sparkDF.toPandas()

    withoutLabel = pandasDF.drop(label, axis=1)
    features = withoutLabel[listOfFeatures].values
    target = pandasDF[label].values
    splits = train_test_split(features, target, random_state=1, test_size=0.1)
    X_train, X_test, y_train, y_test = splits

    tpotModel = TPOTRegressor(
        verbosity=3,
        generations=10,
        max_time_mins=5,
        n_jobs=-1,
        random_state=25,
        population_size=15,
        use_dask=True,
    )
    tpotModel.fit(X_train, y_train)

    # NOTE(review): the message says "Error rate" but the number is whatever
    # tpotModel.score returns - confirm which metric that is.
    testScore = tpotModel.score(X_test, y_test)
    display(" Error rate of Model : %s" % testScore)

    return {
        'model': tpotModel,
        'X_test': X_test,
        'y_test': y_test,
        'label': label,
        'columnNames': listOfFeatures,
    }


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
# Local single-threaded Spark session shared by all following cells.
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run customercomplaint3Hooks.ipynb
try:
    #sourcePreExecutionHook()

    # Load the source CSV; the config is the text of a Python dict literal.
    consumercomplaints = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/uploadedSourceFiles/Consumer_Complaints.csv', 'filename': 'Consumer_Complaints.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'is_header': 'Use Header Line', 'domain': 'http://172.31.59.158', 'port': '40070', 'dirPath': '/FileStore/platform', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/'}")
    #sourcePostExecutionHook(consumercomplaints)

except Exception as ex:
    # Best effort: keep the notebook running, but log the full traceback so
    # the root cause is not lost when later cells fail on the missing variable.
    logging.exception(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run customercomplaint3Hooks.ipynb
try:
    #transformationPreExecutionHook()

    # Run the feature-engineering pipeline; the "FE" list drives null
    # replacement inside TransformationMain.run.
    customercomplaintautofe = TransformationMain.run(consumercomplaints,json.dumps( {"FE": [{"transformationsData": [{"transformation_label": "novalue"}], "feature": "Complain+F63+A+A1:K100", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "5199037.46", "stddev": "1375282.94", "min": "2472740", "max": "7389849", "missing": "0"}, "updatedLabel": "Complain+F63+A+A1:K100"}, {"transformationsData": [{"feature_label": "Submitted via", "transformation_label": "String Indexer"}], "feature": "Submitted via", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Phone", "max": "Web Referral", "missing": "0", "distinct": "6"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Submitted via"}, {"transformationsData": [{"feature_label": "Date submitted", "transformation_label": "Extract Date"}], "feature": "Date submitted", "type": "date", "selected": "True", "replaceby": "random", "stats": {"count": "", "mean": "", "stddev": "", "min": "", "max": "", "missing": "0"}, "transformation": [{"transformation": "Extract Date", "selectedAsDefault": 1}], "generated": "False", "updatedLabel": "Date submitted"}, {"transformationsData": [{"feature_label": "Date received", "transformation_label": "Extract Date"}], "feature": "Date received", "type": "date", "selected": "True", "replaceby": "random", "stats": {"count": "", "mean": "", "stddev": "", "min": "", "max": "", "missing": "0"}, "transformation": [{"transformation": "Extract Date", "selectedAsDefault": 1}], "generated": "False", "updatedLabel": "Date received"}, {"transformationsData": [{"feature_label": "State", "transformation_label": "String Indexer"}], "feature": "State", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "AK", "max": 
"WV", "missing": "0", "distinct": "46"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "State"}, {"transformationsData": [{"feature_label": "Product", "transformation_label": "String Indexer"}], "feature": "Product", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Checking or savings account", "max": "Vehicle loan or lease", "missing": "0", "distinct": "8"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Product"}, {"transformationsData": [{"feature_label": "Sub-product", "transformation_label": "String Indexer"}], "feature": "Sub-product", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "CD (Certificate of Deposit)", "max": "Virtual currency", "missing": "0", "distinct": "27"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Sub-product"}, {"transformationsData": [{"feature_label": "Issue", "transformation_label": "String Indexer"}], "feature": "Issue", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Advertising and marketing, including promotional offers", "max": "Written notification about debt", "missing": "0", "distinct": "44"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Issue"}, {"transformationsData": [{"feature_label": "Sub-issue", "transformation_label": "String Indexer"}], "feature": "Sub-issue", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "428", "mean": "", "stddev": "", "min": "Account information incorrect", "max": "You never received your bill or did not know a payment was due", "missing": "83", "distinct": "76"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], 
"updatedLabel": "Sub-issue"}, {"transformationsData": [{"feature_label": "Company public response", "transformation_label": "String Indexer"}], "feature": "Company public response", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "484", "mean": "", "stddev": "", "min": "Company believes the complaint is the result of a misunderstanding", "max": "Company has responded to the consumer and the CFPB and chooses not to provide a public response", "missing": "16", "distinct": "2"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Company public response"}, {"transformationsData": [{"feature_label": "Company response to consumer", "transformation_label": "String Indexer"}], "feature": "Company response to consumer", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Closed with explanation", "max": "In progress", "missing": "0", "distinct": "4"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Company response to consu..."}, {"transformationsData": [{"feature_label": "Timely response?", "transformation_label": "String Indexer"}], "feature": "Timely response?", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "495", "mean": "", "stddev": "", "min": "No", "max": "Yes", "missing": "12", "distinct": "2"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Timely response?"}]}))

    #transformationPostExecutionHook(customercomplaintautofe)

except Exception as ex:
    # Best effort: keep the notebook running, but log the full traceback.
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run customercomplaint3Hooks.ipynb
try:
    #mlPreExecutionHook()

    # Train on the engineered columns; the last argument is the label column.
    dataAutoML=functionRegression(customercomplaintautofe, ["Submitted via_stringindexer", "Date submitted_dayofmonth", "Date submitted_month", "Date submitted_year", "Date received_dayofmonth", "Date received_month", "Date received_year", "State_stringindexer", "Product_stringindexer", "Sub-product_stringindexer", "Issue_stringindexer", "Sub-issue_stringindexer", "Company public response_stringindexer", "Company response to consumer_stringindexer", "Timely response?_stringindexer"], "Complain+F63+A+A1:K100")

    #mlPostExecutionHook(dataAutoML)

except Exception as ex:
    # Best effort: keep the notebook running, but log the full traceback.
    logging.exception(ex)
#spark.stop()


***PREDICT ON TRAINED MODEL***

In [None]:
import pandas as pd
import numpy as np
import sklearn.metrics

try:
    model = dataAutoML['model']
    X_test = dataAutoML['X_test']
    y_test = dataAutoML['y_test']
    label = dataAutoML['label']
    # Work on a copy: the original removed from / inserted into the list
    # stored in dataAutoML, so re-running the cell found extra names in it
    # and the DataFrame construction below no longer matched X_test.
    columnNames = list(dataAutoML['columnNames'])
    if label in columnNames:
        columnNames.remove(label)
    predicted = label + "_predicted"
    y_predicted = model.predict(X_test)

    # Test features plus actual and predicted label, with the two label
    # columns moved to the front.
    df = pd.DataFrame(X_test, columns=columnNames)
    df[label] = y_test
    df[predicted] = y_predicted
    columnNames.insert(0, predicted)
    columnNames.insert(0, label)
    df = df[columnNames]

    # Metrics are rounded to one decimal place before being displayed.
    R2 = np.round(sklearn.metrics.r2_score(y_test, y_predicted), 1)
    Mean_Squared_Error = np.round(sklearn.metrics.mean_squared_error(y_test, y_predicted), 1)
    Mean_Absolute_Error = np.round(sklearn.metrics.mean_absolute_error(y_test, y_predicted), 1)
    display(" R2 score of Prediction on test data    : %s"%R2)
    display(" Mean Squared Error of Prediction on test data    : %s"%Mean_Squared_Error)
    display(" Mean Absolute Error of Prediction on test data   : %s"%Mean_Absolute_Error)
    display(df.head())
except Exception as ex:
    # Best effort: keep the notebook running, but log the full traceback.
    logging.exception(ex)

spark.stop()

