In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null 
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [4]:
!ls /usr/lib/jvm/     

default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64


In [5]:
import os
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]="/content/spark-3.1.1-bin-hadoop2.7"

In [6]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
#spark=SparkSession.builder.master("local[*]").getOrCreate()

from pyspark import SparkContext
sc = SparkContext("local", "First App")

In [7]:
import sys,tempfile,urllib
from pyspark.sql.functions import *
from pyspark.sql import *

In [8]:
!git clone "https://github.com/chetankumar1996/hr-analytics-using-spark.git"

fatal: destination path 'hr-analytics-using-spark' already exists and is not an empty directory.


In [9]:
!ls hr-analytics-using-spark

aug_train.csv  README.md


In [10]:
sqlContext = SQLContext(sc)
train_df=sqlContext.read.option("header",True)\
                   .option("inferSchema",True)\
                   .csv("hr-analytics-using-spark/aug_train.csv")

In [11]:
train_df.show(30)

+-----------+--------+----------------------+------+--------------------+-------------------+---------------+----------------+----------+------------+--------------+------------+--------------+------+
|enrollee_id|    city|city_development_index|gender| relevent_experience|enrolled_university|education_level|major_discipline|experience|company_size|  company_type|last_new_job|training_hours|target|
+-----------+--------+----------------------+------+--------------------+-------------------+---------------+----------------+----------+------------+--------------+------------+--------------+------+
|       8949|city_103|                  0.92|  Male|Has relevent expe...|      no_enrollment|       Graduate|            STEM|       >20|        null|          null|           1|            36|   1.0|
|      29725| city_40|    0.7759999999999999|  Male|No relevent exper...|      no_enrollment|       Graduate|            STEM|        15|       50-99|       Pvt Ltd|          >4|            47|   

In [12]:
train_df.count()

19158

In [13]:
train_df.printSchema()

root
 |-- enrollee_id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- city_development_index: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- relevent_experience: string (nullable = true)
 |-- enrolled_university: string (nullable = true)
 |-- education_level: string (nullable = true)
 |-- major_discipline: string (nullable = true)
 |-- experience: string (nullable = true)
 |-- company_size: string (nullable = true)
 |-- company_type: string (nullable = true)
 |-- last_new_job: string (nullable = true)
 |-- training_hours: integer (nullable = true)
 |-- target: double (nullable = true)



In [14]:
train_df.describe().show()

+-------+------------------+-------+----------------------+------+--------------------+-------------------+---------------+----------------+-----------------+------------+-------------------+------------------+-----------------+-------------------+
|summary|       enrollee_id|   city|city_development_index|gender| relevent_experience|enrolled_university|education_level|major_discipline|       experience|company_size|       company_type|      last_new_job|   training_hours|             target|
+-------+------------------+-------+----------------------+------+--------------------+-------------------+---------------+----------------+-----------------+------------+-------------------+------------------+-----------------+-------------------+
|  count|             19158|  19158|                 19158| 14650|               19158|              18772|          18698|           16345|            19093|       13220|              13018|             18735|            19158|              19158|
|   

Cleaning

"gender","relevent_experience","enrolled_university","education_level","last_new_job","company_type","company_size","experience","major_discipline"

In [15]:
 train_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in train_df.columns]).show()

+-----------+----+----------------------+------+-------------------+-------------------+---------------+----------------+----------+------------+------------+------------+--------------+------+
|enrollee_id|city|city_development_index|gender|relevent_experience|enrolled_university|education_level|major_discipline|experience|company_size|company_type|last_new_job|training_hours|target|
+-----------+----+----------------------+------+-------------------+-------------------+---------------+----------------+----------+------------+------------+------------+--------------+------+
|          0|   0|                     0|  4508|                  0|                386|            460|            2813|        65|        5938|        6140|         423|             0|     0|
+-----------+----+----------------------+------+-------------------+-------------------+---------------+----------------+----------+------------+------------+------------+--------------+------+



In [16]:
train_df.select("training_hours").distinct().show()

+--------------+
|training_hours|
+--------------+
|           148|
|            31|
|            85|
|            65|
|            53|
|           133|
|           322|
|            78|
|           108|
|           155|
|            34|
|           101|
|           126|
|            81|
|            28|
|           210|
|           300|
|            76|
|            26|
|            27|
+--------------+
only showing top 20 rows



In [17]:
train_df1=train_df.withColumn("experience",regexp_replace("experience", ">", ""))

In [18]:
train_df1=train_df1.withColumn("experience",regexp_replace("experience", "<", ""))

In [19]:
train_df1.select("experience").distinct().show()

