In [7]:
import calendar
import logging
import os
import intake
import concurrent.futures
import numpy as np
import pandas as pd
import xarray as xr
import dask
import zarr
from dask.diagnostics import ProgressBar
from dask.distributed import Client, LocalCluster

In [8]:
r = '/caldera/hovenweep/projects/usgs/water'
d = os.path.join(r, 'wymtwsc', 'dketchum')
c404 = os.path.join(d, 'conus404')
dads = os.path.join(d, 'dads')
ghcn = os.path.join(d, 'climate', 'ghcn')

zarr_store = os.path.join(r, 'impd/hytest/conus404/conus404_hourly.zarr')
sites = os.path.join(dads, 'met', 'stations', 'madis_29OCT2024.csv')
csv_files = os.path.join(c404, 'station_data')

In [9]:
def get_quadrants(b):
    mid_longitude = (b[0] + b[2]) / 2
    mid_latitude = (b[1] + b[3]) / 2
    quadrant_nw = (b[0], mid_latitude, mid_longitude, b[3])
    quadrant_ne = (mid_longitude, mid_latitude, b[2], b[3])
    quadrant_sw = (b[0], b[1], mid_longitude, mid_latitude)
    quadrant_se = (mid_longitude, b[1], b[2], mid_latitude)
    quadrants = [quadrant_nw, quadrant_ne, quadrant_sw, quadrant_se]
    return quadrants


In [10]:
bounds = (-125.0, 25.0, -67.0, 53.0)
quadrants = get_quadrants(bounds)
sixteens = [get_quadrants(q) for q in quadrants]
sixteens = [x for xs in sixteens for x in xs]
sixteens[0]


(-125.0, 46.0, -110.5, 53.0)

In [11]:
stations = sites
nc_data = zarr_store
out_data = csv_files
workers=36
overwrite=False
bounds=sixteens[0]
start_yr=2014
end_yr=2014
mode = 'dask'

In [16]:
station_list = pd.read_csv(stations)
if 'LAT' in station_list.columns:
    station_list = station_list.rename(columns={'STAID': 'fid', 'LAT': 'latitude', 'LON': 'longitude'})
station_list.index = station_list['fid']

if bounds:
    w, s, e, n = bounds
    station_list = station_list[(station_list['latitude'] < n) & (station_list['latitude'] >= s)]
    station_list = station_list[(station_list['longitude'] < e) & (station_list['longitude'] >= w)]

print(f'{len(station_list)} stations to write')

dates = [(year, month, calendar.monthrange(year, month)[-1])
         for year in range(start_yr, end_yr + 1) for month in range(1, 13)]
station_list

3092 stations to write


Unnamed: 0_level_0,fid,latitude,longitude,elev,stype
fid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
BNBMD,BNBMD,47.644989,-122.529404,41.174999,AWS
BRMWA,BRMWA,47.641659,-122.607201,44.224998,AWS
CLLMC,CLLMC,47.204441,-120.974098,650.565002,AWS
EVRMT,EVRMT,47.919991,-122.221901,6.405000,AWS
GCOUL,GCOUL,47.938332,-119.004700,509.350006,AWS
...,...,...,...,...,...
2228P,2228P,47.646210,-122.696274,6.100000,utmesnet
D2826,D2826,48.076500,-123.448334,280.380005,APRSWXNET
AP250,AP250,46.638000,-111.943001,1138.000000,APRSWXNET
2306P,2306P,47.660450,-117.424278,568.099976,utmesnet


In [13]:
crs = "+proj=utm +zone=13 +ellps=GRS80 +datum=NAD83 +units=m +no_defs" 
import pyproj
import cartopy.crs as ccrs

ds = xr.open_zarr(nc_data, consolidated=True, chunks='auto')
crs_info = ds.crs
globe = ccrs.Globe(ellipse='sphere', semimajor_axis=6370000, semiminor_axis=6370000)
lcc = ccrs.LambertConformal(globe=globe,
                            central_longitude=crs_info.longitude_of_central_meridian, 
                            central_latitude=crs_info.latitude_of_projection_origin,
                            standard_parallels=crs_info.standard_parallel)
lcc_wkt = lcc.to_wkt()


def project_coordinates(bounds, target_crs):
  source_crs = 'epsg:4326'
  transformer = pyproj.Transformer.from_crs(source_crs, target_crs)
  west, south, east, north = bounds
  sw_x, sw_y = transformer.transform(south, west)
  ne_x, ne_y = transformer.transform(north, east)
  return sw_x, sw_y, ne_x, ne_y

