Skip to content

Commit

Permalink
Re-Adding support for sqlalchemy as it is needed by Apache project Ai…
Browse files Browse the repository at this point in the history
…rflow (#687)

* Re-Adding support for sqlalchemy as it is needed by Apache project Airflow

* Re-Adding support for sqlalchemy as it is needed by Apache project Airflow
  • Loading branch information
aminghadersohi authored and auvipy committed May 17, 2017
1 parent 25a9e76 commit be79b3d
Show file tree
Hide file tree
Showing 15 changed files with 364 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -31,3 +31,5 @@ htmlcov/
test.db
coverage.xml
venv/
env
.eggs
3 changes: 3 additions & 0 deletions .landscape.yml
@@ -0,0 +1,3 @@
pylint:
disable:
- cyclic-import
2 changes: 2 additions & 0 deletions docs/reference/index.rst
Expand Up @@ -48,6 +48,8 @@
kombu.transport.etcd
kombu.transport.zookeeper
kombu.transport.filesystem
kombu.transport.sqlalchemy
kombu.transport.sqlalchemy.models
kombu.transport.SQS
kombu.transport.SLMQ
kombu.transport.pyro
Expand Down
1 change: 1 addition & 0 deletions docs/reference/kombu.transport.SLMQ.rst
Expand Up @@ -2,6 +2,7 @@
SLMQ Transport - ``kombu.transport.SLMQ``
=============================================


.. currentmodule:: kombu.transport.SLMQ

.. automodule:: kombu.transport.SLMQ
Expand Down
32 changes: 32 additions & 0 deletions docs/reference/kombu.transport.sqlalchemy.models.rst
@@ -0,0 +1,32 @@
=====================================================================
SQLAlchemy Transport Model - ``kombu.transport.sqlalchemy.models``
=====================================================================


.. currentmodule:: kombu.transport.sqlalchemy.models

.. automodule:: kombu.transport.sqlalchemy.models

.. contents::
:local:

Models
------

.. autoclass:: Queue

.. autoattribute:: Queue.id

.. autoattribute:: Queue.name

.. autoclass:: Message

.. autoattribute:: Message.id

.. autoattribute:: Message.visible

.. autoattribute:: Message.sent_at

.. autoattribute:: Message.payload

.. autoattribute:: Message.version
25 changes: 25 additions & 0 deletions docs/reference/kombu.transport.sqlalchemy.rst
@@ -0,0 +1,25 @@
===========================================================
SQLAlchemy Transport Model - kombu.transport.sqlalchemy
===========================================================


.. currentmodule:: kombu.transport.sqlalchemy

.. automodule:: kombu.transport.sqlalchemy

.. contents::
:local:

Transport
---------

.. autoclass:: Transport
:members:
:undoc-members:

Channel
-------

.. autoclass:: Channel
:members:
:undoc-members:
2 changes: 2 additions & 0 deletions kombu/transport/__init__.py
Expand Up @@ -28,6 +28,8 @@ def supports_librabbitmq():
'sqs': 'kombu.transport.SQS:Transport',
'mongodb': 'kombu.transport.mongodb:Transport',
'zookeeper': 'kombu.transport.zookeeper:Transport',
'sqlalchemy': 'kombu.transport.sqlalchemy:Transport',
'sqla': 'kombu.transport.sqlalchemy:Transport',
'SLMQ': 'kombu.transport.SLMQ.Transport',
'slmq': 'kombu.transport.SLMQ.Transport',
'filesystem': 'kombu.transport.filesystem:Transport',
Expand Down
1 change: 0 additions & 1 deletion kombu/transport/etcd.py
Expand Up @@ -259,7 +259,6 @@ def driver_version(self):
"""Return the version of the etcd library.
.. note::
python-etcd has no __version__. This is a workaround.
"""
try:
Expand Down
164 changes: 164 additions & 0 deletions kombu/transport/sqlalchemy/__init__.py
@@ -0,0 +1,164 @@
"""Kombu transport using SQLAlchemy as the message store."""
# SQLAlchemy overrides != False to have special meaning and pep8 complains
# flake8: noqa

from __future__ import absolute_import, unicode_literals

from json import loads, dumps

from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import sessionmaker

from kombu.five import Empty
from kombu.transport import virtual
from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str
from .models import (ModelBase, Queue as QueueBase, Message as MessageBase,
class_registry, metadata)


VERSION = (1, 1, 0)
__version__ = '.'.join(map(str, VERSION))


class Channel(virtual.Channel):
"""The channel class."""

_session = None
_engines = {} # engine cache

def __init__(self, connection, **kwargs):
self._configure_entity_tablenames(connection.client.transport_options)
super(Channel, self).__init__(connection, **kwargs)

def _configure_entity_tablenames(self, opts):
self.queue_tablename = opts.get('queue_tablename', 'kombu_queue')
self.message_tablename = opts.get('message_tablename', 'kombu_message')

#
# Define the model definitions. This registers the declarative
# classes with the active SQLAlchemy metadata object. This *must* be
# done prior to the ``create_engine`` call.
#
self.queue_cls and self.message_cls

def _engine_from_config(self):
conninfo = self.connection.client
transport_options = conninfo.transport_options.copy()
transport_options.pop('queue_tablename', None)
transport_options.pop('message_tablename', None)
return create_engine(conninfo.hostname, **transport_options)

def _open(self):
conninfo = self.connection.client
if conninfo.hostname not in self._engines:
engine = self._engine_from_config()
Session = sessionmaker(bind=engine)
metadata.create_all(engine)
self._engines[conninfo.hostname] = engine, Session
return self._engines[conninfo.hostname]

@property
def session(self):
if self._session is None:
_, Session = self._open()
self._session = Session()
return self._session

def _get_or_create(self, queue):
obj = self.session.query(self.queue_cls) \
.filter(self.queue_cls.name == queue).first()
if not obj:
obj = self.queue_cls(queue)
self.session.add(obj)
try:
self.session.commit()
except OperationalError:
self.session.rollback()
return obj

def _new_queue(self, queue, **kwargs):
self._get_or_create(queue)

def _put(self, queue, payload, **kwargs):
obj = self._get_or_create(queue)
message = self.message_cls(dumps(payload), obj)
self.session.add(message)
try:
self.session.commit()
except OperationalError:
self.session.rollback()

def _get(self, queue):
obj = self._get_or_create(queue)
if self.session.bind.name == 'sqlite':
self.session.execute('BEGIN IMMEDIATE TRANSACTION')
try:
msg = self.session.query(self.message_cls) \
.with_lockmode('update') \
.filter(self.message_cls.queue_id == obj.id) \
.filter(self.message_cls.visible != False) \
.order_by(self.message_cls.sent_at) \
.order_by(self.message_cls.id) \
.limit(1) \
.first()
if msg:
msg.visible = False
return loads(bytes_to_str(msg.payload))
raise Empty()
finally:
self.session.commit()

def _query_all(self, queue):
obj = self._get_or_create(queue)
return self.session.query(self.message_cls) \
.filter(self.message_cls.queue_id == obj.id)

def _purge(self, queue):
count = self._query_all(queue).delete(synchronize_session=False)
try:
self.session.commit()
except OperationalError:
self.session.rollback()
return count

def _size(self, queue):
return self._query_all(queue).count()

def _declarative_cls(self, name, base, ns):
if name in class_registry:
return class_registry[name]
return type(str(name), (base, ModelBase), ns)

@cached_property
def queue_cls(self):
return self._declarative_cls(
'Queue',
QueueBase,
{'__tablename__': self.queue_tablename}
)

@cached_property
def message_cls(self):
return self._declarative_cls(
'Message',
MessageBase,
{'__tablename__': self.message_tablename}
)


class Transport(virtual.Transport):
"""The transport class."""

Channel = Channel

can_parse_url = True
default_port = 0
driver_type = 'sql'
driver_name = 'sqlalchemy'
connection_errors = (OperationalError, )

def driver_version(self):
import sqlalchemy
return sqlalchemy.__version__
67 changes: 67 additions & 0 deletions kombu/transport/sqlalchemy/models.py
@@ -0,0 +1,67 @@
"""Kombu transport using SQLAlchemy as the message store."""
from __future__ import absolute_import, unicode_literals

import datetime

from sqlalchemy import (Column, Integer, String, Text, DateTime,
Sequence, Boolean, ForeignKey, SmallInteger)
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import relation
from sqlalchemy.schema import MetaData

class_registry = {}
metadata = MetaData()
ModelBase = declarative_base(metadata=metadata, class_registry=class_registry)


class Queue(object):
"""The queue class."""

__table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}

