In [1]:
'''Environment setup for running PySpark inside a Jupyter Notebook (instead of the shell).

Two environment variables must be set first (Windows `set` syntax shown):
    set PYSPARK_DRIVER_PYTHON=jupyter
    set PYSPARK_DRIVER_PYTHON_OPTS=notebook
Then change to the directory where the new notebook should be stored and start
pyspark from the shell with a --packages flag naming the GraphFrames package:
    pyspark --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11
The last part of the coordinate selects the GraphFrames / Spark / Scala build;
a different version can be used by changing it.
NOTE(review): choose the build that matches the installed Spark version - confirm.
'''
from pyspark.sql import SparkSession

# getOrCreate() hands back an already-running session when there is one,
# otherwise it starts a new session with the application name 'Ops'.
session_builder = SparkSession.builder.appName('Ops')
spark = session_builder.getOrCreate()


In [2]:
from graphframes import *

In [3]:
from graphframes import GraphFrame

In [4]:
# Station metadata CSV, read with the "header" option switched on.
station_csv_path = r"C:\Users\pohch\Desktop\Spark\spark-2.4.4-bin-hadoop2.7\mnt\defg\bike-data\201508_station_data.csv"
station_reader = spark.read.option("header", "True")
bikeStations = station_reader.csv(station_csv_path)

In [5]:
# Trip records CSV, read with the "header" option switched on.
trip_csv_path = r"C:\Users\pohch\Desktop\Spark\spark-2.4.4-bin-hadoop2.7\mnt\defg\bike-data\201508_trip_data.csv"
trip_reader = spark.read.option("header", "True")
tripData = trip_reader.csv(trip_csv_path)

In [6]:
# Vertices: one de-duplicated row per station, with the station name exposed as "id".
stationVertices = (
    bikeStations
    .withColumnRenamed("name", "id")
    .distinct()
)

# Edges: one row per trip, with start/end station names exposed as "src"/"dst".
tripEdges = (
    tripData
    .withColumnRenamed("Start Station", "src")
    .withColumnRenamed("End Station", "dst")
)

In [7]:
# Assemble the graph: stations are the vertices, trips are the edges.
stationGraph = GraphFrame(v=stationVertices, e=tripEdges)

# cache() is the last expression of the cell, so its return value is echoed below.
stationGraph.cache()

GraphFrame(v:[id: string, station_id: string ... 5 more fields], e:[src: string, dst: string ... 9 more fields])

In [8]:
# Sanity check: the edge count of the graph should match the raw trip count.
for label, frame in (
    ("Total Number of Stations:", stationGraph.vertices),
    ("Total Number of Trips in Graph:", stationGraph.edges),
    ("Total Number of Trips in Orginal Data:", tripData),
):
    print(label + str(frame.count()))


Total Number of Stations:70
Total Number of Trips in Graph:354152
Total Number of Trips in Orginal Data:354152


In [9]:
# Triangle count per vertex, then summary statistics over the result.
# describe() is not followed by show(), so the cell echoes only the DataFrame repr.
triangle_counts = stationGraph.triangleCount()
triangle_counts.describe()

DataFrame[summary: string, count: string, station_id: string, id: string, lat: string, long: string, dockcount: string, landmark: string, installation: string]

In [10]:
from pyspark.sql.functions import desc

# Number of trips per (src, dst) pair; print the 10 busiest pairs.
trips_per_route = stationGraph.edges.groupBy("src", "dst").count()
trips_per_route.orderBy(desc("count")).show(10)

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th| 3748|
|Harry Bridges Pla...|Embarcadero at Sa...| 3145|
|     2nd at Townsend|Harry Bridges Pla...| 2973|
|     Townsend at 7th|San Francisco Cal...| 2734|
|Harry Bridges Pla...|     2nd at Townsend| 2640|
|Embarcadero at Fo...|San Francisco Cal...| 2439|
|   Steuart at Market|     2nd at Townsend| 2356|
|Embarcadero at Sa...|   Steuart at Market| 2330|
|     Townsend at 7th|San Francisco Cal...| 2192|
|Temporary Transba...|San Francisco Cal...| 2184|
+--------------------+--------------------+-----+
only showing top 10 rows



