In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [2]:
# NOTE(review): `sc` is not defined anywhere in this notebook; presumably the
# PySpark kernel injects the SparkContext -- confirm before running elsewhere.
sqlContext = SQLContext(sc)

# Load the daily weather CSV via the spark-csv data source. The first row is
# treated as the header and column types are inferred from the data.
df = sqlContext.read.load(
    'file:///home/cloudera/Downloads/big-data-4/daily_weather.csv',
    format='com.databricks.spark.csv',
    header='true',
    inferSchema='true'
)

# Cell output: the list of column names that were read.
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [3]:
# Columns fed to the model as predictors: the 9am pressure, temperature, wind
# and rain readings. Neither relative-humidity column is included here.
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am'
]

In [4]:
# Remove the 'number' column; it is not one of the feature columns and is not
# used to build the label.
df = df.drop('number')

In [5]:
# Discard every row that has a missing value in any column, so the assembler
# and classifier below only see complete records.
df = df.na.drop()

In [6]:
# Sanity check after cleaning: (number of rows, number of columns).
df.count(), len(df.columns)

(1064, 10)

In [7]:
# Build the binary target from the 3pm relative humidity: values above the
# threshold become 1.0 in 'label', values at or below it become 0.0.
binarizer = Binarizer(
    inputCol='relative_humidity_3pm',
    outputCol='label',
    threshold=24.99999
)

# Append the 'label' column to the cleaned data.
binarizerdf = binarizer.transform(df)

In [8]:
# Spot-check the binarization by printing the source value next to the derived
# label (show() prints the first 20 rows).
binarizerdf.select('relative_humidity_3pm', 'label').show()

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
|    76.74000000000046|  1.0|
|   33.930000000000256|  1.0|
|   21.385656725200974|  0.0|
|    74.92000000000041|  1.0|
|   24.030000000000427|  0.0|
|     68.0500000000012|  1.0|
|    32.13000000000024|  1.0|
|     79.0900000000002|  1.0|
|    58.43000000000119|  1.0|
|   27.990000000000173|  1.0|
|   24.369999999999948|  0.0|
|   14.801705962979918|  0.0|
|    20.75568332171184|  0.0|
|    45.87000000000005|  1.0|
|    7.740000000000088|  0.0|
|   14.649909361535952|  0.0|
+---------------------+-----+
only showing top 20 rows



In [9]:
# Pack the individual predictor columns into the single vector column
# ('features') that the classifier reads.
assembler = VectorAssembler(
    inputCols=featureColumns,
    outputCol='features'
)
assembled = assembler.transform(binarizerdf)

In [10]:
# Random 70% / 30% train/test split; the fixed seed keeps the split repeatable.
# NOTE(review): 'trianingData' is a misspelling of 'trainingData'. It is kept
# as-is here because later cells refer to the variable by this exact name.
trianingData, testData = assembled.randomSplit([0.7, 0.3], seed=13234)



In [11]:
# Row counts of the two splits: (training rows, test rows).
trianingData.count(), testData.count()

(730, 334)

In [12]:
# Decision tree configuration: Gini impurity as the split criterion, depth
# capped at 5, and at least 20 training rows required in every child node.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    impurity='gini',
    maxDepth=5,
    minInstancesPerNode=20
)

In [13]:
# Single-stage pipeline wrapping the decision tree.
pipeline = Pipeline(stages=[dt])

# Fit on the training split only.
model = pipeline.fit(trianingData)

# Score the held-out split; the result includes a 'prediction' column
# alongside the original columns.
predictions = model.transform(testData)

In [14]:
# Eyeball the first 20 predictions against the true labels.
predictions.select('prediction', 'label').show(20)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  0.0|
|       1.0|  1.0|
|       0.0|  0.0|
+----------+-----+
only showing top 20 rows



In [15]:
# Save the (prediction, label) pairs as CSV with a header row, next to the
# input data on the local filesystem.
#
# The URI scheme must be written 'file:///' (with the colon). Without it the
# string is not a local-file URI but a relative path, which is resolved against
# the default filesystem's working directory instead of /home/cloudera/....
#
# NOTE(review): with no save mode given, the write is expected to fail if the
# target path already exists -- remove the old output before re-running.
predictions.select('prediction', 'label').write.save(
    path='file:///home/cloudera/Downloads/big-data-4/preds_1a.csv',
    format='com.databricks.spark.csv',
    header='true'
)

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Keep only the two columns needed for evaluation, in (prediction, label)
# order; the row-to-tuple conversion further down depends on this order.
predictions = predictions.select('prediction', 'label')


In [17]:
# Peek at the first five rows as Row objects.
predictions.take(5)

[Row(prediction=1.0, label=1.0),
 Row(prediction=1.0, label=1.0),
 Row(prediction=1.0, label=1.0),
 Row(prediction=1.0, label=1.0),
 Row(prediction=1.0, label=1.0)]

In [18]:
# Evaluator that compares the 'prediction' column with the 'label' column.
# NOTE(review): the 'precision' metric name belongs to the older
# spark.ml evaluator API; newer Spark releases are understood to expose the
# same overall figure as 'accuracy' -- confirm before upgrading Spark.
evaluator = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='precision'
)

In [19]:
# Compute the configured metric on the test-set predictions.
evaluator.evaluate(predictions)

0.7544910179640718

In [20]:
# View the DataFrame as an RDD: each element is still a Row object.
predictions.rdd.take(2)

[Row(prediction=1.0, label=1.0), Row(prediction=1.0, label=1.0)]

In [21]:
# Convert each Row to a plain (prediction, label) tuple and check the result.
predictions.rdd.map(tuple).take(2)

[(1.0, 1.0), (1.0, 1.0)]

In [22]:
# Build MLlib metrics from an RDD of (prediction, label) tuples.
metrics = MulticlassMetrics(predictions.rdd.map(tuple))

In [23]:
# Confusion matrix as a NumPy array, transposed from MLlib's layout.
# NOTE(review): MLlib documents predicted classes in columns (labels in
# ascending order), so after the transpose rows should be predicted classes
# and columns actual classes -- confirm against the MulticlassMetrics docs.
metrics.confusionMatrix().toArray().transpose()

array([[ 134.,   53.],
       [  29.,  118.]])