
[SPARK-56588][CORE] Fix PathOutputCommitProtocol dynamic partition overwrite#55622

Open

peter-toth wants to merge 3 commits into apache:master from peter-toth:SPARK-56588-fix-pathoutputcommitprotocol-dynamic-partition-overwrite

Conversation

@peter-toth (Contributor)

### What changes were proposed in this pull request?

Fix `PathOutputCommitProtocol` to correctly handle `dynamicPartitionOverwrite=true` when the underlying committer is a `FileOutputCommitter`. Two bugs were introduced by #37468 (SPARK-40034):

1. `FileOutputCommitter` was not redirected to the staging directory (`stagingDir`) in `setupCommitter`. As a result, the committer wrote task output directly to the final destination path rather than staging it, so the `commitJob` rename-and-overwrite logic never ran and `INSERT OVERWRITE` would append instead of replacing existing partition directories.

2. `newTaskTempFile` did not populate `partitionPaths`. With no recorded partitions, `commitJob` skipped the delete-before-rename step entirely, again leaving old partition data in place.

The fix mirrors the approach already used in `SQLHadoopMapReduceCommitProtocol` (a sketch of both changes follows this list):

- In `setupCommitter`, reinitialize `FileOutputCommitter` with `stagingDir` when `dynamicPartitionOverwrite=true` (matching `SQLHadoopMapReduceCommitProtocol`).
- In `newTaskTempFile`, track `partitionPaths += dir.get` when the committer is a `FileOutputCommitter` (matching the parent class `HadoopMapReduceCommitProtocol`; the guard is intentionally absent for cloud committers that handle dynamic partition overwrite natively via `StreamCapabilities`).
- Widen `partitionPaths` in `HadoopMapReduceCommitProtocol` from `private` to `protected` so the subclass can write to it.
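A minimal sketch of the shape of the two changes, paraphrased from this description and the diff hunks quoted in the review below; the actual patch may differ in structure:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// In setupCommitter: when dynamic partition overwrite is on and the bound
// committer is a FileOutputCommitter, rebuild it against stagingDir so task
// output lands in the staging tree instead of the final destination
// (the reflective constructor call matches the hunk quoted further down).
if (dynamicPartitionOverwrite && committer.isInstanceOf[FileOutputCommitter]) {
  val ctor = committer.getClass
    .getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
  committer = ctor.newInstance(stagingDir, context)
}

// In newTaskTempFile: record the partition so commitJob knows which
// directories to delete before renaming staged output into place. Cloud
// committers that advertise dynamic-partitioning support via
// StreamCapabilities are deliberately excluded; they handle overwrite
// themselves.
if (dynamicPartitionOverwrite && committer.isInstanceOf[FileOutputCommitter]) {
  dir.foreach(partitionPaths += _)
}
```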

### Why are the changes needed?

`INSERT OVERWRITE` on a partitioned table with `partitionOverwriteMode=dynamic` silently appends data instead of replacing the written partitions when `PathOutputCommitProtocol` is in use.

The bug was introduced by #37468 (SPARK-40034), which enabled `FileOutputCommitter`-backed dynamic partition overwrite in `PathOutputCommitProtocol` without wiring up the staging mechanism that `HadoopMapReduceCommitProtocol` and `SQLHadoopMapReduceCommitProtocol` rely on to delete old partition directories and atomically move staged output into place.
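For context, that staging mechanism works roughly as follows; this is a simplified paraphrase of the delete-before-rename step in `HadoopMapReduceCommitProtocol.commitJob`, and the real code handles more cases:

```scala
// Runs only when output was staged and partitionPaths was populated --
// exactly the two preconditions the bugs above break.
for (part <- partitionPaths) {
  val finalPartPath = new Path(path, part)
  // Delete the old partition directory, creating parents if needed...
  if (!fs.delete(finalPartPath, true) && !fs.exists(finalPartPath.getParent)) {
    fs.mkdirs(finalPartPath.getParent)
  }
  // ...then move the freshly staged partition into place.
  fs.rename(new Path(stagingDir, part), finalPartPath)
}
```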

The problem became more visible after #32518 (SPARK-35383) changed `SparkContext` to activate `PathOutputCommitProtocol` whenever hadoop-cloud is on the classpath (via `fillMissingMagicCommitterConfsIfNeeded()`), not only when a magic-committer bucket is explicitly configured.

### Does this PR introduce _any_ user-facing change?

Yes. With `spark.sql.sources.partitionOverwriteMode=dynamic` and `PathOutputCommitProtocol` active (the default when hadoop-cloud is available), `INSERT OVERWRITE` now correctly replaces the written partition directories instead of appending to them.

### How was this patch tested?

Three unit tests were added to `CommitterBindingSuite` (a condensed sketch of the first one follows this list):

- `SPARK-56588: FileOutputCommitter dynamic partition overwrite stages output and tracks partitions` — verifies the temp file path goes through `.spark-staging-` and that `partitionPaths` is populated for `FileOutputCommitter`.
- `SPARK-56588: Cloud committer with dynamic partition support does not track partitions in partitionPaths` — verifies that cloud committers implementing `CAPABILITY_DYNAMIC_PARTITIONING` via `StreamCapabilities` do not have their partitions tracked in Spark's `partitionPaths` (they manage overwrite themselves).
- `SPARK-56588: FileOutputCommitter without dynamicPartitionOverwrite does not track partitions` — a baseline regression guard.
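The first test, in hypothetical condensed form (setup of `tContext` and the protocol instance omitted; the real test lives in `CommitterBindingSuite` and may expose `partitionPaths` through a test hook):

```scala
committer.setupJob(tContext)
committer.setupTask(tContext)
val file = committer.newTaskTempFile(tContext, Some("a=1"), FileNameSpec("", ".parquet"))
// Task output must be routed through the per-job staging directory...
assert(file.contains(".spark-staging-"))
// ...and the written partition must be recorded for commitJob's delete step.
assert(committer.partitionPaths.contains("a=1"))
```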

Manually verified with a Spark shell session (hadoop-cloud on the classpath, which activates `PathOutputCommitProtocol` via `fillMissingMagicCommitterConfsIfNeeded`):

```
➜ bin/spark-shell

scala> // Write initial data to two partitions
scala> spark.range(6).selectExpr("id % 2 as p", "id as v").write.partitionBy("p").mode("overwrite").parquet("/tmp/repro")
scala> // p=0: rows 0,2,4   p=1: rows 1,3,5

scala> spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

scala> // Overwrite only p=0
scala> spark.createDataFrame(Seq((0L, 99L))).toDF("p", "v").write.partitionBy("p").mode("overwrite").parquet("/tmp/repro")

scala> spark.read.parquet("/tmp/repro").orderBy("p", "v").show()

// Before fix: p=0 still shows rows 0,2,4,99 (appended, not replaced)
+---+---+
|  v|  p|
+---+---+
|  0|  0|
|  2|  0|
|  4|  0|
| 99|  0|
|  1|  1|
|  3|  1|
|  5|  1|
+---+---+

// After fix:
+---+---+
|  v|  p|
+---+---+
| 99|  0|   <- p=0 correctly replaced
|  1|  1|   <- p=1 untouched
|  3|  1|
|  5|  1|
+---+---+
```

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

@peter-toth (Contributor, author)

cc @steveloughran, @dongjoon-hyun

@steveloughran (Contributor) left a comment:

Been a while since I looked at this protocol. I think it's good.

With Iceberg et al. we don't need all these complications at job time, and get to change partitions in interesting ways...

```scala
// partitionPaths.
val ctor =
  committer.getClass.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
committer = ctor.newInstance(stagingDir, context)
```
Contributor:

This should be unique per job.

We've had escalations where two scheduled jobs overlapped (one committing while the other was starting up), and while they never conflicted in write ops, the use of shared temp dirs broke things.

Do review, and include the overlap condition in a test. thx

@peter-toth (Contributor, author), Apr 30, 2026:

How is that possible? As far as I can see, the commit protocol is instantiated via `FileCommitProtocol.instantiate()`, called from either `InsertIntoHadoopFsRelationCommand` (DSv1) or `FileWrite` (DSv2). In both cases the jobId is `UUID.randomUUID()`, so staging directory collisions should not be possible on this code path.
But if there is an issue, it would be a separate, pre-existing one, as this fix copies the `SQLHadoopMapReduceCommitProtocol` logic.
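For reference, the per-job naming being discussed, paraphrased; `FileCommitProtocol.instantiate` is the real entry point, while the surrounding variable names here are illustrative:

```scala
// Call sites (InsertIntoHadoopFsRelationCommand, FileWrite) generate a
// fresh UUID per write job, so two overlapping jobs on the same table
// get distinct staging directories.
val jobId = java.util.UUID.randomUUID().toString
val protocol = FileCommitProtocol.instantiate(
  committerClassName,                 // illustrative variable name
  jobId = jobId,
  outputPath = outputDir,             // illustrative variable name
  dynamicPartitionOverwrite = true)
// HadoopMapReduceCommitProtocol then derives the staging dir from jobId,
// roughly: new Path(path, ".spark-staging-" + jobId)
```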

```scala
committer.setupJob(tContext)
committer.setupTask(tContext)

committer.newTaskTempFile(tContext, Some("a=1"), FileNameSpec("", ".parquet"))
```
Contributor:

You can add an assert on the path of this too; it should be under the staging dir.

Contributor Author:

Added assert in 031b850.

@dongjoon-hyun (Member) left a comment:

Thank you so much, @peter-toth. I haven't been able to invest time in this bug.

```scala
  }
}
/*
```
Member:

Just a nit: could you follow the coding guideline for multi-line comments?

Contributor Author:

Fixed in 7af57eb.

```scala
 * overwriting itself, and the commitJob rename loop must not interfere.
 */
test("SPARK-56588: Cloud committer with dynamic partition support does not track partitions in " +
  "partitionPaths") {
```
Member:

Thank you for adding these test cases.

@dongjoon-hyun (Member) left a comment:

+1, LGTM.

Just a question: did you verify this manually with the S3 magic committer on S3, or on your HDFS?

@peter-toth (Contributor, author):

> Just a question: did you verify this manually with the S3 magic committer on S3, or on your HDFS?

Not yet, but I can conduct such tests next week.
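For anyone wanting to run that check, a magic-committer session would look something like this (standard S3A committer settings from the Hadoop/Spark cloud-integration docs; the bucket name is a placeholder):

```
bin/spark-shell \
  --conf spark.hadoop.fs.s3a.committer.name=magic \
  --conf spark.hadoop.fs.s3a.committer.magic.enabled=true \
  --conf spark.sql.sources.partitionOverwriteMode=dynamic

scala> spark.range(6).selectExpr("id % 2 as p", "id as v").write.partitionBy("p").mode("overwrite").parquet("s3a://some-bucket/repro")
```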
