New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Twisted consume APIs #139
Add Twisted consume APIs #139
Conversation
0b1f44b
to
2a9b277
Compare
Codecov Report
@@ Coverage Diff @@
## master #139 +/- ##
=========================================
- Coverage 96.93% 96.84% -0.1%
=========================================
Files 13 14 +1
Lines 1110 1393 +283
Branches 155 188 +33
=========================================
+ Hits 1076 1349 +273
- Misses 24 32 +8
- Partials 10 12 +2
Continue to review full report at Codecov.
|
1fa6fc5
to
ba96d49
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please also add tests in tox for the Twisted version we have en EPEL7?
fedora_messaging/twisted/protocol.py
Outdated
|
||
Returns: | ||
Consumer: A namedtuple that identifies this consumer. | ||
Deferred: A Deferred that fires when the consumer is successfully | ||
registered with the message broker. The callback receives a list |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a list really? It looks like it's only getting the consumer object.
fedora_messaging/twisted/protocol.py
Outdated
) | ||
for c in self._consumers.values(): | ||
c._running = False | ||
yield defer.gatherResults([c._read_loop for c in self._consumers.values()]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That logic is also in the cancel
method above, maybe we should move it to the Consumer class, who knows best how to stop consuming (and optionnaly close the channel).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I'll refactor this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doing this refactor made me realize we can just get rid of the twisted_cancel API. Users just call cancel on the consumers directly, and the Consumer can have a private reference to the protocol (and factory) that made it to do the necessary book-keeping on the consumers dictionaries.
fedora_messaging/twisted/protocol.py
Outdated
confirms = False | ||
self._confirms = confirms | ||
self._channel = None | ||
self._running = False | ||
# Map queue names to dictionaries representing consumers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mapping is now to Consumer instances.
|
||
|
||
def halt_exit_0(message): | ||
"""Exit with code when it gets a message""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A number seems to be missing in this docstring.
|
||
def halt_exit_42(message): | ||
"""Exit with code 42 when it gets a message""" | ||
raise exceptions.HaltConsumer(exit_code=42, reason="Life, the universe, everything") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Life, the universe, and everything.
"callback,exit_code,msg", | ||
[ | ||
("halt_exit_0", 0, b"Consumer indicated it wishes consumption to halt"), | ||
("halt_exit_42", 42, b"Life, the universe, everything"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Life, the universe, and everything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This critical issue will be addressed in the next revision of this patch
I've added a second commit (which I'll squash when we're happy with things) that should address most of the PR comments, but this isn't quite ready for re-review since I've not figured out the EL7 testing situation yet. Unfortunately, some test dependencies don't work with the version of Twisted in EL7, or aren't in EL7 at all, and pyOpenSSL 13.1 won't even compile with current GCC flags... It's too late in the day to deal with all this, unfortunately. |
Okay, I've poked around enough to be depressed about all our options. There's a few problems:
We could do any of:
|
Ouch :-( I've looked a how Pika recommends doing things, and they have an example of a threaded consumer in examples/basic_consumer_threaded.py. That would be for option 4. I'm however a bit uneasy about providing an API where you can do things like I'm fine with option 3 if we can give a small example of how consumers should run their long-running code in a thread in their message callback without blocking the heartbeating (a couple lines should be enough, creating the thread object and starting it) but mentionning that if their thread fails, the message won't be re-delivered (because it'd had been acked) so it's their responsability. It kinda pushes them to have a task working system for their long-running tasks, and not use fedora-messaging for that. I'm very worried that option 5 will result in you being so annoyed that you stop contributing to this project, and then we'd have lost much more than a nicer solution to option 3. I'm pretty unhappy that using Twisted on EL7 is basically impossible because it's so ancient. Thankfully the new world order of OpenShift should fix this issue. |
raise exceptions.HaltConsumer() | ||
|
||
|
||
def halt_exit_42(message): | ||
"""Exit with code 42 when it gets a message""" | ||
raise exceptions.HaltConsumer(exit_code=42, reason="Life, the universe, everything") | ||
raise exceptions.HaltConsumer( | ||
exit_code=42, reason="Life, the universe, and everything" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:towel:
On 3/13/19 5:14 AM, Aurélien Bompard wrote:
Ouch :-(
I've looked a how Pika recommends doing things, and they have an example of a threaded consumer in [examples/basic_consumer_threaded.py](https://github.com/pika/pika/blob/master/examples/basic_consumer_threaded.py). That would be for option 4. I'm however a bit uneasy about providing an API where you can do things like `blockingCallFromThread()`, but if you're running on EL7 it'll crash. Maybe it's not too bad though, I'm not entirely sure.
Yeah, that's not an angle I'd considered and isn't very appealing...
Although as you point out later anyone using Twisted on EL7 wrote that
code 6 years ago because it's too painful to use that version now.
How many EL7 hosts do we really have out there consuming that couldn't
be on OpenShift? It seems like anything running as a consumer is a prime
candidate for OpenShift so maybe we should just say if you want it on
EL7 you need to pip-install it in a venv.
I'm fine with option 3 if we can give a small example of how consumers should run their long-running code in a thread in their message callback without blocking the heartbeating (a couple lines should be enough, creating the thread object and starting it) but mentionning that if their thread fails, the message won't be re-delivered (because it'd had been acked) so it's their responsability. It kinda pushes them to have a task working system for their long-running tasks, and not use fedora-messaging for that.
Hmm. That is another option, just saying if you have a long-running
tasks you should use celery (we do have RabbitMQ available, after all).
I'm very worried that option 5 will result in you being so annoyed that you stop contributing to this project, and then we'd have lost much more than a nicer solution to option 3.
I'm not too worried about that, I throw out most of the code I write :)
Besides, it's my fault for not doing the threaded approach to our
current code to start with. In any case, I won't be annoyed if that's
what we end up doing - presumably we'll do it because it's the best
option which is what I care about.
Twisted is pretty good regardless so if we go with 5 "toss out"
may be just stash the API in "fedora_messaging.twisted" for Twisted
users.
|
From the top of my head there's going to be Mailman and all the hosts that require persistent storage, since that's still an issue in OpenShift.
Yeah, fedmsg has been used in apps as a distributed task queue, and it's not really its purpose. It's a pretty useful capability though, and transition will be harder if we have to add a task queue system to apps when we migrate them (new code to add, new config, new daemon to run, etc)
Yeah I also think it's the best option we have. I'd like to keep the twisted code in the twisted module, though, just not use it by default. |
670458a
to
e9e7a72
Compare
Okay, so I've had a week to mull and hack at things, and I've got it working with Twisted (see the new commits) on EL7. The problem now is how to get CI for it, which is actually a problem we had before this PR. We test on Python 2.7, but we don't test using the EL7 versions of our dependencies, and we already have some problems (like #142). A number of other tests fail due to pytest being too old, never mind that there's no pytest-twisted or treq. @abompard, you've done a lot of CI stuff for Bodhi, can we hook in a builder from Centos CI or something to get a EL7 environment to install into and test? |
9136c4e
to
d93a19f
Compare
Thanks a lot! The CI error can be ignored, it's just the docs buiding process that couldn't fetch the intersphinx indexes. I've asked Brian for instructions on setting up CentosCI, I'll be happy to see all this work on EL7! :-) |
ad5cc01
to
048bdd1
Compare
Okay @abompard, this is ready for a final review. Note that the test coverage in the CLI drops because some tests move to integration-like tests that use subprocess. Also there's a couple blocks that aren't reachable with Twisted 12. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, here's my review. There are quite a few comments that are totally optional.
Thank you so much for this awesome work Jeremy, it looks really great :-)
|
||
def callback(message): | ||
"""An *exceptionally* useless callback""" | ||
raise Exception("Oh the huge manatee") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I learned a new meme today. Thank you for that.
config.conf = config.LazyConfig() | ||
config.conf["client_properties"]["app"] = function.__name__ | ||
if api._twisted_service: | ||
api._twisted_service.stopService() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think stopService()
returns a Deferred
, we should probably wait for it here.
pytest.fail("Timeout reached without consumer receiving message!") | ||
|
||
assert not message_processed.called | ||
api._twisted_service.stopService() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think stopService()
returns a Deferred
, we should probably yield to it here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually added a yield at the end of the function here and asserted the service still hasn't stopped by the time the message is processed, but it's the same idea
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I don't understand what you mean. Which yield are you talking about?
message_received, message_processed = defer.Deferred(), defer.Deferred() | ||
|
||
def callback(message): | ||
"""Count to 3 and quit.""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This docstring seems incorrect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That, or I am incredibly bad at counting to three
except defer.TimeoutError: | ||
pytest.fail("Timeout reached without consumer halting!") | ||
finally: | ||
consumers[0].cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumer.cancel()
seems to return a Deferred
, we should yield it here.
except exceptions.HaltConsumer: | ||
assert len(messages_received) == 3 | ||
except defer.TimeoutError: | ||
consumers[0].cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumer.cancel()
seems to return a Deferred
, we should yield it here.
response = yield treq.post(url, json=body, auth=HTTP_AUTH, timeout=3) | ||
response = yield response.json() | ||
assert response["routed"] is True | ||
consumers[0].cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumer.cancel()
seems to return a Deferred
, we should yield it here. Otherwise I don't think we have any guarantee that the consumer will actually process the incoming invalid message, and this test will always succeed.
|
||
assert consumers1[0]._tag == consumers2[0]._tag | ||
|
||
consumers2[0].cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consumer.cancel()
seems to return a Deferred
, we should yield it here.
news/pr.139
Outdated
@@ -0,0 +1,4 @@ | |||
A new API, :func:`fedora_messaging.api.twisted_consume`, has been added to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be named PR139.feature
.
Thanks for the review, I've pushed a new commit with (I think) everything addressed. If you're happy with it I'll squash and merge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, looks great, thanks again! :-)
634baad
to
93edf5f
Compare
This adds a new API to fedora-messaging, twisted_consume(), which provides a way for users to start AMQP consumers that run until explicitly canceled or until a fatal error occurs on the broker. This introduces new versions of the Protocol, Factory, and Service classes as the APIs of these Twisted classes had to change. The original classes have been deprecated and will be removed in fedora-messaging v2.0.0. One thing to note is that these new classes use the standard library's logging module. While it is true that these are blocking calls which may use the network and happen in the reactor thread, it makes configuring logging for consumers much easier. Signed-off-by: Jeremy Cline <jcline@redhat.com>
Additionally, the documentation for the CLI is expanded to cover expected exit codes and how signals are handled. Signed-off-by: Jeremy Cline <jcline@redhat.com>
Signed-off-by: Jeremy Cline <jcline@redhat.com>
93edf5f
to
4f02b77
Compare
This adds two new APIs to fedora-messaging:
twisted_consume() provides a way for users to start AMQP consumers that
run until explicitly canceled or until a fatal error occurs on the
broker.
twisted_cancel_consume() stops consumers started by
twisted_consume().
This introduces new versions of the Protocol, Factory, and Service
classes as the APIs of these Twisted classes had to change. The original
classes have been deprecated and will be removed in fedora-messaging
v2.0.0.
Note: this is currently a work-in-progress - documentation and a few more tests are required. I'm just making sure the Ubuntu-based CI is passing with the new integration tests.