Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddtrace/internal/datadog/profiling/stack_v2/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def unregister_thread(name: str) -> None: ...

# Asyncio support
def track_asyncio_loop(thread_id: int, loop: Optional[asyncio.AbstractEventLoop]) -> None: ...
def link_tasks(parent: asyncio.AbstractEventLoop, child: asyncio.Task) -> None: ...
def link_tasks(parent: asyncio.Task, child: asyncio.Future) -> None: ...
def init_asyncio(
current_tasks: Sequence[asyncio.Task],
scheduled_tasks: Sequence[asyncio.Task],
Expand Down
13 changes: 13 additions & 0 deletions ddtrace/profiling/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,19 @@ def _(f, args, kwargs):
for child in children:
stack_v2.link_tasks(parent, child)

@partial(wrap, sys.modules["asyncio"].tasks._wait)
def _(f, args, kwargs):
try:
return f(*args, **kwargs)
finally:
futures = typing.cast(typing.Iterable["asyncio.Future"], get_argument_value(args, kwargs, 0, "fs"))
loop = typing.cast("asyncio.AbstractEventLoop", get_argument_value(args, kwargs, 3, "loop"))

# Link the parent gathering task to the gathered children
parent: "asyncio.Task" = globals()["current_task"](loop)
for future in futures:
stack_v2.link_tasks(parent, future)

_call_init_asyncio(asyncio)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
features:
- |
profiling: This introduces tracking for ``asyncio.wait`` in the Profiler.
This makes it possible to track dependencies between Tasks/Coroutines that await/are awaited through ``asyncio.wait``.
180 changes: 180 additions & 0 deletions tests/profiling/collector/test_asyncio_gather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import pytest


@pytest.mark.subprocess(
env=dict(
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_utils_gather",
),
err=None,
)
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
def test_asyncio_gather() -> None:
import asyncio
import os
import time
import uuid

from ddtrace import ext
from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.profiling import profiler
from ddtrace.trace import tracer
from tests.profiling.collector import pprof_utils

assert stack_v2.is_available, stack_v2.failure_msg

sleep_time = 0.2
loop_run_time = 3

async def inner1() -> None:
start_time = time.time()
while time.time() < start_time + loop_run_time:
await asyncio.sleep(sleep_time)

async def inner2() -> None:
start_time = time.time()
while time.time() < start_time + loop_run_time:
await asyncio.sleep(sleep_time)

async def outer() -> None:
t1 = asyncio.create_task(inner1(), name="inner 1")
t2 = asyncio.create_task(inner2(), name="inner 2")
await asyncio.gather(t1, t2)

resource = str(uuid.uuid4())
span_type = ext.SpanTypes.WEB

p = profiler.Profiler(tracer=tracer)
p.start()
with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span:
span_id = span.span_id
local_root_span_id = span._local_root.span_id

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
main_task = loop.create_task(outer(), name="outer")
loop.run_until_complete(main_task)

p.stop()

output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())

profile = pprof_utils.parse_newest_profile(output_filename)

samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id")
assert len(samples_with_span_id) > 0

# get samples with task_name
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
# The next fails if stack_v2 is not properly configured with asyncio task
# tracking via ddtrace.profiling._asyncio
assert len(samples) > 0

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="outer",
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="outer", filename="test_asyncio_gather.py", line_no=outer.__code__.co_firstlineno + 3
),
# TODO: We should add the locations of the gathered Tasks here as they should be in the same Stack
],
),
)

try:
pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="outer", # TODO: This is a bug and we need to fix it, it should be "inner 1"
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="inner2",
filename="test_asyncio_gather.py",
line_no=inner2.__code__.co_firstlineno + 3,
),
pprof_utils.StackLocation(
function_name="outer",
filename="test_asyncio_gather.py",
line_no=outer.__code__.co_firstlineno + 3,
),
],
),
)

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="inner 1",
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="inner1",
filename="test_asyncio_gather.py",
line_no=inner1.__code__.co_firstlineno + 3,
),
pprof_utils.StackLocation(
function_name="outer",
filename="test_asyncio_gather.py",
line_no=outer.__code__.co_firstlineno + 3,
),
],
),
)
except AssertionError:
pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="inner 2", # TODO: This is a bug and we need to fix it, it should be "inner 1"
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="inner2",
filename="test_asyncio_gather.py",
line_no=inner2.__code__.co_firstlineno + 3,
),
pprof_utils.StackLocation(
function_name="outer",
filename="test_asyncio_gather.py",
line_no=outer.__code__.co_firstlineno + 3,
),
],
),
)

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="outer",
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="inner1",
filename="test_asyncio_gather.py",
line_no=inner1.__code__.co_firstlineno + 3,
),
pprof_utils.StackLocation(
function_name="outer",
filename="test_asyncio_gather.py",
line_no=outer.__code__.co_firstlineno + 3,
),
],
),
)
184 changes: 184 additions & 0 deletions tests/profiling/collector/test_asyncio_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import pytest


