Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLIENTS-1521: Remove GET Topic from LIST/GET/UPDATE/REST Topic Configuration. #644

Merged
merged 1 commit into from Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,24 +35,21 @@
final class TopicConfigurationManagerImpl implements TopicConfigurationManager {

private final Admin adminClient;
private final TopicManager topicManager;
private final ClusterManager clusterManager;

@Inject
TopicConfigurationManagerImpl(Admin adminClient, TopicManager topicManager) {
TopicConfigurationManagerImpl(Admin adminClient, ClusterManager clusterManager) {
this.adminClient = Objects.requireNonNull(adminClient);
this.topicManager = Objects.requireNonNull(topicManager);
this.clusterManager = Objects.requireNonNull(clusterManager);
}

@Override
public CompletableFuture<List<TopicConfiguration>> listTopicConfigurations(
String clusterId, String topicName) {
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);

return topicManager.getTopic(clusterId, topicName)
.thenApply(
topic ->
checkEntityExists(
topic, "Topic %s cannot be found in cluster %s.", topicName, clusterId))
return clusterManager.getCluster(clusterId)
.thenApply(cluster -> checkEntityExists(cluster, "Cluster %s cannot be found.", clusterId))
.thenCompose(
topic ->
KafkaFutures.toCompletableFuture(
Expand Down
Expand Up @@ -15,7 +15,6 @@

package io.confluent.kafkarest.controllers;

import static io.confluent.kafkarest.CompletableFutures.failedFuture;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
Expand All @@ -27,7 +26,8 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

import io.confluent.kafkarest.entities.Topic;
import io.confluent.kafkarest.TestUtils;
import io.confluent.kafkarest.entities.Cluster;
import io.confluent.kafkarest.entities.TopicConfiguration;
import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -43,6 +43,7 @@
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.easymock.EasyMockRule;
import org.easymock.Mock;
import org.junit.Before;
Expand All @@ -57,13 +58,8 @@ public class TopicConfigurationManagerImplTest {
private static final String CLUSTER_ID = "cluster-1";
private static final String TOPIC_NAME = "topic-1";

private static final Topic TOPIC =
new Topic(
CLUSTER_ID,
TOPIC_NAME,
emptyList(),
/* replicationFactor= */ (short) 1,
/* isInternal= */ false);
private static final Cluster CLUSTER =
new Cluster(CLUSTER_ID, /* controller= */ null, emptyList());

private static final TopicConfiguration CONFIGURATION_1 =
new TopicConfiguration(
Expand Down Expand Up @@ -122,7 +118,7 @@ public class TopicConfigurationManagerImplTest {
private Admin adminClient;

@Mock
private TopicManager topicManager;
private ClusterManager clusterManager;

@Mock
private DescribeConfigsResult describeConfigsResult;
Expand All @@ -134,13 +130,12 @@ public class TopicConfigurationManagerImplTest {

@Before
public void setUp() {
topicConfigurationManager = new TopicConfigurationManagerImpl(adminClient, topicManager);
topicConfigurationManager = new TopicConfigurationManagerImpl(adminClient, clusterManager);
}

@Test
public void listTopicConfigurations_existingTopic_returnsConfigurations() throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.of(TOPIC)));
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
Expand All @@ -150,7 +145,7 @@ public void listTopicConfigurations_existingTopic_returnsConfigurations() throws
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
KafkaFuture.completedFuture(CONFIG)));
replay(adminClient, topicManager, describeConfigsResult);
replay(adminClient, clusterManager, describeConfigsResult);

List<TopicConfiguration> configurations =
topicConfigurationManager.listTopicConfigurations(CLUSTER_ID, TOPIC_NAME).get();
Expand All @@ -162,23 +157,30 @@ public void listTopicConfigurations_existingTopic_returnsConfigurations() throws

@Test
public void listTopicConfigurations_nonExistingTopic_throwsNotFound() throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.empty()));
replay(topicManager);
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
.andReturn(describeConfigsResult);
expect(describeConfigsResult.values())
.andReturn(
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
TestUtils.failedFuture(new UnknownTopicOrPartitionException())));
replay(clusterManager, adminClient, describeConfigsResult);

try {
topicConfigurationManager.listTopicConfigurations(CLUSTER_ID, TOPIC_NAME).get();
fail();
} catch (ExecutionException e) {
assertEquals(NotFoundException.class, e.getCause().getClass());
assertEquals(UnknownTopicOrPartitionException.class, e.getCause().getClass());
}
}