In [11]:
# Same per-route count, restricted to trips that start or end at 'Townsend at 7th'.
(
    stationGraph.edges
    .where("src='Townsend at 7th' OR dst = 'Townsend at 7th'")
    .groupBy("src", "dst")
    .count()
    .orderBy(desc("count"))
    .show(10)
)

+--------------------+--------------------+-----+
|                 src|                 dst|count|
+--------------------+--------------------+-----+
|San Francisco Cal...|     Townsend at 7th| 3748|
|     Townsend at 7th|San Francisco Cal...| 2734|
|     Townsend at 7th|San Francisco Cal...| 2192|
|     Townsend at 7th|Civic Center BART...| 1844|
|Civic Center BART...|     Townsend at 7th| 1765|
|San Francisco Cal...|     Townsend at 7th| 1198|
|Temporary Transba...|     Townsend at 7th|  834|
|     Townsend at 7th|Harry Bridges Pla...|  827|
|   Steuart at Market|     Townsend at 7th|  746|
|     Townsend at 7th|Temporary Transba...|  740|
+--------------------+--------------------+-----+
only showing top 10 rows



In [12]:
# Total number of trips that start or end at 'Townsend at 7th'.
touching_townsend_7th = stationGraph.edges.where("src='Townsend at 7th' OR dst = 'Townsend at 7th'")
touching_townsend_7th.count()

28999

In [13]:
# Pull the full edge table out of Spark into a pandas DataFrame on the driver.
edges_sdf = stationGraph.edges
Edges = edges_sdf.toPandas()

In [14]:
from IPython.display import display

# A notebook cell renders only its last expression, so a bare
# `Edges.describe()` above `Edges.head()` is computed and then thrown away.
# display() renders the summary explicitly; head() stays the cell's result.
display(Edges.describe())
Edges.head()

Unnamed: 0,Trip ID,Duration,Start Date,src,Start Terminal,End Date,dst,End Terminal,Bike #,Subscriber Type,Zip Code
0,913460,765,8/31/2015 23:26,Harry Bridges Plaza (Ferry Building),50,8/31/2015 23:39,San Francisco Caltrain (Townsend at 4th),70,288,Subscriber,2139
1,913459,1036,8/31/2015 23:11,San Antonio Shopping Center,31,8/31/2015 23:28,Mountain View City Hall,27,35,Subscriber,95032
2,913455,307,8/31/2015 23:13,Post at Kearny,47,8/31/2015 23:18,2nd at South Park,64,468,Subscriber,94107
3,913454,409,8/31/2015 23:10,San Jose City Hall,10,8/31/2015 23:17,San Salvador at 1st,8,68,Subscriber,95113
4,913453,789,8/31/2015 23:09,Embarcadero at Folsom,51,8/31/2015 23:22,Embarcadero at Sansome,60,487,Customer,9069


In [15]:
# Subgraph with every station as a vertex but only the trips that touch 'Townsend at 7th'.
townsend_7th_filter = "src='Townsend at 7th' OR dst = 'Townsend at 7th'"
townand7thEdges = stationGraph.edges.where(townsend_7th_filter)
subgraph = GraphFrame(stationGraph.vertices, townand7thEdges)

In [16]:
from pyspark.sql.functions import desc

# PageRank over the whole station graph (10 iterations, reset probability 0.15).
ranks = stationGraph.pageRank(resetProbability=0.15, maxIter=10)

# Print the 10 stations with the highest score.
ranked_vertices = ranks.vertices.orderBy(desc("pagerank"))
ranked_vertices.select("id", "pagerank").show(10)

