# Geospatial Join and Visualization Tutorial #
This is a tutorial to show you how to join your geospatial data to polygon geometry and visualize it in Carto.

# Spark VS GeoPandas #
Geopandas is a very easy format to work with. It just "works" without too much fuss. But when using Databricks, it is best to stay in in Spark. Like with Pandas which uses Geopandas for geospatial analysis, Spark uses Apache Sedona for geospatial analysis. Sedona is very performant and optimized for working in Lakehouses like Databricks Delta Tables. 

To turn any Spark DataFrame or sdf, simply read the column with a Sedona function. Sedona functions being with "ST". Such as ```ST_Point*()``` to read the column as a Point, or ```ST_GeomFromWKB()``` to read binary polygon geometry like the kind found in Carto. This will create a geometry column, but it will be in binary. You need to *deserialize* the binary into Well-Known-Text by calling ```ST_ASWKT()``` Spatial Joins must happen on deserialized data, but Carto needs to preserve the seriallized data as a well-known binaryto render on their platform.

start with a geospatial-enabled cluster with Apache Sedona installed.

In the upper right hand corner, select "Connect" and select "geospatial-cluster-smallest-available". It may take a few minutes to start.

In [0]:
import sedona

In [0]:
from pyspark.sql.functions import col, expr

Go to the catalog to find the path to your data set. They are stored as Delta Tables, a variant of Parquety which are light-weight, static files on S3, which is easy to query and has version control features.

## read the table ##

In [0]:
#this reads the 311 data as a vanilla spark dataframe. It has a latitude and lognitude coordinate
sdf = spark.read.table("moo_ops_workspace.threeoneone.sr_mirror_2024")
sdf.printSchema()
sdf.display(5)

## import tools 

#Parse the Lat Lon into a point#
## this will turn it into a Sedona geospatial dataframe ##


Carto stores everything as a WKB or Well-Known Binary. To join two geometries in Spark/Sedona they need to both be deserialized into Well Known Text

You may convert to WKB using ST_AsBinary(). This is called serialization. This converts it into a from that Carto can read.

You may reverse this by ST_GeomFromWKB. This is called deserialization. This converts it into a format that Databricks can read.

'ST' Either stands for Spatial Type or 'Spatial Transformation', not sure. When you cast the Latitudes and Longitudes using ST_Point, your Spark Dataframe becomes a Sedona Geospatial dataframe.

When it is time to upload to carto, the columns need to be serialized again into a Well-Known-Binary

In [0]:
#turn your spark dataframe into a sedona geodataframe by creating an ST_POINT colum.
points_311 = sdf.withColumn("the_geom", expr(f"ST_AsBinary(ST_Point(LON, LAT))"))
#geom deserializaed is a string, well-known text format.
points_311 = points_311.withColumn("geom_deserialized", expr("ST_ASWKT(ST_GeomFromWKB(the_geom))"))
points_311.display(5)

# Import a geospatial dataset #
## In this step we import a Carto-Prepared geometry. ##
Sedona is already registered in the geospatial clusters.

When you attempt to import a map into Carto, it will throw an error and ask you if you want to prepare it. A *Prepared map* adds the Carto max and min bounding box to the dataframe. You may prepare a map in the Carto interface by uploading a map, attempting to display it, and there will be a warning about the map needs to be prepared, and asking you to prepare it. When you have prepared it, it will save as a table in your catalog.

In [0]:
nypd_map = spark.read.table("moo_ops_workspace.carto.nypd_prepared")
nypd_map.display()
### read the pinary geometry from the Carto prepared table, simplify the geometry to ease processing time, and deserialize it.
nypd_map = nypd_map.withColumn("geom_deserialized", expr(f"ST_ASWKT(ST_SIMPLIFY(ST_GeomFromWKB(the_geom), 0.001))"))
nypd_map.display(5)
nypd_map.printSchema()

## Join the two maps using geospatial SQL

the point geometry will be suprerceded with the polygon geometry

To take advantage of multiple workers, partition the data 

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", 200)
points_311 = points_311.repartition(200)
nypd_map = nypd_map.repartition(200)


In [0]:
#pass the data frames in as variables in an python f-string
points_311.createOrReplaceTempView("points_311")
nypd_map.createOrReplaceTempView("nypd_map")
#group by all the geometry columns to keep them.
#In order to keep the map carto-prepared you need to preserve the __carto_ columns.
#the group by will separate into groups by the deserialized geometry. 
# filter by date because this takes a long time. 3 months about 2 minutes.
# only seems to be using the minimum two workers.
result_df = spark.sql(f"""SELECT
                        FIRST(nypd_map.__carto_xmin) AS __carto_xmin,
                        FIRST(nypd_map.__carto_xmax) AS __carto_xmax,
                        FIRST(nypd_map.__carto_ymin) AS __carto_ymin, 
                        FIRST (nypd_map.__carto_ymax) AS __carto_ymax,
                        COUNT(*) as 311_calls, 
                        FIRST(nypd_map.precinct) AS precinct,
                        FIRST(nypd_map.the_geom) AS the_geom,
                        nypd_map.geom_deserialized
                        
                      FROM points_311, nypd_map
                      --ST_Contains is where the spatial join happens. It is joined on the deserialized columns.
                      WHERE ST_Contains(nypd_map.geom_deserialized, points_311.geom_deserialized)
                        AND points_311.created_date >= '2024-09-01'
                      --include in the group by all the columns you want to keep.
                      GROUP BY 
                        nypd_map.geom_deserialized
                      """)
