In [1]:
# NEEDS WRADLIB 1.19 !! 

import wradlib as wrl
import numpy as np
import sys
import glob
import xarray as xr
import os
import datetime as dt
import pandas as pd
from tqdm.notebook import trange, tqdm

import warnings
warnings.filterwarnings('ignore')
import xradar as xd
import datatree

In [2]:
import netCDF4
import packaging

In [3]:
# `import packaging` does not by itself load the `packaging.version` submodule;
# import `Version` explicitly so this check does not depend on another library
# having imported it first.
from packaging.version import Version

# Fail early, and say why, when the installed netCDF4 is newer than expected.
assert Version(netCDF4.__version__) <= Version("1.6.0"), (
    f"netCDF4 {netCDF4.__version__} found, but this notebook expects netCDF4 <= 1.6.0"
)

AssertionError: 

In [4]:
# Report the versions of the radar libraries used for this run (one per line)
version_lines = (f"xradar: {xd.__version__}", f"wradlib: {wrl.__version__}")
print("\n".join(version_lines))

xradar: 0.0.13
wradlib: 1.19.0


In [5]:
import time
# Wall-clock reference; the elapsed time since this point is printed in the last code cell
start_time = time.time()

# Read DWD file to retrieve encoding values

In [6]:
# Open the `sweep_0` group of a reference DWD file; its variable encodings are reused further down
dwd_file = "/automount/ags/jgiles/turkey_test/ras07-vol5minng01_sweeph5onem_allmoms_00-2017072700005800-pro-10392-hd5"
dwd = xr.open_dataset(dwd_file, group="sweep_0")
display(dwd)

# Adjust the encoding

In [7]:
# Encoding keys of the reference file that are not carried over to the new files
drop = ["szip", "zstd", "source", "chunksizes", "bzip2", "blosc", "shuffle", "fletcher32", "original_shape", "coordinates", "contiguous"]
# Keep the remaining encoding entries of every 3-dimensional data variable
dwd_enc = {
    name: {key: value for key, value in var.encoding.items() if key not in drop}
    for name, var in dwd.data_vars.items()
    if var.ndim == 3
}
# Reuse existing encodings under additional variable names.
# Each entry gets its own copy: sharing one dict object between two names would
# make an in-place change to one entry silently change the other as well.
dwd_enc["PHIDP"] = dict(dwd_enc["UPHIDP"])
dwd_enc["DBTH"] = dict(dwd_enc["TH"])
dwd_enc["DBTV"] = dict(dwd_enc["TV"])
dwd_enc


