rdkafka-backed Producer #234
This compiles; that's all I promise right now. I'm working on the Python wrapper class to go with it, and the delivery callback is still a todo. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
…oducer

Brings in the new async producer code.

* parsely/master: (80 commits)
  retry in response to LeaderNotAvailable
  clearer instructions on backporting
  changelog and stubbed contribution guide
  better readme example
  raise_worker_exceptions should be private
  change producer example in readme
  use partition key for partitioning, not the whole message
  remove stale docstring
  Fix _partition_messages in producer
  major version bump for producer api change
  workable default for linger_ms
  fix tests after linger_ms bugfix
  fix bug causing linger_ms=0 to trigger flush attempts even when the queue is empty
  signal on min_queued_messages, not empty queue
  add test for unicode
  ensure message is str
  waiting on stop() is actually fine, since _wait_all can raise exceptions
  simplify resolve_event_state
  simpler signature for produce()
  test thread failure
  ...
To start, for now use something like prod = pykafka.rdkafka.RdKafkaProducer(topic._cluster, topic). Still upcoming: delivery reporting, and tests. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
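As a runnable sketch of that instantiation (the host and topic names are hypothetical, and note that topic._cluster is a private attribute, so this was only an interim API):

import pykafka
import pykafka.rdkafka

# Connect to a broker and pick a topic (illustrative values)
client = pykafka.KafkaClient(hosts="localhost:9092")
topic = client.topics["test"]

# The interim constructor described in the commit message above
prod = pykafka.rdkafka.RdKafkaProducer(topic._cluster, topic)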
For those who don't like to wait forever all the time... Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Trying to un-confuse github. * feature/rdkafka_extension:
pykafka/rdkafka/producer.py
Outdated
@kbourgoin @emmett9001 @rduplain can I bug you guys - and also everyone else who's listening in - for some input on this design choice in particular, before I spend too many hours hammering out the further implementation in the C code?
What this would do is make produce() return a future (either a pykafka.handlers.ResponseFuture, or a backported concurrent.futures.Future, or something else, if you prefer). That future would just evaluate to None if the message made it to the broker, or would raise an appropriate exception from pykafka.exceptions if not (i.e. if max_retries got exceeded for some messages, telling you what went wrong). And for self._synchronous=True it could then just evaluate the future as part of produce().

(I'd also volunteer to write the pullreq to make that work the same way in pykafka.Producer, if we like it.)
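A minimal sketch of how that contract might look from user code (hypothetical names: ProduceFailureError stands in for whichever pykafka.exceptions class would apply, and handle_delivery_error is an imaginary callback):

from pykafka.exceptions import ProduceFailureError

future = producer.produce(message)
try:
    # Evaluates to None once the broker has acknowledged the message
    future.result()
except ProduceFailureError as exc:
    # e.g. max_retries was exceeded; exc describes what went wrong
    handle_delivery_error(exc)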
Now the question: I'd like this futures-based design mostly because it is easy to reason about from a user perspective - 1 message in, 1 future out, and you can check it or ignore it, whatever suits - but I'd like your thoughts about the practicality of it in a bigger application. Any and all alternative interface suggestions very welcome.
(I'll note that concurrent.futures can also facilitate a callback interface, if needs be...)
I guess the reason to do this is to take advantage of rdkafka's delivery reports, which makes sense. Implementing these in the Producer should be straightforward, since we're checking for message send success anyway. I believe I'd do it by having OwnedBroker.enqueue create and return a future that it also adds to the enqueued (kv, partition_id) tuple. On the other side, we could fill in the future with whatever values were appropriate based on the fate of the message.
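A rough sketch of that idea, using the names from this comment (the OwnedBroker internals shown here are illustrative, not pykafka's actual code):

import concurrent.futures

class OwnedBroker(object):
    def enqueue(self, kv, partition_id):
        # Create a future the caller can watch, and ship it along with
        # the message so the worker thread can resolve it on delivery.
        future = concurrent.futures.Future()
        self.queue.append((kv, partition_id, future))
        return future

    def _report_delivery(self, future, exc=None):
        # Fill in the future based on the fate of the message.
        if exc is None:
            future.set_result(None)
        else:
            future.set_exception(exc)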
Given that I can picture the implementation already, that there's a clear upside (more descriptive return values from produce), and that I can't come up with any huge downsides for the average user (who can simply ignore that produce returns anything), I'm +1 on this idea. The question in my mind is whether there is any better way to provide message statuses to the client - a callback interface sounds clumsy, and we already know that a synchronously returned status from produce only gets us half of the way there.

Assuming we agree this makes sense, it should have its own issue number so we can more easily partition the pure-python and rdkafka implementations.
Thanks, yes that's precisely the reason. I was wondering about the same thing - would there be any better way?

I suspect that most users who'd want to evaluate the futures returned (to act on errors) would want to bulk-process them (and concurrent.futures provides some facilities for that), in a pattern roughly like so:
# Warning: potential pseudo-code ahead
import concurrent.futures

pending = []
while True:
    for _ in range(big_number):
        msg = rawdata_consumer.consume()
        result = do_stuff_with_msg(msg)
        future = nextstage_producer.produce(result)
        pending.append(future)
    # timeout=0, ie don't wait, just grab what's been delivered
    done, not_done = concurrent.futures.wait(pending, timeout=0)
    errors = [f.exception() for f in done if f.exception() is not None]
    handle_errors(errors)
    # wait() returns sets, so rebuild a list we can append to again
    pending = list(not_done)
... and so, one might argue that if that's going to be the pattern, you ought to abstract it into the producer interface, to save everyone the hassle of writing it all the time. On the other hand, it's not a terribly big or complicated pattern, not baking it into the interface is more flexible, and we can always add convenience-wrappers for this later? So... I guess I'm starting to lean the way of just returning a future (but not a handlers.ResponseFuture, which would lack all the stuff around it).
Assuming we agree this makes sense, it should have its own issue number [...]
I've put it up at #238 :)
Bringing in all the py3 compatibility work.

* feature/rdkafka_extension: (57 commits)
  tox: pass env vars for gcc
  Revert "travis/tox: reconfigure compiler search paths"
  tox: pass env vars to detect test cluster
  travis/tox: dumb down paths
  travis/tox: reconfigure compiler search paths
  rdkafka: py3 compatibility
  travis: wait let me fix that again
  travis: additional search paths for gcc build
  travis: add back testinstances install
  fix typos, add callout in readme
  write up initial offset logic. fixes #251
  separate listings
  add usage page to docs
  reset version to 2.0.0-dev. 2.0.0 hasn't been pushed to PyPI and it has some issues, so hold back on calling it ready.
  produce a bit more in LATEST test to avoid hanging
  produce one more to fix hanging test
  style
  Produce fewer messages in LATEST balanced consumer test
  Fixes #241 so that auto_offset_reset=-1 consumes from latest available offset when no committed offsets are available
  use "dev" marker to clarify the current master release. fixes #243
  ...

Conflicts: pykafka/producer.py
These are currently failing - fixes coming up. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Note this still throws some test errors, but those aren't about interpreter compatibility. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
See docstring for details. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Previously we would raise an _rd_kafka.Error here, which is not pretty, because it means user code would have to handle different exceptions depending on whether they used Producer or RdKafkaProducer. This commit imports pykafka.exceptions into the C module, so from here on we can raise the appropriate exceptions immediately, without having the wrapper class translate them. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
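The caller-facing upshot, sketched below (ProducerStoppedException is used as an assumed example of a pykafka.exceptions class, and recover_or_exit is an imaginary handler; the point is that one except clause covers both producer implementations):

from pykafka.exceptions import ProducerStoppedException

try:
    # producer may be either Producer or RdKafkaProducer
    producer.produce(message)
except ProducerStoppedException:
    # Same pykafka exception type either way; no _rd_kafka.Error to special-case
    recover_or_exit()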
* feature/rdkafka_extension:
  rdkafka: toss out ConsumerStoppedException-translation
  rdkafka: import pykafka.exceptions into C module

Conflicts: pykafka/rdkafka/_rd_kafkamodule.c
This passes tests on py27 and py34, but for now crashes and burns on pypy: I have a hunch that has to do with our reliance on relatively long-lived opaque pointers to futures (it may mean that we'll have to disable support for delivery-futures on pypy, or find a better way - that's for another commit).

The previous beginnings of futures-support that I'd already put in place have been given a rethink: incref'ing the futures without storing the actual reference anywhere was bound to cause ref leaks, so now we actually stick them in a PySet, from which they are deleted when the delivery callback runs. Also, the futures we're emitting are now the py34-native concurrent.futures.Futures.

Still to come: tests, and some workarounds for pypy. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Quick status update: this now passes all tests that we have for the producer. The delivery-reporting needs some tests, which I intend to write as part of #238 - but it seems to work well enough. That is, save for some test suite additions and bug fixes, this branch is feature complete.
These are the minimal changes to make things work on pypy. The problem with the previous implementation was that we'd pass raw PyObject pointers as opaque pointers through librdkafka, when in fact pypy might move objects around in memory, so we'd end up with invalid pointers.

As a workaround, we now store Futures in a dict, using a simple size_t counter to generate dict keys. It's this size_t key that we pass around as the msg_opaque. In addition, we changed the overall application opaque pointer too: we previously used an RdkHandle pointer, without pypy necessarily being aware that we were shipping its pointer around (so another way to fix that would have been to incref that object). Now we use the direct pointer to the dict holding the Futures, which we already held an explicit reference to, and which should have been a better choice to begin with.

Still upcoming: we need to do some refactoring (marked with TODOs) to create a simpler references-cleanup flow, plus some comments to explain the pypy-trickery. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
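A Python-level sketch of that handle-table workaround (the real code lives in the C module; the names here are illustrative only):

import concurrent.futures

futures_by_key = {}   # keeps strong references, so the GC can't collect them
next_key = 0          # plays the role of the size_t counter

def register_future(future):
    # Hand librdkafka an integer key as the msg_opaque, never a raw
    # pointer, so it stays valid even if pypy moves the object in memory.
    global next_key
    key = next_key
    next_key += 1
    futures_by_key[key] = future
    return key

def on_delivery_report(key, error=None):
    # Look the future back up by its key and resolve it, dropping our reference.
    future = futures_by_key.pop(key)
    if error is None:
        future.set_result(None)
    else:
        future.set_exception(error)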
With some refactoring to make this more tractable. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
This comes with a big refactor, making it a lot more readable. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
Another update: delivery-reporting futures now also work fine on pypy (that took a bit of an overhaul, actually). That means we can probably declare this branch feature-complete. The tests still show one minor quirk, which is that librdkafka producer threads aren't cleaned up consistently on pypy, but that should be easily fixed if we define an explicit stop() method like we did for the consumer.
…oducer

This pulls in changes to Producer's internal interface (which break RdKafkaProducer), as well as new tests for required_acks settings (which may or may not fail, after I've fixed the other breakage).

* parsely/master: (49 commits)
  increment dev version
  raise helpful error if kafka <0.8.2 is in use during offset manager discovery
  include kafka version at readme top
  cluster: fix bug with internal topic lookups
  test_cluster: demonstrate issue #277
  producer: don't expect response if required_acks=0
  test_producer: add test demonstrating issue #278
  allow wheels
  fix rst
  update comparison link
  increment version
  keep one strong reference to the cluster
  lower log level for common situation
  remove unused import
  hold a weak reference to topic in partition
  use typeerror, not valueerror
  better changelog
  enforce breaking api change with a runtime error
  cluster: improve topic auto-creation
  gitignore cache;
  ...

Conflicts: pykafka/producer.py
That turned out to be a simple one-line update; the new required_acks tests mentioned in the parent commit pass without any modifications. (Note there is still a pypy test failure here, unrelated to recent commits - to fix that we'll be explicitly cleaning up rdkafka handles in the producer.) Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
This avoids a test failure on pypy, where, if the rdkafka producer-tests run before the consumer-tests that this commit alters, the latter get confused because the former's threads haven't disappeared yet. I thought about forcing the thread clean-up, by making that happen in a stop() method rather than only when the destructor runs, and then making sure all tests always call stop() (this is the route that the simple_consumer tests have taken). But the code should also work if users don't call stop(), and so it's better that not all tests call stop() either. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
This perhaps isn't a necessary change, but it brings behaviour that bit closer to that of pykafka.Producer: background threads will now terminate when stop() is called, rather than staying around until the producer object is deallocated. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
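A hypothetical usage sketch of what this enables (the constructor arguments follow the interim API shown earlier, and the payload is illustrative):

producer = RdKafkaProducer(cluster, topic)
try:
    producer.produce("some payload")
finally:
    # Tear down librdkafka's background threads deterministically,
    # instead of waiting for the destructor to run.
    producer.stop()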
Mostly just un-confusing github views again.

* feature/rdkafka_extension:
  cherrypick zookeeper ownership check. fixes #272
  reimplement __iter__ in BalancedConsumer. fixes #284
  Revert "add new python versions to travis.yml"
  add new python versions to travis.yml
  improve balancedconsumer docs
  typo
  disallow consume() on a stopped consumer
This was an omission that should have been committed as part of 56352a4 - without it, the tests just grab a pure-python producer all the time. Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
I'm going to merge this into #176, because there's too much stuff here that I turn out to want to use in the consumer code as well, and cherry-picking and resolving conflicts will become nigh-intractable. As a final status update here: all the issues mentioned in earlier comments above have been resolved. The one thing left to do, which I'll carry over to #176, is that once #269 has been merged, we should carry the tests over to assert that the two producers really have matching interfaces.
This will depend on the new async Producer coming up in #177.