Skip to content

Commit

Permalink
Make test 0147 more reliable
Browse files Browse the repository at this point in the history
  • Loading branch information
emasab committed Apr 5, 2024
1 parent a50531a commit 99f4833
Showing 1 changed file with 31 additions and 48 deletions.
79 changes: 31 additions & 48 deletions tests/0147-consumer_group_consumer_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ static void rebalance_cb(rd_kafka_t *rk,
rd_kafka_yield(rk);
}

static rd_bool_t is_heartbeat_request(rd_kafka_mock_request_t *request,
void *opaque) {
return rd_kafka_mock_request_api_key(request) ==
RD_KAFKAP_ConsumerGroupHeartbeat;
}

/**
* @brief Wait at least \p num heartbeats
* have been received by the mock cluster
Expand All @@ -105,31 +111,8 @@ static void rebalance_cb(rd_kafka_t *rk,
static int wait_all_heartbeats_done(rd_kafka_mock_cluster_t *mcluster,
int num,
int confidence_interval) {
size_t i;
rd_kafka_mock_request_t **requests;
size_t request_cnt;
int current_heartbeats = 0;
rd_bool_t last_time = rd_true;

while (current_heartbeats < num || last_time) {
if (current_heartbeats >= num) {
rd_usleep(confidence_interval * 1000, 0);
last_time = rd_false;
}
requests = rd_kafka_mock_get_requests(mcluster, &request_cnt);
current_heartbeats = 0;
for (i = 0; i < request_cnt; i++) {
if (rd_kafka_mock_request_api_key(requests[i]) ==
RD_KAFKAP_ConsumerGroupHeartbeat)
current_heartbeats++;
}
rd_kafka_mock_request_destroy_array(requests, request_cnt);
rd_usleep(100 * 1000, 0);
}
if (test_on_ci && current_heartbeats == num + 1) {
return num;
}
return current_heartbeats;
return test_mock_wait_maching_requests(
mcluster, num, confidence_interval, is_heartbeat_request, NULL);
}

static rd_kafka_t *create_consumer(const char *bootstraps,
Expand Down Expand Up @@ -172,7 +155,7 @@ do_test_consumer_group_heartbeat_fatal_error(rd_kafka_resp_err_t err,
SUB_TEST_QUICK("%s, variation %d", rd_kafka_err2name(err), variation);

mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 500);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 1000);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

TIMING_START(&timing, "consumer_group_heartbeat_fatal_error");
Expand Down Expand Up @@ -301,7 +284,7 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,
const char *bootstraps;
rd_kafka_topic_partition_list_t *subscription;
rd_kafka_t *c;
int expected_heartbeats, found_heartbeats, observation_window_ms;
int expected_heartbeats, found_heartbeats;
test_timing_t timing;
const char *topic = test_mk_topic_name(__FUNCTION__, 0);
test_curr->is_fatal_cb = error_is_fatal_cb;
Expand All @@ -313,7 +296,7 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,


mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 500);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 1000);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

c = create_consumer(bootstraps, topic, rd_true);
Expand Down Expand Up @@ -341,22 +324,24 @@ do_test_consumer_group_heartbeat_retriable_error(rd_kafka_resp_err_t err,
TEST_CALL_ERR__(rd_kafka_subscribe(c, subscription));
rd_kafka_topic_partition_list_destroy(subscription);

/* First HB and retry */
expected_heartbeats = 2;
/* Time for first HB and retry */
observation_window_ms = 100;
rebalance_exp_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
if (variation == 1) {
/* Consume from c, no message is returned, but assign
* callback is processed */
test_consumer_poll_no_msgs("after heartbeat", c, 0, 200);

/* wait 1 HB interval more */
observation_window_ms += 600;
expected_heartbeats++;
expected_heartbeats += 1;
rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
}
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, expected_heartbeats,
200)) == expected_heartbeats,
"Expected %d heartbeats, got %d", expected_heartbeats,
found_heartbeats);

rebalance_exp_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;

/* Consume from c, no message is returned */
test_consumer_poll_no_msgs("after heartbeat", c, 0, 250);

Expand Down Expand Up @@ -440,11 +425,9 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
SUB_TEST_QUICK("%s, variation %d", rd_kafka_err2name(err), variation);

mcluster = test_mock_cluster_new(1, &bootstraps);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 500);
rd_kafka_mock_set_default_heartbeat_interval(mcluster, 1000);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

TIMING_START(&timing, "consumer_group_heartbeat_fenced_error");

if (variation == 1) {
/* First HB returns assignment */
rd_kafka_mock_broker_push_request_error_rtts(
Expand All @@ -457,6 +440,8 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,

c = create_consumer(bootstraps, topic, rd_true);

TIMING_START(&timing, "consumer_group_heartbeat_fenced_error");

/* Subscribe to the input topic */
subscription = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(subscription, topic,
Expand Down Expand Up @@ -487,15 +472,14 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
rebalance_exp_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;

/* First HB assigned */
rkmessage = rd_kafka_consumer_poll(c, 750);
rkmessage = rd_kafka_consumer_poll(c, 100);
TEST_ASSERT(!rkmessage, "No message should be returned");

/* Needs to wait HB interval */
/* Second HB acks and loses partitions */
expected_rebalance_cnt++;
rebalance_exp_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
rebalance_exp_lost = rd_true;

/* Second HB loses partitions */
/* Needs to wait HB interval */
rkmessage = rd_kafka_consumer_poll(c, 750);
TEST_ASSERT(!rkmessage, "No message should be returned");

Expand All @@ -518,17 +502,16 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
expected_rebalance_cnt, rebalance_cnt);

if (variation == 0) {
/*Ack for assignment HB*/
/* Ack for assignment HB */
expected_heartbeats++;
} else if (variation == 1) {
/* First HB is fenced
* Second receives assignment
* Third acks assignment */
expected_heartbeats += 3;
/* First HB assigns again
* Second HB acks assignment */
expected_heartbeats += 2;
}
TEST_ASSERT((found_heartbeats =
wait_all_heartbeats_done(mcluster, expected_heartbeats,
250)) == expected_heartbeats,
100)) == expected_heartbeats,
"Expected %d heartbeats, got %d", expected_heartbeats,
found_heartbeats);

Expand All @@ -553,7 +536,7 @@ do_test_consumer_group_heartbeat_fenced_error(rd_kafka_resp_err_t err,
rd_kafka_destroy(c);
test_mock_cluster_destroy(mcluster);

TIMING_ASSERT(&timing, 500, 3000);
TIMING_ASSERT(&timing, 500, 2000);
SUB_TEST_PASS();
}

Expand Down

0 comments on commit 99f4833

Please sign in to comment.