In [26]:
# Section must be included at the beginning of each new notebook. Remember to change the app name.
# The Spark install location is read from the SPARK_HOME environment variable when it is set
# (and non-empty); otherwise the default path below is used.
# If you're using VirtualBox, set SPARK_HOME to '/home/user/spark-2.1.1-bin-hadoop2.7'.
import os

import findspark

findspark.init(os.environ.get('SPARK_HOME') or '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Reuse the running session if one exists, otherwise start one named 'demo2'
session_builder = SparkSession.builder.appName('demo2')
spark = session_builder.getOrCreate()

In [27]:
# Load the CSV; the first row supplies column names and column types are inferred from the data
df = spark.read.csv(
    path="datadummies.csv",
    header=True,
    inferSchema=True)

In [28]:
# List the column names of the loaded frame
df.schema.names

['Visitors',
 'UniquePageviews',
 'Date',
 'Time',
 'DayOfWeek',
 'Month',
 'Season',
 'Weekend',
 'SchoolHoliday',
 'PublicHoliday',
 'Cruiseship',
 'EventInt',
 'EventExt',
 'DayMon',
 'DayTue',
 'DayWed',
 'DayThu',
 'DayFri',
 'DaySat',
 'DaySun',
 'MonthJan',
 'MonthFeb',
 'MonthMar',
 'MonthApr',
 'MonthMay',
 'MonthJun',
 'MonthJul',
 'MonthAug',
 'MonthSep',
 'MonthOct',
 'MonthNov',
 'MonthDec']

In [30]:
# Encode the string column 'Month' as a numeric index column 'MonthIdx'.
# NOTE(review): per the Spark docs StringIndexer orders labels by frequency, so MonthIdx
# presumably does not follow calendar order -- confirm this is acceptable for the model.
strindexer = StringIndexer(inputCol='Month', outputCol='MonthIdx')
month_index_model = strindexer.fit(df)
dfidx = month_index_model.transform(df)

# Preview the first two rows, now including the new MonthIdx column
dfidx.show(n=2)

+--------+----------------+--------------------+----+---------+-----+------+-------+-------------+-------------+----------+--------+--------+------+------+------+------+------+------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|Visitors| UniquePageviews|                Date|Time|DayOfWeek|Month|Season|Weekend|SchoolHoliday|PublicHoliday|Cruiseship|EventInt|EventExt|DayMon|DayTue|DayWed|DayThu|DayFri|DaySat|DaySun|MonthJan|MonthFeb|MonthMar|MonthApr|MonthMay|MonthJun|MonthJul|MonthAug|MonthSep|MonthOct|MonthNov|MonthDec|MonthIdx|
+--------+----------------+--------------------+----+---------+-----+------+-------+-------------+-------------+----------+--------+--------+------+------+------+------+------+------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|    4364|           910.0|2016-03-24 00:00:...|   1|      Thu|  Mar|Autumn|

In [37]:
# Columns packed into the feature vector, in this order
feature_cols = [
    'UniquePageviews', 'Weekend', 'MonthIdx', 'SchoolHoliday',
    'PublicHoliday', 'Cruiseship', 'EventInt', 'EventExt',
]
# The output column is named 'indexedFeatures', but only a VectorAssembler is applied here
assembler = VectorAssembler(inputCols=feature_cols, outputCol='indexedFeatures')
df_va = assembler.transform(dfidx)

# Keep just the feature vector and the regression target
finaldf = df_va.select('indexedFeatures', 'Visitors')

In [38]:
# Peek at the first row as a one-element list of Row objects
finaldf.take(1)

[Row(indexedFeatures=SparseVector(8, {0: 910.0}), Visitors=4364)]

In [39]:
# Print the first ten (features, label) rows
finaldf.show(n=10)

+--------------------+--------+
|     indexedFeatures|Visitors|
+--------------------+--------+
|     (8,[0],[910.0])|    4364|
|(8,[0,4],[970.571...|    8116|
|(8,[0,1],[972.714...|    9268|
|(8,[0,1],[1009.28...|    8360|
|(8,[0,4],[1033.28...|    6172|
|(8,[0,5],[1013.28...|    4421|
|(8,[0],[971.85714...|    3260|
|(8,[0],[916.71428...|    3460|
|(8,[0,2,5],[855.4...|    4542|
|(8,[0,1,2],[858.8...|    5444|
+--------------------+--------+
only showing top 10 rows



In [42]:
# Fix the random seed so the train/test split and the forest's random sampling --
# and therefore the RMSE reported further down -- are repeatable across runs.
SEED = 42

# 70% of the rows for training, 30% held out for evaluation
(trainingData, testData) = finaldf.randomSplit([0.7, 0.3], seed=SEED)
rf = RandomForestRegressor(featuresCol="indexedFeatures", labelCol="Visitors", seed=SEED)
# Single-stage pipeline: the regressor is stage 0
pipeline = Pipeline(stages=[rf])

In [43]:
# Fit the pipeline on the training split; the result is a fitted PipelineModel (a transformer)
model = pipeline.fit(dataset=trainingData)

In [46]:
# Use the pipeline model (transformer) to transform the test dataset, adding predictions
predictions = model.transform(testData)

# Show the predicted value next to the true label and the input features
predictions.select("prediction", "Visitors", "indexedFeatures").show(5)

+------------------+--------+--------------------+
|        prediction|Visitors|     indexedFeatures|
+------------------+--------+--------------------+
| 3906.236848540689|    3762|(8,[0],[782.42857...|
| 3999.119407159606|    4295|(8,[0],[918.42857...|
|  4338.24869607774|    3676|     (8,[0],[965.0])|
|4367.6997487762055|    3650|(8,[0],[1023.7142...|
|4472.2305566846535|    3267|(8,[0],[1101.5714...|
+------------------+--------+--------------------+
only showing top 5 rows



In [48]:
# Score the held-out predictions: RMSE between the "prediction" column and the "Visitors" label
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Visitors",
    predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 765.505


In [50]:
# The regressor was the pipeline's first stage, so its fitted model is the first fitted stage
fitted_stages = model.stages
rfModel = fitted_stages[0]
print(rfModel)

RandomForestRegressionModel (uid=rfr_a7108f13a682) with 20 trees
