In [1]:
import os

from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [2]:
# Location of the daily weather CSV. Set the DAILY_WEATHER_CSV environment
# variable to point at another copy instead of editing this machine-specific path.
DATA_PATH = os.environ.get(
    'DAILY_WEATHER_CSV',
    'file:///home/cloudera/Downloads/big-data-4/daily_weather.csv',
)

# NOTE(review): `sc` (the SparkContext) is not defined in the cells shown —
# presumably supplied by the PySpark shell/kernel; confirm.
sqlContext = SQLContext(sc)

# Read the CSV with a header row, letting Spark infer the column types.
df = sqlContext.read.load(DATA_PATH,
                          format='com.databricks.spark.csv',
                          header='true', inferSchema='true')
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [3]:
# Predictor columns packed into the feature vector by the VectorAssembler below.
# NOTE(review): relative_humidity_9am is present in the loaded data but is not
# listed here — confirm that excluding it is intentional.
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am',
]

In [5]:
# Remove the 'number' column: it is neither a feature nor the label source
# (presumably a row identifier — confirm). Column order is preserved.
df = df.select([col for col in df.columns if col != 'number'])

In [6]:
# Discard every row that has a null in any column, so rows with missing
# measurements never reach the feature assembler or the label binarizer.
df = df.dropna()

In [10]:
# Size of the cleaned data as (row count, column count).
n_rows = df.count()
n_cols = len(df.columns)
n_rows, n_cols

(1064, 10)

In [11]:
# Peek at the first row of the cleaned data.
df.first()

Row(air_pressure_9am=918.0600000000087, air_temp_9am=74.82200000000041, avg_wind_direction_9am=271.1, avg_wind_speed_9am=2.080354199999768, max_wind_direction_9am=295.39999999999986, max_wind_speed_9am=2.863283199999908, rain_accumulation_9am=0.0, rain_duration_9am=0.0, relative_humidity_9am=42.42000000000046, relative_humidity_3pm=36.160000000000494)

In [12]:
# Binarizer outputs 1.0 when the input is strictly greater than the threshold,
# and 0.0 otherwise. A threshold just under 25 therefore labels afternoon
# humidity of 25 or more as 1.0, and anything at or below 24.999999 as 0.0.
HUMIDITY_THRESHOLD = 24.999999
binarizer = Binarizer(threshold=HUMIDITY_THRESHOLD,
                      inputCol='relative_humidity_3pm',
                      outputCol='label')

In [13]:
# Append the 0/1 'label' column derived from relative_humidity_3pm.
binarizedDF = binarizer.transform(dataset=df)

In [15]:
# Print the first 20 binarized labels (20 is show()'s default row count).
binarizedDF.select(binarizedDF['label']).show(n=20)

+-----+
|label|
+-----+
|  1.0|
|  0.0|
|  0.0|
|  0.0|
|  1.0|
|  1.0|
|  0.0|
|  1.0|
|  0.0|
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  1.0|
|  0.0|
|  0.0|
|  0.0|
|  1.0|
|  0.0|
|  0.0|
+-----+
only showing top 20 rows



In [17]:
# Combine the predictor columns into one vector column named 'features'.
assembler = VectorAssembler(outputCol='features', inputCols=featureColumns)

In [18]:
# Add the 'features' vector column to the labelled data.
assembled = assembler.transform(dataset=binarizedDF)

In [19]:
# Approximately 80/20 train/test split; the fixed seed makes it reproducible.
splits = assembled.randomSplit(weights=[0.8, 0.2], seed=13234)
training, testing = splits

In [21]:
# Number of rows that landed in the training split.
n_training = training.count()
n_training

854

In [24]:
# Decision tree over the assembled feature vector, using Gini impurity.
# The tree is limited to depth 5, and every child node must keep at least
# 20 training rows.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    maxDepth=5,
    minInstancesPerNode=20,
    impurity='gini',
)

In [29]:
# Single-stage pipeline. Binarizing and vector assembly were already applied
# to `training`/`testing` in earlier cells, so only the tree is fitted here.
stages = [dt]
pipeline = Pipeline(stages=stages)

In [30]:
# Train the pipeline (i.e. the decision tree) on the training split.
model = pipeline.fit(dataset=training)

In [32]:
# Score the held-out split; this adds a 'prediction' column alongside 'label'.
prediction = model.transform(dataset=testing)

In [33]:
# Compare true labels with model predictions for the first rows of the test split.
prediction.select(['label', 'prediction']).show()

+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       0.0|
|  1.0|       1.0|
+-----+----------+
only showing top 20 rows

