Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from azure.durable_functions.models.ReplaySchema import ReplaySchema
import json
import datetime
import inspect
Expand Down Expand Up @@ -30,7 +31,7 @@ class DurableOrchestrationContext:
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
def __init__(self,
history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
parentInstanceId: str, input: Any = None, **kwargs):
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs):
self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
self._instance_id: str = instanceId
self._is_replaying: bool = isReplaying
Expand All @@ -45,8 +46,11 @@ def __init__(self,
self._current_utc_datetime: datetime.datetime = \
self.decision_started_event.timestamp
self._new_uuid_counter = 0
self.actions: List[List[Action]] = []
self._function_context: FunctionContext = FunctionContext(**kwargs)
self._replay_schema = ReplaySchema(upperSchemaVersion)
self.actions: List[List[Action]] = []
if self._replay_schema == ReplaySchema.V2:
self.actions.append([])

# make _input always a string
# (consistent with Python Functions generic trigger/input bindings)
Expand Down Expand Up @@ -240,7 +244,7 @@ def task_all(self, activities: List[Task]) -> TaskSet:
TaskSet
The results of all activities.
"""
return task_all(tasks=activities)
return task_all(tasks=activities, replay_schema=self._replay_schema)

def task_any(self, activities: List[Task]) -> TaskSet:
"""Schedule the execution of all activities.
Expand All @@ -260,7 +264,7 @@ def task_any(self, activities: List[Task]) -> TaskSet:
TaskSet
The first [[Task]] instance to complete.
"""
return task_any(tasks=activities)
return task_any(tasks=activities, replay_schema=self._replay_schema)

def set_custom_status(self, status: Any):
"""Set the customized orchestration status for your orchestrator function.
Expand Down
11 changes: 11 additions & 0 deletions azure/durable_functions/models/OrchestratorState.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
from typing import List, Any, Dict, Optional, Union

from azure.durable_functions.models.ReplaySchema import ReplaySchema

from .utils.json_utils import add_attrib
from azure.durable_functions.models.actions.Action import Action

Expand All @@ -16,13 +18,15 @@ def __init__(self,
is_done: bool,
actions: List[List[Action]],
output: Any,
replay_schema: ReplaySchema,
error: str = None,
custom_status: Any = None):
self._is_done: bool = is_done
self._actions: List[List[Action]] = actions
self._output: Any = output
self._error: Optional[str] = error
self._custom_status: Any = custom_status
self._replay_schema: ReplaySchema = replay_schema

@property
def actions(self) -> List[List[Action]]:
Expand Down Expand Up @@ -66,6 +70,11 @@ def custom_status(self):
"""Get the JSON-serializable value used by DurableOrchestrationContext.SetCustomStatus."""
return self._custom_status

@property
def schema_version(self):
"""Get the Replay Schema represented in this OrchestratorState payload."""
return self._replay_schema.value

def to_json(self) -> Dict[str, Union[str, int]]:
"""Convert object into a json dictionary.

Expand All @@ -76,6 +85,8 @@ def to_json(self) -> Dict[str, Union[str, int]]:
"""
json_dict: Dict[str, Union[str, int]] = {}
add_attrib(json_dict, self, '_is_done', 'isDone')
if self._replay_schema != ReplaySchema.V1:
add_attrib(json_dict, self, 'schema_version', 'schemaVersion')
self._add_actions(json_dict)
if not (self._output is None):
json_dict['output'] = self._output
Expand Down
8 changes: 8 additions & 0 deletions azure/durable_functions/models/ReplaySchema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class ReplaySchema(Enum):
"""Enum representing the ReplaySchemas supported by this SDK version."""

V1 = 0
V2 = 1
2 changes: 2 additions & 0 deletions azure/durable_functions/models/actions/ActionType.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ class ActionType(IntEnum):
CALL_ENTITY = 7
CALL_HTTP: int = 8
SIGNAL_ENTITY: int = 9
WHEN_ANY = 11
WHEN_ALL = 12
35 changes: 35 additions & 0 deletions azure/durable_functions/models/actions/CompoundAction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Dict, Union

from .Action import Action
from ..utils.json_utils import add_attrib
from typing import List
from abc import abstractmethod


class CompoundAction(Action):
"""Defines the structure of the WhenAll Action object.

Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
"""

def __init__(self, compoundTasks: List[Action]):
self.compound_actions = list(map(lambda x: x.to_json(), compoundTasks))

@property
@abstractmethod
def action_type(self) -> int:
"""Get this object's action type as an integer."""
...

def to_json(self) -> Dict[str, Union[str, int]]:
"""Convert object into a json dictionary.

Returns
-------
Dict[str, Union[str, int]]
The instance of the class converted into a json dictionary
"""
json_dict: Dict[str, Union[str, int]] = {}
add_attrib(json_dict, self, 'action_type', 'actionType')
add_attrib(json_dict, self, 'compound_actions', 'compoundActions')
return json_dict
14 changes: 14 additions & 0 deletions azure/durable_functions/models/actions/WhenAllAction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from .ActionType import ActionType
from azure.durable_functions.models.actions.CompoundAction import CompoundAction


class WhenAllAction(CompoundAction):
"""Defines the structure of the WhenAll Action object.

Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
"""

@property
def action_type(self) -> int:
"""Get the type of action this class represents."""
return ActionType.WHEN_ALL
14 changes: 14 additions & 0 deletions azure/durable_functions/models/actions/WhenAnyAction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
from .ActionType import ActionType


class WhenAnyAction(CompoundAction):
"""Defines the structure of the WhenAll Action object.

Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
"""

@property
def action_type(self) -> int:
"""Get the type of action this class represents."""
return ActionType.WHEN_ANY
23 changes: 17 additions & 6 deletions azure/durable_functions/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
"""
from typing import Callable, Iterator, Any, Generator

from azure.durable_functions.models.ReplaySchema import ReplaySchema

from .models import (
DurableOrchestrationContext,
Task,
Expand Down Expand Up @@ -55,6 +57,7 @@ def handle(self, context: DurableOrchestrationContext):
# `fn_output` is the return value instead of a generator
if not isinstance(fn_output, Iterator):
orchestration_state = OrchestratorState(
replay_schema=self.durable_context._replay_schema,
is_done=True,
output=fn_output,
actions=self.durable_context.actions,
Expand All @@ -75,6 +78,7 @@ def handle(self, context: DurableOrchestrationContext):
# `will_continue_as_new` essentially "tracks"
# whether or not the orchestration is done.
orchestration_state = OrchestratorState(
replay_schema=self.durable_context._replay_schema,
is_done=self.durable_context.will_continue_as_new,
output=None,
actions=self.durable_context.actions,
Expand All @@ -95,13 +99,15 @@ def handle(self, context: DurableOrchestrationContext):

except StopIteration as sie:
orchestration_state = OrchestratorState(
replay_schema=self.durable_context._replay_schema,
is_done=True,
output=sie.value,
actions=self.durable_context.actions,
custom_status=self.durable_context.custom_status)
except Exception as e:
exception_str = str(e)
orchestration_state = OrchestratorState(
replay_schema=self.durable_context._replay_schema,
is_done=False,
output=None, # Should have no output, after generation range
actions=self.durable_context.actions,
Expand Down Expand Up @@ -135,12 +141,17 @@ def _add_to_actions(self, generation_state):
if self.durable_context.will_continue_as_new:
return
if not generation_state._is_yielded:
if (isinstance(generation_state, Task)
and hasattr(generation_state, "action")):
self.durable_context.actions.append([generation_state.action])
elif (isinstance(generation_state, TaskSet)
and hasattr(generation_state, "actions")):
self.durable_context.actions.append(generation_state.actions)
if isinstance(generation_state, Task):
if self.durable_context._replay_schema == ReplaySchema.V1:
self.durable_context.actions.append([generation_state.action])
else:
self.durable_context.actions[0].append(generation_state.action)

elif isinstance(generation_state, TaskSet):
if self.durable_context._replay_schema == ReplaySchema.V1:
self.durable_context.actions.append(generation_state.actions)
else:
self.durable_context.actions[0].append(generation_state.actions)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even generate TaskSet even more in V2?

Copy link
Collaborator Author

@davidmrdavid davidmrdavid Jun 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do! It's just its corresponding actions field that is now represented with either a WhenAllAction or WhenAnyAction.

I did originally think of removing the TaskSet class with this PR, but that broke far too many unit tests to be viable at the moment. It is high on my priority list though

generation_state._is_yielded = True

def _update_timestamp(self):
Expand Down
12 changes: 10 additions & 2 deletions azure/durable_functions/tasks/task_all.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from azure.durable_functions.models.actions.WhenAllAction import WhenAllAction
from azure.durable_functions.models.ReplaySchema import ReplaySchema
from datetime import datetime
from typing import List, Optional, Any

Expand All @@ -6,7 +8,7 @@
from ..models.actions import Action


def task_all(tasks: List[Task]):
def task_all(tasks: List[Task], replay_schema: ReplaySchema):
"""Determine the state of scheduling the activities for execution with retry options.

Parameters
Expand All @@ -33,7 +35,10 @@ def task_all(tasks: List[Task]):
for task in tasks:
# Add actions and results
if isinstance(task, TaskSet):
actions.extend(task.actions)
if replay_schema == ReplaySchema.V1:
actions.extend(task.actions)
else:
actions.append(task.actions)
else:
# We know it's an atomic Task
actions.append(task.action)
Expand Down Expand Up @@ -62,6 +67,9 @@ def task_all(tasks: List[Task]):
results = []
end_time = None

if replay_schema == ReplaySchema.V2:
actions = WhenAllAction(actions)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't WhenAllAction a single action, and actions expecting an Action[]?

Copy link
Collaborator Author

@davidmrdavid davidmrdavid Jun 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're asking why the single action WhenAllAction is assigned to a variable named actions, right? Yeah that was a weird choice on my end. Looking back, it's not very readable.

The reason is that, in the case of V1 replays, actions is a list of action objects. Since we already use this variable in constructing the TaskSet below, I figured I would just override its contents for V2 and keep the remaining logic intact.

In general, a lot of this PR is designed to modify as little of the existing logic as possible, as I worried that too big of a refactor would risk introducing bugs. That being said, in this case we're talking about a specific variable name, so I don't mind just replacing the name with something like "action_payload" which works for V1 and V2.


# Construct TaskSet
taskset = TaskSet(
is_completed=is_completed,
Expand Down
13 changes: 10 additions & 3 deletions azure/durable_functions/tasks/task_any.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from azure.durable_functions.models.actions.WhenAnyAction import WhenAnyAction
from azure.durable_functions.models.ReplaySchema import ReplaySchema
from ..models.TaskSet import TaskSet


def task_any(tasks):
def task_any(tasks, replay_schema: ReplaySchema):
"""Determine whether any of the given tasks is completed.

Parameters
Expand All @@ -22,8 +24,10 @@ def task_any(tasks):
error_message = []
for task in tasks:
if isinstance(task, TaskSet):
for action in task.actions:
all_actions.append(action)
if replay_schema == ReplaySchema.V1:
all_actions.extend(task.actions)
else:
all_actions.append(task.actions)
else:
all_actions.append(task.action)

Expand All @@ -35,6 +39,9 @@ def task_any(tasks):

completed_tasks.sort(key=lambda t: t.timestamp)

if replay_schema == ReplaySchema.V2:
all_actions = WhenAnyAction(all_actions)

if len(faulted_tasks) == len(tasks):
return TaskSet(True, all_actions, None, is_faulted=True, exception=Exception(
f"All tasks have failed, errors messages in all tasks:{error_message}"))
Expand Down
23 changes: 23 additions & 0 deletions tests/models/test_DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import json
from dateutil.parser import parse as dt_parse
from azure.durable_functions.models.ReplaySchema import ReplaySchema

from azure.durable_functions.models.DurableOrchestrationContext \
import DurableOrchestrationContext
Expand All @@ -23,10 +24,32 @@ def starting_context():
'"isReplaying":false,"parentInstanceId":null} ')
return context

@pytest.fixture
def starting_context_v2():
context = DurableOrchestrationContext.from_json(
'{"history":[{"EventType":12,"EventId":-1,"IsPlayed":false,'
'"Timestamp":"2019-12-08T23:18:41.3240927Z"}, '
'{"OrchestrationInstance":{'
'"InstanceId":"48d0f95957504c2fa579e810a390b938", '
'"ExecutionId":"fd183ee02e4b4fd18c95b773cfb5452b"},"EventType":0,'
'"ParentInstance":null, '
'"Name":"DurableOrchestratorTrigger","Version":"","Input":"null",'
'"Tags":null,"EventId":-1,"IsPlayed":false, '
'"Timestamp":"2019-12-08T23:18:39.756132Z"}],"input":null,'
'"instanceId":"48d0f95957504c2fa579e810a390b938", '
'"upperSchemaVersion": 1, '
'"isReplaying":false,"parentInstanceId":null} ')
return context


def test_extracts_is_replaying(starting_context):
assert not starting_context.is_replaying

def test_assumes_v1_replayschema(starting_context):
assert starting_context._replay_schema is ReplaySchema.V1

def test_assumes_v2_replayschema(starting_context_v2):
assert starting_context_v2._replay_schema is ReplaySchema.V2

def test_extracts_instance_id(starting_context):
assert "48d0f95957504c2fa579e810a390b938" == starting_context.instance_id
Expand Down
5 changes: 3 additions & 2 deletions tests/models/test_OrchestrationState.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from azure.durable_functions.models.ReplaySchema import ReplaySchema
from typing import List

from azure.durable_functions.models.actions.Action import Action
Expand All @@ -8,7 +9,7 @@

def test_empty_state_to_json_string():
actions: List[List[Action]] = []
state = OrchestratorState(is_done=False, actions=actions, output=None)
state = OrchestratorState(is_done=False, actions=actions, output=None, replay_schema=ReplaySchema.V1.value)
result = state.to_json_string()
expected_result = '{"isDone": false, "actions": []}'
assert expected_result == result
Expand All @@ -19,7 +20,7 @@ def test_single_action_state_to_json_string():
action: Action = CallActivityAction(
function_name="MyFunction", input_="AwesomeInput")
actions.append([action])
state = OrchestratorState(is_done=False, actions=actions, output=None)
state = OrchestratorState(is_done=False, actions=actions, output=None, replay_schema=ReplaySchema.V1.value)
result = state.to_json_string()
expected_result = ('{"isDone": false, "actions": [[{"actionType": 0, '
'"functionName": "MyFunction", "input": '
Expand Down
1 change: 1 addition & 0 deletions tests/orchestrator/orchestrator_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ def assert_orchestration_state_equals(expected, result):
"""Ensure that the observable OrchestratorState matches the expected result.
"""
assert_attribute_equal(expected, result, "isDone")
assert_attribute_equal(expected, result, "schemaVersion")
assert_actions_are_equal(expected, result)
assert_attribute_equal(expected, result, "output")
assert_attribute_equal(expected, result, "error")
Expand Down
5 changes: 3 additions & 2 deletions tests/orchestrator/test_call_http.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from azure.durable_functions.models.ReplaySchema import ReplaySchema
import json
from typing import Dict

Expand Down Expand Up @@ -30,8 +31,8 @@ def complete_generator_function(context):
headers=HEADERS, token_source=TOKEN_SOURCE)


def base_expected_state(output=None) -> OrchestratorState:
return OrchestratorState(is_done=False, actions=[], output=output)
def base_expected_state(output=None, replay_schema: ReplaySchema = ReplaySchema.V1) -> OrchestratorState:
return OrchestratorState(is_done=False, actions=[], output=output, replay_schema=replay_schema.value)


def add_http_action(state: OrchestratorState, request):
Expand Down
Loading