# Bike rental 

In [434]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

## Data Exploration

In [435]:
# create spark session
spark = SparkSession.builder.appName('BikeRentals').getOrCreate()

# load bike rental dataset
df = spark.read.csv("bike_rental_dataset.csv", inferSchema=True, header = True)

In [436]:
df.show()

+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+
|season| yr|mnth| hr|holiday|workingday|weathersit|temp| hum|windspeed|dayOfWeek|days|demand|
+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+
|     1|  0|   1|  0|      0|         0|         1|0.24|0.81|      0.0|      Sat|   0|    16|
|     1|  0|   1|  1|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    40|
|     1|  0|   1|  2|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    32|
|     1|  0|   1|  3|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|    13|
|     1|  0|   1|  4|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|     1|
|     1|  0|   1|  5|      0|         0|         2|0.24|0.75|   0.0896|      Sat|   0|     1|
|     1|  0|   1|  6|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|     2|
|     1|  0|   1|  7|      0|         0|         1| 0.2|0.86

##### Show dataframes when the day is monday

In [437]:
df.where(df.dayOfWeek=="Mon").show()

+------+---+----+---+-------+----------+----------+----+----+-------------------+---------+----+------+
|season| yr|mnth| hr|holiday|workingday|weathersit|temp| hum|          windspeed|dayOfWeek|days|demand|
+------+---+----+---+-------+----------+----------+----+----+-------------------+---------+----+------+
|     1|  0|   1|  0|      0|         1|         1|0.22|0.44|             0.3582|      Mon|   1|     5|
|     1|  0|   1|  1|      0|         1|         1| 0.2|0.44|             0.4179|      Mon|   2|     2|
|     1|  0|   1|  4|      0|         1|         1|0.16|0.47|             0.3881|      Mon|   2|     1|
|     1|  0|   1|  5|      0|         1|         1|0.16|0.47|             0.2836|      Mon|   2|     3|
|     1|  0|   1|  6|      0|         1|         1|0.14| 0.5|             0.3881|      Mon|   2|    30|
|     1|  0|   1|  7|      0|         1|         1|0.14| 0.5|0.19399999999999998|      Mon|   2|    64|
|     1|  0|   1|  8|      0|         1|         1|0.14| 0.5|   

#### Show schema

In [438]:
df.printSchema()

root
 |-- season: integer (nullable = true)
 |-- yr: integer (nullable = true)
 |-- mnth: integer (nullable = true)
 |-- hr: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weathersit: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- hum: double (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- dayOfWeek: string (nullable = true)
 |-- days: integer (nullable = true)
 |-- demand: integer (nullable = true)



#### show number reservation by week

In [439]:
df.groupBy('dayOfWeek').count().show()

+---------+-----+
|dayOfWeek|count|
+---------+-----+
|      Sun| 2502|
|      Mon| 2479|
|      Sat| 2512|
|      Wed| 2475|
|      Tue| 2453|
|      Fri| 2487|
|      Thr| 2471|
+---------+-----+



#### show number reservation by month

In [440]:
df.groupBy('mnth').count().show()

+----+-----+
|mnth|count|
+----+-----+
|  12| 1483|
|   1| 1429|
|   6| 1440|
|   3| 1473|
|   5| 1488|
|   9| 1437|
|   4| 1437|
|   8| 1475|
|   7| 1488|
|  10| 1451|
|  11| 1437|
|   2| 1341|
+----+-----+



#### show number reservation by month

In [441]:
df.groupBy('mnth').count().show()

+----+-----+
|mnth|count|
+----+-----+
|  12| 1483|
|   1| 1429|
|   6| 1440|
|   3| 1473|
|   5| 1488|
|   9| 1437|
|   4| 1437|
|   8| 1475|
|   7| 1488|
|  10| 1451|
|  11| 1437|
|   2| 1341|
+----+-----+



#### show number reservation by year

In [442]:
df.groupBy('yr').count().show()

+---+-----+
| yr|count|
+---+-----+
|  1| 8734|
|  0| 8645|
+---+-----+



#### show number reservation by season

In [443]:
df.groupBy('season').count().show()

+------+-----+
|season|count|
+------+-----+
|     1| 4242|
|     3| 4496|
|     4| 4232|
|     2| 4409|
+------+-----+



## Transformation

In [444]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='dayOfWeek', outputCol='day_cat')
indexed_data = indexer.fit(df).transform(df)

In [445]:
indexed_data.show()

+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+-------+
|season| yr|mnth| hr|holiday|workingday|weathersit|temp| hum|windspeed|dayOfWeek|days|demand|day_cat|
+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+-------+
|     1|  0|   1|  0|      0|         0|         1|0.24|0.81|      0.0|      Sat|   0|    16|    0.0|
|     1|  0|   1|  1|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    40|    0.0|
|     1|  0|   1|  2|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    32|    0.0|
|     1|  0|   1|  3|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|    13|    0.0|
|     1|  0|   1|  4|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|     1|    0.0|
|     1|  0|   1|  5|      0|         0|         2|0.24|0.75|   0.0896|      Sat|   0|     1|    0.0|
|     1|  0|   1|  6|      0|         0|         1|0.22| 0.8|      0.0|      Sat| 

In [446]:
indexed_data.groupBy('day_cat').count().orderBy('day_cat').show(n=15)

+-------+-----+
|day_cat|count|
+-------+-----+
|    0.0| 2512|
|    1.0| 2502|
|    2.0| 2487|
|    3.0| 2479|
|    4.0| 2475|
|    5.0| 2471|
|    6.0| 2453|
+-------+-----+



## Create the first model that is given to us

