replace ARM GHI with Best Estimate GHI #716

Merged
12 commits merged on Sep 16, 2021
10 changes: 10 additions & 0 deletions docs/source/whatsnew/1.0.5.rst
@@ -12,6 +12,9 @@ Enhancements
by environment variable. This was motivated by timeouts caused when an
observation was suddenly provided a large amount of data and we attempted
to create catch-up forecasts. (:issue:`727`, :pull:`727`)
* Replace ARM GHI observation with ARM Best Estimate GHI observation. The
best estimate is derived from DNI and DHI observations, GHI observations,
and a rotating shadow band observation. (:pull:`716`)
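
For context, a best-estimate GHI of this kind rests on the component-sum
relation GHI = DHI + DNI * cos(solar zenith angle); a minimal illustrative
sketch follows (not ARM's actual QA code; the function and argument names
are assumptions):

import numpy as np

def component_sum_ghi(dni, dhi, solar_zenith_deg):
    # reconstruct GHI (W/m^2) from the beam and diffuse components;
    # ARM's Best Estimate additionally applies QA and falls back to the
    # MFRSR hemispheric broadband measurement when needed
    return dhi + dni * np.cos(np.radians(solar_zenith_deg))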

Fixed
~~~~~
@@ -29,6 +32,13 @@ Fixed
one file per observation or forecast. (:issue:`707`, :issue:`713`, :pull:`722`)
* Removed html line break tags from report timeseries csv headers.
(:issue:`706`, :pull:`722`)
* Handle ARM files with a single time that cause the netCDF4 reader to fail.
(:pull:`716`)
* Handle duplicated times from ARM files by keeping only the last point.
(:pull:`716`)
* Handle duplicated times when posting reference data to the API by keeping
only the last point. This is known to occur in ARM and NREL MIDC data
feeds. (:pull:`716`)
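
Both duplicate-time fixes use the same pandas idiom of keeping only the last
occurrence of a repeated timestamp; a minimal sketch of the approach used in
the changes below (data values are made up for illustration):

import pandas as pd

index = pd.DatetimeIndex(
    ['2019-01-01 00:00Z', '2019-01-01 00:00Z', '2019-01-01 00:01Z'])
data = pd.DataFrame({'ghi': [0.0, 1.0, 2.0]}, index=index)
# mask marking every occurrence of a timestamp except the last one
duplicated = data.index.duplicated(keep='last')
deduped = data[~duplicated]  # rows with ghi == 1.0 and 2.0 remain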

Contributors
~~~~~~~~~~~~
49 changes: 45 additions & 4 deletions solarforecastarbiter/io/fetch/arm.py
@@ -8,6 +8,7 @@
import pandas as pd
import requests
import time
from urllib3 import Retry


logger = logging.getLogger(__name__)
@@ -19,8 +20,14 @@
# These lists are the commonly available irradiance and meteorological
# variables found in ARM data. Users can import and pass these to fetch_arm
# to parse out these variables.
IRRAD_VARIABLES = ['down_short_hemisp', 'down_short_diffuse_hemisp',
'short_direct_normal']
# We use 'BestEstimate_down_short_hemisp' instead of 'down_short_hemisp'. The
# best estimate has additional QA and is filled by 'MFRSR_hemisp_broadband'
# when needed.
IRRAD_VARIABLES = [
'BestEstimate_down_short_hemisp',
'down_short_diffuse_hemisp',
'short_direct_normal',
]
MET_VARIABLES = ['temp_mean', 'rh_mean', 'wspd_arith_mean']


@@ -113,11 +120,31 @@ def request_arm_file(user_id, api_key, filename, retries=5):
requests.exceptions.ChunkedEncodingError
Reraises this error when all retries are exhausted.
"""
max_retries = Retry(
total=10,
connect=3,
read=3,
status=3,
status_forcelist=[
408, 423, 444, 500, 501, 502, 503, 504, 507, 508, 511, 599,
],
backoff_factor=0.5,
raise_on_status=False,
remove_headers_on_redirect=[]
)
adapter = requests.adapters.HTTPAdapter(max_retries=max_retries)
s = requests.Session()
s.mount('https://', adapter)

params = {'user': f'{user_id}:{api_key}',
'file': filename}

try:
request = requests.get(ARM_FILES_DOWNLOAD_URL, params=params)
request = s.get(
ARM_FILES_DOWNLOAD_URL,
params=params,
timeout=(10, 60)
)
except requests.exceptions.ChunkedEncodingError:
if retries > 0:
logger.debug(f'Retrying DOE ARM file {filename}: {retries}'
@@ -220,6 +247,10 @@ def fetch_arm(user_id, api_key, datastream, variables, start, end):
found, an empty DataFrame will be returned. Users should verify the
contents of the return value before use.

Occasionally the ARM API returns multiple files that contain the same
valid time. This function keeps only the last occurrence of the data
at a given time.

Example
-------
A user requesting data for the variables 'down_short_hemisp' and
@@ -245,11 +276,21 @@ def fetch_arm(user_id, api_key, datastream, variables, start, end):
nc_file = retrieve_arm_dataset(user_id, api_key, filename)
except requests.exceptions.ChunkedEncodingError:
logger.error(f'Request failed for DOE ARM file {filename}')
except PermissionError:
# occurs when there's only one data point in a file
# https://github.com/Unidata/netcdf4-python/issues/1125
logger.error(f'PermissionError in reading {filename}')
else:
datastream_df = extract_arm_variables(nc_file, variables)
datastream_dfs.append(datastream_df)
if len(datastream_dfs) > 0:
new_data = pd.concat(datastream_dfs)
return new_data
index = new_data.index.duplicated(keep='last')
if index.sum():
logger.warning(
'Duplicate index values in %s. Keeping last.', datastream
)
data = new_data[~index]
return data
else:
return pd.DataFrame()
2 binary files not shown.
51 changes: 42 additions & 9 deletions solarforecastarbiter/io/fetch/tests/test_arm.py
@@ -1,8 +1,9 @@
import inspect
import os

from pathlib import Path

from netCDF4 import Dataset
import numpy as np
import pandas as pd
import pytest
from requests.exceptions import ChunkedEncodingError
@@ -13,10 +14,9 @@

TEST_DATA_DIR = os.path.dirname(
os.path.abspath(inspect.getfile(inspect.currentframe())))
MET_FILE = os.path.join(TEST_DATA_DIR, 'data',
'sgpmetE13.b1.20190122.000000.cdf')
IRRAD_FILE = os.path.join(TEST_DATA_DIR, 'data',
'sgpqcrad1longC1.c1.20190122.000000.cdf')
TEST_DATA_DIR = Path(TEST_DATA_DIR) / 'data'
MET_FILE = TEST_DATA_DIR / 'sgpmetE13.b1.20190122.000000.cdf'
IRRAD_FILE = TEST_DATA_DIR / 'sgpqcrad1longC1.c1.20190122.000000.cdf'


test_datastreams = ['ds_1', 'ds_2']
@@ -77,6 +77,31 @@ def test_fetch_arm(user_id, api_key, stream, variables, start, end, mocker):
assert variables[0] in data.columns


def test_fetch_arm_duplicates(user_id, api_key, mocker, caplog):
start = pd.Timestamp('20180130T0000Z')
end = pd.Timestamp('20180130T235900Z')
datastream = 'sgpmetE15.b1'

# wrap the list in a function so that Mock.side_effect treats it as a
# single return value instead of iterating over it
def request_filenames(*args):
return [
'gpmetE15.b1.20180130.000000.cdf',
'gpmetE15.b1.20180130.200000.cdf',
]
datasets = [Dataset(TEST_DATA_DIR / Path(f)) for f in request_filenames()]
mocker.patch('solarforecastarbiter.io.fetch.arm.list_arm_filenames',
side_effect=request_filenames)
mocker.patch('solarforecastarbiter.io.fetch.arm.retrieve_arm_dataset',
side_effect=datasets)
variables = ['temp_mean', 'rh_mean', 'wspd_arith_mean']
data = arm.fetch_arm(user_id, api_key, datastream, variables, start, end)
assert 'Duplicate index values' in caplog.text
assert not np.any(data.index.duplicated())
assert data.index[0] == start
assert data.index[-1] == end


@pytest.mark.parametrize('stream,start,end', [
('datastream', start_date, end_date)
])
@@ -96,14 +121,18 @@ def test_request_file_lists(user_id, api_key, stream, start, end, mocker):


def test_request_arm_file(user_id, api_key, mocker):
mocked_get = mocker.patch('solarforecastarbiter.io.fetch.arm.requests.get')
mocked_get = mocker.patch(
'solarforecastarbiter.io.fetch.arm.requests.Session.get'
)
arm.request_arm_file(user_id, api_key, 'sgpqcrad1longC1.c1.cdf')
mocked_get.assert_called_with(
arm.ARM_FILES_DOWNLOAD_URL,
params={
'user': f'{user_id}:{api_key}',
'file': 'sgpqcrad1longC1.c1.cdf',
})
},
timeout=(10, 60)
)


def test_extract_arm_variables_exist(mocker):
@@ -134,7 +163,9 @@ def test_no_files(user_id, api_key, mocker):

@pytest.mark.parametrize('num_failures', range(1, 5))
def test_request_arm_file_retries(mocker, num_failures):
mocked_get = mocker.patch('solarforecastarbiter.io.fetch.arm.requests.get')
mocked_get = mocker.patch(
'solarforecastarbiter.io.fetch.arm.requests.Session.get'
)
return_values = (d for d in [0, num_failures])

def get_response(*args, **kwargs):
@@ -152,7 +183,9 @@ def get_response(*args, **kwargs):


def test_request_arm_file_failure_after_retries(mocker):
mocked_get = mocker.patch('solarforecastarbiter.io.fetch.arm.requests.get')
mocked_get = mocker.patch(
'solarforecastarbiter.io.fetch.arm.requests.Session.get'
)
mocked_get.side_effect = ChunkedEncodingError
with pytest.raises(ChunkedEncodingError):
arm.request_arm_file('user', 'ley', 'filename')
2 changes: 1 addition & 1 deletion solarforecastarbiter/io/reference_observations/arm.py
@@ -25,7 +25,7 @@
}

DOE_ARM_VARIABLE_MAP = {
'down_short_hemisp': 'ghi',
'BestEstimate_down_short_hemisp': 'ghi',
'short_direct_normal': 'dni',
'down_short_diffuse_hemisp': 'dhi',
'temp_mean': 'air_temperature',
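
Presumably DOE_ARM_VARIABLE_MAP is used to rename fetched ARM columns to
Solar Forecast Arbiter variable names; an illustrative sketch (the exact
call in this module is not shown in the diff):

import pandas as pd

# excerpt of the map above, after this change
variable_map = {
    'BestEstimate_down_short_hemisp': 'ghi',
    'short_direct_normal': 'dni',
    'down_short_diffuse_hemisp': 'dhi',
}
arm_data = pd.DataFrame({'BestEstimate_down_short_hemisp': [512.3]})
sfa_data = arm_data.rename(columns=variable_map)  # column is now 'ghi'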
@@ -9,7 +9,9 @@
"use that datastream for in format <start>/<end> where ",
"the value 'now' can be used as the <end> value to indicate ",
"currently available data. When configuring multiple ",
"datastreams, their date ranges should *not* overlap."
"datastreams, their date ranges should *not* overlap.",
"Data is typically cycled from .c1 files into .c2 files on ",
"a yearly basis when instruments are replaced and calibrated."
],
"sites": [
{
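
A hypothetical datastream entry following the <start>/<end> convention
described above, with non-overlapping .c1 and .c2 ranges (names and dates
are made up for illustration):

# hypothetical mapping of datastream name to the date range it covers
datastreams = {
    'sgpqcrad1longC1.c1': '2017-01-01/2021-01-01',
    'sgpqcrad1longC1.c2': '2021-01-01/now',
}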
6 changes: 6 additions & 0 deletions solarforecastarbiter/io/reference_observations/common.py
@@ -324,6 +324,12 @@ def _prepare_data_to_post(data, variable, observation, start, end,
to prepare for posting"""
data = data[[variable]]
data = data.rename(columns={variable: 'value'})

# In rare cases, times are repeated within or across files.
# We assume the last value at a given time is the most recent and thus the best.
index = data.index.duplicated(keep='last')
data = data[~index]

# ensure data is sorted before slicing and for optimal order in the
# database
data = data.sort_index()
@@ -383,6 +383,23 @@ def test_prepare_data_to_post_offset():
pd.testing.assert_frame_equal(out, expected)


def test_prepare_data_to_post_duplicated():
index = pd.DatetimeIndex([
'2019-01-01 00:00Z', '2019-01-01 00:00Z', '2019-01-01 00:01Z'
])
inp = pd.DataFrame(
{'ghi': [0., 1., 2.]}, index=index)
start = pd.Timestamp('2019-01-01T0000Z')
end = pd.Timestamp('2019-01-01T0001Z')
variable = 'ghi'
expected = pd.DataFrame(
{'value': [1., 2.], 'quality_flag': [0, 0]},
index=pd.date_range(index[0], freq='1min', periods=2))
out = common._prepare_data_to_post(inp, variable, test_kwarg_observation,
start, end)
pd.testing.assert_frame_equal(out, expected)


def test_prepare_data_to_post_empty():
inp = pd.DataFrame(
{'ghi': [0.0]}, index=pd.DatetimeIndex(['2019-01-01 00:00Z']))