***GENERATED CODE FOR 3 PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import os
import datetime
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """Read and write CSV data for the generated pipeline.

    Both public methods are static: they are invoked on the class itself
    (``HDFSConnector.fetch(spark, config)``) and use no instance state.
    ``config`` is the string form of a Python dict literal such as
    ``"{'url': '/path/file.csv', 'delimiter': ','}"``; an already-parsed dict
    is accepted as well.
    """

    @staticmethod
    def _parse_config(config):
        """Return ``config`` as a dict.

        A dict is returned unchanged. A string is parsed with
        ``ast.literal_eval`` rather than ``eval``, so a config string can only
        describe data and can never execute code.

        Raises:
            ValueError / SyntaxError: if the string is not a valid literal.
        """
        import ast

        if isinstance(config, dict):
            return config
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Read the CSV at ``hdfs://$HDFS_SERVER:$HDFS_PORT<config['url']>``.

        The file is read with a header row and schema inference enabled. The
        first two rows are shown with the notebook's ``display`` helper.

        Args:
            spark: active SparkSession.
            config: connector configuration (see class docstring); only
                ``'url'`` is read here.

        Returns:
            The loaded Spark DataFrame.

        Raises:
            KeyError: if ``HDFS_SERVER``/``HDFS_PORT`` are unset or the config
                has no ``'url'``.
            ValueError: if ``HDFS_PORT`` is not an integer or the config string
                is not a literal.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        url = HDFSConnector._parse_config(config)['url']
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{url}", header='true')
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV using the header/delimiter settings in ``config``.

        Args:
            df: Spark DataFrame to write.
            spark: unused; kept for a uniform connector signature.
            config: connector configuration with ``'is_header'``,
                ``'delimiter'`` and ``'url'`` keys. A header row is written only
                when ``'is_header'`` equals ``"Use Header Line"``.

        Returns:
            Whatever ``DataFrameWriter.save`` returns.

        NOTE(review): the target path is ``"<YYYY-mm-dd HH.MM.SS>_ <url>"``, i.e.
        a timestamp prefix, a space, then the configured url. It is therefore
        not ``config['url']`` itself. Kept unchanged; confirm this is intended.
        """
        cfg = HDFSConnector._parse_config(config)
        header = 'true' if cfg["is_header"] == "Use Header Line" else 'false'
        delimiter = cfg["delimiter"]
        stamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S") + "_"
        target = ("%s %s") % (stamp, cfg['url'])
        return df.write.format('csv').options(header=header, delimiter=delimiter).save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
import json
from pyspark.ml.feature import Binarizer
from pyspark.sql.functions import round
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Null-value imputation helpers for Spark DataFrames.

    Each ``replaceBy*`` method takes a schema field (only ``.name`` is read)
    and a DataFrame. It computes one statistic of that column and returns a new
    DataFrame in which the column's nulls are replaced by that statistic. All
    strategies except the mean also replace cells equal to a single space.
    """

    def cleanValueForFE(self, value):
        """Normalise an aggregate result before it is used as a fill value.

        Returns "" for None (e.g. the aggregate of an all-null column), the
        string "nan" for anything whose ``str()`` is 'nan', and ``value``
        otherwise.
        """
        if value is None:
            return ""
        elif str(value) == 'nan':
            return "nan"
        else:
            return value

    def _aggregateIgnoringMissing(self, aggregate, feature, df, alias):
        """Return ``aggregate`` of one column, computed over rows where that column is present.

        Only rows with a null/NaN in *this* column are excluded. Dropping rows
        with a null in any column would leave few or no rows when other columns
        are sparsely populated.
        """
        present = df.dropna(subset=[feature.name])
        row = present.select(aggregate(col(feature.name)).alias(alias)).collect()[0]
        return self.cleanValueForFE(row[alias])

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill nulls in ``feature`` with the column mean (``mean_`` is unused).

        Unlike the other strategies, single-space cells are not replaced here,
        and the column keeps its original type.
        """
        meanValue = self._aggregateIgnoringMissing(mean, feature, df, 'mean')
        return df.fillna(meanValue, subset=[feature.name])

    def replaceByMax(self, feature, df, max_=-1):
        """Fill nulls and " " cells in ``feature`` with the column maximum (``max_`` is unused)."""
        maxValue = self._aggregateIgnoringMissing(max, feature, df, 'max')
        df = df.fillna(maxValue, subset=[feature.name])
        return df.withColumn(feature.name,
                             when(col(feature.name) == " ", maxValue).otherwise(col(feature.name)))

    def replaceByMin(self, feature, df, min_=-1):
        """Fill nulls and " " cells in ``feature`` with the column minimum (``min_`` is unused)."""
        minValue = self._aggregateIgnoringMissing(min, feature, df, 'min')
        df = df.fillna(minValue, subset=[feature.name])
        return df.withColumn(feature.name,
                             when(col(feature.name) == " ", minValue).otherwise(col(feature.name)))

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill nulls and " " cells in ``feature`` with the column's standard deviation (``stddev_`` is unused)."""
        stddevValue = self._aggregateIgnoringMissing(stddev, feature, df, 'stddev')
        df = df.fillna(stddevValue, subset=[feature.name])
        return df.withColumn(feature.name,
                             when(col(feature.name) == " ", stddevValue).otherwise(col(feature.name)))

    def replaceDateRandomly(self, feature, df):
        """Fill nulls and " " cells in ``feature`` with one existing non-null value.

        The value is whichever non-null row Spark returns first; no ordering is
        imposed. A column with no non-null values is returned unchanged,
        because there is nothing to copy from.
        """
        firstRows = df.where(col(feature.name).isNotNull()).head(1)
        if not firstRows:
            return df
        fillValue = self.cleanValueForFE(firstRows[0][feature.name])
        # fillna with a str value only affects string-typed columns.
        df = df.fillna(str(fillValue), subset=[feature.name])
        return df.withColumn(feature.name,
                             when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceNullValues(self, fList, df):
        """Apply each feature's configured imputation strategy.

        Args:
            fList: list of dicts with at least "feature" (exact column name)
                and "replaceby" (strategy name) keys.
            df: DataFrame to clean.

        Returns:
            The cleaned DataFrame.

        The strategy is chosen by the first of "mean", "max", "min", "stddev"
        and "random" that occurs in "replaceby". An unrecognised strategy leaves
        the column untouched.
        """
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        fields = df.schema.fields
        for featureObj in fList:
            for field in fields:
                # Exact match: a substring test would also impute e.g. both
                # "Sale Amount" and "Sales Ratio" for a feature named "Sale".
                if field.name != featureObj["feature"]:
                    continue
                for key, replace in strategies:
                    if key in featureObj["replaceby"]:
                        df = replace(field, df)
                        break
        return df


def StringIndexerTransform(df, params, transformationData=None):
    """Index string column ``params["feature"]`` into ``<feature>_stringindexer``.

    Nulls in the input column are first replaced with "" so they are indexed as
    their own category. The input column is kept; callers drop it afterwards.
    If the index column ends up with at most 4 distinct values, it is cast to
    ``IntegerType``.

    Args:
        df: input Spark DataFrame.
        params: feature configuration; only the "feature" key is read.
        transformationData: unused; accepted for a uniform transform signature.

    Returns:
        The DataFrame with the added index column.
    """
    feature = params["feature"]
    outcol = feature + "_stringindexer"
    filled = df.fillna({feature: ''})
    indexer = StringIndexer(
        inputCol=feature, outputCol=outcol, handleInvalid="skip")
    indexed = indexer.fit(filled).transform(filled)
    # Only the number of distinct indices matters, so count them in Spark
    # instead of collecting every distinct value to the driver.
    if indexed.select(outcol).distinct().count() <= 4:
        return indexed.withColumn(outcol, indexed[outcol].cast(IntegerType()))
    return indexed


def _binarizerThreshold(params, transformationData):
    """Return the Binarizer threshold as a float.

    Uses ``transformationData["threshold"]`` when present. Otherwise it falls
    back to the first entry of ``params["transformationsData"]`` that has a
    "threshold".

    Raises:
        KeyError: if neither source provides a threshold.
    """
    if transformationData is not None and 'threshold' in transformationData:
        return float(transformationData['threshold'])
    for entry in params.get('transformationsData', []):
        if 'threshold' in entry:
            return float(entry['threshold'])
    raise KeyError('threshold')


def BinarizerTransform(df, params, transformationData=None):
    """Binarize numeric column ``params["feature"]`` into ``<feature>_binarizer``.

    Steps:
    1. Cast the input column to double in place and fill its nulls with 0.0.
    2. Apply a Spark ``Binarizer`` at the configured threshold into the new
       column.
    3. Round the input column itself to 2 decimals. Callers drop it afterwards.

    Args:
        df: input Spark DataFrame.
        params: feature configuration; "feature" is read, and
            "transformationsData" is consulted for the threshold as a fallback.
        transformationData: dict with a "threshold" key (optional).

    Returns:
        The DataFrame with the added binarized column.

    Raises:
        KeyError: if no threshold is found.
    """
    feature = params['feature']
    outcol = feature + "_binarizer"
    threshold = _binarizerThreshold(params, transformationData)
    # Replace the column directly rather than through a temporary
    # "feature_cast" column, which could clash with an existing column.
    dfReturn = df.withColumn(feature, df[feature].cast("double"))
    dfReturn = dfReturn.fillna({feature: 0.0})
    binarizer = Binarizer(threshold=threshold, inputCol=feature, outputCol=outcol)
    dfReturn = binarizer.transform(dfReturn)
    # `round` is pyspark.sql.functions.round (imported at file level), not the builtin.
    return dfReturn.withColumn(feature, round(dfReturn[feature], 2))


class TransformationMain:
    """Feature-engineering entry point for this generated pipeline."""

    @staticmethod
    def _stringIndexerStep(feature, stats):
        """Return ``(params, transformationData)`` for a String Indexer step on ``feature``.

        The dicts mirror that feature's entry in the pipeline's FE
        configuration. ``StringIndexerTransform`` only reads ``params["feature"]``.
        """
        label = {'feature_label': feature, 'transformation_label': 'String Indexer'}
        params = {
            'transformationsData': [dict(label)],
            'feature': feature,
            'type': 'string',
            'selected': 'True',
            'replaceby': 'max',
            'stats': stats,
            'transformation': [{'transformation': 'String Indexer', 'selectedAsDefault': 1}],
            'updatedLabel': feature,
        }
        return params, dict(label)

    # TODO: change df argument in run with following
    @staticmethod
    def run(transformationDF, config):
        """Impute nulls, then encode the pipeline's categorical/numeric features.

        Args:
            transformationDF: source Spark DataFrame.
            config: JSON string whose "FE" list holds per-feature settings.
                Only "feature"/"replaceby" are used here, for null imputation.

        Returns:
            The transformed DataFrame. Each encoded source column is replaced
            by its ``_stringindexer`` / ``_binarizer`` output column.

        The encoding steps are fixed at generation time and do not read ``config``.
        """
        configObj = json.loads(config)
        featureData = configObj["FE"]
        transformationDF = CleanseData().replaceNullValues(featureData, transformationDF)

        stringStep = TransformationMain._stringIndexerStep
        saleAmountData = {'feature_label': 'Sale Amount', 'threshold': 497097.73,
                          'transformation_label': 'Binarizer'}
        steps = [
            (StringIndexerTransform, *stringStep('Date Recorded', {
                'count': '500', 'mean': '', 'stddev': '', 'min': '1/10/2022', 'max': '9/9/2022',
                'missing': '0', 'distinct': '207'})),
            (StringIndexerTransform, *stringStep('Town', {
                'count': '500', 'mean': '', 'stddev': '', 'min': 'Ansonia', 'max': 'Woodbury',
                'missing': '0', 'distinct': '84'})),
            (StringIndexerTransform, *stringStep('Address', {
                'count': '500', 'mean': '', 'stddev': '', 'min': '0 MILLER RD', 'max': 'WESTBROOK RD',
                'missing': '0', 'distinct': '499'})),
            (BinarizerTransform,
             {'transformationsData': [dict(saleAmountData)], 'feature': 'Sale Amount', 'type': 'real',
              'selected': 'True', 'replaceby': 'mean',
              'stats': {'count': '500', 'mean': '497097.73', 'stddev': '931478.31',
                        'min': '3000.0', 'max': '1.35E7', 'missing': '0'},
              'transformation': [{'transformation': 'Binarizer', 'selectedAsDefault': 1}],
              'updatedLabel': 'Sale Amount'},
             dict(saleAmountData)),
            (StringIndexerTransform, *stringStep('Property Type', {
                'count': '500', 'mean': '', 'stddev': '', 'min': 'Apartments', 'max': 'Vacant Land',
                'missing': '0', 'distinct': '5'})),
            (StringIndexerTransform, *stringStep('Residential Type', {
                'count': '442', 'mean': '', 'stddev': '', 'min': 'Condo', 'max': 'Two Family',
                'missing': '58', 'distinct': '4'})),
            (StringIndexerTransform, *stringStep('Non Use Code', {
                'count': '89', 'mean': '', 'stddev': '', 'min': '01 - Family', 'max': '28 - Use Assessment',
                'missing': '379', 'distinct': '14'})),
            (StringIndexerTransform, *stringStep('Assessor Remarks', {
                'count': '110', 'mean': '', 'stddev': '', 'min': '012-0020', 'max': 'multi-parcel sale',
                'missing': '405', 'distinct': '89'})),
            (StringIndexerTransform, *stringStep('OPM remarks', {
                'count': '22', 'mean': '', 'stddev': '', 'min': 'BELOW MARKET',
                'max': 'TWO SALES SAME DAY - ALSO SEE  #210116', 'missing': '487', 'distinct': '11'})),
            (StringIndexerTransform, *stringStep('Location', {
                'count': '500', 'mean': '', 'stddev': '', 'min': 'POINT (-71.827320993 41.834271993)',
                'max': 'POINT (-73.555591979 41.071525007)', 'missing': '0', 'distinct': '495'})),
        ]
        for transform, params, transformationData in steps:
            transformationDF = transform(transformationDF, params, transformationData)
            # Keep only the encoded output column for this feature.
            transformationDF = transformationDF.drop(params['feature'])
        display(transformationDF.limit(2).toPandas())
        return transformationDF


***AUTOML FUNCTIONS***

In [None]:
from tpot import TPOTClassifier
from sklearn.model_selection import train_test_split
import pyspark


def functionClassification(sparkDF, listOfFeatures, label):
    """Train a TPOT classifier on a Spark DataFrame and report hold-out score.

    The DataFrame is collected to the driver with ``toPandas()``. 10% of the
    rows (``random_state=1``) are held out for testing, and the model's score on
    them is shown via ``display``.

    Args:
        sparkDF: Spark DataFrame containing the feature columns and ``label``.
        listOfFeatures: feature column names; ``label`` is ignored if listed.
        label: target column name.

    Returns:
        dict with the fitted 'model', 'X_test', 'y_test', 'label' and
        'columnNames' (the feature names actually used, in order; a new list).
    """
    # toPandas() is the only action run on sparkDF, so it is not persisted:
    # the cache would never be reused and was never released.
    df = sparkDF.toPandas()
    # Never feed the target back in as an input, even if the caller listed it.
    features = [name for name in listOfFeatures if name != label]
    X = df[features].values
    y = df[label].values
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=1, test_size=0.1)
    tpotModel = TPOTClassifier(verbosity=3, n_jobs=-1, generations=10, max_time_mins=5,
                               population_size=15, use_dask=True)
    tpotModel.fit(X_train, y_train)
    display(" Accuracy of Model : %s" % tpotModel.score(X_test, y_test))
    return {'model': tpotModel,
            'X_test': X_test,
            'y_test': y_test,
            'label': label,
            'columnNames': features}


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run 3Hooks.ipynb
try:
    #sourcePreExecutionHook()

    reaalestatesales = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/uploadedSourceFiles/Reaal_Estate_Sales_2021.csv', 'filename': 'Reaal_Estate_Sales_2021.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'is_header': 'Use Header Line', 'domain': 'http://172.31.59.158', 'port': '40070', 'dirPath': '/FileStore/platform', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/'}")
    #sourcePostExecutionHook(reaalestatesales)