In [447]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

In [448]:
indexed_data.columns

['season',
 'yr',
 'mnth',
 'hr',
 'holiday',
 'workingday',
 'weathersit',
 'temp',
 'hum',
 'windspeed',
 'dayOfWeek',
 'days',
 'demand',
 'day_cat']

In [449]:
vec = VectorAssembler(
  inputCols= [
    'season',
    'yr',
    'mnth',
    'hr',
    'holiday',
    'workingday',
    'weathersit',
    'temp',
    'hum',
    'windspeed',
    'day_cat'
    ],
   outputCol = 'features'                  
)
data = vec.transform(indexed_data)

display(data)

DataFrame[season: int, yr: int, mnth: int, hr: int, holiday: int, workingday: int, weathersit: int, temp: double, hum: double, windspeed: double, dayOfWeek: string, days: int, demand: int, day_cat: double, features: vector]

In [450]:
for item in data.take(1)[0]:
    print(item)

1
0
1
0
0
0
1
0.24
0.81
0.0
Sat
0
16
0.0
(11,[0,2,6,7,8],[1.0,1.0,1.0,0.24,0.81])


In [451]:
for item in data.take(3):
    print(item, end="\n\n")

Row(season=1, yr=0, mnth=1, hr=0, holiday=0, workingday=0, weathersit=1, temp=0.24, hum=0.81, windspeed=0.0, dayOfWeek='Sat', days=0, demand=16, day_cat=0.0, features=SparseVector(11, {0: 1.0, 2: 1.0, 6: 1.0, 7: 0.24, 8: 0.81}))

Row(season=1, yr=0, mnth=1, hr=1, holiday=0, workingday=0, weathersit=1, temp=0.22, hum=0.8, windspeed=0.0, dayOfWeek='Sat', days=0, demand=40, day_cat=0.0, features=SparseVector(11, {0: 1.0, 2: 1.0, 3: 1.0, 6: 1.0, 7: 0.22, 8: 0.8}))

Row(season=1, yr=0, mnth=1, hr=2, holiday=0, workingday=0, weathersit=1, temp=0.22, hum=0.8, windspeed=0.0, dayOfWeek='Sat', days=0, demand=32, day_cat=0.0, features=SparseVector(11, {0: 1.0, 2: 1.0, 3: 2.0, 6: 1.0, 7: 0.22, 8: 0.8}))



##### Get features column and demand column

In [452]:
model_data = data.select('features', 'demand')
model_data.show(truncate=False)

