From d12befafad43d3851a2ba2b9df3e4ecf2750442e Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Fri, 17 Jan 2025 13:30:16 +0100 Subject: [PATCH 1/2] Use collect operator in test programs --- src/cascade/dataflow/test_dataflow.py | 18 ++++-- test_programs/expected/checkout_two_items.py | 59 +++++++++++++++----- 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/src/cascade/dataflow/test_dataflow.py b/src/cascade/dataflow/test_dataflow.py index 9458c70..67651b3 100644 --- a/src/cascade/dataflow/test_dataflow.py +++ b/src/cascade/dataflow/test_dataflow.py @@ -1,5 +1,5 @@ from typing import Any -from cascade.dataflow.dataflow import DataFlow, Edge, Event, EventResult, InvokeMethod, MergeNode, OpNode +from cascade.dataflow.dataflow import CollectNode, CollectTarget, DataFlow, Edge, Event, EventResult, InvokeMethod, MergeNode, OpNode from cascade.dataflow.operator import StatefulOperator class DummyUser: @@ -79,9 +79,19 @@ def test_simple_df_propogation(): def test_merge_df_propogation(): df = DataFlow("user.buy_2_items") n0 = OpNode(DummyUser, InvokeMethod("buy_2_items_0")) - n1 = OpNode(DummyItem, InvokeMethod("get_price")) - n2 = OpNode(DummyItem, InvokeMethod("get_price")) - n3 = MergeNode() + n3 = CollectNode(assign_result_to="item_prices", read_results_from="item_price") + n1 = OpNode( + DummyItem, + InvokeMethod("get_price"), + assign_result_to="item_price", + collect_target=CollectTarget(n3, 2, 0) + ) + n2 = OpNode( + DummyItem, + InvokeMethod("get_price"), + assign_result_to="item_price", + collect_target=CollectTarget(n3, 2, 1) + ) n4 = OpNode(DummyUser, InvokeMethod("buy_2_items_1")) df.add_edge(Edge(n0, n1)) df.add_edge(Edge(n0, n2)) diff --git a/test_programs/expected/checkout_two_items.py b/test_programs/expected/checkout_two_items.py index c3784bd..cceb488 100644 --- a/test_programs/expected/checkout_two_items.py +++ b/test_programs/expected/checkout_two_items.py @@ -1,17 +1,19 @@ from typing import Any -# from ..target.checkout_item import User, Item -# from cascade.dataflow.dataflow import DataFlow, OpNode, InvokeMethod, Edge + +from cascade.dataflow.operator import StatefulOperator +from ..target.checkout_two_items import User, Item +from cascade.dataflow.dataflow import DataFlow, OpNode, InvokeMethod, Edge, CollectNode, CollectTarget def buy_two_items_0_compiled(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: - key_stack.append(variable_map['item_1_key']) - key_stack.append(variable_map['item_2_key']) + key_stack.append( + [variable_map["item1_key"], variable_map["item2_key"]] + ) return None - def buy_two_items_1_compiled(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: key_stack.pop() - item_price_1_0 = variable_map['item_price_1_0'] - item_price_2_0 = variable_map['item_price_2_0'] + item_price_1_0 = variable_map['item_prices'][0] + item_price_2_0 = variable_map['item_prices'][1] total_price_0 = item_price_1_0 + item_price_2_0 state.balance -= total_price_0 return state.balance >= 0 @@ -22,13 +24,44 @@ def get_price_0_compiled(variable_map: dict[str, Any], state: Item, key_stack: l return state.price -def user_buy_item_df(): - df = DataFlow("user.buy_item") - n0 = OpNode(User, InvokeMethod("buy_item_0")) - n1 = OpNode(Item, InvokeMethod("get_price"), assign_result_to="item_price") - n2 = OpNode(User, InvokeMethod("buy_item_1")) +# An operator is defined by the underlying class and the functions that can be called +user_op = StatefulOperator( + User, + { + "buy_two_items_0": buy_two_items_0_compiled, + "buy_two_items_1": buy_two_items_1_compiled + }, + None) + +item_op = StatefulOperator( + Item, {"get_price": get_price_0_compiled}, None +) + +def user_buy_two_items_df(): + df = DataFlow("user.buy_2_items") + n0 = OpNode(user_op, InvokeMethod("buy_2_items_0")) + n3 = CollectNode(assign_result_to="item_prices", read_results_from="item_price") + n1 = OpNode( + item_op, + InvokeMethod("get_price"), + assign_result_to="item_price", + collect_target=CollectTarget(n3, 2, 0) + ) + n2 = OpNode( + item_op, + InvokeMethod("get_price"), + assign_result_to="item_price", + collect_target=CollectTarget(n3, 2, 1) + ) + n4 = OpNode(user_op, InvokeMethod("buy_2_items_1")) df.add_edge(Edge(n0, n1)) - df.add_edge(Edge(n1, n2)) + df.add_edge(Edge(n0, n2)) + df.add_edge(Edge(n1, n3)) + df.add_edge(Edge(n2, n3)) + df.add_edge(Edge(n3, n4)) df.entry = n0 return df +user_op.dataflows = { + "buy_two_items": user_buy_two_items_df(), +} \ No newline at end of file From b022709e947a2e69ffb24c454bfac8b7165f7de3 Mon Sep 17 00:00:00 2001 From: lucasvanmol Date: Fri, 17 Jan 2025 15:25:31 +0100 Subject: [PATCH 2/2] Add non-parallel checkout example --- test_programs/expected/checkout_two_items.py | 27 ++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/test_programs/expected/checkout_two_items.py b/test_programs/expected/checkout_two_items.py index cceb488..2081cd3 100644 --- a/test_programs/expected/checkout_two_items.py +++ b/test_programs/expected/checkout_two_items.py @@ -12,8 +12,8 @@ def buy_two_items_0_compiled(variable_map: dict[str, Any], state: User, key_stac def buy_two_items_1_compiled(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any: key_stack.pop() - item_price_1_0 = variable_map['item_prices'][0] - item_price_2_0 = variable_map['item_prices'][1] + item_price_1_0 = variable_map['item_price_1'] + item_price_2_0 = variable_map['item_price_2'] total_price_0 = item_price_1_0 + item_price_2_0 state.balance -= total_price_0 return state.balance >= 0 @@ -38,6 +38,29 @@ def get_price_0_compiled(variable_map: dict[str, Any], state: Item, key_stack: l ) def user_buy_two_items_df(): + df = DataFlow("user.buy_2_items") + n0 = OpNode(user_op, InvokeMethod("buy_2_items_0")) + n1 = OpNode( + item_op, + InvokeMethod("get_price"), + assign_result_to="item_price_1", + ) + n2 = OpNode( + item_op, + InvokeMethod("get_price"), + assign_result_to="item_price_2", + ) + n3 = OpNode(user_op, InvokeMethod("buy_2_items_1")) + df.add_edge(Edge(n0, n1)) + df.add_edge(Edge(n0, n2)) + df.add_edge(Edge(n1, n2)) + df.add_edge(Edge(n2, n3)) + df.entry = n0 + return df + + +# For future optimizations (not used) +def user_buy_two_items_df_parallelized(): df = DataFlow("user.buy_2_items") n0 = OpNode(user_op, InvokeMethod("buy_2_items_0")) n3 = CollectNode(assign_result_to="item_prices", read_results_from="item_price")