Skip to content

Commit

Permalink
fix: delete created Kafka if the corresponding command fails (#10040)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Aug 16, 2023
1 parent 8aa118d commit fc5159a
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 53 deletions.
Expand Up @@ -91,7 +91,7 @@ public KafkaTopicClientImpl(final Supplier<Admin> sharedAdminClient) {
}

@Override
public void createTopic(
public boolean createTopic(
final String topic,
final int numPartitions,
final short replicationFactor,
Expand All @@ -102,7 +102,7 @@ public void createTopic(

if (isTopicExists(topic)) {
validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs);
return;
return false;
}

final short resolvedReplicationFactor = replicationFactor == TopicProperties.DEFAULT_REPLICAS
Expand All @@ -124,6 +124,8 @@ public void createTopic(
createOptions
).all().get(),
ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
return true;

} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new KafkaResponseGetFailedException(
Expand All @@ -134,6 +136,7 @@ public void createTopic(
// ensure that it matches the partition count, replication factor, and retention
// before returning success
validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs);
return false;

} catch (final TopicAuthorizationException e) {
throw new KsqlTopicAuthorizationException(
Expand Down
Expand Up @@ -84,15 +84,15 @@ private SandboxedKafkaTopicClient(final KafkaTopicClient delegate,
this.adminClient = Objects.requireNonNull(sharedAdminClient, "sharedAdminClient");
}

private void createTopic(
private boolean createTopic(
final String topic,
final int numPartitions,
final short replicationFactor
) {
createTopic(topic, numPartitions, replicationFactor, Collections.emptyMap());
return createTopic(topic, numPartitions, replicationFactor, Collections.emptyMap());
}

private void createTopic(
private boolean createTopic(
final String topic,
final int numPartitions,
final short replicationFactor,
Expand All @@ -101,7 +101,7 @@ private void createTopic(
if (isTopicExists(topic)) {
final Optional<Long> retentionMs = KafkaTopicClient.getRetentionMs(configs);
validateTopicProperties(topic, numPartitions, replicationFactor, retentionMs);
return;
return false;
}

final short resolvedReplicationFactor = replicationFactor == TopicProperties.DEFAULT_REPLICAS
Expand Down Expand Up @@ -131,6 +131,7 @@ private void createTopic(
));

createdTopicsConfig.put(topic, toStringConfigs(configs));
return true;
}

private short getDefaultClusterReplication() {
Expand Down Expand Up @@ -185,6 +186,7 @@ public Map<String, String> getTopicConfig(final String topicName) {

private void deleteTopics(final Collection<String> topicsToDelete) {
topicsToDelete.forEach(createdTopics::remove);
delegate.deleteTopics(topicsToDelete);
}

private void validateTopicProperties(
Expand Down
Expand Up @@ -17,12 +17,13 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.parser.tree.Statement;
import java.util.ArrayList;
import java.util.List;

/**
* Encapsulates a chain of injectors, ordered, into a single entity.
*/
public final class InjectorChain implements Injector {
public final class InjectorChain implements InjectorWithSideEffects {

private final List<Injector> injectors;

Expand All @@ -43,4 +44,34 @@ public <T extends Statement> ConfiguredStatement<T> inject(
}
return injected;
}

@Override
public <T extends Statement> ConfiguredStatementWithSideEffects<T> injectWithSideEffects(
final ConfiguredStatement<T> statement
) {
ConfiguredStatement<T> injected = statement;
final List<Object> allSideEffects = new ArrayList<>();
for (final Injector injector : injectors) {
if (injector instanceof InjectorWithSideEffects) {
final ConfiguredStatementWithSideEffects<T> wse =
((InjectorWithSideEffects) injector).injectWithSideEffects(injected);
injected = wse.getStatement();
allSideEffects.addAll(wse.getSideEffects());
} else {
injected = injector.inject(injected);
}
}
return new ConfiguredStatementWithSideEffects<>(injected, allSideEffects);
}

@Override
public <T extends Statement> void revertSideEffects(
final ConfiguredStatementWithSideEffects<T> statement
) {
for (final Injector injector : injectors) {
if (injector instanceof InjectorWithSideEffects) {
((InjectorWithSideEffects) injector).revertSideEffects(statement);
}
}
}
}
@@ -0,0 +1,99 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.statement;

import io.confluent.ksql.parser.tree.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* An {@link Injector} that might produce side effects during
* {@link #injectWithSideEffects(ConfiguredStatement) the injection}
* and revert those side effects if necessary.
*/
public interface InjectorWithSideEffects extends Injector {

<T extends Statement> ConfiguredStatementWithSideEffects<T> injectWithSideEffects(
ConfiguredStatement<T> statement
);

<T extends Statement> void revertSideEffects(ConfiguredStatementWithSideEffects<T> statement);

/**
* Container for {@link Statement} and side effects produced during injection.
* @param <T> type of statement
*/
class ConfiguredStatementWithSideEffects<T extends Statement> {
private final ConfiguredStatement<T> statement;
private final List<Object> sideEffects;

public ConfiguredStatementWithSideEffects(
final ConfiguredStatement<T> statement,
final List<Object> sideEffects
) {
this.statement = statement;
this.sideEffects = new ArrayList<>(sideEffects);
}

public ConfiguredStatement<T> getStatement() {
return statement;
}

public List<Object> getSideEffects() {
return Collections.unmodifiableList(sideEffects);
}

public static <T extends Statement> ConfiguredStatementWithSideEffects<T> withNoEffects(
final ConfiguredStatement<T> statement
) {
return new ConfiguredStatementWithSideEffects<>(statement, Collections.emptyList());
}
}

/**
* Wrap an injector into {@link InjectorWithSideEffects}.
*/
static InjectorWithSideEffects wrap(Injector injector) {
if (injector instanceof InjectorWithSideEffects) {
return (InjectorWithSideEffects) injector;
} else {
return new InjectorWithSideEffects() {

@Override
public <T extends Statement> ConfiguredStatementWithSideEffects<T> injectWithSideEffects(
final ConfiguredStatement<T> statement
) {
return ConfiguredStatementWithSideEffects.withNoEffects(inject(statement));
}

@Override
public <T extends Statement> ConfiguredStatement<T> inject(
final ConfiguredStatement<T> statement
) {
return injector.inject(statement);
}

@Override
public <T extends Statement> void revertSideEffects(
final ConfiguredStatementWithSideEffects<T> statement
) {
}
};
}
}
}

0 comments on commit fc5159a

Please sign in to comment.