In [1]:
from pathlib import Path
import teehr
from datetime import datetime
import pytz
from pyspark.sql import SparkSession, DataFrame

In [2]:
evaluation_path = str(Path.home() / "temp" / "iceberg" / "evaluation")
warehouse_path = str(Path(evaluation_path) / "spark-warehouse")
catalog_name = "local"
schema_name = "db"

In [3]:
from sedona.spark import *

config = (
    SedonaContext.builder()
    .config(
        "spark.jars.packages",
        "org.apache.sedona:sedona-spark-3.5_2.12:1.7.1,"
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.0,"
        "org.datasyslab:geotools-wrapper:1.7.1-28.5"
    )
    .config(
        "spark.jars.repositories",
        "https://artifacts.unidata.ucar.edu/repository/unidata-all",
    )
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    )
    .config(
        f"spark.sql.catalog.{catalog_name}",
        "org.apache.iceberg.spark.SparkCatalog"
    )
    .config(
        f"spark.sql.catalog.{catalog_name}.type", "hadoop"
    )
    .config(
        f"spark.sql.catalog.{catalog_name}.warehouse",
        f"{warehouse_path}/{catalog_name}"
    )
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.driver.host", "localhost")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)
sedona = SedonaContext.create(config)

https://artifacts.unidata.ucar.edu/repository/unidata-all added as a remote repository with the name: repo-1
Ivy Default Cache set to: /Users/mdenno/.ivy2/cache
The jars for the packages stored in: /Users/mdenno/.ivy2/jars
org.apache.sedona#sedona-spark-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-be543c5d-ab17-4fce-adf8-d7906586c233;1.0
	confs: [default]
	found org.apache.sedona#sedona-spark-3.5_2.12;1.7.1 in central
	found org.apache.sedona#sedona-common;1.7.1 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found org.locationtech.jts#jts-core;1.20.0 in central
	found org.wololo#jts2geojson;0.16.1 in central


:: loading settings :: url = jar:file:/Users/mdenno/repos/teehr/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.locationtech.spatial4j#spatial4j;0.8 in central
	found com.google.geometry#s2-geometry;2.0.0 in central
	found com.google.guava#guava;25.1-jre in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found org.checkerframework#checker-qual;2.0.0 in central
	found com.google.errorprone#error_prone_annotations;2.1.3 in central
	found com.google.j2objc#j2objc-annotations;1.1 in central
	found org.codehaus.mojo#animal-sniffer-annotations;1.14 in central
	found com.uber#h3;4.1.1 in central
	found net.sf.geographiclib#GeographicLib-Java;1.52 in central
	found com.github.ben-manes.caffeine#caffeine;2.9.2 in central
	found org.checkerframework#checker-qual;3.10.0 in central
	found com.google.errorprone#error_prone_annotations;2.5.1 in central
	found org.apache.sedona#sedona-spark-common-3.5_2.12;1.7.1 in central
	found org.apache.sedona#shade-proto;1.7.1 in central
	found org.xerial#sqlite-jdbc;3.41.2.2 in central
	found commons-lang#commons-lang;2.6 in central
	found gra

In [4]:
# Create an Evaluation object and create the directory
ev = teehr.Evaluation(
    dir_path=evaluation_path,
    spark=sedona
)

# Make some views and query

In [5]:
attribute_names = [row.attribute_name for row in sedona.sql(f"""
    SELECT DISTINCT(attribute_name) FROM local.db.location_attributes
""").collect()]
attribute_names_sql = ", ".join([f"'{name}'" for name in attribute_names])

sedona.sql(f"""
    CREATE OR REPLACE TEMP VIEW locations_view AS (
        WITH location_attributes_pivot AS (
            SELECT *
            FROM (
                SELECT location_id, attribute_name, value
                FROM local.db.location_attributes
            ) src
            PIVOT (
                max(value) FOR attribute_name IN ({attribute_names_sql})
            )
        )
        SELECT l.id, l.name, ST_GeomFromWKB(l.geometry) AS geometry, la.*
        FROM local.db.locations l
        LEFT JOIN location_attributes_pivot la
        ON l.id = la.location_id
        WHERE l.id IS NOT NULL
        AND la.location_id IS NOT NULL
    )
""")

25/06/04 14:54:37 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[]

In [6]:
sedona.sql(f"""
    SELECT *
    FROM locations_view
    WHERE id = 'usgs-01013500'
""").show()

