Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 32 additions & 7 deletions azure/durable_functions/models/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ def __init__(self, tasks: List[TaskBase], compound_action_constructor=None):
self.completed_tasks: List[TaskBase] = []
self.children = tasks

if len(self.children) == 0:
self.state = TaskState.SUCCEEDED

def handle_completion(self, child: TaskBase):
"""Manage sub-task completion events.

Expand Down Expand Up @@ -238,7 +241,7 @@ def try_set_value(self, child: TaskBase):
# A WhenAll Task only completes when it has no pending tasks
# i.e _when all_ of its children have completed
if len(self.pending_tasks) == 0:
results = list(map(lambda x: x.result, self.completed_tasks))
results = list(map(lambda x: x.result, self.children))
self.set_value(is_error=False, value=results)
else: # child.state is TaskState.FAILED:
# a single error is sufficient to fail this task
Expand Down Expand Up @@ -287,14 +290,28 @@ class RetryAbleTask(WhenAllTask):
"""

def __init__(self, child: TaskBase, retry_options: RetryOptions, context):
self.id_ = str(child.id) + "_retryable_proxy"
tasks = [child]
super().__init__(tasks, context._replay_schema)

self.retry_options = retry_options
self.num_attempts = 1
self.context = context
self.actions = child.action_repr
self.is_waiting_on_timer = False

@property
def id_(self):
"""Obtain the task's ID.

Since this is an internal-only abstraction, the task ID is represented
by the ID of its inner/wrapped task _plus_ a suffix: "_retryable_proxy"

Returns
-------
[type]
[description]
"""
return str(list(map(lambda x: x.id, self.children))) + "_retryable_proxy"

def try_set_value(self, child: TaskBase):
"""Transition a Retryable Task to a terminal state and set its value.
Expand All @@ -304,6 +321,14 @@ def try_set_value(self, child: TaskBase):
child : TaskBase
A sub-task that just completed
"""
if self.is_waiting_on_timer:
# timer fired, re-scheduling original task
self.is_waiting_on_timer = False
rescheduled_task = self.context._generate_task(
action=NoOpAction("rescheduled task"), parent=self)
self.pending_tasks.add(rescheduled_task)
self.context._add_to_open_tasks(rescheduled_task)
return
if child.state is TaskState.SUCCEEDED:
if len(self.pending_tasks) == 0:
# if all pending tasks have completed,
Expand All @@ -318,11 +343,11 @@ def try_set_value(self, child: TaskBase):
else:
# still have some retries left.
# increase size of pending tasks by adding a timer task
# and then re-scheduling the current task after that
timer_task = self.context._generate_task(action=NoOpAction(), parent=self)
# when it completes, we'll retry the original task
timer_task = self.context._generate_task(
action=NoOpAction("-WithRetry timer"), parent=self)
self.pending_tasks.add(timer_task)
self.context._add_to_open_tasks(timer_task)
rescheduled_task = self.context._generate_task(action=NoOpAction(), parent=self)
self.pending_tasks.add(rescheduled_task)
self.context._add_to_open_tasks(rescheduled_task)
self.is_waiting_on_timer = True

self.num_attempts += 1
17 changes: 16 additions & 1 deletion azure/durable_functions/models/actions/NoOpAction.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
from azure.durable_functions.models.actions.Action import Action
from typing import Any, Dict
from typing import Any, Dict, Optional


class NoOpAction(Action):
"""A no-op action, for anonymous tasks only."""

def __init__(self, metadata: Optional[str] = None):
"""Create a NoOpAction object.
This is an internal-only action class used to represent cases when intermediate
tasks are used to implement some API. For example, in -WithRetry APIs, intermediate
timers are created. We create this NoOp action to track those the backing actions
of those tasks, which is necessary because we mimic the DF-internal replay algorithm.
Parameters
----------
metadata : Optional[str]
Used for internal debugging: metadata about the action being represented.
"""
self.metadata = metadata

