Permalink
Browse files

Merge pull request #3 from char0n/master

Namespace support
  • Loading branch information...
2 parents 8466317 + 8fc5918 commit ee847e372f697845dd63fa27c56af36ce0d9946d @jsk jsk committed Nov 26, 2012
View
@@ -10,4 +10,5 @@ pip-log.txt*
*.swn
logs
dist
-django_gearman_commands.egg-info
+django_gearman_commands.egg-info
+.idea
View
@@ -166,7 +166,7 @@ As you see, you need to do three things:
* create Command class inheriting from django_gearman_commands.GearmanWorkerBaseCommand class
* override task_name property and do_job() method
-**task_name** is unique identification of task, which your worker is supposed to do. Submitting jobs is done via sending task name and optional job parameters.
+**task_name** is unique identification of task, which your worker is supposed to do. Submitting jobs is done via sending task name and optional job parameters. Task name can be also easily name-spaced.
**do_job()** is method which will be invoked when job is submitted. If job was submitted with arguments, 'job_data' is not None.
@@ -343,5 +343,6 @@ Authors and Contributors
Author: Jozef Ševčík, sevcik@codescale.net
Contributors:
-None. Be the first ! :)
+
+ * Vladimír Gorej (gorej@codescale.net)
@@ -1,15 +1,23 @@
# -*- coding: utf-8 -*-
-from django.core.management.base import BaseCommand, CommandError
-from django.conf import settings
-from datetime import datetime
+
import time
-import gearman
import logging
+from datetime import datetime
+
+import gearman
+
+from django.core.management import call_command
+from django.core.management.base import BaseCommand, CommandError
+from django.conf import settings
+
+import django_gearman_commands.settings
__version__ = '0.1'
+
log = logging.getLogger(__name__)
+
class HookedGearmanWorker(gearman.GearmanWorker):
"""GearmanWorker with hooks support."""
@@ -18,19 +26,19 @@ def __init__(self, exit_after_job, host_list=None):
self.exit_after_job = exit_after_job
def after_job(self):
- return (not self.exit_after_job)
+ return not self.exit_after_job
+
class GearmanWorkerBaseCommand(BaseCommand):
"""Base command for Gearman workers.
Subclass this class in your gearman worker commands.
"""
-
@property
def task_name(self):
"""Override task_name property in worker to indicate what task should be registered in Gearman."""
- raise NotImplementedError, 'task_name should be implemented in worker'
+ raise NotImplementedError('task_name should be implemented in worker')
@property
def exit_after_job(self):
@@ -49,14 +57,16 @@ def do_job(self, job_data):
Override this in worker to perform job.
"""
- raise NotImplementedError, 'do_job() should be implemented in worker'
+ raise NotImplementedError('do_job() should be implemented in worker')
def handle(self, *args, **options):
try:
- worker = HookedGearmanWorker(exit_after_job=self.exit_after_job, host_list=settings.GEARMAN_SERVERS)
+ worker = HookedGearmanWorker(exit_after_job=self.exit_after_job,
+ host_list=django_gearman_commands.settings.GEARMAN_SERVERS)
+ task_name = '{0}@{1}'.format(self.task_name, get_namespace()) if get_namespace() else self.task_name
log.info('Registering gearman task: %s', self.task_name)
- worker.register_task(self.task_name, self._invoke_job)
- except:
+ worker.register_task(task_name, self._invoke_job)
+ except Exception:
log.exception('Problem with registering gearman task')
raise
@@ -69,22 +79,21 @@ def _invoke_job(self, worker, job):
"""
try:
- # represent default job data '' as None
+ # Represent default job data '' as None.
job_data = job.data if job.data else None
- self.stdout.write('Invoking gearman job, task: %s.\n' % self.task_name)
+ self.stdout.write('Invoking gearman job, task: {0:s}.\n'.format(self.task_name))
result = self.do_job(job_data)
- log.info('Job finished, task: %s', self.task_name)
- self.stdout.write('Job finished, task: %s\n' % self.task_name)
+ log.info('Job finished, task: %s, result %s', self.task_name, result)
+ self.stdout.write('Job finished, task: {0:s}\n'.format(self.task_name))
if result is not None:
- log.info(result)
- self.stdout.write('%s\n' % result)
+ self.stdout.write('{0}\n'.format(result))
return 'OK'
- except:
- log.exception('Error occured when invoking job, task: %s', self.task_name)
+ except Exception:
+ log.exception('Error occurred when invoking job, task: %s', self.task_name)
raise
@@ -105,47 +114,56 @@ def get_server_info(self):
"""Read Gearman server info - status, workers and and version."""
result = ''
- # read server status info
+ # Read server status info.
client = gearman.GearmanAdminClient([self.host])
self.server_version = client.get_version()
self.tasks = client.get_status()
self.workers = client.get_workers()
- # use prettytable if available, otherwise raw output
+ # Use prettytable if available, otherwise raw output.
try:
from prettytable import PrettyTable
- use_prettytable = True
except ImportError:
- use_prettytable = False
+ PrettyTable = None
- if use_prettytable:
- # use PrettyTable for output
+ if PrettyTable is not None:
+ # Use PrettyTable for output.
# version
table = PrettyTable(['Gearman Server Host', 'Gearman Server Version'])
table.add_row([self.host, self.server_version])
- result += '%s.\n\n' % str(table)
+ result += '{0:s}.\n\n'.format(table)
# tasks
table = PrettyTable(['Task Name', 'Total Workers', 'Running Jobs', 'Queued Jobs'])
for r in self.tasks:
table.add_row([r['task'], r['workers'], r['running'], r['queued']])
- result += '%s.\n\n' % str(table)
+ result += '{0:s}.\n\n'.format(table)
# workers
table = PrettyTable(['Worker IP', 'Registered Tasks', 'Client ID', 'File Descriptor'])
for r in self.workers:
if r['tasks']: # ignore workers with no registered task
table.add_row([r['ip'], ','.join(r['tasks']), r['client_id'], r['file_descriptor']])
- result += '%s.\n\n' % str(table)
+ result += '{0:s}.\n\n'.format(table)
else:
# raw output without PrettyTable
- result += 'Gearman Server Host:%s\n' % self.host
- result += 'Gearman Server Version:%s.\n' % self.server_version
- result += 'Tasks:\n%s\n' % str(self.tasks)
- result += 'Workers:\n%s\n' % str(self.workers)
+ result += 'Gearman Server Host:{0:s}\n'.format(self.host)
+ result += 'Gearman Server Version:{0:s}.\n'.format(self.server_version)
+ result += 'Tasks:\n{0:s}\n'.format(self.tasks)
+ result += 'Workers:\n{0:s}\n'.format(self.workers)
return result
+
+
+def get_namespace():
+ """Namespace to suffix function on a mutialized gearman."""
+ return django_gearman_commands.settings.GEARMAN_CLIENT_NAMESPACE
+
+
+def submit_job(task_name, data='', **options):
+ """Shortcut util for submitting job in standard way."""
+ return call_command('gearman_submit_job', task_name, data, **options)
@@ -1,20 +1,24 @@
# -*- coding: utf-8 -*-
-from django.core.management.base import BaseCommand, CommandError
-from django.conf import settings
-import gearman
+
import logging
+
+from django.core.management.base import BaseCommand
+
+import django_gearman_commands.settings
from django_gearman_commands import GearmanServerInfo
+
log = logging.getLogger(__name__)
+
class Command(BaseCommand):
"""Pprint overview of Gearman server status and workers."""
help = 'Print overview of Gearman server status and workers.'
def handle(self, *args, **options):
result = ''
- for server in settings.GEARMAN_SERVERS:
+ for server in django_gearman_commands.settings.GEARMAN_SERVERS:
server_info = GearmanServerInfo(server)
result += server_info.get_server_info()
@@ -1,11 +1,18 @@
# -*- coding: utf-8 -*-
-from django.core.management.base import BaseCommand, CommandError
-from django.conf import settings
-import gearman
+
import logging
+import gearman
+
+from django.core.management.base import BaseCommand, CommandError
+
+import django_gearman_commands.settings
+from django_gearman_commands import get_namespace
+
+
log = logging.getLogger(__name__)
+
class Command(BaseCommand):
"""Submit specific gearman job with job data as an arguments."""
@@ -14,22 +21,22 @@ class Command(BaseCommand):
def handle(self, *args, **options):
try:
- task_name = None
job_data = ''
if len(args) == 0:
raise CommandError('At least task name must be provided.')
- task_name = args[0]
+ task_name = '{0}@{1}'.format(args[0], get_namespace()) if get_namespace() else args[0]
if len(args) > 1:
job_data = args[1]
- self.stdout.write('Submitting job: %s, job data: %s.\n' % (task_name, job_data if job_data else '(empty)'))
+ self.stdout.write('Submitting job: {0:s}, job data: {1:s}.\n'.format(task_name, job_data if job_data else '(empty)'))
- client = gearman.GearmanClient(settings.GEARMAN_SERVERS)
- result = client.submit_job(task_name, job_data, wait_until_complete=False, background=True)
+ client = gearman.GearmanClient(django_gearman_commands.settings.GEARMAN_SERVERS)
+ result = client.submit_job(task_name, job_data, wait_until_complete=options.get('wait_until_complete', False),
+ background=options.get('background', True))
- self.stdout.write('Job submission done, result: %s.\n' % result)
+ self.stdout.write('Job submission done, result: {0:s}.\n'.format(result))
except:
log.exception('Error when submitting gearman job')
raise
@@ -1,9 +1,11 @@
# -*- coding: utf-8 -*-
+
from django.core.cache import cache
-import django_gearman_commands
+
+from django_gearman_commands import GearmanWorkerBaseCommand
-class Command(django_gearman_commands.GearmanWorkerBaseCommand):
+class Command(GearmanWorkerBaseCommand):
"""Gearman worker performing 'footest' job."""
@property
@@ -2,3 +2,6 @@
# gearman jobs servers
GEARMAN_SERVERS = getattr(settings, 'GEARMAN_SERVERS', ['127.0.0.1:4730'])
+
+# Namespacing support
+GEARMAN_CLIENT_NAMESPACE = getattr(settings, 'GEARMAN_CLIENT_NAMESPACE', '')
@@ -1,17 +1,22 @@
# -*- coding: utf-8 -*-
+
+import logging
+import pickle
+
from django.core.management import call_command
-from django.conf import settings
from django.test import TestCase
from django.core.cache import cache
from django_gearman_commands import GearmanServerInfo
-import logging
+
+import django_gearman_commands.settings
log = logging.getLogger(__name__)
+
class GearmanCommandsTest(TestCase):
def test_server_info(self):
- server_info = GearmanServerInfo(settings.GEARMAN_SERVERS[0])
+ server_info = GearmanServerInfo(django_gearman_commands.settings.GEARMAN_SERVERS[0])
server_info.get_server_info()
self.assertTrue(server_info.server_version.startswith('OK'), 'Unexpected server version string')
self.assertTrue(type(server_info.tasks) is tuple, 'Unexpected server tasks type')
@@ -40,7 +45,6 @@ def test_worker_task_data_string(self):
self.assertEqual(cache.get('footest'), 'DATA', 'Unexpected footest worker result (data string)')
def test_worker_task_data_pickled(self):
- import pickle
# submit job
call_command('gearman_submit_job', 'footest', pickle.dumps(u'DATA'))

0 comments on commit ee847e3

Please sign in to comment.