In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.context import SparkContext
from pyspark import SparkConf
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer


In [2]:
# Reuse the active SparkContext if one exists; otherwise start one with a
# local master ("local[*]"), then wrap it in a SQLContext for DataFrame access.
spark_conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(spark_conf)
sqlContext = SQLContext(sc)

In [3]:
# Load the daily weather CSV: the first line is treated as the header and
# column types are inferred from the data.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header = 'true', inferSchema = 'true')
    .load('datasets/daily_weather.csv')
)

In [4]:
# List the column names of the loaded DataFrame.
df.columns

['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [5]:
# Names of the 9am measurement columns that are used as model features.
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am',
]

In [6]:
# Remove the 'number' column; it is not one of the feature columns.
# NOTE(review): presumably a row identifier — confirm against the dataset description.
df = df.drop('number')

In [7]:
# Discard every row that contains at least one missing value.
df = df.dropna()

In [8]:
# Number of rows and number of columns in df at this point.
df.count(), len(df.columns)

(1064, 10)

In [9]:
# Choose a threshold that turns the 3pm relative humidity into a binary class:
# values above the threshold get label 1.0, all others get 0.0.
binarizer = Binarizer(
    inputCol = 'relative_humidity_3pm',
    outputCol = 'label',
    threshold = 24.99999,
)

In [10]:
# Apply the binarizer to df; the result carries the additional 'label' column.
binarizedDF = binarizer.transform(df)

In [11]:
# Spot-check the first 4 rows: the source humidity value next to its label.
binarizedDF.select('relative_humidity_3pm', 'label').show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



In [12]:
# Combine the feature columns used for prediction into a single vector column.
assembler = VectorAssembler(outputCol = 'features', inputCols = featureColumns)

# Add the 'features' column to the binarized data.
assembled = assembler.transform(binarizedDF)

In [13]:
# Display the humidity value and label from the assembled DataFrame.
# NOTE(review): the new 'features' column is not shown here — consider adding it to the select.
assembled.select('relative_humidity_3pm','label').show()

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
|    76.74000000000046|  1.0|
|   33.930000000000256|  1.0|
|   21.385656725200974|  0.0|
|    74.92000000000041|  1.0|
|   24.030000000000427|  0.0|
|     68.0500000000012|  1.0|
|    32.13000000000024|  1.0|
|     79.0900000000002|  1.0|
|    58.43000000000119|  1.0|
|   27.990000000000173|  1.0|
|   24.369999999999948|  0.0|
|   14.801705962979918|  0.0|
|    20.75568332171184|  0.0|
|    45.87000000000005|  1.0|
|    7.740000000000088|  0.0|
|   14.649909361535952|  0.0|
+---------------------+-----+
only showing top 20 rows



In [14]:
# Split the data into training and test sets (weights 0.8 / 0.2);
# the fixed seed is passed so the split can be repeated.
split_weights = [0.8, 0.2]
trainingData, testData = assembled.randomSplit(split_weights, seed = 13234)

In [15]:
# Row counts of the training and test splits.
trainingData.count(), testData.count()

(846, 218)

In [16]:
# Decision tree classifier configuration: Gini impurity, depth limited to 5,
# and at least 20 training instances required in every node.
dt = DecisionTreeClassifier(
    featuresCol = 'features',
    labelCol = 'label',
    impurity = 'gini',
    maxDepth = 5,
    minInstancesPerNode = 20,
)

In [17]:
# Wrap the decision tree in a Pipeline whose only stage is the classifier.
pipeline = Pipeline(stages=[dt])

In [18]:
# Fit the pipeline on the training split.
model = pipeline.fit(trainingData)

In [19]:
# Make predictions: apply the fitted model to the held-out test split.
predictions = model.transform(testData)

In [20]:
# Compare predicted and actual labels for the first 20 test rows.
predictions.select('prediction', 'label').show(20)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       0.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
+----------+-----+
only showing top 20 rows



In [21]:
#Saving the predictions (this has problems on Windows)
#predictions.select("prediction", "label").write.save(path= 'predictions.csv', format = "com.databricks.spark.csv", header = 'true')

In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics


In [23]:
# Evaluator that scores the 'prediction' column against 'label' using accuracy.
evaluator = MulticlassClassificationEvaluator(
    metricName = 'accuracy',
    labelCol = "label",
    predictionCol = "prediction",
)

In [24]:
# Accuracy of the model's predictions on the test split.
evaluator.evaluate(predictions)

0.7844036697247706

In [33]:
# The MulticlassMetrics class can be used to generate a confusion matrix for our classifier model. However, unlike MulticlassClassificationEvaluator,
# MulticlassMetrics works with RDDs of numbers rather than DataFrames, so we need to convert our predictions DataFrame into an RDD.
# If we use the rdd attribute of predictions, we see that it is an RDD of Row objects:
#predictions.rdd.take(2)

In [42]:
#Instead, we can keep only the prediction and label columns and map each Row to a tuple to get an RDD of number pairs:

#predictions.select('prediction', 'label').rdd.map(tuple).take(2)

In [43]:
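#Let's create an instance of MulticlassMetrics with this RDD. MulticlassMetrics expects
#(prediction, label) pairs, so select only those two columns before converting:
#metrics = MulticlassMetrics(predictions.select('prediction', 'label').rdd.map(tuple))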

In [44]:
#The confusionMatrix() function returns a Spark Matrix, which we can convert to a Python Numpy array, and transpose to view
#metrics.confusionMatrix().toArray().transpose()