[STORM-3685] Detect and prevent cycles when Topology is submitted. #3322

Merged
merged 8 commits into from
Sep 4, 2020
15 changes: 13 additions & 2 deletions storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -26,6 +26,8 @@
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.storm.dependency.DependencyPropertiesParser;
import org.apache.storm.dependency.DependencyUploader;
import org.apache.storm.generated.AlreadyAliveException;
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
conf.putAll(topoConf);
topoConf.putAll(prepareZookeeperAuthentication(conf));

- validateConfs(conf, topology);
+ validateConfs(conf, topology, name);
Contributor

I don't think this is the right place to detect cycles. We should probably move it into another function, such as
StormCommon.validateBasic

https://git.vzbuilders.com/storm/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L156

or create a new method.

Contributor Author

Created new method StormCommon.validateCycleFree

Contributor

@Ethanlm Ethanlm Sep 2, 2020


Moving it to Utils is also okay, like
Utils.validateTopologyBlobStoreMap
Utils.validateTopologyName
etc.

That probably makes more sense, since findComponentCycles is implemented inside Utils.

We could also mark Utils.findComponentCycles as "@VisibleForTesting", so that the only public method is Utils.validateCycleFree and the internal implementation can be changed in the future if we'd like.

Contributor

@Ethanlm Ethanlm Sep 2, 2020


@bipinprasad
Sorry for not being clear in the first comment. What do you think about moving it to Utils? I'm trying not to expose too many public methods. Also, StormCommon is org.apache.storm.daemon.StormCommon, while StormSubmitter is entirely on the client side, so I feel Utils is a better place for this.

Contributor Author

Ok. Will switch.

Contributor Author

Moved from StormCommon to Utils.
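
For readers following the thread, here is a minimal sketch of what a `validateCycleFree`-style wrapper could look like. It is not the merged Storm code: the class name `CycleValidationSketch`, the helper `describe`, the choice to throw `IllegalArgumentException`, and the decision to pass in an already computed list of cycles are all assumptions made so the example runs without Storm on the classpath. Only the message format and the `","` / `" ; "` joining are taken from the diff.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the wrapper discussed in this thread; not Storm's API.
final class CycleValidationSketch {

    /** Joins members of a cycle with "," and separate cycles with " ; ", as the diff does. */
    static String describe(List<List<String>> cycles) {
        return cycles.stream()
                .map(cycle -> String.join(",", cycle))
                .collect(Collectors.joining(" ; "));
    }

    /**
     * Assumed contract: return normally when no cycle was found, otherwise throw.
     * Real code would compute the cycles itself and pick its own exception type.
     */
    static void validateCycleFree(String topoName, List<List<String>> cycles) {
        if (!cycles.isEmpty()) {
            throw new IllegalArgumentException(String.format(
                    "Topology %s contains cycles in components \"%s\"", topoName, describe(cycles)));
        }
    }
}
```

Keeping the cycle search behind one validating entry point is what lets the search method stay non-public, which is the point made in the review.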


Map<String, String> passedCreds = new HashMap<>();
if (opts != null) {
@@ -524,10 +526,19 @@ public static String submitJarAs(Map<String, Object> conf, String localJar, Prog
}
}

- private static void validateConfs(Map<String, Object> topoConf, StormTopology topology) throws IllegalArgumentException,
+ private static void validateConfs(Map<String, Object> topoConf, StormTopology topology, String name) throws IllegalArgumentException,
InvalidTopologyException, AuthorizationException {
ConfigValidation.validateTopoConf(topoConf);
Utils.validateTopologyBlobStoreMap(topoConf);

List<List<String>> cycles = Utils.findComponentCycles(topology, name);
if (!cycles.isEmpty()) {
String err = String.format("Topology %s contains cycles in components \"%s\"", name,
cycles.stream()
.map(x -> String.join(",", x))
.collect(Collectors.joining(" ; ")));
LOG.warn(err);
}
}

/**
129 changes: 129 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -59,12 +59,14 @@
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.Stack;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.jar.JarFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
@@ -1907,4 +1909,131 @@ private void readArchive(ZipFile zipFile) throws IOException {
}
}
}

/**
* Create a map of forward edges for spouts and bolts in a topology. The mapping contains ids of the spouts and bolts.
*
* @param topology StormTopology to examine.
* @return a map from each SpoutId/BoltId to the set of SpoutIds/BoltIds on its outbound edges.
*/
private static Map<String, Set<String>> getStormTopologyForwardGraph(StormTopology topology) {
Map<String, Set<String>> edgesOut = new HashMap<>();

if (topology.get_spouts() != null) {
topology.get_spouts().entrySet().forEach(entry -> {
if (!Utils.isSystemId(entry.getKey())) {
entry.getValue().get_common().get_inputs().forEach((k, v) -> {
edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
});
}
});
}
if (topology.get_bolts() != null) {
topology.get_bolts().entrySet().forEach(entry -> {
if (!Utils.isSystemId(entry.getKey())) {
entry.getValue().get_common().get_inputs().forEach((k, v) -> {
edgesOut.computeIfAbsent(k.get_componentId(), x -> new HashSet<>()).add(entry.getKey());
});
}
});
}
return edgesOut;
}
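
The method above inverts each component's declared inputs into outbound edges. A minimal, Storm-free sketch of the same inversion follows. The class `ForwardGraphSketch`, the method `invert`, and the plain `Map<String, Set<String>>` input (component id to the ids it reads from) are illustrative assumptions standing in for `StormTopology` and its generated accessors.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: turns "who do I read from" into "who reads from me".
final class ForwardGraphSketch {

    static Map<String, Set<String>> invert(Map<String, Set<String>> inputsByComponent) {
        Map<String, Set<String>> edgesOut = new HashMap<>();
        inputsByComponent.forEach((componentId, upstreamIds) ->
                upstreamIds.forEach(upstreamId ->
                        // an input on componentId is an outbound edge of upstreamId
                        edgesOut.computeIfAbsent(upstreamId, x -> new HashSet<>()).add(componentId)));
        return edgesOut;
    }
}
```

Note that a component nobody consumes from never becomes a key of the result, so a lookup for it returns null rather than an empty set.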

/**
* Use recursive descent to detect cycles. This is a Depth First recursion. Component Cycle is recorded when encountered.
* In addition, the last link in the cycle is removed to avoid re-detecting same cycle/subcycle.
*
* @param stack used for recursion.
* @param edgesOut outbound edge connections, modified when cycle is detected.
* @param seen keeps track of component ids that have already been seen.
* @param cycles list of cycles seen so far.
*/
private static void findComponentCyclesRecursion(
Stack<String> stack, Map<String, Set<String>> edgesOut, Set<String> seen, List<List<String>> cycles) {
if (stack.isEmpty()) {
return;
}
String compId1 = stack.peek();
if (!edgesOut.containsKey(compId1) || edgesOut.get(compId1).isEmpty()) {
stack.pop();
return;
}
Set<String> children = new HashSet<>(edgesOut.get(compId1));
for (String compId2: children) {
if (seen.contains(compId2)) {
// cycle detected
List<String> cycle = new ArrayList<>();
if (compId1.equals(compId2)) {
cycle.add(compId2);
// compId2 may have no outbound edges, in which case it is not a key in edgesOut
} else if (edgesOut.get(compId2) != null && edgesOut.get(compId2).contains(compId1)) {
cycle.addAll(Arrays.asList(compId1, compId2));
} else {
List<String> tmp = Collections.list(stack.elements());
int prevIdx = tmp.indexOf(compId2);
if (prevIdx >= 0) {
// keep only the stack entries from compId2 onward (the cycle itself)
tmp = tmp.subList(prevIdx, tmp.size());
}
tmp.add(compId2);
cycle.addAll(tmp);
}
cycles.add(cycle);
edgesOut.get(compId1).remove(compId2); // disconnect this cycle
continue;
}
seen.add(compId2);
stack.push(compId2);
findComponentCyclesRecursion(stack, edgesOut, seen, cycles);
}
stack.pop();
}

/**
* Find and return component cycles in the topology graph, starting from each spout.
* Each cycle may consist of one or more components.
* Components that cannot be reached from any of the spouts are ignored.
*
* @param topology StormTopology to examine.
* @param topoId topology name or id, used only in log messages.
* @return a List of cycles. Each cycle is a list of component names.
*
*/
public static List<List<String>> findComponentCycles(StormTopology topology, String topoId) {
List<List<String>> ret = new ArrayList<>();
Map<String, Set<String>> edgesOut = getStormTopologyForwardGraph(topology);
Set<String> allComponentIds = new HashSet<>();
edgesOut.forEach((k, v) -> {
allComponentIds.add(k);
allComponentIds.addAll(v);
});

if (topology.get_spouts_size() == 0) {
LOG.error("Topology {} does not contain any spouts, cannot traverse graph to determine cycles", topoId);
ret.add(new ArrayList<>(edgesOut.keySet()));
return ret;
}

Set<String> unreachable = new HashSet<>(edgesOut.keySet());
topology.get_spouts().forEach((spoutId, spout) -> {
Stack<String> dfsStack = new Stack<>();
dfsStack.push(spoutId);
Set<String> seen = new HashSet<>();
seen.add(spoutId);
findComponentCyclesRecursion(dfsStack, edgesOut, seen, ret);
unreachable.removeAll(seen);
});

// warning about unreachable components
if (!unreachable.isEmpty()) {
LOG.warn("Topology {} contains unreachable components \"{}\"", topoId, String.join(",", unreachable));
}

// detected cycles
if (!ret.isEmpty()) {
LOG.error("Topology {} contains cycles {}", topoId,
ret.stream()
.map(x -> String.join(",", x))
.collect(Collectors.joining(" ; "))
);
}
return ret;
}
}
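
As a point of comparison for the traversal in this diff, the following is a small, self-contained sketch of depth-first cycle detection over a forward-edge map. It is a different formulation from the PR's code, not a copy of it: it treats an edge as closing a cycle only when its target is on the current DFS path, and it keeps a separate set of fully explored nodes. The names `CycleSearchSketch`, `findCycles`, and `visit` are hypothetical, and the sketch reports one cycle per back edge rather than enumerating every elementary cycle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: path-based DFS over a map of outbound edges.
final class CycleSearchSketch {

    static List<List<String>> findCycles(Map<String, Set<String>> edgesOut, String start) {
        List<List<String>> cycles = new ArrayList<>();
        visit(start, edgesOut, new ArrayList<>(), new HashSet<>(), cycles);
        return cycles;
    }

    private static void visit(String node, Map<String, Set<String>> edgesOut,
                              List<String> path, Set<String> done, List<List<String>> cycles) {
        path.add(node);
        // nodes without outbound edges are simply absent from the map
        for (String next : edgesOut.getOrDefault(node, Collections.emptySet())) {
            int idx = path.indexOf(next);
            if (idx >= 0) {
                // back edge: next is an ancestor on the current path, so path[idx..] is a cycle
                cycles.add(new ArrayList<>(path.subList(idx, path.size())));
            } else if (!done.contains(next)) {
                visit(next, edgesOut, path, done, cycles);
            }
            // otherwise next was fully explored via another branch (a diamond), not a cycle
        }
        path.remove(path.size() - 1);
        done.add(node);
    }
}
```

Distinguishing "on the current path" from "visited at some point" matters for topologies where two branches rejoin at a shared downstream bolt, since only the former indicates a loop.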