
[FLINK-2525] Add configuration support in Storm-compatibility #1046

Closed
wants to merge 1 commit into from

Conversation

ffbin (Contributor) commented Aug 24, 2015

  • Enable the config to be used in Spout.open() and Bolt.prepare().

For example:

    public static void main(final String[] args) {
        String topologyId = "Streaming WordCount";
        final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        ...
        final Config conf = new Config();
        conf.put("wordsFile", "/home/user/");
        conf.put("delimitSize", 1024);
        final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        cluster.submitTopology(topologyId, conf, builder.createTopology());
        Utils.sleep(10 * 1000);
        cluster.killTopology(topologyId);
        cluster.shutdown();
    }

    public class WordReader implements IRichSpout {
        ....
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            try {
                this.context = context;
                this.fileReader = new FileReader((String) conf.get("wordsFile"));
            } catch (FileNotFoundException e) {
                throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
            }
            this.collector = collector;
        }
    }

    public final class StormBoltTokenizer implements IRichBolt {
        ....
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.delimitSize = (Integer) stormConf.get("delimitSize");
            this.collector = collector;
        }
    }

{
Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
stormConf = new HashMap<String, Object>();
stormConf.putAll(jobConfiguration.getConfData());
Member:

Missing null test on jobConfiguration (or is this test not necessary -- it is done in StormBoltWrapper?)

Contributor Author:

I will add a null test. In fact, jobConfiguration cannot be null in a real run. But because the StormBoltWrapper unit test mocks the StreamingRuntimeContext object, its jobConfiguration can be null there.

Member:

Even if it cannot be null in a test, that does not mean it cannot be null in a real cluster deployment...
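A defensive guard along these lines would cover both the mocked-test and cluster cases. This is a minimal plain-Java sketch: `fetchJobConfiguration()` is a hypothetical stand-in for the `((StreamingRuntimeContext) ctx).getJobConfiguration()` lookup, which a mocked context may answer with null.

```java
import java.util.HashMap;
import java.util.Map;

public class NullGuardSketch {

    // Hypothetical stand-in for the runtime lookup; a mocked
    // StreamingRuntimeContext may return null here.
    static Map<String, Object> fetchJobConfiguration() {
        return null;
    }

    // Copy the job configuration into the Storm conf only when it is present.
    static Map<String, Object> buildStormConf() {
        Map<String, Object> stormConf = new HashMap<String, Object>();
        Map<String, Object> jobConfiguration = fetchJobConfiguration();
        if (jobConfiguration != null) {
            stormConf.putAll(jobConfiguration);
        }
        return stormConf;
    }

    public static void main(String[] args) {
        // With a null job configuration the result is simply empty,
        // instead of a NullPointerException.
        System.out.println(buildStormConf().isEmpty());
    }
}
```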

mjsax (Member) commented Aug 24, 2015

I don't see changes in FlinkClient. Only in FlinkLocalCluster. Did you test by starting Flink via bin/start-local.sh (it would be even better to test in a real cluster)?

mjsax (Member) commented Aug 24, 2015

Travis fails because you broke something...

ffbin (Contributor Author) commented Aug 24, 2015

@mjsax Thank you very much. I missed the change in FlinkClient. I will fix it and test via bin/start-local.sh. In China, we currently cannot see the CI details, so it is hard to know why CI failed. Thank you for the reminder.

mjsax (Member) commented Aug 24, 2015

It fails in two tests. You should actually see this if you execute the tests locally. You should run the tests each time before you open/update a PR (at least for the modules you changed).

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.751 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.split.BoltSplitITCase
testTopology(org.apache.flink.stormcompatibility.split.BoltSplitITCase) Time elapsed: 0.635 sec <<< ERROR!
java.lang.NullPointerException: null
at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopologyWithOpts(FlinkLocalCluster.java:52)
at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopology(FlinkLocalCluster.java:41)
at org.apache.flink.stormcompatibility.split.StormSplitStreamBoltLocal.main(StormSplitStreamBoltLocal.java:42)
at org.apache.flink.stormcompatibility.split.BoltSplitITCase.testTopology(BoltSplitITCase.java:25)

and

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.704 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.split.SpoutSplitITCase
testTopology(org.apache.flink.stormcompatibility.split.SpoutSplitITCase) Time elapsed: 0.584 sec <<< ERROR!
java.lang.NullPointerException: null
at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopologyWithOpts(FlinkLocalCluster.java:52)
at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopology(FlinkLocalCluster.java:41)
at org.apache.flink.stormcompatibility.split.StormSplitStreamSpoutLocal.main(StormSplitStreamSpoutLocal.java:42)
at org.apache.flink.stormcompatibility.split.SpoutSplitITCase.testTopology(SpoutSplitITCase.java:25)

Just out of curiosity: why can you not see Travis details?

ffbin (Contributor Author) commented Aug 24, 2015

@mjsax The reason I cannot see the Travis details is (from a reply mail):
The problem is that our CDN is currently blocked in mainland China. I'm talking to our CDN provider right now for getting a custom SSL certificate and domain set up, so we should be usable from China within the next weeks hopefully.

I will fix the code. I only ran the core tests and missed the tests in the examples. It is my fault. Thanks!

@ffbin ffbin force-pushed the FLINK-2525 branch 2 times, most recently from 5dcf493 to 705b56d on August 25, 2015 07:14
Map mapExpect = new HashMap();
mapExpect.put(new String("path"), new String("/home/user/file.txt"));
mapExpect.put(1, 1024);
verify(bolt).prepare(eq(mapExpect), any(TopologyContext.class), any(OutputCollector.class));
Member:

Remove mapExpect and use stormConf from above to avoid code duplication.

mjsax (Member) commented Aug 26, 2015

Can anyone have a look at Configuration.java? Not sure if the changes are OK.

mjsax (Member) commented Aug 26, 2015

@ffbin Can you extend ExclamationTopology such that the number of added '!' in ExclamationBolt and ExclamationWithStormSpout.ExclamationMap is configurable, and adapt the tests accordingly? Please add an additional user parameter to ExclamationWithStormSpout and ExclamationWithStormBolt. Furthermore, please extend FiniteStormFileSpout or its base class with an empty constructor and configure the file to be opened via the Storm configuration Map for this case.

StephanEwen (Contributor):

Putting a nested "stormConf" into the configuration seems just wrong, sorry. Such a specific hack in a generic utility cannot yield maintainable code.

Why is that needed in the first place? Why not have a dedicated configuration object for storm?

StephanEwen (Contributor):

You can turn the storm config into a byte[] via the InstantiationUtil class. That byte[] can be stored in the regular config.

We could also add a "nested config" class which only shows keys that start with a certain key prefix. This would be a config view over another config. Inside the TaskConfig, we use this trick with the delegating config. It may be useful here as well.
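The delegating-view idea can be sketched in a few lines. This is illustrative only: `PrefixConfigView` and its methods are made-up names, not Flink API; it just shows the "config view over another config" pattern described above.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of a "nested config" view: it exposes only entries
// whose keys carry a fixed prefix in the backing map, similar in spirit
// to the delegating configuration used inside TaskConfig.
public class PrefixConfigView {

    private final Map<String, Object> backing;
    private final String prefix;

    public PrefixConfigView(Map<String, Object> backing, String prefix) {
        this.backing = backing;
        this.prefix = prefix;
    }

    public Object get(String key) {
        return backing.get(prefix + key);
    }

    public void put(String key, Object value) {
        backing.put(prefix + key, value);
    }
}
```

As the thread notes later, this view only works for String keys, which is one reason the byte[] approach was preferred for the Storm config.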

mjsax (Member) commented Aug 27, 2015

I think the byte[] converting approach is the correct way to go. Storm config keys need not be Strings, thus the specific prefix trick cannot be applied.

ffbin (Contributor Author) commented Aug 27, 2015

@StephanEwen Thanks. The key of the Storm config is Object, so the confData (HashMap<String, Object>) of Configuration may not be enough.

StephanEwen (Contributor):

@ffbin: Can you try if you can simply put the serialized Storm Config as a byte[] into the Flink configuration? You can then unpack it inside the Storm code, when needed.

ffbin (Contributor Author) commented Aug 27, 2015

@mjsax Hi. I want to make the number of added '!' in ExclamationBolt and ExclamationWithStormSpout.ExclamationMap configurable via the prepare()/open() methods. The number can be read from the jobConfiguration. What is your suggestion? Thanks.

mjsax (Member) commented Aug 27, 2015

Sorry, but I don't understand your question...

ffbin (Contributor Author) commented Aug 27, 2015

@StephanEwen Hi. Your suggestion is good. I think I can serialize the key and value together as the value of confData in the job configuration, and create a stormconf-prefixed key (like "stormconf_1") as the key in confData. Then I can read the Storm conf from the job configuration and add it to the task configuration. I will have a try.

mjsax (Member) commented Aug 27, 2015

Can you try to serialize the whole Map into a single byte[]?

ffbin (Contributor Author) commented Aug 27, 2015

Oh, you are right. Serializing the whole Map into a single byte[] is better. Thanks.
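The agreed whole-Map approach can be sketched with plain java.io (in Flink, InstantiationUtil provides comparable serialize/deserialize helpers; the class below is illustrative, not the actual PR code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Sketch: serialize the whole Storm config Map into a single byte[] so it
// can be stored under one key in the Flink Configuration, and restore it
// on the task side.
public class StormConfSerializer {

    public static byte[] serialize(Map<Object, Object> conf) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            // Copy into a HashMap to guarantee a Serializable container.
            oos.writeObject(new HashMap<Object, Object>(conf));
            oos.close();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Cannot serialize Storm conf", e);
        }
    }

    @SuppressWarnings("unchecked")
    public static Map<Object, Object> deserialize(byte[] bytes) {
        try {
            ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bytes));
            Map<Object, Object> conf = (Map<Object, Object>) ois.readObject();
            ois.close();
            return conf;
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Cannot deserialize Storm conf", e);
        }
    }
}
```

Because the entire Map travels as one opaque value, non-String keys survive the round trip, which the prefix trick could not guarantee.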

ffbin (Contributor Author) commented Aug 31, 2015

@mjsax @StephanEwen I have finished the code changes:
1. Serialize the Storm Config as a byte[] into the Flink configuration.
2. Extend ExclamationTopology such that the number of added '!' in ExclamationBolt and ExclamationWithStormSpout.ExclamationMap is configurable, and adapt the tests.
3. Extend FiniteStormFileSpout and its base class with an empty constructor and configure the file to be opened via the Storm configuration Map.
I have run the flink-storm-compatibility tests successfully on my local machine and do not know why CI failed.
Can you have a look at my code? Thank you very much.

mjsax (Member) commented Aug 31, 2015

This is the stack trace (occurs in 4 of 5 runs -- the other run failed earlier due to an unrelated test). It seems you broke something.

Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 5.149 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase
testJobWithoutObjectReuse(org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase)  Time elapsed: 3.825 sec  <<< FAILURE!
java.lang.AssertionError: Error while calling the test program: Job execution failed.
    at org.junit.Assert.fail(Assert.java:88)
    at org.apache.flink.streaming.util.StreamingProgramTestBase.testJobWithoutObjectReuse(StreamingProgramTestBase.java:102)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)

I will review after you fixed it.

1.Storm topologies mode example

...
Map conf = new HashMap();
Member:

Can you please use the same markup as in the other code examples?

mjsax (Member) commented Sep 8, 2015

I just had a few "cosmetic" comments. Otherwise it looks good to me to get merged.

@ffbin ffbin force-pushed the FLINK-2525 branch 4 times, most recently from 0165a26 to 687c664 on September 9, 2015 10:12
ffbin (Contributor Author) commented Sep 9, 2015

@mjsax Thanks. I have finished the changes for all comments.
@StephanEwen @rmetzger Can you have a look at it if it can be merged? Thank you very much!

ffbin (Contributor Author) commented Sep 14, 2015

@StephanEwen @rmetzger Can you have a look to see whether it can be merged? I am also working on Storm task hooks, which depend on this PR. Thank you very much!

@@ -169,6 +169,13 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.

See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.

## Configure for embedded Spouts/Bolts
Embedded Spouts/Bolts can be configure with user defined parameters.
User defined parameters is stored in a `Map`(as in Storm).
Contributor:

... are stored ...

fhueske (Contributor) commented Sep 14, 2015

Actually, I think going through the TaskConfig as proposed by @mjsax is the cleaner way. Going through the system-internal JobConfiguration and exposing it to user programs is not a good choice, in my opinion.

The purpose of TaskConfig is exactly to give parameters to a task (function, spout). Also the StreamingRuntimeContext would not need to be adapted, because it already offers a method getTaskStubParameters(). Would that work as well or are there major issues preventing you from using the TaskConfig?

mjsax (Member) commented Sep 14, 2015

It is not clear (at least to me) how to do this. The API does not offer an (obvious) way to set a configuration... (or I just don't get it). StreamExecutionEnvironment only offers .getConfig(), and there is no .withParameters(...) in the Streaming API (which is "deprecated" even in the Batch API according to the discussion on the dev list).

IMHO, the best way would be the possibility to set a configuration in the environment that is distributed to all operators. Should we extend the Streaming API for this?

StephanEwen (Contributor):

As per the discussion on the dev list, the ExecutionConfig has the GlobalJobParameters, which are useful if one type of config is used across all operators.

If each of the operators needs its own config, can you create an abstract base class for the Storm functions which takes a configuration as an argument?

BTW: There is no plan to remove the withParameters() method in the batch API. It is just not the encouraged mechanism any more...

mjsax (Member) commented Sep 15, 2015

Storm only supports one global configuration that is shared over all spouts/bolts. So GlobalJobParameters will work just fine.
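A rough sketch of how the single global Storm config could ride on this mechanism. GlobalJobParametersBase below is a hypothetical stand-in for Flink's ExecutionConfig.GlobalJobParameters so the snippet compiles without Flink on the classpath; in a real job the object would be registered on the environment's ExecutionConfig.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ExecutionConfig.GlobalJobParameters, kept
// local so the sketch is self-contained.
abstract class GlobalJobParametersBase {
    public Map<String, String> toMap() {
        return new HashMap<String, String>();
    }
}

// One global configuration shared by all spouts and bolts, matching
// Storm's model of a single topology-wide conf Map.
public class GlobalStormConf extends GlobalJobParametersBase {

    private final Map<Object, Object> stormConf = new HashMap<Object, Object>();

    public void put(Object key, Object value) {
        stormConf.put(key, value);
    }

    public Object get(Object key) {
        return stormConf.get(key);
    }

    // Flat String view of the config; the typed map stays authoritative.
    @Override
    public Map<String, String> toMap() {
        Map<String, String> view = new HashMap<String, String>();
        for (Map.Entry<Object, Object> e : stormConf.entrySet()) {
            view.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
        }
        return view;
    }
}
```

Since Storm exposes exactly one conf Map per topology, a single such object distributed to all operators matches the Storm semantics without per-operator plumbing.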

fhueske (Contributor) commented Sep 18, 2015

Hi @ffbin,
not sure if you followed the discussion on the mailing list, but we discussed using the ExecutionConfig instead of the JobConfig. The reason is that ExecutionConfig is user-facing, while JobConfig is used for system-internal configuration.

See the discussion here.

It would be nice, if you could update the PR to use ExecutionConfig.
Thanks a lot, Fabian

ffbin (Contributor Author) commented Sep 21, 2015

Hi @fhueske , i will update the PR to use ExecutionConfig. Thanks.

@asfgit asfgit closed this in 9fe285a Oct 2, 2015
sachingoel0101 pushed a commit to sachingoel0101/flink that referenced this pull request Oct 8, 2015
lofifnc pushed a commit to lofifnc/flink that referenced this pull request Oct 8, 2015
cfmcgrady pushed a commit to cfmcgrady/flink that referenced this pull request Oct 23, 2015