diff --git a/CREDITS b/CREDITS index 716de944c..690f5717f 100644 --- a/CREDITS +++ b/CREDITS @@ -664,3 +664,7 @@ I: 1665 N: Anselm Kruis W: https://github.com/akruis I: 1695 + +N: Michał Górny +W: https://github.com/mgorny +I: 1726 diff --git a/HISTORY.rst b/HISTORY.rst index 0c99965bb..b906066a3 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -1,5 +1,15 @@ *Bug tracker at https://github.com/giampaolo/psutil/issues* +5.7.1 (unreleased) +================== + +XXXX-XX-XX + +**Bug fixes** + +- 1726_: [Linux] cpu_freq() parsing should use spaces instead of tabs on ia64. + (patch by Michał Górny) + 5.7.0 ===== diff --git a/MANIFEST.in b/MANIFEST.in index 380a4fa24..d0d752401 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -103,6 +103,7 @@ include psutil/tests/test_posix.py include psutil/tests/test_process.py include psutil/tests/test_sunos.py include psutil/tests/test_system.py +include psutil/tests/test_testutils.py include psutil/tests/test_unicode.py include psutil/tests/test_windows.py include scripts/battery.py diff --git a/Makefile b/Makefile index 5c4d5b70d..fbd7ffcc8 100644 --- a/Makefile +++ b/Makefile @@ -124,6 +124,10 @@ test-misc: ## Run miscellaneous tests. ${MAKE} install $(TEST_PREFIX) $(PYTHON) psutil/tests/test_misc.py +test-testutils: ## Run test utils tests. + ${MAKE} install + $(TEST_PREFIX) $(PYTHON) psutil/tests/test_testutils.py + test-unicode: ## Test APIs dealing with strings. ${MAKE} install $(TEST_PREFIX) $(PYTHON) psutil/tests/test_unicode.py diff --git a/psutil/__init__.py b/psutil/__init__.py index 46c4a20bd..e6a2da8d6 100644 --- a/psutil/__init__.py +++ b/psutil/__init__.py @@ -226,7 +226,7 @@ __all__.extend(_psplatform.__extra__all__) __author__ = "Giampaolo Rodola'" -__version__ = "5.7.0" +__version__ = "5.7.1" version_info = tuple([int(num) for num in __version__.split('.')]) _timer = getattr(time, 'monotonic', time.time) diff --git a/psutil/_common.py b/psutil/_common.py index 8a39de7f8..b97bb01dc 100644 --- a/psutil/_common.py +++ b/psutil/_common.py @@ -785,7 +785,7 @@ def hilite(s, color="green", bold=False): if not term_supports_colors(): return s attr = [] - colors = dict(green='32', red='91', brown='33') + colors = dict(green='32', red='91', brown='33', yellow='93') colors[None] = '29' try: color = colors[color] @@ -812,7 +812,7 @@ def print_color(s, color="green", bold=False, file=sys.stdout): SetConsoleTextAttribute = \ ctypes.windll.Kernel32.SetConsoleTextAttribute - colors = dict(green=2, red=4, brown=6) + colors = dict(green=2, red=4, brown=6, yellow=6) colors[None] = DEFAULT_COLOR try: color = colors[color] diff --git a/psutil/_compat.py b/psutil/_compat.py index 2965fd1b5..64a5761ee 100644 --- a/psutil/_compat.py +++ b/psutil/_compat.py @@ -5,13 +5,14 @@ """Module which provides compatibility with older Python versions.""" import collections +import contextlib import errno import functools import os import sys __all__ = ["PY3", "long", "xrange", "unicode", "basestring", "u", "b", - "lru_cache", "which", "get_terminal_size", + "lru_cache", "which", "get_terminal_size", "redirect_stderr", "FileNotFoundError", "PermissionError", "ProcessLookupError", "InterruptedError", "ChildProcessError", "FileExistsError"] @@ -343,3 +344,17 @@ def get_terminal_size(fallback=(80, 24)): return (res[1], res[0]) except Exception: return fallback + + +# python 3.4 +try: + from contextlib import redirect_stderr +except ImportError: + @contextlib.contextmanager + def redirect_stderr(target): + original = sys.stderr + try: + sys.stderr = target + yield + finally: + sys.stderr = original diff --git a/psutil/tests/__init__.py b/psutil/tests/__init__.py index cd78fae67..f33588067 100644 --- a/psutil/tests/__init__.py +++ b/psutil/tests/__init__.py @@ -9,12 +9,12 @@ """ from __future__ import print_function - import atexit import contextlib import ctypes import errno import functools +import gc import os import random import re @@ -40,7 +40,9 @@ from psutil import POSIX from psutil import SUNOS from psutil import WINDOWS +from psutil._common import bytes2human from psutil._common import supports_ipv6 +from psutil._common import print_color from psutil._compat import ChildProcessError from psutil._compat import FileExistsError from psutil._compat import FileNotFoundError @@ -48,6 +50,7 @@ from psutil._compat import u from psutil._compat import unicode from psutil._compat import which +from psutil._compat import xrange if sys.version_info < (2, 7): import unittest2 as unittest # requires "pip install unittest2" @@ -84,7 +87,7 @@ 'ThreadTask' # test utils 'unittest', 'skip_on_access_denied', 'skip_on_not_implemented', - 'retry_on_failure', + 'retry_on_failure', 'TestMemoryLeak', # install utils 'install_pip', 'install_test_deps', # fs utils @@ -814,6 +817,120 @@ def __str__(self): unittest.TestCase = TestCase +@unittest.skipIf(PYPY, "unreliable on PYPY") +class TestMemoryLeak(unittest.TestCase): + """Test framework class for detecting function memory leaks (typically + functions implemented in C). + It does so by calling a function many times, and checks whether the + process memory usage increased before and after having called the + function repeadetly. + Note that sometimes this may produce false positives. + PyPy appears to be completely unstable for this framework, probably + because of how its JIT handles memory, so tests on PYPY are + automatically skipped. + """ + # Configurable class attrs. + times = 1200 + warmup_times = 10 + tolerance = 4096 # memory + retry_for = 3.0 # seconds + verbose = True + + def setUp(self): + self._thisproc = psutil.Process() + gc.collect() + + def _get_mem(self): + # USS is the closest thing we have to "real" memory usage and it + # should be less likely to produce false positives. + mem = self._thisproc.memory_full_info() + return getattr(mem, "uss", mem.rss) + + def _call(self, fun): + return fun() + + def _itercall(self, fun, iterator): + """Get 2 distinct memory samples, before and after having + called fun repeadetly, and return the memory difference. + """ + ncalls = 0 + gc.collect() + mem1 = self._get_mem() + for x in iterator: + ret = self._call(fun) + ncalls += 1 + del x, ret + gc.collect() + mem2 = self._get_mem() + self.assertEqual(gc.garbage, []) + diff = mem2 - mem1 + if diff < 0: + self._log("negative memory diff -%s" % (bytes2human(abs(diff)))) + return (diff, ncalls) + + def _call_ntimes(self, fun, times): + return self._itercall(fun, xrange(times))[0] + + def _call_for(self, fun, secs): + def iterator(secs): + stop_at = time.time() + secs + while time.time() < stop_at: + yield + return self._itercall(fun, iterator(secs)) + + def _log(self, msg): + if self.verbose: + print_color(msg, color="yellow", file=sys.stderr) + + def execute(self, fun, times=times, warmup_times=warmup_times, + tolerance=tolerance, retry_for=retry_for): + """Test a callable.""" + if times <= 0: + raise ValueError("times must be > 0") + if warmup_times < 0: + raise ValueError("warmup_times must be >= 0") + if tolerance is not None and tolerance < 0: + raise ValueError("tolerance must be >= 0") + if retry_for is not None and retry_for < 0: + raise ValueError("retry_for must be >= 0") + + # warm up + self._call_ntimes(fun, warmup_times) + mem1 = self._call_ntimes(fun, times) + + if mem1 > tolerance: + # This doesn't necessarily mean we have a leak yet. + # At this point we assume that after having called the + # function so many times the memory usage is stabilized + # and if there are no leaks it should not increase + # anymore. Let's keep calling fun for N more seconds and + # fail if we notice any difference (ignore tolerance). + msg = "+%s after %s calls; try calling fun for another %s secs" % ( + bytes2human(mem1), times, retry_for) + if not retry_for: + raise self.fail(msg) + else: + self._log(msg) + + mem2, ncalls = self._call_for(fun, retry_for) + if mem2 > mem1: + # failure + msg = "+%s memory increase after %s calls; " % ( + bytes2human(mem1), times) + msg += "+%s after another %s calls over %s secs" % ( + bytes2human(mem2), ncalls, retry_for) + raise self.fail(msg) + + def execute_w_exc(self, exc, fun, **kwargs): + """Convenience method to test a callable while making sure it + raises an exception on every call. + """ + def call(): + self.assertRaises(exc, fun) + + self.execute(call, **kwargs) + + def retry_on_failure(retries=NO_RETRIES): """Decorator which runs a test function and retries N times before actually failing. diff --git a/psutil/tests/test_memory_leaks.py b/psutil/tests/test_memory_leaks.py index f9cad70fd..b0b4af1b5 100755 --- a/psutil/tests/test_memory_leaks.py +++ b/psutil/tests/test_memory_leaks.py @@ -17,11 +17,7 @@ from __future__ import print_function import functools -import gc import os -import sys -import threading -import time import psutil import psutil._common @@ -32,9 +28,7 @@ from psutil import POSIX from psutil import SUNOS from psutil import WINDOWS -from psutil._common import bytes2human from psutil._compat import ProcessLookupError -from psutil._compat import xrange from psutil.tests import CIRRUS from psutil.tests import create_sockets from psutil.tests import get_test_subprocess @@ -50,21 +44,15 @@ from psutil.tests import HAS_SENSORS_BATTERY from psutil.tests import HAS_SENSORS_FANS from psutil.tests import HAS_SENSORS_TEMPERATURES -from psutil.tests import PYPY from psutil.tests import reap_children from psutil.tests import safe_rmpath from psutil.tests import skip_on_access_denied from psutil.tests import TESTFN +from psutil.tests import TestMemoryLeak from psutil.tests import TRAVIS from psutil.tests import unittest - -# configurable opts -LOOPS = 1000 -MEMORY_TOLERANCE = 4096 -RETRY_FOR = 3 SKIP_PYTHON_IMPL = True - cext = psutil._psplatform.cext thisproc = psutil.Process() @@ -79,107 +67,12 @@ def skip_if_linux(): "worthless on LINUX (pure python)") -@unittest.skipIf(PYPY, "unreliable on PYPY") -class TestMemLeak(unittest.TestCase): - """Base framework class which calls a function many times and - produces a failure if process memory usage keeps increasing - between calls or over time. - """ - tolerance = MEMORY_TOLERANCE - loops = LOOPS - retry_for = RETRY_FOR - - def setUp(self): - gc.collect() - - def execute(self, fun, *args, **kwargs): - """Test a callable.""" - def call_many_times(): - for x in xrange(loops): - self._call(fun, *args, **kwargs) - del x - gc.collect() - - tolerance = kwargs.pop('tolerance_', None) or self.tolerance - loops = kwargs.pop('loops_', None) or self.loops - retry_for = kwargs.pop('retry_for_', None) or self.retry_for - - # warm up - for x in range(10): - self._call(fun, *args, **kwargs) - self.assertEqual(gc.garbage, []) - self.assertEqual(threading.active_count(), 1) - self.assertEqual(thisproc.children(), []) - - # Get 2 distinct memory samples, before and after having - # called fun repeadetly. - # step 1 - call_many_times() - mem1 = self._get_mem() - # step 2 - call_many_times() - mem2 = self._get_mem() - - diff1 = mem2 - mem1 - if diff1 > tolerance: - # This doesn't necessarily mean we have a leak yet. - # At this point we assume that after having called the - # function so many times the memory usage is stabilized - # and if there are no leaks it should not increase - # anymore. - # Let's keep calling fun for 3 more seconds and fail if - # we notice any difference. - ncalls = 0 - stop_at = time.time() + retry_for - while time.time() <= stop_at: - self._call(fun, *args, **kwargs) - ncalls += 1 - - del stop_at - gc.collect() - mem3 = self._get_mem() - diff2 = mem3 - mem2 - - if mem3 > mem2: - # failure - extra_proc_mem = bytes2human(diff1 + diff2) - print("exta proc mem: %s" % extra_proc_mem, file=sys.stderr) - msg = "+%s after %s calls, +%s after another %s calls, " - msg += "+%s extra proc mem" - msg = msg % ( - bytes2human(diff1), loops, bytes2human(diff2), ncalls, - extra_proc_mem) - self.fail(msg) - - def execute_w_exc(self, exc, fun, *args, **kwargs): - """Convenience function which tests a callable raising - an exception. - """ - def call(): - self.assertRaises(exc, fun, *args, **kwargs) - - self.execute(call) - - @staticmethod - def _get_mem(): - # By using USS memory it seems it's less likely to bump - # into false positives. - if LINUX or WINDOWS or MACOS: - return thisproc.memory_full_info().uss - else: - return thisproc.memory_info().rss - - @staticmethod - def _call(fun, *args, **kwargs): - fun(*args, **kwargs) - - # =================================================================== # Process class # =================================================================== -class TestProcessObjectLeaks(TestMemLeak): +class TestProcessObjectLeaks(TestMemoryLeak): """Test leaks of Process class methods.""" proc = thisproc @@ -232,7 +125,7 @@ def test_nice_get(self): def test_nice_set(self): niceness = thisproc.nice() - self.execute(self.proc.nice, niceness) + self.execute(lambda: self.proc.nice(niceness)) @unittest.skipIf(not HAS_IONICE, "not supported") def test_ionice_get(self): @@ -242,9 +135,9 @@ def test_ionice_get(self): def test_ionice_set(self): if WINDOWS: value = thisproc.ionice() - self.execute(self.proc.ionice, value) + self.execute(lambda: self.proc.ionice(value)) else: - self.execute(self.proc.ionice, psutil.IOPRIO_CLASS_NONE) + self.execute(lambda: self.proc.ionice(psutil.IOPRIO_CLASS_NONE)) fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0) self.execute_w_exc(OSError, fun) @@ -322,9 +215,10 @@ def test_cpu_affinity_get(self): @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported") def test_cpu_affinity_set(self): affinity = thisproc.cpu_affinity() - self.execute(self.proc.cpu_affinity, affinity) + self.execute(lambda: self.proc.cpu_affinity(affinity)) if not TRAVIS: - self.execute_w_exc(ValueError, self.proc.cpu_affinity, [-1]) + self.execute_w_exc( + ValueError, lambda: self.proc.cpu_affinity([-1])) @skip_if_linux() def test_open_files(self): @@ -340,14 +234,14 @@ def test_memory_maps(self): @unittest.skipIf(not LINUX, "LINUX only") @unittest.skipIf(not HAS_RLIMIT, "not supported") def test_rlimit_get(self): - self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE) + self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE)) @unittest.skipIf(not LINUX, "LINUX only") @unittest.skipIf(not HAS_RLIMIT, "not supported") def test_rlimit_set(self): limit = thisproc.rlimit(psutil.RLIMIT_NOFILE) - self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE, limit) - self.execute_w_exc(OSError, self.proc.rlimit, -1) + self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE, limit)) + self.execute_w_exc(OSError, lambda: self.proc.rlimit(-1)) @skip_if_linux() # Windows implementation is based on a single system-wide @@ -359,7 +253,7 @@ def test_connections(self): # be executed. with create_sockets(): kind = 'inet' if SUNOS else 'all' - self.execute(self.proc.connections, kind) + self.execute(lambda: self.proc.connections(kind)) @unittest.skipIf(not HAS_ENVIRON, "not supported") def test_environ(self): @@ -371,13 +265,13 @@ def test_proc_info(self): @unittest.skipIf(not WINDOWS, "WINDOWS only") -class TestProcessDualImplementation(TestMemLeak): +class TestProcessDualImplementation(TestMemoryLeak): def test_cmdline_peb_true(self): - self.execute(cext.proc_cmdline, os.getpid(), use_peb=True) + self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=True)) def test_cmdline_peb_false(self): - self.execute(cext.proc_cmdline, os.getpid(), use_peb=False) + self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=False)) class TestTerminatedProcessLeaks(TestProcessObjectLeaks): @@ -400,9 +294,9 @@ def tearDownClass(cls): super(TestTerminatedProcessLeaks, cls).tearDownClass() reap_children() - def _call(self, fun, *args, **kwargs): + def _call(self, fun): try: - fun(*args, **kwargs) + fun() except psutil.NoSuchProcess: pass @@ -439,7 +333,7 @@ def call(): # =================================================================== -class TestModuleFunctionsLeaks(TestMemLeak): +class TestModuleFunctionsLeaks(TestMemoryLeak): """Test leaks of psutil module functions.""" def test_coverage(self): @@ -457,11 +351,11 @@ def test_coverage(self): @skip_if_linux() def test_cpu_count_logical(self): - self.execute(psutil.cpu_count, logical=True) + self.execute(lambda: psutil.cpu_count(logical=True)) @skip_if_linux() def test_cpu_count_physical(self): - self.execute(psutil.cpu_count, logical=False) + self.execute(lambda: psutil.cpu_count(logical=False)) @skip_if_linux() def test_cpu_times(self): @@ -469,7 +363,7 @@ def test_cpu_times(self): @skip_if_linux() def test_per_cpu_times(self): - self.execute(psutil.cpu_times, percpu=True) + self.execute(lambda: psutil.cpu_times(percpu=True)) @skip_if_linux() def test_cpu_stats(self): @@ -497,14 +391,14 @@ def test_swap_memory(self): @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL, "worthless on POSIX (pure python)") def test_pid_exists(self): - self.execute(psutil.pid_exists, os.getpid()) + self.execute(lambda: psutil.pid_exists(os.getpid())) # --- disk @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL, "worthless on POSIX (pure python)") def test_disk_usage(self): - self.execute(psutil.disk_usage, '.') + self.execute(lambda: psutil.disk_usage('.')) def test_disk_partitions(self): self.execute(psutil.disk_partitions) @@ -513,7 +407,7 @@ def test_disk_partitions(self): '/proc/diskstats not available on this Linux version') @skip_if_linux() def test_disk_io_counters(self): - self.execute(psutil.disk_io_counters, nowrap=False) + self.execute(lambda: psutil.disk_io_counters(nowrap=False)) # --- proc @@ -528,7 +422,7 @@ def test_pids(self): @skip_if_linux() @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported') def test_net_io_counters(self): - self.execute(psutil.net_io_counters, nowrap=False) + self.execute(lambda: psutil.net_io_counters(nowrap=False)) @skip_if_linux() @unittest.skipIf(MACOS and os.getuid() != 0, "need root access") @@ -539,7 +433,7 @@ def test_net_connections(self): def test_net_if_addrs(self): # Note: verified that on Windows this was a false positive. self.execute(psutil.net_if_addrs, - tolerance_=80 * 1024 if WINDOWS else None) + tolerance=80 * 1024 if WINDOWS else 4096) @unittest.skipIf(TRAVIS, "EPERM on travis") def test_net_if_stats(self): @@ -584,15 +478,15 @@ def test_win_service_get(self): def test_win_service_get_config(self): name = next(psutil.win_service_iter()).name() - self.execute(cext.winservice_query_config, name) + self.execute(lambda: cext.winservice_query_config(name)) def test_win_service_get_status(self): name = next(psutil.win_service_iter()).name() - self.execute(cext.winservice_query_status, name) + self.execute(lambda: cext.winservice_query_status(name)) def test_win_service_get_description(self): name = next(psutil.win_service_iter()).name() - self.execute(cext.winservice_query_descr, name) + self.execute(lambda: cext.winservice_query_descr(name)) if __name__ == '__main__': diff --git a/psutil/tests/test_misc.py b/psutil/tests/test_misc.py index c20cd9413..ca0a04336 100755 --- a/psutil/tests/test_misc.py +++ b/psutil/tests/test_misc.py @@ -11,7 +11,6 @@ import ast import collections -import contextlib import errno import json import os @@ -19,58 +18,33 @@ import socket import stat -from psutil import FREEBSD from psutil import LINUX -from psutil import NETBSD from psutil import POSIX from psutil import WINDOWS from psutil._common import memoize from psutil._common import memoize_when_activated from psutil._common import supports_ipv6 from psutil._common import wrap_numbers -from psutil._common import open_text -from psutil._common import open_binary from psutil._compat import PY3 from psutil.tests import APPVEYOR -from psutil.tests import bind_socket -from psutil.tests import bind_unix_socket -from psutil.tests import call_until -from psutil.tests import chdir from psutil.tests import CI_TESTING -from psutil.tests import create_proc_children_pair -from psutil.tests import create_sockets -from psutil.tests import create_zombie_proc from psutil.tests import DEVNULL -from psutil.tests import get_free_port -from psutil.tests import get_test_subprocess from psutil.tests import HAS_BATTERY -from psutil.tests import HAS_CONNECTIONS_UNIX from psutil.tests import HAS_MEMORY_MAPS from psutil.tests import HAS_NET_IO_COUNTERS from psutil.tests import HAS_SENSORS_BATTERY from psutil.tests import HAS_SENSORS_FANS from psutil.tests import HAS_SENSORS_TEMPERATURES from psutil.tests import import_module_by_path -from psutil.tests import is_namedtuple from psutil.tests import mock from psutil.tests import PYTHON_EXE -from psutil.tests import reap_children from psutil.tests import reload_module -from psutil.tests import retry from psutil.tests import ROOT_DIR -from psutil.tests import safe_mkdir -from psutil.tests import safe_rmpath from psutil.tests import SCRIPTS_DIR from psutil.tests import sh -from psutil.tests import tcp_socketpair -from psutil.tests import TESTFN from psutil.tests import TOX from psutil.tests import TRAVIS from psutil.tests import unittest -from psutil.tests import unix_socket_path -from psutil.tests import unix_socketpair -from psutil.tests import wait_for_file -from psutil.tests import wait_for_pid import psutil import psutil.tests @@ -787,279 +761,6 @@ def test_sensors(self): self.assert_stdout('sensors.py') -# =================================================================== -# --- Unit tests for test utilities. -# =================================================================== - - -class TestRetryDecorator(unittest.TestCase): - - @mock.patch('time.sleep') - def test_retry_success(self, sleep): - # Fail 3 times out of 5; make sure the decorated fun returns. - - @retry(retries=5, interval=1, logfun=None) - def foo(): - while queue: - queue.pop() - 1 / 0 - return 1 - - queue = list(range(3)) - self.assertEqual(foo(), 1) - self.assertEqual(sleep.call_count, 3) - - @mock.patch('time.sleep') - def test_retry_failure(self, sleep): - # Fail 6 times out of 5; th function is supposed to raise exc. - - @retry(retries=5, interval=1, logfun=None) - def foo(): - while queue: - queue.pop() - 1 / 0 - return 1 - - queue = list(range(6)) - self.assertRaises(ZeroDivisionError, foo) - self.assertEqual(sleep.call_count, 5) - - @mock.patch('time.sleep') - def test_exception_arg(self, sleep): - @retry(exception=ValueError, interval=1) - def foo(): - raise TypeError - - self.assertRaises(TypeError, foo) - self.assertEqual(sleep.call_count, 0) - - @mock.patch('time.sleep') - def test_no_interval_arg(self, sleep): - # if interval is not specified sleep is not supposed to be called - - @retry(retries=5, interval=None, logfun=None) - def foo(): - 1 / 0 - - self.assertRaises(ZeroDivisionError, foo) - self.assertEqual(sleep.call_count, 0) - - @mock.patch('time.sleep') - def test_retries_arg(self, sleep): - - @retry(retries=5, interval=1, logfun=None) - def foo(): - 1 / 0 - - self.assertRaises(ZeroDivisionError, foo) - self.assertEqual(sleep.call_count, 5) - - @mock.patch('time.sleep') - def test_retries_and_timeout_args(self, sleep): - self.assertRaises(ValueError, retry, retries=5, timeout=1) - - -class TestSyncTestUtils(unittest.TestCase): - - def tearDown(self): - safe_rmpath(TESTFN) - - def test_wait_for_pid(self): - wait_for_pid(os.getpid()) - nopid = max(psutil.pids()) + 99999 - with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): - self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid) - - def test_wait_for_file(self): - with open(TESTFN, 'w') as f: - f.write('foo') - wait_for_file(TESTFN) - assert not os.path.exists(TESTFN) - - def test_wait_for_file_empty(self): - with open(TESTFN, 'w'): - pass - wait_for_file(TESTFN, empty=True) - assert not os.path.exists(TESTFN) - - def test_wait_for_file_no_file(self): - with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): - self.assertRaises(IOError, wait_for_file, TESTFN) - - def test_wait_for_file_no_delete(self): - with open(TESTFN, 'w') as f: - f.write('foo') - wait_for_file(TESTFN, delete=False) - assert os.path.exists(TESTFN) - - def test_call_until(self): - ret = call_until(lambda: 1, "ret == 1") - self.assertEqual(ret, 1) - - -class TestFSTestUtils(unittest.TestCase): - - def setUp(self): - safe_rmpath(TESTFN) - - tearDown = setUp - - def test_open_text(self): - with open_text(__file__) as f: - self.assertEqual(f.mode, 'rt') - - def test_open_binary(self): - with open_binary(__file__) as f: - self.assertEqual(f.mode, 'rb') - - def test_safe_mkdir(self): - safe_mkdir(TESTFN) - assert os.path.isdir(TESTFN) - safe_mkdir(TESTFN) - assert os.path.isdir(TESTFN) - - def test_safe_rmpath(self): - # test file is removed - open(TESTFN, 'w').close() - safe_rmpath(TESTFN) - assert not os.path.exists(TESTFN) - # test no exception if path does not exist - safe_rmpath(TESTFN) - # test dir is removed - os.mkdir(TESTFN) - safe_rmpath(TESTFN) - assert not os.path.exists(TESTFN) - # test other exceptions are raised - with mock.patch('psutil.tests.os.stat', - side_effect=OSError(errno.EINVAL, "")) as m: - with self.assertRaises(OSError): - safe_rmpath(TESTFN) - assert m.called - - def test_chdir(self): - base = os.getcwd() - os.mkdir(TESTFN) - with chdir(TESTFN): - self.assertEqual(os.getcwd(), os.path.join(base, TESTFN)) - self.assertEqual(os.getcwd(), base) - - -class TestProcessUtils(unittest.TestCase): - - def test_reap_children(self): - subp = get_test_subprocess() - p = psutil.Process(subp.pid) - assert p.is_running() - reap_children() - assert not p.is_running() - assert not psutil.tests._pids_started - assert not psutil.tests._subprocesses_started - - def test_create_proc_children_pair(self): - p1, p2 = create_proc_children_pair() - self.assertNotEqual(p1.pid, p2.pid) - assert p1.is_running() - assert p2.is_running() - children = psutil.Process().children(recursive=True) - self.assertEqual(len(children), 2) - self.assertIn(p1, children) - self.assertIn(p2, children) - self.assertEqual(p1.ppid(), os.getpid()) - self.assertEqual(p2.ppid(), p1.pid) - - # make sure both of them are cleaned up - reap_children() - assert not p1.is_running() - assert not p2.is_running() - assert not psutil.tests._pids_started - assert not psutil.tests._subprocesses_started - - @unittest.skipIf(not POSIX, "POSIX only") - def test_create_zombie_proc(self): - zpid = create_zombie_proc() - self.addCleanup(reap_children, recursive=True) - p = psutil.Process(zpid) - self.assertEqual(p.status(), psutil.STATUS_ZOMBIE) - - -class TestNetUtils(unittest.TestCase): - - def bind_socket(self): - port = get_free_port() - with contextlib.closing(bind_socket(addr=('', port))) as s: - self.assertEqual(s.getsockname()[1], port) - - @unittest.skipIf(not POSIX, "POSIX only") - def test_bind_unix_socket(self): - with unix_socket_path() as name: - sock = bind_unix_socket(name) - with contextlib.closing(sock): - self.assertEqual(sock.family, socket.AF_UNIX) - self.assertEqual(sock.type, socket.SOCK_STREAM) - self.assertEqual(sock.getsockname(), name) - assert os.path.exists(name) - assert stat.S_ISSOCK(os.stat(name).st_mode) - # UDP - with unix_socket_path() as name: - sock = bind_unix_socket(name, type=socket.SOCK_DGRAM) - with contextlib.closing(sock): - self.assertEqual(sock.type, socket.SOCK_DGRAM) - - def tcp_tcp_socketpair(self): - addr = ("127.0.0.1", get_free_port()) - server, client = tcp_socketpair(socket.AF_INET, addr=addr) - with contextlib.closing(server): - with contextlib.closing(client): - # Ensure they are connected and the positions are - # correct. - self.assertEqual(server.getsockname(), addr) - self.assertEqual(client.getpeername(), addr) - self.assertNotEqual(client.getsockname(), addr) - - @unittest.skipIf(not POSIX, "POSIX only") - @unittest.skipIf(NETBSD or FREEBSD, - "/var/run/log UNIX socket opened by default") - def test_unix_socketpair(self): - p = psutil.Process() - num_fds = p.num_fds() - assert not p.connections(kind='unix') - with unix_socket_path() as name: - server, client = unix_socketpair(name) - try: - assert os.path.exists(name) - assert stat.S_ISSOCK(os.stat(name).st_mode) - self.assertEqual(p.num_fds() - num_fds, 2) - self.assertEqual(len(p.connections(kind='unix')), 2) - self.assertEqual(server.getsockname(), name) - self.assertEqual(client.getpeername(), name) - finally: - client.close() - server.close() - - def test_create_sockets(self): - with create_sockets() as socks: - fams = collections.defaultdict(int) - types = collections.defaultdict(int) - for s in socks: - fams[s.family] += 1 - # work around http://bugs.python.org/issue30204 - types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1 - self.assertGreaterEqual(fams[socket.AF_INET], 2) - if supports_ipv6(): - self.assertGreaterEqual(fams[socket.AF_INET6], 2) - if POSIX and HAS_CONNECTIONS_UNIX: - self.assertGreaterEqual(fams[socket.AF_UNIX], 2) - self.assertGreaterEqual(types[socket.SOCK_STREAM], 2) - self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2) - - -class TestOtherUtils(unittest.TestCase): - - def test_is_namedtuple(self): - assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3)) - assert not is_namedtuple(tuple()) - - if __name__ == '__main__': from psutil.tests.runner import run run(__file__) diff --git a/psutil/tests/test_testutils.py b/psutil/tests/test_testutils.py new file mode 100755 index 000000000..7d3d6675c --- /dev/null +++ b/psutil/tests/test_testutils.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +""" +Tests for testing utils (psutil.tests namespace). +""" + +import collections +import contextlib +import errno +import io +import os +import socket +import stat + +from psutil import FREEBSD +from psutil import NETBSD +from psutil import POSIX +from psutil._common import open_binary +from psutil._common import open_text +from psutil._common import supports_ipv6 +from psutil._compat import PY3 +from psutil._compat import redirect_stderr +from psutil.tests import bind_socket +from psutil.tests import bind_unix_socket +from psutil.tests import call_until +from psutil.tests import chdir +from psutil.tests import create_proc_children_pair +from psutil.tests import create_sockets +from psutil.tests import create_zombie_proc +from psutil.tests import get_free_port +from psutil.tests import get_test_subprocess +from psutil.tests import HAS_CONNECTIONS_UNIX +from psutil.tests import is_namedtuple +from psutil.tests import mock +from psutil.tests import reap_children +from psutil.tests import retry +from psutil.tests import retry_on_failure +from psutil.tests import safe_mkdir +from psutil.tests import safe_rmpath +from psutil.tests import tcp_socketpair +from psutil.tests import TESTFN +from psutil.tests import TestMemoryLeak +from psutil.tests import unittest +from psutil.tests import unix_socket_path +from psutil.tests import unix_socketpair +from psutil.tests import wait_for_file +from psutil.tests import wait_for_pid +import psutil +import psutil.tests + +# =================================================================== +# --- Unit tests for test utilities. +# =================================================================== + + +class TestRetryDecorator(unittest.TestCase): + + @mock.patch('time.sleep') + def test_retry_success(self, sleep): + # Fail 3 times out of 5; make sure the decorated fun returns. + + @retry(retries=5, interval=1, logfun=None) + def foo(): + while queue: + queue.pop() + 1 / 0 + return 1 + + queue = list(range(3)) + self.assertEqual(foo(), 1) + self.assertEqual(sleep.call_count, 3) + + @mock.patch('time.sleep') + def test_retry_failure(self, sleep): + # Fail 6 times out of 5; th function is supposed to raise exc. + @retry(retries=5, interval=1, logfun=None) + def foo(): + while queue: + queue.pop() + 1 / 0 + return 1 + + queue = list(range(6)) + self.assertRaises(ZeroDivisionError, foo) + self.assertEqual(sleep.call_count, 5) + + @mock.patch('time.sleep') + def test_exception_arg(self, sleep): + @retry(exception=ValueError, interval=1) + def foo(): + raise TypeError + + self.assertRaises(TypeError, foo) + self.assertEqual(sleep.call_count, 0) + + @mock.patch('time.sleep') + def test_no_interval_arg(self, sleep): + # if interval is not specified sleep is not supposed to be called + + @retry(retries=5, interval=None, logfun=None) + def foo(): + 1 / 0 + + self.assertRaises(ZeroDivisionError, foo) + self.assertEqual(sleep.call_count, 0) + + @mock.patch('time.sleep') + def test_retries_arg(self, sleep): + + @retry(retries=5, interval=1, logfun=None) + def foo(): + 1 / 0 + + self.assertRaises(ZeroDivisionError, foo) + self.assertEqual(sleep.call_count, 5) + + @mock.patch('time.sleep') + def test_retries_and_timeout_args(self, sleep): + self.assertRaises(ValueError, retry, retries=5, timeout=1) + + +class TestSyncTestUtils(unittest.TestCase): + + def tearDown(self): + safe_rmpath(TESTFN) + + def test_wait_for_pid(self): + wait_for_pid(os.getpid()) + nopid = max(psutil.pids()) + 99999 + with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): + self.assertRaises(psutil.NoSuchProcess, wait_for_pid, nopid) + + def test_wait_for_file(self): + with open(TESTFN, 'w') as f: + f.write('foo') + wait_for_file(TESTFN) + assert not os.path.exists(TESTFN) + + def test_wait_for_file_empty(self): + with open(TESTFN, 'w'): + pass + wait_for_file(TESTFN, empty=True) + assert not os.path.exists(TESTFN) + + def test_wait_for_file_no_file(self): + with mock.patch('psutil.tests.retry.__iter__', return_value=iter([0])): + self.assertRaises(IOError, wait_for_file, TESTFN) + + def test_wait_for_file_no_delete(self): + with open(TESTFN, 'w') as f: + f.write('foo') + wait_for_file(TESTFN, delete=False) + assert os.path.exists(TESTFN) + + def test_call_until(self): + ret = call_until(lambda: 1, "ret == 1") + self.assertEqual(ret, 1) + + +class TestFSTestUtils(unittest.TestCase): + + def setUp(self): + safe_rmpath(TESTFN) + + tearDown = setUp + + def test_open_text(self): + with open_text(__file__) as f: + self.assertEqual(f.mode, 'rt') + + def test_open_binary(self): + with open_binary(__file__) as f: + self.assertEqual(f.mode, 'rb') + + def test_safe_mkdir(self): + safe_mkdir(TESTFN) + assert os.path.isdir(TESTFN) + safe_mkdir(TESTFN) + assert os.path.isdir(TESTFN) + + def test_safe_rmpath(self): + # test file is removed + open(TESTFN, 'w').close() + safe_rmpath(TESTFN) + assert not os.path.exists(TESTFN) + # test no exception if path does not exist + safe_rmpath(TESTFN) + # test dir is removed + os.mkdir(TESTFN) + safe_rmpath(TESTFN) + assert not os.path.exists(TESTFN) + # test other exceptions are raised + with mock.patch('psutil.tests.os.stat', + side_effect=OSError(errno.EINVAL, "")) as m: + with self.assertRaises(OSError): + safe_rmpath(TESTFN) + assert m.called + + def test_chdir(self): + base = os.getcwd() + os.mkdir(TESTFN) + with chdir(TESTFN): + self.assertEqual(os.getcwd(), os.path.join(base, TESTFN)) + self.assertEqual(os.getcwd(), base) + + +class TestProcessUtils(unittest.TestCase): + + def test_reap_children(self): + subp = get_test_subprocess() + p = psutil.Process(subp.pid) + assert p.is_running() + reap_children() + assert not p.is_running() + assert not psutil.tests._pids_started + assert not psutil.tests._subprocesses_started + + def test_create_proc_children_pair(self): + p1, p2 = create_proc_children_pair() + self.assertNotEqual(p1.pid, p2.pid) + assert p1.is_running() + assert p2.is_running() + children = psutil.Process().children(recursive=True) + self.assertEqual(len(children), 2) + self.assertIn(p1, children) + self.assertIn(p2, children) + self.assertEqual(p1.ppid(), os.getpid()) + self.assertEqual(p2.ppid(), p1.pid) + + # make sure both of them are cleaned up + reap_children() + assert not p1.is_running() + assert not p2.is_running() + assert not psutil.tests._pids_started + assert not psutil.tests._subprocesses_started + + @unittest.skipIf(not POSIX, "POSIX only") + def test_create_zombie_proc(self): + zpid = create_zombie_proc() + self.addCleanup(reap_children, recursive=True) + p = psutil.Process(zpid) + self.assertEqual(p.status(), psutil.STATUS_ZOMBIE) + + +class TestNetUtils(unittest.TestCase): + + def bind_socket(self): + port = get_free_port() + with contextlib.closing(bind_socket(addr=('', port))) as s: + self.assertEqual(s.getsockname()[1], port) + + @unittest.skipIf(not POSIX, "POSIX only") + def test_bind_unix_socket(self): + with unix_socket_path() as name: + sock = bind_unix_socket(name) + with contextlib.closing(sock): + self.assertEqual(sock.family, socket.AF_UNIX) + self.assertEqual(sock.type, socket.SOCK_STREAM) + self.assertEqual(sock.getsockname(), name) + assert os.path.exists(name) + assert stat.S_ISSOCK(os.stat(name).st_mode) + # UDP + with unix_socket_path() as name: + sock = bind_unix_socket(name, type=socket.SOCK_DGRAM) + with contextlib.closing(sock): + self.assertEqual(sock.type, socket.SOCK_DGRAM) + + def tcp_tcp_socketpair(self): + addr = ("127.0.0.1", get_free_port()) + server, client = tcp_socketpair(socket.AF_INET, addr=addr) + with contextlib.closing(server): + with contextlib.closing(client): + # Ensure they are connected and the positions are + # correct. + self.assertEqual(server.getsockname(), addr) + self.assertEqual(client.getpeername(), addr) + self.assertNotEqual(client.getsockname(), addr) + + @unittest.skipIf(not POSIX, "POSIX only") + @unittest.skipIf(NETBSD or FREEBSD, + "/var/run/log UNIX socket opened by default") + def test_unix_socketpair(self): + p = psutil.Process() + num_fds = p.num_fds() + assert not p.connections(kind='unix') + with unix_socket_path() as name: + server, client = unix_socketpair(name) + try: + assert os.path.exists(name) + assert stat.S_ISSOCK(os.stat(name).st_mode) + self.assertEqual(p.num_fds() - num_fds, 2) + self.assertEqual(len(p.connections(kind='unix')), 2) + self.assertEqual(server.getsockname(), name) + self.assertEqual(client.getpeername(), name) + finally: + client.close() + server.close() + + def test_create_sockets(self): + with create_sockets() as socks: + fams = collections.defaultdict(int) + types = collections.defaultdict(int) + for s in socks: + fams[s.family] += 1 + # work around http://bugs.python.org/issue30204 + types[s.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)] += 1 + self.assertGreaterEqual(fams[socket.AF_INET], 2) + if supports_ipv6(): + self.assertGreaterEqual(fams[socket.AF_INET6], 2) + if POSIX and HAS_CONNECTIONS_UNIX: + self.assertGreaterEqual(fams[socket.AF_UNIX], 2) + self.assertGreaterEqual(types[socket.SOCK_STREAM], 2) + self.assertGreaterEqual(types[socket.SOCK_DGRAM], 2) + + +class TestMemLeakClass(TestMemoryLeak): + + def test_times(self): + def fun(): + cnt['cnt'] += 1 + cnt = {'cnt': 0} + self.execute(fun, times=1, warmup_times=10) + self.assertEqual(cnt['cnt'], 11) + self.execute(fun, times=10, warmup_times=10) + self.assertEqual(cnt['cnt'], 31) + + def test_warmup_times(self): + def fun(): + cnt['cnt'] += 1 + cnt = {'cnt': 0} + self.execute(fun, times=1, warmup_times=10) + self.assertEqual(cnt['cnt'], 11) + + def test_param_err(self): + self.assertRaises(ValueError, self.execute, lambda: 0, times=0) + self.assertRaises(ValueError, self.execute, lambda: 0, times=-1) + self.assertRaises(ValueError, self.execute, lambda: 0, warmup_times=-1) + self.assertRaises(ValueError, self.execute, lambda: 0, tolerance=-1) + self.assertRaises(ValueError, self.execute, lambda: 0, retry_for=-1) + + def test_leak(self): + def fun(): + ls.append("x" * 24 * 1024) + ls = [] + times = 100 + self.assertRaises(AssertionError, self.execute, fun, times=times, + warmup_times=10, retry_for=None) + self.assertEqual(len(ls), times + 10) + + @retry_on_failure(retries=20) # 2 secs + def test_leak_with_retry(self, ls=[]): + def fun(): + ls.append("x" * 24 * 1024) + times = 100 + f = io.StringIO() if PY3 else io.BytesIO() + with redirect_stderr(f): + self.assertRaises(AssertionError, self.execute, fun, times=times, + retry_for=0.1) + self.assertIn("try calling fun for another", f.getvalue()) + self.assertGreater(len(ls), times) + + def test_tolerance(self): + def fun(): + ls.append("x" * 24 * 1024) + ls = [] + times = 100 + self.execute(fun, times=times, warmup_times=0, + tolerance=200 * 1024 * 1024) + self.assertEqual(len(ls), times) + + def test_execute_w_exc(self): + def fun(): + 1 / 0 + self.execute_w_exc(ZeroDivisionError, fun, times=2000, warmup_times=20, + tolerance=4096, retry_for=3) + + with self.assertRaises(ZeroDivisionError): + self.execute_w_exc(OSError, fun) + + def fun(): + pass + with self.assertRaises(AssertionError): + self.execute_w_exc(ZeroDivisionError, fun) + + +class TestOtherUtils(unittest.TestCase): + + def test_is_namedtuple(self): + assert is_namedtuple(collections.namedtuple('foo', 'a b c')(1, 2, 3)) + assert not is_namedtuple(tuple()) + + +if __name__ == '__main__': + from psutil.tests.runner import run + run(__file__) diff --git a/scripts/internal/check_broken_links.py b/scripts/internal/check_broken_links.py index 1a0761161..e66448fd1 100755 --- a/scripts/internal/check_broken_links.py +++ b/scripts/internal/check_broken_links.py @@ -40,7 +40,6 @@ """ from __future__ import print_function - import concurrent.futures import functools import os diff --git a/scripts/sensors.py b/scripts/sensors.py index 726af3d2c..e532bebad 100755 --- a/scripts/sensors.py +++ b/scripts/sensors.py @@ -30,7 +30,6 @@ """ from __future__ import print_function - import psutil