Commit
Add mock tests for unknown topic id
in metadata request and partial reconciliation
emasab committed Apr 15, 2024
1 parent 0cf0ec8 commit 2f8a20a
Showing 2 changed files with 106 additions and 13 deletions.
6 changes: 5 additions & 1 deletion src/rdkafka_mock_handlers.c
@@ -914,7 +914,11 @@ rd_kafka_mock_buf_write_Metadata_Topic(rd_kafka_mock_cluster_t *mcluster,
/* Response: Topics.ErrorCode */
rd_kafka_buf_write_i16(resp, err);
/* Response: Topics.Name */
rd_kafka_buf_write_str(resp, topic, -1);
if (ApiVersion >= 12 && err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID) {
rd_kafka_buf_write_str(resp, NULL, -1);
} else {
rd_kafka_buf_write_str(resp, topic, -1);
}

if (ApiVersion >= 10) {
if (mtopic) {
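For reference, a short annotated sketch of how the changed section of rd_kafka_mock_buf_write_Metadata_Topic() reads after this commit; the comments on MetadataResponse v12 are editorial, based on KIP-516 making Topics.Name nullable when a topic is only known by its id:

        /* Response: Topics.ErrorCode */
        rd_kafka_buf_write_i16(resp, err);
        /* Response: Topics.Name
         * From MetadataResponse v12 the name is nullable, so an unknown
         * topic id is reported with a NULL name instead of echoing back
         * a name the broker doesn't know. */
        if (ApiVersion >= 12 && err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID) {
                rd_kafka_buf_write_str(resp, NULL, -1);
        } else {
                rd_kafka_buf_write_str(resp, topic, -1);
        }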
113 changes: 101 additions & 12 deletions tests/0147-consumer_group_consumer_mock.c
@@ -132,13 +132,16 @@ static int wait_all_heartbeats_done(rd_kafka_mock_cluster_t *mcluster,
return current_heartbeats;
}

static rd_kafka_t *create_consumer(const char *bootstraps, const char *topic) {
static rd_kafka_t *create_consumer(const char *bootstraps,
const char *topic,
rd_bool_t with_rebalance_cb) {
rd_kafka_conf_t *conf;
test_conf_init(&conf, NULL, 0);
test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "group.protocol", "consumer");
test_conf_set(conf, "auto.offset.reset", "earliest");
return test_create_consumer(topic, rebalance_cb, conf, NULL);
return test_create_consumer(
topic, with_rebalance_cb ? rebalance_cb : NULL, conf, NULL);
}
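The new with_rebalance_cb parameter lets callers opt out of the rebalance callback: the existing heartbeat tests keep it, while the new unknown-topic-id test relies on librdkafka's default assignment handling. A minimal sketch of the two call forms used later in this file:

        /* Heartbeat error tests keep the rebalance_cb */
        c = create_consumer(bootstraps, topic, rd_true);

        /* The new metadata test uses the default assign/unassign path */
        c = create_consumer(bootstraps, topic, rd_false);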

/**
@@ -149,7 +152,7 @@ static rd_kafka_t *create_consumer(const char *bootstraps, const char *topic) {
* - no final leave group heartbeat is sent
*
* @param err The error code to test.
* @param variation See test main.
* @param variation See calling code.
*/
static void
do_test_consumer_group_heartbeat_fatal_error(rd_kafka_resp_err_t err,
@@ -184,7 +187,7 @@ do_test_consumer_group_heartbeat_fatal_error(rd_kafka_resp_err_t err,
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 1, RD_KAFKAP_ConsumerGroupHeartbeat, 1, err, 0);

c = create_consumer(bootstraps, topic);
c = create_consumer(bootstraps, topic, rd_true);

/* Subscribe to the input topic */
subscription = rd_kafka_topic_partition_list_new(1);
@@ -261,6 +264,8 @@ do_test_consumer_group_heartbeat_fatal_error(rd_kafka_resp_err_t err,

/**
* @brief Test all kinds of fatal errors in a ConsumerGroupHeartbeat call.
* variation 0: errors on first HB
* variation 1: errors on second HB
*/
static void do_test_consumer_group_heartbeat_fatal_errors(void) {
rd_kafka_resp_err_t fatal_errors[] = {
@@ -287,7 +292,7 @@ static void do_test_consumer_group_heartbeat_fatal_errors(void) {
* - final leave group heartbeat is sent
*
* @param err The error code to test.
* @param variation See test main.
* @param variation See calling code.
*/
static void
do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,
@@ -311,7 +316,7 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 500);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

c = create_consumer(bootstraps, topic);
c = create_consumer(bootstraps, topic, rd_true);

TIMING_START(&timing, "consumer_group_heartbeat_retriable_error");

Expand Down Expand Up @@ -390,6 +395,8 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,

/**
* @brief Test all kinds of retriable errors in a ConsumerGroupHeartbeat call.
* variation 0: errors on first HB
* variation 1: errors on second HB
*/
static void do_test_consumer_group_heartbeat_retriable_errors(void) {
rd_kafka_resp_err_t retriable_errors[] = {
@@ -413,7 +420,7 @@ static void do_test_consumer_group_heartbeat_retriable_errors(void) {
* - a final leave group heartbeat is sent
*
* @param err The error code to test.
* @param variation See test main.
* @param variation See calling code.
*/
static void
do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
@@ -448,7 +455,7 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 1, RD_KAFKAP_ConsumerGroupHeartbeat, 1, err, 0);

c = create_consumer(bootstraps, topic);
c = create_consumer(bootstraps, topic, rd_true);

/* Subscribe to the input topic */
subscription = rd_kafka_topic_partition_list_new(1);
@@ -552,7 +559,9 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,

/**
* @brief Test all kinds of consumer fenced errors in a ConsumerGroupHeartbeat
* call.
* variation 0: errors on first HB
* variation 1: errors on second HB
*/
static void do_test_consumer_group_heartbeat_fenced_errors(void) {
rd_kafka_resp_err_t fenced_errors[] = {
@@ -567,17 +576,97 @@ static void do_test_consumer_group_heartbeat_fenced_errors(void) {
}
}

/**
* @brief Test consumer group behavior with a missing topic id when retrieving
* metadata for assigned topics, ensuring that:
* - initially a partial acknowledgement is started, with an empty list
* (variation 0) or a single topic (variation 1)
* - fetch doesn't start while the broker keeps returning an unknown topic id error
* - once the error isn't returned anymore, the client finishes assigning
* the partition and reads a message.
*
* @param variation See calling code.
*/
static void do_test_metadata_unknown_topic_id_error(int variation) {
rd_kafka_mock_cluster_t *mcluster;
const char *bootstraps;
rd_kafka_topic_partition_list_t *subscription;
rd_kafka_t *c;
test_timing_t timing;
const char *topic = "do_test_metadata_unknown_topic_id_error";
const char *topic2 = "do_test_metadata_unknown_topic_id_error2";

SUB_TEST_QUICK("variation: %d", variation);

mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 500);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
if (variation == 1) {
rd_kafka_mock_topic_create(mcluster, topic2, 1, 1);
}

c = create_consumer(bootstraps, topic, rd_false);

/* Seed the topic with messages */
test_produce_msgs_easy_v(topic, 0, 0, 0, 1, 1000, "bootstrap.servers",
bootstraps, NULL);

TIMING_START(&timing, "do_test_metadata_unknown_topic_id_error");

/* Subscribe to the input topic */
subscription = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subscription, topic,
RD_KAFKA_PARTITION_UA);
if (variation == 1) {
rd_kafka_topic_partition_list_add(subscription, topic2,
RD_KAFKA_PARTITION_UA);
}

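/* Make Metadata return UNKNOWN_TOPIC_ID for the first topic so its assignment stays only partially reconciled */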
rd_kafka_mock_topic_set_error(mcluster, topic,
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID);

TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription));
rd_kafka_topic_partition_list_destroy(subscription);

/* Cannot fetch while Metadata replies with UNKNOWN_TOPIC_ID */
test_consumer_poll_no_msgs("no messages", c, 0, 1000);

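/* Clear the error so the next Metadata response includes the topic and the pending assignment can complete */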
rd_kafka_mock_topic_set_error(mcluster, topic,
RD_KAFKA_RESP_ERR_NO_ERROR);

/* Fetch the message */
test_consumer_poll_timeout("message", c, 0, 0, 0, 1, NULL, 2000);

rd_kafka_destroy(c);
test_mock_cluster_destroy(mcluster);

TIMING_ASSERT(&timing, 500, 4000);
SUB_TEST_PASS();
}

/**
* @brief Test these variations of an UNKNOWN_TOPIC_ID error in a Metadata call
* before reconciliation.
*
* variation 0: single topic
* variation 1: two topics: the first returns this error, the second exists.
*/
static void do_test_metadata_unknown_topic_id_tests(void) {
do_test_metadata_unknown_topic_id_error(0);
do_test_metadata_unknown_topic_id_error(1);
}

int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
TEST_SKIP_MOCK_CLUSTER(0);

/* variation 0: errors on first HB
* variation 1: errors on second HB */

do_test_consumer_group_heartbeat_fatal_errors();

do_test_consumer_group_heartbeat_retriable_errors();

do_test_consumer_group_heartbeat_fenced_errors();

do_test_metadata_unknown_topic_id_tests();

return 0;
}
