In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import functions as F
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.types import *
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator




 

from pyspark.ml import Pipeline
import warnings
warnings.simplefilter(action='ignore')

#Encoding all categorical features
from pyspark.ml.feature import  StringIndexer, VectorAssembler,  VectorIndexer



In [2]:
spark = (SparkSession.builder.appName("Akshi_capstone_ml")\
        .config("hive.metastore.uris","thrift://ip-10-1-2-24.ap-south-1.compute.internal:9083")\
        .enableHiveSupport().getOrCreate())
spark

In [3]:
spark.sql("use akshialabs").show()
spark.sql("show tables").show()

++
||
++
++

+----------+-----------------+-----------+
|  database|        tableName|isTemporary|
+----------+-----------------+-----------+
|akshialabs|              clv|      false|
|akshialabs|          default|      false|
|akshialabs|      departments|      false|
|akshialabs|         dept_emp|      false|
|akshialabs|     dept_manager|      false|
|akshialabs|         duration|      false|
|akshialabs|        employees|      false|
|akshialabs|            grace|      false|
|akshialabs|             hvac|      false|
|akshialabs|    hvac_building|      false|
|akshialabs|       hvac_final|      false|
|akshialabs|hvac_temperatures|      false|
|akshialabs|         kweblist|      false|
|akshialabs|            late1|      false|
|akshialabs|            late2|      false|
|akshialabs|           latlon|      false|
|akshialabs|  left_emp_tenure|      false|
|akshialabs|         loandata|      false|
|akshialabs|   migrate_status|      false|
|akshialabs|               nt|      false

In [4]:
Employee_data = spark.sql("select t1.dept_name,t2.dept_no,t3.birth_date,t3.emp_no,t3.emp_title_id,t3.first_name,t3.hire_date,t3.last_date,t3.last_name,t3.last_performance_rating,t3.left_,t3.no_of_projects,t3.sex,t4.salary,t5.title from  departments t1 inner join  dept_emp t2 on t1.dept_no = t2.dept_no inner join employees t3 on t2.emp_no = t3.emp_no inner join salaries t4 on t4.emp_no = t3.emp_no inner join titles t5 on t5.title_id = t3.emp_title_id")

In [5]:
Employee_data.show()
 

+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+
|         dept_name|dept_no|birth_date|emp_no|emp_title_id|first_name| hire_date| last_date|last_name|last_performance_rating|left_|no_of_projects|sex|salary|           title|
+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+
|  Iwayama|                      A|    0|             8|  F| 40000|Technique Leader|
|  dAstous|                      B|    0|             9|  M| 40000|    Senior Staff|
|   Danlos|                      A|    0|             2|  F| 60268| Senior Engineer|
|   Danlos|                      A|    0|             2|  F| 60268| Senior Engineer|
|  Rullman|                      C|    0|             6|  F| 54602|           Staff|
|  Rullman|                      C|    0|      

In [6]:
Employee_data.columns

['dept_name',
 'dept_no',
 'birth_date',
 'emp_no',
 'emp_title_id',
 'first_name',
 'hire_date',
 'last_date',
 'last_name',
 'last_performance_rating',
 'left_',
 'no_of_projects',
 'sex',
 'salary',
 'title']

In [7]:
final = Employee_data
for col in Employee_data.columns:
 final = Employee_data.withColumnRenamed(col,col.replace(" ", "_"))

In [8]:
final.show()

+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+
|         dept_name|dept_no|birth_date|emp_no|emp_title_id|first_name| hire_date| last_date|last_name|last_performance_rating|left_|no_of_projects|sex|salary|           title|
+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+
|  Iwayama|                      A|    0|             8|  F| 40000|Technique Leader|
|  dAstous|                      B|    0|             9|  M| 40000|    Senior Staff|
|   Danlos|                      A|    0|             2|  F| 60268| Senior Engineer|
|   Danlos|                      A|    0|             2|  F| 60268| Senior Engineer|
|  Rullman|                      C|    0|             6|  F| 54602|           Staff|
|  Rullman|                      C|    0|      

In [10]:
final.createTempView("final_")

In [11]:
spark.sql('select distinct left_ from final_').show()

final = final.withColumn("no_of_projects", final.no_of_projects.cast('int'))
final = final.withColumn("salary", final.no_of_projects.cast('int'))
final = final.withColumn("left_", final.left_.cast('int'))

+-----+
|left_|
+-----+
|    1|
|    0|
+-----+



In [12]:
final.printSchema()

root
 |-- dept_name: string (nullable = true)
 |-- dept_no: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- emp_no: integer (nullable = true)
 |-- emp_title_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- hire_date: string (nullable = true)
 |-- last_date: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- last_performance_rating: string (nullable = true)
 |-- left_: integer (nullable = true)
 |-- no_of_projects: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- title: string (nullable = true)



In [13]:
continuous_features = [
 'no_of_projects',
 'salary']

