In [1]:
# Sanity check: display the SparkContext `sc`. It is not created anywhere in this
# notebook — presumably injected by the PySpark kernel/shell; confirm for your setup.
sc

In [7]:
# NOTE(review): "spark-metastore" does not look like a recognised Spark configuration
# key, and nothing visible in this notebook reads it back, so this call most likely has
# no effect. If the intent was to relocate the warehouse/metastore directory,
# `spark.sql.warehouse.dir` is a static setting that must be supplied when the
# SparkSession is built, not at runtime — confirm intent.
spark.conf.set(key="spark-metastore", value="../")

In [8]:
ls -ltr ../../data/bike-data



total 42016
-rw-rw-r-- 1 niranjan niranjan     5272 Mar 11 12:00 201508_station_data.csv
-rw-rw-r-- 1 niranjan niranjan 43012650 Mar 11 12:00 201508_trip_data.csv


In [15]:
less ../../data/bike-data/201508_trip_data.csv

In [28]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Explicit schema for the trip CSV, mirroring the column types that inferSchema
# produced for this file (see the printSchema output below). Supplying it up front
# avoids the extra full pass over the ~43 MB file that inferSchema requires.
# Caveat: if the file changes, values that do not fit these types become null
# (default PERMISSIVE mode) rather than widening the column to string.
trip_schema = StructType([
    StructField("Trip ID", IntegerType(), True),
    StructField("Duration", IntegerType(), True),
    StructField("Start Date", StringType(), True),
    StructField("Start Station", StringType(), True),
    StructField("Start Terminal", IntegerType(), True),
    StructField("End Date", StringType(), True),
    StructField("End Station", StringType(), True),
    StructField("End Terminal", IntegerType(), True),
    StructField("Bike #", IntegerType(), True),
    StructField("Subscriber Type", StringType(), True),
    StructField("Zip Code", StringType(), True),
])

trip_raw_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(trip_schema)
    .load("../../data/bike-data/201508_trip_data.csv")
)

In [29]:
# Station metadata is tiny (~5 KB), so letting Spark infer the column types from the
# data is cheap. The first line of the file supplies the column names.
station_raw_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("../../data/bike-data/201508_station_data.csv")
)

In [30]:
# Show the raw trip DataFrame's schema (original CSV header names, spaces and '#' included).
trip_raw_df.printSchema()

