In [1]:
sc

In [2]:
spark

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.functions import (
    col, count, date_format,
    window, date_sub, hour,
    date_trunc, mean, month,
    next_day, to_timestamp, weekofyear,
    window, year, dayofweek
)

# Notebook for training ml models for stations and write back the predictions in a BigQuery table
---
### Reading in the complete dataset

In [4]:
bike_df = spark.read.format('bigquery').option(
    'project', 'welu420'
).option('table', 'Bikesharing.System_Data').load()

In [5]:
bike_df.show(3)

[Stage 0:>                                                          (0 + 1) / 1]

+-----------+--------+-------------------+-------------------+--------------------+--------------------+--------------+--------------------+------------------+------------+----------------+-----------------+----------------+-----------------+-----------+-------+----+------------+-------------+----+----+-----------+-------+
|Bike_number|Duration|         Start_date|           End_date|       Start_station|Start_station_number|Start_capacity|         End_station|End_station_number|End_capacity|       start_lat|        start_lon|         end_lat|          end_lon|Member_type|Holiday|temp|rel_humidity|precipitation|snow|wspd|sun_minutes|ride_id|
+-----------+--------+-------------------+-------------------+--------------------+--------------------+--------------+--------------------+------------------+------------+----------------+-----------------+----------------+-----------------+-----------+-------+----+------------+-------------+----+----+-----------+-------+
|     W00939|     376|201

                                                                                

#### Code for the development purpose. Not to use in the production.

In [7]:
# filter to get the dataframe for a particular station
# union_station = bike_df.filter((bike_df.Start_station == 'Columbus Circle / Union Station') | 
#                                (bike_df.End_station == 'Columbus Circle / Union Station'))

In [8]:
# del bike_df

In [9]:
# union_station.toPandas().head(3)

                                                                                

Unnamed: 0,Bike_number,Duration,Start_date,End_date,Start_station,Start_station_number,Start_capacity,End_station,End_station_number,End_capacity,...,end_lon,Member_type,Holiday,temp,rel_humidity,precipitation,snow,wspd,sun_minutes,ride_id
0,W00721,587,2011-02-27 15:11:50,2011-02-27 15:21:37,13th & D St NE,31622,31,Columbus Circle / Union Station,31623,55,...,-77.004936,Casual,,6.1,79.0,0.0,,11.2,,198071
1,W00779,412,2011-03-22 09:31:08,2011-03-22 09:38:00,13th & D St NE,31622,31,Columbus Circle / Union Station,31623,55,...,-77.004936,Member,,11.7,86.0,0.0,,11.2,,244029
2,W00416,2811,2011-04-18 15:02:10,2011-04-18 15:49:02,Georgetown Harbor / 30th St NW,31215,19,Columbus Circle / Union Station,31623,55,...,-77.004936,Casual,,17.2,54.0,0.0,,7.6,,308511


In [10]:
# mean temperature for replacing nan values in temperature column
# mean_temp = union_station.select(mean('temp')).collect()[0][0]
# mean_temp

                                                                                

17.523396611798503

In [11]:
union_station = union_station.withColumn('minutes',union_station.Duration/60)

In [12]:
converting duration in minutes and saving in a new column
union_station = union_station.withColumn('minutes',union_station.Duration/60)
handling missing values in the data
union_station = union_station.fillna(
    {
        'temp': mean_temp,
        'Holiday': 'No-Holiday',
        'rel_humidity': 0,
        'precipitation': 0,
        'snow': 0,
        'sun_minutes': 0,
    }
)

In [13]:
# creating new columns: hour_of_day, day_of_week, month and year from timestamp
union_station = union_station.withColumn('hour_of_day', hour('Start_date').alias('hour_of_day'))
union_station = union_station.withColumn('year', year('Start_date').alias('year'))
union_station = union_station.withColumn('month', month('Start_date').alias('month'))
union_station = union_station.withColumn('day_of_week', dayofweek('Start_date').alias('day_of_week'))