@pytest.mark.subprocess(
env=dict(
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_wait",
),
err=None,
)
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
def test_asyncio_wait() -> None:
import asyncio
import os
import time
import uuid

from ddtrace import ext
from ddtrace.internal.datadog.profiling import stack_v2
from ddtrace.profiling import profiler
from ddtrace.trace import tracer
from tests.profiling.collector import pprof_utils

assert stack_v2.is_available, stack_v2.failure_msg

sleep_time = 0.2
loop_run_time = 3

async def inner1() -> None:
start_time = time.time()
while time.time() < start_time + loop_run_time:
await asyncio.sleep(sleep_time)

async def inner2() -> None:
start_time = time.time()
while time.time() < start_time + loop_run_time:
await asyncio.sleep(sleep_time)

async def outer() -> None:
t1 = asyncio.create_task(inner1(), name="inner 1")
t2 = asyncio.create_task(inner2(), name="inner 2")
await asyncio.wait(fs=(t1, t2), return_when=asyncio.ALL_COMPLETED)

resource = str(uuid.uuid4())
span_type = ext.SpanTypes.WEB

p = profiler.Profiler(tracer=tracer)
p.start()
with tracer.trace("test_asyncio", resource=resource, span_type=span_type) as span:
span_id = span.span_id
local_root_span_id = span._local_root.span_id

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
main_task = loop.create_task(outer(), name="outer")
loop.run_until_complete(main_task)

p.stop()

output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())

profile = pprof_utils.parse_newest_profile(output_filename)

samples_with_span_id = pprof_utils.get_samples_with_label_key(profile, "span id")
assert len(samples_with_span_id) > 0

# get samples with task_name
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
# The next fails if stack_v2 is not properly configured with asyncio task
# tracking via ddtrace.profiling._asyncio
assert len(samples) > 0

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="outer",
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="outer", filename="test_asyncio_wait.py", line_no=outer.__code__.co_firstlineno + 3
),
# TODO: We should add the locations of the gathered Tasks here as they should be in the same Stack
],
),
)

# Note: there currently is a bug somewhere that makes one of the Tasks show up under the parent Task and the
# other Tasks be under their own Task name. We need to fix this.
# For the time being, though, which Task is "independent" is non-deterministic which means we must
# test both possibilities ("inner 2" is part of "outer" or "inner 1" is part of "outer").
try:
pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="outer", # TODO: This is a bug and we need to fix it, it should be "inner 1"
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="inner1",
filename="test_asyncio_wait.py",
line_no=inner1.__code__.co_firstlineno + 3,
),
pprof_utils.StackLocation(
function_name="outer",
filename="test_asyncio_wait.py",
line_no=outer.__code__.co_firstlineno + 3,
),
],
),
)

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="inner 2",
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="inner2",
filename="test_asyncio_wait.py",
line_no=inner2.__code__.co_firstlineno + 3,
),
pprof_utils.StackLocation(
function_name="outer",
filename="test_asyncio_wait.py",
line_no=outer.__code__.co_firstlineno + 3,
),
],
),
)
except AssertionError:
pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="outer", # TODO: This is a bug and we need to fix it, it should be "inner 1"
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="inner2",
filename="test_asyncio_wait.py",
line_no=inner2.__code__.co_firstlineno + 3,
),
pprof_utils.StackLocation(
function_name="outer",
filename="test_asyncio_wait.py",
line_no=outer.__code__.co_firstlineno + 3,
),
],
),
)

pprof_utils.assert_profile_has_sample(
profile,
samples,
expected_sample=pprof_utils.StackEvent(
thread_name="MainThread",
task_name="inner 1",
span_id=span_id,
local_root_span_id=local_root_span_id,
locations=[
pprof_utils.StackLocation(
function_name="inner1",
filename="test_asyncio_wait.py",
line_no=inner1.__code__.co_firstlineno + 3,
),
pprof_utils.StackLocation(
function_name="outer",
filename="test_asyncio_wait.py",
line_no=outer.__code__.co_firstlineno + 3,
),
],
),
)
Loading