***GENERATED CODE FOR class PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import os
import datetime
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """Read CSV files from HDFS into Spark and write Spark DataFrames out as CSV.

    Both public methods receive ``config`` as the string form of a Python
    dict literal (the generator emits it single-quoted, so it is not JSON).
    An already-built ``dict`` is accepted as well.
    """

    @staticmethod
    def _parse_config(config):
        """Return ``config`` as a dict, parsing it first when it is a string.

        Raises:
            ValueError / SyntaxError: if the string is not a valid Python literal.
        """
        if isinstance(config, dict):
            return config
        # Local import: keeps this class independent of the file-level imports.
        import ast
        # NOTE: this used to be eval(config). literal_eval accepts only Python
        # literals, so a config string can no longer execute arbitrary code.
        # A config that relied on evaluating expressions will now raise.
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Load the CSV at ``config['url']`` from HDFS and return it as a DataFrame.

        Args:
            spark: Spark session used for reading.
            config: dict literal (string or dict); only the ``'url'`` key is read.

        Returns:
            The DataFrame read with a header row and schema inference enabled.

        Raises:
            KeyError: if ``HDFS_SERVER`` / ``HDFS_PORT`` are not set in the
                environment, or ``config`` has no ``'url'``.
            ValueError: if ``HDFS_PORT`` is not an integer.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        url = HDFSConnector._parse_config(config)['url']
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{url}", header='true')
        # `display` is not defined in this file; presumably the notebook builtin.
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV to a timestamp-prefixed path built from ``config['url']``.

        Args:
            df: DataFrame to write.
            spark: unused; kept so the signature matches ``fetch``-style callers.
            config: dict literal (string or dict) with ``'is_header'``,
                ``'delimiter'`` and ``'url'``.

        Returns:
            Whatever the DataFrame writer's ``save`` call returns.
        """
        settings = HDFSConnector._parse_config(config)
        header = 'true' if settings["is_header"] == "Use Header Line" else 'false'
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # Path layout is kept exactly as generated: "<timestamp>_ <url>"
        # (including the space between the underscore and the url).
        target = ("%s %s") % (timestamp + "_", settings['url'])
        return df.write.format('csv').options(
            header=header, delimiter=settings["delimiter"]).save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
import json
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Null-replacement helpers for Spark DataFrame columns.

    Every ``replaceBy*`` method takes ``feature`` (an object exposing the
    column name as ``.name``, e.g. an entry of ``df.schema.fields``) and the
    DataFrame, and returns a new DataFrame with nulls in that column filled.
    """

    def cleanValueForFE(self, value):
        """Normalise an aggregated value before it is used as a fill value.

        ``None`` becomes ``""``, anything whose ``str()`` is ``'nan'`` becomes
        the string ``"nan"``, and every other value is returned unchanged.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _statistic(self, feature, df, aggregate, alias):
        """Apply ``aggregate`` to the feature column and return the cleaned result.

        The aggregate is computed after ``df.dropna()``, i.e. over rows that
        have no null in any column.
        """
        complete_rows = df.dropna()
        row = complete_rows.select(
            aggregate(col(feature.name)).alias(alias)).collect()[0]
        return self.cleanValueForFE(row[alias])

    def _fill(self, feature, df, value):
        """Fill nulls and single-space cells of the feature column with ``value``."""
        df = df.fillna(value, subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", value).otherwise(col(feature.name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill nulls in the feature column with the column mean.

        ``mean_`` is unused and kept only for interface compatibility.

        Only ``fillna`` is applied here. The blank-string/cast-to-Integer
        expression that used to follow was built but never assigned, so it
        had no effect; it is dropped rather than activated because an Integer
        cast would truncate real-valued columns.
        """
        meanValue = self._statistic(feature, df, mean, 'mean')
        return df.fillna(meanValue, subset=[feature.name])

    def replaceByMax(self, feature, df, max_=-1):
        """Fill nulls and ``" "`` cells with the column maximum (``max_`` is unused)."""
        # `max` is whatever the file has in scope under that name; this file
        # imports it from pyspark.sql.functions.
        return self._fill(feature, df, self._statistic(feature, df, max, 'max'))

    def replaceByMin(self, feature, df, min_=-1):
        """Fill nulls and ``" "`` cells with the column minimum (``min_`` is unused)."""
        return self._fill(feature, df, self._statistic(feature, df, min, 'min'))

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill nulls and ``" "`` cells with the column standard deviation.

        ``stddev_`` is unused and kept only for interface compatibility.
        """
        return self._fill(
            feature, df, self._statistic(feature, df, stddev, 'stddev'))

    def replaceDateRandomly(self, feature, df):
        """Fill nulls and ``" "`` cells with the first non-null value of the column.

        Despite the name, the value is not random: it is the first row
        returned for ``column IS NOT NULL``. When the column has no non-null
        value there is nothing to copy, so ``df`` is returned unchanged.
        """
        rows = df.where(col(feature.name).isNotNull()).head(1)
        if not rows:
            return df
        fillValue = self.cleanValueForFE(rows[0][feature.name])
        # fillna receives the string form; the blank-cell replacement below
        # uses the value itself.
        df = df.fillna(str(fillValue), subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceNullValues(self, fList, df):
        """Apply each feature's configured null-replacement strategy.

        Args:
            fList: iterable of dicts with a ``"feature"`` column name and a
                ``"replaceby"`` strategy string.
            df: DataFrame to clean.

        Returns:
            The cleaned DataFrame (``df`` itself when nothing matched).

        The strategy is chosen by the first keyword found inside
        ``"replaceby"``, checked in the order mean, max, min, stddev, random.
        Features are matched to columns by exact name: a substring match would
        also hit unrelated columns whose name merely contains the feature name.
        """
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        # Schema fields are captured once, before any replacement is applied.
        featuresList = df.schema.fields
        for featureObj in fList:
            for feat in featuresList:
                if featureObj["feature"] != feat.name:
                    continue
                replaceby = featureObj["replaceby"]
                for keyword, replace in strategies:
                    if keyword in replaceby:
                        df = replace(feat, df)
                        break
        return df


def StringIndexerTransform(df, params, transformationData=None):
    """Add a ``<feature>_stringindexer`` column produced by a Spark StringIndexer.

    Args:
        df: input Spark DataFrame.
        params: dict; only ``params["feature"]`` (the column to index) is read.
        transformationData: unused; accepted for interface compatibility.

    Returns:
        A DataFrame with the extra indexed column. The original feature
        column is left in place. When the indexed column has at most four
        distinct values it is cast to ``IntegerType``; otherwise it keeps the
        type the indexer produced.
    """
    feature = params["feature"]
    outcol = feature + "_stringindexer"

    # Nulls in the feature column become empty strings before indexing.
    filled = df.fillna({feature: ''})
    indexer = StringIndexer(
        inputCol=feature, outputCol=outcol, handleInvalid="skip")
    indexed = indexer.fit(filled).transform(filled)

    # Only the number of distinct values matters, so count them in Spark
    # instead of collecting every value to the driver.
    distinct_count = indexed.select(outcol).distinct().count()
    if distinct_count <= 4:
        return indexed.withColumn(outcol, indexed[outcol].cast(IntegerType()))
    return indexed


class TransformationMain:
    """Entry point that runs the generated cleansing and indexing steps."""

    # TODO: change df argument in run with following
    @staticmethod
    def run(transformationDF, config):
        """Cleanse nulls, string-index the two code columns and drop the originals.

        Args:
            transformationDF: input Spark DataFrame.
            config: JSON string whose ``"FE"`` entry is the feature list
                handed to ``CleanseData.replaceNullValues``.

        Returns:
            The DataFrame with ``Region Code`` and ``State Code`` replaced by
            their ``*_stringindexer`` columns.
        """
        configObj = json.loads(config)
        featureData = configObj["FE"]
        transformationDF = CleanseData().replaceNullValues(featureData, transformationDF)

        # (params, transformationData) pairs, applied in this order.
        indexer_steps = (
            (
                {
                    'transformationsData': [
                        {'feature_label': 'Region Code', 'transformation_label': 'String Indexer'}],
                    'feature': 'Region Code',
                    'type': 'string',
                    'selected': 'True',
                    'replaceby': 'max',
                    'stats': {
                        'count': '500', 'mean': '', 'stddev': '', 'min': 'ATL',
                        'max': 'SFO', 'missing': '0', 'distinct': '10'},
                    'transformation': [
                        {'transformation': 'String Indexer', 'selectedAsDefault': 1}],
                    'updatedLabel': 'Region Code',
                },
                {'feature_label': 'Region Code', 'transformation_label': 'String Indexer'},
            ),
            (
                {
                    'transformationsData': [
                        {'feature_label': 'State Code', 'transformation_label': 'String Indexer'}],
                    'feature': 'State Code',
                    'type': 'string',
                    'selected': 'True',
                    'replaceby': 'max',
                    'stats': {
                        'count': '500', 'mean': '', 'stddev': '', 'min': 'AK ',
                        'max': 'WY ', 'missing': '0', 'distinct': '52'},
                    'transformation': [
                        {'transformation': 'String Indexer', 'selectedAsDefault': 1}],
                    'updatedLabel': 'State Code',
                },
                {'feature_label': 'State Code', 'transformation_label': 'String Indexer'},
            ),
        )
        for params, transformationData in indexer_steps:
            transformationDF = StringIndexerTransform(
                transformationDF, params, transformationData)
            # The raw string column is dropped once its indexed twin exists.
            transformationDF = transformationDF.drop(params['feature'])

        display(transformationDF.limit(2).toPandas())
        return transformationDF


***AUTOML FUNCTIONS***

In [None]:
from tpot import TPOTClassifier
from sklearn.model_selection import train_test_split
import pyspark


def functionClassification(sparkDF, listOfFeatures, label):
    """Fit a TPOT classifier on a Spark DataFrame and return it with its hold-out data.

    Args:
        sparkDF: Spark DataFrame; it is persisted (memory and disk) and then
            converted to pandas.
        listOfFeatures: column names used as model inputs.
        label: name of the target column.

    Returns:
        dict with the fitted ``'model'``, the hold-out ``'X_test'`` /
        ``'y_test'`` arrays, the ``'label'`` name and ``'columnNames'``
        (the same ``listOfFeatures`` object that was passed in).
    """
    sparkDF.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
    frame = sparkDF.toPandas()

    without_label = frame.drop(label, axis=1)
    features = without_label[listOfFeatures].values
    target = frame[label].values

    # 10% of the rows are held out; the split is seeded for repeatability.
    split = train_test_split(features, target, random_state=1, test_size=0.1)
    X_train, X_test, y_train, y_test = split

    tpotModel = TPOTClassifier(
        verbosity=3,
        n_jobs=-1,
        generations=10,
        max_time_mins=5,
        population_size=15,
        use_dask=True,
    )
    tpotModel.fit(X_train, y_train)

    score = tpotModel.score(X_test, y_test)
    display(" Accuracy of Model : %s" % score)

    return {
        'model': tpotModel,
        'X_test': X_test,
        'y_test': y_test,
        'label': label,
        'columnNames': listOfFeatures,
    }


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
# Build (or reuse) a Spark session on a local master with a single worker thread.
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run classHooks.ipynb
# Load the source CSV into `ssadisability`.
# The config is passed as a string; HDFSConnector.fetch parses it into a dict
# and reads only its 'url' key.
# Any failure is logged and swallowed, so `ssadisability` stays undefined if
# the fetch raises and the cells that use it will then fail.
try:
	#sourcePreExecutionHook()

	ssadisability = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/uploadedSourceFiles/ssa_disability.csv', 'filename': 'ssa_disability.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'is_header': 'Use Header Line', 'domain': 'http://172.31.59.158', 'port': '40070', 'dirPath': '/FileStore/platform', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/'}")
	#sourcePostExecutionHook(ssadisability)

except Exception as ex: 
	logging.error(ex)
# The session is left running here; it is stopped at the end of the notebook.
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run classHooks.ipynb
# Run the generated transformations on `ssadisability` and keep the result in
# `classautofe`.
# The dict below is serialised with json.dumps because TransformationMain.run
# json.loads its `config` argument and reads the "FE" list from it. Each "FE"
# entry names a column ("feature") and the null-replacement strategy
# ("replaceby") that CleanseData.replaceNullValues looks up.
# Any failure is logged and swallowed, so `classautofe` stays undefined if the
# run raises.
try:
	#transformationPreExecutionHook()

	classautofe = TransformationMain.run(ssadisability,json.dumps( {"FE": [{"transformationsData": [{"feature_label": "Region Code", "transformation_label": "String Indexer"}], "feature": "Region Code", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "ATL", "max": "SFO", "missing": "0", "distinct": "10"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Region Code"}, {"transformationsData": [{"feature_label": "State Code", "transformation_label": "String Indexer"}], "feature": "State Code", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "AK ", "max": "WY ", "missing": "0", "distinct": "52"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "State Code"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Date", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "2005.32", "stddev": "2.78", "min": "2001", "max": "2010", "missing": "0"}, "updatedLabel": "Date"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Population age 18-64*", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "3615996.39", "stddev": "4045541.47", "min": "308796", "max": "23765508", "missing": "0"}, "updatedLabel": "Population age 18-64*"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "SSA Disability Beneficiaries  age 18-64*", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "194976.85", "stddev": "197614.39", "min": "11780", "max": 
"1125410", "missing": "0"}, "updatedLabel": "SSA Disability Beneficiar..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Percent of Adult Population Receiving SSA Adult Disability Benefits", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "5.56", "stddev": "1.78", "min": "2.6", "max": "12.42", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Percent of Adult Populati..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Eligible Adult Population*", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "3421019.54", "stddev": "3854803.68", "min": "297016", "max": "22640098", "missing": "0"}, "updatedLabel": "Eligible Adult Population..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Adult Receipts", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "41800.94", "stddev": "43419.25", "min": "2459", "max": "249112", "missing": "0"}, "updatedLabel": "Adult Receipts"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Eligible Adult Population Filing Rate", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "1.25", "stddev": "0.4", "min": "0.55", "max": "2.64", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Eligible Adult Population..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Favorable Adult Determinations", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "14225.64", 
"stddev": "16058.22", "min": "764", "max": "97457", "missing": "0"}, "updatedLabel": "Favorable Adult Determina..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Eligible Adult Population Allowance Rate", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "0.42", "stddev": "0.11", "min": "0.16", "max": "1.08", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Eligible Adult Population..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "All Adult Determinations", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "40124.45", "stddev": "41920.61", "min": "2450", "max": "233656", "missing": "0"}, "updatedLabel": "All Adult Determinations"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Adult Favorable  Determination Rate", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "36.61", "stddev": "7.68", "min": "21.5", "max": "66.22", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Adult Favorable  Determin..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Population under age 18*", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "1412077.88", "stddev": "1661249.37", "min": "0", "max": "9435682", "missing": "0"}, "updatedLabel": "Population under age 18*"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "SSI Disabled Child (DC) Beneficiaries*", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": 
"500", "mean": "20136.24", "stddev": "23054.02", "min": "0", "max": "120530", "missing": "0"}, "updatedLabel": "SSI Disabled Child (DC) B..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Percent of Population under age 18 Receiving SSI DC Benefits", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "1.38", "stddev": "0.72", "min": "0.0", "max": "3.97", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Percent of Population und..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Eligible Child Population*", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "1391941.64", "stddev": "1639951.73", "min": "0", "max": "9337164", "missing": "0"}, "updatedLabel": "Eligible Child Population..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "SSI Disabled Child (DC) Receipts", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "7951.52", "stddev": "8678.8", "min": "0", "max": "46852", "missing": "0"}, "updatedLabel": "SSI Disabled Child (DC) R..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Eligible Child Population Filing Rate", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "0.55", "stddev": "0.31", "min": "0.0", "max": "1.69", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Eligible Child Population..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Favorable SSI Child (DC) Determinations", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", 
"replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "3282.43", "stddev": "3806.37", "min": "0", "max": "20286", "missing": "0"}, "updatedLabel": "Favorable SSI Child (DC) ..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Eligible Child Population Allowance Rate", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "0.23", "stddev": "0.11", "min": "0.0", "max": "0.68", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Eligible Child Population..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "All SSI Disabled Child Determinations", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "7742.08", "stddev": "8458.88", "min": "0", "max": "42956", "missing": "0"}, "updatedLabel": "All SSI Disabled Child De..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "SSI Disabled Child Allowance Rate", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "45.83", "stddev": "11.87", "min": "0.0", "max": "79.46", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "SSI Disabled Child Allowa..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "All Determinations", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "47866.53", "stddev": "50030.59", "min": "2731", "max": "269121", "missing": "0"}, "updatedLabel": "All Determinations"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "All Favorable Determinations", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": 
"numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "17508.07", "stddev": "19727.76", "min": "894", "max": "114527", "missing": "0"}, "updatedLabel": "All Favorable Determinati..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Favorable Determination Rate", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "37.92", "stddev": "7.46", "min": "22.98", "max": "65.55", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Favorable Determination R..."}]}))

	#transformationPostExecutionHook(classautofe)

except Exception as ex: 
	logging.error(ex)


***TRAIN MODEL***

In [None]:
#%run classHooks.ipynb
# Train the model on `classautofe` and keep the result dict in `dataAutoML`.
# functionClassification takes (sparkDF, listOfFeatures, label): the list is
# the set of input columns and the final string is the target column.
# The two "*_stringindexer" names are the columns StringIndexerTransform adds
# (feature name + "_stringindexer").
# Any failure is logged and swallowed, so `dataAutoML` stays undefined if
# training raises.
try:
	#mlPreExecutionHook()

	dataAutoML=functionClassification(classautofe, ["Date", "Population age 18-64*", "SSA Disability Beneficiaries  age 18-64*", "Percent of Adult Population Receiving SSA Adult Disability Benefits", "Eligible Adult Population*", "Adult Receipts", "Favorable Adult Determinations", "Eligible Adult Population Allowance Rate", "All Adult Determinations", "Adult Favorable  Determination Rate", "Population under age 18*", "SSI Disabled Child (DC) Beneficiaries*", "Percent of Population under age 18 Receiving SSI DC Benefits", "Eligible Child Population*", "SSI Disabled Child (DC) Receipts", "Eligible Child Population Filing Rate", "Favorable SSI Child (DC) Determinations", "Eligible Child Population Allowance Rate", "All SSI Disabled Child Determinations", "SSI Disabled Child Allowance Rate", "All Determinations", "All Favorable Determinations", "Favorable Determination Rate", "Region Code_stringindexer", "State Code_stringindexer"], "Eligible Adult Population Filing Rate")

	#mlPostExecutionHook(dataAutoML)

except Exception as ex: 
	logging.error(ex)
#spark.stop()


***PREDICT ON TRAINED MODEL***

In [None]:
import pandas as pd
import numpy as np
import sklearn.metrics

# Score the trained model on the hold-out split stored in `dataAutoML` and
# show the predictions next to the true labels.
# Any failure is logged and swallowed; the Spark session is stopped afterwards
# either way.
try:
    model = dataAutoML['model']
    X_test = dataAutoML['X_test']
    y_test = dataAutoML['y_test']
    label = dataAutoML['label']
    # Work on a copy: the remove/insert calls below would otherwise mutate the
    # list stored in dataAutoML, and re-running this cell would then build the
    # DataFrame with one column name too many.
    columnNames = list(dataAutoML['columnNames'])
    if label in columnNames:
        columnNames.remove(label)
    predicted = label + "_predicted"
    y_predicted = model.predict(X_test)

    # One row per hold-out sample: features, true label, predicted label.
    df = pd.DataFrame(X_test, columns=columnNames)
    df[label] = y_test
    df[predicted] = y_predicted
    columnNames.insert(0, predicted)
    columnNames.insert(0, label)

    # All metrics are percentages rounded to one decimal place.
    Accuracy = np.round(
        (100 * sklearn.metrics.accuracy_score(y_true=y_test, y_pred=y_predicted)), 1)
    F1 = np.round(
        (100 * sklearn.metrics.f1_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)
    Precision = np.round(
        (100 * sklearn.metrics.precision_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)
    Recall = np.round(
        (100 * sklearn.metrics.recall_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)
    display(" Accuracy of Prediction on test data    : %s"%Accuracy)
    display(" F1 score of Prediction on test data    : %s"%F1)
    display(" Precision of Prediction on test data   : %s"%Precision)
    display(" Recall of Prediction on test data      : %s"%Recall)
    display(df.head())
except Exception as ex:
    logging.error(ex)

spark.stop()

