first attempt at batch consumption of messages #282

Merged
merged 11 commits into confluentinc:master on Dec 7, 2017

Conversation

tburmeister
Contributor

This is a first attempt at adding batch consumption, a la #252

Have not done a ton of testing yet, but I am able to retrieve messages. Let me know what you think.
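
For context, a minimal sketch of how the proposed API is meant to be used (broker, topic, and group names here are hypothetical; the num_messages/timeout signature is the one settled on in the review below):

from confluent_kafka import Consumer

# Hypothetical connection settings, for illustration only.
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'batch-demo'})
c.subscribe(['mytopic'])

while True:
    # Fetch up to 100 messages, waiting at most 1 second.
    msglist = c.consume(num_messages=100, timeout=1.0)
    for msg in msglist:
        if msg.error():
            print('event: %s' % msg.error())    # error()!=0 means event
        else:
            print('message at offset %d' % msg.offset())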

@ghost

ghost commented Dec 4, 2017

It looks like @tburmeister hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence.
Wikipedia

You can read and sign our full Contributor License Agreement here.

Once you've signed, reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@@ -15,6 +15,7 @@
*/

#include "confluent_kafka.h"
#include <stdio.h>
Contributor Author

Had some debug printfs in earlier, will remove this.

(Py_ssize_t *)&num_messages, &tmout))
return NULL;

CallState_begin(self, &cs); // This unlocks GIL
Contributor Author

The comment was for me; will remove.


CallState_begin(self, &cs); // This unlocks GIL

rkqu = rd_kafka_queue_get_consumer(self->rk);
Contributor Author

I think this returns NULL if the consumer was not properly initialized, which leads to a segfault further down; maybe we just need to check this and return if NULL?

Contributor

That should not happen, so no need to check

PyObject *kwargs) {
size_t num_messages = 100;
double tmout = -1.0f;
static char *kws[] = { "timeout", "num_messages", NULL };
Contributor Author

This is backwards, should be { "num_messages", "timeout", NULL }.

@@ -743,6 +743,67 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
}


static PyObject *Consumer_consume (Handle *self, PyObject *args,
PyObject *kwargs) {
size_t num_messages = 100;
Contributor

1 seems like a better default.


CallState_begin(self, &cs);

rkqu = rd_kafka_queue_get_consumer(self->rk);
Contributor

optional future improvement: cache this on self

return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|nd", kws,
Contributor

Use I for num_messages (unsigned int) to avoid checking for < 0.

}

if (n < 0) {
free(rkmessages);
Contributor

Should raise an exception from rd_kafka_last_error() here, something like:
cfl_PyErr_Format(rd_kafka_last_error(), "%s", rd_kafka_err2str(rd_kafka_last_error()));

and return NULL (to raise the exception)

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|nd", kws,
(Py_ssize_t *)&num_messages, &tmout))
return NULL;

Contributor

Might want to make sure num_messages is not completely out of bounds, since we're using the value directly to allocate memory.
If it is higher than 1M messages, raise a ValueError with a proper error message.

Py_ssize_t n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,
tmout >= 0 ? (int)(tmout * 1000.0f) : -1,
rkmessages,
num_messages > 0 ? (size_t)num_messages : 1);
Contributor

can drop this check if using unsigned int

" .. note: Callbacks may be called from this method, "
"such as ``on_assign``, ``on_revoke``, et.al.\n"
"\n"
" :param int num_messages: Maximum number of messages to return.\n"
Contributor

  • (default: 1)

"\n"
" :param int num_messages: Maximum number of messages to return.\n"
" :param float timeout: Maximum time to block waiting for message, event or callback.\n"
" :returns: A list of Message objects or None on timeout\n"
Contributor

Do we really need to use a different return type for timeout? An empty list seems easier to handle from the application's perspective.
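
A quick illustration of the difference from the caller's side (process() is a hypothetical handler):

# Empty list on timeout: no special case, the loop simply runs zero times.
for msg in c.consume(num_messages=100, timeout=1.0):
    process(msg)

# None on timeout: every call site needs a guard first.
msglist = c.consume(num_messages=100, timeout=1.0)
if msglist is not None:
    for msg in msglist:
        process(msg)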

"such as ``on_assign``, ``on_revoke``, et.al.\n"
"\n"
" :param int num_messages: Maximum number of messages to return.\n"
" :param float timeout: Maximum time to block waiting for message, event or callback.\n"
Contributor

(default: infinite (-1))



@edenhill (Contributor) left a comment

Good stuff!
What needs to be done:

  • minor edits from code review
  • unit test in tests/test_Consumer.py (to verify the API)
  • integration test in examples/integration_test.py (to verify it works in practice).

@tburmeister
Contributor Author

[clabot:check]

@ghost

ghost commented Dec 5, 2017

@confluentinc It looks like @tburmeister just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@tburmeister
Contributor Author

Made suggested changes. Will work on unit tests and integration tests next, and might take a stab at caching the queue.

@edenhill
Contributor

edenhill commented Dec 5, 2017

Great! For caching the queue, to avoid locking issues, I suggest extracting the queue in Consumer_init() right after poll_set_consumer(), and destroying the queue in Consumer_close() after the rd_kafka_consumer_close() call and in Consumer_dealloc() prior to rd_kafka_destroy() (and outside the CallState).

}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Id", kws,
(unsigned int *)&num_messages, &tmout))
Contributor

It is not safe to cast to an integer pointer of a possibly different size.
Use unsigned int as num_messages's type directly to avoid the cast.

" :param int num_messages: Maximum number of messages to return (default: 1).\n"
" :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)).\n"
" :returns: A list of Message objects\n"
" :rtype: list(Message) or None\n"
Contributor

Does not return None anymore.

"\n"
" :param int num_messages: Maximum number of messages to return (default: 1).\n"
" :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)).\n"
" :returns: A list of Message objects\n"
Contributor

  • (possibly empty on timeout)

" :param float timeout: Maximum time to block waiting for message, event or callback (default: infinite (-1)).\n"
" :returns: A list of Message objects\n"
" :rtype: list(Message) or None\n"
" :raises: RuntimeError if called on a closed consumer\n"
Contributor

or KafkaError in case of internal error.

@tburmeister
Contributor Author

Missed your comment before pushing my last commit with caching, but that all makes sense.

@tburmeister
Contributor Author

I added some tests, but I'm having trouble running the tests because synchronous commits crash on my machine; commit_return.c_parts appears to be NULL in Consumer_commit.

@edenhill
Contributor

edenhill commented Dec 5, 2017

What librdkafka version?

@tburmeister
Contributor Author

tburmeister commented Dec 5, 2017

Version 0.11.0.

Edit: upgraded to 0.11.3 but still segfaulting.

@edenhill
Contributor

edenhill commented Dec 5, 2017

Ack, do you have a gdb backtrace to share?
list and bt

@edenhill (Contributor) left a comment

Getting closer! :)


if msgcnt == 1:
t_first_msg = time.time()
if msgcnt >= max_msgcnt:
Contributor

elif

if msgcnt >= max_msgcnt:
break

if msgcnt >= max_msgcnt:
Contributor

move this to the while ..
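
A sketch of the restructuring being suggested (counts and loop bodies are illustrative):

# Before: unconditional loop with the termination check buried inside.
while True:
    ...
    if msgcnt >= max_msgcnt:
        break

# After: the check lives in the loop header.
while msgcnt < max_msgcnt:
    ...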


if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
Contributor

What throughput are you getting with the batch interface, compared to the standard one?

Contributor Author

Having trouble running the integration test, but running a quick test I wrote on an actual Kafka stream I got:

consume took 1.33 seconds
1000000 messages, 662.56 MB, 497.54 MB/sec

poll took 2.17 seconds
1000000 messages, 662.62 MB, 304.57 MB/sec

So definitely an improvement, though I found I had to tune the batch size to get the best results, e.g. 1k messages performs better than 10k messages or 100 messages.
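
A rough sketch of the kind of timing harness behind numbers like these, assuming c is a subscribed Consumer on a topic with enough messages (counts are illustrative):

import time

def bench_consume(c, total, batch):
    # Drain `total` messages in batches of `batch`; return msgs/sec.
    got = 0
    t0 = time.time()
    while got < total:
        got += len(c.consume(num_messages=batch, timeout=1.0))
    return total / (time.time() - t0)

# Sweep batch sizes; per the numbers above, 1k beat both 100 and 10k.
for batch in (100, 1000, 10000):
    print('batch=%d: %.0f msgs/s' % (batch, bench_consume(c, 1000000, batch)))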

Contributor

damn!

# Consume until EOF or error

# Consume message (error()==0) or event (error()!=0)
msglist = c.consume(max_msgcnt, 1.0)
Contributor

Knowing that there are at least N messages in the topic (thanks to the produce tests), and where N > max_msgcnt (which it should be), we should check that we actually get max_msgcnt back for each call until we've hit our expected message count (which is some arbitrary value <= N).
Relying on EOF is not safe since the topic may have multiple partitions and we don't know what the message distribution is like.
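
In test form, that suggestion might look like the following sketch (expected_cnt and batch_cnt are illustrative, with expected_cnt <= N produced messages):

expected_cnt = 1000
batch_cnt = 100
msgcnt = 0
while msgcnt < expected_cnt:
    msglist = c.consume(batch_cnt, 10.0)
    # While messages remain, each call should return a full batch.
    assert len(msglist) == batch_cnt, \
        'expected %d messages, got %d' % (batch_cnt, len(msglist))
    msgcnt += len(msglist)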

@@ -41,6 +41,17 @@ def dummy_assign_revoke(consumer, partitions):
if msg is not None:
assert msg.timestamp() == (TIMESTAMP_NOT_AVAILABLE, -1)

msglist = kc.consume(num_messages=10, timeout=0.001)
if len(msglist) == 0:
Contributor

aren't we expecting exactly 0 messages here? use assert len(msglist) == 0, "expected 0 messages, not %d" % len(msglist)

@@ -756,6 +815,11 @@ static PyObject *Consumer_close (Handle *self, PyObject *ignore) {

rd_kafka_consumer_close(self->rk);

if (self->u.Consumer.rkqu) {
rd_kafka_queue_destroy(self->u.Consumer.rkqu);
Contributor

indent looks weird.

Contributor Author

Will fix. I noticed that there are a bunch of tabs in the code currently, and I gather from the majority of the code and the librdkafka style guide that spaces are preferred - I can clean those up as well.

Contributor

Yeah, the tabs are part of the dark past.
But you don't need to change whitespace of existing code, we'll do that later in one big go across the code base as we also instate coding guidelines. But thanks anyway!

break

print('OK: consumed messages')

Contributor

Please add a failing test for invalid values of num_messages (e.g., -100 and > 1M).
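
A sketch of such a failing test, assuming consume() raises ValueError for out-of-range num_messages as requested above (kc is the Consumer instance used elsewhere in tests/test_Consumer.py):

for bad in (-100, 1000001):
    try:
        kc.consume(num_messages=bad)
    except ValueError:
        pass  # expected
    else:
        raise AssertionError('num_messages=%d should have raised ValueError' % bad)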

@tburmeister
Contributor Author

tburmeister commented Dec 5, 2017

Struggling to get debug symbols working properly on my Mac, so the best I can give you right now is:

(gdb) bt
#0  0x0000000101be67e0 in c_parts_to_py () from /Users/tburmeister/anaconda/lib/python3.6/site-packages/confluent_kafka/cimpl.cpython-36m-darwin.so
#1  0x0000000101becceb in Consumer_commit () from /Users/tburmeister/anaconda/lib/python3.6/site-packages/confluent_kafka/cimpl.cpython-36m-darwin.so
#2  0x000000010005894a in _PyCFunction_FastCallDict () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#3  0x0000000100058b5b in _PyCFunction_FastCallKeywords () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#4  0x00000001000da787 in call_function () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#5  0x00000001000d7001 in _PyEval_EvalFrameDefault () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#6  0x00000001000db9ed in fast_function () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#7  0x00000001000da761 in call_function () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#8  0x00000001000d6f62 in _PyEval_EvalFrameDefault () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#9  0x00000001000db1ef in _PyEval_EvalCodeWithName () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#10 0x00000001000d03d4 in PyEval_EvalCode () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#11 0x00000001001060f1 in PyRun_FileExFlags () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#12 0x00000001001058a3 in PyRun_SimpleFileExFlags () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#13 0x000000010011e70f in Py_Main () from /Users/tburmeister/anaconda/lib/libpython3.6m.dylib
#14 0x0000000100000df8 in main ()

I do not have this issue on my Ubuntu dev machine.

@edenhill
Contributor

edenhill commented Dec 5, 2017

I added some tests, but I'm having trouble running the tests because synchronous commits crash on my machine; commit_return.c_parts appears to be NULL in Consumer_commit.

That is weird, since there is an assert just above that checks that commit_return.c_parts is not NULL:
https://github.com/confluentinc/confluent-kafka-python/blob/master/confluent_kafka/src/Consumer.c#L480

Can you add some printfs around that part to figure out what is going on?

@tburmeister
Contributor Author

tburmeister commented Dec 5, 2017

The assert seems to not actually be checked. I added:

                /* sync commit returns the topic,partition,offset,err list */
                assert(commit_return.c_parts);
                printf("commit_return.c_parts: %p\n", commit_return.c_parts);

at line 477, and get:

============================== Verifying Consumer ==============================
test[0]@193: key=b'0', value=b'Message #0', tstype=1, timestamp=1512499907092
test[0]@194: key=b'1', value=b'Message #1', tstype=1, timestamp=1512499907092
test[0]@195: key=b'2', value=b'Message #2', tstype=1, timestamp=1512499907092
test[0]@196: key=b'3', value=b'Message #3', tstype=1, timestamp=1512499907092
commit_return.c_parts: 0x0
Segmentation fault: 11

@edenhill (Contributor) left a comment

only nits left now!

@@ -589,21 +589,17 @@ def verify_batch_consumer():
max_msgcnt = 100
msgcnt = 0

while True:
while msgcnt < max_msgcnt:
Contributor

let max_msgcnt=1000 and batch_cnt=100

@@ -715,12 +706,9 @@ def my_on_revoke(consumer, partitions):

if msgcnt == 1:
t_first_msg = time.time()
if msgcnt >= max_msgcnt:
elif msgcnt >= max_msgcnt:
Contributor

can get rid of this one now with the while loop

@edenhill (Contributor) left a comment

The crash might be because no offsets were committed. It is most likely not related to your changes, though, so we can fix it separately; you can ignore it for now (you can change it to return None if c_parts is empty in your local code).

# Consume message (error()==0) or event (error()!=0)
msglist = c.consume(max_msgcnt, 1.0)
# Consume messages (error()==0) or event (error()!=0)
msglist = c.consume(batch_cnt, 1.0)
assert len(msglist) == max_msgcnt
Contributor

This test is failing, right?
The assert should check for batch_cnt, not max_msgcnt.

Contributor Author

Yep, good catch.

# Consume message (error()==0) or event (error()!=0)
msglist = c.consume(max_msgcnt, 1.0)
# Consume messages (error()==0) or event (error()!=0)
msglist = c.consume(batch_cnt, 1.0)
Contributor

Make the test more robust by setting timeout to >session.timeout.ms, such as 10


# Consume messages (error()==0) or event (error()!=0)
msglist = c.consume(batch_cnt, 1.0)
assert len(msglist) == max_msgcnt
Contributor

this should compare to batch_cnt


msgcnt += 1

print('max_msgcnt %d reached' % msgcnt)
Contributor

wrong indent, should be outside while loop.



@tburmeister
Contributor Author

Ohhhh I figured out why commits were crashing and it is my fault - will push a fix soon.

@tburmeister
Contributor Author

I can now run the full integration test locally, without errors. I really should have confirmed the tests worked locally on master first.

@tburmeister
Contributor Author

So what needs to be done to get this merged? Looking at the CI tests, it looks like it's passing the actual tests but failing because of pep8 violations, which I believe are addressed in #279.

@edenhill edenhill merged commit c864b64 into confluentinc:master Dec 7, 2017
@edenhill
Contributor

edenhill commented Dec 7, 2017

Huge thanks for doing this! 😍

@tburmeister
Contributor Author

Awesome! Thanks! This feature will be super useful to me. By the way, we are big librdkafka users at AppNexus.

@isamaru

isamaru commented Dec 13, 2017

Thanks! This is very useful to us. Any timeline for when this becomes part of a pypi release?

@edenhill
Contributor

@isamaru We're aiming for a feature release at the end of January.

@messense
Contributor

@edenhill Maybe you can upload a pre-release version to PyPI so we can try it? Thanks!

https://packaging.python.org/tutorials/distributing-packages/#pre-release-versioning

@aswinjoseroy

Awesome work adding this feature! Any updates as to when the pypi release will happen?

@edenhill
Contributor

Final release in about 2 weeks; we'll have an RC out hopefully this week.

@aswinjoseroy

Is the batch consumer released yet? Any links to usage / any documentation etc are highly appreciated! Thanks!

@alannesta

alannesta commented Oct 12, 2018

> Is the batch consumer released yet? Any links to usage / any documentation etc are highly appreciated! Thanks!

@aswinjoseroy Take a look at the consume method in the doc. That's the new function added in this PR
https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Consumer.consume
