Browse files

implemented server_info, added tests, refactorings

  • Loading branch information...
1 parent d931342 commit b7cb61b4f629e2ad1a52674bc68e1358d1040d12 @jsk jsk committed Apr 6, 2012
View
3 MANIFEST.in
@@ -1,3 +1,4 @@
include README.rst
include LICENSE
-include tests.py
+include requirements.txt
+include runtests.py
View
5 README.rst
@@ -43,7 +43,6 @@ Jozef Ševčík, sevcik@codescale.net
TODO
====
+* cleaning up gearman jobs after failed unit tests
* improve logging
-* gearman_overview - support for multiple gearman servers
-* gearman_overview - output 'workers'
-* tests (foo/reverse worker + job submission, status)
+* gearman_server_info - output 'workers'
View
137 django_gearman_commands/__init__.py
@@ -1,3 +1,138 @@
+# -*- coding: utf-8 -*-
+from django.core.management.base import BaseCommand, CommandError
+from django.conf import settings
+from datetime import datetime
+import time
+import gearman
+import logging
+
__version__ = '0.1'
-from .base import *
+log = logging.getLogger(__name__)
+
+class HookedGearmanWorker(gearman.GearmanWorker):
+ """GearmanWorker with hooks support."""
+
+ def __init__(self, exit_after_job, host_list=None):
+ super(HookedGearmanWorker, self).__init__(host_list=host_list)
+ self.exit_after_job = exit_after_job
+
+ def after_job(self):
+ return (not self.exit_after_job)
+
+class GearmanWorkerBaseCommand(BaseCommand):
+ """Base command for Gearman workers.
+
+ Subclass this class in your gearman worker commands.
+
+ """
+
+ @property
+ def task_name(self):
+ """Override task_name property in worker to indicate what task should be registered in Gearman."""
+ raise NotImplementedError, 'task_name should be implemented in worker'
+
+ @property
+ def exit_after_job(self):
+ """Return True if worker should exit after processing job. False by default.
+
+ You do not need to override this in standard case, except in case
+ you want to control and terminate worker after processing jobs.
+ Used by test worker 'footest'.
+
+ """
+ return False
+
+ def do_job(self, job_data):
+ """Gearman job execution logic.
+
+ Override this in worker to perform job.
+
+ """
+ raise NotImplementedError, 'do_job() should be implemented in worker'
+
+ def handle(self, *args, **options):
+ try:
+ worker = HookedGearmanWorker(exit_after_job=self.exit_after_job, host_list=settings.GEARMAN_SERVERS)
+ log.info('Registering gearman task: %s', self.task_name)
+ worker.register_task(self.task_name, self._invoke_job)
+ except:
+ log.exception('Problem with registering gearman task')
+ raise
+
+ worker.work()
+
+ def _invoke_job(self, worker, job):
+ """Invoke gearman job.
+
+ Honestly, wrapper for do_job().
+
+ """
+ try:
+ # represent default job data '' as None
+ job_data = job.data if job.data else None
+ self.stdout.write('Invoking gearman job, task: %s.\n' % self.task_name)
+
+ result = self.do_job(job_data)
+
+ log.info('Job finished, task: %s', self.task_name)
+ self.stdout.write('Job finished, task: %s\n' % self.task_name)
+
+ if result is not None:
+ log.info(result)
+ self.stdout.write('%s\n' % result)
+
+ return 'OK'
+ except:
+ log.exception('Error occured when invoking job, task: %s', self.task_name)
+ raise
+
+
+class GearmanServerInfo():
+ """Administration informations about Gearman server.
+
+ See GearmanAdminClient for reference: http://packages.python.org/gearman/admin_client.html
+
+ """
+
+ def __init__(self, host):
+ self.host = host
+ self.server_version = None
+ self.tasks = None
+
+ def get_server_info(self):
+ """Read Gearman server info - status, workers and and version."""
+ result = ''
+
+ # read server status info
+ client = gearman.GearmanAdminClient([self.host])
+
+ self.server_version = client.get_version()
+ self.tasks = client.get_status()
+
+ # use prettytable if available, otherwise raw output
+ try:
+ from prettytable import PrettyTable
+ use_prettytable = True
+ except ImportError:
+ use_prettytable = False
+
+ if use_prettytable:
+ # PrettyTable output
+ table = PrettyTable(['Gearman Server Host', 'Gearman Server Version'])
+ table.add_row([self.host, self.server_version])
+ result += '%s.\n\n' % str(table)
+
+ table = PrettyTable(['Task Name', 'Total Workers', 'Running Jobs', 'Queued Jobs'])
+ for r in self.tasks:
+ table.add_row([r['task'], r['workers'], r['running'], r['queued']])
+
+ result += '%s.\n\n' % str(table)
+
+ else:
+ # raw output
+ result += 'Gearman Server Host:%s\n' % self.host
+ result += 'Gearman Server Version:%s.\n' % self.server_version
+ result += 'Tasks:\n%s\n' % str(self.tasks)
+
+ return result
View
84 django_gearman_commands/base.py
@@ -1,84 +0,0 @@
-# -*- coding: utf-8 -*-
-from django.core.management.base import BaseCommand, CommandError
-from django.conf import settings
-from datetime import datetime
-import time
-import gearman
-import logging
-
-log = logging.getLogger(__name__)
-
-class HookedGearmanWorker(gearman.GearmanWorker):
-
- def __init__(self, exit_after_job, host_list=None):
- super(HookedGearmanWorker, self).__init__(host_list=host_list)
- self.exit_after_job = exit_after_job
-
- def after_job(self):
- return (not self.exit_after_job)
-
-class GearmanWorkerBaseCommand(BaseCommand):
- """Base command for Gearman workers."""
-
- @property
- def task_name(self):
- """Override task_name property in worker to indicate what task should be registered in Gearman."""
- raise NotImplementedError, 'task_name should be implemented in worker'
-
- @property
- def exit_after_job(self):
- """Return True if worker should exit after processing job. False by default.
-
- You do not need to override this in standard case, except in case
- you want to control and terminate worker after processing jobs.
- Used by test worker 'footest'.
-
- """
- return False
-
- def do_job(self, job_data):
- """Gearman job execution logic.
-
- Override this in worker to perform job.
-
- """
- raise NotImplementedError, 'do_job() should be implemented in worker'
-
- def handle(self, *args, **options):
- try:
- worker = HookedGearmanWorker(exit_after_job=self.exit_after_job, host_list=settings.GEARMAN_SERVERS)
- log.info('Registering gearman task: %s', self.task_name)
- worker.register_task(self.task_name, self._invoke_job)
- except:
- log.exception('Problem with registering gearman task')
- raise
-
- worker.work()
-
- def _invoke_job(self, worker, job):
- """Invoke gearman job.
-
- Honestly, wrapper for do_job().
-
- """
- try:
- # represent default job data '' as None
- job_data = job.data if job.data else None
- self.stdout.write('Invoking gearman job, task: %s.\n' % self.task_name)
-
- result = self.do_job(job_data)
-
- log.info('Job finished, task: %s', self.task_name)
- self.stdout.write('Job finished, task: %s\n' % self.task_name)
-
- if result is not None:
- log.info(result)
- self.stdout.write('%s\n' % result)
-
- return 'OK'
- except:
- log.exception('Error occured when invoking job, task: %s', self.task_name)
- raise
-
-
-
View
34 django_gearman_commands/management/commands/gearman_overview.py
@@ -1,34 +0,0 @@
-# -*- coding: utf-8 -*-
-from django.core.management.base import BaseCommand, CommandError
-from django.conf import settings
-from prettytable import PrettyTable
-import gearman
-import logging
-
-log = logging.getLogger(__name__)
-
-class Command(BaseCommand):
- """Pretty-print overview of Gearman server status and workers."""
-
- help = 'Print overview of Gearman server status and workers.'
-
- def handle(self, *args, **options):
- client = gearman.GearmanAdminClient(settings.GEARMAN_SERVERS)
-
- # get server version
- version = client.get_version()
-
- table = PrettyTable(['Gearman Server Version'])
- table.add_row([version])
-
- self.stdout.write('%s.\n\n' % table)
-
- # status
- raw_status = client.get_status()
- table = PrettyTable(['Task Name', 'Total Workers', 'Running Jobs', 'Queued Jobs'])
- for r in raw_status:
- table.add_row([r['task'], r['workers'], r['running'], r['queued']])
-
- self.stdout.write('%s.\n' % table)
-
- # TODO: workers
View
22 django_gearman_commands/management/commands/gearman_server_info.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+from django.core.management.base import BaseCommand, CommandError
+from django.conf import settings
+from prettytable import PrettyTable
+import gearman
+import logging
+from django_gearman_commands import GearmanServerInfo
+
+log = logging.getLogger(__name__)
+
+class Command(BaseCommand):
+ """Pretty-print overview of Gearman server status and workers."""
+
+ help = 'Print overview of Gearman server status and workers.'
+
+ def handle(self, *args, **options):
+ result = ''
+ for server in settings.GEARMAN_SERVERS:
+ server_info = GearmanServerInfo(server)
+ result += server_info.get_server_info()
+
+ self.stdout.write(result)
View
2 django_gearman_commands/management/commands/gearman_worker_footest.py
@@ -20,7 +20,7 @@ def exit_after_job(self):
def do_job(self, job_data):
# set data to cache
- cache.set('footest', u'I AM FOO !')
+ cache.set('footest', 'I AM FOO !' if not job_data else job_data)
View
52 django_gearman_commands/tests/__init__.py
@@ -1 +1,51 @@
-from .commands import *
+# -*- coding: utf-8 -*-
+from django.core.management import call_command
+from django.conf import settings
+from django.test import TestCase
+from django.core.cache import cache
+from django_gearman_commands import GearmanServerInfo
+import logging
+
+log = logging.getLogger(__name__)
+
+class GearmanCommandsTest(TestCase):
+
+ def test_server_info(self):
+ server_info = GearmanServerInfo(settings.GEARMAN_SERVERS[0])
+ server_info.get_server_info()
+ self.assertTrue(server_info.server_version.startswith('OK'), 'Unexpected server version string')
+ self.assertTrue(type(server_info.tasks) is tuple, 'Unexpected server tasks type')
+
+ # verify command is callable
+ overview = call_command('gearman_server_info')
+
+ def test_worker_simple(self):
+ # submit job
+ call_command('gearman_submit_job', 'footest')
+
+ # let the worker process the job
+ call_command('gearman_worker_footest')
+
+ # verify job was processed
+ self.assertEqual(cache.get('footest'), 'I AM FOO !', 'Unexpected footest worker result')
+
+ def test_worker_task_data_string(self):
+ # submit job
+ call_command('gearman_submit_job', 'footest', 'DATA')
+
+ # let the worker process the job
+ call_command('gearman_worker_footest')
+
+ # verify job was processed and processed task data
+ self.assertEqual(cache.get('footest'), 'DATA', 'Unexpected footest worker result (data string)')
+
+ def test_worker_task_data_pickled(self):
+ import pickle
+ # submit job
+ call_command('gearman_submit_job', 'footest', pickle.dumps(u'DATA'))
+
+ # let the worker process the job
+ call_command('gearman_worker_footest')
+
+ # verify job was processed and processed task data
+ self.assertEqual(pickle.loads(cache.get('footest')), u'DATA', 'Unexpected footest worker result (data pickled)')
View
24 django_gearman_commands/tests/commands.py
@@ -1,24 +0,0 @@
-# -*- coding: utf-8 -*-
-from django.core.management import call_command
-from django.conf import settings
-from django.test import TestCase
-from django.core.cache import cache
-import django_gearman_commands
-import logging
-
-log = logging.getLogger(__name__)
-
-class GearmanCommandsTest(TestCase):
-
- def test_overview(self):
- call_command('gearman_overview')
-
- def test_worker_simple(self):
- # submit job
- call_command('gearman_submit_job', 'footest')
-
- # let the worker process the job
- call_command('gearman_worker_footest')
-
- # verify job was processed
- self.assertEqual(cache.get('footest'), u'I AM FOO !', 'Unexpected footest worker result')
View
2 setup.py
@@ -25,7 +25,7 @@ def read(fname):
packages=['django_gearman_commands'],
platforms='any',
classifiers=[
- 'Development Status :: 4 - Beta',
+ 'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Environment :: Web Environment',
'Intended Audience :: Developers',

0 comments on commit b7cb61b

Please sign in to comment.