In [1]:
import findspark
findspark.init()

In [2]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

### dataset: Absenteeism at work

In [3]:
data = spark.read.csv("Absenteeism_at_work.csv", inferSchema=True, header = True, sep=",")

In [4]:
data.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Reason for absence: integer (nullable = true)
 |-- Month of absence: integer (nullable = true)
 |-- Day of the week: integer (nullable = true)
 |-- Seasons: integer (nullable = true)
 |-- Transportation expense: integer (nullable = true)
 |-- Distance from Residence to Work: integer (nullable = true)
 |-- Service time: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Work load Average/day : double (nullable = true)
 |-- Hit target: integer (nullable = true)
 |-- Disciplinary_failure: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- Son: integer (nullable = true)
 |-- Social drinker: integer (nullable = true)
 |-- Social smoker: integer (nullable = true)
 |-- Pet: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Height: integer (nullable = true)
 |-- Body mass index: integer (nullable = true)
 |-- Absenteeism_time_in_hours: integer (nullable = true)



In [5]:
data = data.withColumn("MOA", data["Month of absence"]). \
            withColumn("label", data['Height']). \
            withColumn("ROA", data["Reason for absence"]). \
            withColumn("distance", data["Distance from Residence to Work"]). \
            withColumn("BMI", data["Body mass index"])

In [6]:
data[["MOA", "label", "ROA", "distance", "BMI"]].show()

+---+-----+---+--------+---+
|MOA|label|ROA|distance|BMI|
+---+-----+---+--------+---+
|  7|  172| 26|      36| 30|
|  7|  178|  0|      13| 31|
|  7|  170| 23|      51| 31|
|  7|  168|  7|       5| 24|
|  7|  172| 23|      36| 30|
|  7|  170| 23|      51| 31|
|  7|  172| 22|      52| 27|
|  7|  168| 23|      50| 23|
|  7|  196| 19|      12| 25|
|  7|  172| 22|      11| 29|
|  7|  168|  1|      50| 23|
|  7|  168|  1|      50| 23|
|  7|  168| 11|      50| 23|
|  7|  170| 11|      51| 31|
|  7|  170| 23|      51| 31|
|  7|  170| 14|      25| 23|
|  7|  170| 23|      51| 31|
|  7|  170| 21|      51| 31|
|  7|  167| 11|      29| 25|
|  8|  165| 23|      25| 32|
+---+-----+---+--------+---+
only showing top 20 rows



In [7]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

In [8]:
assem = VectorAssembler(inputCols=['label', 'distance'], outputCol='features')
data = assem.transform(data)

In [9]:
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel')

In [10]:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)#.fit(data)

In [21]:
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=1)

In [22]:
dt_clf = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

In [23]:
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt_clf])

In [24]:
model = pipeline.fit(trainingData)

In [25]:
# Make predictions.
predictions = model.transform(testData)

In [31]:
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show()