In [14]:
# conditional column: 1 if holiday, 0 otherwise
union_station = union_station.withColumn(
    'is_holiday',
    F.when((F.col("Holiday") == 'No-Holiday'), 0).otherwise(1)
)

In [15]:
# conditional column: 1 if weekend, 0 if day of week between 0-5
union_station = union_station.withColumn(
    'workday',
    F.when((F.col('day_of_week') == 5) & (F.col('day_of_week') == 6), 0).otherwise(1)
)

In [16]:
union_station.show(3)

In [17]:
# rides ending at this station
# duration is at least 5 minutes
incomings = union_station.filter( \
    (union_station['End_station'] == 'Columbus Circle / Union Station') & \
    (union_station['minutes'] >= 5))

# rides begining from this station
outgoings = union_station.filter( \
    (union_station['Start_station'] == 'Columbus Circle / Union Station') & \
    (union_station['minutes'] >= 5))

In [18]:
incomings.show(2)

22/06/02 13:21:12 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----------+--------+-------------------+-------------------+--------------+--------------------+--------------+--------------------+------------------+------------+----------------+-----------------+----------------+-----------------+-----------+----------+----+------------+-------------+----+----+-----------+-------+-----------------+-----------+----+-----+-----------+----------+-------+
|Bike_number|Duration|         Start_date|           End_date| Start_station|Start_station_number|Start_capacity|         End_station|End_station_number|End_capacity|       start_lat|        start_lon|         end_lat|          end_lon|Member_type|   Holiday|temp|rel_humidity|precipitation|snow|wspd|sun_minutes|ride_id|          minutes|hour_of_day|year|month|day_of_week|is_holiday|workday|
+-----------+--------+-------------------+-------------------+--------------+--------------------+--------------+--------------------+------------------+------------+----------------+-----------------+-----------

                                                                                

In [19]:
incomings.dtypes

In [21]:
features: temp, rel_humidity, precipitation, snow, wspd, sun_minutes, is_holiday, workday, month, hour_of_day, year
to predict: rentals in hour

In [22]:
# grouping by every hour to get the total number of rides for every hour
# using mean for other features to keep them in the table
# mean makes no difference as the values for one hour are always same
incomings = (
   incomings
    .groupBy(window(col("Start_date"), "1 hour").alias("hour"))
    .agg(
        F.count('ride_id').alias('rentals_in_hour'), F.mean('temp').alias('temp'), F.mean('rel_humidity').alias('rel_humidity'),
        F.mean('precipitation').alias('precipitation'), F.mean('snow').alias('snow'), F.mean('wspd').alias('wspd'),
        F.mean('sun_minutes').alias('sun_minutes'), F.mean('is_holiday').alias('is_holiday'),
        F.mean('workday').alias('workday'), F.mean('month').alias('month'), F.mean('hour_of_day').alias('hour_of_day'),
        F.mean('year').alias('year')
    )
)

outgoings = (
   outgoings
    .groupBy(window(col("Start_date"), "1 hour").alias("hour"))
    .agg(
        F.count('ride_id').alias('rentals_in_hour'), F.mean('temp').alias('temp'), F.mean('rel_humidity').alias('rel_humidity'),
        F.mean('precipitation').alias('precipitation'), F.mean('snow').alias('snow'), F.mean('wspd').alias('wspd'),
        F.mean('sun_minutes').alias('sun_minutes'), F.mean('is_holiday').alias('is_holiday'),
        F.mean('workday').alias('workday'), F.mean('month').alias('month'), F.mean('hour_of_day').alias('hour_of_day'),
        F.mean('year').alias('year')
    )
)

In [23]:
incomings.toPandas().head(2)

                                                                                

Unnamed: 0,hour,rentals_in_hour,temp,rel_humidity,precipitation,snow,wspd,sun_minutes,is_holiday,workday,month,hour_of_day,year
0,"(2012-06-12 07:00:00, 2012-06-12 08:00:00)",11,23.9,71.0,0.0,0.0,16.6,0.0,0.0,1.0,6.0,7.0,2012.0
1,"(2013-08-26 08:00:00, 2013-08-26 09:00:00)",35,18.9,73.0,0.0,0.0,13.0,0.0,0.0,1.0,8.0,8.0,2013.0


