FLINK-2380: allow to specify the default filesystem scheme in the flink configuration file. #1524

Closed
wants to merge 1 commit into from

Contributor

@kl0u kl0u commented Jan 19, 2016

No description provided.

@@ -52,6 +52,14 @@ The configuration files for the TaskManagers can be different, Flink does not as
- `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwritten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int
parallelism)` on an operator. See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for more information about the parallelism.

- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority (if needed).
Contributor

What do you mean by "necessary authority"?
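
For illustration, "authority" here presumably refers to the URI authority component, i.e. the `host:port` part that follows `//` in a value such as `hdfs://namenode:9000/`. The sketch below only shows how `java.net.URI` splits such values; it is not taken from the PR, and `namenode:9000` is a hypothetical host and port. An empty authority, as in `hdfs:///`, is reported as `null`.

```java
import java.net.URI;

final class AuthorityDemo {
    /** Returns the scheme and authority that java.net.URI extracts from a value. */
    static String describe(String value) {
        URI uri = URI.create(value);
        return "scheme=" + uri.getScheme() + " authority=" + uri.getAuthority();
    }

    public static void main(String[] args) {
        // "namenode:9000" is a hypothetical host and port, used only for illustration.
        System.out.println(describe("hdfs://namenode:9000/"));
        // No host or port given: the authority component is empty.
        System.out.println(describe("hdfs:///"));
        System.out.println(describe("file:///"));
    }
}
```

A value without an authority leaves it to the file system implementation to decide which host to contact, which is likely why the documentation says "if needed".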

@uce
Contributor

uce commented Jan 20, 2016

Hey Klou! Nice addition. A couple of users have asked for this, so I guess they will be happy. I made some minor comments inline. The changes look good. I haven't tested them manually yet, but I noticed that there are no test cases. I think we should add a simple test that ensures the basic expected behaviour (the default scheme is file, the configured default scheme is used if specified, the default scheme is not used if the URI has an explicit scheme, etc.). After adding the tests and trying it out manually, I think it will be good to merge. :)

@kl0u kl0u force-pushed the fs_param branch 6 times, most recently from 9aefa6f to 2fc305b Compare January 25, 2016 13:13
@@ -19,13 +19,19 @@
package org.apache.flink.configuration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Contributor

These added imports are probably not necessary

@StephanEwen
Contributor

Thanks for picking up this issue. This is going in a very good direction.
A few remaining points:

  • There are a lot of unrelated reformattings. The general rule is not to reformat comments or indentation for anything not touched in the pull request. Otherwise everyone just keeps reformatting each other's changes back and forth, because everyone believes they personally figured out the better code style ;-) It would be good to undo the changes in the FileSystem class, for example.
  • Some tests would be great. With refactorings of YARN going on, I can see that this change will be accidentally undone the next time something with a main() method is touched, unless a test guards this.

@kl0u
Contributor Author

kl0u commented Feb 2, 2016

Could you explain what additional tests you have in mind? So far I am testing 1) whether the scheme provided in the configuration is used when one is not explicitly provided, 2) whether an explicit scheme overrides the one from the configuration, and 3) whether a scheme from the configuration overrides the default one.
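
The three cases listed above can be sketched with plain `java.net.URI`. This is a minimal illustration of the precedence rules only, not the code in the PR: the class `DefaultSchemeResolution`, the method `resolve`, the `file:///` fallback constant, and the host `namenode:9000` are all assumptions made for this example.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class DefaultSchemeResolution {
    // Assumed built-in fallback when nothing is configured (hypothetical constant).
    static final URI FALLBACK = URI.create("file:///");

    /**
     * Resolves a user-supplied path against an optional configured default.
     * Precedence: explicit scheme in the path, then the configured default, then FALLBACK.
     */
    static URI resolve(String path, URI configuredDefault) {
        URI uri = URI.create(path);
        if (uri.getScheme() != null) {
            return uri; // an explicit scheme always wins
        }
        URI base = (configuredDefault != null) ? configuredDefault : FALLBACK;
        try {
            // Combine scheme and authority of the default with the path of the input.
            return new URI(base.getScheme(), base.getAuthority(), uri.getPath(), null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot resolve '" + path + "' against " + base, e);
        }
    }

    public static void main(String[] args) {
        URI configured = URI.create("hdfs://namenode:9000/"); // hypothetical cluster address
        System.out.println(resolve("/data/in.csv", configured));       // configured default applies
        System.out.println(resolve("s3://bucket/in.csv", configured)); // explicit scheme is kept
        System.out.println(resolve("/data/in.csv", null));             // built-in fallback applies
    }
}
```

Each call in `main` corresponds to one of the three test cases, so a unit test could assert on the returned URIs directly.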

@kl0u kl0u force-pushed the fs_param branch 4 times, most recently from 656b17d to 7be180c Compare February 7, 2016 15:45
@kl0u kl0u force-pushed the fs_param branch 2 times, most recently from 3119123 to 8233a5b Compare February 9, 2016 16:02
@rmetzger
Contributor

rmetzger commented Feb 9, 2016

I'm testing the change on a cluster (with YARN) to see if everything is working as expected.

@rmetzger
Contributor

rmetzger commented Feb 9, 2016

I identified the following issues:

  • Setting the configuration using the yarn session "dynamic properties": ./bin/yarn-session.sh -n 2 -Dfs.default-scheme=hdfs:/// does not really work (the configuration parameter shows up in the web interface, but the job fails)
  • Setting an invalid scheme leads to a NullPointerException on job submission. In flink-conf.yaml, I have fs.default-scheme: thisIsWrong. This is the result:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter:  ))': null
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter:  ))': null
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:255)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
    at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:275)
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:977)
    ... 29 more
robert@cdh544-master:~/flink/build-target$ ./bin/flink run ./examples/batch/WordCount.jar /user/robert/tpch-100lineitems.csv /user/robert/elasdoijwef

@rmetzger
Contributor

rmetzger commented Feb 9, 2016

Setting the value to fs.default-scheme: thisIsWrong:/// gives a good error message:

robert@cdh544-master:~/flink/build-target$ ./bin/flink run ./examples/batch/WordCount.jar /user/robert/tpch-100lineitems.csv /user/robert/elasdoijwef

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter:  ))': No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:78)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:804)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:331)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1127)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1175)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (CsvOutputFormat (path: /user/robert/elasdoijwef, delimiter:  ))': No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:981)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:965)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:965)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:378)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:105)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/elasdoijwef'.
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:290)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
    at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:275)
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:84)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$6.apply(JobManager.scala:977)
    ... 29 more

@kl0u
Contributor Author

kl0u commented Feb 11, 2016

Thanks @rmetzger for the comment. Will fix it!

@kl0u
Contributor Author

kl0u commented Feb 11, 2016

I have updated the PR to address the new comments.
Please review the updated PR.

@kl0u kl0u force-pushed the fs_param branch 2 times, most recently from 0da5fbe to 5e20f40 Compare February 11, 2016 22:54
@rmetzger
Contributor

Thank you.

  • ./bin/yarn-session.sh -tm 1024 -n 7 -s 2 -Dfs.default-scheme=hdfs:/// is now working
  • Setting the fs.default-scheme to thisIsWrong and trying to read from /user/robert/trashaa leads to:
Caused by: java.io.IOException: The URI '/user/robert/trashaa' is invalid. Hint: Did you specify only the PATH to your file (without the scheme) but forgot the initial slash (/) in the beginning???

I don't think putting three question marks there is a good idea. I think it implies that the user is too stupid to properly specify a file path.
I would also like to see the default scheme in the message. Right now, it's quite hard to see what's wrong.

The error reporting for the default scheme thisIsWrong:/// and the path /user/robert/trashaa is good again:

Caused by: java.io.IOException: No file system found with scheme thisIsWrong, referenced in file URI 'thisIsWrong:/user/robert/trashaaa'.

I would expect similar error reporting for fs.default-scheme=thisIsWrong.
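
One way to get comparable reporting for both values is to validate the configured string up front. The sketch below is an assumption about how such a check could look, not the change made in this PR: `DefaultSchemeValidation`, `parseDefaultScheme`, and the message texts are hypothetical, and it throws `IllegalArgumentException` rather than the `IOException` seen in the logs above. It relies on the fact that `java.net.URI` parses `thisIsWrong` as a relative path with a `null` scheme, whereas `thisIsWrong:///` has the scheme `thisIsWrong`.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class DefaultSchemeValidation {
    static final String KEY = "fs.default-scheme";

    /** Parses the configured value and fails fast, naming the key and the offending value. */
    static URI parseDefaultScheme(String configured) {
        final URI uri;
        try {
            uri = new URI(configured);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(
                "The value '" + configured + "' of " + KEY + " is not a valid URI.", e);
        }
        if (uri.getScheme() == null) {
            // "thisIsWrong" ends up here: it is a scheme-less relative path.
            throw new IllegalArgumentException(
                "The value '" + configured + "' of " + KEY
                    + " has no scheme. Expected a value such as 'hdfs:///' or 'file:///'.");
        }
        return uri;
    }

    public static void main(String[] args) {
        System.out.println(parseDefaultScheme("thisIsWrong:///").getScheme());
        try {
            parseDefaultScheme("thisIsWrong");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a check like this, an unknown but well-formed scheme would still be reported later by the file system lookup, while a scheme-less value is rejected with a message that shows the configured default.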

@kl0u
Contributor Author

kl0u commented Feb 12, 2016

Thanks for the comment @rmetzger. I changed the error message. Please review and let me know.

@@ -176,6 +184,8 @@ abstract class ApplicationMasterBase {
jobManagerPort, webServerPort, slots, taskManagerCount,
dynamicPropertiesEncodedString)

//todo should I also set the FS default here????
Contributor

No. I'll remove this TODO when merging

Contributor Author

@rmetzger Yes, I know. That comment was left over from earlier.

@rmetzger
Contributor

I tested the change again on a cluster. Everything is working nicely with YARN.

I'll merge the PR.

@kl0u
Contributor Author

kl0u commented Feb 16, 2016

Perfect! Thanks a lot @rmetzger

@asfgit asfgit closed this in 76d3a63 Feb 16, 2016
subhankarb pushed a commit to subhankarb/flink that referenced this pull request Mar 17, 2016
@kl0u kl0u deleted the fs_param branch June 9, 2016 13:26