In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

## Load data

In [33]:
# Build a SQLContext on top of `sc`. `sc` is not defined in this notebook, so it
# is expected to be supplied by the PySpark kernel/shell.
sqlContext = SQLContext(sc)

# Read the daily weather CSV through the spark-csv data source. The first row is
# used as the header and column types are inferred from the data.
df = (sqlContext.read
      .format('com.databricks.spark.csv')
      .options(header='true', inferSchema='true')
      .load('file:///home/cloudera/Downloads/big-data-4/daily_weather.csv'))

# Display the names of the loaded columns.
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [3]:
# Summary statistics (count, mean, stddev, min, max) for the columns, pulled
# into pandas and transposed so that each column is shown as a row.
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
number,1095,547.0,316.24357700987383,0,1094
air_pressure_9am,1092,918.8825513138097,3.1841611803868353,907.9900000000024,929.3200000000012
air_temp_9am,1090,64.93300141287075,11.175514003175877,36.752000000000685,98.90599999999992
avg_wind_direction_9am,1091,142.23551070057584,69.13785928889183,15.500000000000046,343.4
avg_wind_speed_9am,1092,5.50828424225493,4.552813465531715,0.69345139999974,23.554978199999763
max_wind_direction_9am,1092,148.9535179651692,67.23801294602951,28.89999999999991,312.19999999999993
max_wind_speed_9am,1091,7.019513529175272,5.59820917078096,1.1855782000000479,29.84077959999996
rain_accumulation_9am,1089,0.20307895225211126,1.5939521253574904,0.0,24.01999999999907
rain_duration_9am,1092,294.1080522756142,1598.078778660148,0.0,17704.0


### Drop unused and missing data

In [4]:
# Remove the 'number' column; it is not part of the feature list used for the
# model further down. NOTE(review): the summary above (min 0, max 1094) suggests
# it is just a row id -- confirm against the data source.
df = df.drop('number')

In [5]:
# Discard every row that contains a null in any column (default how='any').
df = df.dropna()

In [6]:
# (row count, column count) after dropping 'number' and rows with nulls.
(df.count(), len(df.columns))

(1064, 10)

## Create categorical variable

In [7]:
# Derive the classification target from the 3pm relative humidity: Binarizer
# writes 1.0 to "label" when the value is greater than the threshold and 0.0
# otherwise. The threshold sits just under 25 so that a reading of 25 maps to 1.0.
binarizer = Binarizer(
    inputCol="relative_humidity_3pm",
    outputCol="label",
    threshold=24.99999
)
binarizedDF = binarizer.transform(df)

In [8]:
# Spot-check the first few humidity values next to the label derived from them.
binarizedDF.select("relative_humidity_3pm", "label").show(n=4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



## Aggregate features

In [9]:
# Columns used as model inputs. The humidity columns are left out:
# relative_humidity_3pm is the source of the label.
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am',
]

In [10]:
# Pack the individual feature columns into a single vector column named
# "features", which is the input column the classifier below is configured with.
assembler = VectorAssembler(outputCol="features", inputCols=featureColumns)
assembled = assembler.transform(binarizedDF)

## Split training and test data

In [37]:
# 80/20 train/test split; the seed is fixed so the split is repeatable.
trainingData, testData = assembled.randomSplit(weights=[0.8, 0.2], seed=13234)

In [38]:
# (training rows, test rows) produced by the split.
(trainingData.count(), testData.count())

(854, 210)

In [13]:
# Peek at the test set: one raw feature, the assembled vector, and the label.
testData.select('air_pressure_9am', 'features', 'label').show()

