diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java index 2a7dde9b81ef..ed13b0728fab 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java @@ -102,6 +102,11 @@ public Iterable> getNewCommands(final Duration ti public List getRestoreCommands(final Duration duration) { final List restoreCommands = Lists.newArrayList(); + if (commandTopicBackup.commandTopicCorruption()) { + log.warn("Corruption detected. " + + "Use backup to restore command topic."); + return restoreCommands; + } final long endOffset = getEndOffset(); diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java index 9ed71fcc5c01..b58d1941086f 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java @@ -17,6 +17,7 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.rest.server.resources.CommandTopicCorruptionException; +import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.Pair; @@ -54,24 +55,27 @@ public class CommandTopicBackupImpl implements CommandTopicBackup { private List> latestReplay; private int latestReplayIdx; private boolean corruptionDetected; + private KafkaTopicClient kafkaTopicClient; public CommandTopicBackupImpl( final String location, - final String topicName + final String topicName, + final KafkaTopicClient kafkaTopicClient ) { - this(location, topicName, System::currentTimeMillis); + this(location, topicName, System::currentTimeMillis, kafkaTopicClient); } @VisibleForTesting CommandTopicBackupImpl( final String location, final String topicName, - final Supplier ticker + final Supplier ticker, + final KafkaTopicClient kafkaTopicClient ) { this.backupLocation = new File(Objects.requireNonNull(location, "location")); this.topicName = Objects.requireNonNull(topicName, "topicName"); this.ticker = Objects.requireNonNull(ticker, "ticker"); - + this.kafkaTopicClient = Objects.requireNonNull(kafkaTopicClient, "kafkaTopicClient"); ensureDirectoryExists(backupLocation); } @@ -91,6 +95,11 @@ public void initialize() { latestReplayIdx = 0; corruptionDetected = false; + + if (!kafkaTopicClient.isTopicExists(topicName) + && latestReplay.size() > 0) { + corruptionDetected = true; + } LOG.info("Command topic will be backup on file: {}", replayFile.getPath()); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 1280f9a1d3dd..ef42782c8661 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -88,6 +88,7 @@ import io.confluent.ksql.rest.server.services.ServerInternalKsqlClient; import io.confluent.ksql.rest.server.state.ServerState; import io.confluent.ksql.rest.util.ClusterTerminator; +import io.confluent.ksql.rest.util.CommandTopicBackupUtil; import io.confluent.ksql.rest.util.ConcurrencyLimiter; import io.confluent.ksql.rest.util.KsqlInternalTopicUtils; import io.confluent.ksql.rest.util.KsqlUncaughtExceptionHandler; @@ -771,7 +772,8 @@ static KsqlRestApplication buildApplication( ksqlConfig.addConfluentMetricsContextConfigsKafka( restConfig.getCommandConsumerProperties()), ksqlConfig.addConfluentMetricsContextConfigsKafka( - restConfig.getCommandProducerProperties()) + restConfig.getCommandProducerProperties()), + serviceContext ); final InteractiveStatementExecutor statementExecutor = @@ -1028,6 +1030,15 @@ private void registerCommandTopic() { final String commandTopic = commandStore.getCommandTopicName(); + if (CommandTopicBackupUtil.commandTopicMissingWithValidBackup( + commandTopic, + serviceContext.getTopicClient(), + ksqlConfigNoPort)) { + log.warn("Command topic is not found and it is not in sync with backup. " + + "Use backup to recover the command topic."); + return; + } + KsqlInternalTopicUtils.ensureTopic( commandTopic, ksqlConfigNoPort, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index 3fdfed1a16a6..f42510ab8af8 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -22,6 +22,8 @@ import io.confluent.ksql.rest.server.CommandTopicBackup; import io.confluent.ksql.rest.server.CommandTopicBackupImpl; import io.confluent.ksql.rest.server.CommandTopicBackupNoOp; +import io.confluent.ksql.rest.util.CommandTopicBackupUtil; +import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; @@ -77,7 +79,7 @@ public class CommandStore implements CommandQueue, Closeable { private final Serializer commandSerializer; private final Deserializer commandIdDeserializer; private final CommandTopicBackup commandTopicBackup; - + public static final class Factory { @@ -89,8 +91,8 @@ public static CommandStore create( final String commandTopicName, final Duration commandQueueCatchupTimeout, final Map kafkaConsumerProperties, - final Map kafkaProducerProperties - ) { + final Map kafkaProducerProperties, + final ServiceContext serviceContext) { kafkaConsumerProperties.put( ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT) @@ -109,10 +111,11 @@ public static CommandStore create( ); CommandTopicBackup commandTopicBackup = new CommandTopicBackupNoOp(); - if (!ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION).isEmpty()) { + if (!CommandTopicBackupUtil.backupLocation(ksqlConfig).isEmpty()) { commandTopicBackup = new CommandTopicBackupImpl( - ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION), - commandTopicName + CommandTopicBackupUtil.backupLocation(ksqlConfig), + commandTopicName, + serviceContext.getTopicClient() ); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/CommandTopicBackupUtil.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/CommandTopicBackupUtil.java new file mode 100644 index 000000000000..12002744b52b --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/util/CommandTopicBackupUtil.java @@ -0,0 +1,55 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.util; + +import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; +import java.io.File; +import java.lang.reflect.Array; +import java.util.Optional; + +public final class CommandTopicBackupUtil { + + private CommandTopicBackupUtil() { + } + + public static String backupLocation(final KsqlConfig ksqlConfig) { + return Optional.ofNullable( + ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION)) + .orElse(""); + } + + public static boolean commandTopicMissingWithValidBackup( + final String commandTopic, + final KafkaTopicClient kafkaTopicClient, + final KsqlConfig ksqlConfig) { + if (kafkaTopicClient.isTopicExists(commandTopic)) { + return false; + } + + final String backupLocation = CommandTopicBackupUtil.backupLocation(ksqlConfig); + if (!backupLocation.isEmpty()) { + final File backupDir = new File(backupLocation); + if (backupDir.exists() && backupDir.isDirectory()) { + final int fileCount = Optional.ofNullable(backupDir.listFiles()) + .map(Array::getLength) + .orElse(0); + return fileCount > 0; + } + } + return false; + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java new file mode 100644 index 000000000000..cb619ebb000b --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/QuickDegradeAndRestoreCommandTopicIntegrationTest.java @@ -0,0 +1,163 @@ +package io.confluent.ksql.api.integration; + +import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; +import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION; +import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.integration.IntegrationTestHarness; +import io.confluent.ksql.integration.Retry; +import io.confluent.ksql.rest.DefaultErrorMessages; +import io.confluent.ksql.rest.entity.KsqlEntity; +import io.confluent.ksql.rest.entity.KsqlWarning; +import io.confluent.ksql.rest.entity.SourceInfo; +import io.confluent.ksql.rest.entity.StreamsList; +import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; +import io.confluent.ksql.rest.server.TestKsqlRestApp; +import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic; +import io.confluent.ksql.test.util.KsqlTestFolder; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.ReservedInternalTopics; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import kafka.zookeeper.ZooKeeperClientException; +import org.apache.kafka.streams.StreamsConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; +import org.junit.rules.TemporaryFolder; + +@Category({IntegrationTest.class}) +public class QuickDegradeAndRestoreCommandTopicIntegrationTest { + private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build(); + + @ClassRule + public static final RuleChain CHAIN = RuleChain + .outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS)) + .around(TEST_HARNESS); + + @ClassRule + public static final TemporaryFolder TMP_FOLDER = KsqlTestFolder.temporaryFolder(); + + private static File BACKUP_LOCATION; + private static TestKsqlRestApp REST_APP; + private String commandTopic; + private Path backupFile; + private Path propertiesFile; + + @BeforeClass + public static void classSetUp() throws IOException { + BACKUP_LOCATION = TMP_FOLDER.newFolder(); + + REST_APP = TestKsqlRestApp + .builder(TEST_HARNESS::kafkaBootstrapServers) + .withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1) + .withProperty(KSQL_METASTORE_BACKUP_LOCATION, BACKUP_LOCATION.getPath()) + .build(); + } + + @Before + public void setup() throws IOException { + REST_APP.start(); + final KsqlConfig ksqlConfig = new KsqlConfig(REST_APP.getKsqlRestConfig().getKsqlConfigProperties()); + commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig); + backupFile = Files.list(BACKUP_LOCATION.toPath()).findFirst().get(); + propertiesFile = TMP_FOLDER.newFile().toPath(); + writeServerProperties(propertiesFile); + } + + @After + public void teardown() { + REST_APP.stop(); + TEST_HARNESS.deleteTopics(Collections.singletonList(commandTopic)); + } + + @After + public void teardownClass() { + TMP_FOLDER.delete(); + } + + private static void writeServerProperties(final Path propertiesFile) throws IOException { + final Map map = REST_APP.getKsqlRestConfig().getKsqlConfigProperties(); + + Files.write( + propertiesFile, + map.keySet().stream() + .map((key -> key + "=" + map.get(key))) + .collect(Collectors.joining("\n")) + .getBytes(StandardCharsets.UTF_8), + StandardOpenOption.CREATE + ); + } + + @Test + public void shouldBeInDegradeModeAfterCmdTopicDeleteAndRestart() throws Exception { + // Given + TEST_HARNESS.ensureTopics("topic5"); + + makeKsqlRequest("CREATE STREAM TOPIC5 (ID INT) " + + "WITH (KAFKA_TOPIC='topic5', VALUE_FORMAT='JSON');"); + makeKsqlRequest("CREATE STREAM stream5 AS SELECT * FROM topic5;"); + + // When + // Delete the command topic and restart + TEST_HARNESS.deleteTopics(Collections.singletonList(commandTopic)); + REST_APP.stop(); + REST_APP.start(); + + // Then + assertThatEventually("Degraded State", this::isDegradedState, is(true)); + assertThat(TEST_HARNESS.topicExists(commandTopic), is(false)); + REST_APP.stop(); + KsqlRestoreCommandTopic.main( + new String[]{ + "--yes", + "--config-file", propertiesFile.toString(), + backupFile.toString() + }); + + // Re-load the command topic + REST_APP.start(); + final List streamsNames = showStreams(); + assertThat("Should have TOPIC5", streamsNames.contains("TOPIC5"), is(true)); + assertThat("Should have STREAM5", streamsNames.contains("STREAM5"), is(true)); + assertThatEventually("Degraded State", this::isDegradedState, is(false)); + } + + private boolean isDegradedState() { + // If in degraded state, then the following command will return a warning + final List response = makeKsqlRequest( + "Show Streams;"); + + final List warnings = response.get(0).getWarnings(); + return warnings.size() > 0 && + (warnings.get(0).getMessage().contains( + DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE) || + warnings.get(0).getMessage().contains( + DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_INCOMPATIBLE_COMMANDS_ERROR_MESSAGE)); + } + + private List showStreams() { + return ((StreamsList)makeKsqlRequest("SHOW STREAMS;").get(0)) + .getStreams().stream().map(SourceInfo::getName).collect(Collectors.toList()); + } + + private List makeKsqlRequest(final String sql) { + return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java index 783c14c258d9..bb5b5df08087 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import io.confluent.ksql.rest.server.resources.CommandTopicCorruptionException; +import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.test.util.KsqlTestFolder; import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.util.Pair; @@ -55,6 +56,9 @@ public class CommandTopicBackupImplTest { @Mock private Supplier ticker; + @Mock + private KafkaTopicClient topicClient; + @Rule public TemporaryFolder backupLocation = KsqlTestFolder.temporaryFolder(); @@ -62,8 +66,9 @@ public class CommandTopicBackupImplTest { @Before public void setup() { + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(true); commandTopicBackup = new CommandTopicBackupImpl( - backupLocation.getRoot().getAbsolutePath(), COMMAND_TOPIC_NAME, ticker); + backupLocation.getRoot().getAbsolutePath(), COMMAND_TOPIC_NAME, ticker, topicClient); } private ConsumerRecord newStreamRecord(final String streamName) { @@ -100,7 +105,7 @@ public void shouldThrowWhenBackupLocationIsNotDirectory() throws IOException { // When final Exception e = assertThrows( KsqlServerException.class, - () -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME) + () -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME, topicClient) ); // Then @@ -119,7 +124,7 @@ public void shouldThrowWhenBackupLocationIsNotWritable() throws IOException { // When final Exception e = assertThrows( KsqlServerException.class, - () -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME) + () -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME, topicClient) ); // Then @@ -138,7 +143,7 @@ public void shouldThrowWhenBackupLocationIsNotReadable() throws IOException { // When final Exception e = assertThrows( KsqlServerException.class, - () -> new CommandTopicBackupImpl(dir.getAbsolutePath(), COMMAND_TOPIC_NAME) + () -> new CommandTopicBackupImpl(dir.getAbsolutePath(), COMMAND_TOPIC_NAME, topicClient) ); // Then @@ -156,7 +161,7 @@ public void shouldCreateBackupLocationWhenDoesNotExist() throws IOException { assertThat(Files.exists(dir), is(false)); // When - new CommandTopicBackupImpl(dir.toString(), COMMAND_TOPIC_NAME); + new CommandTopicBackupImpl(dir.toString(), COMMAND_TOPIC_NAME, topicClient); // Then assertThat(Files.exists(dir), is(true)); @@ -236,7 +241,7 @@ public void shouldIgnoreRecordPreviouslyReplayed() throws IOException { public void shouldNotCreateNewReplayFileIfNewRecordsDoNotMatchPreviousBackups() { // Given commandTopicBackup = new CommandTopicBackupImpl( - backupLocation.getRoot().getAbsolutePath(), COMMAND_TOPIC_NAME, ticker); + backupLocation.getRoot().getAbsolutePath(), COMMAND_TOPIC_NAME, ticker, topicClient); commandTopicBackup.initialize(); commandTopicBackup.writeRecord(command1); final BackupReplayFile previousReplayFile = commandTopicBackup.getReplayFile(); @@ -377,4 +382,21 @@ public void shouldOpenReplayFileAndIgnoreFileWithInvalidTimestamp() throws IOExc "%s/backup_command_topic_111", backupLocation.getRoot().getAbsolutePath() ))); } + + @Test + public void deletedCommandTopicMarksCommandBackupAsCorrupted() { + // Given + commandTopicBackup.initialize(); + commandTopicBackup.writeRecord(command1); + commandTopicBackup.writeRecord(command2); + + // When + // Set command topic as deleted and a second initialize call will + // detect it is out of sync and will mark command back as corrupted. + when(topicClient.isTopicExists(COMMAND_TOPIC_NAME)).thenReturn(false); + commandTopicBackup.initialize(); + + // Then: + assertThat(commandTopicBackup.commandTopicCorruption(), is(true)); + } }