Skip to content

Commit

Permalink
Add DWD Observation climate_urban datasets
Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Sep 5, 2022
1 parent 3ef7dbc commit 9a88fd2
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Expand Up @@ -7,6 +7,7 @@ Development
- Use lxml.iterparse to reduce memory consumption when parsing DWD Mosmix files
- Fix Settings object instantiation
- Change logging level for Settings.cache_disable to INFO
- Add DWD Observation climate_urban datasets

0.42.1 (25.08.2022)
*******************
Expand Down
22 changes: 12 additions & 10 deletions tests/provider/dwd/observation/test_io.py
Expand Up @@ -199,6 +199,7 @@ def test_request():
assert not df.empty


@pytest.mark.remote
def test_export_unknown():
"""Test export of DataFrame to unknown format"""
Settings.tidy = True
Expand All @@ -221,6 +222,7 @@ def test_export_unknown():
ex.match("Unknown export file type")


@pytest.mark.remote
def test_export_spreadsheet(tmpdir_factory):
"""Test export of DataFrame to spreadsheet"""
Settings.tidy = False
Expand Down Expand Up @@ -253,10 +255,10 @@ def test_export_spreadsheet(tmpdir_factory):
("station_id",),
("dataset",),
("date",),
("qn_3",),
("quality_wind",),
("wind_gust_max",),
("wind_speed",),
("qn_4",),
("quality",),
("precipitation_height",),
("precipitation_form",),
("sunshine_duration",),
Expand Down Expand Up @@ -358,10 +360,10 @@ def test_export_parquet(tmpdir_factory):
"station_id",
"dataset",
"date",
"qn_3",
"quality_wind",
"wind_gust_max",
"wind_speed",
"qn_4",
"quality",
"precipitation_height",
"precipitation_form",
"sunshine_duration",
Expand Down Expand Up @@ -423,10 +425,10 @@ def test_export_zarr(tmpdir_factory):
"station_id",
"dataset",
"date",
"qn_3",
"quality_wind",
"wind_gust_max",
"wind_speed",
"qn_4",
"quality",
"precipitation_height",
"precipitation_form",
"sunshine_duration",
Expand Down Expand Up @@ -487,10 +489,10 @@ def test_export_feather(tmpdir_factory):
"station_id",
"dataset",
"date",
"qn_3",
"quality_wind",
"wind_gust_max",
"wind_speed",
"qn_4",
"quality",
"precipitation_height",
"precipitation_form",
"sunshine_duration",
Expand Down Expand Up @@ -692,8 +694,8 @@ def test_export_influxdb1_tabular():
assert points[0]["measurement"] == "weather"
assert list(points[0]["tags"].keys()) == [
"station_id",
"qn_3",
"qn_4",
"quality_wind",
"quality",
"dataset",
]
assert list(points[0]["fields"].keys()) == [
Expand Down
4 changes: 2 additions & 2 deletions tests/ui/test_restapi.py
Expand Up @@ -247,8 +247,8 @@ def test_dwd_values_sql_tabular(dicts_are_same):
"precipitation_height": 0.0,
"pressure_air_site": 993.88,
"pressure_vapor": 4.6,
"qn_3": 10.0,
"qn_4": 3,
"quality_wind": 10.0,
"quality": 3,
"snow_depth": 0,
"station_id": "01048",
"sunshine_duration": 0.0,
Expand Down
47 changes: 33 additions & 14 deletions wetterdienst/core/scalar/values.py
Expand Up @@ -432,6 +432,9 @@ def query(self) -> Generator[ValuesResult, None, None]:
:return:
"""
# mapping of original to humanized parameter names is always the same
if self.sr.humanize:
hpm = self._create_humanized_parameters_mapping()

for station_id in self.sr.station_id:
# TODO: add method to return empty result with correct response string e.g.
Expand Down Expand Up @@ -535,7 +538,7 @@ def query(self) -> Generator[ValuesResult, None, None]:

# Assign meaningful parameter names (humanized).
if self.sr.humanize:
station_df = self._humanize(station_df)
station_df = self._humanize(station_df, hpm)

yield ValuesResult(stations=self.sr, df=station_df)

Expand Down Expand Up @@ -766,34 +769,50 @@ def all(self) -> ValuesResult:

return ValuesResult(stations=self.sr, df=df)

def _humanize(self, df: pd.DataFrame) -> pd.DataFrame:
def _humanize(self, df: pd.DataFrame, humanized_parameters_mapping: Dict[str, str]) -> pd.DataFrame:
"""
Method for humanizing parameters.
:param df:
:return:
:param df: pandas.DataFrame with original column names
:param humanized_parameters_mapping: mapping of original parameter names to humanized ones
:return: pandas.DataFrame with renamed columns
"""
hcnm = self._create_humanized_parameters_mapping()

if not self.sr.tidy:
df = df.rename(columns=hcnm)
df = df.rename(columns=humanized_parameters_mapping)
else:
df.loc[:, Columns.PARAMETER.value] = df[Columns.PARAMETER.value].cat.rename_categories(hcnm)
df.loc[:, Columns.PARAMETER.value] = (
df.loc[:, Columns.PARAMETER.value].map(humanized_parameters_mapping).astype("category")
)

return df

def _create_humanized_parameters_mapping(self) -> Dict[str, str]:
"""
Method for creation of parameter name mappings based on
self._parameter_base
Reduce the creation of parameter mapping of the massive amount of parameters
by specifying the resolution.
:return:
"""
hpm = {}
if self.sr._unique_dataset or not self.sr._has_datasets:
for parameter in self.sr._parameter_base[self.sr.resolution.name]:
try:
hpm[parameter.value.lower()] = parameter.name.lower()
except AttributeError:
pass
else:
datasets = [
dataset for dataset in self.sr._parameter_base[self.sr.resolution.name] if hasattr(dataset, "__name__")
]

for dataset in datasets:
for parameter in self.sr._parameter_base[self.sr.resolution.name][dataset.__name__]:
try:
hpm[parameter.value.lower()] = parameter.name.lower()
except AttributeError:
pass

return {
parameter.value: parameter.name.lower()
for parameter in self.sr.stations._parameter_base[self.sr.stations._dataset_accessor]
}
return hpm

@staticmethod
def _get_actual_percentage(df: pd.DataFrame) -> float:
Expand Down
11 changes: 8 additions & 3 deletions wetterdienst/provider/dwd/index.py
Expand Up @@ -14,7 +14,10 @@
DWD_SERVER,
DWDCDCBase,
)
from wetterdienst.provider.dwd.observation.metadata.dataset import DwdObservationDataset
from wetterdienst.provider.dwd.observation.metadata.dataset import (
DWD_URBAN_DATASETS,
DwdObservationDataset,
)
from wetterdienst.util.cache import CacheExpiry
from wetterdienst.util.network import list_remote_files_fsspec

Expand Down Expand Up @@ -68,5 +71,7 @@ def build_path_to_parameter(
Resolution.DAILY,
):
return f"{resolution.value}/{dataset.value}/"

return f"{resolution.value}/{dataset.value}/{period.value}/"
elif dataset in DWD_URBAN_DATASETS:
return f"{resolution.value}/{dataset.value[6:]}/{period.value}/"
else:
return f"{resolution.value}/{dataset.value}/{period.value}/"
10 changes: 1 addition & 9 deletions wetterdienst/provider/dwd/metadata/constants.py
Expand Up @@ -13,12 +13,4 @@

class DWDCDCBase(Enum):
    """Base paths on the DWD CDC server below which observation data is published."""

    # regular climate observations tree
    CLIMATE_OBSERVATIONS = "observations_germany/climate/"
    # separate tree for urban (city-station) climate observations
    CLIMATE_URBAN_OBSERVATIONS = "observations_germany/climate_urban/"
4 changes: 3 additions & 1 deletion wetterdienst/provider/dwd/mosmix/api.py
Expand Up @@ -125,6 +125,8 @@ def query(self) -> Generator[ValuesResult, None, None]:
:return:
"""
hpm = self._create_humanized_parameters_mapping()

for df in self._collect_station_parameter():
df = self._coerce_parameter_types(df)

Expand All @@ -137,7 +139,7 @@ def query(self) -> Generator[ValuesResult, None, None]:
df = self._coerce_meta_fields(df)

if self.sr.humanize:
df = self._humanize(df)
df = self._humanize(df, hpm)

yield ValuesResult(stations=self.sr, df=df)

Expand Down
19 changes: 1 addition & 18 deletions wetterdienst/provider/dwd/observation/api.py
Expand Up @@ -4,7 +4,7 @@
import logging
from datetime import datetime, timezone
from itertools import repeat
from typing import Dict, List, Optional, Union
from typing import List, Optional, Union

import pandas as pd
from pandas import Timedelta, Timestamp
Expand Down Expand Up @@ -315,23 +315,6 @@ def _coerce_irregular_parameter(self, series: pd.Series) -> pd.Series:
"""
return pd.to_datetime(series, format=DatetimeFormat.YMDH_COLUMN_M.value)

def _create_humanized_parameters_mapping(self) -> Dict[str, str]:
    """
    Reduce the creation of parameter mapping of the massive amount of parameters
    by specifying the resolution.

    :return: mapping of original parameter names to lowercase humanized
        (enum member) names for the requested resolution only
    """
    hpm = {}

    # Only the parameter enumeration of the requested resolution is walked.
    for parameter in DwdObservationParameter[self.sr.resolution.name]:
        try:
            hpm[parameter.value] = parameter.name.lower()
        except AttributeError:
            # presumably entries without a .value/.name pair (e.g. nested
            # containers) — confirm; such entries are silently skipped
            pass

    return hpm

def _get_historical_date_ranges(self, station_id: str, dataset: DwdObservationDataset) -> List[str]:
"""
Get particular files for historical data which for high resolution is
Expand Down
25 changes: 16 additions & 9 deletions wetterdienst/provider/dwd/observation/fileindex.py
Expand Up @@ -11,15 +11,17 @@
from wetterdienst.metadata.resolution import Resolution
from wetterdienst.provider.dwd.index import _create_file_index_for_dwd_server
from wetterdienst.provider.dwd.metadata.column_names import DwdColumns
from wetterdienst.provider.dwd.metadata.constants import (
DATE_RANGE_REGEX,
STATION_ID_REGEX,
DWDCDCBase,
)
from wetterdienst.provider.dwd.metadata.constants import DWDCDCBase
from wetterdienst.provider.dwd.metadata.datetime import DatetimeFormat
from wetterdienst.provider.dwd.observation.metadata.dataset import DwdObservationDataset
from wetterdienst.provider.dwd.observation.metadata.dataset import (
DWD_URBAN_DATASETS,
DwdObservationDataset,
)
from wetterdienst.provider.dwd.observation.metadata.resolution import HIGH_RESOLUTIONS

# Matches a run of exactly five digits (station id); the lookbehind/lookahead
# forbid adjacent digits, so longer digit runs are not partially matched.
STATION_ID_REGEX = r"(?<!\d)\d{5}(?!\d)"
# Matches a "YYYYMMDD_YYYYMMDD"-shaped range: two eight-digit dates joined by
# an underscore, again guarded against adjacent digits on either side.
DATE_RANGE_REGEX = r"(?<!\d)\d{8}_\d{8}(?!\d)"


def create_file_list_for_climate_observations(
station_id: str,
Expand Down Expand Up @@ -53,23 +55,28 @@ def create_file_list_for_climate_observations(


def create_file_index_for_climate_observations(
parameter_set: DwdObservationDataset,
dataset: DwdObservationDataset,
resolution: Resolution,
period: Period,
) -> pd.DataFrame:
"""
Function (cached) to create a file index of the DWD station data. The file index
is created for an individual set of parameters.
Args:
parameter_set: parameter of Parameter enumeration
dataset: parameter of Parameter enumeration
resolution: time resolution of TimeResolution enumeration
period: period type of PeriodType enumeration
Returns:
file index in a pandas.DataFrame with sets of parameters and station id
"""
timezone_germany = timezone("Europe/Berlin")

file_index = _create_file_index_for_dwd_server(parameter_set, resolution, period, DWDCDCBase.CLIMATE_OBSERVATIONS)
if dataset in DWD_URBAN_DATASETS:
file_index = _create_file_index_for_dwd_server(
dataset, resolution, period, DWDCDCBase.CLIMATE_URBAN_OBSERVATIONS
)
else:
file_index = _create_file_index_for_dwd_server(dataset, resolution, period, DWDCDCBase.CLIMATE_OBSERVATIONS)

file_index = file_index.loc[file_index[DwdColumns.FILENAME.value].str.endswith(Extension.ZIP.value), :]

Expand Down
22 changes: 22 additions & 0 deletions wetterdienst/provider/dwd/observation/metadata/dataset.py
Expand Up @@ -41,8 +41,24 @@ class DwdObservationDataset(Enum):
WATER_EQUIVALENT = "water_equiv"
WEATHER_PHENOMENA = "weather_phenomena"
WEATHER_PHENOMENA_MORE = "more_weather_phenomena"
# hourly urban datasets
URBAN_TEMPERATURE_AIR = "urban_air_temperature"
URBAN_PRECIPITATION = "urban_precipitation"
URBAN_PRESSURE = "urban_pressure"
URBAN_TEMPERATURE_SOIL = "urban_soil_temperature"
URBAN_SUN = "urban_sun"
URBAN_WIND = "urban_wind"


# Datasets published below the separate "climate_urban" tree on the DWD CDC
# server rather than the regular "climate" tree; membership in this tuple is
# used to select the server base path when building file indexes.
DWD_URBAN_DATASETS = (
    DwdObservationDataset.URBAN_TEMPERATURE_AIR,
    DwdObservationDataset.URBAN_PRECIPITATION,
    DwdObservationDataset.URBAN_PRESSURE,
    DwdObservationDataset.URBAN_TEMPERATURE_SOIL,
    DwdObservationDataset.URBAN_SUN,
    DwdObservationDataset.URBAN_WIND,
)

RESOLUTION_DATASET_MAPPING: Dict[Resolution, Dict[DwdObservationDataset, List[Period]]] = {
Resolution.MINUTE_1: {
DwdObservationDataset.PRECIPITATION: [
Expand Down Expand Up @@ -148,6 +164,12 @@ class DwdObservationDataset(Enum):
Period.HISTORICAL,
Period.RECENT,
],
DwdObservationDataset.URBAN_TEMPERATURE_AIR: [Period.RECENT],
DwdObservationDataset.URBAN_PRECIPITATION: [Period.RECENT],
DwdObservationDataset.URBAN_PRESSURE: [Period.RECENT],
DwdObservationDataset.URBAN_TEMPERATURE_SOIL: [Period.RECENT],
DwdObservationDataset.URBAN_SUN: [Period.RECENT],
DwdObservationDataset.URBAN_WIND: [Period.RECENT],
},
Resolution.SUBDAILY: {
DwdObservationDataset.TEMPERATURE_AIR: [
Expand Down

0 comments on commit 9a88fd2

Please sign in to comment.