In [1]:
import oracledb
import os

import geopandas as gpd
from pyspark.sql import SparkSession

from sedona.spark import SedonaContext

In [2]:
additional_packages = [
    'org.postgresql:postgresql:42.7.4',
    'mysql:mysql-connector-java:8.0.33',
    'com.oracle.database.jdbc:ojdbc8:23.6.0.24.10',
    'org.mongodb.spark:mongo-spark-connector_2.12:10.4.0',
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1',
    'org.apache.hadoop:hadoop-aws:3.3.6',
    'org.apache.hadoop:hadoop-client-api:3.4.1',
    'org.apache.sedona:sedona-spark-3.5_2.12:1.7.0',
    'org.datasyslab:geotools-wrapper:1.7.0-28.5',
    'uk.co.gresearch.spark:spark-extension_2.12:2.11.0-3.4'
]

config = (
    SedonaContext.builder()
    .config(
     "spark.driver.memory", "2G"   
    )
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.access.key", "sedona") \
    .config("spark.hadoop.fs.s3a.secret.key", "sedona_password") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config(
      "spark.hadoop.fs.s3a.aws.credentials.provider",
      "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")\
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017")
    .config("spark.jars.packages", ",".join(additional_packages))
    .getOrCreate()
)

sedona = SedonaContext.create(config)

24/12/18 19:42:04 WARN Utils: Your hostname, Pawels-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.115 instead (on interface en0)
24/12/18 19:42:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/pawelkocinski/Desktop/projects/apache-sedona-book/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/pawelkocinski/.ivy2/cache
The jars for the packages stored in: /Users/pawelkocinski/.ivy2/jars
org.postgresql#postgresql added as a dependency
mysql#mysql-connector-java added as a dependency
com.oracle.database.jdbc#ojdbc8 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.hadoop#hadoop-client-api added as a dependency
org.apache.sedona#sedona-spark-3.5_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
uk.co.gresearch.spark#spark-extension_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0e4aca9e-f493-4699-bbaf-6d82b29db881;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.4 in central
	found org.checkerframework#checker-qual;3.42.0 in central
	found mysql#mysql-connector-java;8.0.33 in central
	found com.m

# Postgis

In [3]:
database_name = "sedona"
user_name = "sedona"
password = "sedona"

postgresql_url = f"jdbc:postgresql://localhost:5432/{database_name}"
postgresql_properties = {
    "user": "sedona",
    "password": "postgis",
}

table_name = "points"
df = sedona.read \
    .format("jdbc") \
    .option("url", postgresql_url) \
    .option("user", postgresql_properties["user"]) \
    .option("password", postgresql_properties["password"]) \
    .option("dbtable", "points") \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.show(3)

+-------+--------------------+
|   name|            location|
+-------+--------------------+
|Point A|0101000020E610000...|
|Point B|0101000020E610000...|
|Point C|0101000020E610000...|
+-------+--------------------+



In [4]:
# transforming postgis EWKB column to 

In [5]:
df.selectExpr("name", "ST_GeomFromEWKB(location) AS geom").show()

+-------+-------------+
|   name|         geom|
+-------+-------------+
|Point A|POINT (10 20)|
|Point B|POINT (30 40)|
|Point C|POINT (50 60)|
+-------+-------------+



In [6]:
df.selectExpr("name", "ST_SRID(ST_GeomFromEWKB(location)) AS geom").show()

+-------+----+
|   name|geom|
+-------+----+
|Point A|4326|
|Point B|4326|
|Point C|4326|
+-------+----+



# MySQL

In [7]:
database_name = "sedona"
user_name = "sedona"
password = "sedona"
table_name = "points"

mysql_url = f"jdbc:mysql://localhost:3306/{database_name}"

df = sedona.read \
    .format("jdbc") \
    .option("url", mysql_url) \
    .option("user", user_name) \
    .option("password", password) \
    .option("dbtable", "points") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

df.show()

