diff --git a/tests/profiling/collector/lock_utils.py b/tests/profiling/collector/lock_utils.py index 1be0b759498..ad4239b9e4a 100644 --- a/tests/profiling/collector/lock_utils.py +++ b/tests/profiling/collector/lock_utils.py @@ -1,9 +1,10 @@ from collections import namedtuple import sys +from typing import Dict LineNo = namedtuple("LineNo", ["create", "acquire", "release"]) -lock_locs = {} +lock_locs: Dict[str, LineNo] = {} loc_type_map = { "!CREATE!": "create", "!ACQUIRE!": "acquire", @@ -11,7 +12,7 @@ } -def get_lock_locations(path: str): +def get_lock_locations(path: str) -> None: """ The lock profiler is capable of determining where locks are created and used. In order to test this behavior, line numbers are compared in several tests. However, since it's cumbersome to write the tests in any way except @@ -34,12 +35,12 @@ def get_lock_locations(path: str): lock_locs[lock_name] = lock_locs[lock_name]._replace(**{field: lineno}) -def get_lock_linenos(name, with_stmt=False): +def get_lock_linenos(name, with_stmt=False) -> LineNo: linenos = lock_locs.get(name, LineNo(0, 0, 0)) if with_stmt and sys.version_info < (3, 10): linenos = linenos._replace(release=linenos.release + 1) return linenos -def init_linenos(path): +def init_linenos(path) -> None: get_lock_locations(path) diff --git a/tests/profiling_v2/collector/test_threading.py b/tests/profiling_v2/collector/test_threading.py index 672a936879d..6a9de6fa3d9 100644 --- a/tests/profiling_v2/collector/test_threading.py +++ b/tests/profiling_v2/collector/test_threading.py @@ -1,7 +1,9 @@ +import _thread import glob import os import threading -from typing import Any +from typing import Callable +from typing import List from typing import Optional from typing import Type from typing import Union @@ -11,51 +13,54 @@ import pytest from ddtrace import ext +from ddtrace._trace.span import Span +from ddtrace._trace.tracer import Tracer from ddtrace.internal.datadog.profiling import ddup from ddtrace.profiling.collector.threading import ThreadingLockCollector from ddtrace.profiling.collector.threading import ThreadingRLockCollector from tests.profiling.collector import pprof_utils from tests.profiling.collector import test_collector +from tests.profiling.collector.lock_utils import LineNo from tests.profiling.collector.lock_utils import get_lock_linenos from tests.profiling.collector.lock_utils import init_linenos +from tests.profiling.collector.pprof_utils import pprof_pb2 # Type aliases for supported classes -LockClass = Union[Type[threading.Lock], Type[threading.RLock]] -CollectorClass = Union[Type[ThreadingLockCollector], Type[ThreadingRLockCollector]] +LockClassType = Union[Type[threading.Lock], Type[threading.RLock]] +CollectorClassType = Union[Type[ThreadingLockCollector], Type[ThreadingRLockCollector]] +# threading.Lock and threading.RLock are factory functions that return _thread types. +# We reference the underlying _thread types directly to avoid creating instances at import time. +LockClassInst = Union[_thread.LockType, _thread.RLock] # Module-level globals for testing global lock profiling -_test_global_lock: Optional[Any] = None -_test_global_bar_instance: Optional[Any] = None +_test_global_lock: LockClassInst -TESTING_GEVENT: Union[str, bool] = os.getenv("DD_PROFILE_TEST_GEVENT", False) - -# Module-level globals for testing global lock profiling -_test_global_lock = None -_test_global_bar_instance = None +class TestBar: + ... -TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False) +_test_global_bar_instance: TestBar init_linenos(__file__) # Helper classes for testing lock collector class Foo: - def __init__(self, lock_class: Any): - self.foo_lock = lock_class() # !CREATE! foolock + def __init__(self, lock_class: LockClassType) -> None: + self.foo_lock: LockClassInst = lock_class() # !CREATE! foolock - def foo(self): + def foo(self) -> None: with self.foo_lock: # !RELEASE! !ACQUIRE! foolock pass class Bar: - def __init__(self, lock_class: Any): - self.foo = Foo(lock_class) + def __init__(self, lock_class: LockClassType) -> None: + self.foo: Foo = Foo(lock_class) - def bar(self): + def bar(self) -> None: self.foo.foo() @@ -77,7 +82,7 @@ def bar(self): ], ) def test_repr( - collector_class: CollectorClass, + collector_class: CollectorClassType, expected_repr: str, ) -> None: test_collector._test_repr(collector_class, expected_repr) @@ -91,11 +96,11 @@ def test_repr( ], ) def test_patch( - lock_class: LockClass, - collector_class: CollectorClass, + lock_class: LockClassType, + collector_class: CollectorClassType, ) -> None: - lock = lock_class - collector = collector_class() + lock: LockClassType = lock_class + collector: ThreadingLockCollector | ThreadingRLockCollector = collector_class() collector.start() assert lock == collector._original # wrapt makes this true @@ -108,7 +113,7 @@ def test_patch( @pytest.mark.subprocess( env=dict(WRAPT_DISABLE_EXTENSIONS="True", DD_PROFILING_FILE_PATH=__file__), ) -def test_wrapt_disable_extensions(): +def test_wrapt_disable_extensions() -> None: import os import threading @@ -116,16 +121,20 @@ def test_wrapt_disable_extensions(): from ddtrace.profiling.collector import _lock from ddtrace.profiling.collector.threading import ThreadingLockCollector from tests.profiling.collector import pprof_utils + from tests.profiling.collector.lock_utils import LineNo from tests.profiling.collector.lock_utils import get_lock_linenos from tests.profiling.collector.lock_utils import init_linenos + from tests.profiling.collector.pprof_utils import pprof_pb2 assert ddup.is_available, "ddup is not available" # Set up the ddup exporter - test_name = "test_wrapt_disable_extensions" - pprof_prefix = "/tmp" + os.sep + test_name - output_filename = pprof_prefix + "." + str(os.getpid()) - ddup.config(env="test", service=test_name, version="my_version", output_filename=pprof_prefix) + test_name: str = "test_wrapt_disable_extensions" + pprof_prefix: str = "/tmp" + os.sep + test_name + output_filename: str = pprof_prefix + "." + str(os.getpid()) + ddup.config( + env="test", service=test_name, version="my_version", output_filename=pprof_prefix + ) # pyright: ignore[reportCallIssue] ddup.start() init_linenos(os.environ["DD_PROFILING_FILE_PATH"]) @@ -137,17 +146,17 @@ def test_wrapt_disable_extensions(): assert _lock.WRAPT_C_EXT is False with ThreadingLockCollector(capture_pct=100): - th_lock = threading.Lock() # !CREATE! test_wrapt_disable_extensions + th_lock: threading.Lock = threading.Lock() # !CREATE! test_wrapt_disable_extensions with th_lock: # !ACQUIRE! !RELEASE! test_wrapt_disable_extensions pass - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] - expected_filename = "test_threading.py" + expected_filename: str = "test_threading.py" - linenos = get_lock_linenos("test_wrapt_disable_extensions", with_stmt=True) + linenos: LineNo = get_lock_linenos("test_wrapt_disable_extensions", with_stmt=True) - profile = pprof_utils.parse_newest_profile(output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -171,7 +180,7 @@ def test_wrapt_disable_extensions(): # This test has to be run in a subprocess because it calls gevent.monkey.patch_all() # which affects the whole process. -@pytest.mark.skipif(not TESTING_GEVENT, reason="gevent is not available") +@pytest.mark.skipif(not os.getenv("DD_PROFILE_TEST_GEVENT"), reason="gevent is not available") @pytest.mark.subprocess( env=dict(DD_PROFILING_FILE_PATH=__file__), ) @@ -193,26 +202,30 @@ def test_lock_gevent_tasks() -> None: assert ddup.is_available, "ddup is not available" # Set up the ddup exporter - test_name = "test_lock_gevent_tasks" - pprof_prefix = "/tmp" + os.sep + test_name - output_filename = pprof_prefix + "." + str(os.getpid()) - ddup.config(env="test", service=test_name, version="my_version", output_filename=pprof_prefix) + test_name: str = "test_lock_gevent_tasks" + pprof_prefix: str = "/tmp" + os.sep + test_name + output_filename: str = pprof_prefix + "." + str(os.getpid()) + ddup.config( + env="test", service=test_name, version="my_version", output_filename=pprof_prefix + ) # pyright: ignore[reportCallIssue] ddup.start() init_linenos(os.environ["DD_PROFILING_FILE_PATH"]) def play_with_lock() -> None: - lock = threading.Lock() # !CREATE! test_lock_gevent_tasks + lock: threading.Lock = threading.Lock() # !CREATE! test_lock_gevent_tasks lock.acquire() # !ACQUIRE! test_lock_gevent_tasks lock.release() # !RELEASE! test_lock_gevent_tasks - def validate_and_cleanup(): - ddup.upload() + def validate_and_cleanup() -> None: + ddup.upload() # pyright: ignore[reportCallIssue] - expected_filename = "test_threading.py" - linenos = get_lock_linenos(test_name) + expected_filename: str = "test_threading.py" + linenos: LineNo = get_lock_linenos(test_name) - profile = pprof_utils.parse_newest_profile(output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile( + output_filename + ) # pyright: ignore[reportInvalidTypeForm] pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -250,7 +263,7 @@ def validate_and_cleanup(): print("Error removing file: {}".format(e)) with ThreadingLockCollector(capture_pct=100): - t = threading.Thread(name="foobar", target=play_with_lock) + t: threading.Thread = threading.Thread(name="foobar", target=play_with_lock) t.start() t.join() @@ -259,7 +272,7 @@ def validate_and_cleanup(): # This test has to be run in a subprocess because it calls gevent.monkey.patch_all() # which affects the whole process. -@pytest.mark.skipif(not TESTING_GEVENT, reason="gevent is not available") +@pytest.mark.skipif(not os.getenv("DD_PROFILE_TEST_GEVENT"), reason="gevent is not available") @pytest.mark.subprocess( env=dict(DD_PROFILING_FILE_PATH=__file__), ) @@ -281,26 +294,28 @@ def test_rlock_gevent_tasks() -> None: assert ddup.is_available, "ddup is not available" # Set up the ddup exporter - test_name = "test_rlock_gevent_tasks" - pprof_prefix = "/tmp" + os.sep + test_name - output_filename = pprof_prefix + "." + str(os.getpid()) - ddup.config(env="test", service=test_name, version="my_version", output_filename=pprof_prefix) + test_name: str = "test_rlock_gevent_tasks" + pprof_prefix: str = "/tmp" + os.sep + test_name + output_filename: str = pprof_prefix + "." + str(os.getpid()) + ddup.config( + env="test", service=test_name, version="my_version", output_filename=pprof_prefix + ) # pyright: ignore[reportCallIssue] ddup.start() init_linenos(os.environ["DD_PROFILING_FILE_PATH"]) def play_with_lock() -> None: - lock = threading.RLock() # !CREATE! test_rlock_gevent_tasks + lock: threading.RLock = threading.RLock() # !CREATE! test_rlock_gevent_tasks lock.acquire() # !ACQUIRE! test_rlock_gevent_tasks lock.release() # !RELEASE! test_rlock_gevent_tasks - def validate_and_cleanup(): - ddup.upload() + def validate_and_cleanup() -> None: + ddup.upload() # pyright: ignore[reportCallIssue] - expected_filename = "test_threading.py" - linenos = get_lock_linenos(test_name) + expected_filename: str = "test_threading.py" + linenos: LineNo = get_lock_linenos(test_name) - profile = pprof_utils.parse_newest_profile(output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -338,7 +353,7 @@ def validate_and_cleanup(): print("Error removing file: {}".format(e)) with ThreadingRLockCollector(capture_pct=100): - t = threading.Thread(name="foobar", target=play_with_lock) + t: threading.Thread = threading.Thread(name="foobar", target=play_with_lock) t.start() t.join() @@ -348,29 +363,31 @@ def validate_and_cleanup(): class BaseThreadingLockCollectorTest: # These should be implemented by child classes @property - def collector_class(self): + def collector_class(self) -> CollectorClassType: raise NotImplementedError("Child classes must implement collector_class") @property - def lock_class(self): + def lock_class(self) -> LockClassType: raise NotImplementedError("Child classes must implement lock_class") # setup_method and teardown_method which will be called before and after # each test method, respectively, part of pytest api. - def setup_method(self, method): - self.test_name = method.__name__ - self.pprof_prefix = "/tmp" + os.sep + self.test_name + def setup_method(self, method: Callable[..., None]) -> None: + self.test_name: str = method.__name__ + self.pprof_prefix: str = "/tmp" + os.sep + self.test_name # The output filename will be /tmp/method_name... # The counter number is incremented for each test case, as the tests are # all run in a single process and share the same exporter. - self.output_filename = self.pprof_prefix + "." + str(os.getpid()) + self.output_filename: str = self.pprof_prefix + "." + str(os.getpid()) # ddup is available when the native module is compiled assert ddup.is_available, "ddup is not available" - ddup.config(env="test", service=self.test_name, version="my_version", output_filename=self.pprof_prefix) + ddup.config( + env="test", service=self.test_name, version="my_version", output_filename=self.pprof_prefix + ) # pyright: ignore[reportCallIssue] ddup.start() - def teardown_method(self, method): + def teardown_method(self, method: Callable[..., None]) -> None: # might be unnecessary but this will ensure that the file is removed # after each successful test, and when a test fails it's easier to # pinpoint and debug. @@ -380,24 +397,23 @@ def teardown_method(self, method): except Exception as e: print("Error removing file: {}".format(e)) - def test_wrapper(self): - collector = self.collector_class() + def test_wrapper(self) -> None: + collector: ThreadingLockCollector | ThreadingRLockCollector = self.collector_class() with collector: class Foobar(object): - def __init__(self, lock_class): - lock = lock_class() + def __init__(self, lock_class: LockClassType) -> None: + lock: LockClassInst = lock_class() assert lock.acquire() lock.release() - lock = self.lock_class() + lock: LockClassInst = self.lock_class() assert lock.acquire() lock.release() # Try this way too Foobar(self.lock_class) - # Tests def test_lock_events(self): # The first argument is the recorder.Recorder which is used for the # v1 exporter. We don't need it for the v2 exporter. @@ -430,22 +446,22 @@ def test_lock_events(self): ], ) - def test_lock_acquire_events_class(self): + def test_lock_acquire_events_class(self) -> None: with self.collector_class(capture_pct=100): - lock_class = self.lock_class # Capture for inner class + lock_class: LockClassType = self.lock_class # Capture for inner class class Foobar(object): - def lockfunc(self): - lock = lock_class() # !CREATE! test_lock_acquire_events_class + def lockfunc(self) -> None: + lock: LockClassInst = lock_class() # !CREATE! test_lock_acquire_events_class lock.acquire() # !ACQUIRE! test_lock_acquire_events_class Foobar().lockfunc() - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] - linenos = get_lock_linenos("test_lock_acquire_events_class") + linenos: LineNo = get_lock_linenos("test_lock_acquire_events_class") - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -458,29 +474,29 @@ def lockfunc(self): ], ) - def test_lock_events_tracer(self, tracer): + def test_lock_events_tracer(self, tracer: Tracer) -> None: tracer._endpoint_call_counter_span_processor.enable() - resource = str(uuid.uuid4()) - span_type = ext.SpanTypes.WEB + resource: str = str(uuid.uuid4()) + span_type: str = ext.SpanTypes.WEB with self.collector_class( tracer=tracer, capture_pct=100, ): - lock1 = self.lock_class() # !CREATE! test_lock_events_tracer_1 + lock1: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_1 lock1.acquire() # !ACQUIRE! test_lock_events_tracer_1 with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2 = self.lock_class() # !CREATE! test_lock_events_tracer_2 + lock2: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_2 lock2.acquire() # !ACQUIRE! test_lock_events_tracer_2 lock1.release() # !RELEASE! test_lock_events_tracer_1 - span_id = t.span_id + span_id: int = t.span_id lock2.release() # !RELEASE! test_lock_events_tracer_2 ddup.upload(tracer=tracer) - linenos1 = get_lock_linenos("test_lock_events_tracer_1") - linenos2 = get_lock_linenos("test_lock_events_tracer_2") + linenos1: LineNo = get_lock_linenos("test_lock_events_tracer_1") + linenos2: LineNo = get_lock_linenos("test_lock_events_tracer_2") - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -519,25 +535,25 @@ def test_lock_events_tracer(self, tracer): ], ) - def test_lock_events_tracer_non_web(self, tracer): + def test_lock_events_tracer_non_web(self, tracer: Tracer) -> None: tracer._endpoint_call_counter_span_processor.enable() - resource = str(uuid.uuid4()) - span_type = ext.SpanTypes.SQL + resource: str = str(uuid.uuid4()) + span_type: str = ext.SpanTypes.SQL with self.collector_class( tracer=tracer, capture_pct=100, ): with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2 = self.lock_class() # !CREATE! test_lock_events_tracer_non_web + lock2: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_non_web lock2.acquire() # !ACQUIRE! test_lock_events_tracer_non_web - span_id = t.span_id + span_id: int = t.span_id lock2.release() # !RELEASE! test_lock_events_tracer_non_web ddup.upload(tracer=tracer) - linenos2 = get_lock_linenos("test_lock_events_tracer_non_web") + linenos2: LineNo = get_lock_linenos("test_lock_events_tracer_non_web") - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -561,18 +577,18 @@ def test_lock_events_tracer_non_web(self, tracer): ], ) - def test_lock_events_tracer_late_finish(self, tracer): + def test_lock_events_tracer_late_finish(self, tracer: Tracer) -> None: tracer._endpoint_call_counter_span_processor.enable() - resource = str(uuid.uuid4()) - span_type = ext.SpanTypes.WEB + resource: str = str(uuid.uuid4()) + span_type: str = ext.SpanTypes.WEB with self.collector_class( tracer=tracer, capture_pct=100, ): - lock1 = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_1 + lock1: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_1 lock1.acquire() # !ACQUIRE! test_lock_events_tracer_late_finish_1 - span = tracer.start_span("test", span_type=span_type) - lock2 = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_2 + span: Span = tracer.start_span(name="test", span_type=span_type) # pyright: ignore[reportCallIssue] + lock2: LockClassInst = self.lock_class() # !CREATE! test_lock_events_tracer_late_finish_2 lock2.acquire() # !ACQUIRE! test_lock_events_tracer_late_finish_2 lock1.release() # !RELEASE! test_lock_events_tracer_late_finish_1 lock2.release() # !RELEASE! test_lock_events_tracer_late_finish_2 @@ -580,10 +596,10 @@ def test_lock_events_tracer_late_finish(self, tracer): span.finish() ddup.upload(tracer=tracer) - linenos1 = get_lock_linenos("test_lock_events_tracer_late_finish_1") - linenos2 = get_lock_linenos("test_lock_events_tracer_late_finish_2") + linenos1: LineNo = get_lock_linenos("test_lock_events_tracer_late_finish_1") + linenos2: LineNo = get_lock_linenos("test_lock_events_tracer_late_finish_2") - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -616,29 +632,29 @@ def test_lock_events_tracer_late_finish(self, tracer): ], ) - def test_resource_not_collected(self, tracer): + def test_resource_not_collected(self, tracer: Tracer) -> None: tracer._endpoint_call_counter_span_processor.enable() - resource = str(uuid.uuid4()) - span_type = ext.SpanTypes.WEB + resource: str = str(uuid.uuid4()) + span_type: str = ext.SpanTypes.WEB with self.collector_class( tracer=tracer, capture_pct=100, endpoint_collection_enabled=False, ): - lock1 = self.lock_class() # !CREATE! test_resource_not_collected_1 + lock1: LockClassInst = self.lock_class() # !CREATE! test_resource_not_collected_1 lock1.acquire() # !ACQUIRE! test_resource_not_collected_1 with tracer.trace("test", resource=resource, span_type=span_type) as t: - lock2 = self.lock_class() # !CREATE! test_resource_not_collected_2 + lock2: LockClassInst = self.lock_class() # !CREATE! test_resource_not_collected_2 lock2.acquire() # !ACQUIRE! test_resource_not_collected_2 lock1.release() # !RELEASE! test_resource_not_collected_1 - span_id = t.span_id + span_id: int = t.span_id lock2.release() # !RELEASE! test_resource_not_collected_2 ddup.upload(tracer=tracer) - linenos1 = get_lock_linenos("test_resource_not_collected_1") - linenos2 = get_lock_linenos("test_resource_not_collected_2") + linenos1: LineNo = get_lock_linenos("test_resource_not_collected_1") + linenos2: LineNo = get_lock_linenos("test_resource_not_collected_2") - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -677,18 +693,18 @@ def test_resource_not_collected(self, tracer): ], ) - def test_lock_enter_exit_events(self): + def test_lock_enter_exit_events(self) -> None: with self.collector_class(capture_pct=100): - th_lock = self.lock_class() # !CREATE! test_lock_enter_exit_events + th_lock: LockClassInst = self.lock_class() # !CREATE! test_lock_enter_exit_events with th_lock: # !ACQUIRE! !RELEASE! test_lock_enter_exit_events pass - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] # for enter/exits, we need to update the lock_linenos for versions >= 3.10 - linenos = get_lock_linenos("test_lock_enter_exit_events", with_stmt=True) + linenos: LineNo = get_lock_linenos("test_lock_enter_exit_events", with_stmt=True) - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -713,23 +729,23 @@ def test_lock_enter_exit_events(self): "inspect_dir_enabled", [True, False], ) - def test_class_member_lock(self, inspect_dir_enabled): + def test_class_member_lock(self, inspect_dir_enabled: bool) -> None: with mock.patch("ddtrace.settings.profiling.config.lock.name_inspect_dir", inspect_dir_enabled): - expected_lock_name = "foo_lock" if inspect_dir_enabled else None + expected_lock_name: Optional[str] = "foo_lock" if inspect_dir_enabled else None with self.collector_class(capture_pct=100): - foobar = Foo(self.lock_class) + foobar: Foo = Foo(self.lock_class) foobar.foo() - bar = Bar(self.lock_class) + bar: Bar = Bar(self.lock_class) bar.bar() - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] - linenos = get_lock_linenos("foolock", with_stmt=True) - profile = pprof_utils.parse_newest_profile(self.output_filename) - acquire_samples = pprof_utils.get_samples_with_value_type(profile, "lock-acquire") + linenos: LineNo = get_lock_linenos("foolock", with_stmt=True) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) + acquire_samples: List[pprof_pb2.Sample] = pprof_utils.get_samples_with_value_type(profile, "lock-acquire") assert len(acquire_samples) >= 2, "Expected at least 2 lock-acquire samples" - release_samples = pprof_utils.get_samples_with_value_type(profile, "lock-release") + release_samples: List[pprof_pb2.Sample] = pprof_utils.get_samples_with_value_type(profile, "lock-release") assert len(release_samples) >= 2, "Expected at least 2 lock-release samples" pprof_utils.assert_lock_events( @@ -752,24 +768,24 @@ def test_class_member_lock(self, inspect_dir_enabled): ], ) - def test_private_lock(self): + def test_private_lock(self) -> None: class Foo: - def __init__(self, lock_class: Any): - self.__lock = lock_class() # !CREATE! test_private_lock + def __init__(self, lock_class: LockClassType) -> None: + self.__lock: LockClassInst = lock_class() # !CREATE! test_private_lock - def foo(self): + def foo(self) -> None: with self.__lock: # !RELEASE! !ACQUIRE! test_private_lock pass with self.collector_class(capture_pct=100): - foo = Foo(self.lock_class) + foo: Foo = Foo(self.lock_class) foo.foo() - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] - linenos = get_lock_linenos("test_private_lock", with_stmt=True) + linenos: LineNo = get_lock_linenos("test_private_lock", with_stmt=True) - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, @@ -791,28 +807,28 @@ def foo(self): ], ) - def test_inner_lock(self): + def test_inner_lock(self) -> None: class Bar: - def __init__(self, lock_class: Any): - self.foo = Foo(lock_class) + def __init__(self, lock_class: LockClassType) -> None: + self.foo: Foo = Foo(lock_class) - def bar(self): + def bar(self) -> None: with self.foo.foo_lock: # !RELEASE! !ACQUIRE! test_inner_lock pass with self.collector_class(capture_pct=100): - bar = Bar(self.lock_class) + bar: Bar = Bar(self.lock_class) bar.bar() - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] - linenos_foo = get_lock_linenos("foolock") - linenos_bar = get_lock_linenos("test_inner_lock", with_stmt=True) + linenos_foo: LineNo = get_lock_linenos("foolock") + linenos_bar: LineNo = get_lock_linenos("test_inner_lock", with_stmt=True) linenos_bar = linenos_bar._replace( create=linenos_foo.create, ) - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -831,15 +847,15 @@ def bar(self): ], ) - def test_anonymous_lock(self): + def test_anonymous_lock(self) -> None: with self.collector_class(capture_pct=100): with self.lock_class(): # !CREATE! !ACQUIRE! !RELEASE! test_anonymous_lock pass - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] - linenos = get_lock_linenos("test_anonymous_lock", with_stmt=True) + linenos: LineNo = get_lock_linenos("test_anonymous_lock", with_stmt=True) - profile = pprof_utils.parse_newest_profile(self.output_filename) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( profile, expected_acquire_events=[ @@ -866,14 +882,14 @@ def test_global_locks(self) -> None: _test_global_lock = self.lock_class() # !CREATE! _test_global_lock class TestBar: - def __init__(self, lock_class: LockClass) -> None: - self.bar_lock = lock_class() # !CREATE! bar_lock + def __init__(self, lock_class: LockClassType) -> None: + self.bar_lock: LockClassInst = lock_class() # !CREATE! bar_lock - def bar(self): + def bar(self) -> None: with self.bar_lock: # !ACQUIRE! !RELEASE! bar_lock pass - def foo(): + def foo() -> None: global _test_global_lock assert _test_global_lock is not None with _test_global_lock: # !ACQUIRE! !RELEASE! _test_global_lock @@ -885,14 +901,14 @@ def foo(): foo() _test_global_bar_instance.bar() - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] # Process this file to get the correct line numbers for our !CREATE! comments init_linenos(__file__) - profile = pprof_utils.parse_newest_profile(self.output_filename) - linenos_global = get_lock_linenos("_test_global_lock", with_stmt=True) - linenos_bar = get_lock_linenos("bar_lock", with_stmt=True) + profile: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) + linenos_global: LineNo = get_lock_linenos("_test_global_lock", with_stmt=True) + linenos_bar: LineNo = get_lock_linenos("bar_lock", with_stmt=True) pprof_utils.assert_lock_events( profile, @@ -926,18 +942,18 @@ def foo(): ], ) - def test_upload_resets_profile(self): + def test_upload_resets_profile(self) -> None: # This test checks that the profile is cleared after each upload() call # It is added in test_threading.py as LockCollector can easily be # configured to be deterministic with capture_pct=100. with self.collector_class(capture_pct=100): with self.lock_class(): # !CREATE! !ACQUIRE! !RELEASE! test_upload_resets_profile pass - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] - linenos = get_lock_linenos("test_upload_resets_profile", with_stmt=True) + linenos: LineNo = get_lock_linenos("test_upload_resets_profile", with_stmt=True) - pprof = pprof_utils.parse_newest_profile(self.output_filename) + pprof: pprof_pb2.Profile = pprof_utils.parse_newest_profile(self.output_filename) pprof_utils.assert_lock_events( pprof, expected_acquire_events=[ @@ -957,7 +973,7 @@ def test_upload_resets_profile(self): ) # Now we call upload() again, and we expect the profile to be empty - ddup.upload() + ddup.upload() # pyright: ignore[reportCallIssue] # parse_newest_profile raises an AssertionError if the profile doesn't # have any samples with pytest.raises(AssertionError): @@ -968,11 +984,11 @@ class TestThreadingLockCollector(BaseThreadingLockCollectorTest): """Test Lock profiling""" @property - def collector_class(self): + def collector_class(self) -> Type[ThreadingLockCollector]: return ThreadingLockCollector @property - def lock_class(self): + def lock_class(self) -> Type[threading.Lock]: return threading.Lock @@ -980,9 +996,9 @@ class TestThreadingRLockCollector(BaseThreadingLockCollectorTest): """Test RLock profiling""" @property - def collector_class(self): + def collector_class(self) -> Type[ThreadingRLockCollector]: return ThreadingRLockCollector @property - def lock_class(self): + def lock_class(self) -> Type[threading.RLock]: return threading.RLock