+--------------------+------------------+
|                  id|          pagerank|
+--------------------+------------------+
|San Jose Diridon ...| 4.051504835989958|
|San Francisco Cal...| 3.351183296428705|
|Mountain View Cal...|2.5143907710155586|
|Redwood City Calt...|2.3263087713711696|
|San Francisco Cal...| 2.231144291369857|
|Harry Bridges Pla...|1.8251120118882906|
|     2nd at Townsend|  1.58212177850392|
|Santa Clara at Al...|1.5730074084907522|
|     Townsend at 7th|1.5684565805340673|
|Embarcadero at Sa...| 1.541424208774895|
+--------------------+------------------+
only showing top 10 rows



In [17]:
# In-degree, out-degree and total degree per station.
inDeg = stationGraph.inDegrees
outDeg = stationGraph.outDegrees
Degrees = stationGraph.degrees

# Print the top rows of each table, untruncated.
inDeg.orderBy(desc("indegree")).show(5, False)
outDeg.orderBy(desc("outdegree")).show(5, False)
Degrees.orderBy(desc("degree")).show(20, False)

# pandas copies of the three tables for the follow-up analysis.
inDeg1 = inDeg.toPandas()
outDeg1 = outDeg.toPandas()
Degrees1 = Degrees.toPandas()


+----------------------------------------+--------+
|id                                      |inDegree|
+----------------------------------------+--------+
|San Francisco Caltrain (Townsend at 4th)|34810   |
|San Francisco Caltrain 2 (330 Townsend) |22523   |
|Harry Bridges Plaza (Ferry Building)    |17810   |
|2nd at Townsend                         |15463   |
|Townsend at 7th                         |15422   |
+----------------------------------------+--------+
only showing top 5 rows

+---------------------------------------------+---------+
|id                                           |outDegree|
+---------------------------------------------+---------+
|San Francisco Caltrain (Townsend at 4th)     |26304    |
|San Francisco Caltrain 2 (330 Townsend)      |21758    |
|Harry Bridges Plaza (Ferry Building)         |17255    |
|Temporary Transbay Terminal (Howard at Beale)|14436    |
|Embarcadero at Sansome                       |14158    |
+------------------------------------------

In [18]:
import pandas as pd

# Index the out-degree and total-degree tables by station id so they can be
# joined onto the in-degree table through its 'id' column.
out_by_id = outDeg1.set_index("id")
total_by_id = Degrees1.set_index("id")
Degreesum = inDeg1.join(out_by_id, on='id')
Degreesum = Degreesum.join(total_by_id, on='id')

# Share of each station's total degree that is inbound / outbound.
in_share = Degreesum["inDegree"] / Degreesum["degree"]
Degreesum["in%"] = in_share
out_share = Degreesum["outDegree"] / Degreesum["degree"]
Degreesum["out%"] = out_share

Degreesum.plot()
Degreesum.head()

Unnamed: 0,id,inDegree,outDegree,degree,in%,out%
0,2nd at Folsom,4727,7999,12726,0.371444,0.628556
1,California Ave Caltrain Station,496,400,896,0.553571,0.446429
2,Powell at Post (Union Square),4134,6425,10559,0.391514,0.608486
3,Golden Gate at Polk,2852,3646,6498,0.438904,0.561096
4,Yerba Buena Center of the Arts (3rd @ Howard),6288,5523,11811,0.532385,0.467615


In [19]:
from pyspark.sql.functions import desc

# Ratios derived from the three degree tables, all computed in double precision.
ratio_exprs = [
    "id",
    "double(inDegree)/double(Degree) as InRatio",
    "double(outDegree)/double(Degree) as outRatio",
    "double(inDegree)/double(outDegree) as InOutdegreeRatio",
]
all_degrees = inDeg.join(outDeg, "id").join(Degrees, "id")
degreeRatio = all_degrees.selectExpr(*ratio_exprs)

# Highest in/out ratio first, then lowest in/out ratio first.
degreeRatio.orderBy(desc("InOutdegreeRatio")).show(10, False)
degreeRatio.orderBy("InOutdegreeRatio").show(10, False)


