In [1]:
from pyspark.sql import Row, SQLContext
from pyspark.sql.types  import *
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer

## Importing DataFrame objects from MongoDB 

In [2]:
statusDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/msan697.status").load()

stationDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/msan697.station").load()

weatherDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/msan697.weather").load()

tripDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/msan697.trip").load()

In [3]:
statusDF.show(n=5)

+--------------------+---------------+---------------+----------+-------------------+
|                 _id|bikes_available|docks_available|station_id|               time|
+--------------------+---------------+---------------+----------+-------------------+
|[5a5d49a291bd3626...|              2|             25|         2|2013/08/29 12:06:01|
|[5a5d49a291bd3626...|              2|             25|         2|2013/08/29 12:07:01|
|[5a5d49a291bd3626...|              2|             25|         2|2013/08/29 12:08:01|
|[5a5d49a291bd3626...|              2|             25|         2|2013/08/29 12:09:01|
|[5a5d49a291bd3626...|              2|             25|         2|2013/08/29 12:10:01|
+--------------------+---------------+---------------+----------+-------------------+
only showing top 5 rows



In [4]:
stationDF.show(n=5)

+--------------------+--------+----------+---+-----------------+------------------+-------------------+--------------------+
|                 _id|    city|dock_count| id|installation_date|               lat|               long|                name|
+--------------------+--------+----------+---+-----------------+------------------+-------------------+--------------------+
|[5a5d498991bd3626...|San Jose|        19|  5|         8/5/2013|         37.331415|          -121.8932|    Adobe on Almaden|
|[5a5d498991bd3626...|San Jose|        15|  6|         8/7/2013|37.336721000000004|        -121.894074|    San Pedro Square|
|[5a5d498991bd3626...|San Jose|        15|  7|         8/7/2013|         37.333798|-121.88694299999999|Paseo de San Antonio|
|[5a5d498991bd3626...|San Jose|        15|  8|         8/5/2013|         37.330165|-121.88583100000001| San Salvador at 1st|
|[5a5d498991bd3626...|San Jose|        15|  9|         8/5/2013|         37.348742|-121.89471499999999|           Japantown|


In [5]:
weatherDF.show(n=2)

+--------------------+-----------+---------+------+---------------+------------------+------------+-----------------------------+-----------------+--------------------+------------------+----------------+-------------+------------------------------+------------------+---------------------+-------------------+---------------+------------+-----------------------------+-----------------+--------------------+--------------------+----------------+--------+
|                 _id|cloud_cover|     date|events|max_dew_point_f|max_gust_speed_mph|max_humidity|max_sea_level_pressure_inches|max_temperature_f|max_visibility_miles|max_wind_Speed_mph|mean_dew_point_f|mean_humidity|mean_sea_level_pressure_inches|mean_temperature_f|mean_visibility_miles|mean_wind_speed_mph|min_dew_point_f|min_humidity|min_sea_level_pressure_inches|min_temperature_f|min_visibility_miles|precipitation_inches|wind_dir_degrees|zip_code|
+--------------------+-----------+---------+------+---------------+------------------+--

In [6]:
tripDF.show(n=5)

+--------------------+-------+--------+---------------+--------------+--------------------+----+---------------+----------------+--------------------+-----------------+--------+
|                 _id|bike_id|duration|       end_date|end_station_id|    end_station_name|  id|     start_date|start_station_id|  start_station_name|subscription_type|zip_code|
+--------------------+-------+--------+---------------+--------------+--------------------+----+---------------+----------------+--------------------+-----------------+--------+
|[5a5d4ec891bd3626...|    520|      63|8/29/2013 14:14|            66|South Van Ness at...|4576|8/29/2013 14:13|              66|South Van Ness at...|       Subscriber|   94127|
|[5a5d4ec891bd3626...|    661|      70|8/29/2013 14:43|            10|  San Jose City Hall|4607|8/29/2013 14:42|              10|  San Jose City Hall|       Subscriber|   95138|
|[5a5d4ec891bd3626...|     48|      71|8/29/2013 10:17|            27|Mountain View Cit...|4130|8/29/2013 10:1

In [7]:
statusDF.columns

['_id', 'bikes_available', 'docks_available', 'station_id', 'time']

In [8]:
tripDF.columns

['_id',
 'bike_id',
 'duration',
 'end_date',
 'end_station_id',
 'end_station_name',
 'id',
 'start_date',
 'start_station_id',
 'start_station_name',
 'subscription_type',
 'zip_code']

## Adding features 

### Status Table

##### Weekday/Weekend 

In [9]:
#Adding day of week column
statusDF = statusDF.withColumn('dayofweek',date_format(from_unixtime(unix_timestamp(statusDF["time"][0:10], 'yyyy/MM/dd')),'EEEE'))

In [10]:
#Cleaning dayofweek column
statusDF = statusDF.withColumn("dayofweek", 
                    when(col("dayofweek").isNull(), date_format(from_unixtime(unix_timestamp(statusDF["time"][0:10], 'yyyy-MM-dd')),'EEEE')).
                        otherwise(col('dayofweek')))

In [11]:
statusDF.select('time','dayofweek').show(n=5)

+-------------------+---------+
|               time|dayofweek|
+-------------------+---------+
|2013/08/29 12:06:01| Thursday|
|2013/08/29 12:07:01| Thursday|
|2013/08/29 12:08:01| Thursday|
|2013/08/29 12:09:01| Thursday|
|2013/08/29 12:10:01| Thursday|
+-------------------+---------+
only showing top 5 rows



In [12]:
#Adding weekend column
statusDF = statusDF.withColumn("weekend", when(col('dayofweek') == 'Saturday',1).when(col('dayofweek') == 'Sunday', 1).otherwise(0))
#Adding weekday column
statusDF = statusDF.withColumn("weekday", when(col('dayofweek') == 'Saturday',0).when(col('dayofweek') == 'Sunday', 0).otherwise(1))

In [13]:
#Checking output
statusDF.select('time','dayofweek','weekend','weekday').where(statusDF.dayofweek == "Sunday").show(n=1)

+-------------------+---------+-------+-------+
|               time|dayofweek|weekend|weekday|
+-------------------+---------+-------+-------+
|2013/09/01 00:00:02|   Sunday|      1|      0|
+-------------------+---------+-------+-------+
only showing top 1 row



##### Hour of Day/Morning/Afternoon/Evening/Night 

In [14]:
#Adding hourofday column
statusDF = statusDF.withColumn('hourofday',statusDF["time"][12:2].cast(IntegerType()))

We define morning as the time between 5am and 12pm, afternoon between 12pm and 5pm, evening between 5pm and 11pm and night between 11pm and 5am.

In [15]:
#Adding morning column
statusDF = statusDF.withColumn("morning", when(col('hourofday').between(5,11),1).otherwise(0))
#Adding afternoon column
statusDF = statusDF.withColumn("afternoon", when(col('hourofday').between(12,16),1).otherwise(0))
#Adding evening column
statusDF = statusDF.withColumn("evening", when(col('hourofday').between(17,22),1).otherwise(0))
#Adding night column
statusDF = statusDF.withColumn("night", when(col('hourofday').between(23,24), 1).when(col('hourofday').between(0,4),1).otherwise(0))

In [16]:
statusDF.select('time','hourofday','morning','afternoon','evening','night').show(n=5)

+-------------------+---------+-------+---------+-------+-----+
|               time|hourofday|morning|afternoon|evening|night|
+-------------------+---------+-------+---------+-------+-----+
|2013/08/29 12:06:01|       12|      0|        1|      0|    0|
|2013/08/29 12:07:01|       12|      0|        1|      0|    0|
|2013/08/29 12:08:01|       12|      0|        1|      0|    0|
|2013/08/29 12:09:01|       12|      0|        1|      0|    0|
|2013/08/29 12:10:01|       12|      0|        1|      0|    0|
+-------------------+---------+-------+---------+-------+-----+
only showing top 5 rows



##### Month/Year

In [17]:
#Adding month column
statusDF = statusDF.withColumn('month',month(from_unixtime(unix_timestamp(statusDF["time"][0:10], 'yyyy/MM/dd'))))
# Adding year column
statusDF = statusDF.withColumn('year',year(from_unixtime(unix_timestamp(statusDF["time"][0:10], 'yyyy/MM/dd'))))

In [18]:
statusDF.show(n=1)

+--------------------+---------------+---------------+----------+-------------------+---------+-------+-------+---------+-------+---------+-------+-----+-----+----+
|                 _id|bikes_available|docks_available|station_id|               time|dayofweek|weekend|weekday|hourofday|morning|afternoon|evening|night|month|year|
+--------------------+---------------+---------------+----------+-------------------+---------+-------+-------+---------+-------+---------+-------+-----+-----+----+
|[5a5d49a291bd3626...|              2|             25|         2|2013/08/29 12:06:01| Thursday|      0|      1|       12|      0|        1|      0|    0|    8|2013|
+--------------------+---------------+---------------+----------+-------------------+---------+-------+-------+---------+-------+---------+-------+-----+-----+----+
only showing top 1 row



In [19]:
#Cleaning month column
statusDF = statusDF.withColumn("month", 
                    when(col("month").isNull(), month(from_unixtime(unix_timestamp(statusDF["time"][0:10], 'yyyy-MM-dd')))).
                        otherwise(col('month')))
#Cleaning year column
statusDF = statusDF.withColumn("year", 
                    when(col("year").isNull(), year(from_unixtime(unix_timestamp(statusDF["time"][0:10], 'yyyy-MM-dd')))).
                        otherwise(col('year')))

In [20]:
#Features so far
statusDF.select('station_id', 'weekend', 'weekday', 'morning', 'afternoon', 'evening', 'night', 'hourofday', 'month','year').show(n=10)

+----------+-------+-------+-------+---------+-------+-----+---------+-----+----+
|station_id|weekend|weekday|morning|afternoon|evening|night|hourofday|month|year|
+----------+-------+-------+-------+---------+-------+-----+---------+-----+----+
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|      0|      1|      0|        1|      0|    0|       12|    8|2013|
|         2|    

In [21]:
statusDF.take(1)

[Row(_id=Row(oid='5a5d49a291bd3626222c4ba7'), bikes_available=2, docks_available=25, station_id=2, time='2013/08/29 12:06:01', dayofweek='Thursday', weekend=0, weekday=1, hourofday=12, morning=0, afternoon=1, evening=0, night=0, month=8, year=2013)]

In [22]:
#sqlContext.sql("drop table if exists statusDF")
#statusDF.write.saveAsTable('statusDF')

In [23]:
sqlContext.sql("select * from statusDF limit 1").take(1)

[Row(_id=Row(oid='5a5d4b6591bd362622b233ec'), bikes_available=4, docks_available=11, station_id=16, time='2015-01-09 18:04:02', dayofweek='Friday', weekend=0, weekday=1, hourofday=18, morning=0, afternoon=0, evening=1, night=0, month=1, year=2015)]

##### Average number of bikes/docks available

In [24]:
statusDF_avg = sqlContext.sql("""
SELECT station_id, weekend, weekday, hourofday, month, year, dayofweek, morning, afternoon, evening, night,
avg(bikes_available) AS avg_bikes_available, 
avg(docks_available) AS avg_docks_available
FROM statusDF
GROUP BY 1,2,3,4,5,6,7,8,9,10,11
""")

In [25]:
statusDF_avg.take(1)

[Row(station_id=16, weekend=0, weekday=1, hourofday=9, month=2, year=2015, dayofweek='Monday', morning=1, afternoon=0, evening=0, night=0, avg_bikes_available=6.629166666666666, avg_docks_available=8.370833333333334)]

### Weather Table

##### Events

We define events with "fog" as 1, "rain" as 2, "fog-rain" as 3 and "rain-thunderstorms" as 4.

In [56]:
weatherDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/msan697.weather").load()


In [57]:
weatherDF = weatherDF.withColumn("events", when(col('events') == 'Fog', 1).\
                                 when(col('events').like ('%ain'),2).\
                                 when(col('events') == 'Fog-Rain',3).\
                                 when(col('events') == 'Rain-Thunderstorm',4).\
                                 otherwise(0))

##### Day of week/Weekend/Weekday/Month/Year

In [58]:
#Adding day of week column
weatherDF = weatherDF.withColumn('dayofweek',date_format(from_unixtime(unix_timestamp(weatherDF["date"], 'M/dd/yyyy')),'EEEE'))
#Adding weekend column
weatherDF = weatherDF.withColumn("weekend", when(col('dayofweek') == 'Saturday',1).when(col('dayofweek') == 'Sunday', 1).otherwise(0))
#Adding weekday column
weatherDF = weatherDF.withColumn("weekday", when(col('dayofweek') == 'Saturday',0).when(col('dayofweek') == 'Sunday', 0).otherwise(1))
#Adding month column
weatherDF = weatherDF.withColumn('month',month(from_unixtime(unix_timestamp(weatherDF["date"], 'M/dd/yyyy'))))
# Adding year column
weatherDF = weatherDF.withColumn('year',year(from_unixtime(unix_timestamp(weatherDF["date"], 'M/dd/yyyy'))))

In [59]:
weatherDF.select('date','dayofweek','weekend','weekday','month','year').show(n=5)

+---------+---------+-------+-------+-----+----+
|     date|dayofweek|weekend|weekday|month|year|
+---------+---------+-------+-------+-----+----+
|8/29/2013| Thursday|      0|      1|    8|2013|
| 9/2/2013|   Monday|      0|      1|    9|2013|
| 9/3/2013|  Tuesday|      0|      1|    9|2013|
| 9/4/2013|Wednesday|      0|      1|    9|2013|
| 9/5/2013| Thursday|      0|      1|    9|2013|
+---------+---------+-------+-------+-----+----+
only showing top 5 rows



##### Averaged weather variables

In [62]:
weatherDF = weatherDF.drop("_id", "date").groupBy("zip_code", "dayofweek", "month","year","weekend","weekday").agg(avg("max_temperature_f").alias("max_temperature_f"), 
                                                                                                                   avg("mean_temperature_f").alias("mean_temperature_f"),
                                                                                                                   avg("min_temperature_f").alias("min_temperature_f"), 
                                                                                                                   avg("max_dew_point_f").alias("max_dew_point_f"),
                                                                                                                   avg("mean_dew_point_f").alias("mean_dew_point_f"), 
                                                                                                                   avg("min_dew_point_f").alias("min_dew_point_f"),
                                                                                                                   avg("max_humidity").alias("max_humidity"), 
                                                                                                                   avg("mean_humidity").alias("mean_humidity"),
                                                                                                                   avg("min_humidity").alias("min_humidity"), 
                                                                                                                   avg("max_sea_level_pressure_inches").alias("max_sea_level_pressure_inches"),
                                                                                                                   avg("mean_sea_level_pressure_inches").alias("mean_sea_level_pressure_inches"),
                                                                                                                   avg("min_sea_level_pressure_inches").alias("min_sea_level_pressure_inches"),
                                                                                                                   avg("max_visibility_miles").alias("max_visibility_miles"), 
                                                                                                                   avg("mean_visibility_miles").alias("mean_visibility_miles"),
                                                                                                                   avg("min_visibility_miles").alias("min_visibility_miles"), 
                                                                                                                   avg("max_wind_Speed_mph").alias("max_wind_Speed_mph"),
                                                                                                                   avg("mean_wind_speed_mph").alias("mean_wind_speed_mph"), 
                                                                                                                   avg("max_gust_speed_mph").alias("max_gust_speed_mph"),
                                                                                                                   avg("precipitation_inches").alias("precipitation_inches"), 
                                                                                                                   avg("cloud_cover").alias("cloud_cover"),
                                                                                                                   avg("wind_dir_degrees").alias("wind_dir_degrees"),
                                                                                                                   avg("events").alias("events"))

IndentationError: unexpected indent (<ipython-input-62-2f7d0d6a9f2a>, line 2)

In [61]:
weatherDF.show(n=1)

+--------------------+-----------+---------+------+---------------+------------------+------------+-----------------------------+-----------------+--------------------+------------------+----------------+-------------+------------------------------+------------------+---------------------+-------------------+---------------+------------+-----------------------------+-----------------+--------------------+--------------------+----------------+--------+---------+-------+-------+-----+----+
|                 _id|cloud_cover|     date|events|max_dew_point_f|max_gust_speed_mph|max_humidity|max_sea_level_pressure_inches|max_temperature_f|max_visibility_miles|max_wind_Speed_mph|mean_dew_point_f|mean_humidity|mean_sea_level_pressure_inches|mean_temperature_f|mean_visibility_miles|mean_wind_speed_mph|min_dew_point_f|min_humidity|min_sea_level_pressure_inches|min_temperature_f|min_visibility_miles|precipitation_inches|wind_dir_degrees|zip_code|dayofweek|weekend|weekday|month|year|
+-------------

### Station Table

##### Station Age

In [None]:
# add age of the docks
stationDF = stationDF.withColumn('age', \
               datediff(from_unixtime(unix_timestamp(date_format(current_date(), "M/d/y"), 'MM/dd/yyy')),\
                              from_unixtime(unix_timestamp(stationDF['installation_date'], 'MM/dd/yyy'))))

In [None]:
stationDF.take(1)

### Trip Table

##### Day of week/Hour/Weekend/Weekday/Month/Year

In [None]:
tripDF = tripDF.withColumn('start_date', concat(col('start_date'),lit(':00'))).withColumn('end_date', concat(col('end_date'),lit(':00')))

In [None]:
tripDF = tripDF.withColumn('dayofweek',date_format(from_unixtime(unix_timestamp('start_date', 'MM/dd/yyy HH:mm:ss')),'EEEE'))\
.withColumn("weekend", when(col('dayofweek') == 'Saturday',1).when(col('dayofweek') == 'Sunday', 1).otherwise(0))\
.withColumn("weekday", when(col('dayofweek') == 'Saturday',0).when(col('dayofweek') == 'Sunday', 0).otherwise(1))\
.withColumn('hourofday',hour(from_unixtime(unix_timestamp('start_date', 'MM/dd/yyy HH:mm:ss'))))\
.withColumn('month',month(from_unixtime(unix_timestamp('start_date', 'MM/dd/yyy HH:mm:ss'))))\
.withColumn('year',year(from_unixtime(unix_timestamp('start_date', 'MM/dd/yyy HH:mm:ss'))))

In [None]:
tripDF.take(1)

##### Number of outgoing/incoming bikes at a station

In [None]:
outgoing_bikesDF = tripDF.groupBy('zip_code', 'start_station_id', 'hourofday', 'dayofweek', 'weekend', 'weekday', 'month', 'year').agg(count('*').alias('outgoing_bikes_count'))

In [None]:
incoming_bikesDF = tripDF.groupBy('zip_code', 'end_station_id', 'hourofday' , 'dayofweek', 'weekend', 'weekday', 'month', 'year').agg(count('*').alias('incoming_bikes_count'))

In [None]:
#sqlContext.sql("drop table if exists outgoing_bikesDF")
#sqlContext.sql("drop table if exists incoming_bikesDF")
#outgoing_bikesDF.write.saveAsTable("outgoing_bikesDF")
#incoming_bikesDF.write.saveAsTable("incoming_bikesDF")

In [None]:
incoming_bikesDF.take(1)

In [None]:
tripDF_final = sqlContext.sql("""
SELECT outgoing_bikesDF.zip_code AS zip_code,
start_station_id,
end_station_id,
outgoing_bikesDF.hourofday AS hourofday,
outgoing_bikesDF.dayofweek AS dayofweek,
outgoing_bikesDF.weekend AS weekend,
outgoing_bikesDF.weekday AS weekday,
outgoing_bikesDF.month AS month,
outgoing_bikesDF.year AS year,
outgoing_bikes_count,
incoming_bikes_count
FROM outgoing_bikesDF LEFT JOIN incoming_bikesDF 
ON outgoing_bikesDF.start_station_id = incoming_bikesDF.end_station_id
AND outgoing_bikesDF.hourofday = incoming_bikesDF.hourofday
AND outgoing_bikesDF.dayofweek = incoming_bikesDF.dayofweek
AND outgoing_bikesDF.weekend = incoming_bikesDF.weekend
AND outgoing_bikesDF.weekday = incoming_bikesDF.weekday
AND outgoing_bikesDF.month = incoming_bikesDF.month
AND outgoing_bikesDF.year = incoming_bikesDF.year
""")

##### Incoming traffic

We define incoming traffic as the *number of incoming bikes - number of outgoing bikes*.

In [None]:
tripDF_final = tripDF_final.withColumn("incoming_traffic", tripDF_final["incoming_bikes_count"] - tripDF_final["outgoing_bikes_count"])

### Joining the tables

We first join the *trips* table with *station* table based on the end station of the trips.

In [None]:
joined_df = stationDF.join(tripDF_final, stationDF.id == tripDF_final.end_station_id, how='left')

In [None]:
joined_df.take(1)

In [None]:
joined_df = joined_df.drop("start_station_id", "end_station_id", "_id", "installation_date")

In [None]:
joined_df = joined_df.withColumnRenamed("id", "station_id").withColumnRenamed("name", "station_name")

In [None]:
joined_df.take(1)

Next, we join the *joined trips* table, with *status* table based on the station id, hour, day of week, weekend/weekday, month and year. 

In [None]:
joined_df2 = joined_df.join(statusDF_avg, ["station_id", "hourofday", "dayofweek", "weekend", "weekday", "month", "year"])

In [None]:
joined_df2 = joined_df2.withColumnRenamed("dock_count", "total_capacity").withColumnRenamed("age", "station_age")

Finally, we join the *weather* table to the above joined table.

In [None]:
final_joined = joined_df2.join(weatherDF, ["zip_code", "dayofweek", "weekend", "weekday", "month", "year"])

In [None]:
final_joined.take(1)

In [None]:
#Final features
final_joined.columns