
AsyncioExecutor not truly async #96

Closed
agronholm opened this issue May 28, 2015 · 38 comments

Comments

@agronholm
Owner

Originally reported by: Lennart Bastian (Bitbucket: bastianl, GitHub: bastianl)


I have been using the AsyncIOScheduler from APScheduler to schedule tasks asynchronously. However, I want to schedule jobs (coroutines) in the same event loop that the scheduler is running in.
It looks like the AsyncIOExecutor runs tasks in a separate thread, and not as coroutines but as regular functions. In fact, it doesn't seem like you can schedule any kind of asynchronous function. Is this the intended behavior? What's the point of calling it an asyncio executor?

To solve this I modified the function run_job to work asynchronously, and tweaked the executor to run jobs as asyncio.Tasks which are automatically inserted into the eventloop.
See this.

Maybe we can modularize the run_job function and split it into two functions, one of which works asynchronously. Let me know what you think, am happy to submit a pull request.


@agronholm
Owner Author

Original comment by Alex Grönholm (Bitbucket: agronholm, GitHub: agronholm):


It has occurred to me to offer an executor that runs the jobs in the event loop thread. But by default I wanted to offer an interface that always executed jobs in a thread regardless of the scheduler type (GeventScheduler being the obvious exception). I can't change this without breaking the API, which means not before 4.0. But I could add a suitable executor in 3.1. The same will have to be done with the Tornado and Twisted schedulers though.

@agronholm
Owner Author

Original comment by Alex Grönholm (Bitbucket: agronholm, GitHub: agronholm):


And the reason it's called "asyncio executor" is because it utilizes the default thread pool in the event loop. That's it.

@agronholm
Owner Author

Original comment by Lennart Bastian (Bitbucket: bastianl, GitHub: bastianl):


Ok, got it – thanks for the reply. Asynchronous execution seems useful in a variety of scenarios but I can understand why it is difficult.

@agronholm
Owner Author

Original comment by Alex Grönholm (Bitbucket: agronholm, GitHub: agronholm):


Yeah, it's one thing to make the job run in the event loop thread, but a whole other thing to support generator-style execution. It would require a backend-specific version of run_job().

@agronholm
Owner Author

Original comment by Bernard Kerckenaere (Bitbucket: bernieke, GitHub: bernieke):


disclaimer: I'm on Python 3.5.0

For some reason the yield from in the modified run_job throws a "TypeError: 'NoneType' object is not iterable" exception for me.

All I actually needed to get it to work for me was:

import sys
import time
import asyncio

from pytz import timezone

from apscheduler.executors.base import BaseExecutor, run_job
from apscheduler.schedulers.asyncio import AsyncIOScheduler

class AsyncIOExecutor(BaseExecutor):
    """Runs coroutine asynchronously in the same event loop."""

    def start(self, scheduler, alias):
        super(AsyncIOExecutor, self).start(scheduler, alias)
        self._eventloop = scheduler._eventloop

    def _do_submit_job(self, job, run_times):
        def callback(f):
            try:
                events = f.result()
            except BaseException:
                self._run_job_error(job.id, *sys.exc_info()[1:])
            else:
                self._run_job_success(job.id, events)

        # Wrap the synchronous run_job() in a coroutine and schedule it as a
        # Task on the scheduler's own event loop
        f = asyncio.Task(asyncio.coroutine(run_job)(
            job, job._jobstore_alias, run_times, self._logger.name))
        f.add_done_callback(callback)

scheduler = AsyncIOScheduler(timezone=timezone(time.tzname[0]),
                             executors={'default': AsyncIOExecutor()})

So instead of using a modified run_job, I just wrapped the existing run_job in a coroutine.

@agronholm
Owner Author

Original comment by Alex Grönholm (Bitbucket: agronholm, GitHub: agronholm):


This is a good candidate for a feature in the next release. Of course I'll have to implement the same with Twisted and Tornado too...

@agronholm
Owner Author

Original comment by Bernard Kerckenaere (Bitbucket: bernieke, GitHub: bernieke):


I just figured out why I didn't need the yield from in my use case.

The function I'm calling isn't a coroutine itself, but schedules one by calling asyncio.async.
(Which without the modified AsyncIOExecutor raises a "RuntimeError: There is no current event loop in thread 'Thread-1'." exception.)

So I guess the proper solution would be to have run_job check if job.func is a coroutine, and yield from if it is. That way both use cases should work.
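A minimal sketch of that check, with hypothetical names (this is not APScheduler's actual run_job()), written in modern async/await spelling rather than the @asyncio.coroutine / yield from style of the time:

```python
import asyncio

async def run_job(func, *args):
    # Hypothetical dispatch: await the target if it is a coroutine
    # function, otherwise call it directly as a plain function.
    if asyncio.iscoroutinefunction(func):
        return await func(*args)
    return func(*args)

async def coro_job(x):
    return x * 2

def plain_job(x):
    return x + 1

print(asyncio.run(run_job(coro_job, 21)))   # 42
print(asyncio.run(run_job(plain_job, 41)))  # 42
```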

@agronholm
Owner Author

Original comment by Alex Grönholm (Bitbucket: agronholm, GitHub: agronholm):


By the way, you've essentially reinvented asyncio.async() (or asyncio.ensure_future()).

@agronholm
Owner Author

Original comment by Alex Grönholm (Bitbucket: agronholm, GitHub: agronholm):


I have a plan of action for this but it requires adding a major feature.

@binhnq94

In your example asyncio_.py, I tried declaring the tick function as a coroutine:

@asyncio.coroutine
def tick():
    ...

No error was raised, but the coroutine never completes. How do I work with AsyncIOScheduler?

@agronholm
Owner Author

Scheduling of coroutine functions is marked to be implemented in the 3.3 milestone. Meanwhile, you can use the event loop's call_soon_threadsafe() function to make your code run in the event loop thread. Note that the target function of the scheduler job must not be a coroutine or it will never run.
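A minimal self-contained sketch of that approach, with a plain thread standing in for the threaded executor (tick, job, and the 0.1 s sleep are illustrative names and values, not APScheduler API):

```python
import asyncio
import threading

results = []

async def tick():
    # The coroutine we actually want to run on the scheduler's event loop
    results.append('tick')

def job(loop):
    # This regular function is what you would register with add_job(); it may
    # run in a worker thread, so it hands the coroutine over thread-safely.
    loop.call_soon_threadsafe(lambda: asyncio.ensure_future(tick()))

async def main():
    loop = asyncio.get_running_loop()
    # A plain thread stands in for the scheduler's threaded executor
    worker = threading.Thread(target=job, args=(loop,))
    worker.start()
    worker.join()
    await asyncio.sleep(0.1)  # give the scheduled task a chance to run

asyncio.run(main())
print(results)  # ['tick']
```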

@binhnq94

Could you please illustrate with an example how to use call_soon_threadsafe() with AsyncIOScheduler?

@txomon
Contributor

txomon commented Apr 20, 2016

@agronholm Does it have any blocking issues that should be taken care of?

@agronholm
Owner Author

This particular thing doesn't. However, the next step is to get 3.2.0 out which will fix the use case 3.1.0 broke -- adjusting jobs in job stores before the scheduler starts processing the jobs. The code is done, the tests are just unfinished. I expect to make a release this weekend.

The biggest problem with async execution is that I have to duplicate pretty much all of run_job() for use with scheduled coroutines and I don't want to do that. So I'm still looking for elegant solutions to that.

@txomon
Contributor

txomon commented Apr 21, 2016

@TechBK I am now using the asyncio executor as follows, with execute() being the function passed to add_job():

import asyncio

async def _coroutine_execute(args):
    print('Executing through coroutine')

def execute(*args):
    try:
        el = asyncio.get_event_loop()
    except RuntimeError:
        el = asyncio.new_event_loop()
        asyncio.set_event_loop(el)

    try:
        el.run_until_complete(_coroutine_execute(args))
    except AttributeError:
        print('Something is going on in the tasks')

@agronholm I don't see what you mean. As far as I can see, you would just need to run a check on the job func and, if it's a coroutine, yield from it, no?

@agronholm
Owner Author

How would that work with Python 2?

@txomon
Contributor

txomon commented Apr 21, 2016

Following this?
https://www.python.org/dev/peps/pep-0492/#backwards-compatibility

Another option is to define execute_job() as a function where just the execution of the job is handled, while still having run_job() globally defined, leaving room for each executor to implement its own.

That way, you could reimplement execute_job() in each executor, making sure that it does what it says (the actual function is executed in a thread, in a coroutine, in an eventlet, etc.).

The good part of this one is that you wouldn't run into any problems importing asyncio in the AsyncIOExecutor. You would need to move any logic you may have already created to overcome these limitations for other modules (spawning a thread etc.) to the execute_job() function of each executor.
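A rough sketch of the proposed split (class and method names are illustrative, not APScheduler's actual classes):

```python
from concurrent.futures import ThreadPoolExecutor

class BaseExecutor:
    # Shared bookkeeping stays in run_job(); only the actual invocation is
    # delegated to execute_job(), which each executor can override.
    def run_job(self, func, args):
        try:
            result = self.execute_job(func, args)
        except Exception as exc:
            return ('error', exc)
        return ('success', result)

    def execute_job(self, func, args):
        # Default: call the function directly in the current thread
        return func(*args)

class ThreadPoolJobExecutor(BaseExecutor):
    def execute_job(self, func, args):
        # Variant: run the job in a worker thread instead
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(func, *args).result()

print(BaseExecutor().run_job(lambda x: x + 1, (41,)))           # ('success', 42)
print(ThreadPoolJobExecutor().run_job(lambda x: x + 1, (41,)))  # ('success', 42)
```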

@agronholm
Owner Author

agronholm commented Apr 24, 2016

Ok @txomon , can you show me how to write run_job() in a way that works for both coroutine callables and regular functions in a manner that doesn't duplicate all the code? I've tried and failed.

@txomon
Contributor

txomon commented Apr 28, 2016

@agronholm check #134

@binhnq94

binhnq94 commented May 5, 2016

I am still waiting for a solution to this issue :(

@agronholm
Owner Author

And you'll remain waiting for a while. The next release will not fix this -- 3.3 at the earliest will. Meanwhile you can try hacks involving call_soon_threadsafe(), create_task() and ensure_future().

@binhnq94

binhnq94 commented May 5, 2016

@txomon I tried to run your code, but when the job is done, the event loop stops immediately.

So I can't run repeating jobs.

@agronholm
Owner Author

@txomon's code won't work. You can't just add new event loops whenever you feel like it.

@agronholm
Owner Author

You have to pass the existing event loop to the scheduled callable and then use call_soon_threadsafe().

@binhnq94

binhnq94 commented May 5, 2016

call_soon_threadsafe() does not accept a coroutine as its callback 👎
BaseEventLoop.call_soon_threadsafe(callback, *args)
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.BaseEventLoop.call_soon_threadsafe

@agronholm
Owner Author

I know that. You have to pass it a callable which then calls get_event_loop().create_task(your_coroutine_function()).

@agronholm
Owner Author

But to enable the runner to properly detect exceptions and such, you need to then have a concurrent.futures.Future object. In the event loop thread you do future.set_result() or future.set_exception() and in the wrapper function (that runs in the thread pool) you do return future.result(). I know, it gets hairy. That's why I didn't want to instruct people to do this.
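A self-contained sketch of that hand-off, using run_in_executor() to stand in for the scheduler's thread pool (my_coroutine and wrapper are hypothetical names):

```python
import asyncio
import concurrent.futures

async def my_coroutine(x):
    return x * 2

def wrapper(loop, x):
    # Runs in the thread pool. A concurrent.futures.Future carries the
    # outcome back from the event loop thread.
    future = concurrent.futures.Future()

    def schedule():
        # Runs in the event loop thread
        task = asyncio.ensure_future(my_coroutine(x))

        def done(task):
            try:
                future.set_result(task.result())
            except BaseException as exc:
                future.set_exception(exc)

        task.add_done_callback(done)

    loop.call_soon_threadsafe(schedule)
    return future.result()  # block the worker thread until the loop is done

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor stands in for the scheduler's threaded executor
    return await loop.run_in_executor(None, wrapper, loop, 21)

print(asyncio.run(main()))  # 42
```

Note that asyncio.run_coroutine_threadsafe(), added in Python 3.5.1, packages up essentially this pattern.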

@binhnq94

binhnq94 commented May 5, 2016

Thank you very much 👍

@evilaliv3

evilaliv3 commented Sep 25, 2016

@agronholm: we are evaluating re-adopting apscheduler in GlobaLeaks: globaleaks/GlobaLeaks#1769

What I'm failing to understand is whether this functionality works with Twisted in the older version packaged in Debian/Ubuntu, or whether it is only available now, in connection with Python 3.5.

Is there any way to handle this behaviour with Twisted and Python 2.7 by using Twisted's inlineCallbacks?

(thanks)

@agronholm
Owner Author

There is no way to detect @inlineCallbacks, so it will not be supported before 4.0. Native coroutines would be doable before that but there is no working test harness for pytest and twisted (pytest-twisted is really old and does not work with native coroutines).

@evilaliv3

I see, thank you for the explanation.

@agronholm
Owner Author

The fundamental problem here is that currently all those executors default to running the jobs in worker threads. If I change this default behavior, it will break things for a lot of people. I was able to add native coroutine support because they're detectable and didn't work before anyway so I was able to add code that takes a different code path when a coroutine function is detected.

@evilaliv3

Understood; don't worry, we were looking forward to the possibility of integrating apscheduler, given the great work you are doing and our need to be able to monitor jobs for failures.

Sadly we will still have to go with a custom implementation, because we will be stuck with Python 2.7 for some time and in general we can only use the apscheduler packaged in Ubuntu trusty/xenial.

@agronholm
Owner Author

You could easily implement a custom executor that runs jobs directly in the reactor thread. In fact, that makes me wonder why I'm not doing that – providing an alternate executor that does this, even if it's not the default one.

@agronholm
Owner Author

Thinking more about it, I'm not really sure if it's easy but it's far less work than implementing all those monitoring etc. features all by yourselves.

@evilaliv3

The basic custom implementation that we use is the following: https://github.com/globaleaks/GlobaLeaks/blob/master/backend/globaleaks/jobs/base.py

The main issue is not being able to stop something that may be stuck in the reactor, but this is mostly an issue with Twisted that I think is not solvable in any way.

@agronholm
Owner Author

So is your problem that the reactor could be stuck running code that's blocking everything else and you can't stop it?

@agronholm
Owner Author

I didn't realize Ubuntu had such an ancient version of APScheduler. Why do you restrict yourselves to OS provided packages?
