Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trying to fix #96 #134

Closed
wants to merge 8 commits into from
80 changes: 76 additions & 4 deletions apscheduler/executors/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
from __future__ import absolute_import

import sys
from traceback import format_tb

try:
import asyncio

real_asyncio = True
except ImportError: # pragma: nocover
try:
import trollius as asyncio

from apscheduler.executors.base import BaseExecutor, run_job
real_asyncio = False
except ImportError:
raise ImportError(
'AsyncIOScheduler requires either Python 3.4 or the asyncio/trollius package installed'
)

from apscheduler.events import JobExecutionEvent, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.base import BaseExecutor, job_runtime


class AsyncIOExecutor(BaseExecutor):
Expand All @@ -16,6 +33,9 @@ def start(self, scheduler, alias):
self._eventloop = scheduler._eventloop

def _do_submit_job(self, job, run_times):
    """Queue the job for execution on the scheduler's event loop.

    Defers the actual work to _do_job_runtime() via call_soon() so that
    submitting a job never blocks the caller.
    """
    # Use the loop bound in start() (self._eventloop) rather than
    # asyncio.get_event_loop(): the scheduler's loop is not necessarily the
    # default loop of the calling thread.
    # NOTE(review): call_soon() is not thread safe; if this can be invoked
    # from a thread other than the loop's, call_soon_threadsafe() is
    # required — confirm against the scheduler's threading model.
    self._eventloop.call_soon(self._do_job_runtime, job, run_times)

def _do_job_runtime(self, job, run_times):
def callback(f):
try:
events = f.result()
Expand All @@ -24,6 +44,58 @@ def callback(f):
else:
self._run_job_success(job.id, events)

f = self._eventloop.run_in_executor(None, run_job, job, job._jobstore_alias, run_times,
self._logger.name)
f.add_done_callback(callback)
if not asyncio.iscoroutinefunction(job.func):
future = self._eventloop.run_in_executor(None, job_runtime, job, run_times,
job._jobstore_alias,
self._logger.name)
else:
events = job_runtime(job, run_times, self._logger.name, job._jobstore_alias,
self._run_job)
future_events = []
for event in events:
if not (isinstance(event, asyncio.Future) or asyncio.iscoroutine(event)):
future = asyncio.Future()
future.set_result(event)
future_events.append(future)
else:
future_events.append(event)
future = asyncio.gather(*events)
future.add_done_callback(callback)

if real_asyncio:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope you're not being serious about this duplication. It also won't fix the syntax error. There are better ways to emulate "yield from" in a way that works for both asyncio and trollius.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am all ears =)

The alternative was to place an if/else depending on trollius or asyncio for the returns and the yield from, but this looked better.

I am really out of ideas on how to emulate yield from.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Native coroutines have the __await__ method which returns an iterable. On Python < 3.4, you can just iterate over the generator directly:

for v in job.func(*job.args, **job.kwargs):
    retval = yield v

I haven't actually done this kind of emulation before but this should help you figure out a common code path that works for both asyncio and trollius plus all versions of Python.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also an option of using the async() or ensure_future() function which should work nicely together with run_on_executor which also returns a future. You'll need to experiment a bit though!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Native coroutines have the `__await__` method which returns an iterable. On Python < 3.4, you can just iterate over the generator directly:

The problem is that they don't accept return inside generators, I already tried and the function doesn't get to execute, that's why trollius has the raise Return()

There is also an option of using the async() or ensure_future() function which should work nicely together with run_on_executor which also returns a future. You'll need to experiment a bit though!

The problem with these is that you need someone to read the future result, or it won't execute. ATM, with the future + callback it works smoothly

@asyncio.coroutine
def _run_job(self, job, run_time, logger_name, jobstore_alias):
    """Actual implementation of calling the job function.

    Awaits the job's coroutine and wraps the outcome in a JobExecutionEvent
    (EVENT_JOB_EXECUTED on success, EVENT_JOB_ERROR on failure) for the
    scheduler to dispatch.

    The last two parameters are ordered (logger_name, jobstore_alias) to
    match run_job() and the positional call made by job_runtime().
    """
    try:
        # "yield from" delegates properly to the job's coroutine and binds
        # retval to its return value.  The previous
        # "for v in job.func(...): retval = yield v" loop did not forward
        # send()/throw() into the inner generator and left retval unbound
        # when the generator yielded nothing.
        retval = yield from job.func(*job.args, **job.kwargs)
    except:
        exc, tb = sys.exc_info()[1:]
        formatted_tb = ''.join(format_tb(tb))
        self._logger.exception('Job "%s" raised an exception', job)
        return JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
                                 exception=exc, traceback=formatted_tb)
    else:
        self._logger.info('Job "%s" executed successfully', job)
        return JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
                                 retval=retval)
else:
@asyncio.coroutine
def _run_job(self, job, run_time, jobstore_alias, logger_name):
    """Actual implementation of calling the job function"""
    # Trollius (Python 2) code path: "yield asyncio.From(...)" stands in for
    # "yield from", and "raise asyncio.Return(...)" stands in for returning a
    # value from a generator-based coroutine.
    # NOTE(review): job_runtime() calls this positionally as
    # (job, run_time, logger_name, jobstore_alias) — the reverse of the last
    # two parameters declared here; confirm the intended order.
    try:
        retval = yield asyncio.From(job.func(*job.args, **job.kwargs))
    except:
        # Capture the active exception and its traceback for the event.
        exc, tb = sys.exc_info()[1:]
        formatted_tb = ''.join(format_tb(tb))
        self._logger.exception('Job "%s" raised an exception', job)
        raise asyncio.Return(
            JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
                              exception=exc, traceback=formatted_tb)
        )
    else:
        self._logger.info('Job "%s" executed successfully', job)
        raise asyncio.Return(
            JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
                              retval=retval)
        )
40 changes: 23 additions & 17 deletions apscheduler/executors/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging
import sys
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from datetime import datetime, timedelta
from traceback import format_tb
import logging
import sys

from pytz import utc
import six
from pytz import utc

from apscheduler.events import (
JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED)
Expand Down Expand Up @@ -100,14 +100,31 @@ def _run_job_error(self, job_id, exc, traceback=None):
self._logger.error('Error running job %s', job_id, exc_info=exc_info)


def run_job(job, jobstore_alias, run_times, logger_name):
def run_job(job, run_time, logger_name, jobstore_alias):
    """Actual implementation of calling the job function"""
    logger = logging.getLogger(logger_name)
    try:
        retval = job.func(*job.args, **job.kwargs)
    except:
        # Capture the exception and its traceback so the scheduler can
        # forward them to error listeners.
        exc, tb = sys.exc_info()[1:]
        logger.exception('Job "%s" raised an exception', job)
        return JobExecutionEvent(
            EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
            exception=exc, traceback=''.join(format_tb(tb)))

    logger.info('Job "%s" executed successfully', job)
    return JobExecutionEvent(
        EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, retval=retval)


def job_runtime(job, run_times, logger_name, jobstore_alias, run_job_func=run_job):
"""
Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
scheduler.

"""
events = []
logger = logging.getLogger(logger_name)
events = []
for run_time in run_times:
# See if the job missed its run time window, and handle
# possible misfires accordingly
Expand All @@ -119,19 +136,8 @@ def run_job(job, jobstore_alias, run_times, logger_name):
run_time))
logger.warning('Run time of job "%s" was missed by %s', job, difference)
continue
events.append(run_job_func(job, run_time, logger_name, jobstore_alias))

logger.info('Running job "%s" (scheduled at %s)', job, run_time)
try:
retval = job.func(*job.args, **job.kwargs)
except:
exc, tb = sys.exc_info()[1:]
formatted_tb = ''.join(format_tb(tb))
events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
exception=exc, traceback=formatted_tb))
logger.exception('Job "%s" raised an exception', job)
else:
events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
retval=retval))
logger.info('Job "%s" executed successfully', job)

