Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Ask Solem committed May 19, 2010
0 parents commit f7a1b5c
Show file tree
Hide file tree
Showing 18 changed files with 764 additions and 0 deletions.
14 changes: 14 additions & 0 deletions .gitignore
@@ -0,0 +1,14 @@
.DS_Store
*.pyc
*~
.*.sw[po]
dist/
*.egg-info
doc/__build/*
build/
.build/
pip-log.txt
.directory
erl_crash.dump
*.db
Documentation/
20 changes: 20 additions & 0 deletions djcelery/__init__.py
@@ -0,0 +1,20 @@
"""Django Celery Integration."""

VERSION = (1, 0, 3)

__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
__author__ = "Ask Solem"
__contact__ = "ask@celeryproject.org"
__homepage__ = "http://github.com/ask/django-celery/"
__docformat__ = "restructuredtext"


def is_stable_release():
if len(VERSION) > 3 and isinstance(VERSION[3], basestring):
return False
return not VERSION[1] % 2


def version_with_meta():
return "%s (%s)" % (__version__,
is_stable_release() and "stable" or "unstable")
Empty file added djcelery/backends/__init__.py
Empty file.
62 changes: 62 additions & 0 deletions djcelery/backends/cache.py
@@ -0,0 +1,62 @@
"""celery.backends.cache"""
from datetime import timedelta

from django.utils.encoding import smart_str
from django.core.cache import cache, get_cache
from django.core.cache.backends.base import InvalidCacheBackendError

from celery import conf
from celery.utils.timeutils import timedelta_seconds
from celery.backends.base import KeyValueStoreBackend

# CELERY_CACHE_BACKEND overrides the django-global(tm) backend settings.
if conf.CELERY_CACHE_BACKEND:
cache = get_cache(conf.CELERY_CACHE_BACKEND)


class DjangoMemcacheWrapper(object):
"""Wrapper class to django's memcache backend class, that overrides the
:meth:`get` method in order to remove the forcing of unicode strings
since it may cause binary or pickled data to break."""

def __init__(self, cache):
self.cache = cache

def get(self, key, default=None):
val = self.cache._cache.get(smart_str(key))
if val is None:
return default
else:
return val

def set(self, key, value, timeout=0):
self.cache.set(key, value, timeout)

# Check if django is using memcache as the cache backend. If so, wrap the
# cache object in a DjangoMemcacheWrapper that fixes a bug with retrieving
# pickled data
from django.core.cache.backends.base import InvalidCacheBackendError
try:
from django.core.cache.backends.memcached import CacheClass
except InvalidCacheBackendError:
pass
else:
if isinstance(cache, CacheClass):
cache = DjangoMemcacheWrapper(cache)


class CacheBackend(KeyValueStoreBackend):
"""Backend using the Django cache framework to store task metadata."""

def __init__(self, *args, **kwargs):
super(CacheBackend, self).__init__(self, *args, **kwargs)
expires = conf.TASK_RESULT_EXPIRES
if isinstance(expires, timedelta):
expires = timedelta_seconds(conf.TASK_RESULT_EXPIRES)
self.expires = expires

def get(self, key):
return cache.get(key)

def set(self, key, value):
cache.set(key, value, self.expires)
35 changes: 35 additions & 0 deletions djcelery/backends/database.py
@@ -0,0 +1,35 @@
from celery.backends.base import BaseDictBackend

from djcelery.models import TaskMeta, TaskSetMeta


class DatabaseBackend(BaseDictBackend):
"""The database backends. Using Django models to store task metadata."""

def _store_result(self, task_id, result, status, traceback=None):
"""Store return value and status of an executed task."""
TaskMeta.objects.store_result(task_id, result, status,
traceback=traceback)
return result

def _save_taskset(self, taskset_id, result):
"""Store the result of an executed taskset."""
TaskSetMeta.objects.store_result(taskset_id, result)
return result

def _get_task_meta_for(self, task_id):
"""Get task metadata for a task by id."""
meta = TaskMeta.objects.get_task(task_id)
if meta:
return meta.to_dict()

def _restore_taskset(self, taskset_id):
"""Get taskset metadata for a taskset by id."""
meta = TaskSetMeta.objects.restore_taskset(taskset_id)
if meta:
return meta.to_dict()

def cleanup(self):
"""Delete expired metadata."""
TaskMeta.objects.delete_expired()
TaskSetMeta.objects.delete_expired()
Empty file added djcelery/loaders/__init__.py
Empty file.
100 changes: 100 additions & 0 deletions djcelery/loaders/djangoapp.py
@@ -0,0 +1,100 @@
import imp
import importlib

from celery.loaders.base import BaseLoader

_RACE_PROTECTION = False


class Loader(BaseLoader):
"""The Django loader."""
_db_reuse = 0

def read_configuration(self):
"""Load configuration from Django settings."""
from django.conf import settings
return settings

def close_database(self):
from django.db import connection
db_reuse_max = getattr(self.conf, "CELERY_DB_REUSE_MAX", None)
if not db_reuse_max:
return connection.close()
if self._db_reuse >= db_reuse_max:
self._db_reuse = 0
return connection.close()
self._db_reuse += 1

def on_task_init(self, task_id, task):
"""This method is called before a task is executed.
Does everything necessary for Django to work in a long-living,
multiprocessing environment.
"""
# See http://groups.google.com/group/django-users/
# browse_thread/thread/78200863d0c07c6d/
self.close_database()

# ## Reset cache connection only if using memcached/libmemcached
from django.core import cache
# XXX At Opera we use a custom memcached backend that uses
# libmemcached instead of libmemcache (cmemcache). Should find a
# better solution for this, but for now "memcached" should probably
# be unique enough of a string to not make problems.
cache_backend = cache.settings.CACHE_BACKEND
try:
parse_backend = cache.parse_backend_uri
except AttributeError:
parse_backend = lambda backend: backend.split(":", 1)
cache_scheme = parse_backend(cache_backend)[0]

if "memcached" in cache_scheme:
cache.cache.close()

def on_worker_init(self):
"""Called when the worker starts.
Automatically discovers any ``tasks.py`` files in the applications
listed in ``INSTALLED_APPS``.
"""
self.import_default_modules()
autodiscover()


def autodiscover():
"""Include tasks for all applications in :setting:`INSTALLED_APPS`."""
from django.conf import settings
global _RACE_PROTECTION

if _RACE_PROTECTION:
return
_RACE_PROTECTION = True
try:
return filter(None, [find_related_module(app, "tasks")
for app in settings.INSTALLED_APPS])
finally:
_RACE_PROTECTION = False


def find_related_module(app, related_name):
"""Given an application name and a module name, tries to find that
module in the application."""

try:
app_path = importlib.import_module(app).__path__
except AttributeError:
return

try:
imp.find_module(related_name, app_path)
except ImportError:
return

module = importlib.import_module("%s.%s" % (app, related_name))

try:
return getattr(module, related_name)
except AttributeError:
return
Empty file added djcelery/management/__init__.py
Empty file.
Empty file.
18 changes: 18 additions & 0 deletions djcelery/management/commands/camqadm.py
@@ -0,0 +1,18 @@
"""
Celery AMQP Administration Tool using the AMQP API.
"""
from django.core.management.base import BaseCommand

from celery.bin.camqadm import camqadm, OPTION_LIST


class Command(BaseCommand):
"""Run the celery daemon."""
option_list = BaseCommand.option_list + OPTION_LIST
help = 'Celery AMQP Administration Tool using the AMQP API.'

def handle(self, *args, **options):
"""Handle the management command."""
camqadm(*args, **options)
18 changes: 18 additions & 0 deletions djcelery/management/commands/celerybeat.py
@@ -0,0 +1,18 @@
"""
Start the celery clock service from the Django management command.
"""
from django.core.management.base import BaseCommand

from celery.bin.celerybeat import run_clockservice, OPTION_LIST


class Command(BaseCommand):
"""Run the celery periodic task scheduler."""
option_list = BaseCommand.option_list + OPTION_LIST
help = 'Run the celery periodic task scheduler'

def handle(self, *args, **options):
"""Handle the management command."""
run_clockservice(**options)
18 changes: 18 additions & 0 deletions djcelery/management/commands/celeryd.py
@@ -0,0 +1,18 @@
"""
Start the celery daemon from the Django management command.
"""
from django.core.management.base import BaseCommand

from celery.bin.celeryd import run_worker, OPTION_LIST


class Command(BaseCommand):
"""Run the celery daemon."""
option_list = BaseCommand.option_list + OPTION_LIST
help = 'Run the celery daemon'

def handle(self, *args, **options):
"""Handle the management command."""
run_worker(**options)
37 changes: 37 additions & 0 deletions djcelery/management/commands/celerymon.py
@@ -0,0 +1,37 @@
"""
Start the celery clock service from the Django management command.
"""
import sys
from django.core.management.base import BaseCommand

#try:
from celerymonitor.bin.celerymond import run_monitor, OPTION_LIST
#except ImportError:
# OPTION_LIST = ()
# run_monitor = None

MISSING = """
You don't have celerymon installed, please install it by running the following
command:
$ easy_install celerymon
or if you're using pip (like you should be):
$ pip install celerymon
"""


class Command(BaseCommand):
"""Run the celery monitor."""
option_list = BaseCommand.option_list + OPTION_LIST
help = 'Run the celery monitor'

def handle(self, *args, **options):
"""Handle the management command."""
if run_monitor is None:
sys.stderr.write(MISSING)
else:
run_monitor(**options)

0 comments on commit f7a1b5c

Please sign in to comment.