In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bike-sharing-demand").getOrCreate()

In [2]:
trainDF = spark.read.csv("./data/train.csv", header=True, inferSchema=True)
testDF = spark.read.csv("./data/test.csv", header=True, inferSchema=True)

In [3]:
trainDF.printSchema()

root
 |-- datetime: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weather: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- count: integer (nullable = true)



In [4]:
trainDF.count()

10886

In [5]:
trainDF = trainDF.na.drop()

In [6]:
trainDF.count()

10886

In [7]:
trainDF.show()

+-------------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|           datetime|season|holiday|workingday|weather| temp| atemp|humidity|windspeed|casual|registered|count|
+-------------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|2011-01-01 00:00:00|     1|      0|         0|      1| 9.84|14.395|      81|      0.0|     3|        13|   16|
|2011-01-01 01:00:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     8|        32|   40|
|2011-01-01 02:00:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     5|        27|   32|
|2011-01-01 03:00:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     3|        10|   13|
|2011-01-01 04:00:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     0|         1|    1|
|2011-01-01 05:00:00|     1|      0|         0|      2| 9.84| 12.88|      75|   6.0032|     0|         1

In [8]:
from pyspark.sql.functions import when, lit
trainDF = trainDF.withColumn('season_1', when(trainDF['season']==1, lit(1)).otherwise(lit(0))) \
                .withColumn('season_2', when(trainDF['season']==2, lit(1)).otherwise(lit(0))) \
                .withColumn('season_3', when(trainDF['season']==3, lit(1)).otherwise(lit(0))) \
                .withColumn('season_4', when(trainDF['season']==4, lit(1)).otherwise(lit(0))).drop(trainDF['season'])

In [9]:
trainDF.columns

['datetime',
 'holiday',
 'workingday',
 'weather',
 'temp',
 'atemp',
 'humidity',
 'windspeed',
 'casual',
 'registered',
 'count',
 'season_1',
 'season_2',
 'season_3',
 'season_4']

In [10]:
trainDF = trainDF.withColumn('weather_1', when(trainDF['weather']==1, lit(1)).otherwise(lit(0))) \
                .withColumn('weather_2', when(trainDF['weather']==2, lit(1)).otherwise(lit(0))) \
                .withColumn('weather_3', when(trainDF['weather']==3, lit(1)).otherwise(lit(0))) \
                .withColumn('weather_4', when(trainDF['weather']==4, lit(1)).otherwise(lit(0))).drop(trainDF['weather'])

In [11]:
trainDF.columns

['datetime',
 'holiday',
 'workingday',
 'temp',
 'atemp',
 'humidity',
 'windspeed',
 'casual',
 'registered',
 'count',
 'season_1',
 'season_2',
 'season_3',
 'season_4',
 'weather_1',
 'weather_2',
 'weather_3',
 'weather_4']

In [12]:
from pyspark.sql.functions import split

trainDF = trainDF.withColumn('year', split(split(trainDF['datetime'], ' ')[0], '-')[0].cast('int'))
trainDF = trainDF.withColumn('month', split(split(trainDF['datetime'], ' ')[0], '-')[1].cast('int'))
trainDF = trainDF.withColumn('day', split(split(trainDF['datetime'], ' ')[0], '-')[2].cast('int'))
trainDF = trainDF.withColumn('hour', split(split(trainDF['datetime'], ' ')[1], ':')[0].cast('int'))

In [13]:
trainDF.select('year', 'month', 'day', 'hour').show()

+----+-----+---+----+
|year|month|day|hour|
+----+-----+---+----+
|2011|    1|  1|   0|
|2011|    1|  1|   1|
|2011|    1|  1|   2|
|2011|    1|  1|   3|
|2011|    1|  1|   4|
|2011|    1|  1|   5|
|2011|    1|  1|   6|
|2011|    1|  1|   7|
|2011|    1|  1|   8|
|2011|    1|  1|   9|
|2011|    1|  1|  10|
|2011|    1|  1|  11|
|2011|    1|  1|  12|
|2011|    1|  1|  13|
|2011|    1|  1|  14|
|2011|    1|  1|  15|
|2011|    1|  1|  16|
|2011|    1|  1|  17|
|2011|    1|  1|  18|
|2011|    1|  1|  19|
+----+-----+---+----+
only showing top 20 rows



In [14]:
trainDF.groupBy('month').sum('count').show()

+-----+----------+
|month|sum(count)|
+-----+----------+
|   12|    160160|
|    1|     79884|
|    6|    220733|
|    3|    133501|
|    5|    200147|
|    9|    212529|
|    4|    167402|
|    8|    213516|
|    7|    214617|
|   10|    207434|
|   11|    176440|
|    2|     99113|
+-----+----------+



