From 4d7ef2a8d2d7cf1f52c0b135895fb21dd0139e19 Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Thu, 22 Aug 2019 11:13:00 -0700 Subject: [PATCH] feat: spin up a connect worker embedded inside the KSQL JVM (#3241) --- config/connect.properties | 60 ++++++++++ config/ksql-server.properties | 5 + config/log4j-rolling.properties | 3 +- config/log4j.properties | 1 + .../io/confluent/ksql/util/KsqlConfig.java | 11 ++ .../ksql/rest/server/ConnectExecutable.java | 77 ++++++++++++ .../ksql/rest/server/KsqlServerMain.java | 19 ++- .../ksql/rest/server/MultiExecutable.java | 82 +++++++++++++ .../ksql/rest/server/MultiExecutableTest.java | 110 ++++++++++++++++++ 9 files changed, 363 insertions(+), 5 deletions(-) create mode 100644 config/connect.properties create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ConnectExecutable.java create mode 100644 ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/MultiExecutable.java create mode 100644 ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/MultiExecutableTest.java diff --git a/config/connect.properties b/config/connect.properties new file mode 100644 index 000000000000..8869adbbf105 --- /dev/null +++ b/config/connect.properties @@ -0,0 +1,60 @@ +# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and +# integrates with the Schema Registry. This sample configuration assumes a local installation of +# Confluent Platform with all services running on their default ports. + +# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated. +bootstrap.servers=localhost:9092 + +# The group ID is a unique identifier for the set of workers that form a single Kafka Connect +# cluster
group.id=ksql-connect-cluster + +# The converters specify the format of data in Kafka and how to translate it into Connect data. 
+# Every Connect user will need to configure these based on the format they want their data in +# when loaded from or stored into Kafka +key.converter=io.confluent.connect.avro.AvroConverter +key.converter.schema.registry.url=http://localhost:8081 +value.converter=io.confluent.connect.avro.AvroConverter +value.converter.schema.registry.url=http://localhost:8081 + +# Internal Storage Topics. +# +# Kafka Connect distributed workers store the connector and task configurations, connector offsets, +# and connector statuses in three internal topics. These topics MUST be compacted. +# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them +# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions +# as specified in these properties, and other topic-specific settings inherited from your brokers' +# auto-creation settings. If you need more control over these other topic-specific settings, you may want to +# manually create these topics before starting Kafka Connect distributed workers. +# +# The following properties set the names of these three internal topics for storing configs, offsets, and status. +config.storage.topic=ksql-connect-configs +offset.storage.topic=ksql-connect-offsets +status.storage.topic=ksql-connect-statuses + +# The following properties set the replication factor for the three internal topics, defaulting to 3 for each +# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with +# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but +# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of +# losing connector offsets, configurations, and status. 
+config.storage.replication.factor=1 +offset.storage.replication.factor=1 +status.storage.replication.factor=1 + +# The config storage topic must have a single partition, and this cannot be changed via properties. +# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic +# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly +# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records +# the status less frequently, and so by default the topic is created with 5 partitions. +#offset.storage.partitions=25 +#status.storage.partitions=5 + +# The offsets, status, and configurations are written to the topics using converters specified through +# the following required properties. Most users will always want to use the JSON converter without schemas. +# Offset and config data is never visible outside of Connect in this format. +internal.key.converter=org.apache.kafka.connect.json.JsonConverter +internal.value.converter=org.apache.kafka.connect.json.JsonConverter +internal.key.converter.schemas.enable=false + +# fill this configuration in to use custom connectors +# plugin.path= diff --git a/config/ksql-server.properties b/config/ksql-server.properties index 1437b05945de..3171284e7fc7 100644 --- a/config/ksql-server.properties +++ b/config/ksql-server.properties @@ -47,7 +47,12 @@ ksql.logging.processing.stream.auto.create=true # The set of Kafka brokers to bootstrap Kafka cluster information from: bootstrap.servers=localhost:9092 + ksql.connect.polling.enable=true +# uncomment the below to start an embedded Connect worker +# ksql.connect.worker.config=config/connect.properties +# ksql.connect.configs.topic=ksql-connect-configs + # Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry: #ksql.schema.registry.url=? 
diff --git a/config/log4j-rolling.properties b/config/log4j-rolling.properties index d6e86a1b80fa..9bc1e0887122 100644 --- a/config/log4j-rolling.properties +++ b/config/log4j-rolling.properties @@ -44,7 +44,6 @@ log4j.appender.kafka_appender=org.apache.kafka.log4jappender.KafkaLog4jAppender log4j.appender.kafka_appender.layout=io.confluent.common.logging.log4j.StructuredJsonLayout log4j.appender.kafka_appender.BrokerList=localhost:9092 log4j.appender.kafka_appender.Topic=default_ksql_processing_log -log4j.logger.processing=ERROR, kafka_appender # loggers log4j.logger.org.apache.kafka.streams=INFO, streams @@ -64,3 +63,5 @@ log4j.additivity.org.I0Itec.zkclient=false log4j.logger.processing=ERROR, kafka_appender log4j.additivity.processing=false + +log4j.logger.org.reflections=ERROR, main diff --git a/config/log4j.properties b/config/log4j.properties index 3fb20c082e94..db227d237a25 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -29,3 +29,4 @@ log4j.additivity.org.apache.kafka.streams=false log4j.logger.org.apache.zookeeper=ERROR, stdout log4j.logger.org.apache.kafka=ERROR, stdout log4j.logger.org.I0Itec.zkclient=ERROR, stdout +log4j.logger.org.reflections=ERROR, stdout diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index 7f463e70a673..6ae8ffe563a4 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -69,6 +69,8 @@ public class KsqlConfig extends AbstractConfig { public static final String CONNECT_CONFIGS_TOPIC_PROPERTY = "ksql.connect.configs.topic"; + public static final String CONNECT_WORKER_CONFIG_FILE_PROPERTY = "ksql.connect.worker.config"; + public static final String KSQL_ENABLE_UDFS = "ksql.udfs.enabled"; public static final String KSQL_EXT_DIR = "ksql.extension.dir"; @@ -458,6 +460,15 @@ private static ConfigDef buildConfigDef(final 
ConfigGeneration generation) { DEFAULT_CONNECT_CONFIGS_TOPIC, Importance.LOW, "The name for the connect configuration topic, defaults to 'connect-configs'" + ).define( + CONNECT_WORKER_CONFIG_FILE_PROPERTY, + ConfigDef.Type.STRING, + "", + Importance.LOW, + "The path to a connect worker configuration file. An empty value for this configuration " + + "will prevent connect from starting up embedded within KSQL. For more information" + + " on configuring connect, see " + + "https://docs.confluent.io/current/connect/userguide.html#configuring-workers." ).define( + KSQL_ENABLE_UDFS, + ConfigDef.Type.BOOLEAN, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ConnectExecutable.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ConnectExecutable.java new file mode 100644 index 000000000000..4b240ac8705e --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ConnectExecutable.java @@ -0,0 +1,77 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
 */ + +package io.confluent.ksql.rest.server; + +import java.io.IOException; +import java.net.BindException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.cli.ConnectDistributed; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.Connect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class ConnectExecutable implements Executable { + + private static final Logger LOG = LoggerFactory.getLogger(ConnectExecutable.class); + + private final ConnectDistributed connectDistributed; + private final Map<String, String> workerProps; + private Connect connect; + + public static ConnectExecutable of(final String configFile) throws IOException { + final Map<String, String> workerProps = !configFile.isEmpty() + ? Utils.propsToStringMap(Utils.loadProps(configFile)) + : Collections.emptyMap(); + + return new ConnectExecutable(workerProps); + } + + private ConnectExecutable(final Map<String, String> workerProps) { + this.workerProps = Objects.requireNonNull(workerProps, "workerProps"); + connectDistributed = new ConnectDistributed(); + } + + @Override + public void start() { + try { + connect = connectDistributed.startConnect(workerProps); + } catch (final ConnectException e) { + if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof BindException) { + LOG.warn("Cannot start a local connect instance because connect is running locally!", e); + } else { + throw e; + } + } + } + + @Override + public void stop() { + if (connect != null) { + connect.stop(); + } + } + + @Override + public void join() { + if (connect != null) { + connect.awaitStop(); + } + } +} diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java index 649466c53a3c..e542eae866cb --- 
