83 changes: 81 additions & 2 deletions src/ragas/_analytics.py
@@ -3,14 +3,18 @@
import json
import logging
import os
import time
import typing as t
import uuid
from functools import lru_cache, wraps
from threading import Lock
from typing import List

import requests
from appdirs import user_data_dir
from pydantic import BaseModel, Field

from ragas._version import __version__
from ragas.utils import get_debug_mode

if t.TYPE_CHECKING:
@@ -81,14 +85,15 @@ def get_userid() -> str:
class BaseEvent(BaseModel):
event_type: str
user_id: str = Field(default_factory=get_userid)
ragas_version: str = Field(default=__version__)


class EvaluationEvent(BaseEvent):
metrics: t.List[str]
evaluation_mode: str
num_rows: int
evaluation_type: t.Literal["SINGLE_TURN", "MULTI_TURN"]
language: str
in_ci: bool
event_type: str = "evaluation"


class TestsetGenerationEvent(BaseEvent):
@@ -100,6 +105,76 @@ class TestsetGenerationEvent(BaseEvent):
version: str = "3" # the version of testset generation pipeline


class AnalyticsBatcher:
def __init__(self, batch_size: int = 50, flush_interval: float = 120):
"""
Initialize an AnalyticsBatcher instance.

Args:
batch_size (int, optional): Maximum number of events to batch before flushing. Defaults to 50.
            flush_interval (float, optional): Maximum time in seconds between flushes. Defaults to 120.
"""
self.buffer: List[EvaluationEvent] = []
self.lock = Lock()
self.last_flush_time = time.time()
self.BATCH_SIZE = batch_size
self.FLUSH_INTERVAL = flush_interval # seconds

    def add_evaluation(self, evaluation_event: EvaluationEvent) -> None:
        with self.lock:
            self.buffer.append(evaluation_event)
            should_flush = (
                len(self.buffer) >= self.BATCH_SIZE
                or (time.time() - self.last_flush_time) > self.FLUSH_INTERVAL
            )

        # flush() re-acquires the (non-reentrant) lock in its finally block,
        # so call it only after releasing the lock here
        if should_flush:
            self.flush()

def _join_evaluation_events(
self, events: List[EvaluationEvent]
) -> List[EvaluationEvent]:
"""
        Join evaluation events that share the same properties (except num_rows)
        into one event per group, summing their num_rows.
"""
if not events:
return []

# Group events by their properties (except num_rows)
grouped_events = {}
for event in events:
key = (
event.event_type,
tuple(event.metrics),
event.evaluation_type,
)
if key not in grouped_events:
grouped_events[key] = event
else:
grouped_events[key].num_rows += event.num_rows

# Convert grouped events back to a list
return list(grouped_events.values())

def flush(self) -> None:
# if no events to send, do nothing
if not self.buffer:
return

try:
            # merge buffered events that share the same properties and send each merged event
events_to_send = self._join_evaluation_events(self.buffer)
for event in events_to_send:
track(event)
except Exception as err:
if _usage_event_debugging():
logger.error("Tracking Error: %s", err, stack_info=True, stacklevel=3)
finally:
with self.lock:
self.buffer = []
self.last_flush_time = time.time()


@silent
def track(event_properties: BaseEvent):
if do_not_track():
@@ -133,3 +208,7 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any:
return result

return wrapper


# Create a global batcher instance
_analytics_batcher = AnalyticsBatcher(batch_size=10, flush_interval=10)
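
For context, a minimal sketch of how the batcher coalesces per-sample events before they are tracked, using a standalone instance rather than the module-level `_analytics_batcher`; the metric name and language below are illustrative:

```python
from ragas._analytics import AnalyticsBatcher, EvaluationEvent

batcher = AnalyticsBatcher(batch_size=10, flush_interval=10)

# two single-turn scores of the same metric share a grouping key
# (event_type, metrics, evaluation_type) ...
for _ in range(2):
    batcher.add_evaluation(
        EvaluationEvent(
            metrics=["faithfulness"],   # illustrative metric name
            num_rows=1,
            evaluation_type="SINGLE_TURN",
            language="english",         # illustrative language
        )
    )

# ... so flushing sends one merged event with num_rows == 2
# (track() is wrapped in @silent and respects do_not_track())
batcher.flush()
```

Because the grouping key covers everything except `num_rows`, repeatedly scoring the same metric is reported as a single event with an aggregated row count rather than one request per sample.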
19 changes: 2 additions & 17 deletions src/ragas/evaluation.py
@@ -2,13 +2,12 @@

import typing as t

import numpy as np
from datasets import Dataset
from langchain_core.callbacks import BaseCallbackHandler, BaseCallbackManager
from langchain_core.embeddings import Embeddings as LangchainEmbeddings
from langchain_core.language_models import BaseLanguageModel as LangchainLLM

from ragas._analytics import EvaluationEvent, track, track_was_completed
from ragas._analytics import track_was_completed
from ragas.callbacks import ChainType, RagasTracer, new_group
from ragas.dataset_schema import (
EvaluationDataset,
@@ -37,7 +36,7 @@
is_reproducable,
)
from ragas.run_config import RunConfig
from ragas.utils import convert_v1_to_v2_dataset, get_feature_language
from ragas.utils import convert_v1_to_v2_dataset
from ragas.validation import (
remap_column_names,
validate_required_columns,
@@ -351,18 +350,4 @@ def evaluate(
for i in reproducable_metrics:
metrics[i].reproducibility = 1 # type: ignore

# log the evaluation event
metrics_names = [m.name for m in metrics]
metric_lang = [get_feature_language(m) for m in metrics]
metric_lang = np.unique([m for m in metric_lang if m is not None])
track(
EvaluationEvent(
event_type="evaluation",
metrics=metrics_names,
evaluation_mode="",
num_rows=len(dataset),
language=metric_lang[0] if len(metric_lang) > 0 else "",
in_ci=in_ci,
)
)
return result
49 changes: 48 additions & 1 deletion src/ragas/metrics/base.py
@@ -10,12 +10,18 @@

from pysbd import Segmenter

from ragas._analytics import EvaluationEvent, _analytics_batcher
from ragas.callbacks import ChainType, new_group
from ragas.dataset_schema import MultiTurnSample, SingleTurnSample
from ragas.executor import is_event_loop_running
from ragas.prompt import PromptMixin
from ragas.run_config import RunConfig
from ragas.utils import RAGAS_SUPPORTED_LANGUAGE_CODES, camel_to_snake, deprecated
from ragas.utils import (
RAGAS_SUPPORTED_LANGUAGE_CODES,
camel_to_snake,
deprecated,
get_metric_language,
)

if t.TYPE_CHECKING:
from langchain_core.callbacks import Callbacks
@@ -286,6 +292,16 @@ def single_turn_score(
else:
if not group_cm.ended:
rm.on_chain_end({"output": score})

# track the evaluation event
_analytics_batcher.add_evaluation(
EvaluationEvent(
metrics=[self.name],
num_rows=1,
evaluation_type=MetricType.SINGLE_TURN.name,
language=get_metric_language(self),
)
)
return score

async def single_turn_ascore(
@@ -320,6 +336,16 @@ async def single_turn_ascore(
else:
if not group_cm.ended:
rm.on_chain_end({"output": score})

# track the evaluation event
_analytics_batcher.add_evaluation(
EvaluationEvent(
metrics=[self.name],
num_rows=1,
evaluation_type=MetricType.SINGLE_TURN.name,
language=get_metric_language(self),
)
)
return score

@abstractmethod
@@ -394,6 +420,16 @@ def multi_turn_score(
else:
if not group_cm.ended:
rm.on_chain_end({"output": score})

# track the evaluation event
_analytics_batcher.add_evaluation(
EvaluationEvent(
metrics=[self.name],
num_rows=1,
                evaluation_type=MetricType.MULTI_TURN.name,
language=get_metric_language(self),
)
)
return score

async def multi_turn_ascore(
@@ -428,6 +464,17 @@ async def multi_turn_ascore(
else:
if not group_cm.ended:
rm.on_chain_end({"output": score})

# track the evaluation event
_analytics_batcher.add_evaluation(
EvaluationEvent(
metrics=[self.name],
num_rows=1,
                evaluation_type=MetricType.MULTI_TURN.name,
language=get_metric_language(self),
)
)

return score

@abstractmethod
6 changes: 3 additions & 3 deletions src/ragas/utils.py
@@ -85,15 +85,15 @@ def is_nan(x):
return False


def get_feature_language(feature: Metric) -> t.Optional[str]:
def get_metric_language(metric: Metric) -> str:
from ragas.prompt import BasePrompt

languags = [
value.language
for _, value in vars(feature).items()
for _, value in vars(metric).items()
if isinstance(value, BasePrompt)
]
return languags[0] if len(languags) > 0 else None
return languags[0] if len(languags) > 0 else ""


def deprecated(
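
As a rough illustration of the renamed helper, assuming `Faithfulness` can be constructed with its default prompts and that those prompts carry the default language; note that a metric with no `BasePrompt` attributes now yields an empty string instead of `None`:

```python
from ragas.metrics import Faithfulness
from ragas.utils import get_metric_language

metric = Faithfulness()             # assumed: default prompts, no LLM needed for this check
print(get_metric_language(metric))  # expected: "english" (the default prompt language)
```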