In [24]:
incomings.count()

                                                                                

59131

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import GBTRegressor

In [26]:
# train test split
outgoings_train = outgoings.where(F.col('year') < 2018)
outgoings_test = outgoings.where(F.col('year') > 2017)
incomings_train = incomings.where(F.col('year') < 2018)
incomings_test = incomings.where(F.col('year') > 2017)

In [27]:
# temp rel_humidity precipitation snow wspd sun_minutes is_holiday workday month hour year
va = VectorAssembler(inputCols=['temp', 'rel_humidity', 'precipitation', 'snow', 'wspd',
                               'sun_minutes', 'is_holiday', 'workday', 'month', 'hour_of_day', 'year'],outputCol='features')
outgoings_ml = va.transform(outgoings).select('features','rentals_in_hour')
incomings_ml = va.transform(incomings).select('features','rentals_in_hour')

In [None]:
from pyspark.ml import Pipeline

In [29]:
# creating our pipeline
# stage 1: create feature vector
vectorising_stage = VectorAssembler(inputCols=['temp', 'rel_humidity', 'precipitation', 'snow', 'wspd',
                                               'sun_minutes', 'is_holiday', 'workday', 'month', 'hour_of_day',
                                               'year'], outputCol='features')
# stage 2: scale feature vector
scaling_stage = StandardScaler(inputCol="features", outputCol="features_scaled")

# stage 3: estimator
gbr_stage = GBTRegressor(featuresCol='features_scaled', labelCol='rentals_in_hour')

gbr_pipe = Pipeline(stages=[vectorising_stage, scaling_stage, gbr_stage])

In [31]:
outgoings_model = gbr_pipe.fit(outgoings_train)
outgoings_train = outgoings_model.transform(outgoings_train)

In [32]:
# evaluate the model
from pyspark.ml.evaluation import RegressionEvaluator

outgoings_test = outgoings_model.transform(outgoings_test)

# select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="rentals_in_hour", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(outgoings_test)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

[Stage 335:>                                                        (0 + 8) / 9]

Root Mean Squared Error (RMSE) on test data = 5.87169


                                                                                

In [33]:
incomings_model = gbr_pipe.fit(incomings_train)
incomings_train = incomings_model.transform(incomings_train)

                                                                                

In [34]:
incomings_test = incomings_model.transform(incomings_test)
# select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="rentals_in_hour", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(incomings_test)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)



Root Mean Squared Error (RMSE) on test data = 6.38498


                                                                                

In [35]:
import pandas as pd

In [36]:
outgoings_pd_df = outgoings_test.toPandas()
outgoings_pd_df.head()

                                                                                

Unnamed: 0,hour,rentals_in_hour,temp,rel_humidity,precipitation,snow,wspd,sun_minutes,is_holiday,workday,month,hour_of_day,year,features,features_scaled,prediction
0,"(2018-10-02 18:00:00, 2018-10-02 19:00:00)",23,28.9,57.0,0.0,0.0,20.5,0.0,0.0,1.0,10.0,18.0,2018.0,"[28.9, 57.0, 0.0, 0.0, 20.5, 0.0, 0.0, 1.0, 10...","[2.9452533173777047, 2.973778726395736, 0.0, 0...",18.43924
1,"(2018-05-25 08:00:00, 2018-05-25 09:00:00)",37,18.9,81.0,0.0,0.0,7.6,0.0,0.0,1.0,5.0,8.0,2018.0,"[18.9, 81.0, 0.0, 0.0, 7.6, 0.0, 0.0, 1.0, 5.0...","[1.9261345224373223, 4.225896084878151, 0.0, 0...",27.576794
2,"(2018-07-20 08:00:00, 2018-07-20 09:00:00)",32,22.2,76.0,0.0,0.0,0.0,0.0,0.0,1.0,7.0,8.0,2018.0,"(22.2, 76.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 7.0...","(2.2624437247676483, 3.9650383018609814, 0.0, ...",27.576794
3,"(2018-07-14 20:00:00, 2018-07-14 21:00:00)",10,32.8,31.0,0.0,0.0,22.3,0.0,0.0,1.0,7.0,20.0,2018.0,"[32.8, 31.0, 0.0, 0.0, 22.300000000000004, 0.0...","[3.3427096474044533, 1.6173182547064529, 0.0, ...",6.295239
4,"(2018-10-18 18:00:00, 2018-10-18 19:00:00)",19,14.4,36.0,0.0,0.0,24.1,0.0,0.0,1.0,10.0,18.0,2018.0,"[14.4, 36.0, 0.0, 0.0, 24.1, 0.0, 0.0, 1.0, 10...","[1.4675310647141504, 1.8781760377236227, 0.0, ...",12.93982


