<br><br><br>
<span style="color:red;font-size:60px">GraphFrames Assignment</span>
<br><br>
In this assignment, you need to do the following:
<li>Read the file 201710-citibike-tripdata.csv</li>
<li>Construct a graph with stations as vertices and trips between stations as edges</li>
<li>Vertex Ids are station numbers and Vertex attributes are station names</li>
<li>Edge attributes are trip duration (durations are in seconds)</li>
<li>Then answer the questions below</li>

In [1]:
%%init_spark
launcher.packages= ["graphframes:graphframes:0.8.2-spark3.2-s_2.12"]

In [2]:
//GraphFrame imports
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._


//GraphX imports
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD




Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.2:4041
SparkContext available as 'sc' (version = 3.2.1, master = local[*], app id = local-1649035831441)
SparkSession available as 'spark'


import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


<br><br><br>
<span style="color:green;font-size:xx-large">Step 1: Construct the graph</span>
<br><br>
<li>read the data file and drop the header line</li>
<li>create a vertex rdd (the union of start stations and end stations</li>
<li>create an edge rdd (the trips - start station id, end station id, duration</li>
<li>create a graph</li>

In [3]:
val text = sc.textFile("201710-citibike-tripdata.csv")
// drop the header line
val data = text.mapPartitionsWithIndex{ (idx,iter) => if (idx==0) iter.drop(1) else iter}
// create vertex rdd
val start = data.map(r => r.split(",")).map(r => (r(3).toLong,r(4)))
val end = data.map(r => r.split(",")).map(r => (r(7).toLong,r(8)))
val vertices =(start union end ).distinct
// create edge rdd
val edges = data.map(r => r.split(",")).map(r => Edge(r(3).toLong,r(7).toLong,r(0).toInt))
// create a graph
val g: Graph[String, Int] = Graph(vertices, edges)


text: org.apache.spark.rdd.RDD[String] = 201710-citibike-tripdata.csv MapPartitionsRDD[1] at textFile at <console>:37
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:39
start: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[4] at map at <console>:41
end: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[6] at map at <console>:42
vertices: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[10] at distinct at <console>:43
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[12] at map at <console>:45
g: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@229c60c4


<br><br><br>
<span style="color:green;font-size:xx-large">Step 2: Basic questions</span>
<br><br>

<li>How many citibike stations are there in the network?</li>
<li>How many trips were made in the month in question?</li>
<li>How many trips started and ended at the same station?</li>
<li>How many station to station connections are there (at least one edge exists between station i and station j and i is not equal to j)?</li>
<li>Your code should print:</li>
<pre>
Total number of stations: 785
Total number of trips.  : 1897592
Trips that started and ended at the same station: 33245
Number of station to station connections: 107524
</pre>

In [4]:
//You might need this
def make_undirected_graph(g: GraphFrame) = {
    val u_edge_df = g.find("(a)-[]->(b)")
        .select($"a.id".as("src"),$"b.id".as("dst"))
        .withColumn("swap",when(col("src")<col("dst"),col("dst")))
        .withColumn("dst",
                    when(col("swap").isNotNull,col("src"))
                    .otherwise(col("dst")))
        .withColumn("src",
                    when(col("swap").isNotNull,col("swap"))
                   .otherwise(col("src")))
        .drop(col("swap"))
        .distinct
    val u_vertices_df = g.vertices
    val u_g = GraphFrame(u_vertices_df,u_edge_df)    
    u_g
}

make_undirected_graph: (g: org.graphframes.GraphFrame)org.graphframes.GraphFrame


In [5]:
val station_number = g.vertices.count
val trip_number = g.edges.count
val same_station = g.edges.filter(r => r.srcId == r.dstId).count
val stat_to_stat = g.convertToCanonicalEdges().edges.filter(r => r.srcId != r.dstId).count


station_number: Long = 785
trip_number: Long = 1897592
same_station: Long = 33245
stat_to_stat: Long = 107524


In [6]:
// import org.apache.spark.graphx.Graph
// import org.apache.spark.sql.Row
// val g2: GraphFrame = GraphFrame.fromGraphX(g)
// make_undirected_graph(g2).edges.distinct.count

<br><br><br>
<span style="color:green;font-size:xx-large">Step 3: Find the Station from which most trips originate</span>
<br><br>
<li>Note that the graph has one edge for each trip (i.e., there are many edges between two vertices)</li>
<li>The function <span style="color:blue">outDegrees</span> returns the number of outgoing edges from every vertex</li>
<li>Print the name of the station with most originating trips</li>
<li>Your code should print:</li>
<pre>  
The station from which most trips originate is: "Pershing Square North"
</pre>

In [7]:
// method1 -- use Graph
import org.apache.spark.graphx.Graph
import org.apache.spark.sql.Row
val g2: GraphFrame = GraphFrame.fromGraphX(g)
// find the id number
val mapped_id = g2.outDegrees.orderBy(desc("outDegree")).take(1).map(r=> r(0))
println(mapped_id)

[Ljava.lang.Object;@5d86c187


import org.apache.spark.graphx.Graph
import org.apache.spark.sql.Row
g2: org.graphframes.GraphFrame = GraphFrame(v:[id: bigint, attr: string], e:[src: bigint, dst: bigint ... 1 more field])
mapped_id: Array[Any] = Array(519)


In [8]:
// select name for that corresponding id
println(g2.vertices.filter("id=519").select("attr").first.getString(0))

"Pershing Square North"


In [9]:
// method2 -- use GraphX
val out_most = g.outDegrees.collect.sortBy(_._2).reverse(0)
g.vertices.filter(r => r._1 == out_most._1).foreach(r => println(r._2))


"Pershing Square North"


out_most: (org.apache.spark.graphx.VertexId, Int) = (519,17995)


<br><br><br>
<span style="color:green;font-size:xx-large">STEP 4: Proportion of trips for each station that start and end at that same station</span>
<br><br>
<li>Create a GraphX graph from the GraphFrames graph (use the method that retains datatypes)</li>
<li>Use aggregateMessages to calculate the number of trips that start and end at the same vertex (for each vertex)</li>
<li>Convert the resulting (VertexRDD) to a DataFrame</li>
<li>Using join (and select), add the location of the station column to the df</li>
<li>Join this df to the out degrees df created earlier</li>
<li>Divide the same trips column by the out degrees column and select the appropriate rows</li>
<li>Sort the resulting df by this proportion in descending order</li>
<li>Your output should be the following dataframe</li>
<li>Note: Though you can use aggregateMessages in GraphFrames, you must use the GraphX version for this assignment</li>
    
<pre>
+----+--------------------+-----+---------+-------------------+
|  id|            location|trips|outDegree|               prop|
+----+--------------------+-----+---------+-------------------+
|3488|  "8D QC Station 01"|    1|        1|                1.0|
|3245|"NYCBS DEPOT - DE...|    1|        2|                0.5|
|3182|"Yankee Ferry Ter...|  309|      900| 0.3433333333333333|
|3254|  "Soissons Landing"|  358|     1100|0.32545454545454544|
|3342|"Pioneer St & Ric...|   59|      299|0.19732441471571907|
|3477|"39 St & 2 Ave - ...|   45|      245| 0.1836734693877551|
|3532|"Ditmars Blvd & 1...|   70|      407|  0.171990171990172|
|3180|"Brooklyn Bridge ...|  232|     1354|0.17134416543574593|
|3423|"West Drive & Pro...|  367|     2463|0.14900527811611855|
|3636|"Expansion Wareho...|    1|        8|              0.125|
|3302|"Columbus Ave & W...|   74|      598|0.12374581939799331|
|3120|"Center Blvd & Bo...|   74|      622| 0.1189710610932476|
|3514|"Astoria Park S &...|   34|      299|0.11371237458193979|
|3479|      "Picnic Point"|   77|      712|0.10814606741573034|
|3594|"Montgomery St & ...|    9|       87|0.10344827586206896|
|3524|    "19 St & 24 Ave"|   24|      249| 0.0963855421686747|
|3349|"Grand Army Plaza...|  237|     2570|0.09221789883268483|
|3333|"Columbia St & Lo...|    5|       55|0.09090909090909091|
|3354|"3 St & Prospect ...|  142|     1572|0.09033078880407125|
|3607|    "31 Ave & 14 St"|   10|      113|0.08849557522123894|
+----+--------------------+-----+---------+-------------------+
only showing top 20 rows
</pre>

In [10]:
// val v = g.vertices.rdd.map(r => (r(0).toString.toLong,r(1).toString))
// val e = g.edges.rdd.map(r => Edge(r(0).toString.toLong,r(1).toString.toLong,r(2).toString.toDouble))
// val gx: Graph[String, Double] = Graph(v, e)

In [11]:
// calculate the number of trips
val tripNum_sameVer = g.aggregateMessages[Int](ec => {if (ec.srcId==ec.dstId) ec.sendToDst(1) else ec.sendToDst(0)},
                                                   (x,y) => x+y)
// Convert the resulting (VertexRDD) to a DataFrame
val tripNum_df = spark.createDataFrame(tripNum_sameVer).toDF("id_trip","trips")
// add the location of the station column to the df
val stat_name = spark.createDataFrame(g.vertices).toDF("id","location")
val joined_name = tripNum_df.select("id_trip","trips").join(stat_name,tripNum_df("id_trip")===stat_name("id"))
// Join this df to the out degrees df created earlier
val stat_degree = spark.createDataFrame(g.outDegrees).toDF("id","outDegree")
val joined_degree = joined_name.select("id_trip","trips","location").join(stat_degree,joined_name("id_trip")===stat_degree("id")).drop("id_trip")
// Divide the same trips column by the out degrees column
val joined_withprop =joined_degree.withColumn("prop",col("trips")/col("outDegree"))
// Sort the resulting df by this proportion in descending order
val final_df = joined_withprop.select("id","location","trips","outDegree","prop").orderBy(desc("prop"))
final_df.show


+----+--------------------+-----+---------+-------------------+
|  id|            location|trips|outDegree|               prop|
+----+--------------------+-----+---------+-------------------+
|3488|  "8D QC Station 01"|    1|        1|                1.0|
|3245|"NYCBS DEPOT - DE...|    1|        2|                0.5|
|3182|"Yankee Ferry Ter...|  309|      900| 0.3433333333333333|
|3254|  "Soissons Landing"|  358|     1100|0.32545454545454544|
|3342|"Pioneer St & Ric...|   59|      299|0.19732441471571907|
|3477|"39 St & 2 Ave - ...|   45|      245| 0.1836734693877551|
|3532|"Ditmars Blvd & 1...|   70|      407|  0.171990171990172|
|3180|"Brooklyn Bridge ...|  232|     1354|0.17134416543574593|
|3423|"West Drive & Pro...|  367|     2463|0.14900527811611855|
|3636|"Expansion Wareho...|    1|        8|              0.125|
|3302|"Columbus Ave & W...|   74|      598|0.12374581939799331|
|3120|"Center Blvd & Bo...|   74|      622| 0.1189710610932476|
|3514|"Astoria Park S &...|   34|      2

tripNum_sameVer: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[66] at RDD at VertexRDD.scala:57
tripNum_df: org.apache.spark.sql.DataFrame = [id_trip: bigint, trips: int]
stat_name: org.apache.spark.sql.DataFrame = [id: bigint, location: string]
joined_name: org.apache.spark.sql.DataFrame = [id_trip: bigint, trips: int ... 2 more fields]
stat_degree: org.apache.spark.sql.DataFrame = [id: bigint, outDegree: int]
joined_degree: org.apache.spark.sql.DataFrame = [trips: int, location: string ... 2 more fields]
joined_withprop: org.apache.spark.sql.DataFrame = [trips: int, location: string ... 3 more fields]
final_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, location: string ... 3 more fields]


<br><br><br>
<span style="color:green;font-size:xx-large">STEP 5: Create a new graph that contains all edges except for those between the same station</span>
<br><br>


In [12]:
val new_g = g.subgraph(ec => ec.srcId != ec.dstId, (v_id,v_attr) => true)


new_g: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@6ff5b88f


<br><br><br>
<span style="color:green;font-size:xx-large">STEP 6: Calculate the average duration between every pair of stations</span>
<br><br>
<li>use the new graph from step 5 for this</li>
<li>I'll let you figure this out but this should be really easy (think SQL)</li>
<pre>
+----+----+------------------+
| src| dst|                 m|
+----+----+------------------+
| 504| 350| 772.7647058823529|
| 433| 527| 532.9677419354839|
| 434| 470|316.52272727272725|
| 438| 151|  546.195652173913|
| 445| 507| 553.3947368421053|
|2021| 446| 827.6904761904761|
| 116| 518| 1115.857142857143|
|3435| 358| 895.6666666666666|
|3402|3414| 634.1666666666666|
| 498| 495| 801.2272727272727|
|3637| 418|             889.7|
| 380|3260|419.42105263157896|
|3360| 507|            1442.0|
| 326| 247|             713.0|
|3358| 467| 500.6666666666667|
|3164| 457|502.97241379310344|
| 498| 528| 434.3207547169811|
| 405|3256| 843.2168674698795|
| 477|2000|2672.3333333333335|
|3226|3163| 686.4444444444445|
+----+----+------------------+
only showing top 20 rows
<pre>

In [13]:
val pair_df = new_g.edges.map(e => (e.srcId.toInt,e.dstId.toInt,e.attr)).toDF("src","dst","attr")
val pair_mean = pair_df.groupBy("src","dst").agg(avg("attr").as("m"))
// test with result, compare with the given case
// pair_mean.filter(pair_mean("src")===433 && pair_mean("dst")=== 527).show

pair_mean.show


+---+----+------------------+
|src| dst|                 m|
+---+----+------------------+
| 72| 484| 627.6666666666666|
|116| 504| 712.1111111111111|
|116| 518| 1115.857142857143|
|120| 410|            1498.2|
|127| 485|           1333.25|
|127| 486|            855.25|
|127|3378|1789.3333333333333|
|150| 433|           2071.28|
|151| 116|             913.0|
|157|3419| 592.6666666666666|
|161| 531| 594.4285714285714|
|164|3443| 555.1081081081081|
|173|3295|            1348.6|
|195| 253|            1214.8|
|195| 355| 825.3793103448276|
|195| 442|            1314.0|
|228| 509|             984.2|
|229|3093|            1405.7|
|236| 253|             497.0|
|236| 341| 911.6363636363636|
+---+----+------------------+
only showing top 20 rows



pair_df: org.apache.spark.sql.DataFrame = [src: int, dst: int ... 1 more field]
pair_mean: org.apache.spark.sql.DataFrame = [src: int, dst: int ... 1 more field]


<br><br><br>
<span style="color:green;font-size:xx-large">STEP 7: Important stations</span><br><br>
Citibike wants to figure out how best to deploy its workers in checking whether a station is over-full (too many bikes) or needs more bikes. It figures that the best way to do this is to find out which stations are the most important in terms of flows:
<li>A station that has high bike returns and is connected to other stations with high bike returns is more likely to have too many bikes in its station and therefore should be monitored more often</li>
<li>A station that has high bike pickups and is connected to other stations with high bike pickups is more likely to be short of bikes and therefore should be monitored more often</li>
<li>Calculate the propensities for over-fullness and emptiness for every station</li>
<li>Report the 5 most important stations for over-fullness (use pageRank on the graph)</li>
<li>Report the 5 most important stations for emptiness (reverse all the edges on the graph and use pageRank)</li>
<li>Your results (Don't worry about the meaning of location names!):</li>
<li>Note: Assume a reset_probability of 0.15 and a tolerance of .0001 if you want the same results as mine</li>
<pre>
+---+--------------------+------------------+
| id|            location|          pagerank|
+---+--------------------+------------------+
|519|"Pershing Square ...| 4.930887390071603|
|426|"West St & Chambe...|3.7410934274030576|
|402|"Broadway & E 22 St"|  3.58520147183096|
|497|"E 17 St & Broadway"| 3.537658018512581|
|435|   "W 21 St & 6 Ave"| 3.438585855241344|
+---+--------------------+------------------+

+----+--------------------+------------------+
|  id|            location|          pagerank|
+----+--------------------+------------------+
|3197|      "Hs Don't Use"| 5.710640869520747|
| 519|"Pershing Square ...| 5.012823444592195|
|3480|      "WS Don't Use"| 4.272284643284593|
| 402|"Broadway & E 22 St"|3.4515211069038183|
| 497|"E 17 St & Broadway"|3.3347259745457443|
+----+--------------------+------------------+
</pre>


In [14]:
val over_full = g2.pageRank.resetProbability(0.15).tol(0.0001).run()
                    .vertices.orderBy(desc("pagerank"))
over_full.show(5)

+---+--------------------+------------------+
| id|                attr|          pagerank|
+---+--------------------+------------------+
|519|"Pershing Square ...| 4.930887390071603|
|426|"West St & Chambe...|3.7410934274030576|
|402|"Broadway & E 22 St"|  3.58520147183096|
|497|"E 17 St & Broadway"| 3.537658018512581|
|435|   "W 21 St & 6 Ave"| 3.438585855241344|
+---+--------------------+------------------+
only showing top 5 rows



over_full: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, attr: string ... 1 more field]


In [15]:
//reverse all the edges on the graph, generate new graphframe
val reversed_edges = data.map(r => r.split(",")).map(r => Edge(r(7).toLong,r(3).toLong,r(0).toInt))
val reversed_g: Graph[String, Int] = Graph(vertices, reversed_edges)
val reversed_g2: GraphFrame = GraphFrame.fromGraphX(reversed_g)


reversed_edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[988] at map at <console>:41
reversed_g: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@699030f5
reversed_g2: org.graphframes.GraphFrame = GraphFrame(v:[id: bigint, attr: string], e:[src: bigint, dst: bigint ... 1 more field])


In [16]:
val emptiness = reversed_g2.pageRank.resetProbability(0.15).tol(0.0001).run()
                    .vertices.orderBy(desc("pagerank"))
emptiness.show(5)

+----+--------------------+------------------+
|  id|                attr|          pagerank|
+----+--------------------+------------------+
|3197|      "Hs Don't Use"| 5.710640869520747|
| 519|"Pershing Square ...| 5.012823444592195|
|3480|      "WS Don't Use"| 4.272284643284593|
| 402|"Broadway & E 22 St"|3.4515211069038183|
| 497|"E 17 St & Broadway"|3.3347259745457443|
+----+--------------------+------------------+
only showing top 5 rows



emptiness: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, attr: string ... 1 more field]


<br><br><br>
<span style="color:green;font-size:xx-large">STEP 8: Calculate the clustering coefficient of every station</span><br><br>

<li>And report the top 20 stations by clustering coefficient</li>
<pre>
+----+--------------------+------------------+
|  id|            location|             coeff|
+----+--------------------+------------------+
|3040|     "GOW Tech Shop"|               1.0|
|3639|        "Harborside"|               1.0|
|3192|"Liberty Light Rail"|               1.0|
|3485| "NYCBS Depot - RIS"|               1.0|
|3647|    "48 Ave & 30 Pl"|               1.0|
|3279|       "Dixon Mills"|               1.0|
|3186|     "Grove St PATH"|               1.0|
| 153|   "E 40 St & 5 Ave"|               1.0|
| 339|"Avenue D & E 12 St"| 0.877201420748853|
|3464|"W 37 St & Broadway"|0.8679573382796197|
| 247|"Perry St & Bleec...|0.8602079768329604|
|3175|"W 70 St & Amster...|0.8592469808193227|
|3176|"W 64 St & West E...|0.8568452539928423|
|3623|"W 120 St & Clare...|0.8549019607843137|
|3491|  "E 118 St & 1 Ave"| 0.854122621564482|
| 266| "Avenue D & E 8 St"| 0.849218980253463|
|3441|   "10 Hudson Yards"|0.8482701509017299|
|3646|    "35 Ave & 10 St"|0.8333333333333334|
|3642|"E 98 St & Lexing...|             0.832|
| 444|"Broadway & W 24 St"|0.8283229697508064|
+----+--------------------+------------------+
only showing top 20 rows
</pre>

In [17]:
val triangles = g2.triangleCount.run().withColumnRenamed("id","t_id") //Get the number of triangles each vertex belongs to
val degrees = make_undirected_graph(g2).degrees //Get the number of adjacent vertices for each vertex
val possible = degrees.withColumn("possible",col("degree")*(col("degree")-1)/lit(2)) //Calculate possible triangles
val joined = triangles.select("t_id","count","attr").join(possible,triangles("t_id")===possible("id")) 
val coeff = joined.withColumn("coeff",col("count")/col("possible"))

triangles: org.apache.spark.sql.DataFrame = [count: bigint, t_id: bigint ... 1 more field]
degrees: org.apache.spark.sql.DataFrame = [id: bigint, degree: int]
possible: org.apache.spark.sql.DataFrame = [id: bigint, degree: int ... 1 more field]
joined: org.apache.spark.sql.DataFrame = [t_id: bigint, count: bigint ... 4 more fields]
coeff: org.apache.spark.sql.DataFrame = [t_id: bigint, count: bigint ... 5 more fields]


In [18]:
val fianl_result = coeff.select("id","attr","coeff").orderBy(desc("coeff")).withColumnRenamed("attr","location")
fianl_result.show


+----+--------------------+------------------+
|  id|            location|             coeff|
+----+--------------------+------------------+
|3279|       "Dixon Mills"|               1.0|
|3647|    "48 Ave & 30 Pl"|               1.0|
|3040|     "GOW Tech Shop"|               1.0|
|3186|     "Grove St PATH"|               1.0|
| 153|   "E 40 St & 5 Ave"|               1.0|
|3192|"Liberty Light Rail"|               1.0|
|3485| "NYCBS Depot - RIS"|               1.0|
|3639|        "Harborside"|               1.0|
| 339|"Avenue D & E 12 St"| 0.877201420748853|
|3464|"W 37 St & Broadway"|0.8679573382796197|
| 247|"Perry St & Bleec...|0.8602079768329604|
|3175|"W 70 St & Amster...|0.8592469808193227|
|3176|"W 64 St & West E...|0.8568452539928423|
|3623|"W 120 St & Clare...|0.8549019607843137|
|3491|  "E 118 St & 1 Ave"| 0.854122621564482|
| 266| "Avenue D & E 8 St"| 0.849218980253463|
|3441|   "10 Hudson Yards"|0.8482701509017299|
|3646|    "35 Ave & 10 St"|0.8333333333333334|
|3642|"E 98 S

fianl_result: org.apache.spark.sql.DataFrame = [id: bigint, location: string ... 1 more field]
