***GENERATED CODE FOR survicalno2 PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import os
import datetime
import logging
import warnings
# Suppress every Python warning for the rest of the session.
warnings.filterwarnings('ignore')
# Configure the root logger to print "LEVEL:message" for INFO and above.
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """Read and write CSV data through Spark using string-encoded configs.

    Every ``config`` argument is a *string* holding a Python dict literal,
    e.g. ``"{'url': '/data/in.csv', 'delimiter': ','}"``.  The methods are
    called directly on the class: ``HDFSConnector.fetch(spark, config)``.
    """

    @staticmethod
    def _parse_config(config):
        """Convert the dict-literal string ``config`` into a ``dict``.

        ``ast.literal_eval`` only accepts Python literals, so a config string
        cannot execute arbitrary code the way the previous ``eval`` could.

        Raises:
            ValueError, SyntaxError: ``config`` is not a valid literal.
            TypeError: the literal is not a dict.
        """
        import ast  # local import: the notebook's import cell does not provide it

        parsed = ast.literal_eval(config)
        if not isinstance(parsed, dict):
            raise TypeError(
                "connector config must be a dict literal, got %s"
                % type(parsed).__name__)
        return parsed

    @staticmethod
    def fetch(spark, config):
        """Load a headered CSV file from HDFS and preview its first two rows.

        Args:
            spark: active SparkSession used for reading.
            config: dict-literal string; only the ``'url'`` key (path on the
                HDFS server) is read here.

        Returns:
            The Spark DataFrame read with header and schema inference enabled.

        Raises:
            KeyError: ``HDFS_SERVER`` / ``HDFS_PORT`` are missing from the
                environment, or ``config`` has no ``'url'`` key.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        settings = HDFSConnector._parse_config(config)
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{settings['url']}")
        # `display` is supplied by the notebook environment, not by this file.
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV to a timestamp-prefixed path.

        Args:
            df: Spark DataFrame to persist.
            spark: unused; kept so the connector signatures stay uniform.
            config: dict-literal string providing ``'is_header'`` (a header
                row is written only when it equals ``"Use Header Line"``),
                ``'delimiter'`` and ``'url'``.

        Returns:
            Whatever ``DataFrameWriter.save`` returns.
        """
        settings = HDFSConnector._parse_config(config)
        header = 'true' if settings["is_header"] == "Use Header Line" else 'false'
        delimiter = settings["delimiter"]
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): the target keeps the original "<timestamp>_ <url>"
        # layout (including the space); confirm that this is the intended path.
        target = ("%s %s") % (timestamp + "_", settings['url'])
        return df.write.format('csv').options(
            header=header, delimiter=delimiter).save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
