From 5418c7fcc8cde0b2184592136352627729ecccb9 Mon Sep 17 00:00:00 2001 From: jjmachan Date: Thu, 28 Nov 2024 19:20:24 +0530 Subject: [PATCH 1/4] feat: added a batcher to ragas --- src/ragas/_analytics.py | 80 +++++++++++++++++++++++++ tests/unit/test_analytics.py | 112 +++++++++++++++++++++++++++++++++++ 2 files changed, 192 insertions(+) diff --git a/src/ragas/_analytics.py b/src/ragas/_analytics.py index 458256c09..914f4ac24 100644 --- a/src/ragas/_analytics.py +++ b/src/ragas/_analytics.py @@ -3,9 +3,12 @@ import json import logging import os +import time import typing as t import uuid from functools import lru_cache, wraps +from threading import Lock +from typing import List import requests from appdirs import user_data_dir @@ -100,6 +103,79 @@ class TestsetGenerationEvent(BaseEvent): version: str = "3" # the version of testset generation pipeline +class AnalyticsBatcher: + def __init__(self, batch_size: int = 50, flush_interval: float = 120): + """ + Initialize an AnalyticsBatcher instance. + + Args: + batch_size (int, optional): Maximum number of events to batch before flushing. Defaults to 50. + flush_interval (float, optional): Maximum time in seconds between flushes. Defaults to 5. + """ + self.buffer: List[EvaluationEvent] = [] + self.lock = Lock() + self.last_flush_time = time.time() + self.BATCH_SIZE = batch_size + self.FLUSH_INTERVAL = flush_interval # seconds + + def add_evaluation(self, evaluation_event: EvaluationEvent) -> None: + with self.lock: + self.buffer.append(evaluation_event) + if ( + len(self.buffer) >= self.BATCH_SIZE + or (time.time() - self.last_flush_time) > self.FLUSH_INTERVAL + ): + self.flush() + + def _join_evaluation_events( + self, events: List[EvaluationEvent] + ) -> List[EvaluationEvent]: + """ + Join multiple evaluation events into a single event and increase the num_rows. + Group properties except for num_rows. 
+ """ + if not events: + return [] + + # Group events by their properties (except num_rows) + grouped_events = {} + for event in events: + key = ( + event.event_type, + tuple(event.metrics), + event.evaluation_mode, + event.language, + event.in_ci, + ) + if key not in grouped_events: + grouped_events[key] = event + else: + grouped_events[key].num_rows += event.num_rows + + # Convert grouped events back to a list + return list(grouped_events.values()) + + def flush(self) -> None: + with self.lock: + # if no events to send, do nothing + if not self.buffer: + return + + try: + # join all the EvaluationEvents into a single event and send it + events_to_send = self._join_evaluation_events(self.buffer) + for event in events_to_send: + track(event) + except Exception as err: + if _usage_event_debugging(): + logger.error( + "Tracking Error: %s", err, stack_info=True, stacklevel=3 + ) + finally: + self.buffer = [] + self.last_flush_time = time.time() + + @silent def track(event_properties: BaseEvent): if do_not_track(): @@ -133,3 +209,7 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any: return result return wrapper + + +# Create a global batcher instance +_analytics_batcher = AnalyticsBatcher() diff --git a/tests/unit/test_analytics.py b/tests/unit/test_analytics.py index 7f263d51c..d653687a6 100644 --- a/tests/unit/test_analytics.py +++ b/tests/unit/test_analytics.py @@ -1,5 +1,7 @@ from __future__ import annotations +import math +import time import typing as t import numpy as np @@ -7,6 +9,7 @@ from langchain_core.outputs import Generation, LLMResult from langchain_core.prompt_values import StringPromptValue as PromptValue +from ragas._analytics import EvaluationEvent from ragas.llms.base import BaseRagasLLM @@ -187,3 +190,112 @@ def test(raise_error=True): assert event_properties_list[-1].event_type == "test" assert event_properties_list[-1].is_completed is True + + +evaluation_events_and_num_rows = [ + ( # 5 same events + [ + EvaluationEvent( + event_type="evaluation", + metrics=["harmfulness"], + num_rows=1, + evaluation_mode="", + language="english", + in_ci=True, + ) + for _ in range(5) + ], + [5], + ), + ( # 5 different events with different metrics + [ + EvaluationEvent( + event_type="evaluation", + metrics=[f"harmfulness_{i}"], + num_rows=1, + evaluation_mode="", + language="english", + in_ci=True, + ) + for i in range(5) + ], + [1, 1, 1, 1, 1], + ), + ( # 5 different events with different num_rows but 2 group of metrics + [ + EvaluationEvent( + event_type="evaluation", + metrics=[f"harmfulness"], + num_rows=1, + evaluation_mode="", + language="english", + in_ci=True, + ) + for i in range(10) + ] + + [ + EvaluationEvent( + event_type="evaluation", + metrics=["accuracy"], + num_rows=1, + evaluation_mode="", + language="english", + in_ci=True, + ) + for i in range(5) + ], + [10, 5], + ), +] + + +@pytest.mark.parametrize( + "evaluation_events, expected_num_rows_set", evaluation_events_and_num_rows +) +def test_analytics_batcher_join_evaluation_events( + monkeypatch, evaluation_events, expected_num_rows_set +): + """ + Test if the batcher joins the evaluation events correctly + """ + from ragas._analytics import AnalyticsBatcher + + batcher = AnalyticsBatcher() + + joined_events = batcher._join_evaluation_events(evaluation_events) + assert len(joined_events) == len(expected_num_rows_set) + assert sorted(e.num_rows for e in joined_events) == sorted(expected_num_rows_set) + + +@pytest.mark.skip(reason="no working yet") +@pytest.mark.parametrize( + "evaluation_events, 
expected_num_rows_set", evaluation_events_and_num_rows +) +def test_analytics_batcher_flush(monkeypatch, evaluation_events, expected_num_rows_set): + """ + Test if the batcher flushes the events correctly + """ + from ragas._analytics import AnalyticsBatcher + + FLUSH_INTERVAL = 0.3 + BATCH_SIZE = 5 + batcher = AnalyticsBatcher(batch_size=BATCH_SIZE, flush_interval=FLUSH_INTERVAL) + + # Use a list to hold the counter so it can be modified in the nested function + flush_mock_call_count = [0] + + def flush_mock(): + # Access the list and modify its first element + flush_mock_call_count[0] += 1 + + monkeypatch.setattr(batcher, "flush", flush_mock) + + for event in evaluation_events[:-1]: + batcher.add_evaluation(event) + + # Access the counter using flush_mock_call_count[0] + time.sleep(FLUSH_INTERVAL + 0.1) + batcher.add_evaluation(evaluation_events[-1]) + assert flush_mock_call_count[0] == math.ceil( + sum(expected_num_rows_set) / BATCH_SIZE + ) From 8320cb9415debda12a42a6fb22ee48e05c01d6b5 Mon Sep 17 00:00:00 2001 From: jjmachan Date: Fri, 29 Nov 2024 00:33:29 +0530 Subject: [PATCH 2/4] feat: added it into metrics --- src/ragas/_analytics.py | 52 +++++++++++++++++------------------- src/ragas/evaluation.py | 19 ++----------- src/ragas/metrics/base.py | 38 ++++++++++++++++++++++++++ tests/unit/test_analytics.py | 30 +++++++-------------- 4 files changed, 73 insertions(+), 66 deletions(-) diff --git a/src/ragas/_analytics.py b/src/ragas/_analytics.py index 914f4ac24..829e3740b 100644 --- a/src/ragas/_analytics.py +++ b/src/ragas/_analytics.py @@ -88,10 +88,9 @@ class BaseEvent(BaseModel): class EvaluationEvent(BaseEvent): metrics: t.List[str] - evaluation_mode: str num_rows: int - language: str - in_ci: bool + evaluation_type: t.Literal["SINGLE_TURN", "MULTI_TURN"] + event_type: str = "evaluation" class TestsetGenerationEvent(BaseEvent): @@ -121,11 +120,12 @@ def __init__(self, batch_size: int = 50, flush_interval: float = 120): def add_evaluation(self, evaluation_event: EvaluationEvent) -> None: with self.lock: self.buffer.append(evaluation_event) - if ( - len(self.buffer) >= self.BATCH_SIZE - or (time.time() - self.last_flush_time) > self.FLUSH_INTERVAL - ): - self.flush() + + if ( + len(self.buffer) >= self.BATCH_SIZE + or (time.time() - self.last_flush_time) > self.FLUSH_INTERVAL + ): + self.flush() def _join_evaluation_events( self, events: List[EvaluationEvent] @@ -143,9 +143,7 @@ def _join_evaluation_events( key = ( event.event_type, tuple(event.metrics), - event.evaluation_mode, - event.language, - event.in_ci, + event.evaluation_type, ) if key not in grouped_events: grouped_events[key] = event @@ -156,22 +154,20 @@ def _join_evaluation_events( return list(grouped_events.values()) def flush(self) -> None: - with self.lock: - # if no events to send, do nothing - if not self.buffer: - return - - try: - # join all the EvaluationEvents into a single event and send it - events_to_send = self._join_evaluation_events(self.buffer) - for event in events_to_send: - track(event) - except Exception as err: - if _usage_event_debugging(): - logger.error( - "Tracking Error: %s", err, stack_info=True, stacklevel=3 - ) - finally: + # if no events to send, do nothing + if not self.buffer: + return + + try: + # join all the EvaluationEvents into a single event and send it + events_to_send = self._join_evaluation_events(self.buffer) + for event in events_to_send: + track(event) + except Exception as err: + if _usage_event_debugging(): + logger.error("Tracking Error: %s", err, stack_info=True, 
stacklevel=3) + finally: + with self.lock: self.buffer = [] self.last_flush_time = time.time() @@ -212,4 +208,4 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any: # Create a global batcher instance -_analytics_batcher = AnalyticsBatcher() +_analytics_batcher = AnalyticsBatcher(batch_size=10, flush_interval=10) diff --git a/src/ragas/evaluation.py b/src/ragas/evaluation.py index fd53925b3..a1878c43f 100644 --- a/src/ragas/evaluation.py +++ b/src/ragas/evaluation.py @@ -2,13 +2,12 @@ import typing as t -import numpy as np from datasets import Dataset from langchain_core.callbacks import BaseCallbackHandler, BaseCallbackManager from langchain_core.embeddings import Embeddings as LangchainEmbeddings from langchain_core.language_models import BaseLanguageModel as LangchainLLM -from ragas._analytics import EvaluationEvent, track, track_was_completed +from ragas._analytics import track_was_completed from ragas.callbacks import ChainType, RagasTracer, new_group from ragas.dataset_schema import ( EvaluationDataset, @@ -37,7 +36,7 @@ is_reproducable, ) from ragas.run_config import RunConfig -from ragas.utils import convert_v1_to_v2_dataset, get_feature_language +from ragas.utils import convert_v1_to_v2_dataset from ragas.validation import ( remap_column_names, validate_required_columns, @@ -351,18 +350,4 @@ def evaluate( for i in reproducable_metrics: metrics[i].reproducibility = 1 # type: ignore - # log the evaluation event - metrics_names = [m.name for m in metrics] - metric_lang = [get_feature_language(m) for m in metrics] - metric_lang = np.unique([m for m in metric_lang if m is not None]) - track( - EvaluationEvent( - event_type="evaluation", - metrics=metrics_names, - evaluation_mode="", - num_rows=len(dataset), - language=metric_lang[0] if len(metric_lang) > 0 else "", - in_ci=in_ci, - ) - ) return result diff --git a/src/ragas/metrics/base.py b/src/ragas/metrics/base.py index 978954ec7..10f236abb 100644 --- a/src/ragas/metrics/base.py +++ b/src/ragas/metrics/base.py @@ -10,6 +10,7 @@ from pysbd import Segmenter +from ragas._analytics import EvaluationEvent, _analytics_batcher from ragas.callbacks import ChainType, new_group from ragas.dataset_schema import MultiTurnSample, SingleTurnSample from ragas.executor import is_event_loop_running @@ -286,6 +287,15 @@ def single_turn_score( else: if not group_cm.ended: rm.on_chain_end({"output": score}) + + # track the evaluation event + _analytics_batcher.add_evaluation( + EvaluationEvent( + metrics=[self.name], + num_rows=1, + evaluation_type=MetricType.SINGLE_TURN.name, + ) + ) return score async def single_turn_ascore( @@ -320,6 +330,15 @@ async def single_turn_ascore( else: if not group_cm.ended: rm.on_chain_end({"output": score}) + + # track the evaluation event + _analytics_batcher.add_evaluation( + EvaluationEvent( + metrics=[self.name], + num_rows=1, + evaluation_type=MetricType.SINGLE_TURN.name, + ) + ) return score @abstractmethod @@ -394,6 +413,15 @@ def multi_turn_score( else: if not group_cm.ended: rm.on_chain_end({"output": score}) + + # track the evaluation event + _analytics_batcher.add_evaluation( + EvaluationEvent( + metrics=[self.name], + num_rows=1, + evaluation_type=MetricType.SINGLE_TURN.name, + ) + ) return score async def multi_turn_ascore( @@ -428,6 +456,16 @@ async def multi_turn_ascore( else: if not group_cm.ended: rm.on_chain_end({"output": score}) + + # track the evaluation event + _analytics_batcher.add_evaluation( + EvaluationEvent( + metrics=[self.name], + num_rows=1, + 
evaluation_type=MetricType.SINGLE_TURN.name, + ) + ) + return score @abstractmethod diff --git a/tests/unit/test_analytics.py b/tests/unit/test_analytics.py index d653687a6..22568c031 100644 --- a/tests/unit/test_analytics.py +++ b/tests/unit/test_analytics.py @@ -50,17 +50,13 @@ def test_evaluation_event(): event_type="evaluation", metrics=["harmfulness"], num_rows=1, - evaluation_mode="", - language="english", - in_ci=True, + evaluation_type="SINGLE_TURN", ) payload = evaluation_event.model_dump() assert isinstance(payload.get("user_id"), str) - assert isinstance(payload.get("evaluation_mode"), str) + assert isinstance(payload.get("evaluation_type"), str) assert isinstance(payload.get("metrics"), list) - assert isinstance(payload.get("language"), str) - assert isinstance(payload.get("in_ci"), bool) def setup_user_id_filepath(tmp_path, monkeypatch): @@ -199,9 +195,7 @@ def test(raise_error=True): event_type="evaluation", metrics=["harmfulness"], num_rows=1, - evaluation_mode="", - language="english", - in_ci=True, + evaluation_type="SINGLE_TURN", ) for _ in range(5) ], @@ -213,9 +207,7 @@ def test(raise_error=True): event_type="evaluation", metrics=[f"harmfulness_{i}"], num_rows=1, - evaluation_mode="", - language="english", - in_ci=True, + evaluation_type="SINGLE_TURN", ) for i in range(5) ], @@ -224,12 +216,9 @@ def test(raise_error=True): ( # 5 different events with different num_rows but 2 group of metrics [ EvaluationEvent( - event_type="evaluation", - metrics=[f"harmfulness"], + metrics=["harmfulness"], num_rows=1, - evaluation_mode="", - language="english", - in_ci=True, + evaluation_type="SINGLE_TURN", ) for i in range(10) ] @@ -238,9 +227,7 @@ def test(raise_error=True): event_type="evaluation", metrics=["accuracy"], num_rows=1, - evaluation_mode="", - language="english", - in_ci=True, + evaluation_type="SINGLE_TURN", ) for i in range(5) ], @@ -267,7 +254,6 @@ def test_analytics_batcher_join_evaluation_events( assert sorted(e.num_rows for e in joined_events) == sorted(expected_num_rows_set) -@pytest.mark.skip(reason="no working yet") @pytest.mark.parametrize( "evaluation_events, expected_num_rows_set", evaluation_events_and_num_rows ) @@ -287,6 +273,8 @@ def test_analytics_batcher_flush(monkeypatch, evaluation_events, expected_num_ro def flush_mock(): # Access the list and modify its first element flush_mock_call_count[0] += 1 + batcher.buffer = [] + batcher.last_flush_time = time.time() monkeypatch.setattr(batcher, "flush", flush_mock) From 9ceb63429db9d08aaef4aaab7da360f68132ad23 Mon Sep 17 00:00:00 2001 From: jjmachan Date: Fri, 29 Nov 2024 00:55:49 +0530 Subject: [PATCH 3/4] feat: added language back --- src/ragas/_analytics.py | 1 + src/ragas/metrics/base.py | 11 ++++++++++- src/ragas/utils.py | 6 +++--- tests/unit/test_analytics.py | 5 +++++ 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/ragas/_analytics.py b/src/ragas/_analytics.py index 829e3740b..b1215d9db 100644 --- a/src/ragas/_analytics.py +++ b/src/ragas/_analytics.py @@ -90,6 +90,7 @@ class EvaluationEvent(BaseEvent): metrics: t.List[str] num_rows: int evaluation_type: t.Literal["SINGLE_TURN", "MULTI_TURN"] + language: str event_type: str = "evaluation" diff --git a/src/ragas/metrics/base.py b/src/ragas/metrics/base.py index 10f236abb..681bc1a41 100644 --- a/src/ragas/metrics/base.py +++ b/src/ragas/metrics/base.py @@ -16,7 +16,12 @@ from ragas.executor import is_event_loop_running from ragas.prompt import PromptMixin from ragas.run_config import RunConfig -from ragas.utils import 
RAGAS_SUPPORTED_LANGUAGE_CODES, camel_to_snake, deprecated +from ragas.utils import ( + RAGAS_SUPPORTED_LANGUAGE_CODES, + camel_to_snake, + deprecated, + get_metric_language, +) if t.TYPE_CHECKING: from langchain_core.callbacks import Callbacks @@ -294,6 +299,7 @@ def single_turn_score( metrics=[self.name], num_rows=1, evaluation_type=MetricType.SINGLE_TURN.name, + language=get_metric_language(self), ) ) return score @@ -337,6 +343,7 @@ async def single_turn_ascore( metrics=[self.name], num_rows=1, evaluation_type=MetricType.SINGLE_TURN.name, + language=get_metric_language(self), ) ) return score @@ -420,6 +427,7 @@ def multi_turn_score( metrics=[self.name], num_rows=1, evaluation_type=MetricType.SINGLE_TURN.name, + language=get_metric_language(self), ) ) return score @@ -463,6 +471,7 @@ async def multi_turn_ascore( metrics=[self.name], num_rows=1, evaluation_type=MetricType.SINGLE_TURN.name, + language=get_metric_language(self), ) ) diff --git a/src/ragas/utils.py b/src/ragas/utils.py index f39d92e8b..62413cbc2 100644 --- a/src/ragas/utils.py +++ b/src/ragas/utils.py @@ -85,15 +85,15 @@ def is_nan(x): return False -def get_feature_language(feature: Metric) -> t.Optional[str]: +def get_metric_language(metric: Metric) -> str: from ragas.prompt import BasePrompt languags = [ value.language - for _, value in vars(feature).items() + for _, value in vars(metric).items() if isinstance(value, BasePrompt) ] - return languags[0] if len(languags) > 0 else None + return languags[0] if len(languags) > 0 else "" def deprecated( diff --git a/tests/unit/test_analytics.py b/tests/unit/test_analytics.py index 22568c031..45d3c2673 100644 --- a/tests/unit/test_analytics.py +++ b/tests/unit/test_analytics.py @@ -50,6 +50,7 @@ def test_evaluation_event(): event_type="evaluation", metrics=["harmfulness"], num_rows=1, + language="english", evaluation_type="SINGLE_TURN", ) @@ -196,6 +197,7 @@ def test(raise_error=True): metrics=["harmfulness"], num_rows=1, evaluation_type="SINGLE_TURN", + language="english", ) for _ in range(5) ], @@ -208,6 +210,7 @@ def test(raise_error=True): metrics=[f"harmfulness_{i}"], num_rows=1, evaluation_type="SINGLE_TURN", + language="english", ) for i in range(5) ], @@ -219,6 +222,7 @@ def test(raise_error=True): metrics=["harmfulness"], num_rows=1, evaluation_type="SINGLE_TURN", + language="english", ) for i in range(10) ] @@ -228,6 +232,7 @@ def test(raise_error=True): metrics=["accuracy"], num_rows=1, evaluation_type="SINGLE_TURN", + language="english", ) for i in range(5) ], From 51509b171a9138b7dc3d9723bba2fa31c2f97c7d Mon Sep 17 00:00:00 2001 From: jjmachan Date: Fri, 29 Nov 2024 01:40:43 +0530 Subject: [PATCH 4/4] feat: added ragas_version into evaluations --- src/ragas/_analytics.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ragas/_analytics.py b/src/ragas/_analytics.py index b1215d9db..200e7a7b6 100644 --- a/src/ragas/_analytics.py +++ b/src/ragas/_analytics.py @@ -14,6 +14,7 @@ from appdirs import user_data_dir from pydantic import BaseModel, Field +from ragas._version import __version__ from ragas.utils import get_debug_mode if t.TYPE_CHECKING: @@ -84,6 +85,7 @@ def get_userid() -> str: class BaseEvent(BaseModel): event_type: str user_id: str = Field(default_factory=get_userid) + ragas_version: str = Field(default=__version__) class EvaluationEvent(BaseEvent):
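
A minimal usage sketch of the batcher added in this series, exercising only the API
defined in the patches above (AnalyticsBatcher, EvaluationEvent, _join_evaluation_events).
The metric name "faithfulness", the language value, and the batch settings below are
illustrative assumptions, not values taken from the patches; _join_evaluation_events is a
private helper and is called directly here only the way the unit tests do.

    from ragas._analytics import AnalyticsBatcher, EvaluationEvent

    # Same settings as the module-level _analytics_batcher created in PATCH 2/4.
    batcher = AnalyticsBatcher(batch_size=10, flush_interval=10)

    # Ten single-row events, like those emitted by repeated single_turn_score()
    # calls on the same metric (metric name here is hypothetical).
    events = [
        EvaluationEvent(
            metrics=["faithfulness"],
            num_rows=1,
            evaluation_type="SINGLE_TURN",
            language="english",
        )
        for _ in range(10)
    ]

    # _join_evaluation_events groups events by (event_type, metrics, evaluation_type)
    # and sums num_rows, so identical events collapse into one aggregated event.
    joined = batcher._join_evaluation_events(events)
    assert len(joined) == 1
    assert joined[0].num_rows == 10

In normal use the metrics call _analytics_batcher.add_evaluation() instead, which appends
the event to the buffer and flushes once batch_size events have accumulated or
flush_interval seconds have passed since the last flush; flush() performs the same
grouping before handing each aggregated event to track().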