Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/fmcore/prompt_tuner/dspy/dspy_prompt_tuner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import os
import tempfile

# Set the DSP_CACHEDIR environment variable to the system's default temporary directory
os.environ["DSP_CACHEDIR"] = tempfile.gettempdir()
os.environ["DSPY_CACHEDIR"] = tempfile.gettempdir()

import dspy
import pandas as pd
from typing import Dict, List
Expand Down
5 changes: 4 additions & 1 deletion src/fmcore/prompt_tuner/dspy/lm_adapters/dspy_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from fmcore.llm.types.llm_types import LLMConfig
from langchain_core.messages import BaseMessage

from fmcore.utils.async_utils import AsyncUtils


class DSPyLLMAdapter(dspy.LM):
"""
Expand Down Expand Up @@ -65,7 +67,8 @@ def __call__(
if prompt:
messages = [{"role": "user", "content": prompt}]

response = self.llm.invoke(messages)
# We are using this hack because dspy doesn't support async
response = AsyncUtils.execute(self.llm.ainvoke(messages))
result = [response.content]

# Update history with DSPy constructs, which currently support only dictionaries
Expand Down
12 changes: 11 additions & 1 deletion src/fmcore/prompt_tuner/dspy/utils/dspy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from fmcore.prompt_tuner.evaluator.types.evaluator_types import EvaluatorConfig
from fmcore.prompt_tuner.types.prompt_tuner_types import PromptConfig, PromptEvaluationResult
from fmcore.types.enums.dataset_enums import DatasetType
from fmcore.utils.async_utils import AsyncUtils
from fmcore.utils.logging_utils import Log


class DSPyUtils:
Expand Down Expand Up @@ -166,7 +168,15 @@ def evaluate_func(example: dspy.Example, prediction: dspy.Prediction, trace=None
"output": prediction.toDict(),
}

return evaluator.evaluate(data=row)
try:
# We are using this hack because dspy doesn't support async
decision = AsyncUtils.execute(evaluator.aevaluate(data=row))
except Exception as e:
                # Defaulting to False in case of failures
Log.info(f"Error {e} during evaluating {row}")
decision = False

return decision

return evaluate_func

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async def aevaluate(self, data: Dict) -> bool:
BooleanLLMJudgeOutput: Evaluation result as a boolean decision.
"""
# Format the context into messages using the template
formatted_message: BaseMessage = await self.text_prompt_mapper.amap(data.context)
formatted_message: BaseMessage = await self.text_prompt_mapper.amap(data)
llm_response: BaseMessage = await self.llm_inference_mapper.amap([formatted_message])
json_response: Dict = await self.json_mapper.amap(llm_response.content)
decision: bool = await self.criteria_checker.amap(json_response)
Expand Down
18 changes: 18 additions & 0 deletions src/fmcore/utils/async_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor


class AsyncUtils:
    """Utility for invoking async coroutines from synchronous code.

    Acts as a bridge for libraries (e.g. dspy) that only expose
    synchronous call sites but need to call async APIs.
    """

    # Shared pool for the module's lifetime; each submitted task creates its
    # own short-lived event loop via asyncio.run, so tasks are independent.
    _executor = ThreadPoolExecutor()

    @staticmethod
    def execute(coro):
        """
        Execute an async coroutine in a thread pool executor and return its result.

        Running in a worker thread (instead of calling ``asyncio.run`` directly)
        keeps this safe even when the caller is itself inside a running event
        loop, where ``asyncio.run`` would raise a ``RuntimeError``.

        Args:
            coro: The coroutine object to run.

        Returns:
            Whatever the coroutine returns; any exception it raises is
            re-raised here by ``Future.result()``.
        """
        # Blocks the calling thread until the coroutine finishes.
        return AsyncUtils._executor.submit(lambda: asyncio.run(coro)).result()