+---------------------------------------------------+------+
|features                                           |demand|
+---------------------------------------------------+------+
|(11,[0,2,6,7,8],[1.0,1.0,1.0,0.24,0.81])           |16    |
|(11,[0,2,3,6,7,8],[1.0,1.0,1.0,1.0,0.22,0.8])      |40    |
|(11,[0,2,3,6,7,8],[1.0,1.0,2.0,1.0,0.22,0.8])      |32    |
|(11,[0,2,3,6,7,8],[1.0,1.0,3.0,1.0,0.24,0.75])     |13    |
|(11,[0,2,3,6,7,8],[1.0,1.0,4.0,1.0,0.24,0.75])     |1     |
|[1.0,0.0,1.0,5.0,0.0,0.0,2.0,0.24,0.75,0.0896,0.0] |1     |
|(11,[0,2,3,6,7,8],[1.0,1.0,6.0,1.0,0.22,0.8])      |2     |
|(11,[0,2,3,6,7,8],[1.0,1.0,7.0,1.0,0.2,0.86])      |3     |
|(11,[0,2,3,6,7,8],[1.0,1.0,8.0,1.0,0.24,0.75])     |8     |
|(11,[0,2,3,6,7,8],[1.0,1.0,9.0,1.0,0.32,0.76])     |14    |
|[1.0,0.0,1.0,10.0,0.0,0.0,1.0,0.38,0.76,0.2537,0.0]|36    |
|[1.0,0.0,1.0,11.0,0.0,0.0,1.0,0.36,0.81,0.2836,0.0]|56    |
|[1.0,0.0,1.0,12.0,0.0,0.0,1.0,0.42,0.77,0.2836,0.0]|84    |
|[1.0,0.0,1.0,13.0,0.0,0

##### Split randomly datas

In [453]:
train_data, test_data = model_data.randomSplit([0.7, 0.3])

In [454]:
model_data.describe().show()

+-------+------------------+
|summary|            demand|
+-------+------------------+
|  count|             17379|
|   mean|189.46308763450142|
| stddev| 181.3875990918646|
|    min|                 1|
|    max|               977|
+-------+------------------+



In [455]:
train_data.describe().show()

+-------+------------------+
|summary|            demand|
+-------+------------------+
|  count|             12035|
|   mean|189.87486497714997|
| stddev|181.07587853298674|
|    min|                 1|
|    max|               976|
+-------+------------------+



In [456]:
test_data.describe().show()

+-------+------------------+
|summary|            demand|
+-------+------------------+
|  count|              5344|
|   mean|188.53574101796409|
| stddev| 182.1012284855826|
|    min|                 1|
|    max|               977|
+-------+------------------+



##### Create the linear regression model

In [457]:
lr = LinearRegression(labelCol='demand')

In [458]:
train_data.cache()
test_data.cache()

DataFrame[features: vector, demand: int]

In [459]:
lr_model = lr.fit(train_data)

23/02/27 12:33:01 WARN Instrumentation: [4553b589] regParam is zero, which might cause numerical instability and overfitting.


##### get the summary of the model

In [460]:
summary = lr_model.summary
print("explained variance =", summary.explainedVariance)
print("mean absolute error =", summary.meanAbsoluteError)

explained variance = 12549.328489263007
mean absolute error = 106.26800667712139


In [461]:
summary.r2

0.38276777972947296

In [462]:
summary.predictions.show(n=20, truncate = False)

+-----------------------------------------------+------+-------------------+
|features                                       |demand|prediction         |
+-----------------------------------------------+------+-------------------+
|(11,[0,1,2,6,7,8],[2.0,1.0,5.0,1.0,0.6,0.83])  |153.0 |116.90169778316846 |
|(11,[0,1,2,6,7,8],[3.0,1.0,8.0,2.0,0.7,0.61])  |135.0 |202.53209304747514 |
|(11,[0,1,2,6,7,8],[4.0,1.0,12.0,1.0,0.26,0.81])|108.0 |62.91089805521192  |
|(11,[0,2,3,6,7,8],[1.0,1.0,1.0,1.0,0.22,0.8])  |40.0  |-75.93549659122657 |
|(11,[0,2,3,6,7,8],[1.0,1.0,2.0,1.0,0.22,0.8])  |32.0  |-68.30971673360057 |
|(11,[0,2,3,6,7,8],[1.0,1.0,2.0,2.0,0.18,0.55]) |16.0  |-35.47202238977879 |
|(11,[0,2,3,6,7,8],[1.0,1.0,3.0,1.0,0.24,0.75]) |13.0  |-45.286139727894074|
|(11,[0,2,3,6,7,8],[1.0,1.0,4.0,1.0,0.24,0.75]) |1.0   |-37.66035987026809 |
|(11,[0,2,3,6,7,8],[1.0,1.0,4.0,2.0,0.16,0.59]) |5.0   |-33.666433500655266|
|(11,[0,2,3,6,7,8],[1.0,1.0,6.0,1.0,0.22,0.8])  |2.0   |-37.80659730309663 |

##### Evaluate the model

In [463]:
test_results = lr_model.evaluate(test_data)

In [464]:
test_results.residuals.show(n=10)

+-------------------+
|          residuals|
+-------------------+
| 13.000498033759701|
| 14.675212012651428|
|  -32.1409108911277|
|-1.6583186733249704|
| 49.292213358281266|
| 22.360021014691373|
| -69.60317209286863|
| 134.59329713283037|
|  96.23523499374329|
|  77.96455073678902|
+-------------------+
only showing top 10 rows



In [465]:
test_results.residuals.groupBy().avg().show() 

+------------------+
|    avg(residuals)|
+------------------+
|0.5084773049173261|
+------------------+



In [466]:
from pyspark.sql.functions import abs
df = test_results.residuals
df.select(abs(df.residuals)).groupBy().avg().show()

+-------------------+
|avg(abs(residuals))|
+-------------------+
|  105.2737009090852|
+-------------------+



##### Obtains means squarred error

In [467]:
print("r2 = %g"%test_results.r2)   # my model explains x % of the variance of the data
print("rootMeanSquaredError = %g"%test_results.rootMeanSquaredError)
print("meanAbsoluteError = %g"%test_results.meanAbsoluteError)

r2 = 0.397747
rootMeanSquaredError = 141.306
meanAbsoluteError = 105.274


So we dont have a good score.. We can do some :
- decrease regularization parameter? 
- Add more features? Feature Engineering?
- Polynomial Regression? other algorithms? Trees? 

## Get some insights

In [468]:
from pyspark.sql.functions import avg, stddev, format_number

In [469]:
insights = lr_model.evaluate(data)
pred = insights.predictions

In [470]:
pred.take(1)

[Row(season=1, yr=0, mnth=1, hr=0, holiday=0, workingday=0, weathersit=1, temp=0.24, hum=0.81, windspeed=0.0, dayOfWeek='Sat', days=0, demand=16, day_cat=0.0, features=SparseVector(11, {0: 1.0, 2: 1.0, 6: 1.0, 7: 0.24, 8: 0.81}), prediction=-79.87443723248417)]

In [471]:
pred_res = pred.withColumn('res_abs', abs(pred.prediction-pred.demand))
pred_res.take(1)

[Row(season=1, yr=0, mnth=1, hr=0, holiday=0, workingday=0, weathersit=1, temp=0.24, hum=0.81, windspeed=0.0, dayOfWeek='Sat', days=0, demand=16, day_cat=0.0, features=SparseVector(11, {0: 1.0, 2: 1.0, 6: 1.0, 7: 0.24, 8: 0.81}), prediction=-79.87443723248417, res_abs=95.87443723248417)]

##### get some insights per season

In [472]:
pred_res.groupBy('season').agg(
    format_number(avg('res_abs'), 2).alias('avg_abs_residual'),
    format_number(avg('demand'), 2).alias('avg_demand'),
    format_number(stddev('prediction'), 2).alias('stddev_prediction'),
    format_number(stddev('demand'), 2).alias('stddev_demand')
).sort('season').show()

+------+----------------+----------+-----------------+-------------+
|season|avg_abs_residual|avg_demand|stddev_prediction|stddev_demand|
+------+----------------+----------+-----------------+-------------+
|     1|           77.48|    111.11|            97.93|       119.22|
|     2|          109.21|    208.34|           106.70|       188.36|
|     3|          128.01|    236.02|           100.41|       197.71|
|     4|          107.72|    198.87|            95.10|       182.97|
+------+----------------+----------+-----------------+-------------+



##### get some insights per month

In [473]:
pred_res.groupBy('mnth').agg(
    format_number(avg('res_abs'), 2).alias('avg_abs_residual'),
    format_number(avg('demand'), 2).alias('avg_demand'),
    format_number(stddev('prediction'), 2).alias('stddev_prediction'),
    format_number(stddev('demand'), 2).alias('stddev_demand')
).sort('mnth').show()

+----+----------------+----------+-----------------+-------------+
|mnth|avg_abs_residual|avg_demand|stddev_prediction|stddev_demand|
+----+----------------+----------+-----------------+-------------+
|   1|           68.52|     94.42|            93.21|        99.91|
|   2|           79.71|    112.87|            99.46|       112.49|
|   3|           95.32|    155.41|           104.39|       163.54|
|   4|          106.06|    187.26|           109.72|       181.14|
|   5|          108.43|    222.91|           102.66|       187.72|
|   6|          124.61|    240.52|           103.73|       196.04|
|   7|          131.05|    231.82|            97.13|       187.48|
|   8|          126.71|    238.10|            94.32|       200.44|
|   9|          127.09|    240.77|            96.91|       214.61|
|  10|          116.95|    222.16|            96.45|       203.48|
|  11|           99.23|    177.34|            91.96|       158.97|
|  12|           85.17|    142.30|            89.31|       141

##### get some insights per day of week

##### get some insights per hour

In [474]:
pred_res.groupBy('hr').agg(
    format_number(avg('res_abs'), 2).alias('avg_abs_residual'),
    format_number(avg('demand'), 2).alias('avg_demand'),
    format_number(stddev('prediction'), 2).alias('stddev_prediction'),
    format_number(stddev('demand'), 2).alias('stddev_demand')
).sort('hr').show()

+---+----------------+----------+-----------------+-------------+
| hr|avg_abs_residual|avg_demand|stddev_prediction|stddev_demand|
+---+----------------+----------+-----------------+-------------+
|  0|           60.01|     53.90|            77.26|        42.31|
|  1|           71.67|     33.38|            76.19|        33.54|
|  2|           79.56|     22.87|            74.83|        26.58|
|  3|           89.76|     11.73|            72.56|        13.24|
|  4|           96.75|      6.35|            71.63|         4.14|
|  5|           87.44|     19.89|            72.52|        13.20|
|  6|           53.35|     76.04|            73.29|        55.08|
|  7|          142.74|    212.06|            76.42|       161.44|
|  8|          248.91|    359.01|            81.12|       235.19|
|  9|           76.95|    219.31|            83.78|        93.70|
| 10|           69.50|    173.67|            87.20|       102.21|
| 11|           80.36|    208.14|            88.79|       127.50|
| 12|     

In [475]:
pred_res.groupBy('dayofweek').agg(
    format_number(avg('res_abs'), 2).alias('avg_abs_residual'),
    format_number(avg('demand'), 2).alias('avg_demand'),
    format_number(stddev('prediction'), 2).alias('stddev_prediction'),
    format_number(stddev('demand'), 2).alias('stddev_demand')
).sort('dayofweek').show()

+---------+----------------+----------+-----------------+-------------+
|dayofweek|avg_abs_residual|avg_demand|stddev_prediction|stddev_demand|
+---------+----------------+----------+-----------------+-------------+
|      Fri|          100.54|    196.14|           114.42|       174.08|
|      Mon|          106.41|    183.74|           110.65|       179.51|
|      Sat|           99.24|    190.21|           114.15|       179.82|
|      Sun|           99.16|    177.47|           108.74|       168.17|
|      Thr|          111.47|    196.44|           111.45|       188.01|
|      Tue|          112.73|    191.24|           109.87|       187.82|
|      Wed|          112.46|    191.13|           114.64|       190.89|
+---------+----------------+----------+-----------------+-------------+



##### get some insights per temperature

In [476]:
pred_res.groupBy('temp').agg(
    format_number(avg('res_abs'), 2).alias('avg_abs_residual'),
    format_number(avg('demand'), 2).alias('avg_demand'),
    format_number(stddev('prediction'), 2).alias('stddev_prediction'),
    format_number(stddev('demand'), 2).alias('stddev_demand')
).sort('temp').show()

+----+----------------+----------+-----------------+-------------+
|temp|avg_abs_residual|avg_demand|stddev_prediction|stddev_demand|
+----+----------------+----------+-----------------+-------------+
|0.02|           63.05|     41.88|            55.74|        80.88|
|0.04|           65.20|     35.62|            54.68|        57.78|
|0.06|           62.65|     42.00|            66.56|        30.58|
|0.08|           51.96|     28.24|            68.73|        27.02|
| 0.1|           67.78|     49.29|            71.93|        74.69|
|0.12|           71.97|     58.42|            79.36|        70.43|
|0.14|           70.05|     55.11|            85.32|        61.83|
|0.16|           68.01|     65.58|            76.48|        73.08|
|0.18|           63.89|     60.12|            73.14|        65.68|
| 0.2|           69.59|     79.75|            75.25|       101.83|
|0.22|           67.22|     69.91|            75.69|        93.25|
|0.24|           77.15|     80.16|            77.47|       105

## Improve the model : add dummy variables

In [477]:
from pyspark.ml.feature import OneHotEncoder

##### Reload the dataset

In [478]:
df = spark.read.csv("bike_rental_dataset.csv", inferSchema=True, header = True)

In [479]:
df.show()

+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+
|season| yr|mnth| hr|holiday|workingday|weathersit|temp| hum|windspeed|dayOfWeek|days|demand|
+------+---+----+---+-------+----------+----------+----+----+---------+---------+----+------+
|     1|  0|   1|  0|      0|         0|         1|0.24|0.81|      0.0|      Sat|   0|    16|
|     1|  0|   1|  1|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    40|
|     1|  0|   1|  2|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|    32|
|     1|  0|   1|  3|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|    13|
|     1|  0|   1|  4|      0|         0|         1|0.24|0.75|      0.0|      Sat|   0|     1|
|     1|  0|   1|  5|      0|         0|         2|0.24|0.75|   0.0896|      Sat|   0|     1|
|     1|  0|   1|  6|      0|         0|         1|0.22| 0.8|      0.0|      Sat|   0|     2|
|     1|  0|   1|  7|      0|         0|         1| 0.2|0.86

##### Create dummy variables

In [480]:
season_indexer = StringIndexer(inputCol="season", outputCol="season_index")
season_encoder = OneHotEncoder(inputCol="season_index", outputCol="season_dummy")

In [481]:
year_indexer = StringIndexer(inputCol="yr", outputCol="year_index")
year_encoder = OneHotEncoder(inputCol="year_index", outputCol="year_dummy")

In [482]:
month_indexer = StringIndexer(inputCol="mnth", outputCol="mnth_index")
month_encoder = OneHotEncoder(inputCol="mnth_index", outputCol="mnth_dummy")

In [483]:
hour_indexer = StringIndexer(inputCol="hr", outputCol="hr_index")
hour_encoder = OneHotEncoder(inputCol="hr_index", outputCol="hr_dummy")

In [484]:
weekday_indexer = StringIndexer(inputCol="dayOfWeek", outputCol="weekday_index")
weekday_encoder = OneHotEncoder(inputCol="weekday_index", outputCol="weekday_dummy")

In [485]:
weather_indexer = StringIndexer(inputCol="weathersit", outputCol="weather_index")
weather_encoder = OneHotEncoder(inputCol="weather_index", outputCol="weather_dummy")

In [486]:
holiday_indexer = StringIndexer(inputCol="holiday", outputCol="holiday_index")
holiday_encoder = OneHotEncoder(inputCol="holiday_index", outputCol="holiday_dummy")

In [487]:
workingday_indexer = StringIndexer(inputCol="workingday", outputCol="workingday_index")
workingday_encoder = OneHotEncoder(inputCol="workingday_index", outputCol="workingday_dummy")

##### fit encoders and indexors to data

In [488]:
data = season_indexer.fit(data).transform(data)
data = season_encoder.fit(data).transform(data)

In [489]:
data = year_indexer.fit(data).transform(data)
data = year_encoder.fit(data).transform(data)

In [490]:
data = month_indexer.fit(data).transform(data)
data = month_encoder.fit(data).transform(data)

In [491]:
data = hour_indexer.fit(data).transform(data)
data = hour_encoder.fit(data).transform(data)

In [492]:
data = weekday_indexer.fit(data).transform(data)
data = weekday_encoder.fit(data).transform(data)

In [493]:
data = weather_indexer.fit(data).transform(data)
data = weather_encoder.fit(data).transform(data)

In [494]:
data = holiday_indexer.fit(data).transform(data)
data = holiday_encoder.fit(data).transform(data)

In [495]:
data = workingday_indexer.fit(data).transform(data)
data = workingday_encoder.fit(data).transform(data)

In [496]:
display(data)

DataFrame[season: int, yr: int, mnth: int, hr: int, holiday: int, workingday: int, weathersit: int, temp: double, hum: double, windspeed: double, dayOfWeek: string, days: int, demand: int, day_cat: double, features: vector, season_index: double, season_dummy: vector, year_index: double, year_dummy: vector, mnth_index: double, mnth_dummy: vector, hr_index: double, hr_dummy: vector, weekday_index: double, weekday_dummy: vector, weather_index: double, weather_dummy: vector, holiday_index: double, holiday_dummy: vector, workingday_index: double, workingday_dummy: vector]

##### Add the dummy variables to the DataFrame

In [497]:
assembler = VectorAssembler(
    inputCols=[
        "season_dummy",
        "year_dummy",
        "mnth_dummy",
        "hr_dummy",
        "weekday_dummy",
        "weather_dummy",
        "holiday_dummy",
        "workingday_dummy",
        "temp",
        "hum",
        "windspeed"
        ]
    )

In [498]:
data_assembler = assembler.transform(data)

In [499]:
data_select = data_assembler.select("features", "demand")
data_select.show()

+--------------------+------+
|            features|demand|
+--------------------+------+
|(11,[0,2,6,7,8],[...|    16|
|(11,[0,2,3,6,7,8]...|    40|
|(11,[0,2,3,6,7,8]...|    32|
|(11,[0,2,3,6,7,8]...|    13|
|(11,[0,2,3,6,7,8]...|     1|
|[1.0,0.0,1.0,5.0,...|     1|
|(11,[0,2,3,6,7,8]...|     2|
|(11,[0,2,3,6,7,8]...|     3|
|(11,[0,2,3,6,7,8]...|     8|
|(11,[0,2,3,6,7,8]...|    14|
|[1.0,0.0,1.0,10.0...|    36|
|[1.0,0.0,1.0,11.0...|    56|
|[1.0,0.0,1.0,12.0...|    84|
|[1.0,0.0,1.0,13.0...|    94|
|[1.0,0.0,1.0,14.0...|   106|
|[1.0,0.0,1.0,15.0...|   110|
|[1.0,0.0,1.0,16.0...|    93|
|[1.0,0.0,1.0,17.0...|    67|
|[1.0,0.0,1.0,18.0...|    35|
|[1.0,0.0,1.0,19.0...|    37|
+--------------------+------+
only showing top 20 rows



## Test the model with dumy variables

In [500]:
train_data, test_data = data_select.randomSplit([0.7, 0.3])

In [501]:
data_select.describe().show()

+-------+------------------+
|summary|            demand|
+-------+------------------+
|  count|             17379|
|   mean|189.46308763450142|
| stddev| 181.3875990918646|
|    min|                 1|
|    max|               977|
+-------+------------------+



In [502]:
train_data.describe().show()

+-------+------------------+
|summary|            demand|
+-------+------------------+
|  count|             12185|
|   mean|188.91981945014362|
| stddev|180.64641641321109|
|    min|                 1|
|    max|               970|
+-------+------------------+



In [503]:
test_data.describe().show()

+-------+------------------+
|summary|            demand|
+-------+------------------+
|  count|              5194|
|   mean| 190.7375818251829|
| stddev|183.12578416414303|
|    min|                 1|
|    max|               977|
+-------+------------------+



In [504]:
lr_dummy = LinearRegression(labelCol='demand')

In [505]:
train_data.cache()
test_data.cache()

DataFrame[features: vector, demand: int]

In [506]:
lr_dummy_model = lr_dummy.fit(train_data)

23/02/27 12:33:05 WARN Instrumentation: [67faf139] regParam is zero, which might cause numerical instability and overfitting.


In [507]:
summary_dummy = lr_dummy_model.summary
print("explained variance =", summary_dummy.explainedVariance)
print("mean absolute error =", summary_dummy.meanAbsoluteError)

explained variance = 12697.284990609
mean absolute error = 105.44729536637277


In [508]:
summary_dummy.r2

0.3891238134036027

In [509]:
summary_dummy.predictions.show(n=20, truncate = False)

+-----------------------------------------------+------+-------------------+
|features                                       |demand|prediction         |
+-----------------------------------------------+------+-------------------+
|(11,[0,1,2,6,7,8],[2.0,1.0,3.0,1.0,0.58,0.68]) |156.0 |135.20393278863173 |
|(11,[0,1,2,6,7,8],[3.0,1.0,6.0,1.0,0.64,0.83]) |116.0 |142.79254000302075 |
|(11,[0,1,2,6,7,8],[4.0,1.0,12.0,1.0,0.26,0.81])|108.0 |59.86328899983247  |
|(11,[0,1,2,6,7,8],[4.0,1.0,12.0,1.0,0.3,0.7])  |94.0  |92.36707091755346  |
|(11,[0,2,3,6,7,8],[1.0,1.0,1.0,1.0,0.22,0.8])  |40.0  |-80.47989898237392 |
|(11,[0,2,3,6,7,8],[1.0,1.0,2.0,2.0,0.18,0.55]) |16.0  |-41.37631251490641 |
|(11,[0,2,3,6,7,8],[1.0,1.0,3.0,2.0,0.16,0.59]) |8.0   |-46.921290267934474|
|(11,[0,2,3,6,7,8],[1.0,1.0,4.0,2.0,0.16,0.59]) |5.0   |-39.11266790697113 |
|(11,[0,2,3,6,7,8],[1.0,1.0,5.0,1.0,0.16,0.59]) |1.0   |-25.54381129337562 |
|(11,[0,2,3,6,7,8],[1.0,1.0,6.0,1.0,0.22,0.8])  |2.0   |-41.436787177557186|

In [510]:
test_results = lr_dummy_model.evaluate(test_data)

In [511]:
test_results.residuals.show(n=10)

+------------------+
|         residuals|
+------------------+
| 14.21580771007154|
| 40.90016103278441|
|-61.66903515775698|
|104.67127662141058|
|  62.5768602498764|
|42.768237888913056|
| 53.84615272374398|
| 23.20147699845979|
| -49.8080187767971|
| 110.0435796823062|
+------------------+
only showing top 10 rows



In [512]:
test_results.residuals.groupBy().avg().show() 

+-----------------+
|   avg(residuals)|
+-----------------+
|1.342185169836071|
+-----------------+



In [513]:
df = test_results.residuals
df.select(abs(df.residuals)).groupBy().avg().show()

+-------------------+
|avg(abs(residuals))|
+-------------------+
| 107.02640182475514|
+-------------------+



In [514]:
print("r2 = %g"%test_results.r2)   # my model explains x % of the variance of the data
print("rootMeanSquaredError = %g"%test_results.rootMeanSquaredError)
print("meanAbsoluteError = %g"%test_results.meanAbsoluteError)

r2 = 0.383123
rootMeanSquaredError = 143.816
meanAbsoluteError = 107.026


## Improve the model by doing cross validation

In [539]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [531]:
lr = LinearRegression(featuresCol="features", labelCol="demand")

In [534]:
# Define the pipeline
pipeline = Pipeline(stages=[lr])

In [535]:
# Split the data into training and test sets
train, test = data_select.randomSplit([0.7, 0.3])

In [538]:
# Define the parameter grid to search over
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 1.0])
             .build())

# Define the evaluation metric and the cross-validator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="demand", metricName="rmse")
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Fit the cross-validator to the training data
cvModel = cv.fit(train)

# Evaluate the performance of the best model on the test data
predictions = cvModel.transform(test)
rmse = evaluator.evaluate(predictions)
print("RMSE on test data = {:.3f}".format(rmse))
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
print("R2 score on test data: {}".format(r2))

RMSE on test data = 142.116
R2 score on test data: 0.3845824333360707


## Use other model

In [540]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Load and preprocess your data
lr = LinearRegression(featuresCol="features", labelCol="demand")
dt = DecisionTreeRegressor(featuresCol="features", labelCol="demand")
rf = RandomForestRegressor(featuresCol="features", labelCol="demand")

# Define the pipelines for each algorithm
lr_pipeline = Pipeline(stages=[lr])
dt_pipeline = Pipeline(stages=[dt])
rf_pipeline = Pipeline(stages=[rf])

# Split the data into training and test sets
train, test = data.randomSplit([0.8, 0.2], seed=42)

# Define the parameter grids to search over for each algorithm
lr_paramGrid = (ParamGridBuilder()
                .addGrid(lr.regParam, [0.01, 0.1, 1.0])
                .build())
dt_paramGrid = (ParamGridBuilder()
                .addGrid(dt.maxDepth, [2, 5, 10])
                .build())
rf_paramGrid = (ParamGridBuilder()
                .addGrid(rf.numTrees, [10, 50, 100])
                .addGrid(rf.maxDepth, [2, 5, 10])
                .build())

# Define the evaluation metric and the cross-validators for each algorithm
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="demand", metricName="rmse")
r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="demand", metricName="r2")
lr_cv = CrossValidator(estimator=lr_pipeline, estimatorParamMaps=lr_paramGrid, evaluator=evaluator, numFolds=5)
dt_cv = CrossValidator(estimator=dt_pipeline, estimatorParamMaps=dt_paramGrid, evaluator=evaluator, numFolds=5)
rf_cv = CrossValidator(estimator=rf_pipeline, estimatorParamMaps=rf_paramGrid, evaluator=evaluator, numFolds=5)

# Fit the cross-validators to the training data
lr_cvModel = lr_cv.fit(train)
dt_cvModel = dt_cv.fit(train)
rf_cvModel = rf_cv.fit(train)

# Evaluate the performance of each algorithm on the test data
lr_predictions = lr_cvModel.transform(test)
lr_rmse = evaluator.evaluate(lr_predictions)
lr_r2 = r2_evaluator.evaluate(lr_predictions)
print("Linear Regression - RMSE: {:.2f}, R2 Score: {:.2f}".format(lr_rmse, lr_r2))
                                                                  
dt_predictions = dt_cvModel.transform(test)
dt_rmse = evaluator.evaluate(dt_predictions)
dt_r2 = r2_evaluator.evaluate(dt_predictions)
print("Decision Tree Regression - RMSE: {:.2f}, R2 Score: {:.2f}".format(dt_rmse, dt_r2))

rf_predictions = rf_cvModel.transform(test)
rf_rmse = evaluator.evaluate(rf_predictions)
rf_r2 = r2_evaluator.evaluate(rf_predictions)
print("Random Forest Regression - RMSE: {:.2f}, R2 Score: {:.2f}".format(rf_rmse, rf_r2))



23/02/27 13:05:25 WARN DAGScheduler: Broadcasting large task binary with size 1100.8 KiB
23/02/27 13:05:25 WARN DAGScheduler: Broadcasting large task binary with size 1698.6 KiB
23/02/27 13:05:28 WARN DAGScheduler: Broadcasting large task binary with size 1334.1 KiB
23/02/27 13:05:29 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:05:29 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/02/27 13:05:30 WARN DAGScheduler: Broadcasting large task binary with size 1017.3 KiB
23/02/27 13:05:30 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 2037:>                                                       (0 + 1) / 1]

23/02/27 13:05:31 WARN DAGScheduler: Broadcasting large task binary with size 1645.1 KiB


                                                                                

23/02/27 13:05:36 WARN DAGScheduler: Broadcasting large task binary with size 1323.6 KiB
23/02/27 13:05:36 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:05:37 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


[Stage 2082:>                                                       (0 + 1) / 1]

23/02/27 13:05:38 WARN DAGScheduler: Broadcasting large task binary with size 1144.7 KiB


                                                                                

23/02/27 13:05:38 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB


[Stage 2084:>                                                       (0 + 1) / 1]

23/02/27 13:05:39 WARN DAGScheduler: Broadcasting large task binary with size 2016.5 KiB


                                                                                

23/02/27 13:05:40 WARN DAGScheduler: Broadcasting large task binary with size 13.4 MiB


[Stage 2086:>                                                       (0 + 1) / 1]

23/02/27 13:05:41 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/02/27 13:05:44 WARN DAGScheduler: Broadcasting large task binary with size 1112.3 KiB
23/02/27 13:05:44 WARN DAGScheduler: Broadcasting large task binary with size 1724.9 KiB
23/02/27 13:05:47 WARN DAGScheduler: Broadcasting large task binary with size 1331.0 KiB
23/02/27 13:05:47 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:05:48 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/02/27 13:05:48 WARN DAGScheduler: Broadcasting large task binary with size 1012.8 KiB
23/02/27 13:05:49 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/02/27 13:05:49 WARN DAGScheduler: Broadcasting large task binary with size 1625.4 KiB
23/02/27 13:05:54 WARN DAGScheduler: Broadcasting large task binary with size 1321.2 KiB
23/02/27 13:05:54 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:05:55 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


[Stage 2229:>                                                       (0 + 1) / 1]

23/02/27 13:05:56 WARN DAGScheduler: Broadcasting large task binary with size 1147.5 KiB


                                                                                

23/02/27 13:05:57 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB


[Stage 2231:>                                                       (0 + 1) / 1]

23/02/27 13:05:57 WARN DAGScheduler: Broadcasting large task binary with size 2019.3 KiB


                                                                                

23/02/27 13:05:59 WARN DAGScheduler: Broadcasting large task binary with size 13.4 MiB


[Stage 2233:>                                                       (0 + 1) / 1]

23/02/27 13:06:00 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/02/27 13:06:03 WARN DAGScheduler: Broadcasting large task binary with size 1117.5 KiB
23/02/27 13:06:03 WARN DAGScheduler: Broadcasting large task binary with size 1722.8 KiB
23/02/27 13:06:06 WARN DAGScheduler: Broadcasting large task binary with size 1332.8 KiB
23/02/27 13:06:06 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:06:07 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/02/27 13:06:07 WARN DAGScheduler: Broadcasting large task binary with size 1012.7 KiB
23/02/27 13:06:08 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 2331:>                                                       (0 + 1) / 1]

23/02/27 13:06:08 WARN DAGScheduler: Broadcasting large task binary with size 1642.9 KiB


                                                                                

23/02/27 13:06:13 WARN DAGScheduler: Broadcasting large task binary with size 1323.2 KiB
23/02/27 13:06:13 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:06:14 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


[Stage 2376:>                                                       (0 + 1) / 1]

23/02/27 13:06:15 WARN DAGScheduler: Broadcasting large task binary with size 1150.6 KiB


                                                                                

23/02/27 13:06:15 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB


[Stage 2378:>                                                       (0 + 1) / 1]

23/02/27 13:06:16 WARN DAGScheduler: Broadcasting large task binary with size 2037.4 KiB


                                                                                

23/02/27 13:06:17 WARN DAGScheduler: Broadcasting large task binary with size 13.5 MiB


[Stage 2380:>                                                       (0 + 1) / 1]

23/02/27 13:06:18 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/02/27 13:06:21 WARN DAGScheduler: Broadcasting large task binary with size 1104.5 KiB
23/02/27 13:06:21 WARN DAGScheduler: Broadcasting large task binary with size 1696.5 KiB
23/02/27 13:06:24 WARN DAGScheduler: Broadcasting large task binary with size 1330.8 KiB
23/02/27 13:06:25 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:06:25 WARN DAGScheduler: Broadcasting large task binary with size 3.9 MiB
23/02/27 13:06:26 WARN DAGScheduler: Broadcasting large task binary with size 1007.6 KiB
23/02/27 13:06:26 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB
23/02/27 13:06:27 WARN DAGScheduler: Broadcasting large task binary with size 1618.8 KiB
23/02/27 13:06:31 WARN DAGScheduler: Broadcasting large task binary with size 1324.1 KiB
23/02/27 13:06:32 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:06:33 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


[Stage 2523:>                                                       (0 + 1) / 1]

23/02/27 13:06:33 WARN DAGScheduler: Broadcasting large task binary with size 1147.8 KiB


                                                                                

23/02/27 13:06:34 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB


[Stage 2525:>                                                       (0 + 1) / 1]

23/02/27 13:06:35 WARN DAGScheduler: Broadcasting large task binary with size 2036.7 KiB


                                                                                

23/02/27 13:06:36 WARN DAGScheduler: Broadcasting large task binary with size 13.5 MiB


[Stage 2527:>                                                       (0 + 1) / 1]

23/02/27 13:06:37 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/02/27 13:06:40 WARN DAGScheduler: Broadcasting large task binary with size 1111.0 KiB
23/02/27 13:06:40 WARN DAGScheduler: Broadcasting large task binary with size 1719.4 KiB
23/02/27 13:06:43 WARN DAGScheduler: Broadcasting large task binary with size 1332.0 KiB
23/02/27 13:06:43 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
23/02/27 13:06:44 WARN DAGScheduler: Broadcasting large task binary with size 4.0 MiB
23/02/27 13:06:44 WARN DAGScheduler: Broadcasting large task binary with size 1016.3 KiB
23/02/27 13:06:45 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


[Stage 2625:>                                                       (0 + 1) / 1]

23/02/27 13:06:45 WARN DAGScheduler: Broadcasting large task binary with size 1649.9 KiB


                                                                                

23/02/27 13:06:50 WARN DAGScheduler: Broadcasting large task binary with size 1322.7 KiB
23/02/27 13:06:51 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
23/02/27 13:06:51 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


[Stage 2670:>                                                       (0 + 1) / 1]

23/02/27 13:06:52 WARN DAGScheduler: Broadcasting large task binary with size 1146.7 KiB


                                                                                

23/02/27 13:06:53 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB


[Stage 2672:>                                                       (0 + 1) / 1]

23/02/27 13:06:53 WARN DAGScheduler: Broadcasting large task binary with size 2032.5 KiB


                                                                                

23/02/27 13:06:55 WARN DAGScheduler: Broadcasting large task binary with size 13.4 MiB


[Stage 2674:>                                                       (0 + 1) / 1]

23/02/27 13:06:56 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB


                                                                                

23/02/27 13:06:59 WARN DAGScheduler: Broadcasting large task binary with size 1230.2 KiB
23/02/27 13:07:00 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB


                                                                                

23/02/27 13:07:01 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


[Stage 2695:>                                                       (0 + 1) / 1]

23/02/27 13:07:01 WARN DAGScheduler: Broadcasting large task binary with size 1167.6 KiB


                                                                                

23/02/27 13:07:02 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB


[Stage 2697:>                                                       (0 + 1) / 1]

23/02/27 13:07:03 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB


                                                                                

23/02/27 13:07:04 WARN DAGScheduler: Broadcasting large task binary with size 13.7 MiB


[Stage 2699:>                                                       (0 + 1) / 1]

23/02/27 13:07:05 WARN DAGScheduler: Broadcasting large task binary with size 3.4 MiB


                                                                                

Linear Regression - RMSE: 143.12, R2 Score: 0.38
Decision Tree Regression - RMSE: 57.67, R2 Score: 0.90
Random Forest Regression - RMSE: 70.99, R2 Score: 0.85
