# Hook asyncio's internal ``_wait`` helper: every time it is called, link the
# Task that is currently running (the parent) to each future passed in ``fs``.
@partial(wrap, sys.modules["asyncio"].tasks._wait)
def _(f, args, kwargs):
    try:
        return f(*args, **kwargs)
    finally:
        # The linking below runs whether the wrapped call returned or raised.
        # ``fs`` is read from positional slot 0 and ``loop`` from slot 3
        # (or from the keyword of the same name).
        awaited = typing.cast(typing.Iterable["asyncio.Future"], get_argument_value(args, kwargs, 0, "fs"))
        event_loop = typing.cast("asyncio.AbstractEventLoop", get_argument_value(args, kwargs, 3, "loop"))

        # ``current_task`` is resolved through the module globals at call time.
        lookup_current_task = globals()["current_task"]
        waiter: "asyncio.Task" = lookup_current_task(event_loop)

        # Register the waiting Task as the parent of every awaited future.
        for awaited_future in awaited:
            stack_v2.link_tasks(waiter, awaited_future)
@pytest.mark.subprocess(
    env=dict(
        DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_utils_gather",
    ),
    err=None,
)
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
def test_asyncio_gather() -> None:
    """Profile two Tasks awaited through ``asyncio.gather`` and check the recorded stacks.

    The test runs ``outer`` (which gathers ``inner 1`` and ``inner 2``) under an
    active span with the profiler started, then parses the newest pprof output
    and asserts that samples exist for the parent Task and for both children.
    """
    import asyncio
    import os
    import time
    import uuid

    from ddtrace import ext
    from ddtrace.internal.datadog.profiling import stack_v2
    from ddtrace.profiling import profiler
    from ddtrace.trace import tracer
    from tests.profiling.collector import pprof_utils

    assert stack_v2.is_available, stack_v2.failure_msg

    sleep_time = 0.2
    loop_run_time = 3

    # NOTE: the assertions below locate lines with ``<func>.__code__.co_firstlineno + 3``,
    # so the line layout inside inner1, inner2 and outer must not change.
    async def inner1() -> None:
        start_time = time.time()
        while time.time() < start_time + loop_run_time:
            await asyncio.sleep(sleep_time)

    async def inner2() -> None:
        start_time = time.time()
        while time.time() < start_time + loop_run_time:
            await asyncio.sleep(sleep_time)

    async def outer() -> None:
        t1 = asyncio.create_task(inner1(), name="inner 1")
        t2 = asyncio.create_task(inner2(), name="inner 2")
        await asyncio.gather(t1, t2)

    resource = str(uuid.uuid4())
    span_type = ext.SpanTypes.WEB

    p = profiler.Profiler(tracer=tracer)
    p.start()
    with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span:
        span_id = span.span_id
        local_root_span_id = span._local_root.span_id

        # Drive the parent Task to completion on a fresh event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        main_task = loop.create_task(outer(), name="outer")
        loop.run_until_complete(main_task)

    p.stop()

    # The profile path is the configured prefix followed by "." and this process' PID.
    output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())

    profile = pprof_utils.parse_newest_profile(output_filename)

    samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id")
    assert len(samples_with_span_id) > 0

    # get samples with task_name
    samples = pprof_utils.get_samples_with_label_key(profile, "task name")
    # The next fails if stack_v2 is not properly configured with asyncio task
    # tracking via ddtrace.profiling._asyncio
    assert len(samples) > 0

    # The parent Task must be sampled at its ``await asyncio.gather(...)`` line.
    pprof_utils.assert_profile_has_sample(
        profile,
        samples,
        expected_sample=pprof_utils.StackEvent(
            thread_name="MainThread",
            task_name="outer",
            span_id=span_id,
            local_root_span_id=local_root_span_id,
            locations=[
                pprof_utils.StackLocation(
                    function_name="outer", filename="test_asyncio_gather.py", line_no=outer.__code__.co_firstlineno + 3
                ),
                # TODO: We should add the locations of the gathered Tasks here as they should be in the same Stack
            ],
        ),
    )

    # Two outcomes are accepted: one child's frames are reported under the parent
    # Task name "outer" while the other child is reported under its own name.
    # The ``try`` branch covers "inner2 under outer"; the ``except`` branch covers
    # "inner1 under outer".
    try:
        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name="outer",  # TODO: This is a bug and we need to fix it, it should be "inner 2"
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name="inner2",
                        filename="test_asyncio_gather.py",
                        line_no=inner2.__code__.co_firstlineno + 3,
                    ),
                    pprof_utils.StackLocation(
                        function_name="outer",
                        filename="test_asyncio_gather.py",
                        line_no=outer.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )

        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name="inner 1",
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name="inner1",
                        filename="test_asyncio_gather.py",
                        line_no=inner1.__code__.co_firstlineno + 3,
                    ),
                    pprof_utils.StackLocation(
                        function_name="outer",
                        filename="test_asyncio_gather.py",
                        line_no=outer.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )
    except AssertionError:
        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name="inner 2",  # here inner2 is reported under its own Task name
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name="inner2",
                        filename="test_asyncio_gather.py",
                        line_no=inner2.__code__.co_firstlineno + 3,
                    ),
                    pprof_utils.StackLocation(
                        function_name="outer",
                        filename="test_asyncio_gather.py",
                        line_no=outer.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )

        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name="outer",  # TODO: This is a bug and we need to fix it, it should be "inner 1"
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name="inner1",
                        filename="test_asyncio_gather.py",
                        line_no=inner1.__code__.co_firstlineno + 3,
                    ),
                    pprof_utils.StackLocation(
                        function_name="outer",
                        filename="test_asyncio_gather.py",
                        line_no=outer.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )
@pytest.mark.subprocess(
    env=dict(
        DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_wait",
    ),
    err=None,
)
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
def test_asyncio_wait() -> None:
    """Profile two Tasks awaited through ``asyncio.wait`` and check the recorded stacks.

    The test runs ``outer`` (which waits on ``inner 1`` and ``inner 2`` with
    ``return_when=asyncio.ALL_COMPLETED``) under an active span with the profiler
    started, then parses the newest pprof output and asserts that samples exist
    for the parent Task and for both children.
    """
    import asyncio
    import os
    import time
    import uuid

    from ddtrace import ext
    from ddtrace.internal.datadog.profiling import stack_v2
    from ddtrace.profiling import profiler
    from ddtrace.trace import tracer
    from tests.profiling.collector import pprof_utils

    assert stack_v2.is_available, stack_v2.failure_msg

    sleep_time = 0.2
    loop_run_time = 3

    # NOTE: the assertions below locate lines with ``<func>.__code__.co_firstlineno + 3``,
    # so the line layout inside inner1, inner2 and outer must not change.
    async def inner1() -> None:
        start_time = time.time()
        while time.time() < start_time + loop_run_time:
            await asyncio.sleep(sleep_time)

    async def inner2() -> None:
        start_time = time.time()
        while time.time() < start_time + loop_run_time:
            await asyncio.sleep(sleep_time)

    async def outer() -> None:
        t1 = asyncio.create_task(inner1(), name="inner 1")
        t2 = asyncio.create_task(inner2(), name="inner 2")
        await asyncio.wait(fs=(t1, t2), return_when=asyncio.ALL_COMPLETED)

    resource = str(uuid.uuid4())
    span_type = ext.SpanTypes.WEB

    p = profiler.Profiler(tracer=tracer)
    p.start()
    with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span:
        span_id = span.span_id
        local_root_span_id = span._local_root.span_id

        # Drive the parent Task to completion on a fresh event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        main_task = loop.create_task(outer(), name="outer")
        loop.run_until_complete(main_task)

    p.stop()

    # The profile path is the configured prefix followed by "." and this process' PID.
    output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())

    profile = pprof_utils.parse_newest_profile(output_filename)

    samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id")
    assert len(samples_with_span_id) > 0

    # get samples with task_name
    samples = pprof_utils.get_samples_with_label_key(profile, "task name")
    # The next fails if stack_v2 is not properly configured with asyncio task
    # tracking via ddtrace.profiling._asyncio
    assert len(samples) > 0

    # The parent Task must be sampled at its ``await asyncio.wait(...)`` line.
    pprof_utils.assert_profile_has_sample(
        profile,
        samples,
        expected_sample=pprof_utils.StackEvent(
            thread_name="MainThread",
            task_name="outer",
            span_id=span_id,
            local_root_span_id=local_root_span_id,
            locations=[
                pprof_utils.StackLocation(
                    function_name="outer", filename="test_asyncio_wait.py", line_no=outer.__code__.co_firstlineno + 3
                ),
                # TODO: We should add the locations of the awaited Tasks here as they should be in the same Stack
            ],
        ),
    )

    # Note: there currently is a bug somewhere that makes one of the Tasks show up under the parent Task and the
    # other Tasks be under their own Task name. We need to fix this.
    # For the time being, though, which Task is "independent" is non-deterministic which means we must
    # test both possibilities ("inner 2" is part of "outer" or "inner 1" is part of "outer").
    try:
        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name="outer",  # TODO: This is a bug and we need to fix it, it should be "inner 1"
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name="inner1",
                        filename="test_asyncio_wait.py",
                        line_no=inner1.__code__.co_firstlineno + 3,
                    ),
                    pprof_utils.StackLocation(
                        function_name="outer",
                        filename="test_asyncio_wait.py",
                        line_no=outer.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )

        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name="inner 2",
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name="inner2",
                        filename="test_asyncio_wait.py",
                        line_no=inner2.__code__.co_firstlineno + 3,
                    ),
                    pprof_utils.StackLocation(
                        function_name="outer",
                        filename="test_asyncio_wait.py",
                        line_no=outer.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )
    except AssertionError:
        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name="outer",  # TODO: This is a bug and we need to fix it, it should be "inner 2"
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name="inner2",
                        filename="test_asyncio_wait.py",
                        line_no=inner2.__code__.co_firstlineno + 3,
                    ),
                    pprof_utils.StackLocation(
                        function_name="outer",
                        filename="test_asyncio_wait.py",
                        line_no=outer.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )

        pprof_utils.assert_profile_has_sample(
            profile,
            samples,
            expected_sample=pprof_utils.StackEvent(
                thread_name="MainThread",
                task_name="inner 1",
                span_id=span_id,
                local_root_span_id=local_root_span_id,
                locations=[
                    pprof_utils.StackLocation(
                        function_name="inner1",
                        filename="test_asyncio_wait.py",
                        line_no=inner1.__code__.co_firstlineno + 3,
                    ),
                    pprof_utils.StackLocation(
                        function_name="outer",
                        filename="test_asyncio_wait.py",
                        line_no=outer.__code__.co_firstlineno + 3,
                    ),
                ],
            ),
        )