
[SPARK-22217] [SQL] ParquetFileFormat to support arbitrary OutputCommitters #19448

Closed

Conversation

@steveloughran (Contributor) commented Oct 6, 2017

What changes were proposed in this pull request?

`ParquetFileFormat` to relax its requirement of output committer class from `org.apache.parquet.hadoop.ParquetOutputCommitter` or subclass thereof (and so implicitly Hadoop `FileOutputCommitter`) to any committer implementing `org.apache.hadoop.mapreduce.OutputCommitter`.

This enables output committers which don't write to the filesystem the way `FileOutputCommitter` does to save Parquet data from a DataFrame; at present you cannot do this.

Before using a committer which isn't a subclass of `ParquetOutputCommitter`, it checks whether the context has requested summary metadata by setting `parquet.enable.summary-metadata`. If that is true and the committer class isn't a Parquet committer, it raises a RuntimeException with an error message.

(It could downgrade, of course, but raising an exception makes it clear there won't be a summary. It also makes the behaviour testable.)

Note that `SQLConf` already states that any `OutputCommitter` can be used, but that typically it's a subclass of `ParquetOutputCommitter`. That's not currently true. This patch makes the code consistent with the docs, adding tests to verify it.
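As a rough illustration of what this unlocks, here is a minimal sketch of selecting an alternative committer through the SQLConf key discussed in this PR; the committer class named below is hypothetical:

    // A minimal sketch, assuming this patch is applied.
    // "spark.sql.parquet.output.committer.class" is the (internal) SQLConf key
    // referenced in this PR; com.example.CloudOutputCommitter is hypothetical.
    spark.conf.set(
      "spark.sql.parquet.output.committer.class",
      "com.example.CloudOutputCommitter") // any org.apache.hadoop.mapreduce.OutputCommitter
    df.write.parquet("s3a://bucket/output")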

How was this patch tested?

The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, `MarkingFileOutputCommitter`, which extends `FileOutputCommitter` and writes a marker file in the destination directory. The presence of the marker file can be used to verify the new committer was used. The tests then try the combinations of Parquet committer summary/no-summary and marking committer summary/no-summary.

| committer | summary | outcome |
|-----------|---------|---------|
| parquet   | true    | success |
| parquet   | false   | success |
| marking   | false   | success with marker |
| marking   | true    | exception |

All tests are happy.
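For readers skimming the thread, a hedged sketch of what such a marking committer could look like; this is illustrative only, not the exact test code from the patch:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

    // Illustrative sketch: commit normally, then drop a marker file so a
    // test can assert that this committer actually ran.
    class MarkingFileOutputCommitter(outputPath: Path, context: TaskAttemptContext)
      extends FileOutputCommitter(outputPath, context) {

      override def commitJob(jobContext: JobContext): Unit = {
        super.commitJob(jobContext)
        val fs = outputPath.getFileSystem(jobContext.getConfiguration)
        fs.create(new Path(outputPath, "marker")).close()
      }
    }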

…ass, provided saveSummaries is disabled. With Tests

Change-Id: I19872dc1c095068ed5a61985d53cb7258bd9a9bb

SparkQA commented Oct 6, 2017

Test build #82517 has finished for PR 19448 at commit e6fdbdc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


rdblue commented Oct 7, 2017

+1

I completely agree that using a ParquetOutputCommitter should be optional.

@vanzin (Contributor) left a comment


One minor suggestion, otherwise LGTM.

    if (conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
      // output summary is requested, but the class is not a Parquet Committer
      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
Contributor:

IllegalArgumentException or some other better exception?

Member:

How about require maybe?

Contributor Author:

will do

        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
      // output summary is requested, but the class is not a Parquet Committer
      throw new RuntimeException(s"Committer $committerClass is not a ParquetOutputCommitter" +
        s" and cannot create job summaries.")
Member:

Looks like we can remove this s BTW.

Contributor Author:

Depends on the policy about what to do if it's not a Parquet committer and the option for job summaries is set. It could just mean "you don't get summaries", which works for me :). May want to log at info though?

Member:

Oh, I mean the s in s" .. " (s for string interpolation).

Contributor Author:

aah

Contributor Author:

Aah. In the move to require(), everything is going back onto a single line, so now moot.

@HyukjinKwon (Member) left a comment


LGTM too, just a few tiny nits while double checking.


    override def afterAll(): Unit = {
      spark.stop()
      spark = null
Member:

maybe super.afterAll()?

Contributor Author:

Done; I'll also add a check for spark == null so that if a failure happens during setup, the exception doesn't get lost in teardown.

    }

test("alternative output committer, no merge schema") {
writeDataFrame(MarkingFileOutput.COMMITTER, false, true)
Member:

I think it might be a little bit better to use named arguments for readability: writeDataFrame(MarkingFileOutput.COMMITTER, summary = false, check = true)

Contributor Author:

OK

…test suite tuning

Change-Id: Ib7e99860fab66cb2bc47e2e4f90f4fc8041c7f03
@@ -138,6 +138,10 @@ class ParquetFileFormat
      conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
    }

    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
Member:

We need to issue an AnalysisException here.

Contributor:

AnalysisException? Shouldn't this be SparkException? By the time this runs, Spark has already analyzed, optimized, and planned the job. Doesn't seem like failing analysis is appropriate.

Member:

SparkException is better. Normally, we want to issue a Spark-specific exception type.

Contributor:

SparkException makes it sound like it's a problem that Spark caused in some way, while this is actually caused by incorrect user input, in which case the suggested IllegalArgumentException (which require throws) is better imo.
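For reference, Scala's built-in require does raise that exception when the condition fails:

    // Scala's Predef.require throws IllegalArgumentException on failure:
    require(1 == 2, "boom")
    // => java.lang.IllegalArgumentException: requirement failed: boom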

Member:

In Spark SQL, we do issue AnalysisException in many similar cases. I am also fine with using SparkException.

In this specific case, the users are able to control the conf to make it work. Thus, we also need to improve the message to let users know how to resolve it by changing the conf.

Contributor:

I think I'd prefer the warn & continue option. It does little good to fail so late in a job, when the caller has already indicated that they want to use a different committer. Let them write the data out since this isn't a correctness issue, and they can add a summary file later if they want. Basically, there's less annoyance and interruption by not writing a summary file than by failing a job and forcing the user to re-run near the end.

Member:

+1 for warn and continue.

Member:

If we issue a warning log, we will see such a warning message for each write operation. Doesn't that look annoying?

Contributor Author:

Yes, there is that. Options: do something complicated with a static field to only print it once, or log at debug so people only see the message if they are trying to track things down.
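A hedged sketch of the static-field "warn once" option floated here (not what the patch ultimately did, which was to warn on every write); the object and helper names are illustrative:

    import java.util.concurrent.atomic.AtomicBoolean

    // Illustrative only: emit the committer warning at most once per JVM.
    object CommitterWarning {
      private val warned = new AtomicBoolean(false)

      def warnOnce(logWarning: String => Unit, committerClass: Class[_]): Unit = {
        if (warned.compareAndSet(false, true)) {
          logWarning(s"Committer $committerClass is not a ParquetOutputCommitter " +
            "and cannot create job summaries.")
        }
      }
    }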

Contributor:

I think once per write operation is fine. It's not like it is once per file.

      spark.stop()
      spark = null
    }
    super.afterAll()
Member:

    try {
      ...
    } finally {
      super.afterAll()
    }

Contributor Author:

good point
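Putting the two suggestions together, the teardown would look roughly like this (a sketch, not necessarily the final committed code):

    override def afterAll(): Unit = {
      try {
        if (spark != null) {
          spark.stop()
          spark = null
        }
      } finally {
        super.afterAll()
      }
    }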

SparkQA commented Oct 11, 2017

Test build #82643 has finished for PR 19448 at commit d634f9e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 (Contributor) left a comment


LGTM

import org.apache.spark.sql.test.SQLTestUtils

    /**
     * Test logic related to choice of output commtters
Contributor:

nit: commtters -> committers

@jiangxb1987 (Contributor):

retest this please

Change-Id: I92420bff4afe180eda106337df253b0445e56979
SparkQA commented Oct 12, 2017

Test build #82680 has finished for PR 19448 at commit d634f9e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    require(!conf.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
      || classOf[ParquetOutputCommitter].isAssignableFrom(committerClass),
      s"Committer $committerClass is not a ParquetOutputCommitter and cannot create job summaries."
        + " Set Parquet option " + ParquetOutputFormat.ENABLE_JOB_SUMMARY + " to false.")
Member:

nit:

  ...
  s"Committer $committerClass is not a ParquetOutputCommitter and cannot create job summaries. " +
  s"Set Parquet option '${ParquetOutputFormat.ENABLE_JOB_SUMMARY}' to false.")

Contributor Author:

I'd thought about that; it didn't look any better or worse. Will change it for the log message.

@HyukjinKwon (Member):

Still LGTM except for a few nits.

… tells user to unset the parquet property...is that needed now?

Change-Id: I1c34b341fb4e0e3297becec4fc3dd3e63c005b7c
@gatorsmile (Member):

LGTM pending Jenkins

SparkQA commented Oct 12, 2017

Test build #82692 has finished for PR 19448 at commit c93eb1b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

…text

Change-Id: Ibcc8ada3c57091dd6a03e3efbcbc4791c556a287

rdblue commented Oct 12, 2017

Still +1 from me as well.

SparkQA commented Oct 12, 2017

Test build #82700 has finished for PR 19448 at commit 42afccb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Oct 12, 2017

Test build #82702 has finished for PR 19448 at commit f486263.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


HyukjinKwon commented Oct 12, 2017

Merged to master and branch-2.2.

@asfgit asfgit closed this in 9104add Oct 12, 2017
@dongjoon-hyun (Member):

Hi, all.
Can we have this in Apache Spark 2.2.1?

asfgit pushed a commit that referenced this pull request Oct 13, 2017
[SPARK-22217][SQL] ParquetFileFormat to support arbitrary OutputCommitters

Author: Steve Loughran <stevel@hortonworks.com>

Closes #19448 from steveloughran/cloud/SPARK-22217-committer.
@HyukjinKwon (Member):

I didn't backport this one in line with the JIRA issue type (Improvement), but yeah, it sounds more like a bug fix.

        && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass)) {
      // output summary is requested, but the class is not a Parquet Committer
      logWarning(s"Committer $committerClass is not a ParquetOutputCommitter and cannot" +
        s" create job summaries. " +
Member:

D'oh, s ...

@gatorsmile (Member):

This is not eligible for backporting. We should not do it next time.

@HyukjinKwon (Member):

I think this is a bug fix, as the previous behaviour does not work as documented:

> subclass of org.apache.hadoop.mapreduce.OutputCommitter...

and it does not change existing behaviour.

Could you elaborate why you think it is not eligible?

@gatorsmile (Member):

That conf is an internal one. The end users will not see it. This is not a bug fix.

We should not extend the existing functions or introduce new behaviors/features in 2.2.x releases.

@gatorsmile (Member):

Since the risk is low, I did not revert it.

@HyukjinKwon (Member):

How come fixing the behaviour as documented is not a bug fix? That basically means we don't backport fixes for things not working as documented for other internal configurations.

This does not extend the functionality. This fixes the functionality to work as documented and expected, and I call that a bug fix.

@gatorsmile (Member):

This behaviour goes back at least to Spark 1.5. If you are not confident whether this is a bug or not, please check before merging.

@HyukjinKwon (Member):

I did this because I was confident it is a bug: the doc says it should work but it actually doesn't, and the fix doesn't break the previous support.

@gatorsmile (Member):

OK. Next time, please check with the committers who are familiar with Spark SQL.

@HyukjinKwon (Member):

Will check next time if I am not confident.

@steveloughran (Contributor Author):

Thanks for reviewing this and getting it in. Personally, I had it in the "improvement" category rather than bug fix. If it weren't for that line in the docs, there'd be no ambiguity about improvement vs. fix, and there is always a lower-risk way to fix a doc/code mismatch: change the docs.

But I'm grateful for it being in; with the backport to branch-2.2, Ryan should be able to use it semi-immediately.

@steveloughran (Contributor Author):

PS: for people who are interested in dynamic committers, MAPREDUCE-6823 is something to look at. It allows you to switch committers under pretty much everything other than Parquet; this patch helps make Parquet manageable too.

@HyukjinKwon (Member):

I guess we wouldn't change the docs in branch-2.2 alone, as we have a safe fix for this mismatch anyway. I just wanted to say this backport can be justified.

@gatorsmile (Member):

@steveloughran Thanks for your input. I totally agree with your points.

Spark is infrastructure software. We have to be very careful when backporting PRs.


yhuai commented Oct 13, 2017

@HyukjinKwon branch-2.2 is a maintenance branch; I am not sure it is appropriate to merge this change there, since it is not really a bug fix. If the doc is not accurate, we should fix the doc. For a maintenance branch, we need to be very careful about what we merge, and we should always avoid unnecessary changes.


HyukjinKwon commented Oct 13, 2017

Okay. I am sorry for this trouble. Should we revert this if you guys feel strongly about it? I am okay with reverting it.


rdblue commented Oct 13, 2017

I have a lot of sympathy for the argument that infrastructure software shouldn't have too many backports and that those should generally be bug fixes. But if I were working on a Spark distribution at a vendor, this is something I would definitely include because it's such a useful feature. I think that by not backporting this, we're just pushing that work downstream. Plus, the risk of adding this is low: the main behavior change is that users can specify a previously banned committer for Parquet writes. Is it a bug fix? Probably not. But it fixes a big blocker.


yhuai commented Oct 13, 2017

I am not really worried about this particular change. It's already merged, and it seems like a small and safe change. I am not planning to revert it.

But, in general, let's avoid merging changes that are not bug fixes to a maintenance branch. If there is an exception, it's better to make that clear earlier.

@HyukjinKwon (Member):

Sure, I will, and I'll note it ahead of time next time. I made a mistake while trying to think of reasons for this backport.


yhuai commented Oct 13, 2017

Thank you :)

@steveloughran (Contributor Author):

> But, if I were working on a Spark distribution at a vendor, this is something I would definitely include because it's such a useful feature.

I concur :)

MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018

[SPARK-22217][SQL] ParquetFileFormat to support arbitrary OutputCommitters

Author: Steve Loughran <stevel@hortonworks.com>

Closes apache#19448 from steveloughran/cloud/SPARK-22217-committer.