bounds_proj = project_coordinates(bounds, lcc_wkt)
bounds_proj

(-2039715.0340867909,
 1070649.589525464,
 -849923.4175176456,
 1594627.8324152853)

In [14]:
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
cat = hytest_cat['conus404-catalog']
dataset = 'conus404-hourly-onprem-hw'
ds = cat[dataset].to_dask()


  'dims': dict(self._ds.dims),


In [15]:
year, month, month_end = dates[0]
variables = ['T2', 'TD2', 'QVAPOR', 'U10', 'V10', 'PSFC', 'ACSWDNLSM']
ds = ds.sel(time=slice(f'{year}-{month}-01', f'{year}-{month}-{month_end}'))
ds = ds[variables]
# print(ds.y.values.min(), ds.y.values.max())
if bounds is not None:
    ds = ds.sel(y=slice(bounds_proj[1], bounds_proj[3]),
                x=slice(bounds_proj[0], bounds_proj[2]))
ds

Unnamed: 0,Array,Chunk
Bytes,151.98 kiB,68.36 kiB
Shape,"(131, 297)","(100, 175)"
Dask graph,6 chunks in 3 graph layers,6 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 151.98 kiB 68.36 kiB Shape (131, 297) (100, 175) Dask graph 6 chunks in 3 graph layers Data type float32 numpy.ndarray",297  131,

Unnamed: 0,Array,Chunk
Bytes,151.98 kiB,68.36 kiB
Shape,"(131, 297)","(100, 175)"
Dask graph,6 chunks in 3 graph layers,6 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,151.98 kiB,68.36 kiB
Shape,"(131, 297)","(100, 175)"
Dask graph,6 chunks in 3 graph layers,6 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 151.98 kiB 68.36 kiB Shape (131, 297) (100, 175) Dask graph 6 chunks in 3 graph layers Data type float32 numpy.ndarray",297  131,

Unnamed: 0,Array,Chunk
Bytes,151.98 kiB,68.36 kiB
Shape,"(131, 297)","(100, 175)"
Dask graph,6 chunks in 3 graph layers,6 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 110.42 MiB 9.61 MiB Shape (744, 131, 297) (144, 100, 175) Dask graph 36 chunks in 4 graph layers Data type float32 numpy.ndarray",297  131  744,

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 110.42 MiB 9.61 MiB Shape (744, 131, 297) (144, 100, 175) Dask graph 36 chunks in 4 graph layers Data type float32 numpy.ndarray",297  131  744,

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 110.42 MiB 9.61 MiB Shape (744, 131, 297) (144, 100, 175) Dask graph 36 chunks in 4 graph layers Data type float32 numpy.ndarray",297  131  744,

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 110.42 MiB 9.61 MiB Shape (744, 131, 297) (144, 100, 175) Dask graph 36 chunks in 4 graph layers Data type float32 numpy.ndarray",297  131  744,

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 110.42 MiB 9.61 MiB Shape (744, 131, 297) (144, 100, 175) Dask graph 36 chunks in 4 graph layers Data type float32 numpy.ndarray",297  131  744,

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 110.42 MiB 9.61 MiB Shape (744, 131, 297) (144, 100, 175) Dask graph 36 chunks in 4 graph layers Data type float32 numpy.ndarray",297  131  744,

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 110.42 MiB 9.61 MiB Shape (744, 131, 297) (144, 100, 175) Dask graph 36 chunks in 4 graph layers Data type float32 numpy.ndarray",297  131  744,

