From fb76d8ef16527358f9818e87315366e1c78440a5 Mon Sep 17 00:00:00 2001
From: Milind L
Date: Tue, 11 Oct 2022 12:32:23 +0530
Subject: [PATCH] Add metadata to TopicPartition (#1410)

A Kafka offset commit message can include optional metadata; this adds
support for it in this client.

Co-authored-by: Jing Liu
---
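Notes (below the ---, so git am drops them): a minimal usage sketch of
the new API. The broker address, group id, topic name, offset and
metadata string below are placeholders, not part of this patch:

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({'bootstrap.servers': 'localhost:9092',
                         'group.id': 'example-group'})

    # Attach application metadata to the committed offset ...
    consumer.commit(offsets=[TopicPartition('mytopic', 0, 42, 'checkpoint-a')],
                    asynchronous=False)

    # ... and read it back: committed() now returns TopicPartitions whose
    # .metadata attribute carries the committed string (or None).
    [tp] = consumer.committed([TopicPartition('mytopic', 0)], timeout=10)
    print(tp.offset, tp.metadata)  # 42 checkpoint-a
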
 CHANGELOG.md                                  |  6 ++
 src/confluent_kafka/src/confluent_kafka.c     | 52 +++++++++++----
 src/confluent_kafka/src/confluent_kafka.h     |  1 +
 .../test_consumer_topicpartition_metadata.py  | 63 +++++++++++++++++++
 4 files changed, 111 insertions(+), 11 deletions(-)
 create mode 100644 tests/integration/consumer/test_consumer_topicpartition_metadata.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5e2a3b86..dae7d54ea 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,11 @@
 # Confluent's Python client for Apache Kafka
 
+
+## v1.10.0
+
+- Add metadata to TopicPartition type and commit() (#1410).
+
+
 ## v1.9.2
 
 v1.9.2 is a maintenance release with the following fixes and enhancements:
diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c
index e2329f706..bd57f2877 100644
--- a/src/confluent_kafka/src/confluent_kafka.c
+++ b/src/confluent_kafka/src/confluent_kafka.c
@@ -816,15 +816,27 @@ static int TopicPartition_clear (TopicPartition *self) {
                 Py_DECREF(self->error);
                 self->error = NULL;
         }
+        if (self->metadata) {
+                free(self->metadata);
+                self->metadata = NULL;
+        }
         return 0;
 }
 
 static void TopicPartition_setup (TopicPartition *self, const char *topic,
                                   int partition, long long offset,
+                                  const char *metadata,
                                   rd_kafka_resp_err_t err) {
         self->topic = strdup(topic);
         self->partition = partition;
         self->offset = offset;
+
+        if (metadata != NULL) {
+                self->metadata = strdup(metadata);
+        } else {
+                self->metadata = NULL;
+        }
+
         self->error = KafkaError_new_or_None(err, NULL);
 }
 
@@ -843,18 +855,22 @@ static int TopicPartition_init (PyObject *self, PyObject *args,
         const char *topic;
         int partition = RD_KAFKA_PARTITION_UA;
         long long offset = RD_KAFKA_OFFSET_INVALID;
+        const char *metadata = NULL;
+
         static char *kws[] = { "topic",
                                "partition",
                                "offset",
+                               "metadata",
                                NULL };
 
-        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws,
-                                         &topic, &partition, &offset))
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iLs", kws,
+                                         &topic, &partition, &offset,
+                                         &metadata)) {
                 return -1;
+        }
 
         TopicPartition_setup((TopicPartition *)self,
-                             topic, partition, offset, 0);
-
+                             topic, partition, offset, metadata, 0);
         return 0;
 }
 
@@ -889,6 +905,9 @@ static PyMemberDef TopicPartition_members[] = {
           " :py:const:`OFFSET_STORED`,"
           " :py:const:`OFFSET_INVALID`\n"
         },
+        { "metadata", T_STRING, offsetof(TopicPartition, metadata), READONLY,
+          ":attribute metadata: Optional application metadata committed with "
+          "the offset (string)"},
         { "error", T_OBJECT, offsetof(TopicPartition, error), READONLY,
           ":attribute error: Indicates an error (with :py:class:`KafkaError`) unless None." },
         { NULL }
@@ -1038,14 +1057,15 @@ PyTypeObject TopicPartitionType = {
  * @brief Internal factory to create a TopicPartition object.
  */
 static PyObject *TopicPartition_new0 (const char *topic, int partition,
-                                      long long offset,
+                                      long long offset, const char *metadata,
                                       rd_kafka_resp_err_t err) {
         TopicPartition *self;
 
         self = (TopicPartition *)TopicPartitionType.tp_new(
                 &TopicPartitionType, NULL, NULL);
 
-        TopicPartition_setup(self, topic, partition, offset, err);
+        TopicPartition_setup(self, topic, partition,
+                             offset, metadata, err);
 
         return (PyObject *)self;
 }
@@ -1069,7 +1089,9 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
                 PyList_SET_ITEM(parts, i,
                                 TopicPartition_new0(
                                         rktpar->topic, rktpar->partition,
-                                        rktpar->offset, rktpar->err));
+                                        rktpar->offset,
+                                        rktpar->metadata,
+                                        rktpar->err));
         }
 
         return parts;
@@ -1094,6 +1116,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
         c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));
 
         for (i = 0 ; i < (size_t)PyList_Size(plist) ; i++) {
+                rd_kafka_topic_partition_t *rktpar;
                 TopicPartition *tp = (TopicPartition *)
                         PyList_GetItem(plist, i);
 
@@ -1106,10 +1129,17 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
                         return NULL;
                 }
 
-                rd_kafka_topic_partition_list_add(c_parts,
-                                                  tp->topic,
-                                                  tp->partition)->offset =
-                        tp->offset;
+                rktpar = rd_kafka_topic_partition_list_add(c_parts,
+                                                           tp->topic,
+                                                           tp->partition);
+                rktpar->offset = tp->offset;
+                if (tp->metadata != NULL) {
+                        rktpar->metadata_size = strlen(tp->metadata) + 1;
+                        rktpar->metadata = strdup(tp->metadata);
+                } else {
+                        rktpar->metadata_size = 0;
+                        rktpar->metadata = NULL;
+                }
         }
 
         return c_parts;
diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h
index 07086d715..45aba2f9e 100644
--- a/src/confluent_kafka/src/confluent_kafka.h
+++ b/src/confluent_kafka/src/confluent_kafka.h
@@ -352,6 +352,7 @@ typedef struct {
         char *topic;
         int partition;
         int64_t offset;
+        char *metadata;
         PyObject *error;
 } TopicPartition;
 
diff --git a/tests/integration/consumer/test_consumer_topicpartition_metadata.py b/tests/integration/consumer/test_consumer_topicpartition_metadata.py
new file mode 100644
index 000000000..4c01c1df7
--- /dev/null
+++ b/tests/integration/consumer/test_consumer_topicpartition_metadata.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2022 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from confluent_kafka import TopicPartition
+
+
+def commit_and_check(consumer, topic, metadata):
+    if metadata is None:
+        consumer.commit(offsets=[TopicPartition(topic, 0, 1)], asynchronous=False)
+    else:
+        consumer.commit(offsets=[TopicPartition(topic, 0, 1, metadata)], asynchronous=False)
+
+    offsets = consumer.committed([TopicPartition(topic, 0)], timeout=100)
+    assert len(offsets) == 1
+    assert offsets[0].metadata == metadata
+
+
+def test_consumer_topicpartition_metadata(kafka_cluster):
+    topic = kafka_cluster.create_topic("test_topicpartition")
+    consumer_conf = {'group.id': 'pytest'}
+
+    c = kafka_cluster.consumer(consumer_conf)
+
+    # Commit without any metadata.
+    metadata = None
+    commit_and_check(c, topic, metadata)
+
+    # Commit with only ASCII metadata.
+    metadata = 'hello world'
+    commit_and_check(c, topic, metadata)
+
+    # Commit with Unicode characters in metadata.
+    metadata = 'नमस्ते दुनिया'
+    commit_and_check(c, topic, metadata)
+
+    # Commit with empty string as metadata.
+    metadata = ''
+    commit_and_check(c, topic, metadata)
+
+    # Commit with invalid metadata (with null byte in the middle).
+    metadata = 'xyz\x00abc'
+    try:
+        commit_and_check(c, topic, metadata)
+        # We should never reach this point, since the prior statement should throw.
+        assert False
+    except ValueError as ve:
+        assert 'embedded null character' in str(ve)
+
+    c.close()
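
Reviewer note (a sketch of the expected behavior, not part of the patch):
the final test case relies on CPython itself rejecting embedded NUL bytes
when the `s` format unit of PyArg_ParseTupleAndKeywords parses the
metadata argument, so the error surfaces at TopicPartition construction,
before librdkafka is ever called:

    >>> from confluent_kafka import TopicPartition
    >>> TopicPartition('mytopic', 0, 1, 'xyz\x00abc')
    Traceback (most recent call last):
      ...
    ValueError: embedded null character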