- configuring WherobotsDB to access S3 buckets
- loading Shapefile data to Spatial DataFrames
- performing a spatial join using SQL
- visualizing geospatial data
- writing results as GeoParquet

In [1]:
from sedona.spark import *
from pyspark.sql.functions import desc
import os

In [2]:
config = SedonaContext.builder().appName('sedona-example-python')\
    .config('spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider','org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider')\
    .getOrCreate()
sedona = SedonaContext.create(config)
sc = sedona.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [3]:
# Read the countries shapefiles from S3
s3BucketName = 'wherobots-examples'
countries = ShapefileReader.readToGeometryRDD(sc, 's3://%s/data/ne_50m_admin_0_countries_lakes/' % s3BucketName)
# Convert the Spatial RDD to a Spatial DataFrame using the Adapter
countries_df = Adapter.toDf(countries, sedona)
countries_df.createOrReplaceTempView("country")
countries_df.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- geometry: geometry (nullable = true)
 |-- featurecla: string (nullable = true)
 |-- scalerank: string (nullable = true)
 |-- LABELRANK: string (nullable = true)
 |-- SOVEREIGNT: string (nullable = true)
 |-- SOV_A3: string (nullable = true)
 |-- ADM0_DIF: string (nullable = true)
 |-- LEVEL: string (nullable = true)
 |-- TYPE: string (nullable = true)
 |-- ADMIN: string (nullable = true)
 |-- ADM0_A3: string (nullable = true)
 |-- GEOU_DIF: string (nullable = true)
 |-- GEOUNIT: string (nullable = true)
 |-- GU_A3: string (nullable = true)
 |-- SU_DIF: string (nullable = true)
 |-- SUBUNIT: string (nullable = true)
 |-- SU_A3: string (nullable = true)
 |-- BRK_DIFF: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- NAME_LONG: string (nullable = true)
 |-- BRK_A3: string (nullable = true)
 |-- BRK_NAME: string (nullable = true)
 |-- BRK_GROUP: string (nullable = true)
 |-- ABBREV: string (nullable = true)
 |-- POSTAL: string (nullable = true)
 |-- FORMAL_EN: st

                                                                                

In [4]:
num_rows = countries_df.count()
print(f"Liczba wierszy w countries_df: {num_rows}")

[Stage 5:>                                                          (0 + 1) / 1]

Liczba wierszy w countries_df: 241


                                                                                

In [5]:
# Read the airports shapefiles from S3
airports = ShapefileReader.readToGeometryRDD(sc, 's3://%s/data/ne_50m_airports/' % s3BucketName)
# Convert the Spatial RDD to a Spatial DataFrame using the Adapter
airports_df = Adapter.toDf(airports, sedona)
airports_df.createOrReplaceTempView("airport")
airports_df.printSchema()

[Stage 8:>                                                          (0 + 1) / 1]

root
 |-- geometry: geometry (nullable = true)
 |-- scalerank: string (nullable = true)
 |-- featurecla: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- abbrev: string (nullable = true)
 |-- location: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- wikipedia: string (nullable = true)
 |-- natlscale: string (nullable = true)



                                                                                

In [6]:
num_rows = airports_df.count()
print(f"Liczba wierszy w airports_df: {num_rows}")

[Stage 10:>                                                         (0 + 1) / 1]

Liczba wierszy w airports_df: 281


                                                                                

In [7]:
# Run a spatial join query to find airports in each country
result = sedona.sql('SELECT c.geometry as country_geom, c.NAME_EN, a.geometry as airport_geom, a.name FROM country c, airport a WHERE ST_Contains(c.geometry, a.geometry)')
# Aggregate the results to find the number of airports in each country
aggregateResult = result.groupBy('NAME_EN', 'country_geom').count()
aggregateResult.orderBy(desc('count')).show()

                                                                                

+--------------------+--------------------+-----+
|             NAME_EN|        country_geom|count|
+--------------------+--------------------+-----+
|United States of ...|MULTIPOLYGON (((-...|   35|
|              Canada|MULTIPOLYGON (((-...|   15|
|              Brazil|MULTIPOLYGON (((-...|   12|
|              Mexico|MULTIPOLYGON (((-...|   12|
|People's Republic...|MULTIPOLYGON (((1...|    7|
|               India|MULTIPOLYGON (((6...|    7|
|             Germany|MULTIPOLYGON (((9...|    5|
|               Chile|MULTIPOLYGON (((-...|    5|
|              Russia|MULTIPOLYGON (((1...|    5|
|           Australia|MULTIPOLYGON (((1...|    5|
|            Colombia|MULTIPOLYGON (((-...|    4|
|        South Africa|MULTIPOLYGON (((2...|    4|
|                Iran|MULTIPOLYGON (((5...|    4|
|           Argentina|MULTIPOLYGON (((-...|    3|
|             Nigeria|MULTIPOLYGON (((7...|    3|
|            Pakistan|POLYGON ((76.7668...|    3|
|               Egypt|POLYGON ((36.8713...|    3|


In [8]:
result.show(4)

                                                                                

+--------------------+--------+--------------------+------------+
|        country_geom| NAME_EN|        airport_geom|        name|
+--------------------+--------+--------------------+------------+
|MULTIPOLYGON (((3...|Zimbabwe|POINT (28.6225520...|    Bulawayo|
|MULTIPOLYGON (((3...|Zimbabwe|POINT (31.1014 -1...|Harare Int'l|
|MULTIPOLYGON (((3...|  Zambia|POINT (28.4455443...|Lusaka Int'l|
|MULTIPOLYGON (((5...|   Yemen|POINT (45.030602 ...|  Aden Int'l|
+--------------------+--------+--------------------+------------+
only showing top 4 rows



                                                                                

In [9]:
# Visualize results using SedonaKepler
result_map = SedonaKepler.create_map(df=aggregateResult, name='Airport_Count')
result_map

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

KeplerGl(data={'Airport_Count': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 1…

In [10]:
# Write the results to a GeoParquet file
aggregateResult.write.format('geoparquet').mode('overwrite').save(os.getenv("USER_S3_PATH") + 'airport_country.parquet')

                                                                                