From cbbb1c570aaa22c3cb47645eb8b98570b923d1fe Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 13 Jun 2025 15:03:34 +0200 Subject: [PATCH] Deal with threading related issues that caused flakinnes - Add lock to prevent acces to already running generator - Compare sets to avoid order related non determinism caused by threads --- tests/unit/test_logging.py | 56 ++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/tests/unit/test_logging.py b/tests/unit/test_logging.py index 88879c9e..636f0fc4 100644 --- a/tests/unit/test_logging.py +++ b/tests/unit/test_logging.py @@ -1,6 +1,7 @@ import asyncio import json import logging +import threading import time from collections.abc import AsyncIterator, Generator, Iterator from datetime import datetime, timedelta @@ -65,6 +66,8 @@ @pytest.fixture def mock_api() -> None: + test_server_lock = threading.Lock() + def get_responses() -> Generator[httpx.Response, None, None]: """Simulate actor run that changes status 3 times.""" for _ in range(5): @@ -116,8 +119,11 @@ def get_responses() -> Generator[httpx.Response, None, None]: responses = get_responses() def actor_runs_side_effect(_: httpx.Request) -> httpx.Response: - time.sleep(0.1) - return next(responses) + test_server_lock.acquire() + # To avoid multiple threads accessing at the same time and causing `ValueError: generator already executing` + response = next(responses) + test_server_lock.release_lock() + return response respx.get(url=f'{_MOCKED_API_URL}/v2/actor-runs/{_MOCKED_RUN_ID}').mock(side_effect=actor_runs_side_effect) @@ -212,10 +218,10 @@ async def test_redirected_logs_async( # Do stuff while the log from the other Actor is being redirected to the logs. await asyncio.sleep(2) - assert len(caplog.records) == expected_log_count - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:], caplog.records): - assert expected_message_and_level[0] == record.message - assert expected_message_and_level[1] == record.levelno + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:] + ) @pytest.mark.parametrize( @@ -250,10 +256,10 @@ def test_redirected_logs_sync( # Do stuff while the log from the other Actor is being redirected to the logs. time.sleep(2) - assert len(caplog.records) == expected_log_count - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:], caplog.records): - assert expected_message_and_level[0] == record.message - assert expected_message_and_level[1] == record.levelno + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS[-expected_log_count:] + ) @respx.mock @@ -278,10 +284,9 @@ async def test_actor_call_redirect_logs_to_default_logger_async( assert isinstance(logger.handlers[0], logging.StreamHandler) # Ensure logs are propagated - assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES) - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records): - assert expected_message_and_level[0] == record.message - assert expected_message_and_level[1] == record.levelno + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES + ) @respx.mock @@ -306,10 +311,9 @@ def test_actor_call_redirect_logs_to_default_logger_sync( assert isinstance(logger.handlers[0], logging.StreamHandler) # Ensure logs are propagated - assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES) - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records): - assert expected_message_and_level[0] == record.message - assert expected_message_and_level[1] == record.levelno + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES + ) @respx.mock @@ -357,10 +361,10 @@ async def test_actor_call_redirect_logs_to_custom_logger_async( with caplog.at_level(logging.DEBUG, logger=logger_name): await actor_client.call(logger=logger) - assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES) - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records): - assert expected_message_and_level[0] == record.message - assert expected_message_and_level[1] == record.levelno + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES + ) @respx.mock @@ -378,10 +382,10 @@ def test_actor_call_redirect_logs_to_custom_logger_sync( with caplog.at_level(logging.DEBUG, logger=logger_name): actor_client.call(logger=logger) - assert len(caplog.records) == len(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES) - for expected_message_and_level, record in zip(_EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES, caplog.records): - assert expected_message_and_level[0] == record.message - assert expected_message_and_level[1] == record.levelno + # Ensure logs are propagated + assert {(record.message, record.levelno) for record in caplog.records} == set( + _EXPECTED_MESSAGES_AND_LEVELS_WITH_STATUS_MESSAGES + ) @respx.mock