In [24]:
import findspark
findspark.init()
findspark.find()
import pyspark

In [25]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [26]:
from pyspark import SparkContext
sc = SparkContext("local", "My App")
sqlContext = SQLContext(sc)
df = sqlContext.read.load('daily_weather.csv', 
                          format='com.databricks.spark.csv', 
                          header='true',inferSchema='true')
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [27]:
featureColumns = ['air_pressure_9am','air_temp_9am','avg_wind_direction_9am','avg_wind_speed_9am',
        'max_wind_direction_9am','max_wind_speed_9am','rain_accumulation_9am',
        'rain_duration_9am']

In [28]:
df.select('number').show(4)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
+------+
only showing top 4 rows



In [29]:
#Not be using this column
df = df.drop('number')

In [30]:
#remove all na rows
df = df.na.drop() 

In [31]:
df.count(), len(df.columns)

(1064, 10)

In [32]:
#Create categorical variable from numeric for classification
binarizer = Binarizer(threshold=24.99999, inputCol="relative_humidity_3pm", outputCol="label")
binarizedDF = binarizer.transform(df)

In [33]:
#View some data
binarizedDF.select("relative_humidity_3pm","label").show(10)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
|    76.74000000000046|  1.0|
|   33.930000000000256|  1.0|
|   21.385656725200974|  0.0|
|    74.92000000000041|  1.0|
|   24.030000000000427|  0.0|
|     68.0500000000012|  1.0|
+---------------------+-----+
only showing top 10 rows



In [34]:
#Create feature vector columns aggregrates
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(binarizedDF)

In [42]:
#Split the data
(trainingData, testData) = assembled.randomSplit([0.7,0.3], seed = 13234 )

In [43]:
trainingData.count(), testData.count()

(731, 333)

In [37]:
#define classifier
#Maxdepth is the stopping criteria with 'gini' impurity measure for split
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5,
                            minInstancesPerNode=20, impurity="gini")

In [38]:
#Put the classifier in a pipeline with one stage that is the classifier itself no other processing for now
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [39]:
#Predict on test
predictions = model.transform(testData)

In [41]:
#Show predicted vs original labels
predictions.select("prediction", "label").show(20)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       0.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 20 rows



In [19]:
predictions.select("prediction", "label").write.save(path="predictions.csv",
                                                     format="com.databricks.spark.csv",
                                                     header='true')

In [1]:
sc.stop()

NameError: name 'sc' is not defined