In [37]:
outgoings_pd_df.hour = pd.to_datetime(outgoings_pd_df.hour.str[0])

In [38]:
outgoings_pd_df = outgoings_pd_df[['hour', 'prediction']]
outgoings_pd_df.head()

Unnamed: 0,hour,prediction
0,2018-10-02 18:00:00,18.43924
1,2018-05-25 08:00:00,27.576794
2,2018-07-20 08:00:00,27.576794
3,2018-07-14 20:00:00,6.295239
4,2018-10-18 18:00:00,12.93982


In [39]:
outgoings_pd_df.prediction = outgoings_pd_df.prediction.round(0).astype(int)

In [40]:
outgoings_pd_df.head(2)

Unnamed: 0,hour,prediction
0,2018-10-02 18:00:00,18
1,2018-05-25 08:00:00,28


In [41]:
outgoings_preds = spark.createDataFrame(outgoings_pd_df)

In [42]:
bucket = "gs://dscb430-analysis-data-bucket/ml_temp"
spark.conf.set('temporaryGcsBucket', bucket)

In [43]:
outgoings_preds.show()

[Stage 659:>                                                        (0 + 1) / 1]

+-------------------+----------+
|               hour|prediction|
+-------------------+----------+
|2018-10-02 18:00:00|        18|
|2018-05-25 08:00:00|        28|
|2018-07-20 08:00:00|        28|
|2018-07-14 20:00:00|         6|
|2018-10-18 18:00:00|        13|
|2018-05-26 23:00:00|         2|
|2018-07-11 21:00:00|         5|
|2018-02-09 07:00:00|        19|
|2019-07-18 06:00:00|        15|
|2019-11-18 08:00:00|        26|
|2018-07-18 19:00:00|        10|
|2019-02-15 18:00:00|        18|
|2019-01-10 14:00:00|         3|
|2019-04-05 17:00:00|         9|
|2018-11-28 07:00:00|        23|
|2018-02-27 17:00:00|        14|
|2018-02-13 09:00:00|         6|
|2019-09-27 15:00:00|         7|
|2018-03-31 18:00:00|        13|
|2019-07-11 14:00:00|         6|
+-------------------+----------+
only showing top 20 rows



                                                                                

In [44]:
outgoings_preds.write \
   .format("bigquery") \
   .option("temporaryGcsBucket","dscb430-analysis-data-bucket/ml_temp") \
   .mode("overwrite") \
   .save("welu420.Bikesharing.unionstation_preds")

                                                                                

In [45]:
# type(outgoings)

pyspark.sql.dataframe.DataFrame

#### Function for getting and saving predictions for stations
A better option would be to create a class for this and divide different operations to methods 

In [48]:
from pyspark.sql import DataFrame as SparkDataFrame

