***GENERATED CODE FOR datasciencejobssalaries2024 PIPELINE.***

***DON'T EDIT THIS CODE.***

***CONNECTOR FUNCTIONS TO READ DATA.***

In [None]:
import os
import datetime
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)


class HDFSConnector:
    """Read CSV data from HDFS and write CSV data back out through Spark.

    The methods take no ``self`` and are called on the class itself
    (``HDFSConnector.fetch(spark, config)``), so they are static methods.
    """

    @staticmethod
    def _parse_config(config):
        """Return the connector configuration as a dict.

        ``config`` is the string form of a Python dict literal (single-quoted
        keys and values, so it is not valid JSON). It is parsed with
        ``ast.literal_eval``, which only accepts literals and therefore cannot
        execute code embedded in the string the way ``eval`` would.
        A config that is already a dict is returned unchanged.

        Raises:
            ValueError / SyntaxError: if the string is not a plain literal.
        """
        import ast  # local import keeps this block self-contained

        if isinstance(config, dict):
            return config
        return ast.literal_eval(config)

    @staticmethod
    def fetch(spark, config):
        """Load the CSV at ``config['url']`` from HDFS into a DataFrame.

        The HDFS host and port come from the ``HDFS_SERVER`` and ``HDFS_PORT``
        environment variables (``KeyError`` if either is missing). The file is
        read with a header row and schema inference, the first two rows are
        shown with ``display``, and the DataFrame is returned.
        """
        ################### INPUT HADOOP HOST PORT TO CONNECT WITH ###############################
        hdfs_server = str(os.environ['HDFS_SERVER'])
        hdfs_port = int(os.environ['HDFS_PORT'])
        settings = HDFSConnector._parse_config(config)
        df = spark.read.options(header='true', inferschema='true').csv(
            f"hdfs://{hdfs_server}:{hdfs_port}{settings['url']}", header='true')
        # `display` is not defined in this file; presumably the notebook built-in.
        display(df.limit(2).toPandas())
        return df

    @staticmethod
    def put(df, spark, config):
        """Write ``df`` as CSV and return whatever the writer's ``save`` returns.

        ``config`` must provide ``is_header``, ``delimiter`` and ``url``.
        A header line is written only when ``is_header`` equals
        ``"Use Header Line"``. ``spark`` is accepted but not used.
        """
        settings = HDFSConnector._parse_config(config)
        header = 'true' if settings["is_header"] == "Use Header Line" else 'false'
        delimiter = settings["delimiter"]
        # The target is "<timestamp>_ <url>" (note the space from the format
        # string); kept exactly as generated.
        # NOTE(review): confirm this is the intended output location.
        target = ("%s %s") % (
            datetime.datetime.now().strftime("%Y-%m-%d %H.%M.%S") + "_", settings['url'])
        return df.write.format('csv').options(
            header=header, delimiter=delimiter).save(target)


***TRANSFORMATION FUNCTIONS THAT WILL BE APPLIED TO THE DATA***

In [None]:
import json
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import mean, stddev, min, max, col


