diff --git a/docs/howtos/integrations/llamaindex.ipynb b/docs/howtos/integrations/llamaindex.ipynb index ddf677957..9ff65f123 100644 --- a/docs/howtos/integrations/llamaindex.ipynb +++ b/docs/howtos/integrations/llamaindex.ipynb @@ -335,6 +335,7 @@ "\n", "# init metrics with evaluator LLM\n", "from ragas.llms import LlamaIndexLLMWrapper\n", + "\n", "evaluator_llm = LlamaIndexLLMWrapper(OpenAI(model=\"gpt-4o\"))\n", "metrics = [\n", " Faithfulness(llm=evaluator_llm),\n", diff --git a/src/ragas/async_utils.py b/src/ragas/async_utils.py index 6937b4617..f327955c2 100644 --- a/src/ragas/async_utils.py +++ b/src/ragas/async_utils.py @@ -1,49 +1,89 @@ """Async utils.""" import asyncio -from typing import Any, Coroutine, List +from typing import Any, Coroutine, List, Optional + +from tqdm.auto import tqdm + +from ragas.executor import is_event_loop_running +from ragas.utils import batched def run_async_tasks( tasks: List[Coroutine], - show_progress: bool = False, + batch_size: Optional[int] = None, + show_progress: bool = True, progress_bar_desc: str = "Running async tasks", ) -> List[Any]: - """Run a list of async tasks.""" - tasks_to_execute: List[Any] = tasks + """ + Execute async tasks with optional batching and progress tracking. + + NOTE: Order of results is not guaranteed! + + Args: + tasks: List of coroutines to execute + batch_size: Optional size for batching tasks. If None, runs all concurrently + show_progress: Whether to display progress bars + """ + + async def _run(): + total_tasks = len(tasks) + results = [] - # if running in notebook, use nest_asyncio to hijack the event loop - try: - loop = asyncio.get_running_loop() + # If no batching, run all tasks concurrently with single progress bar + if not batch_size: + with tqdm( + total=total_tasks, + desc=progress_bar_desc, + disable=not show_progress, + ) as pbar: + for future in asyncio.as_completed(tasks): + result = await future + results.append(result) + pbar.update(1) + return results + + # With batching, show nested progress bars + batches = batched(tasks, batch_size) # generator + n_batches = (total_tasks + batch_size - 1) // batch_size + with ( + tqdm( + total=total_tasks, + desc=progress_bar_desc, + disable=not show_progress, + position=0, + leave=True, + ) as overall_pbar, + tqdm( + total=batch_size, + desc=f"Batch 1/{n_batches}", + disable=not show_progress, + position=1, + leave=False, + ) as batch_pbar, + ): + for i, batch in enumerate(batches, 1): + batch_pbar.reset(total=len(batch)) + batch_pbar.set_description(f"Batch {i}/{n_batches}") + for future in asyncio.as_completed(batch): + result = await future + results.append(result) + overall_pbar.update(1) + batch_pbar.update(1) + + return results + + if is_event_loop_running(): + # an event loop is running so call nested_asyncio to fix this try: import nest_asyncio except ImportError: - raise RuntimeError( - "nest_asyncio is required to run async tasks in jupyter. Please install it via `pip install nest_asyncio`." # noqa + raise ImportError( + "It seems like your running this in a jupyter-like environment. " + "Please install nest_asyncio with `pip install nest_asyncio` to make it work." ) else: nest_asyncio.apply() - except RuntimeError: - loop = asyncio.new_event_loop() - - # gather tasks to run - if show_progress: - from tqdm.asyncio import tqdm - - async def _gather() -> List[Any]: - "gather tasks and show progress bar" - return await tqdm.gather(*tasks_to_execute, desc=progress_bar_desc) - - else: # don't show_progress - - async def _gather() -> List[Any]: - return await asyncio.gather(*tasks_to_execute) - - try: - outputs: List[Any] = loop.run_until_complete(_gather()) - except Exception as e: - # run the operation w/o tqdm on hitting a fatal - # may occur in some environments where tqdm.asyncio - # is not supported - raise RuntimeError("Fatal error occurred while running async tasks.", e) from e - return outputs + + results = asyncio.run(_run()) + return results diff --git a/src/ragas/dataset_schema.py b/src/ragas/dataset_schema.py index 71d63c0a4..0a59c2485 100644 --- a/src/ragas/dataset_schema.py +++ b/src/ragas/dataset_schema.py @@ -208,7 +208,7 @@ def to_pandas(self) -> PandasDataframe: def from_pandas(cls, dataframe: PandasDataframe): """Creates an EvaluationDataset from a pandas DataFrame.""" return cls.from_list(dataframe.to_dict(orient="records")) - + def features(self): """Returns the features of the samples.""" return self.samples[0].get_features() diff --git a/src/ragas/evaluation.py b/src/ragas/evaluation.py index d681e7132..57a2ea978 100644 --- a/src/ragas/evaluation.py +++ b/src/ragas/evaluation.py @@ -55,7 +55,7 @@ @track_was_completed def evaluate( dataset: t.Union[Dataset, EvaluationDataset], - metrics: list[Metric] | None = None, + metrics: t.Optional[t.Sequence[Metric]] = None, llm: t.Optional[BaseRagasLLM | LangchainLLM] = None, embeddings: t.Optional[BaseRagasEmbeddings | LangchainEmbeddings] = None, callbacks: Callbacks = None, @@ -65,6 +65,7 @@ def evaluate( raise_exceptions: bool = False, column_map: t.Optional[t.Dict[str, str]] = None, show_progress: bool = True, + batch_size: t.Optional[int] = None, ) -> EvaluationResult: """ Run the evaluation on the dataset with different metrics @@ -110,6 +111,8 @@ def evaluate( column_map can be given as {"contexts":"contexts_v1"} show_progress: bool, optional Whether to show the progress bar during evaluation. If set to False, the progress bar will be disabled. Default is True. + batch_size: int, optional + How large should batches be. If set to None (default), no batching is done. Returns ------- @@ -223,6 +226,7 @@ def evaluate( raise_exceptions=raise_exceptions, run_config=run_config, show_progress=show_progress, + batch_size=batch_size, ) # Ragas Callbacks diff --git a/src/ragas/executor.py b/src/ragas/executor.py index c89a763c0..2ead05e17 100644 --- a/src/ragas/executor.py +++ b/src/ragas/executor.py @@ -9,6 +9,7 @@ from tqdm.auto import tqdm from ragas.run_config import RunConfig +from ragas.utils import batched logger = logging.getLogger(__name__) @@ -25,19 +26,26 @@ def is_event_loop_running() -> bool: return loop.is_running() -async def as_completed(coros, max_workers): - if max_workers == -1: - return asyncio.as_completed(coros) +async def as_completed( + coroutines: t.List[t.Coroutine], max_workers: int +) -> t.Iterator[asyncio.Future]: + """ + Wrap coroutines with a semaphore if max_workers is specified. - semaphore = asyncio.Semaphore(max_workers) + Returns an iterator of futures that completes as tasks finish. + """ + if max_workers == -1: + tasks = [asyncio.create_task(coro) for coro in coroutines] - async def sema_coro(coro): - async with semaphore: - return await coro + else: + semaphore = asyncio.Semaphore(max_workers) - sema_coros = [sema_coro(c) for c in coros] + async def sema_coro(coro): + async with semaphore: + return await coro - return asyncio.as_completed(sema_coros) + tasks = [asyncio.create_task(sema_coro(coro)) for coro in coroutines] + return asyncio.as_completed(tasks) @dataclass @@ -57,6 +65,8 @@ class Executor: List of jobs to execute raise_exceptions : bool Whether to raise exceptions or log them + batch_size : int + Whether to batch (large) lists of tasks run_config : RunConfig Configuration for the run _nest_asyncio_applied : bool @@ -68,14 +78,19 @@ class Executor: keep_progress_bar: bool = True jobs: t.List[t.Any] = field(default_factory=list, repr=False) raise_exceptions: bool = False + batch_size: t.Optional[int] = None run_config: t.Optional[RunConfig] = field(default=None, repr=False) _nest_asyncio_applied: bool = field(default=False, repr=False) - def wrap_callable_with_index(self, callable: t.Callable, counter): - async def wrapped_callable_async(*args, **kwargs): - result = np.nan + def wrap_callable_with_index( + self, callable: t.Callable, counter: int + ) -> t.Callable: + async def wrapped_callable_async( + *args, **kwargs + ) -> t.Tuple[int, t.Callable | float]: try: result = await callable(*args, **kwargs) + return counter, result except Exception as e: if self.raise_exceptions: raise e @@ -89,69 +104,112 @@ async def wrapped_callable_async(*args, **kwargs): exec_message, exc_info=False, ) - - return counter, result + return counter, np.nan return wrapped_callable_async def submit( - self, callable: t.Callable, *args, name: t.Optional[str] = None, **kwargs - ): + self, + callable: t.Callable, + *args, + name: t.Optional[str] = None, + **kwargs, + ) -> None: """ - Submit a job to be executed. This will wrap the callable with error handling - and indexing to keep track of the job index. + Submit a job to be executed, wrapping the callable with error handling and indexing to keep track of the job index. """ callable_with_index = self.wrap_callable_with_index(callable, len(self.jobs)) self.jobs.append((callable_with_index, args, kwargs, name)) + async def _process_jobs(self) -> t.List[t.Any]: + """Execute jobs with optional progress tracking.""" + max_workers = (self.run_config or RunConfig()).max_workers + results = [] + + if not self.batch_size: + with tqdm( + total=len(self.jobs), + desc=self.desc, + disable=not self.show_progress, + ) as pbar: + # Create coroutines + coroutines = [ + afunc(*args, **kwargs) for afunc, args, kwargs, _ in self.jobs + ] + for future in await as_completed(coroutines, max_workers): + result = await future + results.append(result) + pbar.update(1) + + return results + + # With batching, show nested progress bars + batches = batched(self.jobs, self.batch_size) # generator of job tuples + n_batches = (len(self.jobs) + self.batch_size - 1) // self.batch_size + + with ( + tqdm( + total=len(self.jobs), + desc=self.desc, + disable=not self.show_progress, + position=1, + leave=True, + ) as overall_pbar, + tqdm( + total=min(self.batch_size, len(self.jobs)), + desc=f"Batch 1/{n_batches}", + disable=not self.show_progress, + position=0, + leave=False, + ) as batch_pbar, + ): + for i, batch in enumerate(batches, 1): + batch_pbar.reset(total=len(batch)) + batch_pbar.set_description(f"Batch {i}/{n_batches}") + + # Create coroutines per batch + coroutines = [ + afunc(*args, **kwargs) for afunc, args, kwargs, _ in batch + ] + for future in await as_completed(coroutines, max_workers): + result = await future + results.append(result) + overall_pbar.update(1) + batch_pbar.update(1) + + return results + def results(self) -> t.List[t.Any]: """ - Execute all submitted jobs and return their results. The results are returned in - the order of job submission. + Execute all submitted jobs and return their results. The results are returned in the order of job submission. """ if is_event_loop_running(): # an event loop is running so call nested_asyncio to fix this try: import nest_asyncio - except ImportError: + except ImportError as e: raise ImportError( - "It seems like your running this in a jupyter-like environment. Please install nest_asyncio with `pip install nest_asyncio` to make it work." - ) - - if not self._nest_asyncio_applied: - nest_asyncio.apply() - self._nest_asyncio_applied = True - - # create a generator for which returns tasks as they finish - futures_as_they_finish = as_completed( - coros=[afunc(*args, **kwargs) for afunc, args, kwargs, _ in self.jobs], - max_workers=(self.run_config or RunConfig()).max_workers, - ) - - async def _aresults() -> t.List[t.Any]: - results = [] - for future in tqdm( - await futures_as_they_finish, - desc=self.desc, - total=len(self.jobs), - # whether you want to keep the progress bar after completion - leave=self.keep_progress_bar, - disable=not self.show_progress, - ): - r = await future - results.append(r) - - return results - - results = asyncio.run(_aresults()) + "It seems like your running this in a jupyter-like environment. " + "Please install nest_asyncio with `pip install nest_asyncio` to make it work." + ) from e + else: + if not self._nest_asyncio_applied: + nest_asyncio.apply() + self._nest_asyncio_applied = True + + results = asyncio.run(self._process_jobs()) sorted_results = sorted(results, key=lambda x: x[0]) return [r[1] for r in sorted_results] -def run_async_batch(desc: str, func: t.Callable, kwargs_list: t.List[t.Dict]): +def run_async_batch( + desc: str, + func: t.Callable, + kwargs_list: t.List[t.Dict], + batch_size: t.Optional[int] = None, +): """ - A utility function to run the same async function with different arguments in - parallel. + Provide functionality to run the same async function with different arguments in parallel. """ run_config = RunConfig() executor = Executor( @@ -159,6 +217,7 @@ def run_async_batch(desc: str, func: t.Callable, kwargs_list: t.List[t.Dict]): keep_progress_bar=False, raise_exceptions=True, run_config=run_config, + batch_size=batch_size, ) for kwargs in kwargs_list: diff --git a/src/ragas/integrations/llama_index.py b/src/ragas/integrations/llama_index.py index d833670f4..b93961800 100644 --- a/src/ragas/integrations/llama_index.py +++ b/src/ragas/integrations/llama_index.py @@ -34,6 +34,7 @@ def evaluate( callbacks: t.Optional[Callbacks] = None, in_ci: bool = False, run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, token_usage_parser: t.Optional[TokenUsageParser] = None, raise_exceptions: bool = False, column_map: t.Optional[t.Dict[str, str]] = None, @@ -59,6 +60,7 @@ def evaluate( show_progress=show_progress, raise_exceptions=raise_exceptions, run_config=run_config, + batch_size=batch_size, ) # check if multi-turn diff --git a/src/ragas/llms/base.py b/src/ragas/llms/base.py index f8704dfe1..d34b9b795 100644 --- a/src/ragas/llms/base.py +++ b/src/ragas/llms/base.py @@ -157,7 +157,10 @@ def is_finished(self, response: LLMResult) -> bool: # if generation_info is empty, we parse the response_metadata # this is less reliable - elif isinstance(resp, ChatGeneration) and t.cast(ChatGeneration, resp).message is not None: + elif ( + isinstance(resp, ChatGeneration) + and t.cast(ChatGeneration, resp).message is not None + ): resp_message: BaseMessage = t.cast(ChatGeneration, resp).message if resp_message.response_metadata.get("finish_reason") is not None: is_finished_list.append( diff --git a/src/ragas/testset/synthesizers/generate.py b/src/ragas/testset/synthesizers/generate.py index f8fc1363b..67fae1132 100644 --- a/src/ragas/testset/synthesizers/generate.py +++ b/src/ragas/testset/synthesizers/generate.py @@ -150,14 +150,14 @@ def generate_with_langchain_docs( # force the user to provide an llm and embedding client to prevent use of default LLMs if not self.llm and not transforms_llm: raise ValueError( - """An llm client was not provided. - Provide an LLM on TestsetGenerator instantiation or as an argument for transforms_llm parameter. + """An llm client was not provided. + Provide an LLM on TestsetGenerator instantiation or as an argument for transforms_llm parameter. Alternatively you can provide your own transforms through the `transforms` parameter.""" ) if not self.embedding_model and not transforms_embedding_model: raise ValueError( - """An embedding client was not provided. - Provide an embedding model on TestsetGenerator instantiation or as an argument for transforms_llm parameter. + """An embedding client was not provided. + Provide an embedding model on TestsetGenerator instantiation or as an argument for transforms_llm parameter. Alternatively you can provide your own transforms through the `transforms` parameter.""" ) @@ -272,6 +272,7 @@ def generate( testset_size: int, query_distribution: t.Optional[QueryDistribution] = None, run_config: t.Optional[RunConfig] = None, + batch_size: t.Optional[int] = None, callbacks: t.Optional[Callbacks] = None, token_usage_parser: t.Optional[TokenUsageParser] = None, with_debugging_logs=False, @@ -287,14 +288,16 @@ def generate( query_distribution : Optional[QueryDistribution], optional A list of tuples containing scenario simulators and their probabilities. If None, default simulators will be used. + run_config : Optional[RunConfig], optional + Configuration for running the generation process. + batch_size: int, optional + How large should batches be. If set to None (default), no batching is done. callbacks : Optional[Callbacks], optional Langchain style callbacks to use for the generation process. You can use this to log the generation process or add other metadata. token_usage_parser : Optional[TokenUsageParser], optional Parse the LLMResult object and return a TokenUsage object. This is used to calculate the cost of the generation process. - run_config : Optional[RunConfig], optional - Configuration for running the generation process. with_debugging_logs : bool, default False If True, enable debug logging for various components. raise_exceptions : bool, default True @@ -369,6 +372,7 @@ def generate( raise_exceptions=raise_exceptions, run_config=run_config, keep_progress_bar=False, + batch_size=batch_size, ) # generate samples splits, _ = calculate_split_values( @@ -403,6 +407,7 @@ def generate( raise_exceptions=raise_exceptions, run_config=run_config, keep_progress_bar=True, + batch_size=batch_size, ) additional_testset_info: t.List[t.Dict] = [] for i, (synthesizer, _) in enumerate(query_distribution): diff --git a/src/ragas/testset/transforms/engine.py b/src/ragas/testset/transforms/engine.py index f7ee7faef..0eb312b86 100644 --- a/src/ragas/testset/transforms/engine.py +++ b/src/ragas/testset/transforms/engine.py @@ -4,7 +4,9 @@ import logging import typing as t -from ragas.executor import as_completed, is_event_loop_running, tqdm +from tqdm.auto import tqdm + +from ragas.executor import as_completed, is_event_loop_running from ragas.run_config import RunConfig from ragas.testset.graph import KnowledgeGraph from ragas.testset.transforms.base import BaseGraphTransformation diff --git a/src/ragas/utils.py b/src/ragas/utils.py index 59a835a7d..a9bc34390 100644 --- a/src/ragas/utils.py +++ b/src/ragas/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import itertools import logging import os import re @@ -221,3 +222,13 @@ def camel_to_snake(name): """ pattern = re.compile(r"(? t.Iterator[t.Tuple]: + """Batch data from the iterable into tuples of length n. The last batch may be shorter than n.""" + # batched('ABCDEFG', 3) → ABC DEF G + if n < 1: + raise ValueError("n must be at least one") + iterator = iter(iterable) + while batch := tuple(itertools.islice(iterator, n)): + yield batch diff --git a/src/ragas/validation.py b/src/ragas/validation.py index 37739bdaa..a247eed18 100644 --- a/src/ragas/validation.py +++ b/src/ragas/validation.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import typing as t from datasets import Dataset, Sequence @@ -53,7 +54,7 @@ def get_supported_metric_type(ds: EvaluationDataset): raise ValueError(f"Unsupported sample type {sample_type}") -def validate_required_columns(ds: EvaluationDataset, metrics: list[Metric]): +def validate_required_columns(ds: EvaluationDataset, metrics: t.Sequence[Metric]): metric_type = get_supported_metric_type(ds) for m in metrics: required_columns = set(m.required_columns.get(metric_type, [])) @@ -66,7 +67,7 @@ def validate_required_columns(ds: EvaluationDataset, metrics: list[Metric]): ) -def validate_supported_metrics(ds: EvaluationDataset, metrics: list[Metric]): +def validate_supported_metrics(ds: EvaluationDataset, metrics: t.Sequence[Metric]): data_type = ds.get_sample_type() for m in metrics: if data_type == SingleTurnSample: diff --git a/tests/unit/test_async_utils.py b/tests/unit/test_async_utils.py new file mode 100644 index 000000000..2666a35ff --- /dev/null +++ b/tests/unit/test_async_utils.py @@ -0,0 +1,38 @@ +import pytest + +from ragas.async_utils import run_async_tasks + + +@pytest.fixture +def tasks(): + async def echo_order(index: int): + return index + + return [echo_order(i) for i in range(1, 11)] + + +@pytest.mark.asyncio +async def test_run_async_tasks_unbatched(tasks): + # Act + results = run_async_tasks(tasks) + + # Assert + assert sorted(results) == sorted(range(1, 11)) + + +@pytest.mark.asyncio +async def test_run_async_tasks_batched(tasks): + # Act + results = run_async_tasks(tasks, batch_size=3) + + # Assert + assert sorted(results) == sorted(range(1, 11)) + + +@pytest.mark.asyncio +async def test_run_async_tasks_no_progress(tasks): + # Act + results = run_async_tasks(tasks, show_progress=False) + + # Assert + assert sorted(results) == sorted(range(1, 11)) diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py index ea143c9ea..be08caaa9 100644 --- a/tests/unit/test_executor.py +++ b/tests/unit/test_executor.py @@ -1,67 +1,66 @@ import asyncio +import time import pytest +from ragas.executor import Executor -@pytest.mark.asyncio -async def test_order_of_execution(): - from ragas.executor import Executor +@pytest.mark.asyncio +@pytest.mark.parametrize("batch_size", [None, 3, 20]) +async def test_order_of_execution(batch_size): async def echo_order(index: int): + await asyncio.sleep(1 / index) return index # Arrange - executor = Executor() - - # Act + executor = Executor(batch_size=batch_size) # add 10 jobs to the executor for i in range(1, 11): executor.submit(echo_order, i, name=f"echo_order_{i}") + + # Act results = executor.results() # Assert assert results == list(range(1, 11)) @pytest.mark.asyncio -async def test_executor_in_script(): - from ragas.executor import Executor - +@pytest.mark.parametrize("batch_size", [None, 3, 20]) +async def test_executor_in_script(batch_size): async def echo_order(index: int): - await asyncio.sleep(0.1) + await asyncio.sleep(1 / index) return index # Arrange - executor = Executor() - - # Act + executor = Executor(batch_size=batch_size) # add 10 jobs to the executor for i in range(1, 4): executor.submit(echo_order, i, name=f"echo_order_{i}") + + # Act results = executor.results() # Assert assert results == list(range(1, 4)) @pytest.mark.asyncio -async def test_executor_with_running_loop(): - import asyncio - - from ragas.executor import Executor - +@pytest.mark.parametrize("batch_size", [None, 3, 20]) +async def test_executor_with_running_loop(batch_size): loop = asyncio.new_event_loop() loop.run_until_complete(asyncio.sleep(0.1)) async def echo_order(index: int): - await asyncio.sleep(0.1) + await asyncio.sleep(1 / index) return index # Arrange - executor = Executor() + executor = Executor(batch_size=batch_size) + for i in range(1, 4): + executor.submit(echo_order, i, name=f"echo_order_{i}") # Act # add 10 jobs to the executor - for i in range(1, 4): - executor.submit(echo_order, i, name=f"echo_order_{i}") results = executor.results() # Assert assert results == list(range(1, 4)) @@ -90,3 +89,25 @@ async def _run(): results = asyncio.run(_run()) assert results == [1, 2, 3] + + +def test_executor_timings(): + # if we submit n tasks that take 1 second each, + # the total time taken should be close to 1 second + + executor = Executor() + + async def long_task(): + await asyncio.sleep(0.1) + return 1 + + n_tasks = 5 + for i in range(n_tasks): + executor.submit(long_task, name=f"long_task_{i}") + + start_time = time.time() + results = executor.results() + end_time = time.time() + assert len(results) == n_tasks + assert all(r == 1 for r in results) + assert end_time - start_time < 0.2 diff --git a/tests/unit/test_executor_in_jupyter.ipynb b/tests/unit/test_executor_in_jupyter.ipynb index c3fa270b5..e8922e934 100644 --- a/tests/unit/test_executor_in_jupyter.ipynb +++ b/tests/unit/test_executor_in_jupyter.ipynb @@ -1,72 +1,65 @@ { "cells": [ { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": null, "metadata": {}, + "outputs": [], "source": [ - "# Test Executor " + "import asyncio\n", + "from random import random" ] }, { "cell_type": "code", - "execution_count": 14, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "ebb0705d6a05459a89f4ae87cbbbfd84", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Evaluating: 0%| | 0/10 [00:00