a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java @@ -23,6 +23,7 @@ import io.confluent.ksql.util.KsqlServerException; import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent; import java.io.File; +import java.io.IOException; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -56,7 +57,8 @@ public static void main(final String[] args) { StreamsConfig.configDef().defaultValues().get(StreamsConfig.STATE_DIR_CONFIG)).toString(); enforceStreamStateDirAvailability(new File(streamsStateDirPath)); final Optional<String> queriesFile = serverOptions.getQueriesFile(properties); - final Executable executable = createExecutable(properties, queriesFile, installDir); + final Executable executable = createExecutable( + properties, queriesFile, installDir, ksqlConfig); new KsqlServerMain(executable).tryStartApp(); } catch (final Exception e) { log.error("Failed to start KSQL", e); @@ -84,18 +86,27 @@ void tryStartApp() throws Exception { private static Executable createExecutable( final Map<String, String> properties, final Optional<String> queriesFile, - final String installDir - ) { + final String installDir, + final KsqlConfig ksqlConfig + ) throws IOException { if (queriesFile.isPresent()) { return StandaloneExecutorFactory.create(properties, queriesFile.get(), installDir); } final KsqlRestConfig restConfig = new KsqlRestConfig(ensureValidProps(properties)); - return KsqlRestApplication.buildApplication( + final Executable restApp = KsqlRestApplication.buildApplication( + restConfig, KsqlVersionCheckerAgent::new, Integer.MAX_VALUE ); + final String connectConfigFile = + ksqlConfig.getString(KsqlConfig.CONNECT_WORKER_CONFIG_FILE_PROPERTY); + if (connectConfigFile.isEmpty()) { + return restApp; + } + + final Executable connect = ConnectExecutable.of(connectConfigFile); + return MultiExecutable.of(connect, restApp); } private static Map<String, String> ensureValidProps(final Map<String, String> 
properties) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/MultiExecutable.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/MultiExecutable.java new file mode 100644 index 000000000000..065912876174 --- /dev/null +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/MultiExecutable.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import java.util.Objects; + +/** + * {@code MultiExecutable} wraps multiple {@code Executable}s and ensures that when + * an action is called all internal executables will perform the action, regardless + * of whether or not previous executables succeeded. + * + *
<p>The executables will be started, stopped and joined in the order that they + * are supplied in {@link #of(Executable...)}.</p>
+ */ +public final class MultiExecutable implements Executable { + + private final Executable[] executables; + + public static Executable of(final Executable... executables) { + return new MultiExecutable(executables); + } + + private MultiExecutable(final Executable... executables) { + this.executables = Objects.requireNonNull(executables, "executables"); + } + + @Override + public void start() throws Exception { + doAction(Executable::start); + } + + @Override + public void stop() throws Exception { + doAction(Executable::stop); + } + + @Override + public void join() throws InterruptedException { + doAction(Executable::join); + } + + @SuppressWarnings("unchecked") + private <T extends Exception> void doAction( + final ExceptionalConsumer<Executable, T> action + ) throws T { + + T exception = null; + for (final Executable executable : executables) { + try { + action.accept(executable); + } catch (final Exception e) { + if (exception == null) { + exception = (T) e; + } else { + exception.addSuppressed(e); + } + } + } + + if (exception != null) { + throw exception; + } + } + + @FunctionalInterface + private interface ExceptionalConsumer<I, T extends Exception> { + void accept(I value) throws T; + } +} diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/MultiExecutableTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/MultiExecutableTest.java new file mode 100644 index 000000000000..f382d4e51462 --- /dev/null +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/MultiExecutableTest.java @@ -0,0 +1,110 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. 
You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import static org.mockito.Mockito.doThrow; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class MultiExecutableTest { + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Mock + Executable executable1; + @Mock + Executable executable2; + + private Executable multiExecutable; + + @Before + public void setup() { + multiExecutable = MultiExecutable.of(executable1, executable2); + } + + @Test + public void shouldStartAll() throws Exception { + // When: + multiExecutable.start(); + + // Then: + final InOrder inOrder = Mockito.inOrder(executable1, executable2); + inOrder.verify(executable1).start(); + inOrder.verify(executable2).start(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void shouldJoinAll() throws Exception { + // When: + multiExecutable.join(); + + // Then: + final InOrder inOrder = Mockito.inOrder(executable1, executable2); + inOrder.verify(executable1).join(); + inOrder.verify(executable2).join(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void shouldStopAll() throws Exception { + // When: + multiExecutable.stop(); + + // Then: + final InOrder inOrder = Mockito.inOrder(executable1, 
executable2); + inOrder.verify(executable1).stop(); + inOrder.verify(executable2).stop(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void shouldSuppressExceptions() throws Exception { + // Given: + doThrow(new RuntimeException("danger executable1!")).when(executable1).start(); + doThrow(new RuntimeException("danger executable2!")).when(executable2).start(); + + // Expect: + expectedException.expectMessage("danger executable1!"); + expectedException.expect(new BaseMatcher() { + @Override + public void describeTo(final Description description) { + } + + @Override + public boolean matches(final Object o) { + return o instanceof Exception + && ((Exception) o).getSuppressed()[0].getMessage().equals("danger executable2!"); + } + }); + + // When: + multiExecutable.start(); + } + +} \ No newline at end of file