***GENERATED CODE FOR tamakipred1115 PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import os
import pandas as pd
from hdfs3 import HDFileSystem
import datetime
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """Read CSV data from HDFS into Spark and write Spark DataFrames out as CSV.

    Both methods take no ``self`` and are invoked on the class itself
    (``HDFSConnector.fetch(spark, config)``), so they are static methods.
    ``config`` is always a string holding a Python dict literal.
    """

    @staticmethod
    def fetch(spark, config):
        """Load the CSV at ``config['url']`` from HDFS and return it as a DataFrame.

        Args:
            spark: Object whose ``createDataFrame`` converts the pandas frame.
            config: String form of a dict literal; only ``'url'`` is read here.

        Returns:
            The DataFrame built by ``spark.createDataFrame``.

        Raises:
            KeyError: If ``HDFS_SERVER`` / ``HDFS_PORT`` are missing from the
                environment, or ``config`` has no ``'url'`` entry.
            ValueError: If ``HDFS_PORT`` is not an integer or ``config`` is
                not a plain literal.
        """
        import ast  # local import: keeps the block independent of the import cell

        # literal_eval parses the same dict-literal strings eval() did, but
        # cannot execute code embedded in the config string.
        settings = ast.literal_eval(config)

        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        hdfs = HDFileSystem(host=hdfs_server, port=hdfs_port)
        with hdfs.open(settings['url']) as f:
            try:
                # Current pandas spelling of "skip malformed rows, with a warning".
                dfPd = pd.read_csv(f, on_bad_lines='warn')
            except TypeError:
                # Older pandas rejects the unknown keyword while binding
                # arguments (before reading from f), so retrying with the
                # legacy flag is safe.
                dfPd = pd.read_csv(f, error_bad_lines=False)
        df = spark.createDataFrame(dfPd)
        # Notebook preview of the first two rows.
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV using the header/delimiter settings in ``config``.

        Args:
            df: DataFrame exposing the Spark ``write`` API.
            spark: Unused; kept so the call signature stays unchanged.
            config: String form of a dict literal providing ``'is_header'``,
                ``'delimiter'`` and ``'url'``.

        Returns:
            Whatever ``DataFrameWriter.save`` returns.
        """
        import ast  # local import: keeps the block independent of the import cell

        settings = ast.literal_eval(config)
        header_flag = 'true' if settings["is_header"] == "Use Header Line" else 'false'
        delimiter = settings["delimiter"]
        stamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S") + "_"
        # NOTE(review): the target is "<timestamp>_ <url>" -- the space after
        # the underscore comes from the original format string and is kept
        # as-is; confirm whether that path shape is intended.
        destination = ("%s %s") % (stamp, settings['url'])
        return df.write.format('csv').options(header=header_flag,
                                              delimiter=delimiter).save(destination)


***SELECTED MODEL PREDICTION FUNCTION***

In [None]:
import sklearn.metrics
import mlflow.sklearn
import pyspark
import mlflow
import sklearn
import json
from pyspark.sql.functions import col, when
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Null-value replacement helpers for Spark DataFrames.

    Every ``replaceBy*`` method takes a schema field (anything with a ``name``
    attribute) and a DataFrame, and returns a new DataFrame in which nulls in
    that column have been filled. ``mean``, ``max``, ``min`` and ``stddev``
    here are the pyspark.sql.functions aggregates imported by this cell, not
    the Python builtins.
    """

    def cleanValueForFE(self, value):
        """Normalise an aggregate result before it is used as a fill value.

        Returns ``""`` for ``None``, the string ``"nan"`` for anything whose
        ``str()`` is ``'nan'``, and ``value`` unchanged otherwise.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _fillWithAggregate(self, feature, df, aggregate, alias, replaceBlank=True):
        """Fill nulls in ``feature.name`` with ``aggregate`` of that column.

        The aggregate is computed over ``df.dropna()``, i.e. only over rows
        that have no null in any column. When ``replaceBlank`` is true, cells
        equal to a single space are replaced with the same value.
        """
        fillValue = self.cleanValueForFE(
            df.dropna().select(
                aggregate(col(feature.name)).alias(alias)).collect()[0][alias])
        df = df.fillna(fillValue, subset=[feature.name])
        if replaceBlank:
            df = df.withColumn(
                feature.name,
                when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))
        return df

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill nulls in the column with its mean. ``mean_`` is unused."""
        # The original also built a withColumn(...) expression here but threw
        # the result away, so it never had any effect. It is dropped rather
        # than activated: it cast the column to Integer, which would truncate
        # real-valued features.
        return self._fillWithAggregate(feature, df, mean, 'mean', replaceBlank=False)

    def replaceByMax(self, feature, df, max_=-1):
        """Fill nulls and single-space cells with the column maximum. ``max_`` is unused."""
        return self._fillWithAggregate(feature, df, max, 'max')

    def replaceByMin(self, feature, df, min_=-1):
        """Fill nulls and single-space cells with the column minimum. ``min_`` is unused."""
        return self._fillWithAggregate(feature, df, min, 'min')

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill nulls and single-space cells with the column's standard deviation.

        ``stddev_`` is unused.
        """
        return self._fillWithAggregate(feature, df, stddev, 'stddev')

    def replaceDateRandomly(self, feature, df):
        """Fill nulls and single-space cells with an existing value of the column.

        Despite the name, the fill value is taken from the first row returned
        for "column is not null", not from a random draw. If the column has no
        non-null value at all, ``df`` is returned unchanged.
        """
        sample = df.where(col(feature.name).isNotNull()).head(1)
        if not sample:
            # Nothing to copy from; previously this raised IndexError.
            return df
        fillValue = self.cleanValueForFE(sample[0][feature.name])
        df = df.fillna(str(fillValue), subset=[feature.name])
        df = df.withColumn(feature.name,
                           when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))
        return df

    def replaceNullValues(self, fList, df):
        """Apply the configured replacement strategy to every matching column.

        Args:
            fList: Iterable of dicts with at least ``"feature"`` and
                ``"replaceby"`` entries.
            df: DataFrame to clean.

        Returns:
            The DataFrame after all replacements.
        """
        # Checked in this order; the first keyword contained in "replaceby" wins.
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        # Schema is captured once, before any replacement is applied.
        featuresList = df.schema.fields
        for featureObj in fList:
            for feat in featuresList:
                # NOTE(review): substring match, so feature "temp" also selects
                # a column named e.g. "temperature" -- confirm this is intended.
                if featureObj["feature"] not in feat.name:
                    continue
                for keyword, handler in strategies:
                    if keyword in featureObj["replaceby"]:
                        df = handler(feat, df)
                        break
        return df


class TransformationMain:
    """Entry point that applies the configured null replacements to a DataFrame."""

    # TODO: change df argument in run with following
    @staticmethod
    def run(transformationDF, config):
        """Clean ``transformationDF`` according to the ``"FE"`` list in ``config``.

        Declared static because it takes no ``self`` and is called on the
        class (``TransformationMain.run(df, config)``).

        Args:
            transformationDF: DataFrame to clean.
            config: JSON string whose ``"FE"`` entry is the feature list
                handed to ``CleanseData.replaceNullValues``.

        Returns:
            The cleaned DataFrame.

        Raises:
            KeyError: If the decoded config has no ``"FE"`` entry.
        """
        configObj = json.loads(config)
        featureData = configObj["FE"]
        transformationDF = CleanseData().replaceNullValues(featureData, transformationDF)
        # Notebook preview of the first two rows.
        display(transformationDF.limit(2).toPandas())
        return transformationDF


class ModelPredictionClass:
    """Score a Spark DataFrame with a model loaded through ``mlflow.sklearn``."""

    # Sub-types for which per-class probabilities are added to the result.
    _PROBABILITY_SUBTYPES = (
        "naivebayes", "multinomialNB", "GaussianNB",
        "ComplementNB", "BernoulliNB", "CategoricalNB",
    )

    def __init__(self, sparkDf, runID, label, mType, model_path, subtype):
        """Store the scoring inputs.

        Args:
            sparkDf: DataFrame to score.
            runID: Run id used to build the ``runs:/<runID>/<model_path>`` URI.
            label: Name of the label column, or ``None`` when there is none.
            mType: ``"classification"``, ``"regression"`` or ``"clustering"``.
            model_path: Artifact path of the model inside the run.
            subtype: Model sub-type; decides whether probabilities are reported.
        """
        self.sparkDf = sparkDf
        self.runID = runID
        self.y = label
        self.yTest = []
        self.yPredicted = []
        self.mType = mType
        self.model_path = model_path
        self.subtype = subtype

    def getmetricsRegression(self):
        """Return R2 and mean squared error of ``yPredicted`` vs ``yTest``, rounded to 1 decimal."""
        scoreMetrics = {}
        scoreMetrics["R2"] = round(
            sklearn.metrics.r2_score(self.yTest, self.yPredicted), 1)
        scoreMetrics["Mean_Squared_Error"] = round(
            sklearn.metrics.mean_squared_error(self.yTest, self.yPredicted), 1)

        return scoreMetrics

    def getmetricsClassification(self):
        """Return accuracy and weighted F1 / precision / recall as percentages.

        Side effect: ``yPredicted`` and ``yTest`` are replaced by lists of ints.
        """
        scoreMetrics = {}
        self.yPredicted = [int(x) for x in self.yPredicted]
        self.yTest = [int(x) for x in self.yTest]
        scoreMetrics["Accuracy"] = round(
            (100 * sklearn.metrics.accuracy_score(y_true=self.yTest, y_pred=self.yPredicted)), 1)
        scoreMetrics["F1"] = round(
            (100 * sklearn.metrics.f1_score(y_true=self.yTest, y_pred=self.yPredicted, average="weighted")), 1)
        scoreMetrics["Precision"] = round((
            100 * sklearn.metrics.precision_score(y_true=self.yTest, y_pred=self.yPredicted, average="weighted")),
            1)
        scoreMetrics["Recall"] = round((
            100 * sklearn.metrics.recall_score(y_true=self.yTest, y_pred=self.yPredicted, average="weighted")), 1)
        return scoreMetrics

    def getPrediction(self):
        """Load the model, predict on the stored DataFrame and report results.

        Returns:
            * classification / regression: a dict with ``'result'`` (metrics),
              ``'yTest'`` and ``'yPredicted'``; for the probability sub-types
              also ``'classes'`` and one ``'Probability_<class>'`` list per class.
            * clustering: the pandas feature frame with an added
              ``'prediction'`` column.
            * any other ``mType``: ``None``.
        """
        self.sparkDf.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
        # Collect once; the original collected twice and discarded the first copy.
        frame = self.sparkDf.toPandas()
        complete_model_path = "runs:/" + self.runID + "/" + self.model_path

        if self.y is None:
            # No label column configured: every column is a feature.
            self.yTest = []
            features = frame
        else:
            self.yTest = list(frame[self.y])
            features = frame.drop(self.y, axis=1)
        data = features.values

        # model get
        sklearn_model = mlflow.sklearn.load_model(
            model_uri=complete_model_path)
        self.yPredicted = sklearn_model.predict(data).tolist()

        scoreMetrics = {}
        if "classification" == self.mType:
            self.yPredicted = [int(i) for i in self.yPredicted]
            scoreMetrics['result'] = self.getmetricsClassification()
            if self.subtype in self._PROBABILITY_SUBTYPES:
                classes = sklearn_model.classes_.tolist()
                scoreMetrics['classes'] = classes
                # One predict_proba call serves every class column.
                probabilities = sklearn_model.predict_proba(data)
                for i, className in enumerate(classes):
                    scoreMetrics["Probability_" + str(className)] = [
                        round(x, ndigits=4) for x in probabilities[:, i].tolist()]
            scoreMetrics['yTest'] = self.yTest
            scoreMetrics['yPredicted'] = self.yPredicted
            return scoreMetrics

        if "regression" == self.mType:
            scoreMetrics['result'] = self.getmetricsRegression()
            scoreMetrics['yTest'] = self.yTest
            scoreMetrics['yPredicted'] = self.yPredicted
            return scoreMetrics

        if "clustering" == self.mType:
            # Attach predictions to the pandas frame; the original assigned
            # the column on the NumPy array, which cannot work.
            features['prediction'] = self.yPredicted
            return features

        # Unrecognised model type: nothing to report (unchanged behaviour).
        return None


***CONNECTOR FUNCTIONS TO WRITE DATA.***

In [None]:
import datetime
import requests
import datetime
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class NumtraConnector:
    """Report a result file to the platform by POSTing to ``results_url``."""

    @staticmethod
    def put(inStages, inStagesData, stageId, spark, config):
        """POST the result-file description taken from ``config``.

        Args:
            inStages: Unused; kept so the call signature stays unchanged.
            inStagesData: Unused; kept so the call signature stays unchanged.
            stageId: Unused; kept so the call signature stays unchanged.
            spark: Unused; kept so the call signature stays unchanged.
            config: String form of a dict literal providing ``'server_url'``,
                ``'baseType'``, ``'results_url'``, ``'server'``,
                ``'orignalKey'`` and ``'filename'``.

        Returns:
            The ``requests`` response of the POST.

        Raises:
            KeyError: If one of the keys listed above is missing.
        """
        import ast  # local import: keeps the block independent of the import cell

        # Parse once, and without executing code; the original called
        # eval(config) for every key. Lookups whose value was discarded
        # ('pathOnly', 'ser', 'user', 'password', 'authSource', 'user_id',
        # 'parent_id', 'project_id', and inStagesData[inStages[0]]) are gone:
        # their only effect was to raise when an unused entry was absent.
        settings = ast.literal_eval(config)
        path = settings['server_url']
        baseType = settings['baseType']
        results_url = settings['results_url']
        server = settings['server']
        originalfile = settings['orignalKey']
        filename = settings['filename']
        # Epoch seconds, prefixed to the file name below.
        timestamp = str(int(datetime.datetime.now().timestamp()))

        for shown in (path, baseType, results_url, server, originalfile, filename):
            print(shown)

        args = {
            'url': path,
            'baseType': baseType,
            'originalfile': originalfile,
            'filename': timestamp + filename
        }

        response = requests.post(results_url, args)
        return response


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
# Session is created (or reused) with the master set to 'local[1]'; replace
# the master URL to run against a remote cluster.
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run tamakipred1115Hooks.ipynb
# NOTE: a failure inside this try is only logged, so `tamakidrivetest` may be
# left undefined for the cells that follow.
try:
	#sourcePreExecutionHook()

	# Load the source CSV from HDFS. The config is a dict literal passed as a
	# string; HDFSConnector.fetch reads only its 'url' entry.
	tamakidrivetest = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/uploadedSourceFiles/TamakiDrive_Test1.csv', 'filename': 'TamakiDrive_Test1.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'is_header': 'Use Header Line', 'server_url': '/numtraPlatform/NumtraPlatformV3/uploads/platform/', 'results_url': 'http://ml.numtra.com:44040/api/read/hdfs'}")

except Exception as ex: 
	logging.error(ex)
#spark.stop()


***PREDICTING GIVEN DATA WITH SELECTED MODEL***

In [None]:
#%run tamakipred1115Hooks.ipynb
# Clean the loaded frame, then score it with the selected model. Any failure
# is logged rather than raised; the Spark session is stopped afterwards
# either way.
try:
	#prediction-pipelinePreExecutionHook()

	tamakidrivetest = TransformationMain.run(tamakidrivetest,json.dumps( {"FE": [{"transformationsData": [{}], "feature": "date", "type": "date", "selected": "True", "replaceby": "random", "stats": {"count": "", "mean": "", "stddev": "", "min": "", "max": "", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "date"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "cycling_counter", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "1098.43", "stddev": "326.08", "min": "165", "max": "2262", "missing": "0"}, "updatedLabel": "cycling_counter"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "temp", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "16.79", "stddev": "3.6", "min": "7.078571429", "max": "23.67142857", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "temp"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "rain", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "0.12", "stddev": "0.39", "min": "0.0", "max": "4.15", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "rain"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "sun", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "0.45", "stddev": "0.27", "min": "0.0", "max": "0.971428571", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "sun"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "wind", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "4.9", "stddev": 
"2.33", "min": "0.964285714", "max": "12.71428571", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "wind"}]}))

	# The run id, model type, model path and subtype were emitted as bare
	# identifiers followed by a stray ':' and placed outside the try body,
	# which made this cell a syntax error. They are passed as strings here.
	# NOTE(review): the model path looks like a path whose separators were
	# stripped by the generator, and "timeseriesforecasting" is not one of
	# the types getPrediction branches on -- confirm both against the
	# pipeline definition.
	MPRe = ModelPredictionClass(tamakidrivetest, "abb6bbc8a6ca43f3a9f41cacb225ba7c", None, "timeseriesforecasting", "platformmodelsArimatamakiarima1511_20231115-080138timeseriesforecasting", "arima")
	responseMetrics = MPRe.getPrediction()
	#prediction-pipelinePostExecutionHook(responseMetrics)

except Exception as ex: 
	logging.error(ex)
spark.stop()


***WRITING DATAFRAME***

In [None]:
#%run tamakipred1115Hooks.ipynb
# Sink step: hand the result description to the Numtra connector. Failures
# are logged, not raised.
try:
	#sinkPreExecutionHook()

	# NOTE(review): the NumtraConnector class in this file defines only
	# `put(inStages, inStagesData, stageId, spark, config)`; no `fetch` is
	# visible, so this call is expected to raise AttributeError, which the
	# handler below logs. The config also has no 'baseType' entry, which
	# `put` reads. Confirm the intended call with the pipeline generator.
	# NOTE(review): `spark.stop()` has already run at the end of the
	# prediction cell, so the session passed here is a stopped one.
	tamaki = NumtraConnector.fetch(spark, "{'samplefile': None, 'samplecount': None, 'originalcount': None, 'orignalKey': None, 'pathOnly': '/Finance/Tamaki', 'project_id': '6512e4bdb91dbaa301dd0efe', 'parent_id': '6512e4bdb91dbaa301dd0efe', 'original_schema': [], 'actual_schema': [], 'server': 'https://ml.numtra.com:443', 'server_url': '/numtraPlatform/NumtraPlatformV3/uploads/platform/', 'delimiter': ',', 'file_type': 'Delimeted', 'filename': 'TamakiArimaPred151123.csv', 'token': '', 'domain': '', 'is_header': 'Use Header Line', 'url': '/FileStore/platform/uploadedSourceFiles/part-00000-af4b3a78-ae25-4e66-993e-5c1e020fa94d-c000.csv', 'results_url': 'http://ml.numtra.com:44040/api/read/hdfs'}")

except Exception as ex: 
	logging.error(ex)
