In [1]:
# important: please modify the path as your local spark location 
spark_path = "D:/spark-2.3.1-bin-hadoop2.7"

In [2]:
# configure spark, a message regrads time will print out if success
import findspark
from datetime import datetime
findspark.init("D:/spark-2.3.1-bin-hadoop2.7")
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ibm_hr_final")
sc = SparkContext(conf = conf)
print("spark is well set at " + str(datetime.now()))

spark is well set at 2018-08-01 12:01:52.591993


In [3]:
# open spark session
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)
spark

In [4]:
# load dataset
ibm_hr = spark.read.csv("../data/WA_Fn-UseC_-HR-Employee-Attrition.csv", header = True, mode = "DROPMALFORMED")
ibm_hr.count()

1470

In [5]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# define function to transform boolean
def bool_to_int(b):
    if b == "Yes":
        return 1
    else:
        return 0
# register user defined function with spark SQL
udf_bool_to_int = udf(bool_to_int, IntegerType())
# add column
ibm_hr = ibm_hr.withColumn("Attrition_numerical", udf_bool_to_int("Attrition"))
# check the result
ibm_hr.select("Attrition", "Attrition_numerical").show(3)

+---------+-------------------+
|Attrition|Attrition_numerical|
+---------+-------------------+
|      Yes|                  1|
|       No|                  0|
|      Yes|                  1|
+---------+-------------------+
only showing top 3 rows



In [7]:
ibm_hr = ibm_hr.drop("Attrition")
ibm_hr.count(), len(ibm_hr.columns)

(1470, 35)

In [8]:
categorical = ['BusinessTravel', 'Department', 'EducationField', 'Gender', 'JobRole', 'MaritalStatus', 'OverTime']
numerical = [u'Age', u'DailyRate', u'DistanceFromHome', u'Education', u'EmployeeNumber', u'EnvironmentSatisfaction', \
             u'HourlyRate', u'JobInvolvement', u'JobLevel', u'JobSatisfaction', \
             u'MonthlyIncome', u'MonthlyRate', u'NumCompaniesWorked', \
             u'PercentSalaryHike', u'PerformanceRating', u'RelationshipSatisfaction', \
             u'StockOptionLevel', u'TotalWorkingYears', \
             u'TrainingTimesLastYear', u'WorkLifeBalance', u'YearsAtCompany', \
             u'YearsInCurrentRole', u'YearsSinceLastPromotion', u'YearsWithCurrManager']
len(categorical), len(numerical)

(7, 24)

In [9]:
# use pandas to get_dummies
import pandas as pd
pd_cat = pd.get_dummies(ibm_hr.select(categorical).toPandas())
pd_cat.head(3)

Unnamed: 0,BusinessTravel_Non-Travel,BusinessTravel_Travel_Frequently,BusinessTravel_Travel_Rarely,Department_Human Resources,Department_Research & Development,Department_Sales,EducationField_Human Resources,EducationField_Life Sciences,EducationField_Marketing,EducationField_Medical,...,JobRole_Manufacturing Director,JobRole_Research Director,JobRole_Research Scientist,JobRole_Sales Executive,JobRole_Sales Representative,MaritalStatus_Divorced,MaritalStatus_Married,MaritalStatus_Single,OverTime_No,OverTime_Yes
0,0,0,1,0,0,1,0,1,0,0,...,0,0,0,1,0,0,0,1,0,1
1,0,1,0,0,1,0,0,1,0,0,...,0,0,1,0,0,0,1,0,1,0
2,0,0,1,0,1,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,1


In [13]:
pd_att = ibm_hr.select("Attrition_numerical").toPandas()
pd_att.head(3)

Unnamed: 0,Attrition_numerical
0,1
1,0
2,1


In [14]:
pd_num = ibm_hr.select(numerical).toPandas()
pd_num.head(3)

Unnamed: 0,Age,DailyRate,DistanceFromHome,Education,EmployeeNumber,EnvironmentSatisfaction,HourlyRate,JobInvolvement,JobLevel,JobSatisfaction,...,PerformanceRating,RelationshipSatisfaction,StockOptionLevel,TotalWorkingYears,TrainingTimesLastYear,WorkLifeBalance,YearsAtCompany,YearsInCurrentRole,YearsSinceLastPromotion,YearsWithCurrManager
0,41,1102,1,2,1,2,94,3,2,4,...,3,1,0,8,0,1,6,4,0,5
1,49,279,8,1,2,3,61,2,2,2,...,4,4,1,10,3,3,10,7,1,7
2,37,1373,2,2,4,4,92,2,1,3,...,3,2,0,7,3,3,0,0,0,0


In [25]:
pd_final = pd.concat([pd_num, pd_cat, pd_att], axis=1)
pd_final = pd_final.astype(int)
pd_final.dtypes