In [14]:
final.show()

+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+
|         dept_name|dept_no|birth_date|emp_no|emp_title_id|first_name| hire_date| last_date|last_name|last_performance_rating|left_|no_of_projects|sex|salary|           title|
+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+
|  Iwayama|                      A|    0|             8|  F|     8|Technique Leader|
|  dAstous|                      B|    0|             9|  M|     9|    Senior Staff|
|   Danlos|                      A|    0|             2|  F|     2| Senior Engineer|
|   Danlos|                      A|    0|             2|  F|     2| Senior Engineer|
|  Rullman|                      C|    0|             6|  F|     6|           Staff|
|  Rullman|                      C|    0|      

In [15]:
categorical_features = ['dept_name',
 'last_performance_rating',
 'sex',
 'title']

In [16]:
y = ['left_']

In [17]:
# create object of StringIndexer class and specify input and output column
SI_dept_name = StringIndexer(inputCol='dept_name',outputCol='dept_name_Index')
SI_last_performance_rating = StringIndexer(inputCol='last_performance_rating',outputCol='last_performance_rating_Index')
SI_sex = StringIndexer(inputCol='sex',outputCol='sex_Index')
SI_title = StringIndexer(inputCol='title',outputCol='title_Index')

In [18]:
# transform the data
final = SI_dept_name.fit(final).transform(final)
final = SI_last_performance_rating.fit(final).transform(final)
final = SI_sex.fit(final).transform(final)
final = SI_title.fit(final).transform(final)

In [19]:
final.show()

+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+---------------+-----------------------------+---------+-----------+
|         dept_name|dept_no|birth_date|emp_no|emp_title_id|first_name| hire_date| last_date|last_name|last_performance_rating|left_|no_of_projects|sex|salary|           title|dept_name_Index|last_performance_rating_Index|sex_Index|title_Index|
+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+---------------+-----------------------------+---------+-----------+
|  Iwayama|                      A|    0|             8|  F|     8|Technique Leader|            0.0|                          1.0|      1.0|        4.0|
|  dAstous|                      B|    0|             9|  M|     9|    Senior Staff|            7.0|               

In [20]:
assesmble=VectorAssembler(inputCols=['no_of_projects',
 'salary',
 'dept_name_Index',
 'last_performance_rating_Index',
 'sex_Index',
 'title_Index'],outputCol='features')

In [21]:
final_data=assesmble.transform(final)

final_data.show()

+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+---------------+-----------------------------+---------+-----------+--------------------+
|         dept_name|dept_no|birth_date|emp_no|emp_title_id|first_name| hire_date| last_date|last_name|last_performance_rating|left_|no_of_projects|sex|salary|           title|dept_name_Index|last_performance_rating_Index|sex_Index|title_Index|            features|
+------------------+-------+----------+------+------------+----------+----------+----------+---------+-----------------------+-----+--------------+---+------+----------------+---------------+-----------------------------+---------+-----------+--------------------+
|  Iwayama|                      A|    0|             8|  F|     8|Technique Leader|            0.0|                          1.0|      1.0|        4.0|[8.0,8.0,0.0,1.0,...|
|  dAstous|                    

In [22]:
df=final_data.select('features','left_')

df.printSchema()

(train, test) = df.randomSplit([.7,.3])

train.show(2)


root
 |-- features: vector (nullable = true)
 |-- left_: integer (nullable = true)

+-------------------+-----+
|           features|left_|
+-------------------+-----+
|(6,[0,1],[1.0,1.0])|    0|
|(6,[0,1],[1.0,1.0])|    0|
+-------------------+-----+
only showing top 2 rows



In [24]:
#  RandomForestClassifier

rf = RandomForestClassifier(labelCol='left_', 
                            featuresCol='features',
                            maxDepth=5)

model = rf.fit(train)

rf_predictions = model.transform(test)

In [25]:
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'left_', metricName = 'accuracy')

print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))


Random Forest classifier Accuracy: 0.9006830422964665


In [26]:
#Create Pipeline

In [27]:
continuous_features

['no_of_projects', 'salary']

In [28]:
categorical_features

['dept_name', 'last_performance_rating', 'sex', 'title']

In [29]:
# Vector Assembler
inputCols=['no_of_projects',
 'salary',
 'dept_name_Index',
 'last_performance_rating_Index',
 'sex_Index',
 'title_Index']


In [30]:
# String Indexer
indexer =   [StringIndexer(inputCol=c, outputCol="{}_Index".format(c) ) for c in categorical_features]


In [31]:

# Vector Assembler
assembler = VectorAssembler(inputCols = inputCols, outputCol = "features")

In [32]:
# ML Model
rfm = RandomForestClassifier(featuresCol="features",
                              labelCol="label",
                              numTrees=50,
                              maxDepth=5,
                              featureSubsetStrategy='onethird')

# Creating Pipeline
pipeline1 = Pipeline( stages= [indexer, assembler, rfm])