In [2]:
from pyspark import SQLContext 
from pyspark.ml.feature import Binarizer,VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

In [3]:
# Entry point for DataFrame operations. NOTE: `sc` is not defined in the
# visible cells; it is expected to be supplied by the PySpark kernel/shell.
sqlContext = SQLContext(sc)

# Read the daily weather CSV into a Spark DataFrame, treating the first line
# as the header and letting the reader infer the column types.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load('file:///home/cloudera/Downloads/big-data-4/daily_weather.csv')
)

In [4]:
# Names of the 9am measurement columns used as model inputs.
# The order is kept as-is because it determines the layout of the assembled
# feature vector.
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am',
]

In [5]:
# Remove the 'number' column, which is not used by the model.
df = df.drop('number')

In [6]:
# Discard every row that contains a missing value in any column.
df = df.dropna()

In [7]:
# (row count, column count) of the cleaned DataFrame.
(df.count(), len(df.columns))

(1064, 10)

In [8]:
# Create the categorical target "label" from the 3pm relative humidity:
#   humidity <  25%  -> 0.0 (low humidity)
#   humidity >= 25%  -> 1.0 (not low)
#
# Binarizer maps values strictly GREATER than the threshold to 1.0 and values
# less than or equal to it to 0.0, so a threshold of exactly 25.0 would put
# 25.0 itself in the wrong class. The threshold is therefore the largest
# double below 25.0 (doubles in [16, 32) are spaced 2**-48 apart), which makes
# "value > threshold" equivalent to "value >= 25.0". The previous
# approximation (24.999999) labelled any reading in (24.999999, 25) as 1.
binarizer = Binarizer(
    threshold=25.0 - 2.0 ** -48,
    inputCol="relative_humidity_3pm",
    outputCol="label",
)

# Append the "label" column to the cleaned data.
binarizedDF = binarizer.transform(df)

In [9]:
# Combine the individual feature columns into a single vector column named
# "features", the input format used by the classifier below.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=featureColumns,
)
assembledDF = assembler.transform(binarizedDF)

In [10]:
# Randomly partition the rows into roughly 80% training and 20% test data.
# The fixed seed keeps the split reproducible between runs.
trainingData, testData = assembledDF.randomSplit(
    [0.8, 0.2],
    seed=13234,
)

In [11]:
# (training rows, test rows) produced by the split.
(trainingData.count(), testData.count())

(854, 210)

In [12]:
# Configure the decision tree classifier: Gini impurity as the split
# criterion, at most 5 levels deep, and at least 20 training rows required in
# every child node.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    impurity='gini',
    maxDepth=5,
    minInstancesPerNode=20,
)

In [13]:
# Wrap the classifier in a single-stage Pipeline and fit it on the training
# data; the fitted pipeline is the model used for prediction.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [14]:
# Apply the fitted model to the held-out test rows.
predictions = model.transform(testData)

In [15]:
# Preview the first 10 predicted vs. actual labels side by side.
predictions.select(['prediction', 'label']).show(n=10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



In [16]:
# Save only the prediction and label columns as CSV, with a header row.
#
# mode='overwrite' makes the cell safe to re-run: the default save mode raises
# an error when the target path already exists, so a second execution (or a
# Restart & Run All) used to fail here.
# The 'inferSchema' option was removed: it is a reader option and has no
# effect when writing.
predictions.select('prediction', 'label').write.save(
    'file:///home/cloudera/Downloads/big-data-4/prediction.csv',
    format='com.databricks.spark.csv',
    mode='overwrite',
    header='true',
)