In [1]:
import json
import pandas as pd
import tflearn
import numpy as np

from datetime import datetime
from parameters import Parameters
from pyspark.sql import SparkSession
from functions import Functions
from DataCleansing import DataCleansing
from Training import Training
from collections import Counter
from imblearn.over_sampling import SMOTE
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder

Instructions for updating:
non-resource variables are not supported in the long term
curses is not supported on this machine (please install/reinstall curses for an optimal experience)


In [2]:
class Functions:
    """Static helpers for loading the HR dataset and handling missing values."""

    @staticmethod
    def readJSON():
        """Return the ``config`` section of ``data/model_data.json``."""
        with open('data/model_data.json') as f:
            data = json.load(f)
        return data['config']

    @staticmethod
    def loadCsvFile(spark, file_name):
        """Load ``Parameters.CSV_PATH + file_name`` with Spark and return it as a pandas frame.

        The CSV is read with a header row and an inferred schema. When the JSON config
        lists data points (``list_of_datapoint`` mandatory + optional), only those
        columns are kept; an empty list keeps every column.
        """
        print("Loading CSV Data")
        spark_df = (spark.read.format("csv")
                    .option("delimiter", ",")
                    .option("inferSchema", "true")
                    .option("header", True)
                    .load(Parameters.CSV_PATH + file_name))

        # Transform the Spark dataframe into a pandas dataframe
        df_data = spark_df.select('*').toPandas()

        # Combine mandatory and optional data points from the JSON config
        datapoints = Functions.readJSON()['list_of_datapoint']
        preferred_datafields = datapoints['mandatory'] + datapoints['optional']

        # Truthiness covers both an empty list and an empty string; comparing a list
        # against '' is always unequal and would select zero columns for an empty config.
        if preferred_datafields:
            df_data = df_data[preferred_datafields]
        return df_data

    @staticmethod
    def checkMissingValues(df_clean):
        """Return a Series of the percentage (0-100) of missing values per column."""
        percent_missing = df_clean.isnull().sum() * 100 / len(df_clean)
        return percent_missing

    @staticmethod
    def getMissingValueColumns(df_clean, required_percentage=50):
        """Return names of columns whose missing percentage exceeds ``required_percentage``.

        Names are ordered by ascending missing percentage.
        """
        percent_missing = Functions.checkMissingValues(df_clean).sort_values(ascending=True)
        return percent_missing[percent_missing > required_percentage].index.tolist()

    @staticmethod
    def missingValueImputation(df_clean, column='Position', group_by='Department'):
        """Fill NaNs in ``column`` with the most frequent value of its ``group_by`` group.

        Groups in which ``column`` has no observed value are left as-is instead of
        raising. ``df_clean`` is modified in place and also returned.
        """
        def _fill_with_mode(values):
            modes = values.mode()
            # mode() is empty when every value in the group is missing
            if modes.empty:
                return values
            return values.fillna(modes.iloc[0])

        df_clean[column] = df_clean.groupby(group_by)[column].transform(_fill_with_mode)
        return df_clean

    @staticmethod
    def dropColumns(df_clean, column):
        """Return a copy of ``df_clean`` without ``column`` (a name or list of names)."""
        return df_clean.drop(columns=column)

In [3]:
class Parameters:
    """Static configuration shared by the loading and cleaning helpers."""

    # Prefix prepended to CSV file names when they are loaded.
    CSV_PATH = 'data/'
    # strftime pattern used to render converted timestamps.
    DATE_FORMAT = '%Y-%m-%d'

