In [1]:
from pyspark.sql import SparkSession
from delta import *
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from dataclasses import dataclass
from shapely.geometry import Point
from shapely.geometry import Polygon
from pyspark.sql import DataFrame
from os import getcwd
import shutil
import os
from sedona.sql.types import GeometryType
from shapely.wkt import loads
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

In [2]:
def cleanup():
    if os.path.exists("./spark-warehouse"):
        shutil.rmtree("./spark-warehouse")
cleanup()

In [3]:
def get_table_path(table_name: str) ->str :
    return f"{getcwd()}/spark-warehouse/{table_name}"

In [4]:
@dataclass
class GeomExtent:
    geom: Polygon
    identifier: int

        
@dataclass
class CityMeasure:
    city: str
    index: int
    measure: float
    geom: str
        
        
def create_table_as_select(delta_table_name: str, data_frame: DataFrame):
    view_name = f"{delta_table_name}_view"
    data_frame.createOrReplaceTempView(view_name)
    spark.sql(
        f"""
            CREATE TABLE {delta_table_name} using delta LOCATION '{get_table_path(delta_table_name)}' AS SELECT * FROM {view_name}
        """
    ).show()
    
    
def load_delta_table(table_name: str) -> DataFrame:
    return spark.sql(f"SELECT * FROM delta.`{get_table_path(table_name)}`")

In [5]:
spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.serializer", KryoSerializer.getName) \
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName) \
    .config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.2.1-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2,io.delta:delta-core_2.12:2.1.0') \
    .getOrCreate()

22/10/01 12:20:11 WARN Utils: Your hostname, macpro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.100 instead (on interface en0)
22/10/01 12:20:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/pawel.kocinski/Library/Caches/pypoetry/virtualenvs/binder-7uni60N0-py3.8/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/pawel.kocinski/.ivy2/cache
The jars for the packages stored in: /Users/pawel.kocinski/.ivy2/jars
org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-180758d7-bd3a-4285-833d-431142bd21e0;1.0
	confs: [default]
	found org.apache.sedona#sedona-python-adapter-3.0_2.12;1.2.1-incubating in central
	found org.locationtech.jts#jts-core;1.18.2 in local-m2-cache
	found org.wololo#jts2geojson;0.16.1 in central
	found org.apache.sedona#sedona-core-3.0_2.12;1.2.1-incubating in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.5.0 in local-m2-cache
	found org.apache.sedona#sedona-sql-3.0_2.12;1.2.1-incubating in central
	found org.datasyslab#geotools-wrapper;1.1.0-25.2 in central
	found io.delta#delta-core_2.12;2.1.0 in central
	found io.delta#delta-storage;2.1.0 in 

22/10/01 12:20:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [6]:
SedonaRegistrator.registerAll(spark)

                                                                                

True

# Create Delta table with geometry type

In [7]:
spark.sql("CREATE DATABASE delta")

DataFrame[]

In [8]:
DeltaTable.\
    create(spark).\
    location("locations").\
    addColumn("identifier", "BIGINT").\
    addColumn("geom", GeometryType()).\
    execute()

                                                                                

<delta.tables.DeltaTable at 0x7fe6dad34460>

# Create delta table as select

In [9]:
geospatial_df = spark.createDataFrame(
    [
        CityMeasure(index = 1, geom="POINT(19.936160 50.061486)", city="Cracow", measure = 0.0),
        CityMeasure(index = 1, geom="POINT(21.037308 52.226236)", city="Warsaw", measure = 0.0),
        CityMeasure(index = 1, geom="POINT(4.899059 52.363573)", city="Amsterdam", measure = 0.0),
        CityMeasure(index = 1, geom="POINT(-118.240286 34.004080)", city="Los Angeles", measure = 0.0),
        CityMeasure(index = 1, geom="POINT(-77.054092 38.934715)", city="Washington", measure = 0.0),
        CityMeasure(index = 1, geom="POINT(2.152872 41.377910)", city="Barcelona", measure = 0.0),
        CityMeasure(index = 1, geom="POINT(13.419083 52.504049)", city="Berlin", measure = 0.0),
        CityMeasure(index = 1, geom="POINT(-74.047214 40.742164)", city="New York", measure = 0.0)
        
    ],
)
geospatial_df.createOrReplaceTempView("sedona_table")

