[STORM-3685] Detect and prevent cycles when Topology is submitted. #3322

Merged: 8 commits, Sep 4, 2020

bipinprasad
Contributor

What is the purpose of the change

A topology is expected to be a directed acyclic graph (DAG). Cycles in the component flow can cause unexpected behavior, for example a deadlock when one of the components in the loop signals back-pressure, or failure to detect component proximity properly when scheduling.

How was the change tested

Added a new test that creates topologies with and without cycles and checks that the cycles are detected.
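
For readers unfamiliar with this kind of check, here is a minimal sketch of cycle detection over a component graph using a depth-first search. It is not the code from this PR: the class name `ComponentCycleSketch`, the method `findCycles`, and the plain `Map` representation of the topology are all assumptions made for illustration, and the component ids are borrowed from the word-count example discussed later in the review.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative only: a hypothetical stand-in, not Storm's implementation.
final class ComponentCycleSketch {

    private enum State { IN_PROGRESS, DONE }

    /**
     * Depth-first search that records a cycle for each back edge it meets.
     * 'edges' maps a component id to the ids of the components consuming its output.
     */
    static List<List<String>> findCycles(Map<String, Set<String>> edges) {
        Map<String, State> state = new HashMap<>();
        List<String> path = new ArrayList<>();
        List<List<String>> cycles = new ArrayList<>();
        for (String start : edges.keySet()) {
            if (!state.containsKey(start)) {
                visit(start, edges, state, path, cycles);
            }
        }
        return cycles;
    }

    private static void visit(String node, Map<String, Set<String>> edges,
                              Map<String, State> state, List<String> path,
                              List<List<String>> cycles) {
        state.put(node, State.IN_PROGRESS);
        path.add(node);
        for (String next : edges.getOrDefault(node, Collections.emptySet())) {
            State s = state.get(next);
            if (s == null) {
                visit(next, edges, state, path, cycles);
            } else if (s == State.IN_PROGRESS) {
                // 'next' is still on the current path, so the path segment
                // from 'next' to 'node' closes a loop.
                cycles.add(new ArrayList<>(path.subList(path.indexOf(next), path.size())));
            }
        }
        path.remove(path.size() - 1);
        state.put(node, State.DONE);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> acyclic = new LinkedHashMap<>();
        acyclic.put("spout", Collections.singleton("split"));
        acyclic.put("split", Collections.singleton("count"));
        System.out.println(findCycles(acyclic)); // prints []

        Map<String, Set<String>> cyclic = new LinkedHashMap<>(acyclic);
        cyclic.put("count", Collections.singleton("split")); // count feeds back into split
        System.out.println(findCycles(cyclic)); // prints [[split, count]]
    }
}
```

A search like this reports one cycle per back edge rather than enumerating every simple cycle, which is enough to decide whether a graph is a DAG.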

@bipinprasad
Contributor Author

Close/reopen to trigger a rebuild. Are the failures due to license file errors on JDK 11?

@bipinprasad bipinprasad reopened this Aug 10, 2020
@bipinprasad
Contributor Author

close/reopen to rebuild.

@bipinprasad bipinprasad reopened this Aug 11, 2020
@bipinprasad
Contributor Author

close/reopen to rebuild.

@bipinprasad bipinprasad reopened this Aug 13, 2020
Contributor

@Ethanlm Ethanlm left a comment

I have some minor comments (including previous comments above). Otherwise looks good to me.

storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -233,7 +235,7 @@ public static void submitTopologyAs(String name, Map<String, Object> topoConf, S
         conf.putAll(topoConf);
         topoConf.putAll(prepareZookeeperAuthentication(conf));
 
-        validateConfs(conf, topology);
+        validateConfs(conf, topology, name);
Contributor

I feel like this is not the right place to detect cycles. We should probably put it in another function, like StormCommon.validateBasic (https://git.vzbuilders.com/storm/storm/blob/master/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java#L156), or create a new method.

Contributor Author

Created new method StormCommon.validateCycleFree

Contributor

@Ethanlm Ethanlm Sep 2, 2020

Moving it to Utils is also okay, alongside methods like Utils.validateTopologyBlobStoreMap, Utils.validateTopologyName, etc.

That probably makes more sense, since findComponentCycles is implemented inside Utils.

We could also mark Utils.findComponentCycles as "@VisibleForTesting", so that the only public method is Utils.validateCycleFree and the internal implementation can be changed later if we'd like.
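
As a rough sketch of that layout, the example below keeps one public validation method and a package-private helper that tests in the same package can still call. Everything here is hypothetical: the class `TopologyChecks`, the simplified `Map`-based signatures, the helper name, and the choice of exception are assumptions, and the local `VisibleForTesting` annotation is only a stand-in so the snippet compiles without extra dependencies. The helper also uses a different technique from a depth-first search: it repeatedly removes components that have no remaining inputs and reports whatever is left.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Stand-in marker annotation (an assumption); a real project would more likely
// reuse an existing annotation from a library it already depends on.
@interface VisibleForTesting { }

final class TopologyChecks {
    private TopologyChecks() { }

    /** The only public entry point; callers never see how cycles are found. */
    public static void validateCycleFree(Map<String, Set<String>> edges, String name) {
        Set<String> stuck = findComponentsBlockedByCycles(edges);
        if (!stuck.isEmpty()) {
            throw new IllegalArgumentException(
                "Topology " + name + " contains cycles in components " + stuck);
        }
    }

    /**
     * Package-private so tests can call it while the algorithm stays replaceable.
     * Returns the components that sit on a cycle or downstream of one.
     */
    @VisibleForTesting
    static Set<String> findComponentsBlockedByCycles(Map<String, Set<String>> edges) {
        // Count incoming edges for every component mentioned in the graph.
        Map<String, Integer> inDegree = new HashMap<>();
        edges.forEach((from, tos) -> {
            inDegree.putIfAbsent(from, 0);
            for (String to : tos) {
                inDegree.merge(to, 1, Integer::sum);
            }
        });
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((node, degree) -> {
            if (degree == 0) {
                ready.add(node);
            }
        });
        // Peel off components whose inputs have all been removed already.
        while (!ready.isEmpty()) {
            String node = ready.poll();
            inDegree.remove(node);
            for (String to : edges.getOrDefault(node, Collections.emptySet())) {
                if (inDegree.merge(to, -1, Integer::sum) == 0) {
                    ready.add(to);
                }
            }
        }
        return new TreeSet<>(inDegree.keySet());
    }
}
```

With this split, swapping the helper for another algorithm would not change anything for callers of the public method.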

Contributor

@Ethanlm Ethanlm Sep 2, 2020

@bipinprasad
Sorry for not being clear in the first comment. What do you think about moving it to Utils, as suggested above? I'm trying not to expose too many public methods. Also, StormCommon is org.apache.storm.daemon.StormCommon, while StormSubmitter is entirely on the client side, so I feel like Utils is a better place for this.

Contributor Author

Ok. Will switch.

Contributor Author

Moved from StormCommon to Utils.

@@ -51,6 +52,7 @@
 import org.apache.storm.validation.ConfigValidation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import sun.net.ProgressListener;
Contributor

Is this used anywhere?

Contributor Author

No. Will remove this.

    StormCommon.validateCycleFree(topology, name);
} catch (InvalidTopologyException ex) {
    LOG.warn(ex.get_msg(), ex);
    System.out.println(ex.get_msg());
Contributor

We should remove this

Contributor Author

Did we not want to warn the user when submitting?

Contributor

@Ethanlm Ethanlm Sep 2, 2020

LOG.warn should be sufficient. We don't need System.out.println()

Contributor Author

ok. removed sysout.

try {
    StormCommon.validateCycleFree(topology, name);
} catch (InvalidTopologyException ex) {
    LOG.warn(ex.get_msg(), ex);
Contributor

Will ex already contain ex.get_msg()? If so, we will see repeated messages, which should be avoided.

Contributor Author

The constructor seems to imply that the message will not be in the Exception, since the superclass constructor that takes a message is not called.

Contributor

Do you have an output example that I can take a look at?

Contributor Author

The InvalidTopologyException constructor does not call super(msg): https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/generated/InvalidTopologyException.java#L111

Hence, its msg field is not available through Exception.getMessage().

Contributor

Right. So the log will look like

19:22:23.981 [main] WARN  o.a.s.StormSubmitter - Topology wc contains cycles in components "count,split"
org.apache.storm.generated.InvalidTopologyException: null
	at org.apache.storm.daemon.StormCommon.validateCycleFree(StormCommon.java:596) ~[storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
	at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:244) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
	at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:214) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
	at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:177) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
	at org.apache.storm.topology.ConfigurableTopology.submit(ConfigurableTopology.java:119) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
	at org.apache.storm.starter.WordCountTopology.run(WordCountTopology.java:58) [storm-starter-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
	at org.apache.storm.topology.ConfigurableTopology.start(ConfigurableTopology.java:68) [storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
	at org.apache.storm.starter.WordCountTopology.main(WordCountTopology.java:36) [storm-starter-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]

We can probably use WrappedInvalidTopologyException inside the validateCycleFree method so that the stack trace makes more sense. What do you think?
It would look like:

org.apache.storm.utils.WrappedInvalidTopologyException: Topology wc2 contains cycles in components "count,split"
	at org.apache.storm.daemon.StormCommon.validateCycleFree(StormCommon.java:596) ~[storm-client-2.3.0-SNAPSHOT.jar:2.3.0-SNAPSHOT]
	.....

Contributor Author

That is better than a scary "null" in the stack trace.
Changed.
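
To illustrate why the plain exception logs "null" and why a wrapper helps, here is a small self-contained sketch. The two exception classes are hypothetical stand-ins, not Storm's classes: the first mimics an exception that stores its text in a field without passing it to the `Exception` constructor, and the second assumes a wrapper that overrides `getMessage()` to return that field.

```java
// Hypothetical stand-in for an exception that keeps its text only in a field.
class FieldOnlyMessageException extends Exception {
    private final String msg;

    FieldOnlyMessageException(String msg) {
        // No super(msg) call here, so Throwable's own detail message stays null.
        this.msg = msg;
    }

    String get_msg() {
        return msg;
    }
}

// Hypothetical wrapper (an assumption about how such a class could work):
// it exposes the field through the standard getMessage() accessor.
class WrappedFieldOnlyMessageException extends FieldOnlyMessageException {
    WrappedFieldOnlyMessageException(String msg) {
        super(msg);
    }

    @Override
    public String getMessage() {
        return get_msg();
    }
}

final class ExceptionMessageDemo {
    public static void main(String[] args) {
        Exception plain = new FieldOnlyMessageException("topology contains cycles");
        Exception wrapped = new WrappedFieldOnlyMessageException("topology contains cycles");

        System.out.println(plain.getMessage());   // prints null
        System.out.println(wrapped.getMessage()); // prints topology contains cycles
    }
}
```

Logging frameworks typically build the first line of a stack trace from the exception's message, so the overridden accessor is what replaces "null" with the real text.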

Contributor

@Ethanlm Ethanlm left a comment

+1

Thank you so much for addressing all the review comments

@Ethanlm Ethanlm merged commit 8399edc into apache:master Sep 4, 2020