diff --git a/flight_profiler/plugins/stack/server_plugin_stack.py b/flight_profiler/plugins/stack/server_plugin_stack.py index 0c83ce0..e469274 100644 --- a/flight_profiler/plugins/stack/server_plugin_stack.py +++ b/flight_profiler/plugins/stack/server_plugin_stack.py @@ -184,21 +184,29 @@ def _get_all_async_tasks(self) -> List[dict]: """ Get all async tasks from all running event loops across all threads. Returns a list of task info dictionaries. + + Supports multiple Python versions: + - Python 3.7-3.11: uses asyncio.tasks._all_tasks (WeakSet of all tasks) + - Python 3.12+: uses asyncio.tasks._scheduled_tasks (WeakSet of pending tasks) """ all_tasks_info = [] seen_task_ids = set() - # Directly access asyncio's global _all_tasks WeakSet - # This contains ALL tasks from ALL event loops, not just current loop + # Directly access asyncio's global task tracking WeakSet + # This contains tasks from ALL event loops, not just current loop try: - # _all_tasks is in asyncio.tasks module import asyncio.tasks as asyncio_tasks + all_tasks_weak = None + + # Python 3.7-3.11: _all_tasks contains all tasks (pending + done) if hasattr(asyncio_tasks, '_all_tasks'): all_tasks_weak = asyncio_tasks._all_tasks elif hasattr(asyncio, '_all_tasks'): all_tasks_weak = asyncio._all_tasks - else: - all_tasks_weak = None + # Python 3.12+: _scheduled_tasks contains pending tasks + elif hasattr(asyncio_tasks, '_scheduled_tasks'): + all_tasks_weak = asyncio_tasks._scheduled_tasks + if all_tasks_weak is not None: # Safe iteration over WeakSet (may need retry due to concurrent modification) for attempt in range(10): diff --git a/flight_profiler/test/plugins/stack/stack_plugin_test.py b/flight_profiler/test/plugins/stack/stack_plugin_test.py index e8db7c0..974fd6f 100644 --- a/flight_profiler/test/plugins/stack/stack_plugin_test.py +++ b/flight_profiler/test/plugins/stack/stack_plugin_test.py @@ -1,4 +1,5 @@ import os +import tempfile import time import unittest @@ -6,6 +7,37 @@ from flight_profiler.utils.env_util import is_linux +# Async test script content - placed outside flight_profiler directory to avoid filtering +ASYNC_TEST_SCRIPT = ''' +import asyncio +import sys + +async def long_running_task(): + """A long running async task that can be detected.""" + while True: + await asyncio.sleep(1) + +async def nested_coroutine_inner(): + """Inner nested coroutine.""" + await asyncio.sleep(10) + +async def nested_coroutine_outer(): + """Outer nested coroutine that awaits inner.""" + await nested_coroutine_inner() + +async def main(): + """Main async entry point.""" + task1 = asyncio.create_task(long_running_task(), name="TestLongRunningTask") + task2 = asyncio.create_task(nested_coroutine_outer(), name="TestNestedCoroutine") + await asyncio.sleep(60) + +if __name__ == "__main__": + print("plugin unit test script started") + sys.stdout.flush() + asyncio.run(main()) +''' + + class StackPluginTest(unittest.TestCase): def test_stack(self): @@ -110,8 +142,52 @@ def test_stack_native_frames(self): finally: integration.stop() + def test_stack_async(self): + """Test stack -a command to detect asyncio tasks. + + Note: The test script is created in a temp directory (outside flight_profiler) + to avoid being filtered by _is_flight_profiler_task(). + """ + # Create temp script file outside flight_profiler directory + with tempfile.NamedTemporaryFile( + mode='w', suffix='_async_test.py', delete=False + ) as f: + f.write(ASYNC_TEST_SCRIPT) + temp_script = f.name + + integration = ProfileIntegration() + try: + integration.start(temp_script, 20) + integration.execute_profile_cmd("stack -a") + process = integration.client_process + find_header = False + find_task = False + start = time.time() + while time.time() - start < 20: + output = process.stdout.readline() + print(output) + if output: + line = str(output) + if "Async Coroutine/Task Stacks" in line: + find_header = True + if "TestLongRunningTask" in line or "TestNestedCoroutine" in line: + find_task = True + if find_header and find_task: + break + else: + break + + self.assertTrue(find_header, "Did not find 'Async Coroutine/Task Stacks' header") + self.assertTrue(find_task, "Did not find any test async task") + except Exception: + raise + finally: + integration.stop() + os.unlink(temp_script) + if __name__ == "__main__": test = StackPluginTest() test.test_stack() test.test_stack_filepath() + test.test_stack_async()