In [40]:

  
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
import scipy
import os
os.environ["SPARK_HOME"] = "C:\\spark-2.3.1-bin-hadoop2.7\\spark-2.3.1-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = "C:\\winutils\\"

from pyspark.python.pyspark.shell import spark

data = spark.read.load("Absenteeism_at_work.csv", format="csv", header=True, delimiter=",")
data = data.withColumn("MOA", data["Month of absence"] - 0).withColumn("label", data['Height'] - 0). \
    withColumn("ROA", data["Reason for absence"] - 0). \
    withColumn("distance", data["Distance from Residence to Work"] - 0). \
    withColumn("BMI", data["Body mass index"] - 0)
#data.show()

assem = VectorAssembler(inputCols=["label", "distance"], outputCol='features')
data = assem.transform(data)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

y_true = data.select("BMI").rdd.flatMap(lambda x: x).collect()
y_pred = data.select("ROA").rdd.flatMap(lambda x: x).collect()

confusionmatrix = confusion_matrix(y_true, y_pred)

precision = precision_score(y_true, y_pred, average='micro')

recall = recall_score(y_true, y_pred, average='micro')

treeModel = model.stages[2]
# summary only
print(treeModel)
print("Decision Tree - Test Accuracy = %g" % (accuracy))
print("Decision Tree - Test Error = %g" % (1.0 - accuracy))

print("The Confusion Matrix for Decision Tree Model is :\n" + str(confusionmatrix))

print("The precision score for Decision Tree Model is: " + str(precision))

print("The recall score for Decision Tree Model is: " + str(recall))


                                                                                

