In [None]:
# initializer
import pathlib

# Make directories on shared (local) storage.
# mkdir(exist_ok=True) is already a no-op for an existing directory, so a
# separate exists() check is unnecessary (and racy).
for local_dir in [
    conf_local_root,
    conf_local_knmi,
    conf_local_odim,
    conf_local_vp,
    conf_local_conf,
]:
    pathlib.Path(local_dir).mkdir(parents=True, exist_ok=True)

# Reference files: download the radar database from the public bucket
# only when no local copy exists yet.
if not pathlib.Path(conf_local_radar_db).exists():
    from minio import Minio

    minioClient = Minio(
        endpoint=param_minio_endpoint,
        access_key=param_minio_access_key,
        secret_key=param_minio_secret_key,
        secure=True,
    )
    print(f"{conf_local_radar_db} not found, downloading")
    minioClient.fget_object(
        bucket_name=conf_minio_public_bucket_name,
        object_name=conf_minio_public_conf_radar_db_object_name,
        file_path=conf_local_radar_db,
    )

# Now produce a variable which acts as a marker for the workflow manager.
# We can then drag a line from the configuration / initializer
# and time the start of the rest of the workflow.
# If you decide to make different sets of configurations, you can store them
# and decide per workflow which config to attach.
init_complete = "Yes"  # Can't send a bool, so use a string marker
print("Finished initialization")

In [None]:
# list-knmi-files
"""
Consume the dummy variable from the configuration cell to signal workflow start.

The workflow manager passes strings around with extra quoting: the string
"Yes" arrives as '"Yes"'. So, before testing whether init_complete is "Yes",
strip every single and double quote from it.
"""
# Libraries
import sys

import requests

# Strip any extra quotes
init_complete = init_complete.replace("'", "").replace('"', "")
if init_complete == "Yes":
    print("Workflow configuration succesfull")
else:
    print("Workflow configuration was not complete, exitting")
    sys.exit(1)

# Notes:
# Timestamps in iso8601, e.g. 2020-01-01T00:00+00:00

# configure
start_ts = param_start_date
end_ts = param_end_date
radar_config = conf_radars.get(param_radar)
if radar_config is None:
    print(f"Radar {param_radar!r} not found in conf_radars, exiting")
    sys.exit(1)
datasetName, datasetVersion, api_url, _ = radar_config
params = {
    "datasetName": datasetName,
    "datasetVersion": datasetVersion,
    "maxKeys": 10,
    "sorting": "asc",
    "orderBy": "created",
    "begin": start_ts,
    "end": end_ts,
}
# Request the file listing from the KNMI servers, following nextPageToken
# until the last page.
dataset_files = []
while True:
    list_files_response = requests.get(
        url=api_url,
        headers={"Authorization": param_api_key},
        params=params,
        # Without a timeout a stalled connection would hang the workflow.
        timeout=60,
    )
    # Surface HTTP errors (bad API key, rate limiting, ...) directly instead of
    # failing later with a TypeError on a missing "files" key.
    list_files_response.raise_for_status()
    list_files = list_files_response.json()
    # Keep each file record as a list of its values; the filter below treats
    # element [0] as the file name.
    # NOTE(review): this relies on the key order of the API's JSON objects.
    dataset_files += [
        list(dataset_file.values())
        for dataset_file in list_files.get("files") or []
    ]
    nextPageToken = list_files.get("nextPageToken")
    if not nextPageToken:
        break
    params["nextPageToken"] = nextPageToken

# KNMI outputs a volume every 5 minutes; keeping only every `param_interval`
# minutes means fewer downloads and less processing.
# Quick and dirty: the minute is the last two digits of the timestamp at the
# end of the file name (before the extension). Check whether the API can
# filter for this on their end instead.
interval_list = list(range(0, 60, param_interval))
dataset_files = [
    dataset_file
    for dataset_file in dataset_files
    if int(dataset_file[0].split("_")[-1].split(".")[0][-2:]) in interval_list
]
print(f"Found {len(dataset_files)} files")
print(dataset_files)