In [4]:
class DataCleansing:
    """Cleaning, timestamp conversion, imputation and encoding steps for the HR dataframe."""

    # Columns that dataCleaning leaves untouched.
    PASSTHROUGH_COLUMNS = ('Employee_ID', 'Education', 'Job_Satisfaction', 'Marital_Status')
    # Columns that dataCleaning treats as epoch-second timestamps and converts to DATE_FORMAT.
    DATE_COLUMNS = ('Years_At_Company', 'Start_Date')

    @staticmethod
    def _isPlaceholder(value):
        """True for the literal 'no value' markers: None, 'null', '0' and 0."""
        return value is None or value == 'null' or value == '0' or value == 0

    @staticmethod
    def convertDateTime(df_data, column):
        """Convert epoch seconds in ``column`` to ``Parameters.DATE_FORMAT`` strings (UTC), in place.

        Missing values (NaN/None) and the placeholders 'null', '0' and 0 become NaN.
        Returns the same dataframe.
        """
        def _toDate(value):
            # pd.isna is required here: NaN never compares equal to np.nan, so an
            # equality test lets NaN through to int() and raises ValueError.
            if pd.isna(value) or DataCleansing._isPlaceholder(value):
                return np.nan
            # NOTE(review): datetime.utcfromtimestamp is deprecated since Python 3.12.
            return datetime.utcfromtimestamp(int(value)).strftime(Parameters.DATE_FORMAT)

        df_data[column] = df_data[column].apply(_toDate)
        return df_data

    @staticmethod
    def dataCleaning(df_data):
        """Normalise 'no value' markers column by column, modifying ``df_data`` in place.

        - PASSTHROUGH_COLUMNS are left unchanged.
        - DATE_COLUMNS have NaN filled with 0 and are then run through convertDateTime,
          so both NaN and 0 end up as NaN and other values become date strings.
        - Every other column maps None, 'null', '0' and 0 to NaN.

        Returns the cleaned dataframe.
        """
        for column in df_data.columns:
            if column in DataCleansing.PASSTHROUGH_COLUMNS:
                continue
            if column in DataCleansing.DATE_COLUMNS:
                df_data[column] = df_data[column].fillna(0)
                df_data = DataCleansing.convertDateTime(df_data, column)
            else:
                df_data[column] = df_data[column].apply(
                    lambda x: np.nan if DataCleansing._isPlaceholder(x) else x)
        return df_data

    @staticmethod
    def imputeOnce(df_clean):
        """Impute remaining NaNs with a scikit-learn IterativeImputer fitted on ``df_clean``.

        Returns a new float dataframe with the same columns and index. Assumes every
        column is numeric (MainNew runs labelEncoding first).
        """
        imputer = IterativeImputer()
        imputer.fit(df_clean)
        Xtrans = imputer.transform(df_clean)
        # Keep the original index so the result still aligns with the input rows.
        return pd.DataFrame(Xtrans, columns=df_clean.columns, index=df_clean.index)

    @staticmethod
    def labelEncoding(df_data):
        """Label-encode every object-dtype column of ``df_data`` in place and return it.

        A single LabelEncoder is refitted for each column.
        """
        labelEncoding = preprocessing.LabelEncoder()
        catCols = df_data.select_dtypes("object").columns
        for column in catCols:
            df_data[column] = labelEncoding.fit_transform(df_data[column])
        return df_data

In [5]:
class Training:
    """Feature preparation, feature-importance reporting and DNN training on the cleaned HR frame."""

    # Categorical columns one-hot encoded by X_prepare.
    LABEL_COLUMNS = ('Position', 'Department', 'Gender', 'Education_Field')
    # Numeric columns standardised by X_prepare.
    NUMBER_COLUMNS = ('Age', 'Education',
                      'Job_Satisfaction', 'Hourly_Rate', 'Monthly_Rate',
                      'Monthly_Income',
                      'Percent_Salary_Hike', 'Performance_Rating', 'Total_Working_Years',
                      'Work_Life_Balance', 'Start_Date')

    @staticmethod
    def X_prepare(df_clean):
        """Build the dense feature matrix ``[standardised numerics | one-hot categoricals]``.

        The encoder and scaler are fitted on ``df_clean`` itself on every call.
        """
        # list(...) matters: indexing a DataFrame with a tuple is read as a single key.
        X_label = df_clean[list(Training.LABEL_COLUMNS)]
        X_number = df_clean[list(Training.NUMBER_COLUMNS)]
        X_label_onehot = OneHotEncoder().fit_transform(X_label).toarray()
        X_number_standard = StandardScaler().fit_transform(X_number)
        return np.concatenate((X_number_standard, X_label_onehot), axis=1)

    @staticmethod
    def y_prepare(y_sampling):
        """One-hot encode a 1-D target (Series or array) into a dense (n_samples, n_classes) array."""
        # Series[:, np.newaxis] is rejected by pandas 2.x; reshape a plain ndarray instead.
        y_column = np.asarray(y_sampling).reshape(-1, 1)
        return OneHotEncoder().fit_transform(y_column).toarray()

    @staticmethod
    def imbalanceAndModelTraining(df_clean):
        """Report feature importances, train the turnover DNN and print per-employee predictions.

        Expects ``is_active`` (target), ``Employee_ID`` and the X_prepare feature columns.
        Training only runs when ``df_clean`` has no missing values; otherwise a message is
        printed. Returns ``df_clean`` unchanged.
        """
        y_sampling = df_clean['is_active']
        x_sampling = df_clean.drop(columns=['is_active'])

        if df_clean.isnull().sum().sum() != 0:
            print('Skipping training: df_clean still contains missing values')
            return df_clean

        # SMOTE. fit_sample was removed in imbalanced-learn 0.8; fit_resample is the supported name.
        oversample = SMOTE(random_state=42, k_neighbors=1)
        x_smote, y_smote = oversample.fit_resample(x_sampling, y_sampling)
        # NOTE(review): x_smote / y_smote are not used below, so the network trains on the
        # original (imbalanced) rows. Wiring them in is not a drop-in change: SMOTE
        # interpolates between samples, which turns label-encoded categorical codes (as
        # produced by MainNew) into fractional values, and X_prepare refits its encoder and
        # scaler per call, so resampled and original rows would not share one feature
        # space. Consider SMOTENC on the training split plus a single fitted encoder.

        X = Training.X_prepare(df_clean)
        y = Training.y_prepare(y_sampling)
        # Splitting the dataset into the Training set and Test set
        x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=0)

        importances = Training._featureImportances(x_sampling, y_sampling)
        print('-0-----------------------', importances)

        model = Training._buildModel(X.shape[1])
        model.fit(x_train, y_train, n_epoch=120, batch_size=32, show_metric=True)

        # Column 0 of the one-hot target is the smallest is_active value (categories are sorted).
        y_pred = model.predict(X)
        df_turnover = pd.DataFrame(
            {'emp_identifier': df_clean.Employee_ID, 'turnover_percent': 100 * y_pred[:, 0]})
        print(df_turnover)

        score_test = model.evaluate(x_test, y_test)
        print('X_test, y_test Accuracy: %0.4f%%' % (score_test[0] * 100))
        return df_clean

    @staticmethod
    def _featureImportances(x_sampling, y_sampling):
        """Fit a RandomForest on an 80% split of the raw features and return its importance table."""
        rf_params = {
            'n_jobs': -1,
            'n_estimators': 1000,
            'max_depth': 4,
            'min_samples_leaf': 2,
            # 'sqrt' is the value that was in effect; a duplicate 0.3 key was always overridden.
            'max_features': 'sqrt',
            'random_state': 0,
            'verbose': 0,
        }
        x_train_fea, _, y_train_fea, _ = train_test_split(x_sampling, y_sampling, test_size=0.20,
                                                          random_state=0)
        rfc = RandomForestClassifier(**rf_params)
        rfc.fit(x_train_fea, y_train_fea)
        return Training._importanceTable(x_sampling.columns, rfc.feature_importances_)

    @staticmethod
    def _importanceTable(feature_names, importances):
        """Return importances as percentages (rounded to 0.1), indexed by feature, largest first."""
        table = pd.DataFrame({'feature': list(feature_names),
                              'importance': 100 * np.round(importances, 3)})
        return table.sort_values('importance', ascending=False).set_index('feature')

    @staticmethod
    def _buildModel(n_features):
        """Build the tflearn classifier: three 6-unit ReLU layers and a 2-unit softmax output."""
        net = tflearn.input_data(shape=[None, n_features])
        for _ in range(3):
            net = tflearn.fully_connected(net, 6, activation='relu')
        net = tflearn.fully_connected(net, 2, activation='softmax')
        net = tflearn.regression(net)
        return tflearn.DNN(net)