class CleanseData:
    """Replace missing values in DataFrame columns with a per-column fill value."""

    def cleanValueForFE(self, value):
        """Normalise a computed fill value.

        Returns ``""`` for ``None``, the string ``"nan"`` for anything whose
        string form is ``'nan'``, and ``value`` itself otherwise.
        """
        if value is None:
            return ""
        if str(value) == 'nan':
            return "nan"
        return value

    def _replaceWithAggregate(self, feature, df, aggregate, alias):
        """Fill ``feature`` with ``aggregate`` of that column; return the new df.

        The aggregate is computed over the rows that survive ``df.dropna()``
        (rows with a null in any column are excluded). Nulls in the column are
        then filled via ``fillna`` and cells equal to ``" "`` are replaced too.
        """
        complete_rows = df.dropna()
        fillValue = self.cleanValueForFE(
            complete_rows.select(
                aggregate(col(feature.name)).alias(alias)).collect()[0][alias])
        df = df.fillna(fillValue, subset=[feature.name])
        # withColumn returns a new DataFrame, so the result has to be kept.
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceByMean(self, feature, df, mean_=-1):
        """Fill missing values of ``feature`` with the column mean.

        The original built the ``" "`` replacement but discarded the resulting
        DataFrame, so that step never took effect. It is now applied the same
        way as in the max/min/stddev variants, without the integer cast the
        discarded expression carried. ``mean_`` is accepted but not used.
        """
        return self._replaceWithAggregate(feature, df, mean, 'mean')

    def replaceByMax(self, feature, df, max_=-1):
        """Fill missing values of ``feature`` with the column maximum.

        ``max_`` is accepted but not used.
        """
        # `max` is the pyspark.sql.functions import, which shadows the builtin
        # at module level in this file.
        return self._replaceWithAggregate(feature, df, max, 'max')

    def replaceByMin(self, feature, df, min_=-1):
        """Fill missing values of ``feature`` with the column minimum.

        ``min_`` is accepted but not used.
        """
        # `min` is the pyspark.sql.functions import, which shadows the builtin
        # at module level in this file.
        return self._replaceWithAggregate(feature, df, min, 'min')

    def replaceByStandardDeviation(self, feature, df, stddev_=-1):
        """Fill missing values of ``feature`` with the column standard deviation.

        ``stddev_`` is accepted but not used.
        """
        return self._replaceWithAggregate(feature, df, stddev, 'stddev')

    def replaceDateRandomly(self, feature, df):
        """Fill missing values of ``feature`` with its first non-null value.

        Despite the name nothing is randomised: the fill value is taken from
        the first row returned by ``head(1)`` after filtering out nulls. If the
        column has no non-null value there is nothing to fill with and ``df``
        is returned unchanged (the original raised ``IndexError`` here).
        """
        non_null_rows = df.where(col(feature.name).isNotNull()).head(1)
        if not non_null_rows:
            return df
        fillValue = self.cleanValueForFE(non_null_rows[0][feature.name])
        df = df.fillna(str(fillValue), subset=[feature.name])
        return df.withColumn(
            feature.name,
            when(col(feature.name) == " ", fillValue).otherwise(col(feature.name)))

    def replaceNullValues(self, fList, df):
        """Apply the configured replacement strategy to every matching column.

        Args:
            fList: iterable of dicts with a ``"feature"`` name and a
                ``"replaceby"`` strategy string.
            df: DataFrame whose ``schema.fields`` are scanned for matches.

        Returns:
            The DataFrame after all replacements (``df`` itself if nothing
            matched).
        """
        # Checked in this order; the first keyword contained in "replaceby" wins.
        strategies = (
            ("mean", self.replaceByMean),
            ("max", self.replaceByMax),
            ("min", self.replaceByMin),
            ("stddev", self.replaceByStandardDeviation),
            ("random", self.replaceDateRandomly),
        )
        featuresList = df.schema.fields
        for featureObj in fList:
            for feat in featuresList:
                # NOTE(review): this is a substring test, so a feature name
                # also matches any longer column name that contains it.
                # Confirm whether exact equality was intended.
                if featureObj["feature"] not in feat.name:
                    continue
                for keyword, replace in strategies:
                    if keyword in featureObj["replaceby"]:
                        df = replace(feat, df)
                        break
        return df


