Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-41551][SQL] Dynamic/absolute path support in PathOutputCommitters #40221

Conversation

steveloughran
Copy link
Contributor

What changes were proposed in this pull request?

Follow on to SPARK-40034, Dynamic/absolute path support in PathOutputCommitters

Dynamic partitioning though the PathOutputCommitProtocol needs to add the target directories to the superclass's partition list else the partition delete doesn't
take place -the job extends the dataset, rather than replaces it.

Fix:

  • add an addPartition() method subclasses can use
  • add a getPartitions method to return an immutable
    copy of the list for testing.
  • add the test.
  • Also fix newTaskTempFileAbsPath() to return a path, irrespective of committer type.

In dynamic mode, because the parent dir of an absolute path is deleted, there's a safety check to reject any requests for a file in a parent dir. This
is something which could be pulled up to HadoopMapReduceCommitProtocol -it needs the same check, if the risk is considered realistic.

The patch now downgrades from failing on dynamic partitioning if the committer doesn't declare it supports it to printing a warning. Why this? well, it
does work through the s3a committers, it's just O(data). If someone does want to do INSERT OVERWRITE then they can be allowed to, just warned about
it. The outcome will be correct except in the case of: "if the driver fails partway through dir rename, only some of the files will be there"

Finally, it update the protocol spec in HadoopMapReduceCommitProtocol to cover the dynamic partition job commit in more detail.

Why are the changes needed?

  • The code in SPARK-40034 was incomplete. The new tests show that it works now.
  • dynamic partition jobs aren't actually incorrect through the s3 committers, just slow, so if someone really wants it they should be free to.
  • The newFileAbsPath() code is required of all committers, despite its near-total lack of use.

Does this PR introduce any user-facing change?

Updates the cloud docs to say that dynamic partition overwrite does work everywhere, just may be really slow.

How was this patch tested?

New unit tests in CommitterBindingSuite; `

New test suite PathOutputPartitionedWriteSuite extends PartitionedWriteSuite which runs the PartitionedWriteSuite through
the PathOutputCommitter and with, on hadoop 3.3.5+ the manifest committer chosen.

executed with

  • hadoop 3.3.4
  • RC2 of hadoop 3.3.5 through `-Dhadoop.version=3.3.5 -Psnapshots-and-staging

Follow on to SPARK-40034.

Dynamic partitioning though the PathOutputCommitProtocol needs
to add the dirs to the superclass's partition list else the
partition delete doesn't take place.

Fix:
* add an addPartition() method subclasses can use
* add a getPartitions method to return an immutable
  copy of the list for testing.
* add tests to verify all of this.

Also fix newTaskTempFileAbsPath to return
a path, irrespective of committer type.

In dynamic mode, because the parent dir of
an absolute path is deleted, there's a
safety check to reject any requests
for a file in a parent dir.
This is something which could be pulled up to
HadoopMapReduceCommitProtocol because it needs
the same check, if the risk is considered realistic.

The patch now downgrades from failing on dynamic
partitioning
if the committer doesn't declare it supports it to printing
a warning.
Why this? well, it *does* work through the s3a committers,
it's just O(data). If someone does want to do INSERT OVERWRITE
then they can be allowed to, just warned about it.
The outcome will be correct except in the case
of: "if the driver fails partway through dir rename,
only some of the files will be there"

Google GCS has that same non-atomic rename issue.
But: even on an FS with atomic dir rename, any job
which fails partway through the overwrite process
is going to leave the fs in an inconsistent state,
such as
* some parts with new data, some parts not yet overwritten
* a directory deleted and the new data not instantiated

So it's not that much worse.

The patch tries to update the protocol spec in
HadoopMapReduceCommitProtocol to cover both
newFileAbsPath() semantics/commit and failure
modes of dynamic partition commit.

Adds new test suite PathOutputPartitionedWriteSuite

That suite doesn't validate the dynamic overwrite code, which
can be demonstrated by commenting out addPartitionedDir() in
PathOutputCommitProtocol.newTaskTempFile().

The updated CommitterBindingSuite will fail if that line
is commented out, so it is verifying that the base committer
is given the list of updated directories
@steveloughran steveloughran force-pushed the SPARK-41551-path-output-dynamic-lean branch from a25ea81 to 536f2d6 Compare February 28, 2023 16:57
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the comment, let me revert SPARK-40034 from branch-3.4 first, @steveloughran .

The code in SPARK-40034 was incomplete.

@github-actions
Copy link

github-actions bot commented Jun 9, 2023

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jun 9, 2023
@github-actions github-actions bot closed this Jun 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants