
In notify_barriers, when the queue's task_retry_limit is exceeded, the pipeline never finalizes #54

Open
ymohii opened this issue Aug 13, 2015 · 5 comments


ymohii commented Aug 13, 2015

In this line:
https://github.com/GoogleCloudPlatform/appengine-pipelines/blob/master/python/src/pipeline/pipeline.py#L1672

the library simply swallows the taskqueue.TombstonedTaskError exception, which happens often in our application at this point. But when the retries exceed task_retry_limit, the barriers are never notified of the filled slots, and the pipeline stays in the Run or Finalize status forever.

Is this behavior intended, or is there a way to overcome this issue?

@soundofjw (Contributor) commented

I have seen this issue too, where the pipeline stays in the Finalizing status when it should otherwise have aborted, but I am not certain that this is the root cause.

What this line is trying to achieve is idempotence: making sure that we haven't already fired the same task, from the taskqueue's perspective. The goal of this code block is to prevent errors when we have actually already created and run the notify tasks.

What is an issue here, at a glance, is that task_list contains one or more tasks, and any one of those tasks being a duplicate will throw one of (taskqueue.TombstonedTaskError, taskqueue.TaskAlreadyExistsError) for the whole batch.

There's nothing in the documentation that tells me what happens to the remaining tasks when one fails for this reason. Anyone from the Google team know for sure?
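To illustrate the concern: the library wraps a bulk add of task_list in a single try/except. A minimal sketch of the alternative, catching the duplicate errors per task so that one tombstoned task cannot prevent the remaining tasks from being enqueued. This is a hypothetical sketch, not the library's actual code; the exception classes and queue object below are stand-ins for the real taskqueue API.

```python
class TombstonedTaskError(Exception):
    """Stand-in for taskqueue.TombstonedTaskError."""

class TaskAlreadyExistsError(Exception):
    """Stand-in for taskqueue.TaskAlreadyExistsError."""

def add_tasks_individually(queue, task_list):
    """Enqueue each task on its own so a duplicate is skipped, not fatal."""
    added = []
    for task in task_list:
        try:
            queue.add(task)
            added.append(task)
        except (TombstonedTaskError, TaskAlreadyExistsError):
            # The task was already enqueued (or already ran); that is the
            # idempotent outcome we want, so carry on with the rest.
            continue
    return added
```

With the bulk add, a single duplicate in the batch raises and the non-duplicate tasks may never be retried individually; per-task handling makes the outcome explicit.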

@ymohii (Author) commented Aug 17, 2015

@soundofjw thanks for your reply.
I now agree that this block of code appears not to be responsible for the problem. I hope someone can help with this issue:

The root pipeline stays in the Run or Finalize state forever.
A number of child pipelines appear in the UI in the Run state, but they never actually started.
There are no errors at all in the logs.
This gives me the feeling that the problem may be that some tasks were somehow never added to the queue.

@soundofjw (Contributor) commented

@ymohii Can you share the root pipeline code, specifically if you've got something going on in the finalized method? Obfuscate any business logic as necessary.

I've seen this problem occur when my finalized methods throw exceptions. Do you see anything like that in the logs? (I'd guess not, from your comment.)

I wouldn't completely throw out your initial assumption - when I'm in situations like this I tend to start adding logging statements everywhere.


As an alternative debugging option, can you get your pipeline successfully running on your local environment?

I tend to choose pipe.start() over pipe.start_test(), then execute the queues via the taskqueue stub. This is a pretty intimidating process the first time, so I recommend looking at how appengine-mapreduce runs its test cases. Here's a stripped-down version of a similar implementation:
https://gist.github.com/soundofjw/8bb8247e5f7d1d31d917
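The drain-the-queue idea behind this testing style can be sketched without the App Engine SDK as a simple loop: pop tasks and execute them until nothing remains. In a real test you would pull tasks from the taskqueue stub and POST each one to your handlers; here a plain deque and a caller-supplied execute function stand in for that, which is an assumption of this sketch rather than the stub's actual API.

```python
from collections import deque

def drain_queue(queue, execute):
    """Repeatedly pop and run tasks until no tasks remain.

    `queue` is a deque of task payloads; `execute` runs one task and may
    enqueue follow-up tasks, which is exactly what pipeline fan-out does.
    Returns the number of tasks executed.
    """
    executed = 0
    while queue:
        task = queue.popleft()
        execute(task, queue)
        executed += 1
    return executed
```

If the count stops growing while pipelines are still marked Run in the UI, that points at tasks that were never enqueued, matching the symptom described above.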

@ymohii (Author) commented Aug 24, 2015

@soundofjw thanks for your feedback.
My pipeline is simply an implementation of the fan-in example in this article: http://sookocheff.com/post/appengine/pipelines/fan-in-fan-out/
I'm taking this code snippet from the article as an example:

import logging

import pipeline  # the appengine-pipelines library

class SquarePipeline(pipeline.Pipeline):

    def run(self, number):
        logging.info('Squaring: %s' % number)
        return number * number


class Sum(pipeline.Pipeline):

    def run(self, *args):
        value = sum(list(args))
        logging.info('Sum: %s', value)
        return value


class FanInPipeline(pipeline.Pipeline):

    def run(self, count):
        results = []
        for i in xrange(0, count):
            result = yield SquarePipeline(i)
            results.append(result)

        # Waits until all SquarePipeline results are complete
        yield Sum(*results)

In my own pipeline there is nothing more than this fan-in implementation.

For this example (if my problem happens to it), I added logging inside the loop and found that the loop yields SquarePipeline successfully for all the items. But some of the SquarePipeline pipelines do not start at all (the logging I put inside the run method of SquarePipeline is never called for some of the missing pipelines).

I can work around the problem by increasing the number of retries for the queue in queue.yaml, but I don't think that is a good thing to rely on without understanding the problem.

Regarding the finalized method: there is no finalized implementation in my pipelines.
Regarding the local environment: the pipelines work fine there, and also on small subsets of data. But in my app the data is much bigger (the loop in FanInPipeline generates more than 100 pipelines in some cases), and then the problem arises, which makes me unsatisfied with the increase-retries workaround until I understand the source of the problem.

@tkaitchuck (Contributor) commented

Two comments:

  1. taskqueue.TombstonedTaskError and taskqueue.TaskAlreadyExistsError don't protect against all duplication of messages, i.e. a message could be received, processed, and then fail to be acknowledged. So the library code still needs to be idempotent (by updating datastore).
  2. It would probably be much better to manage retries externally to the task queue, i.e. set the taskqueue retry limit to infinite and manage the state in datastore. This would avoid the problem of an inconsistent state between the two resulting in a silent stop.
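The second point, tracking state in datastore rather than trusting the queue's duplicate errors, can be sketched as a check-and-set flag keyed by task name. This is a hypothetical illustration: a plain dict stands in for datastore, and in real App Engine code mark_and_check would run inside a datastore transaction so a redelivered task sees the flag and becomes a no-op.

```python
class NotificationState:
    """Datastore stand-in: records which barrier notifications have run."""

    def __init__(self):
        self._done = {}

    def mark_and_check(self, task_name):
        """Record the task; return True only the first time it is seen.

        On App Engine this check-and-set would be a transactional get/put
        on an entity keyed by task_name, making it safe under redelivery.
        """
        if self._done.get(task_name):
            return False
        self._done[task_name] = True
        return True

def notify_barrier(state, task_name, fire):
    """Fire the barrier exactly once, no matter how often the task retries."""
    if state.mark_and_check(task_name):
        fire()
```

With the retry limit set to infinite and this flag as the source of truth, a redelivered notify task is harmless, and a lost one is eventually retried instead of silently dropped.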
