# Bicycle Sharing Demand Forcasting

## Overview
### I. Data Preprocessing and Feature Extraction
- Add Weekday Column
- Convert Categorical Variables
- Check Missing Values
- Extract Date and Time Features

### II. Model Development
- Linear Regression
- Random Forest
- Gradient-Boosted Tree Regression
- Best Model

### III. Predict New Data

In [1]:
import pandas as pd
import calendar
from pyspark.sql.functions import col, when, year, month, dayofmonth, hour, date_format
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

## I. Data Transformation and Feature Extraction

In [2]:
# Read data in Spark
df = spark.read.load('../project-bike-sharing-demand-forecast/data/train.csv',
                     format='csv', inferSchema=True, header=True)
df.show(5)

+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+
|            datetime|season|holiday|workingday|weather|temp| atemp|humidity|windspeed|casual|registered|count|
+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+
|2011-01-01 00:00:...|     1|      0|         0|      1|9.84|14.395|      81|      0.0|     3|        13|   16|
|2011-01-01 01:00:...|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     8|        32|   40|
|2011-01-01 02:00:...|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     5|        27|   32|
|2011-01-01 03:00:...|     1|      0|         0|      1|9.84|14.395|      75|      0.0|     3|        10|   13|
|2011-01-01 04:00:...|     1|      0|         0|      1|9.84|14.395|      75|      0.0|     0|         1|    1|
+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------

In [3]:
# Summary
df.toPandas().describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
season,10886.0,2.506614,1.116174,1.0,2.0,3.0,4.0,4.0
holiday,10886.0,0.028569,0.166599,0.0,0.0,0.0,0.0,1.0
workingday,10886.0,0.680875,0.466159,0.0,0.0,1.0,1.0,1.0
weather,10886.0,1.418427,0.633839,1.0,1.0,1.0,2.0,4.0
temp,10886.0,20.23086,7.79159,0.82,13.94,20.5,26.24,41.0
atemp,10886.0,23.655084,8.474601,0.76,16.665,24.24,31.06,45.455
humidity,10886.0,61.88646,19.245033,0.0,47.0,62.0,77.0,100.0
windspeed,10886.0,12.799395,8.164537,0.0,7.0015,12.998,16.9979,56.9969
casual,10886.0,36.021955,49.960477,0.0,4.0,17.0,49.0,367.0
registered,10886.0,155.552177,151.039033,0.0,36.0,118.0,222.0,886.0


In [4]:
# Data type
df.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- season: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weather: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- count: integer (nullable = true)



### Add Weekday Column

In [5]:
df_weekday = df.select('datetime', date_format('datetime', 'u').\
                        alias('weekday')).withColumn('datetime2', df.datetime).\
                        drop('datetime')
df_weekday.show(5)
df = df.join(df_weekday, df.datetime == df_weekday.datetime2).drop('datetime2')
df.show(5)

+-------+--------------------+
|weekday|           datetime2|
+-------+--------------------+
|      6|2011-01-01 00:00:...|
|      6|2011-01-01 01:00:...|
|      6|2011-01-01 02:00:...|
|      6|2011-01-01 03:00:...|
|      6|2011-01-01 04:00:...|
+-------+--------------------+
only showing top 5 rows

+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+-------+
|            datetime|season|holiday|workingday|weather|temp| atemp|humidity|windspeed|casual|registered|count|weekday|
+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+-------+
|2011-01-01 00:00:...|     1|      0|         0|      1|9.84|14.395|      81|      0.0|     3|        13|   16|      6|
|2011-01-01 01:00:...|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     8|        32|   40|      6|
|2011-01-01 02:00:...|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     5|  

In [6]:
df.count()

10886

In [7]:
df.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- season: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weather: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- weekday: string (nullable = true)



### Convert Categorical Variables
In spark 3.0, OneHotEncoder could transform several columns in one transformer.

In [8]:
seasonIndexer = StringIndexer(inputCol='season', outputCol='seasonIndex')
seasonEncoder = OneHotEncoder(inputCol='seasonIndex', outputCol='seasonVec')

