This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Conversation

emmettbutler (Contributor)

This pull request provides a mechanism by which the Producer can report errors on a per-message basis. Currently this is implemented as Futures returned from produce(), though that approach has memory-efficiency problems and will need to change. @yungchin had the idea of instead using a deque of errored messages that client code can consume.

See also #269, #240
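
For context, client code against the futures-based interface would look roughly like this - a sketch only, with placeholder host and topic names, and the produce() return behavior assumed rather than final:

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")   # placeholder broker address
topic = client.topics["test-topic"]            # placeholder topic name

with topic.get_producer() as producer:
    future = producer.produce(b"payload")      # assumed: returns a Future on this branch
    try:
        future.result(timeout=30)              # raises (e.g. SocketDisconnectedError) on failure
    except Exception as exc:
        print("delivery failed:", exc)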

The tuples handed to _produce() were starting to look like full-on
Messages anyway.  In preparation for work that would add yet another
element to the tuple (a delivery-reporting future), I figured we might
as well "upgrade" to a real Message.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit 862e86e)
I've so far only done some clumsy testing with this, where I'd manually
disconnect broker connections, and wait for SocketDisconnectedError to
emerge from the Future - that works.  Test suite expansion coming soon.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit c6bb40e)

Conflicts:
	pykafka/producer.py
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit 1613635)

Conflicts:
	.travis.yml
See also the related fix on the (future-less) master branch, c434696,
and issue #278 for context.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit 4c4e128)
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit aff59c3)
This adds a test for the new producer-futures feature, which immediately
exposed a bug: I had pushed an exception type onto the future instead of an
instance. We fix that here.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit bd5a15c)
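
The gist of that fix, in concurrent.futures terms (a minimal illustration, not the actual pykafka call site):

from concurrent.futures import Future
from pykafka.exceptions import SocketDisconnectedError

future = Future()
# wrong: future.set_exception(SocketDisconnectedError)               # the exception *class*
future.set_exception(SocketDisconnectedError("broker went away"))    # an exception *instance*
assert isinstance(future.exception(), SocketDisconnectedError)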
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit a63797c)
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit b932fb1)
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit 1a680f6)
As per discussion on #269.  In order for this to work, parent classes
also needed to be given `__slots__` (otherwise instances would still
have a `__dict__`).

A few tests used the instance's `__dict__` directly and thus needed a
tweak.  While working that out, I found that `test_snappy_decompression`
wasn't running, because it had been misnamed.  The test wasn't even
py3-compatible yet, but luckily the code it tests was.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit 6b4b1ce)
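
A minimal illustration of the __slots__ point above (not pykafka's actual class hierarchy):

class Base(object):
    __slots__ = ()          # without this, subclass instances still get a __dict__

class Message(Base):
    __slots__ = ("value", "partition_key", "offset")

    def __init__(self, value, partition_key=None, offset=-1):
        self.value = value
        self.partition_key = partition_key
        self.offset = offset

m = Message(b"payload")
assert not hasattr(m, "__dict__")   # holds only because Base defines __slots__ too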
This exercises the path in the new producer-futures code where the error
encountered is recoverable.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit 92d1291)
Just some cleanup; I should have done this as part of defining
Message.__slots__.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit 330a0fe)
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit a1d9c3b)
Users will likely always want to have the message content stapled
together with the delivery-future, for context when delivery fails, so
doing that for them makes things that bit more convenient.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit f4f6df1)
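
One way to read "stapled together" - an assumption about the mechanism, not necessarily what this commit does - is to attach the produced message to its delivery future, so a failure handler has the payload at hand:

from concurrent.futures import Future

def make_delivery_future(message):
    future = Future()
    future.message = message   # plain attribute; Future instances allow this
    return future

f = make_delivery_future(b"payload")
assert f.message == b"payload"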
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
(cherry picked from commit 7aa90e6)
The main motivation for this change was that the futures implementation
was just too resource-intensive, both in time and memory consumption.
On that point I think it delivers: informally benchmarking this commit
(10**5 calls to produce(), then 10**5 calls to get_delivery_report())
takes 4.5s on my box, whereas the parent commit (10**5 calls to produce(),
with the results passed to futures.wait()) takes 7.3s.

This is just a first shot and obviously needs more work: we need to test
that the thread-local queues work as intended (so that you can use a single
producer instance across multiple threads in, say, a web server without
getting the delivery reports mixed up), and we need to make reporting
optional (if users don't need delivery reports, we don't want to fill up a
queue).

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
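
In outline, the thread-local queue idea sketched in this commit message works something like the following. The _DeliveryReportQueue name comes up later in the thread; the function names and report routing are illustrative, and it's written py3-style (Python 2 would use the Queue module):

import threading
import queue

class _DeliveryReportQueue(threading.local):
    # a threading.local subclass re-runs __init__ on first use in each thread,
    # so every thread that asks for reports sees its own Queue instance
    def __init__(self):
        self.queue = queue.Queue()

_reports = _DeliveryReportQueue()

def produce(message):
    # capture the *calling* thread's queue and hand it along with the message,
    # so the worker thread can route the report back to the right consumer
    return message, _reports.queue

def _report_delivery(message, report_queue, exc=None):
    # called from the producer's internal machinery on success (exc=None) or failure
    report_queue.put((message, exc))

def get_delivery_report(block=True, timeout=None):
    # returns a (message, exception-or-None) tuple; arguments mirror queue.Queue.get()
    return _reports.queue.get(block=block, timeout=timeout)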
@yungchin force-pushed the feature/producer-futures-3 branch from 785e335 to f85e743 on November 18, 2015 15:10
@yungchin (Contributor)

Status update

I just pushed a first shot at replacing the now-abandoned futures interface with a queue from which delivery reports are fetched - see f85e743. This is still a work in progress, but I want to solicit some opinions on the usability of this interface so as not to botch it a second time. There are some rough performance numbers at the bottom of this update.

@emmett9001 @kbourgoin @rduplain @amontalenti

Design considerations / questions

  • For now, this just adds one new public method, Producer.get_delivery_report(self, block, timeout), where the arguments are the same as those to queue.Queue.get(), and the return value is a (Message, Exception/None) tuple (None meaning success). You'd call this once for each call to produce() - see the example code with benchmarks below.
  • I will need to add an init parameter to enable/disable reporting (with reporting disabled it should then run as fast as current master). We may also want other reporting modes than the currently implemented one, e.g. "report only failures" or "use one global queue" (rather than the current thread-local queue). I think I'd like to do that sort of switching by dynamically replacing the get_delivery_report() method based on an option passed to __init__() - see the sketch after this list - rather than by creating mixin classes, which would require bigger overall changes (users would then see different producer types). But I am open to all suggestions and ideas.
  • The main reason I started out with a thread-local queue here is that it nicely supports a use model that the futures interface also supported well: if you have something like a multi-threaded web server, with all threads sharing a single producer but each thread checking its own delivery failures (or each running with sync=True), then you don't want them to de-queue each other's reports. On the other hand, I suppose there may be use models where you'd produce() from multiple threads but have a single thread somewhere handling all delivery failures. That would not be possible with the currently proposed code, so maybe we really do need that "use one global queue" option too? (Not that I know of anyone with that particular use model...)
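
Here is roughly what I mean by swapping the method, with illustrative names only (not the actual producer internals):

import queue

class Producer(object):
    def __init__(self, report_mode="all"):   # e.g. "all" / "errors" / "off"
        self._reports = queue.Queue()
        if report_mode == "off":
            # swap the bound method rather than subclassing or mixing in
            self.get_delivery_report = self._get_delivery_report_disabled

    def get_delivery_report(self, block=True, timeout=None):
        return self._reports.get(block=block, timeout=timeout)

    def _get_delivery_report_disabled(self, block=True, timeout=None):
        raise RuntimeError("delivery reporting is disabled for this producer")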

Quick benchmark

This ran on my laptop, with the Kafka cluster also running there, so I've kept only one decimal place in the timings. They ran in IPython with timeit bench_prod(10**5, linger_ms=100). All variants were rebased against current master to make them otherwise comparable.
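
The snippets below assume a setup along these lines; the host and topic name are placeholders, and this ran under Python 2 (hence xrange):

import concurrent.futures                      # used by the futures variant
from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")   # placeholder broker address
topic = client.topics["benchmark-topic"]       # placeholder topic name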

  • current master at 66adb22 ran in 3.8s with this benchmark:
def bench_prod(n, **kwargs):                                           
    with topic.get_producer(**kwargs) as prod:
        for _ in xrange(n):
            prod.produce(None, partition_key="yow")
  • the futures implementation, rebased at 470daf2, ran in 7.3s with:
def bench_prod(n, **kwargs):                                           
    with topic.get_producer(**kwargs) as prod:
        concurrent.futures.wait([
            prod.produce(None, partition_key="yow") for _ in xrange(n)])
  • the queue implementation at f85e743 ran in 4.5s with:
def bench_prod(n, **kwargs):                                           
    with topic.get_producer(**kwargs) as prod:                                   
        for _ in xrange(n):
            prod.produce(None, partition_key="yow") 
        for _ in xrange(n):
            msg, exc = prod.get_delivery_report()
            assert exc is None

Of course master has a considerable advantage in not doing any reporting; even so, I think the overhead of the reports-queue implementation is quite reasonable.

emmettbutler (Contributor, Author)


I think this function should be factored out, since it's only one line and doesn't make that line significantly more readable than it already is.

yungchin (Contributor)


It might grow to a two-liner when we check if reporting is enabled or not, but yeah, it could still be quickly factored out. I had mostly been thinking of what it might all look like if we ended up with a sort of mixin :)

@emmettbutler (Contributor, Author)

@yungchin this looks great, I'm excited to get it into master. My main thought in response to your design considerations is that I'd prefer to avoid adding switchable modes to this system too eagerly. I think we should figure out which mode is the most likely to be generally useful and present it as the only option. If after that we find that users would like tweaks that support different use cases, we can keep these mode suggestions in mind. (This doesn't include enabling/disabling delivery reporting entirely - I agree that's necessary).

I think we should stick with reporting all deliveries, not just errors. The original impetus for this feature was a user who needed confirmation that each message had been produced, and I think it's fine to risk providing slightly more information than some use cases will require. Making this switchable seems like needless machinery for too small a benefit.

After a bit of thought, I believe it's preferable to use thread-local queues. Doing so does force delivery reports to be consumed on the same thread that produced the messages, meaning users can't asynchronously retry their failed messages. On the other hand, a global queue would break any setup where a producer is shared across multiple threads, which I think is reasonable to support. The question boils down to how important we think it is to allow delivery reports to be consumed and retries to happen asynchronously - and my opinion is that it's not extremely important.

@yungchin (Contributor)

Thanks @emmett9001, cool, that sounds like the best plan really - if nobody comes asking for other modes, we'll have avoided a bunch of code that nobody ends up using. The mode that's here now, with thread-local queues, is probably the most flexible one and would be the one I'd be using.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
…utures-3

* parsely/master:
  fix held_offsets in balancedconsumer
  fixup sourcecode directives
  add Kafka 0.9 roadmap to ReadTheDocs
  balancedconsumer test pep8
  pass zk connect string through to balanced consumer
  tests: fixes after changes to SimpleConsumer.held_offsets
  remove unused import
  test for zookeeper connect strings in Cluster()
  have Cluster accept zookeeper connect strings as well as broker lists
  return -2 if last_offset_consumed is -1. fixes #216
  wrap another _socket call in a disconnect handler
  balancedconsumer: re-raise ConsumerStoppedException
  Set disconnected on socket errors from util.
  retry topic creation on a random broker. fixes #338
@yungchin (Contributor)

@emmett9001 I've updated the docs, and delivery reporting is now disabled by default. When it's disabled, the mini-benchmark posted previously runs 0.1s faster than current master - presumably that's a little Message.__slots__ boost.

If we wanted to have a "global queue" option later, it would be a very small addition: we could just drop in basically the same class as _DeliveryReportQueue, but not inheriting from threading.local.

Also for later reference: I ran a very quick benchmark with a "report errors only" adaptation, and that runs as fast as the no-reporting option (as you'd expect when no delivery errors occur). So that's 10**5 messages in 3.7s for errors-only, and 4.1/4.5s for report-all (the 4.5s figure includes the calls to get_delivery_report()).
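
With reporting enabled, usage looks roughly like this (reusing topic from the benchmark setup above; treat the delivery_reports parameter name as provisional):

with topic.get_producer(delivery_reports=True, linger_ms=100) as producer:
    producer.produce(None, partition_key="yow")
    msg, exc = producer.get_delivery_report(block=True, timeout=30)
    if exc is not None:
        print("delivery failed:", exc)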

@yungchin assigned emmettbutler and unassigned yungchin on Nov 21, 2015
@yungchin changed the title from "[WIP] Producer error reporting" to "Producer error reporting" on Nov 21, 2015
@emmettbutler mentioned this pull request on Nov 23, 2015
@emmettbutler force-pushed the feature/producer-futures-3 branch from 28d4332 to 73851a5 on November 23, 2015 19:46
@yungchin force-pushed the feature/producer-futures-3 branch from 73851a5 to 28d4332 on November 23, 2015 20:21
emmettbutler added a commit that referenced this pull request on Nov 23, 2015
@emmettbutler merged commit 0d4dd32 into master on Nov 23, 2015
@emmettbutler deleted the feature/producer-futures-3 branch on November 23, 2015 20:33