From e55d9b7d47a8ea62174be106af0f9ab1cea402eb Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 23 Sep 2016 11:26:34 -0500 Subject: [PATCH] STORM-2122: Cache dependency data, and serialize reading of the data. --- .../daemon/supervisor/BasicContainer.java | 93 ++++++++++++++++--- 1 file changed, 79 insertions(+), 14 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java index 93c10c7417e..b89d0f19e81 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java @@ -23,8 +23,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -470,6 +471,82 @@ private String getWorkerLoggingConfigFile() { return log4jConfigurationDir + Utils.FILE_PATH_SEPARATOR + "worker.xml"; } + private static class DependencyLocations { + private List _data = null; + private final Map _conf; + private final String _topologyId; + private final AdvancedFSOps _ops; + private final String _stormRoot; + + public DependencyLocations(final Map conf, final String topologyId, final AdvancedFSOps ops, final String stormRoot) { + _conf = conf; + _topologyId = topologyId; + _ops = ops; + _stormRoot = stormRoot; + } + + public String toString() { + List data; + synchronized(this) { + data = _data; + } + return "DEP_LOCS for " + _topologyId +" => " + data; + } + + public synchronized List get() throws IOException { + if (_data != null) { + return _data; + } + final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops); + final List dependencyLocations = new ArrayList<>(); + if (stormTopology.get_dependency_jars() != null) { + for (String dependency : stormTopology.get_dependency_jars()) { + dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath()); + } + } + + if (stormTopology.get_dependency_artifacts() != null) { + for (String dependency : stormTopology.get_dependency_artifacts()) { + dependencyLocations.add(new File(_stormRoot, dependency).getAbsolutePath()); + } + } + _data = dependencyLocations; + return _data; + } + } + + static class DepLRUCache { + public final int _maxSize = 100; //We could make this configurable in the future... + + @SuppressWarnings("serial") + private LinkedHashMap _cache = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return (size() > _maxSize); + } + }; + + public synchronized DependencyLocations get(final Map conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) { + //Only go off of the topology id for now. + DependencyLocations dl = _cache.get(topologyId); + if (dl == null) { + _cache.putIfAbsent(topologyId, new DependencyLocations(conf, topologyId, ops, stormRoot)); + dl = _cache.get(topologyId); + } + return dl; + } + + public synchronized void clear() { + _cache.clear(); + } + } + + static final DepLRUCache DEP_LOC_CACHE = new DepLRUCache(); + + public static List getDependencyLocationsFor(final Map conf, final String topologyId, final AdvancedFSOps ops, String stormRoot) throws IOException { + return DEP_LOC_CACHE.get(conf, topologyId, ops, stormRoot).get(); + } + /** * Get parameters for the class path of the worker process. Also used by the * log Writer @@ -479,19 +556,7 @@ private String getWorkerLoggingConfigFile() { */ private List getClassPathParams(final String stormRoot) throws IOException { final String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); - final StormTopology stormTopology = ConfigUtils.readSupervisorTopology(_conf, _topologyId, _ops); - final List dependencyLocations = new ArrayList<>(); - if (stormTopology.get_dependency_jars() != null) { - for (String dependency : stormTopology.get_dependency_jars()) { - dependencyLocations.add(new File(stormRoot, dependency).getAbsolutePath()); - } - } - - if (stormTopology.get_dependency_artifacts() != null) { - for (String dependency : stormTopology.get_dependency_artifacts()) { - dependencyLocations.add(new File(stormRoot, dependency).getAbsolutePath()); - } - } + final List dependencyLocations = getDependencyLocationsFor(_conf, _topologyId, _ops, stormRoot); final String workerClassPath = getWorkerClassPath(stormJar, dependencyLocations); List classPathParams = new ArrayList<>();