
[SPARK-37287][SQL] Pull out dynamic partition and bucket sort from FileFormatWriter #37099

Closed

Conversation

@allisonwang-db (Contributor) commented on Jul 6, 2022

What changes were proposed in this pull request?

`FileFormatWriter.write` is used by all V1 write commands, including data source and Hive tables. Depending on the dynamic partition, bucket, and sort columns of the V1 write command, `FileFormatWriter` can add a physical sort on top of the query plan that is not directly visible in the plan.

This PR (based on #34568) intends to pull out the physical sort added by FileFormatWriter into logical planning. It adds a new logical rule V1Writes to add logical Sort operators based on the required ordering of a V1 write command. This behavior can be controlled by the new config spark.sql.optimizer.plannedWrite.enabled (default: true).
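To illustrate the idea, here is a simplified, self-contained sketch (not the actual Spark API; `LogicalPlan`, `Relation`, `Sort`, `WriteCommand`, and `V1Writes` below are illustrative stand-ins) of how a `V1Writes`-style rule can pull the sort into logical planning:

```scala
// Simplified model of a logical plan with an output ordering.
sealed trait LogicalPlan { def outputOrdering: Seq[String] }

case class Relation(name: String) extends LogicalPlan {
  def outputOrdering: Seq[String] = Nil
}

case class Sort(order: Seq[String], child: LogicalPlan) extends LogicalPlan {
  def outputOrdering: Seq[String] = order
}

// A V1 write command with a required ordering over its query.
case class WriteCommand(requiredOrdering: Seq[String], query: LogicalPlan)

object V1Writes {
  // Wrap the query in a logical Sort only when the required ordering is
  // not already satisfied, so the file writer never needs to insert a
  // physical sort behind the optimizer's back.
  def apply(cmd: WriteCommand): WriteCommand =
    if (cmd.requiredOrdering.isEmpty ||
        cmd.query.outputOrdering.startsWith(cmd.requiredOrdering)) cmd
    else cmd.copy(query = Sort(cmd.requiredOrdering, cmd.query))
}
```

With this rule, a write over an unsorted relation gets an explicit logical `Sort`, while an already-sorted query is left untouched.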

Why are the changes needed?

Improve observability of V1 write, and unify the logic of V1 and V2 write commands.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New unit tests.

@allisonwang-db allisonwang-db marked this pull request as draft July 6, 2022 06:47
@github-actions github-actions bot added the SQL label Jul 6, 2022
@c21 (Contributor) left a comment:

Thank you @allisonwang-db for picking this up! I know the PR is still a draft, but I'm leaving some early questions/comments. cc @ulysses-you and @cloud-fan as well.

```scala
    .internal()
    .doc("When set to true, Spark adds logical sorts to V1 write commands if needed so that " +
      "`FileFormatWriter` does not need to insert physical sorts.")
    .version("3.2.0")
```
nit: 3.4.0?

```diff
@@ -3781,6 +3781,14 @@ object SQLConf {
    .intConf
    .createWithDefault(0)

  val PLANNED_WRITE_ENABLED = buildConf("spark.sql.plannedWrite.enabled")
```
nit: I feel the config name is a bit obscure. Could it be `spark.sql.requireOrderingForV1Writers` or something similar?

@allisonwang-db (Author) replied:

Indeed, the name is not very descriptive. "Planned write" here means we want to explicitly plan file writes instead of adding various operations when executing the write. It could include things other than the required ordering in the future. I am happy to brainstorm more here.

```diff
@@ -3781,6 +3781,14 @@ object SQLConf {
    .intConf
    .createWithDefault(0)

  val PLANNED_WRITE_ENABLED = buildConf("spark.sql.plannedWrite.enabled")
    .internal()
    .doc("When set to true, Spark adds logical sorts to V1 write commands if needed so that " +
```
`Spark` -> `Spark optimizer` could make it clearer that the sort is added during query planning.

```scala
      outputColumns,
      query,
      SparkSession.active.sessionState.conf.resolver)
    // We do not need the path option from the table location to get writer bucket spec.
```
Sorry, why do we need this comment here?

@allisonwang-db (Author) replied:
I've removed the confusing comment. It meant that we don't need other option values, like the table path, when getting the writer bucket spec.

```scala
    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
      Some(sessionState.catalog.defaultTablePath(table.identifier))
    } else {
      table.storage.locationUri
    }
```

```diff
@@ -211,6 +180,7 @@ object FileFormatWriter extends Logging {

    try {
      val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
        logInfo(s"Output ordering is matched for write job ${description.uuid}")
```
Is it just for debugging?

@allisonwang-db (Author) replied:
This is used in unit tests to check that when V1 writes are enabled, we have added a logical sort and thus do not need to add a physical sort (the ordering should match).

```scala
trait V1WriteCommand extends DataWritingCommand {
  // Specify the required ordering for the V1 write command. `FileFormatWriter` will
  // add SortExec if necessary when the requiredOrdering is empty.
  def requiredOrdering: Seq[SortOrder]
```
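For intuition, the required ordering of a V1 write is typically built from the write spec: dynamic partition columns first, then a bucket-id expression, then the bucket sort columns. A simplified sketch (illustrative names, not Spark's actual API; columns are modeled as plain strings):

```scala
// Simplified model of a V1 write's partitioning/bucketing/sorting spec.
case class WriteSpec(
    partitionColumns: Seq[String],
    bucketColumns: Seq[String],
    sortColumns: Seq[String])

// Derive the ordering rows must satisfy before being handed to the writer:
// partition columns, then a bucket-id expression, then the sort columns.
def requiredOrdering(spec: WriteSpec): Seq[String] = {
  val bucketId =
    if (spec.bucketColumns.isEmpty) Nil
    else Seq(s"bucketId(${spec.bucketColumns.mkString(", ")})")
  spec.partitionColumns ++ bucketId ++ spec.sortColumns
}
```

A non-partitioned, non-bucketed write yields an empty required ordering, so no sort is added at all.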
Just brainstorming here: if we plan to add a requirement for partitioning, e.g. to support a shuffle before writing a bucketed table, do we want to add something similar to V2's `RequiresDistributionAndOrdering` or not?

Reply from another contributor:
I think we can add one more method: `requiredPartitioning`.
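A sketch of the brainstormed extension: a V1 write command exposing a required partitioning alongside the required ordering, mirroring V2's `RequiresDistributionAndOrdering`. The types and the example command below are simplified placeholders, not the actual Spark interfaces:

```scala
// Minimal stand-in for Spark's SortOrder.
case class SortOrder(column: String, ascending: Boolean = true)

trait V1WriteCommand {
  def requiredOrdering: Seq[SortOrder]
  // Brainstormed addition: a distribution requirement, e.g. hash-partition
  // by the bucket columns before writing a bucketed Hive table.
  def requiredPartitioning: Seq[String]
}

// Hypothetical command writing a table bucketed by some columns and
// sorted within each bucket by others.
case class InsertIntoBucketedTable(
    bucketColumns: Seq[String],
    sortColumns: Seq[String]) extends V1WriteCommand {
  def requiredOrdering: Seq[SortOrder] = sortColumns.map(SortOrder(_))
  def requiredPartitioning: Seq[String] = bucketColumns
}
```

A planner rule could then insert both a shuffle (for `requiredPartitioning`) and a sort (for `requiredOrdering`) during logical planning, just as the V2 write path does.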

@allisonwang-db allisonwang-db marked this pull request as ready for review July 19, 2022 05:27
@cloud-fan cloud-fan closed this in 2562274 Jul 19, 2022
@c21 (Contributor) commented on Jul 19, 2022

Thanks @cloud-fan and @allisonwang-db for pushing on this. This is great!

I will work on supporting required partitioning for V1 this week (https://issues.apache.org/jira/browse/SPARK-37287). The motivation is to support shuffling on the bucket columns when writing Hive bucketed tables. cc @cloud-fan, @allisonwang-db and @ulysses-you FYI.

@gengliangwang (Member) commented:

There was an optimization in #32198 that can avoid the local sort if there is only a small set of partition/bucket values. Is that optimization gone after the changes in this PR?

@cloud-fan (Contributor) replied:

That optimization is off by default. When it's turned on, we skip planned write.

@dongjoon-hyun (Member) left a comment:

Hi, @allisonwang-db and @cloud-fan

There is a correctness issue report for this configuration, SPARK-44512, for Apache Spark 3.4.0+. Could you take a look at that?

@dongjoon-hyun (Member) commented:

Sorry all. After checking the reported use case once more, I found that it's a false alarm. I closed the issue as Not A Problem.

dongjoon-hyun pushed a commit that referenced this pull request Nov 13, 2023
…rom `DataSource`

### What changes were proposed in this pull request?
`resolvePartitionColumns` was introduced by SPARK-37287 (#37099) and became unused after SPARK-41713 (#39220), so this PR removes it from `DataSource`.

### Why are the changes needed?
Clean up unused code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43779 from LuciferYang/SPARK-45902.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>