-
Notifications
You must be signed in to change notification settings - Fork 881
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add metadata field to topicpartition #1126
Conversation
9f53a6e
to
a000505
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't forget a CHANGELOG.md update
8030780
to
27060b4
Compare
A Kafka offset commit message includes optional metadata, but python client doesn't support it yet.
27060b4
to
a0a30b7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Metadata is binary and needs to be handled as such. The string operations will not work.
return 0; | ||
} | ||
|
||
static void TopicPartition_setup (TopicPartition *self, const char *topic, | ||
int partition, long long offset, | ||
const char *metadata, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
metadata is a binary blob, not a string, so we can't use strlen() to know the end of the data, the size must be passed explicitly. The corresponding Python type should be bytes
(for py3, there are macros to help with py2 in confluent_kafka.h)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not seem to be fixed
dcc236d
to
d709176
Compare
d8a4a5e
to
6db4f9c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good but needs some further tweaking
tests/integration/consumer/test_consumer_topicpartition_metadata.py
Outdated
Show resolved
Hide resolved
for tp in offsets: | ||
assert tp.metadata is None | ||
|
||
metadata = '\x01abcdefgafd\xff' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For string include byte works fine on linux, but report errors on mac, not sure if it's caused by different gcc or others, will waiting for ci to verify this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some nul-characters into the string too.
And add the b
byte prefix: metadata = b'...'
return 0; | ||
} | ||
|
||
static void TopicPartition_setup (TopicPartition *self, const char *topic, | ||
int partition, long long offset, | ||
const char *metadata, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not seem to be fixed
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws, | ||
&topic, &partition, &offset)) | ||
|
||
if (PyArg_ParseTupleAndKeywords (args, kwargs, "s|iLy", kws, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove space before paren
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The y
formatter token docs says:
The bytes buffer must not contain embedded null bytes
But this is binary data, so it can contain anything. Need to use another formatter token here.
NULL }; | ||
|
||
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We typically try to keep the happy-path (non-error path) unindented, so restore the original:
if (!Py..))
return -1;
TopicPartition_setup(),..
@@ -1084,6 +1106,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) { | |||
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) { | |||
rd_kafka_topic_partition_list_t *c_parts; | |||
size_t i; | |||
rd_kafka_topic_partition_t *rktpar; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this variable is only used in the for-loop, put it there instead.
&metadata_size)) == -1) { | ||
PyErr_SetString(PyExc_RuntimeError, | ||
"Failed to parse metadata " | ||
"object"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The partition list is leaked
c.commit(offsets=[TopicPartition(topic, 0, 0)], | ||
asynchronous=False) | ||
except KafkaException as e: | ||
print('commit failed with %s' % e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure you want to catch this and just print, if the commit fails the test should fail, so just leave it without a try:
for tp in offsets: | ||
assert tp.metadata is None | ||
|
||
metadata = '\x01abcdefgafd\xff' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some nul-characters into the string too.
And add the b
byte prefix: metadata = b'...'
try: | ||
c.commit(offsets=[TopicPartition(topic, 0, 1, binarymetadata)], | ||
asynchronous=False) | ||
except KafkaException as e: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dito
assert tp.metadata is None | ||
|
||
metadata = '\x01abcdefgafd\xff' | ||
binarymetadata = str.encode(metadata) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is simpler to just define metadata as a byte string and not do any encoding.
except KafkaException as e: | ||
print('commit failed with %s' % e) | ||
|
||
offsets = c.committed([TopicPartition(topic, 0)], timeout=100) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add: assert len(offsets) == 1
Closing this since #1410 is merged. |
No description provided.