Skip to content

Commit

Permalink
Merge pull request apache#1440 from agresch/ystorm_6345
Browse files Browse the repository at this point in the history
YSTORM-6345 YSTORM-6346 prevent topo conf from overriding some system properties
  • Loading branch information
Aaron Gresch authored and GitHub Enterprise committed Sep 27, 2019
2 parents fdc7a0f + 71b91a5 commit 7a35a6c
Showing 1 changed file with 23 additions and 0 deletions.
Expand Up @@ -1084,6 +1084,13 @@ private static void addToSerializers(Map<String, String> ser, List<Object> conf)
}

@SuppressWarnings("unchecked")
/**
* Create a normalized topology conf.
*
* @param conf the nimbus conf
* @param topoConf initial topology conf
* @param topology the Storm topology
*/
private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<String, Object> topoConf, StormTopology topology) {
//ensure that serializations are same for all tasks no matter what's on
// the supervisors. this also allows you to declare the serializations as a sequence
Expand Down Expand Up @@ -1112,6 +1119,20 @@ private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<S
ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));

// Don't allow topoConf to override various cluster-specific properties.
// Specifically adding the cluster settings to the topoConf here will make sure these settings
// also override the subsequently generated conf picked up locally on the classpath.
//
// We will be dealing with 3 confs:
// 1) the submitted topoConf created here
// 2) the combined classpath conf with the topoConf added on top
// 3) the nimbus conf with conf 2 above added on top.
//
// By first forcing the topology conf to contain the nimbus settings, we guarantee all three confs
// will have the correct settings that cannot be overriden by the submitter.
ret.put(Config.STORM_CGROUP_HIERARCHY_DIR, conf.get(Config.STORM_CGROUP_HIERARCHY_DIR));
ret.put(Config.WORKER_METRICS, conf.get(Config.WORKER_METRICS));

if (mergedConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
int workerTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
int workerMaxTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.WORKER_MAX_TIMEOUT_SECS));
Expand All @@ -1122,6 +1143,7 @@ private static Map<String, Object> normalizeConf(Map<String, Object> conf, Map<S
topoId, workerTimeoutSecs, workerMaxTimeoutSecs);
}
}

return ret;
}

Expand Down Expand Up @@ -3105,6 +3127,7 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
if (!(Boolean) conf.getOrDefault(DaemonConfig.STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED, false)) {
topoConf.remove(Config.TOPOLOGY_CLASSPATH_BEGINNING);
}

String topoVersionString = topology.get_storm_version();
if (topoVersionString == null) {
topoVersionString = (String) conf.getOrDefault(Config.SUPERVISOR_WORKER_DEFAULT_VERSION, VersionInfo.getVersion());
Expand Down

0 comments on commit 7a35a6c

Please sign in to comment.