[SPARK-54383][SQL] Add precomputed schema variant for InternalRowComparableWrapper util #53097
@cloud-fan @szehon-ho Could you take a look at this PR when you get the chance?
StructType(dataTypes.map(t => StructField("f", t))) ->
  RowOrdering.createNaturalAscendingOrdering(dataTypes)

def mergePartitions(
It looks like the only caller of this method in the original class (`InternalRowComparableWrapper`) is the `InternalRowComparableWrapper` benchmark. Should we just migrate that over and deprecate this method?
Also, if you have time, it would be interesting to see the numbers after running the benchmark against the new class.
Attached some benchmarks in PR description.
 * Effectively the same as [[InternalRowComparableWrapper]], but using a precomputed `ordering`
 * and `structType` to avoid the cache lookup for each row.
 */
class BoundInternalRowComparableWrapper(
Since nothing now checks that `structType` and `ordering` are derived from `dataTypes`, it seems a bit dangerous not to include them in hash/equals. Should we do that?
Alternatively, we could keep the binding by introducing a factory and making this class's constructor private, i.e.:
class BoundInternalRowComparableFactory(dataTypes: Seq[DataType]) {
  // getStructType / getOrdering are placeholders for however the schema and ordering get derived
  private val structType = getStructType(dataTypes)
  private val ordering = getOrdering(dataTypes)
  def newBoundInternalRowComparableWrapper(row: InternalRow): BoundInternalRowComparableWrapper =
    new BoundInternalRowComparableWrapper(row, structType, ordering, dataTypes)
}
Good suggestion. I actually removed the new `BoundInternalRowComparableWrapper` and used this pattern on the original `InternalRowComparableWrapper` (it works equally well there). I kept the original constructor for binary compatibility, but marked it deprecated for the reasons explained in the comment.
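The factory pattern discussed in this thread can be sketched roughly as follows. This is a simplified, self-contained illustration and not the Spark code: `Row`, `ComparableWrapper`, `factory`, and `lexicographic` are hypothetical stand-ins (rows are modeled as `Seq[Int]` and the schema as a field count), chosen only to show how a private constructor plus a factory keeps each row paired with an ordering that is computed once.

```scala
// Hypothetical stand-ins for illustration; none of these names are Spark APIs.
object FactorySketch {
  type Row = Seq[Int]

  // Private constructor: wrappers can only be built through the companion's
  // factory, so a row is always paired with the ordering computed for it.
  final class ComparableWrapper private (val row: Row, private val ordering: Ordering[Row]) {
    override def equals(other: Any): Boolean = other match {
      case that: ComparableWrapper => ordering.equiv(row, that.row)
      case _ => false
    }
    override def hashCode(): Int = row.hashCode()
  }

  object ComparableWrapper {
    // Builds the ordering once; the returned function reuses it for every row.
    def factory(arity: Int): Row => ComparableWrapper = {
      val ordering = lexicographic(arity)
      row => {
        require(row.length == arity, s"expected $arity fields, got ${row.length}")
        new ComparableWrapper(row, ordering)
      }
    }
  }

  // Field-by-field ascending comparison over the first `arity` fields.
  private def lexicographic(arity: Int): Ordering[Row] = new Ordering[Row] {
    def compare(a: Row, b: Row): Int = {
      var i = 0
      while (i < arity) {
        val c = Integer.compare(a(i), b(i))
        if (c != 0) return c
        i += 1
      }
      0
    }
  }
}
```

A caller would obtain the function once, for example `val wrap = FactorySketch.ComparableWrapper.factory(2)`, and then map it over all partition rows. Because the constructor is private, there is no way to hand a wrapper an ordering that does not match its row, which is the concern raised in the review comment.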
...atalyst/src/main/scala/org/apache/spark/sql/catalyst/util/InternalRowComparableWrapper.scala
szehon-ho left a comment
Thanks! FYI @sunchao as well
sunchao left a comment
Thanks @chirag-s-db @szehon-ho, this LGTM as well. Pending CI.
Seems a real test failure in
@cloud-fan Fixed here: c5ae5b7 (was missing one parameter in a migrated method)
Thanks! Merged to master.
…arableWrapper util

### What changes were proposed in this pull request?

The InternalRowComparableWrapper util is often used on a very hot path for physical planning (most often, to compare partition values for key-grouped partitioned scans). The current implementation performs a cached schema lookup each time a new instance of this object is created, and this cache lookup itself can become a bottleneck for planning when there are large numbers of partitions. This PR adds a new InternalRowComparableWrapper factory for this util that has a precomputed schema and ordering that can be shared across multiple objects, removing this lookup from the hot path for physical planning.

### Why are the changes needed?

Removes a physical planning bottleneck.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This change should not change any behavior (existing tests should suffice).

This PR also includes changes to the `InternalRowComparableWrapperBenchmark` to use these new utils. Results before change:

```
[info] internal row comparable wrapper:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] toSet                                                74             76           2          2.7         367.5       1.0X
[info] mergePartitions                                     136            143          11          1.5         680.0       0.5X
[success] Total time: 11 s, completed Nov 17, 2025, 2:29:22 PM
```

Results after change:

```
[info] internal row comparable wrapper:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] toSet                                                13             13           1         15.9          62.9       1.0X
[info] mergePartitions                                      17             17           1         11.8          84.7       0.7X
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#53097 from chirag-s-db/birc.

Authored-by: Chirag Singh <chirag.singh@databricks.com>
Signed-off-by: Chao Sun <chao@openai.com>
What changes were proposed in this pull request?
The InternalRowComparableWrapper util is often used on a very hot path for physical planning (most often, to compare partition values for key-grouped partitioned scans). The current implementation performs a cached schema lookup each time a new instance of this object is created, and this cache lookup itself can become a bottleneck for planning when there are large numbers of partitions. This PR adds a new InternalRowComparableWrapper factory for this util that has a precomputed schema and ordering that can be shared across multiple objects, removing this lookup from the hot path for physical planning.
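To make the bottleneck concrete, the sketch below contrasts a per-instance cache lookup with a precomputed, shared ordering. It is a simplified model under stated assumptions, not the Spark implementation: `orderingCache`, the `lookups` counter, `Wrapped`, and both `wrap*` helpers are hypothetical, data types are modeled as strings, and the ordering simply compares the first field.

```scala
import scala.collection.mutable

// Hypothetical model of the two approaches; not Spark code.
object LookupSketch {
  type Row = Seq[Int]
  type Schema = Seq[String] // stand-in for a sequence of data types

  var lookups = 0 // counts cache accesses, for illustration only
  private val orderingCache = mutable.Map.empty[Schema, Ordering[Row]]

  private def cachedOrdering(schema: Schema): Ordering[Row] = {
    lookups += 1 // each access hashes the schema key: the per-row cost being avoided
    orderingCache.getOrElseUpdate(schema, Ordering.by((r: Row) => r.head))
  }

  final case class Wrapped(row: Row, ordering: Ordering[Row])

  // Before: every wrapper construction goes through the cache.
  def wrapWithLookup(rows: Seq[Row], schema: Schema): Seq[Wrapped] =
    rows.map(r => Wrapped(r, cachedOrdering(schema)))

  // After: one lookup up front, then the same ordering is shared by all wrappers.
  def wrapPrecomputed(rows: Seq[Row], schema: Schema): Seq[Wrapped] = {
    val ordering = cachedOrdering(schema)
    rows.map(r => Wrapped(r, ordering))
  }
}
```

In this model the first variant performs one cache access per row, while the second performs a single access regardless of how many rows (partitions) are wrapped.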
Why are the changes needed?
Removes a physical planning bottleneck.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This change should not change any behavior (existing tests should suffice).
This PR also includes changes to the `InternalRowComparableWrapperBenchmark` to use these new utils. Results before and after the change are listed in the benchmark output in the commit message above.
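As a quick reading of those numbers, the speedup can be computed from the "Best Time(ms)" column of the benchmark output in the commit message (74 ms to 13 ms for `toSet`, 136 ms to 17 ms for `mergePartitions`). The helper below is only illustrative arithmetic; `SpeedupSketch` and `speedup` are hypothetical names, not part of the benchmark.

```scala
object SpeedupSketch {
  // Speedup factor = time before the change / time after the change.
  def speedup(beforeMs: Double, afterMs: Double): Double = beforeMs / afterMs

  // Best Time(ms) values copied from the benchmark output.
  val toSetSpeedup: Double = speedup(74.0, 13.0)            // roughly 5.7x
  val mergePartitionsSpeedup: Double = speedup(136.0, 17.0) // 8.0x
}
```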
Was this patch authored or co-authored using generative AI tooling?
No.