+----------+
|experience|
+----------+
|         7|
|        15|
|        11|
|         3|
|         8|
|        16|
|      null|
|         5|
|        18|
|        17|
|         6|
|        19|
|         9|
|         1|
|        20|
|        10|
|         4|
|        12|
|        13|
|        14|
+----------+
only showing top 20 rows



In [20]:
train_df1.select("last_new_job").distinct().show()

+------------+
|last_new_job|
+------------+
|           3|
|        null|
|       never|
|           1|
|           4|
|          >4|
|           2|
+------------+



In [21]:
train_df1=train_df1.withColumn("last_new_job",regexp_replace("last_new_job", "never", "0"))
train_df1=train_df1.withColumn("last_new_job",regexp_replace("last_new_job", ">", ""))

In [22]:
train_df1.select("company_size").distinct().show()

+------------+
|company_size|
+------------+
|        null|
|       10/49|
|     100-500|
|       50-99|
|         <10|
|     500-999|
|   5000-9999|
|      10000+|
|   1000-4999|
+------------+



In [23]:
train_df1=train_df1.withColumn("company_size",regexp_replace("company_size", "<", "0-"))
train_df1=train_df1.withColumn("company_size",regexp_replace("company_size", "/", "-"))

In [24]:
train_df1.groupby("gender")\
        .count()\
        .show()

+------+-----+
|gender|count|
+------+-----+
|  null| 4508|
|Female| 1238|
| Other|  191|
|  Male|13221|
+------+-----+



In [25]:
train_df1.groupby("major_discipline")\
        .count()\
        .show()

+----------------+-----+
|major_discipline|count|
+----------------+-----+
| Business Degree|  327|
|            null| 2813|
|      Humanities|  669|
|        No Major|  223|
|           Other|  381|
|            STEM|14492|
|            Arts|  253|
+----------------+-----+



In [26]:
train_df1.groupby("last_new_job")\
        .count()\
        .show()

+------------+-----+
|last_new_job|count|
+------------+-----+
|           3| 1024|
|           0| 2452|
|        null|  423|
|           1| 8040|
|           4| 4319|
|           2| 2900|
+------------+-----+



In [27]:
train_df1.sort("city_development_index").show()
##.filter(train_df1.company_size.isNull() & train_df1.company_type.isNull()).show()

+-----------+--------+----------------------+------+--------------------+-------------------+---------------+----------------+----------+------------+-------------+------------+--------------+------+
|enrollee_id|    city|city_development_index|gender| relevent_experience|enrolled_university|education_level|major_discipline|experience|company_size| company_type|last_new_job|training_hours|target|
+-----------+--------+----------------------+------+--------------------+-------------------+---------------+----------------+----------+------------+-------------+------------+--------------+------+
|      27970| city_33|   0.44799999999999995|  Male|No relevent exper...|      no_enrollment|       Graduate|            STEM|         1|        null|         null|           0|            73|   1.0|
|      31179| city_33|   0.44799999999999995|  Male|Has relevent expe...|      no_enrollment|       Graduate|            STEM|         7|       50-99|      Pvt Ltd|           1|            28|   0.0|


In [28]:
train_df1 = train_df1.withColumn("experience", train_df1["experience"].cast("integer"))
train_df1 = train_df1.withColumn("last_new_job", train_df1["last_new_job"].cast("integer"))

In [29]:
train_df1.stat.corr("city_development_index","experience")

0.3336409683569583

In [30]:
from pyspark.sql.window import Window
window = Window.partitionBy('enrollee_id')\
               .orderBy('city_development_index')

filled_column = last(train_df1['company_type']).over(window)

train_df2 = train_df1.withColumn('company_type', filled_column)

In [31]:
train_df2.show()

+-----------+--------+----------------------+------+--------------------+-------------------+---------------+----------------+----------+------------+--------------+------------+--------------+------+
|enrollee_id|    city|city_development_index|gender| relevent_experience|enrolled_university|education_level|major_discipline|experience|company_size|  company_type|last_new_job|training_hours|target|
+-----------+--------+----------------------+------+--------------------+-------------------+---------------+----------------+----------+------------+--------------+------------+--------------+------+
|        148| city_41|    0.8270000000000001|  null|Has relevent expe...|      no_enrollment|       Graduate|            STEM|        13|     100-500|       Pvt Ltd|           1|            52|   0.0|
|        463| city_16|                  0.91|  Male|Has relevent expe...|      no_enrollment|       Graduate|            STEM|        15|        null|          null|           2|             9|   

In [32]:
columns_to_drop = ['company_size','company_type','enrollee_id']
train_df2 = train_df1.drop(*columns_to_drop)