+----------+------------+------------+
|prediction|indexedLabel|    features|
+----------+------------+------------+
|       1.0|         1.0|[172.0,11.0]|
|       1.0|         1.0|[172.0,11.0]|
|       1.0|         1.0|[172.0,11.0]|
|       1.0|         1.0|[172.0,11.0]|
|       1.0|         1.0|[172.0,11.0]|
|       8.0|        12.0|[163.0,29.0]|
|       8.0|        12.0|[163.0,29.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
|       0.0|         0.0|[170.0,51.0]|
+----------+------------+------------+
only showing top 20 rows



In [27]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",\
                                              predictionCol="prediction",\
                                              metricName="accuracy")

In [28]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score

accuracy = evaluator.evaluate(predictions)

y_true = data.select("BMI").rdd.flatMap(lambda x: x).collect()
y_pred = data.select("ROA").rdd.flatMap(lambda x: x).collect()

confusionmatrix = confusion_matrix(y_true, y_pred)

precision = precision_score(y_true, y_pred, average='micro')

recall = recall_score(y_true, y_pred, average='micro')

In [29]:
treeModel = model.stages[2]

In [30]:
print(treeModel)
print("Decision Tree - Test Accuracy = %g" % (accuracy))
print("Decision Tree - Test Error = %g" % (1.0 - accuracy))

print("The Confusion Matrix for Decision Tree Model is :\n" + str(confusionmatrix))

print("The precision score for Decision Tree Model is: " + str(precision))

print("The recall score for Decision Tree Model is: " + str(recall))

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_7a83899006ed) of depth 5 with 21 nodes
Decision Tree - Test Accuracy = 0.964286
Decision Tree - Test Error = 0.0357143
The Confusion Matrix for Decision Tree Model is :
[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [2 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [5 0 0 ... 0 0 0]]
The precision score for Decision Tree Model is: 0.02972972972972973
The recall score for Decision Tree Model is: 0.02972972972972973


---

### dataset: zoo

In [15]:
zoo_data = spark.read.csv('zoo.csv', inferSchema=True,header=True)

In [56]:
data = zoo_data.drop('animal_name')

In [57]:
features.printSchema()

root
 |-- hair: integer (nullable = true)
 |-- feathers: integer (nullable = true)
 |-- eggs: integer (nullable = true)
 |-- milk: integer (nullable = true)
 |-- airborne: integer (nullable = true)
 |-- aquatic: integer (nullable = true)
 |-- predator: integer (nullable = true)
 |-- toothed: integer (nullable = true)
 |-- backbone: integer (nullable = true)
 |-- breathes: integer (nullable = true)
 |-- venomous: integer (nullable = true)
 |-- fins: integer (nullable = true)
 |-- legs: integer (nullable = true)
 |-- tail: integer (nullable = true)
 |-- domestic: integer (nullable = true)
 |-- catsize: integer (nullable = true)
 |-- class_type: integer (nullable = true)



In [53]:
assembler = VectorAssembler(inputCols=['hair', 'feathers', 'eggs', 'milk',\
                                      'airborne', 'aquatic', 'predator', 'toothed',\
                                      'backbone', 'breathes', 'venomous', 'fins',\
                                      'legs', 'tail', 'domestic', 'catsize'],outputCol='features')

In [54]:
from pyspark.ml.classification import DecisionTreeClassifier
dt_clf = DecisionTreeClassifier(featuresCol='features', labelCol='class_type')

In [55]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, dt_clf])

In [58]:
train_data,val_data = data.randomSplit([0.8,0.2])

In [59]:
model = pipeline.fit(train_data)

In [60]:
validations = model.transform(val_data)

In [72]:
validations[['prediction', 'class_type']].show()

+----------+----------+
|prediction|class_type|
+----------+----------+
|       1.0|         1|
|       1.0|         1|
|       4.0|         4|
|       7.0|         7|
|       7.0|         7|
|       4.0|         4|
|       4.0|         4|
|       2.0|         2|
|       2.0|         2|
|       2.0|         2|
|       2.0|         2|
|       1.0|         1|
|       1.0|         1|
|       1.0|         1|
|       1.0|         1|
|       1.0|         1|
|       1.0|         1|
|       1.0|         1|
|       1.0|         1|
|       1.0|         1|
+----------+----------+
only showing top 20 rows



In [62]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="class_type", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(validations)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 100.00%


In [67]:
from pyspark.ml.classification import RandomForestClassifier
rf_clf = RandomForestClassifier(featuresCol='features', labelCol='class_type')

In [68]:
pipeline = Pipeline(stages=[assembler, rf_clf])

In [69]:
model = pipeline.fit(train_data)

In [70]:
validations = model.transform(val_data)