In [None]:
# KNMI-to-ODIM-converter
"""
Notes:
- Make it possible to upload the PVOL files from this stage.
- Add an option to remove the PVOL files at this stage.
  Warning: removing the PVOL files at this stage breaks the downstream VP / RBC
  generation. We could introduce a flag that the RBC and VP steps check to see
  whether the PVOL files were meant to be removed; if that flag is set, they
  should abort, since there will not be any input files.
"""
import subprocess
import pathlib
import h5py
import json
import sys
import shutil

# from typing import List, Object
import math


def str2bool(v):
    """Interpret ``v`` as a boolean.

    Booleans are returned unchanged. Strings are matched case-insensitively,
    ignoring surrounding whitespace, against common yes/no spellings.

    Raises:
        ValueError: if ``v`` is a string that is not a recognised spelling.
    """
    if isinstance(v, bool):
        return v
    normalized = v.strip().lower()
    if normalized in ("yes", "true", "t", "y", "1"):
        return True
    if normalized in ("no", "false", "f", "n", "0"):
        return False
    raise ValueError(f"Cannot interpret {v!r} as a boolean")


class FileTranslatorFileTypeError(LookupError):
    """Raised when the file type read from an HDF5 file is not the expected one."""

    pass


def load_radar_db(radar_db_path):
    """Load and return the radar database, keyed by integer WMO code.

    The JSON file holds a list of radar records. Records whose "wmocode" is
    missing, blank or non-numeric are skipped.

    Output dict sample (wmo code is used as key):
    {
        11038: {'number': '1209', 'country': 'Austria', 'countryid': 'LOWM41', 'oldcountryid': 'OS41', 'wmocode': '11038', 'odimcode': 'atrau', 'location': 'Wien/Schwechat', 'status': '1', 'latitude': '48.074', 'longitude': '16.536', 'heightofstation': ' ', 'band': 'C', 'doppler': 'Y', 'polarization': 'D', 'maxrange': '224', 'startyear': '1978', 'heightantenna': '224', 'diametrantenna': ' ', 'beam': ' ', 'gain': ' ', 'frequency': '5.625', 'single_rrr': 'Y', 'composite_rrr': 'Y', 'wrwp': 'Y'},
        11052: {'number': '1210', 'country': 'Austria', 'countryid': 'LOWM43', 'oldcountryid': 'OS43', 'wmocode': '11052', 'odimcode': 'atfel', 'location': 'Salzburg/Feldkirchen', 'status': '1', 'latitude': '48.065', 'longitude': '13.062', 'heightofstation': ' ', 'band': 'C', 'doppler': 'Y', 'polarization': 'D', 'maxrange': '224', 'startyear': '1992', 'heightantenna': '581', 'diametrantenna': ' ', 'beam': ' ', 'gain': ' ', 'frequency': '5.6', 'single_rrr': 'Y', 'composite_rrr': ' ', 'wrwp': ' '},
        ...
    }
    """
    # Explicit UTF-8: the records contain non-ASCII place names, and the
    # platform default encoding is not guaranteed to be UTF-8.
    with open(radar_db_path, mode="r", encoding="utf-8") as f:
        radar_db_json = json.load(f)
    radar_db = {}
    # Reorder list to a usable dict with sub dicts which we can search with wmo codes
    for radar_dict in radar_db_json:
        try:
            wmo_code = int(radar_dict.get("wmocode"))
        except (AttributeError, TypeError, ValueError):
            # No usable wmo code (missing, blank or non-numeric): skip the record.
            continue
        radar_db[wmo_code] = radar_dict
    return radar_db


def translate_wmo_odim(radar_db, wmo_code):
    """Translate a WMO station code into its ODIM code.

    Args:
        radar_db: mapping of int WMO code to radar record (as returned by
            load_radar_db); each record is read for its "odimcode" entry.
        wmo_code: WMO station code as an int.

    Returns:
        The record's "odimcode", upper-cased with surrounding whitespace removed.

    Raises:
        ValueError: if ``wmo_code`` is not an int.
        KeyError: if ``radar_db`` has no record, or no "odimcode", for ``wmo_code``.
    """
    if not isinstance(wmo_code, int):
        raise ValueError("Expecting a wmo_code [int]")
    radar = radar_db.get(wmo_code)
    if radar is None or radar.get("odimcode") is None:
        raise KeyError(f"No ODIM code found in radar database for WMO code {wmo_code}")
    # Apparently, people sometimes forget to remove whitespace..
    return radar["odimcode"].upper().strip()


