Skip to content

Commit

Permalink
spring-projectsGH-2853: Handle null Cluster ID
Browse files Browse the repository at this point in the history
Resolves spring-projects#2853

Apparently, some third party Kafka replacements can return a `null` `clusterId`.

This causes an attempt to retrieve it on every operation.

**cherry-pick to 3.0.x**
  • Loading branch information
garyrussell committed Oct 19, 2023
1 parent 61bc219 commit a7f2d06
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ public String clusterId() {
if (this.clusterId == null) {
try (AdminClient client = createAdmin()) {
this.clusterId = client.describeCluster().clusterId().get(this.operationTimeout, TimeUnit.SECONDS);
if (this.clusterId == null) {
this.clusterId = "null";
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -358,7 +361,7 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
}
}

private AdminClient createAdmin() {
AdminClient createAdmin() {
Map<String, Object> configs2 = new HashMap<>(this.configs);
checkBootstrap(configs2);
return AdminClient.create(configs2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.awaitility.Awaitility.await;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import java.lang.reflect.Method;
import java.util.Arrays;
Expand All @@ -37,11 +39,13 @@
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.TopicConfig;
Expand Down Expand Up @@ -268,6 +272,24 @@ void toggleBootstraps() {
.isEqualTo("a,b,c");
}

@Test
void nullClusterId() {
AdminClient mock = mock(AdminClient.class);
DescribeClusterResult result = mock(DescribeClusterResult.class);
KafkaFuture<String> fut = KafkaFuture.completedFuture(null);
given(result.clusterId()).willReturn(fut);
given(mock.describeCluster()).willReturn(result);
KafkaAdmin admin = new KafkaAdmin(Map.of()) {

@Override
AdminClient createAdmin() {
return mock;
}

};
assertThat(admin.clusterId()).isEqualTo("null");
}

@Configuration
public static class Config {

Expand Down

0 comments on commit a7f2d06

Please sign in to comment.