Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions activestorage/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ def __init__(self,
interface_type: str = None,
max_threads: int = 100,
storage_options: dict = None,
active_storage_url: str = None) -> None:
active_storage_url: str = None,
option_disable_chunk_cache: bool = False) -> None:
"""
Instantiate with a NetCDF4 dataset URI and the variable of interest within that file.
(We need the variable, because we need variable specific metadata from within that
Expand All @@ -199,6 +200,7 @@ def __init__(self,

:param storage_options: s3fs.S3FileSystem options
:param active_storage_url: Reductionist server URL
:param option_disable_chunk_cache: flag to disable chunk caching
"""
self.ds = None
input_variable = False
Expand Down Expand Up @@ -256,6 +258,9 @@ def __init__(self,
self.storage_options = storage_options
self.active_storage_url = active_storage_url

# store the flag that requests chunk caching be disabled (default False)
self.option_disable_chunk_cache = option_disable_chunk_cache

# basic check on file
if not input_variable:
if not os.path.isfile(self.uri) and not self.interface_type:
Expand Down Expand Up @@ -660,6 +665,9 @@ def _process_chunk(self,
# Axes over which to apply a reduction
axis = self._axis

# NOTE: despite its name, chunk_caching holds the *disable* flag (True = caching off)
chunk_caching = self.option_disable_chunk_cache

if self.interface_type == 's3' and self._version == 1:
tmp, count = reduce_opens3_chunk(ds._fh,
offset,
Expand Down Expand Up @@ -702,7 +710,8 @@ def _process_chunk(self,
ds._order,
chunk_selection,
axis,
operation=self._method)
operation=self._method,
option_disable_chunk_cache=chunk_caching,)
else:
if self.storage_options.get("anon", None) is True:
bucket = os.path.dirname(parsed_url.path)
Expand All @@ -722,7 +731,8 @@ def _process_chunk(self,
ds._order,
chunk_selection,
axis,
operation=self._method)
operation=self._method,
option_disable_chunk_cache=chunk_caching,)
elif self.interface_type == "https" and self._version == 2:
tmp, count = reductionist.reduce_chunk(session,
self.active_storage_url,
Expand All @@ -738,7 +748,8 @@ def _process_chunk(self,
chunk_selection,
axis,
operation=self._method,
interface_type="https")
interface_type="https",
option_disable_chunk_cache=chunk_caching,)

elif self.interface_type == 'ActivePosix' and self.version == 2:
# This is where the DDN Fuse and Infinia wrappers go
Expand Down
12 changes: 9 additions & 3 deletions activestorage/reductionist.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def reduce_chunk(session,
chunk_selection,
axis,
operation,
interface_type=None):
interface_type=None,
option_disable_chunk_cache=False):
"""Perform a reduction on a chunk using Reductionist.

:param server: Reductionist server URL
Expand All @@ -71,6 +72,7 @@ def reduce_chunk(session,
:param axis: tuple of the axes to be reduced (non-negative integers)
:param operation: name of operation to perform
:param interface_type: optional testing flag to allow HTTPS reduction
:param option_disable_chunk_cache: optional flag; when True, it is added to the request data to turn off the chunk cache
:returns: the reduced data as a numpy array or scalar
:raises ReductionistError: if the request to Reductionist fails
"""
Expand All @@ -86,7 +88,8 @@ def reduce_chunk(session,
order,
chunk_selection,
axis,
interface_type=interface_type)
interface_type=interface_type,
option_disable_chunk_cache=option_disable_chunk_cache)
if DEBUG:
print(f"Reductionist request data dictionary: {request_data}")
api_operation = "sum" if operation == "mean" else operation or "select"
Expand Down Expand Up @@ -184,7 +187,8 @@ def build_request_data(url: str,
order,
selection,
axis,
interface_type=None) -> dict:
interface_type=None,
option_disable_chunk_cache=False) -> dict:
"""Build request data for Reductionist API."""
request_data = {
'interface_type': interface_type if interface_type else "s3",
Expand All @@ -208,6 +212,8 @@ def build_request_data(url: str,
request_data["filters"] = encode_filters(filters)
if any(missing):
request_data["missing"] = encode_missing(missing)
if option_disable_chunk_cache:
request_data["option_disable_chunk_cache"] = True

if axis is not None:
request_data['axis'] = axis
Expand Down
3 changes: 2 additions & 1 deletion activestorage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ def reduce_chunk(rfile,
order,
chunk_selection,
axis,
method=None):
method=None,
option_disable_chunk_cache=False,):
"""
We do our own read of chunks and decoding etc

Expand Down
16 changes: 11 additions & 5 deletions tests/test_real_https.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ def test_https():
# v2: declared storage type
active = Active(test_file_uri, "ta",
interface_type="https",
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9]
print("Result is", result)
assert result == np.array([220.3180694580078], dtype="float32")

# v2: inferred storage type
active = Active(test_file_uri, "ta",
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9]
print("Result is", result)
Expand All @@ -57,7 +59,8 @@ def test_https():
# v2: inferred storage type, pop axis
active = Active(test_file_uri, "ta",
interface_type="https",
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min(axis=(0, 1))[:]
print("Result is", result)
Expand All @@ -84,7 +87,8 @@ def test_https():
active = Active(test_file_uri, "ta",
interface_type="https",
storage_options={"username": None, "password": None},
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min(axis=(0, 1))[:]
print("Result is", result)
Expand Down Expand Up @@ -137,7 +141,9 @@ def test_https_bigger_file():
"""Run a true test with a https FILE."""
test_file_uri = "https://esgf.ceda.ac.uk/thredds/fileServer/esg_cmip6/CMIP6/AerChemMIP/MOHC/UKESM1-0-LL/ssp370SST-lowNTCF/r1i1p1f2/Amon/cl/gn/latest/cl_Amon_UKESM1-0-LL_ssp370SST-lowNTCF_r1i1p1f2_gn_205001-209912.nc"
active_storage_url = "https://reductionist.jasmin.ac.uk/" # Wacasoft new Reductionist
active = Active(test_file_uri, "cl", active_storage_url=active_storage_url)
active = Active(test_file_uri, "cl",
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9]
print("Result is", result)
Expand Down
21 changes: 14 additions & 7 deletions tests/test_real_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def test_anon_s3():
'endpoint_url': "https://uor-aces-o.s3-ext.jc.rl.ac.uk"
}
},
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
with pytest.raises(ReductionistError):
result = active.min()[:]
Expand All @@ -54,7 +55,8 @@ def test_s3_small_file():
active = Active(test_file_uri,
'tas',
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9]
print("Result is", result)
Expand All @@ -80,7 +82,8 @@ def test_s3_small_dataset():
av = dataset['tas']
active = Active(av,
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9]
print("Result is", result)
Expand Down Expand Up @@ -109,7 +112,8 @@ def test_s3_dataset():
'UM_m01s16i202_vn1106',
interface_type="s3",
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9] # standardized slice
print("Result is", result)
Expand All @@ -119,7 +123,8 @@ def test_s3_dataset():
active = Active(test_file_uri,
'UM_m01s16i202_vn1106',
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9] # standardized slice
print("Result is", result)
Expand All @@ -133,7 +138,8 @@ def test_s3_dataset():
active = Active(av,
interface_type="s3",
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9] # standardized slice
print("Result is", result)
Expand All @@ -142,7 +148,8 @@ def test_s3_dataset():
# dataset: implicit interface_type
active = Active(av,
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)
active._version = 2
result = active.min()[0:3, 4:6, 7:9] # standardized slice
print("Result is", result)
Expand Down
9 changes: 6 additions & 3 deletions tests/test_real_s3_with_axes.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def build_active_test1_file():
print("S3 Test file path:", test_file_uri)
active = Active(test_file_uri, 'tas', interface_type="s3",
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)

active._version = 2

Expand All @@ -49,7 +50,8 @@ def build_active_small_file():
print("S3 Test file path:", test_file_uri)
active = Active(test_file_uri, 'tas', interface_type="s3",
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)

active._version = 2

Expand Down Expand Up @@ -104,7 +106,8 @@ def build_active():
print("S3 Test file path:", test_file_uri)
active = Active(test_file_uri, 'm01s30i111', interface_type="s3", # 'm01s06i247_4', interface_type="s3",
storage_options=storage_options,
active_storage_url=active_storage_url)
active_storage_url=active_storage_url,
option_disable_chunk_cache=True)

active._version = 2

Expand Down
3 changes: 3 additions & 0 deletions tests/unit/test_storage_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def reduce_chunk(
chunk_selection,
axis,
operation,
option_disable_chunk_cache,
):
return activestorage.storage.reduce_chunk(
test_file,
Expand All @@ -60,6 +61,7 @@ def reduce_chunk(
chunk_selection,
axis,
np.max,
False,
)

mock_load.side_effect = load_from_s3
Expand Down Expand Up @@ -102,6 +104,7 @@ def reduce_chunk(
mock.ANY,
mock.ANY,
operation="max",
option_disable_chunk_cache=False,
)


Expand Down
Loading