Qpid Transport Support #335

Merged
ask merged 184 commits into celery:master from pulp:qpid-transport on Aug 18, 2014

bmbouter commented Apr 4, 2014

This PR adds support for Qpid as a valid transport type for Kombu, and includes code, unit tests, docstrings, doc changes, and dependency updates. This is a very functional transport; it has been heavily tested with Celery, supports Celery events, and supports SSL. It is being used in the upcoming release of Pulp.

Issues related to this code can be routed to me for bugfix and troubleshooting.

The code

The bulk of the code itself is contained in the kombu.transport.qpid module.

NOTE: The code currently still monkey patches qpid.messaging. Versions of qpid.messaging that are out in the wild (PyPI, RHEL 6, Fedora, etc.) are missing two important patches, QPID-5637 and QPID-5557. Those patches have been accepted upstream, but not every place that provides Qpid bits includes them yet. This codebase applies those patches at runtime; the runtime patching will eventually be removed. It is safe to patch any version of Qpid 0.18+, and it is safe to apply to an already patched installation.

The monkey patching has been tested with clients using versions 0.22, 0.24, and 0.26 of qpid.messaging, so it should produce the expected behavior whichever of these versions it is patching. The patching has also been tested against the versions currently on PyPI.
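
As an illustration of why re-applying a runtime patch can be harmless, here is a minimal sketch. The class `Receiver`, its method `fetch`, and the marker attribute `_kombu_patched` are hypothetical stand-ins chosen for this example; they are not the names patched in kombu.transport.qpid, and the real patches may achieve the same property differently.

```python
class Receiver(object):
    """Hypothetical stand-in for a third-party class that needs a fix."""

    def fetch(self):
        return "unpatched"


def apply_patch(cls):
    """Replace cls.fetch at runtime; do nothing if already patched."""
    # The marker attribute is an assumption of this sketch. It records
    # that the patch was applied, so a second call changes nothing.
    if getattr(cls, "_kombu_patched", False):
        return False

    def fetch(self):
        return "patched"

    cls.fetch = fetch
    cls._kombu_patched = True
    return True


first = apply_patch(Receiver)   # patches the class
second = apply_patch(Receiver)  # no-op on an already patched class
```

Calling `apply_patch` twice leaves the class in the same state as calling it once, which is the property described above for already patched installations.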

The code is flake8 compliant. You should be able to run everything as described on this celery page and it should look clean.

Unit Tests

The unit tests are all located in the kombu.tests.transport.test_qpid module. You can run just these new tests using:

nosetests --with-coverage --cover-html -v kombu.tests.transport.test_qpid

To get any continuous integration environments to run these unit tests, you'll need to install the dependencies. Please install the dependencies in the CI environment by running:

pip install qpid-tools qpid-python

The Docs

The code itself contains a huge number of docstrings. They follow the syntax expectations of Kombu/Celery (line width, strict PEP8, etc).

The more general documentation has been updated in two important ways.

  1. The transport has its own section, which is drawn partially from the docstrings. It contains important things like instructions on how to download dependencies.
  2. Updates to statements that are no longer true. For instance, the docs used to say things like "only rabbitMQ supports SSL", which is no longer the case.

You can build the docs using these instructions.

You should also build the API reference, since most of the docs live in there. You can follow these instructions.

Dependencies

This library relies on the pure-python packages named 'qpidtoollibs' and 'qpid.messaging'. Kombu expects all pure python dependencies to be put onto pypi, and they can be "bundled" into Kombu as a group of optional dependencies.

The files introduced have all imports guarded against ImportError Exceptions so that you can import any file without installing the dependencies, and you'll encounter no Exceptions.
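
A minimal sketch of the guarded-import pattern described above. The helper names `dependencies_available` and `require_dependencies` are hypothetical and exist only for this illustration; the PR itself may structure its guards differently.

```python
# Each optional dependency is imported inside try/except so that importing
# this module never fails when the packages are missing.
try:
    import qpidtoollibs
except ImportError:
    qpidtoollibs = None

try:
    import qpid.messaging as qpid_messaging
except ImportError:
    qpid_messaging = None


def dependencies_available():
    """Hypothetical helper: True only if both optional packages imported."""
    return qpidtoollibs is not None and qpid_messaging is not None


def require_dependencies():
    """Hypothetical helper: fail at use time rather than at import time."""
    if not dependencies_available():
        raise ImportError(
            "The Qpid transport needs the qpid.messaging and qpidtoollibs "
            "packages; see the transport docs for install instructions."
        )
```

With this shape, the error surfaces only when someone actually tries to use the transport, not when the file is merely imported (for example by a test runner or a docs build).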

qpidtoollibs is currently published on pypi at version 0.26 here.
qpid.messaging is currently published on pypi at version 0.26 here.

A bundle in Kombu has been created called 'qpid' which should allow someone to install the bundle using kombu[qpid]. The docs also explain how to do this.

bmbouter added some commits Feb 11, 2014

Basic qpid transport for kombu. somewhat working...
Working:
1. Celery worker will start and run
2. Transport supports queue and exchange management operations
3. Dispatch using delay() dispatches
4. The worker will properly run your task!

Known Issues:
1. Message acknowledgement does not work
2. amqp:// only; amqps:// is neither implemented nor tested
3. A few small TODOs left in there
4. No tests

Kombu functional tests pass. Updates below:
1. Monkey patching qpid.messaging to sidestep the bug link below.
    https://issues.apache.org/jira/browse/QPID-5557

2. Better receiver and sender cleanup to reduce growth of connections to broker

3. Explicit handling for submission to an exchange vs a queue

4. FDShimThread consumers pass back a tuple with the queue name the message came from

Fixed two bugs that caused celery tasks not to run.
1. celery calls on_readable, and expects that method to not raise a socket.timeout. on_readable now catches socket.timeout and does not re-raise it
2. calling drain_events without a timeout, or with timeout=0, would cause the method to return without trying to drain even once

Mostly centralized qpid.messaging Connection object generation. All kombu funtests pass, and pulp is still working.

A quick fix for the CPU performance issue, and fixed a SASL bug relating to PLAIN not being specified as an auth mechanism.

Refactored shared objects between threads. Kombu tests pass.
Kombu tests were failing because the transport object is reused
between kombu tests, while channels are deleted and recreated.
Some object references were not consistent between the transport
and its subobjects (channels, threads, etc).

Also, the qpid.messaging sessions, receivers, and senders now
clean up after themselves well.

coveralls commented Jul 31, 2014

Coverage Status

Coverage increased (+13.27%) when pulling 38cd23a on pulp:qpid-transport into 41791db on celery:master.

coveralls commented Aug 5, 2014

Coverage Status

Coverage increased (+13.27%) when pulling 65fa159 on pulp:qpid-transport into 41791db on celery:master.

coveralls commented Aug 7, 2014

Coverage Status

Coverage increased (+13.27%) when pulling 7b546d9 on pulp:qpid-transport into 146758c on celery:master.

coveralls commented Aug 8, 2014

Coverage Status

Coverage increased (+1.87%) when pulling 3054b74 on pulp:qpid-transport into e44ec99 on celery:master.

bmbouter commented Aug 8, 2014

As we talked about, the tests now pass on Travis without any third-party libraries needing to be installed!

It's almost 4K lines, but it's almost entirely in the three files that this PR introduces. Given your time availability, a deep review doesn't seem feasible. It's been tested heavily by two different QE teams and over 20 developers, and it is included in a product being released very soon. We feel it is high quality. Every line of code introduced has also been reviewed by other Red Hat developers as part of our internal process. As an aside, I'm committed to fixing defects that are discovered and adding features over time.

Could this please be merged? If you have any specific feedback or questions please let me know. After merging I can open a second PR to backport the changes to the earlier (Python 2.6x) Kombu that has already been released.

As always, thanks for your consideration and for creating such a great project. It's powering a Red Hat product!

kombu/transport/qpid.py
functionality.
:type loop: kombu.async.Hub
"""
symbol = os.read(self.r, 1)

ask Aug 18, 2014

Member

Isn't read blocking here? I don't see anything doing setblocking()?

It should be set non-blocking, and since this is the first read call, it needs to catch the error and, if getattr(exc, 'errno', None) == errno.EAGAIN, raise ConnectionError.

ask Aug 18, 2014

Member

I guess qpid does not support async use. If using a non-blocking socket is difficult, then you can keep the blocking behavior. That is not optimal, though, since it can cause pauses in the worker, especially for big messages.

bmbouter Aug 18, 2014

Contributor

This read call is non-blocking. The blocking behavior of the file descriptor is set at line 1398 using fcntl.

qpid.messaging is the library that is used, and it does support asynchronous message fetching. My use of it is consistent with that and expects qpid.messaging to be fetching messages behind the scenes asynchronously.

Given that the os.read() is not reading from qpid.messaging, do you expect it to raise errno.EAGAIN? If so in what situations? I couldn't think of any, but I could catch that exception like you described if there is a possibility it could be raised.
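
For reference, a minimal sketch of the behavior under discussion: a pipe whose read end is made non-blocking with fcntl, and a read that treats EAGAIN as "nothing to read yet". The function name `read_symbol` and the decision to return None are assumptions of this sketch, not what kombu/transport/qpid.py does; the review comment above suggests raising ConnectionError instead.

```python
import errno
import fcntl
import os

# Create a pipe and mark the read end non-blocking using fcntl.
r_fd, w_fd = os.pipe()
flags = fcntl.fcntl(r_fd, fcntl.F_GETFL)
fcntl.fcntl(r_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def read_symbol(fd):
    """Read one byte, or return None if nothing is available yet."""
    try:
        return os.read(fd, 1)
    except OSError as exc:
        # EAGAIN/EWOULDBLOCK: the descriptor is non-blocking and empty.
        if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            return None
        raise


empty = read_symbol(r_fd)    # nothing has been written yet
os.write(w_fd, b"0")
symbol = read_symbol(r_fd)   # the byte written on the line above

os.close(r_fd)
os.close(w_fd)
```

On a non-blocking descriptor, EAGAIN arises when the read happens while no byte is waiting. If the read only ever runs after the event loop has reported the descriptor as readable, that case should be rare, but catching it costs little.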

kombu/transport/qpid.py
message is ready, write a '0' to _w_fd.
"""
while True:
self._session.next_receiver()

ask Aug 18, 2014

Member

It's better to have the worker block on self._session.next_receiver() than to start a thread. Eventually a process will be forked at the wrong time and the worker deadlocks.

bmbouter Aug 18, 2014

Contributor

If I eliminated the thread altogether, where would the self._session.next_receiver() be moved to? I can only think of two options, and neither one works.

  1. Move it to the AIO callback function. In that case nothing would ever write to the file descriptor that celery is actually monitoring, so the blocking read would occur correctly, except that on_readable would never be called. qpid.messaging does not provide a file descriptor to monitor, so it is up to my code to create that construct for celery.
  2. Move it to the drain_events method and make it a synchronous transport. That won't work because our code experiences an actual task cancellation bug #2070, and synchronous transports are unsupported and slow anyway.
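
The construct described in this thread (a background thread that blocks waiting for a message and then writes a byte to a pipe the event loop monitors) can be sketched as follows. This is a simplified illustration under stated assumptions: `wait_for_message` stands in for the thread that blocks on self._session.next_receiver(), the `ready` list stands in for fetched messages, and select.select stands in for the event loop. None of these are the actual implementation in the PR.

```python
import os
import select
import threading

r_fd, w_fd = os.pipe()
ready = []  # stand-in for messages fetched by the background thread


def wait_for_message():
    """Stand-in for a thread blocking on session.next_receiver()."""
    ready.append("message")   # pretend a message has arrived
    os.write(w_fd, b"0")      # wake up whoever is monitoring r_fd


def on_readable():
    """Stand-in for the callback run when r_fd becomes readable."""
    os.read(r_fd, 1)          # consume the wake-up byte
    return ready.pop(0)


thread = threading.Thread(target=wait_for_message)
thread.daemon = True
thread.start()

# Event loop side: wait (up to 5 seconds) for the descriptor to be readable.
readable, _, _ = select.select([r_fd], [], [], 5.0)
result = on_readable() if readable else None
thread.join()
```

The message is appended before the byte is written, so by the time the monitoring side sees the descriptor as readable, the message is already available to `on_readable`.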

ask added a commit that referenced this pull request Aug 18, 2014

@ask ask merged commit cd5747f into celery:master Aug 18, 2014

@bmbouter bmbouter deleted the pulp:qpid-transport branch Oct 24, 2014