from pyspark.sql.functions import col, udf, round
from pyspark.ml.feature import StandardScaler
from pyspark.sql.types import *
import json
from pyspark.ml.feature import Normalizer
from pyspark.sql.functions import col, round
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Fill missing values of Spark DataFrame columns with a per-column value.

    Each ``replaceBy*`` method takes a schema field (an object exposing
    ``.name``) and a DataFrame, and returns a new DataFrame in which nulls
    and single-space (``" "``) placeholders of that column are replaced.
    """

    def cleanValueForFE(self, value):
        """Normalise an aggregate result so it can be used as a fill value.

        ``None`` becomes ``""``, a value whose string form is ``'nan'``
        becomes the string ``"nan"``, anything else is returned unchanged.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _replaceByAggregate(self, feature, df, aggregate, alias):
        """Replace nulls/blank placeholders of one column by an aggregate.

        The aggregate is computed over the rows that remain after
        ``df.dropna()`` (rows with a null in *any* column are excluded).
        """
        row = df.dropna().select(
            aggregate(col(feature.name)).alias(alias)).collect()[0]
        fillValue = self.cleanValueForFE(row[alias])
        df = df.fillna(fillValue, subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill the column with its mean.

        The previous implementation built the blank-placeholder replacement
        with ``withColumn`` but discarded the result; it is now applied in the
        same way as in the max/min/stddev variants (without the extra integer
        cast, which would truncate non-integer columns).  ``mean_`` is unused.
        """
        return self._replaceByAggregate(feature, df, mean, 'mean')

    def replaceByMax(self, feature, df, max_=-1):
        """Fill the column with its maximum (``max_`` is unused)."""
        return self._replaceByAggregate(feature, df, max, 'max')

    def replaceByMin(self, feature, df, min_=-1):
        """Fill the column with its minimum (``min_`` is unused)."""
        return self._replaceByAggregate(feature, df, min, 'min')

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill the column with its standard deviation (``stddev_`` is unused)."""
        return self._replaceByAggregate(feature, df, stddev, 'stddev')

    def replaceDateRandomly(self, feature, df):
        """Fill the column with its first non-null value.

        Despite the name, no randomness is involved: the value of the first
        row whose column is not null is used.  If the column holds no
        non-null value at all, ``df`` is returned unchanged instead of
        failing with ``IndexError``.
        """
        rows = df.where(col(feature.name).isNotNull()).head(1)
        if not rows:
            return df
        fillValue = self.cleanValueForFE(rows[0][feature.name])
        df = df.fillna(str(fillValue), subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceNullValues(self, fList, df):
        """Apply the configured replacement strategy to every listed feature.

        Args:
            fList: iterable of dicts with at least ``"feature"`` (column name)
                and ``"replaceby"`` (strategy keyword) entries.
            df: Spark DataFrame to clean.

        Returns:
            The cleaned DataFrame (``df`` itself when nothing matched).
        """
        # Order matters: the first keyword contained in "replaceby" wins.
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        featuresList = df.schema.fields
        for featureObj in fList:
            for feat in featuresList:
                # NOTE(review): this is a substring match, so "AGE" would also
                # select a column called "AGE_DX" - confirm this is intended.
                if featureObj["feature"] not in feat.name:
                    continue
                for keyword, handler in strategies:
                    if keyword in featureObj["replaceby"]:
                        df = handler(feat, df)
                        break
        return df


def vectorAssemblerTransform(df, param):
    """Assemble one or several columns of ``df`` into a Spark ML vector column.

    Args:
        df: Spark DataFrame containing the input column(s).
        param: a single column name (``str``) or a list/tuple of column names.

    Returns:
        ``df`` with an added vector column: ``"<param>_vector"`` for a single
        column name, ``"features"`` for a sequence of names.

    Raises:
        TypeError: ``param`` is neither a string nor a list/tuple.  The
            previous implementation silently returned ``None`` in that case,
            which only failed later in the caller.
    """
    if isinstance(param, str):
        assembler = VectorAssembler(inputCols=[param], outputCol=param + "_vector")
        return assembler.transform(df)

    if isinstance(param, (list, tuple)):
        assembler = VectorAssembler(inputCols=list(param), outputCol="features")
        return assembler.transform(df)

    raise TypeError(
        "param must be a column name or a list of column names, got %s"
        % type(param).__name__)


def to_array(col):
    """Return a column expression converting an ML vector column to array<double>.

    The conversion is done row by row in a Python UDF that calls
    ``toArray().tolist()`` on each vector value.
    """
    def to_array_(v):
        values = v.toArray()
        return values.tolist()

    vector_to_list = udf(to_array_, ArrayType(DoubleType()))
    return vector_to_list(col)


def standardScalarTransform(df, params, transformationData=None):
    """Standard-scale one numeric column with Spark ML's ``StandardScaler``.

    Args:
        df: Spark DataFrame containing the column to scale.
        params: dict whose ``'feature'`` entry names the column.
        transformationData: dict with a ``'std_scalar'`` entry of the form
            ``{'mean_flag': ..., 'std_flag': ...}``.  Flags may be booleans or
            the strings ``'True'`` / ``'False'`` (case-insensitive).

    Returns:
        A DataFrame with a new ``"<feature>_standardscalar"`` column holding
        the scaled value; the original feature column is overwritten with
        that value rounded to two decimals.

    Raises:
        KeyError: ``'feature'``, ``'std_scalar'`` or ``'mean_flag'`` is missing.

    Bug fixed: the flags arrive as strings, and the old ``if flag:`` test
    treated ``'False'`` as true, so a configuration of
    ``mean_flag='False', std_flag='True'`` ran the scaler with
    ``withMean=True, withStd=False``.  Flags are now parsed and passed on
    independently as ``withMean`` / ``withStd``; two disabled flags no longer
    leave the variables unbound.
    """
    def _flag(value):
        # Interpret 'True'/'False' strings explicitly; bool() would be True
        # for any non-empty string.
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return bool(value)

    # None instead of a shared mutable default; a missing dict still raises
    # KeyError on 'std_scalar' exactly as the old `{}` default did.
    transformationData = {} if transformationData is None else transformationData

    feature = params['feature']
    dfReturn = df.fillna({feature: '0.0'})

    scalarFlags = transformationData["std_scalar"]
    meanflag = _flag(scalarFlags["mean_flag"])
    stdflag = _flag(scalarFlags.get("std_flag", False))

    outcol = feature + "_standardscalar"
    featureVector = feature + "_vector"
    dfReturn = vectorAssemblerTransform(dfReturn, feature)

    standardscale = StandardScaler(inputCol=featureVector, outputCol=outcol,
                                   withStd=stdflag, withMean=meanflag)
    scaledata = standardscale.fit(dfReturn).transform(dfReturn)

    # The scaler emits a one-element vector; unpack its single value into a
    # plain column that temporarily carries the name "final_col[0]".
    dfReturn = scaledata.withColumn("final_col", to_array(scaledata[outcol]))\
        .select(scaledata.schema.names + [col("final_col")[0]])

    # Replace the vector columns by the scalar column under the output name.
    dfReturn = dfReturn.drop(outcol).drop(featureVector)\
        .withColumnRenamed("final_col[0]", outcol)
    dfReturn = dfReturn.withColumn(feature, round(dfReturn[outcol], 2))
    return dfReturn


def normalizerTransform(df, params, transformationData={}):
    """Apply Spark ML's ``Normalizer`` to a single column.

    ``params['feature']`` names the column and ``transformationData['pNorm']``
    supplies the norm order (converted with ``float``).  The result has a new
    ``"<feature>_normalizer"`` column with the normalized value, and the
    feature column itself is overwritten with that value rounded to two
    decimals.
    """
    feature = params['feature']

    # Nulls are filled first, then the column is re-created as double through
    # a temporary column (which moves it to the end of the schema).
    filled = df.fillna({feature: '0.0'})
    as_double = filled.withColumn("feature_cast", filled[feature].cast("double"))
    as_double = as_double.drop(feature)
    as_double = as_double.withColumnRenamed("feature_cast", feature)

    outcol = feature + "_normalizer"
    p = transformationData["pNorm"]
    featureVector = feature + "_vector"

    assembled = vectorAssemblerTransform(as_double, feature)
    normalized = Normalizer(
        inputCol=featureVector, outputCol=outcol, p=float(p)).transform(assembled)

    # Unpack the one-element output vector into a scalar column, temporarily
    # named "final_col[0]".
    selected = normalized.schema.names + [col("final_col")[0]]
    unpacked = normalized.withColumn("final_col", to_array(normalized[outcol]))
    unpacked = unpacked.select(selected)

    # Swap the vector columns for the scalar one under the output name.
    result = unpacked.drop(outcol).drop(featureVector)
    result = result.withColumnRenamed("final_col[0]", outcol)
    return result.withColumn(feature, round(result[outcol], 2))


class TransformationMain:
    # TODO: change df argument in run with following
    def run(transformationDF, config):
        """Clean the input frame, then apply the generated feature transforms.

        ``config`` is a JSON string whose ``"FE"`` list drives null
        replacement.  Afterwards four columns are standard-scaled and one is
        normalized; each source column is dropped once its transformed
        counterpart exists.  A two-row preview is displayed and the
        transformed DataFrame is returned.

        Defined without ``self``: call it as ``TransformationMain.run(df, cfg)``.
        """
        featureData = json.loads(config)["FE"]
        frame = CleanseData().replaceNullValues(featureData, transformationDF)

        # One entry per generated step: (transform, params, transformationData).
        steps = (
            (standardScalarTransform,
             {'transformationsData': [{'feature_label': 'AGE_DX', 'std_scalar': {'mean_flag': 'False', 'std_flag': 'True'}, 'transformation_label': 'Standard Scalar'}],
              'feature': 'AGE_DX',
              'transformation': [{'transformation': 'Standard Scalar', 'selectedAsDefault': 1}],
              'type': 'numeric', 'replaceby': 'mean', 'selected': 'True',
              'stats': {'count': '222', 'mean': '37.11', 'stddev': '9.51', 'min': '17', 'max': '71', 'missing': '0'},
              'updatedLabel': 'AGE_DX'},
             {'feature_label': 'AGE_DX', 'std_scalar': {'mean_flag': 'False', 'std_flag': 'True'}, 'transformation_label': 'Standard Scalar'}),
            (standardScalarTransform,
             {'transformationsData': [{'feature_label': 'EOD10_SZ', 'std_scalar': {'mean_flag': 'False', 'std_flag': 'True'}, 'transformation_label': 'Standard Scalar'}],
              'feature': 'EOD10_SZ',
              'transformation': [{'transformation': 'Standard Scalar', 'selectedAsDefault': 1}],
              'type': 'numeric', 'replaceby': 'mean', 'selected': 'True',
              'stats': {'count': '222', 'mean': '49.08', 'stddev': '27.32', 'min': '1', 'max': '160', 'missing': '0'},
              'updatedLabel': 'EOD10_SZ'},
             {'feature_label': 'EOD10_SZ', 'std_scalar': {'mean_flag': 'False', 'std_flag': 'True'}, 'transformation_label': 'Standard Scalar'}),
            (standardScalarTransform,
             {'transformationsData': [{'feature_label': 'EOD10_EX', 'std_scalar': {'mean_flag': 'False', 'std_flag': 'True'}, 'transformation_label': 'Standard Scalar'}],
              'feature': 'EOD10_EX',
              'transformation': [{'transformation': 'Standard Scalar', 'selectedAsDefault': 1}],
              'type': 'numeric', 'replaceby': 'mean', 'selected': 'True',
              'stats': {'count': '222', 'mean': '18.79', 'stddev': '19.25', 'min': '10', 'max': '99', 'missing': '0'},
              'updatedLabel': 'EOD10_EX'},
             {'feature_label': 'EOD10_EX', 'std_scalar': {'mean_flag': 'False', 'std_flag': 'True'}, 'transformation_label': 'Standard Scalar'}),
            (standardScalarTransform,
             {'transformationsData': [{'feature_label': 'SURGPRIM', 'std_scalar': {'mean_flag': 'False', 'std_flag': 'True'}, 'transformation_label': 'Standard Scalar'}],
              'feature': 'SURGPRIM',
              'transformation': [{'transformation': 'Standard Scalar', 'selectedAsDefault': 1}],
              'type': 'numeric', 'replaceby': 'mean', 'selected': 'True',
              'stats': {'count': '222', 'mean': '42.39', 'stddev': '10.3', 'min': '30', 'max': '80', 'missing': '0'},
              'updatedLabel': 'SURGPRIM'},
             {'feature_label': 'SURGPRIM', 'std_scalar': {'mean_flag': 'False', 'std_flag': 'True'}, 'transformation_label': 'Standard Scalar'}),
            (normalizerTransform,
             {'transformationsData': [{'feature_label': 'SURV_TM', 'pNorm': '2.0', 'transformation_label': 'Normalizer'}],
              'feature': 'SURV_TM',
              'type': 'real', 'selected': 'True', 'replaceby': 'mean',
              'stats': {'count': '222', 'mean': '6.76', 'stddev': '2.0', 'min': '0.0', 'max': '10.0', 'missing': '0'},
              'transformation': [{'transformation': 'Normalizer', 'selectedAsDefault': 1}],
              'updatedLabel': 'SURV_TM'},
             {'feature_label': 'SURV_TM', 'pNorm': '2.0', 'transformation_label': 'Normalizer'}),
        )

        for transform, stepParams, stepData in steps:
            frame = transform(frame, stepParams, stepData)
            # Only the transformed counterpart of the column is kept.
            frame = frame.drop(stepParams['feature'])

        # `display` is supplied by the notebook environment.
        display(frame.limit(2).toPandas())
        return frame


***AUTOML FUNCTIONS***

In [None]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
import pyspark


def linear_regression(sparkDF, features, labels):
    """Fit a scikit-learn ``LinearRegression`` on data pulled from Spark.

    The Spark DataFrame is persisted (memory and disk), collected to pandas,
    split with ``train_test_split`` defaults, and the model is fitted on the
    training part.  The model's ``score`` on the held-out part is displayed.

    Args:
        sparkDF: Spark DataFrame holding feature and label columns.
        features: list of feature column names.
        labels: label column name.

    Returns:
        dict with the fitted ``'model'``, the held-out ``'X_test'`` and
        ``'y_test'``, the ``'label'`` name and the pandas ``'columnNames'``.
    """
    sparkDF.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
    pandas_df = sparkDF.toPandas()

    split = train_test_split(pandas_df[features], pandas_df[labels])
    X_train, X_test, y_train, y_test = split

    lr_model = LinearRegression()
    lr_model.fit(X_train, y_train)

    held_out_score = lr_model.score(X_test, y_test)
    display(" Accuracy of Model : %s" % held_out_score)

    return {
        'model': lr_model,
        'X_test': X_test,
        'y_test': y_test,
        'label': labels,
        'columnNames': pandas_df.columns,
    }


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
# Build (or reuse) a Spark session on the local master with one worker thread.
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run survicalno2Hooks.ipynb
try:
	#sourcePreExecutionHook()

	# Read the source CSV; the connector config is passed as a string holding
	# a Python dict literal (fetch only reads its 'url' entry).
	tumormarkerdatano = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/uploadedSourceFiles/TumorMarkerdataNo2.csv', 'filename': 'TumorMarkerdataNo2.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'is_header': 'Use Header Line', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://dnm.bfirst.ai:44040/api/read/hdfs'}")

except Exception as ex: 
	# Failures are only logged; `tumormarkerdatano` is not assigned in that case.
	logging.error(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run survicalno2Hooks.ipynb
try:
	#transformationPreExecutionHook()

	# Run the transformation pipeline on the frame read above. The feature
	# configuration is serialised to JSON here and parsed again inside run().
	autofe = TransformationMain.run(tumormarkerdatano,json.dumps( {"FE": [{"transformationsData": [{"feature_label": "MAR_STAT", "std_scalar": {"mean_flag": "False", "std_flag": "True"}, "transformation_label": "novalue"}], "feature": "MAR_STAT", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "0.35", "stddev": "0.48", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "MAR_STAT"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "RACE", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "0.9", "stddev": "0.3", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "RACE"}, {"transformationsData": [{"feature_label": "AGE_DX", "std_scalar": {"mean_flag": "False", "std_flag": "True"}, "transformation_label": "Standard Scalar"}], "feature": "AGE_DX", "transformation": [{"transformation": "Standard Scalar", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "37.11", "stddev": "9.51", "min": "17", "max": "71", "missing": "0"}, "updatedLabel": "AGE_DX"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "HISTO3V", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "0.98", "stddev": "0.15", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "HISTO3V"}, {"transformationsData": [{"feature_label": "EOD10_SZ", "std_scalar": {"mean_flag": "False", "std_flag": "True"}, "transformation_label": "Standard Scalar"}], "feature": "EOD10_SZ", "transformation": [{"transformation": "Standard Scalar", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", 
"mean": "49.08", "stddev": "27.32", "min": "1", "max": "160", "missing": "0"}, "updatedLabel": "EOD10_SZ"}, {"transformationsData": [{"feature_label": "EOD10_EX", "std_scalar": {"mean_flag": "False", "std_flag": "True"}, "transformation_label": "Standard Scalar"}], "feature": "EOD10_EX", "transformation": [{"transformation": "Standard Scalar", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "18.79", "stddev": "19.25", "min": "10", "max": "99", "missing": "0"}, "updatedLabel": "EOD10_EX"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "TUMOR_1V", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "1.23", "stddev": "1.03", "min": "0", "max": "4", "missing": "0"}, "updatedLabel": "TUMOR_1V"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "TUMOR_2V", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "1.97", "stddev": "1.65", "min": "0", "max": "6", "missing": "0"}, "updatedLabel": "TUMOR_2V"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "TUMOR_3V", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "2.09", "stddev": "2.05", "min": "0", "max": "6", "missing": "0"}, "updatedLabel": "TUMOR_3V"}, {"transformationsData": [{"feature_label": "SURGPRIM", "std_scalar": {"mean_flag": "False", "std_flag": "True"}, "transformation_label": "Standard Scalar"}], "feature": "SURGPRIM", "transformation": [{"transformation": "Standard Scalar", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "42.39", "stddev": 
"10.3", "min": "30", "max": "80", "missing": "0"}, "updatedLabel": "SURGPRIM"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "RADIATN", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "1.47", "stddev": "1.55", "min": "1", "max": "7", "missing": "0"}, "updatedLabel": "RADIATN"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "RAD_SURG", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "2.82", "stddev": "0.74", "min": "0", "max": "4", "missing": "0"}, "updatedLabel": "RAD_SURG"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "o_dth_cl", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "222", "mean": "0.09", "stddev": "0.29", "min": "0", "max": "1", "missing": "0"}, "updatedLabel": "o_dth_cl"}, {"transformationsData": [{"feature_label": "SURV_TM", "pNorm": "2.0", "transformation_label": "Normalizer"}], "feature": "SURV_TM", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "222", "mean": "6.76", "stddev": "2.0", "min": "0.0", "max": "10.0", "missing": "0"}, "transformation": [{"transformation": "Normalizer", "selectedAsDefault": 1}], "updatedLabel": "SURV_TM"}]}))

	#transformationPostExecutionHook(autofe)

except Exception as ex: 
	# Failures (including a missing `tumormarkerdatano`) are only logged;
	# `autofe` is not assigned in that case.
	logging.error(ex)


***TRAIN MODEL***

In [None]:
#%run survicalno2Hooks.ipynb
try:
	#mlPreExecutionHook()

	# Train on the transformed frame: untouched columns plus the
	# "*_standardscalar" outputs are the features, "SURV_TM_normalizer" the label.
	dataAutoML=linear_regression(autofe, ["MAR_STAT", "RACE", "HISTO3V", "TUMOR_1V", "TUMOR_2V", "TUMOR_3V", "RADIATN", "RAD_SURG", "o_dth_cl", "AGE_DX_standardscalar", "EOD10_SZ_standardscalar", "EOD10_EX_standardscalar", "SURGPRIM_standardscalar"], "SURV_TM_normalizer")

	#mlPostExecutionHook(dataAutoML)

except Exception as ex: 
	# Failures are only logged; `dataAutoML` is not assigned in that case.
	logging.error(ex)
#spark.stop()
