diff --git a/azure/durable_functions/models/Task.py b/azure/durable_functions/models/Task.py index 5749d798..08f2974b 100644 --- a/azure/durable_functions/models/Task.py +++ b/azure/durable_functions/models/Task.py @@ -159,6 +159,9 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None): self.completed_tasks: List[TaskBase] = [] self.children = tasks + if len(self.children) == 0: + self.state = TaskState.SUCCEEDED + def handle_completion(self, child: TaskBase): """Manage sub-task completion events. @@ -238,7 +241,7 @@ def try_set_value(self, child: TaskBase): # A WhenAll Task only completes when it has no pending tasks # i.e _when all_ of its children have completed if len(self.pending_tasks) == 0: - results = list(map(lambda x: x.result, self.completed_tasks)) + results = list(map(lambda x: x.result, self.children)) self.set_value(is_error=False, value=results) else: # child.state is TaskState.FAILED: # a single error is sufficient to fail this task @@ -287,7 +290,6 @@ class RetryAbleTask(WhenAllTask): """ def __init__(self, child: TaskBase, retry_options: RetryOptions, context): - self.id_ = str(child.id) + "_retryable_proxy" tasks = [child] super().__init__(tasks, context._replay_schema) @@ -295,6 +297,21 @@ def __init__(self, child: TaskBase, retry_options: RetryOptions, context): self.num_attempts = 1 self.context = context self.actions = child.action_repr + self.is_waiting_on_timer = False + + @property + def id_(self): + """Obtain the task's ID. + + Since this is an internal-only abstraction, the task ID is represented + by the ID of its inner/wrapped task _plus_ a suffix: "_retryable_proxy" + + Returns + ------- + [type] + [description] + """ + return str(list(map(lambda x: x.id, self.children))) + "_retryable_proxy" def try_set_value(self, child: TaskBase): """Transition a Retryable Task to a terminal state and set its value. @@ -304,6 +321,14 @@ def try_set_value(self, child: TaskBase): child : TaskBase A sub-task that just completed """ + if self.is_waiting_on_timer: + # timer fired, re-scheduling original task + self.is_waiting_on_timer = False + rescheduled_task = self.context._generate_task( + action=NoOpAction("rescheduled task"), parent=self) + self.pending_tasks.add(rescheduled_task) + self.context._add_to_open_tasks(rescheduled_task) + return if child.state is TaskState.SUCCEEDED: if len(self.pending_tasks) == 0: # if all pending tasks have completed, @@ -318,11 +343,11 @@ def try_set_value(self, child: TaskBase): else: # still have some retries left. # increase size of pending tasks by adding a timer task - # and then re-scheduling the current task after that - timer_task = self.context._generate_task(action=NoOpAction(), parent=self) + # when it completes, we'll retry the original task + timer_task = self.context._generate_task( + action=NoOpAction("-WithRetry timer"), parent=self) self.pending_tasks.add(timer_task) self.context._add_to_open_tasks(timer_task) - rescheduled_task = self.context._generate_task(action=NoOpAction(), parent=self) - self.pending_tasks.add(rescheduled_task) - self.context._add_to_open_tasks(rescheduled_task) + self.is_waiting_on_timer = True + self.num_attempts += 1 diff --git a/azure/durable_functions/models/actions/NoOpAction.py b/azure/durable_functions/models/actions/NoOpAction.py index b59475e2..dec5f71d 100644 --- a/azure/durable_functions/models/actions/NoOpAction.py +++ b/azure/durable_functions/models/actions/NoOpAction.py @@ -1,10 +1,25 @@ from azure.durable_functions.models.actions.Action import Action -from typing import Any, Dict +from typing import Any, Dict, Optional class NoOpAction(Action): """A no-op action, for anonymous tasks only.""" + def __init__(self, metadata: Optional[str] = None): + """Create a NoOpAction object. + + This is an internal-only action class used to represent cases when intermediate + tasks are used to implement some API. For example, in -WithRetry APIs, intermediate + timers are created. We create this NoOp action to track those the backing actions + of those tasks, which is necessary because we mimic the DF-internal replay algorithm. + + Parameters + ---------- + metadata : Optional[str] + Used for internal debugging: metadata about the action being represented. + """ + self.metadata = metadata + def action_type(self) -> int: """Get the type of action this class represents.""" raise Exception("Attempted to get action type of an anonymous Action") diff --git a/tests/orchestrator/test_sequential_orchestrator_with_retry.py b/tests/orchestrator/test_sequential_orchestrator_with_retry.py index fa923463..edcb999c 100644 --- a/tests/orchestrator/test_sequential_orchestrator_with_retry.py +++ b/tests/orchestrator/test_sequential_orchestrator_with_retry.py @@ -1,3 +1,4 @@ +from typing import List, Union from azure.durable_functions.models.ReplaySchema import ReplaySchema from .orchestrator_test_utils \ import get_orchestration_state_result, assert_orchestration_state_equals, assert_valid_schema @@ -28,16 +29,63 @@ def generator_function(context): return outputs +def generator_function_concurrent_retries(context): + outputs = [] + + retry_options = RETRY_OPTIONS + task1 = context.call_activity_with_retry( + "Hello", retry_options, "Tokyo") + task2 = context.call_activity_with_retry( + "Hello", retry_options, "Seattle") + task3 = context.call_activity_with_retry( + "Hello", retry_options, "London") + + outputs = yield context.task_all([task1, task2, task3]) + + return outputs + +def generator_function_two_concurrent_retries_when_all(context): + outputs = [] + + retry_options = RETRY_OPTIONS + task1 = context.call_activity_with_retry( + "Hello", retry_options, "Tokyo") + task2 = context.call_activity_with_retry( + "Hello", retry_options, "Seattle") + + outputs = yield context.task_all([task1, task2]) + + return outputs + +def generator_function_two_concurrent_retries_when_any(context): + outputs = [] + + retry_options = RETRY_OPTIONS + task1 = context.call_activity_with_retry( + "Hello", retry_options, "Tokyo") + task2 = context.call_activity_with_retry( + "Hello", retry_options, "Seattle") + + outputs = yield context.task_any([task1, task2]) + + return outputs.result + def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState: return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value) -def add_hello_action(state: OrchestratorState, input_: str): +def add_hello_action(state: OrchestratorState, input_: Union[List[str], str]): retry_options = RETRY_OPTIONS - action = CallActivityWithRetryAction( - function_name='Hello', retry_options=retry_options, input_=input_) - state._actions.append([action]) + actions = [] + inputs = input_ + if not isinstance(input_, list): + inputs = [input_] + for input_ in inputs: + action = CallActivityWithRetryAction( + function_name='Hello', retry_options=retry_options, input_=input_) + actions.append(action) + state._actions.append(actions) def add_hello_failed_events( @@ -63,6 +111,45 @@ def add_retry_timer_events(context_builder: ContextBuilder, id_: int): context_builder.add_orchestrator_started_event() context_builder.add_timer_fired_event(id_=id_, fire_at=fire_at) +def add_two_retriable_events_completing_out_of_order(context_builder: ContextBuilder, + failed_reason, failed_details): + ## Schedule tasks + context_builder.add_task_scheduled_event(name='Hello', id_=0) # Tokyo task + context_builder.add_task_scheduled_event(name='Hello', id_=1) # Seattle task + + context_builder.add_orchestrator_completed_event() + context_builder.add_orchestrator_started_event() + + ## Task failures and timer-scheduling + + # tasks fail "out of order" + context_builder.add_task_failed_event( + id_=1, reason=failed_reason, details=failed_details) # Seattle task + fire_at_1 = context_builder.add_timer_created_event(2) # Seattle timer + + context_builder.add_orchestrator_completed_event() + context_builder.add_orchestrator_started_event() + + context_builder.add_task_failed_event( + id_=0, reason=failed_reason, details=failed_details) # Tokyo task + fire_at_2 = context_builder.add_timer_created_event(3) # Tokyo timer + + context_builder.add_orchestrator_completed_event() + context_builder.add_orchestrator_started_event() + + ## fire timers + context_builder.add_timer_fired_event(id_=2, fire_at=fire_at_1) # Seattle timer + context_builder.add_timer_fired_event(id_=3, fire_at=fire_at_2) # Tokyo timer + + ## Complete events + context_builder.add_task_scheduled_event(name='Hello', id_=4) # Seattle task + context_builder.add_task_scheduled_event(name='Hello', id_=5) # Tokyo task + + context_builder.add_orchestrator_completed_event() + context_builder.add_orchestrator_started_event() + context_builder.add_task_completed_event(id_=4, result="\"Hello Seattle!\"") + context_builder.add_task_completed_event(id_=5, result="\"Hello Tokyo!\"") + def test_initial_orchestration_state(): context_builder = ContextBuilder('test_simple_function') @@ -217,3 +304,119 @@ def test_failed_tokyo_hit_max_attempts(): expected_error_str = f"{error_msg}{error_label}{state_str}" assert expected_error_str == error_str + +def test_concurrent_retriable_results(): + failed_reason = 'Reasons' + failed_details = 'Stuff and Things' + context_builder = ContextBuilder('test_concurrent_retriable') + add_hello_failed_events(context_builder, 0, failed_reason, failed_details) + add_hello_failed_events(context_builder, 1, failed_reason, failed_details) + add_hello_failed_events(context_builder, 2, failed_reason, failed_details) + add_retry_timer_events(context_builder, 3) + add_retry_timer_events(context_builder, 4) + add_retry_timer_events(context_builder, 5) + add_hello_completed_events(context_builder, 6, "\"Hello Tokyo!\"") + add_hello_completed_events(context_builder, 7, "\"Hello Seattle!\"") + add_hello_completed_events(context_builder, 8, "\"Hello London!\"") + + result = get_orchestration_state_result( + context_builder, generator_function_concurrent_retries) + + expected_state = base_expected_state() + add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London']) + expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"] + expected_state._is_done = True + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_concurrent_retriable_results_unordered_arrival(): + failed_reason = 'Reasons' + failed_details = 'Stuff and Things' + context_builder = ContextBuilder('test_concurrent_retriable_unordered_results') + add_hello_failed_events(context_builder, 0, failed_reason, failed_details) + add_hello_failed_events(context_builder, 1, failed_reason, failed_details) + add_hello_failed_events(context_builder, 2, failed_reason, failed_details) + add_retry_timer_events(context_builder, 3) + add_retry_timer_events(context_builder, 4) + add_retry_timer_events(context_builder, 5) + # events arrive in non-sequential different order + add_hello_completed_events(context_builder, 8, "\"Hello London!\"") + add_hello_completed_events(context_builder, 6, "\"Hello Tokyo!\"") + add_hello_completed_events(context_builder, 7, "\"Hello Seattle!\"") + + result = get_orchestration_state_result( + context_builder, generator_function_concurrent_retries) + + expected_state = base_expected_state() + add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London']) + expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"] + expected_state._is_done = True + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_concurrent_retriable_results_mixed_arrival(): + failed_reason = 'Reasons' + failed_details = 'Stuff and Things' + context_builder = ContextBuilder('test_concurrent_retriable_unordered_results') + # one task succeeds, the other two fail at first, and succeed on retry + add_hello_failed_events(context_builder, 1, failed_reason, failed_details) + add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"") + add_hello_failed_events(context_builder, 2, failed_reason, failed_details) + add_retry_timer_events(context_builder, 3) + add_retry_timer_events(context_builder, 4) + add_hello_completed_events(context_builder, 6, "\"Hello London!\"") + add_hello_completed_events(context_builder, 5, "\"Hello Seattle!\"") + + result = get_orchestration_state_result( + context_builder, generator_function_concurrent_retries) + + expected_state = base_expected_state() + add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London']) + expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"] + expected_state._is_done = True + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_concurrent_retriable_results_alternating_taskIDs_when_all(): + failed_reason = 'Reasons' + failed_details = 'Stuff and Things' + context_builder = ContextBuilder('test_concurrent_retriable_unordered_results') + + add_two_retriable_events_completing_out_of_order(context_builder, failed_reason, failed_details) + + result = get_orchestration_state_result( + context_builder, generator_function_two_concurrent_retries_when_all) + + expected_state = base_expected_state() + add_hello_action(expected_state, ['Tokyo', 'Seattle']) + expected_state._output = ["Hello Tokyo!", "Hello Seattle!"] + expected_state._is_done = True + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) + +def test_concurrent_retriable_results_alternating_taskIDs_when_any(): + failed_reason = 'Reasons' + failed_details = 'Stuff and Things' + context_builder = ContextBuilder('test_concurrent_retriable_unordered_results') + + add_two_retriable_events_completing_out_of_order(context_builder, failed_reason, failed_details) + + result = get_orchestration_state_result( + context_builder, generator_function_two_concurrent_retries_when_any) + + expected_state = base_expected_state() + add_hello_action(expected_state, ['Tokyo', 'Seattle']) + expected_state._output = "Hello Seattle!" + expected_state._is_done = True + expected = expected_state.to_json() + + assert_valid_schema(result) + assert_orchestration_state_equals(expected, result) \ No newline at end of file diff --git a/tests/orchestrator/test_task_any.py b/tests/orchestrator/test_task_any.py index 57f49a28..f91fdbfc 100644 --- a/tests/orchestrator/test_task_any.py +++ b/tests/orchestrator/test_task_any.py @@ -14,6 +14,19 @@ def generator_function(context): except: return "exception" +def generator_function_no_activity(context): + yield context.task_any([]) + return "Done!" + +def test_continues_on_zero_inner_tasks(): + context_builder = ContextBuilder() + result = get_orchestration_state_result( + context_builder, generator_function_no_activity) + expected_state = base_expected_state("Done!") + expected_state._is_done = True + expected = expected_state.to_json() + assert_orchestration_state_equals(expected, result) + def test_continues_on_zero_results(): context_builder = ContextBuilder() result = get_orchestration_state_result(