Skip to content

Commit

Permalink
Add an additional retry to getConsumerLag_returnsConsumerLag
Browse files Browse the repository at this point in the history
  • Loading branch information
ahuang98 committed Feb 17, 2021
1 parent 1e7652a commit 52fc930
Showing 1 changed file with 43 additions and 42 deletions.
Expand Up @@ -78,28 +78,26 @@ public void setUp() throws Exception {

@Test
public void listConsumerLags_returnsConsumerLags() {
KafkaConsumer<?, ?> consumer1 = createConsumer(group1, "client-1");
KafkaConsumer<?, ?> consumer2 = createConsumer(group1, "client-2");
consumer1.subscribe(Collections.singletonList(topic1));
consumer2.subscribe(Collections.singletonList(topic2));
consumer1.poll(Duration.ofSeconds(1));
consumer2.poll(Duration.ofSeconds(1));
// After polling once, only one of the consumers will be member of the group, so we poll again
// to force the other consumer to join the group.
consumer1.poll(Duration.ofSeconds(1));
consumer2.poll(Duration.ofSeconds(1));

// produce to topic1 partition0 and topic2 partition1
BinaryPartitionProduceRequest request1 =
BinaryPartitionProduceRequest.create(partitionRecordsWithoutKeys);
produce(topic1, 0, request1);
produce(topic2, 1, request1);

KafkaConsumer<?, ?> consumer1 = createConsumer(group1, "client-1");
KafkaConsumer<?, ?> consumer2 = createConsumer(group1, "client-2");

testWithRetry(
() -> {
// consume from subscribed topics
consumer1.subscribe(Collections.singletonList(topic1));
consumer2.subscribe(Collections.singletonList(topic2));
consumer1.poll(Duration.ofSeconds(1));
consumer2.poll(Duration.ofSeconds(1));
// After polling once, only one of the consumers will be member of the group, so we poll again
// to force the other consumer to join the group.
consumer1.poll(Duration.ofSeconds(1));
consumer2.poll(Duration.ofSeconds(1));
// commit offsets from consuming from subscribed topics
consumer1.commitSync();
consumer2.commitSync();

Expand Down Expand Up @@ -212,43 +210,46 @@ public void listConsumerLags_nonExistingConsumerGroup_returnsNotFound() {

@Test
public void getConsumerLag_returnsConsumerLag() {
KafkaConsumer<?, ?> consumer = createConsumer(group1, "client-1");
consumer.subscribe(Arrays.asList(topic1, topic2));
// produce to topic1 partition0 and topic2 partition1
BinaryPartitionProduceRequest request1 =
BinaryPartitionProduceRequest.create(partitionRecordsWithoutKeys);
produce(topic1, 0, request1);
produce(topic2, 1, request1);

// consume from subscribed topics (zero lag)
consumer.poll(Duration.ofSeconds(1));
consumer.commitSync();
KafkaConsumer<?, ?> consumer = createConsumer(group1, "client-1");
testWithRetry(
() -> {
consumer.subscribe(Arrays.asList(topic1, topic2));
// consume from subscribed topics (zero lag)
consumer.poll(Duration.ofSeconds(1));
consumer.commitSync();

// stores expected currentOffsets and logEndOffsets for each topic partition after sending
// 3 records to topic1 partition0 and topic2 partition1
long[][] expectedOffsets = new long[numTopics][numPartitions];
expectedOffsets[0][0] = 3;
expectedOffsets[1][1] = 3;
// all other values default to 0L

for (int t = 0; t < numTopics; t++) {
for (int p = 0; p < numPartitions; p++) {
consumer.seekToEnd(Collections.singletonList(new TopicPartition(topics[t], p)));
// consumer.seek(new TopicPartition(topics[t], p), expectedOffsets[t][p]);
Response response =
request("/v3/clusters/" + clusterId + "/consumer-groups/" + group1 +
"/lags/" + topics[t] + "/partitions/" + p)
.accept(MediaType.APPLICATION_JSON)
.get();

assertEquals(Status.OK.getStatusCode(), response.getStatus());
ConsumerLagData consumerLagData =
response.readEntity(GetConsumerLagResponse.class).getValue();
assertEquals(expectedOffsets[t][p], (long) consumerLagData.getCurrentOffset());
assertEquals(expectedOffsets[t][p], (long) consumerLagData.getLogEndOffset());
assertEquals(0, (long) consumerLagData.getLag());
}
}
// stores expected currentOffsets and logEndOffsets for each topic partition after sending
// 3 records to topic1 partition0 and topic2 partition1
long[][] expectedOffsets = new long[numTopics][numPartitions];
expectedOffsets[0][0] = 3;
expectedOffsets[1][1] = 3;
// all other values default to 0L

for (int t = 0; t < numTopics; t++) {
for (int p = 0; p < numPartitions; p++) {
consumer.seekToEnd(Collections.singletonList(new TopicPartition(topics[t], p)));
Response response =
request("/v3/clusters/" + clusterId + "/consumer-groups/" + group1 +
"/lags/" + topics[t] + "/partitions/" + p)
.accept(MediaType.APPLICATION_JSON)
.get();

assertEquals(Status.OK.getStatusCode(), response.getStatus());
ConsumerLagData consumerLagData =
response.readEntity(GetConsumerLagResponse.class).getValue();
assertEquals(expectedOffsets[t][p], (long) consumerLagData.getCurrentOffset());
assertEquals(expectedOffsets[t][p], (long) consumerLagData.getLogEndOffset());
assertEquals(0, (long) consumerLagData.getLag());
}
}
}
);

// produce again to topic2 partition1
BinaryPartitionProduceRequest request2 =
Expand Down

0 comments on commit 52fc930

Please sign in to comment.