diff --git a/wetterdienst/core/scalar.py b/wetterdienst/core/scalar.py index 5b85ee999..957fc415e 100644 --- a/wetterdienst/core/scalar.py +++ b/wetterdienst/core/scalar.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- # Copyright (c) 2018-2021, earthobservations developers. # Distributed under the MIT License. See LICENSE for more info. -from abc import abstractmethod +from abc import abstractmethod, ABCMeta +from dataclasses import dataclass from datetime import datetime from enum import Enum from logging import getLogger @@ -20,7 +21,6 @@ from wetterdienst.metadata.columns import Columns from wetterdienst.metadata.period import Period, PeriodType from wetterdienst.metadata.resolution import Frequency, Resolution, ResolutionType -from wetterdienst.metadata.result import Result from wetterdienst.metadata.timezone import Timezone from wetterdienst.util.enumeration import parse_enumeration_from_template from wetterdienst.util.geo import Coordinates, derive_nearest_neighbours @@ -30,8 +30,6 @@ EARTH_RADIUS_KM = 6371 -# TODO: move more attributes to __init__ - class ScalarCore(Core): """Core for time series related classes """ @@ -129,6 +127,194 @@ def __init__( self.end_date = end_date +class ScalarStationsCore(ScalarCore): + """ Core for stations information of a source """ + + # Columns that should be contained within any stations information + _base_columns = ( + Columns.STATION_ID.value, + Columns.FROM_DATE.value, + Columns.TO_DATE.value, + Columns.HEIGHT.value, + Columns.LATITUDE.value, + Columns.LONGITUDE.value, + Columns.STATION_NAME.value, + Columns.STATE.value, + ) + # TODO: eventually this can be matched with the type coercion of station data to get + # similar types of floats and strings + # Dtype mapping for stations + _dtype_mapping = { + Columns.STATION_ID.value: str, + Columns.HEIGHT.value: float, + Columns.LATITUDE.value: float, + Columns.LONGITUDE.value: float, + Columns.STATION_NAME.value: str, + Columns.STATE.value: str, + } + + def _parse_period(self, 
period: Period): + if not period: + return None + elif self._period_type == PeriodType.FIXED: + return period + else: + return parse_enumeration_from_template(period, self._period_base, Period) + + def __init__( + self, + resolution: Resolution, + period: Period, + start_date: Union[None, str, datetime] = None, + end_date: Union[None, str, datetime] = None, + ) -> None: + """ + + :param start_date: start date for filtering stations for their available data + :param end_date: end date for filtering stations for their available data + """ + super(ScalarStationsCore, self).__init__( + resolution=resolution, + period=period, + start_date=start_date, + end_date=end_date, + ) + + def all(self) -> pd.DataFrame: + """ + Wraps the _all method and applies date filters. + + :return: pandas.DataFrame with the information of different available stations + """ + metadata_df = self._all().copy() + + metadata_df = metadata_df.reindex(columns=self._base_columns) + + metadata_df = self._coerce_meta_fields(metadata_df) + + if self.start_date: + metadata_df = metadata_df[ + metadata_df[DWDMetaColumns.FROM_DATE.value] <= self.start_date + ] + + if self.end_date: + metadata_df = metadata_df[ + metadata_df[DWDMetaColumns.TO_DATE.value] >= self.end_date + ] + + return metadata_df + + def _coerce_meta_fields(self, df) -> pd.DataFrame: + """ Method for filed coercion. """ + df = df.astype(self._dtype_mapping) + + df[Columns.FROM_DATE.value] = pd.to_datetime( + df[Columns.FROM_DATE.value], infer_datetime_format=True + ).dt.tz_localize(pytz.UTC) + df[Columns.TO_DATE.value] = pd.to_datetime( + df[Columns.TO_DATE.value], infer_datetime_format=True + ).dt.tz_localize(pytz.UTC) + + return df + + @abstractmethod + def _all(self) -> pd.DataFrame: + """ + Abstract method for gathering of sites information for a given implementation. 
+ Information consist of a DataFrame with station ids, location, name, etc + + :return: pandas.DataFrame with the information of different available sites + """ + pass + + def nearby_number( + self, + latitude: float, + longitude: float, + number: int, + ) -> pd.DataFrame: + """ + Wrapper for get_nearby_stations_by_number using the given parameter set. Returns + nearest stations defined by number. + + :param latitude: latitude in degrees + :param longitude: longitude in degrees + :param number: number of stations to be returned, greater 0 + :return: pandas.DataFrame with station information for the selected stations + """ + if number <= 0: + raise ValueError("'num_stations_nearby' has to be at least 1.") + + coords = Coordinates(np.array(latitude), np.array(longitude)) + + metadata = self.all() + + metadata = metadata.reset_index(drop=True) + + distances, indices_nearest_neighbours = derive_nearest_neighbours( + metadata[Columns.LATITUDE.value].values, + metadata[Columns.LONGITUDE.value].values, + coords, + number, + ) + + distances = pd.Series(distances) + indices_nearest_neighbours = pd.Series(indices_nearest_neighbours) + + # If num_stations_nearby is higher then the actual amount of stations + # further indices and distances are added which have to be filtered out + distances = distances[: min(metadata.shape[0], number)] + indices_nearest_neighbours = indices_nearest_neighbours[ + : min(metadata.shape[0], number) + ] + + distances_km = np.array(distances * EARTH_RADIUS_KM) + + metadata_location = metadata.iloc[indices_nearest_neighbours, :].reset_index( + drop=True + ) + + metadata_location[DWDMetaColumns.DISTANCE_TO_LOCATION.value] = distances_km + + if metadata_location.empty: + log.warning( + f"No weather stations were found for coordinate " + f"{latitude}°N and {longitude}°E " + ) + + return metadata_location + + def nearby_radius( + self, + latitude: float, + longitude: float, + max_distance_in_km: int, + ) -> pd.DataFrame: + """ + Wrapper for 
get_nearby_stations_by_distance using the given parameter set. + Returns nearest stations defined by distance (km). + + :param latitude: latitude in degrees + :param longitude: longitude in degrees + :param max_distance_in_km: distance (km) for which stations will be selected + :return: pandas.DataFrame with station information for the selected stations + """ + # Theoretically a distance of 0 km is possible + if max_distance_in_km < 0: + raise ValueError("'max_distance_in_km' has to be at least 0.0.") + + metadata = self.all() + + all_nearby_stations = self.nearby_number(latitude, longitude, metadata.shape[0]) + + nearby_stations_in_distance = all_nearby_stations[ + all_nearby_stations[DWDMetaColumns.DISTANCE_TO_LOCATION.value] + <= max_distance_in_km + ] + + return nearby_stations_in_distance.reset_index(drop=True) + + class ScalarValuesCore(ScalarCore): """ Core for sources of point data where data is related to a station """ @@ -423,7 +609,7 @@ def query(self) -> Generator[Result, None, None]: continue # TODO: add meaningful metadata here - yield Result(pd.DataFrame(), station_df) + yield ValuesResult(pd.DataFrame(), station_df) @abstractmethod def _collect_station_parameter(self, station_id: str, parameter) -> pd.DataFrame: @@ -616,189 +802,43 @@ def _create_humanized_parameters_mapping(self) -> Dict[str, str]: return hcnm -class ScalarStationsCore(ScalarCore): - """ Core for stations information of a source """ - - # Columns that should be contained within any stations information - _base_columns = ( - Columns.STATION_ID.value, - Columns.FROM_DATE.value, - Columns.TO_DATE.value, - Columns.HEIGHT.value, - Columns.LATITUDE.value, - Columns.LONGITUDE.value, - Columns.STATION_NAME.value, - Columns.STATE.value, - ) - # TODO: eventually this can be matched with the type coercion of station data to get - # similar types of floats and strings - # Dtype mapping for stations - _dtype_mapping = { - Columns.STATION_ID.value: str, - Columns.HEIGHT.value: float, - 
Columns.LATITUDE.value: float, - Columns.LONGITUDE.value: float, - Columns.STATION_NAME.value: str, - Columns.STATE.value: str, - } +class StationsResult(metaclass=ABCMeta): + @property + @abstractmethod + def _values(self) -> ScalarValuesCore: + pass - def _parse_period(self, period: Period): - if not period: - return None - elif self._period_type == PeriodType.FIXED: - return period - else: - return parse_enumeration_from_template(period, self._period_base, Period) + @property + def station_id(self) -> pd.Series: + return self.df[Columns.STATION_ID.value] def __init__( self, - resolution: Resolution, - period: Period, - start_date: Union[None, str, datetime] = None, - end_date: Union[None, str, datetime] = None, + df: pd.DataFrame, + **kwargs ) -> None: - """ + self.df = df + self._kwargs = kwargs - :param start_date: start date for filtering stations for their available data - :param end_date: end date for filtering stations for their available data - """ - super(ScalarStationsCore, self).__init__( - resolution=resolution, - period=period, - start_date=start_date, - end_date=end_date, - ) + def query(self): + kwargs = self._kwargs - def all(self) -> pd.DataFrame: - """ - Wraps the _all method and applies date filters.
+ kwargs["station_id"] = self.station_id - :return: pandas.DataFrame with the information of different available stations - """ - metadata_df = self._all().copy() + values = self._values(**kwargs) - metadata_df = metadata_df.reindex(columns=self._base_columns) + yield from values.query() - metadata_df = self._coerce_meta_fields(metadata_df) + def all(self) -> "ValuesResult": + data = sum(list(self.query())) - if self.start_date: - metadata_df = metadata_df[ - metadata_df[DWDMetaColumns.FROM_DATE.value] <= self.start_date - ] + return data - if self.end_date: - metadata_df = metadata_df[ - metadata_df[DWDMetaColumns.TO_DATE.value] >= self.end_date - ] - - return metadata_df - - def _coerce_meta_fields(self, df) -> pd.DataFrame: - """ Method for filed coercion. """ - df = df.astype(self._dtype_mapping) - - df[Columns.FROM_DATE.value] = pd.to_datetime( - df[Columns.FROM_DATE.value], infer_datetime_format=True - ).dt.tz_localize(pytz.UTC) - df[Columns.TO_DATE.value] = pd.to_datetime( - df[Columns.TO_DATE.value], infer_datetime_format=True - ).dt.tz_localize(pytz.UTC) - return df - - @abstractmethod - def _all(self) -> pd.DataFrame: - """ - Abstract method for gathering of sites information for a given implementation. - Information consist of a DataFrame with station ids, location, name, etc +@dataclass +class ValuesResult: + metadata: pd.DataFrame + data: pd.DataFrame - :return: pandas.DataFrame with the information of different available sites - """ + def __add__(self, other): pass - - def nearby_number( - self, - latitude: float, - longitude: float, - number: int, - ) -> pd.DataFrame: - """ - Wrapper for get_nearby_stations_by_number using the given parameter set. Returns - nearest stations defined by number.
- - :param latitude: latitude in degrees - :param longitude: longitude in degrees - :param number: number of stations to be returned, greater 0 - :return: pandas.DataFrame with station information for the selected stations - """ - if number <= 0: - raise ValueError("'num_stations_nearby' has to be at least 1.") - - coords = Coordinates(np.array(latitude), np.array(longitude)) - - metadata = self.all() - - metadata = metadata.reset_index(drop=True) - - distances, indices_nearest_neighbours = derive_nearest_neighbours( - metadata[Columns.LATITUDE.value].values, - metadata[Columns.LONGITUDE.value].values, - coords, - number, - ) - - distances = pd.Series(distances) - indices_nearest_neighbours = pd.Series(indices_nearest_neighbours) - - # If num_stations_nearby is higher then the actual amount of stations - # further indices and distances are added which have to be filtered out - distances = distances[: min(metadata.shape[0], number)] - indices_nearest_neighbours = indices_nearest_neighbours[ - : min(metadata.shape[0], number) - ] - - distances_km = np.array(distances * EARTH_RADIUS_KM) - - metadata_location = metadata.iloc[indices_nearest_neighbours, :].reset_index( - drop=True - ) - - metadata_location[DWDMetaColumns.DISTANCE_TO_LOCATION.value] = distances_km - - if metadata_location.empty: - log.warning( - f"No weather stations were found for coordinate " - f"{latitude}°N and {longitude}°E " - ) - - return metadata_location - - def nearby_radius( - self, - latitude: float, - longitude: float, - max_distance_in_km: int, - ) -> pd.DataFrame: - """ - Wrapper for get_nearby_stations_by_distance using the given parameter set. - Returns nearest stations defined by distance (km). 
- - :param latitude: latitude in degrees - :param longitude: longitude in degrees - :param max_distance_in_km: distance (km) for which stations will be selected - :return: pandas.DataFrame with station information for the selected stations - """ - # Theoretically a distance of 0 km is possible - if max_distance_in_km < 0: - raise ValueError("'max_distance_in_km' has to be at least 0.0.") - - metadata = self.all() - - all_nearby_stations = self.nearby_number(latitude, longitude, metadata.shape[0]) - - nearby_stations_in_distance = all_nearby_stations[ - all_nearby_stations[DWDMetaColumns.DISTANCE_TO_LOCATION.value] - <= max_distance_in_km - ] - - return nearby_stations_in_distance.reset_index(drop=True) diff --git a/wetterdienst/dwd/forecasts/api.py b/wetterdienst/dwd/forecasts/api.py index 25b4da5fe..7d9fb89af 100644 --- a/wetterdienst/dwd/forecasts/api.py +++ b/wetterdienst/dwd/forecasts/api.py @@ -12,7 +12,7 @@ import requests from requests import HTTPError -from wetterdienst.core.scalar import ScalarStationsCore, ScalarValuesCore +from wetterdienst.core.scalar import ScalarStationsCore, ScalarValuesCore, ValuesResult from wetterdienst.dwd.forecasts.access import KMLReader from wetterdienst.dwd.forecasts.metadata import ( DWDForecastDate, @@ -30,7 +30,6 @@ from wetterdienst.metadata.columns import Columns from wetterdienst.metadata.period import Period, PeriodType from wetterdienst.metadata.resolution import Resolution, ResolutionType -from wetterdienst.metadata.result import Result from wetterdienst.metadata.source import Source from wetterdienst.metadata.timezone import Timezone from wetterdienst.util.enumeration import parse_enumeration_from_template @@ -244,7 +243,7 @@ def adjust_datetime(datetime_: datetime) -> datetime: return datetime_adjusted - def query(self) -> Generator[Result, None, None]: + def query(self) -> Generator[ValuesResult, None, None]: """Replace collect data method as all information is read once from kml file""" for metadata_df, 
forecast_df in self._collect_station_parameter(): forecast_df = self._coerce_meta_fields(forecast_df) @@ -264,11 +263,11 @@ def query(self) -> Generator[Result, None, None]: metadata_df = metadata_df.join(station_metadata) - result = Result(metadata_df, forecast_df) + result = ValuesResult(metadata_df, forecast_df) yield result - def _collect_station_parameter(self) -> Generator[Result, None, None]: + def _collect_station_parameter(self) -> Generator[ValuesResult, None, None]: """Wrapper of read_mosmix to collect forecast data (either latest or for defined dates)""" if self.start_issue == DWDForecastDate.LATEST: @@ -281,7 +280,7 @@ def _collect_station_parameter(self) -> Generator[Result, None, None]: log.warning(e) continue - def read_mosmix(self, date: Union[datetime, DWDForecastDate]) -> Result: + def read_mosmix(self, date: Union[datetime, DWDForecastDate]) -> ValuesResult: """ Manage data acquisition for a given date that is used to filter the found files on the MOSMIX path of the DWD server. diff --git a/wetterdienst/metadata/result.py b/wetterdienst/metadata/result.py deleted file mode 100644 index b81f38779..000000000 --- a/wetterdienst/metadata/result.py +++ /dev/null @@ -1,12 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright (c) 2018-2021, earthobservations developers. -# Distributed under the MIT License. See LICENSE for more info. -from dataclasses import dataclass - -import pandas as pd - - -@dataclass -class Result: - metadata: pd.DataFrame - data: pd.DataFrame