+----------------------------------------+------------------+-------------------+------------------+
|id                                      |InRatio           |outRatio           |InOutdegreeRatio  |
+----------------------------------------+------------------+-------------------+------------------+
|Redwood City Medical Center             |0.6052631578947368|0.39473684210526316|1.5333333333333334|
|San Mateo County Center                 |0.5955414012738853|0.40445859872611467|1.4724409448818898|
|SJSU 4th at San Carlos                  |0.5766488413547237|0.4233511586452763 |1.3621052631578947|
|San Francisco Caltrain (Townsend at 4th)|0.5695912556860948|0.43040874431390513|1.3233728710462287|
|Washington at Kearny                    |0.5668457905878521|0.4331542094121479 |1.3086466165413533|
|Paseo de San Antonio                    |0.5562467599792639|0.44375324002073613|1.2535046728971964|
|California Ave Caltrain Station         |0.5535714285714286|0.44642857142857145|1.24      

In [20]:
# Breadth-first search from 'Townsend at 7th' to 'Spear at Folsom',
# considering paths of at most 2 edges.
bfsResult = stationGraph.bfs(
    fromExpr="id='Townsend at 7th'",
    toExpr="id ='Spear at Folsom'",
    maxPathLength=2,
)
bfsResult.show(10)

+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|[65, Townsend at ...|[913371, 663, 8/3...|[49, Spear at Fol...|
|[65, Townsend at ...|[913265, 658, 8/3...|[49, Spear at Fol...|
|[65, Townsend at ...|[911919, 722, 8/3...|[49, Spear at Fol...|
|[65, Townsend at ...|[910777, 704, 8/2...|[49, Spear at Fol...|
|[65, Townsend at ...|[908994, 1115, 8/...|[49, Spear at Fol...|
|[65, Townsend at ...|[906912, 892, 8/2...|[49, Spear at Fol...|
|[65, Townsend at ...|[905201, 980, 8/2...|[49, Spear at Fol...|
|[65, Townsend at ...|[904010, 969, 8/2...|[49, Spear at Fol...|
|[65, Townsend at ...|[903375, 850, 8/2...|[49, Spear at Fol...|
|[65, Townsend at ...|[899944, 910, 8/2...|[49, Spear at Fol...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [21]:
# Connected Components
# Register a checkpoint directory on the SparkContext before the
# connected-components computation below is run.
checkpoint_dir = "tmp/checkpoints"
spark.sparkContext.setCheckpointDir(checkpoint_dir)

In [22]:
# Connected components are computed on a 20% sample of the trips (without
# replacement) instead of the full edge set.
# The sample is seeded: an unseeded sample() draws a different edge subset on
# every run, so the components reported below would not be reproducible.
SAMPLE_SEED = 42
sampled_trip_edges = tripEdges.sample(withReplacement=False, fraction=0.2, seed=SAMPLE_SEED)
minGraph = GraphFrame(stationVertices, sampled_trip_edges)
cc = minGraph.connectedComponents()

In [104]:
# Stations whose component id is anything other than 0.
nonzero_components = cc.where("component != 0")
nonzero_components.show()

+----------+--------------------+---------+-----------+---------+-------------+------------+------------+
|station_id|                  id|      lat|       long|dockcount|     landmark|installation|   component|
+----------+--------------------+---------+-----------+---------+-------------+------------+------------+
|        47|     Post at Kearney|37.788975|-122.403452|       19|San Francisco|   8/19/2013|317827579904|
|        46|Washington at Kea...|37.795425|-122.404767|       15|San Francisco|   8/19/2013| 17179869184|
+----------+--------------------+---------+-----------+---------+-------------+------------+------------+



In [106]:
# Strongly connected components on the sampled graph, capped at 3 iterations.
scc_max_iter = 3
scc = minGraph.stronglyConnectedComponents(maxIter=scc_max_iter)

In [131]:
# Largest component ids first; print 10 rows untruncated.
scc_by_component = scc.orderBy(desc("component"))
scc_by_component.show(10, False)

+----------+---------------------------------+---------+-----------+---------+-------------+------------+------------+
|station_id|id                               |lat      |long       |dockcount|landmark     |installation|component   |
+----------+---------------------------------+---------+-----------+---------+-------------+------------+------------+
|47        |Post at Kearney                  |37.788975|-122.403452|19       |San Francisco|8/19/2013   |317827579904|
|6         |San Pedro Square                 |37.336721|-121.894074|15       |San Jose     |8/7/2013    |128849018880|
|8         |San Salvador at 1st              |37.330165|-121.885831|15       |San Jose     |8/5/2013    |128849018880|
|7         |Paseo de San Antonio             |37.333798|-121.886943|15       |San Jose     |8/7/2013    |128849018880|
|13        |St James Park                    |37.339301|-121.889937|15       |San Jose     |8/6/2013    |128849018880|
|16        |SJSU - San Salvador at 9th       |37

In [23]:
# Motif Finding
# Pattern: three vertices a, b, c linked by the edges a->b, b->c and c->a.
triangle_pattern = "(a)-[ab]->(b);(b)-[bc]->(c); (c)-[ca]->(a)"
motifs = stationGraph.find(triangle_pattern)

In [34]:
# Column reference to the "Start Date" field inside the `ab` edge struct.
ab_edge = motifs["ab"]
ab_edge["Start Date"]

Column<b'ab[Start Date]'>

In [41]:
# Query the motifs as a DataFrame: keep a -> b -> c -> a loops ridden on a
# single bike, in chronological order, sorted by total elapsed time.
from pyspark.sql.functions import expr, to_timestamp

# The trip timestamps are not zero-padded (e.g. "8/31/2015 23:26",
# "6/16/2015 8:03"), so the pattern uses single-letter fields. The zero-padded
# form 'MM/dd/yyyy HH:mm' only matches such values under a lenient legacy
# parser; NOTE(review): stricter parsers (Spark 3.x) are expected to reject
# them - confirm against the Spark version in use.
TRIP_TS_FORMAT = "M/d/yyyy H:mm"

cyclepattern = (
    motifs
    .selectExpr(
        '*',
        "to_timestamp(ab['Start Date'], '{}') as abStart".format(TRIP_TS_FORMAT),
        "to_timestamp(bc['Start Date'], '{}') as bcStart".format(TRIP_TS_FORMAT),
        "to_timestamp(ca['Start Date'], '{}') as caStart".format(TRIP_TS_FORMAT),
    )
    # All three legs must use the same bike.
    .where("ca['Bike #']=bc['Bike #']")
    .where("ab['Bike #']=bc['Bike #']")
    # Consecutive stops must be different stations.
    .where("a.id != b.id")
    .where("b.id != c.id")
    # The legs must happen in order: ab, then bc, then ca.
    .where("abStart < bcStart")
    .where("bcStart < caStart")
    # Shortest gap between the first and the last departure comes first.
    .orderBy(expr("cast(caStart as long)- cast(abStart as long)"))
    .selectExpr("a.id", "b.id", "c.id", "ab['Start Date']", "ca['End Date']")
)



In [42]:
# Print the first 5 cycles with full (untruncated) column contents.
cyclepattern.show(n=5, truncate=False)

+----------------------------------------+----------------------------------------+----------------------------------------+---------------+---------------+
|id                                      |id                                      |id                                      |ab.Start Date  |ca.End Date    |
+----------------------------------------+----------------------------------------+----------------------------------------+---------------+---------------+
|San Francisco Caltrain 2 (330 Townsend) |Townsend at 7th                         |San Francisco Caltrain (Townsend at 4th)|5/19/2015 16:09|5/19/2015 16:33|
|Harry Bridges Plaza (Ferry Building)    |2nd at Townsend                         |San Francisco Caltrain (Townsend at 4th)|6/16/2015 8:03 |6/16/2015 8:31 |
|San Francisco Caltrain (Townsend at 4th)|2nd at Townsend                         |Steuart at Market                       |10/8/2014 17:12|10/8/2014 17:42|
|2nd at Townsend                         |San Francisco Ca