Skip to content

Commit

Permalink
Add metadata to TopicPartition (#1410)
Browse files Browse the repository at this point in the history
A Kafka offset commit message can include optional metadata,
this adds support for it in this client.

Co-authored-by: Jing Liu <jl5311@nyu.edu>
  • Loading branch information
milindl and jliunyu committed Oct 11, 2022
1 parent 6cd2e73 commit fb76d8e
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 11 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Confluent's Python client for Apache Kafka


## v1.10.0

- Add metadata to TopicPartition type and commit() (#1410).


## v1.9.2

v1.9.2 is a maintenance release with the following fixes and enhancements:
Expand Down
52 changes: 41 additions & 11 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,15 +816,27 @@ static int TopicPartition_clear (TopicPartition *self) {
Py_DECREF(self->error);
self->error = NULL;
}
if (self->metadata) {
free(self->metadata);
self->metadata = NULL;
}
return 0;
}

/**
 * @brief Populate the fields of a TopicPartition instance.
 *
 * The topic name and the optional commit metadata are duplicated onto the
 * heap; the copies are owned by the object and released in
 * TopicPartition_clear.
 *
 * @param self      object to populate
 * @param topic     topic name (copied)
 * @param partition partition id
 * @param offset    committed/requested offset
 * @param metadata  optional offset-commit metadata (copied), or NULL
 * @param err       error code wrapped into \p self->error
 */
static void TopicPartition_setup (TopicPartition *self, const char *topic,
                                  int partition, long long offset,
                                  const char *metadata,
                                  rd_kafka_resp_err_t err) {
        self->partition = partition;
        self->offset = offset;
        self->topic = strdup(topic);
        self->metadata = metadata ? strdup(metadata) : NULL;
        self->error = KafkaError_new_or_None(err, NULL);
}

Expand All @@ -843,18 +855,22 @@ static int TopicPartition_init (PyObject *self, PyObject *args,
const char *topic;
int partition = RD_KAFKA_PARTITION_UA;
long long offset = RD_KAFKA_OFFSET_INVALID;
const char *metadata = NULL;

static char *kws[] = { "topic",
"partition",
"offset",
"metadata",
NULL };

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws,
&topic, &partition, &offset))
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLs", kws,
&topic, &partition, &offset,
&metadata)) {
return -1;
}

TopicPartition_setup((TopicPartition *)self,
topic, partition, offset, 0);

topic, partition, offset, metadata, 0);
return 0;
}

Expand Down Expand Up @@ -889,6 +905,9 @@ static PyMemberDef TopicPartition_members[] = {
" :py:const:`OFFSET_STORED`,"
" :py:const:`OFFSET_INVALID`\n"
},
{"metadata", T_STRING, offsetof(TopicPartition, metadata), READONLY,
"attribute metadata: Optional application metadata committed with the "
"offset (string)"},
{ "error", T_OBJECT, offsetof(TopicPartition, error), READONLY,
":attribute error: Indicates an error (with :py:class:`KafkaError`) unless None." },
{ NULL }
Expand Down Expand Up @@ -1038,14 +1057,15 @@ PyTypeObject TopicPartitionType = {
* @brief Internal factory to create a TopicPartition object.
*/
/**
 * @brief Internal factory to create a TopicPartition object.
 *
 * @param topic     topic name (copied by TopicPartition_setup)
 * @param partition partition id
 * @param offset    offset value
 * @param metadata  optional offset-commit metadata, or NULL
 * @param err       error code to wrap in the object's .error attribute
 *
 * @returns a new TopicPartition, or NULL (with a Python exception set)
 *          if object allocation failed.
 */
static PyObject *TopicPartition_new0 (const char *topic, int partition,
                                      long long offset, const char *metadata,
                                      rd_kafka_resp_err_t err) {
        TopicPartition *self;

        self = (TopicPartition *)TopicPartitionType.tp_new(
                &TopicPartitionType, NULL, NULL);
        /* tp_new returns NULL on allocation failure with an exception
         * already set; dereferencing it in _setup would crash. */
        if (!self)
                return NULL;

        TopicPartition_setup(self, topic, partition,
                             offset, metadata, err);

        return (PyObject *)self;
}
Expand All @@ -1069,7 +1089,9 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
PyList_SET_ITEM(parts, i,
TopicPartition_new0(
rktpar->topic, rktpar->partition,
rktpar->offset, rktpar->err));
rktpar->offset,
rktpar->metadata,
rktpar->err));
}

return parts;
Expand All @@ -1094,6 +1116,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));

for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
rd_kafka_topic_partition_t *rktpar;
TopicPartition *tp = (TopicPartition *)
PyList_GetItem(plist, i);

Expand All @@ -1106,10 +1129,17 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
return NULL;
}

rd_kafka_topic_partition_list_add(c_parts,
tp->topic,
tp->partition)->offset =
tp->offset;
rktpar = rd_kafka_topic_partition_list_add(c_parts,
tp->topic,
tp->partition);
rktpar->offset = tp->offset;
if (tp->metadata != NULL) {
rktpar->metadata_size = strlen(tp->metadata) + 1;
rktpar->metadata = strdup(tp->metadata);
} else {
rktpar->metadata_size = 0;
rktpar->metadata = NULL;
}
}

return c_parts;
Expand Down
1 change: 1 addition & 0 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ typedef struct {
char *topic;
int partition;
int64_t offset;
char *metadata;
PyObject *error;
} TopicPartition;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from confluent_kafka import TopicPartition


def commit_and_check(consumer, topic, metadata):
    """Commit offset 1 on partition 0 of `topic`, optionally attaching
    `metadata`, then fetch the committed offsets back and verify the
    stored metadata equals what was sent (None when omitted)."""
    if metadata is None:
        to_commit = [TopicPartition(topic, 0, 1)]
    else:
        to_commit = [TopicPartition(topic, 0, 1, metadata)]
    consumer.commit(offsets=to_commit, asynchronous=False)

    committed = consumer.committed([TopicPartition(topic, 0)], timeout=100)
    assert len(committed) == 1
    assert committed[0].metadata == metadata


def test_consumer_topicpartition_metadata(kafka_cluster):
    """Round-trip TopicPartition metadata through commit()/committed().

    Absent, ASCII, non-ASCII Unicode and empty metadata must all be
    stored and retrieved unchanged; metadata containing an embedded NUL
    byte must be rejected with ValueError.
    """
    topic = kafka_cluster.create_topic("test_topicpartition")
    consumer_conf = {'group.id': 'pytest'}

    c = kafka_cluster.consumer(consumer_conf)

    # Values that must commit successfully and round-trip unchanged:
    # no metadata, plain ASCII, Unicode, and the empty string.
    for metadata in (None, 'hello world', 'नमस्ते दुनिया', ''):
        commit_and_check(c, topic, metadata)

    # A NUL byte inside the metadata cannot be represented as a C string,
    # so the commit must raise instead of succeeding.
    try:
        commit_and_check(c, topic, 'xyz\x00abc')
        # We should never reach this point, since the prior statement should throw.
        assert False
    except ValueError as ve:
        assert 'embedded null character' in str(ve)

    c.close()

0 comments on commit fb76d8e

Please sign in to comment.