fix: update restore command topic tool to work with command topic configs (#8802)

* fix: update restore command topic tool

* review updates

* review part 2
lct45 committed Feb 25, 2022
1 parent 6646f00 commit 371b200
Showing 3 changed files with 223 additions and 5 deletions.
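
Background for the change: a ksqlDB server can host its command topic on a separate Kafka cluster by prefixing client configs with ksql.server.command.consumer. / ksql.server.command.producer.; before this fix, the restore tool ignored those overrides and talked to the main cluster. The prefixes are resolved with Kafka's AbstractConfig.originalsWithPrefix, which returns the matching original entries with the prefix stripped. A minimal standalone sketch of that mechanism (the direct AbstractConfig usage and the host names are illustrative, not from this commit; the property names are the ones this diff makes public):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class PrefixOverrideSketch {
  public static void main(final String[] args) {
    // Server-level properties: the main cluster plus command-topic overrides.
    final Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", "main-cluster:9092");
    props.put("ksql.server.command.consumer.bootstrap.servers", "command-cluster:9092");
    props.put("ksql.server.command.producer.bootstrap.servers", "command-cluster:9092");

    // AbstractConfig retains unrecognized originals, so an empty ConfigDef suffices here.
    final AbstractConfig config = new AbstractConfig(new ConfigDef(), props);

    // Prints {bootstrap.servers=command-cluster:9092}: the prefix is stripped,
    // ready to be merged over the consumer/admin client configs.
    System.out.println(config.originalsWithPrefix("ksql.server.command.consumer."));
  }
}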
KsqlRestConfig.java
@@ -163,9 +163,9 @@ public class KsqlRestConfig extends AbstractConfig {

private static final String KSQL_CONFIG_PREFIX = "ksql.";

private static final String COMMAND_CONSUMER_PREFIX =
public static final String COMMAND_CONSUMER_PREFIX =
KSQL_CONFIG_PREFIX + "server.command.consumer.";
private static final String COMMAND_PRODUCER_PREFIX =
public static final String COMMAND_PRODUCER_PREFIX =
KSQL_CONFIG_PREFIX + "server.command.producer.";

public static final String ADVERTISED_LISTENER_CONFIG =
KsqlRestoreCommandTopic.java
@@ -23,13 +23,13 @@
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.BackupReplayFile;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.KafkaTopicClientImpl;
import io.confluent.ksql.services.ServiceContextFactory;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
@@ -278,6 +278,9 @@ private static KafkaProducer<byte[], byte[]> transactionalProducer(
ProducerConfig.ACKS_CONFIG,
"all"
);
transactionalProperties.putAll(
serverConfig.originalsWithPrefix(KsqlRestConfig.COMMAND_PRODUCER_PREFIX)
);

return new KafkaProducer<>(
transactionalProperties,
@@ -290,8 +293,7 @@ private static KafkaProducer<byte[], byte[]> transactionalProducer(
this(
serverConfig,
ReservedInternalTopics.commandTopic(serverConfig),
ServiceContextFactory.create(serverConfig,
() -> /* no ksql client */ null).getTopicClient(),
new KafkaTopicClientImpl(() -> createAdminClient(serverConfig)),
() -> transactionalProducer(serverConfig)
);
}
@@ -449,4 +451,13 @@ private static void maybeCleanUpQuery(final byte[] command, final KsqlConfig ksq
private static boolean hasKey(final JSONObject jsonObject, final String key) {
return jsonObject != null && jsonObject.has(key);
}

private static Admin createAdminClient(final KsqlConfig serverConfig) {
final Map<String, Object> adminClientConfigs =
new HashMap<>(serverConfig.getKsqlAdminClientConfigProps());
adminClientConfigs.putAll(
serverConfig.originalsWithPrefix(KsqlRestConfig.COMMAND_CONSUMER_PREFIX)
);
return new DefaultKafkaClientSupplier().getAdmin(adminClientConfigs);
}
}
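
With the overrides in place, pointing the restore tool at the command-topic cluster only requires that the prefixed properties appear in the file passed via --config-file. A hedged sketch of an invocation (the flags match the integration test below; the host names and file paths are hypothetical):

// Hypothetical invocation of the restore tool. restore.properties would carry
// the usual server settings plus the command-topic overrides, e.g.:
//   ksql.server.command.consumer.bootstrap.servers=command-cluster:9092
//   ksql.server.command.producer.bootstrap.servers=command-cluster:9092
KsqlRestoreCommandTopic.main(new String[]{
    "--yes",                                      // skip the confirmation prompt
    "--config-file", "/etc/ksql/restore.properties",
    "/var/backups/ksql/backup_command_topic"      // hypothetical backup file path
});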
RestoreCommandTopicMultipleKafkasIntegrationTest.java (new file)
@@ -0,0 +1,207 @@
/*
* Copyright 2022 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.api.integration;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.integration.IntegrationTestHarness;
import io.confluent.ksql.integration.Retry;
import io.confluent.ksql.rest.DefaultErrorMessages;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlWarning;
import io.confluent.ksql.rest.entity.SourceInfo;
import io.confluent.ksql.rest.entity.StreamsList;
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.BackupReplayFile;
import io.confluent.ksql.rest.server.KsqlRestConfig;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.rest.server.computation.InternalTopicSerdes;
import io.confluent.ksql.rest.server.restore.KsqlRestoreCommandTopic;
import io.confluent.ksql.test.util.KsqlTestFolder;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static io.confluent.ksql.util.KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION;
import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX;
import static java.lang.Thread.sleep;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Integration tests for backing up and restoring the command topic when the
 * command topic and the streams/tables live on different Kafka clusters.
 */
@Category({IntegrationTest.class})
public class RestoreCommandTopicMultipleKafkasIntegrationTest {
private static final IntegrationTestHarness TEST_HARNESS = IntegrationTestHarness.build();
private static final IntegrationTestHarness INTERNAL_TEST_HARNESS = IntegrationTestHarness.build();

@ClassRule
public static final RuleChain CHAIN = RuleChain
.outerRule(Retry.of(3, ZooKeeperClientException.class, 3, TimeUnit.SECONDS))
.around(TEST_HARNESS);

@ClassRule
public static final TemporaryFolder TMP_FOLDER = KsqlTestFolder.temporaryFolder();

private static File BACKUP_LOCATION;
private static TestKsqlRestApp REST_APP;
private String commandTopic;
private Path backupFile;
private Path propertiesFile;

@BeforeClass
public static void classSetUp() throws Exception {
BACKUP_LOCATION = TMP_FOLDER.newFolder();
INTERNAL_TEST_HARNESS.getKafkaCluster().start();

REST_APP = TestKsqlRestApp
.builder(TEST_HARNESS::kafkaBootstrapServers)
.withProperty(KSQL_STREAMS_PREFIX + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
.withProperty(KSQL_METASTORE_BACKUP_LOCATION, BACKUP_LOCATION.getPath())
.withProperty(StreamsConfig.STATE_DIR_CONFIG, "/tmp/cat/")
.withProperty(KsqlRestConfig.COMMAND_CONSUMER_PREFIX + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, INTERNAL_TEST_HARNESS.kafkaBootstrapServers())
.withProperty(KsqlRestConfig.COMMAND_PRODUCER_PREFIX + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, INTERNAL_TEST_HARNESS.kafkaBootstrapServers())
.build();
}

@Before
public void setup() throws IOException {
REST_APP.start();

final KsqlConfig ksqlConfig = new KsqlConfig(REST_APP.getKsqlRestConfig().getKsqlConfigProperties());
commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig);
backupFile = Files.list(BACKUP_LOCATION.toPath()).findFirst().get();
propertiesFile = TMP_FOLDER.newFile().toPath();
writeServerProperties(propertiesFile);
}

@After
public void teardown() {
REST_APP.stop();
INTERNAL_TEST_HARNESS.deleteTopics(Collections.singletonList(commandTopic));
}

@AfterClass
public static void teardownClass() {
TMP_FOLDER.delete();
}

private static void writeServerProperties(final Path propertiesFile) throws IOException {
final Map<String, Object> map = REST_APP.getKsqlRestConfig().getKsqlConfigProperties();

Files.write(
propertiesFile,
map.keySet().stream()
.map((key -> key + "=" + map.get(key)))
.collect(Collectors.joining("\n"))
.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.CREATE
);
}

@Test
public void shouldBackupAndRestoreCommandTopic() throws Exception {
// Given
TEST_HARNESS.ensureTopics("topic1", "topic2");
assertFalse(TEST_HARNESS.topicExists(commandTopic));

makeKsqlRequest("CREATE STREAM TOPIC1 (ID INT) "
+ "WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='JSON');");
makeKsqlRequest("CREATE STREAM TOPIC2 (ID INT) "
+ "WITH (KAFKA_TOPIC='topic2', VALUE_FORMAT='JSON');");
makeKsqlRequest("CREATE STREAM stream1 AS SELECT * FROM topic1;");
makeKsqlRequest("CREATE STREAM stream2 AS SELECT * FROM topic2;");

// When

// Delete the command topic and check the server is in degraded state
INTERNAL_TEST_HARNESS.deleteTopics(Collections.singletonList(commandTopic));
assertThatEventually("Degraded State", this::isDegradedState, is(true));

// Restore the command topic
KsqlRestoreCommandTopic.main(
new String[]{
"--yes",
"--config-file", propertiesFile.toString(),
backupFile.toString()
});

// Re-load the command topic
REST_APP.stop();
REST_APP.start();

// Then
assertFalse(TEST_HARNESS.topicExists(commandTopic));
assertTrue(INTERNAL_TEST_HARNESS.topicExists(commandTopic));
final List<String> streamsNames = showStreams();
assertThat("Should have TOPIC1", streamsNames.contains("TOPIC1"), is(true));
assertThat("Should have TOPIC2", streamsNames.contains("TOPIC2"), is(true));
assertThat("Should have STREAM1", streamsNames.contains("STREAM1"), is(true));
assertThat("Should have STREAM2", streamsNames.contains("STREAM2"), is(true));
assertThat("Server should NOT be in degraded state", isDegradedState(), is(false));

}

private boolean isDegradedState() {
// If in degraded state, then the following command will return a warning
final List<KsqlEntity> response = makeKsqlRequest(
"Show Streams;");

final List<KsqlWarning> warnings = response.get(0).getWarnings();
return warnings.size() > 0 &&
(warnings.get(0).getMessage().contains(
DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_CORRUPTED_ERROR_MESSAGE) ||
warnings.get(0).getMessage().contains(
DefaultErrorMessages.COMMAND_RUNNER_DEGRADED_INCOMPATIBLE_COMMANDS_ERROR_MESSAGE));
}

private List<String> showStreams() {
return ((StreamsList)makeKsqlRequest("SHOW STREAMS;").get(0))
.getStreams().stream().map(SourceInfo::getName).collect(Collectors.toList());
}

private List<KsqlEntity> makeKsqlRequest(final String sql) {
return RestIntegrationTestUtil.makeKsqlRequest(REST_APP, sql);
}
}
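
For completeness, the final assertion pair above (command topic absent from the main cluster, present on the internal one) can be reproduced against a live deployment with a plain admin client. A sketch, assuming the default ksql.service.id (so the command topic is named _confluent-ksql-default__command_topic) and a hypothetical bootstrap address:

import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class CommandTopicLocationCheck {
  public static void main(final String[] args) throws Exception {
    final Properties props = new Properties();
    // Hypothetical address of the cluster expected to host the command topic.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "command-cluster:9092");

    try (Admin admin = Admin.create(props)) {
      // Default command topic name for ksql.service.id=default_.
      final boolean present = admin.listTopics().names().get()
          .contains("_confluent-ksql-default__command_topic");
      System.out.println("Command topic present: " + present);
    }
  }
}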
