In [1]:
%matplotlib inline

import matplotlib.pyplot as plt
from IPython.display import display, HTML
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
import geopandas as gpd
from sedona.utils.adapter import Adapter
from sedona.core.formatMapper.shapefileParser import ShapefileReader

In [2]:
spark = SparkSession.\
    builder.\
    master("local[*]").\
    appName("Sedona App").\
    config("spark.serializer", KryoSerializer.getName).\
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName).\
    config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.0-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2").\
    getOrCreate()

22/09/12 14:49:24 WARN Utils: Your hostname, EN4102944L resolves to a loopback address: 127.0.1.1; using 10.218.105.94 instead (on interface eno1)
22/09/12 14:49:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/local/ASUAD/kchowdh1/.ivy2/cache
The jars for the packages stored in: /home/local/ASUAD/kchowdh1/.ivy2/jars
:: loading settings :: url = jar:file:/media/kchowdh1/fcf87b53-9c62-4c1d-9df2-cb2b83598bea/kanchan/program_files/spark-3.0.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b80ad89e-d614-440c-a6cb-68033116cf9b;1.0
	confs: [default]
	found org.apache.sedona#sedona-python-adapter-3.0_2.12;1.2.0-incubating in central
	found org.locationtech.jts#jts-core;1.18.0 in central
	found org.wololo#jts2geojson;0.

In [3]:
SedonaRegistrator.registerAll(spark)

                                                                                

True

In [4]:
sc = spark.sparkContext
sc.setSystemProperty("sedona.global.charset", "utf8")

## Load Taxi Trips Shape File

In [5]:
taxi_zone_rdd = ShapefileReader.readToGeometryRDD(sc, "data/taxi_trip/taxi_zones")

In [6]:
taxi_zone_df = Adapter.toDf(taxi_zone_rdd, spark)

In [7]:
taxi_zone_df.printSchema()

root
 |-- geometry: geometry (nullable = true)
 |-- OBJECTID: string (nullable = true)
 |-- Shape_Leng: string (nullable = true)
 |-- Shape_Area: string (nullable = true)
 |-- zone: string (nullable = true)
 |-- LocationID: string (nullable = true)
 |-- borough: string (nullable = true)



In [8]:
taxi_zone_df.show(5)