In [10]:
geospatial_df.show()

+-----------+--------------------+-----+-------+
|       city|                geom|index|measure|
+-----------+--------------------+-----+-------+
|     Cracow|POINT(19.936160 5...|    1|    0.0|
|     Warsaw|POINT(21.037308 5...|    1|    0.0|
|  Amsterdam|POINT(4.899059 52...|    1|    0.0|
|Los Angeles|POINT(-118.240286...|    1|    0.0|
| Washington|POINT(-77.054092 ...|    1|    0.0|
|  Barcelona|POINT(2.152872 41...|    1|    0.0|
|     Berlin|POINT(13.419083 5...|    1|    0.0|
|   New York|POINT(-74.047214 ...|    1|    0.0|
+-----------+--------------------+-----+-------+



In [11]:
table_name = "sedona_delta"

spark.sql(
f"""
    CREATE TABLE IF NOT EXISTS {table_name} using delta LOCATION '{get_table_path(table_name)}' AS SELECT * FROM sedona_table
"""
)

22/10/01 12:20:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
22/10/01 12:20:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
22/10/01 12:20:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers




DataFrame[]

# Insert into table

In [12]:
delta_table_name = "insert_geospatial_data"

In [13]:
europe_extent = """
POLYGON((
    -21.289062499999996 73.22863503912812,
    43.046875 73.22863503912812,
    43.046875 38.82781590516771,
    -21.289062499999996 38.82781590516771,
    -21.289062499999996 73.22863503912812
))
""".replace("  ", "").replace("\n", "")



europe_extent = spark.createDataFrame(
    [GeomExtent(loads(europe_extent), 1)]
)
europe_extent.createOrReplaceTempView("extent")

In [14]:
existing_records = spark.createDataFrame(
    [
        CityMeasure(index = 6, geom=loads("POINT(-77.054092 38.934715)"), city="Washington", measure = 0.0),
    ],
)

create_table_as_select(delta_table_name, existing_records)

++
||
++
++



In [15]:
additional_records = spark.createDataFrame(
    [
        CityMeasure(index = 1, geom=loads("POINT(21.037308 52.226236)"), city="Warsaw", measure = 0.0),
        CityMeasure(index = 2, geom=loads("POINT(4.899059 52.363573)"), city="Amsterdam", measure = 0.0),
        CityMeasure(index = 6, geom=loads("POINT(-77.054092 38.934715)"), city="Washington", measure = 0.0),
        CityMeasure(index = 3, geom=loads("POINT(13.419083 52.504049)"), city="Berlin", measure = 0.0),
    ],
)

geospatial_data_view_name = "new_records"
additional_records.createOrReplaceTempView(geospatial_data_view_name)

In [16]:
load_delta_table(delta_table_name).show()

+----------+--------------------+-----+-------+
|      city|                geom|index|measure|
+----------+--------------------+-----+-------+
|Washington|POINT (-77.054092...|    6|    0.0|
+----------+--------------------+-----+-------+



In [17]:
spark.sql(
f"""
INSERT INTO delta.`{get_table_path(delta_table_name)}` (city, geom, index, measure)
    SELECT i.city, i.geom, i.index, i.measure FROM {geospatial_data_view_name} AS i, extent AS b
    WHERE St_Intersects(i.geom, b.geom)
"""
)

22/10/01 12:20:25 WARN RangeJoinExec: [SedonaSQL] Join dominant side partition number 12 is larger than 1/2 of the dominant side count 4
22/10/01 12:20:25 WARN RangeJoinExec: [SedonaSQL] Try to use follower side partition number 12
22/10/01 12:20:25 WARN RangeJoinExec: [SedonaSQL] Join follower side partition number is also larger than 1/2 of the dominant side count 4
22/10/01 12:20:25 WARN RangeJoinExec: [SedonaSQL] Try to use 1/2 of the dominant side count 2 as the partition number of both sides
22/10/01 12:20:25 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.


DataFrame[]

In [18]:
load_delta_table(delta_table_name).show()

+----------+--------------------+-----+-------+
|      city|                geom|index|measure|
+----------+--------------------+-----+-------+
| Amsterdam|POINT (4.899059 5...|    2|    0.0|
|    Berlin|POINT (13.419083 ...|    3|    0.0|
|Washington|POINT (-77.054092...|    6|    0.0|
|    Warsaw|POINT (21.037308 ...|    1|    0.0|
+----------+--------------------+-----+-------+



# Update Table

In [19]:
spark.sql(f"""
 UPDATE delta.`{get_table_path(delta_table_name)}`
 SET measure = 1.0 where ST_Distance(geom, ST_GeomFromText('POINT(21.00 52.00)')) < 1.0
""")

DataFrame[num_affected_rows: bigint]

In [20]:
load_delta_table(delta_table_name).show()

+----------+--------------------+-----+-------+
|      city|                geom|index|measure|
+----------+--------------------+-----+-------+
| Amsterdam|POINT (4.899059 5...|    2|    0.0|
|    Berlin|POINT (13.419083 ...|    3|    0.0|
|Washington|POINT (-77.054092...|    6|    0.0|
|    Warsaw|POINT (21.037308 ...|    1|    1.0|
+----------+--------------------+-----+-------+



# Merge into table

In [21]:
additional_records_to_merge = spark.createDataFrame(
    [
        CityMeasure(index = 1, geom=loads("POINT(21.037308 52.226236)"), city="Warsaw", measure = 2.0),
        CityMeasure(index = 5, geom=loads("POINT(4.899059 52.363573)"), city="Amsterdam", measure = 4.0),
        CityMeasure(index = 6, geom=loads("POINT(-77.054092 38.934715)"), city="Washington", measure = 5.0),
    ]
).repartition(1)

In [22]:
geospatial_data_to_merge_view_name = "new_records"
additional_records_to_merge.createOrReplaceTempView(geospatial_data_view_name)

In [23]:
spark.sql(f"""
 MERGE INTO delta.`{get_table_path(delta_table_name)}` AS m
 USING {geospatial_data_to_merge_view_name} AS u
 ON ST_Intersects(st_buffer(m.geom, 2.0), u.geom)
 WHEN MATCHED THEN
 UPDATE SET
  measure = u.measure
 WHEN NOT MATCHED
 THEN INSERT (
 index,
 geom,
 measure,
 city
 )
 VALUES(
  u.index,
  u.geom,
  u.measure,
  u.city
 )

""")

22/10/01 12:20:28 WARN RangeJoinExec: [SedonaSQL] Join dominant side partition number 5 is larger than 1/2 of the dominant side count 4
22/10/01 12:20:28 WARN RangeJoinExec: [SedonaSQL] Try to use follower side partition number 1
22/10/01 12:20:28 WARN JoinQuery: UseIndex is true, but no index exists. Will build index on the fly.
22/10/01 12:20:29 WARN MergeIntoCommand: Merge source has SQLMetric(id: 1586, name: Some(number of source rows), value: 3) rows in initial scan but SQLMetric(id: 1587, name: Some(number of source rows (during repeated scan)), value: 6) rows in second scan


DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [24]:
load_delta_table(delta_table_name).show()

+----------+--------------------+-----+-------+
|      city|                geom|index|measure|
+----------+--------------------+-----+-------+
|    Warsaw|POINT (21.037308 ...|    1|    2.0|
| Amsterdam|POINT (4.899059 5...|    2|    4.0|
|Washington|POINT (-77.054092...|    6|    5.0|
|    Berlin|POINT (13.419083 ...|    3|    0.0|
+----------+--------------------+-----+-------+

