Skip to content

Commit

Permalink
Updates demo data to have partiion key
Browse files Browse the repository at this point in the history
  • Loading branch information
elijahbenizzy committed Mar 18, 2024
1 parent 50d2213 commit 5b9917a
Show file tree
Hide file tree
Showing 43 changed files with 1,501 additions and 1,485 deletions.
125 changes: 62 additions & 63 deletions burr/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
cast,
)

from burr import telemetry
from burr import visibility
from burr import telemetry, visibility
from burr.core import persistence
from burr.core.action import (
Action,
Expand Down Expand Up @@ -173,7 +172,7 @@ def _format_error_message(action: Action, input_state: State, inputs: dict) -> s


def _run_single_step_action(
action: SingleStepAction, state: State, inputs: Optional[Dict[str, Any]]
action: SingleStepAction, state: State, inputs: Optional[Dict[str, Any]]
) -> Tuple[Dict[str, Any], State]:
"""Runs a single step action. This API is internal-facing and a bit in flux, but
it corresponds to the SingleStepAction class.
Expand All @@ -191,15 +190,15 @@ def _run_single_step_action(


def _run_single_step_streaming_action(
action: SingleStepStreamingAction, state: State, inputs: Optional[Dict[str, Any]]
action: SingleStepStreamingAction, state: State, inputs: Optional[Dict[str, Any]]
) -> Generator[dict, None, Tuple[dict, State]]:
action.validate_inputs(inputs)
generator = action.stream_run_and_update(state, **inputs)
return (yield from generator)


def _run_multi_step_streaming_action(
action: StreamingAction, state: State, inputs: Optional[Dict[str, Any]]
action: StreamingAction, state: State, inputs: Optional[Dict[str, Any]]
) -> Generator[dict, None, Tuple[dict, State]]:
action.validate_inputs(inputs)
generator = action.stream_run(state, **inputs)
Expand All @@ -209,7 +208,7 @@ def _run_multi_step_streaming_action(


async def _arun_single_step_action(
action: SingleStepAction, state: State, inputs: Optional[Dict[str, Any]]
action: SingleStepAction, state: State, inputs: Optional[Dict[str, Any]]
) -> Tuple[dict, State]:
"""Runs a single step action in async. See the synchronous version for more details."""
state_to_use = state
Expand Down Expand Up @@ -292,7 +291,7 @@ def step(self, inputs: Optional[Dict[str, Any]] = None) -> Optional[Tuple[Action
return out

def _step(
self, inputs: Optional[Dict[str, Any]], _run_hooks: bool = True
self, inputs: Optional[Dict[str, Any]], _run_hooks: bool = True
) -> Optional[Tuple[Action, dict, State]]:
"""Internal-facing version of step. This is the same as step, but with an additional
parameter to hide hook execution so async can leverage it."""
Expand Down Expand Up @@ -446,10 +445,10 @@ async def astep(self, inputs: Dict[str, Any] = None) -> Optional[Tuple[Action, d
return next_action, result, new_state

def _clean_iterate_params(
self,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
self,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> Tuple[list[str], list[str], Dict[str, Any]]:
"""Utility function to clean out iterate params so we have less duplication between iterate/aiterate
and the logic is cleaner later.
Expand All @@ -474,7 +473,7 @@ def has_next_action(self) -> bool:
return self.get_next_action() is not None

def _should_halt_iterate(
self, halt_before: list[str], halt_after: list[str], prior_action: Action
self, halt_before: list[str], halt_after: list[str], prior_action: Action
) -> bool:
"""Internal utility function to determine whether or not to halt during iteration"""
if self.has_next_action() and self.get_next_action().name in halt_before:
Expand All @@ -486,11 +485,11 @@ def _should_halt_iterate(
return False

def _return_value_iterate(
self,
halt_before: list[str],
halt_after: list[str],
prior_action: Optional[Action],
result: Optional[dict],
self,
halt_before: list[str],
halt_after: list[str],
prior_action: Optional[Action],
result: Optional[dict],
) -> Tuple[Optional[Action], Optional[dict], State]:
"""Utility function to decide what to return for iterate/arun. Note that run() will delegate to the return value of
iterate, whereas arun cannot delegate to the return value of aiterate (as async generators cannot return a value).
Expand Down Expand Up @@ -520,11 +519,11 @@ def _return_value_iterate(

@telemetry.capture_function_usage
def iterate(
self,
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
self,
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> Generator[Tuple[Action, dict, State], None, Tuple[Action, Optional[dict], State]]:
"""Returns a generator that calls step() in a row, enabling you to see the state
of the system as it updates. Note this returns a generator, and also the final result
Expand Down Expand Up @@ -557,11 +556,11 @@ def iterate(

@telemetry.capture_function_usage
async def aiterate(
self,
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
self,
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> AsyncGenerator[Tuple[Action, dict, State], None]:
"""Returns a generator that calls step() in a row, enabling you to see the state
of the system as it updates. This is the asynchronous version so it has no capability of t
Expand All @@ -587,11 +586,11 @@ async def aiterate(

@telemetry.capture_function_usage
def run(
self,
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
self,
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> Tuple[Action, Optional[dict], State]:
"""Runs your application through until completion. Does
not give access to the state along the way -- if you want that, use iterate().
Expand All @@ -611,11 +610,11 @@ def run(

@telemetry.capture_function_usage
async def arun(
self,
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
self,
*,
halt_before: list[str] = None,
halt_after: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> Tuple[Action, Optional[dict], State]:
"""Runs your application through until completion, using async. Does
not give access to the state along the way -- if you want that, use iterate().
Expand All @@ -631,7 +630,7 @@ async def arun(
halt_before, halt_after, inputs
)
async for prior_action, result, state in self.aiterate(
halt_before=halt_before, halt_after=halt_after, inputs=inputs
halt_before=halt_before, halt_after=halt_after, inputs=inputs
):
pass
return self._return_value_iterate(halt_before, halt_after, prior_action, result)
Expand All @@ -647,10 +646,10 @@ def _validate_streaming_inputs(self, halt_after: list[str]):

@telemetry.capture_function_usage
def stream_result(
self,
halt_after: list[str],
halt_before: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
self,
halt_after: list[str],
halt_before: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> Tuple[Action, StreamingResultContainer]:
"""Streams a result out.
Expand Down Expand Up @@ -800,10 +799,10 @@ def process_result(result: dict, state: State) -> Tuple[Dict[str, Any], State]:
return result, new_state

def callback(
result: Optional[dict],
state: State,
exc: Optional[Exception] = None,
seq_id=self.sequence_id,
result: Optional[dict],
state: State,
exc: Optional[Exception] = None,
seq_id=self.sequence_id,
):
self._adapter_set.call_all_lifecycle_hooks_sync(
"post_run_step",
Expand Down Expand Up @@ -859,10 +858,10 @@ def callback(

@telemetry.capture_function_usage
async def astream_result(
self,
halt_after: list[str],
halt_before: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
self,
halt_after: list[str],
halt_before: list[str] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> Tuple[Action, ...]:
"""Placeholder for the async version of stream_result. This is not yet implemented."""
raise NotImplementedError(
Expand All @@ -872,13 +871,13 @@ async def astream_result(

@telemetry.capture_function_usage
def visualize(
self,
output_file_path: Optional[str],
include_conditions: bool = False,
include_state: bool = False,
view: bool = False,
engine: Literal["graphviz"] = "graphviz",
**engine_kwargs: Any,
self,
output_file_path: Optional[str],
include_conditions: bool = False,
include_state: bool = False,
view: bool = False,
engine: Literal["graphviz"] = "graphviz",
**engine_kwargs: Any,
):
"""Visualizes the application graph using graphviz. This will render the graph.
Expand Down Expand Up @@ -1051,7 +1050,7 @@ def _assert_set(value: Optional[Any], field: str, method: str):


def _validate_transitions(
transitions: Optional[List[Tuple[str, str, Condition]]], actions: Set[str]
transitions: Optional[List[Tuple[str, str, Condition]]], actions: Set[str]
):
_assert_set(transitions, "_transitions", "with_transitions")
exhausted = {} # items for which we have seen a default transition
Expand Down Expand Up @@ -1114,7 +1113,7 @@ def __init__(self):
self.default_state: Optional[dict] = None

def with_identifiers(
self, app_id: str = None, partition_key: str = None, sequence_id: int = None
self, app_id: str = None, partition_key: str = None, sequence_id: int = None
) -> "ApplicationBuilder":
"""Assigns various identifiers to the application. This is used for tracking, persistence, etc...
Expand Down Expand Up @@ -1185,10 +1184,10 @@ def with_actions(self, **actions: Union[Action, Callable]) -> "ApplicationBuilde
return self

def with_transitions(
self,
*transitions: Union[
Tuple[Union[str, list[str]], str], Tuple[Union[str, list[str]], str, Condition]
],
self,
*transitions: Union[
Tuple[Union[str, list[str]], str], Tuple[Union[str, list[str]], str, Condition]
],
) -> "ApplicationBuilder":
"""Adds transitions to the application. Transitions are specified as tuples of either:
1. (from, to, condition)
Expand Down
Loading

0 comments on commit 5b9917a

Please sign in to comment.