Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,48 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.execution.StreamManager;
import org.apache.samza.util.StreamUtil;


/**
* a java version of the storage config
* Config helper methods related to storage.
*/
public class JavaStorageConfig extends MapConfig {

public class StorageConfig extends MapConfig {
private static final String FACTORY_SUFFIX = ".factory";
private static final String CHANGELOG_SUFFIX = ".changelog";
private static final String STORE_PREFIX = "stores.";
private static final String FACTORY = "stores.%s.factory";
private static final String KEY_SERDE = "stores.%s.key.serde";
private static final String MSG_SERDE = "stores.%s.msg.serde";
private static final String CHANGELOG_STREAM = "stores.%s.changelog";
private static final String CHANGELOG_SYSTEM = "job.changelog.system";
private static final String ACCESSLOG_STREAM_SUFFIX = "access-log";
private static final String ACCESSLOG_SAMPLING_RATIO = "stores.%s.accesslog.sampling.ratio";
private static final String ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled";
private static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50;

public static final String SIDE_INPUTS = "stores.%s.side.inputs";
public static final String SIDE_INPUTS_PROCESSOR_FACTORY = "stores.%s.side.inputs.processor.factory";
public static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE = "stores.%s.side.inputs.processor.serialized.instance";

public JavaStorageConfig(Config config) {

public static final String FACTORY = STORE_PREFIX + "%s" + FACTORY_SUFFIX;
public static final String KEY_SERDE = STORE_PREFIX + "%s.key.serde";
public static final String MSG_SERDE = STORE_PREFIX + "%s.msg.serde";
public static final String CHANGELOG_STREAM = STORE_PREFIX + "%s" + CHANGELOG_SUFFIX;
public static final String ACCESSLOG_STREAM_SUFFIX = "access-log";
public static final String CHANGELOG_REPLICATION_FACTOR = STORE_PREFIX + "%s.changelog.replication.factor";

static final String CHANGELOG_SYSTEM = "job.changelog.system";
static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
static final long DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1);
static final String ACCESSLOG_SAMPLING_RATIO = STORE_PREFIX + "%s.accesslog.sampling.ratio";
static final String ACCESSLOG_ENABLED = STORE_PREFIX + "%s.accesslog.enabled";
static final int DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50;
static final String SIDE_INPUTS = STORE_PREFIX + "%s.side.inputs";
static final String SIDE_INPUTS_PROCESSOR_FACTORY = STORE_PREFIX + "%s.side.inputs.processor.factory";
static final String SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE =
STORE_PREFIX + "%s.side.inputs.processor.serialized.instance";

public StorageConfig(Config config) {
super(config);
}

public List<String> getStoreNames() {
Config subConfig = subset(STORE_PREFIX, true);
List<String> storeNames = new ArrayList<String>();
List<String> storeNames = new ArrayList<>();
for (String key : subConfig.keySet()) {
if (key.endsWith(FACTORY_SUFFIX)) {
storeNames.add(key.substring(0, key.length() - FACTORY_SUFFIX.length()));
Expand All @@ -66,19 +73,21 @@ public List<String> getStoreNames() {
return storeNames;
}

public String getChangelogStream(String storeName) {

// If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence.
// If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
// these values will be combined into <asystem>.<astream>
/**
* If the config specifies 'stores.&lt;storename&gt;.changelog' as '&lt;system&gt;.&lt;stream&gt;' combination - it will take
* precedence.
* If this config only specifies &lt;astream&gt; and there is a value in job.changelog.system=&lt;asystem&gt; - these values will
* be combined into &lt;asystem&gt;.&lt;astream&gt;
*/
public Optional<String> getChangelogStream(String storeName) {
String systemStream = StringUtils.trimToNull(get(String.format(CHANGELOG_STREAM, storeName), null));

String systemStreamRes;
if (systemStream != null && !systemStream.contains(".")) {
String changelogSystem = getChangelogSystem();
if (systemStream != null && !systemStream.contains(".")) {
Optional<String> changelogSystem = getChangelogSystem();
// contains only stream name
if (changelogSystem != null) {
systemStreamRes = changelogSystem + "." + systemStream;
if (changelogSystem.isPresent()) {
systemStreamRes = changelogSystem.get() + "." + systemStream;
} else {
throw new SamzaException("changelog system is not defined:" + systemStream);
}
Expand All @@ -89,7 +98,7 @@ public String getChangelogStream(String storeName) {
if (systemStreamRes != null) {
systemStreamRes = StreamManager.createUniqueNameForBatch(systemStreamRes, this);
Comment thread
shanthoosh marked this conversation as resolved.
}
return systemStreamRes;
return Optional.ofNullable(systemStreamRes);
}

public boolean getAccessLogEnabled(String storeName) {
Expand All @@ -104,24 +113,24 @@ public int getAccessLogSamplingRatio(String storeName) {
return getInt(String.format(ACCESSLOG_SAMPLING_RATIO, storeName), DEFAULT_ACCESSLOG_SAMPLING_RATIO);
}

public String getStorageFactoryClassName(String storeName) {
return get(String.format(FACTORY, storeName), null);
public Optional<String> getStorageFactoryClassName(String storeName) {
return Optional.ofNullable(get(String.format(FACTORY, storeName)));
}

public String getStorageKeySerde(String storeName) {
return get(String.format(KEY_SERDE, storeName), null);
public Optional<String> getStorageKeySerde(String storeName) {
return Optional.ofNullable(get(String.format(KEY_SERDE, storeName)));
}

public String getStorageMsgSerde(String storeName) {
return get(String.format(MSG_SERDE, storeName), null);
public Optional<String> getStorageMsgSerde(String storeName) {
return Optional.ofNullable(get(String.format(MSG_SERDE, storeName)));
}

/**
* Gets the System to use for reading/writing checkpoints. Uses the following precedence.
* Gets the System to use for changelogs. Uses the following precedence.
*
* 1. If job.changelog.system is defined, that value is used.
* 2. If job.default.system is defined, that value is used.
* 3. null
* 3. empty optional
*
* Note: Changelogs can be defined using
* stores.storeName.changelog=systemName.streamName or
Expand All @@ -131,8 +140,8 @@ public String getStorageMsgSerde(String storeName) {
*
* @return the name of the system to use by default for all changelogs, if defined.
*/
public String getChangelogSystem() {
Comment thread
cameronlee314 marked this conversation as resolved.
return get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM(), null));
public Optional<String> getChangelogSystem() {
return Optional.ofNullable(get(CHANGELOG_SYSTEM, get(JobConfig.JOB_DEFAULT_SYSTEM())));
}

/**
Expand All @@ -158,19 +167,42 @@ public List<String> getSideInputs(String storeName) {
* Gets the SideInputsProcessorFactory associated with the {@code storeName}.
*
* @param storeName name of the store
* @return the class name of SideInputsProcessorFactory if present, null otherwise
* @return the class name of SideInputsProcessorFactory if present, empty optional otherwise
*/
public String getSideInputsProcessorFactory(String storeName) {
return get(String.format(SIDE_INPUTS_PROCESSOR_FACTORY, storeName), null);
public Optional<String> getSideInputsProcessorFactory(String storeName) {
return Optional.ofNullable(get(String.format(SIDE_INPUTS_PROCESSOR_FACTORY, storeName)));
}

/**
* Gets the serialized instance of SideInputsProcessor associated with the {@code storeName}.
*
* @param storeName name of the store
* @return the serialized instance of SideInputsProcessor if present, null otherwise
* @return the serialized instance of SideInputsProcessor if present, empty optional otherwise
*/
public Optional<String> getSideInputsProcessorSerializedInstance(String storeName) {
return Optional.ofNullable(get(String.format(SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, storeName)));
}

public long getChangeLogDeleteRetentionInMs(String storeName) {
return getLong(String.format(CHANGELOG_DELETE_RETENTION_MS, storeName), DEFAULT_CHANGELOG_DELETE_RETENTION_MS);
}

/**
* Helper method to check if a system has a changelog attached to it.
*/
public boolean isChangelogSystem(String systemName) {
return getStoreNames().stream()
.map(this::getChangelogStream)
.filter(Optional::isPresent)
.map(systemStreamName -> StreamUtil.getSystemStreamFromNames(systemStreamName.get()).getSystem())
.anyMatch(system -> system.equals(systemName));
}

/**
* Helper method to check if there is any stores configured w/ a changelog
*/
public String getSideInputsProcessorSerializedInstance(String storeName) {
return get(String.format(SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, storeName), null);
public boolean hasDurableStores() {
Config subConfig = subset(STORE_PREFIX, true);
return subConfig.keySet().stream().anyMatch(key -> key.endsWith(CHANGELOG_SUFFIX));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,12 @@ private void configureSerdes(Map<String, String> configs, Map<String, StreamEdge

// set key and msg serdes for stores to the serde names generated above
storeKeySerdes.forEach((storeName, serde) -> {
String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName);
String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE, storeName);
configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
});

storeMsgSerdes.forEach((storeName, serde) -> {
String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE, storeName);
configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,8 @@ public void clearStreamsFromPreviousRun(Config prevConfig) {

//Find changelog streams and remove them
StorageConfig storageConfig = new StorageConfig(prevConfig);
for (String store : JavaConversions.asJavaCollection(storageConfig.getStoreNames())) {
String changelog = storageConfig.getChangelogStream(store)
.getOrElse(defaultValue(null));
for (String store : storageConfig.getStoreNames()) {
String changelog = storageConfig.getChangelogStream(store).orElse(null);
if (changelog != null) {
LOGGER.info("Clear store {} changelog {}", store, changelog);
SystemStream systemStream = StreamUtil.getSystemStreamFromNames(changelog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.samza.operators.spec;

import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.serializers.Serde;
Expand Down Expand Up @@ -59,12 +58,12 @@ public Serde getMsgSerde() {
return msgSerde;
}

public JavaStorageConfig getStorageConfigs() {
public StorageConfig getStorageConfigs() {
HashMap<String, String> configs = new HashMap<>();
configs.put(String.format(StorageConfig.FACTORY(), this.getStoreName()), this.getStoreFactory());
configs.put(String.format(StorageConfig.CHANGELOG_STREAM(), this.getStoreName()), this.getChangelogStream());
configs.put(String.format(StorageConfig.FACTORY, this.getStoreName()), this.getStoreFactory());
configs.put(String.format(StorageConfig.CHANGELOG_STREAM, this.getStoreName()), this.getChangelogStream());
configs.putAll(this.getOtherProperties());
return new JavaStorageConfig(new MapConfig(configs));
return new StorageConfig(new MapConfig(configs));
}

private String getStoreFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
package org.apache.samza.storage;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
Expand Down Expand Up @@ -124,12 +125,15 @@ public void updatePartitionMapping(Map<TaskName, Integer> prevChangelogEntries,
*/
public static void createChangelogStreams(Config config, int maxChangeLogStreamPartitions) {
// Get changelog store config
JavaStorageConfig storageConfig = new JavaStorageConfig(config);
Map<String, SystemStream> storeNameSystemStreamMapping = storageConfig.getStoreNames()
.stream()
.filter(name -> StringUtils.isNotBlank(storageConfig.getChangelogStream(name)))
.collect(Collectors.toMap(name -> name,
name -> StreamUtil.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
StorageConfig storageConfig = new StorageConfig(config);
ImmutableMap.Builder<String, SystemStream> storeNameSystemStreamMapBuilder = new ImmutableMap.Builder<>();
storageConfig.getStoreNames().forEach(storeName -> {
Optional<String> changelogStream = storageConfig.getChangelogStream(storeName);
if (changelogStream.isPresent() && StringUtils.isNotBlank(changelogStream.get())) {
storeNameSystemStreamMapBuilder.put(storeName, StreamUtil.getSystemStreamFromNames(changelogStream.get()));
}
});
Map<String, SystemStream> storeNameSystemStreamMapping = storeNameSystemStreamMapBuilder.build();

// Get SystemAdmin for changelog store's system and attempt to create the stream
SystemConfig systemConfig = new SystemConfig(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JavaStorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.ContainerContextImpl;
Expand Down Expand Up @@ -145,23 +146,23 @@ private void getContainerModels() {
* and put them into the maps
*/
private void getChangeLogSystemStreamsAndStorageFactories() {
JavaStorageConfig config = new JavaStorageConfig(jobConfig);
StorageConfig config = new StorageConfig(jobConfig);
List<String> storeNames = config.getStoreNames();

LOG.info("Got store names: " + storeNames.toString());

for (String storeName : storeNames) {
String streamName = config.getChangelogStream(storeName);
Optional<String> streamName = config.getChangelogStream(storeName);

LOG.info("stream name for " + storeName + " is " + streamName);
LOG.info("stream name for " + storeName + " is " + streamName.orElse(null));

if (streamName != null) {
changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(streamName));
if (streamName.isPresent()) {
changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(streamName.get()));
}

String factoryClass = config.getStorageFactoryClassName(storeName);
if (factoryClass != null) {
storageEngineFactories.put(storeName, Util.getObj(factoryClass, StorageEngineFactory.class));
Optional<String> factoryClass = config.getStorageFactoryClassName(storeName);
if (factoryClass.isPresent()) {
storageEngineFactories.put(storeName, Util.getObj(factoryClass.get(), StorageEngineFactory.class));
} else {
throw new SamzaException("Missing storage factory for " + storeName + ".");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.task.GrouperMetadata;
Expand Down
Loading