+-------------+--------------------+--------------------+-------------+-----------------+----------------+-----------------+------------------+-----------------+-----------------+------------------+-----------------+-----------------+----------+-------------+----------------+-------------------+----------------+----------------+---------+----------+---------------+-----------+-----------+--------------+--------------------+-----------+-----------------+------------+---------------------+
|           id|                name|            geometry|  location_id|        frac_snow|         aridity|    p_seasonality|                q5|              q95|     runoff_ratio|            q_mean|   baseflow_index|    soil_porosity|slope_mean|drainage_area|          p_mean|dom_land_cover_frac|        pet_mean|       slope_fdc|elev_mean|frac_urban|NID_dam_lengths|zero_q_freq|high_q_freq|high_prec_freq|        ecoregion_L2|forest_frac|   dom_land_cover|stream_order|river_forecast_center|
+-------------

In [7]:
sedona.sql("""
CREATE OR REPLACE TEMP VIEW joined_timeseries_view AS (
    SELECT
        sf.reference_time
        , sf.value_time as value_time
        , pf.location_id as primary_location_id
        , sf.location_id as secondary_location_id
        , pf.value as primary_value
        , sf.value as secondary_value
        , sf.configuration_name
        , sf.unit_name
        , sf.variable_name
        , sf.member
    FROM local.db.secondary_timeseries sf
    JOIN local.db.location_crosswalks cf
        on cf.secondary_location_id = sf.location_id
    JOIN local.db.primary_timeseries pf
        on cf.primary_location_id = pf.location_id
        and sf.value_time = pf.value_time
        and sf.unit_name = pf.unit_name
        and sf.variable_name = pf.variable_name
)
""", )


DataFrame[]

In [8]:
sedona.sql(f"SELECT * FROM joined_timeseries_view").show()

[Stage 12:>                                                         (0 + 4) / 4]

+--------------+-------------------+-------------------+---------------------+-------------+---------------+-------------------+---------+--------------------+------+
|reference_time|         value_time|primary_location_id|secondary_location_id|primary_value|secondary_value| configuration_name|unit_name|       variable_name|member|
+--------------+-------------------+-------------------+---------------------+-------------+---------------+-------------------+---------+--------------------+------+
|          NULL|1990-11-17 00:00:00|      usgs-01013500|        usgs-01013500|    70.827515|       99.89531|marrmot_37_hbv_obj1|    m^3/s|streamflow_daily_...|  NULL|
|          NULL|1990-11-17 00:00:00|      usgs-01013500|        usgs-01013500|    70.827515|       60.40052|   camels_daymet_05|    m^3/s|streamflow_daily_...|  NULL|
|          NULL|1990-11-17 00:00:00|      usgs-01013500|         nwm30-724696|    70.827515|       66.35166|nwm30_retrospective|    m^3/s|streamflow_daily_...|  NULL

                                                                                

In [9]:
sdf = sedona.sql("""
    SELECT river_forecast_center, avg(primary_value) as ave FROM joined_timeseries_view jtv
        JOIN locations_view lv
        ON jtv.primary_location_id = lv.id
    GROUP BY river_forecast_center;
""")
sdf.show()



+---------------------+------------------+
|river_forecast_center|               ave|
+---------------------+------------------+
|                CBRFC|1.5549045075001895|
|                NWRFC| 24.37332440867924|
|                NCRFC|10.509092417281442|
|                WGRFC|3.0123704995564933|
|                MARFC| 6.642655581080046|
|                NERFC|11.451879210220044|
|                LMRFC|14.989565046280838|
|                OHRFC|  9.97940754219624|
|                ABRFC|3.4584774255171933|
|                SERFC| 6.083878497829004|
|                CNRFC|12.503939524957193|
|                MBRFC| 7.807546943110796|
+---------------------+------------------+



                                                                                

# Lets play with inserts and merges

In [10]:
sedona.sql(f"""
    SELECT * FROM local.db.primary_timeseries
    WHERE location_id == 'usgs-14182500'
    ORDER BY value_time ASC
    LIMIT 10
""").show()

+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|reference_time|         value_time|configuration_name|unit_name|       variable_name|    value|  location_id|
+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|          NULL|1989-01-01 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|24.823256|usgs-14182500|
|          NULL|1989-01-02 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|21.668287|usgs-14182500|
|          NULL|1989-01-03 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|31.285395|usgs-14182500|
|          NULL|1989-01-04 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|38.038963|usgs-14182500|
|          NULL|1989-01-05 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...| 37.80299|usgs-14182500|
|          NULL|1989-01-06 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|29.954504|usgs-14182500|
|

In [11]:
schema = ev.primary_timeseries.schema_func().to_structtype()
schema

StructType([StructField('reference_time', TimestampType(), True), StructField('value_time', TimestampType(), True), StructField('value', FloatType(), True), StructField('variable_name', StringType(), True), StructField('configuration_name', StringType(), True), StructField('unit_name', StringType(), True), StructField('location_id', StringType(), True)])

