Skip to content

Commit

Permalink
be able to use collective.celery for queuing tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
vangheem committed Sep 11, 2015
1 parent bf30b68 commit c261d8a
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 31 deletions.
141 changes: 121 additions & 20 deletions collective/documentviewer/async.py
Expand Up @@ -19,10 +19,23 @@
except ImportError:
pass

try:
from celery.result import AsyncResult # noqa
except ImportError:
pass


def asyncInstalled():
try:
import plone.app.async
import plone.app.async # noqa
return True
except:
return False


def celeryInstalled():
try:
import collective.celery # noqa
return True
except:
return False
Expand All @@ -35,7 +48,14 @@ def isConversion(job, sitepath):
return sitepath == job.args[1] and job.args[4] == runConversion


class JobRunner(object):
def getJobRunner(obj):
if asyncInstalled():
return AsyncJobRunner(obj)
elif celeryInstalled():
return CeleryJobRunner(obj)


class AsyncJobRunner(object):
"""
helper class to setup the quota and check the
queue before adding it to the queue
Expand Down Expand Up @@ -130,28 +150,109 @@ def move_to_front(self):
bucket._data = tuple(jobs)


def queueJob(obj):
try:
from collective.celery import task

@task()
def _celeryQueueJob(obj):
runConversion(obj)
settings = Settings(obj)
settings.converting = True
except ImportError:
pass


class CeleryJobRunner(object):
"""
queue a job async if available.
otherwise, just run normal
helper class to setup the quota and check the
queue before adding it to the queue
"""

def __init__(self, obj):
self.object = obj
self.portal = getPortal(obj)
self.settings = Settings(obj)

def is_current_active(self, job):
try:
return job.state not in ('PENDING', 'FAILURE', 'SUCCESS')
except TypeError:
return False

@property
def already_in_queue(self):
"""
Check if object in queue
"""
return self.find_job()[0] > -1

def find_position(self):
# active in queue
try:
return self.find_job()[0]
except KeyError:
return -1

def find_job(self):
result = AsyncResult(self.settings.celery_task_id)
if self.is_current_active(result):
return 0, result

return -1, None

def queue_it(self):
result = _celeryQueueJob.delay(self.object)
self.settings.celery_task_id = result.id
self.settings.converting = True

def move_to_front(self):
pass


class QueueException(Exception):
pass


def asyncQueueJob(obj):
try:
runner = AsyncJobRunner(obj)
runner.set_quota()
if runner.already_in_queue:
logger.info('object %s already in queue for conversion' % (
repr(obj)))
else:
runner.queue_it()
return
except:
raise QueueException


def celeryQueueJob(obj):
try:
runner = CeleryJobRunner(obj)
if runner.already_in_queue:
logger.info('object %s already in queue for conversion' % (
repr(obj)))
else:
runner.queue_it()
return
except:
raise QueueException


def queueJob(obj):
converter = Converter(obj)
if not converter.can_convert:
return
if asyncInstalled():
try:
runner = JobRunner(obj)
runner.set_quota()
if runner.already_in_queue:
logger.info('object %s already in queue for conversion' % (
repr(obj)))
else:
runner.queue_it()
return
except:
logger.exception("Error using plone.app.async with "
"collective.documentviewer. Converting pdf without "
"plone.app.async...")
try:
if asyncInstalled():
asyncQueueJob(obj)
elif celeryInstalled():
celeryQueueJob(obj)
else:
converter()
else:
except QueueException:
logger.exception(
"Error using async with "
"collective.documentviewer. Converting pdf without async...")
converter()
19 changes: 8 additions & 11 deletions collective/documentviewer/browser/views.py
Expand Up @@ -11,8 +11,9 @@
from Products.Five.browser import BrowserView
from collective.documentviewer import mf as _
from collective.documentviewer import storage
from collective.documentviewer.async import JobRunner
from collective.documentviewer.async import getJobRunner
from collective.documentviewer.async import asyncInstalled
from collective.documentviewer.async import celeryInstalled
from collective.documentviewer.async import queueJob
from collective.documentviewer.convert import DUMP_FILENAME
from collective.documentviewer.convert import TEXT_REL_PATHNAME
Expand Down Expand Up @@ -328,7 +329,7 @@ def settings_enabled(self):
return self.context.getLayout() == 'documentviewer'

def async_enabled(self):
return asyncInstalled()
return asyncInstalled() or celeryInstalled()

def clean_folder(self, catalog, storage_loc):
if not os.path.isdir(storage_loc):
Expand Down Expand Up @@ -395,7 +396,7 @@ def __call__(self):
mtool = getToolByName(self.context, 'portal_membership')
self.manager = mtool.checkPermission('cmf.ManagePortal',
self.context)
self.async_installed = asyncInstalled()
self.async_installed = asyncInstalled() or celeryInstalled()
self.converting = False
if self.enabled():
req = self.request
Expand All @@ -411,15 +412,11 @@ def __call__(self):
settings.filehash = '--foobar--'
queueJob(self.context)
self.converting = True
if self.async_installed:
self.position = JobRunner(self.context).find_position()
queueJob(self.context)
else:
return self.request.response.redirect(
self.context.absolute_url() + '/view')
return self.request.response.redirect(
self.context.absolute_url() + '/view')
else:
if self.async_installed:
self.position = JobRunner(self.context).find_position()
self.position = getJobRunner(self.context).find_position()
if self.position > -1:
self.converting = True

Expand Down Expand Up @@ -523,7 +520,7 @@ def __call__(self):
if not authenticator.verify():
raise Unauthorized

JobRunner(self.context).move_to_front()
getJobRunner(self.context).move_to_front()

return self.request.response.redirect(
self.context.absolute_url() + '/@@convert-to-documentviewer')
Expand Down
3 changes: 3 additions & 0 deletions docs/HISTORY.txt
Expand Up @@ -4,6 +4,9 @@ Changelog
4.0.1 (unreleased)
------------------

- be able to use collective.celery for queuing tasks
[vangheem]

- fix async monitor registration
[pilz]

Expand Down
3 changes: 3 additions & 0 deletions setup.py
Expand Up @@ -46,5 +46,8 @@
[z3c.autoinclude.plugin]
target = plone
[celery_tasks]
documentviewer = collective.documentviewer.async
"""
)

0 comments on commit c261d8a

Please sign in to comment.