# Graph Analysis with GraphX Tutorial
This tutorial provides an example of graph analysis using the GraphX API.
Data used: [The Bay Area Bike Share portal](http://www.bayareabikeshare.com/open-data)

The data is public bike share usage from September 2014 through August 2015, published on the Bay Area Bike Share open-data site.

Note: GraphX computation is only supported using the Scala and RDD APIs.

* Example workflow:
  1. Load the data
  1. Preprocess the data
  1. Create the vertices
  1. Create the edges
  1. Build the graph and test algorithms
    1. PageRank
    1. Trips from vertex to vertex
    1. In-degree and out-degree
  
The [PageRank](https://en.wikipedia.org/wiki/PageRank) algorithm provided by GraphX is used to find the most important vertices in a network.

In [29]:
import org.apache.spark.sql._

In [4]:
val sparkSession = SparkSession.builder.config(conf = sc.getConf).
                                        appName("spark session example").
                                        getOrCreate()

In [5]:
import sparkSession.implicits._

In [6]:
val sf_201508_station_data = sparkSession.read.format("csv"). // built-in CSV source in Spark 2.x
                                                option("header","true").
                                                option("mode","DROPMALFORMED").
                                                load("201508_station_data.csv")

In [7]:
val sf_201508_trip_data = sparkSession.read.format("csv"). // built-in CSV source in Spark 2.x
                                                option("header","true").
                                                option("mode","DROPMALFORMED").
                                                load("201508_trip_data.csv")

In [8]:
sf_201508_station_data.createOrReplaceTempView("sf_201508_station_data")
sf_201508_trip_data.createOrReplaceTempView("sf_201508_trip_data")

In [9]:
val bikeStations = sparkSession.sql("SELECT * FROM sf_201508_station_data")
val tripData = sparkSession.sql("SELECT * FROM sf_201508_trip_data")

In [10]:
bikeStations.printSchema()
tripData.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- dockcount: string (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)

root
 |-- Trip ID: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- Start Station: string (nullable = true)
 |-- Start Terminal: string (nullable = true)
 |-- End Date: string (nullable = true)
 |-- End Station: string (nullable = true)
 |-- End Terminal: string (nullable = true)
 |-- Bike #: string (nullable = true)
 |-- Subscriber Type: string (nullable = true)
 |-- Zip Code: string (nullable = true)



In [11]:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

In [12]:
val justStations = bikeStations.selectExpr("float(station_id) as station_id","name").distinct()

In [13]:
justStations.show()

+----------+--------------------+
|station_id|                name|
+----------+--------------------+
|      59.0| Golden Gate at Polk|
|      22.0|Redwood City Calt...|
|      62.0|       2nd at Folsom|
|      80.0|Santa Clara Count...|
|      57.0|       5th at Howard|
|      13.0|       St James Park|
|      14.0|Arena Green / SAP...|
|      84.0|         Ryland Park|
|      46.0|Washington at Kea...|
|      39.0|  Powell Street BART|
|      31.0|San Antonio Shopp...|
|      82.0|Broadway St at Ba...|
|      35.0|University and Em...|
|      47.0|     Post at Kearney|
|      77.0|   Market at Sansome|
|      72.0|Civic Center BART...|
|      32.0|Castro Street and...|
|      36.0|California Ave Ca...|
|      50.0|Harry Bridges Pla...|
|      48.0|Embarcadero at Va...|
+----------+--------------------+
only showing top 20 rows



In [14]:
// Resolve the start and end station names to station IDs (inner joins: unmatched trips are dropped)
val completeTripData = tripData.
                        join(justStations, tripData("Start Station") === justStations("name")).
                        withColumnRenamed("station_id","start_station_id").drop("name").
                        join(justStations, tripData("End Station") === justStations("name")).
                        withColumnRenamed("station_id","end_station_id").drop("name")

In [15]:
val stations = completeTripData.
                select("start_station_id", "end_station_id").
                rdd.
                distinct(). // helps filter out duplicate trips
                flatMap(x => Iterable(x(0).asInstanceOf[Number].longValue, x(1).asInstanceOf[Number].longValue)). // helps us maintain types
                distinct().
                toDF() // return to a DF to make merging + joining easier

stations.take(1) // this is just a station_id at this point

Array([13])

In [16]:
val stationVertices: RDD[(VertexId, String)] = stations.
                                join(justStations, stations("value") === justStations("station_id")).
                                select("station_id","name").
                                rdd.
                                map(row => (row(0).asInstanceOf[Number].longValue, row(1).asInstanceOf[String]))
                                                
stationVertices.take(1)

Array((13,St James Park))

In [17]:
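// One edge per trip, with an attribute of 1 so that parallel edges can later be summed into trip counts
val stationEdges: RDD[Edge[Long]] = completeTripData.
                select("start_station_id", "end_station_id").
                rdd.
                map(row => Edge(row(0).asInstanceOf[Number].longValue, row(1).asInstanceOf[Number].longValue, 1L))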

In [18]:
val defaultStation = "Missing Station" // attribute for vertex IDs that appear only in edges
val stationGraph = Graph(stationVertices, stationEdges, defaultStation)
stationGraph.cache()

org.apache.spark.graphx.impl.GraphImpl@64d4c54f
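
The third argument to `Graph(...)` is the attribute GraphX gives to any vertex ID that appears in an edge but has no entry in the vertex collection. The block below is a minimal sketch of that rule using plain Scala collections rather than Spark; the station names, IDs, and the names `knownStations`, `sampleEdges`, and `resolvedNames` are made up for illustration.

```scala
// Hypothetical toy data: ID 99 appears in an edge but has no vertex entry.
val knownStations = Map(1L -> "Station A", 2L -> "Station B")
val sampleEdges = Seq((1L, 2L), (2L, 99L))
val fallbackName = "Missing Station"

// Every ID mentioned by an edge becomes a vertex; unknown IDs get the fallback attribute.
val edgeIds = sampleEdges.flatMap { case (src, dst) => Seq(src, dst) }.distinct
val resolvedNames = edgeIds.map(id => id -> knownStations.getOrElse(id, fallbackName)).toMap

println(resolvedNames(99L))
```

In the notebook itself every edge endpoint comes from a join against the station table, so the default attribute is probably never used here; it acts as a safety net.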

In [19]:
println("Total Number of Stations: " + stationGraph.numVertices)
println("Total Number of Trips: " + stationGraph.numEdges)

println("Total Number of Trips in Original Data: " + tripData.count)

Total Number of Stations: 68
Total Number of Trips: 339030
Total Number of Trips in Original Data: 354152
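
The graph has fewer edges than the original data has trips. A likely reason (an assumption, not verified against the data) is that `completeTripData` is built with inner joins, so any trip whose start or end station name has no exact match in the station table is dropped. The plain-Scala sketch below reproduces that effect on made-up rows; `stationIds`, `trips`, and `joined` are hypothetical names.

```scala
// Hypothetical station table: name -> id.
val stationIds = Map("Station A" -> 1L, "Station B" -> 2L)

// Hypothetical trips as (start name, end name); the last one uses a name missing from the table.
val trips = Seq(("Station A", "Station B"), ("Station B", "Station A"), ("Station A", "Station C"))

// Inner-join semantics: keep a trip only when both names resolve to an id.
val joined = trips.flatMap { case (start, end) =>
  for {
    s <- stationIds.get(start)
    e <- stationIds.get(end)
  } yield (s, e)
}

println(s"${trips.size} trips before the join, ${joined.size} after")
```

Comparing the distinct station names in the trip data with those in the station table would be one way to confirm whether this explains the gap.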


# PageRank

In [28]:
val ranks = stationGraph.pageRank(0.01).vertices
ranks.
    join(stationVertices).
    sortBy(_._2._1, ascending=false).  // sort by the rank
    take(10).
    foreach(x => println(x._2._2))

San Jose Diridon Caltrain Station
San Francisco Caltrain (Townsend at 4th)
Mountain View Caltrain Station
Redwood City Caltrain Station
San Francisco Caltrain 2 (330 Townsend)
Harry Bridges Plaza (Ferry Building)
2nd at Townsend
Santa Clara at Almaden
Townsend at 7th
Embarcadero at Sansome
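
To make the ranking easier to interpret, the following sketch runs a simplified PageRank on a four-vertex toy graph with plain Scala collections. It is not GraphX's implementation: it assumes the common update rule `rank = 0.15 + 0.85 * (sum of incoming contributions)` and a fixed number of iterations instead of a convergence tolerance such as the `0.01` passed to `pageRank`. All names and IDs are made up.

```scala
// Toy directed graph as (source, destination) pairs; the IDs are made up.
val prEdges = Seq((1L, 2L), (2L, 3L), (3L, 1L), (4L, 1L))
val prVertices = prEdges.flatMap { case (s, d) => Seq(s, d) }.distinct
val prOutDegree = prEdges.groupBy(_._1).map { case (v, es) => v -> es.size }

val resetProb = 0.15 // assumed value; see the lead-in
val initialRanks = prVertices.map(v => v -> 1.0).toMap

// Fixed number of iterations for simplicity (no convergence check is modelled here).
val toyRanks = (1 to 50).foldLeft(initialRanks) { (current, _) =>
  // Each vertex splits its current rank evenly across its outgoing edges.
  val incoming = prEdges
    .map { case (s, d) => d -> current(s) / prOutDegree(s) }
    .groupBy(_._1)
    .map { case (v, cs) => v -> cs.map(_._2).sum }
  prVertices.map(v => v -> (resetProb + (1 - resetProb) * incoming.getOrElse(v, 0.0))).toMap
}

// Highest rank first.
val ranked = toyRanks.toSeq.sortBy(-_._2)
ranked.foreach { case (v, r) => println(f"vertex $v: $r%.3f") }
```

Vertex 1 receives rank from two vertices and ends up on top, while vertex 4 has no incoming edges and stays at the reset value. The same intuition applies to the station ranking: stations that many trips flow into, especially from other well-visited stations, rank higher.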


# Trips From Station to Station
Routes sorted by number of trips, most frequent first.

In [21]:
stationGraph.
    partitionBy(PartitionStrategy.EdgePartition2D). // groupEdges only merges parallel edges that share a partition
    groupEdges((edge1, edge2) => edge1 + edge2).
    triplets.
    sortBy(_.attr, ascending=false).
    map(triplet => "There were " + triplet.attr.toString + " trips from " + triplet.srcAttr + " to " + triplet.dstAttr).
    take(10).
    foreach(println)

There were 3748 trips from San Francisco Caltrain 2 (330 Townsend) to Townsend at 7th
There were 3145 trips from Harry Bridges Plaza (Ferry Building) to Embarcadero at Sansome
There were 2973 trips from 2nd at Townsend to Harry Bridges Plaza (Ferry Building)
There were 2734 trips from Townsend at 7th to San Francisco Caltrain 2 (330 Townsend)
There were 2640 trips from Harry Bridges Plaza (Ferry Building) to 2nd at Townsend
There were 2439 trips from Embarcadero at Folsom to San Francisco Caltrain (Townsend at 4th)
There were 2356 trips from Steuart at Market to 2nd at Townsend
There were 2330 trips from Embarcadero at Sansome to Steuart at Market
There were 2192 trips from Townsend at 7th to San Francisco Caltrain (Townsend at 4th)
There were 2184 trips from Temporary Transbay Terminal (Howard at Beale) to San Francisco Caltrain (Townsend at 4th)
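
Each trip is stored as a separate edge with attribute 1, so summing the attributes of parallel edges yields the trip count per route. The following is a small plain-Scala sketch of that merge-and-sort step, assuming made-up station IDs; `rawTrips` and `routeCounts` are hypothetical names, and the code does not use GraphX.

```scala
// Hypothetical trips as (start id, end id, count); each recorded trip carries a count of 1.
val rawTrips = Seq((1L, 2L, 1L), (1L, 2L, 1L), (2L, 1L, 1L), (1L, 2L, 1L))

// Merge parallel edges (same start and end) by summing their counts, then sort descending.
val routeCounts = rawTrips
  .groupBy { case (src, dst, _) => (src, dst) }
  .map { case (route, es) => route -> es.map(_._3).sum }
  .toSeq
  .sortBy(-_._2)

routeCounts.foreach { case ((src, dst), count) => println(s"$count trips from $src to $dst") }
```

Direction matters: the route from 1 to 2 and the route from 2 to 1 are counted separately, which is why the output above lists some station pairs twice.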


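# In Degrees and Out Degrees
A vertex's [in-degree and out-degree](http://mathworld.wolfram.com/VertexDegree.html) can be useful information when exploring a graph.

The following examples print, in rank order, the vertices with the highest in-degree, the vertices with the highest out-degree, the vertices with the highest ratio of in-degree to out-degree, and the vertices with the highest ratio of out-degree to in-degree.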
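
As a quick illustration of these definitions, the sketch below counts in-degrees and out-degrees on a made-up edge list with plain Scala collections and then computes the in/out ratio. The names `degreeEdges`, `inDeg`, `outDeg`, and `inOutRatio` are hypothetical and the code does not use GraphX.

```scala
// Hypothetical directed edges as (source, destination); repeated pairs count as separate edges.
val degreeEdges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L), (2L, 3L))

// In-degree counts edges arriving at a vertex; out-degree counts edges leaving it.
val inDeg = degreeEdges.groupBy(_._2).map { case (v, es) => v -> es.size }
val outDeg = degreeEdges.groupBy(_._1).map { case (v, es) => v -> es.size }

// Ratio of in-degree to out-degree for vertices that have both, highest first.
val inOutRatio = inDeg.toSeq
  .collect { case (v, in) if outDeg.contains(v) => v -> in.toDouble / outDeg(v) }
  .sortBy(-_._2)

inOutRatio.foreach { case (v, ratio) => println(s"vertex $v has an in/out degree ratio of $ratio") }
```

In the bike share graph, a ratio above 1 means more trips end at a station than start there, so bikes tend to accumulate at that station; a ratio below 1 means the station tends to run out of bikes.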

In [22]:
// Top 10 stations by in-degree
stationGraph.
    inDegrees.
    join(stationVertices).
    sortBy(_._2._1, ascending=false).
    take(10).
    foreach(x => println(x._2._2 + " has " + x._2._1 + " in degrees."))

San Francisco Caltrain (Townsend at 4th) has 34166 in degrees.
San Francisco Caltrain 2 (330 Townsend) has 22109 in degrees.
Harry Bridges Plaza (Ferry Building) has 17403 in degrees.
2nd at Townsend has 15279 in degrees.
Townsend at 7th has 15126 in degrees.
Embarcadero at Sansome has 14705 in degrees.
Market at Sansome has 13778 in degrees.
Steuart at Market has 13383 in degrees.
Temporary Transbay Terminal (Howard at Beale) has 12748 in degrees.
Market at 10th has 9988 in degrees.


In [23]:
stationGraph.
    outDegrees.
    join(stationVertices).
    sortBy(_._2._1, ascending=false).
    take(10).
    foreach(x => println(x._2._2 + " has " + x._2._1 + " out degrees."))

San Francisco Caltrain (Townsend at 4th) has 25631 out degrees.
San Francisco Caltrain 2 (330 Townsend) has 21245 out degrees.
Harry Bridges Plaza (Ferry Building) has 16917 out degrees.
Temporary Transbay Terminal (Howard at Beale) has 14242 out degrees.
Embarcadero at Sansome has 13885 out degrees.
2nd at Townsend has 13746 out degrees.
Steuart at Market has 13478 out degrees.
Townsend at 7th has 13465 out degrees.
Market at 10th has 11449 out degrees.
Market at Sansome has 11317 out degrees.


In [24]:
stationGraph.
    inDegrees.
    join(stationGraph.outDegrees).
    join(stationVertices).
    map(x => (x._2._1._1.toDouble/x._2._1._2.toDouble, x._2._2+" (id:"+x._1+")")).
    sortBy(_._1, ascending=false).
    take(5).
    foreach(x => println(x._2 + " has a in/out degree ratio of " + x._1))

Redwood City Medical Center (id:26) has a in/out degree ratio of 1.5333333333333334
San Mateo County Center (id:23) has a in/out degree ratio of 1.4724409448818898
SJSU 4th at San Carlos (id:12) has a in/out degree ratio of 1.3621052631578947
San Francisco Caltrain (Townsend at 4th) (id:70) has a in/out degree ratio of 1.3329952011236395
Paseo de San Antonio (id:7) has a in/out degree ratio of 1.2535046728971964


In [27]:
stationGraph.
    outDegrees.
    join(stationGraph.inDegrees).
    join(stationVertices).
    map(x => (x._2._1._1.toDouble/x._2._1._2.toDouble, x._2._2)).
    sortBy(_._1, ascending=false).
    take(5).
    foreach(x => println(x._2 + " has a out/in degree ratio of " + x._1))

Grant Avenue at Columbus Avenue has a out/in degree ratio of 1.9841936280563102
2nd at Folsom has a out/in degree ratio of 1.6663080895008606
Powell at Post (Union Square) has a out/in degree ratio of 1.5090406830738323
Mezes Park has a out/in degree ratio of 1.4620689655172414
Evelyn Park and Ride has a out/in degree ratio of 1.3489655172413793
