swf.executor: improve the generation of a task's id #11

Closed
ggreg opened this issue Jul 30, 2014 · 3 comments

Comments

@ggreg
Contributor

ggreg commented Jul 30, 2014

To find a task in the workflow's history, the executor assigns a unique id to the task. It increments the last id associated with the task's name:

class TaskRegistry(dict):
    """This registry tracks tasks and assigns them an integer identifier.

    """
    def add(self, task):
        """
        IDs are assigned sequentially by incrementing an integer. They start
        from 1.

        :returns:
            :rtype: int. 

        """
        name = task.name
        self[name] = self.setdefault(name, 0) + 1

        return self[name]

However, this does not work if the execution flow changes, as in a simple implementation of a retry on timeout:

f_result = self.submit(f, a)  # task1
if f_result.finished and f_result.exception:
    if is_timeout(f_result.exception):
        f_result = self.submit(f, a)  # task1bis

f_result_2 = self.submit(f, b)  # task2
futures.wait(f_result, f_result_2)

Let's simulate the execution of this workflow:

In the first run, the executor:

  • Assigns id 1 to task1.
  • Assigns id 2 to task2.
  • Blocks on futures.wait().
  • Schedules task1 with id 1 and task2 with id 2.

In the second run:

  • task1 timed out.
  • task2 finished at the same time.
  • Executor assigns id 1 to task1.
  • Both conditions are true, so the executor assigns id 2 to task1bis and finds that id in the history. It mistakenly puts the result of task2 in task1bis (they are not even called with the same argument).
  • Executor assigns id 3 to task2 and does not find it in the history.
  • Executor blocks on futures.wait()

The issue is that a task's id depends on the code evaluated before it, and that code path may change from one replay of the workflow to the next.
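The two runs above can be reproduced with a small simulation. This is only a sketch: the registry is simplified to take a task name instead of a task object, and the labels task1, task1bis and task2 are illustrative names that the registry itself never sees.

```python
class TaskRegistry(dict):
    """Counter-based registry, simplified to take a name directly."""

    def add(self, name):
        self[name] = self.setdefault(name, 0) + 1
        return self[name]


def replay(first_call_timed_out):
    """Return the (label, id) pairs assigned during one replay.

    All three submissions share the task name "f", so the id only
    depends on how many submissions came before.
    """
    registry = TaskRegistry()
    assigned = [("task1", registry.add("f"))]
    if first_call_timed_out:
        # The retry branch is only evaluated once the timeout is known.
        assigned.append(("task1bis", registry.add("f")))
    assigned.append(("task2", registry.add("f")))
    return assigned


first_run = replay(first_call_timed_out=False)
second_run = replay(first_call_timed_out=True)
print(first_run)
print(second_run)
```

In the second replay, task1bis takes the id that task2 was scheduled under in the first run, and task2 moves to an id the history has never seen.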

Assuming that tasks are idempotent would make things easier, because the executor would only need to hash the name and input of a task to generate its id. However, if tasks can have side effects (and we cannot ensure they do not), we need to provide another mechanism. I thought that we could mark a computation as idempotent with a cached or memoized attribute. But we still need to handle other computations.
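A minimal sketch of the hashing idea, assuming the task input is JSON-serializable. The function name idempotent_task_id and the "name-digest" id format are hypothetical and not taken from simpleflow.

```python
import hashlib
import json


def idempotent_task_id(name, args=(), kwargs=None):
    """Derive a deterministic id from a task's name and input.

    Hypothetical helper: sort_keys makes the serialization independent
    of the order in which keyword arguments were given.
    """
    payload = json.dumps(
        {"name": name, "args": list(args), "kwargs": kwargs or {}},
        sort_keys=True,
    )
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()
    return "{}-{}".format(name, digest)


id_a = idempotent_task_id("f", args=["a"])
id_a_again = idempotent_task_id("f", args=["a"])
id_b = idempotent_task_id("f", args=["b"])
print(id_a == id_a_again, id_a == id_b)
```

With such an id, the position of a submit() call in the replay no longer matters, but two calls with the same name and input collapse into one task, which is only acceptable for idempotent computations.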

@jbbarth
Collaborator

jbbarth commented Sep 20, 2015

Here's another illustration for this problem, extracted from real code that was misbehaving on SWF.

Let's assume we run two tasks for each of two sub-datasets (called "blocks" below), where the second task for a block depends on the completion of the first task for the same block. With the following code, if the first task takes longer for block 0, we end up submitting the second task twice for block 1 and never for block 0:

import time

from simpleflow import activity, Workflow, futures

@activity.with_attributes(task_list="test", version="1.0")
def first_activity(block):
    print("start first_activity for block {}".format(block))
    if block == 0:
        time.sleep(5)
    else:
        time.sleep(1)
    print("finish first_activity for block {}".format(block))

@activity.with_attributes(task_list="test", version="1.0")
def second_activity(block):
    print("triggered second_activity for block {}".format(block))

class ChangingWorkflow(Workflow):
    name = "changing"
    version = "1.0"
    task_list = "test"

    def run(self):
        _all = []

        for block in [0, 1]:
            first = self.submit(first_activity, block)
            _all.append(first)

            if first.finished:
                second = self.submit(second_activity, block)
                _all.append(second)

        futures.wait(*_all)

The resulting log in a single activity worker speaks for itself:

start first_activity for block 1
start first_activity for block 0
finish first_activity for block 1
triggered second_activity for block 1
finish first_activity for block 0
triggered second_activity for block 1
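One way to read this log is to replay the decider by hand. The sketch below is a simplified model, not simpleflow code: it assumes ids of the form "name-N" assigned by a per-name counter, and a history that records the input a given id was first scheduled with.

```python
def decide(finished_first_blocks, history):
    """Replay the workflow once and schedule any id not yet in history.

    finished_first_blocks: blocks whose first_activity has completed.
    history: maps task id -> input the task was scheduled with.
    """
    counters = {}

    def next_id(name):
        counters[name] = counters.get(name, 0) + 1
        return "{}-{}".format(name, counters[name])

    for block in [0, 1]:
        # setdefault models "schedule only if the id is not in the history".
        history.setdefault(next_id("first_activity"), block)
        if block in finished_first_blocks:
            history.setdefault(next_id("second_activity"), block)
    return history


history = {}
decide(set(), history)   # nothing finished yet
decide({1}, history)     # block 1 finishes first
decide({0, 1}, history)  # block 0 finishes later
for task_id in sorted(history):
    print(task_id, history[task_id])
```

In the last replay, the second task for block 0 is given the id second_activity-1, which the history already holds for block 1, so it is never scheduled; block 1 then gets the fresh id second_activity-2 and runs again.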

Assuming that tasks are idempotent certainly simplifies the problem. I'm not sure how to handle variations in the arguments, though: we can imagine simple scenarios where the arguments themselves vary between workflow replays, and we probably don't want to spin up a new activity each time.

A more generic solution would be to let users control task id generation from the Workflow.submit() method or from a decorator on the task, for instance by specifying a user-controlled suffix that varies with some parameter the task depends on (like block here).
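A rough sketch of what such a suffix could look like at the registry level. Everything here is hypothetical: SuffixedTaskRegistry, the suffix parameter and the id format do not exist in simpleflow, and the replay function only models the loop of the workflow above.

```python
class SuffixedTaskRegistry(dict):
    """Counter-based registry keyed by task name plus an optional suffix."""

    def add(self, name, suffix=None):
        key = name if suffix is None else "{}-{}".format(name, suffix)
        self[key] = self.setdefault(key, 0) + 1
        return "{}-{}".format(key, self[key])


def replay(finished_blocks):
    """Return the ids assigned in one replay, using the block as suffix."""
    registry = SuffixedTaskRegistry()
    ids = []
    for block in [0, 1]:
        ids.append(registry.add("first_activity", suffix=block))
        if block in finished_blocks:
            ids.append(registry.add("second_activity", suffix=block))
    return ids


early = replay(finished_blocks={1})
late = replay(finished_blocks={0, 1})
print(early)
print(late)
```

Because each block has its own counter, the second task for block 1 keeps the same id in both replays, whatever happens to block 0 in between.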

What I don't like overall is that this is a trap for users: it's very easy to assume that the code above will just work, and if you don't understand this basic simpleflow assumption, you'll fail.

Back to the drawing board, stay tuned ;-)

@jbbarth
Collaborator

jbbarth commented Jan 2, 2017

Closing this old one, idempotent tasks mostly do the job \o/

@jbbarth jbbarth closed this as completed Jan 2, 2017
@jbbarth
Collaborator

jbbarth commented Jan 23, 2017

New variant of this now that we have chains/groups. Example of a buggy decider (given "my_task" is not configured as idempotent):

self.submit(
    Group(
        Chain(ActivityTask(my_task, 1),
              ActivityTask(my_task, 2)),
        Chain(ActivityTask(my_task, 3),
              ActivityTask(my_task, 4))
    )
)

=> 1st decision: 2 activities are scheduled:

  • my_task-1 with the input {"args": [1]}
  • then we jump to the next chain
  • my_task-2 with the input {"args": [3]}

Now imagine that my_task-1 finishes. A new decision is triggered.

=> 2nd decision:

  • my_task-1 is already finished, nothing to do
  • the next task in the chain gets the activity id my_task-2 in simpleflow; simpleflow looks at the history and sees that this id is already scheduled, so it does nothing (wrong! this is not the same task, but simpleflow cannot know)
  • then it sees the 1st task in the next chain, and names it my_task-3, and schedules it with the input: {"args": [3]}

In the end, 4 tasks will have been executed:

  • my_task-1 with the input {"args": [1]}
  • my_task-2 with the input {"args": [3]}
  • my_task-3 with the input {"args": [3]}
  • my_task-4 with the input {"args": [4]}

=> probably not what we wanted...
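The sequence of decisions can be checked with a small model. It is a sketch under simplifying assumptions, not simpleflow's implementation: a chain is a plain list of arguments, ids come from a single counter for my_task, and the history maps each id to the argument it was first scheduled with.

```python
def decide(chains, history, finished):
    """Replay the group once; schedule ids that are not in the history."""
    counter = 0
    for chain in chains:
        for arg in chain:
            counter += 1
            task_id = "my_task-{}".format(counter)
            # An id already in the history keeps its original input.
            history.setdefault(task_id, arg)
            if task_id not in finished:
                break  # the chain waits for its current task
    return history


chains = [[1, 2], [3, 4]]
history, finished = {}, set()

decide(chains, history, finished)            # 1st decision
finished.add("my_task-1")
decide(chains, history, finished)            # 2nd decision
finished.update(["my_task-2", "my_task-3"])
decide(chains, history, finished)            # a later decision
for task_id in sorted(history):
    print(task_id, history[task_id])
```

The argument 2 never appears in the history, while 3 appears twice.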
