In [None]:
!pip install --quiet duckdb duckdb-engine kerchunk zarr

In [None]:
import timeit
import numpy as np
import matplotlib.pyplot as plt
import fsspec


In [None]:
reach_ids = [
    7978071,
    15059811,
    15039097,
    15039077,
    15038825,
    15034617,
    15034581,
    15034577,
    15034485,
    15034469,
    15034467,
    15034459,
    15034409,
]

In [None]:
repeat = 20

In [None]:
reach_id = reach_ids[2]

In [None]:
reference_time = '2023-01-01'
ensemble_member = 1

## DuckDB

In [None]:
import duckdb

In [None]:
# Import jupysql Jupyter extension to create SQL cells
%load_ext sql

In [None]:
%sql duckdb:///:default:

In [None]:
%%sql
INSTALL httpfs;
LOAD httpfs;

In [None]:
%sql SET s3_endpoint='storage.googleapis.com';

In [None]:
def get_duckdb():
    # read the result of an arbitrary SQL query to a Pandas DataFrame
    results = duckdb.sql(f"""
    SELECT
        time,
        streamflow,
        velocity,
        feature_id
    FROM read_parquet(
        's3://national-water-model-parq/channel_rt/long_range_mem{ensemble_member}/nwm.{reference_time.replace('-','')}.t00z.long_range.channel_rt_{ensemble_member}.f*.conus.parq.gz'
    )
    WHERE
        feature_id = {reach_id}
    ORDER BY
        time;
    """).df()
    return results

In [None]:
%time streamflow = get_duckdb()

In [None]:
streamflow['streamflow'].plot()

In [None]:
duckdb_times = timeit.repeat(get_duckdb, number=1, repeat=repeat)

## netCDF native

In [None]:
import xarray as xr

In [None]:
fs = fsspec.filesystem('gcs', anon=True, skip_incstance_cache=True)

In [None]:
flist = fs.glob(f"national-water-model/nwm.{reference_time.replace('-','')}/long_range_mem{ensemble_member}/nwm.t00z.long_range.channel_rt_{ensemble_member}.f*.conus.nc")

In [None]:
uris = ["gs://" + f for f in flist]

In [None]:
def get_netcdf():
  datasets = []
  for uri in uris:
    with fsspec.open(uri,'rb') as f:
      ds_i = xr.open_dataset(f).sel(feature_id = reach_id )[['velocity', 'streamflow','feature_id','time']].compute()
      datasets.append(ds_i)

  ds = xr.concat(datasets, dim='time')

  return ds.compute().to_dataframe()

In [None]:
netcdf_times = timeit.repeat(get_netcdf, number=1, repeat=repeat)

## Kerchunking netCDF

In [None]:
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr

In [None]:
fs = fsspec.filesystem('gcs', anon=True, skip_incstance_cache=True)

In [None]:
best_hour='f001'
var = 'channel_rt'

In [None]:
flist = fs.glob(f"national-water-model/nwm.{reference_time.replace('-','')}/long_range_mem{ensemble_member}/nwm.t00z.long_range.channel_rt_{ensemble_member}.f*.conus.nc")

In [None]:
urls = ["gs://" + f for f in flist]

In [None]:
def gen_kerchunk(u):
    with fsspec.open(u,'rb') as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        return h5chunks.translate()

In [None]:
%time results = list(map(gen_kerchunk, urls))

In [None]:
mzz = MultiZarrToZarr(
    results,
    remote_protocol="gcs",
    remote_options={'anon': True},
    concat_dims=["time"]
)

In [None]:
kerchunk_result = mzz.translate()

In [None]:
fs = fsspec.filesystem("reference", fo=kerchunk_result,
                       remote_protocol='gs', remote_options={'anon':True})

m = fs.get_mapper("")

In [None]:
def get_kerchunk():
    ds = xr.open_dataset(
        m,
        engine="zarr",
        consolidated=False
    )
    ds = ds.sel(feature_id = reach_id)[['velocity', 'streamflow','feature_id']]
    return ds.compute().to_dataframe()

In [None]:
%time ds = get_kerchunk()

In [None]:
ds

In [None]:
kerchunk_times = timeit.repeat(get_kerchunk, number=1, repeat=repeat)

## NWM API

In [None]:
import pandas as pd
import requests
from io import StringIO

In [None]:
api_endpoint = 'https://nwm-api-kmarkert-test-jxw7jm8.uc.gateway.dev/forecast_records'

api_key = 'YOUR_API_KEY'

In [None]:
header = {
    'x-api-key': api_key
}

params = {
    'forecast_type': 'long_range',
    'reference_time': reference_time,
    'ensemble': ensemble_member-1,
    'comids': reach_id,
    'output_format': 'csv'
}

In [None]:
def get_api():
    r = requests.get(api_endpoint, params=params, headers=header)
    return pd.read_csv(StringIO(r.text))

In [None]:
api_times = timeit.repeat(get_api, number=1, repeat=repeat)

## BigQuery

In [None]:
from google.cloud.bigquery import Client, QueryJobConfig

In [None]:
# initialize the Big Query client for submitting jobs
client = Client(project='ciroh-water-demo')
job_config = QueryJobConfig(use_query_cache=False)

In [None]:
def get_bq():
    # define the query to get the forecast
    query = f"""
    SELECT
        time,
        streamflow,
        velocity,
        feature_id
    FROM
        `ciroh-water-demo.national_water_model_demo.channel_rt_long_range`
    WHERE
        feature_id = {reach_id}
        AND reference_time = '{reference_time}'
        AND ensemble = {ensemble_member-1}
    ORDER BY
        time
    """
    # submit the BQ job and load as a pandas dataframe
    job = client.query(query, job_config=job_config)
    return job.to_dataframe()

In [None]:
%time streamflow_bq = get_bq()

In [None]:
streamflow_bq

In [None]:
bq_times = timeit.repeat(get_bq, number=1, repeat=repeat)

## Plot results

In [None]:
import matplotlib.pyplot as plt

In [None]:
data =[
    np.mean(duckdb_times),
    np.mean(kerchunk_times),
    np.mean(bq_times)
]

data_std = [
    np.std(duckdb_times),
    np.std(kerchunk_times),
    np.std(bq_times)
]

In [None]:
fig, ax = plt.subplots()
ax.bar(range(0,3),data,yerr=data_std)
ax.set_xticks(range(3))
ax.set_xticklabels(['DuckDB/Parquet', 'xarray/netCDF', 'BigQuery'], rotation=30)
ax.set_ylabel('Response time [s]')

In [None]:
result_df = pd.DataFrame({
    'duckdb': duckdb_times,
    'kerchunk': kerchunk_times,
    'bq': bq_times,
    'netcdf': netcdf_times,
    'api': api_times
})

In [None]:
result_df