In [15]:
## gathering all the features into one array using VectorAssembler
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['holiday', \
                                         'workingday', \
                                         'temp', \
                                         'atemp', \
                                         'humidity', \
                                         'windspeed', \
                                         'casual', \
                                         'registered', \
                                         'season_1', \
                                         'season_2', \
                                         'season_3', \
                                         'season_4', \
                                         'weather_1', \
                                         'weather_2', \
                                         'weather_3', \
                                         'weather_4', \
                                         'year', \
                                         'month', \
                                         'day', \
                                         'hour'], outputCol="features")

In [16]:
output = assembler.transform(trainDF)
output.show()

+-------------------+-------+----------+-----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+----+-----+---+----+--------------------+
|           datetime|holiday|workingday| temp| atemp|humidity|windspeed|casual|registered|count|season_1|season_2|season_3|season_4|weather_1|weather_2|weather_3|weather_4|year|month|day|hour|            features|
+-------------------+-------+----------+-----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+----+-----+---+----+--------------------+
|2011-01-01 00:00:00|      0|         0| 9.84|14.395|      81|      0.0|     3|        13|   16|       1|       0|       0|       0|        1|        0|        0|        0|2011|    1|  1|   0|(20,[2,3,4,6,7,8,...|
|2011-01-01 01:00:00|      0|         0| 9.02|13.635|      80|      0.0|     8|        32|   40|       1|       0|       0|       0|        1|  

In [17]:
final_df = output.select('datetime', 'features', 'count')

In [18]:
final_df.show()

+-------------------+--------------------+-----+
|           datetime|            features|count|
+-------------------+--------------------+-----+
|2011-01-01 00:00:00|(20,[2,3,4,6,7,8,...|   16|
|2011-01-01 01:00:00|(20,[2,3,4,6,7,8,...|   40|
|2011-01-01 02:00:00|(20,[2,3,4,6,7,8,...|   32|
|2011-01-01 03:00:00|(20,[2,3,4,6,7,8,...|   13|
|2011-01-01 04:00:00|(20,[2,3,4,7,8,12...|    1|
|2011-01-01 05:00:00|(20,[2,3,4,5,7,8,...|    1|
|2011-01-01 06:00:00|(20,[2,3,4,6,8,12...|    2|
|2011-01-01 07:00:00|(20,[2,3,4,6,7,8,...|    3|
|2011-01-01 08:00:00|(20,[2,3,4,6,7,8,...|    8|
|2011-01-01 09:00:00|(20,[2,3,4,6,7,8,...|   14|
|2011-01-01 10:00:00|(20,[2,3,4,5,6,7,...|   36|
|2011-01-01 11:00:00|(20,[2,3,4,5,6,7,...|   56|
|2011-01-01 12:00:00|(20,[2,3,4,5,6,7,...|   84|
|2011-01-01 13:00:00|(20,[2,3,4,5,6,7,...|   94|
|2011-01-01 14:00:00|(20,[2,3,4,5,6,7,...|  106|
|2011-01-01 15:00:00|(20,[2,3,4,5,6,7,...|  110|
|2011-01-01 16:00:00|(20,[2,3,4,5,6,7,...|   93|
|2011-01-01 17:00:00

In [19]:
from pyspark.ml.regression import LinearRegression
train_data, test_data = final_df.randomSplit([0.80, 0.20])
regressor = LinearRegression(featuresCol='features', labelCol='count')
regressor = regressor.fit(train_data)

In [20]:
regressor.coefficients

DenseVector([0.0, -0.0, -0.0, 0.0, -0.0, -0.0, 1.0, 1.0, 0.0002, 0.0001, -0.0001, -0.0002, 0.0, -0.0, 0.0, 0.0, 0.0, 0.0, -0.0, 0.0])

In [21]:
regressor.intercept

-0.04313314507693853

In [22]:
pred_results = regressor.evaluate(test_data)

In [23]:
pred_results.predictions.show()

+-------------------+--------------------+-----+------------------+
|           datetime|            features|count|        prediction|
+-------------------+--------------------+-----+------------------+
|2011-01-01 09:00:00|(20,[2,3,4,6,7,8,...|   14|13.999978514932085|
|2011-01-01 10:00:00|(20,[2,3,4,5,6,7,...|   36| 35.99997175991032|
|2011-01-01 15:00:00|(20,[2,3,4,5,6,7,...|  110|109.99995345296183|
|2011-01-01 17:00:00|(20,[2,3,4,5,6,7,...|   67| 66.99994766371339|
|2011-01-01 22:00:00|(20,[2,3,4,5,6,7,...|   28|27.999941070989152|
|2011-01-02 11:00:00|(20,[2,3,4,5,6,7,...|   70| 69.99995487572122|
|2011-01-02 17:00:00|(20,[2,3,4,5,6,7,...|   65|  64.9999914338179|
|2011-01-02 21:00:00|(20,[2,3,4,5,6,7,...|   31| 30.99999966114603|
|2011-01-02 23:00:00|(20,[2,3,4,5,7,8,...|    8| 8.000004244636637|
|2011-01-03 06:00:00|(20,[1,2,3,4,5,7,...|   30|29.999978281770453|
|2011-01-03 08:00:00|[0.0,1.0,5.74,6.0...|  154|153.99998685119584|
|2011-01-03 09:00:00|[0.0,1.0,6.56,6.8...|   88|

In [24]:
## Root Mean Square Error and Mean Absolute Error
pred_results.meanAbsoluteError

4.0823111889379854e-05

In [25]:
pred_results.meanSquaredError

2.551507048289733e-09

In [26]:
modelPath = "./model/bike-sharing-demand.model"
regressor.write().overwrite().save(modelPath)

In [27]:
pred_results.predictions.show()

+-------------------+--------------------+-----+------------------+
|           datetime|            features|count|        prediction|
+-------------------+--------------------+-----+------------------+
|2011-01-01 09:00:00|(20,[2,3,4,6,7,8,...|   14|13.999978514932085|
|2011-01-01 10:00:00|(20,[2,3,4,5,6,7,...|   36| 35.99997175991032|
|2011-01-01 15:00:00|(20,[2,3,4,5,6,7,...|  110|109.99995345296183|
|2011-01-01 17:00:00|(20,[2,3,4,5,6,7,...|   67| 66.99994766371339|
|2011-01-01 22:00:00|(20,[2,3,4,5,6,7,...|   28|27.999941070989152|
|2011-01-02 11:00:00|(20,[2,3,4,5,6,7,...|   70| 69.99995487572122|
|2011-01-02 17:00:00|(20,[2,3,4,5,6,7,...|   65|  64.9999914338179|
|2011-01-02 21:00:00|(20,[2,3,4,5,6,7,...|   31| 30.99999966114603|
|2011-01-02 23:00:00|(20,[2,3,4,5,7,8,...|    8| 8.000004244636637|
|2011-01-03 06:00:00|(20,[1,2,3,4,5,7,...|   30|29.999978281770453|
|2011-01-03 08:00:00|[0.0,1.0,5.74,6.0...|  154|153.99998685119584|
|2011-01-03 09:00:00|[0.0,1.0,6.56,6.8...|   88|

In [45]:
df_to_db = pred_results.predictions.select('datetime', 'prediction')

In [46]:
df_to_db.show()

+-------------------+------------------+
|           datetime|        prediction|
+-------------------+------------------+
|2011-01-01 09:00:00|13.999978514932085|
|2011-01-01 10:00:00| 35.99997175991032|
|2011-01-01 15:00:00|109.99995345296183|
|2011-01-01 17:00:00| 66.99994766371339|
|2011-01-01 22:00:00|27.999941070989152|
|2011-01-02 11:00:00| 69.99995487572122|
|2011-01-02 17:00:00|  64.9999914338179|
|2011-01-02 21:00:00| 30.99999966114603|
|2011-01-02 23:00:00| 8.000004244636637|
|2011-01-03 06:00:00|29.999978281770453|
|2011-01-03 08:00:00|153.99998685119584|
|2011-01-03 09:00:00| 87.99998795697006|
|2011-01-03 10:00:00|  43.9999903454323|
|2011-01-03 22:00:00|19.999977487268207|
|2011-01-04 01:00:00|1.9999778223908962|
|2011-01-04 05:00:00|3.9999736287562837|
|2011-01-04 07:00:00| 93.99997195450267|
|2011-01-04 08:00:00|178.99997674959792|
|2011-01-04 09:00:00| 99.99997524996259|
|2011-01-04 17:00:00|211.99999326740388|
+-------------------+------------------+
only showing top

In [47]:
df_to_db.printSchema()

root
 |-- datetime: string (nullable = true)
 |-- prediction: double (nullable = false)



In [48]:
from pyspark.sql.functions import to_timestamp
# df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt'))
df_to_db = df_to_db.withColumn('datetime', to_timestamp(df_to_db['datetime'], 'yyyy-MM-dd HH:mm:ss'))

In [50]:
df_to_db.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- prediction: double (nullable = false)



In [51]:
mode = "overwrite"
url = "jdbc:oracle:thin:@localhost:1521:orcl"
properties = {
    "user": "hr",
    "password": "hr"
 }
#CREATE TABLE bike_sharing_preditions( prediction VARCHAR(50), datetime DOUBLE);
df_to_db.write.jdbc(url=url, table="bike_sharing_predictions", mode=mode, properties=properties)
# Select (prediction, true label) and compute test error
