# Mapping Longitude and Latitude to GeoJSON polygon

## Reference:
https://stackoverflow.com/questions/20776205/point-in-polygon-with-geojson-in-python

### Read GeoJSON and find out which area a given point existed.

In [1]:
import json
from shapely.geometry import shape, Point
# depending on your version, use: from shapely.geometry import shape, Point

In [2]:
# load GeoJSON file containing sectors
with open('nyu-2451-36743-geojson.json') as f:
    js = json.load(f)

In [3]:
len(js['features'])

263

In [4]:
# 讀取全部 263 區域
# js['features']

In [5]:
js['features'][1]

{'type': 'Feature',
 'id': 'nyu_2451_36743.2',
 'geometry': {'type': 'MultiPolygon',
  'coordinates': [[[[-73.82337597260661, 40.63898704717672],
     [-73.82277105438692, 40.635576914085114],
     [-73.82265046764824, 40.63536884414307],
     [-73.82253791037438, 40.635158179711176],
     [-73.82243347854202, 40.634945100565325],
     [-73.8223372611914, 40.63472978853997],
     [-73.82224934035098, 40.634512427373444],
     [-73.8221697909667, 40.63429320255107],
     [-73.82209868083862, 40.634072301146496],
     [-73.82203607056275, 40.63384991166225],
     [-73.82198201348179, 40.633626223869],
     [-73.82193655563644, 40.63340142864299],
     [-73.82189973572993, 40.63317571780406],
     [-73.82187158509224, 40.6329492839511],
     [-73.82185212765629, 40.63272232029804],
     [-73.82184137993583, 40.632495020508884],
     [-73.82183935101226, 40.63226757853258],
     [-73.8218460425281, 40.632040188437315],
     [-73.8210750553332, 40.62973377456623],
     [-73.82081417085445, 

In [6]:
# construct point based on lon/lat returned by geocoder
# 隨便給一個點的經緯度
point = Point(-73.9919569999999, 40.721567)

In [7]:
# check each polygon to see if it contains the point
# 看看剛剛給的點是不是在某一個區域中

for feature in js['features']:
    polygon = shape(feature['geometry'])
    if polygon.contains(point):
        print('Found containing polygon:', feature['properties']['locationid'])

Found containing polygon: 148


### Reading file from HDFS

In [9]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("Convert_Long_Lat_to_NYCTaxi_Zones").getOrCreate()

In [15]:
! hdfs dfs -ls /user/hadoop/NYCTaxi/cleaned_2015*

-r--r--r--   3 hadoop hadoop 2690588905 2018-10-19 14:49 /user/hadoop/NYCTaxi/cleaned_2015_Green.csv
-r--r--r--   3 hadoop hadoop 10277369495 2018-10-19 16:18 /user/hadoop/NYCTaxi/cleaned_2015_Yellow-01-06.csv
-r--r--r--   3 hadoop hadoop  1544941392 2018-10-19 16:19 /user/hadoop/NYCTaxi/cleaned_2015_Yellow-07.csv
-r--r--r--   3 hadoop hadoop  1487253966 2018-10-19 16:19 /user/hadoop/NYCTaxi/cleaned_2015_Yellow-08.csv
-r--r--r--   3 hadoop hadoop  1500404945 2018-10-19 16:20 /user/hadoop/NYCTaxi/cleaned_2015_Yellow-09.csv
-r--r--r--   3 hadoop hadoop  1646011669 2018-10-19 16:20 /user/hadoop/NYCTaxi/cleaned_2015_Yellow-10.csv
-r--r--r--   3 hadoop hadoop  1511311354 2018-10-19 16:21 /user/hadoop/NYCTaxi/cleaned_2015_Yellow-11.csv
-r--r--r--   3 hadoop hadoop  1530943746 2018-10-19 16:21 /user/hadoop/NYCTaxi/cleaned_2015_Yellow-12.csv


In [16]:
df_load = sparkSession.read.csv('/user/hadoop/NYCTaxi/cleaned_2015_Yellow-08.csv', header = 'true')

In [17]:
df_load.show(n=5)

+-------------------+-------------------+-------------+-------------------+------------------+---------+-------------------+------------------+--------+-----+-------+
|            PU_Time|            DO_Time|Trip_Distance|             PU_Lon|            PU_Lat|Rate_Code|             DO_Lon|            DO_Lat|Fare_Amt|Extra|Tip_Amt|
+-------------------+-------------------+-------------+-------------------+------------------+---------+-------------------+------------------+--------+-----+-------+
|2015-08-01 00:00:15|2015-08-01 00:36:21|         7.22|-73.999809265136719|40.743339538574219|        1|-73.942848205566406|40.806621551513672|    29.5|  0.5|      0|
|2015-08-01 00:00:16|2015-08-01 00:14:52|         2.30|-73.977043151855469|    40.77490234375|        1|-73.978256225585937|40.749862670898438|      12|  0.5|   2.93|
|2015-08-01 00:00:16|2015-08-01 00:06:30|         1.50|-73.959121704101563|40.775127410888672|        1|-73.980392456054688|40.782314300537109|       7|  0.5|   1.65

In [19]:
type(df_load)

pyspark.sql.dataframe.DataFrame

In [22]:
df_load.count()

11130304

In [23]:
len(df_load.columns)

11

In [20]:
df_load.printSchema()

root
 |-- PU_Time: string (nullable = true)
 |-- DO_Time: string (nullable = true)
 |-- Trip_Distance: string (nullable = true)
 |-- PU_Lon: string (nullable = true)
 |-- PU_Lat: string (nullable = true)
 |-- Rate_Code: string (nullable = true)
 |-- DO_Lon: string (nullable = true)
 |-- DO_Lat: string (nullable = true)
 |-- Fare_Amt: string (nullable = true)
 |-- Extra: string (nullable = true)
 |-- Tip_Amt: string (nullable = true)



In [32]:
puPoint = df_load.select('PU_Lon','PU_Lat')

In [33]:
puPoint.show(5)

+-------------------+------------------+
|             PU_Lon|            PU_Lat|
+-------------------+------------------+
|-73.999809265136719|40.743339538574219|
|-73.977043151855469|    40.77490234375|
|-73.959121704101563|40.775127410888672|
| -73.97662353515625|40.780746459960937|
|-73.978591918945313|40.785919189453125|
+-------------------+------------------+
only showing top 5 rows



In [34]:
type(puPoint)

pyspark.sql.dataframe.DataFrame

### Combine the two above:

In [35]:
def rowByRow(row):
    return (row.PU_Lon, row.PU_Lat)

In [36]:
puPointRow = puPoint.rdd.map(rowByRow)

In [37]:
type(puPointRow)

pyspark.rdd.PipelinedRDD

In [41]:
puPointRow.show(5)

AttributeError: 'PipelinedRDD' object has no attribute 'show'