def get_save_predictions_for(station: str, bike_df: SparkDataFrame) -> None:
    """
    Args:
        station: (str) Name of the station.
        bike_df: (SparkDataFrame) data for all the stations.
    Returns: None,
                trains the model for the given station and writes back the predictions (incomings and outgoing)
                to the table in BigQuery.
    Example(s):
        (i) # get all unique stations in the dataframe
            stations = get_unique_stations(dataframe(stations_col)) # get syntax for pyspark
            for station in stations:
                get_save_predictions_for(station, pyspark_dataframe)
                
        (ii) station = 'union_station' # station which exists in the data
             get_save_predictions_for(station, pyspark_dataframe)
    """
    global bucket
    
    # filter to get the dataframe for the given station
    station_df = bike_df.filter((bike_df.Start_station == station) | 
                               (bike_df.End_station == station))
    # handling missing values and preparing data
    # converting duration in minutes and saving in a new column
    station_df = station_df.withColumn('minutes',station_df.Duration/60)
    # mean temperature to replace the nan values in temperature column
    mean_temp = station_df.select(mean('temp')).collect()[0][0]
    
    # replace nan values
    station_df = station_df.fillna(
        {
            'temp': mean_temp,
            'Holiday': 'No-Holiday',
            'rel_humidity': 0,
            'precipitation': 0,
            'snow': 0,
            'sun_minutes': 0,
        }
    )
    # creating new columns: hour_of_day, day_of_week, month and year from timestamp
    station_df = station_df.withColumn('hour_of_day', hour('Start_date').alias('hour_of_day'))
    station_df = station_df.withColumn('year', year('Start_date').alias('year'))
    station_df = station_df.withColumn('month', month('Start_date').alias('month'))
    station_df = station_df.withColumn('day_of_week', dayofweek('Start_date').alias('day_of_week'))
    # conditional column: 1 if holiday, 0 otherwise
    station_df = station_df.withColumn(
        'is_holiday',
        F.when((F.col("Holiday") == 'No-Holiday'), 0).otherwise(1)
    )
    # conditional column: 1 if weekend, 0 if day of week between 0-5
    station_df = station_df.withColumn(
        'workday',
        F.when((F.col('day_of_week') == 5) & (F.col('day_of_week') == 6), 0).otherwise(1)
    )
    # rides ending at this station
    # duration is at least 5 minutes
    incomings = station_df.filter( \
        (station_df['End_station'] == 'Columbus Circle / Union Station') & \
        (station_df['minutes'] >= 5))

    # rides begining from this station
    outgoings = station_df.filter( \
        (station_df['Start_station'] == 'Columbus Circle / Union Station') & \
        (station_df['minutes'] >= 5))
    # features: temp, rel_humidity, precipitation, snow, wspd, sun_minutes, is_holiday, workday, month, hour_of_day, year
    # to predict: rentals in hour
    # # grouping by every hour to get the total number of rides for every hour
    # # using mean for other features to keep them in the table
    # # mean makes no difference as the values for one hour are always same
    incomings = (
       incomings
        .groupBy(window(col("Start_date"), "1 hour").alias("hour"))
        .agg(
            F.count('ride_id').alias('rentals_in_hour'), F.mean('temp').alias('temp'), F.mean('rel_humidity').alias('rel_humidity'),
            F.mean('precipitation').alias('precipitation'), F.mean('snow').alias('snow'), F.mean('wspd').alias('wspd'),
            F.mean('sun_minutes').alias('sun_minutes'), F.mean('is_holiday').alias('is_holiday'),
            F.mean('workday').alias('workday'), F.mean('month').alias('month'), F.mean('hour_of_day').alias('hour_of_day'),
            F.mean('year').alias('year')
        )
    )

    outgoings = (
       outgoings
        .groupBy(window(col("Start_date"), "1 hour").alias("hour"))
        .agg(
            F.count('ride_id').alias('rentals_in_hour'), F.mean('temp').alias('temp'), F.mean('rel_humidity').alias('rel_humidity'),
            F.mean('precipitation').alias('precipitation'), F.mean('snow').alias('snow'), F.mean('wspd').alias('wspd'),
            F.mean('sun_minutes').alias('sun_minutes'), F.mean('is_holiday').alias('is_holiday'),
            F.mean('workday').alias('workday'), F.mean('month').alias('month'), F.mean('hour_of_day').alias('hour_of_day'),
            F.mean('year').alias('year')
        )
    )
    # train test split
    outgoings_train = outgoings.where(F.col('year') < 2018)
    outgoings_test = outgoings.where(F.col('year') > 2017)
    incomings_train = incomings.where(F.col('year') < 2018)
    incomings_test = incomings.where(F.col('year') > 2017)
    # creating our pipeline
    # stage 1: create feature vector
    vectorising_stage = VectorAssembler(inputCols=['temp', 'rel_humidity', 'precipitation', 'snow', 'wspd',
                                                   'sun_minutes', 'is_holiday', 'workday', 'month', 'hour_of_day',
                                                   'year'], outputCol='features')
    # stage 2: scale feature vector
    scaling_stage = StandardScaler(inputCol="features", outputCol="features_scaled")

    # stage 3: estimator
    gbr_stage = GBTRegressor(featuresCol='features_scaled', labelCol='rentals_in_hour')

    gbr_pipe = Pipeline(stages=[vectorising_stage, scaling_stage, gbr_stage])
    
    # fit the model on train data
    # model/pipe for outgoings
    outgoings_model = gbr_pipe.fit(outgoings_train)
    # transform train data
    outgoings_train = outgoings_model.transform(outgoings_train)
    # transform test data and make predictions (running the pipeline on it)
    # predictions for outgoings
    outgoings_test = outgoings_model.transform(outgoings_test)

    # select (prediction, true label) and compute test error
    # evaluate the model 
    evaluator = RegressionEvaluator(labelCol="rentals_in_hour", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(outgoings_test)
    # fit the model for the incomings
    incomings_model = gbr_pipe.fit(incomings_train)
    incomings_train = incomings_model.transform(incomings_train)
    # transform test data and make predictions (running the pipeline on it)
    # predictions for incomings
    incomings_test = incomings_model.transform(incomings_test)
    # select (prediction, true label) and compute test error
    evaluator = RegressionEvaluator(labelCol="rentals_in_hour", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(incomings_test)
    # need to get the prediction column and clean the table before uploading
    outgoings_pd_df = outgoings_test.toPandas()
    incomings_pd_df = incomings_test.toPandas()
    # datetime col was a tuple
    outgoings_pd_df.hour = pd.to_datetime(outgoings_pd_df.hour.str[0])
    incomings_pd_df.hour = pd.to_datetime(incomings_pd_df.hour.str[0])
    # dropping all other columns
    outgoings_pd_df = outgoings_pd_df[['hour', 'prediction']]
    incomings_pd_df = incomings_pd_df[['hour', 'prediction']]
    outgoings_pd_df.prediction = outgoings_pd_df.prediction.round(0).astype(int)
    incomings_pd_df.prediction = incomings_pd_df.prediction.round(0).astype(int)
    # writing it back to a Spark DataFrame
    outgoings_preds = spark.createDataFrame(outgoings_pd_df)
    incomings_preds = spark.createDataFrame(incomings_pd_df)
    
    outgoings_preds.write \
      .format("bigquery") \
      .option("temporaryGcsBucket","dscb430-analysis-data-bucket/ml_temp") \
      .mode("append") \
      .save("welu420.Bikesharing.outgoing_predictions")
    incomings_preds.write \
      .format("bigquery") \
      .option("temporaryGcsBucket","dscb430-analysis-data-bucket/ml_temp") \
      .mode("overwrite") \
      .save("welu420.Bikesharing.incoming_predictions")

In [None]:
stations = [x.Start_station for x in bike_df.select('Start_station').distinct().collect()]
#stations

In [None]:
import pandas as pd
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

bucket = "gs://dscb430-analysis-data-bucket/ml_temp"
spark.conf.set('temporaryGcsBucket', bucket)

for station in stations:
    get_save_predictions_for(station, bike_df)