Unnamed: 0,Array,Chunk
Bytes,110.42 MiB,9.61 MiB
Shape,"(744, 131, 297)","(144, 100, 175)"
Dask graph,36 chunks in 4 graph layers,36 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [10]:
def get_month_met(nc_data_, station_list_, date_, out_data, overwrite, bounds_=None):
    """"""
    import xoak
    year, month, month_end = date_
    date_string = '{}-{}'.format(year, str(month).rjust(2, '0'))

    variables = ['T2', 'TD2', 'QVAPOR', 'U10', 'V10', 'PSFC', 'ACSWDNLSM']
    fids = station_list_.index.to_list()
    station_list_ = station_list_.to_xarray()

    print(f'read zarr {date_string} ')
    ds = xr.open_zarr(nc_data_, consolidated=True, chunks='auto')
    print(f'select time {date_string} ')
    ds = ds.sel(time=slice(f'{year}-{month}-01', f'{year}-{month}-{month_end}'))
    ds = ds[variables]
    if bounds_ is not None:
        ds = ds.sel(y=slice(bounds_proj[1], bounds_proj[3]),
                    x=slice(bounds_proj[0], bounds_proj[2]))
    print(f'index stations {date_string} ')
    ds.xoak.set_index(['lat', 'lon'], 'sklearn_geo_balltree')
    ds = ds.xoak.sel(lat=station_list_.latitude, lon=station_list_.longitude)
    ds = xr.merge([station_list_, ds])

    all_df = ds.to_dataframe()
    print(f'write {date_string} from dataframe...')

    try:
        ct = 0
        for fid in fids:
            dst_dir = os.path.join(out_data, fid)
            if not os.path.exists(dst_dir):
                os.mkdir(dst_dir)
            _file = os.path.join(dst_dir, f'{fid}_{date_string}.parquet')
            if not os.path.exists(_file) or overwrite:
                df_station = all_df.loc[slice(fid), slice(None)].copy()
                df_station = df_station.groupby(df_station.index.get_level_values('time')).first()
                df_station['dt'] = [i.strftime('%Y%m%d%H') for i in df_station.index]
                df_station.to_parquet(_file, index=False)
                ct += 1
        if ct % 10 == 0.:
            print(f'{ct} of {len(fids)} for {date_string}')
    except Exception as exc:
        print(f'{date_string}: {exc}')

    del ds

In [11]:
station_list

Unnamed: 0_level_0,fid,latitude,longitude,elev,stype
fid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
BNBMD,BNBMD,47.644989,-122.529404,41.174999,AWS
BRMWA,BRMWA,47.641659,-122.607201,44.224998,AWS
CLLMC,CLLMC,47.204441,-120.974098,650.565002,AWS
EVRMT,EVRMT,47.919991,-122.221901,6.405000,AWS
GCOUL,GCOUL,47.938332,-119.004700,509.350006,AWS
...,...,...,...,...,...
2228P,2228P,47.646210,-122.696274,6.100000,utmesnet
D2826,D2826,48.076500,-123.448334,280.380005,APRSWXNET
AP250,AP250,46.638000,-111.943001,1138.000000,APRSWXNET
2306P,2306P,47.660450,-117.424278,568.099976,utmesnet


In [12]:
if mode == 'debug':
    for date in dates:
        get_month_met(nc_data, station_list, date, out_data, overwrite, bounds_proj)

elif mode == 'multi':
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(get_month_met, nc_data, station_list, dt, out_data, overwrite, bounds_proj)
                   for dt in dates]
        concurrent.futures.wait(futures)

elif mode == 'dask':
    cluster = LocalCluster(n_workers=workers, memory_limit='32GB', threads_per_worker=1,
                           silence_logs=logging.ERROR)
    client = Client(cluster)
    print("Dask cluster started with dashboard at:", client.dashboard_link)
    station_list = client.scatter(station_list)
    tasks = [dask.delayed(get_month_met)(nc_data, station_list, date, out_data, overwrite, bounds_proj) for date in
             dates]
    dask.compute(*tasks)
    client.close()

Dask cluster started with dashboard at: http://127.0.0.1:8787/status


2024-11-16 23:29:17,974 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
2024-11-16 23:29:17,974 - distributed.nanny - ERROR - Worker process died unexpectedly
2024-11-16 23:29:17,975 - distributed.nanny - ERROR - Worker process died unexpectedly
2024-11-16 23:29:17,976 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
2024-11-16 23:29:17,976 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
2024-11-16 23:29:17,976 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dketchum/miniconda3/envs/hyt/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
   

read zarr 2014-12 
select time 2014-12 
index stations 2014-12 
write 2014-12 from dataframe...
read zarr 2014-01 
select time 2014-01 
index stations 2014-01 
write 2014-01 from dataframe...
0 of 3092 for 2014-01
read zarr 2014-02 
select time 2014-02 
index stations 2014-02 
write 2014-02 from dataframe...
0 of 3092 for 2014-02
read zarr 2014-08 
select time 2014-08 
index stations 2014-08 
write 2014-08 from dataframe...
read zarr 2014-03 
select time 2014-03 
index stations 2014-03 
write 2014-03 from dataframe...
read zarr 2014-07 
select time 2014-07 
index stations 2014-07 
write 2014-07 from dataframe...
read zarr 2014-10 
select time 2014-10 
index stations 2014-10 
write 2014-10 from dataframe...
150 of 3092 for 2014-10
read zarr 2014-06 
select time 2014-06 
index stations 2014-06 
write 2014-06 from dataframe...
read zarr 2014-09 
select time 2014-09 
index stations 2014-09 
write 2014-09 from dataframe...
read zarr 2014-11 
select time 2014-11 
index stations 2014-11 
writ