return events
4 changes: 2 additions & 2 deletions apscheduler/executors/debug.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sys

from apscheduler.executors.base import BaseExecutor, run_job
from apscheduler.executors.base import BaseExecutor, job_runtime


class DebugExecutor(BaseExecutor):
Expand All @@ -13,7 +13,7 @@ class DebugExecutor(BaseExecutor):

def _do_submit_job(self, job, run_times):
try:
events = run_job(job, job._jobstore_alias, run_times, self._logger.name)
events = job_runtime(job, run_times, self._logger.name, job._jobstore_alias)
except:
self._run_job_error(job.id, *sys.exc_info()[1:])
else:
Expand Down
8 changes: 4 additions & 4 deletions apscheduler/executors/gevent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import absolute_import
import sys

from apscheduler.executors.base import BaseExecutor, run_job
import sys

from apscheduler.executors.base import BaseExecutor, job_runtime

try:
import gevent
Expand All @@ -26,5 +26,5 @@ def callback(greenlet):
else:
self._run_job_success(job.id, events)

gevent.spawn(run_job, job, job._jobstore_alias, run_times, self._logger.name).\
link(callback)
gevent.spawn(job_runtime, job, run_times, self._logger.name, job._jobstore_alias) \
.link(callback)
6 changes: 3 additions & 3 deletions apscheduler/executors/pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import abstractmethod
import concurrent.futures
from abc import abstractmethod

from apscheduler.executors.base import BaseExecutor, run_job
from apscheduler.executors.base import BaseExecutor, job_runtime


class BasePoolExecutor(BaseExecutor):
Expand All @@ -19,7 +19,7 @@ def callback(f):
else:
self._run_job_success(job.id, f.result())

f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
f = self._pool.submit(job_runtime, job, run_times, self._logger.name, job._jobstore_alias)
f.add_done_callback(callback)

def shutdown(self, wait=True):
Expand Down
8 changes: 5 additions & 3 deletions apscheduler/executors/twisted.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import absolute_import

from apscheduler.executors.base import BaseExecutor, run_job
from apscheduler.executors.base import BaseExecutor, job_runtime


class TwistedExecutor(BaseExecutor):
Expand All @@ -21,5 +21,7 @@ def callback(success, result):
else:
self._run_job_error(job.id, result.value, result.tb)

self._reactor.getThreadPool().callInThreadWithCallback(
callback, run_job, job, job._jobstore_alias, run_times, self._logger.name)
self._reactor \
.getThreadPool() \
.callInThreadWithCallback(callback, job_runtime, job, run_times, self._logger.name,
job._jobstore_alias)
6 changes: 3 additions & 3 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class FauxJob(object):
_jobstore_alias = 'foo'


def dummy_run_job(job, jobstore_alias, run_times, logger_name):
def dummy_job_runtime(job, run_times, logger_name, jobstore_alias, run_job_func=None):
    """Stand-in for job_runtime() that always fails.

    Parameters mirror job_runtime(job, run_times, logger_name,
    jobstore_alias, run_job_func) so the monkeypatched positional call sites
    line up; the old (job, jobstore_alias, run_times, logger_name) order was
    left over from the previous run_job() signature.
    """
    raise Exception('dummy')


Expand All @@ -115,8 +115,8 @@ def run_job_error(job_id, exc, traceback):

event = Event()
exc_traceback = [None, None]
monkeypatch.setattr('apscheduler.executors.base.run_job', dummy_run_job)
monkeypatch.setattr('apscheduler.executors.pool.run_job', dummy_run_job)
monkeypatch.setattr('apscheduler.executors.base.job_runtime', dummy_job_runtime)
monkeypatch.setattr('apscheduler.executors.pool.job_runtime', dummy_job_runtime)
monkeypatch.setattr(executor, '_run_job_error', run_job_error)
executor.submit_job(FauxJob(), [])

Expand Down