In [1]:
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [2]:
sc = SparkContext()
sqlContext = SQLContext(sc)

In [3]:
df=sqlContext.read.load('daily_weather.csv', format='com.databricks.spark.csv', header='true', inferSchema='true')
len(df.columns)

11

In [4]:
featureColumns = ['air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [5]:
df=df.drop('number')
df=df.na.drop()
df.count(), len(df.columns)

(1064, 10)

In [6]:
binarizer = Binarizer(threshold=24.99999, inputCol='relative_humidity_3pm',outputCol='label')
binarizedDF = binarizer.transform(df)

In [7]:
binarizedDF.select('relative_humidity_3pm','label').show(5)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|                36.16|  1.0|
|           19.4265968|  0.0|
|                14.46|  0.0|
|          12.74254735|  0.0|
|                76.74|  1.0|
+---------------------+-----+
only showing top 5 rows



In [8]:
assembler = VectorAssembler(inputCols=featureColumns,outputCol='features')
assembled = assembler.transform(binarizedDF)

In [16]:
(trainingData, testData) = assembled.randomSplit([0.7,0.3], seed=13234)
trainingData.count(), testData.count()

(742, 322)

In [11]:
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', maxDepth=5, minInstancesPerNode=20, impurity='gini')

In [12]:
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [19]:
predictions = model.transform(testData)
predictions.select('prediction','label').show(20)
predictions.select('prediction','label').write.save(path='/Users/miteshgandhi/Documents/knime_workspace/predictions.csv', format='com.databricks.spark.csv', header='true')

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       0.0|  0.0|
+----------+-----+
only showing top 20 rows

