Add metadata to TopicPartition #1410

Merged
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Confluent's Python client for Apache Kafka


## v1.10.0

- Add metadata to TopicPartition type and commit() (#1410).


## v1.9.2

v1.9.2 is a maintenance release with the following fixes and enhancements:
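As a quick illustration of the changelog entry, here is a minimal end-to-end sketch of the new surface. The broker address, topic name, group id, and metadata string are placeholders, not part of the patch:

from confluent_kafka import Consumer, TopicPartition

# Placeholder configuration; assumes a reachable broker and an existing topic.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'enable.auto.commit': False,
})

# The new optional fourth argument attaches application metadata to the
# committed offset.
consumer.commit(offsets=[TopicPartition('mytopic', 0, 42, 'checkpoint-7')],
                asynchronous=False)

# The stored string travels back on the objects returned by committed().
tp = consumer.committed([TopicPartition('mytopic', 0)], timeout=10)[0]
print(tp.offset, tp.metadata)

consumer.close()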
52 changes: 41 additions & 11 deletions src/confluent_kafka/src/confluent_kafka.c
@@ -816,15 +816,27 @@ static int TopicPartition_clear (TopicPartition *self) {
Py_DECREF(self->error);
self->error = NULL;
}
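/* metadata is an owned copy (strdup'd in setup); release it with the other fields. */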
if (self->metadata) {
free(self->metadata);
self->metadata = NULL;
}
return 0;
}

static void TopicPartition_setup (TopicPartition *self, const char *topic,
int partition, long long offset,
const char *metadata,
rd_kafka_resp_err_t err) {
self->topic = strdup(topic);
self->partition = partition;
self->offset = offset;

if (metadata != NULL) {
self->metadata = strdup(metadata);
} else {
self->metadata = NULL;
}

self->error = KafkaError_new_or_None(err, NULL);
}

@@ -843,18 +855,22 @@ static int TopicPartition_init (PyObject *self, PyObject *args,
const char *topic;
int partition = RD_KAFKA_PARTITION_UA;
long long offset = RD_KAFKA_OFFSET_INVALID;
const char *metadata = NULL;

static char *kws[] = { "topic",
"partition",
"offset",
"metadata",
NULL };

-if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws,
-&topic, &partition, &offset))
+if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLs", kws,
+&topic, &partition, &offset,
+&metadata)) {
return -1;
+}

TopicPartition_setup((TopicPartition *)self,
-topic, partition, offset, 0);
+topic, partition, offset, metadata, 0);
return 0;
}
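With the format string changed from "s|iL" to "s|iLs", metadata becomes an optional fourth argument, positional or keyword, defaulting to None. A small sketch of the accepted forms (topic and values are illustrative); note that CPython's "s" converter is also what rejects strings containing NUL bytes, producing the ValueError exercised by the test at the bottom of this diff:

from confluent_kafka import TopicPartition

TopicPartition('mytopic')                          # partition and offset default
TopicPartition('mytopic', 3, 100)                  # no metadata -> metadata is None
TopicPartition('mytopic', 3, 100, 'checkpoint')    # positional metadata
TopicPartition('mytopic', 3, 100, metadata='m')    # keyword metadata

try:
    TopicPartition('mytopic', 3, 100, 'a\x00b')    # embedded NUL is rejected
except ValueError as e:
    print(e)  # 'embedded null character'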

@@ -889,6 +905,9 @@ static PyMemberDef TopicPartition_members[] = {
" :py:const:`OFFSET_STORED`,"
" :py:const:`OFFSET_INVALID`\n"
},
{"metadata", T_STRING, offsetof(TopicPartition, metadata), READONLY,
"attribute metadata: Optional application metadata committed with the "
"offset (string)"},
{ "error", T_OBJECT, offsetof(TopicPartition, error), READONLY,
":attribute error: Indicates an error (with :py:class:`KafkaError`) unless None." },
{ NULL }
@@ -1038,14 +1057,15 @@ PyTypeObject TopicPartitionType = {
* @brief Internal factory to create a TopicPartition object.
*/
static PyObject *TopicPartition_new0 (const char *topic, int partition,
-long long offset,
+long long offset, const char *metadata,
rd_kafka_resp_err_t err) {
TopicPartition *self;

self = (TopicPartition *)TopicPartitionType.tp_new(
&TopicPartitionType, NULL, NULL);

-TopicPartition_setup(self, topic, partition, offset, err);
+TopicPartition_setup(self, topic, partition,
+offset, metadata, err);

return (PyObject *)self;
}
@@ -1069,7 +1089,9 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
PyList_SET_ITEM(parts, i,
TopicPartition_new0(
rktpar->topic, rktpar->partition,
-rktpar->offset, rktpar->err));
+rktpar->offset,
+rktpar->metadata,
+rktpar->err));
}

return parts;
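Because metadata is now threaded through TopicPartition_new0, every conversion from a librdkafka partition list back to Python carries it along, and the T_STRING member maps a NULL pointer to None. A hedged sketch of the observable behavior (no broker needed for the constructor part):

from confluent_kafka import TopicPartition

tp = TopicPartition('mytopic', 0, 7)
assert tp.metadata is None           # NULL metadata surfaces as None

tp2 = TopicPartition('mytopic', 0, 7, 'state:42')
assert tp2.metadata == 'state:42'

try:
    tp2.metadata = 'other'           # declared READONLY in TopicPartition_members
except AttributeError:
    pass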
@@ -1094,6 +1116,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));

for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
rd_kafka_topic_partition_t *rktpar;
TopicPartition *tp = (TopicPartition *)
PyList_GetItem(plist, i);

@@ -1106,10 +1129,17 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
return NULL;
}

-rd_kafka_topic_partition_list_add(c_parts,
-tp->topic,
-tp->partition)->offset =
-tp->offset;
+rktpar = rd_kafka_topic_partition_list_add(c_parts,
+tp->topic,
+tp->partition);
+rktpar->offset = tp->offset;
+if (tp->metadata != NULL) {
+rktpar->metadata_size = strlen(tp->metadata) + 1;
+rktpar->metadata = strdup(tp->metadata);
+} else {
+rktpar->metadata_size = 0;
+rktpar->metadata = NULL;
+}
}

return c_parts;
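On the write path, each TopicPartition in an offsets list carries its own copy of the string into the C list, with metadata_size counting the terminating NUL byte (strlen + 1). A sketch of a mixed commit, assuming consumer is an existing Consumer instance (names are illustrative):

from confluent_kafka import TopicPartition

offsets = [
    TopicPartition('mytopic', 0, 10, 'replayed-until-10'),
    TopicPartition('mytopic', 1, 20),   # no metadata for this partition
]
consumer.commit(offsets=offsets, asynchronous=False)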
1 change: 1 addition & 0 deletions src/confluent_kafka/src/confluent_kafka.h
@@ -352,6 +352,7 @@ typedef struct {
char *topic;
int partition;
int64_t offset;
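/* Optional commit metadata; an owned copy (strdup'd in TopicPartition_setup,
 * freed in TopicPartition_clear), NULL when unset. */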
char *metadata;
PyObject *error;
} TopicPartition;

@@ -0,0 +1,63 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from confluent_kafka import TopicPartition


def commit_and_check(consumer, topic, metadata):
    if metadata is None:
        consumer.commit(offsets=[TopicPartition(topic, 0, 1)], asynchronous=False)
    else:
        consumer.commit(offsets=[TopicPartition(topic, 0, 1, metadata)], asynchronous=False)

    offsets = consumer.committed([TopicPartition(topic, 0)], timeout=100)
    assert len(offsets) == 1
    assert offsets[0].metadata == metadata


def test_consumer_topicpartition_metadata(kafka_cluster):
    topic = kafka_cluster.create_topic("test_topicpartition")
    consumer_conf = {'group.id': 'pytest'}

    c = kafka_cluster.consumer(consumer_conf)

    # Commit without any metadata.
    metadata = None
    commit_and_check(c, topic, metadata)

    # Commit with only ASCII metadata.
    metadata = 'hello world'
    commit_and_check(c, topic, metadata)

    # Commit with Unicode characters in metadata.
    metadata = 'नमस्ते दुनिया'
    commit_and_check(c, topic, metadata)

    # Commit with empty string as metadata.
    metadata = ''
    commit_and_check(c, topic, metadata)

    # Commit with invalid metadata (with null byte in the middle).
    metadata = 'xyz\x00abc'
    try:
        commit_and_check(c, topic, metadata)
        # We should never reach this point, since the prior statement should throw.
        assert False
    except ValueError as ve:
        assert 'embedded null character' in str(ve)

    c.close()