holidayIndexer = StringIndexer(inputCol='holiday', outputCol='holidayIndex')

workingdayIndexer = StringIndexer(inputCol='workingday', outputCol='workingdayIndex')

weatherIndexer = StringIndexer(inputCol='weather', outputCol='weatherIndex')
weatherEncoder = OneHotEncoder(inputCol='weatherIndex', outputCol='weatherVec')

weekdayIndexer = StringIndexer(inputCol='weekday', outputCol='weekdayIndex')
weekdayEncoder = OneHotEncoder(inputCol='weekdayIndex', outputCol='weekdayVec')

pipeline = Pipeline(stages=[seasonIndexer, seasonEncoder, 
                            holidayIndexer, workingdayIndexer, 
                            weatherIndexer, weatherEncoder, 
                            weekdayIndexer, weekdayEncoder])
preprocess = pipeline.fit(df)
df_transformed = preprocess.transform(df)
df_transformed.show(5)

+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+-------+-----------+---------+------------+---------------+------------+-------------+------------+-------------+
|            datetime|season|holiday|workingday|weather|temp| atemp|humidity|windspeed|casual|registered|count|weekday|seasonIndex|seasonVec|holidayIndex|workingdayIndex|weatherIndex|   weatherVec|weekdayIndex|   weekdayVec|
+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+-------+-----------+---------+------------+---------------+------------+-------------+------------+-------------+
|2011-01-01 00:00:...|     1|      0|         0|      1|9.84|14.395|      81|      0.0|     3|        13|   16|      6|        3.0|(3,[],[])|         0.0|            1.0|         0.0|(3,[0],[1.0])|         0.0|(6,[0],[1.0])|
|2011-01-01 01:00:...|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     8|      

In [9]:
df_transformed.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- season: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weather: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- seasonIndex: double (nullable = true)
 |-- seasonVec: vector (nullable = true)
 |-- holidayIndex: double (nullable = true)
 |-- workingdayIndex: double (nullable = true)
 |-- weatherIndex: double (nullable = true)
 |-- weatherVec: vector (nullable = true)
 |-- weekdayIndex: double (nullable = true)
 |-- weekdayVec: vector (nullable = true)



### Check Missing Values

In [10]:
df_transformed.toPandas().isnull().sum()

datetime           0
season             0
holiday            0
workingday         0
weather            0
temp               0
atemp              0
humidity           0
windspeed          0
casual             0
registered         0
count              0
weekday            0
seasonIndex        0
seasonVec          0
holidayIndex       0
workingdayIndex    0
weatherIndex       0
weatherVec         0
weekdayIndex       0
weekdayVec         0
dtype: int64

### Extract Date and Time Features

In [11]:
df_transformed = df_transformed.withColumn('year', year(df_transformed.datetime))
df_transformed = df_transformed.withColumn('month', month(df_transformed.datetime))
df_transformed = df_transformed.withColumn('day', dayofmonth(df_transformed.datetime))
df_transformed = df_transformed.withColumn('hour', hour(df_transformed.datetime))
df_transformed.show(5)

+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+-------+-----------+---------+------------+---------------+------------+-------------+------------+-------------+----+-----+---+----+
|            datetime|season|holiday|workingday|weather|temp| atemp|humidity|windspeed|casual|registered|count|weekday|seasonIndex|seasonVec|holidayIndex|workingdayIndex|weatherIndex|   weatherVec|weekdayIndex|   weekdayVec|year|month|day|hour|
+--------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+-------+-----------+---------+------------+---------------+------------+-------------+------------+-------------+----+-----+---+----+
|2011-01-01 00:00:...|     1|      0|         0|      1|9.84|14.395|      81|      0.0|     3|        13|   16|      6|        3.0|(3,[],[])|         0.0|            1.0|         0.0|(3,[0],[1.0])|         0.0|(6,[0],[1.0])|2011|    1|  1|   0|
|2011-01-01 01:00:..

In [12]:
# Counts by month and hour
df_transformed.groupBy('month').count().orderBy('month').show()
df_transformed.groupBy('hour').count().orderBy('hour').show()