def action_type(self) -> int:
"""Get the type of action this class represents."""
raise Exception("Attempted to get action type of an anonymous Action")
Expand Down
211 changes: 207 additions & 4 deletions tests/orchestrator/test_sequential_orchestrator_with_retry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import List, Union
from azure.durable_functions.models.ReplaySchema import ReplaySchema
from .orchestrator_test_utils \
import get_orchestration_state_result, assert_orchestration_state_equals, assert_valid_schema
Expand Down Expand Up @@ -28,16 +29,63 @@ def generator_function(context):

return outputs

def generator_function_concurrent_retries(context):
outputs = []

retry_options = RETRY_OPTIONS
task1 = context.call_activity_with_retry(
"Hello", retry_options, "Tokyo")
task2 = context.call_activity_with_retry(
"Hello", retry_options, "Seattle")
task3 = context.call_activity_with_retry(
"Hello", retry_options, "London")

outputs = yield context.task_all([task1, task2, task3])

return outputs

def generator_function_two_concurrent_retries_when_all(context):
outputs = []

retry_options = RETRY_OPTIONS
task1 = context.call_activity_with_retry(
"Hello", retry_options, "Tokyo")
task2 = context.call_activity_with_retry(
"Hello", retry_options, "Seattle")

outputs = yield context.task_all([task1, task2])

return outputs

def generator_function_two_concurrent_retries_when_any(context):
outputs = []

retry_options = RETRY_OPTIONS
task1 = context.call_activity_with_retry(
"Hello", retry_options, "Tokyo")
task2 = context.call_activity_with_retry(
"Hello", retry_options, "Seattle")

outputs = yield context.task_any([task1, task2])

return outputs.result


def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState:
return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value)


def add_hello_action(state: OrchestratorState, input_: str):
def add_hello_action(state: OrchestratorState, input_: Union[List[str], str]):
retry_options = RETRY_OPTIONS
action = CallActivityWithRetryAction(
function_name='Hello', retry_options=retry_options, input_=input_)
state._actions.append([action])
actions = []
inputs = input_
if not isinstance(input_, list):
inputs = [input_]
for input_ in inputs:
action = CallActivityWithRetryAction(
function_name='Hello', retry_options=retry_options, input_=input_)
actions.append(action)
state._actions.append(actions)


