From d2d924ae2410562830b0d21a9077165f9cf51d12 Mon Sep 17 00:00:00 2001 From: Jacob Maes Date: Wed, 26 Apr 2017 12:32:18 -0700 Subject: [PATCH 1/4] SAMZA-1214: Allow users to set a default replication.factor for intermediate topics --- .../versioned/jobs/configuration-table.html | 69 ++------ .../org/apache/samza/config/MapConfig.java | 7 +- .../samza/config/JavaStorageConfig.java | 6 +- .../apache/samza/config/JavaSystemConfig.java | 14 +- .../org/apache/samza/config/JobConfig.scala | 41 ++--- .../apache/samza/config/StorageConfig.scala | 20 +-- .../apache/samza/config/StreamConfig.scala | 15 +- .../apache/samza/config/TestStreamConfig.java | 49 ++++++ .../org/apache/samza/config/KafkaConfig.scala | 52 +++++- .../apache/samza/config/TestKafkaConfig.scala | 159 +++++++++++++++--- 10 files changed, 295 insertions(+), 137 deletions(-) diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index df59547c4e..0fc30c5877 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -720,8 +720,19 @@

Samza Configuration Reference

- systems.system-name.
samza.key.serde - + systems.system-name.
default.stream.* + + + A set of default properties for any stream associated with the system. For example, if + "systems.kafka-system.default.stream.replication.factor"=2 was configured, then every Kafka stream + created on the kafka-system will have a replication factor of 2 unless the property is explicitly + overridden at the stream scope using streams properties. + + + + + systems.system-name.
default.stream.samza.key.serde + The serde which will be used to deserialize the key of messages on input streams, and to serialize the key of messages on @@ -734,17 +745,10 @@

Samza Configuration Reference

the task and the output stream producer. - - systems.system-name.
streams.stream-name.
samza.key.serde - - This is deprecated in favor of - streams.stream-id.samza.key.serde. - - - systems.system-name.
samza.msg.serde - + systems.system-name.
default.stream.samza.msg.serde + The serde which will be used to deserialize the value of messages on input streams, and to serialize the value of messages on @@ -757,17 +761,10 @@

Samza Configuration Reference

the task and the output stream producer. - - systems.system-name.
streams.stream-name.
samza.msg.serde - - This is deprecated in favor of - streams.stream-id.samza.msg.serde. - - - systems.system-name.
samza.offset.default - upcoming + systems.system-name.
default.stream.samza.offset.default + upcoming If a container starts up without a checkpoint, this property determines where in the input stream we should start consuming. The value must be an @@ -786,13 +783,6 @@

Samza Configuration Reference

If both are defined, the stream-level definition takes precedence. - - systems.system-name.
streams.stream-name.
samza.offset.default - - This is deprecated in favor of - streams.stream-id.samza.offset.default. - - task.consumer.batch.size @@ -1352,7 +1342,7 @@

Samza Configuration Reference

This property defines a store, Samza's mechanism for efficient stateful stream processing. You can give a store any store-name except default (the store-name - default is reserved for defining default store parameters), and use that name to get a + default is reserved for defining default store parameters), and use that name to get a reference to the store in your stream task (call TaskContext.getStore() in your task's @@ -1983,29 +1973,6 @@

Samza Configuration Reference

