-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-24763][SS] Remove redundant key data from value in streaming aggregation #21733
Conversation
Test build #92734 has finished for PR 21733 at commit
|
Test build #92735 has finished for PR 21733 at commit
|
@@ -53,7 +54,30 @@ class StreamingAggregationSuite extends StateStoreMetricsTest | |||
|
|||
import testImplicits._ | |||
|
|||
test("simple count, update mode") { | |||
val confAndTestNamePostfixMatrix = List( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we do this within beforeAll and afterAll with spark.conf.set / spark.conf.unset?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withSQLConf
looks like used widely between SQL unit tests, and does additional work (SparkSession.setActiveSession), so I'm not sure it will work technically same. Moreover, we need to run same test "multiple times", with changing configuration.
Could you propose your code if you don't really mind? Thanks in advance!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. "we need to run same test "multiple times", with changing configuration." I missed this.
We could consider like:
Lines 61 to 72 in 489a529
override def beforeEach(): Unit = { | |
super.beforeEach() | |
// Note that there are many tests here that require record-level filtering set to be true. | |
spark.conf.set(SQLConf.PARQUET_RECORD_FILTER_ENABLED.key, "true") | |
} | |
override def afterEach(): Unit = { | |
try { | |
spark.conf.unset(SQLConf.PARQUET_RECORD_FILTER_ENABLED.key) | |
} finally { | |
super.afterEach() | |
} |
e.g.,
StreamingAggregationSuite {
override def afterAll(): Unit = {
// false
}
override def beforeAll(): Unit = {
// false
}
}
RemoveRedundantStreamingAggregationSuite extends StreamingAggregationSuite {
override def afterAll(): Unit = {
// true
}
override def beforeAll(): Unit = {
// true
}
}
but I believe the current implementation works too in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I'd like to wait for other reviewers regarding opinions/suggestions on this. Let me keep this as it is until then.
Test build #92738 has finished for PR 21733 at commit
|
retest this, please |
Test build #92755 has finished for PR 21733 at commit
|
retest this, please |
Test build #92791 has finished for PR 21733 at commit
|
@@ -825,6 +825,16 @@ object SQLConf { | |||
.intConf | |||
.createWithDefault(100) | |||
|
|||
val ADVANCED_REMOVE_REDUNDANT_IN_STATEFUL_AGGREGATION = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get worried when I see things described as "advanced features". What will go wrong if a user who's insufficiently advanced tries to use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not compatible with current stateful aggregation (definitely, that's the improvement of this patch) and there is no undo. So only new query can turn on this option, and once end users enable the option in the query, the option must be enabled unless end users clear out checkpoint as well as state. (I've added the new option to OffsetSeqMetadata to remember the first setting like partition count).
I'm seeing performance on far or even slightly better on specific workload (publicized in description link), but I would say I cannot try out exhaustive workloads. I actually expected a tradeoff between performance vs state memory usage, so assuming if other workloads follow the tradeoff, end users may need to try out this option in their query with non-production environment (for example, staged) to ensure enabling option doesn't break their expectation of performance with benefit of reducing state memory.
That's why I also make changes available as an option instead of modifying default behavior. If we apply this to the default behavior, we need to provide state migration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be a query option instead of a SparkConf, then? I worry it will be very hard to reason about the current scenario, where the conf defines how all states are stored - except that some streams started with a different value will silently override it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry, but I'm not sure if I understand your suggestion correctly. I guess defining configuration to spark conf would be easier to guard against modification after starting query, via existing approach - adding conf to OffsetSeqMetadata - whereas I'm not sure we could guard against modification of query option. I might be missing something here.
Could you elaborate a bit more? Thanks in advance!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the default value of this is false
so end users will be aware of existence of this option, and have a chance to read the explanation before setting this option to true
.
We might elaborate a bit more on the config: tradeoff between reduced memory usage vs possible perf. hit and suggest running this in non-production before applying this to production. If we feel safer on elaborating more on this, I'm happy to update it.
@HeartSaVioR , the results looks promising. I am wondering if theres a way to make this default option than introducing new configs. Since this is internal details anyway theres no need to expose any config if we can identify the old vs new format by looking at the fields in the row or by introducing a row version to differentiate old vs new. |
@arunmahadevan Btw, as you commented, there seems two approaches to identify the old and new format:
Actually I tried to do it before (via checking count of fields in value row, since this patch reduces the count of fields in value row), and soon realized I can't do it because HDFSBackedStateStoreProvider relies on provided keySchema and valueSchema when serializing / deserializing rows (only byte arrays are stored... no numFields), not leveraging UnsafeRow's serialization/deserialization mechanism (writeExternal/readExternal or write/read via Kyro), so the numFields in deserialized row could go wrong if the schema doesn't match with actual rows, and we can't verify this. Current approach saves cost to write/read one additional integer with sacrificing the way to verify the rows. If we would want to add the feature back, state migration should be happened or we should have row version to differentiate the twos.
We could do this via applying same approach in #21739 so this is valid, but query with old state format should do state migration (not easy to do since it should be done against multiple versions of states), or continue relying on old state format (same as above). @jose-torres Could you please take a look at @arunmahadevan 's comment as well as this comment and comment yours? Thanks in advance! |
We could still save the value of the option to offsetSeqMetadata and error if it's changed. The value of using an option would just be that there's no global default; a poweruser can set the option for the queries they think would benefit without affecting all the other queries which get run. I agree it would be nice to just have some safe path allowing us to always use the new strategy. Absent that, there's an unfortunate tradeoff of reduced memory footprint vs added complexity. I think we ultimately need a committer to decide whether that's worth it. |
I guess we would have to treat reducing state memory size to have worth to do: as described in above commit, we already optimized in HDFSBackedStateStoreProvider for reducing state store disk size (as well as network transfer) via not storing 4 bytes per each row (from both key and value). This approach would normally save more than previous optimization on value row, given key would have window information which contains two values: start and end. The main issue on this approach for me is possible perf. impact on workloads. Hopefully the workload I've covered shows even slight perf. improvement but not sure for other workloads yet. I might say we need to consider changing default behavior when I have overall good backing numbers afterwards, but in any way, I'm sure I agree that deciding from committer(s) is necessary. Would we be better to initiate mail thread in dev. mailing list? |
I had a chance to test this patch with more kinds of use cases, and in overall enabling option shows on far or slightly better performance whereas it reduces state size according to the ratio of size of key-value pair. I'm now feeling that it would make sense to adopt new strategy to the default and use old behavior as fallback of supporting old app, but the numbers are for persuading committers and I still agree decision would be necessary from committer(s). |
Test build #93221 has finished for PR 21733 at commit
|
Test build #93222 has finished for PR 21733 at commit
|
Test build #93277 has finished for PR 21733 at commit
|
…ggregation * add option to configure enabling new feature: remove redundant key data from value * modify code to respect new option (turning on/off feature) * modify tests to run tests with both on/off * Add guard in OffsetSeqMetadata to prevent modifying option after executing query
…backward compatible"
Now I'd like to propose changing default behavior to apply new path but keeping backward compatibility, so applied it to the patch. I'm still open on decision to apply it as advanced option as first approach, and happy to roll back when we decide on that way. |
Add tests for StatefulOperatorsHelper itself as well. (Sorry for pushing commits multiple times which trigger multiple builds. It might be ideal if older test builds are terminated once newer test build for specific PR is just launched: and looks like it already works like the way via failing with -9.) |
@tdas
rate: 160000
state size: 54.854 % (reduces 45.15%)
rate: 120000
state size: 64.787% (reduces 35.21%)
rate: 25000
state size: 91.709 % (reduces 8.29 %)
rate: 85000
state size: 96.861 % (reduces 3.14 %) I don't find any outstanding perf. hit, and expected state size reduction is shown from all over the cases. |
* add docs
Also added javadoc as well. Most of contents are from StateStore but I didn't copy the note to implementation for state store since it is duplicated. Please let me know if we want to add content for the parameter target state store as well. |
Test build #94403 has finished for PR 21733 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking much better, thank you for the changes! There are still a few improvements that can be made.
@@ -81,4 +85,221 @@ package object state { | |||
storeCoordinator) | |||
} | |||
} | |||
|
|||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ummm why is it in this package class and not in separate file?? Is there any reason it has to be state
package object when not all of stateful require it, only streaming aggregation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I misinterpret your suggestion before. I thought you are suggesting move to state package class. Will place it to separate file.
def remove(store: StateStore, key: UnsafeRow): Unit | ||
|
||
/** | ||
* Return an iterator containing all the key-value pairs in target state store. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nit: some of these can be compressed to a single line doc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will address.
* Commit all the updates that have been made to the target state store, and return the | ||
* new version. | ||
* | ||
* @param store The target StateStore instance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
superfluous. just the main statement has all the information.
* Extract columns consisting key from input row, and return the new row for key columns. | ||
* | ||
* @param row The input row. | ||
* @return The row instance which only contains key columns. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: a lot of the @param
and @return
in the docs are a bit superfluous as it just repeats what the main statement already says.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will just remove all the @param
and @return
if they are repeating.
* @param row The input row. | ||
* @return The row instance which only contains key columns. | ||
*/ | ||
def getKey(row: InternalRow): UnsafeRow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why is the input typed InternalRow where everything else is UnsafeRow? seems inconsistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getKey
was basically UnsafeProjection in statefulOperator so didn't necessarily require UnsafeRow. I just followed the usage to make it less restrict, but we know, in reality row
will be always UnsafeRow. So OK to fix if it provides consistency.
// state manager should return row which is same as input row regardless of format version | ||
assert(inputRow === stateManager.get(memoryStateStore, keyRow)) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove.
return savedState | ||
} | ||
|
||
val joinedRow = joiner.join(key, savedState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cant you dedup the code with restoreOriginRow
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed spot. Will leverage restoreOriginRow
.
store.iterator().map(rowPair => restoreOriginRow(rowPair)) | ||
} | ||
|
||
private def restoreOriginRow(rowPair: UnsafeRowPair): UnsafeRow = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to restoreOriginalRow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will rename.
@@ -167,6 +165,18 @@ trait WatermarkSupport extends UnaryExecNode { | |||
} | |||
} | |||
} | |||
|
|||
protected def removeKeysOlderThanWatermark(storeManager: StreamingAggregationStateManager, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incorrect indent of parameters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually... where is this used? This does not seem to be used anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed spot. It will be called from Update mode of StateStoreSaveExec. Will address.
val hasInput = iter.hasNext | ||
if (!hasInput && keyExpressions.isEmpty) { | ||
// If our `keyExpressions` are empty, we're getting a global aggregation. In that case | ||
// the `HashAggregateExec` will output a 0 value for the partial merge. We need to | ||
// restore the value, so that we don't overwrite our state with a 0 value, but rather | ||
// merge the 0 with existing state. | ||
// In this case the value should represent origin row, so no need to restore. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does this mean? I think this is not needed any more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it can be removed now. Will remove.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again @tdas for the super detailed and thoughtful review! Will address your review comments and update the PR.
"be modified after running.") | ||
.intConf | ||
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") | ||
.createWithDefault(2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice suggestion. Will add the test.
* Extract columns consisting key from input row, and return the new row for key columns. | ||
* | ||
* @param row The input row. | ||
* @return The row instance which only contains key columns. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will just remove all the @param
and @return
if they are repeating.
* @param row The input row. | ||
* @return The row instance which only contains key columns. | ||
*/ | ||
def getKey(row: InternalRow): UnsafeRow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getKey
was basically UnsafeProjection in statefulOperator so didn't necessarily require UnsafeRow. I just followed the usage to make it less restrict, but we know, in reality row
will be always UnsafeRow. So OK to fix if it provides consistency.
|
||
def getKey(row: InternalRow): UnsafeRow = keyProjector(row) | ||
|
||
override def commit(store: StateStore): Long = store.commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is actually based on your review comment: always use state manager and don't directly access state store whenever possible. If your suggestion only applies to operations I can remove commit()
from this interface.
val hasInput = iter.hasNext | ||
if (!hasInput && keyExpressions.isEmpty) { | ||
// If our `keyExpressions` are empty, we're getting a global aggregation. In that case | ||
// the `HashAggregateExec` will output a 0 value for the partial merge. We need to | ||
// restore the value, so that we don't overwrite our state with a 0 value, but rather | ||
// merge the 0 with existing state. | ||
// In this case the value should represent origin row, so no need to restore. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it can be removed now. Will remove.
return savedState | ||
} | ||
|
||
val joinedRow = joiner.join(key, savedState) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed spot. Will leverage restoreOriginRow
.
GenerateUnsafeRowJoiner.create(StructType.fromAttributes(keyExpressions), | ||
StructType.fromAttributes(valueExpressions)) | ||
@transient private lazy val restoreValueProjector = GenerateUnsafeProjection.generate( | ||
keyValueJoinedExpressions, inputRowAttributes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad. You're right. Will fix. Btw, needToProjectToRestoreValue is always false, unless sequence of columns for key and value get mixed up.
|
||
private val valueExpressions: Seq[Attribute] = inputRowAttributes.diff(keyExpressions) | ||
private val keyValueJoinedExpressions: Seq[Attribute] = keyExpressions ++ valueExpressions | ||
private val needToProjectToRestoreValue: Boolean = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will add.
* - key: Same as key expressions. | ||
* - value: Same as input row attributes. The schema of value contains key expressions as well. | ||
* | ||
* This implementation only works when input row attributes contain all the key attributes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intended to put the sentence as a requirement / precondition for possible future usages, but if you think we don't need to put it explicitly I can remove it.
def remove(store: StateStore, key: UnsafeRow): Unit | ||
|
||
/** | ||
* Return an iterator containing all the key-value pairs in target state store. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will address.
TODO list * add test which reads checkpoint from 2.3.1 and runs query
* add test which restores query from Spark 2.3.1
@tdas Addressed review comments. Please take a look again. Thanks in advance. |
Test build #94469 has finished for PR 21733 at commit
|
retest this, please |
Test build #94474 has finished for PR 21733 at commit
|
@tdas Kindly reminder. |
This looks good!! Only one comment, please don't add the .crc files. They are useless and adds unnecessarily clutter. |
LGTM. Will merge when tests pass. :) |
@tdas Removed the .crc files. Thanks for reviewing! |
Good point. That can be minor Pr. |
Test build #95003 has finished for PR 21733 at commit
|
## What changes were proposed in this pull request? Add .crc files to .gitignore so that we don't add .crc files in state checkpoint to git repo which could be added in test resources. This is based on comments in apache#21733, apache#21733 (comment). ## How was this patch tested? Add `.1.delta.crc` and `.2.delta.crc` in `<spark root>/sql/core/src/test/resources`, and confirm git doesn't suggest the files to add to stage. Closes apache#22170 from HeartSaVioR/add-crc-files-to-gitignore. Authored-by: Jungtaek Lim <kabhwan@gmail.com> Signed-off-by: hyukjinkwon <gurwls223@apache.org>
Thanks all for reviewing and thanks @tdas for merging this in! |
What changes were proposed in this pull request?
This patch proposes a new flag option for stateful aggregation: remove redundant key data from value.
Enabling new option runs similar with current, and uses less memory for state according to key/value fields of state operator.
Please refer below link to see detailed perf. test result:
https://issues.apache.org/jira/browse/SPARK-24763?focusedCommentId=16536539&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16536539
Since the state between enabling the option and disabling the option is not compatible, the option is set to 'disable' by default (to ensure backward compatibility), and OffsetSeqMetadata would prevent modifying the option after executing query.
How was this patch tested?
Modify unit tests to cover both disabling option and enabling option.
Also did manual tests to see whether propose patch improves state memory usage.