SPARK-1795 - Add recursive directory file search to fileInputStream #537
Conversation
Can you please add a JIRA for this and add the JIRA number in the title, like other PRs?
Also, please add a unit test for this use case in the InputStreamsSuite.
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean, recursive: Boolean): DStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, recursive)
}
This looks like an API change - please add a default value for `recursive`.
I have included a default value on the FileInputDStream but not on the API itself.
Wondering if we want to introduce default values to the more granular version of the API. Currently, it looks like the exposed API essentially has two versions for these methods -- one that assumes default values and one that exposes all the parameters of the DStream constructor.
Thoughts?
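The two-tier shape described above (a fully-parameterized method plus a convenience variant that assumes defaults) can be sketched in plain Scala. The names below (`StreamConfig`, `openStream`) are illustrative stand-ins, not Spark's actual API:

```scala
// Hypothetical sketch of the two-tier API pattern discussed above;
// StreamConfig and openStream are NOT Spark names.
case class StreamConfig(directory: String, newFilesOnly: Boolean, recursive: Boolean)

object StreamApi {
  // Granular variant: every parameter of the underlying DStream is explicit.
  def openStream(directory: String,
                 newFilesOnly: Boolean,
                 recursive: Boolean): StreamConfig =
    StreamConfig(directory, newFilesOnly, recursive)

  // Convenience variant: assumes the common defaults, so adding a new
  // parameter (like `recursive`) to the granular variant does not break
  // existing callers of this one.
  def openStream(directory: String): StreamConfig =
    openStream(directory, newFilesOnly = true, recursive = false)
}

val simple = StreamApi.openStream("/data")
val full   = StreamApi.openStream("/data", newFilesOnly = false, recursive = true)
```

With this shape, defaults live in one place (the convenience overload) rather than being repeated on every signature.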
In which version of Spark can we get the API with support for nested directory streaming?
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)

val filePaths: Array[Path] = if (recursive)
  recursiveFileList(fs.listStatus(directoryPath).toList).toArray
If the input directory is already the lowest-level directory, this will not consider any files in it.
Example: consider the following directory contents:
/a/file1.txt
/a/file2.txt
and so on.
If the input directory is given as "/a", there will be no output.
We can call this like:
val filePaths: Array[Path] = if (recursive)
recursiveListDirs(List(fs.getFileStatus(new Path(directoryPath)))).toArray
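The difference between the two seedings (listing the directory's children versus passing the directory itself) can be illustrated with `java.io.File` standing in for Hadoop's `FileSystem`/`FileStatus`. The helper below is a hypothetical model of the PR's recursion, not its actual `recursiveListDirs`:

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical model of the PR's helper: for each *directory* in `seed`,
// collect its plain files and recurse into its subdirectories.
// Non-directory seed entries are ignored -- which is exactly the reported bug.
def listFilesUnder(seed: List[File]): List[File] =
  seed.filter(_.isDirectory).flatMap { dir =>
    val children = Option(dir.listFiles).map(_.toList).getOrElse(Nil)
    children.filter(_.isFile) ++ listFilesUnder(children.filter(_.isDirectory))
  }

// Build a leaf directory: a/file1.txt, a/file2.txt
val a = Files.createTempDirectory("a").toFile
new File(a, "file1.txt").createNewFile()
new File(a, "file2.txt").createNewFile()

// Buggy seeding (analogous to fs.listStatus(directoryPath)): the seed is
// the plain files themselves, contains no directory, so nothing is found.
val buggy = listFilesUnder(Option(a.listFiles).map(_.toList).getOrElse(Nil))

// Fixed seeding (analogous to fs.getFileStatus(new Path(directoryPath))):
// the seed is the directory itself, so its files are collected.
val fixed = listFilesUnder(List(a))
```

Seeding with the directory's own status makes a lowest-level input directory behave the same as a nested one.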
Can one of the admins verify this patch?
@patrickotoole Sorry for this patch sitting around here for so long without any attention. Mind updating this patch to the latest code?
I suggest we close this in favor of #2765, since it implements recursion with max depth, merges, and was active more recently.
Can one of the admins verify this patch?
Mind closing this PR?
Added recursive directory search to fileInputStream, so that Spark can find files in subdirectories rather than only in the parent directory.
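The feature described above boils down to a depth-first walk that returns every plain file at or below a root, rather than only the root's direct children. A minimal sketch with `java.io.File` standing in for Hadoop's `FileSystem` (the real patch works on `FileStatus`/`Path`):

```scala
import java.io.File
import java.nio.file.Files

// Depth-first walk: return every plain file at or below `root`.
// A non-recursive listStatus-style scan would stop at the first level.
def findFiles(root: File): List[File] =
  if (root.isFile) List(root)
  else Option(root.listFiles).map(_.toList).getOrElse(Nil).flatMap(findFiles)

// Example layout: root/top.txt and root/sub/nested.txt.
val root = Files.createTempDirectory("root").toFile
val sub  = new File(root, "sub"); sub.mkdir()
new File(root, "top.txt").createNewFile()
new File(sub, "nested.txt").createNewFile()

// Both files are found, including the one a flat listing would miss.
val found = findFiles(root).map(_.getName).sorted
```

The same walk over `FileStatus` objects (using `isDirectory` and `fs.listStatus`) is what the patch adds behind the `recursive` flag.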