In [1]:
# Importing the Libraries
from graphframes import *
from functools import reduce
from pyspark.sql.functions import col, lit, when, concat, desc
from pyspark import SparkContext, SQLContext

In [2]:
import os
os.environ["PYSPARK_SUBMIT_ARGS"] = ("--packages  graphframes:graphframes:0.5.0-spark2.0-s_2.11 pyspark-shell")

In [3]:
# Creating Spark Context instance
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [4]:
# 1. Reading Datafile
station_data_df= sqlContext.read.format("csv").option("header", "true").csv('201508_station_data.csv')
trips_df= sqlContext.read.format("csv").option("header", "true").csv('201508_trip_data.csv')

In [5]:
station_data_df.show()

+----------+--------------------+---------+-----------+---------+------------+------------+
|station_id|                name|      lat|       long|dockcount|    landmark|installation|
+----------+--------------------+---------+-----------+---------+------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|    San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|    San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|    San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|    San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|    San Jose|    8/7/2013|
|         7|Paseo de San Antonio|37.333798|-121.886943|       15|    San Jose|    8/7/2013|
|         8| San Salvador at 1st|37.330165|-121.885831|       15|    San Jose|    8/5/2013|
|         9|           Japantown|37.348742|-121.894715|       15|    San Jose|  

In [6]:
trips_df.show()

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|       Start Station|Start Terminal|       End Date|         End Station|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/31/2015 23:13|      Post at Kearny|            47|8/31/2015 23:18|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|8/31/2015 23:10|  San Jose City Hall|            10|8/31/2015 23

In [59]:
from pyspark.sql.types import IntegerType
station_data_df = station_data_df.withColumn("dockcount",station_data_df["dockcount"].cast(IntegerType()))
station_data_df

DataFrame[station_id: string, name: string, lat: string, long: string, dockcount: int, landmark: string, installation: string]

In [60]:
final_trips_df = trips_df.withColumn("Duration",trips_df["Duration"].cast(IntegerType()))
final_trips_df