def extract_wmo_code(in_path):
    """Return the WMO station code (int) of the radar that produced an HDF5 file.

    The code is read from the root ``what/source`` attribute, a comma separated
    list of ``KEY:value`` entries. Two layouts are handled:

    * a ``WMO:<code>`` entry, e.g. ``WMO:10103`` (example from Germany);
    * no WMO entry but a ``RAD:<code>`` entry, e.g.
      ``RAD:NL52,NOD:nlhrw,PLC:Herwijnen`` (example from The Netherlands),
      which is mapped to a WMO code through a small hard-coded table.

    Prints a message and exits the process with status 1 when there is no
    WMO entry and not exactly one RAD entry.

    Raises:
        ValueError: if more than one WMO entry is present, or the RAD code is
            not in the lookup table.
    """
    with h5py.File(in_path, mode="r") as f:
        # Main attributes, source block
        source = f["what"].attrs.get("source").decode("utf-8")
    source_list = source.split(sep=",")

    wmo_entries = [string for string in source_list if "WMO" in string]
    if len(wmo_entries) == 1:
        return int(wmo_entries[0].replace("WMO:", ""))
    if len(wmo_entries) > 1:
        raise ValueError(
            f"Expected at most one WMO entry in source {source!r}, "
            f"found {len(wmo_entries)}"
        )

    # No wmo code found, most likely dealing with a Dutch radar
    rad_str = [string for string in source_list if "RAD" in string]
    if len(rad_str) != 1:
        print(
            "Something went wrong with determining the rad_str and it wasnt WMO either, exitting"
        )
        sys.exit(1)
    # "RAD:<code>" -> [0] = RAD, [1] = rad code
    rad_code = rad_str[0].split(":")[1]

    rad_codes = {"NL52": "6356", "NL51": "6234", "NL50": "6260"}
    wmo_code = rad_codes.get(rad_code)
    if wmo_code is None:
        raise ValueError(f"Unknown RAD code {rad_code!r} in source {source!r}")
    return int(wmo_code)


def translate_knmi_filename(in_path_h5):
    """Build our file name for an ODIM polar volume.

    The name is ``<ODIMCODE>_pvol_<date>T<time without seconds>_<WMO>.h5``,
    built from the file's root ``what`` attributes (``date``, ``time``,
    ``object``) and the WMO code from ``what/source``. The WMO -> ODIM lookup
    uses the module-level ``radar_db``.

    Raises:
        FileTranslatorFileTypeError: if ``what/object`` is not "PVOL".
    """
    wmo_code = extract_wmo_code(in_path_h5)
    odim_code = translate_wmo_odim(radar_db, wmo_code)
    with h5py.File(in_path_h5, mode="r") as f:
        what = f["what"].attrs
        date = what.get("date").decode("utf-8")
        # Time is stored as HHMMSS; do not include seconds
        time_hhmm = what.get("time").decode("utf-8")[:-2]
        filetype = what.get("object").decode("utf-8")
    if filetype != "PVOL":
        raise FileTranslatorFileTypeError("File type was NOT pvol")
    name = [
        odim_code,
        filetype.lower(),
        f"{date}T{time_hhmm}",
        f"{wmo_code}.h5",
    ]
    return "_".join(name)


def knmi_to_odim(
    in_fpath,
    out_fpath,
    converter="/opt/radar/vol2bird/bin/./KNMI_vol_h5_to_ODIM_h5",
):
    """Convert a KNMI HDF5 polar volume to ODIM HDF5 using the vol2bird converter.

    Runs ``<converter> <out_fpath> <in_fpath>``; the converter's usage is
    ``KNMI_vol_h5_to_ODIM_h5 ODIM_file.h5 KNMI_input_file.h5``.

    Args:
        in_fpath: path of the KNMI input file.
        out_fpath: path the ODIM file should be written to.
        converter: path of the converter executable.

    Returns:
        Tuple ``(out_fpath, returncode, stderr_text)``. The return code is not
        checked here; callers must inspect it.
    """
    command = [converter, out_fpath, in_fpath]
    proc = subprocess.run(command, stderr=subprocess.PIPE, check=False)
    # errors="replace": a stray non-UTF-8 byte in the converter's diagnostics
    # must not hide the return code behind a UnicodeDecodeError.
    output = proc.stderr.decode("utf-8", errors="replace")
    returncode = int(proc.returncode)
    return (out_fpath, returncode, output)