def StringIndexerTransform(df, params, transformationData=None):
    """Add a ``<feature>_stringindexer`` column holding the indexed feature.

    Args:
        df: input DataFrame.
        params: dict; only ``params["feature"]`` (the column to index) is read.
        transformationData: accepted for interface compatibility, not used.
            (Defaults to ``None`` rather than a shared mutable ``{}``.)

    Returns:
        The DataFrame with the extra index column. Nulls in the feature are
        first replaced with ``''``; rows the indexer considers invalid are
        skipped (``handleInvalid="skip"``). When the index column has at most
        four distinct values it is cast to ``IntegerType``.
    """
    feature = params["feature"]
    outcol = feature + "_stringindexer"

    prepared = df.fillna({feature: ''})
    indexer = StringIndexer(
        inputCol=feature, outputCol=outcol, handleInvalid="skip")
    indexed = indexer.fit(prepared).transform(prepared)

    # Count distinct index values in Spark instead of collecting them to the
    # driver just to take len().
    distinct_count = indexed.select(outcol).distinct().count()
    if distinct_count <= 4:
        return indexed.withColumn(outcol, indexed[outcol].cast(IntegerType()))
    return indexed


class TransformationMain:
    """Run null replacement followed by string indexing of the listed columns."""

    # Columns to string-index, in the order the generated pipeline processed
    # them. Each source column is dropped once its index column is added.
    _STRING_INDEXED_FEATURES = (
        'Column1',
        'Job Title',
        'Salary Estimate',
        'Job Description',
        'Rating',
        'Company Name',
        'Location',
        'Headquarters',
        'Size',
        'Founded',
        'Type of ownership',
        'Industry',
        'Sector',
        'Revenue',
        'Competitors',
        'hourly',
        'employer_provided',
        'min_salary',
        'max_salary',
        'avg_salary',
        'company_txt',
        'job_state',
        'same_state',
        'age',
        'python_yn',
        'R_yn',
        'spark',
        'aws',
        'excel',
    )

    @staticmethod
    def _indexerParams(feature):
        """Build the params dict passed to StringIndexerTransform for ``feature``.

        The generated code also embedded a per-column ``stats`` snapshot
        (count/mean/min/max/...). StringIndexerTransform in this file reads
        only ``params["feature"]``, so those snapshots are omitted here.
        """
        return {
            'transformationsData': [
                {'feature_label': feature, 'transformation_label': 'String Indexer'}],
            'feature': feature,
            'type': 'string',
            'selected': 'True',
            'replaceby': 'max',
            'transformation': [
                {'transformation': 'String Indexer', 'selectedAsDefault': 1}],
            'updatedLabel': feature,
        }

    # TODO: change df argument in run with following
    @staticmethod
    def run(transformationDF, config):
        """Cleanse and index ``transformationDF``; return the transformed frame.

        Args:
            transformationDF: input DataFrame.
            config: JSON string whose ``"FE"`` entry is the feature list handed
                to ``CleanseData.replaceNullValues``.

        Returns:
            The DataFrame after null replacement, with every column in
            ``_STRING_INDEXED_FEATURES`` replaced by its
            ``<name>_stringindexer`` column. The first two rows are shown with
            ``display`` before returning.
        """
        configObj = json.loads(config)
        featureData = configObj["FE"]
        transformationDF = CleanseData().replaceNullValues(featureData, transformationDF)
        for feature in TransformationMain._STRING_INDEXED_FEATURES:
            transformationDF = StringIndexerTransform(
                transformationDF,
                TransformationMain._indexerParams(feature),
                {'feature_label': feature, 'transformation_label': 'String Indexer'})
            transformationDF = transformationDF.drop(feature)
        display(transformationDF.limit(2).toPandas())
        return transformationDF


***AUTOML FUNCTIONS***

In [None]:
from sklearn.model_selection import train_test_split
from tpot import TPOTRegressor
import pyspark


