
# RADKLIM YW download and upload to metacatalog, including creation of metadata

This is the final solution, using `radolan_to_netcdf` for download and splitting the netCDF daily when uploading to metacatalog!

All available RADKLIM data: **2001 - 2022**

In [1]:
import tarfile
from glob import glob
import os

import tqdm
import xarray as xr
import numpy as np

import radolan_to_netcdf as rtn
#import cf

from metacatalog import api, ext


## Download data from DWD CDC server


In [2]:
%%time

!wget -q -P /data/qt7760/ --show-progress -r -np -A .tar -R "index.html*" https://opendata.dwd.de/climate_environment/CDC/grids_germany/5_minutes/radolan/reproc/2017_002/bin/

opendata.dwd.de/cli     [ <=>                ]   2.76K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>                ]   1.77K  --.-KB/s    in 0s      
opendata.dwd.de/cli     [ <=>           

In [3]:
# delete (empty) folder /supplement
!rm -r /data/qt7760/opendata.dwd.de/climate_environment/CDC/grids_germany/5_minutes/radolan/reproc/2017_002/bin/supplement


## Read data from nested tar file

Data is provided as monthly tar files, which contains daily tar.gz files, which contain the 5-minute binary files. To avoid extracting everything first we use the nested loop-construct below and extract only the data we want on the fly.

**split daily**:

folder structure:
- radklim_yw/
    - 20100101_radklim_yw.nc
    - 20100102_radklim_yw.nc
    - ...
    - 20211230_radklim_yw.nc
    - 20211231_radklim_yw.nc

-> naming pattern: `%Y%m%d_radklim_yw.nc`

In [None]:
%%time
#from time import time

# absolute output_path to the folder radklim, where netCDF files are saved
output_path = "/data/qt7760/radklim_yw/"

# create folder radklim_yw
os.makedirs(output_path, exist_ok=True)

for year in sorted(glob('/data/qt7760/opendata.dwd.de/climate_environment/CDC/grids_germany/5_minutes/radolan/reproc/2017_002/bin/*')):
    print(f"Extracting data for the year {year[-4:]}")
    for month in tqdm.tqdm(sorted(glob(year + '/*'))):
        with tarfile.open(month, 'r') as tar:
            # fn_list: filenames of daily files ('YW2017.002_20010131.tar.gz')
            fn_list = sorted([f.name for f in tar.getmembers()])
            #fn_list = sorted(tar.getnames())

            # loop over daily files
            for fn in fn_list:
                f = tar.extractfile(fn)
                # create (empty) daily netCDF
                fn_netcdf = f"{output_path}/{fn[-15:-7]}_radklim_yw.nc"
                # t1 = time()
                rtn.create_empty_netcdf(fn=fn_netcdf, product_name='YW')
                # t2=time()
                #print(f"create_empty_netcdf: {time() - t1}")
                # daily files contain the 5-minute data (tar_inner)
                with tarfile.open(fileobj=f) as tar_inner:
                    # t1 = time()
                    # fn_list_inner = sorted(tar_inner.getnames())
                    # print(f"tar_inner.getnames: {time() - t1}")
                    # t1 = time()
                    fn_list_inner = sorted([f.name for f in tar_inner.getmembers()])
                    # print(f"tar_inner.getmembers: {time() - t1}")

                    for fn_inner in fn_list_inner:
                        # extract 5-minute data, append to previously created daily netCDF
                        # t1 = time()
                        data, metadata = rtn.read_in_one_bin_file(tar_inner.extractfile(fn_inner))
                        # print(f"read_in_one_bin_file: {time() - t1}")
                        # t1 = time()
                        rtn.append_to_netcdf(
                            fn_netcdf, 
                            data_list=[data, ], 
                            metadata_list=[metadata, ],
                        )
                        # print(f"append_to_netcdf: {time() - t1}")


Extracting data for the year 2001


  0%|          | 0/12 [00:00<?, ?it/s]

Wrap everything into a (restartable) function.

In [2]:
def tar2netcdf(input_path: str, output_path: str, if_exists: str, delete_last=False):
    """
    Untar DWD binary downloads and store as daily netCDF files under path.

    Parameters:
    ------
    input_path: str 
        path to the folder where binary DWD downloads are stored (yearly folders).
        Usually something like *"./opendata.dwd.de/climate_environment/CDC/grids_germany/5_minutes/radolan/reproc/2017_002/bin"*
    output_path: str
        where to store generated netCDF files
    if_exists: {fail, replace, skip}
        What to do if netcdf file already exists.
        If you use 'skip', it is probably a good idea to delete the last generated netCDF in the output_folder by hand to 
        generate this file again and make sure that the file is not corrupted due an interruption while last creation of the file.
    delete_last: bool
        Whether to delete the last generated file in the output_folder.
        This option makes only sense if your last call of tar2netcdf() got interrupted and you are not sure if the netCDF file 
        generated last was fully completed, so you can generate that netCDF file again.  
    """
    # create folder in output path
    os.makedirs(output_path, exist_ok=True)

    # get the absolute output_path to the folder radklim, where netCDF files are saved
    output_path = os.path.abspath(output_path)

    # delete last created netCDF file in output_path if delete_last == True
    if delete_last:
        existing_files = sorted(glob(f"{output_path}/*"))
        if len(existing_files) >= 1:
            os.remove(existing_files[-1])

    # loop over binary files
    for year in sorted(glob(f"{input_path}/*")):
        print(f"Extracting data for the year {year[-4:]}")
        for month in tqdm.tqdm(sorted(glob(year + '/*'))):
            with tarfile.open(month, 'r') as tar:
                # fn_list: filenames of daily files ('YW2017.002_20010131.tar.gz')
                fn_list = sorted([f.name for f in tar.getmembers()])

                # loop over daily files
                for fn in fn_list:
                    f = tar.extractfile(fn)

                    # netCDF file name
                    fn_netcdf = f"{output_path}/{fn[-15:-7]}_radklim_yw.nc"

                    if os.path.exists(fn_netcdf):
                        if if_exists == 'fail':
                            raise ValueError(f"netCDF file {output_path}/{fn[-15:-7]}_radklim_yw.nc already exists")
                        elif if_exists == 'skip':
                            continue
                        
                    # create (empty) daily netCDF                    
                    rtn.create_empty_netcdf(fn=fn_netcdf, product_name='YW')
                    
                    # daily files contain the 5-minute data (tar_inner)
                    with tarfile.open(fileobj=f) as tar_inner:
                        fn_list_inner = sorted([f.name for f in tar_inner.getmembers()])

                        for fn_inner in fn_list_inner:
                            # extract 5-minute data, append to previously created daily netCDF
                            data, metadata = rtn.read_in_one_bin_file(tar_inner.extractfile(fn_inner))
                            rtn.append_to_netcdf(
                                fn_netcdf, 
                                data_list=[data, ], 
                                metadata_list=[metadata, ],
                            )


Execute `tar2netcdf` to extract data, if process is interrupted, the process can be continued with parameters `if_exists='skip'` and `delete_last=True`.

In [None]:
tar2netcdf(input_path="/data/qt7760/opendata.dwd.de/climate_environment/CDC/grids_germany/5_minutes/radolan/reproc/2017_002/bin/",
           output_path="/data/qt7760/radklim_yw/",
           if_exists='skip',
           delete_last=True)

Extracting data for the year 2001


100%|██████████| 12/12 [00:00<00:00, 70.90it/s]


Extracting data for the year 2002


100%|██████████| 12/12 [00:00<00:00, 71.14it/s]


Extracting data for the year 2003


100%|██████████| 12/12 [00:00<00:00, 74.50it/s]


Extracting data for the year 2004


100%|██████████| 12/12 [00:00<00:00, 72.18it/s]


Extracting data for the year 2005


100%|██████████| 12/12 [00:00<00:00, 71.98it/s]


Extracting data for the year 2006


100%|██████████| 12/12 [00:00<00:00, 74.35it/s]


Extracting data for the year 2007


100%|██████████| 12/12 [00:00<00:00, 74.97it/s]


Extracting data for the year 2008


100%|██████████| 12/12 [00:00<00:00, 70.49it/s]


Extracting data for the year 2009


100%|██████████| 12/12 [00:00<00:00, 102.31it/s]


Extracting data for the year 2010


100%|██████████| 12/12 [00:00<00:00, 237.42it/s]


Extracting data for the year 2011


100%|██████████| 12/12 [00:00<00:00, 232.91it/s]


Extracting data for the year 2012


 50%|█████     | 6/12 [1:31:53<1:31:53, 918.95s/it]

Open data and check

In [12]:
ds = xr.open_mfdataset('/data/qt7760/radklim_yw/20010101_radklim_yw.nc')#, chunks='auto') # automatisch daily gechunked?
ds

Unnamed: 0,Array,Chunk
Bytes,7.55 MiB,7.55 MiB
Shape,"(1100, 900)","(1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.55 MiB 7.55 MiB Shape (1100, 900) (1100, 900) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",900  1100,

Unnamed: 0,Array,Chunk
Bytes,7.55 MiB,7.55 MiB
Shape,"(1100, 900)","(1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,7.55 MiB,7.55 MiB
Shape,"(1100, 900)","(1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 7.55 MiB 7.55 MiB Shape (1100, 900) (1100, 900) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",900  1100,

Unnamed: 0,Array,Chunk
Bytes,7.55 MiB,7.55 MiB
Shape,"(1100, 900)","(1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.06 GiB,1.06 GiB
Shape,"(288, 1100, 900)","(288, 1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.06 GiB 1.06 GiB Shape (288, 1100, 900) (288, 1100, 900) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",900  1100  288,

Unnamed: 0,Array,Chunk
Bytes,1.06 GiB,1.06 GiB
Shape,"(288, 1100, 900)","(288, 1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,576 B,576 B
Shape,"(288,)","(288,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray
"Array Chunk Bytes 576 B 576 B Shape (288,) (288,) Dask graph 1 chunks in 2 graph layers Data type int16 numpy.ndarray",288  1,

Unnamed: 0,Array,Chunk
Bytes,576 B,576 B
Shape,"(288,)","(288,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,int16 numpy.ndarray,int16 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.25 kiB,2.25 kiB
Shape,"(288,)","(288,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 2.25 kiB 2.25 kiB Shape (288,) (288,) Dask graph 1 chunks in 2 graph layers Data type object numpy.ndarray",288  1,

Unnamed: 0,Array,Chunk
Bytes,2.25 kiB,2.25 kiB
Shape,"(288,)","(288,)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.06 GiB,1.06 GiB
Shape,"(288, 1100, 900)","(288, 1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.06 GiB 1.06 GiB Shape (288, 1100, 900) (288, 1100, 900) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",900  1100  288,

Unnamed: 0,Array,Chunk
Bytes,1.06 GiB,1.06 GiB
Shape,"(288, 1100, 900)","(288, 1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.06 GiB,1.06 GiB
Shape,"(288, 1100, 900)","(288, 1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.06 GiB 1.06 GiB Shape (288, 1100, 900) (288, 1100, 900) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",900  1100  288,

Unnamed: 0,Array,Chunk
Bytes,1.06 GiB,1.06 GiB
Shape,"(288, 1100, 900)","(288, 1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.06 GiB,1.06 GiB
Shape,"(288, 1100, 900)","(288, 1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.06 GiB 1.06 GiB Shape (288, 1100, 900) (288, 1100, 900) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",900  1100  288,

Unnamed: 0,Array,Chunk
Bytes,1.06 GiB,1.06 GiB
Shape,"(288, 1100, 900)","(288, 1100, 900)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [13]:
print('First time stamp in data: ' + str(ds.rainfall_amount.time.values[0]))
print('Last time stamp in data : ' + str(ds.rainfall_amount.time.values[-1]))

First time stamp in data: 2000-12-31T23:59:59.999999996
Last time stamp in data : 2001-01-01T23:55:00.000000004


In [103]:
#g = cf.read('data/radklim_yw/20010101_radklim_yw.nc')[0]
#g.dataset_compliance()

        Bad grid_mapping: 'RADOLAN_grid'
        Bad grid_mapping: 'RADOLAN_grid'
        Bad grid_mapping: 'RADOLAN_grid'
        Bad grid_mapping: 'RADOLAN_grid'


{'cluttermask': {'CF version': '1.9',
  'dimensions': ('time', 'y', 'x'),
  'non-compliance': {'RADOLAN_grid': [{'code': 150003,
     'attribute': {'cluttermask:grid_mapping': 'RADOLAN_grid'},
     'reason': 'Grid mapping variable is not in file'},
    {'code': 150003,
     'attribute': {'cluttermask:grid_mapping': 'RADOLAN_grid'},
     'reason': 'Grid mapping variable is not in file'}]}}}


## Metadata Creation

Create a metadata Entry for RADKLIM data in metacatalog.

In [9]:
UPLOAD = True
#CONNECTION = 'test'
CONNECTION = 'default'

session = api.connect_database(CONNECTION)
print('Using: %s' % session.bind)

Using: Engine(postgresql://postgres:***@vfw-db:5432/metacatalog-dev)


In [3]:
# check if the IO extension is activate
try:
    print(ext.extension('io'))
except AttributeError:
    ext.activate_extension('io', 'metacatalog.ext.io', 'IOExtension')
    from metacatalog.ext.io import IOExtension
    ext.extension('io', IOExtension)

<class 'metacatalog.ext.io.extension.IOExtension'>


#### Author

In [10]:
author = api.find_organisation(session, organisation_name='Deutscher Wetterdienst', return_iterator=True).first()

if author is None and UPLOAD:
    author = api.add_organisation(session, organisation_name='Deutscher Wetterdienst',
                              affiliation='DWD Climate Data Center (CDC)',
                              organisation_abbrev='DWD'
                              #attribution='Source: Deutscher Wetterdienst'
                              )

print(author)


Deutscher Wetterdienst (Org.) <ID=10>



#### Location

~~For now, we use the centroid of the raster (center of Germany) or just location POINT(0, 0), but this is not perfect and will be discussed in the future!~~  
Newest implementation: `location = NULL`, SQL view generates **centroid** from bbox (`datasource.spatial_extent`) and **geom** (RADOLAN grid / `datasource.spatial_extent`)

In [14]:
# get bounding box for spatial extent, use centroid as location
# bounding box
min_lon = float(ds.rainfall_amount.longitudes.min().values)
min_lat = float(ds.rainfall_amount.latitudes.min().values)
max_lon = float(ds.rainfall_amount.longitudes.max().values)
max_lat = float(ds.rainfall_amount.latitudes.max().values)

bbox=f"POLYGON(({min_lon} {min_lat},{min_lon} {max_lat},{max_lon} {max_lat},{max_lon} {min_lat}, {min_lon} {min_lat}))"
print(bbox)


POLYGON((3.0907961942336355 46.1834547755304,3.0907961942336355 55.77506293778408,17.095451751078592 55.77506293778408,17.095451751078592 46.1834547755304, 3.0907961942336355 46.1834547755304))


In [15]:
location = None # Datasource.spatial_scale.extent to locate data



#### License

`Open Data Commons` or better `GeoNutzV` (https://www.gesetze-im-internet.de/geonutzv/GeoNutzV.pdf, http://www.gesetze-im-internet.de/geonutzv/) if this is a license??

In [16]:
license = api.find_license(session, short_title='dl-by-de/2.0')[0]
# TODO db revision auf Maschine noch ausführen

print(license)

Data licence Germany – attribution – version 2.0 <ID=10>



#### Unit


In [17]:
unit = api.find_unit(session, name="kilogram")[0]

print(unit.name)

kilogram



#### Variable

`variable.column_names` only makes sense with timeseries data? oder?

In [18]:
variable = api.find_variable(session, name='rainfall amount', return_iterator=True).first()

if variable is None and UPLOAD:
    variable = api.add_variable(session, name="rainfall amount", symbol='P', unit=unit.id, column_names=['rainfall_amount'])

print(variable.name, variable.symbol)

rainfall amount P



#### Create Entry


In [19]:
entry = api.find_entry(session, title="Radar-based Precipitation Climatology Version 2017.002 (RADKLIM)", return_iterator=True).first()

if not entry and UPLOAD:
    entry = api.add_entry(session,
                          title="Radar-based Precipitation Climatology Version 2017.002 (RADKLIM)",
                          abstract="The data are results of the reprocessing version 2017.002 of the DWD radar-based precipitation climatology based on the RADOLAN method.",
                          location=location,
                          variable=variable.id,
                          citation="Quasi gauge-adjusted five-minute precipitation rate (YW): Winterrath, Tanja; Brendel, Christoph, Hafer, Mario; Junghänel, Thomas; Klameth, Anna; Lengfeld, Katharina; Walawender, Ewelina; Weigl, Elmar; Becker, Andreas (2018): RADKLIM Version 2017.002: Reprocessed quasi gauge-adjusted radar data, 5-minute precipitation sums (YW) DOI: 10.5676/DWD/RADKLIM_YW_V2017.002",
                          license=license,
                          author=author.id,
                          embargo=False,
                          is_partial=False
                          )

print(entry)

<ID=1099 Radar-based Precipit [rainfall amount] >



#### Details

from: https://opendata.dwd.de/climate_environment/CDC/grids_germany/daily/radolan/recent/bin/DESCRIPTION_gridsgermany-daily-radolan-recent-bin_en.pdf

In [20]:
if not entry.details and UPLOAD:
    details_dict = [
        {
            "key": "Spatial coverage",
            "value": "Gridded Precipitation Data for Germany"
        }
    ]

    # add details to entry
    api.add_details_to_entries(session, entry, details_dict)

entry.details_dict()

{'spatial coverag': {'id': 11960,
  'key': 'Spatial coverage',
  'stem': 'spatial coverag',
  'value': 'Gridded Precipitation Data for Germany',
  'entry_id': 1099,
  'entry_uuid': 'ffb2a3dd-9856-413e-b4d7-14921750f2b3'}}


#### Thesaurus


In [21]:
keyword = api.find_keyword(session, value='PRECIPITATION AMOUNT')[0]

if not entry.keywords and UPLOAD:
    api.add_keywords_to_entries(session, entry, keyword)

print(api.find_keyword(session, entry.keywords[0].keyword_id)[0].full_path)

EARTH SCIENCE > ATMOSPHERE > PRECIPITATION > PRECIPITATION AMOUNT



#### Data upload


In [22]:
ds_type = api.find_datasource_type(session, name='netCDF')[0]
ds_type.__dict__

# TODO: 'description': 'netCDF file source on the local file system.'
# TODO: 'id': 5 (oder so)

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x7f2b20f1afb0>,
 'id': 5,
 'description': 'netCDF file source on the local file system.',
 'name': 'netCDF',
 'title': 'Local netCDF File'}

In [29]:
# path where netCDF files are stored
datasource_path = "/data/qt7760/radklim_yw"

if UPLOAD and not entry.datasource:
    # create datasource
    entry.create_datasource(type=ds_type.id, 
                            path=os.path.abspath(datasource_path),
                            datatype='raster',
                            commit=True,
                            engine='h5netcdf' # this is saved into column datasource.args
                            )

    # create temporal scale
    entry.datasource.create_scale(
        resolution='5min',
        extent=(str(ds.rainfall_amount.time.values[0]), str(np.datetime64('2021-12-31T23:59:59'))), # end time should be updated when netCDF creation is complete (not sure about exact value)
        support=1.0, # not sure
        scale_dimension='temporal',
        commit=True
    )

    # create spatial scale
    entry.datasource.create_scale(
        resolution=1000, # 1km grid 
        extent=bbox, 
        support=1.0,
        scale_dimension='spatial',
        commit=True
    )

    # entry.import_data(data='data/radklim_yw/')
    # TODO: import_data hier nicht nötig, Daten schon an richtigen Ort legen, import_data sollte später metadaten zu netcdf hinzufügen 

In [None]:
out_data = entry.get_data()
out_data

`entry.get_data()` **CRASHES**!!