[SPARK-29043][Core] Improve the concurrent performance of History Server #25797
Conversation
Force-pushed from ff7b9b9 to fa89be3.
ok to test cc @wangyum
retest please
ok to test
cc @vanzin too
The code change looks OK, but I might be missing the effect of "listing" and "reloading applications" running concurrently. It would allow multi-threaded access to some places that weren't accessed that way before, so that may need to be checked.
ok to test
Force-pushed from d939301 to ec387f3.
Test build #110636 has finished for PR 25797 at commit
gentle ping @dongjoon-hyun @HyukjinKwon
I just modified a method from protected to private in the new commit; it would not impact the test result.
Test build #111038 has finished for PR 25797 at commit
Looks reasonable.
I just added some comments and renamed a method, which would not impact the test result.
Test build #111074 has finished for PR 25797 at commit
@cloud-fan Could you help take a look?
Seems good to me.
I have tested it for a week; it works well.
Test build #113321 has finished for PR 25797 at commit
Test build #113328 has finished for PR 25797 at commit
Test build #113330 has finished for PR 25797 at commit
Test build #113332 has finished for PR 25797 at commit
endProcessing(reader.rootPath)
pendingReplayTasksCount.decrementAndGet()
val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis()
This is not right. Expiration is based on the time the log was last updated, not the time it was last scanned.
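To illustrate the reviewer's point, a minimal sketch of an expiration check keyed off the log's last-updated time rather than the scan time. The names isExpired, lastUpdatedMs, nowMs, and maxLogAgeMs are illustrative here, not the actual FsHistoryProvider fields:

```scala
// Hedged sketch: a log expires when its *last update* is older than the
// configured max age, independent of when the listing last scanned it.
def isExpired(lastUpdatedMs: Long, nowMs: Long, maxLogAgeMs: Long): Boolean =
  lastUpdatedMs + maxLogAgeMs < nowMs
```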
val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis()
if (isExpired) {
  listing.delete(classOf[LogInfo], reader.rootPath.toString)
You may also need to remove the application attempt that refers to the log from the listing database.
Basically you have to do what cleanLogs does, both to decide whether the log is expired and to determine what needs to be deleted.
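A sketch of the cleanup shape the reviewer describes. The Store trait and its methods are hypothetical stand-ins for the KVStore-backed listing, not the actual cleanLogs() implementation:

```scala
// Hypothetical stand-in for the listing database used by the History Server.
trait Store {
  def deleteLog(path: String): Unit
  def deleteApp(appId: String): Unit
}

// When an expired log is removed, the application attempt that refers to it
// must be removed from the listing as well, mirroring what cleanLogs does.
def cleanExpiredLog(store: Store, logPath: String, appId: Option[String]): Unit = {
  store.deleteLog(logPath)
  appId.foreach(store.deleteApp)
}
```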
done.
Test build #113379 has finished for PR 25797 at commit
retest this please.
Force-pushed from d5ff72c to 579c1ad.
rebased.
Test build #113625 has finished for PR 25797 at commit
Test build #113626 has finished for PR 25797 at commit
@turboFei Hi, could you address the review comments? This is good to have and seems close to being merged (according to #26416 (review)).
Thanks, I will address it as soon as possible.
Force-pushed from 7d1301c to c6ac35e.
log.appId.foreach { appId =>
  val app = listing.read(classOf[ApplicationInfoWrapper], appId)
  if (app.oldestAttempt() <= maxTime) {
Here the logic is consistent with cleanLogs().
But I think there is an overlap between app.oldestAttempt() <= maxTime and attempt.info.lastUpdated.getTime() >= maxTime, even though it does not matter.
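Illustrative only: the two predicates compared in this thread, written out as standalone functions (names are mine, not Spark's). At the boundary lastUpdated == maxTime, both hold at once, which is the harmless overlap noted above:

```scala
// "The app's oldest attempt is old enough to be cleaned."
def oldestAttemptExpired(oldestAttempt: Long, maxTime: Long): Boolean =
  oldestAttempt <= maxTime

// "This particular attempt is still fresh enough to keep."
def attemptStillFresh(lastUpdated: Long, maxTime: Long): Boolean =
  lastUpdated >= maxTime
```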
Test build #115300 has finished for PR 25797 at commit
Test build #115334 has finished for PR 25797 at commit
Looks ok now. Merging to master.
@@ -1321,6 +1321,35 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
  assertSerDe(serializer, attemptInfoWithIndex)
}

test("SPARK-29043: clean up specified event log") {
  val clock = new ManualClock()
  val conf = createTestConf().set(MAX_LOG_AGE_S.key, "0").set(CLEANER_ENABLED.key, "true")
No need to use .key here. (I'll fix this during merge.)
What changes were proposed in this pull request?
Even if we set spark.history.fs.numReplayThreads to a large number, such as 30, the history server still replays logs slowly.
We found that if there is a straggler in a batch of replay tasks, all the other threads will wait for this straggler.
In this PR, we add a "processing" set to record the logs that are being replayed, so that the replay tasks can execute asynchronously.
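A hedged sketch of the idea described above, not the exact PR code: a concurrent "processing" set tracks log paths currently being replayed, so a later scan can skip in-flight logs instead of waiting for a straggler in the batch. The ReplayTracker wrapper is illustrative; the method names startProcessing/endProcessing echo the ones visible in the diff.

```scala
import java.util.concurrent.ConcurrentHashMap

object ReplayTracker {
  // Thread-safe set of log paths whose replay is currently in flight.
  private val processing = ConcurrentHashMap.newKeySet[String]()

  // Returns false if the path is already being replayed by another thread,
  // letting the caller skip it rather than block on it.
  def startProcessing(path: String): Boolean = processing.add(path)

  // Called when replay finishes (or fails), making the path eligible again.
  def endProcessing(path: String): Unit = processing.remove(path)
}
```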
Why are the changes needed?
It speeds up log replay for the history server.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UT.