New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-6655] Add validateAndNormalizeUri method to MemoryArchivist #4156
Conversation
|
||
// some validity checks | ||
if (scheme == null) { | ||
throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's include the config option key in the error message.
} | ||
|
||
if (path == null) { | ||
throw new IllegalArgumentException("The path to store the archive job is null. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again, config option key. Also, it should say "to store the job archives is null,"
|
||
if (path == null) { | ||
throw new IllegalArgumentException("The path to store the archive job is null. " + | ||
"Please specify a directory path for archive.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> "for storing job archives."
} | ||
|
||
if (path.length == 0 || path == "/") { | ||
throw new IllegalArgumentException("Cannot use the root directory for archive.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> "for storing job archives."
// this is because the required filesystem classes may not be available to the flink client | ||
new Path(archivePathUri) | ||
} | ||
else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need this branch, as we only access the path from the jobmanager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, That's true. Only FsStateBackend needs this branch access the path from the file system.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if and else branch both can delete. If I am wrong, please let me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should throw an exception if the scheme isn't supported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not very sure what kind of scheme that flink support. For example, I just know existing scheme from what those codes say hdfs
or file
. If I know what kinda scheme that flink support, I probably fix the code like if (scheme.startsWith("hdfs") or scheme.startsWith("file"))
or something like that. Please helps check the newest code again and give some advices. :XD
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you also just use FileSystem.isFlinkSupportedScheme
which was used in the original method,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohhh, yeah. What a silly mistake I was making. I can get what kinda scheme from FileSystem.isFlinkSupportedScheme
. I have updated the code, please check again. :D
01dda64
to
469b7e0
Compare
if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) { | ||
// skip verification checks for non-flink supported filesystem | ||
// this is because the required filesystem classes may not be available to the flink client | ||
throw new IllegalArgumentException("Cannot use the " + archivePathUri.getScheme + " scheme, only hdfs, " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is bound to be outdated at some point, so tet's re-use an exception from the FileSystem class:
"No file system found with scheme " + scheme + ", referenced in file URI '" + archivePathUri.toString() + "'."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:D I have updated the code. Thanks @zentol VERY careful review. I am very appreciate it!
will try this out and merge it afterwards. |
Nice ~ |
@zentol If you dont mind, you can give me your sample test code via my personal email |
For the local test i wouldn't use test code. build Flink -> mess with HistoryServer config -> start HistoryServer -> see what happens |
Okay, Let me try. |
BTW, What is your Program arguments ? start with --configDir ... |
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.
General
Documentation
Tests & Build
mvn clean verify
has been executed successfully locally or a Travis build has passed