Skip to content

Commit

Permalink
Reorganize resolution, create full timeseries response
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Jan 3, 2021
1 parent b11e4c4 commit 9d381a7
Show file tree
Hide file tree
Showing 10 changed files with 549 additions and 317 deletions.
324 changes: 320 additions & 4 deletions wetterdienst/core/point_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,117 @@
from enum import Enum
from typing import Tuple, Union, Optional, List, Generator, Dict
from logging import getLogger
import pytz
from pytz import timezone

import dateparser
import numpy as np
import pandas as pd
import pytz
from pytz import timezone

from pandas._libs.tslibs.timestamps import Timestamp

from wetterdienst.core.core import Core
from wetterdienst.dwd.metadata.column_names import DWDMetaColumns
from wetterdienst.dwd.util import parse_datetime
from wetterdienst.exceptions import NoParametersFound, StartDateEndDateError
from wetterdienst.metadata.columns import Columns
from wetterdienst.metadata.period import Period, PeriodType
from wetterdienst.metadata.resolution import ResolutionType, Resolution, Frequency
from wetterdienst.metadata.result import Result
from wetterdienst.metadata.timezone import Timezone
from wetterdienst.util.enumeration import parse_enumeration_from_template

from wetterdienst.util.enumeration import parse_enumeration_from_template, parse_enumeration
from wetterdienst.util.geo import Coordinates, derive_nearest_neighbours

# Module-level logger, named after this module
log = getLogger(__name__)


# Earth radius in kilometres. It is multiplied with the distances returned by the
# nearest-neighbour search to express them in kilometres
# (presumably those distances are angular/radians — TODO confirm against
# derive_nearest_neighbours)
EARTH_RADIUS_KM = 6371

# TODO: move more attributes to __init__


class PointDataCore(Core):
    """Shared base for point data classes that are bound to a resolution and to
    one or more periods.

    Subclasses declare which resolutions and periods exist for their source via
    the abstract ``_resolution_base`` / ``_period_base`` properties; the public
    ``resolution`` and ``period`` attributes are validated against them in their
    setters."""

    def __init__(
        self, resolution: Resolution, period: Union[Period, List[Period]]
    ) -> None:
        """
        :param resolution: resolution requested for the source
        :param period: single period or list of periods requested for the source
        """
        # Both assignments go through the property setters defined below
        self.resolution = resolution
        self.period = period

    # ------------------------------------------------------------------
    # Resolution
    # ------------------------------------------------------------------
    @property
    def resolution(self) -> Optional[Resolution]:
        """Resolution that was stored by the setter"""
        return self._resolution

    @resolution.setter
    def resolution(self, res) -> None:
        """Parse the given value against ``resolution_base`` and store the result"""
        available = self.resolution_base
        self._resolution = parse_enumeration_from_template(res, available)

    @property
    def resolution_base(self) -> Optional[Resolution]:
        """Enumeration of resolutions that are possible for the source.

        For fixed and dynamic resolution types the subclass definition is handed
        back untouched. For every other type a new enumeration carrying the name
        of ``Resolution`` is built from the names and values of the declared
        members."""
        declared = self._resolution_base

        if (
            self.resolution_type != ResolutionType.FIXED
            and self.resolution_type != ResolutionType.DYNAMIC
        ):
            members = [(item.name, item.value) for item in declared]
            return Enum(Resolution.__name__, members)

        return declared

    @property
    @abstractmethod
    def _resolution_base(self) -> Optional[Union[Resolution, List[Resolution]]]:
        """Resolutions available for the source, to be defined by the subclass"""
        pass

    @property
    @abstractmethod
    def resolution_type(self) -> ResolutionType:
        """Kind of resolution handling of the source, to be defined by the
        subclass"""
        pass

    # TODO: implement for source with dynamic resolution
    @staticmethod
    def _determine_resolution(dates: pd.Series) -> Resolution:
        """Placeholder for deriving the resolution from a pandas Series of dates.

        Not implemented yet: currently always returns None."""
        return None

    @property
    def frequency(self) -> Frequency:
        """Frequency member that shares its name with the current resolution"""
        resolution_name = self.resolution.name
        return Frequency[resolution_name]

    # ------------------------------------------------------------------
    # Period
    # ------------------------------------------------------------------
    @property
    def period(self):
        """Stored period(s) as a sorted list"""
        periods = pd.Series(self._period)
        return periods.sort_values().tolist()

    @period.setter
    def period(self, per):
        """Parse the given value against ``period_base`` and store the result"""
        self._period = parse_enumeration(self.period_base, per)

    @property
    @abstractmethod
    def period_type(self) -> PeriodType:
        """Kind of period handling of the source, to be defined by the subclass"""
        pass

    @property
    def period_base(self) -> Period:
        """Enumeration carrying the name of ``Period``, built from the names and
        values of the periods declared in ``_period_base``"""
        members = [(item.name, item.value) for item in self._period_base]
        return Enum(Period.__name__, members)

    @property
    @abstractmethod
    def _period_base(self) -> Union[Period, List[Period]]:
        """Periods available for the source, to be defined by the subclass"""
        pass


