In [1]:
# Environment setup: findspark.init() makes pyspark importable in this kernel
# (presumably by locating the Spark installation, e.g. via SPARK_HOME — confirm for your setup).
# This must run before the pyspark imports in the next cell.
import findspark
findspark.init()

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

In [3]:
# Read the weather data. The CSV is read without a schema or inferSchema, so every
# column arrives as a string and the model inputs must be converted to numbers below.
spark = SparkSession.builder.getOrCreate()
data = spark.read.load('weatherdata.csv', format='csv', header=True, delimiter=',')

# Cast the model's input columns from string to double explicitly, rather than relying on
# implicit string->double coercion via the `col - 0` trick (whose rules vary with ANSI mode).
numeric_cols = ['dewPoint', 'humidity', 'windSpeed', 'cloudCover',
                'temperatureMin', 'temperatureMax', 'uvIndex']
for col_name in numeric_cols:
    data = data.withColumn(col_name, data[col_name].cast('double'))

# The classification target is precipType as a double.
# NOTE(review): assumes precipType is stored as a numeric code in the CSV — TODO confirm.
data = data.withColumn('label', data['precipType'].cast('double'))

In [4]:
# Pack the numeric input columns into a single vector column 'features' for the ML stages.
# Any existing 'features' column is dropped first so this cell is idempotent: VectorAssembler
# refuses to write an output column that already exists, so a plain re-run would fail.
# (DataFrame.drop is a no-op when the column is absent, e.g. on the first run.)
assembler = VectorAssembler(inputCols=['dewPoint', 'humidity', 'windSpeed', 'cloudCover',
                                       'temperatureMin', 'temperatureMax', 'uvIndex'],
                            outputCol='features')
data = assembler.transform(data.drop('features'))

# Both indexers are fit on the whole dataset (as in the Spark ML random-forest example),
# so every label value appears in the label index even if it is absent from the training split.
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel').fit(data)
# Features with at most 4 distinct values are treated as categorical; the rest stay continuous.
featureIndexer = VectorIndexer(inputCol='features', outputCol='indexedFeatures',
                               maxCategories=4).fit(data)

In [5]:
# Fixed seed so the train/test split, and therefore the reported accuracy, is reproducible
# across Restart & Run All. Without one, randomSplit draws a fresh random seed on every run.
SEED = 42

# Split the data into training and test sets (30% held out for testing).
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SEED)

# Random forest on the indexed label/feature columns produced by the indexers.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=10, seed=SEED)

# Convert indexed predictions back to the original label values for display.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain the (already fitted) indexers, the forest and the converter in one Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train the model; the indexer stages are applied to the training data as part of fitting.
model = pipeline.fit(trainingData)

# Make predictions on the held-out test set.
predictions = model.transform(testData)

# Show a few example rows.
predictions.select("predictedLabel", "label", "features").show(5)

# Accuracy compares the indexed true label with the indexed prediction.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

# Stage 2 of the pipeline is the fitted random forest.
rfModel = model.stages[2]
print(rfModel)  # summary only
print("Random Forest - Test Accuracy = %g" % (accuracy))
print("Random Forest - Test Error = %g" % (1.0 - accuracy))

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           1.0|  1.0|[21.85,0.78,2.33,...|
|           1.0|  1.0|[20.27,0.71,2.64,...|
|           1.0|  1.0|[21.54,0.71,4.02,...|
|           1.0|  1.0|[22.84,0.78,4.57,...|
|           1.0|  1.0|[21.15,0.72,2.4,0...|
+--------------+-----+--------------------+
only showing top 5 rows

RandomForestClassificationModel (uid=RandomForestClassifier_40cc888494c82667cf54) with 10 trees
Random Forest - Test Accuracy = 0.758356
Random Forest - Test Error = 0.241644
