Skip to content

Commit

Permalink
Merge pull request #30 from CDJellen/issues/feature/29
Browse files Browse the repository at this point in the history
Expand `get_data` to support multiple stations or modes in the same query.
  • Loading branch information
CDJellen committed Jun 9, 2024
2 parents 664bee8 + b3cd3d0 commit acf79ee
Show file tree
Hide file tree
Showing 6 changed files with 28,562 additions and 115 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,21 @@ wspd_df = api.get_data(
as_df=True,
cols=['WSPD']
)
# get all standard meteorological measurements for stations tplm2 and apam2
stdmet_df = api.get_data(
station_ids=['tplm2', 'apam2'],
mode='stdmet',
start_time='2022-01-01',
end_time='2023-01-01',
)
# get all (available) continuous wind and standard meteorological measurements for stations tplm2 and apam2
# continuous wind data is unavailable for station apam2; that request will log an error but not affect the rest of the results.
stdmet_df = api.get_data(
station_ids=['tplm2', 'apam2'],
modes=['stdmet', 'cwind'],
start_time='2022-01-01',
end_time='2023-01-01',
)
```

#### More Information
Expand Down
164 changes: 133 additions & 31 deletions ndbc_api/ndbc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
handler.
"""
import logging
import itertools
import pickle
import warnings
from datetime import datetime, timedelta
from typing import Any, List, Union
from typing import Any, List, Tuple, Union

import pandas as pd

Expand Down Expand Up @@ -360,30 +361,37 @@ def available_historical(self,

def get_data(
self,
station_id: Union[int, str],
mode: str,
station_id: Union[int, str, None] = None,
mode: Union[str, None] = None,
start_time: Union[str, datetime] = datetime.now() - timedelta(days=30),
end_time: Union[str, datetime] = datetime.now(),
use_timestamp: bool = True,
as_df: bool = True,
cols: List[str] = None,
station_ids: Union[List[Union[int, str]], None] = None,
modes: Union[List[str], None] = None,
) -> Union[pd.DataFrame, dict]:
"""Execute data query against the specified NDBC station.
"""Execute data query against the specified NDBC station(s).
Query the NDBC data service for station-level measurements, using the
`mode` parameter to determine the measurement type (e.g. `'stdmet'` for
standard meteorological data or `'cwind'` for continuous winds data). The
time range and data columns of interest may also be specified, such that
a tailored set of requests are executed against the NDBC data service to
generate a single `pandas.DataFrame` or `dict` matching the conditions
specified in the method call.
specified in the method call. When calling `get_data` with `station_ids`
the station identifier is added as a column to the returned data.
Args:
station_id: The NDBC station ID (e.g. `'tplm2'` or `41001`) for the
station of interest.
station_ids: A list of NDBC station IDs (e.g. `['tplm2', '41001']`)
for the stations of interest.
mode: The data measurement type to query for the station (e.g.
`'stdmet'` for standard meteorological data or `'cwind'` for
continuous winds data).
modes: A list of data measurement types to query for the stations
(e.g. `['stdmet', 'cwind']`).
start_time: The first timestamp of interest (in UTC) for the data
query, defaulting to 30 days before the current system time.
end_time: The last timestamp of interest (in UTC) for the data
Expand All @@ -398,42 +406,71 @@ def get_data(
returned. All columns are returned if `None` is specified.
Returns:
The available station measurements for the specified mode, time
The available station(s) measurements for the specified modes, time
range, and columns, either as a `dict` or as a `pandas.DataFrame`
if the `as_df` flag is set to `True`.
Raises:
ValueError: Both `station_id` and `station_ids` are `None`, or both
are specified. This is also raised if both `mode` and `modes` are
`None`, or both are specified.
RequestException: The specified mode is not available.
ResponseException: There was an error in executing and parsing the
required requests against the NDBC data service.
HandlerException: There was an error in handling the returned data
as a `dict` or `pandas.DataFrame`.
"""
start_time = self._handle_timestamp(start_time)
end_time = self._handle_timestamp(end_time)
station_id = self._parse_station_id(station_id)
data_api_call = getattr(self._data_api, mode, None)
if not data_api_call:
raise RequestException(
'Please supply a supported mode from `get_modes()`.')
try:
data = data_api_call(
self._handler,
station_id,
start_time,
end_time,
use_timestamp,
)
except (ResponseException, ValueError, TypeError, KeyError) as e:
raise ResponseException('Failed to handle API call.') from e
if use_timestamp:
data = self._enforce_timerange(df=data,
start_time=start_time,
end_time=end_time)
try:
return self._handle_data(data, as_df, cols)
except (ValueError, KeyError, AttributeError) as e:
raise ParserException('Failed to handle returned data.') from e
if station_id is None and station_ids is None:
raise ValueError('Both `station_id` and `station_ids` are `None`.')
if station_id is not None and station_ids is not None:
raise ValueError('`station_id` and `station_ids` cannot both be '
'specified.')
if mode is None and modes is None:
raise ValueError('Both `mode` and `modes` are `None`.')
if mode is not None and modes is not None:
raise ValueError('`mode` and `modes` cannot both be specified.')

handle_station_ids: List[Union[int, str]] = []
handle_modes: List[str] = []
station_id_as_column: bool = True if station_id is None else False

if station_id is not None:
handle_station_ids.append(station_id)
if station_ids is not None:
handle_station_ids.extend(station_ids)
if mode is not None:
handle_modes.append(mode)
if modes is not None:
handle_modes.extend(modes)

# accumulated_data records the handled response and parsed station_id
# as a tuple, with the data as the first value and the id as the second.
accumulated_data: List[Tuple[Union[pd.DataFrame, dict], str]] = []
for station_id, mode in itertools.product(handle_station_ids, handle_modes):
try:
data = self._handle_get_data(
station_id=station_id,
mode=mode,
start_time=start_time,
end_time=end_time,
use_timestamp=use_timestamp,
as_df=as_df,
cols=cols
)
accumulated_data.append(data)
except (RequestException, ResponseException, HandlerException) as e:
self.log.error(f'Failed to process request for station_id {station_id} '
f'and mode {mode} with error: {e}')
continue
# check that we have some response
if len(accumulated_data) == 0:
raise ResponseException(f'No data was returned for station_ids {handle_station_ids} '
f'and modes {handle_modes}')
# handle the default case where a single station_id and mode are specified
if len(accumulated_data) == 1:
return accumulated_data[0][0]
# handle the case where multiple station_ids and modes are specified
return self._handle_accumulate_data(accumulated_data, station_id_as_column)

def get_modes(self):
"""Get the list of supported modes for `get_data(...)`."""
Expand Down Expand Up @@ -516,3 +553,68 @@ def _handle_data(data: pd.DataFrame,
'Failed to convert `pd.DataFrame` to `dict`.') from e
else:
return data

@staticmethod
def _handle_accumulate_data(
    accumulated_data: List[Tuple[Union[pd.DataFrame, dict], str]],
    station_id_as_column: bool = False
) -> Union[pd.DataFrame, dict]:
    """Combine per-request results into one `pandas.DataFrame` or `dict`.

    The output type is decided by the first entry: if its data is a
    `pandas.DataFrame`, every entry is treated as a frame and the frames
    are concatenated; otherwise every entry is treated as a `dict` and the
    dictionaries are merged with `dict.update`, so on a key collision the
    later entry wins.

    Args:
        accumulated_data: A non-empty list of `(data, station_id)` tuples.
        station_id_as_column: When `True`, the station identifier is added
            to each result before combining: as a leading `'station_id'`
            column for frames, or as a `'<station_id>_'` key prefix for
            dictionaries.

    Returns:
        The concatenated `pandas.DataFrame`, or the merged `dict`.

    Raises:
        ValueError: `accumulated_data` is empty, so there is nothing to
            combine and the output type cannot be determined.
    """
    if not accumulated_data:
        raise ValueError('No accumulated data was supplied to combine.')

    if isinstance(accumulated_data[0][0], pd.DataFrame):
        frames: List[pd.DataFrame] = []
        for frame, station_id in accumulated_data:
            if station_id_as_column:
                # `DataFrame.insert` works in place; copy first so the
                # caller's frame is left untouched and a repeated call on
                # the same frames does not fail on a duplicate column.
                frame = frame.copy()
                frame.insert(0, 'station_id', station_id)
            frames.append(frame)
        return pd.concat(frames)

    combined: dict = {}
    for mapping, station_id in accumulated_data:
        if station_id_as_column:
            # prefix the keys with the station identifier so that entries
            # from different stations do not overwrite one another.
            mapping = {f'{station_id}_{k}': v for k, v in mapping.items()}
        combined.update(mapping)
    return combined

def _handle_get_data(
    self,
    mode: str,
    station_id: str,
    start_time: datetime,
    end_time: datetime,
    use_timestamp: bool,
    as_df: bool = True,
    cols: List[str] = None
) -> Tuple[Union[pd.DataFrame, dict], str]:
    """Run a single station/mode data query and return it with its station ID.

    Args:
        mode: Name of the attribute on `self._data_api` to call for the
            requested measurement type.
        station_id: The station identifier, normalized through
            `self._parse_station_id` before use.
        start_time: Start of the query window, normalized through
            `self._handle_timestamp`.
        end_time: End of the query window, normalized through
            `self._handle_timestamp`.
        use_timestamp: Forwarded to the data API call; when truthy the
            result is also passed through `self._enforce_timerange`.
        as_df: Forwarded to `self._handle_data`.
        cols: Forwarded to `self._handle_data`.

    Returns:
        A `(handled_data, station_id)` tuple, where `station_id` is the
        value returned by `self._parse_station_id`.

    Raises:
        RequestException: `self._data_api` has no usable attribute named
            `mode`.
        ResponseException: The data API call raised `ResponseException`,
            `ValueError`, `TypeError` or `KeyError`.
        ParserException: `self._handle_data` raised `ValueError`,
            `KeyError` or `AttributeError`.
    """
    window_start = self._handle_timestamp(start_time)
    window_end = self._handle_timestamp(end_time)
    parsed_station_id = self._parse_station_id(station_id)

    # modes are resolved dynamically as attributes of the data API object.
    fetch = getattr(self._data_api, mode, None)
    if not fetch:
        raise RequestException(
            'Please supply a supported mode from `get_modes()`.')

    try:
        raw = fetch(
            self._handler,
            parsed_station_id,
            window_start,
            window_end,
            use_timestamp,
        )
    except (ResponseException, ValueError, TypeError, KeyError) as e:
        raise ResponseException(f'Failed to handle API call.\nRaised from {e}') from e

    if use_timestamp:
        raw = self._enforce_timerange(df=raw,
                                      start_time=window_start,
                                      end_time=window_end)

    try:
        return self._handle_data(raw, as_df, cols), parsed_station_id
    except (ValueError, KeyError, AttributeError) as e:
        raise ParserException(f'Failed to handle returned data.\nRaised from {e}') from e
Loading

0 comments on commit acf79ee

Please sign in to comment.