v0.9.0
Async support for pipeline wrappers
Now you can implement run_api_async and run_chat_completion_async methods in your pipeline wrapper.
You can also use async_streaming_generator to stream pipeline output from async methods.
For example:
from pathlib import Path
from typing import AsyncGenerator, List

from haystack import AsyncPipeline
from haystack.dataclasses import ChatMessage
from hayhooks import (
    get_last_user_message,
    BasePipelineWrapper,
    async_streaming_generator,
)

SYSTEM_MESSAGE = "You are a helpful assistant that can answer questions about the world."


class PipelineWrapper(BasePipelineWrapper):
    def setup(self) -> None:
        pipeline_yaml = (Path(__file__).parent / "question_answer.yml").read_text()
        self.pipeline = AsyncPipeline.loads(pipeline_yaml)

    async def run_api_async(self, question: str) -> str:
        result = await self.pipeline.run_async(
            {
                "prompt_builder": {
                    "template": [
                        ChatMessage.from_system(SYSTEM_MESSAGE),
                        ChatMessage.from_user(question),
                    ]
                }
            }
        )
        return result["llm"]["replies"][0].text

    async def run_chat_completion_async(self, model: str, messages: List[dict], body: dict) -> AsyncGenerator:
        question = get_last_user_message(messages)
        return async_streaming_generator(
            pipeline=self.pipeline,
            pipeline_run_args={
                "prompt_builder": {
                    "template": [
                        ChatMessage.from_system(SYSTEM_MESSAGE),
                        ChatMessage.from_user(question),
                    ]
                },
            },
        )

Full example is available here.
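Note that run_chat_completion_async is a coroutine that returns an async generator, so a caller has to await the method first and then iterate the result with async for. The sketch below illustrates that calling pattern using only the standard library. DemoWrapper and fake_streaming_generator are hypothetical stand-ins, not Hayhooks APIs, and this is not necessarily how Hayhooks consumes the generator internally.

```python
import asyncio
from typing import AsyncGenerator, List


async def fake_streaming_generator(text: str) -> AsyncGenerator[str, None]:
    # Hypothetical stand-in for async_streaming_generator:
    # yields the text one word at a time.
    for word in text.split():
        await asyncio.sleep(0)  # hand control back to the event loop between chunks
        yield word


class DemoWrapper:
    # Hypothetical wrapper for illustration; not a BasePipelineWrapper subclass.
    async def run_chat_completion_async(
        self, model: str, messages: List[dict], body: dict
    ) -> AsyncGenerator[str, None]:
        question = messages[-1]["content"]
        # Return the async generator without iterating it here.
        return fake_streaming_generator(f"You asked: {question}")


async def collect_chunks() -> List[str]:
    wrapper = DemoWrapper()
    messages = [{"role": "user", "content": "What is async?"}]
    # Step 1: await the coroutine to obtain the async generator.
    stream = await wrapper.run_chat_completion_async("demo", messages, {})
    # Step 2: consume the chunks as they are produced.
    return [chunk async for chunk in stream]


if __name__ == "__main__":
    print(asyncio.run(collect_chunks()))
```

In a real server each chunk would be forwarded to the client as it arrives rather than collected into a list.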
What's Changed
- Better example video resolution on Hayhooks Core MCP Tools docs by @mpangrazzi in #121
- Add support for async pipeline wrapper methods by @mpangrazzi in #122
Full Changelog: v0.8.0...v0.9.0