Skip to content

Commit

Permalink
fix: allow streams topic prefixed configs (#3691)
Browse files Browse the repository at this point in the history
Fixes #817
- Adds new property file for production settings
- Changes to allow topic prefixed streams configs such as min.insync.replicas
- Unit tests added
- Verfied locally that the property file, creates topics with correct configuration
  • Loading branch information
vinothchandar committed Oct 29, 2019
1 parent 79828c7 commit 939c45a
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 5 deletions.
99 changes: 99 additions & 0 deletions config/ksql-production-server.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#
# Copyright 2019 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

#------ Endpoint config -------

### HTTP ###
# The URL the KSQL server will listen on:
# The default is any IPv4 interface on the machine.
listeners=http://0.0.0.0:8088

# Use the 'listeners' line below for any IPv6 interface on the machine.
# listeners=http://[::]:8088

### HTTPS ###
# To switch KSQL over to communicating using HTTPS comment out the 'listeners' line above
# uncomment and complete the properties below.
# See: https://docs.confluent.io/current/ksql/docs/installation/server-config/security.html#configuring-ksql-cli-for-https
#
# listeners=https://0.0.0.0:8088
# ssl.keystore.location=?
# ssl.keystore.password=?
# ssl.key.password=?

#------ Logging config -------

# Automatically create the processing log topic if it does not already exist:
ksql.logging.processing.topic.auto.create=true

# Automatically create a stream within KSQL for the processing log:
ksql.logging.processing.stream.auto.create=true

# Uncomment the following if you wish the errors written to the processing log to include the
# contents of the row that caused the error.
# Note: care should be taken to restrict access to the processing topic if the data KSQL is
# processing contains sensitive information.
#ksql.logging.processing.rows.include=true

#------ External service config -------

# The set of Kafka brokers to bootstrap Kafka cluster information from:
bootstrap.servers=localhost:9092

# uncomment the below to start an embedded Connect worker
# ksql.connect.worker.config=config/connect.properties
# ksql.connect.configs.topic=ksql-connect-configs

# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry:
#ksql.schema.registry.url=?


#------ Following configs improve performance and reliability of KSQL for critical/production setups. -------

# Also see: https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html#recommended-ksql-production-settings

# Set the batch expiry to Integer.MAX_VALUE to ensure that queries will not
# terminate if the underlying Kafka cluster is unavailable for a period of
# time.
ksql.streams.producer.delivery.timeout.ms=2147483647

# Set the maximum allowable time for the producer to block to
# Long.MAX_VALUE. This allows KSQL to pause processing if the underlying
# Kafka cluster is unavailable.
ksql.streams.producer.max.block.ms=9223372036854775807

# For better fault tolerance and durability, set the replication factor for the KSQL
# Server's internal topics. Note: the value 3 requires at least 3 brokers in your Kafka cluster.
ksql.internal.topic.replicas=3

# Configure underlying Kafka Streams internal topics in order to achieve better fault tolerance and
# durability, even in the face of Kafka broker failures. Highly recommended for mission critical applications.
# Note that value 3 requires at least 3 brokers in your kafka cluster
# See https://docs.confluent.io/current/streams/developer-guide/config-streams.html#recommended-configuration-parameters-for-resiliency
ksql.streams.replication.factor=3
ksql.streams.producer.acks=all
ksql.streams.topic.min.insync.replicas=2

# Set the storage directory for stateful operations like aggregations and
# joins to be at a durable location. By default, they are stored in /tmp.
# Note that the path below needs to be replaced with the actual value
ksql.streams.state.dir=/some/non-temporary-storage-path/

# Bump the number of replicas for state storage for stateful operations
# like aggregations and joins. By having two replicas (one active and one
# standby) recovery from node failures is quicker since the state doesn't
# have to be rebuilt from scratch. This configuration is also essential for
# pull queries to be highly available during node failures
ksql.streams.num.standby.replicas=1
14 changes: 10 additions & 4 deletions docs/installation/server-config/config-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -554,17 +554,23 @@ When deploying KSQL to production, the following settings are recommended in you
# Server's internal topics. Note: the value 3 requires at least 3 brokers in your Kafka cluster.
ksql.internal.topic.replicas=3

# For better fault tolerance and durability, set the replication factor for
# the internal topics that Kafka Streams creates for some queries.
# Note: the value 3 requires at least 3 brokers in your Kafka cluster.
# Configure underlying Kafka Streams internal topics in order to achieve better fault tolerance and
# durability, even in the face of Kafka broker failures. Highly recommended for mission critical applications.
# Note that value 3 requires at least 3 brokers in your kafka cluster.
ksql.streams.replication.factor=3
ksql.streams.producer.acks=all
ksql.streams.topic.min.insync.replicas=2

# Set the storage directory for stateful operations like aggregations and
# joins to be at a durable location. By default, they are stored in /tmp.
# Note that the path below needs to be replaced with the actual value
ksql.streams.state.dir=/some/non-temporary-storage-path/

# Bump the number of replicas for state storage for stateful operations
# like aggregations and joins. By having two replicas (one main and one
# standby) recovery from node failures is quicker since the state doesn't
# have to be rebuilt from scratch.
# have to be rebuilt from scratch. This configuration is also essential for
# pull queries to be highly available during node failures.
ksql.streams.num.standby.replicas=1

For your convenience, a sample file is provided at ``<path-to-ksql-repo>/config/ksql-production-server.properties``
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ private static Optional<ConfigItem> resolveStreamsConfig(

if (propertyName.startsWith(KSQL_STREAMS_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.PRODUCER_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.CONSUMER_PREFIX)) {
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.CONSUMER_PREFIX)
&& !propertyName.startsWith(KSQL_STREAMS_PREFIX + StreamsConfig.TOPIC_PREFIX)) {
return Optional.empty(); // Unknown streams config
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
Expand Down Expand Up @@ -92,6 +93,13 @@ public void shouldResolveKsqlStreamPrefixedStreamConfig() {
is(resolvedItem(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, STREAMS_CONFIG_DEF)));
}

@Test
public void shouldReturnUnresolvedForTopicPrefixedStreamsConfig() {
final String prop = StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
assertThat(resolver.resolve(
KsqlConfig.KSQL_STREAMS_PREFIX + prop, false), is(unresolvedItem(prop)));
}

@Test
public void shouldNotFindUnknownStreamsProperty() {
assertNotFound(KsqlConfig.KSQL_STREAMS_PREFIX + "you.won't.find.me...right");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -173,6 +174,29 @@ public void shouldSetStreamsConfigProducerPrefixedProperties() {
is(nullValue()));
}

@Test
public void shouldSetStreamsConfigTopicUnprefixedProperties() {
final KsqlConfig ksqlConfig = new KsqlConfig(
Collections.singletonMap(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 2));
final Object result = ksqlConfig.getKsqlStreamConfigProps().get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
assertThat(result, equalTo(2));
}

@Test
public void shouldSetStreamsConfigKsqlTopicPrefixedProperties() {
final KsqlConfig ksqlConfig = new KsqlConfig(
Collections.singletonMap(
KsqlConfig.KSQL_STREAMS_PREFIX + StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, 2));

assertThat(ksqlConfig.getKsqlStreamConfigProps()
.get(StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG),
equalTo(2));

assertThat(ksqlConfig.getKsqlStreamConfigProps()
.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG),
is(nullValue()));
}

@Test
public void shouldSetStreamsConfigKsqlProducerPrefixedProperties() {
final KsqlConfig ksqlConfig = new KsqlConfig(
Expand Down

0 comments on commit 939c45a

Please sign in to comment.