In [None]:
import os
import duckdb
import pandas as pd
import numpy as np
from arch.bootstrap import StationaryBootstrap
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType, StringType, MapType, StructType, StructField

In [None]:
# Spark settings applied on top of the SparkConf handed to get_spark_session().
# Every key/value pair is passed verbatim to ``SparkConf.set``.
config = {
    "spark.kubernetes.authenticate.driver.serviceAccountName": "jupyter",
    "spark.kubernetes.namespace": "teehr-spark-default",
    # Raises KeyError at definition time when TEEHR_WORKER_IMAGE is not set.
    "spark.kubernetes.container.image": os.environ["TEEHR_WORKER_IMAGE"],
    # JVM system properties go in the *value*: the option name is
    # `spark.*.extraJavaOptions` and `-Daws.region=us-east-1` is the flag that
    # is handed to the JVM. (Folding `=-Daws.region` into the key only worked
    # by accident of how `--conf key=value` is re-split at JVM launch.)
    "spark.executor.extraJavaOptions": "-Daws.region=us-east-1",
    "spark.driver.extraJavaOptions": "-Daws.region=us-east-1",
    "spark.executor.instances": "2",
    "spark.executor.memory": "4g",
    "spark.executor.cores": "2",
    "spark.driver.blockManager.port": "7777",
    "spark.driver.port": "2222",
    "spark.driver.host": "jupyter.teehr-spark-default.svc.cluster.local",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    # NOTE(review): hard-coded credentials. They look like local demo values,
    # but they should come from the environment or a secret store - confirm
    # whether the `demo` catalog is still needed at all.
    "spark.sql.catalog.demo.s3.access-key-id": "minio",
    "spark.sql.catalog.demo.s3.secret-access-key": "password123",
    "spark.sql.parquet.enableVectorizedReader": "false",
    "spark.kubernetes.executor.node.selector.dedicated": "worker-cpu",
    "spark.kubernetes.executor.podTemplateFile": "/home/spark/pod-template.yaml",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
}

def get_spark_session(app_name: str, conf: SparkConf):
    """Build (or reuse) a SparkSession that targets the in-cluster Kubernetes master.

    The passed ``conf`` is mutated in place: its master URL is set and every
    entry of the module-level ``config`` mapping is copied onto it.

    Parameters
    ----------
    app_name : str
        Application name given to the session builder.
    conf : SparkConf
        Configuration object to populate and hand to the builder.

    Returns
    -------
    The session returned by ``SparkSession.builder...getOrCreate()``.
    """
    conf.setMaster("k8s://https://kubernetes.default.svc.cluster.local")
    for option in config:
        conf.set(option, config[option])
    builder = SparkSession.builder.appName(app_name)
    return builder.config(conf=conf).getOrCreate()

In [None]:
# Create (or attach to) the Kubernetes-backed Spark session.
spark = get_spark_session(app_name="teehr-workers", conf=SparkConf())
# Uncomment to list the configuration the session is running with:
# spark.sparkContext.getConf().getAll()

In [None]:
# Local alternative to the Kubernetes session created above.
# Fetch whatever session getOrCreate() returns (a throwaway local[*] one if
# none is active) and stop it, so the session built below starts from scratch.
# NOTE(review): running this cell after the cluster cell stops that session
# and rebinds `spark` - presumably only one of the two cells is meant to be
# run per kernel; confirm.
SparkSession.builder.master("local[*]").getOrCreate().stop()

# NOTE(review): hard-coded credentials; prefer environment variables or a
# secret store.
conf = (
    SparkConf()
    .setAppName('TestJupyter')
    .set("spark.sql.catalog.demo.s3.access-key-id", "minio")
    .set("spark.sql.catalog.demo.s3.secret-access-key", "password123")
)
# Start the Spark session with the configuration assembled above.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Uncomment to list the configuration the session is running with:
# spark.sparkContext.getConf().getAll()

