Skip to content

Commit

Permalink
Merge ac8911e into 902d60a
Browse files Browse the repository at this point in the history
  • Loading branch information
CDJellen committed Jun 12, 2024
2 parents 902d60a + ac8911e commit 3219913
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 28,777 deletions.
34 changes: 21 additions & 13 deletions ndbc_api/api/handlers/stations.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def nearest_station(
raise ParserException from e
closest = closest.to_dict().get('Station', {'UNK': 'UNK'})
return list(closest.values())[0]

@classmethod
def radial_search(
cls,
Expand All @@ -67,15 +67,16 @@ def radial_search(
) -> pd.DataFrame:
"""Get stations within <radius> of the specified lat/lon."""
if units not in cls.UNITS:
raise ValueError(f'Invalid unit: {units}, must be one of {cls.UNITS}.')
raise ValueError(
f'Invalid unit: {units}, must be one of {cls.UNITS}.')
if radius < 0:
raise ValueError(f'Invalid radius: {radius}, must be non-negative.')
# pass the radius in km
if units == 'nm':
radius = radius * 1.852
elif units == 'mi':
radius = radius * 1.60934

df = cls.stations(handler=handler)
if isinstance(lat, str):
lat = StationsHandler.LAT_MAP(lat)
Expand Down Expand Up @@ -122,14 +123,15 @@ def historical(cls, handler: Any,
return HistoricalParser.available_measurements(resp)

""" PRIVATE """

@staticmethod
def _distance(lat_a: float, lon_a: float, lat_b: float,
              lon_b: float) -> float:
    """Return the haversine great-circle distance between two points.

    Args:
        lat_a: Latitude of the first point, in decimal degrees.
        lon_a: Longitude of the first point, in decimal degrees.
        lat_b: Latitude of the second point, in decimal degrees.
        lon_b: Longitude of the second point, in decimal degrees.

    Returns:
        `StationsHandler.DIAM_OF_EARTH * asin(sqrt(h))`, where `h` is the
        haversine term. The unit is that of `DIAM_OF_EARTH`.
        NOTE(review): presumably kilometres, since `_radial_search`
        compares this value against a radius in km -- confirm.
    """
    to_rad = StationsHandler.DEG_TO_RAD
    # Cosine-only form of the haversine term:
    #   h = (1 - cos(dlat)) / 2 + cos(lat_a) * cos(lat_b) * (1 - cos(dlon)) / 2
    haversine = (0.5 - cos((lat_b - lat_a) * to_rad) / 2 +
                 cos(lat_a * to_rad) * cos(lat_b * to_rad) *
                 (1 - cos((lon_b - lon_a) * to_rad)) / 2)
    # Guard against floating-point rounding pushing the term marginally
    # outside [0, 1], which would make sqrt()/asin() raise ValueError.
    haversine = min(1.0, max(0.0, haversine))
    return StationsHandler.DIAM_OF_EARTH * asin(sqrt(haversine))

@staticmethod
def _nearest(df: pd.DataFrame, lat_a: float, lon_a: float):
Expand All @@ -140,17 +142,23 @@ def _nearest(df: pd.DataFrame, lat_a: float, lon_a: float):
StationsHandler.LAT_MAP(r[0].split(' ')[0]),
StationsHandler.LON_MAP(r[0].split(' ')[1]),
) for idx, r in enumerate(ls)]
closest = min(ls, key=lambda p: StationsHandler._distance(lat_a, lon_a, p[1], p[2]))
closest = min(
ls,
key=lambda p: StationsHandler._distance(lat_a, lon_a, p[1], p[2]))
return df.iloc[[closest[0]]]

@staticmethod
def _radial_search(df: pd.DataFrame, lat_a: float, lon_a: float,
                   radius: float):
    """Get the stations within radius km from specified `float`-valued lat/lon.

    Args:
        df: Stations table with a 'Location Lat/Long' column whose values
            are '<lat> <lon>' strings separated by a single space.
        lat_a: Latitude of the search centre, as a float.
        lon_a: Longitude of the search centre, as a float.
        radius: Maximum distance from the search centre (inclusive).

    Returns:
        The rows of `df`, in their original order, whose location lies
        within `radius` of (`lat_a`, `lon_a`) according to
        `StationsHandler._distance`.
    """
    within_radius = []
    for idx, location in enumerate(df['Location Lat/Long']):
        # Split once; the first two tokens are the latitude and longitude.
        parts = location.split(' ')
        lat_b = StationsHandler.LAT_MAP(parts[0])
        lon_b = StationsHandler.LON_MAP(parts[1])
        if StationsHandler._distance(lat_a, lon_a, lat_b, lon_b) <= radius:
            # Keep the positional index so the rows can be taken with iloc.
            within_radius.append(idx)
    return df.iloc[within_radius]
6 changes: 3 additions & 3 deletions ndbc_api/api/parsers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _read_response(cls, response: dict,
names = cls.REVERT_COL_NAMES
if '(' in data[0]:
data = cls._clean_data(data)

try:
parse_dates = False
date_format = None
Expand All @@ -65,10 +65,10 @@ def _read_response(cls, response: dict,
)
if use_timestamp:
df.index.name = 'timestamp'

except (NotImplementedError, TypeError, ValueError) as e:
return pd.DataFrame()

# check whether to parse dates
return df

Expand Down
9 changes: 4 additions & 5 deletions ndbc_api/api/requests/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,16 @@ def req_hist_helper_month_current(current_month: int) -> str:
min(int(current_year),
int(end_time.year) + 1)):
reqs.append(req_hist_helper_year(hist_year))

# handle month requests
if end_time.year == months_req_year:
for hist_month in range(
int(start_time.month),
min(int(end_time.month), int(last_avail_month))+1
):
min(int(end_time.month), int(last_avail_month)) + 1):
reqs.append(req_hist_helper_month(months_req_year, hist_month))
if int(last_avail_month) <= (end_time.month):
reqs.append(
req_hist_helper_month_current(int(last_avail_month)))
reqs.append(req_hist_helper_month_current(
int(last_avail_month)))

