Skip to content

Commit

Permalink
Make test 0147 more reliable
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Apr 15, 2024
1 parent 2f8a20a commit a047935
Showing 1 changed file with 61 additions and 57 deletions.
118 changes: 61 additions & 57 deletions tests/0147-consumer_group_consumer_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ static void rebalance_cb(rd_kafka_t *rk,
rd_kafka_yield(rk);
}

static rd_bool_t is_heartbeat_request(rd_kafka_mock_request_t *request,
void *opaque) {
return rd_kafka_mock_request_api_key(request) ==
RD_KAFKAP_ConsumerGroupHeartbeat;
}

/**
* @brief Wait at least \p num heartbeats
* have been received by the mock cluster
Expand All @@ -105,31 +111,8 @@ static void rebalance_cb(rd_kafka_t *rk,
static int wait_all_heartbeats_done(rd_kafka_mock_cluster_t *mcluster,
int num,
int confidence_interval) {
size_t i;
rd_kafka_mock_request_t **requests;
size_t request_cnt;
int current_heartbeats = 0;
rd_bool_t last_time = rd_true;

while (current_heartbeats < num || last_time) {
if (current_heartbeats >= num) {
rd_usleep(confidence_interval * 1000, 0);
last_time = rd_false;
}
requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
current_heartbeats = 0;
for (i = 0; i < request_cnt; i++) {
if (rd_kafka_mock_request_api_key(requests[i]) ==
RD_KAFKAP_ConsumerGroupHeartbeat)
current_heartbeats++;
}
rd_kafka_mock_request_destroy_array(requests, request_cnt);
rd_usleep(100 * 1000, 0);
}
if (test_on_ci && current_heartbeats == num + 1) {
return num;
}
return current_heartbeats;
return test_mock_wait_maching_requests(
mcluster, num, confidence_interval, is_heartbeat_request, NULL);
}

static rd_kafka_t *create_consumer(const char *bootstraps,
Expand Down Expand Up @@ -172,7 +155,7 @@ do_test_consumer_group_heartbeat_fatal_error(rd_kafka_resp_err_t err,
SUB_TEST_QUICK("%s, variation %d", rd_kafka_err2name(err), variation);

mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 500);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 1000);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

TIMING_START(&timing, "consumer_group_heartbeat_fatal_error");
Expand All @@ -196,13 +179,16 @@ do_test_consumer_group_heartbeat_fatal_error(rd_kafka_resp_err_t err,
* rd_kafka_subscribe() */
RD_KAFKA_PARTITION_UA);

TEST_SAY("Subscribing to topic\n");
rd_kafka_mock_start_request_tracking(mcluster);
TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription));
rd_kafka_topic_partition_list_destroy(subscription);

expected_heartbeats = 1;
if (variation == 1)
expected_heartbeats++;

TEST_SAY("Awaiting first HBs\n");
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, expected_heartbeats,
200)) == expected_heartbeats,
Expand All @@ -221,7 +207,7 @@ do_test_consumer_group_heartbeat_fatal_error(rd_kafka_resp_err_t err,
TEST_ASSERT(!rkmessage, "No message should be returned");
}

/* Consume from c, a fatal error is returned */
TEST_SAY("Consume from c, a fatal error is returned\n");
rkmessage = rd_kafka_consumer_poll(c, 500);
TEST_ASSERT(rkmessage != NULL, "An error message should be returned");
TEST_ASSERT(rkmessage->err == RD_KAFKA_RESP_ERR__FATAL,
Expand All @@ -248,7 +234,7 @@ do_test_consumer_group_heartbeat_fatal_error(rd_kafka_resp_err_t err,
"Expected %d rebalance events, got %d",
expected_rebalance_cnt, rebalance_cnt);

/* After closing the consumer, no heartbeat should have been sent */
TEST_SAY("Ensuring there are no leave group HBs\n");
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, 0, 200)) == 0,
"Expected no leave group heartbeat, got %d",
Expand Down Expand Up @@ -301,7 +287,7 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,
const char *bootstraps;
rd_kafka_topic_partition_list_t *subscription;
rd_kafka_t *c;
int expected_heartbeats, found_heartbeats, observation_window_ms;
int expected_heartbeats, found_heartbeats;
test_timing_t timing;
const char *topic = test_mk_topic_name(__FUNCTION__, 0);
test_curr->is_fatal_cb = error_is_fatal_cb;
Expand All @@ -313,7 +299,7 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,


mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 500);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 1000);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

c = create_consumer(bootstraps, topic, rd_true);
Expand All @@ -337,27 +323,33 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,
* rd_kafka_subscribe() */
RD_KAFKA_PARTITION_UA);

TEST_SAY("Subscribing to topic\n");
rd_kafka_mock_start_request_tracking(mcluster);
TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription));
rd_kafka_topic_partition_list_destroy(subscription);

/* First HB and retry */
expected_heartbeats = 2;
/* Time for first HB and retry */
observation_window_ms = 100;
rebalance_exp_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
if (variation == 1) {
TEST_SAY(
"Consume from c, no message is returned, "
"but assign callback is processed\n");
test_consumer_poll_no_msgs("after heartbeat", c, 0, 200);

/* wait 1 HB interval more */
observation_window_ms += 600;
expected_heartbeats++;
expected_heartbeats += 1;
rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
}

TEST_SAY("Awaiting first HBs\n");
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, expected_heartbeats,
200)) == expected_heartbeats,
"Expected %d heartbeats, got %d", expected_heartbeats,
found_heartbeats);

rebalance_exp_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;

/* Consume from c, no message is returned */
TEST_SAY("Consume from c, no message is returned\n");
test_consumer_poll_no_msgs("after heartbeat", c, 0, 250);

TEST_ASSERT(rebalance_cnt > 0, "Expected > 0 rebalance events, got %d",
Expand All @@ -374,8 +366,7 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,
rebalance_cnt);
rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;

/* After closing the consumer, leave group heartbeat should have been
* sent */
TEST_SAY("Awaiting leave group HB\n");
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, 1, 200)) == 1,
"Expected 1 leave group heartbeat, got %d",
Expand Down Expand Up @@ -440,11 +431,9 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
SUB_TEST_QUICK("%s, variation %d", rd_kafka_err2name(err), variation);

mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 500);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 1000);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

TIMING_START(&timing, "consumer_group_heartbeat_fenced_error");

if (variation == 1) {
/* First HB returns assignment */
rd_kafka_mock_broker_push_request_error_rtts(
Expand All @@ -457,13 +446,16 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,

c = create_consumer(bootstraps, topic, rd_true);

TIMING_START(&timing, "consumer_group_heartbeat_fenced_error");

/* Subscribe to the input topic */
subscription = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subscription, topic,
/* The partition is ignored in
* rd_kafka_subscribe() */
RD_KAFKA_PARTITION_UA);

TEST_SAY("Subscribing to topic\n");
rd_kafka_mock_start_request_tracking(mcluster);
TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription));
rd_kafka_topic_partition_list_destroy(subscription);
Expand All @@ -474,6 +466,7 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
/*First HB receives assignment*/
expected_heartbeats = 1;

TEST_SAY("Awaiting initial HBs\n");
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, expected_heartbeats,
200)) == expected_heartbeats,
Expand All @@ -487,15 +480,15 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
rebalance_exp_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;

/* First HB assigned */
rkmessage = rd_kafka_consumer_poll(c, 750);
rkmessage = rd_kafka_consumer_poll(c, 100);
TEST_ASSERT(!rkmessage, "No message should be returned");

/* Needs to wait HB interval */
TEST_SAY("Awaiting partition lost callback\n");
/* Second HB acks and loses partitions */
expected_rebalance_cnt++;
rebalance_exp_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
rebalance_exp_lost = rd_true;