+-----------------+--------------------+-----+
| air_pressure_9am|            features|label|
+-----------------+--------------------+-----+
|908.9700000000045|[908.970000000004...|  1.0|
|912.8900000000112|[912.890000000011...|  1.0|
|912.9900000000129|[912.990000000012...|  1.0|
|913.0600000000032|[913.060000000003...|  1.0|
|913.0700000000083|[913.070000000008...|  1.0|
|913.4900000000082|[913.490000000008...|  1.0|
|913.9700000000129|[913.970000000012...|  0.0|
|914.4900000000038|[914.490000000003...|  1.0|
| 915.030000000007|[915.030000000007...|  1.0|
|915.1000000000028|[915.100000000002...|  1.0|
|915.1019014359358|[915.101901435935...|  0.0|
|915.6000000000056|[915.600000000005...|  1.0|
|915.6000000000067|[915.600000000006...|  1.0|
| 915.700000000002|[915.700000000002...|  1.0|
|915.7100000000058|[915.710000000005...|  0.0|
|915.8400000000056|[915.840000000005...|  1.0|
|915.8700000000059|[915.870000000005...|  1.0|
|915.9144357399191|[915.914435739919...|  0.0|
|916.04000000

## Create and train decision tree classifier

In [14]:
# Decision tree configuration: Gini impurity, depth capped at 5, and at least
# 20 training rows required in every node.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    impurity="gini",
    maxDepth=5,
    minInstancesPerNode=20
)

In [15]:
# A pipeline whose only stage is the decision tree, fitted on the training split.
pipeline = Pipeline().setStages([dt])
model = pipeline.fit(trainingData)

In [16]:
# Apply the fitted pipeline to the held-out test split; the result keeps the
# input columns and adds rawPrediction, probability and prediction.
predictions = model.transform(testData)

In [31]:
# Compare predicted and actual labels side by side for the first 20 test rows.
predictions.select("prediction", "label").show(n=20)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 20 rows



In [18]:
# List every column of the scored DataFrame, including those added by the model.
predictions.columns

['air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm',
 'label',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [19]:
# Show the raw per-class values behind each prediction, next to the true label,
# for the first 10 test rows.
predictions.select("rawPrediction", "prediction", "label").show(n=10)

+-------------+----------+-----+
|rawPrediction|prediction|label|
+-------------+----------+-----+
|  [2.0,163.0]|       1.0|  1.0|
|  [2.0,163.0]|       1.0|  1.0|
|  [2.0,163.0]|       1.0|  1.0|
|  [2.0,163.0]|       1.0|  1.0|
|  [2.0,163.0]|       1.0|  1.0|
|  [2.0,163.0]|       1.0|  1.0|
|   [30.0,9.0]|       0.0|  0.0|
|  [2.0,163.0]|       1.0|  1.0|
|   [8.0,23.0]|       1.0|  1.0|
|  [2.0,163.0]|       1.0|  1.0|
+-------------+----------+-----+
only showing top 10 rows



## Save predictions to CSV

In [20]:
# Confirm that `predictions` is a Spark DataFrame before using its writer API.
type(predictions)

pyspark.sql.dataframe.DataFrame

# Write the (prediction, label) pairs as CSV with a header row, using the
# spark-csv data source. The target path is an output location (a directory of
# part files, as the `part-00000` read further down shows), not a single file.
#
# mode="overwrite" replaces output left behind by an earlier run. With the
# default save mode the write raises an error when the path already exists,
# so this cell would fail on every re-run of the notebook.
predictions.select("prediction", "label").write.save(
    path="file:///home/cloudera/Downloads/big-data-4/predictions",
    format="com.databricks.spark.csv",
    mode="overwrite",
    header='true')

In [27]:
# Same export as a single part file: coalesce(1) collapses the data to one
# partition, so the output directory holds one `part-00000` file that can be
# inspected directly.
#
# mode="overwrite" replaces output left behind by an earlier run. With the
# default save mode the write raises an error when the path already exists,
# so this cell would fail on every re-run of the notebook.
predictions.select("prediction", "label").coalesce(1).write.save(
    path="file:///home/cloudera/predictions.csv",
    format="com.databricks.spark.csv",
    mode="overwrite",
    header='true')

In [28]:
!head /home/cloudera/predictions.csv/part-00000

prediction,label
1.0,1.0
1.0,1.0
1.0,1.0
1.0,1.0
1.0,1.0
1.0,1.0
0.0,0.0
1.0,1.0
1.0,1.0