In [12]:
# Create some rows to add
df = sedona.createDataFrame(
    [
        (None, datetime(1989, 1, 1, 0, 0), 24.82, "streamflow_daily_mean", "usgs_observations", "m^3/s", "usgs-14182500"),
        (None, datetime(1989, 1, 2, 0, 0), 21.67, "streamflow_daily_mean", "usgs_observations", "m^3/s", "usgs-14182500"),
    ],
    schema
)
df.show()

+--------------+-------------------+-----+--------------------+------------------+---------+-------------+
|reference_time|         value_time|value|       variable_name|configuration_name|unit_name|  location_id|
+--------------+-------------------+-----+--------------------+------------------+---------+-------------+
|          NULL|1989-01-01 05:00:00|24.82|streamflow_daily_...| usgs_observations|    m^3/s|usgs-14182500|
|          NULL|1989-01-02 05:00:00|21.67|streamflow_daily_...| usgs_observations|    m^3/s|usgs-14182500|
+--------------+-------------------+-----+--------------------+------------------+---------+-------------+



In [13]:
def merge_into(
        spark: SparkSession,
        ev: teehr.Evaluation,
        df: DataFrame
):
    # This could come from self
    pt = ev.primary_timeseries

    # Create a temporary view for the source DataFrame
    df.createOrReplaceTempView("source")

    # Define the join condition for the MERGE operation
    # Join-on String with null-safe equality
    join_on_string = " AND ".join([
        f"t.{field} <=> s.{field}" for field in pt.unique_column_set
    ])

    # Update any fields that are not part of the unique key
    update_set = list(set(df.columns) - set(pt.unique_column_set))
    update_set_string = ", ".join([
        f"t.{field} = s.{field}" for field in update_set
    ])

    fields_string = ", ".join([f"{field}" for field in df.columns])

    # on_string
    spark.sql(f"""
        MERGE INTO local.db.primary_timeseries t
        USING (SELECT * FROM source) s
        ON {join_on_string}
        WHEN MATCHED THEN UPDATE SET {update_set_string}
        WHEN NOT MATCHED THEN
        INSERT ({fields_string}) VALUES ({fields_string})
    """)

In [14]:
# Merge the DataFrame into the primary_timeseries table
merge_into(spark=sedona, ev=ev, df=df)

                                                                                

In [15]:
# Query
sedona.sql(f"""
    SELECT * FROM local.db.primary_timeseries
    WHERE location_id == 'usgs-14182500'
    ORDER BY value_time ASC
    LIMIT 10
""").show()

+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|reference_time|         value_time|configuration_name|unit_name|       variable_name|    value|  location_id|
+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|          NULL|1989-01-01 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|24.823256|usgs-14182500|
|          NULL|1989-01-01 05:00:00| usgs_observations|    m^3/s|streamflow_daily_...|    24.82|usgs-14182500|
|          NULL|1989-01-02 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|21.668287|usgs-14182500|
|          NULL|1989-01-02 05:00:00| usgs_observations|    m^3/s|streamflow_daily_...|    21.67|usgs-14182500|
|          NULL|1989-01-03 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|31.285395|usgs-14182500|
|          NULL|1989-01-04 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|38.038963|usgs-14182500|
|

