diff --git a/src/ragas/embeddings/base.py b/src/ragas/embeddings/base.py index 77922d843..3d81c6900 100644 --- a/src/ragas/embeddings/base.py +++ b/src/ragas/embeddings/base.py @@ -1,17 +1,47 @@ from __future__ import annotations +import asyncio import typing as t +from abc import ABC from dataclasses import field from typing import List import numpy as np -from langchain_core.embeddings import Embeddings as BaseRagasEmbeddings +from langchain_core.embeddings import Embeddings from langchain_openai.embeddings import OpenAIEmbeddings from pydantic.dataclasses import dataclass DEFAULT_MODEL_NAME = "BAAI/bge-small-en-v1.5" +class BaseRagasEmbeddings(Embeddings, ABC): + async def embed_texts( + self, texts: List[str], is_async: bool = True + ) -> t.List[t.List[float]]: + if is_async: + return await self.aembed_documents(texts) + else: + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, self.embed_documents, texts) + + +class LangchainEmbeddingsWrapper(BaseRagasEmbeddings): + def __init__(self, embeddings: Embeddings): + self.embeddings = embeddings + + def embed_query(self, text: str) -> List[float]: + return self.embeddings.embed_query(text) + + def embed_documents(self, texts: List[str]) -> List[List[float]]: + return self.embeddings.embed_documents(texts) + + async def aembed_query(self, text: str) -> List[float]: + return await self.embeddings.aembed_query(text) + + async def aembed_documents(self, texts: List[str]) -> List[List[float]]: + return await self.embeddings.aembed_documents(texts) + + @dataclass class HuggingfaceEmbeddings(BaseRagasEmbeddings): model_name: str = DEFAULT_MODEL_NAME @@ -89,4 +119,4 @@ def predict(self, texts: List[List[str]]) -> List[List[float]]: def embedding_factory() -> BaseRagasEmbeddings: openai_embeddings = OpenAIEmbeddings() - return openai_embeddings + return LangchainEmbeddingsWrapper(openai_embeddings) diff --git a/src/ragas/evaluation.py b/src/ragas/evaluation.py index 6dfc90403..2d3793220 100644 --- a/src/ragas/evaluation.py +++ b/src/ragas/evaluation.py @@ -156,7 +156,7 @@ def evaluate( executor = Executor( desc="Evaluating", keep_progress_bar=True, - is_async=is_async, + is_async=True, max_workers=max_workers, raise_exceptions=raise_exceptions, ) @@ -174,21 +174,18 @@ def evaluate( is_async=is_async, ) row_run_managers.append((row_rm, row_group_cm)) - - if is_async: - [ - executor.submit( - metric.ascore, row, row_group_cm, name=f"{metric.name}-{i}" - ) - for metric in metrics - ] - else: - [executor.submit(metric.score, row, row_group_cm) for metric in metrics] + [ + executor.submit( + metric.ascore, row, row_group_cm, is_async, name=f"{metric.name}-{i}" + ) + for metric in metrics + ] scores = [] try: # get the results results = executor.results() + # convert results to dataset_like for i, _ in enumerate(dataset): s = {} diff --git a/src/ragas/llms/base.py b/src/ragas/llms/base.py index bd099523d..45aa73213 100644 --- a/src/ragas/llms/base.py +++ b/src/ragas/llms/base.py @@ -1,8 +1,10 @@ from __future__ import annotations +import asyncio import typing as t from abc import ABC, abstractmethod from dataclasses import dataclass +from functools import partial from langchain_community.chat_models import AzureChatOpenAI, ChatOpenAI, ChatVertexAI from langchain_community.llms import AzureOpenAI, OpenAI, VertexAI @@ -63,6 +65,38 @@ async def agenerate_text( ) -> LLMResult: ... 
+ async def generate( + self, + prompt: PromptValue, + n: int = 1, + temperature: float = 1e-8, + stop: t.Optional[t.List[str]] = None, + callbacks: Callbacks = [], + is_async: bool = True, + ) -> LLMResult: + """Generate text using the given event loop.""" + loop = asyncio.get_event_loop() + + if is_async: + return await self.agenerate_text( + prompt=prompt, + n=n, + temperature=temperature, + stop=stop, + callbacks=callbacks, + ) + else: + loop = asyncio.get_event_loop() + generate_text = partial( + self.generate_text, + prompt=prompt, + n=n, + temperature=temperature, + stop=stop, + callbacks=callbacks, + ) + return await loop.run_in_executor(None, generate_text) + @dataclass class LangchainLLMWrapper(BaseRagasLLM): @@ -75,7 +109,7 @@ class LangchainLLMWrapper(BaseRagasLLM): langchain_llm: BaseLanguageModel - @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6)) + @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(15)) def generate_text( self, prompt: PromptValue, @@ -106,7 +140,7 @@ def generate_text( result.generations = generations return result - @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6)) + @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(15)) async def agenerate_text( self, prompt: PromptValue, diff --git a/src/ragas/llms/json_load.py b/src/ragas/llms/json_load.py index 8cbf8b866..adfafaaa4 100644 --- a/src/ragas/llms/json_load.py +++ b/src/ragas/llms/json_load.py @@ -1,9 +1,11 @@ from __future__ import annotations +import asyncio import json import logging import typing as t from dataclasses import dataclass +from functools import partial logger = logging.getLogger(__name__) @@ -75,7 +77,7 @@ def load_as_json(text) -> t.Dict: class JsonLoader: max_retries: int = 2 - def safe_load(self, text: str, llm: BaseRagasLLM, callbacks: Callbacks = None): + def _safe_load(self, text: str, llm: BaseRagasLLM, callbacks: Callbacks = None): retry = 0 while retry <= self.max_retries: try: @@ -94,7 +96,7 @@ def safe_load(self, text: str, llm: BaseRagasLLM, callbacks: Callbacks = None): return {} - async def asafe_load( + async def _asafe_load( self, text: str, llm: BaseRagasLLM, callbacks: Callbacks = None ): retry = 0 @@ -115,6 +117,25 @@ async def asafe_load( return {} + async def safe_load( + self, + text: str, + llm: BaseRagasLLM, + callbacks: Callbacks = None, + is_async: bool = True, + ): + if is_async: + return await self._asafe_load(text=text, llm=llm, callbacks=callbacks) + else: + loop = asyncio.get_event_loop() + safe_load = partial( + self._safe_load, text=text, llm=llm, callbacks=callbacks + ) + return await loop.run_in_executor( + None, + safe_load, + ) + def _find_outermost_json(self, text): stack = [] start_index = -1 diff --git a/src/ragas/llms/prompt.py b/src/ragas/llms/prompt.py index 505849c77..2d4eec4b5 100644 --- a/src/ragas/llms/prompt.py +++ b/src/ragas/llms/prompt.py @@ -190,7 +190,7 @@ def adapt( {k: v for k, v in zip(self.input_keys, example[: len(self.input_keys)])} ) example_dict[self.output_key] = ( - json_loader.safe_load(example[-1], llm) + json_loader._safe_load(example[-1], llm) if self.output_type.lower() == "json" else example[-1] ) diff --git a/src/ragas/metrics/_answer_correctness.py b/src/ragas/metrics/_answer_correctness.py index 3aca756d5..d49b65336 100644 --- a/src/ragas/metrics/_answer_correctness.py +++ b/src/ragas/metrics/_answer_correctness.py @@ -139,40 +139,17 @@ def _compute_statement_presence(self, prediction: t.Any) -> float: return 
score - def _score(self, row: t.Dict, callbacks: Callbacks) -> float: - assert self.llm is not None, "LLM must be set" - q, a, g = row["question"], row["answer"], row["ground_truth"] - p_value = self.correctness_prompt.format(question=q, ground_truth=g, answer=a) - is_statement_present = self.llm.generate_text(p_value, callbacks=callbacks) - - prediction = json_loader.safe_load( - is_statement_present.generations[0][0].text, self.llm - ) - f1_score = self._compute_statement_presence(prediction) - - if self.weights[1] == 0: - similarity_score = 0 - else: - similarity_score = self.answer_similarity.score(row, callbacks=callbacks) # type: ignore - - score = np.average( - [f1_score, similarity_score], - weights=self.weights, - ) - - return float(score) - - async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float: + async def _ascore(self, row: t.Dict, callbacks: Callbacks, is_async: bool) -> float: assert self.llm is not None, "LLM must be set" q, a, g = row["question"], row["answer"], row["ground_truth"] p_value = self.correctness_prompt.format(question=q, ground_truth=g, answer=a) - is_statement_present = await self.llm.agenerate_text( - p_value, callbacks=callbacks + is_statement_present = await self.llm.generate( + p_value, callbacks=callbacks, is_async=is_async ) - prediction = await json_loader.asafe_load( - is_statement_present.generations[0][0].text, self.llm + prediction = await json_loader.safe_load( + is_statement_present.generations[0][0].text, self.llm, is_async=is_async ) f1_score = self._compute_statement_presence(prediction) @@ -182,7 +159,7 @@ async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float: assert self.answer_similarity is not None, "AnswerSimilarity must be set" similarity_score = await self.answer_similarity.ascore( - row, callbacks=callbacks + row, callbacks=callbacks, is_async=is_async ) score = np.average( diff --git a/src/ragas/metrics/_answer_relevance.py b/src/ragas/metrics/_answer_relevance.py index 3459530da..7efbde2aa 100644 --- a/src/ragas/metrics/_answer_relevance.py +++ b/src/ragas/metrics/_answer_relevance.py @@ -117,32 +117,18 @@ def _create_question_gen_prompt(self, row: t.Dict) -> PromptValue: ans, ctx = row["answer"], row["contexts"] return self.question_generation.format(answer=ans, context="\n".join(ctx)) - def _score(self: t.Self, row: t.Dict, callbacks: Callbacks) -> float: + async def _ascore(self, row: t.Dict, callbacks: Callbacks, is_async: bool) -> float: assert self.llm is not None, "LLM is not set" prompt = self._create_question_gen_prompt(row) - result = self.llm.generate_text( + result = await self.llm.generate( prompt, n=self.strictness, callbacks=callbacks, + is_async=is_async, ) response = [ - json_loader.safe_load(r.text, self.llm) for r in result.generations[0] - ] - - return self._calculate_score(response, row) - - async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float: - assert self.llm is not None, "LLM is not set" - - prompt = self._create_question_gen_prompt(row) - result = await self.llm.agenerate_text( - prompt, - n=self.strictness, - callbacks=callbacks, - ) - response = [ - await json_loader.asafe_load(r.text, self.llm) + await json_loader.safe_load(r.text, self.llm, is_async=is_async) for r in result.generations[0] ] diff --git a/src/ragas/metrics/_answer_similarity.py b/src/ragas/metrics/_answer_similarity.py index c389a8919..da952fc6b 100644 --- a/src/ragas/metrics/_answer_similarity.py +++ b/src/ragas/metrics/_answer_similarity.py @@ -56,7 +56,9 @@ def __post_init__(self: t.Self): def 
init_model(self): super().init_model() - def _score(self, row: t.Dict, callbacks: Callbacks) -> float: + async def _ascore( + self: t.Self, row: t.Dict, callbacks: Callbacks, is_async: bool + ) -> float: assert self.embeddings is not None, "embeddings must be set" ground_truth, answers = row["ground_truth"], row["answer"] @@ -67,37 +69,8 @@ def _score(self, row: t.Dict, callbacks: Callbacks) -> float: "async score [ascore()] not implemented for HuggingFace embeddings" ) else: - embeddings_1 = np.array(self.embeddings.embed_documents(ground_truth)) - embeddings_2 = np.array(self.embeddings.embed_documents(answers)) - similarity = embeddings_1 @ embeddings_2.T - if similarity.size == 1: - # If similarity has only one value, directly use this value as scores - scores = similarity.flatten() - else: - # If similarity contains multiple values, extract the diagonal as scores - scores = np.diagonal(similarity) - - assert isinstance(scores, np.ndarray), "Expects ndarray" - if self.threshold: - scores = scores >= self.threshold - - return scores.tolist()[0] - - async def _ascore(self: t.Self, row: t.Dict, callbacks: Callbacks = []) -> float: - assert self.embeddings is not None, "embeddings must be set" - - ground_truth: t.List[str] = row["ground_truth"] - answer: t.List[str] = row["answer"] - - if self.is_cross_encoder and isinstance(self.embeddings, HuggingfaceEmbeddings): - raise NotImplementedError( - "async score [ascore()] not implemented for HuggingFace embeddings" - ) - else: - embeddings_1 = np.array( - await self.embeddings.aembed_documents(ground_truth) - ) - embeddings_2 = np.array(await self.embeddings.aembed_documents(answer)) + embeddings_1 = np.array(await self.embeddings.embed_texts(ground_truth)) + embeddings_2 = np.array(await self.embeddings.embed_texts(answers)) similarity = embeddings_1 @ embeddings_2.T if similarity.size == 1: scores = similarity.flatten() diff --git a/src/ragas/metrics/_context_precision.py b/src/ragas/metrics/_context_precision.py index a5b723066..c4017c9ad 100644 --- a/src/ragas/metrics/_context_precision.py +++ b/src/ragas/metrics/_context_precision.py @@ -109,42 +109,28 @@ def _calculate_average_precision(self, json_responses: t.List[t.Dict]) -> float: score = numerator / denominator return score - def _score(self, row: t.Dict, callbacks: Callbacks = []) -> float: - assert self.llm is not None, "LLM is not set" - - human_prompts = self._context_precision_prompt(row) - responses: t.List[str] = [] - for hp in human_prompts: - result = self.llm.generate_text( - hp, - n=1, - callbacks=callbacks, - ) - responses.append(result.generations[0][0].text) - - json_responses = [json_loader.safe_load(item, self.llm) for item in responses] - score = self._calculate_average_precision(json_responses) - return score - async def _ascore( self: t.Self, row: t.Dict, - callbacks: Callbacks = [], + callbacks: Callbacks, + is_async: bool, ) -> float: assert self.llm is not None, "LLM is not set" human_prompts = self._context_precision_prompt(row) responses: t.List[str] = [] for hp in human_prompts: - result = await self.llm.agenerate_text( + result = await self.llm.generate( hp, n=1, callbacks=callbacks, + is_async=is_async, ) responses.append(result.generations[0][0].text) json_responses = [ - await json_loader.asafe_load(item, self.llm) for item in responses + await json_loader.safe_load(item, self.llm, is_async=is_async) + for item in responses ] score = self._calculate_average_precision(json_responses) return score diff --git a/src/ragas/metrics/_context_recall.py 
b/src/ragas/metrics/_context_recall.py index b1185b9e4..c5c227078 100644 --- a/src/ragas/metrics/_context_recall.py +++ b/src/ragas/metrics/_context_recall.py @@ -106,24 +106,15 @@ def _compute_score(self, response: t.Any) -> float: else: return np.nan - def _score(self: t.Self, row: t.Dict, callbacks: Callbacks) -> float: + async def _ascore(self, row: t.Dict, callbacks: Callbacks, is_async: bool) -> float: assert self.llm is not None, "set LLM before use" - result = self.llm.generate_text( + result = await self.llm.generate( self._create_context_recall_prompt(row), callbacks=callbacks ) - response = json_loader.safe_load(result.generations[0][0].text, self.llm) - response = [response] if isinstance(response, dict) else response - - return self._compute_score(response) - - async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float: - assert self.llm is not None, "set LLM before use" - - result = await self.llm.agenerate_text( - self._create_context_recall_prompt(row), callbacks=callbacks + response = await json_loader.safe_load( + result.generations[0][0].text, self.llm, is_async=is_async ) - response = await json_loader.asafe_load(result.generations[0][0].text, self.llm) return self._compute_score(response) diff --git a/src/ragas/metrics/_context_relevancy.py b/src/ragas/metrics/_context_relevancy.py index de2ebd464..23ee8b518 100644 --- a/src/ragas/metrics/_context_relevancy.py +++ b/src/ragas/metrics/_context_relevancy.py @@ -69,7 +69,7 @@ def _compute_score(self, response: str, row: t.Dict) -> float: else: return min(len(indices) / len(context_sents), 1) - def _score(self, row: t.Dict, callbacks: Callbacks) -> float: + async def _ascore(self, row: t.Dict, callbacks: Callbacks, is_async: bool) -> float: assert self.llm is not None, "LLM is not initialized" if self.show_deprecation_warning: @@ -78,25 +78,7 @@ def _score(self, row: t.Dict, callbacks: Callbacks) -> float: ) question, contexts = row["question"], row["contexts"] - result = self.llm.generate_text( - self.context_relevancy_prompt.format( - question=question, context="\n".join(contexts) - ), - callbacks=callbacks, - ) - - return self._compute_score(result.generations[0][0].text, row) - - async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float: - assert self.llm is not None, "LLM is not initialized" - - if self.show_deprecation_warning: - logger.warning( - "The 'context_relevancy' metric is going to be deprecated soon! Please use the 'context_precision' metric instead. It is a drop-in replacement just a simple search and replace should work." 
# noqa - ) - - question, contexts = row["question"], row["contexts"] - result = await self.llm.agenerate_text( + result = await self.llm.generate( self.context_relevancy_prompt.format( question=question, context="\n".join(contexts) ), diff --git a/src/ragas/metrics/_faithfulness.py b/src/ragas/metrics/_faithfulness.py index 36badb920..619a97791 100644 --- a/src/ragas/metrics/_faithfulness.py +++ b/src/ragas/metrics/_faithfulness.py @@ -167,37 +167,32 @@ def _compute_score(self, output: t.Any): return score - async def _ascore(self: t.Self, row: t.Dict, callbacks: Callbacks) -> float: + async def _ascore( + self: t.Self, row: t.Dict, callbacks: Callbacks, is_async: bool + ) -> float: """ returns the NLI score for each (q, c, a) pair """ assert self.llm is not None, "LLM is not set" p = self._create_answer_prompt(row) - answer_result = await self.llm.agenerate_text(p, callbacks=callbacks) - - statements = await json_loader.asafe_load( - answer_result.generations[0][0].text, self.llm + answer_result = await self.llm.generate( + p, callbacks=callbacks, is_async=is_async ) - p = self._create_nli_prompt(row, statements.get("statements", [])) - result = await self.llm.agenerate_text(p, callbacks=callbacks) - - json_output = await json_loader.asafe_load( - result.generations[0][0].text, self.llm + statements = await json_loader.safe_load( + text=answer_result.generations[0][0].text, + llm=self.llm, + callbacks=callbacks, + is_async=is_async, ) - return self._compute_score(json_output) - - def _score(self, row: t.Dict, callbacks: Callbacks) -> float: - assert self.llm is not None, "LLM is not set" - p = self._create_answer_prompt(row) - answer_result = self.llm.generate_text(p, callbacks=callbacks) - statements = json_loader.safe_load( - answer_result.generations[0][0].text, self.llm - ) p = self._create_nli_prompt(row, statements.get("statements", [])) - result = self.llm.generate_text(p, callbacks=callbacks) - - json_output = json_loader.safe_load(result.generations[0][0].text, self.llm) + nli_result = await self.llm.generate(p, callbacks=callbacks, is_async=is_async) + json_output = await json_loader.safe_load( + text=nli_result.generations[0][0].text, + llm=self.llm, + callbacks=callbacks, + is_async=is_async, + ) return self._compute_score(json_output) def adapt(self, language: str, cache_dir: t.Optional[str] = None) -> None: diff --git a/src/ragas/metrics/base.py b/src/ragas/metrics/base.py index 59a76c512..69b88e06b 100644 --- a/src/ragas/metrics/base.py +++ b/src/ragas/metrics/base.py @@ -6,6 +6,7 @@ """ from __future__ import annotations +import asyncio import typing as t from abc import ABC, abstractmethod from dataclasses import dataclass @@ -69,7 +70,9 @@ def score( self.name, inputs=row, callbacks=callbacks, is_async=False ) try: - score = self._score(row=row, callbacks=group_cm) + score = asyncio.run( + self._ascore(row=row, callbacks=group_cm, is_async=False) + ) except Exception as e: if not group_cm.ended: rm.on_chain_error(e) @@ -79,16 +82,14 @@ def score( rm.on_chain_end({"output": score}) return score - @abstractmethod - def _score(self, row: t.Dict, callbacks: Callbacks) -> float: - ... 
- - async def ascore(self: t.Self, row: t.Dict, callbacks: Callbacks = []) -> float: + async def ascore( + self: t.Self, row: t.Dict, callbacks: Callbacks = [], is_async: bool = True + ) -> float: rm, group_cm = new_group( self.name, inputs=row, callbacks=callbacks, is_async=True ) try: - score = await self._ascore(row=row, callbacks=group_cm) + score = await self._ascore(row=row, callbacks=group_cm, is_async=is_async) except Exception as e: if not group_cm.ended: rm.on_chain_error(e) @@ -99,7 +100,7 @@ async def ascore(self: t.Self, row: t.Dict, callbacks: Callbacks = []) -> float: return score @abstractmethod - async def _ascore(self, row: t.Dict, callbacks: Callbacks) -> float: + async def _ascore(self, row: t.Dict, callbacks: Callbacks, is_async: bool) -> float: ... diff --git a/src/ragas/metrics/critique.py b/src/ragas/metrics/critique.py index 15c9ed522..7bfe5bb54 100644 --- a/src/ragas/metrics/critique.py +++ b/src/ragas/metrics/critique.py @@ -112,32 +112,21 @@ def _compute_score(self, safe_loaded_responses): return score - def _score(self: t.Self, row: t.Dict, callbacks: Callbacks) -> float: + async def _ascore( + self: t.Self, row: t.Dict, callbacks: Callbacks, is_async: bool + ) -> float: assert self.llm is not None, "set LLM before use" q, c, a = row["question"], row["contexts"], row["answer"] - result = self.llm.generate_text( - self.prompt_format(q, a, c), callbacks=callbacks - ) - - responses = [r.text for r in result.generations[0]] - safe_loaded_responses = [json_loader.safe_load(r, self.llm) for r in responses] - - return self._compute_score(safe_loaded_responses) - - async def _ascore(self: t.Self, row: t.Dict, callbacks: Callbacks) -> float: - assert self.llm is not None, "set LLM before use" - - q, c, a = row["question"], row["contexts"], row["answer"] - - result = await self.llm.agenerate_text( - self.prompt_format(q, a, c), callbacks=callbacks + result = await self.llm.generate( + self.prompt_format(q, a, c), callbacks=callbacks, is_async=is_async ) responses = [r.text for r in result.generations[0]] safe_loaded_responses = [ - await json_loader.asafe_load(r, self.llm) for r in responses + await json_loader.safe_load(r, self.llm, is_async=is_async) + for r in responses ] return self._compute_score(safe_loaded_responses) diff --git a/src/ragas/testset/evolutions.py b/src/ragas/testset/evolutions.py index 756b59f86..4b284c5d6 100644 --- a/src/ragas/testset/evolutions.py +++ b/src/ragas/testset/evolutions.py @@ -5,7 +5,6 @@ from abc import abstractmethod from dataclasses import dataclass, field -from fsspec.exceptions import asyncio from langchain_core.pydantic_v1 import BaseModel from numpy.random import default_rng @@ -52,6 +51,7 @@ class Evolution: node_filter: t.Optional[NodeFilter] = None question_filter: t.Optional[QuestionFilter] = None max_tries: int = 5 + is_async: bool = True @staticmethod def merge_nodes(nodes: CurrentNodes) -> Node: @@ -59,11 +59,14 @@ def merge_nodes(nodes: CurrentNodes) -> Node: doc_id="merged", page_content="\n".join(n.page_content for n in nodes.nodes) ) - def init_evolution(self): - ... 
+ def init_evolution(self, is_async: bool = True): + self.is_async = is_async async def aretry_evolve( - self, current_tries: int, current_nodes: CurrentNodes, update_count: bool = True + self, + current_tries: int, + current_nodes: CurrentNodes, + update_count: bool = True, ) -> EvolutionOutput: if update_count: current_tries += 1 @@ -73,11 +76,11 @@ async def aretry_evolve( raise ValueError("Max tries reached") return await self._aevolve(current_tries, current_nodes) - def _transform_question(self, prompt: Prompt, question: str) -> str: + async def _transform_question(self, prompt: Prompt, question: str) -> str: assert self.generator_llm is not None, "generator_llm cannot be None" - results = self.generator_llm.generate_text( - prompt=prompt.format(question=question) + results = await self.generator_llm.generate( + prompt=prompt.format(question=question), is_async=self.is_async ) return results.generations[0][0].text.strip() @@ -109,9 +112,6 @@ def _get_more_adjacent_nodes(self, current_nodes: CurrentNodes): return current_nodes - def evolve(self, current_nodes: CurrentNodes) -> DataRow: - return asyncio.get_event_loop().run_until_complete(self.aevolve(current_nodes)) - async def aevolve(self, current_nodes: CurrentNodes) -> DataRow: # init tries with 0 when first called current_tries = 0 @@ -121,7 +121,7 @@ async def aevolve(self, current_nodes: CurrentNodes) -> DataRow: evolution_type, ) = await self._aevolve(current_tries, current_nodes) - return self.generate_datarow( + return await self.generate_datarow( question=evolved_question, current_nodes=current_nodes, evolution_type=evolution_type, @@ -133,7 +133,7 @@ async def _aevolve( ) -> EvolutionOutput: ... - def generate_datarow( + async def generate_datarow( self, question: str, current_nodes: CurrentNodes, @@ -144,14 +144,17 @@ def generate_datarow( node_content = [ f"{i}\t{n.page_content}" for i, n in enumerate(current_nodes.nodes) ] - results = self.generator_llm.generate_text( + results = await self.generator_llm.generate( prompt=find_relevent_context_prompt.format( question=question, contexts=node_content ) ) - relevant_context_indices = json_loader.safe_load( + relevent_contexts_result = await json_loader.safe_load( results.generations[0][0].text.strip(), llm=self.generator_llm - ).get("relevant_context", None) + ) + relevant_context_indices = relevent_contexts_result.get( + "relevant_context", None + ) if relevant_context_indices is None: relevant_context = CurrentNodes( root_node=current_nodes.root_node, nodes=current_nodes.nodes @@ -160,7 +163,7 @@ def generate_datarow( relevant_context = current_nodes merged_nodes = self.merge_nodes(relevant_context) - results = self.generator_llm.generate_text( + results = await self.generator_llm.generate( prompt=question_answer_prompt.format( question=question, context=merged_nodes.page_content ) @@ -190,7 +193,7 @@ async def _aevolve( assert self.question_filter is not None, "question_filter cannot be None" merged_node = self.merge_nodes(current_nodes) - passed = await self.node_filter.afilter(current_nodes.root_node) + passed = await self.node_filter.filter(current_nodes.root_node) if not passed["score"]: nodes = self.docstore.get_random_nodes(k=1) new_current_nodes = CurrentNodes(root_node=nodes[0], nodes=nodes) @@ -198,13 +201,13 @@ async def _aevolve( current_tries, new_current_nodes, update_count=False ) - results = self.generator_llm.generate_text( + results = await self.generator_llm.generate( prompt=seed_question_prompt.format(context=merged_node.page_content) ) seed_question = 
results.generations[0][0].text # NOTE: might need improvement # select only one seed question here - is_valid_question = await self.question_filter.afilter(seed_question) + is_valid_question = await self.question_filter.filter(seed_question) if not is_valid_question: # get more context to rewrite question current_nodes = self._get_more_adjacent_nodes(current_nodes) @@ -223,7 +226,9 @@ class ComplexEvolution(Evolution): se: t.Optional[SimpleEvolution] = field(default=None, repr=False) evolution_filter: t.Optional[EvolutionFilter] = field(default=None, repr=False) - def init_evolution(self): + def init_evolution(self, is_async: bool = True): + super().init_evolution(is_async=is_async) + # init simple evolution to get seed question self.se = SimpleEvolution( generator_llm=self.generator_llm, @@ -249,7 +254,7 @@ async def _acomplex_evolution( simple_question, ) - result = await self.generator_llm.agenerate_text( + result = await self.generator_llm.generate( prompt=question_prompt.format( question=simple_question, context=current_nodes.root_node.page_content ) @@ -257,7 +262,7 @@ async def _acomplex_evolution( reasoning_question = result.generations[0][0].text.strip() # compress the question - compressed_question = self._transform_question( + compressed_question = await self._transform_question( prompt=compress_question_prompt, question=reasoning_question ) logger.debug( @@ -266,15 +271,13 @@ async def _acomplex_evolution( reasoning_question, ) - if not await self.question_filter.afilter(compressed_question): + if not await self.question_filter.filter(compressed_question): # retry current_nodes = self.se._get_more_adjacent_nodes(current_nodes) return await self.aretry_evolve(current_tries, current_nodes) assert self.evolution_filter is not None, "evolution filter cannot be None" - if not await self.evolution_filter.afilter( - simple_question, compressed_question - ): + if not await self.evolution_filter.filter(simple_question, compressed_question): # retry current_nodes = self.se._get_more_adjacent_nodes(current_nodes) logger.debug( @@ -307,29 +310,27 @@ async def _aevolve( context1=current_nodes.root_node.page_content, context2=similar_node, ) - results = await self.generator_llm.agenerate_text(prompt=prompt) + results = await self.generator_llm.generate(prompt=prompt) question = results.generations[0][0].text.strip() logger.debug( "[MultiContextEvolution] multicontext question generated: %s", question ) # compress the question - compressed_question = self._transform_question( + compressed_question = await self._transform_question( prompt=compress_question_prompt, question=question ) logger.debug( "[MultiContextEvolution] multicontext question compressed: %s", question ) - if not await self.question_filter.afilter(compressed_question): + if not await self.question_filter.filter(compressed_question): # retry current_nodes = self.se._get_more_adjacent_nodes(current_nodes) return await self.aretry_evolve(current_tries, current_nodes) assert self.evolution_filter is not None, "evolution filter cannot be None" - if not await self.evolution_filter.afilter( - simple_question, compressed_question - ): + if not await self.evolution_filter.filter(simple_question, compressed_question): # retry current_nodes = self.se._get_more_adjacent_nodes(current_nodes) return await self.aretry_evolve(current_tries, current_nodes) diff --git a/src/ragas/testset/filters.py b/src/ragas/testset/filters.py index 79d19bd47..e261a62d7 100644 --- a/src/ragas/testset/filters.py +++ b/src/ragas/testset/filters.py @@ -1,6 +1,5 
@@ from __future__ import annotations -import asyncio import logging import typing as t from abc import ABC @@ -31,10 +30,7 @@ class NodeFilter(Filter): llm: BaseRagasLLM threshold: float = 7.5 - def filter(self, node: Node) -> t.Dict: - return asyncio.get_event_loop().run_until_complete(self.afilter(node)) - - async def afilter(self, node: Node) -> t.Dict: + async def filter(self, node: Node) -> t.Dict: prompt = context_scoring_prompt.format(context=node.page_content) results = await self.llm.agenerate_text(prompt=prompt) output = results.generations[0][0].text.strip() @@ -47,10 +43,7 @@ async def afilter(self, node: Node) -> t.Dict: class QuestionFilter(Filter): llm: BaseRagasLLM - def filter(self, question: str) -> bool: - return asyncio.get_event_loop().run_until_complete(self.afilter(question)) - - async def afilter(self, question: str) -> bool: + async def filter(self, question: str) -> bool: prompt = filter_question_prompt.format(question=question) results = await self.llm.agenerate_text(prompt=prompt) results = results.generations[0][0].text.strip() @@ -63,12 +56,7 @@ async def afilter(self, question: str) -> bool: class EvolutionFilter(Filter): llm: BaseRagasLLM - def filter(self, simple_question: str, compressed_question: str) -> bool: - return asyncio.get_event_loop().run_until_complete( - self.afilter(simple_question, compressed_question) - ) - - async def afilter(self, simple_question: str, compressed_question: str) -> bool: + async def filter(self, simple_question: str, compressed_question: str) -> bool: prompt = evolution_elimination_prompt.format( question1=simple_question, question2=compressed_question ) diff --git a/src/ragas/testset/generator.py b/src/ragas/testset/generator.py index d9f449f40..cec5e7241 100644 --- a/src/ragas/testset/generator.py +++ b/src/ragas/testset/generator.py @@ -10,7 +10,7 @@ from langchain_openai.embeddings import OpenAIEmbeddings from ragas._analytics import TesetGenerationEvent, track -from ragas.embeddings import BaseRagasEmbeddings +from ragas.embeddings.base import BaseRagasEmbeddings, LangchainEmbeddingsWrapper from ragas.executor import Executor from ragas.llms import BaseRagasLLM, LangchainLLMWrapper from ragas.testset.docstore import Document, DocumentStore, InMemoryDocumentStore @@ -75,7 +75,9 @@ def with_openai( ) -> "TestsetGenerator": generator_llm_model = LangchainLLMWrapper(ChatOpenAI(model=generator_llm)) critic_llm_model = LangchainLLMWrapper(ChatOpenAI(model=critic_llm)) - embeddings_model = OpenAIEmbeddings(model=embeddings) + embeddings_model = LangchainEmbeddingsWrapper( + OpenAIEmbeddings(model=embeddings) + ) if docstore is None: from langchain.text_splitter import TokenTextSplitter @@ -105,6 +107,7 @@ def generate_with_llamaindex_docs( test_size: int, distributions: Distributions = {}, with_debugging_logs=False, + is_async: bool = True, ): # chunk documents and add to docstore self.docstore.add_documents( @@ -115,6 +118,7 @@ def generate_with_llamaindex_docs( test_size=test_size, distributions=distributions, with_debugging_logs=with_debugging_logs, + is_async=is_async, ) # if you add any arguments to this function, make sure to add them to @@ -125,6 +129,7 @@ def generate_with_langchain_docs( test_size: int, distributions: Distributions = {}, with_debugging_logs=False, + is_async: bool = True, ): # chunk documents and add to docstore self.docstore.add_documents( @@ -135,6 +140,7 @@ def generate_with_langchain_docs( test_size=test_size, distributions=distributions, with_debugging_logs=with_debugging_logs, + 
is_async=is_async, ) def generate( @@ -142,6 +148,7 @@ def generate( test_size: int, distributions: Distributions = DEFAULT_DISTRIBUTION, with_debugging_logs=False, + is_async: bool = True, ): # init filters and evolutions for evolution in distributions: @@ -159,7 +166,7 @@ def generate( if evolution.evolution_filter is None: evolution.evolution_filter = EvolutionFilter(llm=self.critic_llm) - evolution.init_evolution() + evolution.init_evolution(is_async=is_async) if with_debugging_logs: from ragas.utils import patch_logger diff --git a/tests/benchmarks/benchmark_eval.py b/tests/benchmarks/benchmark_eval.py index 03d2348f3..e5ed0bb94 100644 --- a/tests/benchmarks/benchmark_eval.py +++ b/tests/benchmarks/benchmark_eval.py @@ -1,4 +1,3 @@ -import os import time from datasets import DatasetDict, load_dataset @@ -34,7 +33,7 @@ answer_similarity, ] -os.environ["PYTHONASYNCIODEBUG"] = "1" +# os.environ["PYTHONASYNCIODEBUG"] = "1" IGNORE_THREADS = False IGNORE_ASYNCIO = False diff --git a/tests/benchmarks/benchmark_testsetgen.py b/tests/benchmarks/benchmark_testsetgen.py index 4aa802e79..ced5f0b2e 100644 --- a/tests/benchmarks/benchmark_testsetgen.py +++ b/tests/benchmarks/benchmark_testsetgen.py @@ -22,22 +22,33 @@ def get_documents(): return documents -IGNORE_THREADS = True +IGNORE_THREADS = False IGNORE_ASYNCIO = False +# os.environ["PYTHONASYNCIODEBUG"] = "1" if __name__ == "__main__": documents = get_documents() # asyncio if not IGNORE_ASYNCIO: - os.environ["PYTHONASYNCIODEBUG"] = "1" print("Starting [Asyncio]") start = time.time() generator.generate_with_llamaindex_docs( - documents=documents, test_size=100, distributions=distributions + documents=documents, + test_size=50, + distributions=distributions, + is_async=True, ) print(f"Time taken: {time.time() - start:.2f}s") # Threads if not IGNORE_THREADS: print("Starting [Threads]") + start = time.time() + generator.generate_with_llamaindex_docs( + documents=documents, + test_size=50, + distributions=distributions, + is_async=False, + ) + print(f"Time taken [Threads]: {time.time() - start:.2f}s")
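
Note on the pattern this patch introduces: every component now exposes a single awaitable entry point (`generate`, `safe_load`, `embed_texts`, `_ascore`) that takes an `is_async` flag. When the flag is true the native coroutine is awaited; when it is false the blocking implementation is pushed onto the default thread-pool executor so the caller can still `await` it without blocking the event loop. The snippet below is a minimal, self-contained sketch of that bridging shape, mirroring `BaseRagasLLM.generate`; `SyncAsyncBridge` and its methods are illustrative stand-ins, not ragas APIs.

```python
import asyncio
from functools import partial


class SyncAsyncBridge:
    """Minimal sketch of the is_async bridging pattern used throughout this diff."""

    def generate_text(self, prompt: str) -> str:
        # Blocking implementation (stand-in for a sync LLM call).
        return f"sync result for: {prompt}"

    async def agenerate_text(self, prompt: str) -> str:
        # Native coroutine implementation (stand-in for an async LLM call).
        return f"async result for: {prompt}"

    async def generate(self, prompt: str, is_async: bool = True) -> str:
        if is_async:
            # Stay on the event loop; no extra threads involved.
            return await self.agenerate_text(prompt)
        # Inside a coroutine, get_event_loop() returns the running loop (as in the
        # patch); the blocking call runs in the default thread-pool executor.
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, partial(self.generate_text, prompt))


async def main() -> None:
    bridge = SyncAsyncBridge()
    print(await bridge.generate("hello", is_async=True))
    print(await bridge.generate("hello", is_async=False))


if __name__ == "__main__":
    asyncio.run(main())
```

This is also why `Metric.score` can drop the separate `_score` implementations: the sync path is now just `asyncio.run(self._ascore(row, callbacks, is_async=False))`, and the thread-versus-coroutine decision is made once, at the bottom of the stack.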
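
The new `LangchainEmbeddingsWrapper` follows the same shape on the embeddings side: `embed_texts` is always awaited, and `is_async` only decides whether the wrapped LangChain object's async API is used or its sync API is run in a worker thread. A usage sketch follows, assuming the patched ragas from this diff is importable; `FakeEmbeddings` is a hypothetical offline stand-in so the example runs without API keys.

```python
import asyncio
from typing import List

from langchain_core.embeddings import Embeddings
from ragas.embeddings.base import LangchainEmbeddingsWrapper


class FakeEmbeddings(Embeddings):
    """Hypothetical offline embeddings so the sketch needs no network access."""

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        return [[float(len(t)), 1.0] for t in texts]

    def embed_query(self, text: str) -> List[float]:
        return self.embed_documents([text])[0]

    async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
        return self.embed_documents(texts)

    async def aembed_query(self, text: str) -> List[float]:
        return self.embed_query(text)


async def main() -> None:
    wrapped = LangchainEmbeddingsWrapper(FakeEmbeddings())
    # is_async=True awaits aembed_documents on the wrapped object;
    # is_async=False runs the blocking embed_documents in the thread pool instead.
    print(await wrapped.embed_texts(["hello", "world"], is_async=True))
    print(await wrapped.embed_texts(["hello", "world"], is_async=False))


if __name__ == "__main__":
    asyncio.run(main())
```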