In [6]:
# Module-level instance kept so other cells that reference `func` still work;
# the pipeline below calls the static methods on the class directly.
func = Functions()


class MainNew:
    """Runs the end-to-end HR turnover pipeline on a Spark-loaded CSV."""

    def __init__(self):
        # getOrCreate reuses an already-running Spark session when there is one.
        self.spark = SparkSession.builder.appName("MLOps").getOrCreate()

    def run(self, file_name='HR-Employee-Dataset-Large.csv', required_percentage=50):
        """Load, clean, impute, encode and train; return the dataframe passed to training.

        Args:
            file_name: CSV file name, resolved relative to ``Parameters.CSV_PATH``.
            required_percentage: columns with more than this percentage of missing
                values are dropped before imputation.
        """
        raw_data = Functions.loadCsvFile(spark=self.spark, file_name=file_name)
        df_cleaned = DataCleansing.dataCleaning(raw_data)
        unqualified_columns = Functions.getMissingValueColumns(df_cleaned, required_percentage)
        df_reduced = Functions.dropColumns(df_cleaned, unqualified_columns)
        df_position_filled = Functions.missingValueImputation(df_reduced)
        df_encoded = DataCleansing.labelEncoding(df_position_filled)
        df_imputed = DataCleansing.imputeOnce(df_encoded)
        return Training.imbalanceAndModelTraining(df_imputed)


if __name__ == '__main__':
    mainNew = MainNew()
    mainNew.run()

Training Step: 4439  | total loss: [1m[32m0.34474[0m[0m | time: 0.043s
| Adam | epoch: 120 | loss: 0.34474 - acc: 0.8699 -- iter: 1152/1176
Training Step: 4440  | total loss: [1m[32m0.36853[0m[0m | time: 0.044s
| Adam | epoch: 120 | loss: 0.36853 - acc: 0.8579 -- iter: 1176/1176
--
      emp_identifier  turnover_percent
0             1001.0         70.591393
1             1002.0         86.407455
2             1003.0         79.107384
3             1004.0         81.797485
4             1005.0         85.845879
...              ...               ...
1465          2466.0         92.981216
1466          2467.0         94.278839
1467          2468.0         98.448738
1468          2469.0         92.799294
1469          2470.0         84.638916

[1470 rows x 2 columns]
X_test, y_test Accuracy: 83.3333%