def add_hello_failed_events(
Expand All @@ -63,6 +111,45 @@ def add_retry_timer_events(context_builder: ContextBuilder, id_: int):
context_builder.add_orchestrator_started_event()
context_builder.add_timer_fired_event(id_=id_, fire_at=fire_at)

def add_two_retriable_events_completing_out_of_order(context_builder: ContextBuilder,
failed_reason, failed_details):
## Schedule tasks
context_builder.add_task_scheduled_event(name='Hello', id_=0) # Tokyo task
context_builder.add_task_scheduled_event(name='Hello', id_=1) # Seattle task

context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()

## Task failures and timer-scheduling

# tasks fail "out of order"
context_builder.add_task_failed_event(
id_=1, reason=failed_reason, details=failed_details) # Seattle task
fire_at_1 = context_builder.add_timer_created_event(2) # Seattle timer

context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()

context_builder.add_task_failed_event(
id_=0, reason=failed_reason, details=failed_details) # Tokyo task
fire_at_2 = context_builder.add_timer_created_event(3) # Tokyo timer

context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()

## fire timers
context_builder.add_timer_fired_event(id_=2, fire_at=fire_at_1) # Seattle timer
context_builder.add_timer_fired_event(id_=3, fire_at=fire_at_2) # Tokyo timer

## Complete events
context_builder.add_task_scheduled_event(name='Hello', id_=4) # Seattle task
context_builder.add_task_scheduled_event(name='Hello', id_=5) # Tokyo task

context_builder.add_orchestrator_completed_event()
context_builder.add_orchestrator_started_event()
context_builder.add_task_completed_event(id_=4, result="\"Hello Seattle!\"")
context_builder.add_task_completed_event(id_=5, result="\"Hello Tokyo!\"")


def test_initial_orchestration_state():
context_builder = ContextBuilder('test_simple_function')
Expand Down Expand Up @@ -217,3 +304,119 @@ def test_failed_tokyo_hit_max_attempts():

expected_error_str = f"{error_msg}{error_label}{state_str}"
assert expected_error_str == error_str

def test_concurrent_retriable_results():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable')
add_hello_failed_events(context_builder, 0, failed_reason, failed_details)
add_hello_failed_events(context_builder, 1, failed_reason, failed_details)
add_hello_failed_events(context_builder, 2, failed_reason, failed_details)
add_retry_timer_events(context_builder, 3)
add_retry_timer_events(context_builder, 4)
add_retry_timer_events(context_builder, 5)
add_hello_completed_events(context_builder, 6, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 7, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 8, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_concurrent_retries)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London'])
expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_concurrent_retriable_results_unordered_arrival():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable_unordered_results')
add_hello_failed_events(context_builder, 0, failed_reason, failed_details)
add_hello_failed_events(context_builder, 1, failed_reason, failed_details)
add_hello_failed_events(context_builder, 2, failed_reason, failed_details)
add_retry_timer_events(context_builder, 3)
add_retry_timer_events(context_builder, 4)
add_retry_timer_events(context_builder, 5)
# events arrive in non-sequential different order
add_hello_completed_events(context_builder, 8, "\"Hello London!\"")
add_hello_completed_events(context_builder, 6, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 7, "\"Hello Seattle!\"")

result = get_orchestration_state_result(
context_builder, generator_function_concurrent_retries)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London'])
expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_concurrent_retriable_results_mixed_arrival():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable_unordered_results')
# one task succeeds, the other two fail at first, and succeed on retry
add_hello_failed_events(context_builder, 1, failed_reason, failed_details)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_failed_events(context_builder, 2, failed_reason, failed_details)
add_retry_timer_events(context_builder, 3)
add_retry_timer_events(context_builder, 4)
add_hello_completed_events(context_builder, 6, "\"Hello London!\"")
add_hello_completed_events(context_builder, 5, "\"Hello Seattle!\"")

result = get_orchestration_state_result(
context_builder, generator_function_concurrent_retries)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle', 'London'])
expected_state._output = ["Hello Tokyo!", "Hello Seattle!", "Hello London!"]
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_concurrent_retriable_results_alternating_taskIDs_when_all():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable_unordered_results')

add_two_retriable_events_completing_out_of_order(context_builder, failed_reason, failed_details)

result = get_orchestration_state_result(
context_builder, generator_function_two_concurrent_retries_when_all)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle'])
expected_state._output = ["Hello Tokyo!", "Hello Seattle!"]
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_concurrent_retriable_results_alternating_taskIDs_when_any():
failed_reason = 'Reasons'
failed_details = 'Stuff and Things'
context_builder = ContextBuilder('test_concurrent_retriable_unordered_results')

add_two_retriable_events_completing_out_of_order(context_builder, failed_reason, failed_details)

result = get_orchestration_state_result(
context_builder, generator_function_two_concurrent_retries_when_any)

expected_state = base_expected_state()
add_hello_action(expected_state, ['Tokyo', 'Seattle'])
expected_state._output = "Hello Seattle!"
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)
13 changes: 13 additions & 0 deletions tests/orchestrator/test_task_any.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ def generator_function(context):
except:
return "exception"

def generator_function_no_activity(context):
yield context.task_any([])
return "Done!"

def test_continues_on_zero_inner_tasks():
context_builder = ContextBuilder()
result = get_orchestration_state_result(
context_builder, generator_function_no_activity)
expected_state = base_expected_state("Done!")
expected_state._is_done = True
expected = expected_state.to_json()
assert_orchestration_state_equals(expected, result)

def test_continues_on_zero_results():
context_builder = ContextBuilder()
result = get_orchestration_state_result(
Expand Down