Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions azure/durable_functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,50 @@
from .models.DurableEntityContext import DurableEntityContext
from .models.RetryOptions import RetryOptions
from .models.TokenSource import ManagedIdentityTokenSource
import json
from pathlib import Path
import sys


def validate_extension_bundles():
"""Throw an exception if host.json contains bundle-range V1.

Raises
------
Exception: Exception prompting the user to update to bundles V2
"""
# No need to validate if we're running tests
if "pytest" in sys.modules:
return

host_path = "host.json"
bundles_key = "extensionBundle"
version_key = "version"
host_file = Path(host_path)

if not host_file.exists():
# If it doesn't exist, we ignore it
return

with open(host_path) as f:
host_settings = json.loads(f.read())
try:
version_range = host_settings[bundles_key][version_key]
except Exception:
# If bundle info is not available, we ignore it.
# For example: it's possible the user is using a manual extension install
return
# We do a best-effort attempt to detect bundles V1
# This is the string hard-coded into the bundles V1 template in VSCode
if version_range == "[1.*, 2.0.0)":
message = "Durable Functions for Python does not support Bundles V1."\
" Please update to Bundles V2 in your `host.json`."\
" You can set extensionBundles version to be: [2.*, 3.0.0)"
raise Exception(message)


# Validate that users are not in extension bundles V1
validate_extension_bundles()

__all__ = [
'Orchestrator',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self,
self.decision_started_event: HistoryEvent = \
[e_ for e_ in self.histories
if e_.event_type == HistoryEventType.ORCHESTRATOR_STARTED][0]
self._current_utc_datetime = \
self._current_utc_datetime: datetime.datetime = \
self.decision_started_event.timestamp
self._new_uuid_counter = 0
self.actions: List[List[Action]] = []
Expand Down
11 changes: 4 additions & 7 deletions azure/durable_functions/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def handle(self, context: DurableOrchestrationContext):
generation_state.exception)
continue

self._reset_timestamp()
self._update_timestamp()
self.durable_context._is_replaying = generation_state._is_played
generation_state = self._generate_next(generation_state)

Expand Down Expand Up @@ -141,16 +141,13 @@ def _add_to_actions(self, generation_state):
and hasattr(generation_state, "actions")):
self.durable_context.actions.append(generation_state.actions)

def _reset_timestamp(self):
def _update_timestamp(self):
last_timestamp = self.durable_context.decision_started_event.timestamp
decision_started_events = [e_ for e_ in self.durable_context.histories
if e_.event_type == HistoryEventType.ORCHESTRATOR_STARTED
and e_.timestamp > last_timestamp]
if len(decision_started_events) == 0:
self.durable_context.current_utc_datetime = None
else:
self.durable_context.decision_started_event = \
decision_started_events[0]
if len(decision_started_events) != 0:
self.durable_context.decision_started_event = decision_started_events[0]
self.durable_context.current_utc_datetime = \
self.durable_context.decision_started_event.timestamp

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from setuptools import setup, find_packages
from distutils.command import build

with open("README.md", "r") as fh:
with open("README.md", "r", encoding="utf8") as fh:
long_description = fh.read()

class BuildModule(build.build):
Expand Down
133 changes: 133 additions & 0 deletions tests/orchestrator/test_sequential_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta
from .orchestrator_test_utils \
import assert_orchestration_state_equals, get_orchestration_state_result, assert_valid_schema
from tests.test_utils.ContextBuilder import ContextBuilder
Expand All @@ -20,6 +21,49 @@ def generator_function(context):

return outputs

def generator_function_time_is_not_none(context):
outputs = []

now = context.current_utc_datetime
if not now:
raise Exception("No time! 1st attempt")
task1 = yield context.call_activity("Hello", "Tokyo")

now = context.current_utc_datetime
if not now:
raise Exception("No time! 2nd attempt")
task2 = yield context.call_activity("Hello", "Seattle")

now = context.current_utc_datetime
if not now:
raise Exception("No time! 3rd attempt")
task3 = yield context.call_activity("Hello", "London")

now = context.current_utc_datetime
if not now:
raise Exception("No time! 4th attempt")

outputs.append(task1)
outputs.append(task2)
outputs.append(task3)

return outputs

def generator_function_time_gather(context):
outputs = []

outputs.append(context.current_utc_datetime.strftime("%m/%d/%Y, %H:%M:%S"))
yield context.call_activity("Hello", "Tokyo")

outputs.append(context.current_utc_datetime.strftime("%m/%d/%Y, %H:%M:%S"))
yield context.call_activity("Hello", "Seattle")

