Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Base module for the Python Durable functions."""
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
__path__ = extend_path(__path__, __name__) # type: ignore
60 changes: 59 additions & 1 deletion azure/durable_functions/models/DurableOrchestrationClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .PurgeHistoryResult import PurgeHistoryResult
from .DurableOrchestrationStatus import DurableOrchestrationStatus
from .EntityStateResponse import EntityStateResponse
from .RpcManagementOptions import RpcManagementOptions
from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
from ..models.DurableOrchestrationBindings import DurableOrchestrationBindings
Expand Down Expand Up @@ -132,6 +133,56 @@ def create_http_management_payload(self, instance_id: str) -> Dict[str, str]:
"""
return self.get_client_response_links(None, instance_id)

async def read_entity_state(
self,
entityId: EntityId,
task_hub_name: Optional[str] = None,
connection_name: Optional[str] = None,
) -> EntityStateResponse:
"""Read the state of the entity.

Parameters
----------
entityId : EntityId
The EntityId of the targeted entity.
task_hub_name : Optional[str]
The task hub name of the target entity.
connection_name : Optional[str]
The name of the connection string associated with [task_hub_name].

Raises
------
Exception:
When an unexpected status code is returned

Returns
-------
EntityStateResponse
container object representing the state of the entity
"""
options = RpcManagementOptions(
connection_name=connection_name,
task_hub_name=task_hub_name,
entity_Id=entityId,
)

request_url = options.to_url(self._orchestration_bindings.rpc_base_url)
response = await self._get_async_request(request_url)

switch_statement = {
200: lambda: EntityStateResponse(True, response[1]),
404: lambda: EntityStateResponse(False),
}

result = switch_statement.get(response[0])

if not result:
raise Exception(
f"The operation failed with an unexpected status code {response[0]}"
)

return result()

def get_client_response_links(
self,
request: Optional[func.HttpRequest], instance_id: str) -> Dict[str, str]:
Expand Down Expand Up @@ -440,6 +491,8 @@ async def wait_for_completion_or_create_check_status_response(
lambda: self._create_http_response(200, status.to_json()),
OrchestrationRuntimeStatus.Failed:
lambda: self._create_http_response(500, status.to_json()),
None:
None
}

