[SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter #34568
Conversation
```
@@ -206,3 +179,43 @@ case class SortExec(
   override protected def withNewChildInternal(newChild: SparkPlan): SortExec =
     copy(child = newChild)
 }

object SortExec {
  def createSorter(
```
This change is because `maxConcurrentOutputFileWriters` needs to create a sorter in `FileFormatWriter`.
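As a toy illustration of this refactor (deliberately simplified stand-in types, not Spark's actual `SortExec` or `UnsafeExternalRowSorter`): moving the sorter factory into the companion object lets a caller such as the `FileFormatWriter` concurrent-writers path obtain just a sorter, without building and executing the whole operator.

```scala
// Stand-in for a sort key: column name plus direction.
case class ColumnOrder(name: String, ascending: Boolean = true)

// Stand-in for the row sorter that the operator creates internally.
class RowSorter(val ordering: Seq[ColumnOrder]) {
  def sort(rows: Seq[Map[String, Int]]): Seq[Map[String, Int]] =
    // Stable sortBy applied from least- to most-significant key.
    ordering.reverse.foldLeft(rows) { (rs, col) =>
      rs.sortBy(r => if (col.ascending) r(col.name) else -r(col.name))
    }
}

object SortOperator {
  // Factory usable without instantiating the operator itself.
  def createSorter(ordering: Seq[ColumnOrder]): RowSorter = new RowSorter(ordering)
}

case class SortOperator(ordering: Seq[ColumnOrder]) {
  // The operator reuses the same factory internally, so behavior is unchanged.
  private val sorter = SortOperator.createSorter(ordering)
  def execute(rows: Seq[Map[String, Int]]): Seq[Map[String, Int]] = sorter.sort(rows)
}
```

The point is that `SortOperator.createSorter(...)` and `SortOperator(...).execute(...)` share one code path, so pulling the factory out is a pure extraction.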
cc @MaxGekk @cloud-fan if you have time to take a look
Thanks @ulysses-you for improving on this! I have some questions. Thanks.
```
 * V1 write includes both datasoruce and hive, that requires a specific ordering of data.
 * It should be resolved by [[V1Writes]].
 *
 * TODO: we can also support specific distribution here if necessary
```
Could you help create a JIRA here? Thanks, cc @wangyum.
created SPARK-37333
```
      .map(SortOrder(_, Ascending))
  }

  def prepareQuery(
```
I don't think it's safe to check the output ordering inside the logical plan. The output ordering may change quite a lot during physical planning (e.g. a shuffle added for a join/aggregate can destroy the output ordering). Ideally we should rely on the physical plan rule `EnsureRequirements` to add the proper sort. I am wondering how hard it would be to make the DSv1 write code path rely on `SparkPlan.requiredChildOrdering`?
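A toy model of the alternative being suggested here (simplified illustrative types, not Spark's `SparkPlan` or `EnsureRequirements`): each physical node declares the ordering it requires from its children, and a planning rule inserts a sort only where the child's output ordering does not already satisfy that requirement.

```scala
sealed trait Plan {
  def children: Seq[Plan]
  def outputOrdering: Seq[String]          // column names the output is sorted by
  // By default, nodes require nothing from their children.
  def requiredChildOrdering: Seq[Seq[String]] = children.map(_ => Nil)
}
case class Scan(outputOrdering: Seq[String]) extends Plan { val children: Seq[Plan] = Nil }
case class Sort(cols: Seq[String], child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
  def outputOrdering: Seq[String] = cols
}
case class WriteFiles(requiredOrdering: Seq[String], child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
  def outputOrdering: Seq[String] = child.outputOrdering
  override def requiredChildOrdering: Seq[Seq[String]] = Seq(requiredOrdering)
}

// Simplified EnsureRequirements: add a Sort under the write node only when its
// requirement is not already a prefix of the child's output ordering.
def ensureRequirements(plan: Plan): Plan = plan match {
  case w: WriteFiles =>
    val newChild = ensureRequirements(w.child)
    if (newChild.outputOrdering.startsWith(w.requiredOrdering)) w.copy(child = newChild)
    else w.copy(child = Sort(w.requiredOrdering, newChild))
  case s: Sort => s.copy(child = ensureRequirements(s.child))
  case other => other
}
```

Because the rule runs after physical planning, it sees the ordering that actually survives shuffles, which is the safety property the comment above is asking for.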
Actually, after taking a deeper look: `LogicalPlan.outputOrdering` was introduced to eliminate unnecessary sorts in logical planning (#20560), and only a few operators (like `Filter`, `Project`) preserve ordering, so it won't cause a correctness issue here.
But the problem with `LogicalPlan.outputOrdering` is that it is too conservative. We may add an unnecessary sort here for complex queries (e.g. a query with a sort merge join on partition columns before writing to a table with dynamic partitions).
Thank you @c21 for pointing this out; I see your concern. The reasons I used `LogicalPlan.outputOrdering` are:
- Adding the sort on the logical side has benefits if the plan already contains a sort, e.g. `InsertIntoTable (partition)` over `Sort (not dynamic columns)`: we can eliminate the user-specified sort using `EliminateSorts` in the `Optimizer`. But if we add the sort in the physical plan, we will do the sort twice even though the first sort has no effect.
- For now, I prefer to keep the same approach as `V2Writes`, which also adds the required ordering (and even distribution) on the logical side. We can optimize them together if we find a better approach in the future.
- I think it's safe, with no perf regression, to add a sort on the logical side. Since we have `RemoveRedundantSorts` on the physical side, that rule can remove the sort we added if it's unnecessary (e.g. sort + SMJ with dynamic partitions).
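The first point above can be sketched with a toy logical plan (minimal hypothetical types, not Spark's actual `EliminateSorts` rule): when the write's required sort is added on the logical side, a user-specified sort directly beneath it becomes redundant and can be dropped, so the data is sorted only once.

```scala
sealed trait LPlan
case class Relation(name: String) extends LPlan
case class LSort(cols: Seq[String], child: LPlan) extends LPlan

// A sort directly under another sort is overwritten by the outer one,
// so the inner sort can be removed without changing the result.
def eliminateRedundantSorts(plan: LPlan): LPlan = plan match {
  case LSort(cols, LSort(_, grandChild)) =>
    eliminateRedundantSorts(LSort(cols, grandChild))
  case LSort(cols, child) => LSort(cols, eliminateRedundantSorts(child))
  case other => other
}
```

Had the write-required sort been added only at the physical level, the logical optimizer would never see the sort-over-sort pattern and both sorts would execute.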
@ulysses-you - ah yes, we also have `RemoveRedundantSorts` in physical planning, so I think we are good here. Thanks for the explanation!
```
    SchemaPruning :: V2ScanRelationPushDown :: V1Writes :: V2Writes ::
      PruneFileSourcePartitions :: Nil
```
It is slightly confusing that the `V*Writes` rules are here. `earlyScanPushDownRules` is about "... projection and filter pushdown to scans", but `V1Writes` "... makes sure the v1 write requirement, e.g. requiredOrdering", which is something like the opposite thing: pulling out instead of pushing down.
Here is some history of why `V2Writes` is in `earlyScanPushDownRules`: #30806 (comment).
I agree the name does not match; do you have a better place for it?
```
val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
  private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
  override def computePrefix(row: InternalRow):
  UnsafeExternalRowSorter.PrefixComputer.Prefix = {
```
Please fix the indentation to match the original code.
fixed
Thanks @ulysses-you for the work! I have some comments. @cloud-fan and @wangyum, could you guys help take a look when you have time? Thanks.
```
val enableRadixSort = sparkSession.sessionState.conf.enableRadixSort
val outputSchema = empty2NullPlan.schema
Some(ConcurrentOutputWriterSpec(maxWriters,
  () => SortExec.createSorter(
```
I feel this refactoring (`SortExec.createSorter`) is not very necessary. Why can't we create a `SortExec` operator and call `createSorter()` as before? What's the advantage of the current code compared to before?
Looking at the previous code, we created and evaluated a SortExec mainly for the ordering of dynamic partitions. For the concurrent writers, we only need the sorter. After we pull out the sort, creating a new SortExec seems like overkill.
```
import org.apache.spark.sql.internal.SQLConf

/**
 * V1 write includes both datasoruce and hive, that requires a specific ordering of data.
```
nit: `datasoruce` -> `datasource v1`
```
trait V1WritesHelper {

  def getBucketSpec(
```
nit: how about naming it `getWriterBucketSpec`? `BucketSpec` is another class in Spark, which is different from `WriterBucketSpec`. Also `bucketSpec` is a parameter, so `getWriterBucketSpec` looks less confusing.
makes sense!
```
  }
}

trait V1WritesHelper {
```
After looking through the subclasses of this one, I found this class is meant to be a utility class, not an interface to implement. Shall we change this to `object V1WritesUtils`?
The idea of the `V1WritesHelper` is from the `AdaptiveSparkPlanHelper`, which also contains some util methods. And there are many places in sql that use a helper even when they are not stateful. Personally, I don't have a strong opinion about utility vs. helper.
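The trade-off being discussed can be shown with a small sketch (illustrative names, not Spark classes): a helper trait is mixed in, so its methods are called unqualified from the extending class, while a utility object is called explicitly; both work fine for stateless methods.

```scala
// Helper-trait style: methods become available via inheritance.
trait WritesHelper {
  def normalize(cols: Seq[String]): Seq[String] = cols.map(_.trim.toLowerCase)
}

// Utility-object style: methods are invoked with an explicit owner.
object WritesUtils {
  def normalize(cols: Seq[String]): Seq[String] = cols.map(_.trim.toLowerCase)
}

// Mixin: the command's type now includes WritesHelper.
case class WriteCommand(cols: Seq[String]) extends WritesHelper {
  def normalizedCols: Seq[String] = normalize(cols)           // unqualified call
}

// Utility: no inheritance relationship in the command's type.
case class WriteCommand2(cols: Seq[String]) {
  def normalizedCols: Seq[String] = WritesUtils.normalize(cols)
}
```

The reviewer's point is that when the trait carries no state and is never used as a type, the object form makes the intent clearer; the reply's point is that the mixin form is an established pattern in the codebase.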
```
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.hive.client.HiveClientImpl

trait V1HiveWritesHelper {
```
This seems to be a utility class as well; how about `object V1WritesForHiveUtils`?
```
import org.apache.spark.sql.hive.client.HiveClientImpl

trait V1HiveWritesHelper {
  def options(bucketSpec: Option[BucketSpec]): Map[String, String] = {
```
nit: we can make the function name more verbose, e.g. getOptionsWithHiveBucketWrite
```
 *
 * TODO(SPARK-37333): Specify the required distribution at V1Write
 */
trait V1Write extends DataWritingCommand with V1WritesHelper {
```
`V1Write` extending an interface called `V1WritesHelper` looks a bit weird. I think `V1WritesHelper` is just a utility class, so we don't need to extend it here (per https://github.com/apache/spark/pull/34568/files#r807523085).
```
val requiredOrdering =
  partitionColumns ++ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
// the sort order doesn't matter
val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
```
There is an issue here since we have AQE: the plan is an `AdaptiveSparkPlanExec`, which has no `outputOrdering`. For dynamic partition writes, the code will always add an extra sort.
This PR can resolve that issue as well. @cloud-fan @c21
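The check in the quoted snippet can be sketched as follows (a simplified model with illustrative names, not the actual `FileFormatWriter` code). Since any sort direction groups equal keys together, only the sort expressions are compared, not Ascending/Descending. Note also the AQE problem raised above: with an empty actual ordering, as when the child exposes no output ordering, a sort is always requested.

```scala
// Stand-in for a SortOrder: an expression plus its direction.
case class SortKey(expr: String, ascending: Boolean = true)

def needsSort(
    partitionCols: Seq[String],
    bucketIdExpr: Option[String],
    sortCols: Seq[String],
    actualOrdering: Seq[SortKey]): Boolean = {
  // required ordering = partition columns ++ bucket-id expression ++ sort columns
  val required = partitionCols ++ bucketIdExpr ++ sortCols
  // The sort order doesn't matter, so compare the child expressions only.
  val actual = actualOrdering.map(_.expr)
  !(required.length <= actual.length && actual.take(required.length) == required)
}
```

A descending sort on the partition column still satisfies the requirement, because the writer only needs rows for the same partition to be adjacent, not in a particular direction.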
…leFormatWriter

### What changes were proposed in this pull request?
`FileFormatWriter.write` is used by all V1 write commands including data source and hive tables. Depending on dynamic partitions, bucketed, and sort columns in the V1 write command, `FileFormatWriter` can add a physical sort on top of the query plan which is not visible from plan directly. This PR (based on #34568) intends to pull out the physical sort added by `FileFormatWriter` into logical planning. It adds a new logical rule `V1Writes` to add logical Sort operators based on the required ordering of a V1 write command. This behavior can be controlled by the new config **spark.sql.optimizer.plannedWrite.enabled** (default: true).

### Why are the changes needed?
Improve observability of V1 write, and unify the logic of V1 and V2 write commands.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New unit tests.

Closes #37099 from allisonwang-db/spark-37287-v1-writes.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
- Add `V1Write` to hold some sort infos of a v1 write, e.g. partition columns, bucket spec. `V1Write` includes both datasource and hive.
- Add `V1Writes` to decide if we should add a `Sort` operator based on its `V1Write.requiredOrdering`. This rule should be similar with `V2Writes`.
- Remove the `SortExec` in `FileFormatWriter.write`.
.Why are the changes needed?
`FileFormatWriter.write` is now used by all V1 writes, which include datasource and hive tables. However, it contains a sort based on the dynamic partition and bucket columns that cannot be seen in the plan directly.
V2 write has a better approach: it satisfies the required ordering (or even distribution) using the rule `V2Writes`.
V1 write should do a similar thing to V2 write.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
This is a code refactor, so it should pass the existing CI.