Skip to content

Commit

Permalink
Use pdbufr to read DWD radar data in bufr format
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Jul 20, 2021
1 parent 48fc337 commit e8ed613
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Expand Up @@ -9,6 +9,7 @@ Added

- Enable selecting a parameter precisely from a dataset by passing a tuple like [("precipitation_height", "kl")] or
[("precipitation_height", "precipitation_more")], or for cli/restapi use "precipitation_height/kl"
- Allow parsing DWD radar data in BUFR format into a pandas DataFrame

Changed
=======
Expand Down
65 changes: 56 additions & 9 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 14 additions & 11 deletions pyproject.toml
Expand Up @@ -104,7 +104,7 @@ click-params = "^0.1.1"
cloup = "^0.8.0"

# Optional dependencies aka. "extras"
matplotlib = { version = "^3.3.2", optional = true }
matplotlib = { version = "^3.3.2", optional = true }

openpyxl = { version = "^3.0.7", optional = true }
pyarrow = { version = "^3.0.0", optional = true, markers = "sys_platform != 'darwin' or (sys_platform == 'darwin' and platform_machine != 'arm64')" }
Expand All @@ -119,22 +119,25 @@ psycopg2-binary = { version = "^2.8.6", optional = true }
# HTTP REST API service
fastapi = { version = "^0.61.1", optional = true }
uvicorn = { version = "^0.13.3", optional = true }

# Radar
wradlib = { version = "^1.9.0", optional = true }
pdbufr = { version = "^0.9.0", optional = true }

# Explorer UI service
plotly = { version = "^4.14.3", optional = true }
dash = { version = "^1.19.0", optional = true }
dash-bootstrap-components = { version = "^0.12.0", optional = true }

sphinx = { version = "^3.2.1", optional = true }
sphinx-material = { version = "^0.0.30", optional = true }
sphinx-autodoc-typehints = { version = "^1.11.0", optional = true }
sphinxcontrib-svg2pdfconverter = { version = "^1.1.0", optional = true }
tomlkit = { version = "^0.7.0", optional = true }
ipython = { version = "^7.10.1", optional = true }
ipython-genutils = { version = "^0.2.0", optional = true }
zarr = { version = "^2.7.0", optional = true, markers = "sys_platform != 'darwin' or (sys_platform == 'darwin' and platform_machine != 'arm64')" } # not supported through numcodecs
xarray = { version = "^0.17.0", optional = true }
sphinx = { version = "^3.2.1", optional = true }
sphinx-material = { version = "^0.0.30", optional = true }
sphinx-autodoc-typehints = { version = "^1.11.0", optional = true }
sphinxcontrib-svg2pdfconverter = { version = "^1.1.0", optional = true }
tomlkit = { version = "^0.7.0", optional = true }
ipython = { version = "^7.10.1", optional = true }
ipython-genutils = { version = "^0.2.0", optional = true }
zarr = { version = "^2.7.0", optional = true, markers = "sys_platform != 'darwin' or (sys_platform == 'darwin' and platform_machine != 'arm64')" } # not supported through numcodecs
xarray = { version = "^0.17.0", optional = true }


[tool.poetry.dev-dependencies]
Expand Down Expand Up @@ -183,7 +186,7 @@ influxdb = ["influxdb", "influxdb-client"]
cratedb = ["crate"]
mysql = ["mysqlclient"]
postgresql = ["psycopg2-binary"]
radar = ["wradlib", "pybufrkit", "h5py"]
radar = ["wradlib", "pybufrkit", "h5py", "pdbufr"]
bufr = ["pybufrkit"]

[tool.poetry.scripts]
Expand Down
37 changes: 37 additions & 0 deletions tests/provider/dwd/radar/test_api_historic.py
Expand Up @@ -404,6 +404,43 @@ def test_radar_request_site_historic_pe_bufr():
decoder.process(payload, info_only=True)


