diff --git a/src/fmcore/prompt_tuner/dspy/dspy_prompt_tuner.py b/src/fmcore/prompt_tuner/dspy/dspy_prompt_tuner.py
index 8fd844d..adbb549 100644
--- a/src/fmcore/prompt_tuner/dspy/dspy_prompt_tuner.py
+++ b/src/fmcore/prompt_tuner/dspy/dspy_prompt_tuner.py
@@ -1,3 +1,10 @@
+import os
+import tempfile
+
+# Set the DSP_CACHEDIR environment variable to the system's default temporary directory
+os.environ["DSP_CACHEDIR"] = tempfile.gettempdir()
+os.environ["DSPY_CACHEDIR"] = tempfile.gettempdir()
+
 import dspy
 import pandas as pd
 from typing import Dict, List
diff --git a/src/fmcore/prompt_tuner/dspy/lm_adapters/dspy_adapter.py b/src/fmcore/prompt_tuner/dspy/lm_adapters/dspy_adapter.py
index 52fc052..e998164 100644
--- a/src/fmcore/prompt_tuner/dspy/lm_adapters/dspy_adapter.py
+++ b/src/fmcore/prompt_tuner/dspy/lm_adapters/dspy_adapter.py
@@ -4,6 +4,8 @@
 from fmcore.llm.types.llm_types import LLMConfig
 from langchain_core.messages import BaseMessage
 
+from fmcore.utils.async_utils import AsyncUtils
+
 
 class DSPyLLMAdapter(dspy.LM):
     """
@@ -65,7 +67,8 @@ def __call__(
         if prompt:
             messages = [{"role": "user", "content": prompt}]
 
-        response = self.llm.invoke(messages)
+        # We are using this hack because dspy doesn't support async
+        response = AsyncUtils.execute(self.llm.ainvoke(messages))
         result = [response.content]
 
         # Update history with DSPy constructs, which currently support only dictionaries
diff --git a/src/fmcore/prompt_tuner/dspy/utils/dspy_utils.py b/src/fmcore/prompt_tuner/dspy/utils/dspy_utils.py
index 4e499e7..51cce9f 100644
--- a/src/fmcore/prompt_tuner/dspy/utils/dspy_utils.py
+++ b/src/fmcore/prompt_tuner/dspy/utils/dspy_utils.py
@@ -9,6 +9,8 @@
 from fmcore.prompt_tuner.evaluator.types.evaluator_types import EvaluatorConfig
 from fmcore.prompt_tuner.types.prompt_tuner_types import PromptConfig, PromptEvaluationResult
 from fmcore.types.enums.dataset_enums import DatasetType
+from fmcore.utils.async_utils import AsyncUtils
+from fmcore.utils.logging_utils import Log
 
 
 class DSPyUtils:
@@ -166,7 +168,15 @@ def evaluate_func(example: dspy.Example, prediction: dspy.Prediction, trace=None
                 "output": prediction.toDict(),
             }
 
-            return evaluator.evaluate(data=row)
+            try:
+                # We are using this hack because dspy doesn't support async
+                decision = AsyncUtils.execute(evaluator.aevaluate(data=row))
+            except Exception as e:
+                # Defaulting to False in case of failures
+                Log.info(f"Error {e} during evaluating {row}")
+                decision = False
+
+            return decision
 
         return evaluate_func
 
diff --git a/src/fmcore/prompt_tuner/evaluator/llm_as_a_judge_boolean/llm_as_a_judge_boolean_evaluator.py b/src/fmcore/prompt_tuner/evaluator/llm_as_a_judge_boolean/llm_as_a_judge_boolean_evaluator.py
index 8ed7053..9c98fed 100644
--- a/src/fmcore/prompt_tuner/evaluator/llm_as_a_judge_boolean/llm_as_a_judge_boolean_evaluator.py
+++ b/src/fmcore/prompt_tuner/evaluator/llm_as_a_judge_boolean/llm_as_a_judge_boolean_evaluator.py
@@ -98,7 +98,7 @@ async def aevaluate(self, data: Dict) -> bool:
             BooleanLLMJudgeOutput: Evaluation result as a boolean decision.
         """
         # Format the context into messages using the template
-        formatted_message: BaseMessage = await self.text_prompt_mapper.amap(data.context)
+        formatted_message: BaseMessage = await self.text_prompt_mapper.amap(data)
         llm_response: BaseMessage = await self.llm_inference_mapper.amap([formatted_message])
         json_response: Dict = await self.json_mapper.amap(llm_response.content)
         decision: bool = await self.criteria_checker.amap(json_response)
diff --git a/src/fmcore/utils/async_utils.py b/src/fmcore/utils/async_utils.py
new file mode 100644
index 0000000..15f1b33
--- /dev/null
+++ b/src/fmcore/utils/async_utils.py
@@ -0,0 +1,15 @@
+import asyncio
+from concurrent.futures import ThreadPoolExecutor
+
+
+class AsyncUtils:
+    _executor = ThreadPoolExecutor()
+
+    @staticmethod
+    def execute(coro):
+        """
+        Executes an async coroutine in a thread pool executor.
+        - No event loop interaction: Just submit coroutines to the thread pool.
+        - Always uses ThreadPoolExecutor to run async functions.
+        """
+        return AsyncUtils._executor.submit(lambda: asyncio.run(coro)).result()