outputs.append(context.current_utc_datetime.strftime("%m/%d/%Y, %H:%M:%S"))
yield context.call_activity("Hello", "London")

outputs.append(context.current_utc_datetime.strftime("%m/%d/%Y, %H:%M:%S"))
return outputs

def generator_function_rasing_ex(context):
outputs = []

Expand Down Expand Up @@ -220,3 +264,92 @@ def test_tokyo_and_seattle_and_london_with_serialization_state():

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_utc_time_is_never_none():
"""Tests an orchestrator that errors out if its current_utc_datetime is ever None.

If we receive all activity results, it means we never error'ed out. Our test has
a history events array with identical timestamps, simulating events arriving
very close to one another."""

# we set `increase_time` to False to make sure the changes are resilient
# to undistinguishable timestamps (events arrive very close to each other)
context_builder = ContextBuilder('test_simple_function', increase_time=False)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_deterministic_utc_time)

expected_state = base_expected_state(
['Hello Tokyo!', 'Hello Seattle!', 'Hello London!'])
add_hello_action(expected_state, 'Tokyo')
add_hello_action(expected_state, 'Seattle')
add_hello_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_utc_time_is_never_none():
"""Tests an orchestrator that errors out if its current_utc_datetime is ever None.

If we receive all activity results, it means we never error'ed out. Our test has
a history events array with identical timestamps, simulating events arriving
very close to one another."""

# we set `increase_time` to False to make sure the changes are resilient
# to undistinguishable timestamps (events arrive very close to each other)
context_builder = ContextBuilder('test_simple_function', increase_time=False)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_time_is_not_none)

expected_state = base_expected_state(
['Hello Tokyo!', 'Hello Seattle!', 'Hello London!'])
add_hello_action(expected_state, 'Tokyo')
add_hello_action(expected_state, 'Seattle')
add_hello_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

def test_utc_time_updates_correctly():
"""Tests that current_utc_datetime updates correctly"""

now = datetime.utcnow()
# the first orchestrator-started event starts 1 second after `now`
context_builder = ContextBuilder('test_simple_function', starting_time=now)
add_hello_completed_events(context_builder, 0, "\"Hello Tokyo!\"")
add_hello_completed_events(context_builder, 1, "\"Hello Seattle!\"")
add_hello_completed_events(context_builder, 2, "\"Hello London!\"")

result = get_orchestration_state_result(
context_builder, generator_function_time_gather)

# In the expected history, the orchestrator starts again every 4 seconds
# The current_utc_datetime should update to the orchestrator start event timestamp
num_restarts = 3
expected_utc_time = now + timedelta(seconds=1)
outputs = [expected_utc_time.strftime("%m/%d/%Y, %H:%M:%S")]
for _ in range(num_restarts):
expected_utc_time += timedelta(seconds=4)
outputs.append(expected_utc_time.strftime("%m/%d/%Y, %H:%M:%S"))

expected_state = base_expected_state(outputs)
add_hello_action(expected_state, 'Tokyo')
add_hello_action(expected_state, 'Seattle')
add_hello_action(expected_state, 'London')
expected_state._is_done = True
expected = expected_state.to_json()

assert_valid_schema(result)
assert_orchestration_state_equals(expected, result)

14 changes: 10 additions & 4 deletions tests/test_utils/ContextBuilder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
import json
from datetime import datetime, timedelta
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional

from .json_utils import add_attrib, convert_history_event_to_json_dict
from azure.durable_functions.constants import DATETIME_STRING_FORMAT
Expand All @@ -13,20 +13,26 @@


class ContextBuilder:
def __init__(self, name: str=""):
def __init__(self, name: str="", increase_time: bool = True, starting_time: Optional[datetime] = None):
self.increase_time = increase_time
self.instance_id = uuid.uuid4()
self.is_replaying: bool = False
self.input_ = None
self.parent_instance_id = None
self.history_events: List[HistoryEvent] = []
self.current_datetime: datetime = datetime.now()

if starting_time is None:
starting_time = datetime.now()
self.current_datetime: datetime = starting_time

self.add_orchestrator_started_event()
self.add_execution_started_event(name)

def get_base_event(
self, event_type: HistoryEventType, id_: int = -1,
is_played: bool = False, timestamp=None) -> HistoryEvent:
self.current_datetime = self.current_datetime + timedelta(seconds=1)
if self.increase_time:
self.current_datetime = self.current_datetime + timedelta(seconds=1)
if not timestamp:
timestamp = self.current_datetime
event = HistoryEvent(EventType=event_type, EventId=id_,
Expand Down