Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions src/ragas/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,40 @@ def is_event_loop_running() -> bool:
return loop.is_running()


def apply_nest_asyncio():
"""Apply nest_asyncio if an event loop is running."""
if is_event_loop_running():
# an event loop is running so call nested_asyncio to fix this
try:
import nest_asyncio
except ImportError:
raise ImportError(
"It seems like your running this in a jupyter-like environment. Please install nest_asyncio with `pip install nest_asyncio` to make it work."
def apply_nest_asyncio() -> bool:
"""
Apply nest_asyncio if an event loop is running and compatible.

Returns:
bool: True if nest_asyncio was applied, False if skipped
"""
if not is_event_loop_running():
return False

try:
import nest_asyncio
except ImportError:
raise ImportError(
"It seems like your running this in a jupyter-like environment. Please install nest_asyncio with `pip install nest_asyncio` to make it work."
)

try:
loop = asyncio.get_running_loop()
loop_type = type(loop).__name__

if "uvloop" in loop_type.lower() or "uvloop" in str(type(loop)):
logger.debug(
f"Skipping nest_asyncio.apply() for incompatible loop type: {loop_type}"
)
return False

nest_asyncio.apply()
return True
except ValueError as e:
if "Can't patch loop of type" in str(e):
logger.debug(f"Skipping nest_asyncio.apply(): {e}")
return False
raise


def as_completed(
Expand Down Expand Up @@ -116,12 +138,22 @@ def run(
Whether to apply nest_asyncio for Jupyter compatibility. Default is True.
Set to False in production environments to avoid event loop patching.
"""
# Only apply nest_asyncio if explicitly allowed
nest_asyncio_applied = False
if allow_nest_asyncio:
apply_nest_asyncio()
nest_asyncio_applied = apply_nest_asyncio()

# Create the coroutine if it's a callable, otherwise use directly
coro = async_func() if callable(async_func) else async_func

if is_event_loop_running() and not nest_asyncio_applied:
loop = asyncio.get_running_loop()
loop_type = type(loop).__name__
raise RuntimeError(
f"Cannot execute nested async code with {loop_type}. "
f"uvloop does not support nested event loop execution. "
f"Please use asyncio's standard event loop in Jupyter environments, "
f"or refactor your code to avoid nested async calls."
)

return asyncio.run(coro)


Expand Down
92 changes: 92 additions & 0 deletions tests/unit/test_uvloop_compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Test uvloop compatibility with nest_asyncio."""

import asyncio
import sys

import pytest


class TestUvloopCompatibility:
"""Test that ragas works with uvloop event loops."""

@pytest.mark.skipif(sys.version_info < (3, 8), reason="uvloop requires Python 3.8+")
def test_apply_nest_asyncio_with_uvloop_returns_false(self):
"""Test that apply_nest_asyncio returns False with uvloop."""
uvloop = pytest.importorskip("uvloop")

from ragas.async_utils import apply_nest_asyncio

async def test_func():
result = apply_nest_asyncio()
return result

uvloop.install()
try:
result = asyncio.run(test_func())
assert result is False
finally:
asyncio.set_event_loop_policy(None)

@pytest.mark.skipif(sys.version_info < (3, 8), reason="uvloop requires Python 3.8+")
def test_run_with_uvloop_and_running_loop(self):
"""Test that run() raises clear error with uvloop in running event loop (Jupyter scenario)."""
uvloop = pytest.importorskip("uvloop")

from ragas.async_utils import run

async def inner_task():
return "success"

async def outer_task():
with pytest.raises(RuntimeError, match="Cannot execute nested async code"):
run(inner_task)

uvloop.install()
try:
asyncio.run(outer_task())
finally:
asyncio.set_event_loop_policy(None)

@pytest.mark.skipif(sys.version_info < (3, 8), reason="uvloop requires Python 3.8+")
def test_run_async_tasks_with_uvloop(self):
"""Test that run_async_tasks works with uvloop."""
uvloop = pytest.importorskip("uvloop")

from ragas.async_utils import run_async_tasks

async def task(n):
return n * 2

tasks = [task(i) for i in range(5)]

uvloop.install()
try:
results = run_async_tasks(tasks, show_progress=False)
assert sorted(results) == [0, 2, 4, 6, 8]
finally:
asyncio.set_event_loop_policy(None)

def test_apply_nest_asyncio_without_uvloop_returns_true(self):
"""Test that apply_nest_asyncio returns True with standard asyncio."""
from ragas.async_utils import apply_nest_asyncio

async def test_func():
result = apply_nest_asyncio()
return result

result = asyncio.run(test_func())
assert result is True

def test_run_with_standard_asyncio_and_running_loop(self):
"""Test that run() works with standard asyncio in a running loop."""
from ragas.async_utils import run

async def inner_task():
return "nested_success"

async def outer_task():
result = run(inner_task)
return result

result = asyncio.run(outer_task())
assert result == "nested_success"