In [33]:
train_df2.printSchema()

root
 |-- city: string (nullable = true)
 |-- city_development_index: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- relevent_experience: string (nullable = true)
 |-- enrolled_university: string (nullable = true)
 |-- education_level: string (nullable = true)
 |-- major_discipline: string (nullable = true)
 |-- experience: integer (nullable = true)
 |-- last_new_job: integer (nullable = true)
 |-- training_hours: integer (nullable = true)
 |-- target: double (nullable = true)



In [34]:
train_df2= train_df2.na.fill(value="Female",subset=["gender"])

In [35]:
train_df2=train_df2.na.drop(subset=["major_discipline"])

In [36]:
mode =train_df1.groupby("enrolled_university").count().orderBy("count", ascending=False).first()[0]
train_df2=train_df2.na.fill(value=mode,subset=["enrolled_university"])

In [37]:
df_stats = train_df1.select(
    avg(col('experience')).alias('mean'),
    avg(col('last_new_job')).alias('avg')
).collect()

In [38]:
mean = df_stats[0]['mean']

In [39]:
mean2=df_stats[0]['avg']

In [40]:
train_df2=train_df2.na.fill(value=mean,subset=["experience"])
train_df2=train_df2.na.fill(value=mean2,subset=["last_new_job"])

In [41]:
train_df2.show()

+--------+----------------------+------+--------------------+-------------------+---------------+----------------+----------+------------+--------------+------+
|    city|city_development_index|gender| relevent_experience|enrolled_university|education_level|major_discipline|experience|last_new_job|training_hours|target|
+--------+----------------------+------+--------------------+-------------------+---------------+----------------+----------+------------+--------------+------+
|city_103|                  0.92|  Male|Has relevent expe...|      no_enrollment|       Graduate|            STEM|        20|           1|            36|   1.0|
| city_40|    0.7759999999999999|  Male|No relevent exper...|      no_enrollment|       Graduate|            STEM|        15|           4|            47|   0.0|
| city_21|                 0.624|Female|No relevent exper...|   Full time course|       Graduate|            STEM|         5|           0|            83|   0.0|
|city_115|                 0.789|F

In [42]:
from pyspark.ml.feature import OneHotEncoder,StringIndexer,VectorAssembler

In [43]:
categoricalColumns = ["city","gender","relevent_experience","enrolled_university","education_level","major_discipline"]
stages = []

In [44]:
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [45]:
label_stringIdx = StringIndexer(inputCol = 'target', outputCol = 'label')
stages += [label_stringIdx]

In [46]:
numericCols = ["experience","last_new_job","training_hours","city_development_index"]
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [47]:
stages

[StringIndexer_089ac286a3ab,
 OneHotEncoder_3660f8b2d3bd,
 StringIndexer_7db43d6b9529,
 OneHotEncoder_27108c2f204e,
 StringIndexer_7a527e6bfb6f,
 OneHotEncoder_a57b919d1179,
 StringIndexer_5fe62a350fd9,
 OneHotEncoder_929ef3ab3a4f,
 StringIndexer_32a4d77b79fd,
 OneHotEncoder_63e623535ddb,
 StringIndexer_5394ec7ba2ed,
 OneHotEncoder_b93c1c6a58c2,
 StringIndexer_2f0a6cacd31d,
 VectorAssembler_74befe2719f0]

In [48]:
from pyspark.ml import Pipeline

In [49]:
df_cols=train_df2.select("city","city_development_index","gender","relevent_experience","enrolled_university","education_level","major_discipline","experience","last_new_job","training_hours","target")
cols=df_cols.columns
df_cols.printSchema()

root
 |-- city: string (nullable = true)
 |-- city_development_index: double (nullable = true)
 |-- gender: string (nullable = false)
 |-- relevent_experience: string (nullable = true)
 |-- enrolled_university: string (nullable = false)
 |-- education_level: string (nullable = true)
 |-- major_discipline: string (nullable = true)
 |-- experience: integer (nullable = true)
 |-- last_new_job: integer (nullable = true)
 |-- training_hours: integer (nullable = true)
 |-- target: double (nullable = true)



In [50]:
from pyspark.ml.classification import DecisionTreeClassifier

In [51]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df_cols)
df = pipelineModel.transform(df_cols)
selectedCols = ['label', 'features'] + cols
df = df.select(selectedCols)
df.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- city: string (nullable = true)
 |-- city_development_index: double (nullable = true)
 |-- gender: string (nullable = false)
 |-- relevent_experience: string (nullable = true)
 |-- enrolled_university: string (nullable = false)
 |-- education_level: string (nullable = true)
 |-- major_discipline: string (nullable = true)
 |-- experience: integer (nullable = true)
 |-- last_new_job: integer (nullable = true)
 |-- training_hours: integer (nullable = true)
 |-- target: double (nullable = true)