if has_realtime:
reqs.append(
Expand Down
80 changes: 42 additions & 38 deletions ndbc_api/ndbc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def nearest_station(
nearest_station = self._stations_api.nearest_station(
handler=self._handler, lat=lat, lon=lon)
return nearest_station

def radial_search(
self,
lat: Union[str, float, None] = None,
Expand Down Expand Up @@ -430,7 +430,7 @@ def get_data(
raise ValueError('Both `mode` and `modes` are `None`.')
if mode is not None and modes is not None:
raise ValueError('`mode` and `modes` cannot both be specified.')

handle_station_ids: List[Union[int, str]] = []
handle_modes: List[str] = []
station_id_as_column: bool = True if station_id is None else False
Expand All @@ -443,44 +443,46 @@ def get_data(
handle_modes.append(mode)
if modes is not None:
handle_modes.extend(modes)

# accumulated_data records the handled response and parsed station_id
# as a tuple, with the data as the first value and the id as the second.
accumulated_data: List[Tuple[Union[pd.DataFrame, dict], str]] = []
with ThreadPoolExecutor(max_workers=len(handle_station_ids) * len(handle_modes)) as executor:
with ThreadPoolExecutor(max_workers=len(handle_station_ids) *
len(handle_modes)) as executor:
futures = [
executor.submit(
self._handle_get_data,
station_id=station_id,
mode=mode,
start_time=start_time,
end_time=end_time,
use_timestamp=use_timestamp,
as_df=as_df,
cols=cols
)
for station_id, mode in itertools.product(handle_station_ids, handle_modes)
executor.submit(self._handle_get_data,
station_id=station_id,
mode=mode,
start_time=start_time,
end_time=end_time,
use_timestamp=use_timestamp,
as_df=as_df,
cols=cols)
for station_id, mode in itertools.product(
handle_station_ids, handle_modes)
]
for future in futures:
try:
data = future.result()
accumulated_data.append(data)
except (RequestException, ResponseException, HandlerException) as e:
except (RequestException, ResponseException,
HandlerException) as e:
self.log.error(
f"Failed to process request for station_id {station_id} "
f"and mode {mode} with error: {e}"
)
f"and mode {mode} with error: {e}")
continue

# check that we have some response
if len(accumulated_data) == 0:
raise ResponseException(f'No data was returned for station_ids {handle_station_ids} '
f'and modes {handle_modes}')
raise ResponseException(
f'No data was returned for station_ids {handle_station_ids} '
f'and modes {handle_modes}')
# handle the default case where a single station_id and mode are specified
if len(accumulated_data) == 1:
return accumulated_data[0][0]
# handle the case where multiple station_ids and modes are specified
return self._handle_accumulate_data(accumulated_data, station_id_as_column)
return self._handle_accumulate_data(accumulated_data,
station_id_as_column)

def get_modes(self):
"""Get the list of supported modes for `get_data(...)`."""
Expand Down Expand Up @@ -563,15 +565,14 @@ def _handle_data(data: pd.DataFrame,
'Failed to convert `pd.DataFrame` to `dict`.') from e
else:
return data

@staticmethod
def _handle_accumulate_data(
accumulated_data: List[Tuple[Union[pd.DataFrame, dict], str]],
station_id_as_column: bool = False
) -> Union[pd.DataFrame, dict]:
accumulated_data: List[Tuple[Union[pd.DataFrame, dict], str]],
station_id_as_column: bool = False) -> Union[pd.DataFrame, dict]:
return_as_df = isinstance(accumulated_data[0][0], pd.DataFrame)
data: Union[List[pd.DataFrame], dict] = [] if return_as_df else {}

for d in accumulated_data:
if return_as_df:
if station_id_as_column:
Expand All @@ -582,7 +583,9 @@ def _handle_accumulate_data(
if station_id_as_column:
# the keys need to be updated to include the station_id
# as a prefix before we update the `data` dict.
d_data = {f'{d_station_id}_{k}': v for k, v in d_data.items()}
d_data = {
f'{d_station_id}_{k}': v for k, v in d_data.items()
}
# the keys across modes should be unique so a simple update
# is sufficient.
data.update(d_data)
Expand All @@ -592,15 +595,14 @@ def _handle_accumulate_data(
return data

def _handle_get_data(
self,
mode: str,
station_id: str,
start_time: datetime,
end_time: datetime,
use_timestamp: bool,
as_df: bool = True,
cols: List[str] = None
) -> Tuple[Union[pd.DataFrame, dict], str]:
self,
mode: str,
station_id: str,
start_time: datetime,
end_time: datetime,
use_timestamp: bool,
as_df: bool = True,
cols: List[str] = None) -> Tuple[Union[pd.DataFrame, dict], str]:
start_time = self._handle_timestamp(start_time)
end_time = self._handle_timestamp(end_time)
station_id = self._parse_station_id(station_id)
Expand All @@ -617,14 +619,16 @@ def _handle_get_data(
use_timestamp,
)
except (ResponseException, ValueError, TypeError, KeyError) as e:
raise ResponseException(f'Failed to handle API call.\nRaised from {e}') from e
raise ResponseException(
f'Failed to handle API call.\nRaised from {e}') from e
if use_timestamp:
data = self._enforce_timerange(df=data,
start_time=start_time,
end_time=end_time)
try:
handled_data = self._handle_data(data, as_df, cols)
except (ValueError, KeyError, AttributeError) as e:
raise ParserException(f'Failed to handle returned data.\nRaised from {e}') from e
raise ParserException(
f'Failed to handle returned data.\nRaised from {e}') from e

return (handled_data, station_id)
Loading

0 comments on commit 3219913

Please sign in to comment.