[SPARK-30964][Core][WebUI] Accelerate InMemoryStore with a new index #27716
Conversation
Test build #118993 has finished for PR 27716 at commit
Test build #118995 has finished for PR 27716 at commit
Test build #118997 has finished for PR 27716 at commit
retest this please
Test build #118994 has finished for PR 27716 at commit
Test build #118998 has finished for PR 27716 at commit
Test build #118996 has finished for PR 27716 at commit
Test build #118999 has finished for PR 27716 at commit
Test build #119005 has finished for PR 27716 at commit
retest this please.
Test build #119015 has finished for PR 27716 at commit
Test build #119018 has finished for PR 27716 at commit
common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
Test build #119099 has finished for PR 27716 at commit
```java
for (NaturalKeys v : parentToChildrenMap.values()) {
  if (v.remove(asKey(key))) {
    break;
  }
}
```
When a parent key in `parentToChildrenMap` points to an empty `NaturalKeys`, can we also remove it?
Yes, nothing will change if the `NaturalKeys` `v` doesn't contain `key`.
Oh, I meant after `v.remove(asKey(key))`: if `v` is empty, can we remove the (parent key, empty `NaturalKeys`) entry from `parentToChildrenMap`?
Well, `parentToChildrenMap` is a concurrent map, and checking emptiness costs time. The method here only deletes one entry, so I think we can keep it simple and leave it this way.
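For reference, if one did want to drop empty child sets without a separate emptiness check racing against concurrent writers, `ConcurrentHashMap.computeIfPresent` can perform the removal and the emptiness test atomically. A minimal sketch with hypothetical names (this is not the PR's actual code):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ParentIndexSketch {
  // parent key -> set of child (natural) keys, analogous to parentToChildrenMap
  static final ConcurrentHashMap<String, Set<String>> parentToChildren =
      new ConcurrentHashMap<>();

  // Remove childKey from every parent's child set; drop the map entry
  // atomically when the set becomes empty (returning null removes it).
  static void deleteChild(String childKey) {
    for (String parent : parentToChildren.keySet()) {
      parentToChildren.computeIfPresent(parent, (p, children) -> {
        children.remove(childKey);
        return children.isEmpty() ? null : children;
      });
    }
  }

  public static void main(String[] args) {
    parentToChildren.put("stage-1", new HashSet<>(Set.of("task-1")));
    deleteChild("task-1");
    // The now-empty entry for "stage-1" is gone entirely.
    System.out.println(parentToChildren.containsKey("stage-1")); // prints false
  }
}
```

The trade-off the author mentions still applies: each deletion pays for the remapping function, which is why the PR keeps the simpler loop.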
```java
Comparable<Object> parentKey = asKey(parent);
if (!naturalParentIndexName.isEmpty() &&
    naturalParentIndexName.equals(ti.getParentIndexName(index))) {
  // If there is a parent index for the natural index and the parent of`index` happens to be
```
Typo: of`index` -> of `index`
```java
    .collect(Collectors.toList());
Comparable<Object> parentKey = asKey(parent);
if (!naturalParentIndexName.isEmpty() &&
    naturalParentIndexName.equals(ti.getParentIndexName(index))) {
```
Is it possible that `naturalParentIndexName` doesn't equal `ti.getParentIndexName(index)`? Isn't `String index = KVIndex.NATURAL_INDEX_NAME`?
It is possible. I explained it in https://github.com/apache/spark/pull/27716/files#r385846069.
Test build #119116 has finished for PR 27716 at commit
retest this please.
Test build #119119 has finished for PR 27716 at commit
retest this please.
Test build #119130 has finished for PR 27716 at commit
Test build #119134 has finished for PR 27716 at commit
retest this please.
Test build #119140 has finished for PR 27716 at commit
Linearly scanning all the task data to look up just one stage looks like a performance issue to me, and we should fix it in 3.0 as well. Thanks, merging to master/3.0!
### What changes were proposed in this pull request?

Spark uses the class `InMemoryStore` as the KV storage for the live UI and history server (by default, if no LevelDB file path is provided). In `InMemoryStore`, all the task data in one application is stored in a hashmap whose key is the task ID and whose value is the task data. This is fine for getting or deleting with a provided task ID. However, the Spark stage UI always shows all the task data in one stage, and the current implementation looks up all the values in the hashmap, so the time complexity is O(numOfTasks). Also, when there are too many stages (> `spark.ui.retainedStages`), Spark will linearly look up all the task data of the stages to be deleted as well. This can be very bad for a large application with many stages and tasks.

We can improve it by allowing the natural key of an entity to have a real parent index, so that on each lookup with a parent key provided, Spark can look up all the natural keys (in our case, the task IDs) first, and then find the data with the natural keys in the hashmap.

### Why are the changes needed?

The in-memory KV store becomes really slow for large applications. We can improve it with a new index; the performance can be 10, 100, even 1000 times faster. This can also make the Spark driver more stable for large applications.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing unit tests.

Also, I ran a benchmark with the following code:

```scala
val store = new InMemoryStore()
val numberOfTasksPerStage = 10000
(0 until 1000).map { sId =>
  (0 until numberOfTasksPerStage).map { taskId =>
    val task = newTaskData(sId * numberOfTasksPerStage + taskId, "SUCCESS", sId)
    store.write(task)
  }
}
val appStatusStore = new AppStatusStore(store)
var start = System.nanoTime()
appStatusStore.taskSummary(2, attemptId, Array(0, 0.25, 0.5, 0.75, 1))
println("task summary run time: " + ((System.nanoTime() - start) / 1000000))
val stageIds = Seq(1, 11, 66, 88)
val stageKeys = stageIds.map(Array(_, attemptId))
start = System.nanoTime()
store.removeAllByIndexValues(classOf[TaskDataWrapper], TaskIndexNames.STAGE,
  stageKeys.asJavaCollection)
println("clean up tasks run time: " + ((System.nanoTime() - start) / 1000000))
```

Task summary before the changes: 98642ms
Task summary after the changes: 120ms
Task clean up before the changes: 4900ms
Task clean up after the changes: 4ms

It's 800x faster after the changes in the micro-benchmark.

Closes #27716 from gengliangwang/liveUIStore.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6b64143)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
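The parent-index idea described above can be sketched as follows. This is a simplified model with hypothetical names, not the real `InMemoryStore` (which tracks typed entities and multiple indices): alongside the main natural-key hashmap, a second map from parent key (stage) to the natural keys (task IDs) of its children makes a per-stage lookup proportional to that stage's task count rather than the whole store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the new index: next to the main natural-key hashmap,
// keep a parent-key -> natural-keys map, so a per-stage lookup touches only
// that stage's tasks instead of scanning every task in the store.
public class IndexedTaskStore {
  // natural key (task id) -> task data
  final Map<Long, String> tasks = new ConcurrentHashMap<>();
  // parent key (stage id) -> natural keys of the stage's tasks
  final Map<Integer, Set<Long>> byStage = new ConcurrentHashMap<>();

  void write(long taskId, int stageId, String data) {
    tasks.put(taskId, data);
    byStage.computeIfAbsent(stageId, s -> ConcurrentHashMap.newKeySet()).add(taskId);
  }

  // O(tasksInStage) instead of O(allTasks): fetch the natural keys first,
  // then look each one up in the main hashmap.
  List<String> tasksOfStage(int stageId) {
    List<String> out = new ArrayList<>();
    for (long id : byStage.getOrDefault(stageId, Set.of())) {
      out.add(tasks.get(id));
    }
    return out;
  }

  public static void main(String[] args) {
    IndexedTaskStore store = new IndexedTaskStore();
    store.write(1L, 0, "task 1");
    store.write(2L, 0, "task 2");
    store.write(3L, 1, "task 3");
    System.out.println(store.tasksOfStage(0).size()); // prints 2
  }
}
```

Deleting a retained stage follows the same pattern: the secondary index yields exactly the task IDs to remove, which is what makes the clean-up in the benchmark drop from seconds to milliseconds.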
@cloud-fan @viirya Thanks for the review!
…p when removing key from CountingRemoveIfForEach

### What changes were proposed in this pull request?

This patch addresses a missed spot in SPARK-30964 (#27716): SPARK-30964 added a secondary index which defines the parent-children relationship and makes it possible to operate on all children of a given parent faster. While SPARK-30964 handled the addition and deletion of the secondary index in InstanceList properly, it missed the deletion of the secondary index in CountingRemoveIfForEach, resulting in a leak of indices. This patch adds the deletion of the secondary index in CountingRemoveIfForEach.

### Why are the changes needed?

Described above.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

N/A, as the relevant field and class are marked as private, and this cannot be checked at a higher level. I'm not sure we want to adjust the scope to add a test.

Closes #27765 from HeartSaVioR/SPARK-31014.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
…renMap when removing key from CountingRemoveIfForEach

### What changes were proposed in this pull request?

This patch addresses a missed spot in SPARK-30964 (#27716): SPARK-30964 added a secondary index which defines the parent-children relationship and makes it possible to operate on all children of a given parent faster. While SPARK-30964 handled the addition and deletion of the secondary index in InstanceList properly, it missed the deletion of the secondary index in CountingRemoveIfForEach, resulting in a leak of indices. This patch adds the deletion of the secondary index in CountingRemoveIfForEach.

### Why are the changes needed?

Described above.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

N/A, as the relevant field and class are marked as private, and this cannot be checked at a higher level. I'm not sure we want to adjust the scope to add a test.

Closes #27825 from HeartSaVioR/SPARK-31014-branch-3.0.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…with InMemoryStore

### What changes were proposed in this pull request?

#27716 introduced a parent index for InMemoryStore. When the method `deleteParentIndex(Object key)` in InMemoryStore.java is called and the key is not contained in `NaturalKeys v`, a java.lang.NullPointerException is thrown. This patch fixes the issue by updating the if condition.

### Why are the changes needed?

Fixes a minor bug.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a unit test for deleteParentIndex.

Closes #28378 from baohe-zhang/SPARK-31584.

Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
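The failure mode here is the classic auto-unboxing trap: on a map whose values are `Boolean`, `remove(key)` returns `null` when the key is absent, and unboxing that `null` into a bare `boolean` condition throws the NullPointerException. A minimal illustration with hypothetical names (not the PR's exact code):

```java
import java.util.concurrent.ConcurrentHashMap;

public class UnboxingNpe {
  // Mirrors the shape of NaturalKeys: a concurrent map used as a key set.
  static final ConcurrentHashMap<String, Boolean> keys = new ConcurrentHashMap<>();

  // Buggy shape: remove() returns a Boolean that is null for absent keys,
  // and the implicit Boolean -> boolean unboxing throws NullPointerException.
  static boolean buggyDelete(String key) {
    return keys.remove(key); // NPE if key is absent
  }

  // Fixed shape: compare the returned reference against null explicitly.
  static boolean fixedDelete(String key) {
    return keys.remove(key) != null;
  }

  public static void main(String[] args) {
    System.out.println(fixedDelete("absent")); // prints false, no NPE
  }
}
```

This matches the description above: the fix only needed to change the `if` condition so an absent key is treated as "nothing to delete".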