-
Notifications
You must be signed in to change notification settings - Fork 224
Producer futures #269
Producer futures #269
Conversation
The tuples handed to _produce() were starting to look like full-on Messages anyway. In preparation for work that would add yet another element to the tuple (a delivery-reporting future), I figured we might as well "upgrade" to a real Message. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
I've so far only done some clumsy testing with this, where I'd manually disconnect broker connections, and wait for SocketDisconnectedError to emerge from the Future - that works. Test suite expansion coming soon. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
…utures This pulls in a fix to handle required_acks=0, which will break this branch (because that setting would leave futures pending forever). * parsely/master: (22 commits) increment dev version raise helpful error if kafka <0.8.2 is in use during offset manager discovery include kafka version at readme top cluster: fix bug with internal topic lookups test_cluster: demonstrate issue #277 producer: don't expect response if required_acks=0 test_producer: add test demonstrating issue #278 allow wheels fix rst update comparison link increment version keep one strong reference to the cluster lower log level for common situation remove unused import hold a weak reference to topic in partition use typeerror, not valueerror better changelog enfore breaking api change with a runtime error cluster: improve topic auto-creation cluster: fix topic auto-create (even on fresh cluster) ... Conflicts: pykafka/producer.py
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
This adds a test for the new producer futures feature, and immediately exposed a bug: I pushed an exception type onto the future instead of an instance. We fix that here. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
@rduplain @emmett9001 @kbourgoin I've just updated the description above. Would be grateful if you can check that this makes sense in terms of interface - and implementation of course. |
Related to #238. |
Connected to #238. |
@emmett9001 can I bug you to review yet another one? :) |
pykafka/producer.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] No need for this linebreak, I think. I've been using 90 characters as a length limit, since github windows hold at least that without scrolling left to right.
Thanks, yep, that's it! There are also some convenience functions in the |
Yes, I think it's a good idea to include a code sample like this right above your paragraph in the README on how to use the futures. |
As per discussion on #269. In order for this to work, parent classes also needed to be given `__slots__` (otherwise instances would still have a `__dict__`). A few tests used the instance's `__dict__` directly and thus needed a tweak. While working that out, I found that `test_snappy_decompression` wasn't running, because it had been misnamed. The test wasn't even py3-compatible yet, but luckily the code it tests was. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
9b7ba1d
to
3c38c82
Compare
This exercises the path in the new producer-futures code where the error encountered is recoverable. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
3c38c82
to
92d1291
Compare
Just some cleanup. Should have done this as part of defining Message.__slots__ Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Users will likely always want to have the message content stapled together with the delivery-future, for context when delivery fails, so doing that for them makes things that bit more convenient. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
5fb609c
to
7aa90e6
Compare
Ok, I was poking around a bit with |
Oh wow. In that case we may want a way to turn off futures with a kwarg or something. We should keep that in mind if memory usage ever becomes a problem, but I don't think we necessarily need to add it now. |
With the changes I made as part of f4f6df1, if the user ignores the future, it's gone immediately (the message only holds a weakref to it). So you'd still have the overhead of creating and destroying one, but you wouldn't have one lying around for every message in your outbound-queue. (The weakref is luckily not in possession of a |
As per discussion on #269. In order for this to work, parent classes also needed to be given `__slots__` (otherwise instances would still have a `__dict__`). A few tests used the instance's `__dict__` directly and thus needed a tweak. While working that out, I found that `test_snappy_decompression` wasn't running, because it had been misnamed. The test wasn't even py3-compatible yet, but luckily the code it tests was. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl> (cherry picked from commit 6b4b1ce) Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
As per discussion on #269. In order for this to work, parent classes also needed to be given `__slots__` (otherwise instances would still have a `__dict__`). A few tests used the instance's `__dict__` directly and thus needed a tweak. While working that out, I found that `test_snappy_decompression` wasn't running, because it had been misnamed. The test wasn't even py3-compatible yet, but luckily the code it tests was. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl> (cherry picked from commit 6b4b1ce) Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
This closes #238.
Error responses from kafka previously only appeared in log output. With the changes here,
produce()
returns aconcurrent.futures.Future
, through which any errors that persist aftermax_retries
can be communicated to user code.For
sync=True
, we simply evaluate theFuture
immediately, so that kafka error responses are raised directly fromproduce()
.This touches a few more lines than a minimal implementation would have done, I hope that's ok. I felt that slapping yet another item (ie the
delivery_future
) ontomessage_partition_tup
was just going to get too unwieldly, so instead ofmessage_partition_tup
we're now moving instances ofMessage
around. The second structural change is in_send_request()
, whereto_retry
previously was an iterable of messages, but given that we were now going to add exceptions to that, and the error responses always apply to a wholeMessageSet
,to_retry
is now an iterable ofMessageSet, Exception
.