Age                                  int32
DailyRate                            int32
DistanceFromHome                     int32
Education                            int32
EmployeeNumber                       int32
EnvironmentSatisfaction              int32
HourlyRate                           int32
JobInvolvement                       int32
JobLevel                             int32
JobSatisfaction                      int32
MonthlyIncome                        int32
MonthlyRate                          int32
NumCompaniesWorked                   int32
PercentSalaryHike                    int32
PerformanceRating                    int32
RelationshipSatisfaction             int32
StockOptionLevel                     int32
TotalWorkingYears                    int32
TrainingTimesLastYear                int32
WorkLifeBalance                      int32
YearsAtCompany                       int32
YearsInCurrentRole                   int32
YearsSinceLastPromotion              int32
YearsWithCu

In [27]:
ibm_hr_final = spark.createDataFrame(pd_final)
for c in ibm_hr_final.columns:
    ibm_hr_final = ibm_hr_final.withColumn(c, ibm_hr_final[c].cast(IntegerType()))
print("The number of rows in final df is: " + str(ibm_hr_final.count()))
ibm_hr_final.printSchema()

The number of rows in final df is: 1470
root
 |-- Age: integer (nullable = true)
 |-- DailyRate: integer (nullable = true)
 |-- DistanceFromHome: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- EmployeeNumber: integer (nullable = true)
 |-- EnvironmentSatisfaction: integer (nullable = true)
 |-- HourlyRate: integer (nullable = true)
 |-- JobInvolvement: integer (nullable = true)
 |-- JobLevel: integer (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)
 |-- MonthlyRate: integer (nullable = true)
 |-- NumCompaniesWorked: integer (nullable = true)
 |-- PercentSalaryHike: integer (nullable = true)
 |-- PerformanceRating: integer (nullable = true)
 |-- RelationshipSatisfaction: integer (nullable = true)
 |-- StockOptionLevel: integer (nullable = true)
 |-- TotalWorkingYears: integer (nullable = true)
 |-- TrainingTimesLastYear: integer (nullable = true)
 |-- WorkLifeBalance: integer (nullable = true)
 |-- 

In [28]:
feature_col = ibm_hr_final.columns
feature_col.remove("Attrition_numerical")
len(feature_col), len(ibm_hr_final.columns)

(52, 53)

In [30]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
assembler = VectorAssembler(inputCols = feature_col, outputCol = "features")
assembled = assembler.transform(ibm_hr_final)
(training_data, testing_data) = assembled.randomSplit([0.8, 0.2], seed = 13234)
training_data.count(), testing_data.count()

(1176, 294)

In [31]:
dt = DecisionTreeClassifier(labelCol = "Attrition_numerical", featuresCol = "features", \
                           maxDepth = 7, minInstancesPerNode = 20, impurity = "gini")

In [32]:
pipeline = Pipeline(stages = [dt])
model = pipeline.fit(training_data)

In [34]:
predictions = model.transform(testing_data)

In [37]:
predictions

DataFrame[Age: int, DailyRate: int, DistanceFromHome: int, Education: int, EmployeeNumber: int, EnvironmentSatisfaction: int, HourlyRate: int, JobInvolvement: int, JobLevel: int, JobSatisfaction: int, MonthlyIncome: int, MonthlyRate: int, NumCompaniesWorked: int, PercentSalaryHike: int, PerformanceRating: int, RelationshipSatisfaction: int, StockOptionLevel: int, TotalWorkingYears: int, TrainingTimesLastYear: int, WorkLifeBalance: int, YearsAtCompany: int, YearsInCurrentRole: int, YearsSinceLastPromotion: int, YearsWithCurrManager: int, BusinessTravel_Non-Travel: int, BusinessTravel_Travel_Frequently: int, BusinessTravel_Travel_Rarely: int, Department_Human Resources: int, Department_Research & Development: int, Department_Sales: int, EducationField_Human Resources: int, EducationField_Life Sciences: int, EducationField_Marketing: int, EducationField_Medical: int, EducationField_Other: int, EducationField_Technical Degree: int, Gender_Female: int, Gender_Male: int, JobRole_Healthcare R

In [39]:
predictions.select("prediction", "Attrition_numerical").show(10)

+----------+-------------------+
|prediction|Attrition_numerical|
+----------+-------------------+
|       0.0|                  0|
|       1.0|                  1|
|       1.0|                  1|
|       1.0|                  1|
|       0.0|                  1|
|       0.0|                  0|
|       0.0|                  0|
|       0.0|                  0|
|       0.0|                  0|
|       1.0|                  1|
+----------+-------------------+
only showing top 10 rows



In [44]:
correct = predictions.filter(predictions["prediction"] == predictions["Attrition_numerical"]) \
    .select("prediction", "Attrition_numerical").count()
total = predictions.count()
print("The precision is: "+"{:.2%}".format(correct / total))

The precision is: 82.99%
