Skip to content

Commit

Permalink
Change mosmix parser to use xml stream events
Browse files Browse the repository at this point in the history
  • Loading branch information
niclashoyer authored and gutzbenj committed Sep 3, 2022
1 parent c40da05 commit 99b34e9
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 22 deletions.
3 changes: 3 additions & 0 deletions CONTRIBUTORS.rst
Expand Up @@ -15,3 +15,6 @@ This is a list of the people who directly contributed to wetterdienst in one way
* Ilya Kamenshchikov

* Michael Schrammel

* `Niclas Hoyer <https://github.com/niclashoyer>`_
- Change mosmix parser to use XML stream events
67 changes: 45 additions & 22 deletions wetterdienst/provider/dwd/mosmix/access.py
Expand Up @@ -11,7 +11,7 @@
import numpy as np
import pandas as pd
from fsspec.implementations.zip import ZipFileSystem
from lxml.etree import XMLParser, parse # noqa: S410
from lxml.etree import iterparse # noqa: S410
from pandas import DatetimeIndex
from tqdm import tqdm

Expand All @@ -32,9 +32,9 @@ def __init__(
self.station_ids = station_ids
self.parameters = parameters
self.metadata = {}
self.root = None
self.timesteps = []
self.items = []
self.nsmap = None
self.iter_elems = None

self.dwdfs = NetworkFilesystemManager.get(ttl=CacheExpiry.FIVE_MINUTES)

Expand Down Expand Up @@ -88,9 +88,7 @@ def read(self, url: str):
kml = self.fetch(url)

log.info("Parsing KML data")
# TODO: Check if XML parsing performance can be improved by using libxml2.
tree = parse(BytesIO(kml), parser=XMLParser(huge_tree=True)) # noqa: S320
self.root = root = tree.getroot()
self.iter_elems = iterparse(BytesIO(kml), events=("start", "end"), resolve_entities=False)

prod_items = {
"issuer": "Issuer",
Expand All @@ -99,43 +97,67 @@ def read(self, url: str):
"issue_time": "IssueTime",
}

# Get Basic Metadata
prod_definition = root.findall("kml:Document/kml:ExtendedData/dwd:ProductDefinition", root.nsmap)[0]
nsmap = None

self.metadata = {k: prod_definition.find(f"{{{root.nsmap['dwd']}}}{v}").text for k, v in prod_items.items()}
# Get Basic Metadata
prod_definition = None
prod_definition_tag = None
for event, element in self.iter_elems:
if event == "start":
# get namespaces from root element
if nsmap is None:
nsmap = element.nsmap
prod_definition_tag = f"{{{nsmap['dwd']}}}ProductDefinition"
elif event == "end":
if element.tag == prod_definition_tag:
prod_definition = element
# stop processing after head
# leave forecast data for iteration
break

self.metadata = {k: prod_definition.find(f"{{{nsmap['dwd']}}}{v}").text for k, v in prod_items.items()}
self.metadata["issue_time"] = pd.Timestamp(self.metadata["issue_time"])

# Get time steps.
timesteps = root.findall(
"kml:Document/kml:ExtendedData/dwd:ProductDefinition/dwd:ForecastTimeSteps",
root.nsmap,
timesteps = prod_definition.findall(
"dwd:ForecastTimeSteps",
nsmap,
)[0]
self.timesteps = DatetimeIndex([pd.Timestamp(i.text) for i in timesteps.getchildren()])

# Find all kml:Placemark items.
self.items = root.findall("kml:Document/kml:Placemark", root.nsmap)
# save namespace map for later iteration
self.nsmap = nsmap

def iter_items(self):
    """
    Lazily yield the ``kml:Placemark`` elements of the requested stations.

    Consumes the remaining ``(event, element)`` pairs of ``self.iter_elems``
    and yields every completed Placemark whose ``kml:name`` text is contained
    in ``self.station_ids``; all Placemarks are yielded when
    ``self.station_ids`` is ``None``.

    Finished elements are cleared to release their content: anything outside
    a Placemark as soon as its "end" event arrives, and each Placemark once
    the consumer resumes the generator (or right away when it is skipped).
    A yielded element must therefore be fully evaluated before the next one
    is requested.

    :return: generator of Placemark elements
    """
    placemark_tag = f"{{{self.nsmap['kml']}}}Placemark"
    wanted = self.station_ids
    in_placemark = False

    for event, element in self.iter_elems:
        is_placemark = element.tag == placemark_tag

        if event == "start":
            # Children of a Placemark have to survive until the Placemark
            # itself is complete, so remember that we are inside one.
            if is_placemark:
                in_placemark = True
            continue

        if event != "end":
            continue

        if is_placemark:
            in_placemark = False
            station_id = element.find("kml:name", self.nsmap).text
            if wanted is None or station_id in wanted:
                yield element

        # Only clear on "end" events: at "start" the parser is still
        # populating the element, so it must not be modified yet.
        if not in_placemark:
            element.clear()

def get_metadata(self):
    """
    Return the product metadata as a single-row DataFrame.

    :return: DataFrame with one row and one column per key of ``self.metadata``
    """
    return pd.DataFrame([self.metadata])

def get_forecasts(self):
for station_forecast in self.iter_items():
station_ids = station_forecast.find("kml:name", self.root.nsmap).text
station_ids = station_forecast.find("kml:name", self.nsmap).text

measurement_list = station_forecast.findall("kml:ExtendedData/dwd:Forecast", self.root.nsmap)
measurement_list = station_forecast.findall("kml:ExtendedData/dwd:Forecast", self.nsmap)

data_dict = {"station_id": station_ids, "datetime": self.timesteps}

for measurement_item in measurement_list:

measurement_parameter = measurement_item.get(f"{{{self.root.nsmap['dwd']}}}elementName")
measurement_parameter = measurement_item.get(f"{{{self.nsmap['dwd']}}}elementName")

if measurement_parameter.lower() in self.parameters:
measurement_string = measurement_item.getchildren()[0].text
Expand All @@ -149,4 +171,5 @@ def get_forecasts(self):

data_dict[measurement_parameter.lower()] = measurement_values

station_forecast.clear()
yield pd.DataFrame.from_dict(data_dict)

0 comments on commit 99b34e9

Please sign in to comment.