Bug: Incoherent job statepoint access in subprocess #528

Closed
bdice opened this issue Mar 10, 2021 · 14 comments · Fixed by #529 or #530

@bdice
Member

bdice commented Mar 10, 2021

Description

I was facing a strange bug in signac-flow's tests and have identified it is occurring because of some issue in signac.

I did a git bisect and it looks like the issue was introduced in PR #497 but I can't tell why.

There's some kind of a race condition that leads to job.sp returning {} even though there should be data in the state point.

This happens both with and without buffering.

The job is opened by statepoint, so there is no lazy state point access.

To reproduce

Here's a minimal failing example.

import signac
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

def b_is_positive(job):
    sp = job.sp
    try:
        return sp.b >= 0
    except AttributeError:
        print("Incorrect state point:", sp)
        return False

def compute_status(data):
    job, func = data
    result = func(job)
    return job.id, func.__name__, result

if __name__ == "__main__":
    if len(sys.argv) == 1:
        # Launch the main process
        with signac.TemporaryProject() as project:
            for b in range(5):
                project.open_job({"b": b}).init()
            # Pass the arguments as a list so paths containing spaces survive
            cmd = [sys.executable, __file__, project.root_directory()]
            output = subprocess.check_output(cmd).decode("utf-8")
            print(output)
    else:
        # Launch the subprocess
        print("Launched subprocess.")
        project_dir = sys.argv[1]
        project = signac.get_project(project_dir)
        tasks = []
        for job in project:
            # Launching more concurrent tasks causes a higher failure rate
            for func in [b_is_positive, b_is_positive, b_is_positive]:
                tasks.append((job, func))
        with ThreadPoolExecutor() as e:
            results = list(e.map(compute_status, tasks))
        failures = 0
        for r in results:
            if r[1] == 'b_is_positive' and not r[2]:
                print("FAIL", r)
                failures += 1
        print("Total failures:", failures)

System configuration

Please complete the following information:

  • Operating System [e.g. macOS]: Ubuntu 20.04 (WSL)
  • Version of Python [e.g. 3.7]: 3.9
  • Version of signac [e.g. 1.0]: e5b8057 or newer (master is pointing to 786d75f)
bdice added the bug label Mar 10, 2021
bdice added this to the 1.7.0 milestone Mar 10, 2021
@csadorf
Contributor

csadorf commented Mar 11, 2021

What command do you use to start this script?

@bdice
Member Author

bdice commented Mar 11, 2021

@csadorf Save the script as test.py and execute python test.py.

@csadorf
Contributor

csadorf commented Mar 11, 2021

> @csadorf Save the script as test.py and execute python test.py.

I think I was a bit stumped by the whole sys.argv business and missed the forking.

> The job is opened by statepoint, so there is no lazy state point access.

At least for the forked process that's not true, is it? We iterate over the project, so each job is opened by id and then needs to load its state point from disk, no?

@bdice
Member Author

bdice commented Mar 11, 2021

@vyasr @csadorf I met with @joaander and we debugged this together. It turns out that the issue is indeed from my optimizations in #497 and can be explained like this:

  1. Job instance is created from an id. (Note: I was wrong above, @csadorf correctly noted that we open the job by id. The state point is delayed.)
  2. Each thread gets a reference to that Job instance.
  3. Threads all see job._statepoint_requires_init == True and contend with one another to instantiate job._statepoint.
  4. Multiple threads succeed at instantiating their own _StatePointDict synced collections and assigning it to job._statepoint. Each newly created instance of _StatePointDict has its own thread lock RLock instance!
  5. The threads all read from disk at the same time and call _update. I think this is where the corruption occurs.

I am not sure if I'm pointing to the right spot for the actual data corruption, but I can positively identify that every thread is instantiating its own _StatePointDict and thus owns its own lock rather than sharing a lock with the other threads.
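
The sequence above can be illustrated without signac. The following is only a sketch: `LazyJob` is a hypothetical stand-in for `Job`, a plain dict stands in for `_StatePointDict`, and the `Barrier` is scaffolding that holds every thread between the check and the assignment so the bad interleaving happens every time instead of occasionally.

```python
import threading

N_THREADS = 4


class LazyJob:
    """Hypothetical stand-in for Job with an unguarded lazy attribute."""

    def __init__(self):
        self._statepoint = None
        self._statepoint_requires_init = True
        self.created = []  # every object the lazy init has produced
        # Scaffolding only: holds each thread between check and assignment.
        self._barrier = threading.Barrier(N_THREADS)

    @property
    def statepoint(self):
        if self._statepoint_requires_init:
            self._barrier.wait()  # all threads have now passed the check
            sp = {}  # stand-in for a fresh _StatePointDict
            self._statepoint = sp
            self.created.append(sp)
            self._statepoint_requires_init = False
        return self._statepoint


def count_instances():
    job = LazyJob()
    threads = [
        threading.Thread(target=lambda: job.statepoint) for _ in range(N_THREADS)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Count distinct objects, not equal ones.
    return len({id(obj) for obj in job.created})


print(count_instances())  # one instance per thread
```

Each thread ends up building its own object, and whichever assignment lands last wins. This shows only the duplicated initialization, not the exact point at which the state point data is lost.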

Possible solutions:

  1. Instantiate _StatePointDict in Job.__init__. However, I am pretty sure this same argument would also cause failures in all lazy-initialized Job attributes (job document, job data, etc), wherever threading is involved.
  2. Add a lock to the Job. I have confirmed that this prevents failures, if I initialize the lock in Job.__init__ and wrap the "lazy init" parts in that lock's context manager.
  3. Other solutions?? I feel like this could be solved at the synced collections level if we were clever. e.g. a module level weak-value dict storing a lock-per-filename, with some module-level lock that prevents creating more than one lock per filename...?
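
Option 2 could look roughly like the sketch below. `LockedLazyJob` and its attribute names are hypothetical and this is not signac's actual implementation; it only shows the check and the initialization sharing one lock that is created eagerly in `__init__`.

```python
import threading
import time


class LockedLazyJob:
    """Hypothetical stand-in for Job; not signac's actual implementation."""

    def __init__(self):
        # The lock itself is created eagerly, so it is never raced on.
        self._lock = threading.RLock()
        self._statepoint = None
        self._statepoint_requires_init = True
        self.created = 0  # how many times the lazy init body ran

    @property
    def statepoint(self):
        with self._lock:
            if self._statepoint_requires_init:
                time.sleep(0.01)  # widen the window a race would need
                self._statepoint = {"b": 0}
                self.created += 1
                self._statepoint_requires_init = False
        return self._statepoint


def run(n_threads=8):
    job = LockedLazyJob()
    seen = []
    threads = [
        threading.Thread(target=lambda: seen.append(id(job.statepoint)))
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # (number of initializations, number of distinct objects observed)
    return job.created, len(set(seen))


print(run())
```

Taking the lock on every access keeps the sketch short. A real implementation might check the flag once outside the lock to avoid that cost on the common path, as long as it checks again after acquiring the lock.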

@vyasr
Contributor

vyasr commented Mar 11, 2021

I'd have to look at the code again, but from my recollections step 4 doesn't quite make sense to me. Locks are never stored per-instance level; to avoid precisely this problem, all locks are stored in a class-level dictionary mapping cls._locks : Dict[type(self._lock_id)->RLock] (basically the "other solution" you're looking for). Even if the different instances are created on different threads, they should just end up overwriting the locks stored in that class level dictionary, then when instances try to perform thread-safe operations they should all access the same lock via type(self)._locks[self._lock_id]. The initial lock creation itself is certainly not thread-safe, but accessing a lock in this manner should consistently give the lock created by the _StatePointDict on the last thread to create a lock. It's possible that somewhere I am accidentally assigning an RLock to self._thread_lock or something, which would explain the problem but seems unlikely since I would have expected that to break somewhere else by now.

You said that this only fails if you fork, right? If you run the ThreadPool on process 0 without forking it works as expected? If so, somehow the forking is critical to triggering this failure. Assuming I'm remembering that correctly, perhaps the problem is related to the way that I'm creating the _locks dictionary, which is done in SyncedCollection.__init_subclass__. Is it possible that somehow the _StatePointDict class is being initialized separately in each thread and is not being shared? What does _StatePointDict._locks look like at the end of your script? How about on each thread: does id(next(iter(_StatePointDict._locks.values()))) print the same object id on every thread (which would indicate whether or not the same RLock is being stored)?

A potentially useful basis for comparison would be the tests for multithreaded behavior that I previously wrote in tests/test_synced_collections/synced_collection_test.py::SyncedCollectionTest::test_multithreaded. If you can make that test or a slightly adapted version of it fail in a subprocess, that might help you narrow down the problem. It might be too removed from your more complex example involving signac to be that helpful though; just a suggestion.

@bdice
Member Author

bdice commented Mar 11, 2021

> Is it possible that somehow the _StatePointDict class is being initialized separately in each thread and is not being shared?

Yes, that's what I meant in "3." above. Every thread shares the same Job instance. Calling job.statepoint on each thread makes them instantiate separate _StatePointDict instances, each of which owns its own RLock.

@vyasr
Contributor

vyasr commented Mar 11, 2021

No, not separate instances. When locks are instantiated here, they are stored in a dictionary owned by the class. So for example if I do

a = _StatePointDict(filename='a.json', ...)
b = _StatePointDict(filename='b.json', ...)

I should see something like

print(_StatePointDict._locks)
# {'.../a.json': RLock, '.../b.json': RLock}

Instances never own RLocks. What I'm asking is if it is possible that either the _StatePointDict class itself is different on each thread (very unlikely since it's probably created on import signac, but possible if it's only created when a specific subpackage is imported), or if it's possible that _StatePointDict._locks (the class-level dictionary) is somehow being reassigned on different threads.
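
A minimal sketch of the class-level registry described here, assuming the layout mentioned in this thread (a `_locks` dict created in `__init_subclass__`, keyed on `_lock_id`, and read through a `_thread_lock` property). `Collection` and `StatePointDict` are hypothetical stand-ins, not the real signac classes.

```python
import threading


class Collection:
    """Hypothetical stand-in for SyncedCollection."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # One registry per subclass, created when the class is defined.
        cls._locks = {}

    def __init__(self, filename):
        self._lock_id = filename
        # Stored on the class, keyed by resource, never on the instance.
        type(self)._locks[self._lock_id] = threading.RLock()

    @property
    def _thread_lock(self):
        # Every access is a lookup in the class-level dictionary.
        return type(self)._locks[self._lock_id]


class StatePointDict(Collection):
    pass


a = StatePointDict("a.json")
b = StatePointDict("b.json")
print(sorted(StatePointDict._locks))  # ['a.json', 'b.json']
print("_locks" in vars(a))            # False
```

With this layout, two instances that point at the same file resolve to whatever lock is currently stored under that key.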

@vyasr
Contributor

vyasr commented Mar 11, 2021

Locks should always be accessed via the property self._thread_lock, which is defined here as basically a lookup in that dictionary. My suspicion would be that the monkey-patching done by enable_multithreading could be causing some problems here because of the forking, but that's just a guess.

@joaander
Member

> Locks should always be accessed via the property self._thread_lock, which is defined here as basically a lookup in that dictionary. My suspicion would be that the monkey-patching done by enable_multithreading could be causing some problems here because of the forking, but that's just a guess.

We confirmed via print() debugging that each independent thread had a separate RLock instance returned by self._thread_lock.

@joaander
Member

self._locks[self._lock_id] is set here in the __init__ method of SyncedCollection. It does not check if the lock exists first. So when the code instantiates separate SyncedCollection instances concurrently in separate threads, the different instances may get different locks.
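
The effect of the unconditional assignment can be replayed on a single thread. This is a sketch with a hypothetical `Collection` class; the comments mark which thread would perform each step in the concurrent case.

```python
import threading


class Collection:
    """Hypothetical stand-in for SyncedCollection."""

    _locks = {}

    def __init__(self, filename):
        self._lock_id = filename
        # No check for an existing entry: a later instance replaces the lock.
        type(self)._locks[self._lock_id] = threading.RLock()

    @property
    def _thread_lock(self):
        return type(self)._locks[self._lock_id]


first = Collection("a.json")
lock_seen_by_first = first._thread_lock    # thread 1 looks up its lock ...
second = Collection("a.json")              # ... thread 2 constructs and overwrites
lock_seen_by_second = second._thread_lock  # thread 2 looks up the new lock

print(lock_seen_by_first is lock_seen_by_second)  # False
```

Both objects refer to the same file, yet each holder ends up with a different lock, so neither excludes the other.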

@vyasr
Contributor

vyasr commented Mar 11, 2021

Got it. In order for that to happen, one of the following must be true:

  1. Each _StatePointDict has a different _lock_id, allowing them all to coexist in _StatePointDict._locks (a dictionary owned by the class, not each instance), which is keyed on instance._lock_id.
  2. Each thread must see a different instance of _StatePointDict._locks, which I think is only possible if the class itself is different on each thread. That would only be possible if the import that brings _StatePointDict into the namespace is happening on each thread, because _StatePointDict._locks is created when the class is created (defined).
  3. Each thread would have to both instantiate the lock on construction, and access it in another method (e.g. _update), before a second thread tried to create a lock.

@vyasr
Contributor

vyasr commented Mar 11, 2021

Perhaps this would be easiest sorted out in a call?

@vyasr
Contributor

vyasr commented Mar 12, 2021

Just had a call with @bdice and @joaander. There are two classes of problems that need to be resolved.

  1. The first issue is what I listed in point (3) above. It is possible for a SyncedCollection to be created (at which point it saves a lock) and start performing a nominally thread-safe operation on one thread, and then for another thread to create a new SyncedCollection pointing to the same underlying resource, create a new thread lock, and try to perform the same operation. At this point, the two threads will use different locks, even though type(self)._locks[self._lock_id] is unique. @joaander has a MWE for this bug and has pushed a fix to the fix-syncedcollection-deadlock branch; the solution is a straightforward class-level locking of the per-resource lock creation operation. However, this bug is not responsible for the behavior originally observed in this issue.
  2. The problem causing the original issue on this thread is that lazy initialization of job attributes is not currently thread safe. All lazily initialized objects (statepoint, document, and stores) must check if initialization is required -- and if so, initialize -- in a thread-safe manner. This issue has to be fixed at the Job level, not the SyncedCollection level. @bdice is going to make a PR with the necessary changes, which basically come down to adding a lock to the Job class and using it inside all the lazily initialized properties.
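
One way the class-level locking in point 1 could look is sketched below. The `Collection` class and the `_locks_guard` name are hypothetical, and this is not necessarily what the fix-syncedcollection-deadlock branch does; it only shows lock creation being guarded so that an existing per-resource lock is reused and never replaced.

```python
import threading


class Collection:
    """Hypothetical stand-in for SyncedCollection."""

    _locks = {}
    _locks_guard = threading.Lock()  # hypothetical name; guards _locks itself

    def __init__(self, filename):
        self._lock_id = filename
        with type(self)._locks_guard:
            # Create the per-resource lock only if none exists yet.
            if self._lock_id not in type(self)._locks:
                type(self)._locks[self._lock_id] = threading.RLock()

    @property
    def _thread_lock(self):
        return type(self)._locks[self._lock_id]


def distinct_locks(n_threads=8):
    seen = []

    def worker():
        seen.append(id(Collection("a.json")._thread_lock))

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(set(seen))


print(distinct_locks())
```

The Job-level problem in point 2 is separate: a guard like this does not stop several threads from each running the lazy initialization on a shared job.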

@vyasr
Contributor

vyasr commented Mar 14, 2021

Resolved by #529 and #530

vyasr closed this as completed Mar 14, 2021
This was linked to pull requests Mar 14, 2021