Skip to content

Commit

Permalink
Reenable multithreading tests (#850)
Browse files Browse the repository at this point in the history
* Disable threading support for JSONCollection on Windows.

* Change all xfailed tests to conditional skips on Windows.

* Enable xfail strict.

* Don't try to modify locks when threading support doesn't exist.

* Reset the filename whether or not we need to play with locks.

* Remove debug prints.

* Minor cleanup.

* Address PR comments.

* Fix typo in comment.

* Setting filename also requires the lock.

* Centralize ON_WINDOWS.

* Update changelog.

* No special behavior for cygwin.

* Disable multithreading on Cygwin. Tests were failing.

Co-authored-by: Bradley Dice <bdice@bradleydice.com>
  • Loading branch information
vyasr and bdice committed Nov 8, 2022
1 parent c6c0b48 commit e69e629
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 64 deletions.
1 change: 1 addition & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Changed
- Rather than searching upwards until the root, ``load_config`` will only load configuration files in the specified directory, which is assumed to be a project directory, as well as the user's home directory (#711).
- Changed the ``root`` parameter to ``path`` in the ``signac.get_project`` and ``signac.init_project`` functions and corresponding ``Project`` methods (#757, #758).
- The prefix argument to ``$ signac view`` is now optional and can be provided with ``-p/--prefix`` (#653, #774).
- Tests are run with ``xfail_strict = True`` (#850).
- Detection of an invalid config will raise an error rather than a debug log (#855).

Removed
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ omit =
*/signac/common/deprecation/*.py

[tool:pytest]
xfail_strict = True
filterwarnings =
error::FutureWarning:signac.*
error::DeprecationWarning:signac.*
Expand Down
38 changes: 32 additions & 6 deletions signac/_synced_collections/backends/collection_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import errno
import json
import os
import sys
import uuid
from collections.abc import Mapping, Sequence
from typing import Callable, FrozenSet
Expand All @@ -27,6 +28,9 @@
from ..utils import AbstractTypeResolver, SyncedCollectionJSONEncoder
from ..validators import json_format_validator, no_dot_in_key, require_string_key

ON_WINDOWS = sys.platform.startswith("win32") or sys.platform.startswith("cygwin")


"""
There are many classes defined in this file. Most of the definitions are
trivial since logic is largely inherited, but the large number of classes
Expand Down Expand Up @@ -127,11 +131,13 @@ class JSONCollection(SyncedCollection):
**Thread safety**
The :class:`JSONCollection` is thread-safe. To make these collections safe, the
``write_concern`` flag is ignored in multithreaded execution, and the
write is **always** performed via a write to temporary file followed by a
The :class:`JSONCollection` is thread-safe on Unix-like systems (not
Windows, see the Warnings section). To make these collections safe, the
``write_concern`` flag is ignored in multithreaded execution, and the write
is **always** performed via a write to temporary file followed by a
replacement of the original file. The file replacement operation uses
:func:`os.replace`, which is guaranteed to be atomic by the Python standard.
:func:`os.replace`, which is guaranteed to be atomic by the Python
standard.
Parameters
----------
Expand All @@ -145,10 +151,17 @@ class JSONCollection(SyncedCollection):
\*\*kwargs :
Keyword arguments forwarded to parent constructors.
Warnings
--------
This class is _not_ thread safe on Windows. It relies on ``os.replace`` for
atomic file writes, and that method can fail in multithreaded situations if
open handles exist to the destination file within the same process on a
different thread. See https://bugs.python.org/issue46003 for more
information.
"""

_backend = __name__ # type: ignore
_supports_threading = True
_supports_threading = not ON_WINDOWS
_validators: Sequence_t[Callable] = (require_string_key, json_format_validator)

def __init__(self, filename=None, write_concern=False, *args, **kwargs):
Expand Down Expand Up @@ -200,9 +213,21 @@ def _save_to_resource(self):

@property
def filename(self):
"""str: The name of the associated JSON file on disk."""
"""str: Get or set the name of the associated JSON file on disk."""
return self._filename

@filename.setter
def filename(self, value):
# When setting the filename we must also remap the locks.
with self._thread_lock:
if type(self)._threading_support_is_active:
old_lock_id = self._lock_id

self._filename = value

if type(self)._threading_support_is_active:
type(self)._locks[self._lock_id] = type(self)._locks.pop(old_lock_id)

@property
def _lock_id(self):
return self._filename
Expand All @@ -227,6 +252,7 @@ def _lock_id(self):
"registry",
# These keys are specific to the JSON backend.
"_filename",
"filename",
"_write_concern",
)
)
Expand Down
13 changes: 5 additions & 8 deletions signac/contrib/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,15 @@ def _save(self):
job._initialize_lazy_properties()

# Remove the temporary state point file if it was created. Have to do it
# here because we need to get the updated job state point filename.
# here so that we can get the updated job state point filename and set
# it to self.filename below.
try:
os.remove(job._statepoint_filename + "~")
except OSError as error:
if error.errno != errno.ENOENT:
raise

# Since all the jobs are equivalent, just grab the filename from the
# last one and init it. Also migrate the lock for multithreaded support.
old_lock_id = self._lock_id
self._filename = job._statepoint_filename
type(self)._locks[self._lock_id] = type(self)._locks.pop(old_lock_id)
self.filename = job._statepoint_filename

if should_init:
# Only initializing one job assumes that all changes in init are
Expand Down Expand Up @@ -164,7 +161,7 @@ def save(self, force=False):
"""
try:
# Open the file for writing only if it does not exist yet.
if force or not os.path.isfile(self._filename):
if force or not os.path.isfile(self.filename):
super()._save()
except Exception as error:
if not isinstance(error, OSError) or error.errno not in (
Expand All @@ -174,7 +171,7 @@ def save(self, force=False):
# Attempt to delete the file on error, to prevent corruption.
# OSErrors that are EEXIST or EACCES don't need to delete the file.
try:
os.remove(self._filename)
os.remove(self.filename)
except Exception: # ignore all errors here
pass
raise
Expand Down
14 changes: 0 additions & 14 deletions tests/test_synced_collections/synced_collection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,6 @@ class A:
with pytest.raises(TypeError):
synced_collection[key] = testdata

@pytest.mark.xfail(
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
),
)
def test_multithreaded(self, synced_collection):
"""Test multithreaded runs of synced dicts."""
if not type(synced_collection)._supports_threading:
Expand Down Expand Up @@ -770,13 +763,6 @@ def test_nested_list_with_dict(self, synced_collection):
assert isinstance(child2, SyncedCollection)
assert isinstance(child1, SyncedCollection)

@pytest.mark.xfail(
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
),
)
def test_multithreaded(self, synced_collection):
"""Test multithreaded runs of synced lists."""
if not type(synced_collection)._supports_threading:
Expand Down
75 changes: 39 additions & 36 deletions tests/test_synced_collections/test_json_buffered_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import itertools
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor

Expand All @@ -23,6 +24,8 @@
)
from signac._synced_collections.errors import BufferedError, MetadataError

ON_WINDOWS = sys.platform.startswith("win32") or sys.platform.startswith("cygwin")


class BufferedJSONCollectionTest(JSONCollectionTest):
def load(self, collection):
Expand Down Expand Up @@ -376,11 +379,11 @@ def multithreaded_buffering_test(self, op, tmpdir):
# truly flushed correctly.
assert self._collection_type.get_current_buffer_size() == 0

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_setitem(self, tmpdir):
Expand All @@ -392,11 +395,11 @@ def setitem_dict(sd, data):

self.multithreaded_buffering_test(setitem_dict, tmpdir)

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_update(self, tmpdir):
Expand All @@ -407,11 +410,11 @@ def update_dict(sd, data):

self.multithreaded_buffering_test(update_dict, tmpdir)

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_reset(self, tmpdir):
Expand All @@ -422,11 +425,11 @@ def reset_dict(sd, data):

self.multithreaded_buffering_test(reset_dict, tmpdir)

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_clear(self, tmpdir):
Expand Down Expand Up @@ -474,11 +477,11 @@ def test_multithreaded_buffering_clear(self, tmpdir):
# truly flushed correctly.
assert self._collection_type.get_current_buffer_size() == 0

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_load(self, tmpdir):
Expand Down Expand Up @@ -670,11 +673,11 @@ def multithreaded_buffering_test(self, op, requires_init, tmpdir):
# truly flushed correctly.
assert self._collection_type.get_current_buffer_size() == 0

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_setitem(self, tmpdir):
Expand All @@ -686,11 +689,11 @@ def setitem_list(sd, data):

self.multithreaded_buffering_test(setitem_list, True, tmpdir)

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_extend(self, tmpdir):
Expand All @@ -701,11 +704,11 @@ def extend_list(sd, data):

self.multithreaded_buffering_test(extend_list, False, tmpdir)

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_append(self, tmpdir):
Expand All @@ -717,11 +720,11 @@ def append_list(sd, data):

self.multithreaded_buffering_test(append_list, False, tmpdir)

@pytest.mark.xfail(
@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"This test sometimes fails. This may indicate a race condition. "
"The test fails more consistently on Windows but also appears on "
"Linux in CI."
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded_buffering_load(self, tmpdir):
Expand Down
11 changes: 11 additions & 0 deletions tests/test_synced_collections/test_json_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from synced_collection_test import SyncedDictTest, SyncedListTest

from signac._synced_collections.backends.collection_json import (
ON_WINDOWS,
JSONAttrDict,
JSONAttrList,
JSONDict,
Expand Down Expand Up @@ -42,6 +43,16 @@ def synced_collection_positional(self, tmpdir):
def test_filename(self, synced_collection):
assert os.path.basename(synced_collection.filename) == self._fn

@pytest.mark.skipif(
ON_WINDOWS,
reason=(
"The JSONCollection cannot be safely used in multithreaded settings "
"on Windows due to https://bugs.python.org/issue46003."
),
)
def test_multithreaded(self, synced_collection):
return super().test_multithreaded(synced_collection)


class TestJSONDict(JSONCollectionTest, SyncedDictTest):

Expand Down

0 comments on commit e69e629

Please sign in to comment.