DataFrame[Trip ID: string, Duration: int, Start Date: string, Start Station: string, Start Terminal: string, End Date: string, End Station: string, End Terminal: string, Bike #: string, Subscriber Type: string, Zip Code: string]

In [29]:
# Creating Vertices
vertices = station_data_df.withColumnRenamed("name","id").distinct()
vertices.show()

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                  id|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|        56|     Beale at Market|37.792251|-122.397086|       19|San Francisco|   8/20/2013|
|        84|         Ryland Park|37.342725|-121.895617|       15|     San Jose|    4/9/2014|
|         7|Paseo de San Antonio|37.333798|-121.886943|       15|     San Jose|    8/7/2013|
|        27|Mountain View Cit...|37.389218|-122.081896|       15|Mountain View|   8/16/2013|
|        12|SJSU 4th at San C...|37.332808|-121.883891|       19|     San Jose|    8/7/2013|
|        31|San Antonio Shopp...|37.400443|-122.108338|       15|Mountain View|  12/31/2013|
|        72|Civic Center BART...|37.781039|-122.411748|       23|San Francisco|   8/23/2013|
|        74|   Steuart at Market|37.794139|-122.394434|       23|San F

In [31]:
# Creating Edges
edges=final_trips_df.withColumnRenamed("Start Station","src").withColumnRenamed("End Station","dst")
edges.show()

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|                 src|Start Terminal|       End Date|                 dst|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913455|     307|8/31/2015 23:13|      Post at Kearny|            47|8/31/2015 23:18|   2nd at South Park|          64|   468|     Subscriber|   94107|
| 913454|     409|8/31/2015 23:10|  San Jose City Hall|            10|8/31/2015 23

In [32]:
edges

DataFrame[Trip ID: string, Duration: int, Start Date: string, src: string, Start Terminal: string, End Date: string, dst: string, End Terminal: string, Bike #: string, Subscriber Type: string, Zip Code: string]

In [33]:
# Creating Graph
graph = GraphFrame(vertices, edges)
graph

GraphFrame(v:[id: string, station_id: string ... 5 more fields], e:[src: string, dst: string ... 9 more fields])

In [34]:
# Vertex InDegree
graph.inDegrees.show()

+--------------------+--------+
|                  id|inDegree|
+--------------------+--------+
|       2nd at Folsom|     676|
|California Ave Ca...|      93|
|Powell at Post (U...|     576|
| Golden Gate at Polk|     470|
|Yerba Buena Cente...|     741|
|   Market at Sansome|    1928|
|     Spear at Folsom|     844|
|         MLK Library|     119|
|           Japantown|     146|
|Commercial at Mon...|     819|
|Paseo de San Antonio|     205|
| San Salvador at 1st|      68|
|Rengstorff Avenue...|      95|
|     Townsend at 7th|    2119|
|Civic Center BART...|     889|
|         Ryland Park|     127|
|San Jose Diridon ...|     709|
|San Jose Civic Ce...|      62|
|          Mezes Park|      16|
|                null|       1|
+--------------------+--------+
only showing top 20 rows



In [35]:
# Vertex OutDegree
graph.outDegrees.show()

+--------------------+---------+
|                  id|outDegree|
+--------------------+---------+
|       2nd at Folsom|     1125|
|California Ave Ca...|       83|
|Powell at Post (U...|      850|
| Golden Gate at Polk|      572|
|Yerba Buena Cente...|      627|
|   Market at Sansome|     1585|
|     Spear at Folsom|      848|
|         MLK Library|      155|
|           Japantown|      157|
|Commercial at Mon...|      850|
|Paseo de San Antonio|      179|
| San Salvador at 1st|       55|
|Rengstorff Avenue...|      117|
|     Townsend at 7th|     1915|
|Civic Center BART...|     1016|
|         Ryland Park|      144|
|San Jose Diridon ...|      702|
|San Jose Civic Ce...|       94|
|          Mezes Park|       29|
|SJSU - San Salvad...|      130|
+--------------------+---------+
only showing top 20 rows



In [15]:
# Concatnate the chunks
concat = station_data_df.select(concat("lat", lit(","),"long")).alias("location")
concat.show()

+--------------------+
|concat(lat, ,, long)|
+--------------------+
|37.329732,-121.90...|
|37.330698,-121.88...|
|37.333988,-121.89...|
| 37.331415,-121.8932|
|37.336721,-121.89...|
|37.333798,-121.88...|
|37.330165,-121.88...|
|37.348742,-121.89...|
|37.337391,-121.88...|
|37.335885,-121.88566|
|37.332808,-121.88...|
|37.339301,-121.88...|
|37.332692,-121.90...|
|37.333955,-121.87...|
|37.481758,-122.22...|
|37.486078,-122.23...|
|37.487616,-122.22...|
|37.484219,-122.22...|
|37.48537,-122.203288|
|37.487682,-122.22...|
+--------------------+
only showing top 20 rows



In [16]:
# Remove Duplicates
station_data_df.dropDuplicates().show()

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                name|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|        62|       2nd at Folsom|37.785299|-122.396236|       19|San Francisco|   8/22/2013|
|        29|San Antonio Caltr...| 37.40694|-122.106758|       23|Mountain View|   8/15/2013|
|         9|           Japantown|37.348742|-121.894715|       15|     San Jose|    8/5/2013|
|        12|SJSU 4th at San C...|37.332808|-121.883891|       19|     San Jose|    8/7/2013|
|        23|San Mateo County ...|37.487616|-122.229951|       15| Redwood City|   8/15/2013|
|        39|  Powell Street BART|37.783871|-122.408433|       19|San Francisco|   8/25/2013|
|        31|San Antonio Shopp...|37.400443|-122.108338|       15|Mountain View|  12/31/2013|
|        56|     Beale at Market|37.792251|-122.397086|       19|San F

In [36]:
# Renaming the Columns
final_station_df = station_data_df.withColumnRenamed("name","Station Name").withColumnRenamed("station_id","Station Id")

In [18]:
final_station_df.show()

+----------+--------------------+---------+-----------+---------+------------+------------+
|Station Id|        Station Name|      lat|       long|dockcount|    landmark|installation|
+----------+--------------------+---------+-----------+---------+------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|    San Jose|    8/6/2013|
|         3|San Jose Civic Ce...|37.330698|-121.888979|       15|    San Jose|    8/5/2013|
|         4|Santa Clara at Al...|37.333988|-121.894902|       11|    San Jose|    8/6/2013|
|         5|    Adobe on Almaden|37.331415|  -121.8932|       19|    San Jose|    8/5/2013|
|         6|    San Pedro Square|37.336721|-121.894074|       15|    San Jose|    8/7/2013|
|         7|Paseo de San Antonio|37.333798|-121.886943|       15|    San Jose|    8/7/2013|
|         8| San Salvador at 1st|37.330165|-121.885831|       15|    San Jose|    8/5/2013|
|         9|           Japantown|37.348742|-121.894715|       15|    San Jose|  

In [37]:
# Motif Findings

# edge between a, b and also edge between b,a
motifs = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.show()

+--------------------+--------------------+--------------------+--------------------+
|                   a|                   e|                   b|                  e2|
+--------------------+--------------------+--------------------+--------------------+
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Francisc...|[846562, 693, 7/1...|
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Francisc...|[847142, 812, 7/1...|
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Francisc...|[847998, 592, 7/1...|
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Francisc...|[848471, 580, 7/1...|
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Francisc...|[848858, 709, 7/1...|
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Francisc...|[848963, 587, 7/1...|
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Francisc...|[848970, 659, 7/1...|
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Francisc...|[849070, 509, 7/1...|
|[50, Harry Bridge...|[913460, 765, 8/3...|[70, San Fr

In [38]:
# Stateful Queries
# Get Edges with duration greater than 500
graph.edges.filter("duration > 764").sort("duration").show()

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|                 src|Start Terminal|       End Date|                 dst|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913296|     765|8/31/2015 19:20|       Howard at 2nd|            63|8/31/2015 19:33|     Townsend at 7th|          65|   564|     Subscriber|   94116|
| 864402|     765|7/27/2015 20:20|Mountain View Cit...|            27|7/27/2015 20:32|Rengstorff Avenue...|          33|   715|       Customer|     nil|
| 865412|     765|7/28/2015 14:42|Broadway St at Ba...|            82|7/28/2015 14:55|     2nd at Townsend|          61|   425|     Subscriber|   94025|
| 864740|     765| 7/28/2015 8:12|San Francisco Cal...|            69| 7/28/2015 8

In [39]:
# Subgraphs
subgraph_vertices = graph.vertices.filter("dockcount > 25")
subgraph_edges = graph.edges.filter("duration > 500")
subgraph = GraphFrame(subgraph_vertices,subgraph_edges)
print(subgraph)

GraphFrame(v:[id: string, station_id: string ... 5 more fields], e:[src: string, dst: string ... 9 more fields])


In [40]:
# Printing Subgraph vertices
subgraph.vertices.show()

+----------+--------------------+---------+-----------+---------+-------------+------------+
|station_id|                  id|      lat|       long|dockcount|     landmark|installation|
+----------+--------------------+---------+-----------+---------+-------------+------------+
|         2|San Jose Diridon ...|37.329732|-121.901782|       27|     San Jose|    8/6/2013|
|        67|      Market at 10th|37.776619|-122.417385|       27|San Francisco|   8/23/2013|
|        61|     2nd at Townsend|37.780526|-122.390288|       27|San Francisco|   8/22/2013|
|        77|   Market at Sansome|37.789625|-122.400811|       27|San Francisco|   8/25/2013|
+----------+--------------------+---------+-----------+---------+-------------+------------+



In [41]:
# Printing subgraph edges
subgraph.edges.show()

+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
|Trip ID|Duration|     Start Date|                 src|Start Terminal|       End Date|                 dst|End Terminal|Bike #|Subscriber Type|Zip Code|
+-------+--------+---------------+--------------------+--------------+---------------+--------------------+------------+------+---------------+--------+
| 913460|     765|8/31/2015 23:26|Harry Bridges Pla...|            50|8/31/2015 23:39|San Francisco Cal...|          70|   288|     Subscriber|    2139|
| 913459|    1036|8/31/2015 23:11|San Antonio Shopp...|            31|8/31/2015 23:28|Mountain View Cit...|          27|    35|     Subscriber|   95032|
| 913453|     789|8/31/2015 23:09|Embarcadero at Fo...|            51|8/31/2015 23:22|Embarcadero at Sa...|          60|   487|       Customer|    9069|
| 913451|     896|8/31/2015 23:07|Embarcadero at Fo...|            51|8/31/2015 23

# Bonus Part

In [42]:
# 1.Vertex degree
graph.degrees.show()

+--------------------+------+
|                  id|degree|
+--------------------+------+
|       2nd at Folsom|  1801|
|California Ave Ca...|   176|
|Powell at Post (U...|  1426|
| Golden Gate at Polk|  1042|
|Yerba Buena Cente...|  1368|
|   Market at Sansome|  3513|
|     Spear at Folsom|  1692|
|         MLK Library|   274|
|           Japantown|   303|
|Commercial at Mon...|  1669|
|Paseo de San Antonio|   384|
| San Salvador at 1st|   123|
|Rengstorff Avenue...|   212|
|     Townsend at 7th|  4034|
|Civic Center BART...|  1905|
|         Ryland Park|   271|
|San Jose Diridon ...|  1411|
|San Jose Civic Ce...|   156|
|          Mezes Park|    45|
|                null|     1|
+--------------------+------+
only showing top 20 rows



In [43]:
# 2.most common destinations
common_destinations = graph.edges.groupBy("src", "dst").count().orderBy(desc("count")).limit(10)
common_destinations.show()

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|     Townsend at 7th|San Francisco Cal...|  558|
|Harry Bridges Pla...|Embarcadero at Sa...|  514|
|San Francisco Cal...|     Townsend at 7th|  465|
|Embarcadero at Sa...|   Steuart at Market|  455|
|     2nd at Townsend|Harry Bridges Pla...|  445|
|Embarcadero at Fo...|San Francisco Cal...|  413|
|  Powell Street BART|San Francisco Cal...|  406|
|Temporary Transba...|San Francisco Cal...|  397|
|      Market at 10th|San Francisco Cal...|  394|
|   Steuart at Market|     2nd at Townsend|  390|
+--------------------+--------------------+-----+



In [55]:
# 3.highest ratio of in degrees but fewest out degrees
inDegrees = graph.inDegrees
outDegrees = graph.outDegrees
degreeRatio = inDegrees.join(outDegrees, inDegrees["id"] == outDegrees["id"]).drop(outDegrees["id"]).selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio")
degreeRatio.orderBy(desc("degreeRatio")).limit(10).show()

+--------------------+------------------+
|                  id|       degreeRatio|
+--------------------+------------------+
|San Mateo County ...|               3.0|
|Redwood City Medi...|               1.4|
|San Francisco Cal...| 1.343956704750451|
|    San Pedro Square| 1.319327731092437|
|       Park at Olive| 1.309090909090909|
|   Franklin at Maple|1.2857142857142858|
| San Salvador at 1st|1.2363636363636363|
|Stanford in Redwo...|1.2280701754385965|
|   Market at Sansome|1.2164037854889589|
|SJSU 4th at San C...|               1.2|
+--------------------+------------------+



In [58]:
# 4.Save graphs generated to a file.
graph.vertices.coalesce(1).write.csv("vertices")
graph.edges.coalesce(1).write.csv("edges")