# PySpark Apache Sedona example

GraphFrame tutorials: https://graphframes.github.io/graphframes/docs/_site/user-guide.html

## Important notes

Before starting `jupyter-notebook --no-browser`, run the following commands in your terminal to set up the environment variables. If you did not, kill your notebook in the terminal, run the commands, and restart the notebook.

The first two are Spark paths. The last one is the Python 3 path.

```
export SPARK_HOME=/home/ubuntu/spark-3.0.3-bin-hadoop3.2
export PYTHONPATH=/home/ubuntu/spark-3.0.3-bin-hadoop3.2/python
export PYSPARK_PYTHON=/usr/bin/python3
```

**Your machine might have different Spark paths depending on your OS username and Spark location. Do NOT just copy/paste it blindly.**

**Your machine might have a different Python 3 path. You can find your Python3 path using `whereis python3` in the terminal. Usually the first one is the one you want.**
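
As a quick sanity check, the sketch below verifies from Python that the three variables are set and that each path exists. The helper name `check_spark_env` is made up for this example and is not part of Spark or Sedona; it uses only the standard library.

```python
import os

# The three variables exported above.
REQUIRED_VARS = ("SPARK_HOME", "PYTHONPATH", "PYSPARK_PYTHON")

def check_spark_env(env=None):
    """Return a list of problems; an empty list means every check passed.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env
    problems = []
    for name in REQUIRED_VARS:
        value = env.get(name)
        if not value:
            problems.append(f"{name} is not set")
            continue
        # PYTHONPATH may hold several entries separated by os.pathsep.
        paths = value.split(os.pathsep) if name == "PYTHONPATH" else [value]
        for path in paths:
            if path and not os.path.exists(path):
                problems.append(f"{name} points to a missing path: {path}")
    return problems

# Print a warning for each problem found in the current environment.
for problem in check_spark_env():
    print("WARNING:", problem)
```

If this prints any warning inside the notebook, the notebook was most likely started before the variables were exported.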

## Create Spark Session

In [1]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
import geopandas as gpd

spark = SparkSession. \
    builder. \
    appName('Python Spark Apache Sedona example'). \
    master('local[*]'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.1-incubating,'
           'org.datasyslab:geotools-wrapper:geotools-24.1'). \
    getOrCreate()

SedonaRegistrator.registerAll(spark)

21/09/19 14:12:41 WARN Utils: Your hostname, JIAYU1AB6 resolves to a loopback address: 127.0.1.1; using 192.168.1.12 instead (on interface eth0)
21/09/19 14:12:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
:: loading settings :: url = jar:file:/home/ubuntu/spark-3.0.3-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a35c5f88-76dd-4113-8091-8552a90f45fe;1.0
	confs: [default]
	found org.apache.sedona#sedona-python-adapter-3.0_2.12;1.0.1-incubating in central
	found org.locationtech.jts#jts-core;1.18.0 in central
	found org.wololo#jts2geojson;0.16.1 in central
	found com.fasterxml.jackson.core#jackson-databind;2.12.2 in central
	found 

True

## Cluster mode and pseudo mode

`local[*]` means pseudo mode: Spark runs on this machine only, using all available CPU cores.

To enable cluster mode, use `spark://IP-address`, the URL shown in the Spark web UI, such as `spark://JIAYU1AB6.localdomain:7077`.

In cluster mode, the notebook distributes the computation tasks to your Spark cluster.

Make sure you shut down and restart this notebook when you switch modes.
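
One way to switch modes without editing the notebook is to read the master URL from an environment variable. The sketch below assumes a variable named `SPARK_MASTER_URL`, which is made up for this example and is not something Spark reads on its own. It deliberately accepts only the two forms discussed here (`local[...]` and a single `spark://host:port`), although Spark supports other master URLs as well.

```python
import os
import re

# spark://host:port, the form shown in the Spark web UI.
_SPARK_URL = re.compile(r"^spark://[A-Za-z0-9._-]+:\d+$")
# local, local[4], or local[*]
_LOCAL = re.compile(r"^local(\[(\*|\d+)\])?$")

def resolve_master(env=None, default="local[*]"):
    """Pick the master URL from SPARK_MASTER_URL (hypothetical), else use pseudo mode."""
    env = os.environ if env is None else env
    master = env.get("SPARK_MASTER_URL", default).strip()
    if _LOCAL.match(master) or _SPARK_URL.match(master):
        return master
    raise ValueError(f"Unrecognized master URL: {master!r}")

print(resolve_master())
```

The result can then be passed to the session builder as `master(resolve_master())` in place of the hard-coded `'local[*]'`.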

## Load state boundaries in WKT TSV and convert the WKT string column to a geometry column

Data source:

Ubuntu
`wget https://raw.githubusercontent.com/DataOceanLab/CPTS-415-Project-Examples/main/boundary-each-state.tsv`

MacOS
`curl -O https://raw.githubusercontent.com/DataOceanLab/CPTS-415-Project-Examples/main/boundary-each-state.tsv`

Pay attention to the longitude/latitude order in each record.

### More constructor examples: https://sedona.apache.org/api/sql/Constructor/

In [2]:
states_wkt = spark.read.option("delimiter", "\t").option("header", "false").csv("boundary-each-state.tsv").toDF("s_name","s_bound")
states_wkt.show()
states_wkt.printSchema()

states = states_wkt.selectExpr("s_name", "ST_GeomFromWKT(s_bound) as s_bound")
states.show()
states.printSchema()
states.createOrReplaceTempView("states")

+-------------+--------------------+
|       s_name|             s_bound|
+-------------+--------------------+
|       Alaska|POLYGON((-141.020...|
|      Alabama|POLYGON((-88.1955...|
|     Arkansas|POLYGON((-94.0416...|
|      Arizona|POLYGON((-112.598...|
|   California|POLYGON((-124.400...|
|     Colorado|POLYGON((-109.044...|
|  Connecticut|POLYGON((-73.4875...|
|     Delaware|POLYGON((-75.7919...|
|      Florida|POLYGON((-87.6050...|
|      Georgia|POLYGON((-85.6082...|
|       Hawaii|POLYGON((-154.628...|
|         Iowa|POLYGON((-95.7623...|
|        Idaho|POLYGON((-117.031...|
|     Illinois|POLYGON((-90.6290...|
|      Indiana|POLYGON((-87.5253...|
|       Kansas|POLYGON((-102.050...|
|     Kentucky|POLYGON((-89.5372...|
|    Louisiana|POLYGON((-94.0430...|
|Massachusetts|POLYGON((-72.7789...|
|     Maryland|POLYGON((-79.4778...|
+-------------+--------------------+
only showing top 20 rows

root
 |-- s_name: string (nullable = true)
 |-- s_bound: string (nullable = true)

+--

## Load city locations in CSV and convert the string column to a geometry column

Data source:

Ubuntu
`wget https://raw.githubusercontent.com/DataOceanLab/CPTS-415-Project-Examples/main/cities.csv`

MacOS
`curl -O https://raw.githubusercontent.com/DataOceanLab/CPTS-415-Project-Examples/main/cities.csv`

Pay attention to the longitude/latitude order in each record.
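
To make the ordering concrete, here is a plain-Python sketch of what the conversion in the next cell does conceptually: split a `longitude_latitude` string on the delimiter and build a WKT point. The helpers `parse_lon_lat` and `to_wkt_point` are made up for this example and are not Sedona functions. The sample string is the Seattle location used later in this notebook, written in the underscore-delimited form as an assumption.

```python
def parse_lon_lat(text, delimiter="_"):
    """Split 'lon<delimiter>lat' into two floats and range-check them."""
    parts = text.split(delimiter)
    if len(parts) != 2:
        raise ValueError(f"expected two values, got {len(parts)}: {text!r}")
    lon, lat = float(parts[0]), float(parts[1])
    if not -180.0 <= lon <= 180.0:
        raise ValueError(f"longitude out of range: {lon}")
    if not -90.0 <= lat <= 90.0:
        # A latitude beyond +/-90 usually means the two values are swapped.
        raise ValueError(f"latitude out of range: {lat} (are lon/lat swapped?)")
    return lon, lat

def to_wkt_point(lon, lat):
    """WKT puts x (longitude) first, then y (latitude)."""
    return f"POINT ({lon} {lat})"

lon, lat = parse_lon_lat("-122.313323_47.622715")
print(to_wkt_point(lon, lat))
```

The range check only catches a swapped pair when the longitude's absolute value exceeds 90, which holds for most of the United States but not everywhere.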

In [3]:
cities_csv = spark.read.option("delimiter", ",").option("header", "false").csv("cities.csv").toDF("c_name","c_loc")
cities_csv.show()
cities_csv.printSchema()

cities = cities_csv.selectExpr("c_name", "ST_PointFromText(c_loc, \'_\') as c_loc")
cities.show()
cities.printSchema()
cities.createOrReplaceTempView("cities")

+-------+--------------------+
| c_name|               c_loc|
+-------+--------------------+
|Pullman|-117.167126_46.73...|
|Phoenix|-112.092128_33.50...|
+-------+--------------------+

root
 |-- c_name: string (nullable = true)
 |-- c_loc: string (nullable = true)

+-------+--------------------+
| c_name|               c_loc|
+-------+--------------------+
|Pullman|POINT (-117.16712...|
|Phoenix|POINT (-112.09212...|
+-------+--------------------+

root
 |-- c_name: string (nullable = true)
 |-- c_loc: geometry (nullable = false)



## Find cities in each state. This is a join query.

### More predicate examples: https://sedona.apache.org/api/sql/Predicate/

In [4]:
city_per_state = spark.sql("select * from states s, cities c where ST_Contains(s.s_bound, c.c_loc)")
city_per_state.show()

21/09/19 14:12:51 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.


+----------+--------------------+-------+--------------------+
|    s_name|             s_bound| c_name|               c_loc|
+----------+--------------------+-------+--------------------+
|Washington|POLYGON ((-123.32...|Pullman|POINT (-117.16712...|
|   Arizona|POLYGON ((-112.59...|Phoenix|POINT (-112.09212...|
+----------+--------------------+-------+--------------------+
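
For intuition about what the join computes, the sketch below runs the same kind of "which polygon contains which point" test in plain Python, using a ray-casting check and a nested loop. This is an illustration only, not how Sedona evaluates `ST_Contains`: Sedona uses JTS geometry operations and, as the warning above shows, a spatial index. The rectangle and the rounded Phoenix latitude are hypothetical demo values, and points exactly on the boundary may go either way in this sketch.

```python
def point_in_polygon(lon, lat, ring):
    """Ray casting: count how many polygon edges a ray going east from the point crosses."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Only edges that straddle the point's latitude can be crossed.
        if (y1 > lat) != (y2 > lat):
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside

# Hypothetical demo data: a rough rectangle, NOT a real state boundary.
demo_states = {"Box A": [(-115.0, 31.0), (-109.0, 31.0), (-109.0, 37.0), (-115.0, 37.0)]}
# Phoenix latitude is rounded for the demo; Seattle is the location used later in this notebook.
demo_cities = {"Phoenix": (-112.092128, 33.5), "Seattle": (-122.313323, 47.622715)}

# Nested-loop join: every polygon is tested against every point.
matches = [
    (s_name, c_name)
    for s_name, ring in demo_states.items()
    for c_name, (lon, lat) in demo_cities.items()
    if point_in_polygon(lon, lat, ring)
]
print(matches)
```

The nested loop tests every pair, which is why an index matters once the tables grow.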



## Calculate the distance from Seattle to each city

Seattle location: -122.313323 47.622715 found on Google Maps: 47°37'21.8"N 122°18'48.0"W

You can open Google Maps and randomly click a place to find its location. Pay attention to its longitude/latitude order.

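`ST_Distance` returns the planar (Euclidean) distance in the units of the coordinates, which here are degrees of longitude/latitude rather than kilometers or miles. It can still give a rough sense of which city is closer to Seattle.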

### More function examples: https://sedona.apache.org/api/sql/Function/

In [5]:
dist_to_seattle = spark.sql("select c_name, ST_Distance(c_loc, ST_Point(-122.313323, 47.622715)) as dist from cities")
dist_to_seattle.show()

+-------+------------------+
| c_name|              dist|
+-------+------------------+
|Pullman|  5.22191398200746|
|Phoenix|17.428931546401486|
+-------+------------------+
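
The `dist` values above are planar distances over longitude/latitude degrees. To get a rough figure in kilometers, one option outside of Sedona is the haversine formula, sketched below with the standard library only. The function names are made up for this example, the Earth is treated as a sphere, and the Pullman latitude is rounded to 46.73 because the output above truncates it, so the results are approximate.

```python
import math

EARTH_RADIUS_KM = 6371.0088  # mean Earth radius; a spherical approximation

def planar_degrees(lon1, lat1, lon2, lat2):
    """Euclidean distance over raw lon/lat values, in degrees."""
    return math.hypot(lon2 - lon1, lat2 - lat1)

def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in kilometers between two lon/lat points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

seattle = (-122.313323, 47.622715)
pullman_approx = (-117.167126, 46.73)  # latitude rounded; an assumption for this demo

print("degrees:", planar_degrees(*seattle, *pullman_approx))
print("km:", haversine_km(*seattle, *pullman_approx))
```

Note the argument order is longitude first, then latitude, matching the rest of this notebook.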



## Convert Geospatial DataFrame to GeoPandas DataFrame for visualization

In [6]:
geopandas_df = gpd.GeoDataFrame(states.toPandas(), geometry="s_bound")
geopandas_df

| | s_name | s_bound |
|---|---|---|
| 0 | Alaska | POLYGON ((-141.02050 70.01870, -141.72910 70.1... |
| 1 | Alabama | POLYGON ((-88.19550 35.00410, -85.60680 34.991... |
| 2 | Arkansas | POLYGON ((-94.04160 33.02250, -91.20570 33.007... |
| 3 | Arizona | POLYGON ((-112.59890 36.99930, -110.86300 37.0... |
| 4 | California | POLYGON ((-124.40090 41.99830, -123.62370 42.0... |
| 5 | Colorado | POLYGON ((-109.04480 37.00040, -102.04240 36.9... |
| 6 | Connecticut | POLYGON ((-73.48750 42.04980, -73.42470 42.051... |
| 7 | Delaware | POLYGON ((-75.79190 39.71880, -75.78370 39.521... |
| 8 | Florida | POLYGON ((-87.60500 30.99880, -86.56130 30.996... |
| 9 | Georgia | POLYGON ((-85.60820 34.99740, -84.72660 34.990... |
