In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('lrexample').getOrCreate()

21/12/08 22:22:34 WARN Utils: Your hostname, ortiz-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
21/12/08 22:22:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/08 22:22:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/12/08 22:22:42 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [10]:
df = spark.read.csv("hour.csv", header="true", inferSchema="true")
df.cache()
df.show()

21/12/08 22:32:59 WARN CacheManager: Asked to cache already cached data.        


+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|    dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|      1|2011-01-01|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|      0.0|     3|        13| 16|
|      2|2011-01-01|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0|     8|        32| 40|
|      3|2011-01-01|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727| 0.8|      0.0|     5|        27| 32|
|      4|2011-01-01|     1|  0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0|     3|        10| 13|
|      5|2011-01-01|     1|  0|   1|  4|      0|      6|         0|         1|0.24|0.2879|0.75|      0.0

In [11]:
df = df.drop("instant").drop("dteday").drop("casual").drop("registered")
df.printSchema()

root
 |-- season: integer (nullable = true)
 |-- yr: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- hr: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: integer (nullable = true)



In [12]:
from pyspark.sql.functions import col  
df = df.select([col(c).cast("double").alias(c) for c in df.columns])
df.printSchema()

root
 |-- season: double (nullable = true)
 |-- yr: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)



In [13]:
train, test = df.randomSplit([0.7, 0.3])
print("We have %d training examples and %d test examples." % (train.count(), test.count()))



We have 12178 training examples and 5201 test examples.


                                                                                

In [14]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer
featuresCols = df.columns
featuresCols.remove('cnt')
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

In [15]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="cnt")

In [16]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [10, 100])\
  .build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid)

In [17]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])

In [18]:
pipelineModel = pipeline.fit(train)

                                                                                

In [21]:
predictions = pipelineModel.transform(test)
(predictions.select("cnt", "prediction", *featuresCols)).show()

+----+-------------------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+
| cnt|         prediction|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|
+----+-------------------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+
|25.0|  23.75291071441593|   1.0|0.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0| 0.1|0.0758|0.42|   0.3881|
|33.0| 26.899140114633568|   1.0|0.0| 1.0|0.0|    0.0|    0.0|       0.0|       1.0|0.16|0.1818| 0.8|   0.1045|
|12.0|  7.965393284335664|   1.0|0.0| 1.0|0.0|    0.0|    2.0|       1.0|       1.0|0.14|0.1667|0.59|   0.1045|
| 5.0| 12.920733062770417|   1.0|0.0| 1.0|0.0|    0.0|    2.0|       1.0|       1.0|0.16|0.1818|0.55|   0.1045|
| 6.0|  10.52987961420547|   1.0|0.0| 1.0|0.0|    0.0|    3.0|       1.0|       1.0| 0.2|0.2576|0.64|      0.0|
| 7.0| 5.8753301109575276|   1.0|0.0| 1.0|0.0|    0.0|    3.0|       1.0|       2.0|0.16| 0.197|0.86|   

In [22]:
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)

[Stage 5702:>                                                       (0 + 1) / 1]

RMSE on our test set: 44.7291


                                                                                