def functionRegression(sparkDF, listOfFeatures, label):
    """Fit a TPOT regressor on a Spark DataFrame and return the fitted bundle.

    Args:
        sparkDF: Spark DataFrame; it is persisted (memory and disk) and then
            converted to pandas.
        listOfFeatures: column names used as model inputs.
        label: name of the target column.

    Returns:
        dict with the fitted ``model``, the held-out ``X_test`` / ``y_test``
        arrays (10% split), the ``label`` and the ``columnNames``.
    """
    sparkDF.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
    pandas_frame = sparkDF.toPandas()

    # Inputs: the requested feature columns, taken after removing the label.
    without_label = pandas_frame.drop(label, axis=1)
    feature_matrix = without_label[listOfFeatures].values
    target_vector = pandas_frame[label].values

    X_train, X_test, y_train, y_test = train_test_split(
        feature_matrix, target_vector, random_state=1, test_size=0.1)

    tpotModel = TPOTRegressor(
        verbosity=3,
        generations=10,
        max_time_mins=5,
        n_jobs=-1,
        random_state=25,
        population_size=15,
        use_dask=True,
    )
    tpotModel.fit(X_train, y_train)

    # NOTE(review): the message calls this an error rate, but the value is
    # whatever TPOTRegressor.score returns — confirm the metric.
    held_out_score = tpotModel.score(X_test, y_test)
    display(" Error rate of Model : %s" % held_out_score)

    return {
        'model': tpotModel,
        'X_test': X_test,
        'y_test': y_test,
        'label': label,
        'columnNames': listOfFeatures,
    }


***READING DATAFRAME***

In [None]:
############## CREATE SPARK SESSION ############################ ENTER YOUR SPARK MASTER IP AND PORT TO CONNECT TO SERVER ################
from pyspark.sql import SparkSession
# Local single-threaded Spark session used by the rest of the notebook.
spark = SparkSession.builder.master('local[1]').getOrCreate()
#%run datasciencejobssalaries2024Hooks.ipynb
try:
    #sourcePreExecutionHook()

    # Load the source CSV from HDFS; the config is a Python-dict literal string.
    jobssalaryglassdoor = HDFSConnector.fetch(spark, "{'url': '/FileStore/platform/uploadedSourceFiles/Jobs_Salary_Glassdoor.csv', 'filename': 'Jobs_Salary_Glassdoor.csv', 'delimiter': ',', 'file_type': 'Delimeted', 'dbfs_token': '', 'dbfs_domain': '', 'is_header': 'Use Header Line', 'server_url': '/nexusMax/NexusMaxPlatform/uploads/platform/', 'results_url': 'http://ml.colaberry.com:44040/api/read/hdfs'}")

except Exception as ex:
    # Still best-effort (the cell does not raise), but log the traceback too so
    # a failed read can be diagnosed; `jobssalaryglassdoor` stays undefined
    # when this branch runs.
    logging.exception(ex)
#spark.stop()


***TRANSFORMING DATAFRAME***

