Skip to content

Commit

Permalink
Updates the API for streaming sync mode to match streaming async
Browse files Browse the repository at this point in the history
We can do this as we're still in 0.X.Y, but we'll be adding lots of
documentation. Overall, the structure is largely the same. We just:

1. yield intermediate_result, None
2. yield final_result, state_update

This leaves open the possibility for multiple type (2) outputs.

Note this is a lot -- some of the changes:

1. Unify the class structure for streaming/async -- switch on a few
   method definitions
2. Add a peek-ahead (in case we're at the end), only for the multi-step
   streaming (as we won't know which item is the last one). Nobody uses
   this (yet), and we have a chance to indicate the end by adding an
   `end` token at some point.
3. Adds quite a few more unit tests/updates them
  • Loading branch information
elijahbenizzy committed May 21, 2024
1 parent da1ba76 commit df8f6c8
Show file tree
Hide file tree
Showing 8 changed files with 609 additions and 348 deletions.
381 changes: 190 additions & 191 deletions burr/core/action.py

Large diffs are not rendered by default.

181 changes: 157 additions & 24 deletions burr/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
Reducer,
SingleStepAction,
SingleStepStreamingAction,
SingleStepStreamingActionAsync,
StreamingAction,
StreamingResultContainer,
create_action,
Expand Down Expand Up @@ -230,17 +229,26 @@ def _run_single_step_action(

def _run_single_step_streaming_action(
action: SingleStepStreamingAction, state: State, inputs: Optional[Dict[str, Any]]
) -> Generator[dict, None, Tuple[dict, State]]:
) -> Generator[Tuple[dict, Optional[State]], None, None]:
action.validate_inputs(inputs)
generator = action.stream_run_and_update(state, **inputs)
result, state = yield from generator
result = None
state_update = None
for result, state_update in generator:
if state_update is None:
yield result, None
if state_update is None:
raise ValueError(
f"Action {action.name} did not return a state update. For streaming actions, the last yield "
f"statement must be a tuple of (result, state_update). For example, yield dict(foo='bar'), state.update(foo='bar')"
)
_validate_result(result, action.name)
_validate_reducer_writes(action, state, action.name)
return result, state
_validate_reducer_writes(action, state_update, action.name)
yield result, state_update


async def _run_single_step_streaming_action_async(
action: SingleStepStreamingActionAsync, state: State, inputs: Optional[Dict[str, Any]]
async def _arun_single_step_streaming_action(
action: SingleStepStreamingAction, state: State, inputs: Optional[Dict[str, Any]]
) -> AsyncGenerator[Tuple[dict, Optional[State]], None]:
action.validate_inputs(inputs)
generator = action.stream_run_and_update(state, **inputs)
Expand Down Expand Up @@ -270,28 +278,54 @@ async def _run_single_step_streaming_action_async(

def _run_multi_step_streaming_action(
action: StreamingAction, state: State, inputs: Optional[Dict[str, Any]]
) -> Generator[dict, None, Tuple[dict, State]]:
) -> Generator[Tuple[dict, Optional[State]], None, None]:
action.validate_inputs(inputs)
generator = action.stream_run(state, **inputs)
result = yield from generator
result = None
for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
yield next_result, None
# yield item, None
state_update = _run_reducer(action, state, result, action.name)
_validate_result(result, action.name)
new_state = _run_reducer(action, state, result, action.name)
return result, _state_update(state, new_state)
_validate_reducer_writes(action, state_update, action.name)
yield result, state_update


async def _run_multi_step_streaming_action_async(
async def _arun_multi_step_streaming_action(
action: AsyncStreamingAction, state: State, inputs: Optional[Dict[str, Any]]
) -> AsyncGenerator[Tuple[dict, Optional[State]], None]:
action.validate_inputs(inputs)
generator = action.stream_run(state, **inputs)
result = None
async for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
yield item, None
if next_result is not None:
yield next_result, None
# yield item, None
state_update = _run_reducer(action, state, result, action.name)
_validate_result(result, action.name)
_validate_reducer_writes(action, state_update, action.name)
yield result, state_update
# action.validate_inputs(inputs)
# generator = action.stream_run(state, **inputs)
# result = None
# async for item in generator:
# result = item
# yield item, None
# state_update = _run_reducer(action, state, result, action.name)
# _validate_result(result, action.name)
# _validate_reducer_writes(action, state_update, action.name)
# yield result, state_update


async def _arun_single_step_action(
Expand Down Expand Up @@ -793,7 +827,8 @@ def stream_result(
:param halt_before: The list of actions to halt before execution of. It will halt on the first one. Note that
if this is met, the streaming result container will be empty (and return None) for the result, having an empty generator.
:param inputs: Inputs to the action -- this is if this action requires an input that is passed in from the outside world
:return: A streaming result container, which is a generator that will yield results as they come in, as wel as cache/give you the final result, and update state accordingly.
:return: A streaming result container, which is a generator that will yield results as they come in, as well as cache/give you the final result,
and update state accordingly.
This is meant to be used with streaming actions -- :py:meth:`streaming_action <burr.core.action.streaming_action>`
or :py:class:`StreamingAction <burr.core.action.StreamingAction>` It returns a
Expand Down Expand Up @@ -836,10 +871,10 @@ def streaming_response(state: State, prompt: str) -> Generator[dict, None, Tuple
delta = chunk.choices[0].delta.content
buffer.append(delta)
# yield partial results
yield {'response': delta}
yield {'response': delta}, None # indicate that we are not done by returning a `None` state!
full_response = ''.join(buffer)
# return the final result
return {'response': full_response}, state.update(response=full_response)
yield {'response': full_response}, state.update(response=full_response)
To use streaming_result, you pass in names of streaming actions (such as the one above) to the halt_after
parameter:
Expand Down Expand Up @@ -1001,7 +1036,109 @@ async def astream_result(
halt_before: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> Tuple[Action, AsyncStreamingResultContainer]:
"""Placeholder for the async version of stream_result. This is not yet implemented."""
"""Streams a result out in an asynchronous manner.
:param halt_after: The list of actions to halt after execution of. It will halt on the first one.
:param halt_before: The list of actions to halt before execution of. It will halt on the first one. Note that
if this is met, the streaming result container will be empty (and return None) for the result, having an empty generator.
:param inputs: Inputs to the action -- this is if this action requires an input that is passed in from the outside world
:return: An asynchronous :py:class:`AsyncStreamingResultContainer <burr.core.action.AsyncStreamingResultContainer>`, which is a generator that will yield results as they come in, as well as cache/give you the final result,
and update state accordingly.
This is meant to be used with streaming actions -- :py:meth:`streaming_action <burr.core.action.streaming_action>`
or :py:class:`StreamingAction <burr.core.action.StreamingAction>` It returns a
:py:class:`AsyncStreamingResultContainer <burr.core.action.AsyncStreamingResultContainer>`, which has two capabilities:
1. It is a generator that streams out the intermediate results of the action
2. It has an async ``.get()`` method that returns the final result of the action, and the final state.
If ``.get()`` is called before the generator is exhausted, it will block until the generator is exhausted.
While this container is meant to work with streaming actions, it can also be used with non-streaming actions. In this case,
the generator will be empty, and the ``.get()`` method will return the final result and state.
The rules for halt_before and halt_after are the same as for :py:meth:`iterate <burr.core.application.Application.iterate>`,
and :py:meth:`run <burr.core.application.Application.run>`. In this case, `halt_before` will indicate a *non* streaming action,
which will be empty. Thus ``halt_after`` takes precedence -- if it is met, the streaming result container will contain the result of the
halt_after condition.
The :py:class:`AsyncStreamingResultContainer <burr.core.action.AsyncStreamingResultContainer>` is meant as a convenience -- specifically this allows for
hooks, callbacks, etc... so you can take the control flow and still have state updated afterwards. Hooks/state update will be called after an exception
is thrown during streaming, or the stream is completed. Note that it is undefined behavior to attempt to execute another action while a stream is in progress.
To see how this works, let's take the following action (simplified as a single-node workflow) as an example:
.. code-block:: python
client = openai.AsyncClient()
@streaming_action(reads=[], writes=['response'])
async def streaming_response(state: State, prompt: str) -> AsyncGenerator[Tuple[dict, Optional[State]], None]:
response = client.chat.completions.create(
model='gpt-3.5-turbo',
messages=[{
'role': 'user',
'content': prompt
}],
temperature=0,
)
buffer = []
async for chunk in response: # use an async for loop
delta = chunk.choices[0].delta.content
buffer.append(delta)
# yield partial results
yield {'response': delta}, None # indicate that we are not done by returning a `None` state!
# make sure to join with the buffer!
full_response = ''.join(buffer)
# yield the final result at the end + the state update
yield {'response': full_response}, state.update(response=full_response)
To use ``astream_result``, you pass in names of streaming actions (such as the one above) to the halt_after
parameter:
.. code-block:: python
application = ApplicationBuilder().with_actions(streaming_response=streaming_response)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = await application.astream_result(halt_after='streaming_response', inputs={"prompt": prompt})
async for result in streaming_result:
print(result['response']) # one by one
result, state = await streaming_result.get()
print(result['response']) # all at once
Note that if you have multiple halt_after conditions, you can use the ``.action`` attribute to get the action that
was run.
.. code-block:: python
application = ApplicationBuilder().with_actions(
streaming_response=streaming_response,
error=error # another function that outputs an error, streaming
)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = await application.astream_result(halt_after='streaming_response', inputs={"prompt": prompt})
color = "red" if action.name == "error" else "green"
async for result in streaming_result:
print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
.. code-block:: python
application = ApplicationBuilder().with_actions(
streaming_response=streaming_response,
error=non_streaming_error # a non-streaming function that outputs an error
)...build()
prompt = "Count to 100, with a comma between each number and no newlines. E.g., 1, 2, 3, ..."
action, streaming_result = await application.astream_result(halt_after='streaming_response', inputs={"prompt": prompt})
color = "red" if action.name == "error" else "green"
if action.name == "streaming_response": # can also use the ``.streaming`` attribute of action
async for result in streaming_result:
print(format(result['response'], color)) # assumes that error and streaming_response both have the same output shape
else:
result, state = await streaming_result.get()
print(format(result['response'], color))
"""
halt_before, halt_after, inputs = self._clean_iterate_params(
halt_before, halt_after, inputs
)
Expand Down Expand Up @@ -1078,10 +1215,10 @@ async def callback(
results=result, final_state=state
)
if next_action.single_step:
next_action = cast(SingleStepStreamingActionAsync, next_action)
next_action = cast(SingleStepStreamingAction, next_action)
if not next_action.is_async():
raise ValueError("TODO -- convert")
generator = _run_single_step_streaming_action_async(
generator = _arun_single_step_streaming_action(
next_action, self._state, action_inputs
)
return next_action, AsyncStreamingResultContainer(
Expand All @@ -1091,7 +1228,7 @@ async def callback(
if not next_action.is_async():
raise ValueError("TODO -- convert")
next_action = cast(AsyncStreamingAction, next_action)
generator = _run_multi_step_streaming_action_async(
generator = _arun_multi_step_streaming_action(
next_action, self._state, action_inputs
)
except Exception as e:
Expand All @@ -1112,10 +1249,6 @@ async def callback(
return next_action, AsyncStreamingResultContainer(
generator, self._state, process_result, callback
)
# raise NotImplementedError(
# "This has not yet been implemented! See the github issue: https://github.com/DAGWorks-Inc/burr/issues/64"
# " for details. Please comment or vote to get it implemented quickly!"
# )

@telemetry.capture_function_usage
def visualize(
Expand Down
8 changes: 7 additions & 1 deletion docs/concepts/state-machine.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ This is shown in the example from :ref:`getting started <simpleexample>`
Running
-------

There are three APIs for executing an application.
There are four APIs for executing an application.

``step``/``astep``
------------------
Expand Down Expand Up @@ -135,6 +135,12 @@ In the async context, you can run ``arun``:

``run`` and ``arun`` largely have the same behavior as ``iterate`` and ``aiterate``.

``stream_result``/``astream_result``
------------------------------------

These allow you to stream responses back. Note they work best with streaming actions.
Read about them in the :ref:`streaming <streaming>` section.

----------
Inspection
----------
Expand Down
Loading

0 comments on commit df8f6c8

Please sign in to comment.