Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-3776] Flink Scala shell does not allow to set configuration fo… #1945

Closed
wants to merge 1 commit into from
Closed

Conversation

eastcirclek
Copy link
Contributor

The current version of FlinkShell creates a new Configuration object when creating a LocalFlinkMiniCluster in FlinkShell.fetchConnectionInfo().
Instead of creating a new one, FlinkShell just needs to get a configuration object which was already created when GlobalConfiguration.loadConfiguration() is called (which is before FlinkShell.fetchConnectionInfo() is called).
Therefore, just one line modification figures out this issue as shown in this pull request.

@mxm
Copy link
Contributor

mxm commented May 11, 2016

Thanks for the pull request. Looks good. Have you tested that the configuration is loaded correctly?

@eastcirclek
Copy link
Contributor Author

eastcirclek commented May 12, 2016

@mxm

Yes, I tested it by simply assigning a non-existent directory to taskmanager.tmp.dirs in $FLINK_CONF_DIR/flink-conf.yaml which is read and parsed by GlobalConfiguration.loadConfiguration().

I got the following error messages:
~/flink/flink-dist/target/flink-1.1-SNAPSHOT-bin/flink-1.1-SNAPSHOT/bin$ start-scala-shell.sh local
Starting Flink Shell:
Exception in thread "main" java.io.IOException: Temporary file directory /tmp/east does not exist.
at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$checkTempDirs$1.apply(TaskManager.scala:2162)
at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$checkTempDirs$1.apply(TaskManager.scala:2157)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.flink.runtime.taskmanager.TaskManager$.checkTempDirs(TaskManager.scala:2157)
at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1752)
at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:142)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:312)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:269)
at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:148)
at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:187)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:186)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:134)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)

The above error implies that the modification to $FLINK_CONF_DIR/flink-conf.yaml is loaded correctly :-)

@mxm
Copy link
Contributor

mxm commented May 12, 2016

Looks good. Do you think we could make this more explicit by passing the loaded configuration object to the fetchConnectionInfo method? That way we don't depend on global variables and we load the config only once.

@eastcirclek
Copy link
Contributor Author

eastcirclek commented May 13, 2016

Seems fine but it seems to go against the design of GlobalConfiguration.
The singleton GlobalConfiguration object is already get and used in that way on few places.
How do you think?

@mxm
Copy link
Contributor

mxm commented May 13, 2016

GlobalConfiguration doesn't ensure that the config has been loaded when you call get(). It will give you an empty Configuration if you do not call loadConfiguration explicitly. If you pass the config after you called the load method, it is clear that the config has been loaded.

Your code works, I'll will just open a follow-up issue to make GlobalConfiguration more explicit, i.e. fail on get() if the config hasn't been loaded explicitly.

@asfgit asfgit closed this in 099fdfa May 13, 2016
@eastcirclek
Copy link
Contributor Author

Okay, I got the idea.

@mxm
Copy link
Contributor

mxm commented May 13, 2016

Thanks for the PR!

gna-phetsarath pushed a commit to gna-phetsarath/flink that referenced this pull request May 13, 2016
…r local execution

This closes apache#1945

[FLINK-3655] Multiple Directories (Paths) for FileInputFormat.
Removed Guava.

[FLINK-3655] Multiple Directories (Paths) for FileInputFormat.
Removed Guava.

[FLINK-3655] Multiple Directories (Paths) for FileInputFormat.
Removed Guava.

[FLINK-3655] Multiple Directories (Paths) for FileInputFormat.
Removed Guava.

[FLINK-3655] Multiple Directories (Paths) for FileInputFormat.
Removed Guava.
markreddy pushed a commit to markreddy/flink that referenced this pull request May 14, 2016
mbode pushed a commit to mbode/flink that referenced this pull request May 27, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants