Reading and writing various data sources,
Aggregating and analyzing raster and vector data,
Calculating a basic risk score across several paramaters and at various scales,
Visualizing analytical results.

In [4]:
%%time
from sedona.spark import *
from sedona.sql import st_functions as stf
from sedona.sql import st_constructors as stc
from sedona.sql import st_predicates as stp
from pyspark.sql.functions import expr, col, count, isnan, when, explode,broadcast,min as spark_min, monotonically_increasing_id, year, lit, to_timestamp
from pyspark.sql.window import Window

import os



config = SedonaContext.builder().\
     config("spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider").\
     getOrCreate()
sedona = SedonaContext.create(config)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

CPU times: user 445 ms, sys: 50 ms, total: 495 ms
Wall time: 26.3 s


Data

1.Overture Maps Foundation Layers
Transportation
Places
Service Stations
Housing
Emergency Services
Buildings

2.European Commision
River Flood Hazard Maps
Landslide Risk

Load Vector Assets

In [5]:
pl_df= sedona.sql(f"""
SELECT
    geometry
FROM 
    wherobots_open_data.overture_2024_07_22.divisions_division_area 
WHERE
    country ='PL'
AND
    subtype = 'country'

 """)

In [6]:
pl_df.show()

                                                                                

+--------------------+
|            geometry|
+--------------------+
|POLYGON ((18.7776...|
+--------------------+



In [7]:
pl_geom_wkt = pl_df.collect()[0].geometry

                                                                                

In [8]:
type(pl_geom_wkt)

shapely.geometry.polygon.Polygon

In [9]:
kepler_map = SedonaKepler.create_map()
SedonaKepler.add_df(kepler_map, pl_df, name="PL")
kepler_map

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

KeplerGl(data={'PL': {'index': [0], 'columns': ['geometry'], 'data': [['POLYGON ((18.7776846000000006 49.68426…

Change the number of partitions in a DataFrame to 12. This help evenly distribute data and improve parallel computation performance.

In [13]:
%%time
# read Overture Maps Foundation table.
pl_service_stations_df = sedona.table("""wherobots_open_data.overture_2024_07_22.places_place""")\
                               .where(expr(f"""ST_INTERSECTS(geometry,ST_GEOMFROMWKT("{pl_geom_wkt}"))"""))\
                               .where("""categories.primary in ('truck_repair_and_services_for_businesses','truck_repair','transmission_repair',
                                                                    'oil_change_station','brake_service_and_repair','automotive_services_and_repair',
                                                                    'automotive_repair','auto_glass_service','auto_body_shop')""")

pl_emergency_services_df  = sedona.table("""wherobots_open_data.overture_2024_07_22.places_place""")\
                               .where(expr(f"""ST_INTERSECTS(geometry,ST_GEOMFROMWKT("{pl_geom_wkt}"))"""))\
                               .where("""categories.primary in ('law_enforcement','police_department','air_ambulance',
                                                                'disaster_response_services','emergency_medicine','emergency_roadside_service',
                                                                'emergency_room','fire_department','hospital','towing_service','urgent_care_clinic')""")

pl_transportation_df = sedona.table("""wherobots_open_data.overture_2024_07_22.transportation_segment""")\
                          .where(expr(f"""ST_INTERSECTS(geometry,ST_GEOMFROMWKT("{pl_geom_wkt}"))"""))\
                          .where(expr("class = 'motorway'"))

pl_buildings_df = sedona.table("""wherobots_open_data.overture_2024_07_22.buildings_building""")\
                     .where(expr(f"""ST_INTERSECTS(geometry,ST_GEOMFROMWKT("{pl_geom_wkt}"))"""))\
                     .where("""class in ('residential','apartments') """)



# repartition, count and create temporary views
pl_emergency_services_df = pl_emergency_services_df.repartition(12)
print(f"Emergency Services: {pl_emergency_services_df.count()}")
pl_emergency_services_df.createOrReplaceTempView("pl_emergency_services")

pl_service_stations_df = pl_service_stations_df.repartition(12)
print(f"Service Stations: {pl_service_stations_df.count()}")
pl_service_stations_df.createOrReplaceTempView("pl_service_stations")

pl_transportation_df = pl_transportation_df.repartition(12)
print(f"Transportation: {pl_transportation_df.count()}")
pl_transportation_df.createOrReplaceTempView("pl_transportation")

pl_buildings_df = pl_buildings_df.repartition(12)
print(f"Buildings: {pl_buildings_df.count()}")
pl_buildings_df.createOrReplaceTempView("pl_buildings")

                                                                                

Emergency Services: 8741


                                                                                

Service Stations: 10720


                                                                                

Transportation: 2814




Buildings: 1075939
CPU times: user 135 ms, sys: 3.05 ms, total: 138 ms
Wall time: 29.6 s


                                                                                

Load Raster Assets

In [14]:
%%time
# Read directory of Flood Risk 
flood_layers_dir = f"""s3://wherobots-examples/data/examples/global_flood_hazard_layers/europe"""


CPU times: user 3 μs, sys: 0 ns, total: 3 μs
Wall time: 5.48 μs


In [15]:
# Create a DataFrame then temporary view containing raster data
flood_layers_df = sedona.read.format("binaryFile")\
    .option("pathGlobFilter", "*.tif")\
    .option("recursiveFileLookup", "true")\
    .load(flood_layers_dir).drop("content").withColumn("rast", expr("RS_FromPath(path)"))

1️⃣ Reading raster files (.tif)

flood_layers_df = sedona.read.format("binaryFile")
binaryFile → loads binary files (e.g. images, rasters).

2️⃣ Filtering GeoTIFF files (*.tif)

.option("pathGlobFilter", "*.tif")
Downloads only files with the .tif extension.

3️⃣ Recursive folder search

.option("recursiveFileLookup", "true")
Searches for .tif files also in flood_layers_dir subfolders.

4️⃣ Loading files and removing the content column

.load(flood_layers_dir).drop("content") content contains binary data of the file, which is not needed.

5️⃣ Converting a file path to a raster

.withColumn("rast", expr("RS_FromPath(path)")) RS_FromPath(path) → A Sedona function that converts a .tif file path to a raster object.

In [16]:
flood_layers_df.show()



+--------------------+-------------------+---------+--------------------+
|                path|   modificationTime|   length|                rast|
+--------------------+-------------------+---------+--------------------+
|s3://wherobots-ex...|2024-09-03 17:58:04|425584926|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:49|408248797|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:49|394744774|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:58:06|387654925|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:58:06|378680038|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:58:03|372259813|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:51|364481691|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:57|353070088|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:49|328471548|LazyLoadOutDbGrid...|
+--------------------+-------------------+---------+--------------------+



                                                                                

The table contains the following columns:

path – the path to the file that is stored on the cloud resource, in S3 (e.g. s3://wherobots-ex...). It contains the full location of the file on the cloud drive.

modificationTime – the date and time of the last modification of the file. The date format is YYYY-MM-DD HH:mm:ss (e.g. 2024-09-03 17:58:04).

length – the size of the file in bytes 
rast – a raster object stored in memory. The values ​​in this column are represented by an object of the LazyLoadOutDbGrid type, which is specific to Sedona technology and contains raster data in the form of delayed loading. In this case, it is a representation of raster data (e.g. map, image) containing geolocated information.

In [17]:
flood_layers_df.createOrReplaceTempView("flood_raster")
flood_layers_df.show()

+--------------------+-------------------+---------+--------------------+
|                path|   modificationTime|   length|                rast|
+--------------------+-------------------+---------+--------------------+
|s3://wherobots-ex...|2024-09-03 17:58:04|425584926|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:49|408248797|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:49|394744774|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:58:06|387654925|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:58:06|378680038|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:58:03|372259813|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:51|364481691|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:57|353070088|LazyLoadOutDbGrid...|
|s3://wherobots-ex...|2024-09-03 17:57:49|328471548|LazyLoadOutDbGrid...|
+--------------------+-------------------+---------+--------------------+



In [18]:
flood_raster_tiled= sedona.sql(f"""
SELECT 
    RS_TileExplode(rast, 256, 256) AS (x, y, tile), 
    REVERSE(SPLIT(RS_BandPath(rast), '/'))[0] AS path,
    int(REPLACE(SPLIT(REVERSE(SPLIT(RS_BandPath(rast), '/'))[0],'_')[1],'RP','')) as event
FROM 
    flood_raster
WHERE 
    RS_Intersects(rast,ST_GEOMFROMWKT("{pl_geom_wkt}"))
AND
    int(REPLACE(SPLIT(REVERSE(SPLIT(RS_BandPath(rast), '/'))[0],'_')[1],'RP',''))  in (10, 50, 100)""")

In [19]:
flood_raster_tiled = flood_raster_tiled.repartition(12)
flood_raster_tiled.createOrReplaceTempView("flood_raster_tiled")
flood_raster_tiled.show()



+---+---+--------------------+--------------------+-----+
|  x|  y|                tile|                path|event|
+---+---+--------------------+--------------------+-----+
| 18|134|OutDbGridCoverage...|Europe_RP50_fille...|   50|
| 38| 14|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|422|187|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|264| 38|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|208|116|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|302|186|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|321|183|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|203|103|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|247|163|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|269|134|OutDbGridCoverage...|Europe_RP50_fille...|   50|
| 74|162|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|340| 17|OutDbGridCoverage...|Europe_RP50_fille...|   50|
| 35|107|OutDbGridCoverage...|Europe_RP50_fille...|   50|
|415| 84|OutDbGridCoverage...|Europe_RP50_fille...|   50|
| 34| 67|OutDb

                                                                                

In [20]:
%%time
# Read the landslide risk tif
landslide_risk_tif = f"""s3://wherobots-examples/data/examples/EuropeanLandslideSusceptibilityMap/elsus_v2_4326_cog.tif"""
# 0 (-2147483647) = no data; 1 = very low; 2 = low; 3 = moderate; 4 = high; 5 = very high

# Create a DataFrame then temporary view containing raster data
landslide_risk_df = sedona.read.format("binaryFile")\
    .load(landslide_risk_tif).drop("content").withColumn("rast", expr("RS_FromPath(path)"))

landslide_risk_df.createOrReplaceTempView("land_slide_risk_raster")
landslide_risk_df.show()
# tile the raster into 256x256 images and save as a temporary view 
landslide_risk_tiled_df= sedona.sql(f"""
SELECT 
    RS_TileExplode(rast, 256, 256) AS (x, y, tile), 
    REVERSE(SPLIT(RS_BandPath(rast), '/'))[0] AS path 
FROM 
    land_slide_risk_raster
WHERE 
    RS_Intersects(rast,ST_GEOMFROMWKT("{pl_geom_wkt}"))
""")
landslide_risk_tiled_df= landslide_risk_tiled_df.repartition(12)
landslide_risk_tiled_df.createOrReplaceTempView("land_slide_risk_tiled")
landslide_risk_tiled_df.show()

+--------------------+-------------------+----------+--------------------+
|                path|   modificationTime|    length|                rast|
+--------------------+-------------------+----------+--------------------+
|s3://wherobots-ex...|2024-09-03 18:02:44|3494856744|LazyLoadOutDbGrid...|
+--------------------+-------------------+----------+--------------------+

+---+---+--------------------+--------------------+
|  x|  y|                tile|                path|
+---+---+--------------------+--------------------+
| 68| 63|OutDbGridCoverage...|elsus_v2_4326_cog...|
|102| 10|OutDbGridCoverage...|elsus_v2_4326_cog...|
|110|  3|OutDbGridCoverage...|elsus_v2_4326_cog...|
| 65| 48|OutDbGridCoverage...|elsus_v2_4326_cog...|
| 17| 45|OutDbGridCoverage...|elsus_v2_4326_cog...|
| 50| 49|OutDbGridCoverage...|elsus_v2_4326_cog...|
| 49| 16|OutDbGridCoverage...|elsus_v2_4326_cog...|
| 63| 56|OutDbGridCoverage...|elsus_v2_4326_cog...|
| 31| 31|OutDbGridCoverage...|elsus_v2_4326_cog...|


The first step in the analysis process is to create an H3 hexagon grid over our road network. We use H3 level 6, each hexagon is about 36 km2, and each edge (and the distance from the center to any vertex) is 3.7 km. Caching (for faster access), repartitioning (for better layout), and creating a temporary view (for reuse in spatialSQL).

Creating a transportation risk index using H3 Hex bins as our mapping unit.
1.Isolate the transportation cooridors of Interest
2.Generate H3 hex bin map based on the intersection of these coordors
3.Engineer features based spatial relationships and add them to the H3 hex bin
4.Normalize feartures from 0-1 and then weight our features
5.Calcualte final risk score

In [None]:
Creating hexagons (H3 cells) in spatial analysis has several key advantages, especially when dealing with the analysis of geographic and spatial data. In this case, hexagons (H3 cells) are used to better map, analyze, and aggregate data in a geographic context. Here are some reasons why H3 hexagons are used:

1. Regularity and uniformity of coverage
H3 is a cell system based on spherical hexagons that provide uniform coverage of the earth's surface, without the so-called geographic distortions that can occur in traditional rectangular grids (e.g. in cartographic projections).

Instead of using rectangular grids that can have irregularities in different latitudes (e.g. distortions near the poles), H3 cells are perfectly even and have the same dimensions in every part of the map.

2. Optimization of spatial computations
H3 cells allow for more efficient spatial processing. Using hexagons as spatial units facilitates various operations, such as:

Calculating distances,

Aggregating data (e.g. calculating average values within a cell),

Spatial joins (e.g. identifying which areas are adjacent to each other).

3. Precision and scalability
Scalability: H3 allows creation cells at different levels of resolution, from larger cells (which cover large areas) to very small cells (which are ideal for very precise analyses in small areas). The resolution can be controlled using the hex_scale variable.

Precision: Using H3, you can perform more detailed spatial analyses because each cell covers a precisely defined geographic area. Cells can be used to monitor, analyze, or visualize data at different levels of detail.

4. Ease of data aggregation
In traditional rectangular grid systems, it can be more difficult to perform data aggregations (e.g. average speed within a given area) due to the irregularities in the shape of the cells. The hexagonal cells of H3 facilitate such calculations because their shape makes neighboring cells similar in size and shape, which makes them easier to aggregate.

5. Better coverage of spatial data
When dealing with spatial data analysis such as roads, point hazards (e.g. flood risk), road accidents, landslides, etc., H3 provides the ability to create a uniform spatial structure where each point and area is assigned to a specific cell.

This makes it easier to perform area-based analyses such as risk assessments (e.g. accident risk depending on a road) or flood risk assessments within a given cell.

6. Efficiency in data processing
Using H3, data is processed in a more optimized way and can be parallelized more easily in distributed systems such as Apache Spark. H3 cells are well suited for parallel and distributed computing because they allow data to be divided in a way that is easy to process on multiple machines.

7. Analysis in the context of networks (e.g. roads)
For roads and other transport networks, H3 cells allow for easy mapping of entire networks over a geographical area. It is easy to determine which roads or points are close to each other (e.g. in the case of traffic speed or accident risk analysis).

Creating H3 hexagons in spatial analysis enables:

Precise mapping and representation of geographic areas.

Efficient spatial operations (aggregation, joins, distance calculations).

Easier management of spatial data in distributed systems (e.g. Apache Spark).

Improved performance in calculations and analyses, especially with large data sets.


In [21]:
%%time
hex_scale= 6
road_hex = sedona.sql(f""" 
WITH h3_ids AS (SELECT 
   DISTINCT EXPLODE
               (
                   ST_H3CellIDs(geometry, {hex_scale}, true)
               ) AS h3 
FROM 
pl_transportation )


SELECT 
    h3,
    EXPLODE (ST_H3ToGeom(ARRAY(h3))) AS hex_geometry
FROM 
    h3_ids
""")
road_hex = road_hex.repartition(12)
road_hex.createOrReplaceTempView("hex_bins_d")

CPU times: user 2.94 ms, sys: 349 μs, total: 3.29 ms
Wall time: 73 ms


In [41]:
SedonaKepler.create_map(road_hex, "Hiking Trails")

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

KeplerGl(data={'Hiking Trails': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 1…

In [22]:
road_hex.count()

                                                                                

377

In [23]:
road_hex.show(3)



+------------------+--------------------+
|                h3|        hex_geometry|
+------------------+--------------------+
|604012742961004543|POLYGON ((15.9738...|
|604012398558314495|POLYGON ((19.4787...|
|604012531970736127|POLYGON ((18.6985...|
+------------------+--------------------+
only showing top 3 rows



                                                                                

Join the max speed limit from road network to out hexagons.
Left join to retain all mapping units and the join is predicated on the spatial relationship "Intersects".
Again repartition and create a temp view.

In [24]:
%%time
# 13 s
hex_scale= 6
road_speed_hex = sedona.sql(f""" 
SELECT
    FIRST(ARRAY_MAX(t2.speed_limits.max_speed.value)) AS speed_limit,
    FIRST(t1.hex_geometry) AS geometry,
    t1.h3
FROM 
    hex_bins_d t1
LEFT JOIN 
    pl_transportation t2
ON
    ST_INTERSECTS(t1.hex_geometry, t2.geometry)
GROUP BY 
    t1.h3
""")
road_speed_hex = road_speed_hex.repartition(12)
road_speed_hex.createOrReplaceTempView("road_speed_hex")
road_speed_hex.count()



CPU times: user 8.75 ms, sys: 0 ns, total: 8.75 ms
Wall time: 7 s


                                                                                

377

In [45]:
road_speed_hex.show()

                                                                                

+-----------+--------------------+------------------+
|speed_limit|            geometry|                h3|
+-----------+--------------------+------------------+
|        140|POLYGON ((15.5126...|604012733699981311|
|        140|POLYGON ((22.3708...|604013109778055167|
|        140|POLYGON ((18.4898...|604012552640266239|
|        140|POLYGON ((23.1164...|604013119307513855|
|        140|POLYGON ((18.3677...|604012524186107903|
|        140|POLYGON ((15.4080...|604012733834199039|
|        140|POLYGON ((14.9898...|604012783628976127|
|        140|POLYGON ((16.2677...|604012626594234367|
|        140|POLYGON ((19.6475...|604033363367428095|
|        140|POLYGON ((17.6502...|604012573041360895|
|         70|POLYGON ((22.7627...|604013120112820223|
|        140|POLYGON ((21.9543...|604012891674247167|
|        140|POLYGON ((19.2100...|604012411577434111|
|       NULL|POLYGON ((17.6293...|604012710346096639|
|        140|POLYGON ((20.8522...|604013037703135231|
|        140|POLYGON ((19.01

In [25]:
road_speed_hex.printSchema()

root
 |-- speed_limit: integer (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- h3: long (nullable = true)



In [48]:
kepler_map = SedonaKepler.create_map()
SedonaKepler.add_df(kepler_map, road_speed_hex, name="road_speed_hex")
kepler_map


User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


Out of range float values are not JSON compliant
Supporting this message is deprecated in jupyter-client 7, please make sure your message is JSON-compliant
  content = self.pack(content)


KeplerGl(data={'road_speed_hex': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, …

Hex bins are identified along major road networks and as an added benefit we brought in the speed limits.
Process of aggregating the rest of risk paramaters to H3 cell

Service Stations counts
Landslide counts
Emergency Services counts
Landslide potential
Flooding potential

In [26]:
sedona.sql(""" SELECT count(distinct h3) from hex_bins_d""").show()



+------------------+
|count(DISTINCT h3)|
+------------------+
|               377|
+------------------+



                                                                                

In [27]:
road_speed_hex.show(3)

                                                                                

+-----------+--------------------+------------------+
|speed_limit|            geometry|                h3|
+-----------+--------------------+------------------+
|        140|POLYGON ((15.5126...|604012733699981311|
|        140|POLYGON ((22.3708...|604013109778055167|
|        140|POLYGON ((18.4898...|604012552640266239|
+-----------+--------------------+------------------+
only showing top 3 rows



In [28]:
%%time

service_stations_df = sedona.sql("""
SELECT
    h.h3,
    count(p.id) as service_stations
FROM 
    hex_bins_d h
LEFT JOIN
    pl_service_stations p on ST_CONTAINS(h.hex_geometry,p.geometry)
GROUP BY 
    h.h3
""")
service_stations_df = service_stations_df.repartition(12)
service_stations_df.createOrReplaceTempView("hex_service_stations")
service_stations_df.count()



CPU times: user 8.53 ms, sys: 494 μs, total: 9.03 ms
Wall time: 6.5 s


                                                                                

377

In [29]:
service_stations_df.show(3)

                                                                                

+------------------+----------------+
|                h3|service_stations|
+------------------+----------------+
|604028358254133247|               0|
|604013306272808959|               2|
|604012595053068287|               0|
+------------------+----------------+
only showing top 3 rows



In [30]:
%%time

emergency_services_df = sedona.sql("""
SELECT
    h.h3,
    count(p.id) as emergency_services
FROM 
    hex_bins_d h
LEFT JOIN
    pl_emergency_services p on ST_CONTAINS(h.hex_geometry,p.geometry)
GROUP BY 
    h.h3
""")
emergency_services_df = emergency_services_df.repartition(12)
emergency_services_df.createOrReplaceTempView("hex_emergency_services")
emergency_services_df.count()



CPU times: user 5.58 ms, sys: 4.55 ms, total: 10.1 ms
Wall time: 6.65 s


                                                                                

377

In [31]:
emergency_services_df.show(3)

                                                                                

+------------------+------------------+
|                h3|emergency_services|
+------------------+------------------+
|604028358254133247|                 0|
|604012739739779071|                 0|
|604028357717262335|                 1|
+------------------+------------------+
only showing top 3 rows



In [32]:
pl_buildings_df.show(3)



+--------------------+--------------------+--------------------+-------+--------------------+-----------+-----------+-----+-----+---------+------+--------------+----------+----------------------+----------+---------+------------+---------------+-------------+----------+--------------+----------------+----------+-----------+
|                  id|            geometry|                bbox|version|             sources|    subtype|      class|names|level|has_parts|height|is_underground|num_floors|num_floors_underground|min_height|min_floor|facade_color|facade_material|roof_material|roof_shape|roof_direction|roof_orientation|roof_color|roof_height|
+--------------------+--------------------+--------------------+-------+--------------------+-----------+-----------+-----+-----+---------+------+--------------+----------+----------------------+----------+---------+------------+---------------+-------------+----------+--------------+----------------+----------+-----------+
|08b1f53c13694fff0...|

                                                                                

In [33]:
%%time    
res_buildings_df = sedona.sql("""


SELECT
    h.h3,
    count(b.id) as res_building
FROM 
    hex_bins_d h
LEFT JOIN
    pl_buildings b on ST_CONTAINS(h.hex_geometry,ST_CENTROID(b.geometry))
GROUP BY 
    h.h3
""")
res_buildings_df = res_buildings_df.repartition(12)
res_buildings_df.createOrReplaceTempView("hex_res_buildings")
res_buildings_df.count()



CPU times: user 6.63 ms, sys: 3.61 ms, total: 10.2 ms
Wall time: 6.46 s


                                                                                

377

In [34]:
%%time
res_buildings_df.where("res_building>0").show()



+------------------+------------+
|                h3|res_building|
+------------------+------------+
|604012529823252479|        1891|
|604028369662640127|         239|
|604029749286666239|         348|
|604012411443216383|           2|
|604012742558351359|          20|
|604033640929689599|          28|
|604010499645898751|        1018|
|604028357717262335|          20|
|604033364038516735|         180|
|604029455215624191|         460|
|604033675289427967|           1|
|604033635695198207|          12|
|604012710211878911|          89|
|604012427280908287|          97|
|604033394774376447|          18|
|604012521233317887|         382|
|604012703098339327|         268|
|604012661222408191|        1892|
|604012634781515775|         369|
|604012496805691391|          61|
+------------------+------------+
only showing top 20 rows

CPU times: user 20.5 ms, sys: 17.6 ms, total: 38 ms
Wall time: 19.1 s


                                                                                

In [36]:
res_buildings_df.count()

                                                                                

377

In [37]:
%%time

enriched_hex_df = sedona.sql("""
SELECT
    r.h3,
    r.geometry,
    r.speed_limit,
    s.service_stations,
    r.res_building,
    e.emergency_services
FROM
    road_speed_hex r
LEFT JOIN    
    hex_service_stations s using(h3) 
LEFT JOIN
    hex_res_buildings r using(h3) 
LEFT JOIN
    hex_emergency_services e using(h3)
""")

enriched_hex_df = enriched_hex_df.repartition(12)
enriched_hex_df.createOrReplaceTempView("enriched_hex")
enriched_hex_df.count()



CPU times: user 9.18 ms, sys: 602 μs, total: 9.78 ms
Wall time: 6.73 s


                                                                                

377

In [38]:
%%time
enriched_hex_df.show()

                                                                                

+------------------+--------------------+-----------+----------------+------------+------------------+
|                h3|            geometry|speed_limit|service_stations|res_building|emergency_services|
+------------------+--------------------+-----------+----------------+------------+------------------+
|604033363367428095|POLYGON ((19.6475...|        140|               0|         118|                 0|
|604013036629393407|POLYGON ((20.6428...|         50|               1|          33|                 2|
|604013119307513855|POLYGON ((23.1164...|        140|               0|           4|                 0|
|604012739739779071|POLYGON ((16.3459...|       NULL|               1|          23|                 0|
|604033636366286847|POLYGON ((18.8859...|        140|               0|           6|                 0|
|604033361354162175|POLYGON ((19.8468...|        140|               0|          88|                 0|
|604012500563787775|POLYGON ((17.1567...|        140|               0|   

In [61]:
kepler_map = SedonaKepler.create_map()
SedonaKepler.add_df(kepler_map, enriched_hex_df, name="roads")
kepler_map

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


Out of range float values are not JSON compliant
Supporting this message is deprecated in jupyter-client 7, please make sure your message is JSON-compliant
  content = self.pack(content)


KeplerGl(data={'roads': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 2…

Raster Data Joining with Zonal Statistics

In [39]:
%%time

landslide_df = sedona.sql("""
WITH matched_tile AS (
  SELECT 
      t2.tile,
      t1.h3,
      t1.geometry
  FROM 
      enriched_hex t1
  LEFT JOIN   
      land_slide_risk_tiled t2
  ON ST_OVERLAPS(RS_ENVELOPE(tile), t1.geometry)  
)

SELECT 
    max(RS_ZonalStats(tile, geometry, 'max')) AS max_landslide_class,
    matched_tile.h3 
FROM matched_tile
group by matched_tile.h3
""")

CPU times: user 1.42 ms, sys: 159 μs, total: 1.57 ms
Wall time: 65.2 ms


In [40]:
landslide_df.show(3)



+-------------------+------------------+
|max_landslide_class|                h3|
+-------------------+------------------+
|               NULL|604013119307513855|
|               NULL|604012529957470207|
|               NULL|604012670214995967|
+-------------------+------------------+
only showing top 3 rows



                                                                                

In [41]:
matched_tile_df = sedona.sql("""
SELECT 
    t2.tile,
    t1.h3,
    t1.geometry
FROM 
    enriched_hex t1
LEFT JOIN   
    land_slide_risk_tiled t2
ON ST_OVERLAPS(RS_ENVELOPE(tile), t1.geometry)
""")

In [42]:
matched_tile_df.printSchema()

root
 |-- tile: raster (nullable = true)
 |-- h3: long (nullable = true)
 |-- geometry: geometry (nullable = true)



In [43]:
matched_tile_df.count()

                                                                                

488

In [68]:
SedonaKepler.create_map(matched_tile_df, "Hiking Trails")

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

KeplerGl(data={'Hiking Trails': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 1…

In [44]:
matched_tile_df.createOrReplaceTempView("matched_tile")

In [45]:
landslide_df = sedona.sql("""
SELECT 
    MAX(RS_ZonalStats(tile, geometry, 'max')) AS max_landslide_class,
    h3 
FROM matched_tile
GROUP BY h3
""")

In [46]:
landslide_df.show()



+-------------------+------------------+
|max_landslide_class|                h3|
+-------------------+------------------+
|               NULL|604013119307513855|
|               NULL|604012529957470207|
|               NULL|604012670214995967|
|               NULL|604029475616718847|
|                2.0|604012531702300671|
|               NULL|604013120783908863|
|               NULL|604013106556829695|
|               NULL|604012731820933119|
|               NULL|604012396410830847|
|               NULL|604013118367989759|
|               NULL|604013037703135231|
|               NULL|604012492510724095|
|               NULL|604012550358564863|
|               NULL|604012635452604415|
|               NULL|604012396947701759|
|               NULL|604033361354162175|
|               NULL|604010508235833343|
|               NULL|604029749823537151|
|                1.0|604033397995601919|
|               NULL|604012739068690431|
+-------------------+------------------+
only showing top

                                                                                

In [47]:
landslide_df = landslide_df.repartition(12)

In [48]:
landslide_risk_tiled_df.unpersist()

DataFrame[x: int, y: int, tile: udt, path: string]

In [49]:
landslide_risk_tiled_df.show(3)

+---+---+--------------------+--------------------+
|  x|  y|                tile|                path|
+---+---+--------------------+--------------------+
| 68| 63|OutDbGridCoverage...|elsus_v2_4326_cog...|
|102| 10|OutDbGridCoverage...|elsus_v2_4326_cog...|
|110|  3|OutDbGridCoverage...|elsus_v2_4326_cog...|
+---+---+--------------------+--------------------+
only showing top 3 rows



In [50]:
landslide_df = landslide_df.repartition(12)
landslide_risk_tiled_df.unpersist()
landslide_df.createOrReplaceTempView('hex_landslide')
landslide_df.count()

                                                                                

377

In [51]:
landslide_df.show(3)



+-------------------+------------------+
|max_landslide_class|                h3|
+-------------------+------------------+
|               NULL|604010507833180159|
|               NULL|604012894224383999|
|               NULL|604013043206062079|
+-------------------+------------------+
only showing top 3 rows



                                                                                

In [52]:
landslide_df.printSchema()

root
 |-- max_landslide_class: double (nullable = true)
 |-- h3: long (nullable = true)



In [53]:
%%time
landslide_df.where("max_landslide_class is not null").show()



+-------------------+------------------+
|max_landslide_class|                h3|
+-------------------+------------------+
|                1.0|604012398960967679|
|                1.0|604033373702193151|
|                1.0|604012337757683711|
|                1.0|604033674886774783|
|                1.0|604028696080154623|
|                2.0|604012742558351359|
|                2.0|604012733834199039|
|                1.0|604012749269237759|
|                2.0|604012735981682687|
|                2.0|604013304930631679|
|                3.0|604028371541688319|
|                3.0|604013296340697087|
|                1.0|604028698496073727|
|                1.0|604012595187286015|
|                1.0|604012891808464895|
|                1.0|604013120112820223|
|                1.0|604033670726025215|
|                2.0|604012749403455487|
|                2.0|604012662296150015|
|                1.0|604012661222408191|
+-------------------+------------------+
only showing top

                                                                                

In [54]:
%%time

flood_df = sedona.sql("""
WITH matched_tile AS (
  SELECT 
      t2.tile,
      t2.event,
      t1.h3,
      t1.geometry
  FROM 
       enriched_hex t1
  LEFT JOIN   
      flood_raster_tiled t2
  ON ST_OVERLAPS(RS_ENVELOPE(t2.tile), t1.geometry) 
)

SELECT 
    COUNT(RS_ZonalStats(matched_tile.tile,geometry, 'count')) AS flood_point_cnt,
    matched_tile.h3, 
    matched_tile.event
FROM matched_tile
GROUP BY 
h3, 
event
""")
flood_df = flood_df.repartition(12)
flood_df.createOrReplaceTempView('hex_flood')
flood_df.count()



CPU times: user 20.5 ms, sys: 862 μs, total: 21.4 ms
Wall time: 21.4 s


                                                                                

853

In [56]:
flood_df.printSchema()

root
 |-- flood_point_cnt: long (nullable = false)
 |-- h3: long (nullable = true)
 |-- event: integer (nullable = true)



In [57]:
flood_df.show(3)



+---------------+------------------+-----+
|flood_point_cnt|                h3|event|
+---------------+------------------+-----+
|              2|604033401619480575|  100|
|              2|604033641198125055|   10|
|              2|604033672739291135|  100|
+---------------+------------------+-----+
only showing top 3 rows



                                                                                

In [84]:
sedona.sql("SELECT * from hex_flood where h3=604033401619480575 order by event DESC").show()



+---------------+------------------+-----+
|flood_point_cnt|                h3|event|
+---------------+------------------+-----+
|              2|604033401619480575|  100|
|              2|604033401619480575|   50|
|              2|604033401619480575|   10|
+---------------+------------------+-----+



                                                                                

In [58]:
%%time
flood_pivot_df = sedona.sql("""

SELECT * FROM hex_flood
    PIVOT (
        SUM(flood_point_cnt) AS a
        FOR event IN ('10' AS ten_year,'20' as twenty_year,'30' as thirty,'40' as fourty,'50' as fifty,'75' as seventyfive,'100' as one_hundred,'200' as two_hundred,'500' as five_hundred)
    )

""")
flood_pivot_df = flood_pivot_df.repartition(15)

flood_pivot_df.createOrReplaceTempView('hex_flood_pivot')

CPU times: user 2.29 ms, sys: 0 ns, total: 2.29 ms
Wall time: 59.8 ms


In [59]:
flood_pivot_df.show()



+------------------+--------+-----------+------+------+-----+-----------+-----------+-----------+------------+
|                h3|ten_year|twenty_year|thirty|fourty|fifty|seventyfive|one_hundred|two_hundred|five_hundred|
+------------------+--------+-----------+------+------+-----+-----------+-----------+-----------+------------+
|604012396679266303|       2|       NULL|  NULL|  NULL|    2|       NULL|          2|       NULL|        NULL|
|604033361354162175|       2|       NULL|  NULL|  NULL|    2|       NULL|          2|       NULL|        NULL|
|604013036629393407|       4|       NULL|  NULL|  NULL|    4|       NULL|          4|       NULL|        NULL|
|604033434771259391|    NULL|       NULL|  NULL|  NULL| NULL|       NULL|       NULL|       NULL|        NULL|
|604029749823537151|    NULL|       NULL|  NULL|  NULL| NULL|       NULL|       NULL|       NULL|        NULL|
|604033353435316223|       2|       NULL|  NULL|  NULL|    2|       NULL|          2|       NULL|        NULL|
|

                                                                                

In [60]:
flood_pivot_df.count()

                                                                                

377

In [61]:
landslide_df.select(expr('MAX(max_landslide_class)')).show()



+------------------------+
|max(max_landslide_class)|
+------------------------+
|                     4.0|
+------------------------+



                                                                                

In [62]:
landslide_df.select(expr('MIN(max_landslide_class)')).show()

                                                                                

+------------------------+
|min(max_landslide_class)|
+------------------------+
|                     1.0|
+------------------------+



In [63]:
enriched_hex_df.select(expr('Max(speed_limit)')).show()

                                                                                

+----------------+
|max(speed_limit)|
+----------------+
|             140|
+----------------+



In [None]:
enriched_hex_df.select(expr('MIN(speed_limit)')).show()

In [None]:
enriched_hex_df.select(expr('MAX(service_stations)')).show()

In [None]:
enriched_hex_df.select(expr('MIN(service_stations)')).show()

In [None]:
enriched_hex_df.select(expr('MAX(res_building)')).show()

In [None]:
enriched_hex_df.select(expr('MIN(res_building)')).show()

In [None]:
enriched_hex_df.select(expr('MAX(emergency_services)')).show()

In [None]:
enriched_hex_df.select(expr('MIN(emergency_services)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(ten_year)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(ten_year)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(twenty_year)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(twenty_year)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(thirty)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(thirty)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(fourty)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(fourty)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(fifty)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(fifty)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(seventyfive)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(seventyfive)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(one_hundred)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(one_hundred)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(two_hundred)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(two_hundred)')).show()

In [None]:
flood_pivot_df.select(expr('MAX(five_hundred)')).show()

In [None]:
flood_pivot_df.select(expr('MIN(five_hundred)')).show()

In [64]:
%%time
combined_df = sedona.sql("""
SELECT
    t1.geometry,
    .3 *(COALESCE((t1.res_building - 0) / (21042 -0 ),0)) AS res_building,
1 - (.5 *(COALESCE((t1.emergency_services - 0) / (113 - 0),0))) AS emergency_services,
1 - (.5 *(COALESCE((t1.service_stations - 0) / (59 - 0),0))) AS service_stations,
    .3 *(COALESCE((t1.speed_limit -30 ) / (130 - 30 ),0)) AS speed_limit,
    .6 *(COALESCE(1,0)) AS max_landslide_class,
    .4 *(COALESCE((t3.ten_year - 0) / (4 - 0),0)) AS ten_flood_point_cnt,
    .3 *(COALESCE((t3.twenty_year - 0) / (4 - 0),0)) AS twenty_flood_point_cnt,
    .2 *(COALESCE((t3.thirty - 0) / (4 - 0),0)) AS thirty_flood_point_cnt,
    .1 *(COALESCE((t3.fourty - 0) / (4 - 0),0)) AS fourty_flood_point_cnt,
    .09 *(COALESCE((t3.fifty - 0) / (4 - 0),0)) AS fifty_flood_point_cnt,
    .08 *(COALESCE((t3.seventyfive - 0) / (4 - 0),0)) AS seventyfive_flood_point_cnt,
    .07 *(COALESCE((t3.one_hundred - 0) / (4 - 0),0)) AS one_hundred_flood_point_cnt,
    .06 *(COALESCE((t3.two_hundred - 0) / (4 - 0),0)) AS two_hundred_flood_point_cnt,
    .05 *(COALESCE((t3.five_hundred - 0) / (4 - 0),0)) AS five_hundred_flood_point_cnt

FROM 
    enriched_hex t1
LEFT JOIN 
    hex_flood_pivot t3 USING (h3)
LEFT JOIN 
    hex_landslide t4 USING (h3)

""")


combined_df = combined_df.repartition(12)
combined_df.createOrReplaceTempView('combined_hex')
combined_df.count()



CPU times: user 11.1 ms, sys: 114 μs, total: 11.2 ms
Wall time: 7.63 s


                                                                                

377

In [65]:
final_hex = sedona.sql("""

SELECT
    geometry,
res_building+emergency_services+service_stations+speed_limit+max_landslide_class+ten_flood_point_cnt+twenty_flood_point_cnt+thirty_flood_point_cnt+fourty_flood_point_cnt+fifty_flood_point_cnt+seventyfive_flood_point_cnt+one_hundred_flood_point_cnt+two_hundred_flood_point_cnt+five_hundred_flood_point_cnt as risk_score
FROM
    combined_hex
""")

In [66]:
final_hex.createOrReplaceTempView("final_risk")

In [None]:
final_hex_normal= sedona.sql("""
SELECT 
    geometry,
    (risk_score - 2.191 ) / (4.25-1.337) as norm_risk_score
FROM 
    final_risk 

""")
final_hex_normal=final_hex_normal.cache()
final_hex_normal.count()

In [None]:
kepler_map = SedonaKepler.create_map()
SedonaKepler.add_df(kepler_map, final_hex_normal, name="combined_df")
kepler_map