New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-17159][STREAM] Significant speed up for running spark streaming against Object store. #22339
Conversation
…Object store. Based on apache#17745. Original work by Steve Loughran. This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files. This is a minor optimisation when working with filesystems, but significant when working with object stores. Change-Id: I269d98902f615818941c88de93a124c65453756e
Test build #95706 has finished for PR 22339 at commit
|
Hi, @ScrapCodes . Could you do the followings?
|
Retest this please. |
Test build #96047 has finished for PR 22339 at commit
|
For numbers, while testing with object store having 50 files/dirs, without this patch it took 130 REST requests for 2 batches to complete and with this patch it took 56 rest requests. So number of rest calls are reduced, and this translates to speedup. How much speed up is dependent on number of files, but for the particular test, I have run, it was 2x. |
Hi @srowen, would you like to take a look? Is there anything I can do, if this patch is missing something? I have tested it thoroughly against an object store. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If @steveloughran is into it, I think this is OK. I see why it's faster.
val newFiles = directories.flatMap(dir => | ||
fs.listStatus(dir, newFileFilter).map(_.getPath.toString)) | ||
fs.listStatus(dir) | ||
.filter(isNewFile(_, currentTime, modTimeIgnoreThreshold)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think the indent is too deep here?
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime | ||
logInfo("Finding new files took " + timeTaken + " ms") | ||
logDebug("# cached file times = " + fileToModTime.size) | ||
logInfo(s"Finding new files took $timeTaken ms") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this should be a debug statement. I don't feel strongly about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was originally @ info, so if it it filled up logs too much there'd be complaints. What's important is that the time to scan is printed, either @ info or debug, so someone can see what's happening. Probably what does need logging @ warn is when the time to scan is greater than the window, or just getting close to it.
override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory | ||
} | ||
val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath) | ||
val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the .getOrElse
could come at the end, but it hardly matters.
Why the speedups? Comes from that glob filter calling getFileStatus() on every entry, which is is 1-3 HTTP requests and a few hundred millis per call, when instead that can be handled later. As a result, the more files you have in a path, the more time the scan takes, until eventually the scan time > window interval at which point your code is dead. The other stuff is simply associated optimisations. Now, I'm obviously happy with this, especially as I seem I getting credit. And it will help speedup working with any store. But I need to warn people: it is not sufficient The key problem here is: files uploaded by S3 multipart upload get a timestamp on when the upload began, not finished —yet only become visible at the end of the upload. If a caller starts up an upload in window t, and doesn't complete it until window t+1, then it may get missed. There's not much which can be done here, except in documenting the risk. What is a good solution? It'd be to use the cloud-infra-providers own event notification mechanism and subscribe to changes in a store. AWS, Azure and GCS all offer something like this. There's a home for the S3 one of those in spark-kinesis, perhaps. Not got free time to work on it, I'm afraid, but if someone starts coding it, list me on the PR and I'll take a look |
Yeah I agree, I was saying I do think it will speed things up. If it's a non-trivial win it's worthwhile even if it isn't the last optimization here. Is there any downside to this? |
no, no cost penalties. Slightly lower namenode load too. If you had many, many spark streaming clients scanning directories, HDFS ops teams would eventually get upset. This will postpone the day |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ScrapCodes looks good to me except perhaps the tiny style comment above, and possibly the log statement question
Retest this please. |
Test build #96843 has finished for PR 22339 at commit
|
Test build #96886 has finished for PR 22339 at commit
|
Test build #96885 has finished for PR 22339 at commit
|
Test build #96887 has finished for PR 22339 at commit
|
Merged to master |
Thank you @srowen and @steveloughran. |
…g against Object store. ## What changes were proposed in this pull request? Original work by Steve Loughran. Based on apache#17745. This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files. Each call to file status is 3+ http calls to object store. This patch eliminates the need for it, by using FileStatus objects. This is a minor optimisation when working with filesystems, but significant when working with object stores. ## How was this patch tested? Tests included. Existing tests pass. Closes apache#22339 from ScrapCodes/PR_17745. Lead-authored-by: Prashant Sharma <prashant@apache.org> Co-authored-by: Steve Loughran <stevel@hortonworks.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
…reaming against Object store. apache#22339 Original work by Steve Loughran. Based on apache#17745. This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files. Each call to file status is 3+ http calls to object store. This patch eliminates the need for it, by using FileStatus objects. This is a minor optimisation when working with filesystems, but significant when working with object stores.
### What changes were proposed in this pull request? The pr aims to delete `TimeStampedHashMap` and its UT. ### Why are the changes needed? During Pr #43578, we found that the class `TimeStampedHashMap` is no longer in use. Based on the suggestion, we have removed it. #43578 (comment) - First time this class `TimeStampedHashMap` be introduced: b18d708#diff-77b12178a7036c71135074c6ddf7d659e5a69906264d5e3061087e4352e304ed introduced this data structure - After #22339, this class `TimeStampedHashMap` is only being used in UT `TimeStampedHashMapSuite`. So, after Spark 3.0, this data structure has not been used by any production code of Spark. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43633 from panbingkun/remove_TimeStampedHashMap. Authored-by: panbingkun <pbk1982@gmail.com> Signed-off-by: yangjie01 <yangjie01@baidu.com>
What changes were proposed in this pull request?
Original work by Steve Loughran.
Based on #17745.
This is a minimal patch of changes to FileInputDStream to reduce File status requests when querying files. Each call to file status is 3+ http calls to object store. This patch eliminates the need for it, by using FileStatus objects.
This is a minor optimisation when working with filesystems, but significant when working with object stores.
How was this patch tested?
Tests included. Existing tests pass.