From b45c3531180aff5a7d4e015beaa84d7f907184be Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Tue, 16 Apr 2024 16:40:51 +0530 Subject: [PATCH 1/6] Handle overflow in rd_buf_write_remains --- src/rdbuf.h | 6 +++++- tests/0011-produce_batch.c | 21 ++++++++++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/rdbuf.h b/src/rdbuf.h index 90d61401b0..a415f9d2ec 100644 --- a/src/rdbuf.h +++ b/src/rdbuf.h @@ -147,7 +147,11 @@ static RD_INLINE RD_UNUSED size_t rd_buf_write_pos(const rd_buf_t *rbuf) { * @returns the number of bytes available for writing (before growing). */ static RD_INLINE RD_UNUSED size_t rd_buf_write_remains(const rd_buf_t *rbuf) { - return rbuf->rbuf_size - (rbuf->rbuf_len + rbuf->rbuf_erased); + ssize_t remaining = + rbuf->rbuf_size - (rbuf->rbuf_len + rbuf->rbuf_erased); + if (remaining < 0) + return 0; + return remaining; } diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index 1507d76f9e..13b5d22538 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -90,6 +90,8 @@ static void test_single_partition(void) { int failcnt = 0; int i; rd_kafka_message_t *rkmessages; + const int topicSuffixLength = 56, clientIdLength = 239; + char *topicSuffix, *clientId; SUB_TEST_QUICK(); @@ -97,6 +99,13 @@ static void test_single_partition(void) { test_conf_init(&conf, &topic_conf, 20); + clientId = (char *)malloc(clientIdLength + 1 * sizeof(char)); + for (i = 0; i < clientIdLength; i++) { + clientId[i] = 'c'; + } + clientId[clientIdLength] = '\0'; + rd_kafka_conf_set(conf, "client.id", clientId, NULL, 0); + /* Set delivery report callback */ rd_kafka_conf_set_dr_cb(conf, dr_single_partition_cb); @@ -106,7 +115,14 @@ static void test_single_partition(void) { TEST_SAY("test_single_partition: Created kafka instance %s\n", rd_kafka_name(rk)); - rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); + topicSuffix = (char *)malloc(topicSuffixLength + 1 * sizeof(char)); + for (i = 0; i < topicSuffixLength; i++) { + topicSuffix[i] = 'b'; + } + topicSuffix[topicSuffixLength] = '\0'; + + rkt = rd_kafka_topic_new(rk, test_mk_topic_name(topicSuffix, 0), + topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); @@ -178,6 +194,9 @@ static void test_single_partition(void) { TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); rd_kafka_destroy(rk); + free(clientId); + free(topicSuffix); + SUB_TEST_PASS(); } From 7c017706c9ff8b7b72bf8fe3d1b7732a13192ef3 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Wed, 15 May 2024 11:26:27 +0530 Subject: [PATCH 2/6] PR Feedback --- src/rdbuf.c | 4 +++- src/rdbuf.h | 6 +----- tests/0011-produce_batch.c | 28 ++++++++++++++-------------- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/rdbuf.c b/src/rdbuf.c index 6df64a9dee..88ec7912b5 100644 --- a/src/rdbuf.c +++ b/src/rdbuf.c @@ -665,8 +665,10 @@ size_t rd_buf_erase(rd_buf_t *rbuf, size_t absof, size_t size) { of += toerase; /* If segment is now empty, remove it */ - if (seg->seg_of == 0) + if (seg->seg_of == 0) { rd_buf_destroy_segment(rbuf, seg); + rbuf->rbuf_erased -= toerase; + } } /* Update absolute offset of remaining segments */ diff --git a/src/rdbuf.h b/src/rdbuf.h index a415f9d2ec..90d61401b0 100644 --- a/src/rdbuf.h +++ b/src/rdbuf.h @@ -147,11 +147,7 @@ static RD_INLINE RD_UNUSED size_t rd_buf_write_pos(const rd_buf_t *rbuf) { * @returns the number of bytes available for writing (before growing). */ static RD_INLINE RD_UNUSED size_t rd_buf_write_remains(const rd_buf_t *rbuf) { - ssize_t remaining = - rbuf->rbuf_size - (rbuf->rbuf_len + rbuf->rbuf_erased); - if (remaining < 0) - return 0; - return remaining; + return rbuf->rbuf_size - (rbuf->rbuf_len + rbuf->rbuf_erased); } diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index 13b5d22538..b73ad0345a 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -90,8 +90,8 @@ static void test_single_partition(void) { int failcnt = 0; int i; rd_kafka_message_t *rkmessages; - const int topicSuffixLength = 56, clientIdLength = 239; - char *topicSuffix, *clientId; + const int topic_suffix_length = 56, client_id_length = 239; + char *topic_suffix, *client_id; SUB_TEST_QUICK(); @@ -99,12 +99,12 @@ static void test_single_partition(void) { test_conf_init(&conf, &topic_conf, 20); - clientId = (char *)malloc(clientIdLength + 1 * sizeof(char)); - for (i = 0; i < clientIdLength; i++) { - clientId[i] = 'c'; + client_id = malloc((client_id_length + 1) * sizeof(char)); + for (i = 0; i < client_id_length; i++) { + client_id[i] = 'c'; } - clientId[clientIdLength] = '\0'; - rd_kafka_conf_set(conf, "client.id", clientId, NULL, 0); + client_id[client_id_length] = '\0'; + rd_kafka_conf_set(conf, "client.id", client_id, NULL, 0); /* Set delivery report callback */ rd_kafka_conf_set_dr_cb(conf, dr_single_partition_cb); @@ -115,13 +115,13 @@ static void test_single_partition(void) { TEST_SAY("test_single_partition: Created kafka instance %s\n", rd_kafka_name(rk)); - topicSuffix = (char *)malloc(topicSuffixLength + 1 * sizeof(char)); - for (i = 0; i < topicSuffixLength; i++) { - topicSuffix[i] = 'b'; + topic_suffix = (char *)malloc(topic_suffix_length + 1 * sizeof(char)); + for (i = 0; i < topic_suffix_length; i++) { + topic_suffix[i] = 'b'; } - topicSuffix[topicSuffixLength] = '\0'; + topic_suffix[topic_suffix_length] = '\0'; - rkt = rd_kafka_topic_new(rk, test_mk_topic_name(topicSuffix, 0), + rkt = rd_kafka_topic_new(rk, test_mk_topic_name(topic_suffix, 0), topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); @@ -194,8 +194,8 @@ static void test_single_partition(void) { TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); rd_kafka_destroy(rk); - free(clientId); - free(topicSuffix); + free(client_id); + free(topic_suffix); SUB_TEST_PASS(); } From 4071f8bd79a5ee58b5a14d8002ce4c2097e886eb Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Tue, 4 Jun 2024 11:16:07 +0530 Subject: [PATCH 3/6] Add seg_erased --- src/rdbuf.c | 2 ++ src/rdbuf.h | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/rdbuf.c b/src/rdbuf.c index 88ec7912b5..a4b75b01e5 100644 --- a/src/rdbuf.c +++ b/src/rdbuf.c @@ -660,6 +660,7 @@ size_t rd_buf_erase(rd_buf_t *rbuf, size_t absof, size_t size) { segremains); seg->seg_of -= toerase; + seg->seg_erased += toerase; rbuf->rbuf_len -= toerase; of += toerase; @@ -712,6 +713,7 @@ int rd_buf_write_seek(rd_buf_t *rbuf, size_t absof) { rd_segment_t *this = next; next = TAILQ_PREV(this, rd_segment_head, seg_link); rd_buf_destroy_segment(rbuf, this); + rbuf->rbuf_erased -= seg->seg_erased; } /* Update relative write offset */ diff --git a/src/rdbuf.h b/src/rdbuf.h index 90d61401b0..d8f98422cc 100644 --- a/src/rdbuf.h +++ b/src/rdbuf.h @@ -70,6 +70,8 @@ typedef struct rd_segment_s { * beginning in the grand rd_buf_t */ void (*seg_free)(void *p); /**< Optional free function for seg_p */ int seg_flags; /**< Segment flags */ + size_t seg_erased; /** Total number of bytes erased from + * this segment. */ #define RD_SEGMENT_F_RDONLY 0x1 /**< Read-only segment */ #define RD_SEGMENT_F_FREE \ 0x2 /**< Free segment on destroy, \ From 396f7cc16a9353348f93cbc5d2c22520b46ce668 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Tue, 4 Jun 2024 11:23:31 +0530 Subject: [PATCH 4/6] Remove topic_suffix --- tests/0011-produce_batch.c | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index b73ad0345a..07a7fd358e 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -90,8 +90,8 @@ static void test_single_partition(void) { int failcnt = 0; int i; rd_kafka_message_t *rkmessages; - const int topic_suffix_length = 56, client_id_length = 239; - char *topic_suffix, *client_id; + const int client_id_length = 239; + char *client_id; SUB_TEST_QUICK(); @@ -115,14 +115,7 @@ static void test_single_partition(void) { TEST_SAY("test_single_partition: Created kafka instance %s\n", rd_kafka_name(rk)); - topic_suffix = (char *)malloc(topic_suffix_length + 1 * sizeof(char)); - for (i = 0; i < topic_suffix_length; i++) { - topic_suffix[i] = 'b'; - } - topic_suffix[topic_suffix_length] = '\0'; - - rkt = rd_kafka_topic_new(rk, test_mk_topic_name(topic_suffix, 0), - topic_conf); + rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); if (!rkt) TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); @@ -195,7 +188,6 @@ static void test_single_partition(void) { rd_kafka_destroy(rk); free(client_id); - free(topic_suffix); SUB_TEST_PASS(); } From 48425036469509428d7d5e555fb6a32cd08baa18 Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Mon, 10 Jun 2024 15:32:37 +0530 Subject: [PATCH 5/6] PR feedback --- CHANGELOG.md | 4 ++++ src/rdbuf.c | 4 ++-- tests/0011-produce_batch.c | 17 +++++++---------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cbdf0fbc9f..a831553adb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# vNext + +* Fix segfault when using long client id because of erased segment when using flexver. (#4689) + # librdkafka v2.4.0 librdkafka v2.4.0 is a feature release: diff --git a/src/rdbuf.c b/src/rdbuf.c index a4b75b01e5..427d632eb7 100644 --- a/src/rdbuf.c +++ b/src/rdbuf.c @@ -667,8 +667,8 @@ size_t rd_buf_erase(rd_buf_t *rbuf, size_t absof, size_t size) { /* If segment is now empty, remove it */ if (seg->seg_of == 0) { + rbuf->rbuf_erased -= seg->seg_erased; rd_buf_destroy_segment(rbuf, seg); - rbuf->rbuf_erased -= toerase; } } @@ -712,8 +712,8 @@ int rd_buf_write_seek(rd_buf_t *rbuf, size_t absof) { next != seg;) { rd_segment_t *this = next; next = TAILQ_PREV(this, rd_segment_head, seg_link); + rbuf->rbuf_erased -= this->seg_erased; rd_buf_destroy_segment(rbuf, this); - rbuf->rbuf_erased -= seg->seg_erased; } /* Update relative write offset */ diff --git a/tests/0011-produce_batch.c b/tests/0011-produce_batch.c index 07a7fd358e..f745a6d310 100644 --- a/tests/0011-produce_batch.c +++ b/tests/0011-produce_batch.c @@ -90,20 +90,19 @@ static void test_single_partition(void) { int failcnt = 0; int i; rd_kafka_message_t *rkmessages; - const int client_id_length = 239; - char *client_id; - + char client_id[271]; SUB_TEST_QUICK(); msgid_next = 0; test_conf_init(&conf, &topic_conf, 20); - client_id = malloc((client_id_length + 1) * sizeof(char)); - for (i = 0; i < client_id_length; i++) { - client_id[i] = 'c'; - } - client_id[client_id_length] = '\0'; + /* A long client id must not cause a segmentation fault + * because of an erased segment when using flexver. + * See: + * https://github.com/confluentinc/confluent-kafka-dotnet/issues/2084 */ + memset(client_id, 'c', sizeof(client_id) - 1); + client_id[sizeof(client_id) - 1] = '\0'; rd_kafka_conf_set(conf, "client.id", client_id, NULL, 0); /* Set delivery report callback */ @@ -187,8 +186,6 @@ static void test_single_partition(void) { TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); rd_kafka_destroy(rk); - free(client_id); - SUB_TEST_PASS(); } From 48170e3c1fa2a888c9de6a98fe923f269a36c23c Mon Sep 17 00:00:00 2001 From: Anchit Jain Date: Mon, 10 Jun 2024 23:01:04 +0530 Subject: [PATCH 6/6] Update CHANGELOG.md --- CHANGELOG.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dae9277520..2de9f83148 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,17 +2,24 @@ librdkafka v2.5.0 is a feature release. +* Fix segfault when using long client id because of erased segment when using flexver. (#4689) + + ## Enhancements * Update bundled lz4 (used when `./configure --disable-lz4-ext`) to [v1.9.4](https://github.com/lz4/lz4/releases/tag/v1.9.4), which contains bugfixes and performance improvements (#4726). + ## Fixes ### General fixes - * Fix segfault when using long client id because of erased segment when using flexver. (#4689) +* Issues: [confluentinc/confluent-kafka-dotnet#2084](https://github.com/confluentinc/confluent-kafka-dotnet/issues/2084) + Fix segfault when a segment is erased and more data is written to the buffer. + Happens since 1.x when a portion of the buffer (segment) is erased for flexver or compression. + More likely to happen since 2.1.0, because of the upgrades to flexver, with certain string sizes like a long client id (#4689).