In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [2]:
# Build the SQL entry point on the existing SparkContext, then read the
# minute-level weather CSV through the spark-csv data source with a header
# row and schema inference enabled.
sqlContext = SQLContext(sc)
df = (sqlContext.read
      .format('com.databricks.spark.csv')
      .options(header='true', inferSchema='true')
      .load('file:///home/cloudera/Downloads/big-data/minute_weather.csv'))
# Display the column names found in the file.
df.columns

['rowID',
 'hpwren_timestamp',
 'air_pressure',
 'air_temp',
 'avg_wind_direction',
 'avg_wind_speed',
 'max_wind_direction',
 'max_wind_speed',
 'min_wind_direction',
 'min_wind_speed',
 'rain_accumulation',
 'rain_duration',
 'relative_humidity']

In [3]:
# Columns assembled into the classifier's feature vector (one per line so
# additions and removals diff cleanly).
featureColumns = [
    'air_pressure',
    'air_temp',
    'avg_wind_direction',
    'avg_wind_speed',
    'max_wind_direction',
    'max_wind_speed',
    'rain_accumulation',
    'rain_duration',
]

In [4]:
# Down-sample: keep only the rows whose rowID is a multiple of 15.
filteredDF = df.where(df['rowID'] % 15 == 0)

In [5]:
# Remove the timestamp column from the sampled rows.
timestampColumn = 'hpwren_timestamp'
filteredDF = filteredDF.drop(timestampColumn)

In [6]:
# Discard rows with missing values (default null-drop settings).
filteredDF = filteredDF.dropna()

In [7]:
# Sanity check: (row count, column count) after sampling and cleaning.
(filteredDF.count(), len(filteredDF.columns))

(105790, 12)

In [8]:
# Turns relative_humidity into a 0/1 "label" column by thresholding.
# NOTE(review): the cut-off sits just below 25, presumably so that readings of
# 25 and above become 1.0 -- confirm against the Binarizer documentation.
binarizer = Binarizer(
    inputCol="relative_humidity",
    outputCol="label",
    threshold=24.99999,
)

In [9]:
# Apply the binarizer to the cleaned rows; the result carries the new "label"
# column alongside the existing columns.
binarizedDF = binarizer.transform(filteredDF)

In [10]:
# Spot-check the first few humidity readings next to their derived labels.
humidityVsLabel = binarizedDF.select("relative_humidity", "label")
humidityVsLabel.show(4)

+-----------------+-----+
|relative_humidity|label|
+-----------------+-----+
|             45.2|  1.0|
|             57.9|  1.0|
|             57.1|  1.0|
|             51.4|  1.0|
+-----------------+-----+
only showing top 4 rows



In [11]:
# Packs the columns listed in featureColumns into one "features" vector column.
assembler = VectorAssembler(
    inputCols=featureColumns,
    outputCol="features",
)

In [12]:
# Add the "features" vector column to the labelled rows.
assembled = assembler.transform(binarizedDF)

In [13]:
# 70/30 train/test split; the fixed seed keeps the split repeatable.
splitWeights = [0.7, 0.3]
trainingData, testData = assembled.randomSplit(splitWeights, seed=129)

In [14]:
# Row counts of the two partitions: (training, test).
(trainingData.count(), testData.count())

(73793, 31997)

In [15]:
# Decision tree configuration: Gini impurity, depth capped at 8, and at least
# 20 training rows required in every node.
dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    impurity="gini",
    maxDepth=8,
    minInstancesPerNode=20,
)

In [16]:
# Single-stage pipeline wrapping the decision tree.
pipelineStages = [dt]
pipeline = Pipeline(stages=pipelineStages)

In [17]:
# Train the pipeline (i.e. the decision tree) on the training partition.
model = pipeline.fit(trainingData)

In [18]:
# Score the held-out test partition with the fitted model.
predictions = model.transform(testData)

In [19]:
# Compare the first ten predicted classes with the true labels.
predictedVsActual = predictions.select("prediction", "label")
predictedVsActual.show(10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



In [21]:
# Persist the (prediction, label) pairs through the spark-csv writer, coalesced
# to a single partition, so the evaluation cells can reload them.
# mode='overwrite' replaces output left by a previous run; without an explicit
# mode the save fails when the target path already exists, which made this
# notebook impossible to re-run from a fresh kernel.
predictions.select("prediction", "label").coalesce(1).write.save(path='file:///home/cloudera/Downloads/big-data/predictions_q2.csv',
                                                     format='com.databricks.spark.csv',
                                                     mode='overwrite',
                                                     header='true')

In [22]:
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [23]:
# Reload the saved (prediction, label) pairs through the spark-csv data source
# with a header row and schema inference enabled. Note that this rebinds
# `predictions` to the data read back from disk.
sqlContext1 = SQLContext(sc)
predictions = (sqlContext1.read
               .format('com.databricks.spark.csv')
               .options(header='true', inferSchema='true')
               .load('file:///home/cloudera/Downloads/big-data/predictions_q2.csv'))

In [24]:
# Evaluator comparing the "prediction" column against the "label" column.
# NOTE(review): the metric is requested as "precision" but reported below as
# accuracy; newer Spark releases are believed to name this metric "accuracy" --
# confirm against the cluster's Spark version before changing it.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="precision",
)

In [25]:
# Compute the configured metric over the reloaded predictions.
accuracy = evaluator.evaluate(predictions)

In [26]:
# Report the evaluation score in compact %g notation.
print("Accuracy = %g " % accuracy)

Accuracy = 0.815639 


In [27]:
# Peek at the first two records of the underlying RDD (Row objects).
predictions.rdd.take(2)

[Row(prediction=1.0, label=1.0), Row(prediction=1.0, label=1.0)]

In [28]:
# Same two records after converting each Row into a plain tuple, the form
# later passed to MulticlassMetrics.
predictions.rdd.map(tuple).take(2)

[(1.0, 1.0), (1.0, 1.0)]

In [29]:
# Convert each Row to a plain tuple; with the column order used here the pairs
# are (prediction, label). Then build the metrics object from that RDD.
predictionAndLabels = predictions.rdd.map(tuple)
metrics = MulticlassMetrics(predictionAndLabels)

In [30]:
# Confusion matrix as an array, displayed transposed.
# NOTE(review): which axis is "actual" and which is "predicted" after the
# transpose should be confirmed against the MulticlassMetrics documentation.
confusionArray = metrics.confusionMatrix().toArray()
confusionArray.transpose()

array([[  3590.,   1319.],
       [  4580.,  22508.]])