Staging directory for storing partition description. By default (if not set by users) the value is inherited from "yarn.job.staging.directory" internally. The default value is typically good enough unless you want explicitly use a separate location. - - - - Migrating from Samza 0.9.1 to 0.10.0
- - (This section applies if you are upgrading from Samza 0.9.1 to 0.10.0 and have set - task.checkpoint.factory to anything other than - org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory) - - - - - - task.checkpoint.skip-migration - false - - When migrating from 0.9.1 to 0.10.0, the taskName-to-changelog partition mapping was moved from the checkpoint stream to the coordinator stream.
- If you are using a checkpoint manager other than kafka - (org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory), you have to - manually migrate taskName-to-changelog partition mapping to the coordinator stream.
- This can be achieved with the assistance of the checkpoint-tool.sh. - - diff --git a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java index 0c3f14a493..d72d486d1f 100644 --- a/samza-api/src/main/java/org/apache/samza/config/MapConfig.java +++ b/samza-api/src/main/java/org/apache/samza/config/MapConfig.java @@ -19,6 +19,7 @@ package org.apache.samza.config; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -41,11 +42,15 @@ public MapConfig(Map map) { } public MapConfig(List> maps) { - this.map = new HashMap(); + this.map = new HashMap<>(); for (Map m: maps) this.map.putAll(m); } + public MapConfig(Map... maps) { + this(Arrays.asList(maps)); + } + public String get(Object k) { return map.get(k); } diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java index 5db94a65b5..7fd1198aa4 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java @@ -58,7 +58,7 @@ public String getChangelogStream(String storeName) { // If this config only specifies and there is a value in job.changelog.system= - // these values will be combined into . String systemStream = get(String.format(CHANGELOG_STREAM, storeName), null); - String changelogSystem = get(CHANGELOG_SYSTEM, null); + String changelogSystem = getChangelogSystem(null); String systemStreamRes; if (systemStream != null && !systemStream.contains(".")) { @@ -85,4 +85,8 @@ public String getStorageKeySerde(String storeName) { public String getStorageMsgSerde(String storeName) { return get(String.format(MSG_SERDE, storeName), null); } + + public String getChangelogSystem(String defaultValue) { + return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), defaultValue)); + } } diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java index 47d90a4a01..2d016e02fa 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java @@ -35,7 +35,8 @@ public class JavaSystemConfig extends MapConfig { private static final String SYSTEM_PREFIX = "systems."; private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory"; - private static final String SYSTEM_FACTORY = "systems.%s.samza.factory"; + private static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX; + private static final String SYSTEM_DEFAULT_STREAMS_PREFIX = SYSTEM_PREFIX + "%s" + ".default.stream."; private static final String EMPTY = ""; public JavaSystemConfig(Config config) { @@ -46,7 +47,7 @@ public String getSystemFactory(String name) { if (name == null) { return null; } - String systemFactory = String.format(SYSTEM_FACTORY, name); + String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name); return get(systemFactory, null); } @@ -99,4 +100,13 @@ public Map getSystemFactories() { return systemFactories; } + + /** + * + * @param systemName + * @return + */ + public Config getDefaultStreamProperties(String systemName) { + return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true); + } } diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 4e86b7c79d..60f31283c8 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -23,7 +23,6 @@ package org.apache.samza.config import java.io.File import org.apache.samza.container.grouper.stream.GroupByPartitionFactory -import org.apache.samza.system.{RegexSystemStreamPartitionMatcher, SystemStreamPartitionMatcher} import org.apache.samza.util.Logging object JobConfig { @@ -47,10 +46,10 @@ object JobConfig { val JOB_CONTAINER_COUNT = "job.container.count" val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size" val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode" - val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor" - val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes" val JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions" + val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" + val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory" val SSP_MATCHER_CLASS = "job.systemstreampartition.matcher.class" @@ -104,8 +103,15 @@ object JobConfig { class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getName = getOption(JobConfig.JOB_NAME) - def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse( - throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.")) + def getCoordinatorSystemName = { + val system = getCoordinatorSystemNameOrNull + if (system == null) { + throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.") + } + system + } + + def getCoordinatorSystemNameOrNull = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(getDefaultSystem.orNull) def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM) @@ -144,31 +150,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getSecurityManagerFactory = getOption(JobConfig.JOB_SECURITY_MANAGER_FACTORY) - val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" - val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" - - def getCoordinatorReplicationFactor = getOption(JobConfig.JOB_REPLICATION_FACTOR) match { - case Some(rplFactor) => rplFactor - case _ => - getOption(CHECKPOINT_REPLICATION_FACTOR) match { - case Some(rplFactor) => - info("%s was not found. Using %s=%s for coordinator stream" format (JobConfig.JOB_REPLICATION_FACTOR, CHECKPOINT_REPLICATION_FACTOR, rplFactor)) - rplFactor - case _ => "3" - } - } - - def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) match { - case Some(segBytes) => segBytes - case _ => - getOption(CHECKPOINT_SEGMENT_BYTES) match { - case Some(segBytes) => - info("%s was not found. Using %s=%s for coordinator stream" format (JobConfig.JOB_SEGMENT_BYTES, CHECKPOINT_SEGMENT_BYTES, segBytes)) - segBytes - case _ => "26214400" - } - } - def getSSPMatcherClass = getOption(JobConfig.SSP_MATCHER_CLASS) def getSSPMatcherConfigRegex = getExcept(JobConfig.SSP_MATCHER_CONFIG_REGEX) diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala index 10b4d1d42e..8dbf7394b6 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala @@ -46,24 +46,8 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name) def getChangelogStream(name: String) = { - // If the config specifies 'stores..changelog' as '.' combination - it will take precedence. - // If this config only specifies and there is a value in job.changelog.system= - - // these values will be combined into . - val systemStream = getOption(CHANGELOG_STREAM format name) - val changelogSystem = getOption(CHANGELOG_SYSTEM) - val systemStreamRes = - if ( systemStream.isDefined && ! systemStream.getOrElse("").contains('.')) { - // contains only stream name - if (changelogSystem.isDefined) { - Some(changelogSystem.get + "." + systemStream.get) - } - else { - throw new SamzaException("changelog system is not defined:" + systemStream.get) - } - } else { - systemStream - } - systemStreamRes + val javaStorageConfig = new JavaStorageConfig(config) + Option(javaStorageConfig.getChangelogStream(name)) } def getChangeLogDeleteRetentionInMs(storeName: String) = { diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 910ae632bb..8a9b2541bd 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -81,11 +81,14 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { * Returns a list of all SystemStreams that have a serde defined from the config file. */ def getSerdeStreams(systemName: String) = { + val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(systemName) + val hasSystemDefaultSerde = defaultStreamProperties.containsKey(StreamConfig.MSG_SERDE) || defaultStreamProperties.containsKey(StreamConfig.KEY_SERDE) + val subConf = config.subset("systems.%s.streams." format systemName, true) val legacySystemStreams = subConf .asScala .keys - .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE)) + .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde) .map(k => { val streamName = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ) new SystemStream(systemName, streamName) @@ -94,7 +97,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { val systemStreams = subset(StreamConfig.STREAMS_PREFIX) .asScala .keys - .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE)) + .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE) || hasSystemDefaultSerde) .map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )) .filter(streamId => systemName.equals(getSystem(streamId))) .map(streamId => streamIdToSystemStream(streamId)).toSet @@ -220,10 +223,14 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { * @return the map of properties for the stream */ private def getSystemStreamProperties(systemName: String, streamName: String) = { - if (systemName == null || streamName == null) { + if (systemName == null) { Map() } - config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) + val jsc = new JavaSystemConfig(config); + + val defaults = jsc.getDefaultStreamProperties(systemName); + val explicitConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) + new MapConfig(defaults, explicitConfigs) } /** diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java index 639be3d20e..4580cc4f06 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java @@ -41,6 +41,13 @@ public class TestStreamConfig { private static final String STREAM2_STREAM_ID = "streamId2"; private static final SystemStream SYSTEM_STREAM_2 = new SystemStream(STREAM2_SYSTEM, STREAM2_PHYSICAL_NAME); + private static final String STREAM3_SYSTEM = "Sys3"; + private static final String STREAM3_PHYSICAL_NAME = "Str3"; + private static final String STREAM3_STREAM_ID = "streamId3"; + private static final SystemStream SYSTEM_STREAM_3 = new SystemStream(STREAM3_SYSTEM, STREAM3_PHYSICAL_NAME); + + private static final String SYSTEM_DEFAULT_STREAM_PATTERN = "systems.%s.default.stream."; + @Test(expected = IllegalArgumentException.class) public void testGetSamzaPropertyThrowsIfInvalidPropertyName() { @@ -185,6 +192,44 @@ public void testGetSerdeStreamsFromSystemStreamAndStreamId() { assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get()); } + @Test + public void testStreamPropertyDefaults() { + final String nonSamzaProperty = "replication.factor"; + StreamConfig config = buildConfig( + buildSystemDefaultProp(STREAM1_SYSTEM, nonSamzaProperty), "1", + buildSystemDefaultProp(STREAM1_SYSTEM, StreamConfig.KEY_SERDE()), "value1", + buildSystemDefaultProp(STREAM1_SYSTEM, StreamConfig.CONSUMER_OFFSET_DEFAULT()), "newest", + buildProp(SYSTEM_STREAM_1, "dummyStreamProperty"), "dummyValue", + buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM, + buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME, + buildSystemDefaultProp(STREAM2_SYSTEM, nonSamzaProperty), "2", + buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM, + buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM2_PHYSICAL_NAME, + buildProp(STREAM2_STREAM_ID, nonSamzaProperty), "3", + buildSystemDefaultProp(STREAM3_SYSTEM, nonSamzaProperty), "4", + buildProp(STREAM3_STREAM_ID, StreamConfig.SYSTEM()), STREAM3_SYSTEM, + buildProp(STREAM3_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM3_PHYSICAL_NAME, + buildProp(SYSTEM_STREAM_3, nonSamzaProperty), "5", + "key3", "value3"); + + + + // Ensure that we can set legacy system properties via the new system wide default + assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get()); + assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size()); + assertEquals("newest", config.getDefaultStreamOffset(SYSTEM_STREAM_1).get()); + + // Property set via systems.x.default.stream.* only + assertEquals("1", config.getStreamProperties(STREAM1_STREAM_ID).get(nonSamzaProperty)); + + // Property set via systems.x.default.stream.* and streams.y.* + assertEquals("3", config.getStreamProperties(STREAM2_STREAM_ID).get(nonSamzaProperty)); + + // Property set via systems.x.default.stream.* and system.x.streams.z.* + assertEquals("5", config.getStreamProperties(STREAM3_STREAM_ID).get(nonSamzaProperty)); + } + + private StreamConfig buildConfig(String... kvs) { if (kvs.length % 2 != 0) { throw new IllegalArgumentException("There must be parity between the keys and values"); @@ -205,6 +250,10 @@ private String buildProp(SystemStream systemStream, String suffix) { return String.format(SYSTEM_STREAM_PATTERN, systemStream.getSystem(), systemStream.getStream()) + suffix; } + private String buildSystemDefaultProp(String system, String suffix) { + return String.format(SYSTEM_DEFAULT_STREAM_PATTERN, system) + suffix; + } + private Config addConfigs(Config original, String... kvs) { Map result = new HashMap<>(); result.putAll(original); diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index a8c1f3a096..93d2f73f11 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -47,9 +47,11 @@ object KafkaConfig { val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system" val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config" + val SEGMENT_BYTES = "segment.bytes" + val CHECKPOINT_SYSTEM = "task.checkpoint.system" val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + TOPIC_REPLICATION_FACTOR - val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" + val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint." + SEGMENT_BYTES val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR format "default" @@ -60,6 +62,9 @@ object KafkaConfig { // Helper regular expression definitions to extract/match configurations val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$" + val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator.replication.factor" + val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." + SEGMENT_BYTES + /** * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. @@ -82,11 +87,40 @@ object KafkaConfig { class KafkaConfig(config: Config) extends ScalaMapConfig(config) { // checkpoints - def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM) + def getCheckpointSystem = Option(getOrElse(KafkaConfig.CHECKPOINT_SYSTEM, new JobConfig(config).getDefaultSystem.orNull)) + + def getCheckpointReplicationFactor() = { + val defaultReplicationFactor: String = getSystemDefaultReplicationFactor(getCheckpointSystem.orNull, null) + val replicationFactor = getOrDefault(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, defaultReplicationFactor) + + Option(replicationFactor) + } + + private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = { + val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue) + defaultReplicationFactor + } - def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR) + def getCheckpointSegmentBytes() = { + val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) + getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes) + } + + def getCoordinatorReplicationFactor = getOption(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR) match { + case Some(rplFactor) => rplFactor + case _ => + val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull + val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3") + systemReplicationFactor + } - def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) + def getCoordinatorSegmentBytes = getOption(KafkaConfig.JOB_COORDINATOR_SEGMENT_BYTES) match { + case Some(segBytes) => segBytes + case _ => + val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull + val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400") + segBytes + } // custom consumer config def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name) @@ -133,8 +167,14 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) - def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor) - def getDefaultChangelogStreamReplicationFactor = getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse("2") + def getChangelogStreamReplicationFactor(name: String) = { + getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor("2")) + } + + def getDefaultChangelogStreamReplicationFactor(defaultValue: String) = { + val changelogSystem = new JavaStorageConfig(config).getChangelogSystem(null) + getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, defaultValue)) + } // The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream def getKafkaChangelogEnabledStores() = { diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 106a0d5681..a0d020d55a 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -29,20 +29,20 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.junit.Before class TestKafkaConfig { - + var props : Properties = new Properties val SYSTEM_NAME = "kafka"; val KAFKA_PRODUCER_PROPERTY_PREFIX = "systems." + SYSTEM_NAME + ".producer." val TEST_CLIENT_ID = "TestClientId" val TEST_GROUP_ID = "TestGroupId" - + @Before def setupProperties() { props = new Properties props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092") props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/") } - + @Test def testIdGeneration = { val factory = new PropertiesConfigFactory() @@ -93,7 +93,7 @@ class TestKafkaConfig { val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID) // shared fetch size assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes) - + props.setProperty("systems." + SYSTEM_NAME + ".streams.topic1.consumer.fetch.message.max.bytes", "65536") val mapConfig2 = new MapConfig(props.asScala.asJava) val kafkaConfig2 = new KafkaConfig(mapConfig2) @@ -120,7 +120,7 @@ class TestKafkaConfig { props.setProperty("job.changelog.system", "kafka") props.setProperty("stores.test3.changelog", "otherstream") props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete") - + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete") @@ -131,14 +131,14 @@ class TestKafkaConfig { assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse("")) assertEquals("otherstream", storeToChangelog.get("test3").getOrElse("")) } - + @Test def testDefaultValuesForProducerProperties() { val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) val producerProperties = kafkaProducerConfig.getProducerProperties - + assertEquals(classOf[ByteArraySerializer].getCanonicalName, producerProperties.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) assertEquals(classOf[ByteArraySerializer].getCanonicalName, producerProperties.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) assertEquals(kafkaProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DEFAULT, producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) @@ -148,45 +148,45 @@ class TestKafkaConfig { @Test def testMaxInFlightRequestsPerConnectionOverride() { val expectedValue = "200"; - + props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue); - + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) val producerProperties = kafkaProducerConfig.getProducerProperties - + assertEquals(expectedValue, producerProperties.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) } - + @Test def testRetriesOverride() { val expectedValue = "200"; - + props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue); - + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) val producerProperties = kafkaProducerConfig.getProducerProperties - + assertEquals(expectedValue, producerProperties.get(ProducerConfig.RETRIES_CONFIG)) } - + @Test(expected = classOf[NumberFormatException]) def testMaxInFlightRequestsPerConnectionWrongNumberFormat() { props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza"); - + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) kafkaProducerConfig.getProducerProperties } - + @Test(expected = classOf[NumberFormatException]) def testRetriesWrongNumberFormat() { props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza"); - + val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) @@ -195,25 +195,136 @@ class TestKafkaConfig { @Test def testChangeLogReplicationFactor() { + props.setProperty("stores.store-with-override.changelog", "kafka-system.changelog-topic") props.setProperty("stores.store-with-override.changelog.replication.factor", "3") val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) - assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"), "3") - assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"), "2") - assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "2") + assertEquals("3", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override")) + assertEquals("2", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override")) + assertEquals("2", kafkaConfig.getDefaultChangelogStreamReplicationFactor("2")) } @Test def testChangeLogReplicationFactorWithOverriddenDefault() { + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system") + props.setProperty("stores.store-with-override.changelog", "changelog-topic") props.setProperty("stores.store-with-override.changelog.replication.factor", "4") // Override the "default" default value props.setProperty("stores.default.changelog.replication.factor", "5") val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) - assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"), "4") - assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"), "5") - assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "5") + assertEquals("4", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override")) + assertEquals("5", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override")) + assertEquals("5", kafkaConfig.getDefaultChangelogStreamReplicationFactor("2")) + } + + @Test + def testChangeLogReplicationFactorWithSystemOverriddenDefault() { + props.setProperty(StorageConfig.CHANGELOG_SYSTEM, "kafka-system") + props.setProperty("systems.kafka-system.default.stream.replication.factor", "8") + props.setProperty("stores.store-with-override.changelog.replication.factor", "4") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + assertEquals("4", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override")) + assertEquals("8", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override")) + assertEquals("8", kafkaConfig.getDefaultChangelogStreamReplicationFactor("2")) + } + + @Test + def testCheckpointReplicationFactor() { + val emptyConfig = new KafkaConfig(new MapConfig()) + assertNull(emptyConfig.getCheckpointReplicationFactor.orNull) + assertNull(emptyConfig.getCheckpointSystem.orNull) + + props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system") + props.setProperty("task.checkpoint.replication.factor", "4") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull) + assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull) + } + + @Test + def testCheckpointReplicationFactorWithSystemDefault() { + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system-39-not38-not40-39") + props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.replication.factor", "8") + props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.segment.bytes", "8675309") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + assertEquals("kafka-system-39-not38-not40-39", kafkaConfig.getCheckpointSystem.orNull) + assertEquals("8", kafkaConfig.getCheckpointReplicationFactor.orNull) + assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes) + } + + @Test + def testCheckpointReplicationFactorWithSystemOverriddenDefault() { + // Defaults + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system-39-not38-not40-39") + props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.replication.factor", "8") + props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309") + + // Overrides + props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system") + props.setProperty(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, "4") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull) + assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull) + assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes) + } + + @Test + def testCoordinatorReplicationFactor() { + val emptyConfig = new KafkaConfig(new MapConfig()) + assertEquals("3", emptyConfig.getCoordinatorReplicationFactor) + assertNull(new JobConfig(new MapConfig()).getCoordinatorSystemNameOrNull) + + props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system") + props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + val jobConfig = new JobConfig(mapConfig) + assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull) + assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor) + } + + @Test + def testCoordinatorReplicationFactorWithSystemDefault() { + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system-39-not38-not40-39") + props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.replication.factor", "8") + props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.segment.bytes", "8675309") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + val jobConfig = new JobConfig(mapConfig) + assertEquals("kafka-system-39-not38-not40-39", jobConfig.getCoordinatorSystemNameOrNull) + assertEquals("8", kafkaConfig.getCoordinatorReplicationFactor) + assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes) + } + + @Test + def testCoordinatorReplicationFactorWithSystemOverriddenDefault() { + // Defaults + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system-39-not38-not40-39") + props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.replication.factor", "8") + props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309") + + // Overrides + props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system") + props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + val jobConfig = new JobConfig(mapConfig) + assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull) + assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor) + assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes) } } From 7ecb4b8c725a4d42c94a4378e83ed0340a24ffd6 Mon Sep 17 00:00:00 2001 From: Jacob Maes Date: Wed, 26 Apr 2017 12:58:53 -0700 Subject: [PATCH 2/4] Some cleanup after looking at the PR --- .../java/org/apache/samza/config/JavaSystemConfig.java | 4 +--- .../main/scala/org/apache/samza/config/JobConfig.scala | 2 -- .../scala/org/apache/samza/config/KafkaConfig.scala | 10 ++++------ .../org/apache/samza/config/TestKafkaConfig.scala | 6 +++--- 4 files changed, 8 insertions(+), 14 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java index 2d016e02fa..6408438dbe 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java @@ -102,9 +102,7 @@ public Map getSystemFactories() { } /** - * - * @param systemName - * @return + * Gets the system-wide defaults for streams. */ public Config getDefaultStreamProperties(String systemName) { return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true); diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 60f31283c8..602fc1ebad 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -48,8 +48,6 @@ object JobConfig { val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode" val JOB_INTERMEDIATE_STREAM_PARTITIONS = "job.intermediate.stream.partitions" - val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" - val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory" val SSP_MATCHER_CLASS = "job.systemstreampartition.matcher.class" diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 93d2f73f11..870ca03dd8 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -62,7 +62,7 @@ object KafkaConfig { // Helper regular expression definitions to extract/match configurations val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$" - val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator.replication.factor" + val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator." + TOPIC_REPLICATION_FACTOR val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." + SEGMENT_BYTES /** @@ -167,13 +167,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) - def getChangelogStreamReplicationFactor(name: String) = { - getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor("2")) - } + def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor) - def getDefaultChangelogStreamReplicationFactor(defaultValue: String) = { + def getDefaultChangelogStreamReplicationFactor() = { val changelogSystem = new JavaStorageConfig(config).getChangelogSystem(null) - getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, defaultValue)) + getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2")) } // The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index a0d020d55a..81652b6ea4 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -202,7 +202,7 @@ class TestKafkaConfig { val kafkaConfig = new KafkaConfig(mapConfig) assertEquals("3", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override")) assertEquals("2", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override")) - assertEquals("2", kafkaConfig.getDefaultChangelogStreamReplicationFactor("2")) + assertEquals("2", kafkaConfig.getDefaultChangelogStreamReplicationFactor) } @Test @@ -217,7 +217,7 @@ class TestKafkaConfig { val kafkaConfig = new KafkaConfig(mapConfig) assertEquals("4", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override")) assertEquals("5", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override")) - assertEquals("5", kafkaConfig.getDefaultChangelogStreamReplicationFactor("2")) + assertEquals("5", kafkaConfig.getDefaultChangelogStreamReplicationFactor) } @Test @@ -230,7 +230,7 @@ class TestKafkaConfig { val kafkaConfig = new KafkaConfig(mapConfig) assertEquals("4", kafkaConfig.getChangelogStreamReplicationFactor("store-with-override")) assertEquals("8", kafkaConfig.getChangelogStreamReplicationFactor("store-without-override")) - assertEquals("8", kafkaConfig.getDefaultChangelogStreamReplicationFactor("2")) + assertEquals("8", kafkaConfig.getDefaultChangelogStreamReplicationFactor) } @Test From 87c68c4682b1983477b56c522642c99c42447740 Mon Sep 17 00:00:00 2001 From: Jacob Maes Date: Wed, 26 Apr 2017 13:25:47 -0700 Subject: [PATCH 3/4] Adding one more test to verify that the defaults and overrides work from the StreamSpec perspective --- .../TestAbstractApplicationRunner.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java index 8e0d5fcc31..bd95d0bc19 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java @@ -195,6 +195,26 @@ public void testStreamConfigOverrides() { assertEquals("systemValue2", properties.get("systemProperty2")); } + // Verify that we use a default specified with systems.x.default.stream.*, if specified + @Test + public void testStreamConfigOverridesWithSystemDefaults() { + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "segment.bytes", "5309"), + String.format("systems.%s.default.stream.replication.factor", TEST_SYSTEM), "4", // System default property + String.format("systems.%s.default.stream.segment.bytest", TEST_SYSTEM), "867" + ); + + AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config); + StreamSpec spec = env.getStreamSpec(STREAM_ID); + + Map properties = spec.getConfig(); + assertEquals(3, properties.size()); + assertEquals("4", properties.get("replication.factor")); // Uses system default + assertEquals("5309", properties.get("segment.bytes")); // Overrides system default + } + // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config @Test public void testGetStreamPhysicalNameArgSimple() { From fe7200b7adf1c6d4d1d9f8cacbb7dfb096c60cea Mon Sep 17 00:00:00 2001 From: Jacob Maes Date: Thu, 27 Apr 2017 09:46:02 -0700 Subject: [PATCH 4/4] Address review feedback --- .../samza/config/JavaStorageConfig.java | 19 +++++- .../org/apache/samza/config/JobConfig.scala | 7 +++ .../apache/samza/config/StreamConfig.scala | 5 +- .../org/apache/samza/config/KafkaConfig.scala | 58 +++++++++++++++++-- .../apache/samza/config/TestKafkaConfig.scala | 26 ++++----- 5 files changed, 92 insertions(+), 23 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java index 7fd1198aa4..a1f0ec0af6 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java @@ -58,7 +58,7 @@ public String getChangelogStream(String storeName) { // If this config only specifies and there is a value in job.changelog.system= - // these values will be combined into . String systemStream = get(String.format(CHANGELOG_STREAM, storeName), null); - String changelogSystem = getChangelogSystem(null); + String changelogSystem = getChangelogSystem(); String systemStreamRes; if (systemStream != null && !systemStream.contains(".")) { @@ -86,7 +86,20 @@ public String getStorageMsgSerde(String storeName) { return get(String.format(MSG_SERDE, storeName), null); } - public String getChangelogSystem(String defaultValue) { - return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), defaultValue)); + /** + * Gets the System to use for reading/writing checkpoints. Uses the following precedence. + * + * 1. If job.changelog.system is defined, that value is used. + * 2. If job.default.system is defined, that value is used. + * 3. null + * + * Note: Changelogs can be defined using + * stores.storeName.changelog=systemName.streamName or + * stores.storeName.changelog=streamName + * + * If the former syntax is used, that system name will still be honored. For the latter syntax, this method is used. + */ + public String getChangelogSystem() { + return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null)); } } diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 602fc1ebad..030d945459 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -109,6 +109,13 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { system } + /** + * Gets the System to use for reading/writing the coordinator stream. Uses the following precedence. + * + * 1. If job.coordinator.system is defined, that value is used. + * 2. If job.default.system is defined, that value is used. + * 3. None + */ def getCoordinatorSystemNameOrNull = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(getDefaultSystem.orNull) def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM) diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 8a9b2541bd..f125bde323 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -226,9 +226,8 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { if (systemName == null) { Map() } - val jsc = new JavaSystemConfig(config); - - val defaults = jsc.getDefaultStreamProperties(systemName); + val systemConfig = new JavaSystemConfig(config); + val defaults = systemConfig.getDefaultStreamProperties(systemName); val explicitConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) new MapConfig(defaults, explicitConfigs) } diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 870ca03dd8..9ac21ef134 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -86,11 +86,26 @@ object KafkaConfig { } class KafkaConfig(config: Config) extends ScalaMapConfig(config) { - // checkpoints + /** + * Gets the System to use for reading/writing checkpoints. Uses the following precedence. + * + * 1. If task.checkpoint.system is defined, that value is used. + * 2. If job.default.system is defined, that value is used. + * 3. None + */ def getCheckpointSystem = Option(getOrElse(KafkaConfig.CHECKPOINT_SYSTEM, new JobConfig(config).getDefaultSystem.orNull)) + /** + * Gets the replication factor for the checkpoint topic. Uses the following precedence. + * + * 1. If task.checkpoint.replication.factor is configured, that value is used. + * 2. If systems.checkpoint-system.default.stream.replication.factor is configured, that value is used. + * 3. None + * + * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]] + */ def getCheckpointReplicationFactor() = { - val defaultReplicationFactor: String = getSystemDefaultReplicationFactor(getCheckpointSystem.orNull, null) + val defaultReplicationFactor: String = getSystemDefaultReplicationFactor(getCheckpointSystem.orNull, "3") val replicationFactor = getOrDefault(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, defaultReplicationFactor) Option(replicationFactor) @@ -101,11 +116,29 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { defaultReplicationFactor } + /** + * Gets the segment bytes for the checkpoint topic. Uses the following precedence. + * + * 1. If task.checkpoint.segment.bytes is configured, that value is used. + * 2. If systems.checkpoint-system.default.stream.segment.bytes is configured, that value is used. + * 3. None + * + * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]] + */ def getCheckpointSegmentBytes() = { val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes) } + /** + * Gets the replication factor for the coordinator topic. Uses the following precedence. + * + * 1. If job.coordinator.replication.factor is configured, that value is used. + * 2. If systems.coordinator-system.default.stream.replication.factor is configured, that value is used. + * 3. 3 + * + * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]] + */ def getCoordinatorReplicationFactor = getOption(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR) match { case Some(rplFactor) => rplFactor case _ => @@ -114,6 +147,15 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { systemReplicationFactor } + /** + * Gets the segment bytes for the coordinator topic. Uses the following precedence. + * + * 1. If job.coordinator.segment.bytes is configured, that value is used. + * 2. If systems.coordinator-system.default.stream.segment.bytes is configured, that value is used. + * 3. None + * + * Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]] + */ def getCoordinatorSegmentBytes = getOption(KafkaConfig.JOB_COORDINATOR_SEGMENT_BYTES) match { case Some(segBytes) => segBytes case _ => @@ -129,7 +171,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0 - /** * Returns a map of topic -> fetch.message.max.bytes value for all streams that * are defined with this property in the config. @@ -167,10 +208,19 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) + /** + * Gets the replication factor for the changelog topics. Uses the following precedence. + * + * 1. If stores.myStore.changelog.replication.factor is configured, that value is used. + * 2. If systems.changelog-system.default.stream.replication.factor is configured, that value is used. + * 3. 2 + * + * Note that the changelog-system has a similar precedence. See [[JavaStorageConfig]] + */ def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor) def getDefaultChangelogStreamReplicationFactor() = { - val changelogSystem = new JavaStorageConfig(config).getChangelogSystem(null) + val changelogSystem = new JavaStorageConfig(config).getChangelogSystem() getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2")) } diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 81652b6ea4..d4b8150d07 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -236,7 +236,7 @@ class TestKafkaConfig { @Test def testCheckpointReplicationFactor() { val emptyConfig = new KafkaConfig(new MapConfig()) - assertNull(emptyConfig.getCheckpointReplicationFactor.orNull) + assertEquals("3", emptyConfig.getCheckpointReplicationFactor.orNull) assertNull(emptyConfig.getCheckpointSystem.orNull) props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system") @@ -250,13 +250,13 @@ class TestKafkaConfig { @Test def testCheckpointReplicationFactorWithSystemDefault() { - props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system-39-not38-not40-39") - props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.replication.factor", "8") - props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.segment.bytes", "8675309") + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system") + props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8") + props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309") val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) - assertEquals("kafka-system-39-not38-not40-39", kafkaConfig.getCheckpointSystem.orNull) + assertEquals("other-kafka-system", kafkaConfig.getCheckpointSystem.orNull) assertEquals("8", kafkaConfig.getCheckpointReplicationFactor.orNull) assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes) } @@ -264,8 +264,8 @@ class TestKafkaConfig { @Test def testCheckpointReplicationFactorWithSystemOverriddenDefault() { // Defaults - props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system-39-not38-not40-39") - props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.replication.factor", "8") + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system") + props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8") props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309") // Overrides @@ -297,14 +297,14 @@ class TestKafkaConfig { @Test def testCoordinatorReplicationFactorWithSystemDefault() { - props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system-39-not38-not40-39") - props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.replication.factor", "8") - props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.segment.bytes", "8675309") + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system") + props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8") + props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309") val mapConfig = new MapConfig(props.asScala.asJava) val kafkaConfig = new KafkaConfig(mapConfig) val jobConfig = new JobConfig(mapConfig) - assertEquals("kafka-system-39-not38-not40-39", jobConfig.getCoordinatorSystemNameOrNull) + assertEquals("other-kafka-system", jobConfig.getCoordinatorSystemNameOrNull) assertEquals("8", kafkaConfig.getCoordinatorReplicationFactor) assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes) } @@ -312,8 +312,8 @@ class TestKafkaConfig { @Test def testCoordinatorReplicationFactorWithSystemOverriddenDefault() { // Defaults - props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "kafka-system-39-not38-not40-39") - props.setProperty("systems.kafka-system-39-not38-not40-39.default.stream.replication.factor", "8") + props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system") + props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8") props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309") // Overrides