+-------+--------------------+
|   name|            location|
+-------+--------------------+
|Point A|[E6 10 00 00 01 0...|
|Point B|[E6 10 00 00 01 0...|
|Point C|[E6 10 00 00 01 0...|
+-------+--------------------+



In [8]:
df.selectExpr(
    "name",
    "ST_GeomFromMySQL(location) AS geom"
).show(3, False)

AnalysisException: [UNRESOLVED_ROUTINE] Cannot resolve function `ST_GeomFromMySQL` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`].; line 1 pos 0

In [None]:
df.selectExpr(
    "name",
    "ST_SRID(ST_GeomFromMySQL(location)) AS srid"
).show(3, False)

# OracleDB

In [9]:
oracle_url = "jdbc:oracle:thin:@//localhost:1521/FREEPDB1"

table_name = "points"

df = sedona.read \
    .format("jdbc") \
    .option("url", oracle_url) \
    .option("query", "SELECT name, SDO_UTIL.TO_WKBGEOMETRY(location) AS wkb from points") \
    .option("user", "sedona") \
    .option("password", "sedona") \
    .option("driver", "oracle.jdbc.OracleDriver") \
    .load()

In [10]:
df.show()

+-------+--------------------+
|   NAME|                 WKB|
+-------+--------------------+
|Point A|[00 00 00 00 01 4...|
|Point B|[00 00 00 00 01 4...|
|Point C|[00 00 00 00 01 4...|
+-------+--------------------+



In [11]:
# Step 4: Perform Data Processing
df.selectExpr("ST_GeomFromWKB(wkb) as geom").show()

+-------------+
|         geom|
+-------------+
|POINT (10 20)|
|POINT (30 40)|
|POINT (50 60)|
+-------------+



# MongoDB

In [12]:
from pyspark.sql.functions import col, to_json

df = sedona.read \
    .option("database", "sedona") \
    .option("collection", "points") \
    .format("mongodb").load()

df.selectExpr("location").show(2, False)

+-----------------------------+
|location                     |
+-----------------------------+
|{Point, [-74.006, 40.7128]}  |
|{Point, [-118.2437, 34.0522]}|
+-----------------------------+



In [13]:
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- name: string (nullable = true)



In [14]:
df.withColumn("location", to_json(col("location")))\
    .selectExpr("name", "ST_GeomFromGeoJSON(location) AS geom")\
    .show(2, False)


+-------+-------------------------+
|name   |geom                     |
+-------+-------------------------+
|Point A|POINT (-74.006 40.7128)  |
|Point B|POINT (-118.2437 34.0522)|
+-------+-------------------------+



# Bulk read from postgres and store as geoparquet on s3

In [15]:
database_name = "sedona"
user_name = "sedona"
password = "sedona"

postgresql_url = f"jdbc:postgresql://localhost:5432/{database_name}"
postgresql_properties = {
    "user": "sedona",
    "password": "postgis",
}

query = """
SELECT * FROM polygons WHERE COALESCE(CREATED_AT, UPDATED_AT) BETWEEN '2020-01-01 00:00:00' AND '2024-01-01 00:00:00'
"""

start_date = "2024-01-01 11:00:00"
end_date = "2024-01-01 12:00:00"
table_name = "points"
df = sedona.read \
    .format("jdbc") \
    .option("url", postgresql_url) \
    .option("user", postgresql_properties["user"]) \
    .option("password", postgresql_properties["password"]) \
    .option("query", query) \
    .option("driver", "org.postgresql.Driver") \
    .load()

df.selectExpr("name", "ST_GeomFromEWKB(location)", "created_at", "updated_at").show()

+---------+-------------------------+-------------------+-------------------+
|     name|st_geomfromewkb(location)|         created_at|         updated_at|
+---------+-------------------------+-------------------+-------------------+
|Polygon B|     POLYGON ((70 80, ...|2020-01-01 00:00:00|2024-01-01 11:36:00|
|Polygon C|     POLYGON ((130 140...|2023-01-01 00:00:00|2024-01-01 11:16:00|
+---------+-------------------------+-------------------+-------------------+



# Reading and storing cdc data into s3 geoparquet

In [16]:
import pyspark.sql.functions as f
import pyspark.sql.types as t 

df = sedona \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "sedona-debezium.public.points") \
  .load()

In [17]:
data = t.StructType([
    t.StructField("name", t.StringType(), True),
    t.StructField("location", t.StructType([
        t.StructField("srid", t.IntegerType()),
        t.StructField("wkb", t.BinaryType())
    ]))
])

before = t.StructField("before", data, True)

after = t.StructField("after", data, True)

payload = t.StructField(
    "payload", t.StructType([before, after])
)

schema = t.StructType([
    payload
])

In [18]:
geometry_df = df.select(f.from_json(f.expr("CAST(value AS STRING)"), schema).alias("data"))\
    .selectExpr(
        "data.payload.after.name as name",
        "data.payload.after.location.wkb as wkb",
        "data.payload.after.location.srid AS srid"
    )\
    .selectExpr("name", "ST_SetSRID(ST_GeomFromWKB(wkb), srid) AS geom")

In [19]:
geometry_df.withColumn("geohash", f.expr("ST_GeoHash(geom, 5)"))\
    .orderBy("geohash")\
    .write\
    .mode("overwrite")\
    .format("geoparquet")\
    .save("s3a://sedona/postgis-cdc-batch")

24/12/18 19:42:40 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/12/18 19:42:41 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                