[SPARK-16002][SQL]Sleep when no new data arrives to avoid 100% CPU usage #13718
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
if (listingTimeMs > 2000) {
  // Output a warning when listing files uses more than 2 seconds.
  logWarning(s"Listed ${files.size} file(s) in $listingTimeMs ms")
This log is pretty useful when listing files is extremely slow.
Should this be at DEBUG level?
The user wouldn't know what to do when seeing these warnings.
@tedyu as this is a Streaming application, this log may help the user find the bottleneck when the app becomes slow.
Should some kind of metric be introduced so that the user doesn't need to examine logs?
> Should some kind of metric be introduced so that the user doesn't need to examine logs?
Sounds great, but that's not a priority right now.
Also /cc @tdas
LGTM
@@ -120,7 +120,13 @@ class FileStreamSource(
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().map(_.getPath.toUri.toString)
val endTime = System.nanoTime
logInfo(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
if (listingTimeMs > 2000) {
Why 2 seconds?
No special reason. I just picked a number arbitrarily. @tdas any suggestion for this number?
Test build #60665 has finished for PR 13718 at commit
Test build #60667 has finished for PR 13718 at commit
@@ -164,6 +164,7 @@ class StreamSuite extends StreamTest {
/* -- batch 0 ----------------------- */
// Add some data in batch 0
AddData(inputData, 1, 2, 3),
EnsureManualClockInWaitingState,
This is necessary. Otherwise, the manual clock may be advanced too early and then `ManualClock.waitTillTime` will block forever.
Test build #60732 has finished for PR 13718 at commit
Test build #60733 has finished for PR 13718 at commit
Test build #60734 has finished for PR 13718 at commit
@tdas I fixed the test. Could you take another look?
/**
 * Returns whether there is any thread being blocked in `waitTillTime`.
 */
def isWaiting: Boolean = synchronized { _isWaiting }
Can this ever return `true`, given that it's `synchronized` with the only method that mutates `_isWaiting` (and that method sets it to `false` in a `finally`)?
`wait(10)` in `waitTillTime` will give up the lock. From `wait`'s javadoc:
> This method causes the current thread (call it T) to place itself in the wait set for this object and then to relinquish any and all synchronization claims on this object.
So `isWaiting` returns `true` only while some thread is blocked in `wait(10)`.
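To illustrate the point about `wait` releasing the lock, here is a minimal sketch. `SketchManualClock` is a hypothetical, simplified stand-in written for this example and is not Spark's actual `ManualClock`; only the names `waitTillTime`, `isWaiting`, and `_isWaiting` come from the diff above, and the rest is assumed.

```scala
// Hypothetical, simplified clock for illustration only; not Spark's ManualClock.
class SketchManualClock(private var time: Long) {
  private var _isWaiting = false

  // Moves the clock forward and wakes up any thread blocked in waitTillTime.
  def advance(delta: Long): Unit = synchronized {
    time += delta
    notifyAll()
  }

  // Blocks until the clock reaches `target`, then returns the current time.
  def waitTillTime(target: Long): Long = synchronized {
    _isWaiting = true
    try {
      while (time < target) {
        // wait releases this object's monitor while the thread is blocked,
        // so other synchronized methods (such as isWaiting) can run.
        wait(10)
      }
      time
    } finally {
      _isWaiting = false
    }
  }

  // True only while some thread is inside waitTillTime and has released the monitor.
  def isWaiting: Boolean = synchronized { _isWaiting }
}

object SketchManualClockDemo {
  def main(args: Array[String]): Unit = {
    val clock = new SketchManualClock(0L)
    val waiter = new Thread(new Runnable {
      override def run(): Unit = clock.waitTillTime(100L)
    })
    waiter.start()

    // Spin until the waiter thread has entered waitTillTime and released the monitor.
    while (!clock.isWaiting) Thread.sleep(1)
    println(s"waiting before advance: ${clock.isWaiting}")

    clock.advance(100L)
    waiter.join()
    println(s"waiting after advance: ${clock.isWaiting}")
  }
}
```

A test that advances the clock only after `isWaiting` becomes `true` avoids the "advanced too early" race mentioned earlier in the thread, which appears to be the purpose of `EnsureManualClockInWaitingState`.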
Test build #60862 has finished for PR 13718 at commit
retest this please
Test build #60875 has finished for PR 13718 at commit
LGTM. Thanks. Merging to master and branch 2.0.
…sage

## What changes were proposed in this pull request?

Add a configuration to allow people to set a minimum polling delay when no new data arrives (default is 10ms). This PR also cleans up some INFO logs.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13718 from zsxwing/SPARK-16002.

(cherry picked from commit c399c7f)
Signed-off-by: Yin Huai <yhuai@databricks.com>
What changes were proposed in this pull request?
Add a configuration to allow people to set a minimum polling delay when no new data arrives (default is 10ms). This PR also cleans up some INFO logs.
How was this patch tested?
Existing unit tests.
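The idea behind the minimum polling delay can be sketched as follows. This is an illustrative loop under stated assumptions, not the code from this PR: `PollingSketch`, `runPolls`, `fetchNewData`, and `pollingDelayMs` are hypothetical names, and the actual configuration key is not shown on this page. The only detail taken from the description is the 10 ms default.

```scala
import scala.collection.mutable

// Hypothetical sketch of "sleep when no new data arrives"; names are illustrative.
object PollingSketch {

  /**
   * Polls `fetchNewData` up to `maxPolls` times. A poll that returns data is
   * processed immediately; a poll that returns nothing sleeps for
   * `pollingDelayMs` instead of spinning. Returns the number of idle polls.
   */
  def runPolls(maxPolls: Int, pollingDelayMs: Long)(
      fetchNewData: () => Option[String])(process: String => Unit): Int = {
    var idlePolls = 0
    var i = 0
    while (i < maxPolls) {
      fetchNewData() match {
        case Some(batch) =>
          process(batch)
        case None =>
          // Without this sleep the loop would busy-wait and pin a CPU core.
          idlePolls += 1
          Thread.sleep(pollingDelayMs)
      }
      i += 1
    }
    idlePolls
  }

  def main(args: Array[String]): Unit = {
    val pending = mutable.Queue("batch-0", "batch-1")
    val fetch: () => Option[String] =
      () => if (pending.nonEmpty) Some(pending.dequeue()) else None

    // 10 ms matches the default mentioned in the PR description.
    val idle = runPolls(maxPolls = 5, pollingDelayMs = 10L)(fetch) { batch =>
      println(s"processed $batch")
    }
    println(s"idle polls: $idle")
  }
}
```

With two queued batches and five polls, the first two polls process data and the remaining three sleep. Sleeping only on the empty path keeps latency unchanged when data is flowing.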