## Bike Sharing Dataset

Data: This dataset contains hourly bike rental records from 2011 and 2012 in the Capital Bikeshare system. [Source: UCI Machine Learning Repository](http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset)

In [8]:
df = sqlContext.read.format('csv').option("header", 'true').load("wasb://datasets@ocpdemostorageaccount.blob.core.windows.net/hour.csv")
df.cache()

DataFrame[instant: string, dteday: string, season: string, yr: string, mnth: string, hr: string, holiday: string, weekday: string, workingday: string, weathersit: string, temp: string, atemp: string, hum: string, windspeed: string, casual: string, registered: string, cnt: string]

### Dataset info

- instant: record index
- dteday : date
- season : season (1:spring, 2:summer, 3:fall, 4:winter)
- yr : year (0: 2011, 1:2012)
- mnth : month ( 1 to 12)
- hr : hour (0 to 23)
- holiday : whether the day is a holiday or not (extracted from [Web Link])
- weekday : day of the week
- workingday : 1 if the day is neither a weekend nor a holiday, otherwise 0
- weathersit : weather situation
    - 1: Clear, Few clouds, Partly cloudy
    - 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
    - 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
    - 4: Heavy Rain + Ice Pellets + Thunderstorm + Mist, Snow + Fog
- temp : Normalized temperature in Celsius. The values are derived via (t-t_min)/(t_max-t_min), t_min=-8, t_max=+39 (only in hourly scale)
- atemp: Normalized feeling temperature in Celsius. The values are derived via (t-t_min)/(t_max-t_min), t_min=-16, t_max=+50 (only in hourly scale)
- hum: Normalized humidity. The values are divided by 100 (max)
- windspeed: Normalized wind speed. The values are divided by 67 (max)
- casual: count of casual users
- registered: count of registered users
- cnt: count of total rental bikes including both casual and registered
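
The normalization formulas above can be inverted to recover values in their original units. The following is a minimal sketch in plain Python; the helper names `denormalize_min_max` and `denormalize_row` are hypothetical and not part of the notebook, and the sample inputs are the weather values of the first record in `hour.csv`.

```python
# Constants taken from the column descriptions above.
T_MIN, T_MAX = -8.0, 39.0      # temp range in Celsius
AT_MIN, AT_MAX = -16.0, 50.0   # atemp range in Celsius
HUM_MAX = 100.0                # hum was divided by 100
WIND_MAX = 67.0                # windspeed was divided by 67


def denormalize_min_max(x, lo, hi):
    """Invert the min-max scaling x = (t - lo) / (hi - lo)."""
    return x * (hi - lo) + lo


def denormalize_row(temp, atemp, hum, windspeed):
    """Hypothetical helper: map the four weather columns back to raw units."""
    return {
        "temp_c": denormalize_min_max(temp, T_MIN, T_MAX),
        "atemp_c": denormalize_min_max(atemp, AT_MIN, AT_MAX),
        "hum_pct": hum * HUM_MAX,
        "windspeed": windspeed * WIND_MAX,
    }


# Weather values of the first record: temp=0.24, atemp=0.2879, hum=0.81, windspeed=0
first = denormalize_row(0.24, 0.2879, 0.81, 0.0)
print(first)
```

For the first record this gives a temperature of about 3.28 degrees Celsius and a humidity of 81 percent.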

In [10]:
print("Our dataset has %d rows." % df.count())


Our dataset has 17379 rows.

In [11]:
df.show()

+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|    dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|      1|2011-01-01|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|        0|     3|        13| 16|
|      2|2011-01-01|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|        0|     8|        32| 40|
|      3|2011-01-01|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727| 0.8|        0|     5|        27| 32|
|      4|2011-01-01|     1|  0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|        0|     3|        10| 13|
|      5|2011-01-01|     1|  0|   1|  4|      0|      6|         0|         1|0.24|0.2879|0.75|        0

### Preprocess Data

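Predict: cnt

Cleaning:

- Remove casual and registered, since cnt is the sum of these two
- Remove dteday (the date is already represented by yr, mnth, hr, and weekday)
- Remove instant (a record index, not a feature)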

In [12]:
df = df.drop("instant").drop("dteday").drop("casual").drop("registered")
df.show()

+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+---+
|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|cnt|
+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+---+
|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|        0| 16|
|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|        0| 40|
|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727| 0.8|        0| 32|
|     1|  0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|        0| 13|
|     1|  0|   1|  4|      0|      6|         0|         1|0.24|0.2879|0.75|        0|  1|
|     1|  0|   1|  5|      0|      6|         0|         2|0.24|0.2576|0.75|   0.0896|  1|
|     1|  0|   1|  6|      0|      6|         0|         1|0.22|0.2727| 0.8|        0|  2|
|     1|  0|   1|  7|      0|      6|         0|         1| 0.2|0.2576|0.86|        0|  3|

In [13]:
df.printSchema()

root
 |-- season: string (nullable = true)
 |-- yr: string (nullable = true)
 |-- mnth: string (nullable = true)
 |-- hr: string (nullable = true)
 |-- holiday: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- workingday: string (nullable = true)
 |-- weathersit: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- atemp: string (nullable = true)
 |-- hum: string (nullable = true)
 |-- windspeed: string (nullable = true)
 |-- cnt: string (nullable = true)

In [15]:
# Convert all fields to double

from pyspark.sql.functions import col  # for indicating a column using a string in the line below
df = df.select([col(c).cast("double").alias(c) for c in df.columns])
df.printSchema()

root
 |-- season: double (nullable = true)
 |-- yr: double (nullable = true)
 |-- mnth: double (nullable = true)
 |-- hr: double (nullable = true)
 |-- holiday: double (nullable = true)
 |-- weekday: double (nullable = true)
 |-- workingday: double (nullable = true)
 |-- weathersit: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- cnt: double (nullable = true)

In [17]:
#Split data for training and testing
train, test = df.randomSplit([0.7, 0.3])
print("We have %d training examples and %d test examples." % (train.count(), test.count()))

We have 12040 training examples and 5339 test examples.
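
The split above is about 69.3% / 30.7% rather than exactly 70% / 30%, because `randomSplit` assigns each row to a split at random instead of cutting the data at a fixed index. The sketch below imitates that behavior in plain Python to show why the sizes only approximate the weights. It is an illustration, not Spark's actual implementation, and `bernoulli_split` is a hypothetical name.

```python
import random


def bernoulli_split(n_rows, train_fraction, seed):
    """Assign each row index to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for i in range(n_rows):
        if rng.random() < train_fraction:
            train.append(i)
        else:
            test.append(i)
    return train, test


N_ROWS = 17379  # row count reported by df.count() above
train_idx, test_idx = bernoulli_split(N_ROWS, 0.7, seed=42)
print("train share: %.4f" % (len(train_idx) / float(N_ROWS)))
```

In the notebook itself, passing a seed, as in `df.randomSplit([0.7, 0.3], seed=42)`, makes the split reproducible across runs; the value 42 is arbitrary.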

In [18]:
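# Use gradient-boosted trees (GBT) for prediction
# Features: every column except the label cnt.
#    With maxCategories=4 below, yr, season, holiday, workingday, and weathersit
#    are indexed as categorical; the remaining columns are treated as continuous.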

from pyspark.ml.feature import VectorAssembler, VectorIndexer
featuresCols = df.columns
featuresCols.remove('cnt')
# Concatenate all feature columns into a single vector column
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# Identify categorical features (those with at most maxCategories distinct values) and index them.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)
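
To see what `maxCategories=4` means for this dataset, the rule that `VectorIndexer` applies (a feature with at most `maxCategories` distinct values is treated as categorical) can be replayed in plain Python. This is a rough sketch: the distinct-value counts are read off the column descriptions, `weekday` is assumed to take 7 values, and the four weather measurements are assumed to have far more distinct values than any threshold tried here. The function name `categorical_columns` is hypothetical.

```python
# Distinct values per feature, from the dataset description.
# None marks continuous measurements (assumed to have many distinct values).
DISTINCT_VALUES = {
    "season": 4, "yr": 2, "mnth": 12, "hr": 24,
    "holiday": 2, "weekday": 7, "workingday": 2, "weathersit": 4,
    "temp": None, "atemp": None, "hum": None, "windspeed": None,
}


def categorical_columns(distinct_counts, max_categories):
    """Return the columns that would be indexed as categorical."""
    return sorted(
        name
        for name, n in distinct_counts.items()
        if n is not None and n <= max_categories
    )


print(categorical_columns(DISTINCT_VALUES, 4))
print(categorical_columns(DISTINCT_VALUES, 24))
```

With the threshold at 4, `mnth`, `hr`, and `weekday` are treated as continuous. Raising it to 24 would make them categorical as well, which is a modeling choice rather than a correction.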

In [19]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="cnt")

In [20]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
#  - maxDepth: max depth of each decision tree  
#  - maxIter: iterations, i.e., number of trees in each GBT 
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [10, 100])\
  .build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol=gbt.getLabelCol(), predictionCol=gbt.getPredictionCol())
# Declare the CrossValidator, which runs model tuning
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid)
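
Before fitting, it helps to estimate how much work this grid implies. The sketch below enumerates the parameter combinations in plain Python and counts model fits, assuming `CrossValidator` is left at its default of 3 folds (`numFolds` is not set above) and refits the best combination once on the full training set.

```python
from itertools import product

MAX_DEPTHS = [2, 5]
MAX_ITERS = [10, 100]
NUM_FOLDS = 3  # CrossValidator default, since numFolds is not set above

# Every (maxDepth, maxIter) pair the grid search will try.
grid = [
    {"maxDepth": depth, "maxIter": iters}
    for depth, iters in product(MAX_DEPTHS, MAX_ITERS)
]

cv_fits = len(grid) * NUM_FOLDS  # one fit per combination per fold
total_fits = cv_fits + 1         # plus the final refit of the best combination

for params in grid:
    print(params)
print("GBT fits in total: %d" % total_fits)
```

The combinations with `maxIter=100` build ten times as many trees as those with `maxIter=10`, so they account for most of the run time.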

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
pipelineModel = pipeline.fit(train)
predictions = pipelineModel.transform(test)
predictions.select("cnt", "prediction", *featuresCols).show()

In [None]:
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
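
For reference, RMSE is the square root of the mean squared difference between label and prediction, so it is expressed in the same unit as `cnt` (rentals per hour). Below is a minimal plain-Python sketch of the computation. The labels are the first three `cnt` values shown earlier; the predictions are invented for illustration and are not model output.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    if len(labels) != len(predictions) or len(labels) == 0:
        raise ValueError("labels and predictions must be non-empty and equally long")
    squared_error = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    return math.sqrt(squared_error / float(len(labels)))


# Labels from the first three rows; predictions are made-up values.
example = rmse([16.0, 40.0, 32.0], [20.0, 37.0, 32.0])
print("toy RMSE: %g" % example)
```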

In [None]:
# TODO: plot predictions against hr
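
One possible way to prepare that plot is to average the predictions per hour of day first, so the curve shows the daily pattern instead of thousands of overlapping points. The sketch below does the aggregation in plain Python. `mean_by_hour` is a hypothetical helper, and the sample pairs are invented; in the notebook the pairs could come from `predictions.select("hr", "prediction").collect()`.

```python
from collections import defaultdict


def mean_by_hour(rows):
    """Average the value for each hour; rows is an iterable of (hr, value) pairs."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for hr, value in rows:
        totals[hr] += value
        counts[hr] += 1
    # Sort by hour so the result can be plotted directly as a line.
    return [(hr, totals[hr] / counts[hr]) for hr in sorted(totals)]


# Invented (hr, prediction) pairs standing in for collected Spark rows.
sample = [(0.0, 10.0), (0.0, 20.0), (1.0, 5.0), (8.0, 300.0), (8.0, 340.0)]
curve = mean_by_hour(sample)
print(curve)
```

The resulting list can be unzipped into hours and means and handed to any plotting library, for example matplotlib's `plt.plot(hours, means)`, assuming it is available on the driver.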