@Test
public void listTopicConfigurations_nonExistingCluster_throwsNotFound() throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(failedFuture(new NotFoundException()));
replay(topicManager);
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.empty()));
replay(clusterManager);

try {
topicConfigurationManager.listTopicConfigurations(CLUSTER_ID, TOPIC_NAME).get();
Expand All @@ -190,8 +192,7 @@ public void listTopicConfigurations_nonExistingCluster_throwsNotFound() throws E

@Test
public void getTopicConfiguration_existingConfiguration_returnsConfiguration() throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.of(TOPIC)));
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
Expand All @@ -201,7 +202,7 @@ public void getTopicConfiguration_existingConfiguration_returnsConfiguration() t
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
KafkaFuture.completedFuture(CONFIG)));
replay(adminClient, topicManager, describeConfigsResult);
replay(adminClient, clusterManager, describeConfigsResult);

TopicConfiguration configuration =
topicConfigurationManager.getTopicConfiguration(
Expand All @@ -214,8 +215,7 @@ public void getTopicConfiguration_existingConfiguration_returnsConfiguration() t

@Test
public void getTopicConfiguration_nonExistingConfiguration_returnsEmpty() throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.of(TOPIC)));
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
Expand All @@ -225,7 +225,7 @@ public void getTopicConfiguration_nonExistingConfiguration_returnsEmpty() throws
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
KafkaFuture.completedFuture(CONFIG)));
replay(adminClient, topicManager, describeConfigsResult);
replay(adminClient, clusterManager, describeConfigsResult);

Optional<TopicConfiguration> configuration =
topicConfigurationManager.getTopicConfiguration(CLUSTER_ID, TOPIC_NAME, "foobar").get();
Expand All @@ -235,24 +235,31 @@ public void getTopicConfiguration_nonExistingConfiguration_returnsEmpty() throws

@Test
public void getTopicConfiguration_nonExistingTopic_throwsNotFound() throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.empty()));
replay(topicManager);
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
.andReturn(describeConfigsResult);
expect(describeConfigsResult.values())
.andReturn(
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
TestUtils.failedFuture(new UnknownTopicOrPartitionException())));
replay(clusterManager, adminClient, describeConfigsResult);

try {
topicConfigurationManager.getTopicConfiguration(
CLUSTER_ID, TOPIC_NAME, CONFIGURATION_1.getName()).get();
fail();
} catch (ExecutionException e) {
assertEquals(NotFoundException.class, e.getCause().getClass());
assertEquals(UnknownTopicOrPartitionException.class, e.getCause().getClass());
}
}

@Test
public void getTopicConfiguration_nonExistingCluster_throwsNotFound() throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(failedFuture(new NotFoundException()));
replay(topicManager);
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.empty()));
replay(clusterManager);

try {
topicConfigurationManager.getTopicConfiguration(
Expand All @@ -266,8 +273,7 @@ public void getTopicConfiguration_nonExistingCluster_throwsNotFound() throws Exc
@Test
public void updateTopicConfiguration_existingConfiguration_updatesConfiguration()
throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.of(TOPIC)));
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
Expand All @@ -291,7 +297,7 @@ public void updateTopicConfiguration_existingConfiguration_updatesConfiguration(
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
KafkaFuture.completedFuture(null)));
replay(topicManager, adminClient, describeConfigsResult, alterConfigsResult);
replay(clusterManager, adminClient, describeConfigsResult, alterConfigsResult);

topicConfigurationManager.updateTopicConfiguration(
CLUSTER_ID, TOPIC_NAME, CONFIGURATION_1.getName(), "new-value").get();
Expand All @@ -302,8 +308,7 @@ public void updateTopicConfiguration_existingConfiguration_updatesConfiguration(
@Test
public void updateTopicConfiguration_nonExistingConfiguration_throwsNotFound()
throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.of(TOPIC)));
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
Expand All @@ -313,7 +318,7 @@ public void updateTopicConfiguration_nonExistingConfiguration_throwsNotFound()
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
KafkaFuture.completedFuture(CONFIG)));
replay(topicManager, adminClient, describeConfigsResult);
replay(clusterManager, adminClient, describeConfigsResult);

