diff --git a/signac/__init__.py b/signac/__init__.py index 6bf487e2f..64721eec8 100644 --- a/signac/__init__.py +++ b/signac/__init__.py @@ -26,12 +26,12 @@ ) from .contrib import filesystems as fs from .contrib import get_job, get_project, index, index_files, init_project +from .contrib.job import get_buffer_load, get_buffer_size, set_buffer_size from .core.h5store import H5Store, H5StoreManager -from .core.jsondict import JSONDict -from .core.jsondict import buffer_reads_writes as buffered from .core.jsondict import flush_all as flush -from .core.jsondict import get_buffer_load, get_buffer_size -from .core.jsondict import in_buffered_mode as is_buffered +from .core.synced_collections.buffered_collection import buffer_all as buffered +from .core.synced_collections.buffered_collection import is_buffered +from .core.synced_collections.collection_json import JSONDict from .db import get_database from .diff import diff_jobs from .version import __version__ @@ -69,6 +69,7 @@ "flush", "get_buffer_size", "get_buffer_load", + "set_buffer_size", "JSONDict", "H5Store", "H5StoreManager", diff --git a/signac/contrib/errors.py b/signac/contrib/errors.py index 80c3da855..0391a2a96 100644 --- a/signac/contrib/errors.py +++ b/signac/contrib/errors.py @@ -28,8 +28,8 @@ class DestinationExistsError(Error, RuntimeError): Parameters ---------- - destination : - The destination object causing the error. + destination : str + The destination causing the error. """ diff --git a/signac/contrib/hashing.py b/signac/contrib/hashing.py index a2c5a58bd..36d9970d2 100644 --- a/signac/contrib/hashing.py +++ b/signac/contrib/hashing.py @@ -6,6 +6,8 @@ import hashlib import json +from ..core.synced_collections.utils import SyncedCollectionJSONEncoder + # We must use the standard library json for exact consistency in formatting @@ -27,7 +29,7 @@ def calc_id(spec): Encoded hash in hexadecimal format. """ - blob = json.dumps(spec, sort_keys=True) + blob = json.dumps(spec, cls=SyncedCollectionJSONEncoder, sort_keys=True) m = hashlib.md5() m.update(blob.encode()) return m.hexdigest() diff --git a/signac/contrib/import_export.py b/signac/contrib/import_export.py index 5b497dfef..bd605567c 100644 --- a/signac/contrib/import_export.py +++ b/signac/contrib/import_export.py @@ -766,7 +766,7 @@ def _copy_to_job_workspace(src, job, copytree): raise DestinationExistsError(job) raise else: - job._init() + job.init() return dst diff --git a/signac/contrib/job.py b/signac/contrib/job.py index 812f3947b..fbf3d40a1 100644 --- a/signac/contrib/job.py +++ b/signac/contrib/job.py @@ -8,13 +8,13 @@ import os import shutil from copy import deepcopy +from json import JSONDecodeError from deprecation import deprecated -from ..core import json -from ..core.attrdict import SyncedAttrDict from ..core.h5store import H5StoreManager -from ..core.jsondict import JSONDict +from ..core.synced_collections.collection_json import BufferedJSONDict, JSONDict +from ..errors import KeyTypeError from ..sync import sync_jobs from ..version import __version__ from .errors import DestinationExistsError, JobsCorruptedError @@ -24,31 +24,189 @@ logger = logging.getLogger(__name__) -class _sp_save_hook: - """Hook to handle job migration when state points are changed. - - When a job's state point is changed, in addition - to the contents of the file being modified this hook - calls :meth:`~Job._reset_sp` to rehash the state - point, compute a new job id, and move the folder. - - Parameters - ---------- - jobs : iterable of `Jobs` - List of jobs(instance of `Job`). 
+class _StatePointDict(JSONDict):
+    """A JSON-backed dictionary for storing job state points.
+
+    There are three principal reasons for extending the base JSONDict:
+
+    1. Saving needs to trigger a job directory migration.
+    2. State points are assumed to not support external modification, so
+       they never need to load from disk _except_ the very first time a job
+       is opened by id and they're not present in the cache.
+    3. It must be possible to load and/or save on demand during tasks like
+       job directory migrations.
     """
 
-    def __init__(self, *jobs):
-        self.jobs = list(jobs)
+    _PROTECTED_KEYS = ("_jobs",)
+
+    def __init__(
+        self,
+        jobs=None,
+        filename=None,
+        write_concern=False,
+        data=None,
+        parent=None,
+        *args,
+        **kwargs,
+    ):
+        # Multiple Python Job objects can share a single `_StatePointDict`
+        # instance because they are shallow copies referring to the same data
+        # on disk. We need to store these jobs in a shared list here so that
+        # shallow copies can point to the same place and trigger each other to
+        # update. This does not apply to independently created Job objects,
+        # even if they refer to the same disk data; it only applies to
+        # explicit shallow copies and unpickled objects within a session.
+        self._jobs = list(jobs)
+        super().__init__(
+            filename=filename,
+            write_concern=write_concern,
+            data=data,
+            parent=parent,
+            *args,
+            **kwargs,
+        )
 
-    def load(self):
+    def _load(self):
+        # State points never load from disk automatically. They are either
+        # initialized with provided data (e.g. from the state point cache), or
+        # they load from disk the first time state point data is requested for
+        # a Job opened by id (in which case the state point must first be
+        # validated manually).
         pass
 
-    def save(self):
-        """Reset the state point for all the jobs."""
-        for job in self.jobs:
-            job._reset_sp()
+    def _save(self):
+        # State point modification triggers a job migration for all jobs
+        # sharing this state point (shallow copies of a single job).
+        new_id = calc_id(self)
+
+        # All elements of the job list are shallow copies of each other, so
+        # any one of them is representative.
+        job = next(iter(self._jobs))
+        old_id = job._id
+        if old_id == new_id:
+            return
+
+        tmp_statepoint_file = self.filename + "~"
+        should_init = False
+        try:
+            # Move the state point to an intermediate location as a backup.
+            os.replace(self.filename, tmp_statepoint_file)
+            try:
+                new_workspace = os.path.join(job._project.workspace(), new_id)
+                os.replace(job.workspace(), new_workspace)
+            except OSError as error:
+                os.replace(tmp_statepoint_file, self.filename)  # rollback
+                if error.errno in (errno.EEXIST, errno.ENOTEMPTY, errno.EACCES):
+                    raise DestinationExistsError(new_id)
+                else:
+                    raise
+            else:
+                should_init = True
+        except OSError as error:
+            # The most likely reason we get here is that moving the state
+            # point file failed because the job is not initialized, so the
+            # file does not exist; that case is fine.
+            if error.errno != errno.ENOENT:
+                raise
+
+        # Update each job instance.
+        for job in self._jobs:
+            job._id = new_id
+            job._initialize_lazy_properties()
+
+        # Remove the temporary state point file if it was created. This must
+        # happen after the loop above so that the job's updated state point
+        # filename is used.
+        try:
+            os.remove(job._statepoint_filename + "~")
+        except OSError as error:
+            if error.errno != errno.ENOENT:
+                raise
+
+        # Since all the jobs are equivalent, just grab the filename from the
+        # last one and init it. Also migrate the lock for multithreaded support.
+        old_lock_id = self._lock_id
+        self._filename = job._statepoint_filename
+        type(self)._locks[self._lock_id] = type(self)._locks.pop(old_lock_id)
+
+        if should_init:
+            # Initializing only one job assumes that all changes made by init
+            # are reflected in the underlying resource (the JSON file). This
+            # assumption is currently valid because all in-memory attributes
+            # are loaded lazily (and are handled by the call to
+            # _initialize_lazy_properties above), except for the key-defining
+            # property of the job id (which is also updated above). If init is
+            # ever changed to modify the job object itself, we may need to
+            # call it for all jobs.
+            job.init()
+
+        logger.info(f"Moved '{old_id}' -> '{new_id}'.")
+
+    def save(self, force=False):
+        """Trigger a save to disk.
+
+        Unlike normal JSONDict objects, this class requires the ability to
+        save on command. Moreover, this save must be conditional on whether a
+        file is already present on disk, to allow the user to observe state
+        points in corrupted data spaces and attempt to recover.
+
+        Parameters
+        ----------
+        force : bool
+            If ``True``, save even if the file is present on disk
+            (Default value = False).
+
+        """
+        try:
+            # Open the file for writing only if it does not exist yet.
+            if force or not os.path.isfile(self._filename):
+                super()._save()
+        except Exception as error:
+            # OSErrors with errno EEXIST or EACCES mean that the file was
+            # never written, so there is nothing to clean up in those cases.
+            if not isinstance(error, OSError) or error.errno not in (
+                errno.EEXIST,
+                errno.EACCES,
+            ):
+                # Attempt to delete the file on error, to prevent corruption.
+                try:
+                    os.remove(self._filename)
+                except Exception:  # ignore all errors here
+                    pass
+            raise
+
+    def load(self, job_id):
+        """Trigger a load from disk.
+
+        Unlike normal JSONDict objects, this class requires the ability to
+        load on command. These loads typically occur when the state point
+        must be validated against the data on disk; at all other times, the
+        in-memory data is assumed to be accurate to avoid unnecessary I/O.
+
+        Parameters
+        ----------
+        job_id : str
+            Job id used to validate the contents on disk.
+
+        Returns
+        -------
+        data : dict
+            Dictionary of state point data.
+
+        Raises
+        ------
+        :class:`~signac.errors.JobsCorruptedError`
+            If the data on disk is invalid or its hash does not match the job
+            id.
+
+        """
+        try:
+            data = self._load_from_resource()
+        except JSONDecodeError:
+            raise JobsCorruptedError([job_id])
+
+        if calc_id(data) != job_id:
+            raise JobsCorruptedError([job_id])
+
+        with self._suspend_sync:
+            self._update(data)
+
+        return data
 
 
 class Job:
@@ -73,9 +231,9 @@ class Job:
     """
 
     FN_MANIFEST = "signac_statepoint.json"
-    """The job's manifest filename.
+    """The job's state point filename.
 
-    The job manifest is a human-readable file containing the job's state
+    The state point file is a human-readable file containing the job's state
     point that is stored in each job's workspace directory.
     """
 
@@ -87,38 +245,35 @@ class Job:
 
     def __init__(self, project, statepoint=None, _id=None):
         self._project = project
+        self._initialize_lazy_properties()
 
         if statepoint is None and _id is None:
             raise ValueError("Either statepoint or _id must be provided.")
         elif statepoint is not None:
-            # A state point was provided.
-            self._statepoint = SyncedAttrDict(statepoint, parent=_sp_save_hook(self))
-            # If the id is provided, assume the job is already registered in
-            # the project cache and that the id is valid for the state point.
-            if _id is None:
-                # Validate the state point and recursively convert to supported types.
- statepoint = self.statepoint() - # Compute the id from the state point if not provided. - self._id = calc_id(statepoint) - # Update the project's state point cache immediately if opened by state point - self._project._register(self.id, statepoint) - else: - self._id = _id + self._statepoint_requires_init = False + try: + self._id = calc_id(statepoint) if _id is None else _id + except TypeError: + raise KeyTypeError + self._statepoint = _StatePointDict( + jobs=[self], filename=self._statepoint_filename, data=statepoint + ) + + # Update the project's state point cache immediately if opened by state point + self._project._register(self.id, statepoint) else: # Only an id was provided. State point will be loaded lazily. - self._statepoint = None self._id = _id + self._statepoint = _StatePointDict( + jobs=[self], filename=self._statepoint_filename + ) + self._statepoint_requires_init = True - # Prepare job working directory + def _initialize_lazy_properties(self): + """Initialize all properties that are designed to be loaded lazily.""" self._wd = None - - # Prepare job document self._document = None - - # Prepare job H5StoreManager self._stores = None - - # Prepare current working directory for context management self._cwd = [] @deprecated( @@ -184,6 +339,11 @@ def workspace(self): self._wd = os.path.join(self._project.workspace(), self.id) return self._wd + @property + def _statepoint_filename(self): + """Get the path of the state point file for this job.""" + return os.path.join(self.workspace(), self.FN_MANIFEST) + @property def ws(self): """Alias for :meth:`~Job.workspace`.""" @@ -204,49 +364,7 @@ def reset_statepoint(self, new_statepoint): The job's new state point. """ - dst = self._project.open_job(new_statepoint) - if dst == self: - return - fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) - fn_manifest_backup = fn_manifest + "~" - try: - os.replace(fn_manifest, fn_manifest_backup) - try: - os.replace(self.workspace(), dst.workspace()) - except OSError as error: - os.replace(fn_manifest_backup, fn_manifest) # rollback - if error.errno in (errno.EEXIST, errno.ENOTEMPTY, errno.EACCES): - raise DestinationExistsError(dst) - else: - raise - else: - dst.init() - except OSError as error: - if error.errno == errno.ENOENT: - pass # job is not initialized - else: - raise - # Update this instance - self.statepoint._data = dst.statepoint._data - self._id = dst._id - self._wd = None - self._document = None - self._stores = None - self._cwd = [] - logger.info(f"Moved '{self}' -> '{dst}'.") - - def _reset_sp(self, new_statepoint=None): - """Check for new state point requested to assign this job. - - Parameters - ---------- - new_statepoint : dict - The job's new state point (Default value = None). - - """ - if new_statepoint is None: - new_statepoint = self.statepoint() - self.reset_statepoint(new_statepoint) + self._statepoint.reset(new_statepoint) def update_statepoint(self, update, overwrite=False): """Update the state point of this job. @@ -283,36 +401,7 @@ def update_statepoint(self, update, overwrite=False): if statepoint.get(key, value) != value: raise KeyError(key) statepoint.update(update) - self.reset_statepoint(statepoint) - - def _read_manifest(self): - """Read and parse the manifest file, if it exists. - - Returns - ------- - manifest : dict - State point data. - - Raises - ------ - JobsCorruptedError - If an error occurs while parsing the state point manifest. - OSError - If an error occurs while reading the state point manifest. 
- - """ - fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) - try: - with open(fn_manifest, "rb") as file: - manifest = json.loads(file.read().decode()) - except OSError as error: - if error.errno != errno.ENOENT: - raise error - except ValueError: - # This catches JSONDecodeError, a subclass of ValueError - raise JobsCorruptedError([self.id]) - else: - return manifest + self._statepoint.reset(statepoint) @property def statepoint(self): @@ -336,12 +425,13 @@ def statepoint(self): Returns the job's state point. """ - if self._statepoint is None: - # Load state point manifest lazily and assign to - # self._statepoint - statepoint = self._check_manifest() + if self._statepoint_requires_init: + # Load state point data lazily (on access). + statepoint = self._statepoint.load(self.id) + # Update the project's state point cache when loaded lazily self._project._register(self.id, statepoint) + self._statepoint_requires_init = False return self._statepoint @@ -355,7 +445,7 @@ def statepoint(self, new_statepoint): The new state point to be assigned. """ - self._reset_sp(new_statepoint) + self._statepoint.reset(new_statepoint) @property def sp(self): @@ -373,9 +463,13 @@ def document(self): .. warning:: + Even deep copies of :attr:`~Job.document` will modify the same file, + so changes will still effectively be persisted between deep copies. If you need a deep copy that will not modify the underlying - persistent JSON file, use :attr:`~Job.document` instead of :attr:`~Job.doc`. - For more information, see :attr:`~Job.statepoint` or :class:`~signac.JSONDict`. + persistent JSON file, use the call operator to get an equivalent + plain dictionary: ``job.document()``. + For more information, see + :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict`. See :ref:`signac document ` for the command line equivalent. @@ -388,7 +482,7 @@ def document(self): if self._document is None: self.init() fn_doc = os.path.join(self.workspace(), self.FN_DOCUMENT) - self._document = JSONDict(filename=fn_doc, write_concern=True) + self._document = BufferedJSONDict(filename=fn_doc, write_concern=True) return self._document @document.setter @@ -397,7 +491,7 @@ def document(self, new_doc): Parameters ---------- - new_doc : :class:`~signac.JSONDict` + new_doc : :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict` The job document handle. """ @@ -409,9 +503,18 @@ def doc(self): .. warning:: + Even deep copies of :attr:`~Job.doc` will modify the same file, so + changes will still effectively be persisted between deep copies. If you need a deep copy that will not modify the underlying - persistent JSON file, use :attr:`~Job.document` instead of :attr:`~Job.doc`. - For more information, see :attr:`~Job.statepoint` or :class:`~signac.JSONDict`. + persistent JSON file, use the call operator to get an equivalent + plain dictionary: ``job.doc()``. + + See :ref:`signac document ` for the command line equivalent. + + Returns + ------- + :class:`~signac.JSONDict` + The job document handle. """ return self.document @@ -493,96 +596,11 @@ def data(self, new_data): """ self.stores[self.KEY_DATA] = new_data - def _init(self, force=False): - """Contains all logic for job initialization. - - This method is called by :meth:`~.init` and is responsible - for actually creating the job workspace directory and - writing out the state point manifest file. - - Parameters - ---------- - force : bool - If ``True``, write the job manifest even if it - already exists. 
If ``False``, this method will - raise an Exception if the manifest exists - (Default value = False). - - """ - # Attempt early exit if the manifest exists and is valid - try: - statepoint = self._check_manifest() - except Exception: - # Any exception means this method cannot exit early. - - # Create the workspace directory if it does not exist. - try: - _mkdir_p(self.workspace()) - except OSError: - logger.error( - "Error occurred while trying to create " - "workspace directory for job '{}'.".format(self.id) - ) - raise - - fn_manifest = os.path.join(self.workspace(), self.FN_MANIFEST) - try: - # Prepare the data before file creation and writing. - statepoint = self.statepoint() - blob = json.dumps(statepoint, indent=2) - except JobsCorruptedError: - raise - - try: - # Open the file for writing only if it does not exist yet. - with open(fn_manifest, "w" if force else "x") as file: - file.write(blob) - except OSError as error: - if error.errno not in (errno.EEXIST, errno.EACCES): - raise - except Exception as error: - # Attempt to delete the file on error, to prevent corruption. - try: - os.remove(fn_manifest) - except Exception: # ignore all errors here - pass - raise error - else: - # Validate the output again after writing to disk - statepoint = self._check_manifest() - - # Update the project's state point cache if the manifest is valid - self._project._register(self.id, statepoint) - - def _check_manifest(self): - """Check whether the manifest file exists and is correct. - - If the manifest is valid, this sets the state point if it is not - already set. - - Returns - ------- - manifest : dict - State point data. - - Raises - ------ - JobsCorruptedError - If the manifest hash is not equal to the job id. - - """ - manifest = self._read_manifest() - if calc_id(manifest) != self.id: - raise JobsCorruptedError([self.id]) - if self._statepoint is None: - self._statepoint = SyncedAttrDict(manifest, parent=_sp_save_hook(self)) - return manifest - def init(self, force=False): """Initialize the job's workspace directory. - This function will do nothing if the directory and - the job manifest already exist. + This function will do nothing if the directory and the job state point + already exist. Returns the calling job. @@ -591,8 +609,8 @@ def init(self, force=False): Parameters ---------- force : bool - Overwrite any existing state point's manifest - files, e.g., to repair them if they got corrupted (Default value = False). + Overwrite any existing state point files, e.g., to repair them if + they got corrupted (Default value = False). Returns ------- @@ -601,10 +619,33 @@ def init(self, force=False): """ try: - self._init(force=force) + # Attempt early exit if the state point file exists and is valid. + try: + statepoint = self._statepoint.load(self.id) + except Exception: + # Any exception means this method cannot exit early. + + # Create the workspace directory if it does not exist. + try: + _mkdir_p(self.workspace()) + except OSError: + logger.error( + "Error occurred while trying to create " + "workspace directory for job '{}'.".format(self.id) + ) + raise + + # The state point save will not overwrite an existing file on + # disk unless force is True, so the subsequent load will catch + # when a preexisting invalid file was present. + self._statepoint.save(force=force) + statepoint = self._statepoint.load(self.id) + + # Update the project's state point cache if the saved file is valid. 
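+                # (The cache allows jobs opened by id to resolve their state
+                # points without reading from disk again.)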
+ self._project._register(self.id, statepoint) except Exception: logger.error( - f"State point manifest file of job '{self.id}' appears to be corrupted." + f"State point file of job '{self.id}' appears to be corrupted." ) raise return self @@ -825,7 +866,9 @@ def __exit__(self, err_type, err_value, tb): def __setstate__(self, state): self.__dict__.update(state) - self.statepoint._parent.jobs.append(self) + # We append to a list of jobs rather than replacing to support + # transparent id updates between shallow copies of a job. + self.statepoint._jobs.append(self) def __deepcopy__(self, memo): cls = self.__class__ @@ -834,3 +877,18 @@ def __deepcopy__(self, memo): for key, value in self.__dict__.items(): setattr(result, key, deepcopy(value, memo)) return result + + +def get_buffer_load(): + """Get the actual size of the buffer.""" + return BufferedJSONDict.get_current_buffer_size() + + +def get_buffer_size(): + """Get the maximum available capacity of the buffer.""" + return BufferedJSONDict.get_buffer_capacity() + + +def set_buffer_size(new_size): + """Set the maximum available capacity of the buffer.""" + return BufferedJSONDict.set_buffer_capacity(new_size) diff --git a/signac/contrib/project.py b/signac/contrib/project.py index 3a2487e29..dc3bd589f 100644 --- a/signac/contrib/project.py +++ b/signac/contrib/project.py @@ -26,7 +26,7 @@ from ..common.config import Config, get_config, load_config from ..core import json from ..core.h5store import H5StoreManager -from ..core.jsondict import JSONDict +from ..core.synced_collections.collection_json import BufferedJSONDict from ..sync import sync_projects from ..version import SCHEMA_VERSION, __version__ from .collection import Collection @@ -496,13 +496,13 @@ def document(self): Returns ------- - :class:`~signac.JSONDict` + :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict` The project document. """ if self._document is None: fn_doc = os.path.join(self.root_directory(), self.FN_DOCUMENT) - self._document = JSONDict(filename=fn_doc, write_concern=True) + self._document = BufferedJSONDict(filename=fn_doc, write_concern=True) return self._document @document.setter @@ -525,7 +525,7 @@ def doc(self): Returns ------- - :class:`~signac.JSONDict` + :class:`~signac.core.synced_collections.collection_json.BufferedJSONDict` The project document. """ diff --git a/signac/core/synced_collections/buffered_collection.py b/signac/core/synced_collections/buffered_collection.py index 1ea446ad3..5d6afe791 100644 --- a/signac/core/synced_collections/buffered_collection.py +++ b/signac/core/synced_collections/buffered_collection.py @@ -36,6 +36,7 @@ """ import logging +import warnings from inspect import isabstract from typing import Any, List @@ -207,7 +208,7 @@ def _flush_buffer(self): # This function provides a more familiar module-scope, function-based interface # for enabling buffering rather than calling the class's static method. -def buffer_all(): +def buffer_all(force_write=None, buffer_size=None): """Return a global buffer context for all BufferedCollection instances. All future operations use the buffer whenever possible. Write operations @@ -217,4 +218,25 @@ def buffer_all(): manager represents a promise to buffer whenever possible, but does not guarantee that no writes will occur under all circumstances. """ + if force_write is not None: + warnings.warn( + DeprecationWarning( + "The force_write parameter is deprecated and will be removed in " + "signac 2.0. This functionality is no longer supported." 
+ ) + ) + if buffer_size is not None: + warnings.warn( + DeprecationWarning( + "The buffer_size parameter is deprecated and will be removed in " + "signac 2.0. The buffer size should be set using the " + "set_buffer_capacity method of FileBufferedCollection or any of its " + "subclasses." + ) + ) return _BUFFER_ALL_CONTEXT + + +def is_buffered(): + """Check the global buffered mode setting.""" + return bool(_BUFFER_ALL_CONTEXT) diff --git a/signac/core/synced_collections/collection_json.py b/signac/core/synced_collections/collection_json.py index 504d3da40..5812cd916 100644 --- a/signac/core/synced_collections/collection_json.py +++ b/signac/core/synced_collections/collection_json.py @@ -8,13 +8,14 @@ import os import uuid import warnings +from typing import Tuple from .memory_buffered_collection import SharedMemoryFileBufferedCollection from .serialized_file_buffered_collection import SerializedFileBufferedCollection from .synced_attr_dict import SyncedAttrDict from .synced_collection import SyncedCollection from .synced_list import SyncedList -from .utils import SCJSONEncoder +from .utils import SyncedCollectionJSONEncoder from .validators import json_format_validator @@ -99,9 +100,10 @@ def _load_from_resource(self): Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that - contains the data in the JSON file. + contains the data in the JSON file. Will return None if the file does + not exist. """ try: @@ -111,11 +113,13 @@ def _load_from_resource(self): except OSError as error: if error.errno == errno.ENOENT: return None + else: + raise def _save_to_resource(self): """Write the data to JSON file.""" # Serialize data - blob = json.dumps(self, cls=SCJSONEncoder).encode() + blob = json.dumps(self, cls=SyncedCollectionJSONEncoder).encode() # When write_concern flag is set, we write the data into dummy file and then # replace that file with original file. We also enable this mode # irrespective of the write_concern flag if we're running in @@ -215,7 +219,7 @@ class JSONDict(JSONCollection, SyncedAttrDict): """ - _PROTECTED_KEYS = ("_filename",) + _PROTECTED_KEYS: Tuple[str, ...] = ("_filename",) def __init__( self, @@ -294,7 +298,7 @@ def __init__( class BufferedJSONDict(BufferedJSONCollection, SyncedAttrDict): """A buffered :class:`JSONDict`.""" - _PROTECTED_KEYS = ( + _PROTECTED_KEYS: Tuple[str, ...] = ( "_filename", "_buffered", "_is_buffered", @@ -344,7 +348,7 @@ def __init__( class MemoryBufferedJSONDict(MemoryBufferedJSONCollection, SyncedAttrDict): """A buffered :class:`JSONDict`.""" - _PROTECTED_KEYS = SyncedAttrDict._PROTECTED_KEYS + ( + _PROTECTED_KEYS: Tuple[str, ...] = SyncedAttrDict._PROTECTED_KEYS + ( "_filename", "_buffered", "_is_buffered", diff --git a/signac/core/synced_collections/collection_mongodb.py b/signac/core/synced_collections/collection_mongodb.py index 28126440a..ad983a458 100644 --- a/signac/core/synced_collections/collection_mongodb.py +++ b/signac/core/synced_collections/collection_mongodb.py @@ -53,9 +53,10 @@ def _load_from_resource(self): Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that - contains the data in the MongoDB database. + contains the data in the MongoDB database. Will return None if no data + was found in the database. 
""" blob = self._collection.find_one(self._uid) diff --git a/signac/core/synced_collections/collection_redis.py b/signac/core/synced_collections/collection_redis.py index b53fc0554..21df97b9b 100644 --- a/signac/core/synced_collections/collection_redis.py +++ b/signac/core/synced_collections/collection_redis.py @@ -39,9 +39,10 @@ def _load_from_resource(self): Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that - contains the data in the Redis database. + contains the data in the Redis database. Will return None if no data + was found in the Redis database. """ blob = self._client.get(self._key) diff --git a/signac/core/synced_collections/collection_zarr.py b/signac/core/synced_collections/collection_zarr.py index 45783beb7..99fe02f37 100644 --- a/signac/core/synced_collections/collection_zarr.py +++ b/signac/core/synced_collections/collection_zarr.py @@ -48,9 +48,10 @@ def _load_from_resource(self): Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that - contains the data in the Zarr group. + contains the data in the Zarr group. Will return None if associated + data is not found in the Zarr group. """ try: diff --git a/signac/core/synced_collections/serialized_file_buffered_collection.py b/signac/core/synced_collections/serialized_file_buffered_collection.py index 43749870c..8601b54e9 100644 --- a/signac/core/synced_collections/serialized_file_buffered_collection.py +++ b/signac/core/synced_collections/serialized_file_buffered_collection.py @@ -13,7 +13,7 @@ from .errors import MetadataError from .file_buffered_collection import FileBufferedCollection -from .utils import SCJSONEncoder +from .utils import SyncedCollectionJSONEncoder class SerializedFileBufferedCollection(FileBufferedCollection): @@ -168,7 +168,7 @@ def _encode(data): The underlying encoded data. """ - return json.dumps(data, cls=SCJSONEncoder).encode() + return json.dumps(data, cls=SyncedCollectionJSONEncoder).encode() @staticmethod def _decode(blob): diff --git a/signac/core/synced_collections/synced_attr_dict.py b/signac/core/synced_collections/synced_attr_dict.py index 01a1b477f..5b2010f0a 100644 --- a/signac/core/synced_collections/synced_attr_dict.py +++ b/signac/core/synced_collections/synced_attr_dict.py @@ -180,29 +180,22 @@ def __setitem__(self, key, value): for key, value in data.items(): self._data[key] = self._from_base(value, parent=self) - def reset(self, data=None): + def reset(self, data): """Update the instance with new data. Parameters ---------- - data: mapping - Data to update the instance (Default value = None). + data : mapping + Data to update the instance. Raises ------ ValueError - If the data is not instance of mapping + If the data is not a mapping. """ - if data is None: - data = {} if _mapping_resolver.get_type(data) == "MAPPING": - self._validate(data) - with self._suspend_sync: - self._data = { - key: self._from_base(data=value, parent=self) - for key, value in data.items() - } + self._update(data) with self._thread_lock: self._save() else: diff --git a/signac/core/synced_collections/synced_collection.py b/signac/core/synced_collections/synced_collection.py index fb9151467..001f5e23d 100644 --- a/signac/core/synced_collections/synced_collection.py +++ b/signac/core/synced_collections/synced_collection.py @@ -340,11 +340,15 @@ def is_base_type(cls, data): def _load_from_resource(self): """Load data from underlying backend. 
- This method must be implemented for each backend. + This method must be implemented for each backend. Backends may choose + to return ``None``, signaling that no modification should be performed + on the data in memory. This mode is useful for backends where the underlying + resource (e.g. a file) may not initially exist, but can be transparently + created on save. Returns ------- - Collection + Collection or None An equivalent unsynced collection satisfying :meth:`is_base_type` that contains the data in the underlying resource (e.g. a file). diff --git a/signac/core/synced_collections/synced_list.py b/signac/core/synced_collections/synced_list.py index d68a3336e..bd3235379 100644 --- a/signac/core/synced_collections/synced_list.py +++ b/signac/core/synced_collections/synced_list.py @@ -150,30 +150,24 @@ def _update(self, data=None): ) ) - def reset(self, data=None): + def reset(self, data): """Update the instance with new data. Parameters ---------- - data: non-string Sequence, optional - Data to update the instance (Default value = None). + data : non-string Sequence + Data to update the instance. Raises ------ ValueError - If the data is not instance of non-string seqeuence + If the data is not a non-string sequence. """ - if data is None: - data = [] - elif NUMPY and isinstance(data, numpy.ndarray): + if NUMPY and isinstance(data, numpy.ndarray): data = data.tolist() - self._validate(data) if _sequence_resolver.get_type(data) == "SEQUENCE": - with self._suspend_sync: - self._data = [ - self._from_base(data=value, parent=self) for value in data - ] + self._update(data) with self._thread_lock: self._save() else: diff --git a/signac/core/synced_collections/utils.py b/signac/core/synced_collections/utils.py index 882171c69..ba677ef86 100644 --- a/signac/core/synced_collections/utils.py +++ b/signac/core/synced_collections/utils.py @@ -114,7 +114,7 @@ def default(o: Any) -> Dict[str, Any]: # noqa: D102 raise TypeError from e -class SCJSONEncoder(JSONEncoder): +class SyncedCollectionJSONEncoder(JSONEncoder): """A JSONEncoder capable of encoding SyncedCollections and other supported types. This encoder will attempt to obtain a JSON-serializable representation of diff --git a/tests/test_buffered_mode.py b/tests/test_buffered_mode.py index 7a7d5f586..15d60a429 100644 --- a/tests/test_buffered_mode.py +++ b/tests/test_buffered_mode.py @@ -13,7 +13,8 @@ from test_project import TestProjectBase import signac -from signac.errors import BufferedFileError, BufferException, Error +from signac.core.synced_collections.errors import BufferedError +from signac.errors import BufferedFileError, Error PYPY = "PyPy" in platform.python_implementation() @@ -72,6 +73,10 @@ def test_basic_and_nested(self): assert job.doc.a == 2 assert job.doc.a == 2 + # Remove this test in signac 2.0. + @pytest.mark.xfail( + reason="The new SyncedCollection does not implement force_write." + ) def test_buffered_mode_force_write(self): with signac.buffered(force_write=False): with signac.buffered(force_write=False): @@ -88,6 +93,10 @@ def test_buffered_mode_force_write(self): pass assert not signac.is_buffered() + # Remove this test in signac 2.0. + @pytest.mark.xfail( + reason="The new SyncedCollection does not implement force_write." 
+ ) def test_buffered_mode_force_write_with_file_modification(self): job = self.project.open_job(dict(a=0)) job.init() @@ -114,8 +123,9 @@ def test_buffered_mode_force_write_with_file_modification(self): file.write(json.dumps({"a": x}).encode()) assert job.doc.a == (not x) - @pytest.mark.skipif( - not ABLE_TO_PREVENT_WRITE, reason="unable to trigger permission error" + # Remove this test in signac 2.0. + @pytest.mark.xfail( + reason="The new SyncedCollection does not implement force_write." ) def test_force_write_mode_with_permission_error(self): job = self.project.open_job(dict(a=0)) @@ -139,31 +149,27 @@ def test_force_write_mode_with_permission_error(self): assert job.doc.a == x def test_buffered_mode_change_buffer_size(self): - assert not signac.is_buffered() - with signac.buffered(buffer_size=12): - assert signac.buffered() - assert signac.get_buffer_size() == 12 - - assert not signac.is_buffered() - with pytest.raises(TypeError): - with signac.buffered(buffer_size=True): - pass - - assert not signac.is_buffered() - with signac.buffered(buffer_size=12): - assert signac.buffered() - assert signac.get_buffer_size() == 12 - with signac.buffered(buffer_size=12): + original_buffer_size = signac.get_buffer_size() + try: + assert not signac.is_buffered() + signac.set_buffer_size(12) + with signac.buffered(): assert signac.buffered() assert signac.get_buffer_size() == 12 - assert not signac.is_buffered() - with pytest.raises(BufferException): - with signac.buffered(buffer_size=12): + assert not signac.is_buffered() + + assert not signac.is_buffered() + with signac.buffered(): assert signac.buffered() assert signac.get_buffer_size() == 12 - with signac.buffered(buffer_size=14): - pass + with signac.buffered(): + assert signac.buffered() + assert signac.get_buffer_size() == 12 + + assert not signac.is_buffered() + finally: + signac.set_buffer_size(original_buffer_size) def test_integration(self): def routine(): @@ -240,7 +246,7 @@ def routine(): assert job2.doc.a == (not x) assert job.doc.a == (not x) - with pytest.raises(BufferedFileError) as cm: + with pytest.raises(BufferedError) as cm: with signac.buffered(): assert job.doc.a == (not x) job.doc.a = x diff --git a/tests/test_job.py b/tests/test_job.py index 737c40004..e49a2ae4d 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -16,6 +16,7 @@ import signac.common.config import signac.contrib +from signac.contrib.errors import JobsCorruptedError from signac.contrib.job import Job from signac.errors import DestinationExistsError, InvalidKeyError, KeyTypeError @@ -445,11 +446,14 @@ class A: assert str(key) in job.sp def test_invalid_sp_key_types(self): - job = self.open_job(dict(invalid_key=True)).init() - class A: pass + with pytest.raises(KeyTypeError): + self.open_job({A(): True}).init() + + job = self.open_job(dict(invalid_key=True)).init() + for key in (0.0, A(), (1, 2, 3)): with pytest.raises(KeyTypeError): job.sp[key] = "test" @@ -536,6 +540,9 @@ def test_chained_init(self): assert os.path.exists(os.path.join(job.workspace(), job.FN_MANIFEST)) def test_construction(self): + from signac import Project # noqa: F401 + + # The eval statement needs to have Project available job = self.open_job(test_token) job2 = eval(repr(job)) assert job == job2 @@ -626,8 +633,8 @@ def test_corrupt_workspace(self): job2 = self.open_job(test_token) try: logging.disable(logging.ERROR) - # Detects the corrupted manifest and overwrites with valid data - job2.init() + with pytest.raises(JobsCorruptedError): + job2.init() finally: 
logging.disable(logging.NOTSET) job2.init(force=True) diff --git a/tests/test_synced_collections/synced_collection_test.py b/tests/test_synced_collections/synced_collection_test.py index 4e72d1da0..7d06b197b 100644 --- a/tests/test_synced_collections/synced_collection_test.py +++ b/tests/test_synced_collections/synced_collection_test.py @@ -208,8 +208,6 @@ def test_reset(self, synced_collection, testdata): synced_collection[key] = testdata assert len(synced_collection) == 1 assert synced_collection[key] == testdata - synced_collection.reset() - assert len(synced_collection) == 0 synced_collection.reset({"reset": "abc"}) assert len(synced_collection) == 1 assert synced_collection[key] == "abc" @@ -544,8 +542,6 @@ def test_reset(self, synced_collection): synced_collection.reset([1, 2, 3]) assert len(synced_collection) == 3 assert synced_collection == [1, 2, 3] - synced_collection.reset() - assert len(synced_collection) == 0 synced_collection.reset([3, 4]) assert len(synced_collection) == 2 assert synced_collection == [3, 4] diff --git a/tests/test_synced_collections/test_utils.py b/tests/test_synced_collections/test_utils.py index d0b398575..efa2e25bd 100644 --- a/tests/test_synced_collections/test_utils.py +++ b/tests/test_synced_collections/test_utils.py @@ -10,7 +10,10 @@ from signac.core.synced_collections.collection_json import JSONDict from signac.core.synced_collections.synced_list import SyncedList -from signac.core.synced_collections.utils import AbstractTypeResolver, SCJSONEncoder +from signac.core.synced_collections.utils import ( + AbstractTypeResolver, + SyncedCollectionJSONEncoder, +) try: import numpy @@ -52,7 +55,7 @@ def encode_flat_dict(d): # Raw dictionaries should be encoded transparently. data = {"foo": 1, "bar": 2, "baz": 3} assert json.dumps(data) == encode_flat_dict(data) - assert json.dumps(data, cls=SCJSONEncoder) == json.dumps(data) + assert json.dumps(data, cls=SyncedCollectionJSONEncoder) == json.dumps(data) with TemporaryDirectory() as tmp_dir: fn = os.path.join(tmp_dir, "test_json_encoding.json") @@ -60,14 +63,15 @@ def encode_flat_dict(d): synced_data.update(data) with pytest.raises(TypeError): json.dumps(synced_data) - assert json.dumps(synced_data, cls=SCJSONEncoder) == encode_flat_dict( - synced_data - ) + assert json.dumps( + synced_data, cls=SyncedCollectionJSONEncoder + ) == encode_flat_dict(synced_data) if NUMPY: array = numpy.random.rand(3) synced_data["foo"] = array assert isinstance(synced_data["foo"], SyncedList) assert ( - json.loads(json.dumps(synced_data, cls=SCJSONEncoder)) == synced_data() + json.loads(json.dumps(synced_data, cls=SyncedCollectionJSONEncoder)) + == synced_data() )
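
Reviewer notes (not part of the patch):

The buffer size is now a global property of the buffered collection classes
rather than a per-context argument. A minimal sketch of the replacement API
exercised by the updated tests, assuming a writable working directory (the
project name is a placeholder):

    import signac

    project = signac.init_project("buffer-demo")
    job = project.open_job({"a": 0}).init()

    original = signac.get_buffer_size()  # maximum buffer capacity in bytes
    signac.set_buffer_size(4096)         # now set globally, not per context
    try:
        assert not signac.is_buffered()
        with signac.buffered():  # force_write/buffer_size args are deprecated
            assert signac.is_buffered()
            job.doc.count = 1    # staged in the buffer
        assert job.doc.count == 1  # flushed on exiting the context
    finally:
        signac.set_buffer_size(original)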
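The hashing change in signac/contrib/hashing.py is needed because synced
collections are not JSON-serializable with the stock encoder. A sketch of the
invariant the new encoder preserves, mirroring the updated test_utils.py (the
temporary path is arbitrary):

    import json
    from tempfile import TemporaryDirectory

    from signac.contrib.hashing import calc_id
    from signac.core.synced_collections.collection_json import JSONDict
    from signac.core.synced_collections.utils import SyncedCollectionJSONEncoder

    data = {"a": 0, "b": 1}
    with TemporaryDirectory() as tmp:
        synced = JSONDict(filename=f"{tmp}/data.json")
        synced.update(data)
        # The stock encoder rejects synced collections...
        try:
            json.dumps(synced)
        except TypeError:
            pass
        # ...while the new encoder unwraps them to plain types,
        blob = json.dumps(synced, cls=SyncedCollectionJSONEncoder)
        assert json.loads(blob) == data
        # so a raw dict and its synced counterpart hash identically.
        assert calc_id(synced) == calc_id(data)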
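The migration logic that moved from Job.reset_statepoint into
_StatePointDict._save preserves the existing user-facing contract: modifying a
state point re-hashes it and moves the workspace directory. A sketch with the
public API (the project name and keys are placeholders):

    import signac

    project = signac.init_project("migration-demo")
    job = project.open_job({"n": 1}).init()
    old_id, old_workspace = job.id, job.workspace()

    job.sp.n = 2  # triggers _StatePointDict._save and the directory move
    assert job.id != old_id
    assert job.workspace() != old_workspace
    assert job.sp.n == 2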
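One behavioral break surfaced by the test updates: reset() on synced dicts and
lists no longer accepts zero arguments as a synonym for clearing. A sketch of
the new contract (the filename is arbitrary):

    from tempfile import TemporaryDirectory

    from signac.core.synced_collections.collection_json import JSONDict

    with TemporaryDirectory() as tmp:
        d = JSONDict(filename=f"{tmp}/doc.json")
        d["key"] = "value"
        d.reset({"other": 1})  # replaces the contents wholesale
        assert dict(d) == {"other": 1}
        d.reset({})  # clearing now requires an explicit empty mapping
        assert len(d) == 0
        # d.reset() now raises TypeError (missing required positional argument).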