In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\spark\\spark-3.0.3-bin-hadoop2.7'

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [3]:
sc = SparkContext()
sqlContext = SQLContext(sc)
df = sqlContext.read.load('daily_weather.csv', 
                          format='com.databricks.spark.csv', 
                          header='true',inferSchema='true')
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [4]:
featureColumns = ['air_pressure_9am','air_temp_9am','avg_wind_direction_9am','avg_wind_speed_9am',
        'max_wind_direction_9am','max_wind_speed_9am','rain_accumulation_9am',
        'rain_duration_9am']

In [5]:
df = df.drop('number')

In [6]:
df = df.na.drop()

In [7]:
df.count(), len(df.columns)

(1064, 10)

In [8]:
binarizer = Binarizer(threshold=24.99999, inputCol="relative_humidity_3pm", outputCol="label")
binarizedDF = binarizer.transform(df)

In [9]:
binarizedDF.select("relative_humidity_3pm","label").show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



In [10]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(binarizedDF)

In [11]:
(trainingData, testData) = assembled.randomSplit([0.8,0.2], seed=13234)

In [12]:
trainingData.count(), testData.count()

(846, 218)

In [13]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5,
                            minInstancesPerNode=20, impurity="gini")

In [14]:
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [15]:
predictions = model.transform(testData)

In [16]:
predictions.select("prediction", "label").show(15)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
+----------+-----+
only showing top 15 rows



In [17]:
#Model Evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [18]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")

In [19]:
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % (accuracy))

Accuracy = 0.784404 


In [20]:
predictions.select("prediction", "label").rdd.take(2)

[Row(prediction=1.0, label=1.0), Row(prediction=1.0, label=1.0)]

In [21]:
predictions.select("prediction", "label").rdd.map(tuple).take(2)

[(1.0, 1.0), (1.0, 1.0)]

In [22]:
metrics = MulticlassMetrics(predictions.select("prediction", "label").rdd.map(tuple))

In [23]:
metrics.confusionMatrix().toArray().transpose()

array([[87., 28.],
       [19., 84.]])