fix: During ksql startup, avoid recreating deleted command topic when a valid backup exists #7753 (#8257)

* fix: Avoid recreating the command topic when it has been deleted and a valid backup exists.

* Fixed the integration test avoidRecreatingCommandTopicWithActiveBackup.

* Added a new unit test in CommandTopicBackupImplTest and fixed checkstyle errors.

* Fixed a unit test failure caused by a NullPointerException; also improved the error messages.

* Fixed an integration test failure and refactored the integration test changes into their own test file: QuickDegradeAndRestoreCommandTopicIntegrationTest.

* Addressed Steven's review comment and removed an unnecessary import from RestoreCommandTopicIntegrationTest.

* Addressed Steven's review comment by swapping the degraded-state check and the topic-exists check in the integration test.
mkandaswamy committed Oct 20, 2021
1 parent 0b4b6d5 commit f3f1d5c
Showing 7 changed files with 285 additions and 17 deletions.
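
At a high level, startup now refuses to recreate a command topic that is missing from Kafka while a non-empty backup directory is still on disk. A condensed, hypothetical sketch of that decision follows; the class and method names here are illustrative only, and the real logic lives in CommandTopicBackupUtil and KsqlRestApplication in the diffs below.

import java.io.File;

// Hypothetical, condensed form of the new startup check. Names are
// illustrative; the actual implementation is
// CommandTopicBackupUtil.commandTopicMissingWithValidBackup (new file below).
final class CommandTopicStartupCheckSketch {

  static boolean shouldSkipTopicCreation(
      final boolean commandTopicExists,
      final String backupLocation) {
    if (commandTopicExists || backupLocation.isEmpty()) {
      // Normal path: the topic exists (or no backup is configured), so the
      // server may safely ensure the topic and replay commands from it.
      return false;
    }
    final File backupDir = new File(backupLocation);
    final File[] backupFiles = backupDir.isDirectory() ? backupDir.listFiles() : null;
    // Topic missing but the backup has data: recreating an empty command topic
    // would silently discard server state, so skip creation; the server stays
    // in DEGRADED mode until the topic is restored from the backup.
    return backupFiles != null && backupFiles.length > 0;
  }
}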
CommandTopic.java
@@ -102,6 +102,11 @@ public Iterable<ConsumerRecord<byte[], byte[]>> getNewCommands(final Duration ti

public List<QueuedCommand> getRestoreCommands(final Duration duration) {
final List<QueuedCommand> restoreCommands = Lists.newArrayList();
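// The backup has flagged the topic as corrupted (e.g. deleted while backup
// data remains): replay nothing, so the server starts in DEGRADED mode.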
if (commandTopicBackup.commandTopicCorruption()) {
log.warn("Corruption detected. "
+ "Use backup to restore command topic.");
return restoreCommands;
}

final long endOffset = getEndOffset();

CommandTopicBackupImpl.java
@@ -17,6 +17,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.rest.server.resources.CommandTopicCorruptionException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
@@ -54,24 +55,27 @@ public class CommandTopicBackupImpl implements CommandTopicBackup
private List<Pair<byte[], byte[]>> latestReplay;
private int latestReplayIdx;
private boolean corruptionDetected;
private KafkaTopicClient kafkaTopicClient;

public CommandTopicBackupImpl(
final String location,
final String topicName
final String topicName,
final KafkaTopicClient kafkaTopicClient
) {
this(location, topicName, System::currentTimeMillis);
this(location, topicName, System::currentTimeMillis, kafkaTopicClient);
}

@VisibleForTesting
CommandTopicBackupImpl(
final String location,
final String topicName,
final Supplier<Long> ticker
final Supplier<Long> ticker,
final KafkaTopicClient kafkaTopicClient
) {
this.backupLocation = new File(Objects.requireNonNull(location, "location"));
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.ticker = Objects.requireNonNull(ticker, "ticker");

this.kafkaTopicClient = Objects.requireNonNull(kafkaTopicClient, "kafkaTopicClient");
ensureDirectoryExists(backupLocation);
}

@@ -91,6 +95,11 @@ public void initialize() {

latestReplayIdx = 0;
corruptionDetected = false;

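// The command topic no longer exists in Kafka but the local backup still
// holds records: flag this as corruption instead of starting from scratch.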
if (!kafkaTopicClient.isTopicExists(topicName)
&& latestReplay.size() > 0) {
corruptionDetected = true;
}
LOG.info("Command topic will be backup on file: {}", replayFile.getPath());
}

KsqlRestApplication.java
@@ -88,6 +88,7 @@
import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.CommandTopicBackupUtil;
import io.confluent.ksql.rest.util.ConcurrencyLimiter;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler;
@@ -771,7 +772,8 @@ static KsqlRestApplication buildApplication(
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandConsumerProperties()),
ksqlConfig.addConfluentMetricsContextConfigsKafka(
restConfig.getCommandProducerProperties())
restConfig.getCommandProducerProperties()),
serviceContext
);

final InteractiveStatementExecutor statementExecutor =
@@ -1028,6 +1030,15 @@ private void registerCommandTopic() {

final String commandTopic = commandStore.getCommandTopicName();

if (CommandTopicBackupUtil.commandTopicMissingWithValidBackup(
commandTopic,
serviceContext.getTopicClient(),
ksqlConfigNoPort)) {
log.warn("Command topic is not found and it is not in sync with backup. "
+ "Use backup to recover the command topic.");
return;
}

KsqlInternalTopicUtils.ensureTopic(
commandTopic,
ksqlConfigNoPort,
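When this guard fires, the server comes up in DEGRADED mode instead of recreating an empty topic; recovery is a manual restore from the backup, as the integration test below exercises. A hypothetical invocation (both paths are placeholders):

// Hypothetical restore invocation; the config file and replay-file paths are
// placeholders. "--yes" skips the interactive confirmation prompt.
KsqlRestoreCommandTopic.main(new String[]{
    "--yes",
    "--config-file", "/etc/ksql/ksql-server.properties",
    "/path/to/backup/replay-file"
});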
CommandStore.java
@@ -22,6 +22,8 @@
import io.confluent.ksql.rest.server.CommandTopicBackup;
import io.confluent.ksql.rest.server.CommandTopicBackupImpl;
import io.confluent.ksql.rest.server.CommandTopicBackupNoOp;
import io.confluent.ksql.rest.util.CommandTopicBackupUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
@@ -77,7 +79,7 @@ public class CommandStore implements CommandQueue, Closeable {
private final Serializer<Command> commandSerializer;
private final Deserializer<CommandId> commandIdDeserializer;
private final CommandTopicBackup commandTopicBackup;


public static final class Factory {

@@ -89,8 +91,8 @@ public static CommandStore create(
final String commandTopicName,
final Duration commandQueueCatchupTimeout,
final Map<String, Object> kafkaConsumerProperties,
final Map<String, Object> kafkaProducerProperties
) {
final Map<String, Object> kafkaProducerProperties,
final ServiceContext serviceContext) {
kafkaConsumerProperties.put(
ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)
@@ -109,10 +111,11 @@ public static CommandStore create(
);

CommandTopicBackup commandTopicBackup = new CommandTopicBackupNoOp();
if (!ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION).isEmpty()) {
if (!CommandTopicBackupUtil.backupLocation(ksqlConfig).isEmpty()) {
commandTopicBackup = new CommandTopicBackupImpl(
ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION),
commandTopicName
CommandTopicBackupUtil.backupLocation(ksqlConfig),
commandTopicName,
serviceContext.getTopicClient()
);
}

CommandTopicBackupUtil.java (new file)
@@ -0,0 +1,55 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.util;

import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import java.io.File;
import java.lang.reflect.Array;
import java.util.Optional;

public final class CommandTopicBackupUtil {

private CommandTopicBackupUtil() {
}

public static String backupLocation(final KsqlConfig ksqlConfig) {
return Optional.ofNullable(
ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION))
.orElse("");
}

public static boolean commandTopicMissingWithValidBackup(
final String commandTopic,
final KafkaTopicClient kafkaTopicClient,
final KsqlConfig ksqlConfig) {
if (kafkaTopicClient.isTopicExists(commandTopic)) {
return false;
}

final String backupLocation = CommandTopicBackupUtil.backupLocation(ksqlConfig);
if (!backupLocation.isEmpty()) {
final File backupDir = new File(backupLocation);
if (backupDir.exists() && backupDir.isDirectory()) {
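// Any non-empty backup directory counts as a valid backup.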
final int fileCount = Optional.ofNullable(backupDir.listFiles())
.map(Array::getLength)
.orElse(0);
return fileCount > 0;
}
}
return false;
}
}
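
A minimal test sketch for this helper, assuming JUnit 4 and Mockito; this is not the project's actual test class, and the topic name is a placeholder.

package io.confluent.ksql.rest.util;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import java.io.File;
import java.util.Collections;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class CommandTopicBackupUtilSketchTest {

  @Rule
  public final TemporaryFolder tmp = new TemporaryFolder();

  @Test
  public void shouldFlagMissingTopicWhenBackupDirIsNonEmpty() throws Exception {
    // Given: the command topic does not exist in Kafka...
    final KafkaTopicClient topicClient = mock(KafkaTopicClient.class);
    when(topicClient.isTopicExists("command_topic")).thenReturn(false);

    // ...and the configured backup directory contains at least one file.
    final File backupDir = tmp.newFolder();
    assertThat(new File(backupDir, "replay-file").createNewFile(), is(true));
    final KsqlConfig config = new KsqlConfig(Collections.singletonMap(
        KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION, backupDir.getAbsolutePath()));

    // Then: startup must treat this as "missing with valid backup".
    assertThat(
        CommandTopicBackupUtil.commandTopicMissingWithValidBackup(
            "command_topic", topicClient, config),
        is(true));

    // But once the topic exists again, the check no longer fires.
    when(topicClient.isTopicExists("command_topic")).thenReturn(true);
    assertThat(
        CommandTopicBackupUtil.commandTopicMissingWithValidBackup(
            "command_topic", topicClient, config),
        is(false));
  }
}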
QuickDegradeAndRestoreCommandTopicIntegrationTest.java (new file)
@@ -0,0 +1,163 @@
package io.confluent.ksql.api.integration;

import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic;
import io.confluent.ksql.test.util.KsqlTestFolder;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;

@Category({IntegrationTest.class})
public class QuickDegradeAndRestoreCommandTopicIntegrationTest {
private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();

@ClassRule
public static final RuleChain CHAIN = RuleChain
.outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS))
.around(TEST_HARNESS);

@ClassRule
public static final TemporaryFolder TMP_FOLDER = KsqlTestFolder.temporaryFolder();

private static File BACKUP_LOCATION;
private static TestKsqlRestApp REST_APP;
private String commandTopic;
private Path backupFile;
private Path propertiesFile;

@BeforeClass
public static void classSetUp() throws IOException {
BACKUP_LOCATION = TMP_FOLDER.newFolder();

REST_APP = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_METASTORE_BACKUP_LOCATION, BACKUP_LOCATION.getPath())
.build();
}

@Before
public void setup() throws IOException {
REST_APP.start();
final KsqlConfig ksqlConfig = new KsqlConfig(REST_APP.getKsqlRestConfig().getKsqlConfigProperties());
commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig);
backupFile = Files.list(BACKUP_LOCATION.toPath()).findFirst().get();
propertiesFile = TMP_FOLDER.newFile().toPath();
writeServerProperties(propertiesFile);
}

@After
public void teardown() {
REST_APP.stop();
TEST_HARNESS.deleteTopics(Collections.singletonList(commandTopic));
}

@AfterClass
public static void teardownClass() {
TMP_FOLDER.delete();
}

private static void writeServerProperties(final Path propertiesFile) throws IOException {
final Map<String, Object> map = REST_APP.getKsqlRestConfig().getKsqlConfigProperties();

Files.write(
propertiesFile,
map.keySet().stream()
.map((key -> key + "=" + map.get(key)))
.collect(Collectors.joining("\n"))
.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.CREATE
);
}

@Test
public void shouldBeInDegradeModeAfterCmdTopicDeleteAndRestart() throws Exception {
// Given
TEST_HARNESS.ensureTopics("topic5");

makeKsqlRequest("CREATE STREAM TOPIC5 (ID INT) "
+ "WITH (KAFKA_TOPIC='topic5', VALUE_FORMAT='JSON');");
makeKsqlRequest("CREATE STREAM stream5 AS SELECT * FROM topic5;");

// When
// Delete the command topic and restart
TEST_HARNESS.deleteTopics(Collections.singletonList(commandTopic));
REST_APP.stop();
REST_APP.start();

// Then
assertThatEventually("Degraded State", this::isDegradedState, is(true));
assertThat(TEST_HARNESS.topicExists(commandTopic), is(false));
REST_APP.stop();
KsqlRestoreCommandTopic.main(
new String[]{
"--yes",
"--config-file", propertiesFile.toString(),
backupFile.toString()
});

// Re-load the command topic
REST_APP.start();
final List<String> streamsNames = showStreams();
assertThat("Should have TOPIC5", streamsNames.contains("TOPIC5"), is(true));
assertThat("Should have STREAM5", streamsNames.contains("STREAM5"), is(true));
assertThatEventually("Degraded State", this::isDegradedState, is(false));
}

private boolean isDegradedState() {
// If in degraded state, then the following command will return a warning
final List<KsqlEntity> response = makeKsqlRequest(
"Show Streams;");

final List<KsqlWarning> warnings = response.get(0).getWarnings();
return warnings.size() > 0 &&
(warnings.get(0).getMessage().contains(
DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE) ||
warnings.get(0).getMessage().contains(
DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_INCOMPATIBLE_COMMANDS_ERROR_MESSAGE));
}

private List<String> showStreams() {
return ((StreamsList)makeKsqlRequest("SHOW STREAMS;").get(0))
.getStreams().stream().map(SourceInfo::getName).collect(Collectors.toList());
}

private List<KsqlEntity> makeKsqlRequest(final String sql) {
return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql);
}
}