try {
topicConfigurationManager.updateTopicConfiguration(
Expand All @@ -329,16 +334,24 @@ public void updateTopicConfiguration_nonExistingConfiguration_throwsNotFound()
@Test
public void updateTopicConfiguration_nonExistingTopic_throwsNotFound()
throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.empty()));
replay(topicManager, adminClient);
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
.andReturn(describeConfigsResult);
expect(describeConfigsResult.values())
.andReturn(
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
TestUtils.failedFuture(new UnknownTopicOrPartitionException())));
replay(clusterManager, adminClient, describeConfigsResult);

try {
topicConfigurationManager.updateTopicConfiguration(
CLUSTER_ID, TOPIC_NAME, CONFIGURATION_1.getName(), "new-value").get();
fail();
} catch (ExecutionException e) {
assertEquals(NotFoundException.class, e.getCause().getClass());
assertEquals(UnknownTopicOrPartitionException.class, e.getCause().getClass());
}

verify(adminClient);
Expand All @@ -347,9 +360,8 @@ public void updateTopicConfiguration_nonExistingTopic_throwsNotFound()
@Test
public void updateTopicConfiguration_nonExistingCluster_throwsNotFound()
throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(failedFuture(new NotFoundException()));
replay(topicManager, adminClient);
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.empty()));
replay(clusterManager, adminClient);

try {
topicConfigurationManager.updateTopicConfiguration(
Expand All @@ -365,8 +377,7 @@ public void updateTopicConfiguration_nonExistingCluster_throwsNotFound()
@Test
public void resetTopicConfiguration_existingConfiguration_resetsConfiguration()
throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.of(TOPIC)));
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
Expand All @@ -390,7 +401,7 @@ public void resetTopicConfiguration_existingConfiguration_resetsConfiguration()
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
KafkaFuture.completedFuture(null)));
replay(topicManager, adminClient, describeConfigsResult, alterConfigsResult);
replay(clusterManager, adminClient, describeConfigsResult, alterConfigsResult);

topicConfigurationManager.resetTopicConfiguration(
CLUSTER_ID, TOPIC_NAME, CONFIGURATION_1.getName()).get();
Expand All @@ -401,8 +412,7 @@ public void resetTopicConfiguration_existingConfiguration_resetsConfiguration()
@Test
public void resetTopicConfiguration_nonExistingConfiguration_throwsNotFound()
throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.of(TOPIC)));
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
Expand All @@ -412,7 +422,7 @@ public void resetTopicConfiguration_nonExistingConfiguration_throwsNotFound()
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
KafkaFuture.completedFuture(CONFIG)));
replay(topicManager, adminClient, describeConfigsResult);
replay(clusterManager, adminClient, describeConfigsResult);

try {
topicConfigurationManager.resetTopicConfiguration(CLUSTER_ID, TOPIC_NAME, "foobar").get();
Expand All @@ -427,16 +437,24 @@ public void resetTopicConfiguration_nonExistingConfiguration_throwsNotFound()
@Test
public void resetTopicConfiguration_nonExistingTopic_throwsNotFound()
throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(completedFuture(Optional.empty()));
replay(topicManager, adminClient);
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.of(CLUSTER)));
expect(
adminClient.describeConfigs(
singletonList(new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME))))
.andReturn(describeConfigsResult);
expect(describeConfigsResult.values())
.andReturn(
singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME),
TestUtils.failedFuture(new UnknownTopicOrPartitionException())));
replay(clusterManager, adminClient, describeConfigsResult);

try {
topicConfigurationManager.resetTopicConfiguration(
CLUSTER_ID, TOPIC_NAME, CONFIGURATION_1.getName()).get();
fail();
} catch (ExecutionException e) {
assertEquals(NotFoundException.class, e.getCause().getClass());
assertEquals(UnknownTopicOrPartitionException.class, e.getCause().getClass());
}

verify(adminClient);
Expand All @@ -445,9 +463,8 @@ public void resetTopicConfiguration_nonExistingTopic_throwsNotFound()
@Test
public void resetTopicConfiguration_nonExistingCluster_throwsNotFound()
throws Exception {
expect(topicManager.getTopic(CLUSTER_ID, TOPIC_NAME))
.andReturn(failedFuture(new NotFoundException()));
replay(topicManager, adminClient);
expect(clusterManager.getCluster(CLUSTER_ID)).andReturn(completedFuture(Optional.empty()));
replay(clusterManager, adminClient);

try {
topicConfigurationManager.resetTopicConfiguration(
Expand Down