diff --git a/CONTRIBUTORS.rst b/CONTRIBUTORS.rst
index 9205cc6b3..1bc35501c 100644
--- a/CONTRIBUTORS.rst
+++ b/CONTRIBUTORS.rst
@@ -15,3 +15,6 @@ This is a list of the people who directly contributed to wetterdienst in one way
 * Ilya Kamenshchikov
 
 * Michael Schrammel
+
+* `Niclas Hoyer `_
+    - Change mosmix parser to xml stream events
diff --git a/wetterdienst/provider/dwd/mosmix/access.py b/wetterdienst/provider/dwd/mosmix/access.py
index a438350c6..bcd827880 100644
--- a/wetterdienst/provider/dwd/mosmix/access.py
+++ b/wetterdienst/provider/dwd/mosmix/access.py
@@ -11,7 +11,7 @@
 import numpy as np
 import pandas as pd
 from fsspec.implementations.zip import ZipFileSystem
-from lxml.etree import XMLParser, parse  # noqa: S410
+from lxml.etree import iterparse  # noqa: S410
 from pandas import DatetimeIndex
 from tqdm import tqdm
 
@@ -32,9 +32,9 @@ def __init__(
         self.station_ids = station_ids
         self.parameters = parameters
         self.metadata = {}
-        self.root = None
         self.timesteps = []
-        self.items = []
+        self.nsmap = None
+        self.iter_elems = None
 
         self.dwdfs = NetworkFilesystemManager.get(ttl=CacheExpiry.FIVE_MINUTES)
 
@@ -88,9 +88,7 @@ def read(self, url: str):
         kml = self.fetch(url)
 
         log.info("Parsing KML data")
-        # TODO: Check if XML parsing performance can be improved by using libxml2.
-        tree = parse(BytesIO(kml), parser=XMLParser(huge_tree=True))  # noqa: S320
-        self.root = root = tree.getroot()
+        self.iter_elems = iterparse(BytesIO(kml), events=("start", "end"), resolve_entities=False)
 
         prod_items = {
             "issuer": "Issuer",
@@ -99,43 +97,80 @@ def read(self, url: str):
             "issue_time": "IssueTime",
         }
 
-        # Get Basic Metadata
-        prod_definition = root.findall("kml:Document/kml:ExtendedData/dwd:ProductDefinition", root.nsmap)[0]
+        nsmap = None
 
-        self.metadata = {k: prod_definition.find(f"{{{root.nsmap['dwd']}}}{v}").text for k, v in prod_items.items()}
+        # Get Basic Metadata
+        prod_definition = None
+        prod_definition_tag = None
+        for event, element in self.iter_elems:
+            if event == "start":
+                # get namespaces from root element
+                if nsmap is None:
+                    nsmap = element.nsmap
+                    prod_definition_tag = f"{{{nsmap['dwd']}}}ProductDefinition"
+            elif event == "end":
+                if element.tag == prod_definition_tag:
+                    prod_definition = element
+                    # stop processing after head
+                    # leave forecast data for iteration
+                    break
+
+        # fail with a clear message instead of an AttributeError on None below
+        if prod_definition is None:
+            raise ValueError("No dwd:ProductDefinition element found in KML data")
+
+        self.metadata = {k: prod_definition.find(f"{{{nsmap['dwd']}}}{v}").text for k, v in prod_items.items()}
 
         self.metadata["issue_time"] = pd.Timestamp(self.metadata["issue_time"])
 
         # Get time steps.
-        timesteps = root.findall(
-            "kml:Document/kml:ExtendedData/dwd:ProductDefinition/dwd:ForecastTimeSteps",
-            root.nsmap,
+        timesteps = prod_definition.findall(
+            "dwd:ForecastTimeSteps",
+            nsmap,
         )[0]
         self.timesteps = DatetimeIndex([pd.Timestamp(i.text) for i in timesteps.getchildren()])
 
-        # Find all kml:Placemark items.
-        self.items = root.findall("kml:Document/kml:Placemark", root.nsmap)
+        # save namespace map for later iteration
+        self.nsmap = nsmap
 
     def iter_items(self):
-        for item in self.items:
-            station_id = item.find("kml:name", self.root.nsmap).text
-
-            if (self.station_ids is None) or station_id in self.station_ids:
-                yield item
+        """
+        Stream the remaining KML and yield the matching kml:Placemark elements.
+
+        Continues on the iterparse iterator that read() stopped at the end of
+        the product definition, so the items can only be consumed once per
+        read() call.
+        """
+        clear = True
+        placemark_tag = f"{{{self.nsmap['kml']}}}Placemark"
+        for event, element in self.iter_elems:
+            if event == "start":
+                if element.tag == placemark_tag:
+                    # keep the children of a Placemark until its end event
+                    clear = False
+            elif event == "end":
+                if element.tag == placemark_tag:
+                    station_id = element.find("kml:name", self.nsmap).text
+                    if (self.station_ids is None) or station_id in self.station_ids:
+                        yield element
+                    clear = True
+                if clear:
+                    # drop the content of elements that are not needed anymore
+                    element.clear()
 
     def get_metadata(self):
         return pd.DataFrame([self.metadata])
 
     def get_forecasts(self):
         for station_forecast in self.iter_items():
-            station_ids = station_forecast.find("kml:name", self.root.nsmap).text
+            station_ids = station_forecast.find("kml:name", self.nsmap).text
 
-            measurement_list = station_forecast.findall("kml:ExtendedData/dwd:Forecast", self.root.nsmap)
+            measurement_list = station_forecast.findall("kml:ExtendedData/dwd:Forecast", self.nsmap)
 
             data_dict = {"station_id": station_ids, "datetime": self.timesteps}
 
             for measurement_item in measurement_list:
-                measurement_parameter = measurement_item.get(f"{{{self.root.nsmap['dwd']}}}elementName")
+                measurement_parameter = measurement_item.get(f"{{{self.nsmap['dwd']}}}elementName")
 
                 if measurement_parameter.lower() in self.parameters:
                     measurement_string = measurement_item.getchildren()[0].text
@@ -149,4 +184,5 @@ def get_forecasts(self):
 
                     data_dict[measurement_parameter.lower()] = measurement_values
 
+            station_forecast.clear()
             yield pd.DataFrame.from_dict(data_dict)