[SPARK-37224][SS] Optimize write path on RocksDB state store provider #34502
Conversation
@@ -196,6 +210,29 @@ class RocksDB(
    }
  }

  private def countKeys(): Long = {
This doesn't leverage iterator() since here we don't need to deserialize the key-value from RocksDB to produce UnsafeRowPair.
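To illustrate the point, here is a minimal sketch (with an illustrative stub trait, not the actual Spark or RocksDB API) of a count-only scan: it only advances over raw entries and counts them, never deserializing key/value bytes into an `UnsafeRowPair`, and since nothing escapes to the caller the iterator can be closed in `finally`:

```scala
// Illustrative stand-in for a raw RocksDB iterator; names are hypothetical.
trait RawKeyIterator {
  def seekToFirst(): Unit
  def isValid: Boolean
  def next(): Unit
  def close(): Unit
}

def countKeys(iter: RawKeyIterator): Long = {
  try {
    var count = 0L
    iter.seekToFirst()
    // Count entries without touching key/value bytes at all.
    while (iter.isValid) {
      count += 1
      iter.next()
    }
    count
  } finally {
    // Closed in finally: unlike iterator(), this method does not
    // hand the iterator back to the caller.
    iter.close()
  }
}
```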
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #144951 has finished for PR 34502 at commit

Kubernetes integration test starting
Kubernetes integration test status failure
Test build #144952 has finished for PR 34502 at commit

@zsxwing @viirya @xuanyuanking Please take a look. Thanks in advance!
Please note that 553 lines are benchmark code and its result. I'll separate the benchmark and the result into the next PR on top of this one, to reduce the amount of actual code to review.
Force-pushed from 1715105 to 01e9db5
Friendly reminder, @tdas @zsxwing @viirya @xuanyuanking
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #145329 has finished for PR 34502 at commit

Left some minor comments
@@ -1956,8 +1956,21 @@ Here are the configs regarding to RocksDB instance of the state store provider:
    <td>Whether we resets all ticker and histogram stats for RocksDB on load.</td>
    <td>True</td>
  </tr>
  <tr>
    <td>spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows</td>
    <td>Whether we track the total number of rows in state store. Please refer the details in "Performance-aspect considerations".</td>
It's better to use a link such as in [Performance-aspect considerations](#performance-aspect-considerations)
(the link syntax may be wrong. Could you try to build the docs locally to check it?)
OK will check it.
    // Attempt to close this iterator if there is a task failure, or a task interruption.
    // This is a hack because it assumes that the RocksDB is running inside a task.
    Option(TaskContext.get()).foreach { tc =>
      tc.addTaskCompletionListener[Unit] { _ => iter.close() }
    }
We can put `iter.close()` in `finally` instead. This method doesn't return an Iterator to the caller.
Nice finding! I was blindly following the iterator() method, my bad.

    val numKeys = if (!conf.trackTotalNumberOfRows) {
      // we don't track the total number of rows - discard the number being tracked
      -1L
Could you point out where we turn `-1` to `0`? I don't find it.
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
Lines 40 to 44 in 6450f6b

class SQLMetric(val metricType: String, initValue: Long = 0L) extends AccumulatorV2[Long, Long] {
  // This is a workaround for SPARK-11013.
  // We may use -1 as initial value of the accumulator, if the accumulator is valid, we will
  // update it at the end of task and the value will be at least 0. Then we can filter out the -1
  // values before calculate max, min, etc.
Even if we separate the values for "no key" vs "don't know", the value will go through SQLMetric, and negative values do not contribute to accumulation.
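The SPARK-11013 workaround quoted above can be sketched in a few lines (a simplified stand-in, not the actual `SQLMetric` implementation): per-task values that were never updated stay at the `-1` initial value and are filtered out before aggregating, so a "don't know" never pollutes the sum or max.

```scala
// Simplified model of how -1 "invalid" markers are dropped before
// aggregating task-level metric values (illustrative, not Spark's code).
def aggregateMetric(taskValues: Seq[Long]): (Long, Long) = {
  val valid = taskValues.filter(_ >= 0L) // drop -1 initial values
  val sum = valid.sum
  val max = if (valid.nonEmpty) valid.max else 0L
  (sum, max)
}
```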
Thanks for the pointer.
Thanks! I addressed the review comments. Please have a look at the new changes. Thanks in advance!
Test build #145366 has finished for PR 34502 at commit

IDEA made fun of me; it didn't flag the compilation error. Just fixed.
Kubernetes integration test unable to build dist. exiting with code: 1
LGTM
Kubernetes integration test starting
Kubernetes integration test status failure
Thanks for the review @zsxwing ! Merging to master.
Sorry for the late LGTM. Just some small comments. We can address them in the second PR.
</table>

##### Performance-aspect considerations

1. For write-heavy workloads, you may want to disable the track of total number of rows.
Do we have other considerations here? Or will we add more in the future? (Just want to double-check that the `1.` is not a typo.)
What does "write-heavy workloads" mean in this context? Should we use terms that are more understandable in a streaming context, e.g. throughput, or rows per second?
Because this seems to refer to the state store, I'm not sure how users would measure whether their workload is write-heavy on the state store.
`1.` is not a typo. I just wanted to reserve space where we would eventually add more. I'm not an expert on RocksDB, so I don't have the insight to write tuning guides, but RocksDB itself seems to provide lots of things to tune, so more may come up later.
I agree that "write-heavy workloads" sounds unclear; basically it means a higher amount of updates (writes/deletes) against the state store. This cannot be inferred from the volume of inputs, since it depends on the operator and the window: if the input produces lots of state keys in a streaming aggregation, it will issue lots of writes against the state store, whereas if the inputs are huge but bind to only a few windows, there will be only a few writes against the state store.
Probably we can leverage the state metrics "rows to update" and "rows to delete"; they represent the amount of updates. Technically this change doesn't introduce a performance regression in any workload, so it's not limited to write-heavy workloads; we make a trade-off on observability, and it's up to end users to choose performance vs. observability.
It looks like it'd be better to remove the phrase "For write-heavy workloads" and simply say "to gain additional performance on state store", hinting that it will be more effective when the state metrics "rows to update" and "rows to delete" are high.
Thanks for the inputs!
@@ -144,26 +156,28 @@ class RocksDB(
   * Put the given value for the given key and return the last written value.
Please also change the comment correspondingly.
Nice finding! Missed that.
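For context, the signature change under discussion can be sketched with a plain map standing in for RocksDB and its WriteBatch (all names here are illustrative, not the actual Spark code): the old `put` paid a lookup on every write just to return the previous value, while the new one writes blindly and returns nothing.

```scala
import scala.collection.mutable

// Stub store contrasting the old read-then-write path with the new blind write.
final class StubStore {
  private val data = mutable.Map.empty[String, String]
  var lookups = 0 // counts reads performed on the write path

  // Old shape: look up the previous value only to return it.
  def putOld(key: String, value: String): String = {
    lookups += 1
    val old = data.getOrElse(key, null)
    data(key) = value
    old
  }

  // New shape: blind write, no lookup, returns Unit.
  def putNew(key: String, value: String): Unit = data(key) = value
}
```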
    }
    oldValue
    writeBatch.put(key, value)
  }

  /**
   * Remove the key if present, and return the previous value if it was present (null otherwise).
Same here
lgtm. Just a question about the doc.
Test build #145368 has finished for PR 34502 at commit

### What changes were proposed in this pull request?
This PR is a follow-up of #34502 to address post-reviews. It rewords the explanation of performance tuning on the RocksDB state store to make it less confusing, and also fixes the method docs to be in sync with the code changes.

### Why are the changes needed?
1. The explanation of performance tuning on the RocksDB state store was unclear in a couple of spots.
2. We changed the method signature, but the change was not reflected in the method doc.

### Does this PR introduce _any_ user-facing change?
Yes, end users will be less confused by the explanation of performance tuning on the RocksDB state store.

### How was this patch tested?
N/A

Closes #34652 from HeartSaVioR/SPARK-37224-follow-up-postreview.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
This PR proposes to add a new benchmark to measure the performance of basic state store operations, plus the result file. The proposed change of SPARK-37224 (#34502) is applied in the benchmark. As the benchmark numbers show, turning off the config brings a large performance gain from a micro-benchmark perspective, while it is still slower than the memory-based state store.

### Why are the changes needed?
To track and verify further performance improvements.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The result file from a manual run is included in this PR.

Closes #34630 from HeartSaVioR/SPARK-37224-follow-up-benchmark.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
What changes were proposed in this pull request?
This PR proposes to optimize the write path on RocksDB by removing an unnecessary lookup. Removing the lookup unfortunately also makes it infeasible to track the number of rows, so this PR also introduces a new configuration for the RocksDB state store provider to let end users turn tracking on and off based on their needs.
The new configuration is the following:
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows
We will report "0" for the number of keys in the state store metric when the config is turned off. The ideal value would be a negative one, but currently SQL metrics don't allow negative values.
We also handle the case where the config is flipped during restart. This lets end users enjoy the benefit without losing the chance to know the number of state rows: they can turn off the flag to maximize performance, and turn it on (restart required) when they want to see the actual number of keys (for observability/debugging/etc.).
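In practice, flipping the flag could look like the following sketch (the config key is the one introduced by this PR; setting it through the session conf before starting the query is an assumption about typical usage, not a prescribed procedure):

```scala
// Trade the "number of total state rows" metric for write-path performance;
// re-enable it later (query restart required) when the count is needed again.
spark.conf.set(
  "spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "false")
```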
Why are the changes needed?
This addresses an unnecessary lookup in the write path whose only purpose is to track the number of rows. While the metric is part of the basic metrics for stateful operators, we can sacrifice some observability to gain performance under heavy write load.
Does this PR introduce any user-facing change?
Yes, a new configuration is added. This is neither a backward-incompatible change nor a behavior change, since the default value of the flag retains the current behavior.
But there's a glitch regarding rolling back to a previous Spark version: if you run a query with the config turned off (so that the number of keys is lost) and restart the query on an older Spark version, the older version will still try to track the number, and the number will get messed up. You may want to turn the config on and run some micro-batches before going back to the previous Spark version.
How was this patch tested?
New UT. A benchmark will follow in the next PR.