***GENERATED CODE FOR scmsdeliveryhistorydatasetclass PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import os
import datetime
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """Read CSV data from HDFS, and write CSV data out, through Spark.

    Both public methods are invoked on the class itself
    (``HDFSConnector.fetch(spark, config)``), so they are static methods.
    """

    @staticmethod
    def _parse_config(config):
        """Return the connector configuration as a dict.

        Args:
            config: Either a dict, or the text of a Python dict literal
                (single-quoted keys/values, as produced by ``str(dict)``).

        Returns:
            The configuration dict.

        Raises:
            ValueError, SyntaxError: If the text is not a plain Python literal.
            TypeError: If the parsed literal is not a dict.
        """
        # Imported locally so the block does not depend on a file-level import.
        import ast

        if isinstance(config, dict):
            return config
        # literal_eval only accepts literals, so the config text cannot run
        # arbitrary code the way eval() would.
        parsed = ast.literal_eval(config)
        if not isinstance(parsed, dict):
            raise TypeError(
                "connector config must describe a dict, got %s" % type(parsed).__name__)
        return parsed

    @staticmethod
    def fetch(spark, config):
        """Load the CSV file named by ``config['url']`` from HDFS.

        The HDFS host and port are read from the ``HDFS_SERVER`` and
        ``HDFS_PORT`` environment variables.

        Args:
            spark: Spark session used for reading.
            config: Connector configuration (see ``_parse_config``); the
                ``url`` entry is appended to ``hdfs://<host>:<port>``.

        Returns:
            The DataFrame read with a header row and schema inference enabled.

        Raises:
            KeyError: If an environment variable or the ``url`` entry is missing.
        """
        settings = HDFSConnector._parse_config(config)
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{settings['url']}", header='true')
        # `display` is not defined in this file; presumably the notebook's
        # built-in display helper.
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV to a timestamp-prefixed path.

        Args:
            df: DataFrame to write.
            spark: Unused; kept so the signature matches ``fetch``-style calls.
            config: Connector configuration (see ``_parse_config``); reads
                the ``is_header``, ``delimiter`` and ``url`` entries.

        Returns:
            Whatever the DataFrame writer's ``save`` call returns.
        """
        settings = HDFSConnector._parse_config(config)
        write_header = settings["is_header"] == "Use Header Line"
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S")
        # NOTE(review): the target is "<timestamp>_ <url>" (with a space after
        # the underscore); kept as-is because consumers may rely on that name.
        target = ("%s %s") % (timestamp + "_", settings['url'])
        writer = df.write.format('csv').options(
            header='true' if write_header else 'false',
            delimiter=settings["delimiter"])
        return writer.save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
import json
from pyspark.ml.feature import Binarizer
from pyspark.sql.functions import round
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Fill missing values in DataFrame columns with a per-column replacement.

    Each ``replaceBy*`` method takes a schema field (an object with a
    ``name`` attribute) and a DataFrame, and returns a new DataFrame.
    """

    def cleanValueForFE(self, value):
        """Normalise a computed replacement value.

        Returns ``""`` for ``None``, the string ``"nan"`` for anything whose
        ``str()`` is ``'nan'``, and the value itself otherwise.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _replaceByAggregate(self, feature, df, aggregate, replaceBlank=True):
        """Fill nulls in ``feature`` with ``aggregate`` of that column.

        Args:
            feature: Schema field whose ``name`` identifies the column.
            df: DataFrame to clean.
            aggregate: Column aggregate function (``mean``, ``max``, ...).
            replaceBlank: When true, cells equal to ``" "`` are replaced too.

        Returns:
            The DataFrame with the column's nulls filled.
        """
        # The statistic is computed over rows that have no null in ANY column.
        # NOTE(review): dropna(subset=[feature.name]) may have been intended.
        completeRows = df.dropna()
        fillValue = self.cleanValueForFE(
            completeRows.select(aggregate(col(feature.name)).alias('value'))
            .collect()[0]['value'])
        df = df.fillna(fillValue, subset=[feature.name])
        if replaceBlank:
            df = df.withColumn(
                feature.name,
                when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))
        return df

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill nulls in the column with its mean (``mean_`` is unused).

        Unlike the sibling strategies, no ``" "`` replacement is applied to
        the column; only nulls are filled.
        """
        return self._replaceByAggregate(feature, df, mean, replaceBlank=False)

    def replaceByMax(self, feature, df, max_=-1):
        """Fill nulls and ``" "`` cells with the column maximum (``max_`` is unused)."""
        return self._replaceByAggregate(feature, df, max)

    def replaceByMin(self, feature, df, min_=-1):
        """Fill nulls and ``" "`` cells with the column minimum (``min_`` is unused)."""
        return self._replaceByAggregate(feature, df, min)

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill nulls and ``" "`` cells with the column's standard deviation
        (``stddev_`` is unused)."""
        return self._replaceByAggregate(feature, df, stddev)

    def replaceDateRandomly(self, feature, df):
        """Fill nulls and ``" "`` cells with the first non-null value found.

        Despite the name, the replacement is not random: it is the value of
        the first row returned where the column is not null. If the column
        has no non-null value there is nothing to fill with, so the DataFrame
        is returned unchanged.
        """
        firstRows = df.where(col(feature.name).isNotNull()).head(1)
        if not firstRows:
            return df
        fillValue = self.cleanValueForFE(firstRows[0][feature.name])
        df = df.fillna(str(fillValue), subset=[feature.name])
        df = df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))
        return df

    def replaceNullValues(self, fList, df):
        """Apply the configured replacement strategy to each listed feature.

        Args:
            fList: Iterable of dicts with a ``"feature"`` entry (column name)
                and a ``"replaceby"`` entry containing one of ``mean``,
                ``max``, ``min``, ``stddev`` or ``random``.
            df: DataFrame to clean.

        Returns:
            The cleaned DataFrame. Features that are not columns of ``df``,
            or whose ``replaceby`` names no known strategy, are skipped.
        """
        # Checked in this order; the first keyword contained in "replaceby" wins.
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        # Match columns by exact name: a substring test would also apply the
        # "Dosage" entry to the "Dosage Form" column, for example.
        fieldsByName = {field.name: field for field in df.schema.fields}
        for featureObj in fList:
            field = fieldsByName.get(featureObj["feature"])
            if field is None:
                continue
            for keyword, strategy in strategies:
                if keyword in featureObj["replaceby"]:
                    df = strategy(field, df)
                    break
        return df


def StringIndexerTransform(df, params, transformationData=None):
    """Add a ``<feature>_stringindexer`` column holding the indexed feature.

    Args:
        df: Input DataFrame.
        params: Dict whose ``"feature"`` entry names the column to index.
        transformationData: Unused; accepted so all transforms share a
            call signature.

    Returns:
        The DataFrame with the indexer output column added. When the output
        column has at most four distinct values it is cast to integer.
    """
    feature = params["feature"]
    outcol = feature + "_stringindexer"

    # Replace nulls in the feature with '' before fitting the indexer.
    filled = df.fillna({feature: ''})
    indexer = StringIndexer(
        inputCol=feature, outputCol=outcol, handleInvalid="skip")
    indexed = indexer.fit(filled).transform(filled)

    # Count distinct index values in Spark instead of collecting them to the
    # driver through an RDD just to take the length.
    distinct_count = indexed.select(outcol).distinct().count()
    if distinct_count <= 4:
        return indexed.withColumn(outcol, indexed[outcol].cast(IntegerType()))
    return indexed


def BinarizerTransform(df, params, transformationData={}):
    """Add a ``<feature>_binarizer`` column thresholding the feature.

    Args:
        df: Input DataFrame.
        params: Dict whose ``'feature'`` entry names the column to binarize.
        transformationData: Dict whose ``'threshold'`` entry is the
            Binarizer threshold (converted with ``float``).

    Returns:
        The DataFrame with the feature cast to double (nulls filled with
        0.0, then rounded to 2 places) and the binarized column added.
    """
    column = params['feature']
    binarized_column = column + "_binarizer"

    # Cast through a temporary column, drop the original, then rename the
    # temporary column back to the feature name.
    with_cast = df.withColumn("feature_cast", df[column].cast("double"))
    as_double = with_cast.drop(column).withColumnRenamed("feature_cast", column)
    as_double = as_double.fillna({column: 0.0})

    threshold = float(transformationData['threshold'])
    binarizer = Binarizer(
        threshold=threshold, inputCol=column, outputCol=binarized_column)
    binarized = binarizer.transform(as_double)

    return binarized.withColumn(column, round(binarized[column], 2))


class TransformationMain:
    # TODO: change df argument in run with following
    def run(transformationDF, config):
        """Cleanse the frame, then apply the generated feature transformations.

        Args:
            transformationDF: DataFrame to transform.
            config: JSON text whose ``"FE"`` entry is the feature list handed
                to ``CleanseData.replaceNullValues``.

        Returns:
            The transformed DataFrame. After each transformation the source
            column it was applied to is dropped.
        """
        def indexer_step(feature, minimum, maximum, distinct,
                         mean='', stddev='', label=None):
            # Build the (transform, params, transformationData) triple for
            # one String Indexer step.
            params = {
                'transformationsData': [{'feature_label': feature,
                                         'transformation_label': 'String Indexer'}],
                'feature': feature,
                'type': 'string',
                'selected': 'True',
                'replaceby': 'max',
                'stats': {'count': '500', 'mean': mean, 'stddev': stddev,
                          'min': minimum, 'max': maximum, 'missing': '0',
                          'distinct': distinct},
                'transformation': [{'transformation': 'String Indexer',
                                    'selectedAsDefault': 1}],
                'updatedLabel': feature if label is None else label,
            }
            data = {'feature_label': feature,
                    'transformation_label': 'String Indexer'}
            return StringIndexerTransform, params, data

        def binarizer_step():
            # The single Binarizer step, applied to 'Line Item Value'.
            params = {
                'transformationsData': [{'feature_label': 'Line Item Value',
                                         'threshold': 78091.06,
                                         'transformation_label': 'Binarizer'}],
                'feature': 'Line Item Value',
                'type': 'real',
                'selected': 'True',
                'replaceby': 'mean',
                'stats': {'count': '500', 'mean': '78091.21',
                          'stddev': '185718.9', 'min': '0.7',
                          'max': '2520000.0', 'missing': '0'},
                'transformation': [{'transformation': 'Binarizer',
                                    'selectedAsDefault': 1}],
                'updatedLabel': 'Line Item Value',
            }
            data = {'feature_label': 'Line Item Value',
                    'threshold': 78091.06,
                    'transformation_label': 'Binarizer'}
            return BinarizerTransform, params, data

        frame = CleanseData().replaceNullValues(
            json.loads(config)["FE"], transformationDF)

        # Steps run in exactly this order.
        plan = [
            indexer_step('Project Code', '100-CI-T01', '144-BW-T01', '30'),
            indexer_step('PQ #', 'Pre-PQ Process', 'Pre-PQ Process', '1'),
            indexer_step('PO / SO #', 'SCMS-1', 'SCMS-87', '358'),
            indexer_step('ASN/DN #', 'ASN-1028', 'ASN-990', '391'),
            indexer_step('Country', 'Botswana', 'Zimbabwe', '19'),
            indexer_step('Managed By', 'PMO - US', 'PMO - US', '1'),
            indexer_step('Fulfill Via', 'Direct Drop', 'Direct Drop', '1'),
            indexer_step('Vendor INCO Term', 'CIP', 'FCA', '5'),
            indexer_step('Shipment Mode', 'Air', 'Truck', '3'),
            indexer_step('PQ First Sent to Client Date', 'Pre-PQ Process',
                         'Pre-PQ Process', '1',
                         label='PQ First Sent to Client D...'),
            indexer_step('PO Sent to Vendor Date', '1/10/2007',
                         'Date Not Captured', '150'),
            indexer_step('Scheduled Delivery Date', '1-Apr-09', '9-Sep-08', '263'),
            indexer_step('Delivered to Client Date', '1-Apr-08', '9-Sep-08', '283'),
            indexer_step('Delivery Recorded Date', '1-Apr-08', '9-Sep-08', '283'),
            indexer_step('Product Group', 'ACT', 'MRDT', '5'),
            indexer_step('Sub Classification', 'ACT', 'Pediatric', '6'),
            indexer_step('Vendor', 'ABBOTT LABORATORIES (PUERTO RICO)',
                         'ZEPHYR BIOMEDICALS', '39'),
            indexer_step('Item Description',
                         '#102198**Didanosine 200mg [Videx], tablets, 60 Tabs',
                         'Zidovudine 300mg, tablets, 60 Tabs', '86'),
            indexer_step('Molecule/Test Type', 'Abacavir', 'Zidovudine', '45'),
            indexer_step('Brand', 'Aluvia', 'Ziagen', '37'),
            indexer_step('Dosage', '100/25mg', 'N/A', '33'),
            indexer_step('Dosage Form', 'Capsule', 'Test kit - Ancillary', '16'),
            binarizer_step(),
            indexer_step('Manufacturing Site', 'ABBSP',
                         'bioLytical Laboratories', '47'),
            indexer_step('First Line Designation', 'No', 'Yes', '2'),
            indexer_step('Weight (Kilograms)', '1', 'Weight Captured Separately',
                         '261', mean='4254.61', stddev='50742.03'),
            indexer_step('Freight Cost (USD)', '0.75', 'See ASN-93 (ID#:1281)',
                         '294', mean='9915.87', stddev='12157.84'),
        ]

        for transform, params, data in plan:
            frame = transform(frame, params, data)
            frame = frame.drop(params['feature'])

        display(frame.limit(2).toPandas())
        return frame


***AUTOML FUNCTIONS***

In [None]:
from tpot import TPOTClassifier
from sklearn.model_selection import train_test_split
import pyspark


def functionClassification(sparkDF, listOfFeatures, label):
    """Fit a TPOT classifier on the given frame and report its accuracy.

    Args:
        sparkDF: Spark DataFrame holding the features and the label column.
        listOfFeatures: Column names used as model inputs.
        label: Name of the target column.

    Returns:
        Dict with the fitted ``model``, the held-out ``X_test``/``y_test``
        arrays, the ``label`` name and the feature ``columnNames``.
    """
    sparkDF.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
    pandas_frame = sparkDF.toPandas()

    feature_matrix = pandas_frame.drop(label, axis=1)[listOfFeatures].values
    target_vector = pandas_frame[label].values

    # 90/10 train/test split with a fixed seed.
    X_train, X_test, y_train, y_test = train_test_split(
        feature_matrix, target_vector, random_state=1, test_size=0.1)

    tpotModel = TPOTClassifier(
        verbosity=3,
        n_jobs=-1,
        generations=10,
        max_time_mins=5,
        population_size=15,
        use_dask=True,
    )
    tpotModel.fit(X_train, y_train)

    accuracy = tpotModel.score(X_test, y_test)
    display(" Accuracy of Model : %s" % accuracy)

    return {
        'model': tpotModel,
        'X_test': X_test,
        'y_test': y_test,
        'label': label,
        'columnNames': listOfFeatures,
    }


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
# Create (or reuse) a Spark session with a single-threaded local master.
spark = SparkSession.builder.master('local[1]').getOrCreate()
# %run scmsdeliveryhistorydatasetclassHooks.ipynb
try:
    # sourcePreExecutionHook()

    # Load the source CSV; the connector config is passed as dict-literal text.
    scmsdeliveryhistorydataset = HDFSConnector.fetch(
        spark,
        "{'url': '/FileStore/platform/uploadedSourceFiles/SCMS_Delivery_History_Dataset.csv', "
        "'filename': 'SCMS_Delivery_History_Dataset.csv', "
        "'delimiter': ',', "
        "'file_type': 'Delimeted', "
        "'is_header': 'Use Header Line', "
        "'domain': 'http://172.31.59.158', "
        "'port': '40070', "
        "'dirPath': '/FileStore/platform', "
        "'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/'}",
    )
    # sourcePostExecutionHook(scmsdeliveryhistorydataset)

except Exception as ex:
    # Failures are logged rather than raised, so the cell itself never errors.
    logging.error(ex)
# spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run scmsdeliveryhistorydatasetclassHooks.ipynb
# Feature-engineering cell.
#
# Passes the source DataFrame (`scmsdeliveryhistorydataset`) and a JSON string
# to TransformationMain.run (defined outside this cell) and stores the result
# in `scmsdeliveryhistorydatasetclassautofe`, which the training cell consumes.
#
# The JSON document has a single "FE" list with one entry per source column.
# Each entry carries the column name ("feature"), its "type", the chosen
# transformation, a "replaceby" strategy and profiling "stats".  In this
# configuration every string column uses "String Indexer", "Line Item Value"
# uses "Binarizer" with threshold 78091.06, and the remaining numeric/real
# columns are marked "novalue" (presumably "no transformation" - confirm
# against TransformationMain).
#
# Any exception is logged and swallowed, so on failure the result variable is
# not (re)assigned by this cell.
try:
	#transformationPreExecutionHook()

	scmsdeliveryhistorydatasetclassautofe = TransformationMain.run(scmsdeliveryhistorydataset,json.dumps( {"FE": [{"transformationsData": [{"transformation_label": "novalue"}], "feature": "ID", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "2415.97", "stddev": "1423.89", "min": "1", "max": "5047", "missing": "0"}, "updatedLabel": "ID"}, {"transformationsData": [{"feature_label": "Project Code", "transformation_label": "String Indexer"}], "feature": "Project Code", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "100-CI-T01", "max": "144-BW-T01", "missing": "0", "distinct": "30"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Project Code"}, {"transformationsData": [{"feature_label": "PQ #", "transformation_label": "String Indexer"}], "feature": "PQ #", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Pre-PQ Process", "max": "Pre-PQ Process", "missing": "0", "distinct": "1"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "PQ #"}, {"transformationsData": [{"feature_label": "PO / SO #", "transformation_label": "String Indexer"}], "feature": "PO / SO #", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "SCMS-1", "max": "SCMS-87", "missing": "0", "distinct": "358"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "PO / SO #"}, {"transformationsData": [{"feature_label": "ASN/DN #", "transformation_label": "String Indexer"}], "feature": "ASN/DN #", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "ASN-1028", "max": "ASN-990", 
"missing": "0", "distinct": "391"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "ASN/DN #"}, {"transformationsData": [{"feature_label": "Country", "transformation_label": "String Indexer"}], "feature": "Country", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Botswana", "max": "Zimbabwe", "missing": "0", "distinct": "19"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Country"}, {"transformationsData": [{"feature_label": "Managed By", "transformation_label": "String Indexer"}], "feature": "Managed By", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "PMO - US", "max": "PMO - US", "missing": "0", "distinct": "1"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Managed By"}, {"transformationsData": [{"feature_label": "Fulfill Via", "transformation_label": "String Indexer"}], "feature": "Fulfill Via", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Direct Drop", "max": "Direct Drop", "missing": "0", "distinct": "1"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Fulfill Via"}, {"transformationsData": [{"feature_label": "Vendor INCO Term", "transformation_label": "String Indexer"}], "feature": "Vendor INCO Term", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "CIP", "max": "FCA", "missing": "0", "distinct": "5"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Vendor INCO Term"}, {"transformationsData": [{"feature_label": "Shipment Mode", "transformation_label": "String Indexer"}], "feature": "Shipment Mode", "type": "string", 
"selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Air", "max": "Truck", "missing": "0", "distinct": "3"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Shipment Mode"}, {"transformationsData": [{"feature_label": "PQ First Sent to Client Date", "transformation_label": "String Indexer"}], "feature": "PQ First Sent to Client Date", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Pre-PQ Process", "max": "Pre-PQ Process", "missing": "0", "distinct": "1"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "PQ First Sent to Client D..."}, {"transformationsData": [{"feature_label": "PO Sent to Vendor Date", "transformation_label": "String Indexer"}], "feature": "PO Sent to Vendor Date", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "1/10/2007", "max": "Date Not Captured", "missing": "0", "distinct": "150"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "PO Sent to Vendor Date"}, {"transformationsData": [{"feature_label": "Scheduled Delivery Date", "transformation_label": "String Indexer"}], "feature": "Scheduled Delivery Date", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "1-Apr-09", "max": "9-Sep-08", "missing": "0", "distinct": "263"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Scheduled Delivery Date"}, {"transformationsData": [{"feature_label": "Delivered to Client Date", "transformation_label": "String Indexer"}], "feature": "Delivered to Client Date", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "1-Apr-08", "max": "9-Sep-08", 
"missing": "0", "distinct": "283"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Delivered to Client Date"}, {"transformationsData": [{"feature_label": "Delivery Recorded Date", "transformation_label": "String Indexer"}], "feature": "Delivery Recorded Date", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "1-Apr-08", "max": "9-Sep-08", "missing": "0", "distinct": "283"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Delivery Recorded Date"}, {"transformationsData": [{"feature_label": "Product Group", "transformation_label": "String Indexer"}], "feature": "Product Group", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "ACT", "max": "MRDT", "missing": "0", "distinct": "5"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Product Group"}, {"transformationsData": [{"feature_label": "Sub Classification", "transformation_label": "String Indexer"}], "feature": "Sub Classification", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "ACT", "max": "Pediatric", "missing": "0", "distinct": "6"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Sub Classification"}, {"transformationsData": [{"feature_label": "Vendor", "transformation_label": "String Indexer"}], "feature": "Vendor", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "ABBOTT LABORATORIES (PUERTO RICO)", "max": "ZEPHYR BIOMEDICALS", "missing": "0", "distinct": "39"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Vendor"}, {"transformationsData": [{"feature_label": "Item Description", 
"transformation_label": "String Indexer"}], "feature": "Item Description", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "#102198**Didanosine 200mg [Videx], tablets, 60 Tabs", "max": "Zidovudine 300mg, tablets, 60 Tabs", "missing": "0", "distinct": "86"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Item Description"}, {"transformationsData": [{"feature_label": "Molecule/Test Type", "transformation_label": "String Indexer"}], "feature": "Molecule/Test Type", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Abacavir", "max": "Zidovudine", "missing": "0", "distinct": "45"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Molecule/Test Type"}, {"transformationsData": [{"feature_label": "Brand", "transformation_label": "String Indexer"}], "feature": "Brand", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Aluvia", "max": "Ziagen", "missing": "0", "distinct": "37"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Brand"}, {"transformationsData": [{"feature_label": "Dosage", "transformation_label": "String Indexer"}], "feature": "Dosage", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "100/25mg", "max": "N/A", "missing": "0", "distinct": "33"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Dosage"}, {"transformationsData": [{"feature_label": "Dosage Form", "transformation_label": "String Indexer"}], "feature": "Dosage Form", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "Capsule", "max": "Test kit - Ancillary", "missing": 
"0", "distinct": "16"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Dosage Form"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Unit of Measure (Per Pack)", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "80.76", "stddev": "69.25", "min": "1", "max": "540", "missing": "0"}, "updatedLabel": "Unit of Measure (Per Pack..."}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Line Item Quantity", "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "type": "numeric", "replaceby": "mean", "selected": "True", "stats": {"count": "500", "mean": "4700.25", "stddev": "10912.58", "min": "1", "max": "95500", "missing": "0"}, "updatedLabel": "Line Item Quantity"}, {"transformationsData": [{"feature_label": "Line Item Value", "threshold": 78091.06, "transformation_label": "Binarizer"}], "feature": "Line Item Value", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "78091.21", "stddev": "185718.9", "min": "0.7", "max": "2520000.0", "missing": "0"}, "transformation": [{"transformation": "Binarizer", "selectedAsDefault": 1}], "updatedLabel": "Line Item Value"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Pack Price", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "38.79", "stddev": "62.41", "min": "0.01", "max": "400.0", "missing": "0"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Pack Price"}, {"transformationsData": [{"transformation_label": "novalue"}], "feature": "Unit Price", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "500", "mean": "1.12", "stddev": "3.4", "min": "0.0", "max": "37.5", "missing": "0"}, "transformation": 
[{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Unit Price"}, {"transformationsData": [{"feature_label": "Manufacturing Site", "transformation_label": "String Indexer"}], "feature": "Manufacturing Site", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "ABBSP", "max": "bioLytical Laboratories", "missing": "0", "distinct": "47"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Manufacturing Site"}, {"transformationsData": [{"feature_label": "First Line Designation", "transformation_label": "String Indexer"}], "feature": "First Line Designation", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "", "stddev": "", "min": "No", "max": "Yes", "missing": "0", "distinct": "2"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "First Line Designation"}, {"transformationsData": [{"feature_label": "Weight (Kilograms)", "transformation_label": "String Indexer"}], "feature": "Weight (Kilograms)", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "4254.61", "stddev": "50742.03", "min": "1", "max": "Weight Captured Separately", "missing": "0", "distinct": "261"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Weight (Kilograms)"}, {"transformationsData": [{"feature_label": "Freight Cost (USD)", "transformation_label": "String Indexer"}], "feature": "Freight Cost (USD)", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "500", "mean": "9915.87", "stddev": "12157.84", "min": "0.75", "max": "See ASN-93 (ID#:1281)", "missing": "0", "distinct": "294"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Freight Cost (USD)"}, {"transformationsData": [{"transformation_label": "novalue"}], 
"feature": "Line Item Insurance (USD)", "type": "real", "selected": "True", "replaceby": "mean", "stats": {"count": "440", "mean": "133.78", "stddev": "346.57", "min": "0.0", "max": "4939.2", "missing": "60"}, "transformation": [{"transformation": "novalue", "selectedAsDefault": 1}], "updatedLabel": "Line Item Insurance (USD)"}]}))

	#transformationPostExecutionHook(scmsdeliveryhistorydatasetclassautofe)

except Exception as ex: 
	# Best-effort cell: the error message is logged and execution continues.
	logging.error(ex)


***TRAIN MODEL***

In [None]:
#%run scmsdeliveryhistorydatasetclassHooks.ipynb
# Train the AutoML classifier on the transformed DataFrame.  The untouched
# numeric columns and the generated "*_stringindexer" / "*_binarizer" columns
# are the inputs; "Unit Price" is the label.  The result dict is kept in
# `dataAutoML` for the prediction cell.
try:
    #mlPreExecutionHook()

    dataAutoML = functionClassification(
        scmsdeliveryhistorydatasetclassautofe,
        [
            "ID",
            "Unit of Measure (Per Pack)",
            "Line Item Quantity",
            "Pack Price",
            "Line Item Insurance (USD)",
            "Project Code_stringindexer",
            "PQ #_stringindexer",
            "PO / SO #_stringindexer",
            "ASN/DN #_stringindexer",
            "Country_stringindexer",
            "Managed By_stringindexer",
            "Fulfill Via_stringindexer",
            "Vendor INCO Term_stringindexer",
            "Shipment Mode_stringindexer",
            "PQ First Sent to Client Date_stringindexer",
            "PO Sent to Vendor Date_stringindexer",
            "Scheduled Delivery Date_stringindexer",
            "Delivered to Client Date_stringindexer",
            "Delivery Recorded Date_stringindexer",
            "Product Group_stringindexer",
            "Sub Classification_stringindexer",
            "Vendor_stringindexer",
            "Item Description_stringindexer",
            "Molecule/Test Type_stringindexer",
            "Brand_stringindexer",
            "Dosage_stringindexer",
            "Dosage Form_stringindexer",
            "Line Item Value_binarizer",
            "Manufacturing Site_stringindexer",
            "First Line Designation_stringindexer",
            "Weight (Kilograms)_stringindexer",
            "Freight Cost (USD)_stringindexer",
        ],
        "Unit Price",
    )

    #mlPostExecutionHook(dataAutoML)

except Exception as err:
    # Failures are only logged; the cell itself does not raise.
    logging.error(err)
#spark.stop()


***PREDICT ON TRAINED MODEL***

In [None]:
import pandas as pd
import numpy as np
import sklearn.metrics

# Score the trained model on the hold-out split kept in `dataAutoML`, show
# accuracy / F1 / precision / recall (as percentages rounded to one decimal)
# and preview the test rows with the true and predicted label appended.
try:
    model = dataAutoML['model']
    X_test = dataAutoML['X_test']
    y_test = dataAutoML['y_test']
    label = dataAutoML['label']
    # Work on a copy: the list is modified below, and modifying the one stored
    # in dataAutoML made a second run of this cell fail (the stored list then
    # held one name more than X_test has columns).
    columnNames = list(dataAutoML['columnNames'])
    if label in columnNames:
        columnNames.remove(label)
    predicted = label + "_predicted"
    y_predicted = model.predict(X_test)

    df = pd.DataFrame(X_test, columns=columnNames)
    df[label] = y_test
    df[predicted] = y_predicted
    # Kept so the notebook-level `columnNames` ends up as
    # [label, predicted, *features]; `df` itself is not reordered by this.
    columnNames.insert(0, predicted)
    columnNames.insert(0, label)

    Accuracy = np.round(
        (100 * sklearn.metrics.accuracy_score(y_true=y_test, y_pred=y_predicted)), 1)
    F1 = np.round(
        (100 * sklearn.metrics.f1_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)
    Precision = np.round(
        (100 * sklearn.metrics.precision_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)
    Recall = np.round(
        (100 * sklearn.metrics.recall_score(y_true=y_test, y_pred=y_predicted, average="weighted")), 1)

    display(" Accuracy of Prediction on test data    : %s" % Accuracy)
    display(" F1 score of Prediction on test data    : %s" % F1)
    display(" Precision of Prediction on test data   : %s" % Precision)
    display(" Recall of Prediction on test data      : %s" % Recall)
    display(df.head())
except Exception as ex:
    # Failures (e.g. a missing `dataAutoML` after a failed training cell) are
    # only logged; the cell itself does not raise.
    logging.error(ex)

# The pipeline ends here, so the Spark session is shut down.
spark.stop()