+-----+-----+
|month|count|
+-----+-----+
|    1|  884|
|    2|  901|
|    3|  901|
|    4|  909|
|    5|  912|
|    6|  912|
|    7|  912|
|    8|  912|
|    9|  909|
|   10|  911|
|   11|  911|
|   12|  912|
+-----+-----+

+----+-----+
|hour|count|
+----+-----+
|   0|  455|
|   1|  454|
|   2|  448|
|   3|  433|
|   4|  442|
|   5|  452|
|   6|  455|
|   7|  455|
|   8|  455|
|   9|  455|
|  10|  455|
|  11|  455|
|  12|  456|
|  13|  456|
|  14|  456|
|  15|  456|
|  16|  456|
|  17|  456|
|  18|  456|
|  19|  456|
+----+-----+
only showing top 20 rows



## II. Model Development

In [13]:
df_transformed.columns

['datetime',
 'season',
 'holiday',
 'workingday',
 'weather',
 'temp',
 'atemp',
 'humidity',
 'windspeed',
 'casual',
 'registered',
 'count',
 'weekday',
 'seasonIndex',
 'seasonVec',
 'holidayIndex',
 'workingdayIndex',
 'weatherIndex',
 'weatherVec',
 'weekdayIndex',
 'weekdayVec',
 'year',
 'month',
 'day',
 'hour']

In [14]:
# Split the dataset into train and train_test
vectorAssembler = VectorAssembler(inputCols = ['temp', 'atemp', 'humidity', 'windspeed',
                                               'holidayIndex', 'workingdayIndex', 
                                               'seasonVec', 'weatherVec', 'weekdayVec',
                                               'year', 'month', 'day', 'hour'],
                                  outputCol = 'features')

df_v = vectorAssembler.transform(df_transformed).select(['features', 'count'])
trainingData, testData = df_v.randomSplit([0.7, 0.3], seed=123)
trainingData.show(5)

