Is an if clause enough not to trigger invalid syntax?
txomon committed Apr 28, 2016
1 parent 1b1553a commit 8deb71d
Showing 6 changed files with 112 additions and 70 deletions.
89 changes: 63 additions & 26 deletions apscheduler/executors/asyncio.py
@@ -1,11 +1,23 @@
from __future__ import absolute_import

import asyncio
import sys
from traceback import format_tb

try:
    import asyncio

    real_asyncio = True
except ImportError:  # pragma: nocover
    try:
        import trollius as asyncio

        real_asyncio = False
    except ImportError:
        raise ImportError(
            'AsyncIOScheduler requires either Python 3.4 or the asyncio/trollius package installed')

from apscheduler.events import JobExecutionEvent, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.base import BaseExecutor
from apscheduler.executors.base import BaseExecutor, job_runtime


class AsyncIOExecutor(BaseExecutor):
@@ -31,29 +43,54 @@ def callback(f):
            else:
                self._run_job_success(job.id, events)

        events = self._job_runtime(job, run_times)
        future_events = []
        for event in events:
            if not (isinstance(event, asyncio.Future) or asyncio.iscoroutine(event)):
                future = asyncio.Future()
                future.set_result(event)
                future_events.append(future)
            else:
                future_events.append(event)
        future = asyncio.gather(*events)
        if not asyncio.iscoroutinefunction(job.func):
            future = self._eventloop.run_in_executor(None, job_runtime, job, run_times, self._logger.name)
        else:
            events = job_runtime(job, run_times, self._logger.name, self._run_job)
            future_events = []
            for event in events:
                if not (isinstance(event, asyncio.Future) or asyncio.iscoroutine(event)):
                    future = asyncio.Future()
                    future.set_result(event)
                    future_events.append(future)
                else:
                    future_events.append(event)
            future = asyncio.gather(*events)
        future.add_done_callback(callback)

    @asyncio.coroutine
    def _run_job(self, job, run_time):
        """Actual implementation of calling the job function"""
        try:
            retval = yield from job.func(*job.args, **job.kwargs)
        except:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = ''.join(format_tb(tb))
            self._logger.exception('Job "%s" raised an exception', job)
            return JobExecutionEvent(EVENT_JOB_ERROR, job.id, job._jobstore_alias, run_time,
                                     exception=exc, traceback=formatted_tb)
        else:
            self._logger.info('Job "%s" executed successfully', job)
            return JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, job._jobstore_alias, run_time, retval=retval)

    if real_asyncio:
        @asyncio.coroutine
        def _run_job(self, job, run_time):
            """Actual implementation of calling the job function"""
            try:
                for v in job.func(*job.args, **job.kwargs):
                    retval = yield v
            except:
                exc, tb = sys.exc_info()[1:]
                formatted_tb = ''.join(format_tb(tb))
                self._logger.exception('Job "%s" raised an exception', job)
                return JobExecutionEvent(EVENT_JOB_ERROR, job.id, job._jobstore_alias, run_time,
                                         exception=exc, traceback=formatted_tb)
            else:
                self._logger.info('Job "%s" executed successfully', job)
                return JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, job._jobstore_alias, run_time, retval=retval)
    else:
        @asyncio.coroutine
        def _run_job(self, job, run_time):
            """Actual implementation of calling the job function"""
            try:
                retval = yield asyncio.From(job.func(*job.args, **job.kwargs))
            except:
                exc, tb = sys.exc_info()[1:]
                formatted_tb = ''.join(format_tb(tb))
                self._logger.exception('Job "%s" raised an exception', job)
                raise asyncio.Return(
                    JobExecutionEvent(EVENT_JOB_ERROR, job.id, job._jobstore_alias, run_time,
                                      exception=exc, traceback=formatted_tb)
                )
            else:
                self._logger.info('Job "%s" executed successfully', job)
                raise asyncio.Return(
                    JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, job._jobstore_alias, run_time,
                                      retval=retval)
                )
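The commit title asks whether an if clause is enough to avoid invalid syntax on Python 2. A runtime if cannot hide constructs the compiler rejects, because the whole module is compiled before any branch runs; that is presumably why the Python 3 branch above iterates the job's coroutine manually instead of using yield from. The class-level if itself is legal, though: a class body is ordinary executable code, so it can pick between two method definitions at import time. A minimal standalone sketch of that pattern (hypothetical Greeter class, nothing to do with APScheduler):

import sys

PY3 = sys.version_info[0] >= 3


class Greeter(object):
    # A class body is ordinary code, so an if statement can decide which
    # definition of greet() ends up on the class. This only selects between
    # definitions at import time; it cannot suppress a SyntaxError, because
    # both branches are compiled before either of them runs.
    if PY3:
        def greet(self, name):
            return 'Hello, {}!'.format(name)
    else:
        def greet(self, name):
            return 'Hello, %s!' % name


print(Greeter().greet('world'))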
76 changes: 40 additions & 36 deletions apscheduler/executors/base.py
@@ -99,40 +99,44 @@ def _run_job_error(self, job_id, exc, traceback=None):
        exc_info = (exc.__class__, exc, traceback)
        self._logger.error('Error running job %s', job_id, exc_info=exc_info)

    def _job_runtime(self, job, run_times):
        """
        Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
        scheduler.

        """
        events = []
        for run_time in run_times:
            # See if the job missed its run time window, and handle
            # possible misfires accordingly
            if job.misfire_grace_time is not None:
                difference = datetime.now(utc) - run_time
                grace_time = timedelta(seconds=job.misfire_grace_time)
                if difference > grace_time:
                    events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, job._jobstore_alias,
                                                    run_time))
                    self._logger.warning('Run time of job "%s" was missed by %s', job, difference)
                    continue
            events.append(self._run_job(job, run_time))

            self._logger.info('Running job "%s" (scheduled at %s)', job, run_time)

        return events

    def _run_job(self, job, run_time):
        """Actual implementation of calling the job function"""
        try:
            retval = job.func(*job.args, **job.kwargs)
        except:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = ''.join(format_tb(tb))
            self._logger.exception('Job "%s" raised an exception', job)
            return JobExecutionEvent(EVENT_JOB_ERROR, job.id, job._jobstore_alias, run_time,
                                     exception=exc, traceback=formatted_tb)
        else:
            self._logger.info('Job "%s" executed successfully', job)
            JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, job._jobstore_alias, run_time, retval=retval)
def run_job(job, run_time, logger_name):
    """Actual implementation of calling the job function"""
    logger = logging.getLogger(logger_name)
    try:
        retval = job.func(*job.args, **job.kwargs)
    except:
        exc, tb = sys.exc_info()[1:]
        formatted_tb = ''.join(format_tb(tb))
        logger.exception('Job "%s" raised an exception', job)
        return JobExecutionEvent(EVENT_JOB_ERROR, job.id, job._jobstore_alias, run_time,
                                 exception=exc, traceback=formatted_tb)
    else:
        logger.info('Job "%s" executed successfully', job)
        JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, job._jobstore_alias, run_time, retval=retval)


def job_runtime(job, run_times, logger_name, run_job_func=run_job):
    """
    Called by executors to run the job. Returns a list of scheduler events to be dispatched by the
    scheduler.
    """
    logger = logging.getLogger(logger_name)
    events = []
    for run_time in run_times:
        # See if the job missed its run time window, and handle
        # possible misfires accordingly
        if job.misfire_grace_time is not None:
            difference = datetime.now(utc) - run_time
            grace_time = timedelta(seconds=job.misfire_grace_time)
            if difference > grace_time:
                events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, job._jobstore_alias,
                                                run_time))
                logger.warning('Run time of job "%s" was missed by %s', job, difference)
                continue
        events.append(run_job_func(job, run_time, logger_name))

        logger.info('Running job "%s" (scheduled at %s)', job, run_time)

    return events
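Compared to the removed _job_runtime and _run_job methods, the new module-level job_runtime and run_job take a logger name instead of using self._logger, so the callable handed to a worker carries only plain data rather than the executor instance, and the run_job_func parameter lets AsyncIOExecutor substitute its coroutine-based runner. A rough standalone sketch of that shape (hypothetical runner and job function, not APScheduler's actual signatures):

import logging
from concurrent.futures import ThreadPoolExecutor


def run_job(func, args, logger_name):
    # Module-level runner: only plain data and a logger *name* cross the
    # executor boundary, never the scheduler or executor objects themselves.
    logger = logging.getLogger(logger_name)
    try:
        retval = func(*args)
    except Exception:
        logger.exception('job raised an exception')
        return None
    logger.info('job executed successfully')
    return retval


def add(a, b):
    return a + b


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(run_job, add, (2, 3), 'demo')
        print(future.result())  # prints 5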
4 changes: 2 additions & 2 deletions apscheduler/executors/debug.py
@@ -1,6 +1,6 @@
import sys

from apscheduler.executors.base import BaseExecutor
from apscheduler.executors.base import BaseExecutor, job_runtime


class DebugExecutor(BaseExecutor):
@@ -13,7 +13,7 @@ class DebugExecutor(BaseExecutor):

    def _do_submit_job(self, job, run_times):
        try:
            events = self._job_runtime(job, run_times)
            events = job_runtime(job, run_times, self._logger.name)
        except:
            self._run_job_error(job.id, *sys.exc_info()[1:])
        else:
4 changes: 2 additions & 2 deletions apscheduler/executors/gevent.py
@@ -2,7 +2,7 @@

import sys

from apscheduler.executors.base import BaseExecutor
from apscheduler.executors.base import BaseExecutor, job_runtime

try:
    import gevent
@@ -26,4 +26,4 @@ def callback(greenlet):
            else:
                self._run_job_success(job.id, events)

        gevent.spawn(self._job_runtime, job, run_times).link(callback)
        gevent.spawn(job_runtime, job, run_times, self._logger.name).link(callback)
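gevent.spawn(...).link(callback) runs job_runtime in a greenlet and hands the finished greenlet to the callback, mirroring the future callback used by the other executors. A minimal sketch of that spawn/link flow, assuming gevent is installed and using placeholder work instead of an APScheduler job:

import gevent


def work(x):
    return x * 2


def callback(greenlet):
    # link() passes the finished greenlet: .exception is set if the function
    # raised, otherwise .value holds its return value.
    if greenlet.exception is not None:
        print('failed:', greenlet.exception)
    else:
        print('result:', greenlet.value)


g = gevent.spawn(work, 21)
g.link(callback)
gevent.joinall([g])
gevent.sleep(0)  # yield to the hub so the link callback gets a chance to run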
4 changes: 2 additions & 2 deletions apscheduler/executors/pool.py
@@ -1,7 +1,7 @@
import concurrent.futures
from abc import abstractmethod

from apscheduler.executors.base import BaseExecutor
from apscheduler.executors.base import BaseExecutor, job_runtime, run_job


class BasePoolExecutor(BaseExecutor):
@@ -19,7 +19,7 @@ def callback(f):
            else:
                self._run_job_success(job.id, f.result())

        f = self._pool.submit(self._job_runtime, job, run_times)
        f = self._pool.submit(job_runtime, run_job, job, run_times, self._logger.name)
        f.add_done_callback(callback)

    def shutdown(self, wait=True):
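The pool executors route the same work through concurrent.futures: submit() returns a Future, and add_done_callback() later receives that same Future, from which the callback reads either the result or the exception. A standalone sketch of that flow with placeholder data (not APScheduler's job objects):

from concurrent.futures import ThreadPoolExecutor


def job_runtime(job_id, run_times):
    # Placeholder for the real job_runtime(): returns one event per run time.
    return ['event for %s at %s' % (job_id, t) for t in run_times]


def callback(future):
    # add_done_callback() hands back the same Future that submit() returned;
    # exceptions raised in the worker surface via future.exception().
    if future.exception() is not None:
        print('error:', future.exception())
    else:
        print('events:', future.result())


pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(job_runtime, 'my-job', [1, 2, 3])
future.add_done_callback(callback)
pool.shutdown(wait=True)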
5 changes: 3 additions & 2 deletions apscheduler/executors/twisted.py
@@ -1,6 +1,6 @@
from __future__ import absolute_import

from apscheduler.executors.base import BaseExecutor
from apscheduler.executors.base import BaseExecutor, job_runtime


class TwistedExecutor(BaseExecutor):
@@ -21,4 +21,5 @@ def callback(success, result):
            else:
                self._run_job_error(job.id, result.value, result.tb)

        self._reactor.getThreadPool().callInThreadWithCallback(callback, self._job_runtime, job, run_times)
        self._reactor.getThreadPool() \
            .callInThreadWithCallback(callback, job_runtime, job, run_times, self._logger.name)
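Twisted's thread pool uses the callback-first signature seen above: callInThreadWithCallback(onResult, func, *args) runs func in a pool thread and then calls onResult(success, result), where result is the return value on success and a twisted.python.failure.Failure (with .value and .tb) on error. A small self-contained sketch, assuming Twisted is installed and using a placeholder function:

from twisted.internet import reactor


def work(x):
    return x * 2


def on_result(success, result):
    # Called in the worker thread: result is the return value when success
    # is True, or a twisted.python.failure.Failure when it is False.
    if success:
        print('result:', result)
    else:
        print('error:', result.value)
    reactor.callFromThread(reactor.stop)


reactor.getThreadPool().callInThreadWithCallback(on_result, work, 21)
reactor.run()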
