#Mapping the world's airports with GeoPandas, Spark and GeoMesa

We will import datasets of country borders and of airport location and map in which country each airport is located. The airport dataset contains the country where each airport is located, but for this exercise we will pretend we don't have that data, and map each airport to its country using only geographical information.

## Download and process country borders

We download country border shapes from [Natural Earth Data](http://www.naturalearthdata.com), a public domain dataset.

In [4]:
%sh curl -fLO https://www.naturalearthdata.com/http//www.naturalearthdata.com/download/110m/cultural/ne_110m_admin_0_countries.zip

In [5]:
%sh unzip -o ne_110m_admin_0_countries.zip

The data is in Shapefile format. For a pure Python solution we use the *geopandas* module to parse the Shapefile.

In [7]:
dbutils.library.installPyPI("geopandas", version="0.5.1")

In [8]:
import geopandas
country_shapes = geopandas.read_file("ne_110m_admin_0_countries.shp")
country_shapes = country_shapes[['NAME', 'ADM0_A3_IS', 'geometry']]
country_shapes.head(3)

Unnamed: 0,NAME,ADM0_A3_IS,geometry
0,Fiji,FJI,"(POLYGON ((180 -16.06713266364245, 180 -16.555..."
1,Tanzania,TZA,POLYGON ((33.90371119710453 -0.950000000000000...
2,W. Sahara,MAR,POLYGON ((-8.665589565454809 27.65642588959236...


We create a Spark temporary view in order to further process the data with Scala later on. We convert the geometry column from Shapely objects into their WKT string representation.

In [10]:
country_wkt = country_shapes.copy()
country_wkt['geometry'] = country_wkt['geometry'].apply(lambda x: x.wkt)
spark.createDataFrame(country_wkt).createOrReplaceTempView("countries")

In [11]:
table("countries").count()

## Download and process airport information

We download airport geolocation data from [OpenFlights](https://github.com/jpatokal/openflights).

In [13]:
%sh curl -fo /dbfs/airports.dat https://raw.githubusercontent.com/jpatokal/openflights/master/data/airports.dat

In [14]:
airports_schema = """Airport_ID long, Name string, City string, Country string,
  IATA string, ICAO string, Latitude double, Longitude double, Altitude double,
  Timezone string, DST string, Tz string, Type string, Source string"""
airports = spark.read.load("dbfs:/airports.dat",
                     format="csv", schema=airports_schema).cache()
display(airports.limit(3))
airports.createOrReplaceTempView("airports")

Airport_ID,Name,City,Country,IATA,ICAO,Latitude,Longitude,Altitude,Timezone,DST,Tz,Type,Source
1,Goroka Airport,Goroka,Papua New Guinea,GKA,AYGA,-6.081689834590001,145.391998291,5282.0,10,U,Pacific/Port_Moresby,airport,OurAirports
2,Madang Airport,Madang,Papua New Guinea,MAG,AYMD,-5.20707988739,145.789001465,20.0,10,U,Pacific/Port_Moresby,airport,OurAirports
3,Mount Hagen Kagamuga Airport,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.826789855957031,144.29600524902344,5388.0,10,U,Pacific/Port_Moresby,airport,OurAirports


In [15]:
table("airports").count()

## Approach 1: Vectorized Shapely operations

The `Shapely` library offers a limited set of operations as vectorized implementations. We use them to avoid computing a `contains` operation for each pair of (country, airport).

In [17]:
import shapely.vectorized as sv

ap = table("airports").select("Longitude", "Latitude").toPandas()
country_shapes['airport_indexes'] = country_shapes.geometry.apply(lambda g: sv.contains(g, x=ap.Longitude, y=ap.Latitude).nonzero()[0])
country_shapes['airport_counts'] = country_shapes['airport_indexes'].apply(len)
country_shapes.sort_values(by='airport_counts', ascending=False).head(5)

Unnamed: 0,NAME,ADM0_A3_IS,geometry,airport_indexes,airport_counts
4,United States of America,USA,"(POLYGON ((-122.84 49.00000000000011, -120 49....","[3212, 3213, 3214, 3216, 3217, 3218, 3219, 322...",1451
3,Canada,CAN,"(POLYGON ((-122.84 49.00000000000011, -122.974...","[20, 21, 22, 23, 24, 25, 27, 28, 30, 31, 33, 3...",391
137,Australia,AUS,(POLYGON ((147.6892594748842 -40.8082581520226...,"[3125, 3126, 3127, 3128, 3129, 3130, 3131, 313...",287
18,Russia,RUS,"(POLYGON ((178.7253 71.0988, 180.0000000000001...","[2773, 2774, 2775, 2776, 2777, 2778, 2779, 278...",261
29,Brazil,BRA,POLYGON ((-53.37366166849824 -33.7683777809007...,"[2392, 2393, 2394, 2395, 2396, 2397, 2398, 239...",260


This performs extremely fast (under 1 second) on our dataset, running only on the driver node since Spark is not used. What if we needed to apply more complex operations that are not currently available as vectorized implementations, or we had more data than fits into memory? Let's explore different ways of distributing the computation with Spark.

## Approach 2: Shapely UDF (naive usage)

In this approach, we use the geometric operations in the Shapely library to map airports to countries. This results in a cartesian product (cross join), since the geometric `contains` function must be called for every pair (airport, country), so well over a million times. Spark tries to protect us against the potentially catastrophic effect of cross-joins, so we must enable them explicitly.

In [20]:
import shapely
import shapely.wkt

from pyspark.sql.functions import *

@udf("boolean")
def polygon_contains_point(polygon, point):
  sh_polygon = shapely.wkt.loads(polygon)
  sh_point = shapely.geometry.Point(point)
  return sh_polygon.contains(sh_point)
        
spark.conf.set("spark.sql.crossJoin.enabled", True)
display(table("countries")
  .join(table("airports"), polygon_contains_point("countries.geometry", array("airports.Longitude", "airports.Latitude")))
  .groupBy("Country")
  .count()
  .orderBy(desc("count"))
  .limit(5)
)

Country,count
United States,1441
Canada,391
Australia,287
Brazil,258
Russia,255


In [21]:
spark.conf.set("spark.sql.crossJoin.enabled", False)

The command takes a much longer time to run, because of the cross-join and because we are not using vectorized Shapely operations. It is not the best approach when combining large datasets, but could be very suitable for applications such as geofencing (computing whether a location is within an assigned shape). If each geofenced object has a unique polygon it should remain in, we match each location with a single polygon.

## Approach 3 - UDF with broadcast variable (avoiding cross join)

A more efficient approach is to broadcast the dataset that can be applied with a vectorized operation (in our case, the airport coordinates). We can also convert our plain UDF to a Pandas UDF so that it is called with entire series of data rather than one row at a time.

The Shapely vectorized `contains` function returns an array of booleans, indicating for each airport whether it is located in the given country. We could directly count the number of airports from that array, but for illustration purposes we use the Spark `explode` function function to convert the matching items into separate rows and allow joining to the airports table.

In [24]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from shapely import wkt

broadcastedAirports = spark.sparkContext.broadcast(table("airports").select("Airport_ID", "Longitude", "Latitude").toPandas())

@pandas_udf(ArrayType(LongType()))
def get_contained_airports(polygon):
  sh_polygon = polygon.apply(wkt.loads)
  ap = broadcastedAirports.value
  return sh_polygon.apply(lambda g: ap.Airport_ID[sv.contains(g, x=ap.Longitude, y=ap.Latitude)].tolist())

countries_airports = (
table("countries").withColumn("Airport_ID", get_contained_airports("countries.geometry"))
  .withColumn("Airport_ID", explode("Airport_ID"))
  .join(table("airports"), "Airport_ID")
  .select(col("countries.NAME").alias("Country"), col("airports.Name").alias("Airport"))
  .cache()
)
display(countries_airports.limit(6))

Country,Airport
Fiji,Nadi International Airport
Fiji,Nausori International Airport
Fiji,Labasa Airport
Fiji,Savusavu Airport
Tanzania,Arusha Airport
Tanzania,Julius Nyerere International Airport


In [25]:
display(countries_airports
  .groupBy("Country")
  .count()
  .orderBy(desc("count"))
  .limit(5)
       )

Country,count
United States of America,1451
Canada,391
Australia,287
Russia,261
Brazil,260


## Approach 4: GeoMesa processing

We use the GeoMesa Spark SQL functions (available in Scala) to convert the WKT shapes into GeoTools Geometry objects. To explore some additional operations, we will also compute the country area. Note that the `st_area` operation computes the area in *squared degrees* directly from geographical coordinates. As the Earth is not a perfect sphere, this makes comparisons between countries slightly imprecise.

In [27]:
%scala
import org.apache.spark.sql.SQLTypes
import org.apache.spark.sql.functions._
import org.locationtech.geomesa.spark.jts._

// Register custom Geometry types
SQLTypes.init(sqlContext)

val countries =
table("countries")
  .withColumn("geometry", st_geomFromWKT($"geometry"))
  .select(
    $"ADM0_A3_IS".as("country_code"),
    $"Name".as("country"),
    $"geometry",
    st_area($"geometry").as("area")
  )
  .cache
display(countries.limit(1))

country_code,country,geometry,area
FJI,Fiji,"MULTIPOLYGON (((180 -16.06713266364245, 180 -16.5552165666392, 179.3641426619641 -16.80135407694688, 178.7250593629971 -17.01204167436804, 178.5968385951171 -16.63915, 179.0966093629971 -16.4339842775474, 179.4135093629971 -16.3790542775474, 180 -16.06713266364245)), ((178.12557 -17.50481, 178.3736 -17.33992, 178.71806 -17.62846, 178.55271 -18.15059, 177.93266 -18.28799, 177.38146 -18.16432, 177.28504 -17.72465, 177.67087 -17.38114, 178.12557 -17.50481)), ((-179.7933201090486 -16.02088225674122, -179.9173693847653 -16.5017831356494, -180 -16.5552165666392, -180 -16.06713266364245, -179.7933201090486 -16.02088225674122)))",1.6395109959007912


We use GeoMesa Spark SQL [Spatial joins](https://www.geomesa.org/documentation/tutorials/dwithin-join.html) in order to map airports to countries. In our particular case, the code runs in around 10 seconds on a single worker node, given the overhead of distribution, but is highly scalable.

In [29]:
%scala

val airportCountries = countries.as("countries")
  .join(table("airports"), st_contains($"countries.geometry", st_point($"airports.Longitude", $"airports.Latitude")))
  .cache

 val airportsSummary = airportCountries.groupBy($"countries.country_code")
  .agg(
    first($"countries.country").as("country"),
    first($"countries.area").as("area"),
    count($"airports.Name").as("airports")
  )
  .orderBy(desc("airports"))
  .cache

display(airportsSummary)

country_code,country,area,airports
USA,United States of America,1122.28192077808,1451
CAN,Canada,1712.9952276493768,391
AUS,Australia,695.5455009461047,287
RUS,Russia,2935.205205440517,261
BRA,Brazil,710.1852431533746,260
CHN,China,954.6353412364668,240
DEU,Germany,45.92359430736883,232
FRA,France,72.61566570396077,212
IND,India,277.9247129920647,140
GBR,United Kingdom,34.20295398919943,131


The United States appear to be the country with the most airports.  In Databricks, switch to a World Map visualization to view the data graphically.

Let's normalize the data by country area (and take the square root of that value to generate a more interesting color scale when plotting the data as a world map).

In [31]:
%scala
display(airportsSummary
        .select($"country_code", sqrt($"airports"/$"area").as("airports_per_deg_sq_sqrt_scale"))
        .orderBy(desc("airports_per_deg_sq_sqrt_scale"))
)

country_code,airports_per_deg_sq_sqrt_scale
CYP,2.855161221472392
CHE,2.843929870455225
PRI,2.759370339976614
ISR,2.619056743190121
BEL,2.60547812306752
CRI,2.599750008360929
VUT,2.517114964720261
DEU,2.247636399010544
BHS,2.2362623706993867
NLD,2.179237860356214


Cyprus and Switzerland (CHE) are the countries with the highest airport density.