In [1]:
from graphframes import GraphFrame

In [2]:
from pyspark.sql.functions import desc, expr

In [3]:
rides = spark.read.csv('gs://capstone-231016/ride/*.csv', header=True)
# rides = spark.read.csv('gs://capstone-231016/ride/2013-08 - Citi Bike trip data.csv', header=True)

In [4]:
vertices = rides.select('start station id', 'start station name')\
    .withColumnRenamed('start station id', 'id')\
    .withColumnRenamed('start station name', 'name')\
    .distinct()
edges = rides.select('start station id', 'end station id')\
    .withColumnRenamed('start station id', 'src')\
    .withColumnRenamed('end station id', 'dst')

In [5]:
g = GraphFrame(vertices, edges)

In [6]:
g.cache()

GraphFrame(v:[id: string, name: string], e:[src: string, dst: string])

In [7]:
g.vertices.count()

908

In [8]:
g.edges.count()

53844385

<h2>Top Visitor Locations

In [197]:
circle = g.edges.where('src == dst').groupBy('src').count().withColumnRenamed('count', 'num')
total = g.edges.groupBy('src').count().withColumnRenamed('count', 'denom')

In [200]:
import pyspark.sql.functions as F

visitor = circle.join(total, ['src']) \
    .withColumn('percent', (F.col('num')/F.col('denom')))\
    .select('src', 'percent')\
    .join(g.vertices, circle.src == vertices.id)\
    .select('src', 'name', 'percent')\
    .orderBy('percent', ascending=False)

visitor\
    .repartition(1)\
    .write.format('com.databricks.spark.csv')\
    .option('header', 'true')\
    .save('gs://capstone-231016/s/visitor')

<h2>Page Rank

In [9]:
ranks = g.pageRank(resetProbability=0.15, maxIter=2)
ranks = ranks.vertices\
    .orderBy(desc('pagerank'))\
    .select('id', 'name', 'pagerank')

In [10]:
ranks.show(20, False)

+----+------------------------+------------------+
|id  |name                    |pagerank          |
+----+------------------------+------------------+
|519 |E 42 St & Vanderbilt Ave|11.673778462572699|
|519 |Pershing Square North   |11.673778462572699|
|519 |Pershing Square N       |11.673778462572699|
|3016|Kent Ave & N 7 St       |4.53481183782187  |
|3016|Mobile 01               |4.53481183782187  |
|517 |Pershing Square S       |4.134364841109948 |
|517 |E 41 St & Madison Ave   |4.134364841109948 |
|517 |Pershing Square South   |4.134364841109948 |
|532 |S 5 Pl & S 4 St         |3.916160807527277 |
|532 |S 5 Pl & S 5 St         |3.916160807527277 |
|504 |1 Ave & E 15 St         |3.58390525866511  |
|504 |1 Ave & E 16 St         |3.58390525866511  |
|468 |Broadway & W 56 St      |3.434931389065668 |
|468 |Broadway & W 55 St      |3.434931389065668 |
|527 |E 33 St & 1 Ave         |3.4003469859324356|
|527 |E 33 St & 2 Ave         |3.4003469859324356|
|2006|Central Park S & 6 Ave  |

In [11]:
ranks\
    .repartition(1)\
    .write.format('com.databricks.spark.csv')\
    .option('header', 'true')\
    .save('gs://capstone-231016/s/ranks')

<h2>In vs Out

In [14]:
inDeg = g.inDegrees
outDeg = g.outDegrees
ratio = inDeg.join(outDeg, 'id')\
    .selectExpr('id', 'double(inDegree)/double(outDegree) AS ratio')\

ratio = ratio.join(g.vertices, ['id']) \
    .select('id', 'name', 'ratio') \
    .orderBy(desc('ratio'))

In [15]:
ratio.show(20, False)

+----+---------------------------------------------+------------------+
|id  |name                                         |ratio             |
+----+---------------------------------------------+------------------+
|3250|NYCBS Depot - PIT                            |11.926829268292684|
|3219|NYCBS Depot - STY                            |4.120743034055727 |
|3240|NYCBS Depot BAL - DYR                        |3.7450980392156863|
|3432|Bike in Movie Night | Prospect Park Bandshell|2.874747474747475 |
|3432|NYCBS Depot - GOW                            |2.874747474747475 |
|3636|Expansion Warehouse 333 Johnson Ave          |2.090909090909091 |
|3468|NYCBS Depot - STY - Garage 4                 |2.0430622009569377|
|3631|Crown St & Bedford Ave                       |2.0               |
|3543|Morningside Dr & Amsterdam Ave               |2.0               |
|3446|NYCBS Depot - STY - Valet Scan               |2.0               |
|3385|2 Ave & E 105 St                             |2.0         

In [16]:
ratio\
    .repartition(1)\
    .write.format('com.databricks.spark.csv')\
    .option('header', 'true')\
    .save('gs://capstone-231016/s/ratio')

<h2>Islands

In [18]:
one = g.edges\
    .groupBy('src', 'dst')\
    .count()
two = g.edges.groupBy('dst', 'src')\
    .count()\
    .withColumnRenamed('dst', 'dst2')\
    .withColumnRenamed('src', 'src2')\
    .withColumnRenamed('count', 'count2')

In [19]:
connections = one.join(two, (one.src == two.dst2) & (one.dst == two.src2)) \
    .withColumn('total', expr('count + count2'))\
    .select('src', 'dst', 'total')

In [20]:
connections = connections.join(g.vertices, connections.src == g.vertices.id)\
    .selectExpr('src', 'name AS src_name', 'dst', 'total')
connections = connections.join(g.vertices, connections.dst == g.vertices.id)\
    .selectExpr('src', 'src_name', 'dst', 'name AS dst_name', 'total')

In [21]:
connections.orderBy('total', ascending=False)\
    .show(20, False)

+----+---------------------------------+----+---------------------------------+------+
|src |src_name                         |dst |dst_name                         |total |
+----+---------------------------------+----+---------------------------------+------+
|2006|Central Park S & 6 Ave           |2006|Central Park S & 6 Ave           |114726|
|281 |Grand Army Plaza & Central Park S|281 |Grand Army Plaza & Central Park S|49478 |
|387 |Centre St & Chambers St          |387 |Centre St & Chambers St          |40142 |
|499 |Broadway & W 60 St               |499 |Broadway & W 60 St               |38776 |
|426 |West St & Chambers St            |514 |12 Ave & W 40 St                 |33392 |
|514 |12 Ave & W 40 St                 |426 |West St & Chambers St            |33392 |
|426 |West St & Chambers St            |426 |West St & Chambers St            |28768 |
|509 |9 Ave & W 22 St                  |435 |W 21 St & 6 Ave                  |27635 |
|435 |W 21 St & 6 Ave                  |509

In [23]:
connections\
    .orderBy('total', ascending=False)\
    .repartition(1)\
    .write.format('com.databricks.spark.csv')\
    .option('header', 'true')\
    .save('gs://capstone-231016/s/connections')