result_df.display()


**Note: Carto requires the Geometry column to be called the_geom**

# Save the Map#

By saving the map, this should refresh the carto map. Carto reads the geospatial delta table.

In [0]:
%sql
DROP TABLE IF EXISTS moo_ops_workspace.threeoneone.311_calls_by_precinct

In [0]:

#save result_df as a delta table in the catalog
#this will scale up to use available workers.
result_df.write.format("delta").mode("overwrite").saveAsTable("moo_ops_workspace.threeoneone.311_calls_by_precinct")

In [0]:
#save as a non-geospatial table
no_map_df = result_df[['311_calls', 'precinct']]
no_map_df.write.format('delta').mode("overwrite").saveAsTable("moo_ops_workspace.threeoneone.311_calls_no_map")

# Visualization in PyDeck #

Display using pyDeck

This isn't working for me.

In [0]:
# Define a function to map 311_calls to colors
def get_color(calls):
    if calls < 20000:
        return [255, 255, 204, 160]
    elif calls < 40000:
        return [255, 237, 160, 160]
    elif calls < 60000:
        return [254, 217, 118, 160]
    elif calls < 80000:
        return [254, 178, 76, 160]
    else:
        return [253, 141, 60, 160]

In [0]:
def get_color_rgb(calls):
    if calls < 20000:
        return "rgb(255, 255, 204)"
    elif calls < 40000:
        return "rgb(255, 237, 160)"
    elif calls < 60000:
        return "rgb(254, 217, 118)"
    elif calls < 80000:
        return "rgb(254, 178, 76)"
    else:
        return "rgb(253, 141, 60)"

In [0]:
import pydeck as pdk
import pandas as pd
import geopandas as gpd
import shapely
import sys
sys.setrecursionlimit(100000)


In [0]:
# have to get out of spark dataframe and into a geopandas geodataframe.
df = result_df.toPandas()

In [0]:
#load the geometry into a geopandas dataframe
df['geometry'] = df['the_geom'].apply(shapely.wkb.loads)
#df['geometry'] = df['geom_deserialized']
gdf = gpd.GeoDataFrame(df, geometry='geometry', crs='EPSG:4326')
gdf.head(5)
#apply a fill color for the data
gdf['fill_color'] = gdf['311_calls'].apply(get_color)
gdf['fill_color_rgb'] = gdf['311_calls'].apply(get_color_rgb)
del gdf['the_geom']
del gdf['geom_deserialized']
#simplify the geometry to make it smaller. Prevents the json recursion from getting too deep.
gdf['geometry'] = gdf['geometry'].simplify(0.0001)
geojson = gdf.to_json()


In [0]:
gdf.head(5)
geojson[:1000]

*Deck doesn't seem to be working for me*

In [0]:

LAND_COVER = [[-74.2654, 40.4964], [-73.7004, 40.4964], [-73.7004, 40.9156], [-74.2654, 40.9156]]
geojson_layer = pdk.Layer(
    "GeoJsonLayer",
    geojson,
    opacity=0.8,
    stroked=True, 
    filled=True,
    get_fill_color='properties.fill_color',
    pickable=True)
'''
polygon_layer = pdk.Layer(
    "PolygonLayer",
    LAND_COVER,
    get_polygon='-',
    get_fill_color='[0, 0, 0, 20]',)
'''
view_state = pdk.ViewState(latitude=40.7128, longitude=-74.0060, zoom=11, bearing=0, pitch=0)

r = pdk.Deck(layers=[geojson_layer], 
             initial_view_state=view_state)
r.to_html()

# Visualization using Folium #

Was able to get this to work. Folium is widely used in industry for map visualization.

In [0]:
import folium

In [0]:
gdf['fill_color_rgb'][:5]

In [0]:
geojson = gdf.to_json()

In [0]:
m = folium.Map(location=[40.7128, -74.0060], zoom_start=11)
# folium takes color properties in the format "rgb(255, 255, 255)" or "rgba(255, 255, 255, 1.0)". It also accepts hex "#ff0000" for color
folium.GeoJson(geojson, 
               style_function=lambda feature: { 'fillColor': feature['properties']['fill_color_rgb'],
                                                'color': 'black', 
                                                'weight': 1, 
                                                'fillOpacity': 0.8
                                             }           
               ).add_to(m)
display(m)