move all_array_storage/compression into config
braingram committed Mar 8, 2023
1 parent f334ba7 commit f387daf
Showing 11 changed files with 197 additions and 34 deletions.
9 changes: 9 additions & 0 deletions CHANGES.rst
@@ -1,3 +1,12 @@
3.0.0 (unreleased)
------------------

The ASDF Standard is at v1.6.0
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- Add ``all_array_storage``, ``all_array_compression`` and
``all_array_compression_kwargs`` to ``asdf.config.AsdfConfig`` [#1468]

2.15.0 (unreleased)
-------------------

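For context, the new options live on ``asdf.config.AsdfConfig`` and can be set globally via ``asdf.get_config()`` or temporarily via ``asdf.config_context()``. A minimal sketch of the intended usage (the filename is illustrative):

import asdf
import numpy as np

tree = {"data": np.arange(1024)}

# Override storage and compression for every binary block while the
# context is active; the settings revert when the context exits.
with asdf.config_context() as cfg:
    cfg.all_array_storage = "internal"      # or "external", "inline", None
    cfg.all_array_compression = "zlib"      # or "bzp2", "lz4", "input", None
    cfg.all_array_compression_kwargs = {}   # keyword arguments passed to the compressor
    asdf.AsdfFile(tree).write_to("example.asdf")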
20 changes: 20 additions & 0 deletions asdf/_tests/test_array_blocks.py
@@ -146,6 +146,26 @@ def test_update_expand_tree(tmp_path):
assert_array_equal(ff.tree["arrays"][1], my_array2)


def test_update_all_external(tmp_path):
fn = tmp_path / "test.asdf"

my_array = np.arange(64) * 1
my_array2 = np.arange(64) * 2
tree = {"arrays": [my_array, my_array2]}

af = asdf.AsdfFile(tree)
af.write_to(fn)

with asdf.config.config_context() as cfg:
cfg.array_inline_threshold = 10
cfg.all_array_storage = "external"
with asdf.open(fn, mode="rw") as af:
af.update()

assert "test0000.asdf" in os.listdir(tmp_path)
assert "test0001.asdf" in os.listdir(tmp_path)


def _get_update_tree():
return {"arrays": [np.arange(64) * 1, np.arange(64) * 2, np.arange(64) * 3]}

6 changes: 6 additions & 0 deletions asdf/_tests/test_compression.py
@@ -225,9 +225,15 @@ def compressors(self):
def test_compression_with_extension(tmp_path):
tree = _get_large_tree()

with pytest.raises(ValueError, match="Supported compression types are"), config_context() as cfg:
cfg.all_array_compression = "lzma"

with config_context() as config:
config.add_extension(LzmaExtension())

with config_context() as cfg:
cfg.all_array_compression = "lzma"

with pytest.raises(lzma.LZMAError, match=r"Invalid or unsupported options"):
_roundtrip(tmp_path, tree, "lzma", write_options={"compression_kwargs": {"preset": 9000}})
fn = _roundtrip(tmp_path, tree, "lzma", write_options={"compression_kwargs": {"preset": 6}})
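As the test above illustrates, the ``all_array_compression`` setter validates the label against the currently registered compressors, so a label such as ``lzma`` is rejected until an extension providing that compressor is added. A small sketch of that behavior (``LzmaExtension`` is the helper defined in this test module, not public API):

import asdf

with asdf.config_context() as cfg:
    try:
        cfg.all_array_compression = "lzma"   # no lzma compressor registered
    except ValueError as err:
        print(err)   # "Supported compression types are: ..."

with asdf.config_context() as cfg:
    cfg.add_extension(LzmaExtension())       # registers an lzma Compressor
    cfg.all_array_compression = "lzma"       # now accepted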
33 changes: 33 additions & 0 deletions asdf/_tests/test_config.py
@@ -110,6 +110,39 @@ def test_array_inline_threshold():
assert get_config().array_inline_threshold is None


def test_all_array_storage():
with asdf.config_context() as config:
assert config.all_array_storage == asdf.config.DEFAULT_ALL_ARRAY_STORAGE
config.all_array_storage = "internal"
assert get_config().all_array_storage == "internal"
config.all_array_storage = None
assert get_config().all_array_storage is None
with pytest.raises(ValueError, match=r"Invalid value for all_array_storage"):
config.all_array_storage = "foo"


def test_all_array_compression():
with asdf.config_context() as config:
assert config.all_array_compression == asdf.config.DEFAULT_ALL_ARRAY_COMPRESSION
config.all_array_compression = "zlib"
assert get_config().all_array_compression == "zlib"
config.all_array_compression = None
assert get_config().all_array_compression is None
with pytest.raises(ValueError, match=r"Supported compression types are"):
config.all_array_compression = "foo"


def test_all_array_compression_kwargs():
with asdf.config_context() as config:
assert config.all_array_compression_kwargs == asdf.config.DEFAULT_ALL_ARRAY_COMPRESSION_KWARGS
config.all_array_compression_kwargs = {}
assert get_config().all_array_compression_kwargs == {}
config.all_array_compression_kwargs = None
assert get_config().all_array_compression_kwargs is None
with pytest.raises(ValueError, match=r"Invalid value for all_array_compression_kwargs"):
config.all_array_compression_kwargs = "foo"


def test_resource_mappings():
with asdf.config_context() as config:
core_mappings = get_json_schema_resource_mappings() + asdf_standard.integration.get_resource_mappings()
2 changes: 1 addition & 1 deletion asdf/_tests/test_fits_embed.py
@@ -400,7 +400,7 @@ def test_dangling_file_handle(tmp_path):
ctx = asdf.AsdfFile()
gc.collect()

ctx._blocks.find_or_create_block_for_array(hdul[0].data, ctx)
ctx._blocks.find_or_create_block_for_array(hdul[0].data)
gc.collect()

hdul.close()
46 changes: 26 additions & 20 deletions asdf/asdf.py
@@ -1156,16 +1156,7 @@ def _tree_finalizer(tagged_tree):
padding = util.calculate_padding(fd.tell(), pad_blocks, fd.block_size)
fd.fast_forward(padding)

def _pre_write(self, fd, all_array_storage, all_array_compression, compression_kwargs=None):
if all_array_storage not in (None, "internal", "external", "inline"):
msg = f"Invalid value for all_array_storage: '{all_array_storage}'"
raise ValueError(msg)

self._all_array_storage = all_array_storage

self._all_array_compression = all_array_compression
self._all_array_compression_kwargs = compression_kwargs

def _pre_write(self, fd):
if len(self._tree):
self._run_hook("pre_write")

@@ -1260,7 +1251,17 @@ def update(
in ``asdf.get_config().array_inline_threshold``.
"""

pad_blocks = kwargs.pop("pad_blocks", False)
include_block_index = kwargs.pop("include_block_index", True)
version = kwargs.pop("version", None)

with config_context() as config:
if "all_array_storage" in kwargs:
config.all_array_storage = kwargs.pop("all_array_storage")
if "all_array_compression" in kwargs:
config.all_array_compression = kwargs.pop("all_array_compression")
if "compression_kwargs" in kwargs:
config.all_array_compression_kwargs = kwargs.pop("compression_kwargs")
_handle_deprecated_kwargs(config, kwargs)

fd = self._fd
@@ -1280,10 +1281,10 @@
if version is not None:
self.version = version

if all_array_storage == "external":
if config.all_array_storage == "external":
# If the file is fully exploded, there's no benefit to
# update, so just use write_to()
self.write_to(fd, all_array_storage=all_array_storage)
self.write_to(fd)
fd.truncate()
return

@@ -1297,7 +1298,7 @@
if fd.can_memmap():
fd.flush_memmap()

self._pre_write(fd, all_array_storage, all_array_compression, compression_kwargs=compression_kwargs)
self._pre_write(fd)

try:
fd.seek(0)
@@ -1344,12 +1345,6 @@ def update(
def write_to(
self,
fd,
all_array_storage=None,
all_array_compression="input",
pad_blocks=False,
include_block_index=True,
version=None,
compression_kwargs=None,
**kwargs,
):
"""
@@ -1419,7 +1414,18 @@ def write_to(
``asdf.get_config().array_inline_threshold``.
"""

pad_blocks = kwargs.pop("pad_blocks", False)
include_block_index = kwargs.pop("include_block_index", True)
version = kwargs.pop("version", None)

with config_context() as config:
if "all_array_storage" in kwargs:
config.all_array_storage = kwargs.pop("all_array_storage")
if "all_array_compression" in kwargs:
config.all_array_compression = kwargs.pop("all_array_compression")
if "compression_kwargs" in kwargs:
config.all_array_compression_kwargs = kwargs.pop("compression_kwargs")
_handle_deprecated_kwargs(config, kwargs)

if version is not None:
@@ -1431,7 +1437,7 @@
# attribute of the AsdfFile.
if self._uri is None:
self._uri = fd.uri
self._pre_write(fd, all_array_storage, all_array_compression, compression_kwargs=compression_kwargs)
self._pre_write(fd)

try:
self._serial_write(fd, pad_blocks, include_block_index)
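The net effect of the ``asdf.py`` changes is that ``update`` and ``write_to`` no longer carry the storage/compression parameters in their signatures; when those keywords are still passed, they are popped from ``**kwargs`` and applied to a temporary ``config_context``. A rough sketch of the two equivalent call patterns (filenames are illustrative):

import asdf
import numpy as np

af = asdf.AsdfFile({"data": np.arange(1024)})

# Keyword form: still accepted, routed internally into a config_context.
af.write_to("kwarg_form.asdf", all_array_compression="zlib")

# Config form: the same effect expressed through the global configuration.
with asdf.config_context() as cfg:
    cfg.all_array_compression = "zlib"
    af.write_to("config_form.asdf")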
19 changes: 10 additions & 9 deletions asdf/block.py
@@ -364,7 +364,7 @@ def write_external_blocks(self, uri, pad_blocks=False):
blk._array_storage = "internal"
asdffile._blocks.add(blk)
blk._used = True
asdffile.write_to(subfd, pad_blocks=pad_blocks)
asdffile.write_to(subfd, pad_blocks=pad_blocks, all_array_storage="internal")

def write_block_index(self, fd, ctx):
"""
@@ -567,13 +567,14 @@ def _find_used_blocks(self, tree, ctx):
if getattr(block, "_used", 0) == 0 and block not in reserved_blocks:
self.remove(block)

def _handle_global_block_settings(self, ctx, block):
all_array_storage = getattr(ctx, "_all_array_storage", None)
def _handle_global_block_settings(self, block):
cfg = get_config()
all_array_storage = cfg.all_array_storage
if all_array_storage:
self.set_array_storage(block, all_array_storage)

all_array_compression = getattr(ctx, "_all_array_compression", "input")
all_array_compression_kwargs = getattr(ctx, "_all_array_compression_kwargs", {})
all_array_compression = cfg.all_array_compression
all_array_compression_kwargs = cfg.all_array_compression_kwargs
# Only override block compression algorithm if it wasn't explicitly set
# by AsdfFile.set_array_compression.
if all_array_compression != "input":
@@ -601,7 +602,7 @@ def finalize(self, ctx):
self._find_used_blocks(ctx.tree, ctx)

for block in list(self.blocks):
self._handle_global_block_settings(ctx, block)
self._handle_global_block_settings(block)

def get_block(self, source):
"""
@@ -714,7 +715,7 @@ def get_source(self, block):
msg = "block not found."
raise ValueError(msg)

def find_or_create_block_for_array(self, arr, ctx):
def find_or_create_block_for_array(self, arr):
"""
For a given array, looks for an existing block containing its
underlying data. If not found, adds a new block to the block
@@ -743,7 +744,7 @@ def find_or_create_block_for_array(self, arr, ctx):

block = Block(base)
self.add(block)
self._handle_global_block_settings(ctx, block)
self._handle_global_block_settings(block)

return block

@@ -787,7 +788,7 @@ def get_output_compression_extensions(self):
return ext

def __getitem__(self, arr):
return self.find_or_create_block_for_array(arr, object())
return self.find_or_create_block_for_array(arr)

def close(self):
for block in self.blocks:
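With ``_handle_global_block_settings`` now reading ``get_config()`` instead of attributes stashed on the ``AsdfFile``, the global defaults come from the config while per-array choices are still recorded on the blocks themselves. Going by the comment in the diff, an explicit ``AsdfFile.set_array_compression`` call is not overridden while ``all_array_compression`` is left at ``"input"``; a sketch under that assumption (filename illustrative):

import asdf
import numpy as np

arr_a = np.arange(256)
arr_b = np.arange(256) * 2
af = asdf.AsdfFile({"a": arr_a, "b": arr_b})

# Per-array setting recorded on the block for arr_a.
af.set_array_compression(arr_a, "bzp2")

with asdf.config_context() as cfg:
    # The default "input" leaves per-array choices alone; a concrete label
    # such as "zlib" would apply to blocks without an explicit setting.
    cfg.all_array_compression = "input"
    af.write_to("blocks.asdf")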
79 changes: 79 additions & 0 deletions asdf/config.py
@@ -2,6 +2,7 @@
Methods for getting and setting asdf global configuration
options.
"""
import collections
import copy
import threading
from contextlib import contextmanager
@@ -19,6 +20,9 @@
DEFAULT_LEGACY_FILL_SCHEMA_DEFAULTS = True
DEFAULT_IO_BLOCK_SIZE = -1 # auto
DEFAULT_ARRAY_INLINE_THRESHOLD = None
DEFAULT_ALL_ARRAY_STORAGE = None
DEFAULT_ALL_ARRAY_COMPRESSION = "input"
DEFAULT_ALL_ARRAY_COMPRESSION_KWARGS = None


class AsdfConfig:
@@ -37,6 +41,9 @@ def __init__(self):
self._legacy_fill_schema_defaults = DEFAULT_LEGACY_FILL_SCHEMA_DEFAULTS
self._io_block_size = DEFAULT_IO_BLOCK_SIZE
self._array_inline_threshold = DEFAULT_ARRAY_INLINE_THRESHOLD
self._all_array_storage = DEFAULT_ALL_ARRAY_STORAGE
self._all_array_compression = DEFAULT_ALL_ARRAY_COMPRESSION
self._all_array_compression_kwargs = DEFAULT_ALL_ARRAY_COMPRESSION_KWARGS

self._lock = threading.RLock()

@@ -315,6 +322,72 @@ def array_inline_threshold(self, value):
"""
self._array_inline_threshold = value

@property
def all_array_storage(self):
"""
Override the array storage type of all blocks
in the file immediately before writing. Must be one of the
following strings or `None`:
- ``internal``: The default. The array data will be
stored in a binary block in the same ASDF file.
- ``external``: Store the data in a binary block in a
separate ASDF file.
- ``inline``: Store the data as YAML inline in the tree.
"""
return self._all_array_storage

@all_array_storage.setter
def all_array_storage(self, value):
if value not in (None, "internal", "external", "inline"):
msg = f"Invalid value for all_array_storage: '{value}'"
raise ValueError(msg)
self._all_array_storage = value

@property
def all_array_compression(self):
"""
Override the compression type on all binary blocks in the
file. Must be one of the following strings, `None` or a
label supported by an `asdf.extension.Compressor`:
- ``''`` or `None`: No compression.
- ``zlib``: Use zlib compression.
- ``bzp2``: Use bzip2 compression.
- ``lz4``: Use lz4 compression.
- ``input``: Use the same compression as in the file read.
If there is no prior file, behaves as `None`.
"""
return self._all_array_compression

@all_array_compression.setter
def all_array_compression(self, value):
# local to avoid circular import
from asdf.compression import validate

self._all_array_compression = validate(value)

@property
def all_array_compression_kwargs(self):
"""
Dictionary of keyword arguments provided to the compressor during
block compression (or `None` for no keyword arguments)
"""
return self._all_array_compression_kwargs

@all_array_compression_kwargs.setter
def all_array_compression_kwargs(self, value):
if value is not None and not isinstance(value, collections.abc.Mapping):
msg = f"Invalid value for all_array_compression_kwargs: '{value}'"
raise ValueError(msg)
self._all_array_compression_kwargs = value

@property
def validate_on_read(self):
"""
@@ -344,13 +417,19 @@ def __repr__(self):
return (
"<AsdfConfig\n"
" array_inline_threshold: {}\n"
" all_array_storage: {}\n"
" all_array_compression: {}\n"
" all_array_compression_kwargs: {}\n"
" default_version: {}\n"
" io_block_size: {}\n"
" legacy_fill_schema_defaults: {}\n"
" validate_on_read: {}\n"
">"
).format(
self.array_inline_threshold,
self.all_array_storage,
self.all_array_compression,
self.all_array_compression_kwargs,
self.default_version,
self.io_block_size,
self.legacy_fill_schema_defaults,
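As the new tests above exercise, the setters validate their input so misconfiguration fails at assignment time rather than at write time, and the updated ``__repr__`` lists the new settings. A short sketch of that validation:

import asdf

print(asdf.get_config())   # repr now includes all_array_storage, all_array_compression, ...

with asdf.config_context() as cfg:
    cfg.all_array_storage = "inline"              # accepted
    try:
        cfg.all_array_storage = "foo"             # rejected
    except ValueError as err:
        print(err)   # Invalid value for all_array_storage: 'foo'
    try:
        cfg.all_array_compression_kwargs = "foo"  # must be a mapping or None
    except ValueError as err:
        print(err)   # Invalid value for all_array_compression_kwargs: 'foo'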