class PointDataValuesCore(PointDataCore):
""" Core for sources of point data where data is related to a station """

# Fields for type coercion, needed for separation from fields with actual data
Expand Down Expand Up @@ -97,10 +189,27 @@ def _parameter_base(self) -> Enum:
DWDObservationParameter"""
pass

@property
def _complete_dates(self) -> pd.DatetimeIndex:
    """Gapless index of timestamps between ``start_date`` and ``end_date``,
    spaced by the value of the current frequency and localized to ``data_tz``"""
    first, last = self.start_date, self.end_date
    step = self.frequency.value

    return pd.date_range(start=first, end=last, freq=step, tz=self.data_tz)

def _build_complete_ts(self, df: pd.DataFrame) -> pd.DataFrame:
    """Expand the given data onto the complete range of expected timestamps.

    Every timestamp of ``_complete_dates`` ends up as one row (or more, if the
    data holds several rows for that timestamp); timestamps without a match in
    ``df`` are kept with missing values in all other columns.

    :param df: pandas.DataFrame with a datetime column named
        ``Columns.DATETIME.value``
    :return: pandas.DataFrame left-joined onto the complete date range, or
        ``df`` unchanged if no start or end date is set
    """
    if self.resolution_type == ResolutionType.DYNAMIC:
        self.resolution = self._determine_resolution(df[Columns.DATETIME.value])

    # Without both boundaries no date range can be constructed, so there is
    # nothing to fill up
    if self.start_date is None or self.end_date is None:
        return df

    # pd.merge only accepts DataFrame or named Series objects, so the
    # DatetimeIndex has to be wrapped into a frame with the join column
    # NOTE(review): assumes the datetime column of df is timezone aware and
    # compatible with data_tz, otherwise the join keys will not match - confirm
    df_dates = pd.DataFrame({Columns.DATETIME.value: self._complete_dates})

    # Left join keeps every expected timestamp, the default inner join would
    # drop exactly those timestamps that are supposed to be filled
    df_complete = pd.merge(
        left=df_dates,
        right=df,
        on=Columns.DATETIME.value,
        how="left",
    )

    return df_complete

def __init__(
self,
station_ids: Tuple[str],
parameters: Tuple[Union[str, Enum]],
resolution: Resolution,
period: Period,
start_date: Optional[Union[str, Timestamp, datetime]] = None,
end_date: Optional[Union[str, Timestamp, datetime]] = None,
humanize_parameters: bool = False,
Expand All @@ -117,6 +226,11 @@ def __init__(
:param humanize_parameters: bool if parameters should be renamed to meaningful
names
"""
super(PointDataValuesCore, self).__init__(
resolution=resolution,
period=period
)

# Make sure we receive a list of ids
self.station_ids = pd.Series(station_ids).astype(str).tolist()
self.parameters = self._parse_parameters(parameters)
Expand Down Expand Up @@ -203,6 +317,9 @@ def query(self) -> Generator[Result, None, None]:
for parameter in self.parameters:
parameter_df = self._collect_station_parameter(station_id, parameter)

# Create
parameter_df = self._build_complete_ts(parameter_df)

station_data.append(parameter_df)