In [52]:
(trainingData, testData) = df.randomSplit([0.7, 0.3])
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 11508
Test Dataset Count: 4837


In [134]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label')
dtModel = dt.fit(trainingData)
predictions = dtModel.transform(testData)

In [135]:
tp = predictions[(predictions.label == 1) & (predictions.prediction == 1)].count()
tn = predictions[(predictions.label == 0) & (predictions.prediction == 0)].count()
fp = predictions[(predictions.label == 0) & (predictions.prediction == 1)].count()
fn = predictions[(predictions.label == 1) & (predictions.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  538 
True Negative:  3182 
False Positive:  361 
False Negative:  756
Recall:  0.41576506955177744
Precision:  0.5984427141268076


In [137]:
predictions.groupby("prediction")\
        .count()\
        .show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 3938|
|       1.0|  899|
+----------+-----+



In [174]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label',maxIter=1000)
lrModel = lr.fit(train)

In [175]:
predict = lrModel.transform(test)

In [176]:
tp = predict[(predict.label == 1) & (predict.prediction == 1)].count()
tn = predict[(predict.label == 0) & (predict.prediction == 0)].count()
fp = predict[(predict.label == 0) & (predict.prediction == 1)].count()
fn = predict[(predict.label == 1) & (predict.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  326 
True Negative:  2173 
False Positive:  250 
False Negative:  488
Recall:  0.4004914004914005
Precision:  0.5659722222222222


In [177]:
predict.groupby("prediction")\
        .count()\
        .show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 2661|
|       1.0|  576|
+----------+-----+



In [171]:
from pyspark.ml.classification import RandomForestClassifier
RFC = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', numTrees=7,maxDepth=7,seed=52)
RFCmodel = dt.fit(train)
pred = RFCmodel.transform(test)

In [172]:
tp = pred[(pred.label == 1) & (pred.prediction == 1)].count()
tn = pred[(pred.label == 0) & (pred.prediction == 0)].count()
fp = pred[(pred.label == 0) & (pred.prediction == 1)].count()
fn = pred[(pred.label == 1) & (pred.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  342 
True Negative:  2149 
False Positive:  274 
False Negative:  472
Recall:  0.4201474201474201
Precision:  0.5551948051948052


In [173]:
trainingData.groupby("target")\
        .count()\
        .show()

+------+-----+
|target|count|
+------+-----+
|   0.0| 8574|
|   1.0| 2934|
+------+-----+



In [168]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(featuresCol = 'features', labelCol = 'label', maxIter=100,maxDepth=6)
model = gbt.fit(train)
pred1 = model.transform(test)

In [169]:
pred1.groupby("prediction")\
        .count()\
        .show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 2579|
|       1.0|  658|
+----------+-----+



In [170]:
tp = pred1[(pred1.label == 1) & (pred1.prediction == 1)].count()
tn = pred1[(pred1.label == 0) & (pred1.prediction == 0)].count()
fp = pred1[(pred1.label == 0) & (pred1.prediction == 1)].count()
fn = pred1[(pred1.label == 1) & (pred1.prediction == 0)].count()
print("True Positive: ",tp,"\nTrue Negative: ",tn,"\nFalse Positive: ",fp,"\nFalse Negative: ",fn)
print("Recall: ",tp/(tp+fn))
print("Precision: ", tp/(tp+fp))

True Positive:  368 
True Negative:  2133 
False Positive:  290 
False Negative:  446
Recall:  0.4520884520884521
Precision:  0.5592705167173252


In [160]:
zeros = df.filter(df["Target"]==0)


In [167]:
ones = df.filter(df["Target"]==1)
# split datasets into training and testing
train0, test0 = zeros.randomSplit([0.8,0.2], seed=1234)
train1, test1 = ones.randomSplit([0.8,0.2], seed=1234)
# stack datasets back together
train = train0.union(train1)
test = test0.union(test1)

In [162]:
zeros=zeros.take(2934)

AttributeError: ignored

In [None]:
zeros.toDF("label","features","city","city_development_index","gender","relevent_experience","enrolled_university","education_level","major_discipline","experience","last_new_job","training_hours","target").collect()

In [166]:
zeros

DataFrame[label: double, features: vector, city: string, city_development_index: double, gender: string, relevent_experience: string, enrolled_university: string, education_level: string, major_discipline: string, experience: int, last_new_job: int, training_hours: int, target: double]