diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java similarity index 50% rename from samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java rename to samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index 1e3dbff42f..bc38e5675b 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -23,41 +23,48 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.execution.StreamManager; +import org.apache.samza.util.StreamUtil; /** - * a java version of the storage config + * Config helper methods related to storage. */ -public class JavaStorageConfig extends MapConfig { - +public class StorageConfig extends MapConfig { private static final String FACTORY_SUFFIX = ".factory"; + private static final String CHANGELOG_SUFFIX = ".changelog"; private static final String STORE_PREFIX = "stores."; - private static final String FACTORY = "stores.%s.factory"; - private static final String KEY_SERDE = "stores.%s.key.serde"; - private static final String MSG_SERDE = "stores.%s.msg.serde"; - private static final String CHANGELOG_STREAM = "stores.%s.changelog"; - private static final String CHANGELOG_SYSTEM = "job.changelog.system"; - private static final String ACCESSLOG_STREAM_SUFFIX = "access-log"; - private static final String ACCESSLOG_SAMPLING_RATIO = "stores.%s.accesslog.sampling.ratio"; - private static final String ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled"; - private static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50; - - public static final String SIDE_INPUTS = "stores.%s.side.inputs"; - public static final String SIDE_INPUTS_PROCESSOR_FACTORY = "stores.%s.side.inputs.processor.factory"; - public static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE = "stores.%s.side.inputs.processor.serialized.instance"; - - public JavaStorageConfig(Config config) { + + public static final String FACTORY = STORE_PREFIX + "%s" + FACTORY_SUFFIX; + public static final String KEY_SERDE = STORE_PREFIX + "%s.key.serde"; + public static final String MSG_SERDE = STORE_PREFIX + "%s.msg.serde"; + public static final String CHANGELOG_STREAM = STORE_PREFIX + "%s" + CHANGELOG_SUFFIX; + public static final String ACCESSLOG_STREAM_SUFFIX = "access-log"; + public static final String CHANGELOG_REPLICATION_FACTOR = STORE_PREFIX + "%s.changelog.replication.factor"; + + static final String CHANGELOG_SYSTEM = "job.changelog.system"; + static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms"; + static final long DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); + static final String ACCESSLOG_SAMPLING_RATIO = STORE_PREFIX + "%s.accesslog.sampling.ratio"; + static final String ACCESSLOG_ENABLED = STORE_PREFIX + "%s.accesslog.enabled"; + static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50; + static final String SIDE_INPUTS = STORE_PREFIX + "%s.side.inputs"; + static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX + "%s.side.inputs.processor.factory"; + static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE = + STORE_PREFIX + "%s.side.inputs.processor.serialized.instance"; + + public StorageConfig(Config config) { super(config); } public List getStoreNames() { Config subConfig = subset(STORE_PREFIX, true); - List storeNames = new ArrayList(); + List storeNames = new ArrayList<>(); for (String key : subConfig.keySet()) { if (key.endsWith(FACTORY_SUFFIX)) { storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length())); @@ -66,19 +73,21 @@ public List getStoreNames() { return storeNames; } - public String getChangelogStream(String storeName) { - - // If the config specifies 'stores..changelog' as '.' combination - it will take precedence. - // If this config only specifies and there is a value in job.changelog.system= - - // these values will be combined into . + /** + * If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take + * precedence. + * If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> - these values will + * be combined into <asystem>.<astream> + */ + public Optional getChangelogStream(String storeName) { String systemStream = StringUtils.trimToNull(get(String.format(CHANGELOG_STREAM, storeName), null)); String systemStreamRes; - if (systemStream != null && !systemStream.contains(".")) { - String changelogSystem = getChangelogSystem(); + if (systemStream != null && !systemStream.contains(".")) { + Optional changelogSystem = getChangelogSystem(); // contains only stream name - if (changelogSystem != null) { - systemStreamRes = changelogSystem + "." + systemStream; + if (changelogSystem.isPresent()) { + systemStreamRes = changelogSystem.get() + "." + systemStream; } else { throw new SamzaException("changelog system is not defined:" + systemStream); } @@ -89,7 +98,7 @@ public String getChangelogStream(String storeName) { if (systemStreamRes != null) { systemStreamRes = StreamManager.createUniqueNameForBatch(systemStreamRes, this); } - return systemStreamRes; + return Optional.ofNullable(systemStreamRes); } public boolean getAccessLogEnabled(String storeName) { @@ -104,24 +113,24 @@ public int getAccessLogSamplingRatio(String storeName) { return getInt(String.format(ACCESSLOG_SAMPLING_RATIO, storeName), DEFAULT_ACCESSLOG_SAMPLING_RATIO); } - public String getStorageFactoryClassName(String storeName) { - return get(String.format(FACTORY, storeName), null); + public Optional getStorageFactoryClassName(String storeName) { + return Optional.ofNullable(get(String.format(FACTORY, storeName))); } - public String getStorageKeySerde(String storeName) { - return get(String.format(KEY_SERDE, storeName), null); + public Optional getStorageKeySerde(String storeName) { + return Optional.ofNullable(get(String.format(KEY_SERDE, storeName))); } - public String getStorageMsgSerde(String storeName) { - return get(String.format(MSG_SERDE, storeName), null); + public Optional getStorageMsgSerde(String storeName) { + return Optional.ofNullable(get(String.format(MSG_SERDE, storeName))); } /** - * Gets the System to use for reading/writing checkpoints. Uses the following precedence. + * Gets the System to use for changelogs. Uses the following precedence. * * 1. If job.changelog.system is defined, that value is used. * 2. If job.default.system is defined, that value is used. - * 3. null + * 3. empty optional * * Note: Changelogs can be defined using * stores.storeName.changelog=systemName.streamName or @@ -131,8 +140,8 @@ public String getStorageMsgSerde(String storeName) { * * @return the name of the system to use by default for all changelogs, if defined. */ - public String getChangelogSystem() { - return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null)); + public Optional getChangelogSystem() { + return Optional.ofNullable(get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM()))); } /** @@ -158,19 +167,42 @@ public List getSideInputs(String storeName) { * Gets the SideInputsProcessorFactory associated with the {@code storeName}. * * @param storeName name of the store - * @return the class name of SideInputsProcessorFactory if present, null otherwise + * @return the class name of SideInputsProcessorFactory if present, empty optional otherwise */ - public String getSideInputsProcessorFactory(String storeName) { - return get(String.format(SIDE_INPUTS_PROCESSOR_FACTORY, storeName), null); + public Optional getSideInputsProcessorFactory(String storeName) { + return Optional.ofNullable(get(String.format(SIDE_INPUTS_PROCESSOR_FACTORY, storeName))); } /** * Gets the serialized instance of SideInputsProcessor associated with the {@code storeName}. * * @param storeName name of the store - * @return the serialized instance of SideInputsProcessor if present, null otherwise + * @return the serialized instance of SideInputsProcessor if present, empty optional otherwise + */ + public Optional getSideInputsProcessorSerializedInstance(String storeName) { + return Optional.ofNullable(get(String.format(SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, storeName))); + } + + public long getChangeLogDeleteRetentionInMs(String storeName) { + return getLong(String.format(CHANGELOG_DELETE_RETENTION_MS, storeName), DEFAULT_CHANGELOG_DELETE_RETENTION_MS); + } + + /** + * Helper method to check if a system has a changelog attached to it. + */ + public boolean isChangelogSystem(String systemName) { + return getStoreNames().stream() + .map(this::getChangelogStream) + .filter(Optional::isPresent) + .map(systemStreamName -> StreamUtil.getSystemStreamFromNames(systemStreamName.get()).getSystem()) + .anyMatch(system -> system.equals(systemName)); + } + + /** + * Helper method to check if there is any stores configured w/ a changelog */ - public String getSideInputsProcessorSerializedInstance(String storeName) { - return get(String.format(SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, storeName), null); + public boolean hasDurableStores() { + Config subConfig = subset(STORE_PREFIX, true); + return subConfig.keySet().stream().anyMatch(key -> key.endsWith(CHANGELOG_SUFFIX)); } } diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java index 92acabdec9..15a1801354 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java @@ -328,12 +328,12 @@ private void configureSerdes(Map configs, Map { - String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName); + String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE, storeName); configs.put(keySerdeConfigKey, serdeUUIDs.get(serde)); }); storeMsgSerdes.forEach((storeName, serde) -> { - String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName); + String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE, storeName); configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde)); }); diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index 2921f3b722..6dd7fa1c17 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -138,9 +138,8 @@ public void clearStreamsFromPreviousRun(Config prevConfig) { //Find changelog streams and remove them StorageConfig storageConfig = new StorageConfig(prevConfig); - for (String store : JavaConversions.asJavaCollection(storageConfig.getStoreNames())) { - String changelog = storageConfig.getChangelogStream(store) - .getOrElse(defaultValue(null)); + for (String store : storageConfig.getStoreNames()) { + String changelog = storageConfig.getChangelogStream(store).orElse(null); if (changelog != null) { LOGGER.info("Clear store {} changelog {}", store, changelog); SystemStream systemStream = StreamUtil.getSystemStreamFromNames(changelog); diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java index 8aa2dd9edb..1aec511b4c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StoreDescriptor.java @@ -18,7 +18,6 @@ */ package org.apache.samza.operators.spec; -import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.serializers.Serde; @@ -59,12 +58,12 @@ public Serde getMsgSerde() { return msgSerde; } - public JavaStorageConfig getStorageConfigs() { + public StorageConfig getStorageConfigs() { HashMap configs = new HashMap<>(); - configs.put(String.format(StorageConfig.FACTORY(), this.getStoreName()), this.getStoreFactory()); - configs.put(String.format(StorageConfig.CHANGELOG_STREAM(), this.getStoreName()), this.getChangelogStream()); + configs.put(String.format(StorageConfig.FACTORY, this.getStoreName()), this.getStoreFactory()); + configs.put(String.format(StorageConfig.CHANGELOG_STREAM, this.getStoreName()), this.getChangelogStream()); configs.putAll(this.getOtherProperties()); - return new JavaStorageConfig(new MapConfig(configs)); + return new StorageConfig(new MapConfig(configs)); } private String getStoreFactory() { diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java index 463585615a..e86e21a60f 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java @@ -20,13 +20,14 @@ package org.apache.samza.storage; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.config.JavaStorageConfig; +import org.apache.samza.config.StorageConfig; import org.apache.samza.config.SystemConfig; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; @@ -124,12 +125,15 @@ public void updatePartitionMapping(Map prevChangelogEntries, */ public static void createChangelogStreams(Config config, int maxChangeLogStreamPartitions) { // Get changelog store config - JavaStorageConfig storageConfig = new JavaStorageConfig(config); - Map storeNameSystemStreamMapping = storageConfig.getStoreNames() - .stream() - .filter(name -> StringUtils.isNotBlank(storageConfig.getChangelogStream(name))) - .collect(Collectors.toMap(name -> name, - name -> StreamUtil.getSystemStreamFromNames(storageConfig.getChangelogStream(name)))); + StorageConfig storageConfig = new StorageConfig(config); + ImmutableMap.Builder storeNameSystemStreamMapBuilder = new ImmutableMap.Builder<>(); + storageConfig.getStoreNames().forEach(storeName -> { + Optional changelogStream = storageConfig.getChangelogStream(storeName); + if (changelogStream.isPresent() && StringUtils.isNotBlank(changelogStream.get())) { + storeNameSystemStreamMapBuilder.put(storeName, StreamUtil.getSystemStreamFromNames(changelogStream.get())); + } + }); + Map storeNameSystemStreamMapping = storeNameSystemStreamMapBuilder.build(); // Get SystemAdmin for changelog store's system and attempt to create the stream SystemConfig systemConfig = new SystemConfig(config); diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index 4a9ab266aa..75b6abd052 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -23,11 +23,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.config.JavaStorageConfig; -import org.apache.samza.config.SystemConfig; import org.apache.samza.config.SerializerConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.config.SystemConfig; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.context.ContainerContext; import org.apache.samza.context.ContainerContextImpl; @@ -145,23 +146,23 @@ private void getContainerModels() { * and put them into the maps */ private void getChangeLogSystemStreamsAndStorageFactories() { - JavaStorageConfig config = new JavaStorageConfig(jobConfig); + StorageConfig config = new StorageConfig(jobConfig); List storeNames = config.getStoreNames(); LOG.info("Got store names: " + storeNames.toString()); for (String storeName : storeNames) { - String streamName = config.getChangelogStream(storeName); + Optional streamName = config.getChangelogStream(storeName); - LOG.info("stream name for " + storeName + " is " + streamName); + LOG.info("stream name for " + storeName + " is " + streamName.orElse(null)); - if (streamName != null) { - changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(streamName)); + if (streamName.isPresent()) { + changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(streamName.get())); } - String factoryClass = config.getStorageFactoryClassName(storeName); - if (factoryClass != null) { - storageEngineFactories.put(storeName, Util.getObj(factoryClass, StorageEngineFactory.class)); + Optional factoryClass = config.getStorageFactoryClassName(storeName); + if (factoryClass.isPresent()) { + storageEngineFactories.put(storeName, Util.getObj(factoryClass.get(), StorageEngineFactory.class)); } else { throw new SamzaException("Missing storage factory for " + storeName + "."); } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 69ef648596..aa72d1ce7a 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -32,8 +32,8 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.StorageConfig; +import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.ZkConfig; import org.apache.samza.container.TaskName; import org.apache.samza.container.grouper.task.GrouperMetadata; diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala deleted file mode 100644 index 1577e62837..0000000000 --- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config - - -import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import org.apache.samza.util.{Logging, StreamUtil} - -object StorageConfig { - // stream config constants - val FACTORY = "stores.%s.factory" - val KEY_SERDE = "stores.%s.key.serde" - val MSG_SERDE = "stores.%s.msg.serde" - val CHANGELOG_STREAM = "stores.%s.changelog" - val CHANGELOG_SYSTEM = "job.changelog.system" - val CHANGELOG_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor" - val CHANGELOG_DELETE_RETENTION_MS = "stores.%s.changelog.delete.retention.ms" - val DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1) - val ACCESSLOG_STREAM_SUFFIX = "access-log" - val ACCESSLOG_SAMPLING_RATIO = "stores.%s.accesslog.sampling.ratio" - val ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled" - val DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50 - - - implicit def Config2Storage(config: Config) = new StorageConfig(config) -} - -class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging { - import StorageConfig._ - def getStorageFactoryClassName(name: String) = getOption(FACTORY.format(name)) - def getStorageKeySerde(name: String) = getOption(StorageConfig.KEY_SERDE format name) - def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name) - - def getAccessLogEnabled(storeName: String) = { - new JavaStorageConfig(config).getAccessLogEnabled(storeName) - } - - def getChangelogStream(name: String) = { - val javaStorageConfig = new JavaStorageConfig(config) - Option(javaStorageConfig.getChangelogStream(name)) - } - - //Returns the accesslog stream name given a changelog stream name - def getAccessLogStream(changeLogStream: String) = { - new JavaStorageConfig(config).getAccessLogStream(changeLogStream) - } - - def getAccessLogSamplingRatio(storeName: String) = { - new JavaStorageConfig(config).getAccessLogSamplingRatio(storeName) - } - - def getChangeLogDeleteRetentionInMs(storeName: String) = { - getLong(CHANGELOG_DELETE_RETENTION_MS format storeName, DEFAULT_CHANGELOG_DELETE_RETENTION_MS) - } - - def getStoreNames: Seq[String] = { - val conf = config.subset("stores.", true) - conf.asScala.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq - } - - def getSideInputs(storeName: String): Seq[String] = { - new JavaStorageConfig(config).getSideInputs(storeName).asScala - } - - def getSideInputsProcessorFactory(storeName: String): Option[String] = { - Option(new JavaStorageConfig(config).getSideInputsProcessorFactory(storeName)) - } - - def getSideInputsProcessorSerializedInstance(storeName: String): Option[String] = { - Option(new JavaStorageConfig(config).getSideInputsProcessorSerializedInstance(storeName)) - } - - /** - * Build a map of storeName to changeLogDeleteRetention for all of the stores. - * @return a map from storeName to the changeLogDeleteRetention of the store in ms. - */ - def getChangeLogDeleteRetentionsInMs: Map[String, Long] = { - Map(getStoreNames map {storeName => (storeName, getChangeLogDeleteRetentionInMs(storeName))} : _*) - } - - /** - * Helper method to check if a system has a changelog attached to it. - */ - def isChangelogSystem(systemName: String) = { - config - .getStoreNames - // Get changelogs for all stores in the format of "system.stream" - .map(getChangelogStream(_)) - .filter(_.isDefined) - // Convert "system.stream" to systemName - .map(systemStreamName => StreamUtil.getSystemStreamFromNames(systemStreamName.get).getSystem) - .contains(systemName) - } - - /** - * Helper method to check if there is any stores configured w/ a changelog - */ - def hasDurableStores : Boolean = { - val conf = config.subset("stores.", true) - conf.asScala.keys.exists(k => k.endsWith(".changelog")) - } -} diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 30434ce62d..1b356f6611 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -35,7 +35,6 @@ import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer -import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.config._ @@ -174,8 +173,9 @@ object SamzaContainer extends Logging { .flatMap(_.getSystemStreamPartitions.asScala) .toSet - val sideInputStoresToSystemStreams = config.getStoreNames - .map { storeName => (storeName, config.getSideInputs(storeName)) } + val storageConfig = new StorageConfig(config) + val sideInputStoresToSystemStreams = storageConfig.getStoreNames.asScala + .map { storeName => (storeName, storageConfig.getSideInputs(storeName).asScala) } .filter { case (storeName, sideInputs) => sideInputs.nonEmpty } .map { case (storeName, sideInputs) => (storeName, sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) } .toMap @@ -337,10 +337,10 @@ object SamzaContainer extends Logging { debug("Got system stream message serdes: %s" format systemStreamMessageSerdes) - val changeLogSystemStreams = config - .getStoreNames - .filter(config.getChangelogStream(_).isDefined) - .map(name => (name, config.getChangelogStream(name).get)).toMap + val changeLogSystemStreams = storageConfig + .getStoreNames.asScala + .filter(storageConfig.getChangelogStream(_).isPresent) + .map(name => (name, storageConfig.getChangelogStream(name).get)).toMap .mapValues(StreamUtil.getSystemStreamFromNames(_)) info("Got change log system streams: %s" format changeLogSystemStreams) @@ -461,11 +461,11 @@ object SamzaContainer extends Logging { metrics = systemProducersMetrics, dropSerializationError = dropSerializationError) - val storageEngineFactories = config - .getStoreNames + val storageEngineFactories = storageConfig + .getStoreNames.asScala .map(storeName => { - val storageFactoryClassName = config - .getStorageFactoryClassName(storeName) + val storageFactoryClassName = + JavaOptionals.toRichOptional(storageConfig.getStorageFactoryClassName(storeName)).toOption .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName)) (storeName, Util.getObj(storageFactoryClassName, classOf[StorageEngineFactory[Object, Object]])) }).toMap diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala index b6011f4060..ccab4ff8cd 100644 --- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala @@ -20,11 +20,11 @@ package org.apache.samza.serializers import org.apache.samza.SamzaException +import org.apache.samza.config.StorageConfig import org.apache.samza.system.ControlMessage import org.apache.samza.system.SystemStream import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.config.StorageConfig class SerdeManager( serdes: Map[String, Serde[Object]] = Map(), diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index 61505f43b5..0d0e0d847b 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -453,24 +453,26 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel this.storeDirectoryPaths.add(storeDirectory.toPath()); - if (storageConfig.getStorageKeySerde(storeName).isEmpty()) { + Optional storageKeySerde = storageConfig.getStorageKeySerde(storeName); + if (!storageKeySerde.isPresent()) { throw new SamzaException("No key serde defined for store: " + storeName); } - Serde keySerde = serdes.get(storageConfig.getStorageKeySerde(storeName).get()); + Serde keySerde = serdes.get(storageKeySerde.get()); if (keySerde == null) { throw new SamzaException( - "StorageKeySerde: No class defined for serde: " + storageConfig.getStorageKeySerde(storeName)); + "StorageKeySerde: No class defined for serde: " + storageKeySerde.get()); } - if (storageConfig.getStorageMsgSerde(storeName).isEmpty()) { + Optional storageMsgSerde = storageConfig.getStorageMsgSerde(storeName); + if (!storageMsgSerde.isPresent()) { throw new SamzaException("No msg serde defined for store: " + storeName); } - Serde messageSerde = serdes.get(storageConfig.getStorageMsgSerde(storeName).get()); + Serde messageSerde = serdes.get(storageMsgSerde.get()); if (messageSerde == null) { throw new SamzaException( - "StorageMsgSerde: No class defined for serde: " + storageConfig.getStorageMsgSerde(storeName)); + "StorageMsgSerde: No class defined for serde: " + storageMsgSerde.get()); } // if taskInstanceMetrics are specified use those for store metrics, @@ -486,17 +488,20 @@ private StorageEngine createStore(String storeName, TaskName taskName, TaskModel // Create side input store processors, one per store per task - private Map> createSideInputProcessors(StorageConfig config, ContainerModel containerModel, - Map> sideInputSystemStreams, Map taskInstanceMetrics) { + private Map> createSideInputProcessors(StorageConfig config, + ContainerModel containerModel, Map> sideInputSystemStreams, + Map taskInstanceMetrics) { Map> sideInputStoresToProcessors = new HashMap<>(); getTasks(containerModel, TaskMode.Active).forEach((taskName, taskModel) -> { sideInputStoresToProcessors.put(taskName, new HashMap<>()); for (String storeName : sideInputSystemStreams.keySet()) { - if (config.getSideInputsProcessorSerializedInstance(storeName).isDefined()) { + Optional sideInputsProcessorSerializedInstance = + config.getSideInputsProcessorSerializedInstance(storeName); + if (sideInputsProcessorSerializedInstance.isPresent()) { sideInputStoresToProcessors.get(taskName) .put(storeName, SerdeUtils.deserialize("Side Inputs Processor", - config.getSideInputsProcessorSerializedInstance(storeName).get())); + sideInputsProcessorSerializedInstance.get())); } else { sideInputStoresToProcessors.get(taskName) .put(storeName, Util.getObj(config.getSideInputsProcessorFactory(storeName).get(), @@ -512,8 +517,8 @@ private Map> createSideInputProcessor for (String storeName : sideInputSystemStreams.keySet()) { // have to use the right serde because the sideInput stores are created - Serde keySerde = serdes.get(new StorageConfig(config).getStorageKeySerde(storeName).get()); - Serde msgSerde = serdes.get(new StorageConfig(config).getStorageMsgSerde(storeName).get()); + Serde keySerde = serdes.get(config.getStorageKeySerde(storeName).get()); + Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName).get()); sideInputStoresToProcessors.get(taskName).put(storeName, new SideInputsProcessor() { @Override public Collection> process(IncomingMessageEnvelope message, KeyValueStore store) { @@ -530,7 +535,9 @@ private Map> createSideInputProcessor private Map createSideInputStorageManagers(Clock clock) { // creating side input store processors, one per store per task - Map> taskSideInputProcessors = createSideInputProcessors(new StorageConfig(config), this.containerModel, this.sideInputSystemStreams, this.taskInstanceMetrics); + Map> taskSideInputProcessors = + createSideInputProcessors(new StorageConfig(config), this.containerModel, this.sideInputSystemStreams, + this.taskInstanceMetrics); Map sideInputStorageManagers = new HashMap<>(); @@ -956,12 +963,7 @@ private void cleanBaseDirsAndReadOffsetFiles() { * @return true if the logged store is valid, false otherwise. */ private boolean isLoggedStoreValid(String storeName, File loggedStoreDir) { - long changeLogDeleteRetentionInMs = StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS(); - - if (new StorageConfig(config).getChangeLogDeleteRetentionsInMs().get(storeName).isDefined()) { - changeLogDeleteRetentionInMs = - (long) new StorageConfig(config).getChangeLogDeleteRetentionsInMs().get(storeName).get(); - } + long changeLogDeleteRetentionInMs = new StorageConfig(config).getChangeLogDeleteRetentionInMs(storeName); if (changelogSystemStreams.containsKey(storeName)) { SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()); diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java deleted file mode 100644 index c04d14fcbc..0000000000 --- a/samza-core/src/test/java/org/apache/samza/config/TestJavaStorageConfig.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config; - -import static org.junit.Assert.*; - -import java.util.HashMap; -import java.util.Map; - -import org.junit.Test; - -public class TestJavaStorageConfig { - - @Test - public void testStorageConfig() { - Map map = new HashMap(); - map.put("stores.test.factory", "testFactory"); - map.put("stores.test.changelog", "testSystem.testChangelog"); - map.put("stores.test.key.serde", "string"); - map.put("stores.test.msg.serde", "integer"); - JavaStorageConfig config = new JavaStorageConfig(new MapConfig(map)); - - assertEquals("testFactory", config.getStorageFactoryClassName("test")); - assertEquals("testSystem.testChangelog", config.getChangelogStream("test")); - assertEquals("string", config.getStorageKeySerde("test")); - assertEquals("integer", config.getStorageMsgSerde("test")); - assertEquals("test", config.getStoreNames().get(0)); - } - - - @Test - public void testIsChangelogSystemSetting() { - Map configMap = new HashMap<>(); - configMap.put("stores.store1.changelog", "system1.stream1"); - configMap.put("job.changelog.system", "system2"); - configMap.put("stores.store2.changelog", "stream2"); - - JavaStorageConfig config = new JavaStorageConfig(new MapConfig(configMap)); - - assertEquals("system1.stream1", config.getChangelogStream("store1")); - assertEquals("system2.stream2", config.getChangelogStream("store2")); - - Map configMapErr = new HashMap<>(); - configMapErr.put("stores.store4.changelog", "stream4"); // incorrect - JavaStorageConfig configErr = new JavaStorageConfig(new MapConfig(configMapErr)); - - try { - configErr.getChangelogStream("store4"); - fail("store4 has no system defined. Should've failed."); - } catch (Exception e) { - // do nothing, it is expected - } - } - - @Test - public void testEmptyStringOrNullChangelogStream() { - Map configMap = new HashMap<>(); - configMap.put("stores.store1.changelog", ""); - configMap.put("stores.store2.changelog", " "); - - JavaStorageConfig config = new JavaStorageConfig(new MapConfig(configMap)); - - assertNull(config.getChangelogStream("store1")); - assertNull(config.getChangelogStream("store2")); - assertNull(config.getChangelogStream("store-changelog-none")); - } -} diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java new file mode 100644 index 0000000000..8fc77270b4 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.samza.SamzaException; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + + +public class TestStorageConfig { + private static final String STORE_NAME0 = "store0"; + private static final String STORE_NAME1 = "store1"; + + @Test + public void testGetStoreNames() { + // empty config, so no stores + assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig()).getStoreNames()); + + // has stores + StorageConfig storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "store0.factory.class", + String.format(StorageConfig.FACTORY, STORE_NAME1), "store1.factory.class"))); + List actual = storageConfig.getStoreNames(); + // ordering shouldn't matter + assertEquals(2, actual.size()); + assertEquals(ImmutableSet.of(STORE_NAME0, STORE_NAME1), ImmutableSet.copyOf(actual)); + } + + @Test + public void testGetChangelogStream() { + // empty config, so no changelog stream + assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getChangelogStream(STORE_NAME0)); + + // store has empty string for changelog stream + StorageConfig storageConfig = new StorageConfig( + new MapConfig(ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), ""))); + assertEquals(Optional.empty(), storageConfig.getChangelogStream(STORE_NAME0)); + + // store has full changelog system-stream defined + storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), + "changelog-system.changelog-stream0"))); + assertEquals(Optional.of("changelog-system.changelog-stream0"), storageConfig.getChangelogStream(STORE_NAME0)); + + // store has changelog stream defined, but system comes from job.changelog.system + storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "changelog-stream0", + StorageConfig.CHANGELOG_SYSTEM, "changelog-system"))); + assertEquals(Optional.of("changelog-system.changelog-stream0"), storageConfig.getChangelogStream(STORE_NAME0)); + + // batch mode: create unique stream name + storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), + "changelog-system.changelog-stream0", ApplicationConfig.APP_MODE, + ApplicationConfig.ApplicationMode.BATCH.name().toLowerCase(), ApplicationConfig.APP_RUN_ID, "run-id"))); + assertEquals(Optional.of("changelog-system.changelog-stream0-run-id"), + storageConfig.getChangelogStream(STORE_NAME0)); + } + + @Test(expected = SamzaException.class) + public void testGetChangelogStreamMissingSystem() { + StorageConfig storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "changelog-stream0"))); + storageConfig.getChangelogStream(STORE_NAME0); + } + + @Test + public void testGetAccessLogEnabled() { + // empty config, access log disabled + assertFalse(new StorageConfig(new MapConfig()).getAccessLogEnabled(STORE_NAME0)); + + assertFalse(new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.ACCESSLOG_ENABLED, STORE_NAME0), "false"))).getAccessLogEnabled( + STORE_NAME0)); + + assertTrue(new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.ACCESSLOG_ENABLED, STORE_NAME0), "true"))).getAccessLogEnabled( + STORE_NAME0)); + } + + @Test + public void testGetAccessLogStream() { + String changelogStream = "changelog-stream"; + assertEquals(changelogStream + "-" + StorageConfig.ACCESSLOG_STREAM_SUFFIX, + new StorageConfig(new MapConfig()).getAccessLogStream(changelogStream)); + } + + @Test + public void testGetAccessLogSamplingRatio() { + // empty config, return default sampling ratio + assertEquals(StorageConfig.DEFAULT_ACCESSLOG_SAMPLING_RATIO, + new StorageConfig(new MapConfig()).getAccessLogSamplingRatio(STORE_NAME0)); + + assertEquals(40, new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.ACCESSLOG_SAMPLING_RATIO, STORE_NAME0), + "40"))).getAccessLogSamplingRatio(STORE_NAME0)); + } + + @Test + public void testGetStorageFactoryClassName() { + // empty config, so no factory + assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getStorageFactoryClassName(STORE_NAME0)); + + StorageConfig storageConfig = new StorageConfig( + new MapConfig(ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "my.factory.class"))); + assertEquals(Optional.of("my.factory.class"), storageConfig.getStorageFactoryClassName(STORE_NAME0)); + } + + @Test + public void testGetStorageKeySerde() { + // empty config, so no key serde + assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getStorageKeySerde(STORE_NAME0)); + + StorageConfig storageConfig = new StorageConfig( + new MapConfig(ImmutableMap.of(String.format(StorageConfig.KEY_SERDE, STORE_NAME0), "my.key.serde.class"))); + assertEquals(Optional.of("my.key.serde.class"), storageConfig.getStorageKeySerde(STORE_NAME0)); + } + + @Test + public void testGetStorageMsgSerde() { + // empty config, so no msg serde + assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getStorageMsgSerde(STORE_NAME0)); + + StorageConfig storageConfig = new StorageConfig( + new MapConfig(ImmutableMap.of(String.format(StorageConfig.MSG_SERDE, STORE_NAME0), "my.msg.serde.class"))); + assertEquals(Optional.of("my.msg.serde.class"), storageConfig.getStorageMsgSerde(STORE_NAME0)); + } + + @Test + public void testGetChangelogSystem() { + // empty config, so no system + assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getChangelogSystem()); + + // job.changelog.system takes precedence over job.default.system + StorageConfig storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(StorageConfig.CHANGELOG_SYSTEM, "changelog-system", JobConfig.JOB_DEFAULT_SYSTEM(), + "should-not-be-used"))); + assertEquals(Optional.of("changelog-system"), storageConfig.getChangelogSystem()); + + // fall back to job.default.system if job.changelog.system is not specified + storageConfig = + new StorageConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_DEFAULT_SYSTEM(), "default-system"))); + assertEquals(Optional.of("default-system"), storageConfig.getChangelogSystem()); + } + + @Test + public void testGetSideInputs() { + // empty config, so no system + assertEquals(Collections.emptyList(), new StorageConfig(new MapConfig()).getSideInputs(STORE_NAME0)); + + // single side input + StorageConfig storageConfig = new StorageConfig( + new MapConfig(ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS, STORE_NAME0), "side-input"))); + assertEquals(Collections.singletonList("side-input"), storageConfig.getSideInputs(STORE_NAME0)); + + // multiple side inputs + storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS, STORE_NAME0), "side-input0,side-input1"))); + assertEquals(ImmutableList.of("side-input0", "side-input1"), storageConfig.getSideInputs(STORE_NAME0)); + + // ignore whitespace + storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS, STORE_NAME0), ", side-input0 ,,side-input1,"))); + assertEquals(ImmutableList.of("side-input0", "side-input1"), storageConfig.getSideInputs(STORE_NAME0)); + } + + @Test + public void testGetSideInputsProcessorFactory() { + // empty config, so no factory + assertEquals(Optional.empty(), new StorageConfig(new MapConfig()).getSideInputsProcessorFactory(STORE_NAME0)); + + StorageConfig storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_FACTORY, STORE_NAME0), + "my.side.inputs.factory.class"))); + assertEquals(Optional.of("my.side.inputs.factory.class"), storageConfig.getSideInputsProcessorFactory(STORE_NAME0)); + } + + @Test + public void testGetSideInputsProcessorSerializedInstance() { + // empty config, so no factory + assertEquals(Optional.empty(), + new StorageConfig(new MapConfig()).getSideInputsProcessorSerializedInstance(STORE_NAME0)); + + StorageConfig storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, STORE_NAME0), + "serialized_instance"))); + assertEquals(Optional.of("serialized_instance"), + storageConfig.getSideInputsProcessorSerializedInstance(STORE_NAME0)); + } + + @Test + public void testGetChangeLogDeleteRetentionInMs() { + // empty config, return default sampling ratio + assertEquals(StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS, + new StorageConfig(new MapConfig()).getChangeLogDeleteRetentionInMs(STORE_NAME0)); + + StorageConfig storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.CHANGELOG_DELETE_RETENTION_MS, STORE_NAME0), + Long.toString(StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS * 2)))); + assertEquals(StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS * 2, + storageConfig.getChangeLogDeleteRetentionInMs(STORE_NAME0)); + } + + @Test + public void testIsChangelogSystem() { + StorageConfig storageConfig = new StorageConfig(new MapConfig(ImmutableMap.of( + // store0 has a changelog stream + String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class", + String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream", + // store1 does not have a changelog stream + String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class"))); + assertTrue(storageConfig.isChangelogSystem("system0")); + assertFalse(storageConfig.isChangelogSystem("other-system")); + } + + @Test + public void testHasDurableStores() { + // no changelog, which means no durable stores + StorageConfig storageConfig = new StorageConfig( + new MapConfig(ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class"))); + assertFalse(storageConfig.hasDurableStores()); + + storageConfig = new StorageConfig(new MapConfig( + ImmutableMap.of(String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class", + String.format(StorageConfig.CHANGELOG_STREAM, STORE_NAME0), "system0.changelog-stream"))); + assertTrue(storageConfig.hasDurableStores()); + } +} diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestChangelogStreamManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestChangelogStreamManager.java new file mode 100644 index 0000000000..1cb88c2672 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/storage/TestChangelogStreamManager.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.StreamValidationException; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestChangelogStreamManager { + private static final String SYSTEM = "system"; + private static final String STREAM = "stream"; + private static final int MAX_CHANGELOG_STREAM_PARTITIONS = 10; + private static final Set EXPECTED_STREAM_SPECS = ImmutableSet.of( + // changelog + StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM, MAX_CHANGELOG_STREAM_PARTITIONS), + // access log + new StreamSpec(STREAM + "-access-log", STREAM + "-access-log", SYSTEM, MAX_CHANGELOG_STREAM_PARTITIONS)); + + @Test + public void createChangelogStreams() { + Map map = new ImmutableMap.Builder() + .put("stores.store0.factory", "factory.class") + .put("stores.store0.changelog", SYSTEM + "." + STREAM) + .put("stores.store0.accesslog.enabled", "true") + .put(String.format("systems.%s.samza.factory", SYSTEM), MockSystemAdminFactory.class.getName()) + .build(); + Config config = new MapConfig(map); + ChangelogStreamManager.createChangelogStreams(config, MAX_CHANGELOG_STREAM_PARTITIONS); + } + + public static class MockSystemAdminFactory implements SystemFactory { + @Override + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + throw new UnsupportedOperationException("Unused in test"); + } + + @Override + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + throw new UnsupportedOperationException("Unused in test"); + } + + @Override + public SystemAdmin getAdmin(String systemName, Config config) { + return new MockSystemAdmin(); + } + } + + private static class MockSystemAdmin implements SystemAdmin { + private final Set streamsCreated = new HashSet<>(); + private final Set streamsValidated = new HashSet<>(); + + @Override + public boolean createStream(StreamSpec streamSpec) { + assertTrue(EXPECTED_STREAM_SPECS.contains(streamSpec)); + this.streamsCreated.add(streamSpec); + return true; + } + + @Override + public void validateStream(StreamSpec streamSpec) { + if (!EXPECTED_STREAM_SPECS.contains(streamSpec)) { + throw new StreamValidationException("Did not see expected stream spec"); + } + this.streamsValidated.add(streamSpec); + } + + @Override + public void stop() { + assertEquals(EXPECTED_STREAM_SPECS, this.streamsCreated); + assertEquals(EXPECTED_STREAM_SPECS, this.streamsValidated); + } + + @Override + public Map getOffsetsAfter(Map offsets) { + throw new UnsupportedOperationException("Unused in test"); + } + + @Override + public Map getSystemStreamMetadata(Set streamNames) { + throw new UnsupportedOperationException("Unused in test"); + } + + @Override + public Integer offsetComparator(String offset1, String offset2) { + throw new UnsupportedOperationException("Unused in test"); + } + } +} diff --git a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java index a764a8b917..bf8c564f67 100644 --- a/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java @@ -23,9 +23,9 @@ import junit.framework.Assert; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; +import org.apache.samza.config.StorageConfig; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.StorageConfig; import org.apache.samza.context.Context; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; @@ -71,7 +71,7 @@ public void testChangelogDisabled() { Map tableConfig = createTableDescriptor() .toConfig(createJobConfig()); Assert.assertEquals(1, tableConfig.size()); - Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM(), TABLE_ID))); + Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM, TABLE_ID))); } @Test @@ -81,7 +81,7 @@ public void testChangelogEnabled() { .toConfig(createJobConfig()); Assert.assertEquals(2, tableConfig.size()); Assert.assertEquals("test-job-10-table-t1", String.format( - tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), TABLE_ID)))); + tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM, TABLE_ID)))); } @Test @@ -92,9 +92,9 @@ public void testChangelogEnabledWithCustomParameters() { .toConfig(createJobConfig()); Assert.assertEquals(3, tableConfig.size()); Assert.assertEquals("my-stream", String.format( - tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM(), TABLE_ID)))); + tableConfig.get(String.format(StorageConfig.CHANGELOG_STREAM, TABLE_ID)))); Assert.assertEquals("100", String.format( - tableConfig.get(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), TABLE_ID)))); + tableConfig.get(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR, TABLE_ID)))); } @Test(expected = NullPointerException.class) diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala deleted file mode 100644 index e279639100..0000000000 --- a/samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config - - -import org.junit.Test - -import scala.collection.JavaConverters._ -import org.apache.samza.config.StorageConfig._ -import org.junit.Assert.assertFalse -import org.junit.Assert.assertTrue -import org.junit.Assert.assertEquals -import org.junit.Assert.fail - -class TestStorageConfig { - @Test - def testIsChangelogSystem { - val configMap = Map[String, String]( - FACTORY.format("store1") -> "some.factory.Class", - CHANGELOG_STREAM.format("store1") -> "system1.stream1", - FACTORY.format("store2") -> "some.factory.Class") - val config = new MapConfig(configMap.asJava) - assertFalse(config.isChangelogSystem("system3")) - assertFalse(config.isChangelogSystem("system2")) - assertTrue(config.isChangelogSystem("system1")) - } - - @Test - def testIsChangelogSystemSetting { - val configMap = Map[String, String]( - FACTORY.format("store1") -> "some.factory.Class", - CHANGELOG_STREAM.format("store1") -> "system1.stream1", - CHANGELOG_SYSTEM -> "system2", - CHANGELOG_STREAM.format("store2") -> "stream2", - CHANGELOG_STREAM.format("store4") -> "stream4", - FACTORY.format("store2") -> "some.factory.Class") - val config = new MapConfig(configMap.asJava) - assertFalse(config.isChangelogSystem("system3")) - assertTrue(config.isChangelogSystem("system2")) - assertTrue(config.isChangelogSystem("system1")) - - assertEquals("system1.stream1", config.getChangelogStream("store1").getOrElse("")); - assertEquals("system2.stream2", config.getChangelogStream("store2").getOrElse("")); - - val configMapErr = Map[String, String](CHANGELOG_STREAM.format("store4")->"stream4") - val configErr = new MapConfig(configMapErr.asJava) - - try { - configErr.getChangelogStream("store4").getOrElse("") - fail("store4 has no system defined. Should've failed."); - } catch { - case e: Exception => // do nothing, it is expected - } - } -} \ No newline at end of file diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 10f879c6c2..fb5709426b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import org.apache.samza.util.{Logging, StreamUtil} import scala.collection.JavaConverters._ @@ -214,12 +215,12 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { * 2. If systems.changelog-system.default.stream.replication.factor is configured, that value is used. * 3. 2 * - * Note that the changelog-system has a similar precedence. See [[JavaStorageConfig]] + * Note that the changelog-system has a similar precedence. See [[StorageConfig]] */ def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor) def getDefaultChangelogStreamReplicationFactor() = { - val changelogSystem = new JavaStorageConfig(config).getChangelogSystem() + val changelogSystem = new StorageConfig(config).getChangelogSystem.orElse(null) getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2")) } @@ -236,7 +237,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val matcher = pattern.matcher(changelogConfig) val storeName = if (matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog configuration: " + changelogConfig + " with SystemStream: " + cn) - storageConfig.getChangelogStream(storeName).foreach(changelogName => { + JavaOptionals.toRichOptional(storageConfig.getChangelogStream(storeName)).toOption.foreach(changelogName => { val systemStream = StreamUtil.getSystemStreamFromNames(changelogName) storeToChangelog += storeName -> systemStream.getStream }) diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index d6d3340ec9..da73d0cf35 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting import org.apache.kafka.clients.producer.KafkaProducer import org.apache.samza.config.ApplicationConfig.ApplicationMode import org.apache.samza.config.KafkaConfig.Config2Kafka -import org.apache.samza.config.StorageConfig._ import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.config._ import org.apache.samza.metrics.MetricsRegistry @@ -34,7 +33,7 @@ import org.apache.samza.util._ object KafkaSystemFactory extends Logging { @VisibleForTesting - def getInjectedProducerProperties(systemName: String, config: Config) = if (config.isChangelogSystem(systemName)) { + def getInjectedProducerProperties(systemName: String, config: Config) = if (new StorageConfig(config).isChangelogSystem(systemName)) { warn("System name '%s' is being used as a changelog. Disabling compression since Kafka does not support compression for log compacted topics." format systemName) Map[String, String]("compression.type" -> "none") } else { diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala index c33393599a..596d67bc64 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala @@ -20,8 +20,7 @@ package org.apache.samza.system.kafka import org.apache.samza.SamzaException -import org.apache.samza.config.MapConfig -import org.apache.samza.config.StorageConfig +import org.apache.samza.config.{StorageConfig, MapConfig} import org.apache.samza.metrics.MetricsRegistryMap import org.junit.Assert._ import org.junit.Test diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/descriptors/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/descriptors/InMemoryTableDescriptor.java index fad3286123..8a24a6be8b 100644 --- a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/descriptors/InMemoryTableDescriptor.java +++ b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/descriptors/InMemoryTableDescriptor.java @@ -55,7 +55,7 @@ public String getProviderFactoryClassName() { public Map toConfig(Config jobConfig) { Map tableConfig = new HashMap<>(super.toConfig(jobConfig)); // Store factory configuration - tableConfig.put(String.format(StorageConfig.FACTORY(), tableId), + tableConfig.put(String.format(StorageConfig.FACTORY, tableId), InMemoryKeyValueStorageEngineFactory.class.getName()); return Collections.unmodifiableMap(tableConfig); } diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java index fc9ce76ee4..ee3f9b7e6b 100644 --- a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java +++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java @@ -21,9 +21,9 @@ import java.util.Map; import org.apache.samza.config.Config; +import org.apache.samza.config.StorageConfig; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.StorageConfig; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.storage.kv.LocalTableProviderFactory; @@ -53,7 +53,7 @@ public void testTableProviderFactoryConfig() { Assert.assertEquals(LocalTableProviderFactory.class.getName(), tableConfig.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID))); Assert.assertEquals(InMemoryKeyValueStorageEngineFactory.class.getName(), - tableConfig.get(String.format(StorageConfig.FACTORY(), TABLE_ID))); + tableConfig.get(String.format(StorageConfig.FACTORY, TABLE_ID))); } private Config createJobConfig() { diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java index 5dc51fcea6..054eb02ed2 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java @@ -23,8 +23,8 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaSerializerConfig; -import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.SerializerConfig$; +import org.apache.samza.config.StorageConfig; import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.SerdeFactory; import org.apache.samza.storage.StorageEngineFactory; @@ -55,11 +55,11 @@ public class RocksDbKeyValueReader { */ public RocksDbKeyValueReader(String storeName, String dbPath, Config config) { // get the key serde and value serde from the config - JavaStorageConfig storageConfig = new JavaStorageConfig(config); + StorageConfig storageConfig = new StorageConfig(config); JavaSerializerConfig serializerConfig = new JavaSerializerConfig(config); - keySerde = getSerdeFromName(storageConfig.getStorageKeySerde(storeName), serializerConfig); - valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName), serializerConfig); + keySerde = getSerdeFromName(storageConfig.getStorageKeySerde(storeName).orElse(null), serializerConfig); + valueSerde = getSerdeFromName(storageConfig.getStorageMsgSerde(storeName).orElse(null), serializerConfig); // get db options Options options = RocksDbOptionsHelper.options(config, 1, new File(dbPath), StorageEngineFactory.StoreMode.ReadWrite); diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java index 0703317f7a..00449041b3 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java @@ -284,7 +284,7 @@ public Map toConfig(Config jobConfig) { Map tableConfig = new HashMap<>(super.toConfig(jobConfig)); // Store factory configuration - tableConfig.put(String.format(StorageConfig.FACTORY(), tableId), + tableConfig.put(String.format(StorageConfig.FACTORY, tableId), RocksDbKeyValueStorageEngineFactory.class.getName()); if (writeBatchSize != null) { diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala index 8d9c294d88..d02d6235f3 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala @@ -21,7 +21,7 @@ package org.apache.samza.storage.kv import java.io.File -import org.apache.samza.config.StorageConfig._ +import org.apache.samza.config.StorageConfig import org.apache.samza.context.{ContainerContext, JobContext} import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.storage.StorageEngineFactory.StoreMode @@ -44,20 +44,20 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi changeLogSystemStreamPartition: SystemStreamPartition, jobContext: JobContext, containerContext: ContainerContext, storeMode: StoreMode): KeyValueStore[Array[Byte], Array[Byte]] = { - val storageConfig = jobContext.getConfig.subset("stores." + storeName + ".", true) - val isLoggedStore = jobContext.getConfig.getChangelogStream(storeName).isDefined + val storageConfigSubset = jobContext.getConfig.subset("stores." + storeName + ".", true) + val isLoggedStore = new StorageConfig(jobContext.getConfig).getChangelogStream(storeName).isPresent val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry) val numTasksForContainer = containerContext.getContainerModel.getTasks.keySet().size() rocksDbMetrics.newGauge("rocksdb.block-cache-size", - () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, numTasksForContainer)) + () => RocksDbOptionsHelper.getBlockCacheSize(storageConfigSubset, numTasksForContainer)) - val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, numTasksForContainer, storeDir, storeMode) + val rocksDbOptions = RocksDbOptionsHelper.options(storageConfigSubset, numTasksForContainer, storeDir, storeMode) val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true) val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true) val rocksDb = new RocksDbKeyValueStore( storeDir, rocksDbOptions, - storageConfig, + storageConfigSubset, isLoggedStore, storeName, rocksDbWriteOptions, diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java index 319fb0ff9b..59afaefb6d 100644 --- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java +++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java @@ -21,9 +21,9 @@ import java.util.Map; import org.apache.samza.config.Config; +import org.apache.samza.config.StorageConfig; import org.apache.samza.config.JavaTableConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.StorageConfig; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; @@ -54,7 +54,7 @@ public void testTableProviderFactoryConfig() { Assert.assertEquals(LocalTableProviderFactory.class.getName(), tableConfig.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID))); Assert.assertEquals(RocksDbKeyValueStorageEngineFactory.class.getName(), - tableConfig.get(String.format(StorageConfig.FACTORY(), TABLE_ID))); + tableConfig.get(String.format(StorageConfig.FACTORY, TABLE_ID))); } @Test @@ -88,8 +88,8 @@ public void testRocksDbConfig() { assertEquals("9", RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES, tableConfig); assertEquals("snappy", RocksDbTableDescriptor.ROCKSDB_COMPRESSION, tableConfig); assertEquals("fifo", RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE, tableConfig); - Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM(), TABLE_ID))); - Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR(), TABLE_ID))); + Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM, TABLE_ID))); + Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_REPLICATION_FACTOR, TABLE_ID))); Assert.assertEquals("xyz", tableConfig.get("abc")); } diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala index 6f1e0f6a33..fdb473a7d1 100644 --- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala +++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala @@ -23,6 +23,7 @@ import java.io.File import org.apache.samza.SamzaException import org.apache.samza.config.MetricsConfig.Config2Metrics +import org.apache.samza.config.StorageConfig import org.apache.samza.context.{ContainerContext, JobContext} import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.serializers.Serde @@ -30,7 +31,8 @@ import org.apache.samza.storage.StorageEngineFactory.StoreMode import org.apache.samza.storage.{StorageEngine, StorageEngineFactory, StoreProperties} import org.apache.samza.system.SystemStreamPartition import org.apache.samza.task.MessageCollector -import org.apache.samza.util.HighResolutionClock +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals +import org.apache.samza.util.{HighResolutionClock, ScalaJavaUtil} /** * A key value storage engine factory implementation @@ -81,20 +83,21 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] changeLogSystemStreamPartition: SystemStreamPartition, jobContext: JobContext, containerContext: ContainerContext, storeMode : StoreMode): StorageEngine = { - val storageConfig = jobContext.getConfig.subset("stores." + storeName + ".", true) - val storeFactory = storageConfig.get("factory") + val storageConfigSubset = jobContext.getConfig.subset("stores." + storeName + ".", true) + val storageConfig = new StorageConfig(jobContext.getConfig) + val storeFactory = JavaOptionals.toRichOptional(storageConfig.getStorageFactoryClassName(storeName)).toOption var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder() - val accessLog = storageConfig.getBoolean("accesslog.enabled", false) + val accessLog = storageConfig.getAccessLogEnabled(storeName) - if (storeFactory == null) { + if (storeFactory.isEmpty) { throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!") } - if (!storeFactory.equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) { + if (!storeFactory.get.equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) { storePropertiesBuilder = storePropertiesBuilder.setPersistedToDisk(true) } - val batchSize = storageConfig.getInt("write.batch.size", 500) - val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000)) + val batchSize = storageConfigSubset.getInt("write.batch.size", 500) + val cacheSize = storageConfigSubset.getInt("object.cache.size", math.max(batchSize, 1000)) val enableCache = cacheSize > 0 if (cacheSize > 0 && cacheSize < batchSize) { diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java index 7789aca9ba..17590e8b19 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java @@ -50,7 +50,7 @@ import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.collection.JavaConverters; + /** * {@link TaskProxy} interface implementation for samza jobs running in yarn execution environment. @@ -140,7 +140,7 @@ protected List readTasksFromCoordinatorStream(CoordinatorStreamSystemConsu TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetTaskModeMapping.TYPE)); Map taskNameToContainerIdMapping = taskAssignmentManager.readTaskAssignment(); StorageConfig storageConfig = new StorageConfig(consumer.getConfig()); - List storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava(); + List storeNames = storageConfig.getStoreNames(); return taskNameToContainerIdMapping.entrySet() .stream() .map(entry -> { diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala index 69811b0d95..33819a9b59 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -29,10 +29,7 @@ import com.google.common.base.Stopwatch import com.google.common.collect.ImmutableList import com.google.common.collect.ImmutableMap import org.apache.commons.lang.RandomStringUtils -import org.apache.samza.config.Config -import org.apache.samza.config.JobConfig -import org.apache.samza.config.MapConfig -import org.apache.samza.config.StorageConfig._ +import org.apache.samza.config.{Config, JobConfig, MapConfig, StorageConfig} import org.apache.samza.container.TaskName import org.apache.samza.context.ContainerContextImpl import org.apache.samza.context.JobContextImpl @@ -53,9 +50,10 @@ import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.util.CommandLine import org.apache.samza.util.FileUtil import org.apache.samza.util.Logging -import org.apache.samza.util. Util +import org.apache.samza.util.Util import org.apache.samza.Partition import org.apache.samza.SamzaException +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import scala.collection.JavaConverters._ import scala.util.Random @@ -121,18 +119,19 @@ object TestKeyValuePerformance extends Logging { Map[String, SystemProducer](), new SerdeManager ) + val storageConfig = new StorageConfig(config) // Build a Map[String, StorageEngineFactory]. The key is the store name. - val storageEngineMappings = config - .getStoreNames + val storageEngineMappings = storageConfig + .getStoreNames.asScala .map(storeName => { val storageFactoryClassName = - config.getStorageFactoryClassName(storeName) + JavaOptionals.toRichOptional(storageConfig.getStorageFactoryClassName(storeName)).toOption .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName)) (storeName, Util.getObj(storageFactoryClassName, classOf[StorageEngineFactory[Array[Byte], Array[Byte]]])) }) for((storeName, storageEngine) <- storageEngineMappings) { - val testSetCount = config.getInt("set.count", 1) + val testSetCount = storageConfig.getInt("set.count", 1) (1 to testSetCount).foreach(testSet => { //Create a new DB instance for each test set val output = new File("/tmp/" + UUID.randomUUID()) @@ -145,7 +144,7 @@ object TestKeyValuePerformance extends Logging { new TaskInstanceCollector(producerMultiplexer), new MetricsRegistryMap, null, - JobContextImpl.fromConfigWithDefaults(config), + JobContextImpl.fromConfigWithDefaults(storageConfig), new ContainerContextImpl(new ContainerModel("0", tasks.asJava), new MetricsRegistryMap), StoreMode.ReadWrite ) @@ -156,7 +155,7 @@ object TestKeyValuePerformance extends Logging { } // Run the test method - testMethod(db, config.subset("set-" + testSet + ".", true)) + testMethod(db, storageConfig.subset("set-" + testSet + ".", true)) FileUtil.rm(output) })