<br><br><br>
<span style="color:red;font-size:60px">GraphFrames Assignment</span>
<br><br>
In this assignment, you need to do the following:
<li>Read the file 201710-citibike-tripdata.csv</li>
<li>Construct a graph with stations as vertices and trips between stations as edges</li>
<li>Vertex Ids are station numbers and Vertex attributes are station names</li>
<li>Edge attributes are trip duration (durations are in seconds)</li>
<li>Then answer the questions below</li>

In [1]:
%%init_spark
launcher.packages= ["graphframes:graphframes:0.8.2-spark3.2-s_2.12"]

In [2]:
//GraphFrame imports
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._


//GraphX imports
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD




Intitializing Scala interpreter ...

Spark Web UI available at http://localhost:4043
SparkContext available as 'sc' (version = 3.2.0, master = local[*], app id = local-1648710643302)
SparkSession available as 'spark'


import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


<br><br><br>
<span style="color:green;font-size:xx-large">Step 1: Construct the graph</span>
<br><br>
<li>read the data file and drop the header line</li>
<li>create a vertex rdd (the union of start stations and end stations)</li>
<li>create an edge rdd (the trips: start station id, end station id, duration)</li>
<li>create a graph</li>
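
The construction steps above can be checked on a few hand-written rows before running them on the full file. The sketch below is plain Scala with no Spark dependency. The three rows are made up, and the column positions (duration in column 0, start id and name in columns 3 and 4, end id and name in columns 7 and 8) are an assumption about the file layout.

```scala
object ToyTrips {
  // Hypothetical rows in the assumed layout:
  // duration, start time, stop time, start id, start name, lat, lon, end id, end name
  val rows: Seq[String] = Seq(
    "300,t0,t1,1,\"A St\",0,0,2,\"B St\"",
    "600,t0,t1,2,\"B St\",0,0,1,\"A St\"",
    "120,t0,t1,1,\"A St\",0,0,1,\"A St\""
  )

  private val fields: Seq[Array[String]] = rows.map(_.split(","))

  // Vertices: union of start and end stations, de-duplicated by the Set
  val vertices: Set[(String, String)] =
    fields.flatMap(f => Seq((f(3), f(4)), (f(7), f(8)))).toSet

  // Edges: one (start id, end id, duration) triple per trip
  val edges: Seq[(String, String, Double)] =
    fields.map(f => (f(3), f(7), f(0).toDouble))
}
```

Because the split keeps the surrounding quotes, station names retain them, and a name containing a comma would shift every later column.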

In [3]:
val text = sc.textFile("201710-citibike-tripdata.csv")
val text_no_header = text.mapPartitionsWithIndex{ (idx,iter) => if (idx==0) iter.drop(1) else iter}

//Construct vertices and edges here

//create vertices
val start_rdd = text_no_header.map(x => x.split(",")).map(y => (y(3), y(4))) //id, name
val end_rdd = text_no_header.map(x => x.split(",")).map(y => (y(7), y(8))) //id, name
val vertex_rdd = (start_rdd union end_rdd).distinct
//create edges
val edge_rdd = text_no_header.map(x => x.split(",")).map(y => (y(3), y(7), y(0).toDouble)) //start_id, end_id, duration

//create dataframes
val vertices = spark.createDataFrame(vertex_rdd).toDF("id","location")
val edges = spark.createDataFrame(edge_rdd).toDF("src","dst","dur")

//create graph
val g = GraphFrame(vertices,edges)

text: org.apache.spark.rdd.RDD[String] = 201710-citibike-tripdata.csv MapPartitionsRDD[1] at textFile at <console>:37
text_no_header: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:38
start_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at map at <console>:43
end_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[6] at map at <console>:44
vertex_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[10] at distinct at <console>:45
edge_rdd: org.apache.spark.rdd.RDD[(String, String, Double)] = MapPartitionsRDD[12] at map at <console>:47
vertices: org.apache.spark.sql.DataFrame = [id: string, location: string]
edges: org.apache.spark.sql.DataFrame = [src: string, dst: string ... 1 more field]...


<br><br><br>
<span style="color:green;font-size:xx-large">Step 2: Basic questions</span>
<br><br>

<li>How many citibike stations are there in the network?</li>
<li>How many trips were made in the month in question?</li>
<li>How many trips started and ended at the same station?</li>
<li>How many station to station connections are there (at least one edge exists between station i and station j and i is not equal to j)?</li>
<li>Your code should print:</li>
<pre>
Total number of stations: 785
Total number of trips.  : 1897592
Trips that started and ended at the same station: 33245
Number of station to station connections: 107524
</pre>
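
The four counts can be sanity-checked on a toy edge list. This is a plain-Scala sketch under the assumption that a "connection" is an unordered pair of distinct stations with at least one trip in either direction. The trips are hypothetical.

```scala
object ToyCounts {
  // Hypothetical trips as (start, end); a repeated pair is a repeated trip
  val trips: Seq[(Int, Int)] = Seq((1, 2), (2, 1), (1, 2), (3, 3), (2, 3))

  val stations: Int = trips.flatMap { case (s, d) => Seq(s, d) }.distinct.size
  val totalTrips: Int = trips.size
  val sameStation: Int = trips.count { case (s, d) => s == d }

  // Order each pair so that (1,2) and (2,1) collapse into one connection
  val connections: Int = trips
    .filter { case (s, d) => s != d }
    .map { case (s, d) => (math.min(s, d), math.max(s, d)) }
    .distinct
    .size
}
```

Here the five trips cover three stations, one trip is a self-loop, and the remaining four trips reduce to two connections.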

In [4]:
//You might need this
def make_undirected_graph(g: GraphFrame) = {
    val u_edge_df = g.find("(a)-[]->(b)")
        .select($"a.id".as("src"),$"b.id".as("dst"))
        .withColumn("swap",when(col("src")<col("dst"),col("dst")))
        .withColumn("dst",
                    when(col("swap").isNotNull,col("src"))
                    .otherwise(col("dst")))
        .withColumn("src",
                    when(col("swap").isNotNull,col("swap"))
                   .otherwise(col("src")))
        .drop(col("swap"))
        .distinct
    val u_vertices_df = g.vertices
    val u_g = GraphFrame(u_vertices_df,u_edge_df)    
    u_g
}

make_undirected_graph: (g: org.graphframes.GraphFrame)org.graphframes.GraphFrame


In [5]:
//g.vertices.count

In [6]:
//g.edges.count

In [7]:
//g.find("(a)-[]->(a)").count

In [8]:
val g_undirected = make_undirected_graph(g)
//g_undirected.find("(a)-[]->(b)").filter("a.id != b.id").distinct.count

g_undirected: org.graphframes.GraphFrame = GraphFrame(v:[id: string, location: string], e:[src: string, dst: string])


In [9]:
println("Total number of stations: "+ g.vertices.count)
println("Total number of trips.  : "+ g.edges.count)
println("Trips that started and ended at the same station: "+ g.find("(a)-[]->(a)").count)
println("Number of station to station connections: "+ g_undirected.find("(a)-[]->(b)").filter("a.id != b.id").distinct.count)

Total number of stations: 785
Total number of trips.  : 1897592
Trips that started and ended at the same station: 33245
Number of station to station connections: 107524


<br><br><br>
<span style="color:green;font-size:xx-large">Step 3: Find the Station from which most trips originate</span>
<br><br>
<li>Note that the graph has one edge for each trip (i.e., there are many edges between two vertices)</li>
<li>The function <span style="color:blue">outDegrees</span> returns the number of outgoing edges from every vertex</li>
<li>Print the name of the station with most originating trips</li>
<li>Your code should print:</li>
<pre>  
The station from which most trips originate is: "Pershing Square North"
</pre>
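
As a rough illustration of what an out-degree count measures when every trip is its own edge, the plain-Scala sketch below counts outgoing trips per station and looks up the name of the busiest one. Station ids, names, and trips are all hypothetical.

```scala
object ToyBusiestOrigin {
  // Hypothetical station names keyed by id
  val names: Map[String, String] = Map("1" -> "A St", "2" -> "B St", "3" -> "C St")

  // Hypothetical trips as (start id, end id)
  val trips: Seq[(String, String)] =
    Seq(("1", "2"), ("2", "1"), ("2", "3"), ("3", "2"), ("2", "2"))

  // Out-degree: number of trips leaving each station (self-loops included)
  val outDegree: Map[String, Int] =
    trips.groupBy(_._1).map { case (id, ts) => id -> ts.size }

  val busiest: String = names(outDegree.maxBy(_._2)._1)
}
```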

In [10]:
val maxOutId = g.outDegrees.orderBy(desc("outDegree")).select("id").take(1)(0)(0)
val maxOutName = g.vertices.filter($"id" === maxOutId).select("location").take(1)(0)(0)

println("The station from which most trips originate is: "+maxOutName)

The station from which most trips originate is: "Pershing Square North"


maxOutId: Any = 519
maxOutName: Any = "Pershing Square North"


<br><br><br>
<span style="color:green;font-size:xx-large">Step 4: Proportion of trips for each station that start and end at that same station</span>
<br><br>
<li>Create a GraphX graph from the GraphFrames graph (use the method that retains datatypes)</li>
<li>Use aggregateMessages to calculate the number of trips that start and end at the same vertex (for each vertex)</li>
<li>Convert the resulting (VertexRDD) to a DataFrame</li>
<li>Using join (and select), add the location of the station column to the df</li>
<li>Join this df to the out degrees df created earlier</li>
<li>Divide the same trips column by the out degrees column and select the appropriate columns</li>
<li>Sort the resulting df by this proportion in descending order</li>
<li>Your output should be the following dataframe</li>
<li>Note: Though you can use aggregateMessages in GraphFrames, you must use the GraphX version for this assignment</li>
    
<pre>
+----+--------------------+-----+---------+-------------------+
|  id|            location|trips|outDegree|               prop|
+----+--------------------+-----+---------+-------------------+
|3488|  "8D QC Station 01"|    1|        1|                1.0|
|3245|"NYCBS DEPOT - DE...|    1|        2|                0.5|
|3182|"Yankee Ferry Ter...|  309|      900| 0.3433333333333333|
|3254|  "Soissons Landing"|  358|     1100|0.32545454545454544|
|3342|"Pioneer St & Ric...|   59|      299|0.19732441471571907|
|3477|"39 St & 2 Ave - ...|   45|      245| 0.1836734693877551|
|3532|"Ditmars Blvd & 1...|   70|      407|  0.171990171990172|
|3180|"Brooklyn Bridge ...|  232|     1354|0.17134416543574593|
|3423|"West Drive & Pro...|  367|     2463|0.14900527811611855|
|3636|"Expansion Wareho...|    1|        8|              0.125|
|3302|"Columbus Ave & W...|   74|      598|0.12374581939799331|
|3120|"Center Blvd & Bo...|   74|      622| 0.1189710610932476|
|3514|"Astoria Park S &...|   34|      299|0.11371237458193979|
|3479|      "Picnic Point"|   77|      712|0.10814606741573034|
|3594|"Montgomery St & ...|    9|       87|0.10344827586206896|
|3524|    "19 St & 24 Ave"|   24|      249| 0.0963855421686747|
|3349|"Grand Army Plaza...|  237|     2570|0.09221789883268483|
|3333|"Columbia St & Lo...|    5|       55|0.09090909090909091|
|3354|"3 St & Prospect ...|  142|     1572|0.09033078880407125|
|3607|    "31 Ave & 14 St"|   10|      113|0.08849557522123894|
+----+--------------------+-----+---------+-------------------+
only showing top 20 rows
</pre>
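
The message-passing idea behind this step can be tried out in plain Scala first. In the sketch below every self-loop "sends" a 1 to its source and the messages are merged by addition, which mirrors the send and merge functions of aggregateMessages without using GraphX. The trips are hypothetical.

```scala
object ToySelfTripShare {
  // Hypothetical trips as (start id, end id)
  val trips: Seq[(Long, Long)] =
    Seq((1L, 1L), (1L, 2L), (1L, 1L), (2L, 1L), (2L, 2L), (3L, 1L))

  // Send step: one message per self-loop. Merge step: sum per station.
  val selfTrips: Map[Long, Int] = trips
    .collect { case (s, d) if s == d => (s, 1) }
    .groupBy(_._1)
    .map { case (id, msgs) => id -> msgs.map(_._2).sum }

  val outDegree: Map[Long, Int] =
    trips.groupBy(_._1).map { case (id, ts) => id -> ts.size }

  // Proportion of each station's outgoing trips that return to the same station
  val prop: Map[Long, Double] =
    selfTrips.map { case (id, n) => id -> n.toDouble / outDegree(id) }
}
```

Station 3 has no self-loops, so it receives no message and is missing from the result. GraphX behaves the same way: vertices that receive no message are left out of the returned VertexRDD.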

In [11]:
//Create a GraphX graph from the GraphFrames graph (use the method that retains datatypes)
val v = g.vertices.rdd.map(r => (r(0).toString.toLong,r(1).toString))
val e = g.edges.rdd.map(r => Edge(r(0).toString.toLong,r(1).toString.toLong,r(2).toString.toDouble))
val gx: Graph[String, Double] = Graph(v, e)

v: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[79] at map at <console>:38
e: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Double]] = MapPartitionsRDD[85] at map at <console>:39
gx: org.apache.spark.graphx.Graph[String,Double] = org.apache.spark.graphx.impl.GraphImpl@2e74eaf1


In [12]:
//Use aggregateMessages to calculate the number of trips that start and end at the same vertex (for each vertex)
//Convert the resulting (VertexRDD) to a DataFrame
//Compare vertex ids rather than names so that two stations sharing a name are never confused
val cycles = gx.aggregateMessages[Int](ec => if (ec.srcId == ec.dstId) ec.sendToSrc(1), (x,y) => x+y).toDF("id","trips")

//Using join (and select), add the location of the station column to the df
val gx_v = gx.vertices.toDF("id","location") //dataframe of vertices
val cycles_loc = cycles.join(gx_v, Seq("id"), "left").select("id","location","trips")

//Join this df to the out degrees df created earlier
val out_degree = g.outDegrees //dataframe of outdegrees
val cycles_loc_out = cycles_loc.join(out_degree, Seq("id"), "left")

//Divide the same trips column by the out degrees column and select the appropriate rows
//Sort the resulting df by this proportion in descending order
val df_output = cycles_loc_out.withColumn("prop", $"trips"/$"outDegree").orderBy(desc("prop"))

df_output.show

+----+--------------------+-----+---------+-------------------+
|  id|            location|trips|outDegree|               prop|
+----+--------------------+-----+---------+-------------------+
|3488|  "8D QC Station 01"|    1|        1|                1.0|
|3245|"NYCBS DEPOT - DE...|    1|        2|                0.5|
|3182|"Yankee Ferry Ter...|  309|      900| 0.3433333333333333|
|3254|  "Soissons Landing"|  358|     1100|0.32545454545454544|
|3342|"Pioneer St & Ric...|   59|      299|0.19732441471571907|
|3477|"39 St & 2 Ave - ...|   45|      245| 0.1836734693877551|
|3532|"Ditmars Blvd & 1...|   70|      407|  0.171990171990172|
|3180|"Brooklyn Bridge ...|  232|     1354|0.17134416543574593|
|3423|"West Drive & Pro...|  367|     2463|0.14900527811611855|
|3636|"Expansion Wareho...|    1|        8|              0.125|
|3302|"Columbus Ave & W...|   74|      598|0.12374581939799331|
|3120|"Center Blvd & Bo...|   74|      622| 0.1189710610932476|
|3514|"Astoria Park S &...|   34|      2

cycles: org.apache.spark.sql.DataFrame = [id: bigint, trips: int]
gx_v: org.apache.spark.sql.DataFrame = [id: bigint, location: string]
cycles_loc: org.apache.spark.sql.DataFrame = [id: bigint, location: string ... 1 more field]
out_degree: org.apache.spark.sql.DataFrame = [id: string, outDegree: int]
cycles_loc_out: org.apache.spark.sql.DataFrame = [id: bigint, location: string ... 2 more fields]
df_output: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, location: string ... 3 more fields]


<br><br><br>
<span style="color:green;font-size:xx-large">Step 5: Create a new graph that contains all edges except for those between the same station</span>
<br><br>


In [13]:
val new_edges = g.edges.filter("src != dst")
val new_g = GraphFrame(vertices,new_edges)

new_edges: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [src: string, dst: string ... 1 more field]
new_g: org.graphframes.GraphFrame = GraphFrame(v:[id: string, location: string], e:[src: string, dst: string ... 1 more field])


<br><br><br>
<span style="color:green;font-size:xx-large">Step 6: Calculate the average duration between every pair of stations</span>
<br><br>
<li>use the new graph from step 5 for this</li>
<li>I'll let you figure this out but this should be really easy (think SQL)</li>
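
The "think SQL" hint amounts to grouping by the (src, dst) pair and averaging the duration. A minimal plain-Scala sketch of that grouping, on hypothetical trips, looks like this:

```scala
object ToyMeanDuration {
  // Hypothetical trips as (start id, end id, duration in seconds)
  val trips: Seq[(String, String, Double)] =
    Seq(("1", "2", 300.0), ("1", "2", 500.0), ("2", "1", 200.0))

  // Group by the ordered pair, then average the durations in each group
  val mean: Map[(String, String), Double] = trips
    .groupBy(t => (t._1, t._2))
    .map { case (pair, ts) => pair -> ts.map(_._3).sum / ts.size }
}
```

The pair is ordered, so trips from 1 to 2 and trips from 2 to 1 are averaged separately.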
<pre>
+----+----+------------------+
| src| dst|                 m|
+----+----+------------------+
| 504| 350| 772.7647058823529|
| 433| 527| 532.9677419354839|
| 434| 470|316.52272727272725|
| 438| 151|  546.195652173913|
| 445| 507| 553.3947368421053|
|2021| 446| 827.6904761904761|
| 116| 518| 1115.857142857143|
|3435| 358| 895.6666666666666|
|3402|3414| 634.1666666666666|
| 498| 495| 801.2272727272727|
|3637| 418|             889.7|
| 380|3260|419.42105263157896|
|3360| 507|            1442.0|
| 326| 247|             713.0|
|3358| 467| 500.6666666666667|
|3164| 457|502.97241379310344|
| 498| 528| 434.3207547169811|
| 405|3256| 843.2168674698795|
| 477|2000|2672.3333333333335|
|3226|3163| 686.4444444444445|
+----+----+------------------+
only showing top 20 rows
</pre>

In [14]:
val mean_durations = new_g.edges.groupBy("src","dst").mean("dur").withColumnRenamed("avg(dur)","m")
mean_durations.show

+----+----+------------------+
| src| dst|                 m|
+----+----+------------------+
|3417|3377| 418.3333333333333|
|3467| 502| 784.7058823529412|
| 341| 311|1655.6610169491526|
|3407|2002|            1539.0|
| 161|3435| 585.3466666666667|
|3090| 433|            1415.0|
| 490| 450| 479.7047619047619|
| 237| 496|             468.5|
|3542|3511|             558.5|
|2009|3461|          654.5625|
| 339|2008|1464.6363636363637|
|3259| 526| 687.4615384615385|
| 326| 483|270.91954022988506|
|3438|3147|307.68333333333334|
|3139|3135| 525.5714285714286|
| 284| 487| 791.1666666666666|
|3158|3295|            1039.0|
| 417| 307|             734.0|
| 342| 307|504.26190476190476|
|3428| 435| 356.9760479041916|
+----+----+------------------+
only showing top 20 rows



mean_durations: org.apache.spark.sql.DataFrame = [src: string, dst: string ... 1 more field]


In [15]:
//Filtering on any (src,dst) pair from the sample output reproduces its mean duration.
//For example, the first row of the sample output (src and dst are string columns, so compare against strings):
mean_durations.filter($"src" === "504").filter($"dst" === "350").show

+---+---+-----------------+
|src|dst|                m|
+---+---+-----------------+
|504|350|772.7647058823529|
+---+---+-----------------+



<br><br><br>
<span style="color:green;font-size:xx-large">Step 7: Important stations</span><br><br>
Citibike wants to figure out how best to deploy its workers in checking whether a station is over-full (too many bikes) or needs more bikes. It figures that the best way to do this is to find out which stations are the most important in terms of flows:
<li>A station that has high bike returns and is connected to other stations with high bike returns is more likely to have too many bikes in its station and therefore should be monitored more often</li>
<li>A station that has high bike pickups and is connected to other stations with high bike pickups is more likely to be short of bikes and therefore should be monitored more often</li>
<li>Calculate the propensities for over-fullness and emptiness for every station</li>
<li>Report the 5 most important stations for over-fullness (use pageRank on the graph)</li>
<li>Report the 5 most important stations for emptiness (reverse all the edges on the graph and use pageRank)</li>
<li>Your results (Don't worry about the meaning of location names!):</li>
<li>Note: Assume a reset_probability of 0.15 and a tolerance of .0001 if you want the same results as mine</li>
<pre>
+---+--------------------+------------------+
| id|            location|          pagerank|
+---+--------------------+------------------+
|519|"Pershing Square ...| 4.930887390071603|
|426|"West St & Chambe...|3.7410934274030576|
|402|"Broadway & E 22 St"|  3.58520147183096|
|497|"E 17 St & Broadway"| 3.537658018512581|
|435|   "W 21 St & 6 Ave"| 3.438585855241344|
+---+--------------------+------------------+

+----+--------------------+------------------+
|  id|            location|          pagerank|
+----+--------------------+------------------+
|3197|      "Hs Don't Use"| 5.710640869520747|
| 519|"Pershing Square ...| 5.012823444592195|
|3480|      "WS Don't Use"| 4.272284643284593|
| 402|"Broadway & E 22 St"|3.4515211069038183|
| 497|"E 17 St & Broadway"|3.3347259745457443|
+----+--------------------+------------------+
</pre>
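
To see why reversing the edges switches the ranking from "returns" to "pickups", here is a simplified power-iteration sketch in plain Scala. It is not the GraphFrames implementation and will not reproduce the numbers above. It uses the unnormalised update rank(v) = reset + (1 - reset) * (sum of rank(u) / outDegree(u) over edges u to v), and the default values only mirror the note above. The trips are hypothetical.

```scala
object ToyPageRank {
  type Trip = (Int, Int)

  def pageRank(edges: Seq[Trip], reset: Double = 0.15, tol: Double = 1e-4): Map[Int, Double] = {
    val nodes = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    val outDeg = edges.groupBy(_._1).map { case (s, es) => s -> es.size }
    var ranks: Map[Int, Double] = nodes.map(_ -> 1.0).toMap
    var delta = Double.MaxValue
    while (delta > tol) {
      // Every edge carries an equal share of its source's rank to its destination
      val incoming = edges
        .map { case (s, d) => d -> ranks(s) / outDeg(s) }
        .groupBy(_._1)
        .map { case (d, cs) => d -> cs.map(_._2).sum }
      val next = nodes.map(n => n -> (reset + (1 - reset) * incoming.getOrElse(n, 0.0))).toMap
      delta = nodes.map(n => math.abs(next(n) - ranks(n))).max
      ranks = next
    }
    ranks
  }

  def reverse(edges: Seq[Trip]): Seq[Trip] = edges.map(_.swap)

  // Hypothetical trips: station 0 only originates trips, stations 1 to 3 form a loop
  val trips: Seq[Trip] = Seq((0, 1), (0, 2), (0, 3), (1, 2), (2, 3), (3, 1))

  val returns: Map[Int, Double] = pageRank(trips)          // rank flows to destinations
  val pickups: Map[Int, Double] = pageRank(reverse(trips)) // rank flows to origins
}
```

In this toy network station 0 never receives a bike, so it ranks lowest for returns, yet it ranks highest for pickups once the edges are reversed.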


In [16]:
val ranks = g.pageRank.resetProbability(0.15).tol(0.0001).run() //takes a long time to run
ranks.vertices.orderBy(desc("pagerank")).limit(5).show

+---+--------------------+------------------+
| id|            location|          pagerank|
+---+--------------------+------------------+
|519|"Pershing Square ...|4.9308873900715895|
|426|"West St & Chambe...| 3.741093427403052|
|402|"Broadway & E 22 St"|3.5852014718309544|
|497|"E 17 St & Broadway"|3.5376580185125768|
|435|   "W 21 St & 6 Ave"| 3.438585855241336|
+---+--------------------+------------------+



ranks: org.graphframes.GraphFrame = GraphFrame(v:[id: string, location: string ... 1 more field], e:[src: string, dst: string ... 2 more fields])


In [17]:
val inverted_edges = g.edges.withColumn("swap",$"dst").withColumn("dst",$"src").withColumn("src",$"swap").drop($"swap")
val i_g = GraphFrame(vertices,inverted_edges)  

val ranks_inv = i_g.pageRank.resetProbability(0.15).tol(0.0001).run() //takes a long time to run
ranks_inv.vertices.orderBy(desc("pagerank")).limit(5).show

+----+--------------------+------------------+
|  id|            location|          pagerank|
+----+--------------------+------------------+
|3197|      "Hs Don't Use"|5.7106408695207485|
| 519|"Pershing Square ...| 5.012823444592195|
|3480|      "WS Don't Use"| 4.272284643284594|
| 402|"Broadway & E 22 St"| 3.451521106903816|
| 497|"E 17 St & Broadway"| 3.334725974545743|
+----+--------------------+------------------+



inverted_edges: org.apache.spark.sql.DataFrame = [src: string, dst: string ... 1 more field]
i_g: org.graphframes.GraphFrame = GraphFrame(v:[id: string, location: string], e:[src: string, dst: string ... 1 more field])
ranks_inv: org.graphframes.GraphFrame = GraphFrame(v:[id: string, location: string ... 1 more field], e:[src: string, dst: string ... 2 more fields])


<br><br><br>
<span style="color:green;font-size:xx-large">Step 8: Calculate the clustering coefficient of every station</span><br><br>

<li>And report the top 20 stations by clustering coefficient</li>
<pre>
+----+--------------------+------------------+
|  id|            location|             coeff|
+----+--------------------+------------------+
|3040|     "GOW Tech Shop"|               1.0|
|3639|        "Harborside"|               1.0|
|3192|"Liberty Light Rail"|               1.0|
|3485| "NYCBS Depot - RIS"|               1.0|
|3647|    "48 Ave & 30 Pl"|               1.0|
|3279|       "Dixon Mills"|               1.0|
|3186|     "Grove St PATH"|               1.0|
| 153|   "E 40 St & 5 Ave"|               1.0|
| 339|"Avenue D & E 12 St"| 0.877201420748853|
|3464|"W 37 St & Broadway"|0.8679573382796197|
| 247|"Perry St & Bleec...|0.8602079768329604|
|3175|"W 70 St & Amster...|0.8592469808193227|
|3176|"W 64 St & West E...|0.8568452539928423|
|3623|"W 120 St & Clare...|0.8549019607843137|
|3491|  "E 118 St & 1 Ave"| 0.854122621564482|
| 266| "Avenue D & E 8 St"| 0.849218980253463|
|3441|   "10 Hudson Yards"|0.8482701509017299|
|3646|    "35 Ave & 10 St"|0.8333333333333334|
|3642|"E 98 St & Lexing...|             0.832|
| 444|"Broadway & W 24 St"|0.8283229697508064|
+----+--------------------+------------------+
only showing top 20 rows
</pre>
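
For reference, the local clustering coefficient of a vertex with k distinct neighbours is usually taken to be the number of triangles through that vertex divided by k(k-1)/2, the number of neighbour pairs that could be linked. The plain-Scala sketch below applies that definition to a hypothetical undirected graph. Returning 0.0 for vertices with fewer than two neighbours is a convention chosen here, not something the assignment specifies.

```scala
object ToyClustering {
  // Hypothetical undirected connections, one entry per unordered pair
  val links: Set[(Int, Int)] = Set((1, 2), (1, 3), (2, 3), (3, 4))

  // Adjacency sets built by listing every link in both directions
  val neighbours: Map[Int, Set[Int]] = links.toSeq
    .flatMap { case (a, b) => Seq(a -> b, b -> a) }
    .groupBy(_._1)
    .map { case (v, ps) => v -> ps.map(_._2).toSet }

  def coeff(v: Int): Double = {
    val ns = neighbours(v).toSeq
    val k = ns.size
    if (k < 2) 0.0 // assumed convention: no neighbour pairs, so no triangles possible
    else {
      // A triangle through v exists for every pair of neighbours that are linked
      val closed = ns.combinations(2).count { case Seq(a, b) => neighbours(a).contains(b) }
      closed.toDouble / (k * (k - 1) / 2.0)
    }
  }
}
```

Vertex 1 has two neighbours that are linked to each other, so its coefficient is 1.0. Vertex 3 has three neighbours with only one linked pair, giving 1/3.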

In [18]:
val triangles = g.triangleCount.run()//Get the number of triangles each vertex belongs to
val degrees = make_undirected_graph(g).degrees //Get the number of adjacent vertices for each vertex
val possible = degrees.withColumn("possible",col("degree")*(col("degree")-1)/lit(2)) //Calculate possible triangles
val joined = triangles.select("id","count").join(possible,Seq("id"))
    .join(vertices, Seq("id"))
val coeff = joined.withColumn("coeff",col("count")/col("possible"))
coeff.select("id","location","coeff").orderBy(desc("coeff")).limit(20).show

+----+--------------------+------------------+
|  id|            location|             coeff|
+----+--------------------+------------------+
|3647|    "48 Ave & 30 Pl"|               1.0|
|3639|        "Harborside"|               1.0|
|3192|"Liberty Light Rail"|               1.0|
|3485| "NYCBS Depot - RIS"|               1.0|
|3279|       "Dixon Mills"|               1.0|
| 153|   "E 40 St & 5 Ave"|               1.0|
|3186|     "Grove St PATH"|               1.0|
|3040|     "GOW Tech Shop"|               1.0|
| 339|"Avenue D & E 12 St"| 0.877201420748853|
|3464|"W 37 St & Broadway"|0.8679573382796197|
| 247|"Perry St & Bleec...|0.8602079768329604|
|3175|"W 70 St & Amster...|0.8592469808193227|
|3176|"W 64 St & West E...|0.8568452539928423|
|3623|"W 120 St & Clare...|0.8549019607843137|
|3491|  "E 118 St & 1 Ave"| 0.854122621564482|
| 266| "Avenue D & E 8 St"| 0.849218980253463|
|3441|   "10 Hudson Yards"|0.8482701509017299|
|3646|    "35 Ave & 10 St"|0.8333333333333334|
|3642|"E 98 S

triangles: org.apache.spark.sql.DataFrame = [count: bigint, id: string ... 1 more field]
degrees: org.apache.spark.sql.DataFrame = [id: string, degree: int]
possible: org.apache.spark.sql.DataFrame = [id: string, degree: int ... 1 more field]
joined: org.apache.spark.sql.DataFrame = [id: string, count: bigint ... 3 more fields]
coeff: org.apache.spark.sql.DataFrame = [id: string, count: bigint ... 4 more fields]