+--------------------+-----+
|            features|count|
+--------------------+-----+
|(22,[0,1,2,3,4,5,...|   51|
|(22,[0,1,2,3,4,5,...|   15|
|(22,[0,1,2,3,4,5,...|   37|
|(22,[0,1,2,3,4,5,...|   59|
|(22,[0,1,2,3,4,5,...|   10|
+--------------------+-----+
only showing top 5 rows



In [18]:
trainingData.select('features').collect()[:5]

[Row(features=SparseVector(22, {0: 15.58, 1: 19.695, 2: 76.0, 3: 19.0012, 4: 1.0, 5: 1.0, 6: 1.0, 9: 1.0, 15: 1.0, 18: 2012.0, 19: 10.0, 20: 8.0})),
 Row(features=SparseVector(22, {0: 13.94, 1: 16.665, 2: 81.0, 3: 12.998, 4: 1.0, 5: 1.0, 6: 1.0, 9: 1.0, 15: 1.0, 18: 2012.0, 19: 10.0, 20: 8.0, 21: 2.0})),
 Row(features=SparseVector(22, {0: 13.94, 1: 16.665, 2: 87.0, 3: 11.0014, 4: 1.0, 5: 1.0, 6: 1.0, 9: 1.0, 15: 1.0, 18: 2012.0, 19: 10.0, 20: 8.0, 21: 1.0})),
 Row(features=SparseVector(22, {0: 13.94, 1: 17.425, 2: 76.0, 3: 7.0015, 4: 1.0, 5: 1.0, 6: 1.0, 9: 1.0, 15: 1.0, 18: 2012.0, 19: 10.0, 20: 8.0, 21: 6.0})),
 Row(features=SparseVector(22, {0: 13.94, 1: 17.425, 2: 81.0, 3: 7.0015, 4: 1.0, 5: 1.0, 6: 1.0, 9: 1.0, 15: 1.0, 18: 2012.0, 19: 10.0, 20: 8.0, 21: 3.0}))]

### Linear Regression

In [19]:
lr = LinearRegression(featuresCol='features', labelCol='count', maxIter=10, regParam=0.3, elasticNetParam=0.8)
model = lr.fit(trainingData)

trainingSummary = model.summary
print("R2 on training data: %f" % trainingSummary.r2)
print("RMSE on training data: %f" % trainingSummary.rootMeanSquaredError)

R2 on training data: 0.396315
RMSE on training data: 140.887702


In [20]:
predictions = model.transform(testData)
predictions.select("prediction", "count", "features").show(5)

lr_evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'count', metricName = 'r2')
print('R2 on test data = %g' % lr_evaluator.evaluate(predictions))

+------------------+-----+--------------------+
|        prediction|count|            features|
+------------------+-----+--------------------+
|138.49513314547949|   49|(22,[0,1,2,3,4,5,...|
|101.19490156535176|    7|(22,[0,1,2,3,4,5,...|
|134.53935378280585|   19|(22,[0,1,2,3,4,5,...|
|141.26062935087248|   71|(22,[0,1,2,3,4,5,...|
|139.45178153016604|   20|(22,[0,1,2,3,4,5,...|
+------------------+-----+--------------------+
only showing top 5 rows

R2 on test data = 0.399798


In [21]:
lr_evaluator = RegressionEvaluator(predictionCol = 'prediction', labelCol = 'count', metricName = 'rmse')
print('RMSE on test data = %g' % lr_evaluator.evaluate(predictions))

RMSE on test data = 139.947


In [22]:
# Another way to get RMSE
test_result = model.evaluate(testData)
print("RMSE: %g" % test_result.rootMeanSquaredError)

RMSE: 139.947


### Random Forest


In [23]:
rf = RandomForestRegressor(featuresCol='features', labelCol = 'count')
model = rf.fit(trainingData)
# trainingSummary = model.summary
# print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
# print("r2: %f" % trainingSummary.r2)

predictions = model.transform(testData)
predictions.select("prediction", "count", "features").show(5)

evaluator = RegressionEvaluator(labelCol="count", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2 on test data = %g" % r2)

evaluator = RegressionEvaluator(labelCol="count", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = %g" % rmse)

+------------------+-----+--------------------+
|        prediction|count|            features|
+------------------+-----+--------------------+
| 38.01774180400779|   49|(22,[0,1,2,3,4,5,...|
|31.319701589655875|    7|(22,[0,1,2,3,4,5,...|
|31.319701589655875|   19|(22,[0,1,2,3,4,5,...|
|50.693602259880905|   71|(22,[0,1,2,3,4,5,...|
|29.823022653806664|   20|(22,[0,1,2,3,4,5,...|
+------------------+-----+--------------------+
only showing top 5 rows

R2 on test data = 0.627869
RMSE on test data = 110.195


### Gradient-Boosted Tree Regression

In [24]:
%%time
gbt = GBTRegressor(featuresCol='features', labelCol = 'count', maxIter=100)
model = gbt.fit(trainingData)

predictions = model.transform(testData)
predictions.select("prediction", "count", "features").show(5)

evaluator = RegressionEvaluator(labelCol="count", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2 on test data = %g" % r2)

evaluator = RegressionEvaluator(labelCol="count", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = %g" % rmse)

+------------------+-----+--------------------+
|        prediction|count|            features|
+------------------+-----+--------------------+
|150.92764873664677|   49|(22,[0,1,2,3,4,5,...|
| 4.409337127145523|    7|(22,[0,1,2,3,4,5,...|
| 16.66925700791984|   19|(22,[0,1,2,3,4,5,...|
| 41.57749804093572|   71|(22,[0,1,2,3,4,5,...|
|44.750680504485594|   20|(22,[0,1,2,3,4,5,...|
+------------------+-----+--------------------+
only showing top 5 rows

R2 on test data = 0.940077
RMSE on test data = 44.2192
Wall time: 3min 1s


### Best model
It seems gradient-boosted tree regression could generate much better prediction than linear regression or random forest.

## III. Predict New Data

In [25]:
df_submit = spark.read.load('../project-bike-sharing-demand-forecast/data/test.csv',
                            format='csv', inferSchema=True, header=True)

df_weekday = df_submit.select('datetime', date_format('datetime', 'u').\
                              alias('weekday')).withColumn('datetime2', df_submit.datetime).\
                              drop('datetime')
df_submit = df_submit.join(df_weekday, df_submit.datetime == df_weekday.datetime2).drop('datetime2')
df_submit.show(5)

df_submit = preprocess.transform(df_submit)

df_submit = df_submit.withColumn('year', year(df_submit.datetime))
df_submit = df_submit.withColumn('month', month(df_submit.datetime))
df_submit = df_submit.withColumn('day', dayofmonth(df_submit.datetime))
df_submit = df_submit.withColumn('hour', hour(df_submit.datetime))

df_submit = vectorAssembler.transform(df_submit).select(['datetime', 'features'])
df_submit.show(5)

+--------------------+------+-------+----------+-------+-----+------+--------+---------+-------+
|            datetime|season|holiday|workingday|weather| temp| atemp|humidity|windspeed|weekday|
+--------------------+------+-------+----------+-------+-----+------+--------+---------+-------+
|2011-01-20 00:00:...|     1|      0|         1|      1|10.66|11.365|      56|  26.0027|      4|
|2011-01-20 01:00:...|     1|      0|         1|      1|10.66|13.635|      56|      0.0|      4|
|2011-01-20 02:00:...|     1|      0|         1|      1|10.66|13.635|      56|      0.0|      4|
|2011-01-20 03:00:...|     1|      0|         1|      1|10.66| 12.88|      56|  11.0014|      4|
|2011-01-20 04:00:...|     1|      0|         1|      1|10.66| 12.88|      56|  11.0014|      4|
+--------------------+------+-------+----------+-------+-----+------+--------+---------+-------+
only showing top 5 rows

+--------------------+--------------------+
|            datetime|            features|
+-------------

In [26]:
df_submit.count()

6493

In [27]:
%%time
gbt = GBTRegressor(featuresCol='features', labelCol = 'count', maxIter=100)
model = gbt.fit(df_v)

Wall time: 2min 59s


In [28]:
predictions = model.transform(df_submit)
predictions.select('datetime', 'prediction', 'features').show(5)

+--------------------+------------------+--------------------+
|            datetime|        prediction|            features|
+--------------------+------------------+--------------------+
|2011-01-20 00:00:...|19.780597824939615|(22,[0,1,2,3,9,14...|
|2011-01-20 01:00:...|-5.383336840951198|(22,[0,1,2,9,14,1...|
|2011-01-20 02:00:...|-10.46068656792588|(22,[0,1,2,9,14,1...|
|2011-01-20 03:00:...|-7.311465358099026|(22,[0,1,2,3,9,14...|
|2011-01-20 04:00:...|-7.005824313567958|(22,[0,1,2,3,9,14...|
+--------------------+------------------+--------------------+
only showing top 5 rows



In [29]:
new_prediction = predictions.withColumn('count', when(predictions.prediction < 0, 0).otherwise(predictions.prediction))
new_prediction.show(5)
new_prediction.select(['datetime', 'count']).toPandas().to_csv('../project-bike-sharing-demand-forecast/data/submission.csv', index=False)
# 0.72496 Rank 2559

+--------------------+--------------------+------------------+------------------+
|            datetime|            features|        prediction|             count|
+--------------------+--------------------+------------------+------------------+
|2011-01-20 00:00:...|(22,[0,1,2,3,9,14...|19.780597824939615|19.780597824939615|
|2011-01-20 01:00:...|(22,[0,1,2,9,14,1...|-5.383336840951198|               0.0|
|2011-01-20 02:00:...|(22,[0,1,2,9,14,1...|-10.46068656792588|               0.0|
|2011-01-20 03:00:...|(22,[0,1,2,3,9,14...|-7.311465358099026|               0.0|
|2011-01-20 04:00:...|(22,[0,1,2,3,9,14...|-7.005824313567958|               0.0|
+--------------------+--------------------+------------------+------------------+
only showing top 5 rows



There's still room to develop my best model, and more works for prediction improvements will be done in the future. Below is an overview of the current progress.