# Niedersachsen

Every federal state is represented by its own input directory and is processed into a NUTS level 1 directory containing a sub-folder for each discharge location. These folder names are derived from NUTS and reflect the CAMELS id. The NUTS level 1 code for Niedersachsen is `DE9`.

To pre-process the data, you need to write (at least) two functions. One should extract all metadata and condense it into a single `pandas.DataFrame`. This is used to build the folder structure and derive the ids.
The second function has to take an id, as provided by the state authorities, called `provider_id` and return a `pandas.DataFrame` with the transformed data. The dataframe needs the three columns `['date', 'q' | 'w', 'flag']`.
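
For a provider that ships long-format tables, like Niedersachsen, the second function could look roughly like the sketch below. It is a minimal sketch under several assumptions. The function name `get_timeseries` is hypothetical. The column names `MESSSTELLE_NR`, `DATUM` and `WERT` are those of the Niedersachsen files used further down. The flag column stays empty, as it does in this notebook.

```python
import numpy as np
import pandas as pd


def get_timeseries(raw: pd.DataFrame, provider_id: str, var: str = 'q') -> pd.DataFrame:
    """Hypothetical helper: cut one gauge out of a long-format provider table.

    Assumes the Niedersachsen column names MESSSTELLE_NR, DATUM and WERT.
    Returns the columns ['date', var, 'flag'], sorted by date.
    """
    rows = raw[raw['MESSSTELLE_NR'].astype(str) == str(provider_id)]
    out = pd.DataFrame({
        'date': rows['DATUM'],
        var: rows['WERT'],
        'flag': np.nan,  # no quality flags available, so the column stays empty
    })
    return out.sort_values('date', ignore_index=True)


# made-up rows in the Niedersachsen layout
raw = pd.DataFrame({
    'MESSSTELLE_NR': [3183101, 3183101, 3449103],
    'DATUM': pd.to_datetime(['1985-01-11', '1985-01-10', '1985-01-10']),
    'WERT': [0.853, 0.853, 4.98],
})
print(get_timeseries(raw, '3183101'))
```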

For easier and unified output handling, the `camelsp` package contains a context object called `Bundesland`. It takes a number of names and abbreviations to identify the correct federal state and returns an object that holds helper and save functions.

The context saves files as needed and can easily be changed to save files with different strategies, e.g. fill missing data with NaN, merge data into a single file, create files for each variable or pack everything together into a NetCDF file.

In [1]:
import pandas as pd
import numpy as np
import geopandas as gpd
import os
import zipfile
from tqdm import tqdm
from typing import List
import warnings
import collections

from camelsp import Bundesland

The context can also be instantiated like any regular Python class, e.g. to look up the default input data path, which we will use later.

In [2]:
# the context also makes the input path available, if camelsp was installed locally
BASE = Bundesland('niedersachsen').input_path
BASE

'/home/camel/camelsp/input_data/Q_and_W/NiS_Niedersachsen'

## Parse data

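Niedersachsen delivered one long-format file per variable. `exp-peg-par252.csv` holds discharge (Q) and `exp-peg-par253.csv` holds water level (W). Each file stacks all gauges in a single table, so the data has to be split by gauge (`MESSSTELLE_NR`).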

In [3]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
q_raw = dd.read_csv(os.path.join(BASE, 'exp-peg-par252.csv'), encoding='latin1', sep=';', decimal=',', 
                    parse_dates=['DATUM'], date_format='%d.%m.%y', blocksize=4e6)

with ProgressBar():
    q_raw = q_raw.compute()

q_raw

[########################################] | 100% Completed | 14.48 s


|       | MESSSTELLE_NR | DATUM      | LANGNAME  | BEZEICHNUNG             | KENNUNG_ID | WERT  | EINHEIT |
|-------|---------------|------------|-----------|-------------------------|------------|-------|---------|
| 0     | 3183101       | 1985-01-10 | Sudendorf | Abfluss Tagesmittelwert |            | 0.853 | m³/s    |
| 1     | 3183101       | 1985-01-11 | Sudendorf | Abfluss Tagesmittelwert |            | 0.853 | m³/s    |
| 2     | 3183101       | 1985-01-12 | Sudendorf | Abfluss Tagesmittelwert |            | 0.853 | m³/s    |
| 3     | 3183101       | 1985-01-13 | Sudendorf | Abfluss Tagesmittelwert |            | 0.772 | m³/s    |
| 4     | 3183101       | 1987-09-07 | Sudendorf | Abfluss Tagesmittelwert |            | 0.938 | m³/s    |
| ...   | ...           | ...        | ...       | ...                     | ...        | ...   | ...     |
| 55770 | 3183101       | 1985-01-05 | Sudendorf | Abfluss Tagesmittelwert |            | 1.030 | m³/s    |
| 55771 | 3183101       | 1985-01-06 | Sudendorf | Abfluss Tagesmittelwert |            | 1.030 | m³/s    |
| 55772 | 3183101       | 1985-01-07 | Sudendorf | Abfluss Tagesmittelwert |            | 1.800 | m³/s    |
| 55773 | 3183101       | 1985-01-08 | Sudendorf | Abfluss Tagesmittelwert |            | 1.200 | m³/s    |


In [4]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
w_raw = dd.read_csv(os.path.join(BASE, 'exp-peg-par253.csv'), encoding='latin1', sep=';', decimal=',', 
                    parse_dates=['DATUM'], date_format='%d.%m.%y', blocksize=4e6)

with ProgressBar():
    w_raw = w_raw.compute()

w_raw

[########################################] | 100% Completed | 14.85 s


|       | MESSSTELLE_NR | DATUM      | LANGNAME  | BEZEICHNUNG                 | KENNUNG_ID | WERT | EINHEIT |
|-------|---------------|------------|-----------|-----------------------------|------------|------|---------|
| 0     | 3183101       | 1983-05-28 | Sudendorf | Wasserstand Tagesmittelwert |            | 112  | cm      |
| 1     | 3183101       | 1983-05-29 | Sudendorf | Wasserstand Tagesmittelwert |            | 110  | cm      |
| 2     | 3183101       | 1983-05-30 | Sudendorf | Wasserstand Tagesmittelwert |            | 109  | cm      |
| 3     | 3183101       | 1983-05-31 | Sudendorf | Wasserstand Tagesmittelwert |            | 108  | cm      |
| 4     | 3183101       | 1983-06-01 | Sudendorf | Wasserstand Tagesmittelwert |            | 106  | cm      |
| ...   | ...           | ...        | ...       | ...                         | ...        | ...  | ...     |
| 56197 | 4964115       | 2014-12-27 | Düste     | Wasserstand Tagesmittelwert |            | 163  | cm      |
| 56198 | 4964115       | 2014-12-28 | Düste     | Wasserstand Tagesmittelwert |            | 160  | cm      |
| 56199 | 4964115       | 2014-12-29 | Düste     | Wasserstand Tagesmittelwert |            | 158  | cm      |
| 56200 | 4964115       | 2014-12-30 | Düste     | Wasserstand Tagesmittelwert |            | 158  | cm      |


### There are negative values

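Both variables contain negative values. The dominant ones are -0.777 in Q and -777 in W, and they look like missing-value markers. We replace exactly these values with NaN and keep all other negative values.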
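
Only the marker itself is replaced, so the same step can also be written with `Series.replace`. The sketch below uses made-up water levels. It shows that a genuine negative reading survives while the -777 markers become NaN.

```python
import numpy as np
import pandas as pd

# made-up water levels in cm; -777 marks a missing value, -2 is a real (negative) reading
w = pd.Series([112, -777, -2, 108, -777], name='WERT')

# replace only the marker, keep every other value (including genuine negatives)
w_clean = w.replace(-777, np.nan)

print(w_clean.isna().sum())  # 2
print((w_clean < 0).sum())   # 1
```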

In [5]:
print(q_raw[q_raw['WERT'] < 0].WERT.value_counts())
print(w_raw[w_raw['WERT'] < 0].WERT.value_counts())

WERT
-0.777    28860
-0.078        1
-0.058        1
-0.026        1
-0.021        1
-0.002        1
-0.037        1
-0.034        1
-0.030        1
-0.011        1
Name: count, dtype: int64
WERT
-777    20483
-2         45
-4         39
-1         35
-3         31
-5         20
-7         19
-6         16
-8         12
-9          7
-12         4
-10         3
-11         3
-16         2
-15         1
-13         1
-78         1
-58         1
-26         1
-21         1
-52         1
Name: count, dtype: int64


In [6]:
# replace the missing-value markers (-0.777 for Q, -777 for W) with NaN, keep all other values
q_raw.loc[q_raw['WERT'] == -0.777, 'WERT'] = np.nan
w_raw.loc[w_raw['WERT'] == -777, 'WERT'] = np.nan

### There are also dates in the future like 2060

This happens because the year is given with only two digits. For example, `68` should be 1968, but pandas interpreted it as 2068. So we subtract 100 years from all dates with a year after 2023.
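
The `%y` directive (in Python's `strptime` and in pandas) maps two-digit years 00 to 68 to 2000 to 2068, and 69 to 99 to 1969 to 1999. That is why future years up to 2068 show up. A column-wise alternative to the row-wise `apply` below could use `Series.mask` together with `pd.DateOffset`. This is a sketch, not what the notebook ran. `fix_two_digit_years` is a hypothetical name, and it uses the notebook's cut-off of 2023.

```python
import pandas as pd


def fix_two_digit_years(dates: pd.Series, cutoff_year: int = 2023) -> pd.Series:
    """Hypothetical helper: move dates after cutoff_year back by 100 years."""
    in_future = dates.dt.year > cutoff_year
    # subtract the offset from the whole column, but only keep it where the year is too large
    return dates.mask(in_future, dates - pd.DateOffset(years=100))


# '%y' turns '67' into 2067, but '80' into 1980 and '14' into 2014
raw_dates = pd.Series(['10.12.67', '07.04.80', '31.12.14'])
dates = pd.to_datetime(raw_dates, format='%d.%m.%y')
fixed = fix_two_digit_years(dates)
print(fixed.dt.year.tolist())  # [1967, 1980, 2014]
```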

In [7]:
q_raw[q_raw['DATUM'] > '2023-01-01']

|       | MESSSTELLE_NR | DATUM      | LANGNAME | BEZEICHNUNG             | KENNUNG_ID | WERT | EINHEIT |
|-------|---------------|------------|----------|-------------------------|------------|------|---------|
| 20560 | 3449103       | 2067-12-10 | Hesselte | Abfluss Tagesmittelwert |            | 4.98 | m³/s    |
| 20561 | 3449103       | 2067-12-11 | Hesselte | Abfluss Tagesmittelwert |            | 4.07 | m³/s    |
| 20562 | 3449103       | 2067-12-12 | Hesselte | Abfluss Tagesmittelwert |            | 3.88 | m³/s    |
| 20563 | 3449103       | 2067-12-13 | Hesselte | Abfluss Tagesmittelwert |            | 4.07 | m³/s    |
| 20564 | 3449103       | 2067-12-14 | Hesselte | Abfluss Tagesmittelwert |            | 8.08 | m³/s    |
| ...   | ...           | ...        | ...      | ...                     | ...        | ...  | ...     |
| 39602 | 3881127       | 2066-04-25 | Kampe    | Abfluss Tagesmittelwert |            |      | m³/s    |
| 39604 | 3881127       | 2067-12-12 | Kampe    | Abfluss Tagesmittelwert |            |      | m³/s    |
| 39611 | 3881127       | 2060-12-07 | Kampe    | Abfluss Tagesmittelwert |            |      | m³/s    |
| 39612 | 3881127       | 2067-02-11 | Kampe    | Abfluss Tagesmittelwert |            |      | m³/s    |


In [8]:
# if year > 2023, subtract 100 years
def fix_date(date):
    if date.year > 2023:
        return date.replace(year=date.year-100)
    else:
        return date
    
q_raw['DATUM'] = q_raw['DATUM'].apply(fix_date)
w_raw['DATUM'] = w_raw['DATUM'].apply(fix_date)

We cannot merge `q_raw` and `w_raw` directly, because that would need about 500 GB of RAM. Instead, both tables are split by gauge and then merged gauge by gauge.
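
The notebook does not show which merge was attempted. One plausible cause is a join on `MESSSTELLE_NR` alone. In that case every Q row of a gauge is paired with every W row of the same gauge, and the result grows quadratically; the Hesselte gauge alone has 22,341 daily rows. Joining on the gauge id *and* the date avoids this. The toy example below uses made-up numbers.

```python
import pandas as pd

# toy long-format tables: one gauge, three days each, overlapping on two days
q = pd.DataFrame({'MESSSTELLE_NR': [1, 1, 1],
                  'DATUM': pd.to_datetime(['2000-01-01', '2000-01-02', '2000-01-03']),
                  'WERT': [0.5, 0.6, 0.7]})
w = pd.DataFrame({'MESSSTELLE_NR': [1, 1, 1],
                  'DATUM': pd.to_datetime(['2000-01-02', '2000-01-03', '2000-01-04']),
                  'WERT': [10, 11, 12]})

# joining on the gauge id alone pairs every Q row with every W row of that gauge
by_id = pd.merge(q, w, on='MESSSTELLE_NR', suffixes=('_q', '_w'))

# joining on gauge id AND date keeps one row per (gauge, day)
by_id_date = pd.merge(q, w, on=['MESSSTELLE_NR', 'DATUM'], how='outer', suffixes=('_q', '_w'))

print(len(by_id), len(by_id_date))  # 9 4
```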

In [9]:
# id column is MESSSTELLE_NR
id_column = 'MESSSTELLE_NR'

In [10]:
# How many different variables are there?
names = []
for _, df in q_raw.groupby(id_column):
    names.extend(df.BEZEICHNUNG.unique().tolist())
for _, df in w_raw.groupby(id_column):
    names.extend(df.BEZEICHNUNG.unique().tolist())
set(names)

{'Abfluss Tagesmittelwert', 'Wasserstand Tagesmittelwert'}

In [11]:
# total messstellen
N_q = len(q_raw.groupby(id_column))
N_w = len(w_raw.groupby(id_column))
print(f"Messstellen Q: {N_q}\nMessstellen W: {N_w}")

Messstellen Q: 282
Messstellen W: 280


In [12]:
# print Messstellen in q but not in w and vice versa
q_ids = set(q_raw[id_column].unique().tolist())
w_ids = set(w_raw[id_column].unique().tolist())
print(f"Q but not W: {q_ids - w_ids}")
print(f"W but not Q: {w_ids - q_ids}")

Q but not W: {5945124, 5648107, 4884108, 4364109, 4888110, 5945139, 4961107, 5934165, 9286139, 4961177, 5946106, 4922107, 4922109, 4821118}
W but not Q: {9371101, 5934145, 4948130, 4887101, 4961130, 5985101, 4781106, 9286164, 3942101, 5986107, 4994109, 3983102}


Now collect the metadata and the actual discharge (Q) and water level (W) data per gauge.

First, extract all metadata for this federal state without the `Bundesland` context. The context is used later to save the extracted metadata. It has a function for saving *raw* metadata, which takes a `pandas.DataFrame` and needs the name of the id column.
Here, *raw* refers to provider metadata that has not yet been transformed into the CAMELS-de metadata schema.
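
The loop in the next cell collects the distinct metadata values of each gauge one group at a time. A more compact alternative could use `groupby(...).agg` with a function that returns the list of unique values. This is a sketch with made-up rows, not the code that produced the files.

```python
import pandas as pd

# made-up long-format rows for two hypothetical gauges
raw = pd.DataFrame({
    'MESSSTELLE_NR': [1001, 1001, 1002],
    'LANGNAME': ['Gauge A', 'Gauge A', 'Gauge B'],
    'EINHEIT': ['m³/s', 'm³/s', 'm³/s'],
})

# one row per gauge, each cell holding the list of distinct values seen for that gauge
meta = (
    raw.groupby('MESSSTELLE_NR')[['LANGNAME', 'EINHEIT']]
       .agg(lambda s: s.unique().tolist())
       .reset_index()
)
print(meta)
```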

In [13]:
# result container for both q and w
qw_meta_data = collections.defaultdict(dict)

# go for q
for nr, df in tqdm(q_raw.groupby(id_column)):
    qw_meta_data[str(nr)]['q_meta'] = {
        id_column: str(nr),
        'Q_BEZEICHNUNG': df.BEZEICHNUNG.unique().tolist(),
        'Q_EINHEIT': df.EINHEIT.unique().tolist(),
        'Q_LANGNAME': df.LANGNAME.unique().tolist(),
        'Q_KENNUNG_ID': df.KENNUNG_ID.unique().tolist()
    }
    qw_meta_data[str(nr)]['q_data'] = pd.DataFrame({
        'date': df.DATUM,
        'q': df.WERT,
        'q_flag': np.nan
    })
    if nr == 3449103:
        print(qw_meta_data[str(nr)]['q_meta'])
        print(qw_meta_data[str(nr)]['q_data'])

# go for w
for nr, df in tqdm(w_raw.groupby(id_column)):
    qw_meta_data[str(nr)]['w_meta'] = {
        id_column: str(nr),
        'W_BEZEICHNUNG': df.BEZEICHNUNG.unique().tolist(),
        'W_EINHEIT': df.EINHEIT.unique().tolist(),
        'W_LANGNAME': df.LANGNAME.unique().tolist(),
        'W_KENNUNG_ID': df.KENNUNG_ID.unique().tolist()
    }
    qw_meta_data[str(nr)]['w_data'] = pd.DataFrame({
        'date': df.DATUM,
        'w': df.WERT,
        'w_flag': np.nan
    })


print(f"Extracted {len(qw_meta_data)} timeseries")



{'MESSSTELLE_NR': '3449103', 'Q_BEZEICHNUNG': ['Abfluss Tagesmittelwert'], 'Q_EINHEIT': ['m³/s'], 'Q_LANGNAME': ['Hesselte'], 'Q_KENNUNG_ID': [nan]}
            date      q  q_flag
19741 1980-04-07  3.760     NaN
19742 1980-04-08  3.670     NaN
19743 1980-04-09  3.840     NaN
19744 1980-04-10  3.760     NaN
19745 1980-04-11  3.540     NaN
...          ...    ...     ...
12543 2014-12-27  3.655     NaN
12544 2014-12-28  3.396     NaN
12545 2014-12-29  3.269     NaN
12546 2014-12-30  3.282     NaN
12547 2014-12-31  3.313     NaN

[22341 rows x 3 columns]


100%|██████████| 282/282 [00:01<00:00, 217.92it/s]
100%|██████████| 280/280 [00:01<00:00, 216.39it/s]

Extracted 294 timeseries





The rows are not in chronological order, because the dates in `DATUM` appear shuffled within each gauge. Therefore, every series has to be sorted by date.
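
Calling `sort_values` with `ignore_index=True` does the sorting and the `reset_index(drop=True)` of the next cell in one step. Afterwards, `is_monotonic_increasing` can confirm the order. A minimal sketch on made-up rows:

```python
import pandas as pd

df = pd.DataFrame({
    'date': pd.to_datetime(['2014-12-27', '1980-04-07', '1980-04-08']),
    'q': [3.655, 3.760, 3.670],
})

# sort chronologically and renumber the index 0..n-1 in one call
df = df.sort_values('date', ignore_index=True)

print(df['date'].is_monotonic_increasing)  # True
print(df.index.tolist())                   # [0, 1, 2]
```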

In [14]:
for nr in qw_meta_data.keys():
    # check if key q_data exists
    if qw_meta_data[nr].get('q_data', None) is not None:
        qw_meta_data[nr]['q_data'].sort_values(by='date', inplace=True)
        qw_meta_data[nr]['q_data'].reset_index(drop=True, inplace=True)
    # check if key w_data exists
    if qw_meta_data[nr].get('w_data', None) is not None:
        qw_meta_data[nr]['w_data'].sort_values(by='date', inplace=True)
        qw_meta_data[nr]['w_data'].reset_index(drop=True, inplace=True)


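Now merge the Q and W data and metadata of each gauge. For gauges with both variables, the data is outer-merged on `date`, so days with only one variable are kept and the other variable is NaN.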

In [15]:
# now loop over dict and merge q and w data and metadata
qw_meta_data_merged = collections.defaultdict(dict)

for nr in tqdm(qw_meta_data.keys()):
    # check if q and w exist for this id
    if len(qw_meta_data[nr].keys()) == 4:
        # merge data
        qw_meta_data_merged[nr]['data'] = pd.merge(
            qw_meta_data[nr]['q_data'], 
            qw_meta_data[nr]['w_data'], 
            how='outer', 
            on='date'
        )
        
        # merge metadata
        qw_meta_data_merged[nr]['meta'] = {
            **qw_meta_data[nr]['q_meta'],
            **qw_meta_data[nr]['w_meta']
        }
    else:
        # only q or w data exists
        if qw_meta_data[nr].get('q_data', None) is not None:
            qw_meta_data_merged[nr]['data'] = qw_meta_data[nr]['q_data']
            qw_meta_data_merged[nr]['meta'] = qw_meta_data[nr]['q_meta']
        elif qw_meta_data[nr].get('w_data', None) is not None:
            qw_meta_data_merged[nr]['data'] = qw_meta_data[nr]['w_data']
            qw_meta_data_merged[nr]['meta'] = qw_meta_data[nr]['w_meta']
        else:
            raise ValueError(f"Neither q nor w data exists for {nr}")


100%|██████████| 294/294 [00:01<00:00, 285.90it/s]


In [16]:
# check if LANGNAME and KENNUNG_ID are the same for q and w
for nr in qw_meta_data_merged.keys():
    # only check if q and w both exist for this id
    if 'Q_BEZEICHNUNG' in qw_meta_data_merged[nr]['meta'].keys() and 'W_BEZEICHNUNG' in qw_meta_data_merged[nr]['meta'].keys():
        # check if LANGNAME is the same for q and w
        if qw_meta_data_merged[nr]['meta']['Q_LANGNAME'] != qw_meta_data_merged[nr]['meta']['W_LANGNAME']:
            print(f"LANGNAME not equal for {nr}")
        # check if KENNUNG_ID is the same or both nan for q and w
        if str(qw_meta_data_merged[nr]['meta']['Q_KENNUNG_ID']) != str(qw_meta_data_merged[nr]['meta']['W_KENNUNG_ID']):
            print(f"KENNUNG_ID not equal for {nr}")

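`Q_LANGNAME` & `W_LANGNAME` and `Q_KENNUNG_ID` & `W_KENNUNG_ID` are identical for every gauge that has both variables, since the check above prints nothing. We therefore merge each pair into a single `LANGNAME` and `KENNUNG_ID` entry.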

In [17]:
data_merged = collections.defaultdict(dict)

for nr, old_dict in qw_meta_data_merged.items():
    data_merged[nr] = {
        # data is the same
        'data': old_dict['data'],
        'meta': {
            id_column: old_dict['meta'][id_column],
            # shared metadata
            'LANGNAME': old_dict['meta'].get('Q_LANGNAME', old_dict['meta'].get('W_LANGNAME', np.nan)),
            'KENNUNG_ID': old_dict['meta'].get('Q_KENNUNG_ID', old_dict['meta'].get('W_KENNUNG_ID', np.nan)),
            # q metadata
            'Q_EINHEIT': old_dict['meta'].get('Q_EINHEIT', np.nan),
            'Q_BEZEICHNUNG': old_dict['meta'].get('Q_BEZEICHNUNG', np.nan),
            # w metadata
            'W_EINHEIT': old_dict['meta'].get('W_EINHEIT', np.nan),
            'W_BEZEICHNUNG': old_dict['meta'].get('W_BEZEICHNUNG', np.nan)
        }
    }

data_merged['3449103']

{'data':             date      q  q_flag     w  w_flag
 0     1956-11-01  6.870     NaN   NaN     NaN
 1     1956-11-02  5.180     NaN   NaN     NaN
 2     1956-11-03  4.470     NaN   NaN     NaN
 3     1956-11-04  4.470     NaN   NaN     NaN
 4     1956-11-05  5.520     NaN   NaN     NaN
 ...          ...    ...     ...   ...     ...
 22336 2017-12-27  4.637     NaN  54.0     NaN
 22337 2017-12-28  4.526     NaN  53.0     NaN
 22338 2017-12-29  4.411     NaN  51.0     NaN
 22339 2017-12-30  4.685     NaN  54.0     NaN
 22340 2017-12-31  5.414     NaN  61.0     NaN
 
 [22341 rows x 5 columns],
 'meta': {'MESSSTELLE_NR': '3449103',
  'LANGNAME': ['Hesselte'],
  'KENNUNG_ID': [nan],
  'Q_EINHEIT': ['m³/s'],
  'Q_BEZEICHNUNG': ['Abfluss Tagesmittelwert'],
  'W_EINHEIT': ['cm'],
  'W_BEZEICHNUNG': ['Wasserstand Tagesmittelwert']}}

Now transform everything back into the two lists `meta` and `data`. This keeps the output compatible with Mirko's older code, which was written before we got the W data.

In [18]:
# make lists of meta and data
meta = []
data = []

for nr in data_merged.keys():
    meta.append(data_merged[nr]['meta'])
    data.append(data_merged[nr]['data'])

### metadata

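Ok, let's get really wild. Check that the code above produced only lists with exactly one unique value per gauge. Otherwise the metadata would change over time for the same Messstelle, which would be a problem. `tidy_metadata` unwraps single-element lists. If a list holds more than one value, it issues a warning and drops that field.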
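
The same check can also be run directly on a long-format table. `groupby(...).nunique(dropna=False)` counts the distinct values per gauge and column, so any count above 1 points at metadata that changes over time. A sketch with made-up rows and hypothetical gauge ids:

```python
import pandas as pd

raw = pd.DataFrame({
    'MESSSTELLE_NR': [1001, 1001, 1002, 1002],
    'LANGNAME': ['Gauge A', 'Gauge A', 'Gauge B', 'Gauge B'],
    'EINHEIT': ['m³/s', 'm³/s', 'm³/s', 'cm'],  # made-up: the second gauge changes its unit
})

# distinct values per gauge and column (NaN counts as a value of its own)
counts = raw.groupby('MESSSTELLE_NR')[['LANGNAME', 'EINHEIT']].nunique(dropna=False)

# gauges where at least one metadata column has more than one value
inconsistent = counts[(counts > 1).any(axis=1)]
print(inconsistent.index.tolist())  # [1002]
```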

In [19]:
def tidy_metadata(meta: List[dict]) -> pd.DataFrame:
    pmeta = []
    for i, m in enumerate(meta):
        out = {}
        for k, v in m.items():
            if isinstance(v, list):
                if len(v) == 1:
                    out[k] = v[0]
                else:
                    warnings.warn(f"Line {i + 1}: More than one value found for {k}: [{', '.join(map(str, v))}]")
            else:
                out[k] = v
        pmeta.append(out)
    return pd.DataFrame(pmeta)

tidy_metadata(meta)

|     | MESSSTELLE_NR | LANGNAME    | KENNUNG_ID | Q_EINHEIT | Q_BEZEICHNUNG           | W_EINHEIT | W_BEZEICHNUNG               |
|-----|---------------|-------------|------------|-----------|-------------------------|-----------|-----------------------------|
| 0   | 3183101       | Sudendorf   |            | m³/s      | Abfluss Tagesmittelwert | cm        | Wasserstand Tagesmittelwert |
| 1   | 3346103       | Schwege     |            | m³/s      | Abfluss Tagesmittelwert | cm        | Wasserstand Tagesmittelwert |
| 2   | 3437108       | Beesten     |            | m³/s      | Abfluss Tagesmittelwert | cm        | Wasserstand Tagesmittelwert |
| 3   | 3445100       | Spelle      |            | m³/s      | Abfluss Tagesmittelwert | cm        | Wasserstand Tagesmittelwert |
| 4   | 3449100       | Spelle      |            | m³/s      | Abfluss Tagesmittelwert | cm        | Wasserstand Tagesmittelwert |
| ... | ...           | ...         | ...        | ...       | ...                     | ...       | ...                         |
| 289 | 5934145       | Jeetzel UW  |            |           |                         | cm        | Wasserstand Tagesmittelwert |
| 290 | 5985101       | Bremervörde |            |           |                         | cm        | Wasserstand Tagesmittelwert |
| 291 | 5986107       | Hollen      |            |           |                         | cm        | Wasserstand Tagesmittelwert |
| 292 | 9286164       | Laar        |            |           |                         | cm        | Wasserstand Tagesmittelwert |


### Finally run

Now the Q and W data can be extracted along with the metadata. The nice thing is that the context handles all the id creation, data file creation, merging and the mapping from our ids to the original ids and files. This makes it less likely that we screw something up.

In [20]:
N = len(meta)

with Bundesland('Niedersachsen') as bl:
    # catch warnings
    with warnings.catch_warnings(record=True) as warns:
        # tidy the metadata
        metadata = tidy_metadata(meta)

        # save the metadata
        bl.save_raw_metadata(metadata, id_column, overwrite=True)

        # for reference, call the nuts-mapping as table
        nuts_map = bl.nuts_table
        print(nuts_map.head())
    
        # go for all ids
        for m, df in tqdm(zip(meta, data), total=N):
            # get the id (use m, so the loop does not overwrite the meta list)
            provider_id = m[id_column]

            # save
            bl.save_timeseries(df, provider_id)
        
        # check if there were warnings (there are warnings)
        if len(warns) > 0:
            log_path = bl.save_warnings(warns)
            print(f"There were warnings during the processing. The log can be found at: {log_path}")


    nuts_id provider_id                              path
0  DE910000     3183101  ./DE9/DE910000/DE910000_data.csv
1  DE910010     3346103  ./DE9/DE910010/DE910010_data.csv
2  DE910020     3437108  ./DE9/DE910020/DE910020_data.csv
3  DE910030     3445100  ./DE9/DE910030/DE910030_data.csv
4  DE910040     3449100  ./DE9/DE910040/DE910040_data.csv


100%|██████████| 294/294 [00:17<00:00, 17.06it/s]


## Further Metadata

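The data files contain little metadata. We therefore use the shapefile `EZG_Pegel_NWLKN` provided by the Landesamt to add more metadata to the raw metadata file for Niedersachsen (`DE9_raw_metadata.csv`). Its area column `SHAPE_STAr` is given in m² and is converted to km².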
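
A left merge keeps every gauge from the raw metadata, even if the shapefile has no matching `PEGELID`. Passing `indicator=True` is one way to list such gauges. `validate='many_to_one'` would raise an error if a `PEGELID` appeared twice in the shapefile. The sketch uses plain pandas frames with made-up ids and areas instead of the real shapefile. Only the column names `MESSSTELLE_NR`, `PEGELID` and `SHAPE_STAr` are taken from the cell below.

```python
import pandas as pd

# made-up raw metadata and catchment table
raw_metadata = pd.DataFrame({'MESSSTELLE_NR': [1001, 1002, 1003]})
catchments = pd.DataFrame({'PEGELID': [1001, 1002],
                           'SHAPE_STAr': [12_500_000.0, 80_000_000.0]})  # area in m²

# m² -> km²
catchments['SHAPE_STAr'] = catchments['SHAPE_STAr'] / 1e6

merged = pd.merge(raw_metadata, catchments, how='left',
                  left_on='MESSSTELLE_NR', right_on='PEGELID',
                  validate='many_to_one', indicator=True)

# gauges without a catchment polygon
missing = merged.loc[merged['_merge'] == 'left_only', 'MESSSTELLE_NR'].tolist()
print(missing)  # [1003]
```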

In [23]:
# unzip shapefile
if not os.path.exists(os.path.join(BASE, '../../Shapes/Niedersachsen_Shapes/EZG_Pegel_NWLKN/EZG_Pegel_NWLKN.shp')):
    with zipfile.ZipFile(os.path.join(BASE, '../../Shapes/Niedersachsen_Shapes/EZG_Pegel_NWLKN.zip'), 'r') as zip_ref:
        zip_ref.extractall(os.path.join(BASE, '../../Shapes/Niedersachsen_Shapes/EZG_Pegel_NWLKN/'))

In [24]:
# read raw metadata that was produced above
raw_metadata = pd.read_csv(os.path.join(Bundesland('DE9').base_path, 'raw_metadata/DE9_raw_metadata.csv'))

# read Landesamt shapefile
gdf = gpd.read_file(os.path.join(BASE, '../../Shapes/Niedersachsen_Shapes/EZG_Pegel_NWLKN/EZG_Pegel_NWLKN.shp'))
gdf['PEGELID'] = gdf['PEGELID'].astype(int)

# transform area from sqm to sqkm
gdf['SHAPE_STAr'] = gdf['SHAPE_STAr'] / 1e6

# merge raw metadata with gdf based on MESSSTELLE_NR and PEGELID
merged = pd.merge(
    raw_metadata, 
    gdf, 
    how='left', 
    left_on=['MESSSTELLE_NR'], 
    right_on=['PEGELID']
)

# save
merged.to_csv(os.path.join(Bundesland('DE9').base_path, 'raw_metadata/DE9_raw_metadata.csv'), index=False)