Skip to content

Commit

Permalink
feat: validate createTopic permissions on SandboxedKafkaTopicClient (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Aug 22, 2019
1 parent 4d7ef2a commit 0ea157b
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;

/**
Expand All @@ -35,6 +37,27 @@ enum TopicCleanupPolicy {
COMPACT_DELETE
}

default void validateCreateTopic(
final String topic,
final int numPartitions,
final short replicationFactor) {
validateCreateTopic(topic, numPartitions, replicationFactor, Collections.emptyMap());
}

default void validateCreateTopic(
String topic,
int numPartitions,
short replicationFactor,
Map<String, ?> configs) {
createTopic(
topic,
numPartitions,
replicationFactor,
configs,
new CreateTopicsOptions().validateOnly(true)
);
}

/**
* Create a new topic with the specified name, numPartitions and replicationFactor.
*
Expand Down Expand Up @@ -67,11 +90,42 @@ default void createTopic(
* @param numPartitions the partition count of the topic.
* @param configs any additional topic configs to use
*/
void createTopic(
default void createTopic(
String topic,
int numPartitions,
short replicationFactor,
Map<String, ?> configs
) {
createTopic(
topic,
numPartitions,
replicationFactor,
configs,
new CreateTopicsOptions()
);
}

/**
* Create a new topic with the specified name, numPartitions and replicationFactor.
*
* <p>If the topic already exists the method checks that partition count <i>matches</i>matches
* {@code numPartitions} and that the replication factor is <i>at least</i>
* {@code replicationFactor}
*
* @param topic name of the topic to create
* @param replicationFactor the replication factor for the new topic, or
* {@link io.confluent.ksql.topic.TopicProperties#DEFAULT_REPLICAS}
* to use the default replication of the cluster
* @param numPartitions the partition count of the topic.
* @param configs any additional topic configs to use
* @param createOptions the options to use when creating the new topic
*/
void createTopic(
String topic,
int numPartitions,
short replicationFactor,
Map<String, ?> configs,
CreateTopicsOptions createOptions
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void createTopic(
final String topic,
final int numPartitions,
final short replicationFactor,
final Map<String, ?> configs
final Map<String, ?> configs,
final CreateTopicsOptions createOptions
) {
if (isTopicExists(topic)) {
validateTopicProperties(topic, numPartitions, replicationFactor);
Expand All @@ -101,9 +103,16 @@ public void createTopic(
newTopic.configs(toStringConfigs(configs));

try {
LOG.info("Creating topic '{}'", topic);
LOG.info(String.format("Creating topic '{}' %s",
topic,
(createOptions.shouldValidateOnly()) ? "(ONLY VALIDATE)" : ""
));

ExecutorUtil.executeWithRetries(
() -> adminClient.createTopics(Collections.singleton(newTopic)).all().get(),
() -> adminClient.createTopics(
Collections.singleton(newTopic),
createOptions
).all().get(),
ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ private void createTopic(
Collections.emptyList()))
.collect(Collectors.toList());

// This is useful to validate permissions to create the topic
delegate.validateCreateTopic(topic, numPartitions, replicationFactor, configs);

createdTopics.put(topic, new SandboxedTopicDescription(
topic,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
Expand Down Expand Up @@ -122,7 +124,8 @@ public void createTopic(
final String topic,
final int numPartitions,
final short replicationFactor,
final Map<String, ?> configs
final Map<String, ?> configs,
final CreateTopicsOptions createOptions
) {
final short replicas = replicationFactor == TopicProperties.DEFAULT_REPLICAS
? 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.services;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
Expand Down Expand Up @@ -118,14 +120,27 @@ public void init() {
@Test
public void shouldCreateTopic() {
expect(adminClient.listTopics()).andReturn(getListTopicsResult());
expect(adminClient.createTopics(anyObject())).andReturn(getCreateTopicsResult());
expect(adminClient.createTopics(anyObject(), shouldValidateCreate(false)))
.andReturn(getCreateTopicsResult());
replay(adminClient);

final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
kafkaTopicClient.createTopic("test", 1, (short) 1);
verify(adminClient);
}

@Test
public void shouldValidateCreateTopic() {
expect(adminClient.listTopics()).andReturn(getListTopicsResult());
expect(adminClient.createTopics(anyObject(), shouldValidateCreate(true)))
.andReturn(getCreateTopicsResult());
replay(adminClient);

final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
kafkaTopicClient.validateCreateTopic("test", 1, (short) 1);
verify(adminClient);
}

@Test
public void shouldUseExistingTopicWithTheSameSpecsInsteadOfCreate() {
expect(adminClient.listTopics()).andReturn(getListTopicsResult());
Expand Down Expand Up @@ -160,7 +175,8 @@ public void shouldFailCreateExistingTopic() {
public void shouldFailCreateTopicWhenNoAclsSet() {
// Given:
expect(adminClient.listTopics()).andReturn(getEmptyListTopicResult());
expect(adminClient.createTopics(anyObject())).andReturn(createTopicAuthorizationException());
expect(adminClient.createTopics(anyObject(), anyObject()))
.andReturn(createTopicAuthorizationException());

replay(adminClient);

Expand Down Expand Up @@ -189,7 +205,7 @@ public void shouldNotFailIfTopicAlreadyExistsButCreateUsesDefaultReplicas() {
@Test
public void shouldNotFailIfTopicAlreadyExistsWhenCreating() {
expect(adminClient.listTopics()).andReturn(getEmptyListTopicResult());
expect(adminClient.createTopics(anyObject()))
expect(adminClient.createTopics(anyObject(), anyObject()))
.andReturn(createTopicReturningTopicExistsException());
expect(adminClient.describeTopics(anyObject(), anyObject()))
.andReturn(getDescribeTopicsResult());
Expand All @@ -202,7 +218,7 @@ public void shouldNotFailIfTopicAlreadyExistsWhenCreating() {
@Test
public void shouldRetryDescribeTopicOnRetriableException() {
expect(adminClient.listTopics()).andReturn(getEmptyListTopicResult());
expect(adminClient.createTopics(anyObject()))
expect(adminClient.createTopics(anyObject(), anyObject()))
.andReturn(createTopicReturningTopicExistsException());
expect(adminClient.describeTopics(anyObject(), anyObject()))
.andReturn(describeTopicReturningUnknownPartitionException()).once();
Expand Down Expand Up @@ -433,7 +449,8 @@ public void shouldSetTopicCleanupPolicyToCompact() {
// Verify that the new topic configuration being passed to the admin client is what we expect.
final NewTopic newTopic = new NewTopic(topicName1, 1, (short) 1);
newTopic.configs(Collections.singletonMap("cleanup.policy", "compact"));
expect(adminClient.createTopics(singleNewTopic(newTopic))).andReturn(getCreateTopicsResult());
expect(adminClient.createTopics(singleNewTopic(newTopic), anyObject()))
.andReturn(getCreateTopicsResult());
replay(adminClient);

final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
Expand Down Expand Up @@ -828,4 +845,21 @@ public void appendTo(final StringBuffer buffer) {
EasyMock.reportMatcher(new NewTopicsMatcher());
return null;
}

private CreateTopicsOptions shouldValidateCreate(final boolean validateOnly) {
EasyMock.reportMatcher(new IArgumentMatcher() {
@Override
public boolean matches(Object argument) {
return argument instanceof CreateTopicsOptions
&& ((CreateTopicsOptions) argument).shouldValidateOnly() == validateOnly;
}

@Override
public void appendTo(StringBuffer buffer) {
buffer.append("validateOnly(\"" + validateOnly + "\")");
}
});

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -45,6 +47,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -186,6 +189,37 @@ public void shouldTrackCreatedTopicsDetails() {
Sets.newHashSet(AclOperation.READ, AclOperation.WRITE))));
}

@Test
public void shouldThrowOnCreateIfValidateCreateTopicFails() {
// Given:
doThrow(TopicAuthorizationException.class).when(delegate)
.validateCreateTopic("some topic", 2, (short) 3, configs);

// Expect:
expectedException.expect(TopicAuthorizationException.class);

// When:
sandboxedClient.createTopic("some topic", 2, (short) 3, configs);
}

@Test
public void shouldNotCreateTopicIfValidateCreateTopicFails() {
// Given:
doThrow(TopicAuthorizationException.class).when(delegate)
.validateCreateTopic("some topic", 2, (short) 3, configs);

// When:
try {
sandboxedClient.createTopic("some topic", 2, (short) 3, configs);
} catch (final TopicAuthorizationException e) {
// skip
}

// Then:
verify(delegate, times(0))
.createTopic("some topic", 2, (short) 3, configs);
}

@Test
public void shouldThrowOnCreateIfTopicPreviouslyCreatedInScopeWithDifferentPartitionCount() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;

/**
Expand All @@ -32,7 +34,8 @@ public class MockKafkaTopicClient implements KafkaTopicClient {
public void createTopic(final String topic,
final int numPartitions,
final short replicationFactor,
final Map<String, ?> configs) {
final Map<String, ?> configs,
final CreateTopicsOptions createOptions) {
}

@Override
Expand Down

0 comments on commit 0ea157b

Please sign in to comment.