id = Column(Integer, Sequence('queue_id_sequence'), primary_key=True,
autoincrement=True)
name = Column(String(200), unique=True)

def __init__(self, name):
self.name = name

def __str__(self):
return '<Queue({self.name})>'.format(self=self)

@declared_attr
def messages(cls):
return relation('Message', backref='queue', lazy='noload')


class Message(object):
"""The message class."""

__table_args__ = {'sqlite_autoincrement': True, 'mysql_engine': 'InnoDB'}

id = Column(Integer, Sequence('message_id_sequence'),
primary_key=True, autoincrement=True)
visible = Column(Boolean, default=True, index=True)
sent_at = Column('timestamp', DateTime, nullable=True, index=True,
onupdate=datetime.datetime.now)
payload = Column(Text, nullable=False)
version = Column(SmallInteger, nullable=False, default=1)

__mapper_args__ = {'version_id_col': version}

def __init__(self, payload, queue):
self.payload = payload
self.queue = queue

def __str__(self):
return '<Message: {0.sent_at} {0.payload} {0.queue_id}>'.format(self)

@declared_attr
def queue_id(self):
return Column(
Integer,
ForeignKey(
'%s.id' % class_registry['Queue'].__tablename__,
name='FK_kombu_message_queue'
)
)
1 change: 1 addition & 0 deletions requirements/extras/sqlalchemy.txt
@@ -0,0 +1 @@
sqlalchemy
1 change: 1 addition & 0 deletions requirements/test-ci.txt
Expand Up @@ -4,3 +4,4 @@ redis
PyYAML
msgpack-python>0.2.0
-r extras/sqs.txt
sqlalchemy
1 change: 1 addition & 0 deletions setup.py
Expand Up @@ -141,6 +141,7 @@ def run_tests(self):
'mongodb': extras('mongodb.txt'),
'sqs': extras('sqs.txt'),
'zookeeper': extras('zookeeper.txt'),
'sqlalchemy': extras('sqlalchemy.txt'),
'librabbitmq': extras('librabbitmq.txt'),
'pyro': extras('pyro.txt'),
'slmq': extras('slmq.txt'),
Expand Down
13 changes: 13 additions & 0 deletions t/integration/tests/test_sqla.py
@@ -0,0 +1,13 @@
from __future__ import absolute_import, unicode_literals

from funtests import transport

from kombu.tests.case import skip


@skip.unless_module('sqlalchemy')
class test_sqla(transport.TransportCase):
transport = 'sqlalchemy'
prefix = 'sqlalchemy'
event_loop_max = 10
connection_options = {'hostname': 'sqla+sqlite://'}

5 comments on commit be79b3d

@georgexsh
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, but related documentation is still missing?

@auvipy
Copy link
Member

@auvipy auvipy commented on be79b3d Nov 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess yes

@georgexsh
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@auvipy would database transport continue to be supported?

@auvipy
Copy link
Member

@auvipy auvipy commented on be79b3d Nov 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as it's re added so there was and is intention to support database transport for development purpose ease.

@ZxMYS
Copy link

@ZxMYS ZxMYS commented on be79b3d Apr 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!! We do need db transport in our project. Thanks!

Please sign in to comment.