diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py index 784f1247..8061718b 100644 --- a/azure/durable_functions/models/DurableOrchestrationContext.py +++ b/azure/durable_functions/models/DurableOrchestrationContext.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema import json import datetime import inspect @@ -30,7 +31,7 @@ class DurableOrchestrationContext: # parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions def __init__(self, history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool, - parentInstanceId: str, input: Any = None, **kwargs): + parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs): self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history] self._instance_id: str = instanceId self._is_replaying: bool = isReplaying @@ -45,8 +46,11 @@ def __init__(self, self._current_utc_datetime: datetime.datetime = \ self.decision_started_event.timestamp self._new_uuid_counter = 0 - self.actions: List[List[Action]] = [] self._function_context: FunctionContext = FunctionContext(**kwargs) + self._replay_schema = ReplaySchema(upperSchemaVersion) + self.actions: List[List[Action]] = [] + if self._replay_schema == ReplaySchema.V2: + self.actions.append([]) # make _input always a string # (consistent with Python Functions generic trigger/input bindings) @@ -240,7 +244,7 @@ def task_all(self, activities: List[Task]) -> TaskSet: TaskSet The results of all activities. """ - return task_all(tasks=activities) + return task_all(tasks=activities, replay_schema=self._replay_schema) def task_any(self, activities: List[Task]) -> TaskSet: """Schedule the execution of all activities. @@ -260,7 +264,7 @@ def task_any(self, activities: List[Task]) -> TaskSet: TaskSet The first [[Task]] instance to complete. """ - return task_any(tasks=activities) + return task_any(tasks=activities, replay_schema=self._replay_schema) def set_custom_status(self, status: Any): """Set the customized orchestration status for your orchestrator function. diff --git a/azure/durable_functions/models/OrchestratorState.py b/azure/durable_functions/models/OrchestratorState.py index 2deec595..d4d6fffe 100644 --- a/azure/durable_functions/models/OrchestratorState.py +++ b/azure/durable_functions/models/OrchestratorState.py @@ -1,6 +1,8 @@ import json from typing import List, Any, Dict, Optional, Union +from azure.durable_functions.models.ReplaySchema import ReplaySchema + from .utils.json_utils import add_attrib from azure.durable_functions.models.actions.Action import Action @@ -16,6 +18,7 @@ def __init__(self, is_done: bool, actions: List[List[Action]], output: Any, + replay_schema: ReplaySchema, error: str = None, custom_status: Any = None): self._is_done: bool = is_done @@ -23,6 +26,7 @@ def __init__(self, self._output: Any = output self._error: Optional[str] = error self._custom_status: Any = custom_status + self._replay_schema: ReplaySchema = replay_schema @property def actions(self) -> List[List[Action]]: @@ -66,6 +70,11 @@ def custom_status(self): """Get the JSON-serializable value used by DurableOrchestrationContext.SetCustomStatus.""" return self._custom_status + @property + def schema_version(self): + """Get the Replay Schema represented in this OrchestratorState payload.""" + return self._replay_schema.value + def to_json(self) -> Dict[str, Union[str, int]]: """Convert object into a json dictionary. @@ -76,6 +85,8 @@ def to_json(self) -> Dict[str, Union[str, int]]: """ json_dict: Dict[str, Union[str, int]] = {} add_attrib(json_dict, self, '_is_done', 'isDone') + if self._replay_schema != ReplaySchema.V1: + add_attrib(json_dict, self, 'schema_version', 'schemaVersion') self._add_actions(json_dict) if not (self._output is None): json_dict['output'] = self._output diff --git a/azure/durable_functions/models/ReplaySchema.py b/azure/durable_functions/models/ReplaySchema.py new file mode 100644 index 00000000..1fb79b95 --- /dev/null +++ b/azure/durable_functions/models/ReplaySchema.py @@ -0,0 +1,8 @@ +from enum import Enum + + +class ReplaySchema(Enum): + """Enum representing the ReplaySchemas supported by this SDK version.""" + + V1 = 0 + V2 = 1 diff --git a/azure/durable_functions/models/actions/ActionType.py b/azure/durable_functions/models/actions/ActionType.py index 406c6f86..a7bea219 100644 --- a/azure/durable_functions/models/actions/ActionType.py +++ b/azure/durable_functions/models/actions/ActionType.py @@ -14,3 +14,5 @@ class ActionType(IntEnum): CALL_ENTITY = 7 CALL_HTTP: int = 8 SIGNAL_ENTITY: int = 9 + WHEN_ANY = 11 + WHEN_ALL = 12 diff --git a/azure/durable_functions/models/actions/CompoundAction.py b/azure/durable_functions/models/actions/CompoundAction.py new file mode 100644 index 00000000..419017eb --- /dev/null +++ b/azure/durable_functions/models/actions/CompoundAction.py @@ -0,0 +1,35 @@ +from typing import Dict, Union + +from .Action import Action +from ..utils.json_utils import add_attrib +from typing import List +from abc import abstractmethod + + +class CompoundAction(Action): + """Defines the structure of the WhenAll Action object. + + Provides the information needed by the durable extension to be able to invoke WhenAll tasks. + """ + + def __init__(self, compoundTasks: List[Action]): + self.compound_actions = list(map(lambda x: x.to_json(), compoundTasks)) + + @property + @abstractmethod + def action_type(self) -> int: + """Get this object's action type as an integer.""" + ... + + def to_json(self) -> Dict[str, Union[str, int]]: + """Convert object into a json dictionary. + + Returns + ------- + Dict[str, Union[str, int]] + The instance of the class converted into a json dictionary + """ + json_dict: Dict[str, Union[str, int]] = {} + add_attrib(json_dict, self, 'action_type', 'actionType') + add_attrib(json_dict, self, 'compound_actions', 'compoundActions') + return json_dict diff --git a/azure/durable_functions/models/actions/WhenAllAction.py b/azure/durable_functions/models/actions/WhenAllAction.py new file mode 100644 index 00000000..b66f6237 --- /dev/null +++ b/azure/durable_functions/models/actions/WhenAllAction.py @@ -0,0 +1,14 @@ +from .ActionType import ActionType +from azure.durable_functions.models.actions.CompoundAction import CompoundAction + + +class WhenAllAction(CompoundAction): + """Defines the structure of the WhenAll Action object. + + Provides the information needed by the durable extension to be able to invoke WhenAll tasks. + """ + + @property + def action_type(self) -> int: + """Get the type of action this class represents.""" + return ActionType.WHEN_ALL diff --git a/azure/durable_functions/models/actions/WhenAnyAction.py b/azure/durable_functions/models/actions/WhenAnyAction.py new file mode 100644 index 00000000..3a2bc860 --- /dev/null +++ b/azure/durable_functions/models/actions/WhenAnyAction.py @@ -0,0 +1,14 @@ +from azure.durable_functions.models.actions.CompoundAction import CompoundAction +from .ActionType import ActionType + + +class WhenAnyAction(CompoundAction): + """Defines the structure of the WhenAll Action object. + + Provides the information needed by the durable extension to be able to invoke WhenAll tasks. + """ + + @property + def action_type(self) -> int: + """Get the type of action this class represents.""" + return ActionType.WHEN_ANY diff --git a/azure/durable_functions/orchestrator.py b/azure/durable_functions/orchestrator.py index 801b9701..0f42f1a8 100644 --- a/azure/durable_functions/orchestrator.py +++ b/azure/durable_functions/orchestrator.py @@ -5,6 +5,8 @@ """ from typing import Callable, Iterator, Any, Generator +from azure.durable_functions.models.ReplaySchema import ReplaySchema + from .models import ( DurableOrchestrationContext, Task, @@ -55,6 +57,7 @@ def handle(self, context: DurableOrchestrationContext): # `fn_output` is the return value instead of a generator if not isinstance(fn_output, Iterator): orchestration_state = OrchestratorState( + replay_schema=self.durable_context._replay_schema, is_done=True, output=fn_output, actions=self.durable_context.actions, @@ -75,6 +78,7 @@ def handle(self, context: DurableOrchestrationContext): # `will_continue_as_new` essentially "tracks" # whether or not the orchestration is done. orchestration_state = OrchestratorState( + replay_schema=self.durable_context._replay_schema, is_done=self.durable_context.will_continue_as_new, output=None, actions=self.durable_context.actions, @@ -95,6 +99,7 @@ def handle(self, context: DurableOrchestrationContext): except StopIteration as sie: orchestration_state = OrchestratorState( + replay_schema=self.durable_context._replay_schema, is_done=True, output=sie.value, actions=self.durable_context.actions, @@ -102,6 +107,7 @@ def handle(self, context: DurableOrchestrationContext): except Exception as e: exception_str = str(e) orchestration_state = OrchestratorState( + replay_schema=self.durable_context._replay_schema, is_done=False, output=None, # Should have no output, after generation range actions=self.durable_context.actions, @@ -135,12 +141,17 @@ def _add_to_actions(self, generation_state): if self.durable_context.will_continue_as_new: return if not generation_state._is_yielded: - if (isinstance(generation_state, Task) - and hasattr(generation_state, "action")): - self.durable_context.actions.append([generation_state.action]) - elif (isinstance(generation_state, TaskSet) - and hasattr(generation_state, "actions")): - self.durable_context.actions.append(generation_state.actions) + if isinstance(generation_state, Task): + if self.durable_context._replay_schema == ReplaySchema.V1: + self.durable_context.actions.append([generation_state.action]) + else: + self.durable_context.actions[0].append(generation_state.action) + + elif isinstance(generation_state, TaskSet): + if self.durable_context._replay_schema == ReplaySchema.V1: + self.durable_context.actions.append(generation_state.actions) + else: + self.durable_context.actions[0].append(generation_state.actions) generation_state._is_yielded = True def _update_timestamp(self): diff --git a/azure/durable_functions/tasks/task_all.py b/azure/durable_functions/tasks/task_all.py index 9d19a917..13721a10 100644 --- a/azure/durable_functions/tasks/task_all.py +++ b/azure/durable_functions/tasks/task_all.py @@ -1,3 +1,5 @@ +from azure.durable_functions.models.actions.WhenAllAction import WhenAllAction +from azure.durable_functions.models.ReplaySchema import ReplaySchema from datetime import datetime from typing import List, Optional, Any @@ -6,7 +8,7 @@ from ..models.actions import Action -def task_all(tasks: List[Task]): +def task_all(tasks: List[Task], replay_schema: ReplaySchema): """Determine the state of scheduling the activities for execution with retry options. Parameters @@ -33,7 +35,10 @@ def task_all(tasks: List[Task]): for task in tasks: # Add actions and results if isinstance(task, TaskSet): - actions.extend(task.actions) + if replay_schema == ReplaySchema.V1: + actions.extend(task.actions) + else: + actions.append(task.actions) else: # We know it's an atomic Task actions.append(task.action) @@ -62,6 +67,9 @@ def task_all(tasks: List[Task]): results = [] end_time = None + if replay_schema == ReplaySchema.V2: + actions = WhenAllAction(actions) + # Construct TaskSet taskset = TaskSet( is_completed=is_completed, diff --git a/azure/durable_functions/tasks/task_any.py b/azure/durable_functions/tasks/task_any.py index 8b8ccb81..f76e5572 100644 --- a/azure/durable_functions/tasks/task_any.py +++ b/azure/durable_functions/tasks/task_any.py @@ -1,7 +1,9 @@ +from azure.durable_functions.models.actions.WhenAnyAction import WhenAnyAction +from azure.durable_functions.models.ReplaySchema import ReplaySchema from ..models.TaskSet import TaskSet -def task_any(tasks): +def task_any(tasks, replay_schema: ReplaySchema): """Determine whether any of the given tasks is completed. Parameters @@ -22,8 +24,10 @@ def task_any(tasks): error_message = [] for task in tasks: if isinstance(task, TaskSet): - for action in task.actions: - all_actions.append(action) + if replay_schema == ReplaySchema.V1: + all_actions.extend(task.actions) + else: + all_actions.append(task.actions) else: all_actions.append(task.action) @@ -35,6 +39,9 @@ def task_any(tasks): completed_tasks.sort(key=lambda t: t.timestamp) + if replay_schema == ReplaySchema.V2: + all_actions = WhenAnyAction(all_actions) + if len(faulted_tasks) == len(tasks): return TaskSet(True, all_actions, None, is_faulted=True, exception=Exception( f"All tasks have failed, errors messages in all tasks:{error_message}")) diff --git a/tests/models/test_DurableOrchestrationContext.py b/tests/models/test_DurableOrchestrationContext.py index c86dc1e8..e2dc9f59 100644 --- a/tests/models/test_DurableOrchestrationContext.py +++ b/tests/models/test_DurableOrchestrationContext.py @@ -1,6 +1,7 @@ import pytest import json from dateutil.parser import parse as dt_parse +from azure.durable_functions.models.ReplaySchema import ReplaySchema from azure.durable_functions.models.DurableOrchestrationContext \ import DurableOrchestrationContext @@ -23,10 +24,32 @@ def starting_context(): '"isReplaying":false,"parentInstanceId":null} ') return context +@pytest.fixture +def starting_context_v2(): + context = DurableOrchestrationContext.from_json( + '{"history":[{"EventType":12,"EventId":-1,"IsPlayed":false,' + '"Timestamp":"2019-12-08T23:18:41.3240927Z"}, ' + '{"OrchestrationInstance":{' + '"InstanceId":"48d0f95957504c2fa579e810a390b938", ' + '"ExecutionId":"fd183ee02e4b4fd18c95b773cfb5452b"},"EventType":0,' + '"ParentInstance":null, ' + '"Name":"DurableOrchestratorTrigger","Version":"","Input":"null",' + '"Tags":null,"EventId":-1,"IsPlayed":false, ' + '"Timestamp":"2019-12-08T23:18:39.756132Z"}],"input":null,' + '"instanceId":"48d0f95957504c2fa579e810a390b938", ' + '"upperSchemaVersion": 1, ' + '"isReplaying":false,"parentInstanceId":null} ') + return context + def test_extracts_is_replaying(starting_context): assert not starting_context.is_replaying +def test_assumes_v1_replayschema(starting_context): + assert starting_context._replay_schema is ReplaySchema.V1 + +def test_assumes_v2_replayschema(starting_context_v2): + assert starting_context_v2._replay_schema is ReplaySchema.V2 def test_extracts_instance_id(starting_context): assert "48d0f95957504c2fa579e810a390b938" == starting_context.instance_id diff --git a/tests/models/test_OrchestrationState.py b/tests/models/test_OrchestrationState.py index 602a1251..3289152d 100644 --- a/tests/models/test_OrchestrationState.py +++ b/tests/models/test_OrchestrationState.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from typing import List from azure.durable_functions.models.actions.Action import Action @@ -8,7 +9,7 @@ def test_empty_state_to_json_string(): actions: List[List[Action]] = [] - state = OrchestratorState(is_done=False, actions=actions, output=None) + state = OrchestratorState(is_done=False, actions=actions, output=None, replay_schema=ReplaySchema.V1.value) result = state.to_json_string() expected_result = '{"isDone": false, "actions": []}' assert expected_result == result @@ -19,7 +20,7 @@ def test_single_action_state_to_json_string(): action: Action = CallActivityAction( function_name="MyFunction", input_="AwesomeInput") actions.append([action]) - state = OrchestratorState(is_done=False, actions=actions, output=None) + state = OrchestratorState(is_done=False, actions=actions, output=None, replay_schema=ReplaySchema.V1.value) result = state.to_json_string() expected_result = ('{"isDone": false, "actions": [[{"actionType": 0, ' '"functionName": "MyFunction", "input": ' diff --git a/tests/orchestrator/orchestrator_test_utils.py b/tests/orchestrator/orchestrator_test_utils.py index cef69472..f7fcd07c 100644 --- a/tests/orchestrator/orchestrator_test_utils.py +++ b/tests/orchestrator/orchestrator_test_utils.py @@ -12,6 +12,7 @@ def assert_orchestration_state_equals(expected, result): """Ensure that the observable OrchestratorState matches the expected result. """ assert_attribute_equal(expected, result, "isDone") + assert_attribute_equal(expected, result, "schemaVersion") assert_actions_are_equal(expected, result) assert_attribute_equal(expected, result, "output") assert_attribute_equal(expected, result, "error") diff --git a/tests/orchestrator/test_call_http.py b/tests/orchestrator/test_call_http.py index 2eea39d6..be46d870 100644 --- a/tests/orchestrator/test_call_http.py +++ b/tests/orchestrator/test_call_http.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema import json from typing import Dict @@ -30,8 +31,8 @@ def complete_generator_function(context): headers=HEADERS, token_source=TOKEN_SOURCE) -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_http_action(state: OrchestratorState, request): diff --git a/tests/orchestrator/test_continue_as_new.py b/tests/orchestrator/test_continue_as_new.py index ec7956d9..f145b2db 100644 --- a/tests/orchestrator/test_continue_as_new.py +++ b/tests/orchestrator/test_continue_as_new.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from .orchestrator_test_utils \ import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema from tests.test_utils.ContextBuilder import ContextBuilder @@ -13,8 +14,8 @@ def generator_function(context): context.continue_as_new("Cause I can") -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_hello_action(state: OrchestratorState, input_: str): diff --git a/tests/orchestrator/test_create_timer.py b/tests/orchestrator/test_create_timer.py index b4e083e1..bd6eb174 100644 --- a/tests/orchestrator/test_create_timer.py +++ b/tests/orchestrator/test_create_timer.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from tests.test_utils.ContextBuilder import ContextBuilder from .orchestrator_test_utils \ import get_orchestration_state_result, assert_orchestration_state_equals, assert_valid_schema @@ -7,8 +8,8 @@ from datetime import datetime, timedelta, timezone -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.V1.value) def add_timer_fired_events(context_builder: ContextBuilder, id_: int, timestamp: str): fire_at: str = context_builder.add_timer_created_event(id_, timestamp) diff --git a/tests/orchestrator/test_entity.py b/tests/orchestrator/test_entity.py index eaf4bbc9..af0bccf4 100644 --- a/tests/orchestrator/test_entity.py +++ b/tests/orchestrator/test_entity.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from .orchestrator_test_utils \ import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema, \ get_entity_state_result, assert_entity_state_equals @@ -148,8 +149,8 @@ def add_call_entity_action_for_entity(state: OrchestratorState, id_: df.EntityId state.actions.append([action]) -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.V1.value) def add_call_entity_action(state: OrchestratorState, id_: df.EntityId, op: str, input_: Any): action = CallEntityAction(entity_id=id_, operation=op, input_=input_) diff --git a/tests/orchestrator/test_fan_out_fan_in.py b/tests/orchestrator/test_fan_out_fan_in.py index 5d0c33c5..e0283be6 100644 --- a/tests/orchestrator/test_fan_out_fan_in.py +++ b/tests/orchestrator/test_fan_out_fan_in.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema import json from azure.durable_functions.models import OrchestratorState @@ -18,8 +19,8 @@ def generator_function(context): return results -def base_expected_state(output=None, error=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output, error=error) +def base_expected_state(output=None, error=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, error=error, replay_schema=replay_schema.value) def add_completed_event( diff --git a/tests/orchestrator/test_is_replaying_flag.py b/tests/orchestrator/test_is_replaying_flag.py index 98f47b4b..e638c6df 100644 --- a/tests/orchestrator/test_is_replaying_flag.py +++ b/tests/orchestrator/test_is_replaying_flag.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from tests.test_utils.ContextBuilder import ContextBuilder from .orchestrator_test_utils \ import get_orchestration_property, assert_orchestration_state_equals, assert_valid_schema @@ -17,8 +18,8 @@ def generator_function(context): deadline = deadline + timedelta(seconds=30) yield context.create_timer(deadline) -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_timer_fired_events(context_builder: ContextBuilder, id_: int, timestamp: str, is_played: bool = True): diff --git a/tests/orchestrator/test_sequential_orchestrator.py b/tests/orchestrator/test_sequential_orchestrator.py index c56b9d1a..ac63259b 100644 --- a/tests/orchestrator/test_sequential_orchestrator.py +++ b/tests/orchestrator/test_sequential_orchestrator.py @@ -1,3 +1,6 @@ +from azure.durable_functions.models.actions.WhenAnyAction import WhenAnyAction +from azure.durable_functions.models.actions.WhenAllAction import WhenAllAction +from azure.durable_functions.models.ReplaySchema import ReplaySchema from datetime import datetime, timedelta from .orchestrator_test_utils \ import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema @@ -28,6 +31,19 @@ def generator_function_duplicate_yield(context): return "" +def generator_function_compound_tasks(context): + yield context.call_activity("Hello", "Tokyo") + + task1 = context.call_activity("Hello", "Tokyo") + task2 = context.call_activity("Hello", "Tokyo") + task3 = context.call_activity("Hello", "Tokyo") + task4 = context.task_any([task3]) + task5 = context.task_all([task1, task2, task4]) + task6 = context.task_any([task5]) + yield task6 + + return "" + def generator_function_time_is_not_none(context): outputs = [] @@ -112,15 +128,14 @@ def generator_function_new_guid(context): return outputs -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema) def add_hello_action(state: OrchestratorState, input_: str): action = CallActivityAction(function_name='Hello', input_=input_) state.actions.append([action]) - def add_hello_completed_events( context_builder: ContextBuilder, id_: int, result: str): context_builder.add_task_scheduled_event(name='Hello', id_=id_) @@ -408,3 +423,26 @@ def test_duplicate_yields_do_not_add_duplicate_actions(): assert_valid_schema(result) assert_orchestration_state_equals(expected, result) +def test_compound_tasks_return_single_action_in_V2(): + """Tests that compound tasks, in the v2 replay schema, are represented as a single "deep" action""" + context_builder = ContextBuilder('test_v2_replay_schema', replay_schema=ReplaySchema.V2) + add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"") + + result = get_orchestration_state_result( + context_builder, generator_function_compound_tasks) + + expected_state = base_expected_state(replay_schema=ReplaySchema.V2) + expected_state._actions = [ + [CallActivityAction("Hello", "Tokyo"), WhenAnyAction( + [WhenAllAction( + [CallActivityAction("Hello", "Tokyo"), CallActivityAction("Hello", "Tokyo"), WhenAnyAction( + [CallActivityAction("Hello", "Tokyo")]) + ]) + ]) + ] + ] + expected_state._is_done = False + expected = expected_state.to_json() + + #assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) \ No newline at end of file diff --git a/tests/orchestrator/test_sequential_orchestrator_custom_status.py b/tests/orchestrator/test_sequential_orchestrator_custom_status.py index 4b3da730..fd56eebc 100644 --- a/tests/orchestrator/test_sequential_orchestrator_custom_status.py +++ b/tests/orchestrator/test_sequential_orchestrator_custom_status.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from .orchestrator_test_utils \ import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema from tests.test_utils.ContextBuilder import ContextBuilder @@ -22,8 +23,8 @@ def generator_function_with_object_status(context): obj_status["tokyo"] = "completed" context.set_custom_status(obj_status) -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_custom_status(state:OrchestratorState, status:Any): state._custom_status = status diff --git a/tests/orchestrator/test_sequential_orchestrator_with_retry.py b/tests/orchestrator/test_sequential_orchestrator_with_retry.py index 0356b43b..fa923463 100644 --- a/tests/orchestrator/test_sequential_orchestrator_with_retry.py +++ b/tests/orchestrator/test_sequential_orchestrator_with_retry.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from .orchestrator_test_utils \ import get_orchestration_state_result, assert_orchestration_state_equals, assert_valid_schema from tests.test_utils.ContextBuilder import ContextBuilder @@ -28,8 +29,8 @@ def generator_function(context): return outputs -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_hello_action(state: OrchestratorState, input_: str): diff --git a/tests/orchestrator/test_serialization.py b/tests/orchestrator/test_serialization.py index affd18ff..d92966c1 100644 --- a/tests/orchestrator/test_serialization.py +++ b/tests/orchestrator/test_serialization.py @@ -1,10 +1,11 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from tests.test_utils.ContextBuilder import ContextBuilder from .orchestrator_test_utils \ import get_orchestration_state_result, assert_orchestration_state_equals, assert_valid_schema from azure.durable_functions.models.OrchestratorState import OrchestratorState -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def generator_function(context): return False diff --git a/tests/orchestrator/test_sub_orchestrator.py b/tests/orchestrator/test_sub_orchestrator.py index e1c14d87..d462e087 100644 --- a/tests/orchestrator/test_sub_orchestrator.py +++ b/tests/orchestrator/test_sub_orchestrator.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from .orchestrator_test_utils \ import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema from tests.test_utils.ContextBuilder import ContextBuilder @@ -18,8 +19,8 @@ def generator_function(context): return outputs -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_hello_suborch_action(state: OrchestratorState, input_: str): diff --git a/tests/orchestrator/test_sub_orchestrator_with_retry.py b/tests/orchestrator/test_sub_orchestrator_with_retry.py index 95b79811..1247a342 100644 --- a/tests/orchestrator/test_sub_orchestrator_with_retry.py +++ b/tests/orchestrator/test_sub_orchestrator_with_retry.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from .orchestrator_test_utils \ import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema from tests.test_utils.ContextBuilder import ContextBuilder @@ -22,8 +23,8 @@ def generator_function(context): return outputs -def base_expected_state(output=None) -> OrchestratorState: - return OrchestratorState(is_done=False, actions=[], output=output) +def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: + return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) def add_hello_suborch_action(state: OrchestratorState, input_: str): diff --git a/tests/tasks/test_task_any.py b/tests/tasks/test_task_any.py index d80515e7..b55abd7c 100644 --- a/tests/tasks/test_task_any.py +++ b/tests/tasks/test_task_any.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema from datetime import datetime, date import json from azure.durable_functions.models import Task, TaskSet @@ -23,7 +24,7 @@ def test_has_completed_task(): task3 = Task(is_completed=True, is_faulted=False, action=all_actions[2],timestamp=date(2000,1,1)) tasks = [task1, task2, task3] - returned_taskset = task_any(tasks) + returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) expected_taskset = TaskSet(is_completed=True, actions=all_actions, result=task3, timestamp=date(2000,1,1)) assert_taskset_equal(expected_taskset, returned_taskset) @@ -35,7 +36,7 @@ def test_has_no_completed_task(): task3 = Task(is_completed=False, is_faulted=False, action=all_actions[2],timestamp=date(2000,1,1)) tasks = [task1, task2, task3] - returned_taskset = task_any(tasks) + returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) expected_taskset = TaskSet(is_completed=False, actions=all_actions, result=None) assert_taskset_equal(expected_taskset, returned_taskset) @@ -47,7 +48,7 @@ def test_all_faulted_task_should_fail(): task3 = Task(is_completed=False, is_faulted=True, action=all_actions[2], timestamp=date(2000,1,1), exc=Exception("test failure")) tasks = [task1, task2, task3] - returned_taskset = task_any(tasks) + returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) error_messages = [Exception("test failure") for _ in range(3)] expected_exception = Exception(f"All tasks have failed, errors messages in all tasks:{error_messages}") expected_taskset = TaskSet(is_completed=True, actions=all_actions, result=None, is_faulted=True, exception=expected_exception) @@ -60,7 +61,7 @@ def test_one_faulted_task_should_still_proceed(): task3 = Task(is_completed=False, is_faulted=False, action=all_actions[2],timestamp=date(2000,1,1)) tasks = [task1, task2, task3] - returned_taskset = task_any(tasks) + returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) expected_taskset = TaskSet(is_completed=False, actions=all_actions, result=None) assert_taskset_equal(expected_taskset, returned_taskset) @@ -72,7 +73,7 @@ def test_taskset_and_tasks_as_args(): result=[None, None], timestamp=date(2000,1,1)) tasks = [task1, task2] - returned_taskset = task_any(tasks) + returned_taskset = task_any(tasks, replay_schema=ReplaySchema.V1) expected_taskset = TaskSet(is_completed=True, actions=all_actions, result=task2, timestamp=date(2000,1,1)) assert_taskset_equal(expected_taskset, returned_taskset) diff --git a/tests/test_utils/ContextBuilder.py b/tests/test_utils/ContextBuilder.py index 9bd04f5b..1ec218fe 100644 --- a/tests/test_utils/ContextBuilder.py +++ b/tests/test_utils/ContextBuilder.py @@ -1,3 +1,4 @@ +from azure.durable_functions.models.ReplaySchema import ReplaySchema import uuid import json from datetime import datetime, timedelta @@ -13,7 +14,7 @@ class ContextBuilder: - def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None): + def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None, replay_schema: ReplaySchema = ReplaySchema.V1): self.increase_time = increase_time self.instance_id = uuid.uuid4() self.is_replaying: bool = False @@ -24,6 +25,7 @@ def __init__(self, name: str="", increase_time: bool = True, starting_time: Opti if starting_time is None: starting_time = datetime.now() self.current_datetime: datetime = starting_time + self.upperSchemaVersion = replay_schema.value self.add_orchestrator_started_event() self.add_execution_started_event(name) @@ -139,6 +141,7 @@ def to_json(self, **kwargs) -> Dict[str, Any]: add_attrib(json_dict, self, 'parent_instance_id', 'parentInstanceId') add_attrib(json_dict, self, 'is_replaying', 'isReplaying') add_attrib(json_dict, self, 'input_', "input") + add_attrib(json_dict, self, 'upperSchemaVersion', "upperSchemaVersion") history_list_as_dict = self.get_history_list_as_dict() json_dict['history'] = history_list_as_dict