**Basic Classification with PySpark**

In [1]:
#Import libraries
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [2]:
# Create the SQL context and read the daily weather CSV into a DataFrame.
# The first CSV row is used as the header and column types are inferred.
# NOTE(review): `sc` is not defined in any visible cell -- presumably the
# SparkContext injected by the PySpark kernel; confirm before running elsewhere.
sqlContext = SQLContext(sc)
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load('file:///home/cloudera/Downloads/big-data-4/daily_weather.csv')
)

In [3]:
# Inspect the column names of the loaded DataFrame (taken from the CSV header row)
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [4]:
# Input columns for the classifier: the 9am pressure, temperature, wind and
# rain measurements. The relative_humidity columns are not included here;
# relative_humidity_3pm is the column the label is later derived from.
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am'
]

In [5]:
# Drop the 'number' column; it is not one of the selected feature columns
df = df.drop('number')

In [6]:
# Remove every row that has a missing value in any column
# (how='any' is the default, spelled out here for clarity)
df = df.na.drop(how='any')

In [7]:
# Shape of the cleaned DataFrame as (number of rows, number of columns)
(df.count(), len(df.columns))

(1064, 10)

In [8]:
# Derive the binary target from the 3pm humidity: Binarizer maps values
# strictly greater than the threshold to 1.0 and all other values to 0.0.
# NOTE(review): 24.99999 presumably makes readings of 25 and above the
# positive class -- confirm the intended cut-off.
binarizer = Binarizer(
    inputCol='relative_humidity_3pm',
    outputCol='label',
    threshold=24.99999
)
binarizedDF = binarizer.transform(df)

In [9]:
# Preview the first 4 rows: original humidity value next to its binarized label
binarizedDF.select(['relative_humidity_3pm', 'label']).show(n=4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



In [10]:
# Combine all feature columns into a single vector column named 'features',
# which is the column the decision tree is configured to read its inputs from
assembler = VectorAssembler(outputCol='features', inputCols=featureColumns)
assembled = assembler.transform(binarizedDF)

In [11]:
# Randomly split the rows 70% / 30% into training and test sets;
# the fixed seed keeps the split the same across runs
trainingData, testData = assembled.randomSplit(weights=[0.7, 0.3], seed=13234)

In [12]:
# Row counts of the two splits as (training rows, test rows)
tuple(split.count() for split in (trainingData, testData))

(730, 334)

In [13]:
# Configure the decision tree classifier: it reads inputs from 'features'
# and the target from 'label', splits on Gini impurity, grows at most
# 5 levels deep and requires at least 20 training rows in every child node
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
    impurity='gini',
    maxDepth=5,
    minInstancesPerNode=20
)

In [14]:
# Wrap the decision tree in a single-stage Pipeline and fit it on the training split
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(dataset=trainingData)

In [15]:
# Apply the fitted pipeline model to the held-out test split to obtain predictions
predictions = model.transform(testData)

In [16]:
# Preview the first 10 rows: predicted class next to the true label
predictions.select('prediction', 'label').show(n=10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



In [17]:
# Save predictions and true labels in CSV format, with a header row.
# mode='overwrite' replaces the output of a previous run: without an explicit
# mode the writer raises when the target path already exists, so this cell
# would fail every time the notebook is re-run.
predictions.select(['prediction', 'label']).write.save(
    path='file:///home/cloudera/Downloads/big-data-4/predictions.csv',
    format='com.databricks.spark.csv',
    mode='overwrite',
    header='true'
)