From c918e5900c89cdacb19c73079e97ff7ba703d412 Mon Sep 17 00:00:00 2001 From: Chenyang Li Date: Tue, 14 Oct 2025 15:10:15 -0400 Subject: [PATCH 1/3] Remove error suppressor in async_utils.py and engine.py --- src/ragas/async_utils.py | 11 ++++------- src/ragas/testset/transforms/engine.py | 5 +---- 2 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/ragas/async_utils.py b/src/ragas/async_utils.py index 2a56bea1ff..2e118bcc56 100644 --- a/src/ragas/async_utils.py +++ b/src/ragas/async_utils.py @@ -85,16 +85,13 @@ async def process_futures( Yields: Results from completed futures as they finish + + Raises: + Exception: Any exception raised by the futures will be propagated """ # Process completed futures as they finish for future in futures: - try: - result = await future - except asyncio.CancelledError: - raise # Re-raise CancelledError to ensure proper cancellation - except Exception as e: - result = e - + result = await future yield result diff --git a/src/ragas/testset/transforms/engine.py b/src/ragas/testset/transforms/engine.py index 29eec1a526..3518e40ab8 100644 --- a/src/ragas/testset/transforms/engine.py +++ b/src/ragas/testset/transforms/engine.py @@ -58,10 +58,7 @@ async def run_coroutines( # whether you want to keep the progress bar after completion leave=False, ): - try: - await future - except Exception as e: - logger.error(f"unable to apply transformation: {e}") + await future def get_desc(transform: BaseGraphTransformation | Parallel): From 2b2aaea6204660d6c9ef57c39ab8875aa1e7dd0b Mon Sep 17 00:00:00 2001 From: Chenyang Li Date: Wed, 15 Oct 2025 10:55:55 -0400 Subject: [PATCH 2/3] fix: handle exceptions in async tasks without breaking execution --- src/ragas/async_utils.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/src/ragas/async_utils.py b/src/ragas/async_utils.py index 2e118bcc56..8878113cb7 100644 --- a/src/ragas/async_utils.py +++ b/src/ragas/async_utils.py @@ -85,13 +85,15 @@ async def process_futures( Yields: Results from completed futures as they finish - - Raises: - Exception: Any exception raised by the futures will be propagated """ # Process completed futures as they finish for future in futures: - result = await future + try: + result = await future + except asyncio.CancelledError: + raise # Re-raise CancelledError to ensure proper cancellation + except Exception as e: + result = e yield result @@ -147,6 +149,7 @@ def run_async_tasks( async def _run(): total_tasks = len(tasks) results = [] + first_exception = None pbm = ProgressBarManager(progress_bar_desc, show_progress) if not batch_size: @@ -154,6 +157,14 @@ async def _run(): async for result in process_futures( as_completed(tasks, max_workers, cancel_check=cancel_check) ): + if isinstance(result, Exception): + logger.error( + f"Task failed with {type(result).__name__}: {result}", + exc_info=False, + ) + # Store first exception to raise after all tasks complete + if first_exception is None: + first_exception = result results.append(result) pbar.update(1) else: @@ -168,10 +179,22 @@ async def _run(): async for result in process_futures( as_completed(batch, max_workers, cancel_check=cancel_check) ): + if isinstance(result, Exception): + logger.error( + f"Task failed with {type(result).__name__}: {result}", + exc_info=False, + ) + # Store first exception to raise after all tasks complete + if first_exception is None: + first_exception = result results.append(result) batch_pbar.update(1) overall_pbar.update(len(batch)) + # Raise the first exception encountered to fail fast with clear error message + if first_exception is not None: + raise first_exception + return results return run(_run) From 9aac64b03eaaf584cfdf0249e4cb47bd937ab158 Mon Sep 17 00:00:00 2001 From: Chenyang Li Date: Tue, 21 Oct 2025 15:06:32 -0400 Subject: [PATCH 3/3] lint issue --- src/ragas/async_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ragas/async_utils.py b/src/ragas/async_utils.py index 8878113cb7..fc03c42456 100644 --- a/src/ragas/async_utils.py +++ b/src/ragas/async_utils.py @@ -194,7 +194,7 @@ async def _run(): # Raise the first exception encountered to fail fast with clear error message if first_exception is not None: raise first_exception - + return results return run(_run)