In [None]:
#%run datasciencejobssalaries2024Hooks.ipynb
try:
    #transformationPreExecutionHook()

    # Feature-engineering configuration: one entry per source column, each
    # requesting the "String Indexer" transformation. Every entry is kept on a
    # single physical line so that no string literal is split by a line break.
    jobssalary = TransformationMain.run(jobssalaryglassdoor, json.dumps({"FE": [
        {"transformationsData": [{"feature_label": "Column1", "transformation_label": "String Indexer"}], "feature": "Column1", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "29739.92", "stddev": "868797.31", "min": "\"\"Education:", "max": "\ufffdand more!", "missing": "0", "distinct": "16105"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Column1"},
        {"transformationsData": [{"feature_label": "Job Title", "transformation_label": "String Indexer"}], "feature": "Job Title", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "59.0", "stddev": "317.73", "min": " #37)", "max": "\ufffd we will continue to operate as near to normal as possible at a time when our services are most needed by the construction industry we serve.", "missing": "0", "distinct": "5387"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Job Title"},
        {"transformationsData": [{"feature_label": "Salary Estimate", "transformation_label": "String Indexer"}], "feature": "Salary Estimate", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "75.79", "stddev": "371.24", "min": " \"\"Green Card Holder\"\")", "max": "\ufffd responsible for delivering value to our customers. In this role", "missing": "0", "distinct": "4365"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Salary Estimate"},
        {"transformationsData": [{"feature_label": "Job Description", "transformation_label": "String Indexer"}], "feature": "Job Description", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "1924.96", "stddev": "11827.22", "min": " & reports laboratory test data; enters data into LIS system and reviews it's completeness and accuracy. Reports and interprets data to physicians", "max": "\ufffd\ufffdMissile Systems provides many different high-technology weapon systems solutions to our defense customers. The Systems Test Directorate at MS is seeking candidates for the Integration & Verification Center.", "missing": "0", "distinct": "2602"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Job Description"},
        {"transformationsData": [{"feature_label": "Rating", "transformation_label": "String Indexer"}], "feature": "Rating", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "1762.41", "stddev": "608.15", "min": " (Nasdaq: RUBY)", "max": "Wish", "missing": "0", "distinct": "1597"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Rating"},
        {"transformationsData": [{"feature_label": "Company Name", "transformation_label": "String Indexer"}], "feature": "Company Name", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "74.22", "stddev": "225.85", "min": " & more!", "max": "\ufffd new and emerging approaches Mars hasn\ufffdt seen before.", "missing": "0", "distinct": "1028"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Company Name"},
        {"transformationsData": [{"feature_label": "Location", "transformation_label": "String Indexer"}], "feature": "Location", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "107.57", "stddev": "454.06", "min": " 10-key calculator and copier/fax.", "max": "Wholesale", "missing": "0", "distinct": "677"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Location"},
        {"transformationsData": [{"feature_label": "Headquarters", "transformation_label": "String Indexer"}], "feature": "Headquarters", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "2.66", "stddev": "2.23", "min": " & Facilities.", "max": "Vail Health", "missing": "0", "distinct": "483"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Headquarters"},
        {"transformationsData": [{"feature_label": "Size", "transformation_label": "String Indexer"}], "feature": "Size", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "2.61", "stddev": "2.02", "min": " ACL", "max": "Utilities", "missing": "0", "distinct": "358"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Size"},
        {"transformationsData": [{"feature_label": "Founded", "transformation_label": "String Indexer"}], "feature": "Founded", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "-0.95", "stddev": "0.48", "min": " 15 free therapy sessions + unlimited copay reimbursements for mental healthcare", "max": "eClinicalWorks, NextGen Healthcare, athenahealth", "missing": "0", "distinct": "429"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Founded"},
        {"transformationsData": [{"feature_label": "Type of ownership", "transformation_label": "String Indexer"}], "feature": "Type of ownership", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "0.2", "stddev": "1.0", "min": " 2018. For more information", "max": "Vermeer", "missing": "0", "distinct": "222"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Type of ownership"},
        {"transformationsData": [{"feature_label": "Industry", "transformation_label": "String Indexer"}], "feature": "Industry", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "0.27", "stddev": "1.23", "min": " AD&D insurance", "max": "Unknown / Non-Applicable", "missing": "0", "distinct": "188"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Industry"},
        {"transformationsData": [{"feature_label": "Sector", "transformation_label": "String Indexer"}], "feature": "Sector", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "65.65", "stddev": "36.57", "min": " 401(k) plan", "max": "Volume Integration, LLC", "missing": "0", "distinct": "250"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Sector"},
        {"transformationsData": [{"feature_label": "Revenue", "transformation_label": "String Indexer"}], "feature": "Revenue", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "122.12", "stddev": "52.41", "min": " Active Duty Wartime or Campaign Badge Veteran", "max": "Vionic Group", "missing": "0", "distinct": "275"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Revenue"},
        {"transformationsData": [{"feature_label": "Competitors", "transformation_label": "String Indexer"}], "feature": "Competitors", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "93.41", "stddev": "44.05", "min": " Armed Forces Services Medal", "max": "Truckstop.com", "missing": "0", "distinct": "238"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "Competitors"},
        {"transformationsData": [{"feature_label": "hourly", "transformation_label": "String Indexer"}], "feature": "hourly", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "22.47", "stddev": "68.36", "min": " Business Process Reengineering (BPR)", "max": "webfx.com", "missing": "0", "distinct": "387"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "hourly"},
        {"transformationsData": [{"feature_label": "employer_provided", "transformation_label": "String Indexer"}], "feature": "employer_provided", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "47.96", "stddev": "98.97", "min": " Cloud Computing", "max": "Works with engineers to evangelize data best practices and implement analytics solutions.", "missing": "0", "distinct": "64"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "employer_provided"},
        {"transformationsData": [{"feature_label": "min_salary", "transformation_label": "String Indexer"}], "feature": "min_salary", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "3.6", "stddev": "0.0", "min": " \"\"protected statuses\"\"). We do not tolerate unlawful discrimination in any employment decisions", "max": "Remedy BPCI Partners, LLC.", "missing": "0", "distinct": "54"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "min_salary"},
        {"transformationsData": [{"feature_label": "max_salary", "transformation_label": "String Indexer"}], "feature": "max_salary", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "3.2", "stddev": "0.34", "min": " CA", "max": "TRANZACT", "missing": "0", "distinct": "54"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "max_salary"},
        {"transformationsData": [{"feature_label": "avg_salary", "transformation_label": "String Indexer"}], "feature": "avg_salary", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "38.89", "stddev": "56.51", "min": " CA", "max": "The Climate Corporation", "missing": "0", "distinct": "41"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "avg_salary"},
        {"transformationsData": [{"feature_label": "company_txt", "transformation_label": "String Indexer"}], "feature": "company_txt", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "3.0", "stddev": "nan", "min": " HTML", "max": "Unknown", "missing": "0", "distinct": "31"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "company_txt"},
        {"transformationsData": [{"feature_label": "job_state", "transformation_label": "String Indexer"}], "feature": "job_state", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "2.9", "stddev": "0.04", "min": " CSS", "max": "2.9", "missing": "0", "distinct": "25"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "job_state"},
        {"transformationsData": [{"feature_label": "same_state", "transformation_label": "String Indexer"}], "feature": "same_state", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "1.65", "stddev": "2.52", "min": " Angular", "max": "Karyopharm Therapeutics Inc.", "missing": "0", "distinct": "24"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "same_state"},
        {"transformationsData": [{"feature_label": "age", "transformation_label": "String Indexer"}], "feature": "age", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "3.08", "stddev": "1.78", "min": " Kafka", "max": "Unknown / Non-Applicable", "missing": "0", "distinct": "23"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "age"},
        {"transformationsData": [{"feature_label": "python_yn", "transformation_label": "String Indexer"}], "feature": "python_yn", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "1.1", "stddev": "2.15", "min": " AWS", "max": "Pro-Sphere Tek", "missing": "0", "distinct": "19"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "python_yn"},
        {"transformationsData": [{"feature_label": "R_yn", "transformation_label": "String Indexer"}], "feature": "R_yn", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "0.08", "stddev": "0.51", "min": " Azure", "max": "Excellent written and verbal communication skills.", "missing": "0", "distinct": "16"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "R_yn"},
        {"transformationsData": [{"feature_label": "spark", "transformation_label": "String Indexer"}], "feature": "spark", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "", "stddev": "", "min": " GCP is preferred. Excellent interpersonal skills and the ability to work well individually and as a member of a project team are required. Excellent written and verbal communication skills required. A valid/clear driver's license is required.", "max": "LEAH Labs", "missing": "0", "distinct": "11"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "spark"},
        {"transformationsData": [{"feature_label": "aws", "transformation_label": "String Indexer"}], "feature": "aws", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "", "stddev": "", "min": " Python", "max": "Rochester, MN", "missing": "0", "distinct": "9"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "aws"},
        {"transformationsData": [{"feature_label": "excel", "transformation_label": "String Indexer"}], "feature": "excel", "type": "string", "selected": "True", "replaceby": "max", "stats": {"count": "39648", "mean": "3.0", "stddev": "0.0", "min": " BASH", "max": "Rochester, MN", "missing": "0", "distinct": "7"}, "transformation": [{"transformation": "String Indexer", "selectedAsDefault": 1}], "updatedLabel": "excel"},
    ]}))

    #transformationPostExecutionHook(jobssalary)

except Exception as ex:
    # Keep the traceback: a failure here leaves `jobssalary` undefined and the
    # later cells would otherwise only show a follow-up NameError.
    logging.exception(ex)


***TRAIN MODEL***

In [None]:
#%run datasciencejobssalaries2024Hooks.ipynb
try:
    #mlPreExecutionHook()

    # Train on the string-indexed feature columns; the indexed
    # "Salary Estimate" column is the regression target.
    dataAutoML = functionRegression(
        jobssalary,
        [
            "Column1_stringindexer",
            "Job Title_stringindexer",
            "Job Description_stringindexer",
            "Rating_stringindexer",
            "Company Name_stringindexer",
            "Location_stringindexer",
            "Headquarters_stringindexer",
            "Size_stringindexer",
            "Founded_stringindexer",
            "Type of ownership_stringindexer",
            "Industry_stringindexer",
            "Sector_stringindexer",
            "Revenue_stringindexer",
            "Competitors_stringindexer",
            "hourly_stringindexer",
            "employer_provided_stringindexer",
            "min_salary_stringindexer",
            "max_salary_stringindexer",
            "avg_salary_stringindexer",
            "company_txt_stringindexer",
            "job_state_stringindexer",
            "same_state_stringindexer",
            "age_stringindexer",
            "python_yn_stringindexer",
            "R_yn_stringindexer",
            "spark_stringindexer",
            "aws_stringindexer",
            "excel_stringindexer",
        ],
        "Salary Estimate_stringindexer",
    )

    #mlPostExecutionHook(dataAutoML)

except Exception as ex:
    # Keep the traceback: a failure here leaves `dataAutoML` undefined and the
    # prediction cell would otherwise only show a follow-up NameError.
    logging.exception(ex)
#spark.stop()


***PREDICT ON TRAINED MODEL***

In [None]:
import pandas as pd
import numpy as np
import sklearn.metrics

# Score the trained model on the held-out split and show the predictions
# next to the true label values.
try:
    model = dataAutoML['model']
    X_test = dataAutoML['X_test']
    y_test = dataAutoML['y_test']
    label = dataAutoML['label']
    predicted = label + "_predicted"

    # Build fresh lists instead of editing dataAutoML['columnNames'] in place:
    # the in-place remove/insert left the label and prediction names inside the
    # stored list, so running this cell a second time failed with a column
    # count mismatch when constructing the DataFrame.
    featureColumns = [name for name in dataAutoML['columnNames'] if name != label]

    y_predicted = model.predict(X_test)
    df = pd.DataFrame(X_test, columns=featureColumns)
    df[label] = y_test
    df[predicted] = y_predicted

    # Show the true label first, then the prediction, then the features.
    columnNames = [label, predicted] + featureColumns
    df = df[columnNames]

    R2 = np.round(sklearn.metrics.r2_score(y_test, y_predicted), 1)
    Mean_Squared_Error = np.round(sklearn.metrics.mean_squared_error(y_test, y_predicted), 1)
    Mean_Absolute_Error = np.round(sklearn.metrics.mean_absolute_error(y_test, y_predicted), 1)
    display(" R2 score of Prediction on test data    : %s"%R2)
    display(" Mean Squared Error of Prediction on test data    : %s"%Mean_Squared_Error)
    display(" Mean Absolute Error of Prediction on test data   : %s"%Mean_Absolute_Error)
    display(df.head())
except Exception as ex:
    # Log with the traceback so the failing step can be identified.
    logging.exception(ex)

spark.stop()

