
Fix thread safety of cached functions lock-freely #1949

Merged
merged 3 commits into from
Oct 2, 2021

Conversation

XuehaiPan
Contributor

Summary

Description

Add thread locks to cached functions; fixes issue #1948.

Signed-off-by: XuehaiPan <XuehaiPan@pku.edu.cn>
@XuehaiPan XuehaiPan changed the title Add thread locks to cached functions Fix: add thread locks to cached functions Sep 5, 2021
@XuehaiPan XuehaiPan changed the title Fix: add thread locks to cached functions Fix: add thread locks to ensure thread safety of cached functions Sep 27, 2021
@giampaolo
Owner

This is definitely needed. I benchmarked it though, and it introduces an 11% slowdown.
Unfortunately that's a lot for this part of the code, because it's called multiple times, for different Process methods, typically in a loop (imagine an app monitoring the CPU percentages of all processes every second, which is a typical use case for psutil).
This would penalize both threaded and non-threaded apps too much.

I have an idea on how to solve this, but first I would like to find a way to reproduce the original issue that you reported in #1948.

I tried this, but it doesn't work:

import psutil, threading


def worker():
    while 1:
        with p.oneshot():
            p.cpu_times()


p = psutil.Process()
for x in range(1000):
    t = threading.Thread(target=worker)
    t.start()

@XuehaiPan
Contributor Author

XuehaiPan commented Oct 2, 2021

This is definitely needed. I benchmarked it though, and it introduces an 11% slowdown.
Unfortunately that's a lot for this part of the code, because it's called multiple times, for different Process methods, typically in a loop (imagine an app monitoring the CPU percentages of all processes every second, which is a typical use case for psutil).
This would penalize both threaded and non-threaded apps too much.

Indeed, adding a thread lock to these commonly used functions causes heavy overhead, especially for apps monitoring all processes (up to ~10k of them).

I use multi-threading to gather process information asynchronously in my own app nvitop, which takes snapshots of all GPU processes every 2 seconds in 3 separate threads. Multiple threads may manipulate the same Process instance at the same time. My current workaround is a global snapshot lock shared by all these threads, ensuring only one thread is gathering information at a time. But I think making psutil itself a thread-safe library is better.
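The global snapshot lock workaround might look roughly like this (a hypothetical sketch with a dummy stand-in object, not nvitop's actual code):

```python
import threading

# One module-level lock shared by all snapshot-taking threads, so only
# one thread manipulates the shared process object at a time.
snapshot_lock = threading.Lock()


def gather_snapshot(process, results):
    with snapshot_lock:
        # stand-in for calls like process.cpu_times(); while one thread
        # is inside this block, no other thread touches the shared object
        results.append(process["name"])


process = {"name": "python"}  # dummy stand-in for a shared psutil.Process
results = []
threads = [threading.Thread(target=gather_snapshot, args=(process, results))
           for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

This serializes all gathering threads, which avoids the race at the cost of losing concurrency between them.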

If the thread lock introduces too much overhead, I think just fixing the AttributeError (lock-free) is fine:

    @functools.wraps(fun)
    def wrapper(self):
        try:
            # case 1: we previously entered oneshot() ctx
            ret = self._cache[fun]
        except AttributeError:
            # case 2: we never entered oneshot() ctx
            return fun(self)
        except KeyError:
            # case 3: we entered oneshot() ctx but there's no cache
            # for this entry yet
            ret = fun(self)
            try:
                self._cache[fun] = ret
            except AttributeError:
                pass  # inconsistency caused by multi-threading; just ignore it

        return ret
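For context, the snippet above is the inner wrapper of psutil's memoize_when_activated decorator. A self-contained sketch of the whole decorator with the lock-free fix might look like this (cache_activate/cache_deactivate follow psutil's internals, but the details here are simplified):

```python
import functools


def memoize_when_activated(fun):
    """Sketch of psutil's memoize_when_activated with the lock-free fix:
    results are cached only between cache_activate() and cache_deactivate(),
    and a racing deactivation is tolerated by skipping the cache write."""
    @functools.wraps(fun)
    def wrapper(self):
        try:
            ret = self._cache[fun]      # case 1: inside oneshot() ctx, cached
        except AttributeError:
            return fun(self)            # case 2: caching not active
        except KeyError:
            ret = fun(self)             # case 3: active but not cached yet
            try:
                self._cache[fun] = ret
            except AttributeError:
                pass  # another thread deactivated the cache; skip caching
        return ret

    def cache_activate(proc):
        proc._cache = {}

    def cache_deactivate(proc):
        try:
            del proc._cache
        except AttributeError:
            pass  # already deactivated by another thread

    wrapper.cache_activate = cache_activate
    wrapper.cache_deactivate = cache_deactivate
    return wrapper


class Demo:
    calls = 0

    @memoize_when_activated
    def value(self):
        Demo.calls += 1
        return 42
```

Without activation, every Demo().value() call recomputes; after Demo.value.cache_activate(instance), repeated calls on that instance hit the cache until cache_deactivate(instance) is called.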

I have an idea on how to solve this, but first I would like to find a way to reproduce the original issue that you reported in #1948.
I tried this, but it doesn't work

It is hard to reproduce issue #1948; sometimes it takes hours of running. The issue arises when both thread A and thread B enter the oneshot() ctx with an empty cache. Then thread B exits the oneshot() ctx and deletes the attribute self._cache before thread A can set ret = self._cache[fun] = fun(self).

@giampaolo
Owner

giampaolo commented Oct 2, 2021

If the thread lock introduces too much overhead, I think just fixing the AttributeError (lock-free) is fine:
[snippet]

This is exactly what I had in mind: sacrifice caching if we bump into this corner case. If you update your PR I'll gladly merge it.

Signed-off-by: XuehaiPan <XuehaiPan@pku.edu.cn>
@XuehaiPan XuehaiPan changed the title Fix: add thread locks to ensure thread safety of cached functions Fix thread safety of cached functions lock-freely Oct 2, 2021
@XuehaiPan
Contributor Author

XuehaiPan commented Oct 2, 2021

This is exactly what I had in mind: sacrifice caching if we bump into this corner case. If you update your PR I'll gladly merge it.

I have just updated the content and title of this PR.

@giampaolo giampaolo merged commit 29214fc into giampaolo:master Oct 2, 2021
@giampaolo
Owner

Xiexie (thanks), Xuehai. Merged.

giampaolo added a commit that referenced this pull request Oct 2, 2021
Signed-off-by: Giampaolo Rodola <g.rodola@gmail.com>
@XuehaiPan
Contributor Author

XuehaiPan commented Oct 3, 2021

Hi, I found out why the snippet in #1949 (comment) cannot reproduce the race condition in issue #1948. There is a thread lock in oneshot():

psutil/psutil/__init__.py, lines 455 to 456 in 53a6c03:

with self._lock:
    if hasattr(self, "_cache"):

It is thread-safe within a with proc.oneshot(): block.
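A simplified, self-contained sketch of that behavior (assuming a re-entrant lock, and reducing the activation machinery to the _cache attribute itself):

```python
import contextlib
import threading


class Proc:
    """Minimal stand-in for psutil.Process, keeping only the parts
    relevant here: a re-entrant lock and the _cache attribute."""

    def __init__(self):
        self._lock = threading.RLock()  # RLock allows nested oneshot()

    @contextlib.contextmanager
    def oneshot(self):
        with self._lock:
            if hasattr(self, "_cache"):
                # already inside a oneshot() ctx: re-entry is a no-op
                yield
            else:
                try:
                    self._cache = {}  # activate caching
                    yield
                finally:
                    del self._cache   # deactivate caching
```

Because the lock is held for the entire with block, two threads calling oneshot() on the same instance are fully serialized, which is why the earlier snippet with all threads inside oneshot() could not trigger the race.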


Here I provide two snippets to reproduce issue #1948:

import psutil
import threading


def worker_A(process):
    while True:
        process.cpu_times.cache_activate(process)
        process.cpu_times()
        process.cpu_times.cache_deactivate(process)


def worker_B(process):
    while True:
        process.cpu_times.cache_activate(process)
        # do something faster than worker A: delete the cache before
        # worker A's uncached function call returns
        pass  # do nothing
        process.cpu_times.cache_deactivate(process)


this = psutil.Process()
threading.Thread(target=worker_A, args=(this,)).start()
threading.Thread(target=worker_B, args=(this,)).start()

Another snippet to reproduce:

import psutil
import threading
import time
import traceback


def worker_A(process, event, timeout):
    finish = time.perf_counter() + timeout
    while event.is_set() and time.perf_counter() < finish:
        with process.oneshot():
            process.cpu_times()
        time.sleep(0.1)

    event.clear()


def worker_B(process, event, timeout):
    finish = time.perf_counter() + timeout
    while event.is_set() and time.perf_counter() < finish:
        try:
            # worker B accidentally uses worker A's one-shot cache
            process.cpu_times()
        except Exception:
            # the cache can only be deactivated by worker A, so the
            # exception is only raised in worker B
            traceback.print_exc()
            break

    event.clear()


this = psutil.Process()
alive = threading.Event()
timeout = 10  # test passes if no exception is raised within 10 seconds
alive.set()
thread_A = threading.Thread(name='Worker A', target=worker_A, args=(this, alive, timeout))
thread_A.start()
thread_B = threading.Thread(name='Worker B', target=worker_B, args=(this, alive, timeout))
thread_B.start()

thread_A.join()
thread_B.join()

The second snippet could be a common use case: only one thread uses the oneshot() ctx while the other does not (if both do, it's thread-safe). Should we add this to the test cases?

@giampaolo
Owner

I don't think it's worth adding them as a test case, but thanks for the repro snippets. (I verified they indeed reproduce the issue without the patch.)

Development

Successfully merging this pull request may close these issues.

[Linux] Thread-unsafe cache functions