station_df = pd.concat(station_data)
Expand Down Expand Up @@ -422,3 +539,202 @@ def _create_humanized_parameters_mapping(self) -> Dict[str, str]:
hcnm = {parameter.value: parameter.name for parameter in self._parameter_base}

return hcnm


class PointDataStationsCore(PointDataCore):
    """Core for stations information of a source.

    Subclasses implement ``_all`` to gather the raw station listing; this class
    adds missing base columns, coerces types, filters by the requested date
    range and offers nearest-station lookups by number and by radius."""

    # Columns that should be contained within any stations information
    _base_columns = (
        Columns.STATION_ID.value,
        Columns.FROM_DATE.value,
        Columns.TO_DATE.value,
        Columns.STATION_HEIGHT.value,
        Columns.LATITUDE.value,
        Columns.LONGITUDE.value,
        Columns.STATION_NAME.value,
        Columns.STATE.value,
    )
    # TODO: eventually this can be matched with the type coercion of station data to get
    #  similar types of floats and strings
    # Dtype mapping for stations
    _dtype_mapping = {
        Columns.STATION_ID.value: str,
        Columns.STATION_HEIGHT.value: float,
        DWDMetaColumns.LATITUDE.value: float,
        DWDMetaColumns.LONGITUDE.value: float,
        DWDMetaColumns.STATION_NAME.value: str,
        DWDMetaColumns.STATE.value: str,
    }

    def __init__(
        self,
        resolution: Resolution,
        period: Period,
        start_date: Union[None, str, Timestamp] = None,
        end_date: Union[None, str, Timestamp] = None,
    ) -> None:
        """
        :param resolution: resolution, handed to the parent class
        :param period: period, handed to the parent class
        :param start_date: start date for filtering stations for their available data
        :param end_date: end date for filtering stations for their available data
        :raises StartDateEndDateError: if start_date is later than end_date
        """
        super().__init__(resolution=resolution, period=period)

        # TODO: make datetimes timezone sensible
        # Falsy values and datetime instances are taken as they are, anything
        # else (e.g. strings) is parsed
        start_date = (
            start_date
            if not start_date or isinstance(start_date, datetime)
            else parse_datetime(start_date)
        )
        end_date = (
            end_date
            if not end_date or isinstance(end_date, datetime)
            else parse_datetime(end_date)
        )

        # NOTE(review): replace(tzinfo=...) overwrites the timezone without
        # converting; if self.tz is a non-UTC pytz timezone this may yield an
        # unexpected offset - confirm
        start_date = start_date.replace(tzinfo=self.tz) if start_date else None
        end_date = end_date.replace(tzinfo=self.tz) if end_date else None

        if start_date and end_date:
            if start_date > end_date:
                raise StartDateEndDateError("'start_date' has to be before 'end_date'")

        self.start_date = start_date
        self.end_date = end_date

    def all(self) -> pd.DataFrame:
        """
        Wraps the _all method and applies date filters.

        Base columns missing in the result of _all are added and filled with
        pd.NA before the fields are coerced.

        :return: pandas.DataFrame with the information of different available stations
        """
        metadata_df = self._all()

        for column in self._base_columns:
            if column not in metadata_df:
                metadata_df[column] = pd.NA

        metadata_df = self._coerce_meta_fields(metadata_df)

        # Keep stations whose records start on or before the requested start ...
        if self.start_date:
            metadata_df = metadata_df[
                metadata_df[DWDMetaColumns.FROM_DATE.value] <= self.start_date
            ]

        # ... and reach at least until the requested end
        if self.end_date:
            metadata_df = metadata_df[
                metadata_df[DWDMetaColumns.TO_DATE.value] >= self.end_date
            ]

        return metadata_df

    def _coerce_meta_fields(self, df) -> pd.DataFrame:
        """
        Method for field coercion. Casts the columns listed in _dtype_mapping and
        parses the from/to date columns into UTC localized datetimes.

        :param df: pandas.DataFrame with the stations information
        :return: pandas.DataFrame with coerced fields
        """
        df = df.astype(self._dtype_mapping)

        df[Columns.FROM_DATE.value] = pd.to_datetime(
            df[Columns.FROM_DATE.value], infer_datetime_format=True
        ).dt.tz_localize(pytz.UTC)
        df[Columns.TO_DATE.value] = pd.to_datetime(
            df[Columns.TO_DATE.value], infer_datetime_format=True
        ).dt.tz_localize(pytz.UTC)

        return df

    @abstractmethod
    def _all(self) -> pd.DataFrame:
        """
        Abstract method for gathering of sites information for a given implementation.
        Information consist of a DataFrame with station ids, location, name, etc

        :return: pandas.DataFrame with the information of different available sites
        """
        pass

    def nearby_number(
        self,
        latitude: float,
        longitude: float,
        num_stations_nearby: int,
    ) -> pd.DataFrame:
        """
        Wrapper for get_nearby_stations_by_number using the given parameter set. Returns
        nearest stations defined by number.

        :param latitude: latitude in degrees
        :param longitude: longitude in degrees
        :param num_stations_nearby: number of stations to be returned, greater 0
        :return: pandas.DataFrame with station information for the selected stations
        :raises ValueError: if num_stations_nearby is smaller than 1
        """
        if num_stations_nearby <= 0:
            raise ValueError("'num_stations_nearby' has to be at least 1.")

        coords = Coordinates(np.array(latitude), np.array(longitude))

        metadata = self.all()

        # Positional index is required as the neighbour search returns positions
        metadata = metadata.reset_index(drop=True)

        # NOTE(review): attribute access relies on the latitude/longitude columns
        # being literally named "LAT" and "LON" - confirm against Columns
        distances, indices_nearest_neighbours = derive_nearest_neighbours(
            metadata.LAT.values, metadata.LON.values, coords, num_stations_nearby
        )

        distances = pd.Series(distances)
        indices_nearest_neighbours = pd.Series(indices_nearest_neighbours)

        # If num_stations_nearby is higher than the actual amount of stations
        # further indices and distances are added which have to be filtered out
        distances = distances[: min(metadata.shape[0], num_stations_nearby)]
        indices_nearest_neighbours = indices_nearest_neighbours[
            : min(metadata.shape[0], num_stations_nearby)
        ]

        distances_km = np.array(distances * EARTH_RADIUS_KM)

        metadata_location = metadata.iloc[indices_nearest_neighbours, :].reset_index(
            drop=True
        )

        metadata_location[DWDMetaColumns.DISTANCE_TO_LOCATION.value] = distances_km

        if metadata_location.empty:
            log.warning(
                f"No weather stations were found for coordinate "
                f"{latitude}°N and {longitude}°E "
            )

        return metadata_location

    def nearby_radius(
        self,
        latitude: float,
        longitude: float,
        max_distance_in_km: float,
    ) -> pd.DataFrame:
        """
        Wrapper for get_nearby_stations_by_distance using the given parameter set.
        Returns nearest stations defined by distance (km).

        :param latitude: latitude in degrees
        :param longitude: longitude in degrees
        :param max_distance_in_km: distance (km) for which stations will be selected
        :return: pandas.DataFrame with station information for the selected stations
        :raises ValueError: if max_distance_in_km is negative
        """
        # Theoretically a distance of 0 km is possible
        if max_distance_in_km < 0:
            raise ValueError("'max_distance_in_km' has to be at least 0.0.")

        metadata = self.all()

        # Without any station nearby_number would be asked for 0 stations and
        # reject the call with a misleading error, so hand back an empty result
        # that carries the same columns as a regular one
        if metadata.empty:
            log.warning(
                f"No weather stations were found for coordinate "
                f"{latitude}°N and {longitude}°E "
            )
            no_stations = metadata.reset_index(drop=True)
            no_stations[DWDMetaColumns.DISTANCE_TO_LOCATION.value] = pd.Series(
                dtype=float
            )
            return no_stations

        # Rank all stations by distance first, then cut off at the radius
        all_nearby_stations = self.nearby_number(latitude, longitude, metadata.shape[0])

        nearby_stations_in_distance = all_nearby_stations[
            all_nearby_stations[DWDMetaColumns.DISTANCE_TO_LOCATION.value]
            <= max_distance_in_km
        ]

        return nearby_stations_in_distance.reset_index(drop=True)

0 comments on commit 9d381a7

Please sign in to comment.