# Geospatial Big Data dengan Apache Spark - Vektor Data

In [1]:
#####################################################
############### Inisialisasi Apcahe Spark ###########
#####################################################
import findspark
findspark.init('/usr/local/spark')
from pyspark.sql import SparkSession

In [2]:
# Inisialisasi pyspark
import pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import os.path, json, io
import matplotlib.pyplot as plt
import matplotlib
matplotlib.style.use('ggplot')
matplotlib.rcParams['figure.figsize'] = (16, 20)


import pyspark.sql.functions as func
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType,DecimalType
from pyspark.sql import SparkSession

import pandas as pd
from geopandas import GeoDataFrame
from shapely.geometry import Point, Polygon, shape 
from shapely import wkb, wkt 
from ast import literal_eval as make_tuple 

In [4]:
# Inisialisasi SparkSession
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Spatial Big Data") \
    .config("spark.ui.enable", "true") \
    .getOrCreate()

sqlContext = SQLContext(spark)

In [5]:
json_df = spark.read.option("multiline", "true").json("hdfs://hadoop-master:9000/tmp/NYC_Taxi_Zones.geojson")

In [6]:
json_df.printSchema()

root
 |-- features: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- geometry: struct (nullable = true)
 |    |    |    |-- coordinates: array (nullable = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- properties: struct (nullable = true)
 |    |    |    |-- borough: string (nullable = true)
 |    |    |    |-- location_id: string (nullable = true)
 |    |    |    |-- objectid: string (nullable = true)
 |    |    |    |-- shape_area: string (nullable = true)
 |    |    |    |-- shape_leng: string (nullable = true)
 |    |    |    |-- zone: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)



In [7]:
json_df.createOrReplaceTempView("EWR")

In [8]:
EWR = sqlContext.sql("SELECT features.properties.borough, features.geometry.coordinates FROM EWR")

In [9]:
EWR.show()

+--------------------+--------------------+
|             borough|         coordinates|
+--------------------+--------------------+
|[EWR, Queens, Bro...|[[[[[-74.18445299...|
+--------------------+--------------------+



In [10]:
from pyspark.sql import functions as F

json_explode_df = ( json_df.select(
 "features",
 "type",
 F.explode(F.col("features.properties")).alias("properties")
).select("*",F.explode(F.col("features.geometry")).alias("geometry")).drop("features"))

display(json_explode_df)

DataFrame[type: string, properties: struct<borough:string,location_id:string,objectid:string,shape_area:string,shape_leng:string,zone:string>, geometry: struct<coordinates:array<array<array<array<double>>>>,type:string>]

In [11]:
json_explode_df.printSchema()

root
 |-- type: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- borough: string (nullable = true)
 |    |-- location_id: string (nullable = true)
 |    |-- objectid: string (nullable = true)
 |    |-- shape_area: string (nullable = true)
 |    |-- shape_leng: string (nullable = true)
 |    |-- zone: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)



In [12]:
json_explode_df.show()

+-----------------+--------------------+--------------------+
|             type|          properties|            geometry|
+-----------------+--------------------+--------------------+
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-74.18445299...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-73.82337597...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-73.84792614...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-73.97177410...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-74.17421738...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-74.06367318...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-73.90413637...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-73.92334041...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-73.78502434...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-73.95953658...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-73.78326624...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-74.00109809...|
|FeatureCollection|[EWR, 1, 1, 0.000...|[[[[[-74.01565756...|
|Feature

In [13]:
json_explode_df.createOrReplaceTempView("Manhattan")

In [14]:
MHN = sqlContext.sql("SELECT * FROM Manhattan WHERE properties.borough == 'Manhattan'")

In [15]:
MHN.show()

+-----------------+--------------------+--------------------+
|             type|          properties|            geometry|
+-----------------+--------------------+--------------------+
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-74.18445299...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-73.82337597...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-73.84792614...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-73.97177410...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-74.17421738...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-74.06367318...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-73.90413637...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-73.92334041...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-73.78502434...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-73.95953658...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-73.78326624...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-74.00109809...|
|FeatureCollection|[Manhattan, 4, 4,...|[[[[[-74.01565756...|
|Feature

In [16]:
geo_manhattan = json_explode_df.filter(json_explode_df['properties.borough'] == 'Manhattan')

In [17]:
geo_manhattan.first()

Row(type='FeatureCollection', properties=Row(borough='Manhattan', location_id='4', objectid='4', shape_area='0.000111871946192', shape_leng='0.0435665270921', zone='Alphabet City'), geometry=Row(coordinates=[[[[-74.18445299999996, 40.694995999999904], [-74.18448899999999, 40.69509499999987], [-74.18449799999996, 40.69518499999987], [-74.18438099999997, 40.69587799999989], [-74.18428199999994, 40.6962109999999], [-74.18402099999997, 40.697074999999884], [-74.18391299999996, 40.69750699999986], [-74.18375099999997, 40.69779499999988], [-74.18363399999998, 40.6983259999999], [-74.18356199999994, 40.698451999999875], [-74.18354399999998, 40.69855999999988], [-74.18350799999996, 40.69870399999992], [-74.18327399999998, 40.70008999999988], [-74.18315699999994, 40.701214999999884], [-74.18316599999997, 40.702384999999886], [-74.18313899999998, 40.7026279999999], [-74.18309399999998, 40.7028529999999], [-74.18299499999995, 40.70315899999985], [-74.18284199999994, 40.70346499999989], [-74.18264

In [18]:
import folium
import json

with open ("NYC_Taxi_Zones.geojson", "r") as myfile:
 nyc_data=myfile.read()

m = folium.Map(
 location=[40.7128, -74.0060],
 tiles='Stamen Terrain',
 zoom_start=12 
)
folium.GeoJson(json.loads(nyc_data)).add_to(m)
m 

In [19]:
import pandas as pd
import pyarrow


url = 'https://raw.githubusercontent.com/python-visualization/folium/master/examples/data'
state_geo = f'{url}/us-states.json'
state_unemployment = f'{url}/US_Unemployment_Oct2012.csv'
state_data = pd.read_csv(state_unemployment)

m = folium.Map(location=[48, -102], zoom_start=3)

folium.Choropleth(
    geo_data=state_geo,
    name='choropleth',
    data=state_data,
    columns=['State', 'Unemployment'],
    key_on='feature.id',
    fill_color='YlGn',
    fill_opacity=0.7,
    line_opacity=0.2,
    legend_name='Unemployment Rate (%)'
).add_to(m)

folium.LayerControl().add_to(m)

m

In [20]:
data_df = spark.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true') \
    .load('hdfs://hadoop-master:9000/tmp/nyc_taxi_trip.csv', header=True)

In [21]:
data_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)



In [22]:
data_df.show()

+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|id3004672|        1|2016-06-30 23:59:58|              1|-73.98812866210938| 40.73202896118164|-73.99017333984375| 40.75667953491211|                 N|
|id3505355|        1|2016-06-30 23:59:53|              1|-73.96420288085938| 40.67999267578125|-73.95980834960938| 40.65540313720703|                 N|
|id1217141|        1|2016-06-30 23:59:47|              1| -73.9974365234375| 40.73758316040039|-73.98616027832031|40.729522705078125|                 N|
|id2150126|        2|2016-06-30 23:59:41|              1|-73.95606994628906| 40.77

In [22]:
data_df.columns

['id',
 'vendor_id',
 'pickup_datetime',
 'passenger_count',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'store_and_fwd_flag']

In [25]:
data_df.createOrReplaceTempView("csv_data")

In [28]:
csv_df = sqlContext.sql("SELECT * FROM csv_data WHERE dropoff_longitude between -73.869 and -73.847")

In [30]:
csv_df.show()

+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|id1123000|        1|2016-06-30 22:06:06|              1|-73.96009063720703| 40.77066421508789|-73.86531066894531| 40.72541809082031|                 N|
|id2318002|        2|2016-06-30 21:47:01|              2|-73.97286987304688|40.761573791503906|-73.85396575927734| 40.72458267211914|                 N|
|id2035696|        1|2016-06-30 21:34:29|              1|-73.97600555419922| 40.76042938232422|-73.86721801757812| 40.76862335205078|                 N|
|id3498275|        1|2016-06-30 20:27:18|              1|-73.98937225341797| 40.75

In [23]:
data_pd_df = data_df.toPandas()

In [28]:
data_pd_df.head()

Unnamed: 0,id,vendor_id,pickup_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag
0,id3004672,1,2016-06-30 23:59:58,1,-73.988129,40.732029,-73.990173,40.75668,N
1,id3505355,1,2016-06-30 23:59:53,1,-73.964203,40.679993,-73.959808,40.655403,N
2,id1217141,1,2016-06-30 23:59:47,1,-73.997437,40.737583,-73.98616,40.729523,N
3,id2150126,2,2016-06-30 23:59:41,1,-73.95607,40.7719,-73.986427,40.730469,N
4,id1598245,1,2016-06-30 23:59:33,1,-73.970215,40.761475,-73.96151,40.75589,N
