Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/howtos/integrations/llamaindex.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@
"\n",
"# init metrics with evaluator LLM\n",
"from ragas.llms import LlamaIndexLLMWrapper\n",
"\n",
"evaluator_llm = LlamaIndexLLMWrapper(OpenAI(model=\"gpt-4o\"))\n",
"metrics = [\n",
" Faithfulness(llm=evaluator_llm),\n",
Expand Down
106 changes: 73 additions & 33 deletions src/ragas/async_utils.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,89 @@
"""Async utils."""

import asyncio
from typing import Any, Coroutine, List
from typing import Any, Coroutine, List, Optional

from tqdm.auto import tqdm

from ragas.executor import is_event_loop_running
from ragas.utils import batched


def run_async_tasks(
    tasks: List[Coroutine],
    batch_size: Optional[int] = None,
    show_progress: bool = True,
    progress_bar_desc: str = "Running async tasks",
) -> List[Any]:
    """
    Execute async tasks with optional batching and progress tracking.

    NOTE: Order of results is not guaranteed!

    Args:
        tasks: List of coroutines to execute.
        batch_size: Optional size for batching tasks. If None, runs all
            concurrently.
        show_progress: Whether to display progress bars.
        progress_bar_desc: Description shown next to the overall progress bar.

    Returns:
        List of task results, in completion order (not submission order).

    Raises:
        ImportError: when called from inside a running event loop (e.g. a
            Jupyter notebook) and ``nest_asyncio`` is not installed.
    """

    async def _run() -> List[Any]:
        total_tasks = len(tasks)
        results: List[Any] = []

        # If no batching, run all tasks concurrently with a single progress bar.
        if not batch_size:
            with tqdm(
                total=total_tasks,
                desc=progress_bar_desc,
                disable=not show_progress,
            ) as pbar:
                for future in asyncio.as_completed(tasks):
                    result = await future
                    results.append(result)
                    pbar.update(1)
            return results

        # With batching, show nested progress bars: an overall bar (position 0)
        # and a per-batch bar (position 1) that is reset for every batch.
        batches = batched(tasks, batch_size)  # generator
        n_batches = (total_tasks + batch_size - 1) // batch_size  # ceil division
        with (
            tqdm(
                total=total_tasks,
                desc=progress_bar_desc,
                disable=not show_progress,
                position=0,
                leave=True,
            ) as overall_pbar,
            tqdm(
                total=batch_size,
                desc=f"Batch 1/{n_batches}",
                disable=not show_progress,
                position=1,
                leave=False,
            ) as batch_pbar,
        ):
            for i, batch in enumerate(batches, 1):
                batch_pbar.reset(total=len(batch))
                batch_pbar.set_description(f"Batch {i}/{n_batches}")
                for future in asyncio.as_completed(batch):
                    result = await future
                    results.append(result)
                    overall_pbar.update(1)
                    batch_pbar.update(1)

        return results

    if is_event_loop_running():
        # An event loop is already running (e.g. Jupyter), so asyncio.run()
        # would fail; patch the loop with nest_asyncio to allow nesting.
        try:
            import nest_asyncio
        except ImportError:
            raise ImportError(
                "It seems like you're running this in a jupyter-like environment. "
                "Please install nest_asyncio with `pip install nest_asyncio` to make it work."
            )
        else:
            nest_asyncio.apply()

    results = asyncio.run(_run())
    return results
2 changes: 1 addition & 1 deletion src/ragas/dataset_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def to_pandas(self) -> PandasDataframe:
def from_pandas(cls, dataframe: PandasDataframe):
    """Creates an EvaluationDataset from a pandas DataFrame.

    Each DataFrame row is converted to a dict (via ``to_dict(orient="records")``)
    and handed to ``from_list``.
    """
    # NOTE(review): presumably column names must match the sample schema that
    # from_list expects — verify against from_list's validation.
    return cls.from_list(dataframe.to_dict(orient="records"))

def features(self):
    """Returns the features of the samples.

    Only the first sample is inspected, so the dataset is treated as
    homogeneous.
    """
    # NOTE(review): assumes a non-empty dataset (IndexError otherwise) and that
    # all samples share the same features — TODO confirm.
    return self.samples[0].get_features()
Expand Down
6 changes: 5 additions & 1 deletion src/ragas/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
@track_was_completed
def evaluate(
dataset: t.Union[Dataset, EvaluationDataset],
metrics: list[Metric] | None = None,
metrics: t.Optional[t.Sequence[Metric]] = None,
llm: t.Optional[BaseRagasLLM | LangchainLLM] = None,
embeddings: t.Optional[BaseRagasEmbeddings | LangchainEmbeddings] = None,
callbacks: Callbacks = None,
Expand All @@ -65,6 +65,7 @@ def evaluate(
raise_exceptions: bool = False,
column_map: t.Optional[t.Dict[str, str]] = None,
show_progress: bool = True,
batch_size: t.Optional[int] = None,
) -> EvaluationResult:
"""
Run the evaluation on the dataset with different metrics
Expand Down Expand Up @@ -110,6 +111,8 @@ def evaluate(
column_map can be given as {"contexts":"contexts_v1"}
show_progress: bool, optional
Whether to show the progress bar during evaluation. If set to False, the progress bar will be disabled. Default is True.
batch_size: int, optional
How large should batches be. If set to None (default), no batching is done.

Returns
-------
Expand Down Expand Up @@ -223,6 +226,7 @@ def evaluate(
raise_exceptions=raise_exceptions,
run_config=run_config,
show_progress=show_progress,
batch_size=batch_size,
)

# Ragas Callbacks
Expand Down
Loading
Loading