In [None]:
# Catalog identifiers interpolated into the Spark SQL text in this notebook.
# DATABASE and JOINED_TABLE are combined as `{DATABASE}.{JOINED_TABLE}` by the
# first metrics query; the remaining names are not referenced by any cell
# visible here.
DATABASE: str = "streamflow"
PRIMARY_TABLE: str = "primary"
SECONDARY_TABLE: str = "secondary"
CROSSWALK_TABLE: str = "crosswalk"
JOINED_TABLE: str = "joined"

In [None]:
def kling_gupta_efficiency(p: pd.Series, s: pd.Series) -> float:
    """Compute the Kling-Gupta efficiency (KGE) of ``s`` measured against ``p``.

    KGE = 1 - sqrt((r - 1)^2 + (alpha - 1)^2 + (beta - 1)^2), where

    * r     is the Pearson correlation between ``s`` and ``p``,
    * alpha is std(s) / std(p) (relative variability),
    * beta  is mean(s) / mean(p) (relative mean).

    Parameters
    ----------
    p : pd.Series
        Reference values; every ratio is taken relative to this series.
    s : pd.Series
        Values being evaluated; must have the same length as ``p``.

    Returns
    -------
    float
        The KGE value, or NaN when either input is empty or when std(p),
        mean(p) or std(s) is zero (the score is undefined in those cases).
    """
    # Both inputs must be checked: the original guard tested `s` twice, so an
    # empty `p` slipped through and failed later inside np.corrcoef.
    if len(p) == 0 or len(s) == 0:
        return np.nan

    std_p = np.std(p)
    mean_p = np.mean(p)
    std_s = np.std(s)

    # std_p and mean_p are denominators below; a zero standard deviation on
    # either side also leaves the correlation undefined.
    if std_p == 0 or mean_p == 0 or std_s == 0:
        return np.nan

    # Pearson correlation coefficient
    linear_correlation = np.corrcoef(s, p)[0, 1]

    # Relative variability
    relative_variability = std_s / std_p

    # Relative mean
    relative_mean = np.mean(s) / mean_p

    # Euclidean distance from the ideal point (1, 1, 1), all terms weighted 1
    euclidean_distance = np.sqrt(
        (linear_correlation - 1.0) ** 2.0
        + (relative_variability - 1.0) ** 2.0
        + (relative_mean - 1.0) ** 2.0
    )

    return 1.0 - euclidean_distance

In [None]:
@pandas_udf(MapType(StringType(), FloatType()))
def bs_kling_gupta_efficiency(p: pd.Series, s: pd.Series) -> float:
    """Bootstrap the KGE of ``s`` against ``p`` and summarise it by quantiles.

    The two series are resampled with ``StationaryBootstrap`` (first argument
    365, fixed seed 1234), ``kling_gupta_efficiency`` is applied to 1000
    resamples, and the 0.05 / 0.50 / 0.95 quantiles of those values are
    returned.

    Returns a dict with keys ``"KGE_0.05"``, ``"KGE_0.5"`` and ``"KGE_0.95"``.

    NOTE(review): the ``-> float`` hint does not describe the dict that is
    returned; it is left untouched because ``pandas_udf`` inspects the type
    hints - confirm before changing it.
    """
    probabilities = (0.05, 0.50, 0.95)

    bootstrap = StationaryBootstrap(365, p, s, seed=1234)
    kge_samples = bootstrap.apply(kling_gupta_efficiency, 1000)
    kge_quantiles = np.quantile(kge_samples, probabilities)

    return {
        f"KGE_{str(probability)}": value
        for probability, value in zip(probabilities, kge_quantiles)
    }

In [None]:
# Make the bootstrapped KGE UDF callable from Spark SQL under its Python name.
spark.udf.register(name="bs_kling_gupta_efficiency", f=bs_kling_gupta_efficiency)

