Permalink
Browse files

django-kombu is now part of kombu as kombu.transport.django

  • Loading branch information...
1 parent 2daf3c2 commit 2f6fe164e867671a988828fd6f26977d9348c470 @ask ask committed Sep 9, 2011
View
@@ -23,6 +23,7 @@ Ian Struble <istruble@gmail.com>
Jason Cater <jason@ncsfulfillment.com>
Jeff Balogh <me@jeffbalogh.org>
John Spray <jcspray@gmail.com>
+Keith Fitzgerald <ghostrocket@me.com>
Marcin Lulek (ergo) <info@webreactor.eu>
Mher Movsisyan <mher.movsisyan@gmail.com>
Noah Kantrowitz <noah@coderanger.net>
View
38 THANKS
@@ -1,6 +1,32 @@
-Thanks to Barry Pederson <bp@barryp.org> for the py-amqplib library.
-Thanks to Grégoire Cachet <gregoire@audacy.fr> for bug reports.
-Thanks to Martin Mahner for the Sphinx theme.
-Thanks to jcater for bug reports.
-Thanks to sebest for bug reports.
-Thanks to greut for bug reports
+========
+ THANKS
+========
+
+From ``carrot`` THANKS file
+===========================
+
+* Thanks to Barry Pederson <bp@barryp.org> for the py-amqplib library.
+* Thanks to Grégoire Cachet <gregoire@audacy.fr> for bug reports.
+* Thanks to Martin Mahner for the Sphinx theme.
+* Thanks to jcater for bug reports.
+* Thanks to sebest for bug reports.
+* Thanks to greut for bug reports
+
+From ``django-kombu`` THANKS file
+=================================
+
+* Thanks to Rajesh Dhawan and other authors of django-queue-service
+ for the database model implementation.
+ See http://code.google.com/p/django-queue-service/.
+
+From ``kombu-sqlalchemy`` THANKS file
+=====================================
+
+* Thanks to Rajesh Dhawan and other authors of django-queue-service
+ for the database model implementation.
+ See http://code.google.com/p/django-queue-service/.
+
+* Thanks to haridsv for the draft SQLAlchemy port (which can still
+ be found at http://github.com/haridsv/celery-alchemy-poc)
+
+
View
@@ -34,6 +34,7 @@ def setup():
# so coverage sees all our modules.
for module in moduleindex:
try:
+ print("IMPORT: %r" % (module, ))
__import__(module)
except ImportError:
pass
@@ -8,15 +8,6 @@
class test_transport(unittest.TestCase):
- def test_django_transport(self):
- self.assertRaises(
- ImportError,
- mask_modules("djkombu")(transport.resolve_transport), "django")
-
- self.assertTupleEqual(
- module_exists("djkombu")(transport.resolve_transport)("django"),
- ("djkombu.transport", "DatabaseTransport"))
-
def test_resolve_transport__no_class_name(self):
self.assertRaises(KeyError, transport.resolve_transport,
"nonexistant")
@@ -14,36 +14,9 @@
DEFAULT_TRANSPORT = "kombu.transport.amqplib.Transport"
-MISSING_LIB = """
- The %(feature)s requires the %(lib)s module to be
- installed; http://pypi.python.org/pypi/%(lib)s
-
- Use pip to install this module::
-
- $ pip install %(lib)s
-
- or using easy_install::
-
- $ easy_install %(lib)s
-"""
-
-
-def _requires(feature, module, lib):
- try:
- __import__(module)
- except ImportError:
- raise ImportError(MISSING_LIB % {"feature": feature,
- "module": module,
- "lib": lib})
-
-
-def _django_transport():
- _requires("Django transport", "djkombu", "django-kombu")
- return "djkombu.transport.DatabaseTransport"
-
def _ghettoq(name, new, alias=None):
- xxx = new
+ xxx = new # stupid enclosing
def __inner():
import warnings
@@ -74,11 +47,10 @@ def __inner():
"beanstalk": "kombu.transport.beanstalk.Transport",
"mongodb": "kombu.transport.mongodb.Transport",
"couchdb": "kombu.transport.couchdb.Transport",
- "django": _django_transport,
+ "django": "kombu.transport.django.Transport",
"sqlalchemy": "kombu.transport.sqlalchemy.Transport",
"ghettoq.taproot.Redis": _ghettoq("Redis", "redis", "redis"),
- "ghettoq.taproot.Database": _ghettoq("Database", _django_transport,
- "django"),
+ "ghettoq.taproot.Database": _ghettoq("Database", "django", "django"),
"ghettoq.taproot.MongoDB": _ghettoq("MongoDB", "mongodb"),
"ghettoq.taproot.Beanstalk": _ghettoq("Beanstalk", "beanstalk"),
"ghettoq.taproot.CouchDB": _ghettoq("CouchDB", "couchdb"),
@@ -0,0 +1,61 @@
+"""Kombu transport using the Django database as a message store."""
+from __future__ import absolute_import
+
+from Queue import Empty
+
+from anyjson import serialize, deserialize
+
+from django.conf import settings
+from django.core import exceptions as errors
+
+from .. import virtual
+
+from .models import Queue
+
+VERSION = (0, 9, 4)
+__version__ = ".".join(map(str, VERSION))
+
+POLLING_INTERVAL = getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0)
+
+
+class Channel(virtual.Channel):
+
+ def _new_queue(self, queue, **kwargs):
+ Queue.objects.get_or_create(name=queue)
+
+ def _put(self, queue, message, **kwargs):
+ Queue.objects.publish(queue, serialize(message))
+
+ def basic_consume(self, queue, *args, **kwargs):
+ qinfo = self.state.bindings[queue]
+ exchange = qinfo[0]
+ if self.typeof(exchange).type == "fanout":
+ return
+ super(Channel, self).basic_consume(queue, *args, **kwargs)
+
+ def _get(self, queue):
+ #self.refresh_connection()
+ m = Queue.objects.fetch(queue)
+ if m:
+ return deserialize(m)
+ raise Empty()
+
+ def _size(self, queue):
+ return Queue.objects.size(queue)
+
+ def _purge(self, queue):
+ return Queue.objects.purge(queue)
+
+ def refresh_connection(self):
+ from django import db
+ db.close_connection()
+
+
+class DatabaseTransport(virtual.Transport):
+ Channel = Channel
+
+ default_port = 0
+ polling_interval = POLLING_INTERVAL
+ connection_errors = ()
+ channel_errors = (errors.ObjectDoesNotExist,
+ errors.MultipleObjectsReturned)
@@ -0,0 +1,25 @@
+from __future__ import absolute_import
+
+from django.core.management.base import BaseCommand
+
+
+def pluralize(desc, value):
+ if value > 1:
+ return desc + 's'
+ return desc
+
+
+class Command(BaseCommand):
+ requires_model_validation = True
+
+ def handle(self, *args, **options):
+ from ...models import Message
+
+ count = Message.objects.filter(visible=False).count()
+
+ print("Removing %s invisible %s... " % (
+ count, pluralize("message", count))
+ Message.objects.cleanup()
+
+
+
@@ -0,0 +1,70 @@
+from __future__ import absolute_import
+
+from django.db import transaction, connection, models
+try:
+ from django.db import connections, router
+except ImportError: # pre-Django 1.2
+ connections = router = None
+
+
+class QueueManager(models.Manager):
+
+ def publish(self, queue_name, payload):
+ queue, created = self.get_or_create(name=queue_name)
+ queue.messages.create(payload=payload)
+
+ def fetch(self, queue_name):
+ try:
+ queue = self.get(name=queue_name)
+ except self.model.DoesNotExist:
+ return
+
+ return queue.messages.pop()
+
+ def size(self, queue_name):
+ return self.get(name=queue_name).messages.count()
+
+ def purge(self, queue_name):
+ try:
+ queue = self.get(name=queue_name)
+ except self.model.DoesNotExist:
+ return
+
+ messages = queue.messages.all()
+ count = messages.count()
+ messages.delete()
+ return count
+
+
+class MessageManager(models.Manager):
+ _messages_received = [0]
+ cleanup_every = 10
+
+ def pop(self):
+ try:
+ resultset = self.filter(visible=True).order_by('sent_at', 'id')
+ result = resultset[0:1].get()
+ result.visible = False
+ result.save()
+ recv = self.__class__._messages_received
+ recv[0] += 1
+ if not recv[0] % self.cleanup_every:
+ self.cleanup()
+ return result.payload
+ except self.model.DoesNotExist:
+ pass
+
+ def cleanup(self):
+ cursor = self.connection_for_write().cursor()
+ try:
+ cursor.execute("DELETE FROM %s WHERE visible=%%s" % (
+ self.model._meta.db_table, ), (False, ))
+ except:
+ transaction.rollback_unless_managed()
+ else:
+ transaction.commit_unless_managed()
+
+ def connection_for_write(self):
+ if connections:
+ return connections[router.db_for_write(self.model)]
+ return connection
@@ -0,0 +1,32 @@
+from __future__ import absolute_import
+
+from django.db import models
+from django.utils.translation import ugettext_lazy as _
+
+from .managers import QueueManager, MessageManager
+
+
+class Queue(models.Model):
+ name = models.CharField(_("name"), max_length=200, unique=True)
+
+ objects = QueueManager()
+
+ class Meta:
+ db_table = "djkombu_queue"
+ verbose_name = _("queue")
+ verbose_name_plural = _("queues")
+
+
+class Message(models.Model):
+ visible = models.BooleanField(default=True, db_index=True)
+ sent_at = models.DateTimeField(null=True, blank=True, db_index=True,
+ auto_now_add=True)
+ payload = models.TextField(_("payload"), null=False)
+ queue = models.ForeignKey(Queue, related_name="messages")
+
+ objects = MessageManager()
+
+ class Meta:
+ db_table = "djkombu_message"
+ verbose_name = _("message")
+ verbose_name_plural = _("messages")

0 comments on commit 2f6fe16

Please sign in to comment.