Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17159][Streaming] optimise check for new files in FileInputDStream #17745

Conversation

steveloughran
Copy link
Contributor

What changes were proposed in this pull request?

Changes to FileInputDStream to eliminate multiple getFileStatus() calls when scanning directories for new files.

This is a minor optimisation when working with filesystems, but significant when working with object stores, as it eliminates HTTP requests per source file scanning the system. The current cost is 1-3 probing to see if a path is a directory or not, one more to actually timestamp a file. The new patch gets the file status and retains it through all the operations, so does not need to re-evaluate it.

The impact of this optimisation is 3 HTTP requests per source directory and 1 per file, for every single directory in the scan list, and for every file in the scanned directories, irrespective of the age of the directories. At 100+mS per HEAD request against S3, the speedup is significant, even when there are few files in the scanned directories.

Before

  1. Two separate list operations, globStatus() to find directories, then listStatus() to scan for new files under directories.
  2. The path filter in the globStatus() operations calls getFileStatus(filename) to probe for a file being a directory;
  3. getFileStatus() is also used in the listStatus() call to check the timestamp.

Against an object store getFileStatus() can cost 1-4 HTTPS requests per call (HEAD path, HEAD path + "/", LIST path),

As both list operations return an array or iterator of FileStatus objects, the operations are utterly superfluous. Instead the filtering can take
place after the listing has returned.

After

  1. The output of globStatus() is filtered to select only directories.
  2. The output of listStatus() is filtered by timestamp.
  3. The special failure case of globStatus(): no path, is handled specially in the warning text by saying "No Directory to scan", and omitting the full stack trace.
  4. The fileToModTime map is superflous, and so deleted.

How was this patch tested?

  1. There is a new test in org.apache.spark.streaming.InputStreamsSuite
  2. I have object store integration tests in an external repository, which have been used to verify functionality and that the number of HTTP requests is reduced when invoked against S3A endpoints.

…ile status requests when querying files. This is a minor optimisation when working with filesystems, but significant when working with object stores.

Change-Id: I269d98902f615818941c88de93a124c65453756e
@SparkQA
Copy link

SparkQA commented Apr 24, 2017

Test build #76106 has finished for PR 17745 at commit f3ffe1d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@steveloughran
Copy link
Contributor Author

Due to lack of support/interest, moved to https://github.com/hortonworks-spark/cloud-integration

override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
}
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this approach, we might be fetching a very large list of files and then filtering through the directories. If the fetched, list is too large, then it can be a problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, on looking at the code of glob status, it does filter at the end, so doing something like above might just be ok.

Also globStatus does a listStatus() per child directory or a getFileStatus() in case input pattern is not a glob, each call to listStatus does 3+ http calls and each call to getFileStatus does 2 http calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

globStatus is flawed; key limit is that it does a tree walk. It needs to be replaced with an object-store-list specific one. See HADOOP-13371.

The issue with implementing an s3a flat-list and filter is that if the wildcard is a few entries up from the child path and there are lots of children, e..g

s3a://bucket/data/year=201?/month=*/day=*/

then if there are many files under year/month/day entries, all get listed during the filter.

What I think would need to be done is to be able to config the FS to limit the depth of where it switches to bulk listing; so here I could say "depth=2", and so the year=? would be done via globbing, but the month= and day= would be better.

Or maybe just start with making the whole thing optional, and let the caller deal with it.

Anyway, options here

  • fix the Hadoop side call. Nice and broadly useful
  • see if spark can be moved off the globStatus call. Will change matching. But if you provide a new "cloudstore" connector, that could be done, couldn't it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, having an object store specific version of glob, will be broadly helpful. In the mean time, this patch seems to be saving a lot of http requests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still a lot; I think we can do a new one.

Latest version of this code is here; I think its time to set up a module in bahir for this

@ScrapCodes
Copy link
Member

Can you please reopen this? I had like to discuss, if we can merge it in the spark itself.

@ScrapCodes
Copy link
Member

It appears, there are more people using object store now, than ever. For those who are attached to old versions of spark streaming, having this would be good.

Hi @steveloughran, are you planning to work on it ? or shall I take it forward from here?
I am contemplating what can be done. So far the plan is we will temporarily maintain it as an experimental component in Apache Bahir, for the time it is not merged in mainstream spark. If you are willing to maintain the component, then please send a pull request to Bahir with just this patch applied.

@steveloughran
Copy link
Contributor Author

Patch is in the spark cloud integration module, you can take it and try to get into ASF spark provided you also add some credit to me in the patch.

ScrapCodes pushed a commit to ScrapCodes/spark that referenced this pull request Sep 5, 2018
…Object store.

Based on apache#17745. Original work by Steve Loughran.

This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files.

This is a minor optimisation when working with filesystems, but significant when working with object stores.

Change-Id: I269d98902f615818941c88de93a124c65453756e
asfgit pushed a commit that referenced this pull request Oct 5, 2018
…g against Object store.

## What changes were proposed in this pull request?

Original work by Steve Loughran.
Based on #17745.

This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files. Each call to file status is 3+ http calls to object store. This patch eliminates the need for it, by using FileStatus objects.

This is a minor optimisation when working with filesystems, but significant when working with object stores.

## How was this patch tested?

Tests included. Existing tests pass.

Closes #22339 from ScrapCodes/PR_17745.

Lead-authored-by: Prashant Sharma <prashant@apache.org>
Co-authored-by: Steve Loughran <stevel@hortonworks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…g against Object store.

## What changes were proposed in this pull request?

Original work by Steve Loughran.
Based on apache#17745.

This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files. Each call to file status is 3+ http calls to object store. This patch eliminates the need for it, by using FileStatus objects.

This is a minor optimisation when working with filesystems, but significant when working with object stores.

## How was this patch tested?

Tests included. Existing tests pass.

Closes apache#22339 from ScrapCodes/PR_17745.

Lead-authored-by: Prashant Sharma <prashant@apache.org>
Co-authored-by: Steve Loughran <stevel@hortonworks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
zzcclp added a commit to zzcclp/spark that referenced this pull request Sep 20, 2019
…reaming against Object store. apache#22339

Original work by Steve Loughran.
Based on apache#17745.

This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files. Each call to file status is 3+ http calls to object store. This patch eliminates the need for it, by using FileStatus objects.

This is a minor optimisation when working with filesystems, but significant when working with object stores.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants