***GENERATED CODE FOR heartfailureclassf PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ AND WRITE DATA.***

In [None]:
import os
import datetime
import logging
import warnings
# Silence all warnings for the notebook run and log INFO+ as "LEVEL:message".
warnings.simplefilter('ignore')
logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(message)s')


class HDFSConnector:
    """Read/write delimited files on HDFS through a Spark session.

    ``config`` is either a dict or the string form of a Python dict literal
    (as generated by the pipeline), e.g. ``"{'url': '/path/file.csv', ...}"``.
    The HDFS endpoint is taken from the ``HDFS_SERVER`` / ``HDFS_PORT``
    environment variables.
    """

    @staticmethod
    def _parse_config(config):
        """Return ``config`` as a dict.

        Strings are parsed with ``ast.literal_eval`` instead of ``eval``: it
        accepts the same dict-literal text but refuses to execute arbitrary
        expressions embedded in the configuration.

        Raises:
            ValueError / SyntaxError: if ``config`` is not a Python literal.
        """
        import ast

        if isinstance(config, dict):
            return config
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Read the CSV at ``config['url']`` on HDFS into a Spark DataFrame.

        The first line is used as the header and column types are inferred.
        Two sample rows are shown with the notebook ``display`` helper.

        Raises:
            KeyError: if ``HDFS_SERVER``/``HDFS_PORT`` are not set, or the
                config has no ``'url'``.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        conf = HDFSConnector._parse_config(config)
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{conf['url']}")
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV using ``config['is_header']`` / ``config['delimiter']``.

        ``spark`` is unused but kept for interface compatibility. Returns the
        result of ``DataFrameWriter.save``.
        """
        conf = HDFSConnector._parse_config(config)
        header = 'true' if conf["is_header"] == "Use Header Line" else 'false'
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): this yields "<timestamp>_ <url>" -- a space after the
        # underscore and the timestamp *before* the URL, i.e. a relative path.
        # Preserved as-is; confirm the intended output location.
        target = ("%s %s") % (timestamp + "_", conf['url'])
        return df.write.format('csv').options(
            header=header, delimiter=conf["delimiter"]).save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
import json
from pyspark.sql.functions import round
from pyspark.ml.feature import Binarizer
from pyspark.sql.functions import col, when
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Null-value imputation helpers for Spark DataFrames.

    Each ``replaceBy*`` method receives a schema field (``feature``, read via
    ``feature.name``) and a DataFrame, and returns a new DataFrame in which
    nulls of that column are filled with a value derived from the column.
    """

    def cleanValueForFE(self, value):
        """Normalise an aggregate result before it is used as a fill value.

        Returns "" for None (e.g. the aggregate of an all-null column), the
        string "nan" when ``str(value) == 'nan'``, and ``value`` otherwise.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _columnAggregate(self, feature, df, aggFunc):
        """Return ``aggFunc`` applied to column ``feature.name``, cleaned.

        Spark aggregate functions skip nulls on their own, so the statistic is
        computed over the whole column. Computing it on ``df.dropna()`` would
        discard rows that are null in *other* columns -- biasing the value,
        and producing no value at all when every row contains some null.
        """
        row = df.select(aggFunc(col(feature.name)).alias('agg')).collect()[0]
        return self.cleanValueForFE(row['agg'])

    def _fillColumn(self, feature, df, fillValue):
        """Fill nulls and literal " " values of ``feature.name`` with ``fillValue``."""
        name = feature.name
        df = df.fillna(fillValue, subset=[name])
        return df.withColumn(
            name, when(col(name) == " ", fillValue).otherwise(col(name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill nulls in ``feature`` with the column mean.

        Only nulls are filled. Literal " " values are deliberately left alone
        here: the mean is a double, and ``when(...).otherwise(col)`` would
        widen an integer column to double.
        NOTE(review): confirm whether blanks in string columns should get the
        mean as well.
        """
        meanValue = self._columnAggregate(feature, df, mean)
        return df.fillna(meanValue, subset=[feature.name])

    def replaceByMax(self, feature, df, max_=-1):
        """Fill nulls and " " values in ``feature`` with the column maximum."""
        return self._fillColumn(feature, df, self._columnAggregate(feature, df, max))

    def replaceByMin(self, feature, df, min_=-1):
        """Fill nulls and " " values in ``feature`` with the column minimum."""
        return self._fillColumn(feature, df, self._columnAggregate(feature, df, min))

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill nulls and " " values in ``feature`` with the column standard deviation."""
        return self._fillColumn(feature, df, self._columnAggregate(feature, df, stddev))

    def replaceDateRandomly(self, feature, df):
        """Fill nulls in ``feature`` with the column's first non-null value.

        Despite the name the value is not random: it is the first row returned
        by ``head(1)`` among non-null rows. Nulls are filled with
        ``str(value)`` and literal " " values with the value itself. If the
        column has no non-null value, ``df`` is returned unchanged.
        """
        name = feature.name
        firstRows = df.where(col(name).isNotNull()).head(1)
        if not firstRows:
            return df
        fillValue = self.cleanValueForFE(firstRows[0][name])
        df = df.fillna(str(fillValue), subset=[name])
        return df.withColumn(
            name, when(col(name) == " ", fillValue).otherwise(col(name)))

    def replaceNullValues(self, fList, df):
        """Apply each feature's ``replaceby`` strategy to its column.

        Args:
            fList: list of dicts with at least ``"feature"`` (column name) and
                ``"replaceby"`` (contains one of "mean", "max", "min",
                "stddev", "random"; checked in that order).
            df: Spark DataFrame.

        Returns:
            The DataFrame after all replacements. Features that name no
            existing column, or whose strategy is unrecognised, are skipped.
        """
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        # Exact column-name match: a substring test would also hit e.g.
        # "percentage" for feature "age" and impute the wrong columns.
        fieldsByName = {field.name: field for field in df.schema.fields}
        for featureObj in fList:
            field = fieldsByName.get(featureObj["feature"])
            if field is None:
                continue
            replaceBy = featureObj["replaceby"]
            for key, strategy in strategies:
                if key in replaceBy:
                    df = strategy(field, df)
                    break
        return df


def BinarizerTransform(df, params, transformationData=None):
    """Add a ``<feature>_binarizer`` column by thresholding ``params['feature']``.

    Args:
        df: Spark DataFrame containing the column ``params['feature']``.
        params: feature configuration; ``params['feature']`` names the column.
        transformationData: dict with a ``'threshold'`` entry. When omitted,
            the first entry of ``params['transformationsData']`` is used.

    Returns:
        The DataFrame with the feature column cast to double (nulls filled
        with 0.0, then rounded to 2 decimals) plus the pyspark ``Binarizer``
        output column ``<feature>_binarizer``, computed before rounding.
    """
    feature = params['feature']
    outcol = feature + "_binarizer"
    if transformationData is None:
        transformationData = params['transformationsData'][0]

    # withColumn on an existing name replaces that column in place, so no
    # temporary column (which could clobber a real one) is needed.
    dfReturn = df.withColumn(feature, df[feature].cast("double"))
    dfReturn = dfReturn.fillna({feature: 0.0})

    binarizer = Binarizer(threshold=float(transformationData['threshold']),
                          inputCol=feature, outputCol=outcol)
    dfReturn = binarizer.transform(dfReturn)

    return dfReturn.withColumn(feature, round(dfReturn[feature], 2))


class TransformationMain:
    """Applies the generated feature-engineering steps to the input DataFrame."""

    # TODO: change df argument in run with following
    def run(transformationDF, config):
        """Impute nulls per ``config["FE"]``, then binarize ``platelets``.

        ``config`` is a JSON string; its ``"FE"`` list is passed to
        ``CleanseData.replaceNullValues``. After ``BinarizerTransform`` adds
        the ``platelets_binarizer`` column, the original ``platelets`` column
        is dropped. Two sample rows are displayed and the result is returned.
        """
        feSettings = json.loads(config)["FE"]
        cleaned = CleanseData().replaceNullValues(feSettings, transformationDF)

        plateletsThreshold = {'feature_label': 'platelets',
                              'threshold': 263358.02675585286,
                              'transformation_label': 'Binarizer'}
        plateletsFeature = {
            'transformationsData': [dict(plateletsThreshold)],
            'feature': 'platelets',
            'type': 'real',
            'selected': 'True',
            'replaceby': 'mean',
            'stats': {'count': '299', 'mean': '263358.03', 'stddev': '97804.24',
                      'min': '25100.0', 'max': '850000.0', 'missing': '0'},
            'transformation': [{'transformation': 'Binarizer', 'selectedAsDefault': 1}],
            'updatedLabel': 'platelets',
        }
        binarized = BinarizerTransform(cleaned, plateletsFeature, plateletsThreshold)

        result = binarized.drop('platelets')
        display(result.limit(2).toPandas())
        return result


***AUTOML FUNCTIONS***

In [None]:
from sklearn.metrics import mean_absolute_error
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.linear_model import ElasticNet
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
import xgboost as xgb
import numpy as np
import lightgbm as lgb
import optuna
from sklearn import preprocessing
# Shared LabelEncoder used by the *OptunaDriverClass set_tag methods to encode
# non-int/float columns; it is re-fitted (fit_transform) for every column.
lbl = preprocessing.LabelEncoder()


class GradientBoostingClassifierOptunaDriverClass:
    """Optuna hyper-parameter search for scikit-learn's GradientBoostingClassifier."""

    def set_tag(trial, df_new, features, label):
        """Run a 10-trial (600 s timeout) Optuna study maximising hold-out accuracy.

        Presumably called on an instance (like the xgboost driver), so
        ``trial`` receives the instance and is unused; the name is kept for
        interface compatibility.

        Args:
            df_new: pandas DataFrame. Columns that are not int/float are
                label-encoded *in place*.
            features: list of column names used as model inputs.
            label: name of the target column.

        Returns:
            dict with the best ``max_depth``, ``n_estimators``,
            ``max_features`` and ``max_leaf_nodes``.
        """
        # Encode and split once: the split is seeded (random_state=42), so
        # every trial sees the same data and repeating this per trial is waste.
        for c in df_new.select_dtypes(exclude=['int', 'float']).columns.values:
            df_new[c] = df_new[c].astype('category')
            df_new[c] = lbl.fit_transform(df_new[c].astype(str))
        X_train, X_test, y_train, y_test = train_test_split(
            df_new[features].values, df_new[label].values, random_state=42)

        def objective(trial):
            classifier_obj = GradientBoostingClassifier(
                max_depth=trial.suggest_int("max_depth", 2, 32),
                n_estimators=trial.suggest_int('n_estimators', 1, 50),
                # 'auto' (== 'sqrt' for classifiers) was removed in
                # scikit-learn 1.3 and is rejected by the estimator there.
                max_features=trial.suggest_categorical(
                    'max_features', ['sqrt', 'log2']),
                max_leaf_nodes=trial.suggest_int('max_leaf_nodes', 2, 10))
            classifier_obj.fit(X_train, y_train)
            preds = classifier_obj.predict(X_test)
            # accuracy_score already returns a scalar; predictions are class
            # labels, so no rounding is needed.
            return accuracy_score(y_test, preds)

        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=10, timeout=600)
        return study.best_trial.params


class RandomForestClassifierOptunaDriverClass:
    """Optuna hyper-parameter search for scikit-learn's RandomForestClassifier."""

    def set_tag(trial, df_new, features, label):
        """Run a 10-trial (600 s timeout) Optuna study maximising hold-out accuracy.

        Presumably called on an instance (like the xgboost driver), so
        ``trial`` receives the instance and is unused; the name is kept for
        interface compatibility.

        Args:
            df_new: pandas DataFrame. Columns that are not int/float are
                label-encoded *in place*.
            features: list of column names used as model inputs.
            label: name of the target column.

        Returns:
            dict with the best ``max_depth``, ``n_estimators``,
            ``max_leaf_nodes`` and ``max_features``.
        """
        # Encode and split once: the split is seeded (random_state=42), so
        # every trial sees the same data and repeating this per trial is waste.
        for c in df_new.select_dtypes(exclude=['int', 'float']).columns.values:
            df_new[c] = df_new[c].astype('category')
            df_new[c] = lbl.fit_transform(df_new[c].astype(str))
        X_train, X_test, y_train, y_test = train_test_split(
            df_new[features].values, df_new[label].values, random_state=42)

        def objective(trial):
            classifier_obj = RandomForestClassifier(
                max_depth=int(trial.suggest_int("max_depth", 2, 15)),
                n_estimators=trial.suggest_int('n_estimators', 1, 50),
                max_leaf_nodes=trial.suggest_int('max_leaf_nodes', 2, 10),
                # 'auto' (== 'sqrt' for classifiers) was removed in
                # scikit-learn 1.3 and is rejected by the estimator there.
                max_features=trial.suggest_categorical(
                    'max_features', ['sqrt', 'log2']))
            classifier_obj.fit(X_train, y_train)
            preds = classifier_obj.predict(X_test)
            # accuracy_score already returns a scalar; predictions are class
            # labels, so no rounding is needed.
            return accuracy_score(y_test, preds)

        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=10, timeout=600)
        return study.best_trial.params


class LightGBMClassifierOptunaDriverClass:
    """Optuna hyper-parameter search for ``lightgbm.LGBMClassifier``."""

    def set_tag(trial, df_new, features, label):
        """Run a 10-trial (600 s timeout) Optuna study maximising hold-out accuracy.

        Presumably called on an instance (like the xgboost driver), so
        ``trial`` receives the instance and is unused; the name is kept for
        interface compatibility.

        Args:
            df_new: pandas DataFrame. Columns that are not int/float are
                label-encoded *in place*.
            features: list of column names used as model inputs.
            label: name of the target column.

        Returns:
            dict with the best ``lambda_l1``, ``lambda_l2``, ``num_leaves``,
            ``feature_fraction``, ``bagging_fraction``, ``bagging_freq`` and
            ``min_child_samples``.
        """
        # Encode and split once: the split is seeded (random_state=42), so
        # every trial sees the same data and repeating this per trial is waste.
        for c in df_new.select_dtypes(exclude=['int', 'float']).columns.values:
            df_new[c] = df_new[c].astype('category')
            df_new[c] = lbl.fit_transform(df_new[c].astype(str))
        X_train, X_test, y_train, y_test = train_test_split(
            df_new[features].values, df_new[label].values, random_state=42)

        # np.ravel accepts a 1-D target (a single label column) as well as a
        # 2-D one; iterating 1-D elements as sublists raises TypeError.
        n_classes = len(set(np.ravel(y_train)))

        def objective(trial):
            param = {
                'task': 'train',
                'boosting_type': 'gbdt',
                'objective': 'binary',
                'metric': 'binary_logloss',
                "lambda_l1": trial.suggest_float("lambda_l1", 1e-8, 10.0, log=True),
                "lambda_l2": trial.suggest_float("lambda_l2", 1e-8, 10.0, log=True),
                "num_leaves": trial.suggest_int("num_leaves", 2, 50),
                "feature_fraction": trial.suggest_float("feature_fraction", 0.4, 1.0),
                "bagging_fraction": trial.suggest_float("bagging_fraction", 0.4, 1.0),
                "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
                "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
            }
            if n_classes > 2:
                param['num_class'] = n_classes
                param['objective'] = 'multiclass'
                param['metric'] = 'multi_logloss'

            gbm = lgb.LGBMClassifier(**param)
            gbm.fit(X_train, y_train)
            preds = gbm.predict(X_test)
            # accuracy_score already returns a scalar; predictions are class
            # labels, so no rounding is needed.
            return accuracy_score(y_test, preds)

        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=10, timeout=600)
        return study.best_trial.params


class xgboostClassifierOptunaDriverClass:
    """Optuna hyper-parameter search for ``xgboost.XGBClassifier``."""

    def set_tag(trial, df_new, features, label):
        """Run a 10-trial (600 s timeout) Optuna study maximising hold-out accuracy.

        Called on an instance (see ``XGboostClassifier``), so ``trial``
        receives the instance and is unused; the name is kept for interface
        compatibility.

        Args:
            df_new: pandas DataFrame. Columns that are not int/float are
                label-encoded *in place*, so the caller sees encoded data.
            features: list of column names used as model inputs.
            label: name of the target column.

        Returns:
            dict with the best ``n_estimators``, ``max_depth``,
            ``min_child_weight``, ``scale_pos_weight``, ``subsample`` and
            ``colsample_bytree``.
        """
        # Encode and split once: the split is seeded (random_state=42), so
        # every trial sees the same data and repeating this per trial is waste.
        for c in df_new.select_dtypes(exclude=['int', 'float']).columns.values:
            df_new[c] = df_new[c].astype('category')
            df_new[c] = lbl.fit_transform(df_new[c].astype(str))
        X_train, X_test, y_train, y_test = train_test_split(
            df_new[features].values, df_new[label].values, random_state=42)

        def objective(trial):
            xgboost_tuna = xgb.XGBClassifier(
                random_state=42,
                n_estimators=trial.suggest_int('n_estimators', 1, 50),
                max_depth=trial.suggest_int('max_depth', 1, 10),
                min_child_weight=trial.suggest_int('min_child_weight', 1, 10),
                scale_pos_weight=trial.suggest_int('scale_pos_weight', 1, 50),
                # suggest_float(step=) replaces the deprecated
                # suggest_discrete_uniform with the same 0.5..0.9 grid.
                subsample=trial.suggest_float('subsample', 0.5, 0.9, step=0.1),
                colsample_bytree=trial.suggest_float(
                    'colsample_bytree', 0.5, 0.9, step=0.1),
            )
            xgboost_tuna.fit(X_train, y_train)
            tuna_pred_test = xgboost_tuna.predict(X_test)
            return accuracy_score(y_test, tuna_pred_test)

        study = optuna.create_study(direction='maximize')
        study.optimize(objective, n_trials=10, timeout=600)
        return study.best_trial.params


class ElasticNetOptunaDriverClass:
    """Optuna hyper-parameter search for scikit-learn's ElasticNet regressor."""

    def set_tag(trial, df_new, features, label):
        """Run a 20-trial (600 s timeout) Optuna study minimising hold-out MAE.

        Presumably called on an instance (like the xgboost driver), so
        ``trial`` receives the instance and is unused; the name is kept for
        interface compatibility.

        Args:
            df_new: pandas DataFrame. Columns that are not int/float are
                label-encoded *in place*.
            features: list of column names used as model inputs.
            label: name of the target column.

        Returns:
            dict with the best ``l1_ratio``, ``alpha`` and ``selection``.
        """
        # Encode and split once: the split is seeded (random_state=42), so
        # every trial sees the same data and repeating this per trial is waste.
        for c in df_new.select_dtypes(exclude=['int', 'float']).columns.values:
            df_new[c] = df_new[c].astype('category')
            df_new[c] = lbl.fit_transform(df_new[c].astype(str))
        X_train, X_test, y_train, y_test = train_test_split(
            df_new[features].values, df_new[label].values, random_state=42)

        def objective(trial):
            # suggest_float(log=True) replaces the deprecated suggest_loguniform.
            l1_ratio = trial.suggest_float("l1_ratio", 0.1, 1, log=True)
            alpha = trial.suggest_float("alpha", 0.1, 1, log=True)
            selection = trial.suggest_categorical("selection", ['cyclic', 'random'])
            e_net = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, selection=selection)
            e_net.fit(X_train, y_train)
            predictions = e_net.predict(X_test)
            return mean_absolute_error(y_test, predictions)

        study = optuna.create_study(direction='minimize')
        study.optimize(objective, n_trials=20, timeout=600)
        return study.best_trial.params


class PolynomialregressionOptunaDriverClass:
    """Optuna search over the degree of a polynomial linear regression."""

    def set_tag(trial, XRaw, yRaw):
        """Run a 5-trial (600 s timeout) Optuna study minimising hold-out MAE.

        Presumably called on an instance (like the xgboost driver), so
        ``trial`` receives the instance and is unused; the name is kept for
        interface compatibility.

        Args:
            XRaw: feature matrix.
            yRaw: target values.

        Returns:
            dict with the best ``degree`` (searched over 2..5).
        """
        # Imported locally so the block does not rely on module-level imports
        # that the file never provides (these names were otherwise undefined).
        from sklearn.linear_model import LinearRegression
        from sklearn.preprocessing import PolynomialFeatures

        # The split is seeded, so doing it once is equivalent to per trial.
        X_train, X_test, y_train, y_test = train_test_split(
            XRaw, yRaw, random_state=42)

        def objective(trial):
            polynomial = PolynomialFeatures(degree=trial.suggest_int("degree", 2, 5))
            # Fit the expansion on the training split only; reuse it for test.
            x_train_transformed = polynomial.fit_transform(X_train)
            x_test_transformed = polynomial.transform(X_test)
            clf = LinearRegression()
            clf.fit(x_train_transformed, y_train)
            predictions = clf.predict(x_test_transformed)
            return mean_absolute_error(y_test, predictions)

        study = optuna.create_study(direction='minimize')
        study.optimize(objective, n_trials=5, timeout=600)
        return study.best_trial.params


def XGboostClassifier(df, features, labels, TrainingPercent=100):
    """Tune and train an XGBClassifier on (a prefix of) a Spark DataFrame.

    Args:
        df: Spark DataFrame holding the feature and label columns.
        features: list of feature column names.
        labels: name of the target column.
        TrainingPercent: percentage of rows (taken with ``df.limit``) to use.

    Returns:
        dict with the fitted ``'model'``, the hold-out ``'X_test'`` /
        ``'y_test'``, the ``'label'`` name and the pandas ``'columnNames'``.
    """
    rowLimit = int(TrainingPercent / 100 * df.count())
    pandasDf = df.limit(rowLimit).toPandas()

    # The Optuna driver label-encodes non-numeric columns of pandasDf in
    # place, so the final fit below runs on the encoded frame.
    bestParams = xgboostClassifierOptunaDriverClass().set_tag(pandasDf, features, labels)
    model = XGBClassifier(**bestParams)

    X_train, X_test, y_train, y_test = train_test_split(
        pandasDf[features], pandasDf[labels])
    model.fit(X_train, y_train)
    display(" Accuracy of Model : %s" % model.score(X_test, y_test))

    return {
        'model': model,
        'X_test': X_test,
        'y_test': y_test,
        'label': labels,
        'columnNames': pandasDf.columns,
    }


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run heartfailureclassfHooks.ipynb
try:
    #sourcePreExecutionHook()

    heartfailurehitory = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/testdata/1708348442600_HeartFailureHistory.csv', 'filename': 'HeartFailureHistory.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'FilePath': '/Healthcare/HeartFailure/HFData/HeartFailureHistory.csv', 'viewFileName': 'HeartFailureHistory.csv', 'is_header': 'Use Header Line', 'baseType': 'hdfs', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://dnm.bfirst.ai:44040/api/read/hdfs'}")

except Exception as ex:
    # Log with the traceback: if the read fails, heartfailurehitory stays
    # undefined and the later cells fail too, so the root cause must be visible.
    logging.exception(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run heartfailureclassfHooks.ipynb
try:
    #transformationPreExecutionHook()

    autofe = TransformationMain.run(heartfailurehitory,json.dumps( {"FE": [{"transformationsData": [{"transformation_label": "novalue"}], "feature": "age", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "299", "mean": "60.83", "stddev": "11.89", "min": "40.0", "max": "95.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "age"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "anaemia", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "0.43", "stddev": "0.5", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "anaemia"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "creatinine_phosphokinase", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "581.84", "stddev": "970.29", "min": "23", "max": "7861", "missing": "0"}, "updatedLabel": "creatinine_phosphokinase"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "diabetes", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "0.42", "stddev": "0.49", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "diabetes"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "ejection_fraction", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "38.08", "stddev": "11.83", "min": "14", "max": "80", "missing": "0"}, "updatedLabel": "ejection_fraction"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "high_blood_pressure", "transformation": 
[{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "0.35", "stddev": "0.48", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "high_blood_pressure"}, {"transformationsData": [{"feature_label": "platelets", "threshold": 263358.02675585286, "transformation_label": "Binarizer"}], "feature": "platelets", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "299", "mean": "263358.03", "stddev": "97804.24", "min": "25100.0", "max": "850000.0", "missing": "0"}, "transformation": [{"transformation": "Binarizer", "selectedAsDefault": 1}], "updatedLabel": "platelets"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "serum_creatinine", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "299", "mean": "1.39", "stddev": "1.03", "min": "0.5", "max": "9.4", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "serum_creatinine"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "serum_sodium", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "136.63", "stddev": "4.41", "min": "113", "max": "148", "missing": "0"}, "updatedLabel": "serum_sodium"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "sex", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "0.65", "stddev": "0.48", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "sex"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "smoking", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": 
{"count": "299", "mean": "0.32", "stddev": "0.47", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "smoking"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "time", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "130.26", "stddev": "77.61", "min": "4", "max": "285", "missing": "0"}, "updatedLabel": "time"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "DEATH_EVENT", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "299", "mean": "0.32", "stddev": "0.47", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "DEATH_EVENT"}]}))

    #transformationPostExecutionHook(autofe)

except Exception as ex:
    # Log with the traceback: if this fails, autofe stays undefined and the
    # training cell fails too, so the root cause must be visible.
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run heartfailureclassfHooks.ipynb
try:
    #mlPreExecutionHook()

    dataAutoML=XGboostClassifier(autofe, ["age", "anaemia", "creatinine_phosphokinase", "diabetes", "ejection_fraction", "high_blood_pressure", "serum_creatinine", "serum_sodium", "sex", "smoking", "time", "platelets_binarizer"], "DEATH_EVENT")

    #mlPostExecutionHook(dataAutoML)

except Exception as ex:
    # Log with the traceback so tuning/training failures can be diagnosed.
    logging.exception(ex)
#spark.stop()
