## Network Analysis on Bikeshare trips in 2021
 - Data processing: perform geo join on original trip data to map start and end point with census tract
 - Network Anlaysis: use GraphFrames in PySpark on EMR cluster to construct network upon 5M+ trip data. Extract node statistics features

**1. Data preprocessing**

In [None]:
import pandas as pd
import censusgeocode as cg
import geopandas as gpd
from shapely.geometry import Point
import time

In [None]:
trips_21 = pd.read_csv('data/Divvy_Trips_2021.csv')
# Load census tract shapefile
census_tracts = gpd.read_file('data/boundaries/Boundaries - Census Tracts - 2010/geo_export_e157c189-cb89-47dd-97ed-50066e7f7496.shp')

In [None]:
# Spatial join for start points
start_time = time.time()
start_points = gpd.GeoDataFrame(trips_21, geometry=gpd.points_from_xy(trips_21['start_lng'], trips_21['start_lat']))
start_result = gpd.sjoin(start_points, census_tracts, op='within')
start_result = start_result.rename(columns={'geoid10': 'start_geoid10'})
print(f"Elapsed time: {time.time()-start_time: .2f} seconds")

In [None]:
result = start_result[['ride_id', 'rideable_type', 'started_at', 'ended_at',
       'start_station_name', 'start_station_id', 'end_station_name',
       'end_station_id', 'start_lat', 'start_lng', 'end_lat', 'end_lng',
       'member_casual', 'start_geoid10']]

In [None]:
# Spatial join for end points
start_time = time.time()
end_points = gpd.GeoDataFrame(result, geometry=gpd.points_from_xy(result['end_lng'], result['end_lat']))
result = gpd.sjoin(end_points, census_tracts, op='within')
result = result.rename(columns={'geoid10': 'end_geoid10'})
print(f"Elapsed time: {time.time()-start_time: .2f} seconds")

In [None]:
edges = result[['start_geoid10', 'end_geoid10']]

**2. Construct network using GraphFrames**

In [1]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "graphframes:graphframes:0.8.1-spark3.0-s_2.12",
        "spark.jars.repositories": "https://repos.spark-packages.org/",
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
sc.install_pypi_package('graphframes')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1685053987239_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting graphframes
  Downloading https://files.pythonhosted.org/packages/0b/27/c7c7e1ced2fe9a905f865dd91faaec2ac8a8e313f511678c8ec92a41a153/graphframes-0.6-py2.py3-none-any.whl
Installing collected packages: graphframes
Successfully installed graphframes-0.6

In [3]:
from graphframes import *

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# read in the nodes, each node represents a start or end point
nodes = spark.read.csv('s3://a3q2aiwenbucket/jupyter/jovyan/nodes_with_income.csv', header = True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# read in the edges, each edge represents a bikeshare trip
edges = spark.read.csv('s3://a3q2aiwenbucket/jupyter/jovyan/edges.csv', header = True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
nodes.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+---------------+------------+
|_c0|census_tract_id|income_level|
+---+---------------+------------+
|  0|    17031842400|      median|
|  1|    17031840300|      median|
+---+---------------+------------+
only showing top 2 rows

In [20]:
# add one id column on the trip data
from pyspark.sql.functions import monotonically_increasing_id
edges = edges.withColumn("id", monotonically_increasing_id())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
edges.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------+-----------+---+
|    _c0|      start|        end| id|
+-------+-----------+-----------+---+
|5277602|17031411000|17031410100|  0|
|5277603|17031411000|17031410100|  1|
+-------+-----------+-----------+---+
only showing top 2 rows

In [28]:
# rename column names
nodes = nodes.withColumnRenamed("census_tract_id", "id")
edges = edges.withColumnRenamed("start","src")\
             .withColumnRenamed("end","dst")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# construct network
g = GraphFrame(nodes, edges)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [47]:
# extract features
# Degree Centrality
degrees = g.degrees
# PageRank
pagerank = g.pageRank(resetProbability=0.15, tol=0.01)
# Clustering Coefficient
clustering = g.clusteringCoefficient()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [58]:
degrees.show(3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------+
|         id|degree|
+-----------+------+
|17031610800|    55|
|17031283800|    22|
|17031750500|  1501|
+-----------+------+
only showing top 3 rows

In [49]:
## top 10 census tracts
pagerank.vertices.select("id", "pagerank").sort('pagerank', ascending=False).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+------------------+
|         id|          pagerank|
+-----------+------------------+
|17031836200|44.450858953388995|
|17031020801|24.800190689572936|
|17031010702|17.631725753784174|
|17031251600|17.604504826576633|
|17031410700|15.567637593720875|
|17031420300|15.135000931894533|
|17031010100|14.926536029236091|
|17031020302|12.251609580114986|
|17031190601|11.587320606492844|
|17031020901|10.885711597881127|
+-----------+------------------+
only showing top 10 rows

calculate number of trips from high_income area of each census tract

In [50]:
from pyspark.sql.functions import col
trips_from_high_income = g.find("(a)-[e]->(b)").filter(col("a.income_level") == "high")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
trips_from_high_income.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+
|                   a|                   e|                   b|
+--------------------+--------------------+--------------------+
|[200, 17031839200...|[5278328, 1703183...|[404, 17031410100...|
|[200, 17031839200...|[5278329, 1703183...|[404, 17031410100...|
|[200, 17031839200...|[5278330, 1703183...|[404, 17031410100...|
|[200, 17031839200...|[5278331, 1703183...|[404, 17031410100...|
|[125, 17031842500...|[5278421, 1703184...|[404, 17031410100...|
+--------------------+--------------------+--------------------+
only showing top 5 rows

In [52]:
high_income_edges = trips_from_high_income.select(trips_from_high_income.a.id.alias("src"), trips_from_high_income.b.id.alias("dst"))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
high_income_edges.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+-----------+
|        src|        dst|
+-----------+-----------+
|17031839200|17031410100|
|17031839200|17031410100|
|17031839200|17031410100|
|17031839200|17031410100|
|17031842500|17031410100|
+-----------+-----------+
only showing top 5 rows

In [56]:
## construct a sub network with only edges from high-income area
g_high_income = GraphFrame(nodes, high_income_edges)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
# generate degrees of each node within sub network
links_to_high_income = g_high_income.degrees

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [66]:
links_to_high_income = links_to_high_income.withColumnRenamed("degree", "links_to_high_income")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [72]:
links_to_high_income.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+--------------------+
|         id|links_to_high_income|
+-----------+--------------------+
|17031460100|                1103|
|17031670400|                  48|
|17031510300|                1236|
|17031020902|                 397|
|17031530100|                 234|
+-----------+--------------------+
only showing top 5 rows

generate features of nodes

In [78]:
# Join the statistics together
node_statistics = degrees.join(pagerank.vertices, "id", "left") \
    .join(clustering, "id", "left")
    .join(links_to_high_income, "id", "left")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [81]:
node_statistics.repartition(1).write.format("csv").\
        option("header", "true").mode("overwrite").\
        save("s3://a3q2aiwenbucket/jupyter/jovyan/node_statistics.csv")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…