root
 |-- Trip ID: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: integer (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: integer (nullable = true)
 |-- Bike #: integer (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [31]:
# Show the raw station DataFrame's schema.
station_raw_df.printSchema()

root
 |-- station_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dockcount: integer (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)



In [34]:
from pyspark.sql.functions import col

# Normalise the CSV headers to snake_case: lower-case them, drop the " #" suffix
# ("Bike #" -> "bike"), then turn the remaining spaces into underscores
# ("Trip ID" -> "trip_id").
trip_columns = [
    col(raw_name).alias(raw_name.lower().replace(" #", "").replace(" ", "_"))
    for raw_name in trip_raw_df.columns
]

trip_df = (
    trip_raw_df
    .select(trip_columns)
)
trip_df.printSchema()


root
 |-- trip_id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_date: string (nullable = true)
 |-- start_station: string (nullable = true)
 |-- start_terminal: integer (nullable = true)
 |-- end_date: string (nullable = true)
 |-- end_station: string (nullable = true)
 |-- end_terminal: integer (nullable = true)
 |-- bike: integer (nullable = true)
 |-- subscriber_type: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [35]:
# Normalise the station headers (lower-case, spaces -> "_"). They are already
# snake_case in the raw file, so this is effectively a pass-through, kept for
# symmetry with trip_df.
station_columns = [
    col(raw_name).alias(raw_name.lower().replace(" ", "_"))
    for raw_name in station_raw_df.columns
]

station_df = (
    station_raw_df
    .select(station_columns)
)
station_df.printSchema()

root
 |-- station_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dockcount: integer (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)



In [47]:
# Preview two cleaned trip rows without truncating long station names.
trip_df.show(n=2, truncate=False)

Using matplotlib backend: Qt5Agg
Populating the interactive namespace from numpy and matplotlib
+-------+--------+---------------+------------------------------------+--------------+---------------+----------------------------------------+------------+----+---------------+--------+
|trip_id|duration|start_date     |start_station                       |start_terminal|end_date       |end_station                             |end_terminal|bike|subscriber_type|zip_code|
+-------+--------+---------------+------------------------------------+--------------+---------------+----------------------------------------+------------+----+---------------+--------+
|913460 |765     |8/31/2015 23:26|Harry Bridges Plaza (Ferry Building)|50            |8/31/2015 23:39|San Francisco Caltrain (Townsend at 4th)|70          |288 |Subscriber     |2139    |
|913459 |1036    |8/31/2015 23:11|San Antonio Shopping Center         |31            |8/31/2015 23:28|Mountain View City Hall                 |27          |

In [48]:
# Preview two cleaned station rows without truncating long station names.
station_df.show(n=2, truncate=False)

+----------+---------------------------------+---------+-----------+---------+--------+------------+
|station_id|name                             |lat      |long       |dockcount|landmark|installation|
+----------+---------------------------------+---------+-----------+---------+--------+------------+
|2         |San Jose Diridon Caltrain Station|37.329732|-121.901782|27       |San Jose|8/6/2013    |
|3         |San Jose Civic Center            |37.330698|-121.888979|15       |San Jose|8/5/2013    |
+----------+---------------------------------+---------+-----------+---------+--------+------------+
only showing top 2 rows



In [50]:
from pyspark.sql.functions import desc

# Number of trips that ended at each station, busiest station first.
most_visited_df = (
    trip_df
    .groupBy("end_station")
    .count()
    .withColumnRenamed("count", "visits")
    .orderBy(desc("visits"))
)

In [51]:
# Print only the physical plan for the per-station visit count.
most_visited_df.explain(extended=False)

== Physical Plan ==
*Sort [visits#423L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(visits#423L DESC NULLS LAST, 200)
   +- *HashAggregate(keys=[end_station#199], functions=[count(1)])
      +- Exchange hashpartitioning(end_station#199, 200)
         +- *HashAggregate(keys=[end_station#199], functions=[partial_count(1)])
            +- *Project [End Station#126 AS end_station#199]
               +- *FileScan csv [End Station#126] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/niranjan/working/BigDataWorkspace/data/bike-data/201508_trip_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<End Station:string>


In [53]:
# Show the 20 most-visited end stations (the default row count), names untruncated.
most_visited_df.show(n=20, truncate=False)

+---------------------------------------------+------+
|end_station                                  |visits|
+---------------------------------------------+------+
|San Francisco Caltrain (Townsend at 4th)     |34810 |
|San Francisco Caltrain 2 (330 Townsend)      |22523 |
|Harry Bridges Plaza (Ferry Building)         |17810 |
|2nd at Townsend                              |15463 |
|Townsend at 7th                              |15422 |
|Embarcadero at Sansome                       |15065 |
|Market at Sansome                            |13916 |
|Steuart at Market                            |13617 |
|Temporary Transbay Terminal (Howard at Beale)|12966 |
|Powell Street BART                           |10239 |
|Market at 10th                               |10220 |
|Market at 4th                                |9685  |
|2nd at South Park                            |8253  |
|5th at Howard                                |8147  |
|Civic Center BART (7th at Market)            |7714  |
|Howard at

In [90]:
from pyspark.sql.functions import count, countDistinct

# Attach each trip to the station it ended at. Then, per (landmark, subscriber type),
# count the trips and the number of distinct end stations involved, largest trip
# count first. The inner join drops trips whose end terminal has no station row.
most_visited_landmark_df = (
    trip_df
    .join(station_df, col("end_terminal") == col("station_id"), how="inner")
    .groupBy("landmark", "subscriber_type")
    .agg(count("station_id"), countDistinct("end_terminal"))
    # NOTE(review): sorts on Spark's auto-generated aggregate column name; aliasing
    # the aggregates would make this independent of that naming.
    .orderBy(desc("count(station_id)"))
)

In [91]:
# Print only the physical plan for the join + two-level aggregation.
most_visited_landmark_df.explain(extended=False)

== Physical Plan ==
*Sort [count(station_id)#2395L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(count(station_id)#2395L DESC NULLS LAST, 200)
   +- *HashAggregate(keys=[landmark#221, subscriber_type#202], functions=[count(station_id#216), count(distinct end_terminal#200)])
      +- Exchange hashpartitioning(landmark#221, subscriber_type#202, 200)
         +- *HashAggregate(keys=[landmark#221, subscriber_type#202], functions=[merge_count(station_id#216), partial_count(distinct end_terminal#200)])
            +- *HashAggregate(keys=[landmark#221, subscriber_type#202, end_terminal#200], functions=[merge_count(station_id#216)])
               +- Exchange hashpartitioning(landmark#221, subscriber_type#202, end_terminal#200, 200)
                  +- *HashAggregate(keys=[landmark#221, subscriber_type#202, end_terminal#200], functions=[partial_count(station_id#216)])
                     +- *BroadcastHashJoin [end_terminal#200], [station_id#216], Inner, BuildRight
                 

In [76]:
# Mark both cleaned DataFrames for caching so later actions on them can reuse the
# in-memory data. cache() is lazy: nothing is stored until the next action runs.
# NOTE(review): this cell ran as In[76], i.e. before the landmark-query cells
# (In[90]-[92]) that appear above it. Move it above them so Restart & Run All
# reproduces the same execution order.
trip_df.cache()
station_df.cache();  # trailing ";" suppresses the echoed DataFrame repr

DataFrame[station_id: int, name: string, lat: double, long: double, dockcount: int, landmark: string, installation: string]

In [92]:
# Show up to 20 (landmark, subscriber type) groups, largest trip count first, untruncated.
most_visited_landmark_df.show(n=20, truncate=False)

+-------------+---------------+-----------------+----------------------------+
|landmark     |subscriber_type|count(station_id)|count(DISTINCT end_terminal)|
+-------------+---------------+-----------------+----------------------------+
|San Francisco|Subscriber     |282477           |35                          |
|San Francisco|Customer       |38633            |35                          |
|San Jose     |Subscriber     |15440            |16                          |
|Mountain View|Subscriber     |8751             |7                           |
|San Jose     |Customer       |2515             |16                          |
|Palo Alto    |Subscriber     |1878             |5                           |
|Redwood City |Subscriber     |1671             |7                           |
|Palo Alto    |Customer       |1235             |5                           |
|Mountain View|Customer       |1228             |7                           |
|Redwood City |Customer       |324              |7  