diff --git a/config/ksql-production-server.properties b/config/ksql-production-server.properties new file mode 100644 index 000000000000..8436544dd0dd --- /dev/null +++ b/config/ksql-production-server.properties @@ -0,0 +1,99 @@ +# +# Copyright 2019 Confluent Inc. +# +# Licensed under the Confluent Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# http://www.confluent.io/confluent-community-license +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +#------ Endpoint config ------- + +### HTTP ### +# The URL the KSQL server will listen on: +# The default is any IPv4 interface on the machine. +listeners=http://0.0.0.0:8088 + +# Use the 'listeners' line below for any IPv6 interface on the machine. +# listeners=http://[::]:8088 + +### HTTPS ### +# To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above +# uncomment and complete the properties below. +# See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https +# +# listeners=https://0.0.0.0:8088 +# ssl.keystore.location=? +# ssl.keystore.password=? +# ssl.key.password=? + +#------ Logging config ------- + +# Automatically create the processing log topic if it does not already exist: +ksql.logging.processing.topic.auto.create=true + +# Automatically create a stream within KSQL for the processing log: +ksql.logging.processing.stream.auto.create=true + +# Uncomment the following if you wish the errors written to the processing log to include the +# contents of the row that caused the error. +# Note: care should be taken to restrict access to the processing topic if the data KSQL is +# processing contains sensitive information. +#ksql.logging.processing.rows.include=true + +#------ External service config ------- + +# The set of Kafka brokers to bootstrap Kafka cluster information from: +bootstrap.servers=localhost:9092 + +# uncomment the below to start an embedded Connect worker +# ksql.connect.worker.config=config/connect.properties +# ksql.connect.configs.topic=ksql-connect-configs + +# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry: +#ksql.schema.registry.url=? + + +#------ Following configs improve performance and reliability of KSQL for critical/production setups. ------- + +# Also see: https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html#recommended-ksql-production-settings + +# Set the batch expiry to Integer.MAX_VALUE to ensure that queries will not +# terminate if the underlying Kafka cluster is unavailable for a period of +# time. +ksql.streams.producer.delivery.timeout.ms=2147483647 + +# Set the maximum allowable time for the producer to block to +# Long.MAX_VALUE. This allows KSQL to pause processing if the underlying +# Kafka cluster is unavailable. +ksql.streams.producer.max.block.ms=9223372036854775807 + +# For better fault tolerance and durability, set the replication factor for the KSQL +# Server's internal topics. Note: the value 3 requires at least 3 brokers in your Kafka cluster. +ksql.internal.topic.replicas=3 + +# Configure underlying Kafka Streams internal topics in order to achieve better fault tolerance and +# durability, even in the face of Kafka broker failures. Highly recommended for mission critical applications. +# Note that value 3 requires at least 3 brokers in your kafka cluster +# See https://docs.confluent.io/current/streams/developer-guide/config-streams.html#recommended-configuration-parameters-for-resiliency +ksql.streams.replication.factor=3 +ksql.streams.producer.acks=all +ksql.streams.topic.min.insync.replicas=2 + +# Set the storage directory for stateful operations like aggregations and +# joins to be at a durable location. By default, they are stored in /tmp. +# Note that the path below needs to be replaced with the actual value +ksql.streams.state.dir=/some/non-temporary-storage-path/ + +# Bump the number of replicas for state storage for stateful operations +# like aggregations and joins. By having two replicas (one active and one +# standby) recovery from node failures is quicker since the state doesn't +# have to be rebuilt from scratch. This configuration is also essential for +# pull queries to be highly available during node failures +ksql.streams.num.standby.replicas=1 diff --git a/docs/installation/server-config/config-reference.rst b/docs/installation/server-config/config-reference.rst index e0e5874a15c1..85f2a4a80deb 100644 --- a/docs/installation/server-config/config-reference.rst +++ b/docs/installation/server-config/config-reference.rst @@ -554,17 +554,23 @@ When deploying KSQL to production, the following settings are recommended in you # Server's internal topics. Note: the value 3 requires at least 3 brokers in your Kafka cluster. ksql.internal.topic.replicas=3 - # For better fault tolerance and durability, set the replication factor for - # the internal topics that Kafka Streams creates for some queries. - # Note: the value 3 requires at least 3 brokers in your Kafka cluster. + # Configure underlying Kafka Streams internal topics in order to achieve better fault tolerance and + # durability, even in the face of Kafka broker failures. Highly recommended for mission critical applications. + # Note that value 3 requires at least 3 brokers in your kafka cluster. ksql.streams.replication.factor=3 + ksql.streams.producer.acks=all + ksql.streams.topic.min.insync.replicas=2 # Set the storage directory for stateful operations like aggregations and # joins to be at a durable location. By default, they are stored in /tmp. + # Note that the path below needs to be replaced with the actual value ksql.streams.state.dir=/some/non-temporary-storage-path/ # Bump the number of replicas for state storage for stateful operations # like aggregations and joins. By having two replicas (one main and one # standby) recovery from node failures is quicker since the state doesn't - # have to be rebuilt from scratch. + # have to be rebuilt from scratch. This configuration is also essential for + # pull queries to be highly available during node failures. ksql.streams.num.standby.replicas=1 + +For your convenience, a sample file is provided at ``/config/ksql-production-server.properties`` diff --git a/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java b/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java index 8beb5b88dcad..cde64ac00e2f 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java +++ b/ksql-common/src/main/java/io/confluent/ksql/config/KsqlConfigResolver.java @@ -77,7 +77,8 @@ private static Optional resolveStreamsConfig( if (propertyName.startsWith(KSQL_STREAMS_PREFIX) && !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.PRODUCER_PREFIX) - && !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.CONSUMER_PREFIX)) { + && !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.CONSUMER_PREFIX) + && !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.TOPIC_PREFIX)) { return Optional.empty(); // Unknown streams config } diff --git a/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java b/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java index 0b397a7f52dc..8ca01d892a48 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/config/KsqlConfigResolverTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.ConfigKey; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.streams.StreamsConfig; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -92,6 +93,13 @@ public void shouldResolveKsqlStreamPrefixedStreamConfig() { is(resolvedItem(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, STREAMS_CONFIG_DEF))); } + @Test + public void shouldReturnUnresolvedForTopicPrefixedStreamsConfig() { + final String prop = StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; + assertThat(resolver.resolve( + KsqlConfig.KSQL_STREAMS_PREFIX + prop, false), is(unresolvedItem(prop))); + } + @Test public void shouldNotFindUnknownStreamsProperty() { assertNotFound(KsqlConfig.KSQL_STREAMS_PREFIX + "you.won't.find.me...right"); diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java index 40cbbc00b27b..6a5cb772d539 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/KsqlConfigTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.streams.StreamsConfig; import org.junit.Rule; import org.junit.Test; @@ -173,6 +174,29 @@ public void shouldSetStreamsConfigProducerPrefixedProperties() { is(nullValue())); } + @Test + public void shouldSetStreamsConfigTopicUnprefixedProperties() { + final KsqlConfig ksqlConfig = new KsqlConfig( + Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 2)); + final Object result = ksqlConfig.getKsqlStreamConfigProps().get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG); + assertThat(result, equalTo(2)); + } + + @Test + public void shouldSetStreamsConfigKsqlTopicPrefixedProperties() { + final KsqlConfig ksqlConfig = new KsqlConfig( + Collections.singletonMap( + KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 2)); + + assertThat(ksqlConfig.getKsqlStreamConfigProps() + .get(StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), + equalTo(2)); + + assertThat(ksqlConfig.getKsqlStreamConfigProps() + .get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), + is(nullValue())); + } + @Test public void shouldSetStreamsConfigKsqlProducerPrefixedProperties() { final KsqlConfig ksqlConfig = new KsqlConfig(