Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
Extended queue options - for better viewing and partial cleaning.
Browse files Browse the repository at this point in the history
  • Loading branch information
David Read committed Sep 21, 2012
1 parent b3a45f3 commit 29d937a
Showing 1 changed file with 54 additions and 16 deletions.
70 changes: 54 additions & 16 deletions ckan/lib/cli.py
Expand Up @@ -790,8 +790,9 @@ class Celery(CkanCommand):
celeryd - run the celery daemon
celeryd run - run the celery daemon
celeryd run concurrency=1 - run the celery daemon with argument 'concurrency'
celeryd view - view all tasks in the queue
celeryd view [num] - view queue stats and the <num> most recent tasks
celeryd clean - delete all tasks in the queue
celeryd clean-done - delete tasks in the queue that have been done
'''
min_args = 0
summary = __doc__.split('\n')[0]
Expand All @@ -805,9 +806,15 @@ def command(self):
if cmd == 'run':
self.run_()
elif cmd == 'view':
self.view()
if len(self.args) > 1:
num = int(self.args[1])
else:
num = 1
self.view(num)
elif cmd == 'clean':
self.clean()
elif cmd == 'clean-done':
self.clean(include_tasks_not_done=False)
else:
print 'Command %s not recognized' % cmd
sys.exit(1)
Expand All @@ -817,34 +824,65 @@ def run_(self):
os.environ['CKAN_CONFIG'] = os.path.abspath(self.options.config)
from ckan.lib.celery_app import celery
celery_args = ['--%s' % arg for arg in self.args[1:]]
celery.worker_main(argv=['celeryd', '--loglevel=INFO'] + celery_args)
for arg in self.args:
if arg.startswith('loglevel'):
break
else:
celery_args.append('--loglevel=INFO')
celery.worker_main(argv=['celeryd'] + celery_args)

def view(self):
def view(self, num):
self._load_config()
import pprint
import ckan.model as model
from ckan.lib.helpers import json
from kombu.transport.sqlalchemy.models import Message
from kombu.serialization import registry
from kombu.transport.virtual import Base64

# summary
q = model.Session.query(Message)
q_visible = q.filter_by(visible=True)
print '%i messages (total)' % q.count()
print '%i visible messages' % q_visible.count()
for message in q:
print 'Messages on the queue:'
print '%i total' % q.count()
print '%i not yet processed ("visible")' % q_visible.count()

# last messages
if num:
print '%i newest messages:\n' % num
for message in q.order_by(Message.sent_at.desc()).limit(num):
if message.visible:
print '%i: Visible' % (message.id)
print 'Task %i: Not yet processed' % (message.id)
else:
print '%i: Invisible Sent:%s' % (message.id, message.sent_at)

def clean(self):
print 'Task %i: Processed at:%s' % (message.id, message.sent_at.strftime('%Y-%m-%d %H:%M'))
payload_dict = json.loads(message.payload)
body = payload_dict['body']
if body:
body = Base64().decode(body)
payload = registry.decode(
body,
content_type=payload_dict['content-type'],
content_encoding=payload_dict['content-encoding'])
pprint.pprint(payload)
print # newline

def clean(self, include_tasks_not_done=True):
self._load_config()
import ckan.model as model
import pprint
tasks_initially = model.Session.execute("select * from kombu_message").rowcount
if include_tasks_not_done:
domain = 'from kombu_message'
else:
domain = 'from kombu_message where visible = false'
tasks_initially = model.Session.execute('select * %s' % domain).rowcount
if not tasks_initially:
print 'No tasks to delete'
sys.exit(0)
query = model.Session.execute("delete from kombu_message")
tasks_afterwards = model.Session.execute("select * from kombu_message").rowcount
print '%i of %i tasks deleted' % (tasks_initially - tasks_afterwards,
tasks_initially)
query = model.Session.execute('delete %s' % domain)
tasks_afterwards = model.Session.execute('select * %s' % domain).rowcount
print '%i of %i %stasks deleted' % (tasks_initially - tasks_afterwards,
tasks_initially,
' done' if not include_tasks_not_done else '')
if tasks_afterwards:
print 'ERROR: Failed to delete all tasks'
sys.exit(1)
Expand Down

0 comments on commit 29d937a

Please sign in to comment.