except Exception as ex:
    # logging.exception records the traceback as well as the message. The
    # error is still swallowed so the notebook continues; later cells that use
    # `reaalestatesales` will then fail.
    logging.exception(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run 3Hooks.ipynb
try:
    #transformationPreExecutionHook()

    autofe = TransformationMain.run(reaalestatesales,json.dumps( {"FE": [{"transformationsData": [{"transformation_label": "novalue"}], "feature": "Serial Number", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "1850414.05", "stddev": "13772470.95", "min": "10020", "max": "211200026", "missing": "0"}, "updatedLabel": "Serial Number"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "List Year", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "2021.0", "stddev": "0.0", "min": "2021", "max": "2021", "missing": "0"}, "updatedLabel": "List Year"}, {"transformationsData": [{"feature_label": "Date Recorded", "transformation_label": "String Indexer"}], "feature": "Date Recorded", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "1/10/2022", "max": "9/9/2022", "missing": "0", "distinct": "207"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Date Recorded"}, {"transformationsData": [{"feature_label": "Town", "transformation_label": "String Indexer"}], "feature": "Town", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Ansonia", "max": "Woodbury", "missing": "0", "distinct": "84"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Town"}, {"transformationsData": [{"feature_label": "Address", "transformation_label": "String Indexer"}], "feature": "Address", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "0 MILLER RD", "max": "WESTBROOK RD", "missing": "0", "distinct": "499"}, "transformation": 
[{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Address"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Assessed Value", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "260120.42", "stddev": "507595.81", "min": "0", "max": "7350000", "missing": "0"}, "updatedLabel": "Assessed Value"}, {"transformationsData": [{"feature_label": "Sale Amount", "threshold": 497097.73, "transformation_label": "Binarizer"}], "feature": "Sale Amount", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "497097.73", "stddev": "931478.31", "min": "3000.0", "max": "1.35E7", "missing": "0"}, "transformation": [{"transformation": "Binarizer", "selectedAsDefault": 1}], "updatedLabel": "Sale Amount"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Sales Ratio", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "0.62", "stddev": "1.09", "min": "0.0", "max": "21.74267334", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Sales Ratio"}, {"transformationsData": [{"feature_label": "Property Type", "transformation_label": "String Indexer"}], "feature": "Property Type", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Apartments", "max": "Vacant Land", "missing": "0", "distinct": "5"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Property Type"}, {"transformationsData": [{"feature_label": "Residential Type", "transformation_label": "String Indexer"}], "feature": "Residential Type", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "442", "mean": "", "stddev": "", "min": "Condo", "max": "Two Family", 
"missing": "58", "distinct": "4"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Residential Type"}, {"transformationsData": [{"feature_label": "Non Use Code", "transformation_label": "String Indexer"}], "feature": "Non Use Code", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "89", "mean": "", "stddev": "", "min": "01 - Family", "max": "28 - Use Assessment", "missing": "379", "distinct": "14"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Non Use Code"}, {"transformationsData": [{"feature_label": "Assessor Remarks", "transformation_label": "String Indexer"}], "feature": "Assessor Remarks", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "110", "mean": "", "stddev": "", "min": "012-0020", "max": "multi-parcel sale", "missing": "405", "distinct": "89"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Assessor Remarks"}, {"transformationsData": [{"feature_label": "OPM remarks", "transformation_label": "String Indexer"}], "feature": "OPM remarks", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "22", "mean": "", "stddev": "", "min": "BELOW MARKET", "max": "TWO SALES SAME DAY - ALSO SEE  #210116", "missing": "487", "distinct": "11"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "OPM remarks"}, {"transformationsData": [{"feature_label": "Location", "transformation_label": "String Indexer"}], "feature": "Location", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "POINT (-71.827320993 41.834271993)", "max": "POINT (-73.555591979 41.071525007)", "missing": "0", "distinct": "495"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Location"}]}))

    #transformationPostExecutionHook(autofe)

except Exception as ex:
    # Log the traceback too. The error is still swallowed, so later cells that
    # use `autofe` will fail if this step did not complete.
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run 3Hooks.ipynb
try:
    #mlPreExecutionHook()

    # NOTE(review): "Sales Ratio" is configured as type "real" in the FE config;
    # a classifier may reject a continuous target. Confirm the intended label.
    dataAutoML=functionClassification(autofe, ["Serial Number", "List Year", "Assessed Value", "Date Recorded_stringindexer", "Town_stringindexer", "Address_stringindexer", "Sale Amount_binarizer", "Property Type_stringindexer", "Residential Type_stringindexer", "Non Use Code_stringindexer", "Assessor Remarks_stringindexer", "OPM remarks_stringindexer", "Location_stringindexer"], "Sales Ratio")

    #mlPostExecutionHook(dataAutoML)

except Exception as ex:
    # Log the traceback too. The error is still swallowed, so the predict cell
    # will fail if `dataAutoML` was not produced.
    logging.exception(ex)
#spark.stop()


***PREDICT ON TRAINED MODEL***

In [None]:
import pandas as pd
import numpy as np
import sklearn.metrics

try:
    model = dataAutoML['model']
    X_test = dataAutoML['X_test']
    y_test = dataAutoML['y_test']
    label = dataAutoML['label']
    # Build a new list instead of mutating dataAutoML['columnNames'] in place.
    # In-place edits broke the DataFrame construction when the cell was re-run.
    columnNames = [name for name in dataAutoML['columnNames'] if name != label]
    predicted = label + "_predicted"
    y_predicted = model.predict(X_test)
    df = pd.DataFrame(X_test, columns=columnNames)
    df[label] = y_test
    df[predicted] = y_predicted
    # Show the true label and the prediction first, then the features.
    df = df[[label, predicted] + columnNames]
    Accuracy = np.round((100 * sklearn.metrics.accuracy_score(y_true=y_test, y_pred=y_predicted)), 1)
    F1 = np.round(
        (100 * sklearn.metrics.f1_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)
    Precision = np.round((
        100 * sklearn.metrics.precision_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)
    Recall = np.round((
        100 * sklearn.metrics.recall_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)
    display(" Accuracy of Prediction on test data    : %s"%Accuracy)
    display(" F1 score of Prediction on test data    : %s"%F1)
    display(" Precision of Prediction on test data   : %s"%Precision)
    display(" Recall of Prediction on test data      : %s"%Recall)
    display(df.head())
except Exception as ex:
    # Log the traceback as well as the message.
    logging.exception(ex)

spark.stop()

