In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark
!pip install pyspark



In [None]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello ")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.python.pyspark.shell import spark

# read input and show
df = spark.read.options(delimiter=',', header=True).csv('/content/Absenteeism_at_work.csv')
df = df.withColumn("MOA", df["Month of absence"] - 0).withColumn("label", df['Transportation expense'] - 0).withColumn("ROA", df["Reason for absence"] - 0).\
    withColumn("distance", df["Distance from Residence to Work"] - 0).withColumn("BMI", df["Body mass index"] - 0)
df.show(5)

+---+------------------+----------------+---------------+-------+----------------------+-------------------------------+------------+---+----------------------+----------+--------------------+---------+---+--------------+-------------+---+------+------+---------------+-------------------------+---+-----+----+--------+----+
| ID|Reason for absence|Month of absence|Day of the week|Seasons|Transportation expense|Distance from Residence to Work|Service time|Age|Work load Average/day |Hit target|Disciplinary_failure|Education|Son|Social drinker|Social smoker|Pet|Weight|Height|Body mass index|Absenteeism_time_in_hours|MOA|label| ROA|distance| BMI|
+---+------------------+----------------+---------------+-------+----------------------+-------------------------------+------------+---+----------------------+----------+--------------------+---------+---+--------------+-------------+---+------+------+---------------+-------------------------+---+-----+----+--------+----+
| 11|                26| 

In [None]:
# combine column label and distance to new column name features
# https://spark.apache.org/docs/latest/ml-features#vectorindexer
merge_col = VectorAssembler(inputCols=["label", "distance"], outputCol='features')
df = merge_col.transform(df)
df.select("features").show(5)

+------------+
|    features|
+------------+
|[289.0,36.0]|
|[118.0,13.0]|
|[179.0,51.0]|
| [279.0,5.0]|
|[289.0,36.0]|
+------------+
only showing top 5 rows



In [None]:
# create a column indexedLabel mark label with most frequency start from 0.0 (most frequency) to n(least frequency)
labelIndexerFrequency = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df)
# use transform(df) to show, not use when run
# labelIndexerFrequency = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(df).transform(df)
# labelIndexerFrequency.show(5)

In [None]:
#decide which features should be treated as categorical
featureIndexerFrequency = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df)

In [None]:
# https://stackoverflow.com/questions/44981407/pyspark-ml-how-to-save-pipeline-and-randomforestclassificationmodel
# split to 70-30
(trainingData, testData) = df.randomSplit([0.7, 0.3])
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexerFrequency.labels)
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexerFrequency, featureIndexerFrequency, rf, labelConverter])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
# Make predictions.
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Decision Tree - Test Accuracy = %g" % (accuracy))
print("Decision Tree - Test Error = %g" % (1.0 - accuracy))
rfModel = model.stages[2]
print(rfModel) # summary only

+--------------+-----+------------+
|predictedLabel|label|    features|
+--------------+-----+------------+
|         235.0|235.0|[235.0,11.0]|
|         235.0|235.0|[235.0,11.0]|
|         235.0|235.0|[235.0,11.0]|
|         235.0|235.0|[235.0,11.0]|
|         235.0|235.0|[235.0,11.0]|
+--------------+-----+------------+
only showing top 5 rows

Decision Tree - Test Accuracy = 0.940171
Decision Tree - Test Error = 0.0598291
RandomForestClassificationModel: uid=RandomForestClassifier_9bf8dc19afca, numTrees=10, numClasses=24, numFeatures=2


In [None]:
# save model
rfModel.save('/content/myModel')