<div style="font-size:18pt; padding-top:20px; text-align:center"><b>Bikeshare and </b> <span style="font-weight:bold; color:green">Spark DataFrames</span></div><hr>
<div style="text-align:right;">Sergei Papulin <span style="font-style: italic;font-weight: bold;">(papulin_bmstu@mail.ru)</span></div>

<a name="0"></a>
<div><span style="font-size:14pt; font-weight:bold">Contents</span>
    <ol>
        <li><a href="#1">Bike Trips</a></li>
        <li><a href="#2">NYC Zones</a></li>
        <li><a href="#3">Number of Stations per Zone</a></li>
        <li><a href="#4">Calculating out-degrees</a></li>
        <li><a href="#5">References</a></li>
    </ol>
</div>

Install the `geopandas` Python library:

`pip install geopandas --user`

In [None]:
import json
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point, Polygon, MultiPolygon
from geopandas.tools import sjoin

Install the `folium` Python library to plot maps:

`pip install folium --user`

In [None]:
import folium
from folium.plugins import HeatMap, HeatMapWithTime

In [None]:
# https://github.com/python-visualization/folium/issues/812
def embed_map(m):
    from IPython.display import IFrame

    m.save('index.html')
    return IFrame('index.html', width='100%', height='750px')

[OPTIONAL] **Environment Setup**

In [None]:
import os
import sys

os.environ["SPARK_HOME"]="/usr/lib/spark"
os.environ["PYSPARK_PYTHON"]="/opt/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"]="/opt/anaconda3/bin/python"

spark_home = os.environ.get("SPARK_HOME")
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.10.7-src.zip"))
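
The py4j archive name above is tied to one Spark release, so the hard-coded version may not match another installation. As a minimal sketch, assuming the archive sits under `python/lib` and follows the `py4j-*-src.zip` naming pattern, it can be located with a glob instead. The helper name `find_py4j_zip` is hypothetical.

```python
import glob
import os
import sys


def find_py4j_zip(spark_home):
    """Return the path of the py4j source archive under spark_home, or None."""
    pattern = os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")
    matches = sorted(glob.glob(pattern))
    # If several archives are present, take the last one in sorted order
    return matches[-1] if matches else None


# Usage: only extend sys.path when an archive was actually found
py4j_zip = find_py4j_zip(os.environ.get("SPARK_HOME", "/usr/lib/spark"))
if py4j_zip is not None:
    sys.path.insert(0, py4j_zip)
```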

Start the Spark session:

In [None]:
import pyspark
from pyspark.sql import SparkSession, Row

In [None]:
packages = "graphframes:graphframes:0.6.0-spark2.3-s_2.11"

In [None]:
conf = pyspark.SparkConf() \
        .set("spark.executor.memory", "1g") \
        .set("spark.executor.cores", "2") \
        .set("spark.jars.packages", packages)\
        .setAppName("bikeGraphApp") \
        .setMaster("local[4]")

In [None]:
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [None]:
import graphframes as gf

<a name="1"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">1. Bike Trips</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

Trips (bikeshare): <a href="https://s3.amazonaws.com/tripdata/201902-citibike-tripdata.csv.zip">data</a> | <a href="https://www.citibikenyc.com/system-data">description</a><br>

In [None]:
trips_data_path = "file:///YOUR_PATH/data/bikes/201902-citibike-tripdata.csv"

[OPTIONAL] Copy the local dataset to HDFS
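
One possible way to do the copy from Python is to call the `hdfs dfs` command-line tool. This is only a sketch: it assumes the `hdfs` binary is on the `PATH`, and the helper names and the `/YOUR_HDFS_DIR` target are hypothetical placeholders.

```python
import subprocess


def hdfs_copy_commands(local_path, hdfs_dir):
    """Build the `hdfs dfs` commands that create hdfs_dir and upload local_path."""
    return [
        ["hdfs", "dfs", "-mkdir", "-p", hdfs_dir],
        ["hdfs", "dfs", "-put", "-f", local_path, hdfs_dir],
    ]


def copy_to_hdfs(local_path, hdfs_dir):
    """Run the commands in order; raises CalledProcessError if one of them fails."""
    for cmd in hdfs_copy_commands(local_path, hdfs_dir):
        subprocess.run(cmd, check=True)


# Hypothetical usage (paths are placeholders, not taken from a real cluster):
# copy_to_hdfs("/YOUR_PATH/data/bikes/201902-citibike-tripdata.csv", "/YOUR_HDFS_DIR")
```

After such a copy, `trips_data_path` would point to the HDFS location (for example an `hdfs:///...` URI) instead of the `file:///` path.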

In [None]:
df_trips = spark.read.load(trips_data_path, 
                           format="csv", 
                           header="true", 
                           inferSchema="true",
                           sep=",")

print("Total number of trips:", df_trips.count())
df_trips.show(5)

In [None]:
df_stations_start = df_trips.select(F.col("start station id").alias("id"), 
                              F.col("start station latitude").alias("lat"), 
                              F.col("start station longitude").alias("lng"))\
                            .distinct()

df_stations_end = df_trips.select(F.col("end station id").alias("id"), 
                              F.col("end station latitude").alias("lat"), 
                              F.col("end station longitude").alias("lng"))\
                            .distinct()

df_stations = df_stations_start.union(df_stations_end).distinct()

print("Number of stations:", df_stations.count())
print("Number of partitions:", df_stations.rdd.getNumPartitions())
print("Mean of partition size:", df_stations.rdd.mapPartitions(lambda x: [sum(1 for _ in x)]).mean())

df_stations.show(5)

Reduce the number of partitions:

In [None]:
df_stations = df_stations.coalesce(4).persist()
df_stations.rdd.getNumPartitions()

In [None]:
df_stations.show(5)

Plot the stations on a map:

In [None]:
m = folium.Map()
for index, row in df_stations.toPandas().iterrows():
    folium.CircleMarker(location=(row["lat"], row["lng"]),
                        weight=1,
                        radius= 5,
                        color="seagreen",
                        fill_color="seagreen",
                        fill_opacity=0.5,
                        fill=True).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)

<a name="2"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">2. NYC Zones</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

NYC zones: <a href="https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON">data</a>

In [None]:
borough_data_path = "/YOUR_PATH/data/bikes/NYC Taxi Zones.geojson"

Plot the NYC zones

In [None]:
style_function = lambda x: {
    "color" : "orange",
    "weight": 1
}

folium.GeoJson(borough_data_path, name="geojson", style_function=style_function).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)

Convert the zones GeoJSON to a GeoDataFrame:

In [None]:
with open(borough_data_path) as f:
    zones_geojson = json.load(f)

In [None]:
column_name_list = list(zones_geojson["features"][0]["properties"].keys())
column_name_list += ["geometry"]
column_name_list

In [None]:
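def get_pandas_rows(features):
    # Assumes every feature geometry is of type MultiPolygon
    for item in features:
        # Property values, in the same order as the column names
        row = list(item["properties"].values())
        polygons = list()
        for rings in item["geometry"]["coordinates"]:
            # The first ring is the exterior boundary, the remaining rings are holes
            polygons.append(Polygon(rings[0], rings[1:]))
        row.append(MultiPolygon(polygons=polygons))
        yield row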
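
To make the nesting of GeoJSON `MultiPolygon` coordinates explicit, here is a small sketch on a hand-made feature. The feature and the helper `split_rings` are hypothetical; real zone features carry more properties. The levels are: list of polygons, then list of rings per polygon (exterior first, holes after), then list of `[lng, lat]` points per ring.

```python
import json

# Hypothetical, hand-made feature: one polygon with one hole
feature = json.loads("""
{
  "type": "Feature",
  "properties": {"location_id": "1", "zone": "Example Zone"},
  "geometry": {
    "type": "MultiPolygon",
    "coordinates": [
      [
        [[0.0, 0.0], [4.0, 0.0], [4.0, 4.0], [0.0, 4.0], [0.0, 0.0]],
        [[1.0, 1.0], [2.0, 1.0], [2.0, 2.0], [1.0, 2.0], [1.0, 1.0]]
      ]
    ]
  }
}
""")


def split_rings(multipolygon_coords):
    """Yield (exterior_ring, interior_rings) for each polygon of a MultiPolygon."""
    for polygon in multipolygon_coords:
        yield polygon[0], polygon[1:]


parts = list(split_rings(feature["geometry"]["coordinates"]))
exterior, holes = parts[0]
print(len(parts), len(exterior), len(holes))  # prints: 1 5 1
```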

DataFrame:

In [None]:
df_zones_pn = pd.DataFrame(get_pandas_rows(zones_geojson["features"]), columns=column_name_list)
df_zones_pn.head(5)

GeoDataFrame:

In [None]:
gdf_zones = gpd.GeoDataFrame(df_zones_pn, geometry=df_zones_pn["geometry"])
gdf_zones.head(5)

<a name="3"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">3. Number of Stations per Zone</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

#### Approach 0 

For comparison, here is a pandas/GeoPandas alternative that runs entirely on the driver. It is reasonable for a modest dataset such as the station list in this task:

In [None]:
# %%timeit -n1
df_stations_pn = df_stations.toPandas()
points = gpd.GeoDataFrame(df_stations_pn,
                          geometry=gpd.points_from_xy(df_stations_pn["lng"], 
                                                      df_stations_pn["lat"]))
sjoin(points[["geometry"]], gdf_zones, how="left")\
        .groupby(["location_id", "zone"])["location_id"]\
        .count()\
        .reset_index(name="count")\
        .head(5)

#### Approach 1

In [None]:
bc_zones = spark.sparkContext.broadcast(gdf_zones)
bc_zones

In [None]:
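def zone_contains_v1(row):
    point = Point((row["lng"], row["lat"]))
    # Linear scan over the broadcast zones; stop at the first match
    for index, item in bc_zones.value.iterrows():
        if item["geometry"].contains(point):
            return (item["location_id"], item["zone"])
    # No zone contains the station
    return None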

In [None]:
# %%timeit -n1
df_stations.rdd.map(zone_contains_v1).countByValue()
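
To illustrate what a per-row containment scan does conceptually, here is a plain-Python sketch based on the classic ray-casting test. It is not how Shapely implements `contains` (Shapely delegates to the GEOS library), it ignores holes and boundary cases, and the `(key, ring)` zone layout is a hypothetical simplification.

```python
def point_in_ring(lng, lat, ring):
    """Ray-casting test: True if (lng, lat) lies inside the closed ring."""
    inside = False
    # Walk over consecutive vertex pairs; the ring repeats its first point at the end
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        # Does this edge straddle the horizontal line through the point?
        if (y1 > lat) != (y2 > lat):
            # x-coordinate at which the edge crosses that line
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lng < x_cross:
                inside = not inside
    return inside


def first_zone_containing(lng, lat, zones):
    """Return the key of the first zone whose ring contains the point, else None."""
    for key, ring in zones:
        if point_in_ring(lng, lat, ring):
            return key
    return None


# Hypothetical zones: two unit-like squares side by side
zones = [
    ("A", [(0, 0), (2, 0), (2, 2), (0, 2), (0, 0)]),
    ("B", [(2, 0), (4, 0), (4, 2), (2, 2), (2, 0)]),
]
print(first_zone_containing(3, 1, zones))
```

The scan costs one containment test per zone for every station, which is the part a spatial join with an index can avoid.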

#### Approach 2

In [None]:
def zone_contains_v2(rows):
    # Collect all points of the partition so the spatial join runs once per partition
    points = list()
    for row in rows: 
        points.append([row["lng"], row["lat"]])
    if len(points) == 0:
        return  # empty partition: the generator yields nothing
    df_points_pn = pd.DataFrame(points, columns=["lng", "lat"])
    gdf_points = gpd.GeoDataFrame(df_points_pn, 
                                  geometry=gpd.points_from_xy(df_points_pn["lng"],
                                                              df_points_pn["lat"]))
    # The left join keeps stations outside every zone (their zone fields are NaN)
    for index, item in sjoin(gdf_points[["geometry"]], bc_zones.value, how="left").iterrows():
        yield (item["location_id"], item["zone"])

In [None]:
# %%timeit -n1
df_stations.rdd.mapPartitions(zone_contains_v2).countByValue()
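
The pattern used here (process a whole partition at once, then count the emitted values) can be mimicked without Spark. The sketch below is only an analogy: partitions are plain lists, the labelling rule is a hypothetical stand-in for the spatial join, and `count_by_value` imitates the dictionary that `countByValue` returns to the driver.

```python
from collections import Counter


def label_partition(rows):
    """Stand-in for a per-partition function: consume all rows, yield one label each."""
    batch = list(rows)  # the whole partition is materialised once
    if not batch:
        return  # empty partition: yield nothing
    for row in batch:
        # Hypothetical labelling rule, only to have something to count
        yield "west" if row["lng"] < -73.98 else "east"


def count_by_value(partitions, func):
    """Apply func to each partition and merge the per-partition counts."""
    total = Counter()
    for part in partitions:
        total.update(Counter(func(iter(part))))
    return dict(total)


# Hypothetical data: three partitions, one of them empty
partitions = [[{"lng": -74.0}, {"lng": -73.95}], [], [{"lng": -73.99}]]
result = count_by_value(partitions, label_partition)
print(result)
```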

#### Convert to DataFrame

In [None]:
def zone_contains_v2_df(rows):
    # Same as zone_contains_v2, but yields Row objects so the result can become a DataFrame
    points = list()
    for row in rows: 
        points.append([row["lng"], row["lat"]])
    if len(points) == 0:
        return  # empty partition: the generator yields nothing
    df_points_pn = pd.DataFrame(points, columns=["lng", "lat"])
    gdf_points = gpd.GeoDataFrame(df_points_pn, 
                                  geometry=gpd.points_from_xy(df_points_pn["lng"],
                                                              df_points_pn["lat"]))
    for index, item in sjoin(gdf_points[["geometry"]], bc_zones.value, how="left").iterrows():
        yield Row(id=item["location_id"], zone=item["zone"])

In [None]:
#%%timeit -n1
df_stations_count = df_stations.rdd.mapPartitions(zone_contains_v2_df).toDF()\
                        .groupBy("id", "zone")\
                        .agg(F.count("id").alias("count"))
df_stations_count.show()

#### Plotting on Map

In [None]:
df_count_pn = df_stations_count.toPandas()

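For comparison, the same counts can be computed with pandas alone, reusing `points` and `gdf_zones` from Approach 0. The result then has a `location_id` column instead of `id`, so the `columns` argument of the choropleth below would need to be `["location_id", "count"]`: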


```python
df_count_pn = sjoin(points[["geometry"]], gdf_zones, how="left")\
    .groupby(["location_id", "zone"])["location_id"]\
    .count()\
    .reset_index(name="count")

df_count_pn.head(5)
```

In [None]:
m = folium.Map()

folium.Choropleth(
    geo_data=zones_geojson,
    data=df_count_pn,
    columns=["id", "count"],
    name="Number of stations",
    legend_name="Number of stations",
    key_on="feature.properties.location_id",
    highlight=True,
    nan_fill_color="grey",
    nan_fill_opacity=0.1,
    fill_color="YlOrRd",
    fill_opacity=0.7,
    line_opacity=0.2,
).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)

<a name="4"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">4. Calculating out-degrees</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

The out-degree of a station is the total number of trips that start at it:

In [None]:
df_trips_start_counts = df_trips.select(F.col("start station id").alias("id"),
                                        F.col("start station latitude").alias("lat"), 
                                        F.col("start station longitude").alias("lng"))\
                                .groupBy("id")\
                                .agg(F.first("lat").alias("lat"), F.first("lng").alias("lng"), 
                                     F.count("id").alias("count"))
df_trips_start_counts.show()
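
If trips are viewed as directed edges from a start station to an end station, the count above is the out-degree of each station. A minimal plain-Python sketch of the same idea, on hypothetical `(start_id, end_id)` pairs:

```python
from collections import Counter

# Hypothetical (start_id, end_id) pairs standing in for trip rows
trips = [(72, 79), (72, 82), (79, 72), (82, 79), (72, 72)]

# Out-degree: number of trips leaving a station
out_degree = Counter(start for start, _ in trips)
# In-degree: number of trips arriving at a station
in_degree = Counter(end for _, end in trips)

print(out_degree[72], in_degree[72])  # prints: 3 2
```

A round trip such as `(72, 72)` contributes to both the out-degree and the in-degree of the same station.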

Basic stats for the `count` column:

In [None]:
df_trips_start_counts.describe("count").show()

Median:

In [None]:
median = df_trips_start_counts.approxQuantile("count", [0.5], 0)[0]
median
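
With a relative error of `0`, `approxQuantile` computes the exact quantile, and the value it returns is a sample from the column rather than an interpolated number. For an even number of rows this can differ from the textbook median, which averages the two middle values. A small sketch of that difference with the standard library, on hypothetical counts:

```python
import statistics

counts = [3, 8, 15, 42]  # hypothetical per-station trip counts

median_avg = statistics.median(counts)        # 11.5, the mean of 8 and 15
median_low = statistics.median_low(counts)    # 8, an element of the data
median_high = statistics.median_high(counts)  # 15, an element of the data

print(median_avg, median_low, median_high)
```

Which of the two middle elements Spark returns is not asserted here; the point is only that the result is one of the observed counts.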

Extract the `max` value:

In [None]:
max_start_count = df_trips_start_counts.select(F.max("count").alias("max")).first()["max"]
max_start_count

Convert the `df_trips_start_counts` DataFrame to a NumPy matrix of `[lat, lng, count]` rows:

In [None]:
trips_matrix = df_trips_start_counts.toPandas()[["lat", "lng", "count"]].values

Plot `HeatMap` for counts:

In [None]:
m = folium.Map()
HeatMap(trips_matrix, radius=15, max_val=max_start_count).add_to(m)
m.fit_bounds(m.get_bounds())
embed_map(m)
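
`HeatMap` treats the third value of each row as a weight. If the installed Folium version ignores or rejects `max_val` (an assumption to check against its documentation), one alternative is to scale the weights into `[0, 1]` before plotting. The helper name `normalise_weights` and the sample points are hypothetical.

```python
def normalise_weights(points):
    """Scale the third element of each [lat, lng, weight] row into [0, 1]."""
    max_weight = max(weight for _, _, weight in points)
    if max_weight == 0:
        # Avoid division by zero when every weight is zero
        return [[lat, lng, 0.0] for lat, lng, _ in points]
    return [[lat, lng, weight / max_weight] for lat, lng, weight in points]


# Hypothetical [lat, lng, count] rows
sample = [[40.7, -74.0, 10], [40.8, -73.9, 40]]
scaled = normalise_weights(sample)
print(scaled)
```

The scaled rows could then be passed to `HeatMap` in place of `trips_matrix`, without the `max_val` argument.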

<a name="5"></a>
<div style="display:table; width:100%; padding-top:10px; padding-bottom:10px; border-bottom:1px solid lightgrey">
    <div style="display:table-row">
        <div style="display:table-cell; width:80%; font-size:14pt; font-weight:bold">5. References</div>
    	<div style="display:table-cell; width:20%; text-align:center; background-color:whitesmoke; border:1px solid lightgrey"><a href="#0">To contents</a></div>
    </div>
</div>

[GeoPandas](http://geopandas.org)

[Shapely](https://github.com/Toblerity/Shapely)

[Folium](https://github.com/python-visualization/folium)