Skip to content

Commit

Permalink
Generate a safer task ID for idempotent tasks (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbbarth committed Sep 22, 2015
1 parent b2970c4 commit b9faf1b
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 6 deletions.
8 changes: 6 additions & 2 deletions simpleflow/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def with_attributes(
schedule_to_close_timeout=settings.ACTIVITY_SCHEDULE_TO_CLOSE_TIMEOUT,
schedule_to_start_timeout=settings.ACTIVITY_SCHEDULE_TO_START_TIMEOUT,
heartbeat_timeout=settings.ACTIVITY_HEARTBEAT_TIMEOUT,
idempotent=None,
):
"""
:param name: of the activity type.
Expand All @@ -29,7 +30,8 @@ def wrap(func):
start_to_close_timeout,
schedule_to_close_timeout,
schedule_to_start_timeout,
heartbeat_timeout
heartbeat_timeout,
idempotent=idempotent,
)

return wrap
Expand All @@ -45,14 +47,16 @@ def __init__(self, callable,
start_to_close_timeout=None,
schedule_to_close_timeout=None,
schedule_to_start_timeout=None,
heartbeat_timeout=None):
heartbeat_timeout=None,
idempotent=None):
self._callable = callable

self._name = name
self.version = version
self.task_list = task_list
self.retry = retry
self.raises_on_failure = raises_on_failure
self.idempotent = idempotent
self.task_start_to_close_timeout = start_to_close_timeout
self.task_schedule_to_close_timeout = schedule_to_close_timeout
self.task_schedule_to_start_timeout = schedule_to_start_timeout
Expand Down
19 changes: 15 additions & 4 deletions simpleflow/swf/executor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import absolute_import

import hashlib
import json
import logging
import traceback
Expand Down Expand Up @@ -75,17 +76,27 @@ def reset(self):
self._decisions = []
self._tasks = TaskRegistry()

def _make_task_id(self, task):
def _make_task_id(self, task, *args, **kwargs):
"""
Assign a new ID to *task*.
:returns:
String with at most 256 characters.
"""
index = self._tasks.add(task)
task_id = '{name}-{idx}'.format(name=task.name, idx=index)
if not task.idempotent:
# If idempotency is False or unknown, let's generate a task id by
# incrementing and id after the task name.
# (default strategy, backwards compatible with previous versions)
suffix = self._tasks.add(task)
else:
# If task is idempotent, we can do better and hash arguments.
# It makes the workflow resistant to retries or variations on the
# same task name (see #11).
arguments = json.dumps({"args": args, "kwargs": kwargs})
suffix = hashlib.md5(arguments).hexdigest()

task_id = '{name}-{idx}'.format(name=task.name, idx=suffix)
return task_id

def _get_future_from_activity_event(self, event):
Expand Down Expand Up @@ -235,7 +246,7 @@ def resume(self, task, *args, **kwargs):
otherwise schedules it.
"""
task.id = self._make_task_id(task)
task.id = self._make_task_id(task, *args, **kwargs)
event = self.find_event(task, self._history)

future = None
Expand Down
3 changes: 3 additions & 0 deletions simpleflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def serialize(self, value):
class ActivityTask(Task):
def __init__(self, activity, *args, **kwargs):
self.activity = activity
self.idempotent = activity.idempotent
self.args = args
self.kwargs = kwargs
self.id = None
Expand All @@ -51,6 +52,8 @@ def __repr__(self):
class WorkflowTask(Task):
def __init__(self, workflow, *args, **kwargs):
self.workflow = workflow
# TODO: handle idempotence at workflow level
self.idempotent = False
self.args = args
self.kwargs = kwargs
self.id = None
Expand Down
45 changes: 45 additions & 0 deletions tests/test_dataflow.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ def double(x):
return x * 2


@activity.with_attributes(version=DEFAULT_VERSION, idempotent=True)
def triple(x):
return x * 3


class TestWorkflow(Workflow):
name = 'test_workflow'
version = 'test_version'
Expand Down Expand Up @@ -1215,3 +1220,43 @@ def test_more_than_1000_open_activities_partial_max():
# 2 already scheduled + 20 to schedule now
assert executor._open_activity_count == 22
assert len(decisions) == 20



class TestTaskNaming(TestWorkflow):
"""
This workflow executes a few tasks and tests the naming (task ID
assignation) depending on their idempotence.
"""
def run(self):
results = []
results.append(self.submit(increment, 0))
results.append(self.submit(increment, 0))
results.append(self.submit(triple, 1))
results.append(self.submit(triple, 2))
results.append(self.submit(triple, 2))
futures.wait(*results)

def test_task_naming():
workflow = TestTaskNaming
executor = Executor(DOMAIN, workflow)

history = builder.History(workflow, input={})

decisions, _ = executor.replay(history)
expected = [
# non idempotent task, should increment
"activity-tests.test_dataflow.increment-1",
# non idempotent task, should increment again
"activity-tests.test_dataflow.increment-2",
# idempotent task, with arg 1
"activity-tests.test_dataflow.triple-deb8adb88b687c0df408628aa69b1377",
# idempotent task, with arg 2
"activity-tests.test_dataflow.triple-d269dc325a06c6ad32888f450ee8dd30",
# idempotent task, with arg 2 too => same task id
"activity-tests.test_dataflow.triple-d269dc325a06c6ad32888f450ee8dd30",
]
for i in range(0, len(expected)):
decision = decisions[i]['scheduleActivityTaskDecisionAttributes']
assert decision['activityId'] == expected[i]

0 comments on commit b9faf1b

Please sign in to comment.