In [71]:
acc_evaluator = MulticlassClassificationEvaluator(labelCol="class_type", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(validations)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 100.00%


---


### Healthcare stroke dataset

In [9]:
spark = SparkSession.builder.appName('stroke').getOrCreate()
train = spark.read.csv('train_2v.csv', inferSchema=True,header=True)

In [10]:
train.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [12]:
train.groupby('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  783|
|     0|42617|
+------+-----+



In [18]:
train.createOrReplaceTempView('table')

In [19]:
spark.sql("SELECT work_type, count(work_type) as work_type_count FROM table WHERE stroke == 1 GROUP BY work_type ORDER BY work_type_count DESC").show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            441|
|Self-employed|            251|
|     Govt_job|             89|
|     children|              2|
+-------------+---------------+



In [20]:
spark.sql("SELECT gender, count(gender) as count_gender, count(gender)*100/sum(count(gender)) over() as percent FROM table GROUP BY gender").show()

+------+------------+-------------------+
|gender|count_gender|            percent|
+------+------------+-------------------+
|Female|       25665|  59.13594470046083|
| Other|          11|0.02534562211981567|
|  Male|       17724|  40.83870967741935|
+------+------------+-------------------+



In [22]:
train_f = train.na.fill('No Info', subset=['smoking_status'])
# fill in miss values with mean
from pyspark.sql.functions import mean
mean = train_f.select(mean(train_f['bmi'])).collect()
mean_bmi = mean[0][0]
train_f = train_f.na.fill(mean_bmi,['bmi'])

In [23]:
from pyspark.ml.feature import VectorAssembler,OneHotEncoder,StringIndexer

In [24]:
genderIndexer = StringIndexer(inputCol='gender', outputCol='indexedGender')
gender_encoder = OneHotEncoder(inputCol='indexedGender', outputCol='genderVec')

In [25]:
ever_marriedIndexer = StringIndexer(inputCol='ever_married', outputCol='indexedEver_married')
ever_married_encoder = OneHotEncoder(inputCol='indexedEver_married', outputCol='ever_marriedVec')

In [26]:
workTypeIndexer = StringIndexer(inputCol='work_type', outputCol='indexedWork_type')
workType_encoder = OneHotEncoder(inputCol='indexedWork_type', outputCol='workTypeVec')

In [27]:
ResidenceTypeIndexer = StringIndexer(inputCol='Residence_type', outputCol='indexedResidence_type')
ResidenceType_encoder = OneHotEncoder(inputCol='indexedResidence_type', outputCol='residenceTypeVec')

In [28]:
smoking_statusIdexer = StringIndexer(inputCol='smoking_status', outputCol='indexedSmoking_status')
smokingStatus_encoder = OneHotEncoder(inputCol='indexedSmoking_status', outputCol='smokingStatusVec')

In [29]:
assembler = VectorAssembler(inputCols=['genderVec','age','hypertension','heart_disease', 'ever_marriedVec','workTypeVec'\
                                       ,'residenceTypeVec', 'avg_glucose_level', 'bmi', 'smokingStatusVec'],outputCol='features')

In [30]:
from pyspark.ml.classification import DecisionTreeClassifier
dt_clf = DecisionTreeClassifier(featuresCol='features', labelCol='stroke')

In [31]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[genderIndexer, ever_marriedIndexer, workTypeIndexer, ResidenceTypeIndexer,smoking_statusIdexer,\
                            gender_encoder, ever_married_encoder, workType_encoder, ResidenceType_encoder, smokingStatus_encoder,\
                            assembler, dt_clf])

In [32]:
train_data,val_data = train_f.randomSplit([0.8,0.2])

In [33]:
model = pipeline.fit(train_data)

In [34]:
validations = model.transform(val_data)

In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(validations)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 98.30%


In [45]:
validations.groupby('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0| 8768|
|       1.0|    1|
+----------+-----+



In [36]:
test = spark.read.csv('test_2v.csv', inferSchema=True,header=True)

In [37]:
test_f = test.na.fill('No Info', subset=['smoking_status'])
# fill in miss values with mean
#from pyspark.sql.functions import mean
#mean = train_f.select(mean(train_f['bmi'])).collect()
#mean_bmi = mean[0][0]
test_f = test_f.na.fill(mean_bmi,['bmi'])

In [38]:
predictions = model.transform(test_f)

In [44]:
predictions.groupby('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|18601|
+----------+-----+