{'UZDR': {'zlib': True,
  'complevel': 6,
  'dtype': dtype('uint16'),
  '_FillValue': 65535,
  'scale_factor': 0.0009766072055300383,
  'add_offset': -32.00097660720553},
 'WRADH': {'zlib': True,
  'complevel': 6,
  'dtype': dtype('uint16'),
  '_FillValue': 65535,
  'scale_factor': 0.0019532144110600766,
  'add_offset': -0.0019532144110600766},
 'ZDR': {'zlib': True,
  'complevel': 6,
  'dtype': dtype('uint16'),
  '_FillValue': 65535,
  'scale_factor': 0.0009766072055300383,
  'add_offset': -32.00097660720553},
 'KDP': {'zlib': True,
  'complevel': 6,
  'dtype': dtype('uint16'),
  '_FillValue': 65535,
  'scale_factor': 0.00045778462759220545,
  'add_offset': -15.000457784627592},
 'DBZH': {'zlib': True,
  'complevel': 6,
  'dtype': dtype('uint16'),
  '_FillValue': 65535,
  'scale_factor': 0.002929821616590115,
  'add_offset': -64.00292982161659},
 'UVRADH': {'zlib': True,
  'complevel': 6,
  'dtype': dtype('uint16'),
  '_FillValue': 65535,
  'scale_factor': 0.003906369212927641,
  'add

# Import and set Dask stuff

In [8]:
import dask
from dask.diagnostics import ProgressBar
from dask.distributed import Client

# Start a local distributed cluster with 8 worker processes
# (original author's note: not sure if this is needed)
client = Client(n_workers=8)
# Keep the client as the last expression of the cell so the notebook displays it;
# a bare expression in the middle of a cell produces no output.
client

# Get Files

In [9]:
# Collect every raw file of the selected day and put the paths in sorted order
htypath = glob.glob("/automount/ags/jgiles/turkey_test/acq/OLDDATA/uza/RADAR/2017/07/27/HTY/RAW/*")
htypath.sort()


In [10]:
# Create a dataframe to store the metadata of all files and then select it more easily
from collections import OrderedDict

# Read attributes of files
radarid = []
dtime = []
taskname = []
elevation = []
nrays_expected = []
nrays_written = []
nbins = []
rlastbin = []
binlength = []
horbeamwidth = []
fpath = []

for f in tqdm(htypath, desc="reading metadata"):
    # Read metadata
    m = wrl.io.read_iris(f, loaddata=False, keep_old_sweep_data=True)
    # Extract info: the file name is parsed as <3-character radar id><yymmddHHMMSS>
    fname = os.path.basename(f).split(".")[0]
    radarid_ = fname[0:3]
    dtimestr = fname[3:]
    dtime_ = dt.datetime.strptime(dtimestr, "%y%m%d%H%M%S")
    taskname_ = m["product_hdr"]["product_configuration"]["task_name"].strip()
    nbins_ = m["nbins"]
    task_cfg = m["ingest_header"]["task_configuration"]
    rlastbin_ = task_cfg["task_range_info"]["range_last_bin"]/100
    binlength_ = task_cfg["task_range_info"]["step_output_bins"]/100
    horbeamwidth_ = round(task_cfg["task_misc_info"]["horizontal_beam_width"], 2)

    # Reset the per-file sweep values so that a file without a usable
    # DB_DBZ / DB_DBZ2 header does not silently reuse the previous file's values
    # (or raise NameError on the very first file).
    nrays_expected_ = None
    nrays_written_ = None
    elevation_ = float("nan")
    # Take the first sweep key (0..9) that carries a DB_DBZ header, falling back to DB_DBZ2
    for i in range(10):
        for moment in ("DB_DBZ", "DB_DBZ2"):
            try:
                hdr = m["data"][i]["ingest_data_hdrs"][moment]
                found = (
                    hdr["number_rays_file_expected"],
                    hdr["number_rays_file_written"],
                    round(hdr["fixed_angle"], 2),
                )
            except KeyError:
                continue
            nrays_expected_, nrays_written_, elevation_ = found
            break
        else:
            # neither moment present for this sweep key -> try the next one
            continue
        # a header was found -> stop searching
        break

    # Append to list
    radarid.append(radarid_)
    dtime.append(dtime_)
    taskname.append(taskname_)
    elevation.append(elevation_)
    nbins.append(nbins_)
    rlastbin.append(rlastbin_)
    binlength.append(binlength_)
    #nrays_expected.append(nrays_expected_)
    #nrays_written.append(nrays_written_)
    fpath.append(f)
    horbeamwidth.append(horbeamwidth_)

# put attributes in a dataframe (one row per file)
df = pd.DataFrame(OrderedDict(
                  {"radarid": radarid,
                   "datetime": dtime,
                   "taskname": taskname,
                   "elevation": elevation,
                   #"nrays_expected": nrays_expected,
                   #"nrays_written": nrays_written,
                   "nbins": nbins,
                   "rlastbin": rlastbin,
                   "binlength": binlength,
                   "horbeamwidth": horbeamwidth,
                   "fpath": fpath
                  }))

........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

In [11]:
# Pick one scanning mode and one elevation (loading them will take some minutes)
mode = 'VOL_A'
elev = 0.

# Row mask for the selection, then pull the matching file paths out of the dataframe
selected = (df["elevation"] == elev) & (df["taskname"] == mode)
paths = df.loc[selected, "fpath"]

In [12]:
# Turn the selection into a plain, sorted list and report how many files matched
paths = list(paths)
paths.sort()
print(len(paths))


240


# Set engine

In [13]:
# Alternative backend, kept for switching:
# engine = "netcdf4"
# Backend name passed as `engine=` to the `to_netcdf` / `open_mfdataset` calls in this notebook
engine = "h5netcdf"

# Reading functions

In [14]:
def read_single(f):
    """Open the ``sweep_0`` group of one file with the ``iris`` engine.

    The returned dataset has ``sweep_mode`` set as a coordinate, its ``time``
    variable renamed to ``rtime``, and a new scalar ``time`` coordinate equal
    to the minimum of ``rtime``.

    Parameters
    ----------
    f : path of the file to open

    Returns
    -------
    xarray.Dataset
    """
    # Angle grid handed to the backend via `reindex_angle`
    angle_grid = {"start_angle": -0.5, "stop_angle": 360, "angle_res": 1., "direction": 1}
    sweep = (
        xr.open_dataset(f, engine="iris", group="sweep_0", reindex_angle=angle_grid)
        .set_coords("sweep_mode")
        .rename_vars({"time": "rtime"})
    )
    return sweep.assign_coords(time=sweep.rtime.min())


In [15]:
@dask.delayed
def process_single(f, num, dest):
    """Read one file and write it as a netCDF file using the reference encodings.

    Parameters
    ----------
    f : path of the input file, passed to ``read_single``
    num : integer used, zero-padded to three digits, in the output file name
    dest : output path prefix; the file is written to ``f"{dest}{num:03d}.nc"``

    Returns
    -------
    str
        Path of the written file.

    Raises
    ------
    ValueError
        If the dataset read from ``f`` has no 2-dimensional variable.
    """
    ds = read_single(f)
    moments = [k for k, v in ds.variables.items() if v.ndim == 2]
    if not moments:
        raise ValueError(f"no 2-dimensional variables found in {f}")
    # Chunk layout: length 1 along the first dimension, full extent along the rest
    shape = ds[moments[0]].shape
    enc_new = dict(chunksizes=(1, ) + shape[1:])
    # Build a fresh dict per variable; updating the entries of `dwd_enc` in place
    # would modify the shared module-level encoding table.
    new_enc = {k: {**dwd_enc[k], **enc_new} for k in moments if k in dwd_enc}
    dest = f"{dest}{num:03d}.nc"
    ds.to_netcdf(dest, engine=engine, encoding=new_enc)
    return dest

# Convert Files in subfolder

In [17]:
%%time
dest = "/home/jgiles/turkey_test/test_"
results = []
# fill dask compute pipeline
for i, f in tqdm(enumerate(paths)):
    results.append(client.compute(process_single(f, i, dest)))
# compute pipeline
# this returns, if all results are computed
for res in results:
    print(res.result())    

0it [00:00, ?it/s]

2023-04-17 11:08:01,412 - tornado.application - ERROR - Exception in callback <bound method Worker.trigger_profile of <Worker 'tcp://127.0.0.1:36915', name: 0, status: running, stored: 0, running: 2/2, ready: 1, comm: 0, waiting: 0>>
Traceback (most recent call last):
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/tornado/ioloop.py", line 921, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/worker.py", line 2417, in trigger_profile
    state = profile.process(
            ^^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/profile.py", line 173, in process
    ident = identifier(frame)
            ^^^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/profile.py", line 61, in identifier
    frame.f_code.co_name,
    ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'int' ob

/home/jgiles/turkey_test/000.nc


2023-04-17 11:08:14,175 - tornado.application - ERROR - Exception in callback <bound method Worker.trigger_profile of <Worker 'tcp://127.0.0.1:40095', name: 0, status: running, stored: 0, running: 2/2, ready: 1, comm: 0, waiting: 0>>
Traceback (most recent call last):
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/tornado/ioloop.py", line 921, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/worker.py", line 2417, in trigger_profile
    state = profile.process(
            ^^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/profile.py", line 173, in process
    ident = identifier(frame)
            ^^^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/profile.py", line 61, in identifier
    frame.f_code.co_name,
    ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'int' ob

2023-04-17 11:08:44,757 - tornado.application - ERROR - Exception in callback <bound method Worker.trigger_profile of <Worker 'tcp://127.0.0.1:35281', name: 1, status: running, stored: 3, running: 2/2, ready: 1, comm: 0, waiting: 0>>
Traceback (most recent call last):
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/tornado/ioloop.py", line 921, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/worker.py", line 2421, in trigger_profile
    profile.process(
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/profile.py", line 173, in process
    ident = identifier(frame)
            ^^^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/profile.py", line 61, in identifier
    frame.f_code.co_name,
    ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'dict' object has no attribute 'co_name'
2023

/home/jgiles/turkey_test/001.nc


2023-04-17 11:09:25,503 - tornado.application - ERROR - Exception in callback <bound method Worker.trigger_profile of <Worker 'tcp://127.0.0.1:36229', name: 4, status: running, stored: 17, running: 2/2, ready: 1, comm: 0, waiting: 0>>
Traceback (most recent call last):
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/tornado/ioloop.py", line 921, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/worker.py", line 2417, in trigger_profile
    state = profile.process(
            ^^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/profile.py", line 173, in process
    ident = identifier(frame)
            ^^^^^^^^^^^^^^^^^
  File "/home/jgiles/mambaforge/envs/wradlib3/lib/python3.11/site-packages/distributed/profile.py", line 61, in identifier
    frame.f_code.co_name,
    ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'Lock' 

KeyboardInterrupt: 

# Reload converted files

In [None]:
%%time
dsr = xr.open_mfdataset(f"{dest}*", concat_dim="time", combine="nested", engine=engine)
display(dsr)

# Fix encoding before write to single file

In [None]:
# Moments are the 3-dimensional data variables; taking them from `data_vars`
# keeps this list consistent with the keys of the encoding dict built below.
moments = [k for k, v in dsr.data_vars.items() if v.ndim == 3]
if not moments:
    raise ValueError("no 3-dimensional data variables found in the reloaded dataset")
# Chunk layout: length 1 along the first dimension, full extent along the rest
shape = dsr[moments[0]].shape
enc_new = dict(chunksizes=(1, ) + shape[1:])

drop = ['szip', 'zstd', 'bzip2', 'blosc', 'coordinates']
enc = {}
for k in moments:
    var_enc = {key: val for key, val in dsr[k].encoding.items() if key not in drop}
    if k == "DB_HCLASS2":
        # this variable is written without explicit chunk sizes;
        # pop() tolerates the key being absent
        var_enc.pop("chunksizes", None)
    else:
        var_enc.update(enc_new)
    enc[k] = var_enc
encoding = {k: enc[k] for k in moments}
print(encoding)

# Write to single file

In [None]:
%%time
dsr.to_netcdf(f"iris-test-compressed-{engine}.nc", engine=engine, encoding=encoding)

In [None]:
# Seconds elapsed since `start_time` was taken
elapsed = time.time() - start_time
print("--- %s seconds ---" % elapsed)

In [None]:
#!h5dump -HBvp iris-test-compressed-h5netcdf.nc