print(f"{knmi_pvol_paths=}")
odim_pvol_paths = []
radar_db = load_radar_db(conf_local_radar_db)
# The parameter may arrive as a string such as "False", which is truthy;
# parse it the same way param_upload_results is parsed.
clean_knmi_input = str2bool(param_clean_knmi_input)
for knmi_path in knmi_pvol_paths:
    # NOTE(review): replaces every "knmi" in the path, not only the storage
    # directory component; confirm input paths never contain it elsewhere.
    out_path_pvol_odim = pathlib.Path(knmi_path.replace("knmi", "odim"))
    print(f"{knmi_path=}")
    print(f"{out_path_pvol_odim=}")
    out_path_pvol_odim.parent.mkdir(parents=True, exist_ok=True)
    converter_results = knmi_to_odim(
        in_fpath=str(knmi_path), out_fpath=str(out_path_pvol_odim)
    )
    print(f"{converter_results=}")
    _, returncode, converter_output = converter_results
    if returncode != 0:
        # Stop before the KNMI input is (optionally) deleted below, so a
        # failed conversion never loses the input file.
        raise RuntimeError(
            f"KNMI to ODIM conversion of {knmi_path} failed "
            f"(return code {returncode}): {converter_output}"
        )
    if clean_knmi_input:
        knmi_file = pathlib.Path(knmi_path)
        knmi_file.unlink()
        # Remove the directory too once it is empty.
        if not any(knmi_file.parent.iterdir()):
            knmi_file.parent.rmdir()
    # Determine name for our convention
    ibed_pvol_name = translate_knmi_filename(in_path_h5=out_path_pvol_odim)
    out_path_pvol_odim_tce = out_path_pvol_odim.parent.joinpath(ibed_pvol_name)
    shutil.move(src=out_path_pvol_odim, dst=out_path_pvol_odim_tce)
    odim_pvol_paths.append(out_path_pvol_odim_tce)

print(f"{odim_pvol_paths=}")
if str2bool(param_upload_results):
    # Minio version
    from minio import Minio, S3Error

    minioClient = Minio(
        endpoint=param_minio_endpoint,
        access_key=param_minio_access_key,
        secret_key=param_minio_secret_key,
        secure=True,
    )
    print(f"Uploading results to {param_minio_user_pvol_output_prefix}")
    local_pvol_storage = pathlib.Path(conf_local_odim)
    for odim_pvol_path in odim_pvol_paths:
        odim_pvol_path = pathlib.Path(odim_pvol_path)
        relative_path = odim_pvol_path.relative_to(local_pvol_storage)
        # Object keys are always "/"-separated, whatever the local OS.
        remote_odim_pvol_path = pathlib.PurePosixPath(
            param_minio_user_pvol_output_prefix, relative_path.as_posix()
        )
        # Check whether the object already exists in the bucket.
        try:
            minioClient.stat_object(
                bucket_name=conf_minio_user_bucket_name,
                object_name=remote_odim_pvol_path.as_posix(),
            )
            exists = True
        except S3Error as err:
            # NoSuchKey is the expected "not uploaded yet" answer. Any other
            # S3 error is reported, and the upload is still attempted.
            if err.code != "NoSuchKey":
                print(
                    f"Could not stat {remote_odim_pvol_path} ({err.code}), "
                    "attempting upload"
                )
            exists = False
        if not exists:
            print(f"Uploading {odim_pvol_path} to {remote_odim_pvol_path}")
            with open(odim_pvol_path, mode="rb") as file_data:
                minioClient.put_object(
                    bucket_name=conf_minio_user_bucket_name,
                    object_name=remote_odim_pvol_path.as_posix(),
                    data=file_data,
                    length=odim_pvol_path.stat().st_size,
                )
        else:
            print(f"{remote_odim_pvol_path} exists, skipping ")
    print("Finished uploading results")
# cast to string to not break json serializer
odim_pvol_paths = [path.as_posix() for path in odim_pvol_paths]