## Using Spark GraphFrames to process BikeShare data

In [None]:
import pandas as pd

In [None]:
# Relative paths to the trip and station CSV files.
trips_filename, stations_filename = (
    '../201508_trip_data.csv',
    '../201508_station_data.csv',
)

# Load each file into its own pandas DataFrame (trips first, then stations).
trips_pdf = pd.read_csv(filepath_or_buffer=trips_filename)
stations_pdf = pd.read_csv(filepath_or_buffer=stations_filename)

In [None]:
import pyspark
from pyspark.sql import SparkSession
from graphframes import *

In [None]:
# Get the Spark session for this notebook under the application name
# "BikeShareGF"; getOrCreate() returns an already-running session if there is
# one instead of starting a second.
session = (
    SparkSession.builder
    .appName("BikeShareGF")
    .getOrCreate()
)

In [None]:
# Preview the first rows of the station table to see which columns it has.
# `display` is not imported here — presumably the notebook-provided helper.
display(stations_pdf.head())

In [None]:
# Build the graph's vertices from the station table.
#
# GraphFrames expects the vertex DataFrame to carry a special "id" column
# holding a unique identifier per vertex, so the station_id column is renamed
# to "id" before the pandas frame is handed to Spark.
stations_pdf = stations_pdf.rename({'station_id':'id'}, axis='columns')
vertices = session.createDataFrame(stations_pdf)

In [None]:
# Print the Spark schema of the vertex DataFrame to check the column names
# and the types Spark derived from the pandas frame.
vertices.printSchema()

In [None]:
# Preview the first rows of the trip table before choosing the edge columns.
display(trips_pdf.head())

In [None]:
# Columns kept for every edge: the two terminal columns (the edge endpoints)
# plus the trip id and its start/end dates as edge attributes.
edge_columns = [
    'Start Terminal',
    'End Terminal',
    'Trip ID',
    'Start Date',
    'End Date',
]

# Restrict the trip table to just those columns.
trips_edges_pdf = trips_pdf[edge_columns]

In [None]:
# GraphFrames expects the edge DataFrame to carry two special columns:
# "src" (ID of the edge's source vertex) and "dst" (ID of its destination
# vertex). Map the start and end terminal columns onto those names.
trips_edges_pdf = trips_edges_pdf.rename(
    {'Start Terminal':'src', 'End Terminal':'dst'},
    axis='columns',
)

In [None]:
# Preview the edge table to confirm the src/dst columns are in place.
display(trips_edges_pdf.head())

In [None]:
# Convert the pandas edge table into a Spark DataFrame for GraphFrames.
edges = session.createDataFrame(trips_edges_pdf)

In [None]:
# Print the Spark schema of the edge DataFrame (column names and types).
edges.printSchema()

In [None]:
# Assemble the graph from the vertex and edge DataFrames.
g = GraphFrame(vertices, edges)

In [None]:
# Number of vertices in the graph.
print(g.vertices.count())

In [None]:
# Number of edges in the graph.
print(g.edges.count())

In [None]:
# Rank vertices by in-degree (number of incoming edges), highest first, and
# print the top rows.
g.inDegrees.sort('inDegree', ascending=False).show()

In [None]:
# Motif search: pairs of vertices (a, b) joined by an edge in each direction,
# i.e. an edge e from a to b and an edge e2 from b back to a.
# NOTE(review): motif elements are presumably not required to be distinct, so
# an edge whose src and dst are the same vertex (a == b) may match as well,
# and each reciprocal pair may appear once per ordering of (a, b). Confirm
# against the GraphFrames motif-finding docs and add a filter such as
# "a.id != b.id" if that is not wanted.
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")

In [None]:
# Count the matched motif rows; count() is what actually runs the search.
display(motifs.count())

In [None]:
# Find chains of 4 vertices: a -> b -> c -> d, linked by the three edges
# ab, bc and cd.
# NOTE(review): the pattern does not force a, b, c, d to be different
# vertices, and every extra hop multiplies the number of matches — on a graph
# with many edges between few vertices this result may be very large. Confirm
# the cost is acceptable before materialising or counting it.
chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

In [None]:
# Count the matched 4-vertex chains; as the last expression in the cell, the
# notebook shows the returned number.
chain4.count()