<h1>Spatial Queries in PySpark</h1>


This notebook shows you how to run spatial queries in Spark environments. It uses the spatio-temporal library, which is pre-installed on all Spark environments in Watson Studio. You will learn how to perform the most common types of spatial query in Spark.

The types of spatial queries you will learn to use are:
- In a set of points, find all the points that are within a certain distance of a particular point. For example, find all the hospitals that are within a certain distance of a given location.
- In a set of polygons, find all the polygons that contain a particular point. For example, find all the risk areas for fires, floods, or hurricanes that contain a particular location.
- In a set of points, find all the points that are contained within a particular polygon. For example, find all the retail outlets in a particular region.

Often, a spatial function has one parameter that refers to a spatial column in one table and a second parameter that refers to a spatial constant or to a spatial column in another table. This notebook shows you how to use functions to access and combine data of different types to perform spatial queries.

This notebook runs on Python and Spark.


## Table of Contents


1. [Register the Spark SQL spatial functions](#register)
2. [Get sample data](#getData)
3. [Create a geometry column](#createColumn)
4. [Register the data frames](#registerDataframe)
5. [Run spatial queries](#runQueries)  
6. [Summary](#summary)




<a id="register"></a>
## 1. Register the Spark SQL spatial functions

Register the Spark SQL spatial functions:

In [None]:
# Register the ST_* spatial SQL functions from the spatio-temporal library with this Spark session
spark._jvm.org.apache.spark.sql.types.SqlGeometry.registerAll(spark._jsparkSession)

<a id="getData"></a>
## 2. Get sample data

This notebook uses a sample data set that is available in the IBM Watson Studio Gallery. Direct links are used by default to make sure this notebook is publicly runnable.

For your own data, load it into a Spark data frame in whatever way suits the location of your data source.

Here are some hints if you are using IBM Cloud Object Storage:
- If your data is uploaded directly into the current project, click the `Code snippets` button in the menu bar, find your data set, and then, under Insert as, select `SparkSession DataFrame`. Code that creates a Spark data frame is generated automatically.
- If your data is hosted in a designated bucket, you can use `ibmos2spark` to read the data into a Spark data frame.


Read the hospital data where each hospital's location is a latitude-longitude point:

In [None]:
import pandas as pd
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [None]:
from urllib.request import Request, urlopen

# Download the hospital data from the Watson Studio Gallery with a browser-like User-Agent header
req = Request('https://api.dataplatform.cloud.ibm.com/v2/gallery-assets/entries/5562ced564e776edc5f91e13d48d8309/data?accessKey=466875ad0187d4ea757478e5c1130b59')
req.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:77.0) Gecko/20100101 Firefox/77.0')
with urlopen(req) as content:
    hospital_pdf = pd.read_csv(content)

print(hospital_pdf)

If the code above fails because the link cannot be found, download the `hospitals.csv` data set from the Watson Studio Gallery and load it manually using one of the methods described above.

In [None]:
hospital_schema = StructType([StructField('id', IntegerType()),
                              StructField('name', StringType()),
                              StructField('city', StringType()),
                              StructField('state', StringType()),
                              StructField('lon', DoubleType()),
                              StructField('lat', DoubleType())])

In [None]:
hospital_df = spark.createDataFrame(hospital_pdf, hospital_schema)

In [None]:
hospital_df.show(3)

Read the county data, where each county is a polygon or multipolygon stored as a WKT string in the `shape_WKT` column:

In [None]:
from urllib.request import Request, urlopen

# Download the county data and keep only the name, state, population, and WKT shape columns
req = Request('https://api.dataplatform.cloud.ibm.com/v2/gallery-assets/entries/c8cc28f4c30dc4d8c0b13f18c50c3244/data?accessKey=c8cc28f4c30dc4d8c0b13f18c50fa2d5')
req.add_header('User-Agent', 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:77.0) Gecko/20100101 Firefox/77.0')
with urlopen(req) as content:
    counties_pdf = pd.read_csv(content)[['NAME', 'STATE_NAME', 'POP2000', 'shape_WKT']]

print(counties_pdf)

In [None]:
counties_schema = StructType([StructField('NAME', StringType()),
                              StructField('STATE_NAME', StringType()),
                              StructField('POP2000', IntegerType()),
                              StructField('shape_WKT', StringType())])

In [None]:
counties_df = spark.createDataFrame(counties_pdf, counties_schema)

In [None]:
counties_df.show(3)

<a id="createColumn"></a>
## 3. Create a geometry column for hospital and county data

The raw spatial data in a data frame can come in various forms, for example **separate latitude and longitude columns** or **a column containing the WKT (well-known text) string of the geometry**.

Therefore, the first step is to use a spatial function to generate a new geometry column from these raw columns.  
For example, use the function:
- `ST_Point(lon_col, lat_col)` if the raw spatial data is in a longitude column and a latitude column (the longitude, which is the $x$ coordinate, comes first)  
- `ST_WKTToSQL(wkt_col)` if the raw spatial data is in a column containing the WKT string of the geometry  

For the full list of available spatial functions, see [Geospatial Toolkit functions](https://www.ibm.com/support/knowledgecenter/en/SSCJDQ/com.ibm.swg.im.dashdb.analytics.doc/doc/geo_functions.html).
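To show what a point constructor consumes, the following plain-Python sketch builds the WKT text of a point from separate longitude and latitude values. It is only an illustration that assumes the standard WKT `POINT (x y)` layout. `point_wkt` is a hypothetical helper, not part of the spatio-temporal library, and it does not show the library's internal geometry representation.

```python
def point_wkt(lon: float, lat: float) -> str:
    """Return WKT for a point. WKT lists x (longitude) before y (latitude)."""
    # Reject values that cannot be longitude/latitude; this catches some,
    # but not all, calls with the two arguments swapped.
    if not (-180.0 <= lon <= 180.0 and -90.0 <= lat <= 90.0):
        raise ValueError(f"invalid lon/lat: ({lon}, {lat})")
    return f"POINT ({lon} {lat})"


# Hypothetical row with separate lon/lat columns, like the hospital data.
row = {"name": "Example", "lon": -77.574722, "lat": 43.146732}
print(point_wkt(row["lon"], row["lat"]))  # POINT (-77.574722 43.146732)
```

The range check only rejects a swapped pair when the longitude falls outside ±90°, so you still need to pass the columns in the correct order.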

Create a geometry column for the hospital data using `ST_Point(lon, lat)`:

In [None]:
hospital_df.createOrReplaceTempView("hospitals")
hospital_df = spark.sql("SELECT *, ST_Point(lon, lat) as location from hospitals")
hospital_df.show(3, False)

Create a geometry column for the county data using `ST_WKTToSQL(wkt_string)`:

In [None]:
counties_df.createOrReplaceTempView('counties')
counties_df = spark.sql("SELECT NAME, STATE_NAME, POP2000, ST_WKTToSQL(shape_WKT) as shape from counties")
counties_df.show(3)

<a id="registerDataframe"></a>
## 4. Register the hospital and county data frames as temporary views

Registering a data frame as a temporary view allows you to run SQL queries over its data. Register the hospital and county data frames, which now include the geometry columns, as temporary views:

In [None]:
# Explicitly set the legacy temporary-view setting to false (the default in recent Spark versions)
spark.conf.set("spark.sql.legacy.storeAnalyzedPlanForView", "false")

# Confirm the setting
print(spark.conf.get("spark.sql.legacy.storeAnalyzedPlanForView"))


In [None]:
hospital_df.createOrReplaceTempView('hospitals_temp')
counties_df.createOrReplaceTempView('counties_temp')

<a id="runQueries"></a>
## 5. Run spatial queries

1. [Example 1: Query to determine points closest to another point](#ex1)
1. [Example 2: Queries to determine which polygon contains a point](#ex2)
1. [Example 3: Queries to determine the points in a polygon](#ex3)
1. [Example 4: Spatial join queries to determine points in a polygon](#ex4)
1. [Example 5: Spatial join queries with additional predicates and aggregation](#ex5)
1. [Example 6: Window queries](#ex6)
1. [Example 7: Distance queries](#ex7)

<a id = "ex1"></a>
### Example 1: Query to determine points closest to another point

This sample query shows you how to find the hospitals that are within a certain distance of a given location, which is constructed using the `ST_Point` constructor. `ST_Distance` returns meters, so the threshold of `10000.0` corresponds to 10 km.

In [None]:
spark.sql("""
SELECT name, city, state
FROM hospitals_temp
WHERE ST_Distance(location, ST_Point(-77.574722, 43.146732)) < 10000.0
""").show()

<a id = "ex2"></a>
### Example 2: Queries to determine which polygon contains a point

The following sample queries show you how to use spatial functions to determine which polygon contains a given point. The examples use the following functions:

1. `ST_Contains(geom1, geom2)`: returns TRUE if `geom1` completely contains `geom2`. In the first query below, this means the county polygon contains the point.
2. `ST_Within(geom1, geom2)`: returns TRUE if `geom1` lies completely within `geom2`. This is `ST_Contains` with the arguments swapped.
3. `ST_Intersects(geom1, geom2)`: returns TRUE if `geom1` and `geom2` intersect spatially in any way, for example if they touch, cross, or one contains the other.

In [None]:
spark.sql("""
SELECT NAME 
FROM counties_temp 
WHERE ST_Contains(shape, ST_Point(-74.237, 42.037))
""").show()

In [None]:
spark.sql("""
SELECT NAME
FROM counties_temp
WHERE
ST_Within(ST_Point(-74.237, 42.037), shape)
""").show()

In [None]:
spark.sql("""
SELECT NAME
FROM counties_temp
WHERE
ST_Intersects(shape, ST_Point(-74.237, 42.037))
""").show()

<a id = "ex3"></a>
### Example 3: Queries to determine the points in a polygon

Each of the following queries determines which hospitals are located within the specified polygon, which is defined as a constant using its well-known text (WKT) representation. The polygon definition consists of the keyword POLYGON followed by the coordinates of each vertex ($x$ and $y$, here longitude and latitude). The $x$ and $y$ values of a vertex are separated by a space, and consecutive vertices are separated by commas. The list of vertices forms a ring and is enclosed in its own parentheses inside the outer pair, which is why the string starts with `((`. The last vertex must repeat the first so that the ring is closed.

In [None]:
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE
ST_Contains(ST_WKTToSQL('POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))'), location)
""").show(3)

In [None]:
spark.sql("""
SELECT name 
FROM hospitals_temp
WHERE ST_Within(location, ST_WKTToSQL('POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))'))
""").show(3)

In [None]:
spark.sql("""
SELECT name 
FROM hospitals_temp
WHERE ST_Intersects(location, ST_WKTToSQL('POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))'))
""").show(3)

<a id = "ex4"></a>
### Example 4: Spatial join queries to determine points in a polygon

Just as a regular join combines two tables based on the values in columns that contain character or numeric data, a spatial join combines tables based on the values in columns that contain spatial data, using a spatial predicate such as `ST_Intersects` as the join condition. The following examples use the **counties** and **hospitals** tables.
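Conceptually, a spatial join pairs rows from the two tables and keeps only the pairs that satisfy the spatial predicate. The following plain-Python sketch shows that idea as a naive nested loop over hypothetical polygons and points, using a simple ray-casting point-in-polygon test. It does not describe how Spark or the spatio-temporal library actually executes the join. Production engines typically avoid testing every pair by first filtering candidates with bounding boxes or a spatial index.

```python
def point_in_ring(x, y, ring):
    """Even-odd (ray-casting) point-in-polygon test for a closed ring of (x, y) vertices."""
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:] + ring[:1]):
        # Toggle for each edge that crosses the horizontal line through y to the right of x
        if (y1 > y) != (y2 > y) and x < x1 + (y - y1) * (x2 - x1) / (y2 - y1):
            inside = not inside
    return inside


def spatial_join(polygons, points):
    """Naive nested-loop spatial join: yield (polygon_name, point_name) for every
    point inside a polygon. Every pair is tested; no index is used."""
    for poly_name, ring in polygons:
        for point_name, (x, y) in points:
            if point_in_ring(x, y, ring):
                yield poly_name, point_name


# Hypothetical inputs: two adjacent unit squares and three points.
polygons = [
    ("Square 1", [(-74.0, 42.0), (-73.0, 42.0), (-73.0, 43.0), (-74.0, 43.0), (-74.0, 42.0)]),
    ("Square 2", [(-73.0, 42.0), (-72.0, 42.0), (-72.0, 43.0), (-73.0, 43.0), (-73.0, 42.0)]),
]
points = [("P1", (-73.5, 42.5)), ("P2", (-72.5, 42.5)), ("P3", (-71.0, 42.5))]
print(list(spatial_join(polygons, points)))
```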

You can use a spatial join to find the hospitals located within a specific county. For example, the following query returns a list of all the hospitals in Dutchess County:

In [None]:
spark.sql("""
SELECT c.NAME, h.name 
FROM counties_temp AS c, hospitals_temp AS h 
WHERE c.NAME = 'Dutchess' 
AND ST_Intersects(c.shape, h.location)
""").show()

Alternatively, you can use the SQL `JOIN ... ON ...` notation, which is equivalent to placing the spatial predicate in the `WHERE` clause. For example, the following query produces the same result set as the previous query, with the two columns in the opposite order:

In [None]:
spark.sql("""
SELECT h.name, c.NAME
FROM counties_temp AS c
JOIN hospitals_temp AS h
ON c.NAME = 'Dutchess'
AND ST_Intersects(h.location, c.shape)
""").show()

The following query returns the name of the county in which a particular hospital is located:

In [None]:
spark.sql("""
SELECT c.NAME, h.name
FROM hospitals_temp AS h, counties_temp AS c
WHERE ST_Intersects(h.location, c.shape)
AND h.name = 'Vassar Brothers Hospital'
""").show()

<a id = "ex5"></a>
### Example 5: Spatial join queries with additional predicates and aggregation

These examples show you how to combine spatial joins with additional predicates and aggregation to answer business questions. They continue to use the hospitals and counties tables, but the same principles apply to any other type of data.

The following example lists the hospitals in each county in New York State, filtering on the state name in the counties table.

In [None]:
spark.sql("""
SELECT c.NAME, h.name
FROM counties_temp AS c, hospitals_temp AS h
WHERE ST_Intersects(h.location, c.shape)
AND c.STATE_NAME='New York'
ORDER BY c.NAME, h.name
""").show(3)

You can obtain the same results by rewriting the above query to filter on the `state` column of the hospitals table instead:

In [None]:
spark.sql("""
SELECT c.NAME, h.name
FROM hospitals_temp AS h, counties_temp AS c
WHERE ST_Intersects(h.location, c.shape)
AND h.state='NY'
ORDER BY c.NAME, h.name
""").show(3)

The following example lists the number of hospitals per county in New York:

In [None]:
spark.sql("""
SELECT c.NAME, COUNT(h.name) AS hospital_count
FROM counties_temp AS c, hospitals_temp AS h
WHERE ST_Intersects(h.location, c.shape)
AND c.STATE_NAME='New York'
GROUP BY c.NAME
""").show(3)

To identify counties where the population is underserved by hospitals, an interesting metric might be the number of people per hospital in each county. Using the population of each county in the year 2000, you can calculate this number.

In [None]:
spark.sql("""
SELECT c.NAME, 
COUNT(h.name) AS hospital_count, 
c.POP2000 AS Population, 
c.POP2000/COUNT(h.name) AS people_per_hospital
FROM counties_temp AS c, hospitals_temp AS h
WHERE c.STATE_NAME='New York'
AND ST_Intersects(h.location, c.shape)
GROUP BY c.NAME, c.POP2000
ORDER BY people_per_hospital DESC
""").show(3)

With additional details, such as the number of beds or doctors per hospital, you could build a better measure of health care coverage per state and population.

<a id = "ex6"></a>
### Example 6: Window queries

A common use case for mapping applications, and in particular for web mapping, is to select objects that fall within a specific rectangular region. This can be done by creating a polygon to represent the rectangle and using the `ST_Intersects` spatial predicate.

In [None]:
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE ST_Intersects(location, ST_WKTToSQL(
 'POLYGON ((-74.0 42.0, -73.0 42.0, -73.0 43.0, -74.0 43.0, -74.0 42.0))'))
""").show(3)

<a id = "ex7"></a>
### Example 7: Distance queries

Another common spatial query is to find things within a specified distance of a particular location. You have probably used web-mapping applications to get this kind of information. You can issue SQL queries from your application for questions like:

- Find customers within 10 miles of a store
- Find ATMs within 500 meters of the current location
- Find competitive stores within 10 kilometers of a proposed store location

The spatial function used for these queries is `ST_Distance`, which computes the distance between two geometries and returns the result in meters.

The following query returns the hospitals within 46,800 meters of the point (-74.237, 42.037):

In [None]:
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE ST_Distance(location, ST_Point(-74.237, 42.037)) < 46800.0
ORDER BY name
""").show(3)

Another way to find the hospitals near the point (-74.237, 42.037) is to use the `ST_Buffer` function, which creates a circular buffer around the given geometry. You then select the geometries that intersect that buffer. `ST_Buffer` takes a geometry and a buffer distance in meters. The result set should essentially match filtering with `ST_Distance(location, ST_Point(-74.237, 42.037)) < 46800.0`, as in the `ST_Distance` query that follows.

In [None]:
spark.sql("""
SELECT name
FROM hospitals_temp
WHERE
ST_Intersects(location,
  ST_Buffer(ST_Point(-74.237, 42.037), 46800.0))
ORDER BY name
""").show(3)

The following query returns the distance from a specified point to each hospital within a radius of 46,800 meters (about 29 miles):

In [None]:
spark.sql("""
SELECT name, ST_Distance(location, ST_Point(-74.237, 42.037)) AS distance
FROM hospitals_temp
WHERE ST_Distance(location, ST_Point(-74.237, 42.037)) < 46800.0
ORDER BY distance
""").show(3)

You can also use `ST_Buffer` to filter by the spatial relationship and then compute the distance, as shown in the following query:

In [None]:
spark.sql("""
SELECT name, ST_Distance(location, ST_Point(-74.237, 42.037)) AS distance
FROM hospitals_temp
WHERE
  ST_Intersects(location,
  ST_Buffer(ST_Point(-74.237, 42.037), 46800.0))
ORDER BY distance
""").show(3)

A key difference is that `ST_Buffer` in this package can buffer arbitrary geometries, not only points; the following query, for example, buffers a line string. Note that:
- Running `ST_Buffer` on large geometries can be expensive.
- For a large number of geometries, consider computing the buffers once, storing them in a column, and querying the stored buffers.

In [None]:
spark.sql("""
SELECT name, ST_Distance(location, ST_WKTToSQL(
 'LINESTRING (-74.0 42.0, -73.0 42.0)')) AS distance
FROM hospitals_temp
WHERE ST_Intersects(location, ST_Buffer(ST_WKTToSQL(
 'LINESTRING (-74.0 42.0, -73.0 42.0)'), 46800.0))
""").show(3)

<a id="summary"></a>
## 6. Summary

In this notebook, you learned how to query spatial data downloaded from the IBM Watson Studio Gallery. You registered each data frame (one with hospital data and another with county data) as a temporary view to run your queries on. The sample queries showed you how to find the hospitals within a certain distance of a location or inside a polygon, how to find the county in which a hospital is located, and how to identify counties where the population may be underserved by hospitals. Along the way, you used and combined the most common Spark SQL spatial functions.

### Author

**Linsong Chu**, Research Engineer at IBM Research

Copyright © 2019 IBM. This notebook and its source code are released under the terms of the MIT License.

<div style="background:#F5F7FA; height:110px; padding: 2em; font-size:14px;">
<span style="font-size:18px;color:#152935;">Love this notebook? </span>
<span style="font-size:15px;color:#152935;float:right;margin-right:40px;">Don't have an account yet?</span><br>
<span style="color:#5A6872;">Share it with your colleagues and help them discover the power of Watson Studio!</span>
<span style="border: 1px solid #3d70b2;padding:8px;float:right;margin-right:40px; color:#3d70b2;"><a href="https://ibm.co/wsnotebooks" target="_blank" style="color: #3d70b2;text-decoration: none;">Sign Up</a></span><br>
</div>