# Quickstart notebook

In [0]:
from pyspark.sql.functions import *

## Geometry constructors and the Mosaic internal geometry format

Mosaic allows uses to create new Point geometries from a pair of Spark DoubleTypes columns.

In [0]:
from mosaic import st_point

lons = [-80., -80., -70., -70., -80.]
lats = [ 35.,  45.,  45.,  35.,  35.]

bounds_df = (
  spark
  .createDataFrame({"lon": lon, "lat": lat} for lon, lat in zip(lons, lats))
  .coalesce(1)
  .withColumn("point_geom", st_point("lon", "lat"))
)
bounds_df.show()

_Note: the first time you import from the Mosaic Python package, the supporting Scala classes will be copied to your cluster nodes. You can, therefore, expect this first command to take slightly longer to execute than subsequent actions._

Mosaic Point geometries can be aggregated into LineString and Polygon geometries using the respective constructors.

In [0]:
from mosaic import st_makeline

bounds_df = (
  bounds_df
  .groupBy()
  .agg(collect_list("point_geom").alias("bounding_coords"))
  .select(st_makeline("bounding_coords").alias("bounding_ring"))
)
bounds_df.show()

In [0]:
from mosaic import st_makepolygon

bounds_df = bounds_df.select(st_makepolygon("bounding_ring").alias("bounds"))
bounds_df.show()

## Geometry clipping without an index

Mosaic implements set intersection functions: contains, intersects, overlaps etc. Here you can see `st_contains` being used to clip points by a polygon geometry.

In [0]:
tripsTable = spark.table("delta.`/databricks-datasets/nyctaxi/tables/nyctaxi_yellow`")

In [0]:
from mosaic import st_contains
trips = (
  tripsTable
  .limit(5_000_000)
  .repartition(sc.defaultParallelism * 20)
  .drop("vendorId", "rateCodeId", "store_and_fwd_flag", "payment_type")
  .withColumn("pickup_geom", st_point("pickup_longitude", "pickup_latitude"))
  .withColumn("dropoff_geom", st_point("dropoff_longitude", "dropoff_latitude"))
  .crossJoin(bounds_df)
  .where(st_contains("bounds", "pickup_geom"))
  .where(st_contains("bounds", "dropoff_geom"))
  .cache()
)

In [0]:
trips.show()

## Read from GeoJson, compute some basic geometry attributes

You've seen how Mosaic can create geometries from Spark native data types but it also provides functions to translate Well Known Text (WKT), Well Known Binary (WKB) and GeoJSON representations to Mosaic geometries.

In [0]:
from mosaic import st_geomfromgeojson

geoJsonDF = (
  spark.read.format("json")
  .load("dbfs:/FileStore/shared_uploads/stuart.lynn@databricks.com/NYC_Taxi_Zones.geojson")
  .withColumn("geometry", st_geomfromgeojson(to_json(col("geometry"))))
  .select("properties.*", "geometry")
  .drop("shape_area", "shape_leng")
)

geoJsonDF.show()

Mosaic provides a number of functions for extracting the properties of geometries. Here are some that are relevant to Polygon geometries:

In [0]:
from mosaic import st_area, st_length
(
  geoJsonDF
  .withColumn("calculatedArea", abs(st_area("geometry")))
  .withColumn("calculatedLength", st_length("geometry"))
  .select("geometry", "calculatedArea", "calculatedLength")
).show()

In [0]:
geoJsonDF.count()

## Example point-in-poly with indexing

Mosaic has built-in support for the popular spatial indexing library, H3. The user has access to functions for generating point indices and the sets of indices covering polygons, allowing point-in-polygon joins to be transformed into deterministic SQL joins.

In [0]:
from mosaic import point_index

trips_with_geom = (
  trips
  .withColumn("pickup_h3", point_index(lng="pickup_longitude", lat="pickup_latitude", resolution=lit(10)))
  .withColumn("dropoff_h3", point_index(lng="dropoff_longitude", lat="dropoff_latitude", resolution=lit(10)))
)

trips_with_geom.show()

In [0]:
from mosaic import polyfill

neighbourhoods = (
  geoJsonDF
  .repartition(sc.defaultParallelism)
  .select("*", explode(polyfill("geometry", lit(10))).alias("h3"))
  .drop("geometry")
)

neighbourhoods.show()

In [0]:
joined_df = trips_with_geom.alias("t").join(neighbourhoods.alias("n"), on=expr("t.pickup_h3 = n.h3"), how="inner")
joined_df.count()

## Mosaic spatial join optimizations

Mosaic provides easy access to the optimized spatial join technique described in [this](https://databricks.com/blog/2021/10/11/efficient-point-in-polygon-joins-via-pyspark-and-bng-geospatial-indexing.html) blog post.

(this step is required due to incompatibility between Databricks Runtime and open source Spark)

In [0]:
%run ../GeneratorsHotfix

In [0]:
from mosaic.patch import mosaic_explode

mosaic_neighbourhoods = (
  geoJsonDF
  .repartition(sc.defaultParallelism)
  .select("*", mosaic_explode("geometry", lit(10)))
  .drop("geometry")
)

mosaic_neighbourhoods.show()

Now the two datasets can be joined first on H3 index, with any false positives removed through a contains filter on a much simpler geometry.

In [0]:
mosaic_joined_df = (
  trips_with_geom.alias("t")
  .join(mosaic_neighbourhoods.alias("n"), on=expr("t.pickup_h3 = n.h3"), how="inner")
  .where(
    ~col("is_core") | 
    st_contains("wkb", "pickup_geom")
  )
)

mosaic_joined_df.show()