From 6d7fd2e563f7e6501d54e1cc9d484e38c58a1659 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Thu, 27 Apr 2017 11:36:46 -0700 Subject: [PATCH] SAMZA-1245: Make stream samza.physical.name config name string public --- .../main/java/org/apache/samza/execution/StreamManager.java | 5 +++++ .../main/scala/org/apache/samza/config/StreamConfig.scala | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index 3c133827e2..c6ab036228 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.samza.SamzaException; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; @@ -62,6 +63,10 @@ Map getStreamPartitionCounts(String systemName, Set str Map streamToPartitionCount = new HashMap<>(); SystemAdmin systemAdmin = sysAdmins.get(systemName); + if (systemAdmin == null) { + throw new SamzaException(String.format("System %s does not exist.", systemName)); + } + // retrieve the metadata for the streams in this system Map streamToMetadata = systemAdmin.getSystemStreamMetadata(streamNames); // set the partitions of a stream to its StreamEdge diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 910ae632bb..e40994ae1f 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -43,7 +43,7 @@ object StreamConfig { protected val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s." protected val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM - protected val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME + val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME implicit def Config2Stream(config: Config) = new StreamConfig(config) }