[SPARK-19563][SQL] avoid unnecessary sort in FileFormatWriter #16898
Conversation
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
  .map(SortOrder(_, Ascending))
val rdd = if (requiredOrdering == queryExecution.executedPlan.outputOrdering) {
If the data's outputOrdering is [partCol1, partCol2, dataCol1] and here the requiredOrdering is [partCol1, partCol2], you will miss this optimization.
oh, I should check the subset, good catch!
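The subset check being discussed can be sketched outside of Spark roughly like this (the `SortOrder` case class below is a stand-in for Catalyst's class, and all names are illustrative):

```scala
// Stand-in for Catalyst's SortOrder, just for illustration.
case class SortOrder(child: String, ascending: Boolean = true)

// The sort can be skipped when the required ordering is a prefix of the
// plan's actual output ordering.
def canSkipSort(required: Seq[SortOrder], actual: Seq[SortOrder]): Boolean =
  required == actual.take(required.length)

// [partCol1, partCol2] is a prefix of [partCol1, partCol2, dataCol1],
// so the data is already in an acceptable order.
val required = Seq(SortOrder("partCol1"), SortOrder("partCol2"))
val actual = Seq(SortOrder("partCol1"), SortOrder("partCol2"), SortOrder("dataCol1"))
assert(canSkipSort(required, actual))
assert(!canSkipSort(actual, required))
```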
val rdd = if (requiredOrdering == queryExecution.executedPlan.outputOrdering) {
  queryExecution.toRdd
} else {
  SortExec(requiredOrdering, global = false, queryExecution.executedPlan).execute()
Using `SortExec` here is clever.
val actualOrdering = queryExecution.executedPlan.outputOrdering
// We can still avoid the sort if the required ordering is [partCol] and the actual ordering
// is [partCol, anotherCol].
val rdd = if (requiredOrdering == actualOrdering.take(requiredOrdering.length)) {
We only care if partition columns are the same between requiredOrdering and actualOrdering. The sort direction doesn't matter.
@@ -120,9 +127,10 @@ object FileFormatWriter extends Logging {
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = queryExecution.logical.output,
Directly use `allColumns` created above.
currentKey = nextKey.copy()
logDebug(s"Writing partition: $currentKey")
for (row <- iter) {
  val nextPartColsAndBucketId = getPartitionColsAndBucketId(row)
`getPartitionColsAndBucketId` is an unsafe projection, so `nextPartColsAndBucketId` is a new unsafe row. Do we still need a `copy` when assigning it to `currentPartColsAndBucketId`?
Previously we needed a copy because `getBucketingKey` can be an identity function, so the `nextKey` can be the same unsafe row.
If you take a look at `GenerateUnsafeProjection`, it actually reuses the same row instance, so we need to copy.
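A minimal, Spark-free model of that reuse behavior (the class below is hypothetical and only mimics how a generated projection hands back the same mutable buffer on every call):

```scala
// Hypothetical stand-in: like a generated unsafe projection, it writes each
// result into one reused buffer and returns that same instance every time.
class ReusingProjection {
  private val buffer = Array(0)
  def apply(value: Int): Array[Int] = { buffer(0) = value; buffer }
}

val proj = new ReusingProjection
val first = proj(1)
val second = proj(2)
// Both references point at the same buffer, which now holds the latest value,
// so a caller that wants to keep the old key must copy it first.
assert(first eq second)
assert(first(0) == 2)
```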
A few comments. Others LGTM.
Test build #72764 has finished for PR 16898 at commit
Test build #72767 has finished for PR 16898 at commit
val rdd = if (requiredOrdering == actualOrdering.take(requiredOrdering.length)) {
  queryExecution.toRdd
} else {
  SortExec(requiredOrdering, global = false, queryExecution.executedPlan).execute()
Oh, I met this case before. IIRC, this complains in Scala 2.10. I guess it should be
`SortExec(requiredOrdering, global = false, child = queryExecution.executedPlan).execute()`
because it seems the compiler gets confused by the mixed positional/named arguments.
I am running a build with 2.10 to help verify.
Yea, it seems it complains.
[error] .../spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala:160: not enough arguments for method apply: (sortOrder: Seq[org.apache.spark.sql.catalyst.expressions.SortOrder], global: Boolean, child: org.apache.spark.sql.execution.SparkPlan, testSpillFrequency: Int)org.apache.spark.sql.execution.SortExec in object SortExec.
[error] Unspecified value parameter child.
[error] SortExec(requiredOrdering, global = false, queryExecution.executedPlan).execute()
[error]
Good catch!
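For reference, the workaround amounts to naming the trailing argument as well. A minimal, Spark-free sketch of the pattern (the case class below only imitates `SortExec`'s parameter shape, including the defaulted `testSpillFrequency`; it is not the real class):

```scala
// Imitation of SortExec's parameter list, with a trailing default parameter.
case class SortExecLike(
    sortOrder: Seq[String],
    global: Boolean,
    child: String,
    testSpillFrequency: Int = 0)

// Scala 2.10 rejected the positional argument after a named one here;
// naming `child` explicitly avoids the ambiguity on both 2.10 and 2.11.
val sort = SortExecLike(Seq("partCol"), global = false, child = "plan")
assert(sort.child == "plan")
assert(sort.testSpillFrequency == 0)
```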
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
val bucketColumns = bucketSpec.toSeq.flatMap {
  spec => spec.bucketColumnNames.map(c => allColumns.find(_.name == c).get)
nit: `allColumns` -> `dataColumns`?
No need to look at all columns since Spark doesn't allow bucketing over partition columns.
scala> df1.write.format("orc").partitionBy("i").bucketBy(8, "i").sortBy("k").saveAsTable("table70")
org.apache.spark.sql.AnalysisException: bucketBy columns 'i' should not be part of partitionBy columns 'i';
HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
}
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
Possible over-optimization: Spark allows sorting over partition columns, so `requiredOrdering` can be changed to `partitionColumns` ++ `bucketIdExpression` ++ (the `sortColumns` which are not in `partitionColumns`), so that any extra column(s) in the sort expression can be deduped.
scala> df1.write.format("orc").partitionBy("i").bucketBy(8, "i").sortBy("k").saveAsTable("table70")
org.apache.spark.sql.AnalysisException: bucketBy columns 'i' should not be part of partitionBy columns 'i';
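The dedup being proposed could look roughly like this (plain `Seq[String]`s stand in for the actual Catalyst expressions; all names are illustrative):

```scala
val partitionColumns = Seq("i")
val bucketIdExpression = Seq("bucketId")
val sortColumns = Seq("k", "i") // "i" duplicates a partition column

// Keep only the sort columns that are not already partition columns.
val requiredOrdering =
  partitionColumns ++ bucketIdExpression ++
    sortColumns.filterNot(partitionColumns.contains)

assert(requiredOrdering == Seq("i", "bucketId", "k"))
```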
does it make sense to sort over partition columns in a bucket? I'm surprised if we support this...
It does not make sense (I thought it was intentional). This should definitely be fixed. I dug through the commit logs and saw that this was fixed for bucketing columns in #10891, but there was no discussion around sort columns. Will log a JIRA for this.
val requiredOrdering = (partitionColumns ++ bucketIdExpression ++ sortColumns)
  .map(SortOrder(_, Ascending))
val actualOrdering = queryExecution.executedPlan.outputOrdering
// We can still avoid the sort if the required ordering is [partCol] and the actual ordering
The comment makes it feel like it's specific to partition columns, but the code below does not have anything specific to partition columns.
val actualOrdering = queryExecution.executedPlan.outputOrdering
// We can still avoid the sort if the required ordering is [partCol] and the actual ordering
// is [partCol, anotherCol].
val rdd = if (requiredOrdering == actualOrdering.take(requiredOrdering.length)) {
You could do semantic equals and not object equals. I recall that using object equals in `EnsureRequirements` was adding an unnecessary SORT in some cases: https://github.com/apache/spark/pull/14841/files#diff-cdb577e36041e4a27a605b6b3063fd54
@@ -189,7 +215,7 @@ object FileFormatWriter extends Logging {
committer.setupTask(taskAttemptContext)

val writeTask =
-  if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
+  if (description.partitionColumns.isEmpty && description.numBuckets == 0) {
For someone reading the code, it might be non-intuitive that this is checking whether there is no bucketing. `0` has been used in many places in this PR to check whether the table has bucketing. Maybe orthogonal to this PR, but in general we could have a util method for this. I can send a tiny PR for it if you agree that it's a good thing to do.
PS: Having 0 buckets is a thing in Hive, but logically it makes no sense and is confusing. Under the hood, Hive treats that as a table with a single bucket. It's good that Spark does not allow this.
# hive-1.2.1
hive> CREATE TABLE tejasp_temp_can_be_deleted (key string, value string) CLUSTERED BY (key) INTO 0 BUCKETS;
Time taken: 1.144 seconds
hive> desc formatted tejasp_temp_can_be_deleted;
# Storage Information
...
Num Buckets: 0
Bucket Columns: [key]
Sort Columns: []
hive>INSERT OVERWRITE TABLE tejasp_temp_can_be_deleted SELECT * FROM ....;
# doing `ls` on the output directory shows a single file
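The utility method suggested above might look like this (the case class is a hypothetical stand-in for the PR's `WriteJobDescription`, not the real one):

```scala
// Hypothetical stand-in for the PR's WriteJobDescription.
case class WriteJobDescriptionLike(numBuckets: Int)

// One named helper instead of scattered `numBuckets == 0` checks.
def isBucketed(desc: WriteJobDescriptionLike): Boolean = desc.numBuckets > 0

assert(!isBucketed(WriteJobDescriptionLike(0)))
assert(isBucketed(WriteJobDescriptionLike(8)))
```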
@@ -329,31 +349,41 @@ object FileFormatWriter extends Logging {
 * If bucket id is specified, we will append it to the end of the file name, but before the
nit for the previous line: "Open and returns a ..." — this method does not return anything.
nit: typo in PR title
Test build #72942 has finished for PR 16898 at commit
Test build #72945 has finished for PR 16898 at commit
retest this please
Test build #72955 has finished for PR 16898 at commit
retest this please
Test build #73059 has finished for PR 16898 at commit
Test build #73061 has finished for PR 16898 at commit
cc @tejasapatil how is the updated version?
@cloud-fan : LGTM
Sorry, I am late. Will review it tonight. Thanks!
@@ -108,9 +107,21 @@ object FileFormatWriter extends Logging {
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

val allColumns = queryExecution.logical.output
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
If we rewrite it to `val dataColumns = allColumns.filterNot(partitionColumns.contains)`, we do not need `partitionSet`.
it's so minor, I'll fix it in my next PR
@@ -287,31 +320,16 @@ object FileFormatWriter extends Logging {
 * multiple directories (partitions) or files (bucketing).
 */
private class DynamicPartitionWriteTask(
-    description: WriteJobDescription,
+    desc: WriteJobDescription,
`SingleDirectoryWriteTask` is still using `description`. Change both or keep it unchanged?
I'd like to change both to make it consistent.
} else {
  requiredOrdering.zip(actualOrdering).forall {
    case (requiredOrder, childOutputOrder) =>
      requiredOrder.semanticEquals(childOutputOrder)
Because `bucketIdExpression` is `HashPartitioning`, this will never match, right?
It's `HashPartitioning(...).partitionIdExpression`, which returns `Pmod(new Murmur3Hash(expressions), Literal(numPartitions))`, so it may match.
thanks for the review, merging to master!
## What changes were proposed in this pull request?

In `FileFormatWriter`, we will sort the input rows by partition columns, bucket id, and sort columns if we want to write data out partitioned or bucketed. However, if the data is already sorted, we will sort it again, which is unnecessary. This PR removes the sorting logic in `FileFormatWriter` and uses `SortExec` instead. We will not add `SortExec` if the data is already sorted.

## How was this patch tested?

I did a micro benchmark manually

```
val df = spark.range(10000000).select($"id", $"id" % 10 as "part").sort("part")
spark.time(df.write.partitionBy("part").parquet("/tmp/test"))
```

The result was about 6.4 seconds before this PR, and is 5.7 seconds afterwards.

close apache#16724

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#16898 from cloud-fan/writer.
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
// the sort order doesn't matter
val actualOrdering = queryExecution.executedPlan.outputOrdering.map(_.child)
@cloud-fan would it be possible to use the logical plan rather than the executedPlan? If the optimizer decides the data is already sorted according to the logical plan, the executedPlan won't include the fields.
That would be great, but may need some refactoring.
## What changes were proposed in this pull request?

In `FileFormatWriter`, we will sort the input rows by partition columns, bucket id, and sort columns if we want to write data out partitioned or bucketed. However, if the data is already sorted, we will sort it again, which is unnecessary.

This PR removes the sorting logic in `FileFormatWriter` and uses `SortExec` instead. We will not add `SortExec` if the data is already sorted.

## How was this patch tested?

I did a micro benchmark manually

```
val df = spark.range(10000000).select($"id", $"id" % 10 as "part").sort("part")
spark.time(df.write.partitionBy("part").parquet("/tmp/test"))
```

The result was about 6.4 seconds before this PR, and is 5.7 seconds afterwards.

close #16724