## Overview
This notebook will display simple regression task with PySpark on Seoul Bike Data to predict the demand of bike to rent.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.regression import LinearRegression, GBTRegressor

from pyspark.sql.functions import col, isnan, when, count, abs
from pyspark.sql.types import IntegerType

### Load dataset

In [0]:
file_location = "/FileStore/tables/SeoulBikeData.csv"
file_type = "csv"
# read the csv dataset
df = spark.read.csv(file_location, header=True, inferSchema=True)

In [0]:
# show 5 initial rows in the dataset
df.show(5)

+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+
|      Date|Rented Bike Count|Hour|Temperature(�C)|Humidity(%)|Wind speed (m/s)|Visibility (10m)|Dew point temperature(�C)|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|Functioning Day|
+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+
|01/12/2017|              254|   0|           -5.2|         37|             2.2|            2000|                    -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|
|01/12/2017|              204|   1|           -5.5|         38|             0.8|            2000|                    -17.6|                    0.0|         0.0|

In [0]:
# printing the schema of the dataset
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Rented Bike Count: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Temperature(�C): double (nullable = true)
 |-- Humidity(%): integer (nullable = true)
 |-- Wind speed (m/s): double (nullable = true)
 |-- Visibility (10m): integer (nullable = true)
 |-- Dew point temperature(�C): double (nullable = true)
 |-- Solar Radiation (MJ/m2): double (nullable = true)
 |-- Rainfall(mm): double (nullable = true)
 |-- Snowfall (cm): double (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- Holiday: string (nullable = true)
 |-- Functioning Day: string (nullable = true)



### Check null values

In [0]:
# check whether there is the null values in the dataset (None, NULL, '', and NaN)
df.select([count(when(col(c).contains('None') |
                            col(c).contains('NULL') |
                            (col(c) == '' ) |
                            col(c).isNull() |
                            isnan(c), c 
                           )).alias(c)
                    for c in df.columns]).show()

+----+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+-------+---------------+
|Date|Rented Bike Count|Hour|Temperature(�C)|Humidity(%)|Wind speed (m/s)|Visibility (10m)|Dew point temperature(�C)|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|Holiday|Functioning Day|
+----+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+-------+---------------+
|   0|                0|   0|              0|          0|               0|               0|                        0|                      0|           0|            0|      0|      0|              0|
+----+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+-------+------------

### Transform categorical values

In [0]:
# columns with categorical values
categorical_cols = ['Seasons', 'Holiday', 'Functioning Day']

In [0]:
# using StringIndexer, we will transform data in the categorical columns into numeric values
indexers = StringIndexer(inputCols=categorical_cols,
            outputCols=['{}_idx'.format(c) for c in categorical_cols])

In [0]:
tfm_df = indexers.fit(df).transform(df)

In [0]:
tfm_df.select(['Seasons_idx', 'Holiday_idx', 'Functioning Day_idx']).show()

+-----------+-----------+-------------------+
|Seasons_idx|Holiday_idx|Functioning Day_idx|
+-----------+-----------+-------------------+
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|                0.0|
|        3.0|        0.0|         

### Transform date into day, month, and year

In [0]:
date = {'day': [1, 2], 'month': [4, 2], 'year': [7, 4]}

# transform values in the Date columns into day, month, and year columns
for idx, val in date.items():
    tfm_df = tfm_df.withColumn(idx, 
                               col('Date').substr(startPos=val[0], length=val[1]).cast(IntegerType()))

In [0]:
tfm_df.select(['Date', 'day', 'month', 'year']).show()

+----------+---+-----+----+
|      Date|day|month|year|
+----------+---+-----+----+
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
|01/12/2017|  1|   12|2017|
+----------+---+-----+----+
only showing top 20 rows



In [0]:
tfm_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Rented Bike Count: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Temperature(�C): double (nullable = true)
 |-- Humidity(%): integer (nullable = true)
 |-- Wind speed (m/s): double (nullable = true)
 |-- Visibility (10m): integer (nullable = true)
 |-- Dew point temperature(�C): double (nullable = true)
 |-- Solar Radiation (MJ/m2): double (nullable = true)
 |-- Rainfall(mm): double (nullable = true)
 |-- Snowfall (cm): double (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- Holiday: string (nullable = true)
 |-- Functioning Day: string (nullable = true)
 |-- Seasons_idx: double (nullable = false)
 |-- Holiday_idx: double (nullable = false)
 |-- Functioning Day_idx: double (nullable = false)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



### Taking only column with int and double datatype

In [0]:
# taking only column with numeric values
cols = [c for c, tp in tfm_df.dtypes if tp in ['int', 'double']]

In [0]:
numeric_df = tfm_df.select(cols)
numeric_df.show(5)

+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-----------+-----------+-------------------+---+-----+----+
|Rented Bike Count|Hour|Temperature(�C)|Humidity(%)|Wind speed (m/s)|Visibility (10m)|Dew point temperature(�C)|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons_idx|Holiday_idx|Functioning Day_idx|day|month|year|
+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-----------+-----------+-------------------+---+-----+----+
|              254|   0|           -5.2|         37|             2.2|            2000|                    -17.6|                    0.0|         0.0|          0.0|        3.0|        0.0|                0.0|  1|   12|2017|
|              204|   1|           -5.5|         38|             0.8|            2000|                    -1

### Assemble features for regression

In [0]:
col_features = cols.copy()

In [0]:
col_features.remove('Rented Bike Count')

In [0]:
assembler = VectorAssembler(inputCols=col_features, outputCol='features')

In [0]:
assembled_df = assembler.transform(numeric_df)

### Using MinMaxScaler

In [0]:
scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')

In [0]:
scaled_df = scaler.fit(assembled_df).transform(assembled_df)

In [0]:
scaled_df.select('scaled_features').show(5)

+--------------------+
|     scaled_features|
+--------------------+
|(15,[1,2,3,4,5,9,...|
|(15,[0,1,2,3,4,5,...|
|(15,[0,1,2,3,4,5,...|
|(15,[0,1,2,3,4,5,...|
|(15,[0,1,2,3,4,5,...|
+--------------------+
only showing top 5 rows



### Split train and test dataset

In [0]:
train_df, test_df = scaled_df.randomSplit([0.8, 0.2])

### Linear Regression

In [0]:
linear_regression = LinearRegression(featuresCol='scaled_features', 
                                     labelCol='Rented Bike Count')

In [0]:
lr_model = linear_regression.fit(train_df)

In [0]:
print('RMSE on train data: {}'.format(lr_model.summary.rootMeanSquaredError))
print('r2  on train data: {}'.format(lr_model.summary.r2))

RMSE on train data: 436.0990257956108
r2  on train data: 0.5435363531614852


In [0]:
lr_pred = model.transform(test_df)

In [0]:
lr_pred.select(['prediction', 'Rented Bike Count']).show(4)

+-------------------+-----------------+
|         prediction|Rented Bike Count|
+-------------------+-----------------+
| -37.68093203855915|                0|
|-377.88406732572787|                0|
| -54.45020961267511|                0|
| -36.13393346410077|                0|
+-------------------+-----------------+
only showing top 4 rows



In [0]:
evaluators = [RegressionEvaluator(labelCol='Rented Bike Count', 
                                predictionCol='prediction', metricName='{}'.format(e))
              for e in ['rmse', 'r2']]
print('RMSE = %g' % evaluators[0].evaluate(lr_pred))
print('r2 = %g' % evaluators[1].evaluate(lr_pred))

RMSE = 238.22
r2 = 0.862676


### GBT Regressor

In [0]:
regressor = GBTRegressor(featuresCol='scaled_features',
                            labelCol='Rented Bike Count')

In [0]:
gbt_model = regressor.fit(train_df)

In [0]:
gbt_pred = gbt_model.transform(test_df)

In [0]:
gbt_pred.select('prediction', 'Rented Bike Count').show(3)

+-------------------+-----------------+
|         prediction|Rented Bike Count|
+-------------------+-----------------+
| 24.583032529636238|                0|
|-366.62750122607207|                0|
|  5.235890211834931|                0|
+-------------------+-----------------+
only showing top 3 rows



In [0]:
print('RMSE = %g' % evaluators[0].evaluate(gbt_pred))
print('r2 = %g' % evaluators[1].evaluate(gbt_pred))

RMSE = 257.152
r2 = 0.83998
