In [1]:
from teehr import Evaluation
from pathlib import Path
import shutil
import xarray as xr
import geopandas as gpd
from teehr.models.tables import (
    Attribute,
    Configuration,
    Variable
)
import duckdb

In [2]:
# Set a path to the directory where the evaluation will be created
TEST_STUDY_DIR = Path("/data/v0_4_protocols/p3_retro_hourly_streamflow")
# shutil.rmtree(TEST_STUDY_DIR, ignore_errors=True)
# TEST_STUDY_DIR.mkdir(parents=True, exist_ok=True)

In [3]:
# Set a path to the directory where the test data is stored
TEST_DATA_DIR = Path("/data/common/")
LOCATIONS_FILEPATH = Path(TEST_DATA_DIR, "geometry/usgs_point_geometry.all.parquet")
PRIMARY_TIMESERIES_FILEPATH = Path(TEST_DATA_DIR, "observations/usgs_conus/streamflow_hourly_inst")
CROSSWALK_FILEPATH = Path(TEST_DATA_DIR, "crosswalks/usgs_nwm30_crosswalk.conus.parquet")
SECONDARY_TIMESERIES_FILEPATH = Path(TEST_DATA_DIR, "baselines/nwm30_retrospective_conus/streamflow_hourly_inst")
ATTR_FILEPATH = Path(TEST_DATA_DIR, "attributes")

In [4]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = (
    SparkConf()
    .setAppName("TEEHR")
    .setMaster("local[*]")
    .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .set("spark.local.dir", "/data/tmp")
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/29 21:11:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/29 21:11:29 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [5]:
# Create an Evaluation object
ev = Evaluation(dir_path=TEST_STUDY_DIR, spark=spark)

# Enable logging
ev.enable_logging()

# Clone the template
# ev.clone_template()

In [6]:
# Load the location data (observations)
ev.locations.load_spatial(in_path=LOCATIONS_FILEPATH)

<teehr.evaluation.tables.LocationTable at 0x7f9ec3f7fc20>

In [7]:
ev.locations.to_geopandas().head()

Unnamed: 0,id,name,geometry
0,usgs-01010000,"St. John River at Ninemile Bridge, Maine",POINT (-69.71556 46.70056)
1,usgs-01010070,"Big Black River near Depot Mtn, Maine",POINT (-69.75167 46.89389)
2,usgs-01010500,"St. John River at Dickey, Maine",POINT (-69.08806 47.11306)
3,usgs-01011000,"Allagash River near Allagash, Maine",POINT (-69.07944 47.06972)
4,usgs-01011500,"St. Francis River near Connors, New Brunswick",POINT (-68.95643 47.20698)


In [8]:
# Load the timeseries data and map over the fields and set constants
ev.primary_timeseries.load_parquet(
    in_path=PRIMARY_TIMESERIES_FILEPATH,
    field_mapping={
        "reference_time": "reference_time",
        "value_time": "value_time",
        "configuration": "configuration_name",
        "measurement_unit": "unit_name",
        "variable_name": "variable_name",
        "value": "value",
        "location_id": "location_id"
    },
    constant_field_values={
        "unit_name": "m^3/s",
        "configuration_name": "usgs_observations",
        "variable_name": "streamflow_hourly_inst"
    }
)

                                                                                

<teehr.evaluation.tables.PrimaryTimeseriesTable at 0x7f9f5daac380>

In [9]:
# Load the crosswalk data
ev.location_crosswalks.load_parquet(
    in_path=CROSSWALK_FILEPATH
)

                                                                                

<teehr.evaluation.tables.LocationCrosswalkTable at 0x7f9fabec5a30>

In [10]:
# Load the secondary timeseries data and map over the fields and set constants
ev.secondary_timeseries.load_parquet(
    in_path=SECONDARY_TIMESERIES_FILEPATH,
    field_mapping={
        "reference_time": "reference_time",
        "value_time": "value_time",
        "configuration": "configuration_name",
        "measurement_unit": "unit_name",
        "variable_name": "variable_name",
        "value": "value",
        "location_id": "location_id"
    },
    constant_field_values={
        "unit_name": "m^3/s",
        "configuration_name": "nwm30_retrospective",
        "variable_name": "streamflow_hourly_inst"
    }
)

                                                                                

<teehr.evaluation.tables.SecondaryTimeseriesTable at 0x7f9e92b40c50>

In [11]:
df = duckdb.query(
    f"SELECT distinct(attribute_name) FROM read_parquet('{ATTR_FILEPATH}/**/usgs_point_attr*.conus.parquet');"
).to_df()
attrs_list = [Attribute(name=i,type="categorical", description=i) for i in list(df.attribute_name)]
attrs_list

[Attribute(name='retro_10yr_recurrence_flow', type='categorical', description='retro_10yr_recurrence_flow'),
 Attribute(name='ecoregion_L2', type='categorical', description='ecoregion_L2'),
 Attribute(name='river_forecast_center', type='categorical', description='river_forecast_center'),
 Attribute(name='NWM_waterbody_dam_lengths', type='categorical', description='NWM_waterbody_dam_lengths'),
 Attribute(name='NID_dam_lengths', type='categorical', description='NID_dam_lengths'),
 Attribute(name='retro_2yr_recurrence_flow', type='categorical', description='retro_2yr_recurrence_flow'),
 Attribute(name='retro_100yr_recurrence_flow', type='categorical', description='retro_100yr_recurrence_flow'),
 Attribute(name='stream_order', type='categorical', description='stream_order')]

In [12]:
# Add some attributes
ev.attributes.add(attrs_list)

In [23]:
# Load the location attribute data
# ev.location_attributes.load_parquet(
#     in_path=ATTR_FILEPATH,
#     field_mapping={"attribute_value": "value"},
#     pattern="usgs_point_attr*.conus.parquet",
# )

In [6]:
%%time
# Create the joined timeseries
ev.joined_timeseries.create(execute_udf=True)

                                                                                

CPU times: user 591 ms, sys: 161 ms, total: 751 ms
Wall time: 1h 15min 38s


<teehr.evaluation.tables.JoinedTimeseriesTable at 0x7f071294c9e0>

In [None]:
ev.spark.stop()