/* Second HB loses partitions */
/* Needs to wait HB interval */
rkmessage = rd_kafka_consumer_poll(c, 750);
TEST_ASSERT(!rkmessage, "No message should be returned");

Expand All @@ -506,29 +499,32 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
rebalance_exp_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
rebalance_exp_lost = rd_false;

TEST_SAY("Clearing mock requests\n");
rd_kafka_mock_clear_requests(mcluster);
expected_heartbeats = 0;

TEST_SAY("Awaiting rebalance callback\n");
/* Consume from c, partitions are lost if assigned */
rkmessage = rd_kafka_consumer_poll(c, 1000);
rkmessage = rd_kafka_consumer_poll(c, 500);
TEST_ASSERT(!rkmessage, "No message should be returned");

TEST_ASSERT(rebalance_cnt == expected_rebalance_cnt,
"Expected %d rebalance events, got %d",
expected_rebalance_cnt, rebalance_cnt);

if (variation == 0) {
/*Ack for assignment HB*/
/* Ack for assignment HB */
expected_heartbeats++;
} else if (variation == 1) {
/* First HB is fenced
* Second receives assignment
* Third acks assignment */
expected_heartbeats += 3;
/* First HB assigns again
* Second HB acks assignment */
expected_heartbeats += 2;
}

TEST_SAY("Awaiting acknowledge heartbeat\n");
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, expected_heartbeats,
250)) == expected_heartbeats,
100)) == expected_heartbeats,
"Expected %d heartbeats, got %d", expected_heartbeats,
found_heartbeats);

Expand All @@ -543,6 +539,7 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
"Expected %d rebalance events, got %d",
expected_rebalance_cnt, rebalance_cnt);

TEST_SAY("Awaiting leave group heartbeat\n");
/* After closing the consumer, 1 heartbeat should been sent */
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, 1, 200)) == 1,
Expand All @@ -553,7 +550,7 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
rd_kafka_destroy(c);
test_mock_cluster_destroy(mcluster);

TIMING_ASSERT(&timing, 500, 3000);
TIMING_ASSERT(&timing, 500, 2000);
SUB_TEST_PASS();
}

Expand Down Expand Up @@ -614,7 +611,6 @@ static void do_test_metadata_unknown_topic_id_error(int variation) {

TIMING_START(&timing, "do_test_metadata_unknown_topic_id_error");

/* Subscribe to the input topic */
subscription = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subscription, topic,
RD_KAFKA_PARTITION_UA);
Expand All @@ -626,16 +622,19 @@ static void do_test_metadata_unknown_topic_id_error(int variation) {
rd_kafka_mock_topic_set_error(mcluster, topic,
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_ID);

TEST_SAY("Subscribing to topic\n");
TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription));
rd_kafka_topic_partition_list_destroy(subscription);

/* Cannot fetch until Metadata calls replies with UNKNOWN_TOPIC_ID */
TEST_SAY(
"Cannot fetch until Metadata calls replies with "
"UNKNOWN_TOPIC_ID\n");
test_consumer_poll_no_msgs("no messages", c, 0, 1000);

rd_kafka_mock_topic_set_error(mcluster, topic,
RD_KAFKA_RESP_ERR_NO_ERROR);

/* Fetch the message */
TEST_SAY("Reconciliation and fetch is now possible\n");
test_consumer_poll_timeout("message", c, 0, 0, 0, 1, NULL, 2000);

rd_kafka_destroy(c);
Expand All @@ -660,6 +659,11 @@ static void do_test_metadata_unknown_topic_id_tests(void) {
int main_0147_consumer_group_consumer_mock(int argc, char **argv) {
TEST_SKIP_MOCK_CLUSTER(0);

if (test_consumer_group_protocol_classic()) {
TEST_SKIP("Test only for group.protocol=consumer\n");
return 0;
}

do_test_consumer_group_heartbeat_fatal_errors();

do_test_consumer_group_heartbeat_retriable_errors();
Expand Down

0 comments on commit a047935

Please sign in to comment.