In [None]:
%%time
# Compute the bootstrapped KGE quantiles with the registered Python UDF, one
# result row per primary_location_id, reading rows from the pre-joined table
# `{DATABASE}.{JOINED_TABLE}`.
# NOTE(review): no visible cell creates that table - presumably it already
# exists in the session catalog; confirm.
sdf = spark.sql(f"""
WITH joined as (
    SELECT * FROM {DATABASE}.{JOINED_TABLE}
)
, metrics AS (
    SELECT
        joined.primary_location_id
        , bs_kling_gupta_efficiency(joined.primary_value, joined.secondary_value) as bs_kling_gupta_efficiency
    FROM
        joined
    GROUP BY
        joined.primary_location_id
)
SELECT
    *
FROM metrics
  -- WHERE primary_location_id IN ('usgs-01010070', 'usgs-01105500')
ORDER BY
    metrics.primary_location_id
""")
# show() runs the query and prints the leading rows of the result.
sdf.show()

In [None]:
# Load the observation, simulation and crosswalk parquet datasets from S3.
obs = spark.read.parquet(
    "s3a://ciroh-rti-public-data/teehr-data-warehouse/common/observations/usgs_conus/streamflow_hourly_inst/*.parquet"
)
sim = spark.read.parquet(
    "s3a://ciroh-rti-public-data/teehr-data-warehouse/common/baselines/nwm30_retrospective_conus/streamflow_hourly_inst/*.parquet"
)
xw = spark.read.parquet(
    "s3a://ciroh-rti-public-data/teehr-data-warehouse/common/crosswalks/usgs_nwm30_crosswalk.conus.parquet"
)

# Expose the DataFrames to Spark SQL. createOrReplaceTempView keeps this cell
# re-runnable: createTempView raises once a view with the same name exists.
obs.createOrReplaceTempView("obs_temp")
sim.createOrReplaceTempView("sim_temp")
xw.createOrReplaceTempView("xw_temp")

In [None]:
%%time
# Compute the bootstrapped KGE quantiles with the registered Python UDF, one
# result row per primary_location_id. Here the joined rows are built inside
# the query from the `sim_temp`, `xw_temp` and `obs_temp` views: simulations
# are mapped to observation locations through the crosswalk and matched on
# value_time, measurement_unit and variable_name.
# NOTE(review): the `joined` CTE selects both `sf.reference_time` and
# `pf.reference_time as reference_time`, i.e. two columns with the same name.
# Only primary_location_id, primary_value and secondary_value are read
# downstream, but one of the two presumably needs a distinct alias - confirm.
sdf = spark.sql(f"""
WITH joined as (
    SELECT
        sf.reference_time
        , sf.value_time as value_time
        , sf.location_id as secondary_location_id
        , pf.reference_time as reference_time
        , sf.value as secondary_value
        , sf.configuration
        , sf.measurement_unit
        , sf.variable_name
        , pf.value as primary_value
        , pf.location_id as primary_location_id
    FROM sim_temp sf
    JOIN xw_temp cf
        on cf.secondary_location_id = sf.location_id
    JOIN obs_temp pf
        on cf.primary_location_id = pf.location_id
        and sf.value_time = pf.value_time
        and sf.measurement_unit = pf.measurement_unit
        and sf.variable_name = pf.variable_name
)
, metrics AS (
    SELECT
        joined.primary_location_id
        , bs_kling_gupta_efficiency(joined.primary_value, joined.secondary_value) as bs_kling_gupta_efficiency
    FROM
        joined
    GROUP BY
        joined.primary_location_id
)
SELECT
    *
FROM metrics
  -- WHERE primary_location_id IN ('usgs-01010070', 'usgs-01105500')
ORDER BY
    metrics.primary_location_id
""")
# show() runs the query and prints the leading rows of the result.
sdf.show()

In [None]:
# Stop the Spark session bound to `spark` once the analysis is finished.
spark.stop()