result = switch_statement.get(status.runtime_status)
Expand All @@ -456,6 +509,7 @@ async def wait_for_completion_or_create_check_status_response(
await sleep(sleep_time)
else:
return self.create_check_status_response(request, instance_id)
return self.create_check_status_response(request, instance_id)

async def signal_entity(self, entityId: EntityId, operation_name: str,
operation_input: Optional[Any] = None,
Expand Down Expand Up @@ -640,6 +694,7 @@ async def rewind(self,

response = await self._post_async_request(request_url, None)
status: int = response[0]
ex_msg: str = ""
if status == 200 or status == 202:
return
elif status == 404:
Expand All @@ -648,6 +703,9 @@ async def rewind(self,
elif status == 410:
ex_msg = "The rewind operation is only supported on failed orchestration instances."
raise Exception(ex_msg)
else:
elif isinstance(response[1], str):
ex_msg = response[1]
raise Exception(ex_msg)
else:
ex_msg = "Received unexpected payload from the durable-extension: " + str(response)
raise Exception(ex_msg)
26 changes: 22 additions & 4 deletions azure/durable_functions/models/DurableOrchestrationContext.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from azure.durable_functions.models.ReplaySchema import ReplaySchema
import json
import datetime
import inspect
from typing import List, Any, Dict, Optional
from uuid import UUID, uuid5, NAMESPACE_URL
from datetime import timezone

from .RetryOptions import RetryOptions
from .TaskSet import TaskSet
Expand All @@ -28,7 +31,7 @@ class DurableOrchestrationContext:
# parameter names are as defined by JSON schema and do not conform to PEP8 naming conventions
def __init__(self,
history: List[Dict[Any, Any]], instanceId: str, isReplaying: bool,
parentInstanceId: str, input: Any = None, **kwargs):
parentInstanceId: str, input: Any = None, upperSchemaVersion: int = 0, **kwargs):
self._histories: List[HistoryEvent] = [HistoryEvent(**he) for he in history]
self._instance_id: str = instanceId
self._is_replaying: bool = isReplaying
Expand All @@ -43,8 +46,11 @@ def __init__(self,
self._current_utc_datetime: datetime.datetime = \
self.decision_started_event.timestamp
self._new_uuid_counter = 0
self.actions: List[List[Action]] = []
self._function_context: FunctionContext = FunctionContext(**kwargs)
self._replay_schema = ReplaySchema(upperSchemaVersion)
self.actions: List[List[Action]] = []
if self._replay_schema == ReplaySchema.V2:
self.actions.append([])

# make _input always a string
# (consistent with Python Functions generic trigger/input bindings)
Expand Down Expand Up @@ -238,7 +244,7 @@ def task_all(self, activities: List[Task]) -> TaskSet:
TaskSet
The results of all activities.
"""
return task_all(tasks=activities)
return task_all(tasks=activities, replay_schema=self._replay_schema)

def task_any(self, activities: List[Task]) -> TaskSet:
"""Schedule the execution of all activities.
Expand All @@ -258,7 +264,7 @@ def task_any(self, activities: List[Task]) -> TaskSet:
TaskSet
The first [[Task]] instance to complete.
"""
return task_any(tasks=activities)
return task_any(tasks=activities, replay_schema=self._replay_schema)

def set_custom_status(self, status: Any):
"""Set the customized orchestration status for your orchestrator function.
Expand Down Expand Up @@ -459,3 +465,15 @@ def new_guid(self) -> UUID:
self._new_uuid_counter += 1
guid = uuid5(NAMESPACE_URL, guid_name)
return guid

def _pretty_print_history(self) -> str:
"""Get a pretty-printed version of the orchestration's internal history."""
def history_to_string(event):
json_dict = {}
for key, val in inspect.getmembers(event):
if not key.startswith('_') and not inspect.ismethod(val):
if isinstance(val, datetime.date):
val = val.replace(tzinfo=timezone.utc).timetuple()
json_dict[key] = val
return json.dumps(json_dict)
return str(list(map(history_to_string, self._histories)))
23 changes: 23 additions & 0 deletions azure/durable_functions/models/EntityStateResponse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Any


class EntityStateResponse:
"""Entity state response object for [read_entity_state]."""

def __init__(self, entity_exists: bool, entity_state: Any = None) -> None:
self._entity_exists = entity_exists
self._entity_state = entity_state

@property
def entity_exists(self) -> bool:
"""Get the bool representing whether entity exists."""
return self._entity_exists

@property
def entity_state(self) -> Any:
"""Get the state of the entity.
When [entity_exists] is False, this value will be None.
Optional.
"""
return self._entity_state
11 changes: 11 additions & 0 deletions azure/durable_functions/models/OrchestratorState.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
from typing import List, Any, Dict, Optional, Union

from azure.durable_functions.models.ReplaySchema import ReplaySchema

from .utils.json_utils import add_attrib
from azure.durable_functions.models.actions.Action import Action

Expand All @@ -16,13 +18,15 @@ def __init__(self,
is_done: bool,
actions: List[List[Action]],
output: Any,
replay_schema: ReplaySchema,
error: str = None,
custom_status: Any = None):
self._is_done: bool = is_done
self._actions: List[List[Action]] = actions
self._output: Any = output
self._error: Optional[str] = error
self._custom_status: Any = custom_status
self._replay_schema: ReplaySchema = replay_schema

@property
def actions(self) -> List[List[Action]]:
Expand Down Expand Up @@ -66,6 +70,11 @@ def custom_status(self):
"""Get the JSON-serializable value used by DurableOrchestrationContext.SetCustomStatus."""
return self._custom_status

@property
def schema_version(self):
"""Get the Replay Schema represented in this OrchestratorState payload."""
return self._replay_schema.value

def to_json(self) -> Dict[str, Union[str, int]]:
"""Convert object into a json dictionary.

Expand All @@ -76,6 +85,8 @@ def to_json(self) -> Dict[str, Union[str, int]]:
"""
json_dict: Dict[str, Union[str, int]] = {}
add_attrib(json_dict, self, '_is_done', 'isDone')
if self._replay_schema != ReplaySchema.V1:
add_attrib(json_dict, self, 'schema_version', 'schemaVersion')
self._add_actions(json_dict)
if not (self._output is None):
json_dict['output'] = self._output
Expand Down
8 changes: 8 additions & 0 deletions azure/durable_functions/models/ReplaySchema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class ReplaySchema(Enum):
"""Enum representing the ReplaySchemas supported by this SDK version."""

V1 = 0
V2 = 1
1 change: 1 addition & 0 deletions azure/durable_functions/models/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self, is_completed, is_faulted, action,
self._id = id_
self._exception = exc
self._is_played = is_played
self._is_yielded: bool = False

@property
def is_completed(self) -> bool:
Expand Down
1 change: 1 addition & 0 deletions azure/durable_functions/models/TaskSet.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(self, is_completed, actions, result, is_faulted=False,
self._timestamp: datetime = timestamp
self._exception = exception
self._is_played = is_played
self._is_yielded: bool = False

@property
def is_completed(self) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions azure/durable_functions/models/actions/ActionType.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ class ActionType(IntEnum):
CALL_ENTITY = 7
CALL_HTTP: int = 8
SIGNAL_ENTITY: int = 9
WHEN_ANY = 11
WHEN_ALL = 12
35 changes: 35 additions & 0 deletions azure/durable_functions/models/actions/CompoundAction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Dict, Union

from .Action import Action
from ..utils.json_utils import add_attrib
from typing import List
from abc import abstractmethod


class CompoundAction(Action):
"""Defines the structure of the WhenAll Action object.

Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
"""

def __init__(self, compoundTasks: List[Action]):
self.compound_actions = list(map(lambda x: x.to_json(), compoundTasks))

@property
@abstractmethod
def action_type(self) -> int:
"""Get this object's action type as an integer."""
...

def to_json(self) -> Dict[str, Union[str, int]]:
"""Convert object into a json dictionary.

Returns
-------
Dict[str, Union[str, int]]
The instance of the class converted into a json dictionary
"""
json_dict: Dict[str, Union[str, int]] = {}
add_attrib(json_dict, self, 'action_type', 'actionType')
add_attrib(json_dict, self, 'compound_actions', 'compoundActions')
return json_dict
14 changes: 14 additions & 0 deletions azure/durable_functions/models/actions/WhenAllAction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from .ActionType import ActionType
from azure.durable_functions.models.actions.CompoundAction import CompoundAction


class WhenAllAction(CompoundAction):
"""Defines the structure of the WhenAll Action object.

Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
"""

@property
def action_type(self) -> int:
"""Get the type of action this class represents."""
return ActionType.WHEN_ALL
14 changes: 14 additions & 0 deletions azure/durable_functions/models/actions/WhenAnyAction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from azure.durable_functions.models.actions.CompoundAction import CompoundAction
from .ActionType import ActionType


class WhenAnyAction(CompoundAction):
"""Defines the structure of the WhenAll Action object.

Provides the information needed by the durable extension to be able to invoke WhenAll tasks.
"""

@property
def action_type(self) -> int:
"""Get the type of action this class represents."""
return ActionType.WHEN_ANY
7 changes: 7 additions & 0 deletions azure/durable_functions/models/history/HistoryEvent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ def __init__(self, EventType: HistoryEventType, EventId: int, IsPlayed: bool, Ti
self._is_played: bool = IsPlayed
self._timestamp: datetime.datetime = dt_parse(Timestamp)
self._is_processed: bool = False

self.Name = None
self.InstanceId = None
self.TaskScheduledId = None
self.Reason = None
self.Details = None
self.Input = None
if kwargs is not None:
for key, value in kwargs.items():
self.__setattr__(key, value)
Expand Down
Loading