# Summarize NYC Taxi Trips into Hexagonal Bins
This notebook demonstrates how to use BDT's ST functions to summarize approximately 
1.5 million taxi trips taken in New York City into hexagonal bins.

You can download the taxi data from Kaggle [here](https://www.kaggle.com/c/nyc-taxi-trip-duration/data). 
You will need a Kaggle account to access the data.

## Start Spark
The following code starts a BDT-enabled Spark context within this notebook. Instead of submitting jobs to a separate Spark cluster, Spark executes inside ArcGIS Pro. This limits the job to the resources of the machine Pro is running on, but it is convenient for prototyping and developing BDT workflows.

In [1]:
from spark_esri import spark_start, spark_stop

spark_stop()

config = {
    "spark.driver.memory":"4G",
    "spark.kryoserializer.buffer.max":"2024",
    "spark.jars": "C:\\Users\\%USERNAME%\\bdt3\\bdt-3.0.0-3.2.0-2.12-merge-20220329.5.jar",
    "spark.submit.pyFiles": "C:\\Users\\%USERNAME%\\bdt3\\bdt-3.0.0+snapshot.merge.20220329.5-py3.9.egg"
}

spark = spark_start(config=config)
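
The `%USERNAME%` token in the paths above is a Windows environment-variable placeholder, and a plain Python string does not expand it on its own. If it is not resolved in your environment, one option is to build the paths from the home directory instead. The following is a minimal sketch, assuming the JAR and egg sit in a `bdt3` folder under the home directory as in the config above; `build_bdt_config` is a hypothetical helper, not part of `spark_esri` or BDT.

```python
import os


def build_bdt_config(bdt_dir, jar_name, egg_name, driver_memory="4G"):
    """Return a Spark config dict that points at the BDT JAR and egg in bdt_dir.

    Hypothetical helper for illustration; not part of spark_esri or BDT.
    """
    return {
        "spark.driver.memory": driver_memory,
        "spark.kryoserializer.buffer.max": "2024",
        "spark.jars": os.path.join(bdt_dir, jar_name),
        "spark.submit.pyFiles": os.path.join(bdt_dir, egg_name),
    }


# Resolve ~/bdt3 for the current user instead of relying on %USERNAME%.
config = build_bdt_config(
    os.path.expanduser(os.path.join("~", "bdt3")),
    "bdt-3.0.0-3.2.0-2.12-merge-20220329.5.jar",
    "bdt-3.0.0+snapshot.merge.20220329.5-py3.9.egg",
)
```

The resulting `config` dictionary can be passed to `spark_start(config=config)` in the same way as the literal dictionary above.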

Next, activate the BDT license and alias the BDT functions, processors, and sinks.

In [7]:
import os
import bdt
bdt.auth(os.path.expanduser(os.path.join("~", "bdt3", "carsten.lic")))
from bdt import functions as F
from bdt import processors as P
from bdt import sinks as S
from pyspark.sql.functions import rand, col, lit

## Summarize the data using Spark SQL and BDT's spatial functions

Load the taxi data into a Spark DataFrame.

In [8]:
import os
import numpy as np
import pandas as pd

load_path = os.path.expanduser(os.path.join("~","kaggle","nyc-taxi-trip-duration","train.csv"))

schema = ",".join([
    "id string",
    "vendor_id string",
    "pickup_datetime timestamp",
    "dropoff_datetime timestamp",
    "passenger_count integer",
    "pickup_longitude double",
    "pickup_latitude double",
    "dropoff_longitude double",
    "dropoff_latitude double",
    "store_and_fwd_flag string",
    "trip_duration integer"
])

df = spark\
    .read\
    .format("csv")\
    .option("header",True)\
    .option("parserLib", "univocity")\
    .option("mode", "permissive")\
    .schema(schema)\
    .load(load_path)\
    .drop("id","vendor_id","passenger_count","store_and_fwd_flag")\
    .selectExpr("*","hour(pickup_datetime) pickup_hour")\
    .cache()

df.createOrReplaceTempView("v")

Apply Spark SQL to the data frame. `ST_AsHex()` maps each taxi trip's pickup 
location to a hexagonal cell with a size of 500 m, and the query counts the trips in each cell. 
`ST_FromHex()` converts each cell into a polygon, which `ST_AsText()` then converts to WKT.
`collect()` retrieves the results from Spark to the driver. 
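
To make the idea of hexagonal binning concrete, here is a minimal pure-Python sketch of the same group-and-count pattern. It is not BDT's implementation: the pointy-top orientation, the grid origin, the axial `(q, r)` cell index, and the reading of `size` as the center-to-vertex distance are all assumptions of this sketch. It projects to Web Mercator (EPSG:3857) only because that is the spatial reference used for the feature class later in this notebook.

```python
import math
from collections import Counter

EARTH_RADIUS_M = 6378137.0  # sphere radius used by Web Mercator (EPSG:3857)


def to_web_mercator(lon, lat):
    """Project WGS84 degrees to Web Mercator meters."""
    x = EARTH_RADIUS_M * math.radians(lon)
    y = EARTH_RADIUS_M * math.log(math.tan(math.pi / 4 + math.radians(lat) / 2))
    return x, y


def xy_to_hex(x, y, size):
    """Return the axial (q, r) index of the pointy-top hexagon containing (x, y).

    `size` is taken as the center-to-vertex distance (an assumption of this sketch).
    """
    qf = (math.sqrt(3) / 3 * x - y / 3) / size
    rf = (2 / 3 * y) / size
    sf = -qf - rf
    # Round in cube coordinates, then repair the component with the largest
    # rounding error so the three coordinates still sum to zero.
    q, r, s = round(qf), round(rf), round(sf)
    dq, dr, ds = abs(q - qf), abs(r - rf), abs(s - sf)
    if dq > dr and dq > ds:
        q = -r - s
    elif dr > ds:
        r = -q - s
    return q, r


def hex_center(q, r, size):
    """Return the Web Mercator center of the hexagon with axial index (q, r)."""
    return size * math.sqrt(3) * (q + r / 2), size * 1.5 * r


def bin_points(points, size=500):
    """Count (lon, lat) points per hexagon, like GROUP BY on the cell index."""
    return Counter(xy_to_hex(*to_web_mercator(lon, lat), size) for lon, lat in points)


# Two pickups at the same spot and one about a degree of longitude away.
counts = bin_points([(-73.9857, 40.7484), (-73.9857, 40.7484), (-72.9857, 40.7484)])
```

Each key in `counts` plays the role of the value returned by `ST_AsHex()`, and `hex_center()` hints at how a cell index can be turned back into geometry, which is the job `ST_FromHex()` does in the query.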

In [10]:
rows = spark.sql("""
    SELECT
        COUNT(1) AS CNT,
        ST_AsText(ST_FromHex(ST_AsHex(pickup_longitude, pickup_latitude, 500), 500)) AS SHAPE
    FROM v
    GROUP BY ST_AsHex(pickup_longitude, pickup_latitude, 500)
""").collect()

Add the hexagon bins as an in-memory feature class to the current map.

In [12]:
import arcpy

ws = "memory"
nm = "HexBins"

fc = os.path.join(ws,nm)

# Remove any feature class left over from a previous run.
arcpy.management.Delete(fc)

sp_ref = arcpy.SpatialReference(3857)
arcpy.management.CreateFeatureclass(ws,nm,"POLYGON",spatial_reference=sp_ref)
arcpy.management.AddField(fc, "CNT", "LONG")

# Each row is (CNT, SHAPE); the SHAPE@WKT token expects the geometry as a WKT string.
with arcpy.da.InsertCursor(fc, ["CNT", "SHAPE@WKT"]) as cursor:
    for row in rows:
        cursor.insertRow(row)
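
The loop above passes every collected row straight to `insertRow()`. If some rows might carry a null count or an empty shape (for example, trips with missing coordinates), a defensive filter can be applied first. The following is a minimal sketch under that assumption; `clean_rows` is a hypothetical helper, and the check for a `POLYGON` or `MULTIPOLYGON` prefix is an assumption about the WKT produced by the query.

```python
def clean_rows(rows):
    """Yield (count, wkt) tuples, skipping rows without a usable polygon.

    Hypothetical helper for illustration; not part of arcpy or BDT.
    """
    for cnt, wkt in rows:
        # Skip rows with a missing count or a missing/empty WKT string.
        if cnt is None or not wkt:
            continue
        # Keep only polygonal WKT, since the feature class is of type POLYGON.
        if not wkt.lstrip().upper().startswith(("POLYGON", "MULTIPOLYGON")):
            continue
        yield int(cnt), wkt


sample = [
    (3, "POLYGON ((0 0, 1 0, 1 1, 0 0))"),
    (None, "POLYGON ((0 0, 1 0, 1 1, 0 0))"),
    (2, None),
    (1, "POINT (0 0)"),
]
usable = list(clean_rows(sample))
```

In the cursor loop, iterating over `clean_rows(rows)` instead of `rows` would insert only the rows that pass these checks.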

![Hexagonal bins of taxi pickups](media/taxi_hexagons.png)