Skip to content

Commit

Permalink
retrying only the http response
Browse files Browse the repository at this point in the history
  • Loading branch information
ahuang98 committed Feb 26, 2021
1 parent dd71233 commit 8076cb4
Showing 1 changed file with 28 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@ public void listConsumerLags_returnsConsumerLags() {
KafkaConsumer<?, ?> consumer1 = createConsumer(group1, "client-1");
KafkaConsumer<?, ?> consumer2 = createConsumer(group1, "client-2");

consumer1.subscribe(Collections.singletonList(topic1));
consumer2.subscribe(Collections.singletonList(topic2));
consumer1.poll(Duration.ofSeconds(1));
consumer2.poll(Duration.ofSeconds(1));
// After polling once, only one of the consumers will be member of the group, so we poll again
// to force the other consumer to join the group.
consumer1.poll(Duration.ofSeconds(1));
consumer2.poll(Duration.ofSeconds(1));
// commit offsets from consuming from subscribed topics
consumer1.commitSync();
consumer2.commitSync();

testWithRetry(
() -> {
consumer1.subscribe(Collections.singletonList(topic1));
consumer2.subscribe(Collections.singletonList(topic2));
consumer1.poll(Duration.ofSeconds(1));
consumer2.poll(Duration.ofSeconds(1));
// After polling once, only one of the consumers will be member of the group, so we poll again
// to force the other consumer to join the group.
consumer1.poll(Duration.ofSeconds(1));
consumer2.poll(Duration.ofSeconds(1));
// commit offsets from consuming from subscribed topics
consumer1.commitSync();
consumer2.commitSync();

Response response =
request("/v3/clusters/" + clusterId + "/consumer-groups/" + group1 + "/lags")
.accept(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -224,32 +224,33 @@ public void getConsumerLag_returnsConsumerLag() {
// all other values default to 0L

KafkaConsumer<?, ?> consumer = createConsumer(group1, "client-1");
testWithRetry(
() -> {
consumer.subscribe(Arrays.asList(topic1, topic2));
// consume from subscribed topics (zero lag)
consumer.poll(Duration.ofSeconds(1));
consumer.commitSync();
consumer.subscribe(Arrays.asList(topic1, topic2));
// consume from subscribed topics (zero lag)
consumer.poll(Duration.ofSeconds(1));
consumer.commitSync();

for (int t = 0; t < numTopics; t++) {
for (int p = 0; p < numPartitions; p++) {
consumer.seekToEnd(Collections.singletonList(new TopicPartition(topics[t], p)));
for (int t = 0; t < numTopics; t++) {
for (int p = 0; p < numPartitions; p++) {
final int finalP = p;
final int finalT = t;
testWithRetry(
() -> {
Response response =
request("/v3/clusters/" + clusterId + "/consumer-groups/" + group1 +
"/lags/" + topics[t] + "/partitions/" + p)
"/lags/" + topics[finalT] + "/partitions/" + finalP)
.accept(MediaType.APPLICATION_JSON)
.get();

assertEquals(Status.OK.getStatusCode(), response.getStatus());
ConsumerLagData consumerLagData =
response.readEntity(GetConsumerLagResponse.class).getValue();
assertEquals(expectedOffsets[t][p], (long) consumerLagData.getCurrentOffset());
assertEquals(expectedOffsets[t][p], (long) consumerLagData.getLogEndOffset());
assertEquals(expectedOffsets[finalT][finalP], (long) consumerLagData.getCurrentOffset());
assertEquals(expectedOffsets[finalT][finalP], (long) consumerLagData.getLogEndOffset());
assertEquals(0, (long) consumerLagData.getLag());
}
}
}
);
);
}
}

// produce again to topic2 partition1
BinaryPartitionProduceRequest request2 =
Expand Down

0 comments on commit 8076cb4

Please sign in to comment.