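## Using Spark GraphFrames to process BikeShare data

Stations become graph vertices and individual trips become directed edges (start terminal → end terminal); the notebook then inspects station in-degrees and runs motif queries on the resulting graph.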

In [1]:
import pandas as pd

In [2]:
# Input CSVs live one directory above the notebook.
trips_filename = '../201508_trip_data.csv'
stations_filename = '../201508_station_data.csv'

# Load both tables with pandas (trips first, then stations); they are
# converted to Spark DataFrames further down.
trips_pdf, stations_pdf = (pd.read_csv(path) for path in (trips_filename, stations_filename))

In [3]:
import pyspark
from pyspark.sql import SparkSession
from graphframes import *

In [4]:
# Get (or create) the Spark session used to build every Spark DataFrame below.
session = (
    SparkSession.builder
    .appName("BikeShareGF")
    .getOrCreate()
)

In [5]:
# Peek at the first five station rows.
display(stations_pdf.head(n=5))

Unnamed: 0,station_id,name,lat,long,dockcount,landmark,installation
0,2,San Jose Diridon Caltrain Station,37.329732,-121.901782,27,San Jose,8/6/2013
1,3,San Jose Civic Center,37.330698,-121.888979,15,San Jose,8/5/2013
2,4,Santa Clara at Almaden,37.333988,-121.894902,11,San Jose,8/6/2013
3,5,Adobe on Almaden,37.331415,-121.8932,19,San Jose,8/5/2013
4,6,San Pedro Square,37.336721,-121.894074,15,San Jose,8/7/2013


In [6]:
# Create graph nodes (vertices) from the stations table.
#
# Vertex DataFrame: GraphFrames requires a special column named "id" that
# holds a unique ID for each vertex in the graph, so station_id -> id.
#
stations_pdf = stations_pdf.rename(columns={'station_id': 'id'})

# The vertex ids must be unique; fail fast here rather than building a graph
# whose vertex lookups are ambiguous.
assert stations_pdf['id'].is_unique, "station ids must be unique to serve as GraphFrame vertex ids"

vertices = session.createDataFrame(stations_pdf)

In [7]:
# Inspect the vertex schema Spark inferred from the pandas frame;
# `id` is the column GraphFrames uses as the vertex key.
vertices.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- dockcount: long (nullable = true)
 |-- landmark: string (nullable = true)
 |-- installation: string (nullable = true)



In [8]:
# Peek at the first five trip rows; Start/End Terminal hold the terminal ids
# that are later used as edge endpoints.
display(trips_pdf.head(n=5))

Unnamed: 0,Trip ID,Duration,Start Date,Start Station,Start Terminal,End Date,End Station,End Terminal,Bike #,Subscriber Type,Zip Code
0,913460,765,8/31/2015 23:26,Harry Bridges Plaza (Ferry Building),50,8/31/2015 23:39,San Francisco Caltrain (Townsend at 4th),70,288,Subscriber,2139
1,913459,1036,8/31/2015 23:11,San Antonio Shopping Center,31,8/31/2015 23:28,Mountain View City Hall,27,35,Subscriber,95032
2,913455,307,8/31/2015 23:13,Post at Kearny,47,8/31/2015 23:18,2nd at South Park,64,468,Subscriber,94107
3,913454,409,8/31/2015 23:10,San Jose City Hall,10,8/31/2015 23:17,San Salvador at 1st,8,68,Subscriber,95113
4,913453,789,8/31/2015 23:09,Embarcadero at Folsom,51,8/31/2015 23:22,Embarcadero at Sansome,60,487,Customer,9069


In [None]:
# Keep only the columns that describe a trip as an edge: the two terminals
# (future src/dst) plus the trip id and start/end date strings as edge attributes.
edge_columns = ['Start Terminal', 'End Terminal', 'Trip ID', 'Start Date', 'End Date']
trips_edges_pdf = trips_pdf[edge_columns]

In [11]:
# GraphFrames expects the edge DataFrame to name its endpoints "src" (source
# vertex id) and "dst" (destination vertex id); the terminals play those roles.
terminal_renames = {'Start Terminal': 'src', 'End Terminal': 'dst'}
trips_edges_pdf = trips_edges_pdf.rename(columns=terminal_renames)

In [12]:
# Check the renamed edge columns on the first five trips.
display(trips_edges_pdf.head(n=5))

Unnamed: 0,src,dst,Trip ID,Start Date,End Date
0,50,70,913460,8/31/2015 23:26,8/31/2015 23:39
1,31,27,913459,8/31/2015 23:11,8/31/2015 23:28
2,47,64,913455,8/31/2015 23:13,8/31/2015 23:18
3,10,8,913454,8/31/2015 23:10,8/31/2015 23:17
4,51,60,913453,8/31/2015 23:09,8/31/2015 23:22


In [13]:
# Convert the pandas edge table into the Spark DataFrame GraphFrames consumes.
edges = session.createDataFrame(data=trips_edges_pdf)

In [21]:
# Inspect the edge schema; `src`/`dst` should line up with the vertices' `id`
# column so GraphFrames can attach each trip to its stations.
edges.printSchema()

root
 |-- src: long (nullable = true)
 |-- dst: long (nullable = true)
 |-- Trip ID: long (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- End Date: string (nullable = true)



In [15]:
# Assemble the graph: stations are vertices, individual trips are directed edges.
g = GraphFrame(v=vertices, e=edges)

In [23]:
# Number of stations (vertices) in the graph.
vertex_count = g.vertices.count()
print(vertex_count)

70


In [24]:
# Number of trips (edges) in the graph.
edge_count = g.edges.count()
print(edge_count)

354152


In [32]:
# Stations ranked by number of incoming trips (busiest destinations first);
# show(20) prints the top 20 rows.
g.inDegrees.orderBy('inDegree', ascending=False).show(20)

+---+--------+
| id|inDegree|
+---+--------+
| 70|   34810|
| 69|   22523|
| 50|   17810|
| 61|   15463|
| 65|   15422|
| 60|   15065|
| 77|   13916|
| 74|   13617|
| 55|   12966|
| 39|   10239|
| 67|   10220|
| 76|    9685|
| 64|    8253|
| 57|    8147|
| 72|    7714|
| 63|    7275|
| 51|    7229|
| 82|    7159|
| 54|    6687|
| 56|    6330|
+---+--------+
only showing top 20 rows



In [33]:
# Round-trip motif: a trip a -> b paired with a trip b -> a.
# Because edges are individual trips, each result row is one (e, e2) pair of
# trips, so the count measures trip pairs rather than distinct station pairs.
# NOTE(review): nothing in the pattern requires e != e2 or a != b, so a single
# self-loop trip (a == b) may match both edges - confirm against GraphFrames
# motif semantics and filter if unwanted.
round_trip_pattern = "(a)-[e]->(b); (b)-[e2]->(a)"
motifs = g.find(round_trip_pattern)

In [35]:
# Number of matched round-trip rows; count() is a Spark action, so this is
# where the motif join is actually evaluated.
motif_count = motifs.count()
display(motif_count)

256638490

In [36]:
# Find chains of 4 vertices.
chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

In [None]:
# Number of rows matched by the 4-vertex chain motif (a Spark action).
chain4_count = chain4.count()
chain4_count