Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metadata to TopicPartition #1410

Merged
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Confluent's Python client for Apache Kafka


## v1.10.0
milindl marked this conversation as resolved.
Show resolved Hide resolved

- Add metadata to TopicPartition type and commit() (TODO(milind) - fill in PR number here once created).


## v1.9.2

v1.9.2 is a maintenance release with the following fixes and enhancements:
Expand Down
52 changes: 40 additions & 12 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,15 +816,27 @@ static int TopicPartition_clear (TopicPartition *self) {
Py_DECREF(self->error);
self->error = NULL;
}
if (self->metadata) {
free(self->metadata);
self->metadata = NULL;
}
return 0;
}

static void TopicPartition_setup (TopicPartition *self, const char *topic,
int partition, long long offset,
const char *metadata,
milindl marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_resp_err_t err) {
self->topic = strdup(topic);
self->partition = partition;
self->offset = offset;

if (metadata != NULL) {
self->metadata = strdup(metadata);
} else {
self->metadata = NULL;
}

self->error = KafkaError_new_or_None(err, NULL);
}

Expand All @@ -843,18 +855,23 @@ static int TopicPartition_init (PyObject *self, PyObject *args,
const char *topic;
int partition = RD_KAFKA_PARTITION_UA;
long long offset = RD_KAFKA_OFFSET_INVALID;
const char *metadata = NULL;

static char *kws[] = { "topic",
"partition",
"offset",
"metadata",
NULL };

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws,
&topic, &partition, &offset))
return -1;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLs", kws,
milindl marked this conversation as resolved.
Show resolved Hide resolved
&topic, &partition, &offset,
&metadata)) {
return -1;

TopicPartition_setup((TopicPartition *)self,
topic, partition, offset, 0);
}

TopicPartition_setup((TopicPartition *)self,
topic, partition, offset, metadata, 0);
return 0;
}

Expand Down Expand Up @@ -889,6 +906,9 @@ static PyMemberDef TopicPartition_members[] = {
" :py:const:`OFFSET_STORED`,"
" :py:const:`OFFSET_INVALID`\n"
},
{"metadata", T_STRING, offsetof(TopicPartition, metadata), READONLY,
"attribute metadata: Optional application metadata committed with the "
"offset (string)"},
{ "error", T_OBJECT, offsetof(TopicPartition, error), READONLY,
":attribute error: Indicates an error (with :py:class:`KafkaError`) unless None." },
{ NULL }
Expand Down Expand Up @@ -1038,14 +1058,15 @@ PyTypeObject TopicPartitionType = {
* @brief Internal factory to create a TopicPartition object.
*/
static PyObject *TopicPartition_new0 (const char *topic, int partition,
long long offset,
long long offset, const char *metadata,
rd_kafka_resp_err_t err) {
TopicPartition *self;

self = (TopicPartition *)TopicPartitionType.tp_new(
&TopicPartitionType, NULL, NULL);

TopicPartition_setup(self, topic, partition, offset, err);
TopicPartition_setup(self, topic, partition,
offset, metadata, err);

return (PyObject *)self;
}
Expand All @@ -1069,7 +1090,9 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
PyList_SET_ITEM(parts, i,
TopicPartition_new0(
rktpar->topic, rktpar->partition,
rktpar->offset, rktpar->err));
rktpar->offset,
rktpar->metadata,
rktpar->err));
}

return parts;
Expand Down Expand Up @@ -1106,10 +1129,15 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
return NULL;
}

rd_kafka_topic_partition_list_add(c_parts,
tp->topic,
tp->partition)->offset =
tp->offset;
rd_kafka_topic_partition_t *rktpar =
milindl marked this conversation as resolved.
Show resolved Hide resolved
rd_kafka_topic_partition_list_add(c_parts,
tp->topic,
tp->partition);
rktpar->offset = tp->offset;
if (tp->metadata != NULL) {
rktpar->metadata_size = strlen(tp->metadata) + 1;
rktpar->metadata = strdup(tp->metadata);
}
}

return c_parts;
Expand Down
1 change: 1 addition & 0 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ typedef struct {
char *topic;
int partition;
int64_t offset;
char *metadata;
PyObject *error;
} TopicPartition;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2021 Confluent Inc.
milindl marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
#

from confluent_kafka import TopicPartition, KafkaException

def commit_and_check(consumer, topic, metadata):
consumer.commit(offsets=[TopicPartition(topic, 0, 1, metadata)], asynchronous=False)
offsets = consumer.committed([TopicPartition(topic, 0)], timeout=100)
assert len(offsets) == 1
milindl marked this conversation as resolved.
Show resolved Hide resolved
assert offsets[0].metadata == metadata

def test_consumer_topicpartition_metadata(kafka_cluster):

topic = kafka_cluster.create_topic("test_topicpartition")
consumer_conf = {'group.id': 'pytest'}

c = kafka_cluster.consumer(consumer_conf)

# Commit without any metadata.
c.commit(offsets=[TopicPartition(topic, 0, 0)], asynchronous=False)
offsets = c.committed([TopicPartition(topic, 0)])
assert len(offsets) == 1
assert offsets[0].metadata is None

# Commit with only ASCII metadata.
metadata = 'hello world'
commit_and_check(c, topic, metadata)

# Commit with Unicode characters in metadata.
metadata = 'नमस्ते दुनिया'
milindl marked this conversation as resolved.
Show resolved Hide resolved
commit_and_check(c, topic, metadata)

# Commit with invalid metadata (with null byte in the middle).
metadata = 'xyz\x00abc'
try:
commit_and_check(c, topic, metadata)
except ValueError as ve:
assert 'embedded null character' in str(ve)

c.close()