+--------------------+---------+-------------------+-------------------+--------------------+----------+--------------------+
|            geometry| OBJECTID|         Shape_Leng|         Shape_Area|                zone|LocationID|             borough|
+--------------------+---------+-------------------+-------------------+--------------------+----------+--------------------+
|POLYGON ((933100....|        1| 1.16357453189e-001| 7.82306788500e-004|Newark Airport   ...|         1|EWR              ...|
|MULTIPOLYGON (((1...|        2| 4.33469666790e-001| 4.86634037837e-003|Jamaica Bay      ...|         2|Queens           ...|
|POLYGON ((1026308...|        3| 8.43411059012e-002| 3.14414156821e-004|Allerton/Pelham G...|         3|Bronx            ...|
|POLYGON ((992073....|        4| 4.35665270921e-002| 1.11871946192e-004|Alphabet City    ...|         4|Manhattan        ...|
|POLYGON ((935843....|        5| 9.21464898574e-002| 4.97957489363e-004|Arden Heights    ...|         5|Staten Island 

In [9]:
taxi_zone_df = taxi_zone_df.drop("Shape_Leng")
taxi_zone_df = taxi_zone_df.drop("Shape_Area")
taxi_zone_df = taxi_zone_df.drop("zone")
taxi_zone_df = taxi_zone_df.drop("LocationID")
taxi_zone_df = taxi_zone_df.drop("borough")

In [10]:
taxi_zone_df.printSchema()

root
 |-- geometry: geometry (nullable = true)
 |-- OBJECTID: string (nullable = true)



In [11]:
taxi_zone_df.show()

+--------------------+---------+
|            geometry| OBJECTID|
+--------------------+---------+
|POLYGON ((933100....|        1|
|MULTIPOLYGON (((1...|        2|
|POLYGON ((1026308...|        3|
|POLYGON ((992073....|        4|
|POLYGON ((935843....|        5|
|POLYGON ((966568....|        6|
|POLYGON ((1010804...|        7|
|POLYGON ((1005482...|        8|
|POLYGON ((1043803...|        9|
|POLYGON ((1044355...|       10|
|POLYGON ((983945....|       11|
|POLYGON ((979908....|       12|
|POLYGON ((980801....|       13|
|POLYGON ((974794....|       14|
|POLYGON ((1045882...|       15|
|POLYGON ((1048344...|       16|
|POLYGON ((1000036...|       17|
|POLYGON ((1016019...|       18|
|POLYGON ((1060888...|       19|
|POLYGON ((1016371...|       20|
+--------------------+---------+
only showing top 20 rows



## Load CSV File

In [12]:
tripDf = spark.read.format("csv").option("delimiter",",").option("header","true").load("data/taxi_trip/yellow_tripdata_2009-01.csv")
tripDf.createOrReplaceTempView("trip_df")
tripDf.show(5)

+-----------+--------------------+---------------------+---------------+-------------------+-------------------+------------------+---------+-----------------+-------------------+------------------+------------+------------------+---------+-------+------------------+---------+------------------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|      Trip_Distance|          Start_Lon|         Start_Lat|Rate_Code|store_and_forward|            End_Lon|           End_Lat|Payment_Type|          Fare_Amt|surcharge|mta_tax|           Tip_Amt|Tolls_Amt|         Total_Amt|
+-----------+--------------------+---------------------+---------------+-------------------+-------------------+------------------+---------+-----------------+-------------------+------------------+------------+------------------+---------+-------+------------------+---------+------------------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1| 2.6299999999999999|-73.9919569999999

In [13]:
#tripDf = tripDf.select("Trip_Pickup_DateTime", "Start_Lon", "Start_Lat", "Passenger_Count", "Trip_Distance", "Fare_Amt")
tripDf = spark.sql("select Start_Lon, Start_Lat from trip_df")
tripDf.createOrReplaceTempView("trip_df")
tripDf.show(5)

+-------------------+------------------+
|          Start_Lon|         Start_Lat|
+-------------------+------------------+
|-73.991956999999999|         40.721567|
|-73.982101999999998|40.736289999999997|
|-74.002587000000005|40.739747999999999|
|-73.974266999999998|40.790954999999997|
|-74.001580000000004|40.719382000000003|
+-------------------+------------------+
only showing top 5 rows



## Convert Latitude and Longitude to Point Geometry

In [14]:
tripDf = spark.sql("select ST_Point(double(trip_df.Start_Lat), double(trip_df.Start_Lon)) as point_loc from trip_df")
tripDf.createOrReplaceTempView("trip_df")
tripDf.show(5)

+--------------------+
|           point_loc|
+--------------------+
|POINT (40.721567 ...|
|POINT (40.73629 -...|
|POINT (40.739748 ...|
|POINT (40.790955 ...|
|POINT (40.719382 ...|
+--------------------+
only showing top 5 rows



## Find Distance Between Point and Polygon

In [15]:
tripDf = spark.sql("select * from trip_df limit 1")
tripDf.createOrReplaceTempView("trip_df")
tripDf.show(5)

+--------------------+
|           point_loc|
+--------------------+
|POINT (40.721567 ...|
+--------------------+



In [16]:
taxi_zone_df.createOrReplaceTempView("taxi_zone_df")

In [17]:
tripDf = spark.sql("select taxi_zone_df.geometry, trip_df.point_loc, ST_Distance(taxi_zone_df.geometry, trip_df.point_loc) as distance from taxi_zone_df, trip_df")
tripDf.createOrReplaceTempView("trip_df")
tripDf.show(5)

+--------------------+--------------------+------------------+
|            geometry|           point_loc|          distance|
+--------------------+--------------------+------------------+
|POLYGON ((933100....|POINT (40.721567 ...| 950259.1074797178|
|MULTIPOLYGON (((1...|POINT (40.721567 ...|1031073.0581317813|
|POLYGON ((1026308...|POINT (40.721567 ...|1053750.8730177905|
|POLYGON ((992073....|POINT (40.721567 ...|1009158.1687323012|
|POLYGON ((935843....|POINT (40.721567 ...| 938507.1175333005|
+--------------------+--------------------+------------------+
only showing top 5 rows



In [18]:
tripDf = spark.sql("select geometry, point_loc, distance, ST_Centroid(geometry) as centroid from trip_df")
tripDf.createOrReplaceTempView("trip_df")
tripDf.show(5)

+--------------------+--------------------+------------------+--------------------+
|            geometry|           point_loc|          distance|            centroid|
+--------------------+--------------------+------------------+--------------------+
|POLYGON ((933100....|POINT (40.721567 ...| 950259.1074797178|POINT (935996.821...|
|MULTIPOLYGON (((1...|POINT (40.721567 ...|1031073.0581317813|POINT (1031085.71...|
|POLYGON ((1026308...|POINT (40.721567 ...|1053750.8730177905|POINT (1026452.61...|
|POLYGON ((992073....|POINT (40.721567 ...|1009158.1687323012|POINT (990633.980...|
|POLYGON ((935843....|POINT (40.721567 ...| 938507.1175333005|POINT (931871.370...|
+--------------------+--------------------+------------------+--------------------+
only showing top 5 rows



In [19]:
tripDf = spark.sql("select geometry, point_loc, centroid, distance, ST_Distance(point_loc, centroid) as centroid_distance from trip_df")
tripDf.createOrReplaceTempView("trip_df")
tripDf.show(5)

+--------------------+--------------------+--------------------+------------------+------------------+
|            geometry|           point_loc|            centroid|          distance| centroid_distance|
+--------------------+--------------------+--------------------+------------------+------------------+
|POLYGON ((933100....|POINT (40.721567 ...|POINT (935996.821...| 950259.1074797178| 955336.1746068312|
|MULTIPOLYGON (((1...|POINT (40.721567 ...|POINT (1031085.71...|1031073.0581317813|1044021.1757055434|
|POLYGON ((1026308...|POINT (40.721567 ...|POINT (1026452.61...|1053750.8730177905|1057454.4648067043|
|POLYGON ((992073....|POINT (40.721567 ...|POINT (990633.980...|1009158.1687323012|1011186.2926528396|
|POLYGON ((935843....|POINT (40.721567 ...|POINT (931871.370...| 938507.1175333005| 942401.4134977745|
+--------------------+--------------------+--------------------+------------------+------------------+
only showing top 5 rows