@pytest.mark.remote
def test_radar_request_site_historic_pe_bufr_dataframe():
    """
    Verify acquisition of radar/site/PE_ECHO_TOP data works
    when using a specific date.

    This time, we will use the BUFR data format and pass ``read_bufr=True``,
    so the first result's ``data`` is checked as a non-empty DataFrame with
    the expected column names.
    """

    # Acquire data from yesterday at this time.
    timestamp = datetime.utcnow() - timedelta(days=1)

    request = DwdRadarValues(
        parameter=DwdRadarParameter.PE_ECHO_TOP,
        start_date=timestamp,
        site=DwdRadarSite.BOO,
        fmt=DwdRadarDataFormat.BUFR,
        read_bufr=True,
    )

    results = list(request.query())

    # Remote data may be missing for the requested timestamp; skip, don't fail.
    if not results:
        raise pytest.skip("Data currently not available")

    df = results[0].data

    assert not df.empty

    # ``df.columns == [...]`` compares element-wise and yields a boolean array,
    # whose truth value is ambiguous inside ``assert`` (raises ValueError).
    # Compare plain lists so the column names and their order are checked.
    assert df.columns.tolist() == [
        "station_id",
        "latitude",
        "longitude",
        "horizontalReflectivity",
    ]


@pytest.mark.remote
@pytest.mark.parametrize(
"format",
Expand Down
32 changes: 30 additions & 2 deletions wetterdienst/provider/dwd/radar/access.py
Expand Up @@ -8,11 +8,12 @@
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from typing import Generator, Optional, Tuple
from typing import Generator, Optional, Tuple, Union

import pandas as pd

from wetterdienst.exceptions import FailedDownload
from wetterdienst.metadata.columns import Columns
from wetterdienst.metadata.extension import Extension
from wetterdienst.metadata.period import Period
from wetterdienst.metadata.resolution import Resolution
Expand Down Expand Up @@ -46,7 +47,7 @@ class RadarResult:
Currently, this will relate to exactly one radar data file.
"""

data: BytesIO
data: Union[BytesIO, pd.DataFrame]
timestamp: datetime = None
url: str = None
filename: str = None
Expand Down Expand Up @@ -79,6 +80,7 @@ def collect_radar_data(
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
verify: Optional[bool] = True,
read_bufr: bool = False,
) -> RadarResult:
"""
Collect radar data for given parameters.
Expand All @@ -95,10 +97,14 @@ def collect_radar_data(
:param start_date: Start date
:param end_date: End date
:param verify: Whether to verify the response
:param read_bufr: Whether to read BUFR data into a ``pd.DataFrame`` (imports ``pdbufr`` when set)
:return: ``RadarResult`` item
"""

if read_bufr:
import pdbufr

# Find latest file.
if start_date == DwdRadarDate.LATEST:

Expand Down Expand Up @@ -212,6 +218,28 @@ def collect_radar_data(
if result.timestamp is None:
result.timestamp = date_time

if fmt == DwdRadarDataFormat.BUFR:
df = pdbufr.read_bufr(
result.data,
columns=(
"stationNumber",
"latitude",
"longitude",
"horizontalReflectivity",
),
)

df = df.rename(
columns={
"stationNumber": Columns.STATION_ID.value,
"latitude": Columns.LATITUDE.value,
"longitude": Columns.LONGITUDE.value,
}
)

# rewrite data to result
result.data = df

if verify:
if fmt == DwdRadarDataFormat.HDF5:
verify_hdf5(result.data)
Expand Down
5 changes: 5 additions & 0 deletions wetterdienst/provider/dwd/radar/api.py
Expand Up @@ -55,6 +55,7 @@ def __init__(
end_date: Optional[Union[str, datetime, timedelta]] = None,
resolution: Optional[Union[str, Resolution, DwdRadarResolution]] = None,
period: Optional[Union[str, Period, DwdRadarPeriod]] = None,
**kwargs,
) -> None:
"""
:param parameter: The radar moment to request
Expand All @@ -67,6 +68,7 @@ def __init__(
:param resolution: Time resolution for RadarParameter.RADOLAN_CDC,
either daily or hourly or 5 minutes.
:param period: Period type for RadarParameter.RADOLAN_CDC
:param kwargs: Additional options used for collecting the data; currently only ``read_bufr`` is read
"""

# Convert parameters to enum types.
Expand All @@ -82,6 +84,8 @@ def __init__(
period, DwdRadarPeriod, Period
)

self.read_bufr = kwargs.get("read_bufr")

# Sanity checks.
if self.parameter == DwdRadarParameter.RADOLAN_CDC:

Expand Down Expand Up @@ -261,6 +265,7 @@ def query(self) -> RadarResult:
end_date=self.end_date,
resolution=self.resolution,
period=self.period,
read_bufr=self.read_bufr,
):
progressbar.update()
yield item
Expand Down

0 comments on commit e8ed613

Please sign in to comment.