+----------+------------+------------+
|prediction|indexedLabel|    features|
+----------+------------+------------+
|       1.0|         1.0|[172.0,11.0]|
|       1.0|         1.0|[172.0,11.0]|
|       1.0|         1.0|[172.0,11.0]|
|       1.0|         1.0|[172.0,11.0]|
|       1.0|         1.0|[172.0,52.0]|
+----------+------------+------------+
only showing top 5 rows

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_a894c3b809e7, depth=5, numNodes=19, numClasses=14, numFeatures=2
Decision Tree - Test Accuracy = 0.971154
Decision Tree - Test Error = 0.0288462
The Confusion Matrix for Decision Tree Model is :
[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [2 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [5 0 0 ... 0 0 0]]
The precision score for Decision Tree Model is: 0.02972972972972973
The recall score for Decision Tree Model is: 0.02972972972972973


# 1. Decision tree

## import library

In [26]:
from pyspark.python.pyspark.shell import spark

## read data

In [33]:
df = spark.read.load('Absenteeism_at_work.csv', format="csv", header=True, delimiter=",", inferSchema=True)
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Reason for absence: integer (nullable = true)
 |-- Month of absence: integer (nullable = true)
 |-- Day of the week: integer (nullable = true)
 |-- Seasons: integer (nullable = true)
 |-- Transportation expense: integer (nullable = true)
 |-- Distance from Residence to Work: integer (nullable = true)
 |-- Service time: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Work load Average/day : double (nullable = true)
 |-- Hit target: integer (nullable = true)
 |-- Disciplinary_failure: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- Son: integer (nullable = true)
 |-- Social drinker: integer (nullable = true)
 |-- Social smoker: integer (nullable = true)
 |-- Pet: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Body mass index: integer (nullable = true)
 |-- Absenteeism_time_in_hours: integer (nullable = true)



In [34]:
df.show(n=5)

+---+------------------+----------------+---------------+-------+----------------------+-------------------------------+------------+---+----------------------+----------+--------------------+---------+---+--------------+-------------+---+------+------+---------------+-------------------------+
| ID|Reason for absence|Month of absence|Day of the week|Seasons|Transportation expense|Distance from Residence to Work|Service time|Age|Work load Average/day |Hit target|Disciplinary_failure|Education|Son|Social drinker|Social smoker|Pet|Weight|Height|Body mass index|Absenteeism_time_in_hours|
+---+------------------+----------------+---------------+-------+----------------------+-------------------------------+------------+---+----------------------+----------+--------------------+---------+---+--------------+-------------+---+------+------+---------------+-------------------------+
| 11|                26|               7|              3|      1|                   289|                        

Let display in pandas library for beauty format display

In [35]:
df.limit(10).toPandas()

Unnamed: 0,ID,Reason for absence,Month of absence,Day of the week,Seasons,Transportation expense,Distance from Residence to Work,Service time,Age,Work load Average/day,...,Disciplinary_failure,Education,Son,Social drinker,Social smoker,Pet,Weight,Height,Body mass index,Absenteeism_time_in_hours
0,11,26,7,3,1,289,36,13,33,239.554,...,0,1,2,1,0,1,90,172,30,4
1,36,0,7,3,1,118,13,18,50,239.554,...,1,1,1,1,0,0,98,178,31,0
2,3,23,7,4,1,179,51,18,38,239.554,...,0,1,0,1,0,0,89,170,31,2
3,7,7,7,5,1,279,5,14,39,239.554,...,0,1,2,1,1,0,68,168,24,4
4,11,23,7,5,1,289,36,13,33,239.554,...,0,1,2,1,0,1,90,172,30,2
5,3,23,7,6,1,179,51,18,38,239.554,...,0,1,0,1,0,0,89,170,31,2
6,10,22,7,6,1,361,52,3,28,239.554,...,0,1,1,1,0,4,80,172,27,8
7,20,23,7,6,1,260,50,11,36,239.554,...,0,1,4,1,0,0,65,168,23,4
8,14,19,7,2,1,155,12,14,34,239.554,...,0,1,2,1,0,0,95,196,25,40
9,1,22,7,2,1,235,11,14,37,239.554,...,0,3,1,0,0,1,88,172,29,8


- Shape of data

In [36]:
print((df.count(), len(df.columns)))

(740, 21)


In [44]:
df = df.withColumn("MOA", df["Month of absence"] - 0)
df.limit(10).toPandas()

Unnamed: 0,ID,Reason for absence,Month of absence,Day of the week,Seasons,Transportation expense,Distance from Residence to Work,Service time,Age,Work load Average/day,...,Education,Son,Social drinker,Social smoker,Pet,Weight,Height,Body mass index,Absenteeism_time_in_hours,MOA
0,11,26,7,3,1,289,36,13,33,239.554,...,1,2,1,0,1,90,172,30,4,7
1,36,0,7,3,1,118,13,18,50,239.554,...,1,1,1,0,0,98,178,31,0,7
2,3,23,7,4,1,179,51,18,38,239.554,...,1,0,1,0,0,89,170,31,2,7
3,7,7,7,5,1,279,5,14,39,239.554,...,1,2,1,1,0,68,168,24,4,7
4,11,23,7,5,1,289,36,13,33,239.554,...,1,2,1,0,1,90,172,30,2,7
5,3,23,7,6,1,179,51,18,38,239.554,...,1,0,1,0,0,89,170,31,2,7
6,10,22,7,6,1,361,52,3,28,239.554,...,1,1,1,0,4,80,172,27,8,7
7,20,23,7,6,1,260,50,11,36,239.554,...,1,4,1,0,0,65,168,23,4,7
8,14,19,7,2,1,155,12,14,34,239.554,...,1,2,1,0,0,95,196,25,40,7
9,1,22,7,2,1,235,11,14,37,239.554,...,3,1,0,0,1,88,172,29,8,7


## preprocessing

- convert string columns to float columns

In [23]:
df.schema.names


['ID',
 'Reason for absence',
 'Month of absence',
 'Day of the week',
 'Seasons',
 'Transportation expense',
 'Distance from Residence to Work',
 'Service time',
 'Age',
 'Work load Average/day ',
 'Hit target',
 'Disciplinary_failure',
 'Education',
 'Son',
 'Social drinker',
 'Social smoker',
 'Pet',
 'Weight',
 'Height',
 'Body mass index',
 'Absenteeism_time_in_hours']

In [None]:
data_df = df.withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
                withColumn("Reason for absence", df.call_time.cast('float')). \
            

# 2. Naive bayes