Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions flight_profiler/plugins/stack/server_plugin_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,21 +184,29 @@ def _get_all_async_tasks(self) -> List[dict]:
"""
Get all async tasks from all running event loops across all threads.
Returns a list of task info dictionaries.

Supports multiple Python versions:
- Python 3.7-3.11: uses asyncio.tasks._all_tasks (WeakSet of all tasks)
- Python 3.12+: uses asyncio.tasks._scheduled_tasks (WeakSet of pending tasks)
"""
all_tasks_info = []
seen_task_ids = set()

# Directly access asyncio's global _all_tasks WeakSet
# This contains ALL tasks from ALL event loops, not just current loop
# Directly access asyncio's global task tracking WeakSet
# This contains tasks from ALL event loops, not just current loop
try:
# _all_tasks is in asyncio.tasks module
import asyncio.tasks as asyncio_tasks
all_tasks_weak = None

# Python 3.7-3.11: _all_tasks contains all tasks (pending + done)
if hasattr(asyncio_tasks, '_all_tasks'):
all_tasks_weak = asyncio_tasks._all_tasks
elif hasattr(asyncio, '_all_tasks'):
all_tasks_weak = asyncio._all_tasks
else:
all_tasks_weak = None
# Python 3.12+: _scheduled_tasks contains pending tasks
elif hasattr(asyncio_tasks, '_scheduled_tasks'):
all_tasks_weak = asyncio_tasks._scheduled_tasks

if all_tasks_weak is not None:
# Safe iteration over WeakSet (may need retry due to concurrent modification)
for attempt in range(10):
Expand Down
76 changes: 76 additions & 0 deletions flight_profiler/test/plugins/stack/stack_plugin_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,43 @@
import os
import tempfile
import time
import unittest

from flight_profiler.test.plugins.profile_integration import ProfileIntegration
from flight_profiler.utils.env_util import is_linux


# Async test script content - placed outside flight_profiler directory to avoid filtering
ASYNC_TEST_SCRIPT = '''
import asyncio
import sys

async def long_running_task():
"""A long running async task that can be detected."""
while True:
await asyncio.sleep(1)

async def nested_coroutine_inner():
"""Inner nested coroutine."""
await asyncio.sleep(10)

async def nested_coroutine_outer():
"""Outer nested coroutine that awaits inner."""
await nested_coroutine_inner()

async def main():
"""Main async entry point."""
task1 = asyncio.create_task(long_running_task(), name="TestLongRunningTask")
task2 = asyncio.create_task(nested_coroutine_outer(), name="TestNestedCoroutine")
await asyncio.sleep(60)

if __name__ == "__main__":
print("plugin unit test script started")
sys.stdout.flush()
asyncio.run(main())
'''


class StackPluginTest(unittest.TestCase):

def test_stack(self):
Expand Down Expand Up @@ -110,8 +142,52 @@ def test_stack_native_frames(self):
finally:
integration.stop()

def test_stack_async(self):
"""Test stack -a command to detect asyncio tasks.

Note: The test script is created in a temp directory (outside flight_profiler)
to avoid being filtered by _is_flight_profiler_task().
"""
# Create temp script file outside flight_profiler directory
with tempfile.NamedTemporaryFile(
mode='w', suffix='_async_test.py', delete=False
) as f:
f.write(ASYNC_TEST_SCRIPT)
temp_script = f.name

integration = ProfileIntegration()
try:
integration.start(temp_script, 20)
integration.execute_profile_cmd("stack -a")
process = integration.client_process
find_header = False
find_task = False
start = time.time()
while time.time() - start < 20:
output = process.stdout.readline()
print(output)
if output:
line = str(output)
if "Async Coroutine/Task Stacks" in line:
find_header = True
if "TestLongRunningTask" in line or "TestNestedCoroutine" in line:
find_task = True
if find_header and find_task:
break
else:
break

self.assertTrue(find_header, "Did not find 'Async Coroutine/Task Stacks' header")
self.assertTrue(find_task, "Did not find any test async task")
except Exception:
raise
finally:
integration.stop()
os.unlink(temp_script)


if __name__ == "__main__":
test = StackPluginTest()
test.test_stack()
test.test_stack_filepath()
test.test_stack_async()
Loading