In [24]:
# Oops added values with the wrong timezone, let's fix that. Lets see available snapshots
sedona.sql(f"""
    SELECT * FROM local.db.primary_timeseries.snapshots;
""").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-06-04 01:56:...| 539526172352411418|               NULL|   append|/Users/mdenno/tem...|{spark.app.id -> ...|
|2025-06-04 18:59:...|6640338625955216425| 539526172352411418|   append|/Users/mdenno/tem...|{spark.app.id -> ...|
|2025-06-04 19:01:...|2368637203356834011| 539526172352411418|overwrite|/Users/mdenno/tem...|{spark.app.id -> ...|
|2025-06-04 19:01:...|1024535650660937797|2368637203356834011|overwrite|/Users/mdenno/tem...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [17]:
# Roll back to initial append
sedona.sql(f"""
    CALL {catalog_name}.system.rollback_to_snapshot('db.primary_timeseries', 539526172352411418)
""")

DataFrame[previous_snapshot_id: bigint, current_snapshot_id: bigint]

In [18]:
# Verify rollback
sedona.sql(f"""
    SELECT * FROM local.db.primary_timeseries
    WHERE location_id == 'usgs-14182500'
    ORDER BY value_time ASC
    LIMIT 10
""").show()

+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|reference_time|         value_time|configuration_name|unit_name|       variable_name|    value|  location_id|
+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|          NULL|1989-01-01 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|24.823256|usgs-14182500|
|          NULL|1989-01-02 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|21.668287|usgs-14182500|
|          NULL|1989-01-03 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|31.285395|usgs-14182500|
|          NULL|1989-01-04 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|38.038963|usgs-14182500|
|          NULL|1989-01-05 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...| 37.80299|usgs-14182500|
|          NULL|1989-01-06 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|29.954504|usgs-14182500|
|

In [19]:
# Create new data with TZ set
df = sedona.createDataFrame(
    [
        (None, datetime(1989, 1, 1, 0, 0, tzinfo=pytz.UTC), 24.82, "streamflow_daily_mean", "usgs_observations", "m^3/s", "usgs-14182500"),
        (None, datetime(1989, 1, 2, 0, 0, tzinfo=pytz.UTC), 21.67, "streamflow_daily_mean", "usgs_observations", "m^3/s", "usgs-14182500"),
    ],
    schema
)
df.show()

+--------------+-------------------+-----+--------------------+------------------+---------+-------------+
|reference_time|         value_time|value|       variable_name|configuration_name|unit_name|  location_id|
+--------------+-------------------+-----+--------------------+------------------+---------+-------------+
|          NULL|1989-01-01 00:00:00|24.82|streamflow_daily_...| usgs_observations|    m^3/s|usgs-14182500|
|          NULL|1989-01-02 00:00:00|21.67|streamflow_daily_...| usgs_observations|    m^3/s|usgs-14182500|
+--------------+-------------------+-----+--------------------+------------------+---------+-------------+



In [20]:
# Try to merge again
# Merge the DataFrame into the primary_timeseries table
merge_into(spark=sedona, ev=ev, df=df)

                                                                                

In [21]:
# Query again
sedona.sql(f"""
    SELECT * FROM local.db.primary_timeseries
    WHERE location_id == 'usgs-14182500'
    ORDER BY value_time ASC
    LIMIT 10
""").show()

+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|reference_time|         value_time|configuration_name|unit_name|       variable_name|    value|  location_id|
+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|          NULL|1989-01-01 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|    24.82|usgs-14182500|
|          NULL|1989-01-02 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|    21.67|usgs-14182500|
|          NULL|1989-01-03 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|31.285395|usgs-14182500|
|          NULL|1989-01-04 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|38.038963|usgs-14182500|
|          NULL|1989-01-05 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...| 37.80299|usgs-14182500|
|          NULL|1989-01-06 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|29.954504|usgs-14182500|
|

In [22]:
# Create new data with TZ set
df = sedona.createDataFrame(
    [
        (None, datetime(1988, 12, 30, 0, 0, tzinfo=pytz.UTC), 12.30, "streamflow_daily_mean", "usgs_observations", "m^3/s", "usgs-14182500"),
        (None, datetime(1988, 12, 31, 0, 0, tzinfo=pytz.UTC), 12.31, "streamflow_daily_mean", "usgs_observations", "m^3/s", "usgs-14182500"),
        (None, datetime(1989, 1, 1, 0, 0, tzinfo=pytz.UTC), 19.00, "streamflow_daily_mean", "usgs_observations", "m^3/s", "usgs-14182500"),
    ],
    schema
)
df.show()

+--------------+-------------------+-----+--------------------+------------------+---------+-------------+
|reference_time|         value_time|value|       variable_name|configuration_name|unit_name|  location_id|
+--------------+-------------------+-----+--------------------+------------------+---------+-------------+
|          NULL|1988-12-30 00:00:00| 12.3|streamflow_daily_...| usgs_observations|    m^3/s|usgs-14182500|
|          NULL|1988-12-31 00:00:00|12.31|streamflow_daily_...| usgs_observations|    m^3/s|usgs-14182500|
|          NULL|1989-01-01 00:00:00| 19.0|streamflow_daily_...| usgs_observations|    m^3/s|usgs-14182500|
+--------------+-------------------+-----+--------------------+------------------+---------+-------------+



In [23]:
merge_into(spark=sedona, ev=ev, df=df)
# Verify the merge
sedona.sql("""
    SELECT * FROM local.db.primary_timeseries
    WHERE location_id == 'usgs-14182500'
    ORDER BY value_time ASC
    LIMIT 10
""").show()

                                                                                

+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|reference_time|         value_time|configuration_name|unit_name|       variable_name|    value|  location_id|
+--------------+-------------------+------------------+---------+--------------------+---------+-------------+
|          NULL|1988-12-30 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|     12.3|usgs-14182500|
|          NULL|1988-12-31 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|    12.31|usgs-14182500|
|          NULL|1989-01-01 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|     19.0|usgs-14182500|
|          NULL|1989-01-02 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|    21.67|usgs-14182500|
|          NULL|1989-01-03 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|31.285395|usgs-14182500|
|          NULL|1989-01-04 00:00:00| usgs_observations|    m^3/s|streamflow_daily_...|38.038963|usgs-14182500|
|

In [1]:
sedona.stop()

NameError: name 'sedona' is not defined

- Create joined timeseries needs some filters so that we can incrementally create the joined timeseries.  
- Joined timeseries should maybe only contain the joined ts.  That join process does not parallelize well.  Other joins do.